diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java index b7a5732d0..52d27889d 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java @@ -5,17 +5,20 @@ package net.i2p.client.streaming; import java.io.IOException; +import java.io.InputStream; import java.io.InterruptedIOException; import java.net.ConnectException; import java.net.NoRouteToHostException; import java.net.ServerSocket; import java.net.Socket; +import java.util.List; import java.util.Properties; import java.util.Set; import net.i2p.I2PAppContext; import net.i2p.I2PException; import net.i2p.client.I2PSession; +import net.i2p.client.I2PSessionException; import net.i2p.data.Destination; @@ -34,6 +37,26 @@ public interface I2PSocketManager { */ public I2PSession getSession(); + /** + * @return a new subsession, non-null + * @param privateKeyStream null for transient, if non-null must have same encryption keys as primary session + * and different signing keys + * @param opts subsession options if any, may be null + * @since 0.9.19 + */ + public I2PSession addSubsession(InputStream privateKeyStream, Properties opts) throws I2PSessionException; + + /** + * @since 0.9.19 + */ + public void removeSubsession(I2PSession session); + + /** + * @return a list of subsessions, non-null, does not include the primary session + * @since 0.9.19 + */ + public List getSubsessions(); + /** * How long should we wait for the client to .accept() a socket before * sending back a NACK/Close? diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java index 41396edb8..c0d25efbb 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java @@ -1,14 +1,17 @@ package net.i2p.client.streaming.impl; import java.io.IOException; +import java.io.InputStream; import java.net.ConnectException; import java.net.NoRouteToHostException; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; import java.util.HashSet; +import java.util.List; import java.util.Properties; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -37,6 +40,7 @@ public class I2PSocketManagerFull implements I2PSocketManager { private final I2PAppContext _context; private final Log _log; private final I2PSession _session; + private final List _subsessions; private final I2PServerSocketFull _serverSocket; private StandardServerSocket _realServerSocket; private final ConnectionOptions _defaultOptions; @@ -80,6 +84,7 @@ public class I2PSocketManagerFull implements I2PSocketManager { public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) { _context = context; _session = session; + _subsessions = new CopyOnWriteArrayList(); _log = _context.logManager().getLog(I2PSocketManagerFull.class); _name = name + " " + (__managerId.incrementAndGet()); @@ -117,6 +122,39 @@ public class I2PSocketManagerFull implements I2PSocketManager { return _session; } +//////////// gahhh we want a socket manager, not a session + /** + * @return a new subsession, non-null + * @param privateKeyStream null for transient, if non-null must have same encryption keys as primary session + * and different signing keys + * @param opts subsession options if any, may be null + * @since 0.9.19 + */ + public I2PSession addSubsession(InputStream privateKeyStream, Properties opts) throws I2PSessionException { + I2PSession rv = _session.addSubsession(privateKeyStream, opts); + _subsessions.add(rv); + return rv; + } + + /** + * Remove the subsession + * + * @since 0.9.19 + */ + public void removeSubsession(I2PSession session) { + _session.removeSubsession(session); + _subsessions.remove(session); + // ... + } + + /** + * @return a list of subsessions, non-null, does not include the primary session + * @since 0.9.19 + */ + public List getSubsessions() { + return _session.getSubsessions(); + } + public ConnectionManager getConnectionManager() { return _connectionManager; } diff --git a/core/java/src/net/i2p/client/I2PSession.java b/core/java/src/net/i2p/client/I2PSession.java index da67e2c02..54f8540f3 100644 --- a/core/java/src/net/i2p/client/I2PSession.java +++ b/core/java/src/net/i2p/client/I2PSession.java @@ -9,6 +9,8 @@ package net.i2p.client; * */ +import java.io.InputStream; +import java.util.List; import java.util.Properties; import java.util.Set; @@ -21,7 +23,7 @@ import net.i2p.data.SigningPrivateKey; /** *

Define the standard means of sending and receiving messages on the * I2P network by using the I2CP (the client protocol). This is done over a - * bidirectional TCP socket and never sends any private keys. + * bidirectional TCP socket. * * End to end encryption in I2PSession was disabled in release 0.6. * @@ -247,6 +249,27 @@ public interface I2PSession { * */ public void destroySession() throws I2PSessionException; + + /** + * @return a new subsession, non-null + * @param privateKeyStream null for transient, if non-null must have same encryption keys as primary session + * and different signing keys + * @param opts subsession options if any, may be null + * @since 0.9.19 + */ + public I2PSession addSubsession(InputStream privateKeyStream, Properties opts) throws I2PSessionException; + + /** + * @return a list of subsessions, non-null, does not include the primary session + * @since 0.9.19 + */ + public void removeSubsession(I2PSession session); + + /** + * @return a list of subsessions, non-null, does not include the primary session + * @since 0.9.19 + */ + public List getSubsessions(); /** * Actually connect the session and start receiving/sending messages diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index 7fd175978..93bc2af78 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -23,6 +23,7 @@ import java.util.Locale; import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -43,6 +44,7 @@ import net.i2p.data.i2cp.I2CPMessage; import net.i2p.data.i2cp.I2CPMessageReader; import net.i2p.data.i2cp.MessagePayloadMessage; import net.i2p.data.i2cp.SessionId; +import net.i2p.data.i2cp.SessionStatusMessage; import net.i2p.internal.I2CPMessageQueue; import net.i2p.internal.InternalClientManager; import net.i2p.internal.QueuedI2CPMessageReader; @@ -76,6 +78,15 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa /** currently granted lease set, or null */ private volatile LeaseSet _leaseSet; + // subsession stuff + // registered subsessions + private final List _subsessions; + // established subsessions + private final ConcurrentHashMap _subsessionMap; + private final Object _subsessionLock = new Object(); + private static final String MIN_SUBSESSION_VERSION = "0.9.19"; + private volatile boolean _routerSupportsSubsessions; + /** hostname of router - will be null if in RouterContext */ protected final String _hostname; /** port num to router - will be 0 if in RouterContext */ @@ -179,6 +190,9 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa TEST_LOOKUP || (routerVersion != null && routerVersion.length() > 0 && VersionComparator.comp(routerVersion, MIN_HOST_LOOKUP_VERSION) >= 0); + _routerSupportsSubsessions = _context.isRouterContext() || + (routerVersion != null && routerVersion.length() > 0 && + VersionComparator.comp(routerVersion, MIN_SUBSESSION_VERSION) >= 0); synchronized (_stateLock) { if (_state == State.OPENING) { _state = State.GOTDATE; @@ -196,18 +210,42 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa */ protected I2PSessionImpl(I2PAppContext context, Properties options, I2PClientMessageHandlerMap handlerMap) { - this(context, options, handlerMap, false); + this(context, options, handlerMap, null, false); } - + + /* + * For extension by SubSession via I2PSessionMuxedImpl and I2PSessionImpl2 + * + * @param destKeyStream stream containing the private key data, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} + * @param options set of options to configure the router with, if null will use System properties + * @since 0.9.19 + */ + protected I2PSessionImpl(I2PSessionImpl primary, InputStream destKeyStream, Properties options) throws I2PSessionException { + this(primary.getContext(), options, primary.getHandlerMap(), primary.getProducer(), true); + _availabilityNotifier = new AvailabilityNotifier(); + try { + readDestination(destKeyStream); + } catch (DataFormatException dfe) { + throw new I2PSessionException("Error reading the destination key stream", dfe); + } catch (IOException ioe) { + throw new I2PSessionException("Error reading the destination key stream", ioe); + } + } + /** * Basic setup of finals * @since 0.9.7 */ private I2PSessionImpl(I2PAppContext context, Properties options, - I2PClientMessageHandlerMap handlerMap, boolean hasDest) { + I2PClientMessageHandlerMap handlerMap, + I2CPMessageProducer producer, + boolean hasDest) { _context = context; _handlerMap = handlerMap; _log = context.logManager().getLog(getClass()); + _subsessions = new CopyOnWriteArrayList(); + _subsessionMap = new ConcurrentHashMap(4); if (options == null) options = (Properties) System.getProperties().clone(); _options = loadConfig(options); @@ -215,7 +253,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa _portNum = getPort(); _fastReceive = Boolean.parseBoolean(_options.getProperty(I2PClient.PROP_FAST_RECEIVE)); if (hasDest) { - _producer = new I2CPMessageProducer(context); + _producer = producer; _availableMessages = new ConcurrentHashMap(); _myDestination = new Destination(); _privateKey = new PrivateKey(); @@ -238,10 +276,10 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa * @param destKeyStream stream containing the private key data, * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param options set of options to configure the router with, if null will use System properties - * @throws I2PSessionException if there is a problem loading the private keys or + * @throws I2PSessionException if there is a problem loading the private keys */ public I2PSessionImpl(I2PAppContext context, InputStream destKeyStream, Properties options) throws I2PSessionException { - this(context, options, new I2PClientMessageHandlerMap(context), true); + this(context, options, new I2PClientMessageHandlerMap(context), new I2CPMessageProducer(context), true); _availabilityNotifier = new AvailabilityNotifier(); try { readDestination(destKeyStream); @@ -251,6 +289,66 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa throw new I2PSessionException("Error reading the destination key stream", ioe); } } + + /** + * Router must be connected or was connected... for now. + * + * @return a new subsession, non-null + * @param privateKeyStream null for transient, if non-null must have same encryption keys as primary session + * and different signing keys + * @param opts subsession options if any, may be null + * @since 0.9.19 + */ + public I2PSession addSubsession(InputStream privateKeyStream, Properties opts) throws I2PSessionException { + if (!_routerSupportsSubsessions) + throw new I2PSessionException("Router does not support subsessions"); + SubSession sub; + synchronized(_subsessionLock) { + if (_subsessions.size() > _subsessionMap.size()) + throw new I2PSessionException("Subsession request already pending"); + sub = new SubSession(this, privateKeyStream, opts); + for (SubSession ss : _subsessions) { + if (ss.getDecryptionKey().equals(sub.getDecryptionKey()) && + ss.getPrivateKey().equals(sub.getPrivateKey())) { + throw new I2PSessionException("Dup subsession"); + } + } + _subsessions.add(sub); + } + + synchronized (_stateLock) { + if (_state == State.OPEN) { + _producer.connect(sub); + } // else will be called in connect() + } + return sub; + } + + /** + * @since 0.9.19 + */ + public void removeSubsession(I2PSession session) { + if (!(session instanceof SubSession)) + return; + synchronized(_subsessionLock) { + _subsessions.remove(session); + SessionId id = ((SubSession) session).getSessionId(); + if (id != null) + _subsessionMap.remove(id); + /// tell the subsession + ///.... + } + } + + /** + * @return a list of subsessions, non-null, does not include the primary session + * @since 0.9.19 + */ + public List getSubsessions() { + synchronized(_subsessionLock) { + return new ArrayList(_subsessions); + } + } /** * Parse the config for anything we know about. @@ -536,6 +634,14 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa startIdleMonitor(); startVerifyUsage(); success = true; + + // now send CreateSessionMessages for all subsessions, one at a time, must wait for each response + synchronized(_subsessionLock) { + for (SubSession ss : _subsessions) { + _producer.connect(ss); + } + } + } catch (InterruptedException ie) { throw new I2PSessionException("Interrupted", ie); } catch (UnknownHostException uhe) { @@ -739,19 +845,80 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa /** * The I2CPMessageEventListener callback. * Recieve notification of some I2CP message and handle it if possible. + * + * We route the message based on message type AND session ID. + * + * The following types never contain a session ID and are not routable to + * a subsession: + * BandwidthLimitsMessage, DestReplyMessage + * + * The following types may not ontain a valid session ID + * even when intended for a subsession, so we must take special care: + * SessionStatusMessage + * * @param reader unused */ public void messageReceived(I2CPMessageReader reader, I2CPMessage message) { - I2CPMessageHandler handler = _handlerMap.getHandler(message.getType()); - if (handler == null) { - if (_log.shouldLog(Log.WARN)) - _log.warn(getPrefix() + "Unknown message or unhandleable message received: type = " - + message.getType()); + int type = message.getType(); + SessionId id = message.sessionId(); + if (id == null || id.equals(_sessionId) || + (_sessionId == null && id != null && type == SessionStatusMessage.MESSAGE_TYPE)) { + // it's for us + I2CPMessageHandler handler = _handlerMap.getHandler(type); + if (handler != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getPrefix() + "Message received of type " + type + + " to be handled by " + handler.getClass().getSimpleName()); + handler.handleMessage(message, this); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn(getPrefix() + "Unknown message or unhandleable message received: type = " + + type); + } } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getPrefix() + "Message received of type " + message.getType() - + " to be handled by " + handler.getClass().getSimpleName()); - handler.handleMessage(message, this); + SubSession sub = _subsessionMap.get(id); + if (sub != null) { + // it's for a subsession + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getPrefix() + "Message received of type " + type + + " to be handled by " + sub); + sub.messageReceived(reader, message); + } else if (id != null && type == SessionStatusMessage.MESSAGE_TYPE) { + // look for a subsession without a session + synchronized (_subsessionLock) { + for (SubSession sess : _subsessions) { + if (sess.getSessionId() == null) { + sub.messageReceived(reader, message); + id = sess.getSessionId(); + if (id != null) { + if (id.equals(_sessionId)) { + // shouldnt happen + sess.setSessionId(null); + if (_log.shouldLog(Log.WARN)) + _log.warn("Dup or our session id " + id); + } else { + SubSession old = _subsessionMap.putIfAbsent(id, sess); + if (old != null) { + // shouldnt happen + sess.setSessionId(null); + if (_log.shouldLog(Log.WARN)) + _log.warn("Dup session id " + id); + } + } + } + return; + } + if (_log.shouldLog(Log.WARN)) + _log.warn(getPrefix() + "No session " + id + " to handle message: type = " + + type); + } + } + } else { + // it's for nobody + if (_log.shouldLog(Log.WARN)) + _log.warn(getPrefix() + "No session " + id + " to handle message: type = " + + type); + } } } @@ -786,6 +953,18 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa */ I2CPMessageProducer getProducer() { return _producer; } + /** + * For Subsessions + * @since 0.9.19 + */ + I2PClientMessageHandlerMap getHandlerMap() { return _handlerMap; } + + /** + * For Subsessions + * @since 0.9.19 + */ + I2PAppContext getContext() { return _context; } + /** * Retrieve the configuration options * @return non-null, if insantiated with null options, this will be the System properties. @@ -803,7 +982,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa /** * Has the session been closed (or not yet connected)? - * False when open and during transitions. Unsynchronized. + * False when open and during transitions. */ public boolean isClosed() { synchronized (_stateLock) { @@ -892,6 +1071,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa if (_availabilityNotifier != null) _availabilityNotifier.stopNotifying(); closeSocket(); + _subsessionMap.clear(); if (_sessionListener != null) _sessionListener.disconnected(this); } diff --git a/core/java/src/net/i2p/client/I2PSessionImpl2.java b/core/java/src/net/i2p/client/I2PSessionImpl2.java index 9f8a5465d..21bb04737 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl2.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl2.java @@ -50,9 +50,9 @@ class I2PSessionImpl2 extends I2PSessionImpl { private static final long REMOVE_EXPIRED_TIME = 63*1000; - /** - * for extension by SimpleSession (no dest) - */ + /** + * for extension by SimpleSession (no dest) + */ protected I2PSessionImpl2(I2PAppContext context, Properties options, I2PClientMessageHandlerMap handlerMap) { super(context, options, handlerMap); @@ -61,15 +61,17 @@ class I2PSessionImpl2 extends I2PSessionImpl { } /** + * for extension by I2PSessionMuxedImpl + * * Create a new session, reading the Destination, PrivateKey, and SigningPrivateKey * from the destKeyStream, and using the specified options to connect to the router * * @param destKeyStream stream containing the private key data, * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param options set of options to configure the router with, if null will use System properties - * @throws I2PSessionException if there is a problem loading the private keys or + * @throws I2PSessionException if there is a problem loading the private keys */ - public I2PSessionImpl2(I2PAppContext ctx, InputStream destKeyStream, Properties options) throws I2PSessionException { + protected I2PSessionImpl2(I2PAppContext ctx, InputStream destKeyStream, Properties options) throws I2PSessionException { super(ctx, destKeyStream, options); _sendingStates = new ConcurrentHashMap(32); _sendMessageNonce = new AtomicLong(); @@ -94,6 +96,26 @@ class I2PSessionImpl2 extends I2PSessionImpl { _context.statManager().createRateStat("i2cp.tx.msgExpanded", "size before compression", "i2cp", new long[] { 30*60*1000 }); } + /* + * For extension by SubSession via I2PSessionMuxedImpl + * + * @param destKeyStream stream containing the private key data, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} + * @param options set of options to configure the router with, if null will use System properties + * @since 0.9.19 + */ + protected I2PSessionImpl2(I2PSessionImpl primary, InputStream destKeyStream, Properties options) throws I2PSessionException { + super(primary, destKeyStream, options); + _sendingStates = new ConcurrentHashMap(32); + _sendMessageNonce = new AtomicLong(); + _noEffort = "none".equals(getOptions().getProperty(I2PClient.PROP_RELIABILITY, "").toLowerCase(Locale.US)); + _context.statManager().createRateStat("i2cp.receiveStatusTime.1", "How long it took to get status=1 back", "i2cp", new long[] { 10*60*1000 }); + _context.statManager().createRateStat("i2cp.receiveStatusTime.4", "How long it took to get status=4 back", "i2cp", new long[] { 10*60*1000 }); + _context.statManager().createRateStat("i2cp.receiveStatusTime.5", "How long it took to get status=5 back", "i2cp", new long[] { 10*60*1000 }); + _context.statManager().createRateStat("i2cp.tx.msgCompressed", "compressed size transferred", "i2cp", new long[] { 30*60*1000 }); + _context.statManager().createRateStat("i2cp.tx.msgExpanded", "size before compression", "i2cp", new long[] { 30*60*1000 }); + } + /** * Fire up a periodic task to check for unclaimed messages * @since 0.9.14 diff --git a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java index 48ca6016e..880670a53 100644 --- a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java @@ -82,6 +82,24 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 { // discards the one in super(), sorry about that... (no it wasn't started yet) _availabilityNotifier = new MuxedAvailabilityNotifier(); } + + /* + * For extension by SubSession + * + * @param destKeyStream stream containing the private key data, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} + * @param options set of options to configure the router with, if null will use System properties + * @since 0.9.19 + */ + protected I2PSessionMuxedImpl(I2PSessionImpl primary, InputStream destKeyStream, Properties options) throws I2PSessionException { + super(primary, destKeyStream, options); + // also stored in _sessionListener but we keep it in _demultipexer + // as well so we don't have to keep casting + _demultiplexer = new I2PSessionDemultiplexer(primary.getContext()); + super.setSessionListener(_demultiplexer); + // discards the one in super(), sorry about that... (no it wasn't started yet) + _availabilityNotifier = new MuxedAvailabilityNotifier(); + } /** listen on all protocols and ports */ @Override diff --git a/core/java/src/net/i2p/client/SubSession.java b/core/java/src/net/i2p/client/SubSession.java new file mode 100644 index 000000000..1e75fa409 --- /dev/null +++ b/core/java/src/net/i2p/client/SubSession.java @@ -0,0 +1,299 @@ +package net.i2p.client; + +/* + * free (adj.): unencumbered; not under the control of others + * Written by jrandom in 2003 and released into the public domain + * with no warranty of any kind, either expressed or implied. + * It probably won't make your computer catch on fire, or eat + * your children, but it might. Use at your own risk. + * + */ + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import net.i2p.I2PAppContext; +import net.i2p.data.Destination; +import net.i2p.data.Hash; +import net.i2p.data.PrivateKey; +import net.i2p.data.SigningPrivateKey; +import net.i2p.data.i2cp.I2CPMessage; +import net.i2p.data.i2cp.SessionId; + +/** + * An additional session using another session's connection. + * + * A subsession uses the same connection to the router as the primary session, + * but has a different Destination. It uses the same tunnels as the primary + * but has its own leaseset. It must use the same encryption keys as the primary + * so that garlic encryption/decryption works. + * + * The message handler map and message producer are reused from primary. + * + * Does NOT reuse the session listener ???? + * + * While the I2CP protocol, in theory, allows for fully independent sessions + * over the same I2CP connection, this is not currently supported by the router. + * + * @since 0.9.19 + */ +class SubSession extends I2PSessionMuxedImpl { + private final I2PSessionMuxedImpl _primary; + + /** + * @param primary must be a I2PSessionMuxedImpl + */ + public SubSession(I2PSession primary, InputStream destKeyStream, Properties options) throws I2PSessionException { + super((I2PSessionMuxedImpl)primary, destKeyStream, options); + _primary = (I2PSessionMuxedImpl) primary; + if (!getDecryptionKey().equals(_primary.getDecryptionKey())) + throw new I2PSessionException("encryption key mismatch"); + if (getPrivateKey().equals(_primary.getPrivateKey())) + throw new I2PSessionException("signing key must differ"); + // state management + } + + /** + * Unsupported in a subsession. + * @throws UnsupportedOperationException always + * @since 0.9.19 + */ + @Override + public I2PSession addSubsession(InputStream destKeyStream, Properties opts) throws I2PSessionException { + throw new UnsupportedOperationException(); + } + + /** + * Unsupported in a subsession. + * Does nothing. + * @since 0.9.19 + */ + @Override + public void removeSubsession(I2PSession session) {} + + /** + * Unsupported in a subsession. + * @return empty list always + * @since 0.9.19 + */ + @Override + public List getSubsessions() { + return Collections.emptyList(); + } + + /** + * Does nothing for now + */ + @Override + public void updateOptions(Properties options) {} + + /** + * Connect to the router and establish a session. This call blocks until + * a session is granted. + * + * Should be threadsafe, other threads will block until complete. + * Disconnect / destroy from another thread may be called simultaneously and + * will (should?) interrupt the connect. + * + * @throws I2PSessionException if there is a configuration error or the router is + * not reachable + */ + @Override + public void connect() throws I2PSessionException { + _primary.connect(); + } + + /** + * Has the session been closed (or not yet connected)? + * False when open and during transitions. + */ + @Override + public boolean isClosed() { + return getSessionId() == null || _primary.isClosed(); + } + + /** + * Deliver an I2CP message to the router + * May block for several seconds if the write queue to the router is full + * + * @throws I2PSessionException if the message is malformed or there is an error writing it out + */ + @Override + void sendMessage(I2CPMessage message) throws I2PSessionException { + if (isClosed()) + throw new I2PSessionException("Already closed"); + _primary.sendMessage(message); + } + + /** + * Pass off the error to the listener + * Misspelled, oh well. + * @param error non-null + */ + @Override + void propogateError(String msg, Throwable error) { + _primary.propogateError(msg, error); + if (_sessionListener != null) _sessionListener.errorOccurred(this, msg, error); + } + + /** + * Tear down the session, and do NOT reconnect. + * + * Blocks if session has not been fully started. + */ + @Override + public void destroySession() { + _primary.destroySession(); + if (_availabilityNotifier != null) + _availabilityNotifier.stopNotifying(); + if (_sessionListener != null) _sessionListener.disconnected(this); + } + + /** + * Will interrupt a connect in progress. + */ + @Override + protected void disconnect() { + _primary.disconnect(); + } + + @Override + protected boolean reconnect() { + return _primary.reconnect(); + } + + /** + * Called by the message handler + * on reception of DestReplyMessage + * + * This will never happen, as the dest reply message does not contain a session ID. + */ + @Override + void destReceived(Destination d) { + _primary.destReceived(d); + } + + /** + * Called by the message handler + * on reception of DestReplyMessage + * + * This will never happen, as the dest reply message does not contain a session ID. + * + * @param h non-null + */ + @Override + void destLookupFailed(Hash h) { + _primary.destLookupFailed(h); + } + + /** + * Called by the message handler + * on reception of HostReplyMessage + * @param d non-null + */ + void destReceived(long nonce, Destination d) { + _primary.destReceived(nonce, d); + } + + /** + * Called by the message handler + * on reception of HostReplyMessage + */ + @Override + void destLookupFailed(long nonce) { + _primary.destLookupFailed(nonce); + } + + /** + * Called by the message handler. + * This will never happen, as the bw limits message does not contain a session ID. + */ + @Override + void bwReceived(int[] i) { + _primary.bwReceived(i); + } + + /** + * Blocking. Waits a max of 10 seconds by default. + * See lookupDest with maxWait parameter to change. + * Implemented in 0.8.3 in I2PSessionImpl; + * previously was available only in I2PSimpleSession. + * Multiple outstanding lookups are now allowed. + * @return null on failure + */ + @Override + public Destination lookupDest(Hash h) throws I2PSessionException { + return _primary.lookupDest(h); + } + + /** + * Blocking. + * @param maxWait ms + * @return null on failure + */ + @Override + public Destination lookupDest(Hash h, long maxWait) throws I2PSessionException { + return _primary.lookupDest(h, maxWait); + } + + /** + * Ask the router to lookup a Destination by host name. + * Blocking. Waits a max of 10 seconds by default. + * + * This only makes sense for a b32 hostname, OR outside router context. + * Inside router context, just query the naming service. + * Outside router context, this does NOT query the context naming service. + * Do that first if you expect a local addressbook. + * + * This will log a warning for non-b32 in router context. + * + * See interface for suggested implementation. + * + * Requires router side to be 0.9.11 or higher. If the router is older, + * this will return null immediately. + */ + @Override + public Destination lookupDest(String name) throws I2PSessionException { + return _primary.lookupDest(name); + } + + /** + * Ask the router to lookup a Destination by host name. + * Blocking. See above for details. + * @param maxWait ms + * @return null on failure + */ + @Override + public Destination lookupDest(String name, long maxWait) throws I2PSessionException { + return _primary.lookupDest(name, maxWait); + } + + /** + * This may not work???????????, as the reply does not contain a session ID, so + * it won't be routed back to us? + */ + @Override + public int[] bandwidthLimits() throws I2PSessionException { + return _primary.bandwidthLimits(); + } + + @Override + protected void updateActivity() { + _primary.updateActivity(); + } + + @Override + public long lastActivity() { + return _primary.lastActivity(); + } + + @Override + public void setReduced() { + _primary.setReduced(); + } +} diff --git a/core/java/src/net/i2p/data/i2cp/CreateLeaseSetMessage.java b/core/java/src/net/i2p/data/i2cp/CreateLeaseSetMessage.java index 6477768c5..0cbb854ff 100644 --- a/core/java/src/net/i2p/data/i2cp/CreateLeaseSetMessage.java +++ b/core/java/src/net/i2p/data/i2cp/CreateLeaseSetMessage.java @@ -38,6 +38,11 @@ public class CreateLeaseSetMessage extends I2CPMessageImpl { return _sessionId; } + @Override + public SessionId sessionId() { + return _sessionId; + } + public void setSessionId(SessionId id) { _sessionId = id; } diff --git a/core/java/src/net/i2p/data/i2cp/DestroySessionMessage.java b/core/java/src/net/i2p/data/i2cp/DestroySessionMessage.java index b67f5fa6f..4275863e1 100644 --- a/core/java/src/net/i2p/data/i2cp/DestroySessionMessage.java +++ b/core/java/src/net/i2p/data/i2cp/DestroySessionMessage.java @@ -32,6 +32,16 @@ public class DestroySessionMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + public void setSessionId(SessionId id) { _sessionId = id; } diff --git a/core/java/src/net/i2p/data/i2cp/HostLookupMessage.java b/core/java/src/net/i2p/data/i2cp/HostLookupMessage.java index 1a8fcfe86..c4405f90a 100644 --- a/core/java/src/net/i2p/data/i2cp/HostLookupMessage.java +++ b/core/java/src/net/i2p/data/i2cp/HostLookupMessage.java @@ -76,6 +76,16 @@ public class HostLookupMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + /** * @return 0 to 2**32 - 1 */ diff --git a/core/java/src/net/i2p/data/i2cp/HostReplyMessage.java b/core/java/src/net/i2p/data/i2cp/HostReplyMessage.java index 37faaa276..b350f225a 100644 --- a/core/java/src/net/i2p/data/i2cp/HostReplyMessage.java +++ b/core/java/src/net/i2p/data/i2cp/HostReplyMessage.java @@ -73,6 +73,16 @@ public class HostReplyMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + /** * @return 0 to 2**32 - 1 */ diff --git a/core/java/src/net/i2p/data/i2cp/I2CPMessage.java b/core/java/src/net/i2p/data/i2cp/I2CPMessage.java index 92c4f6163..58d312013 100644 --- a/core/java/src/net/i2p/data/i2cp/I2CPMessage.java +++ b/core/java/src/net/i2p/data/i2cp/I2CPMessage.java @@ -60,9 +60,20 @@ public interface I2CPMessage extends DataStructure { public void writeMessage(OutputStream out) throws I2CPMessageException, IOException; /** - * Return the unique identifier for this type of APIMessage, as specified in the + * Return the unique identifier for this type of message, as specified in the * network specification document under #ClientAccessLayerMessages - * @return unique identifier for this type of APIMessage + * @return unique identifier for this type of message */ public int getType(); -} \ No newline at end of file + + /** + * Return the SessionId for this type of message. + * Most but not all message types include a SessionId. + * The ones that do already define getSessionId(), but some return a SessionId and + * some return a long, so we define a new method here. + * + * @return SessionId or null if this message type does not include a SessionId + * @since 0.9.19 + */ + public SessionId sessionId(); +} diff --git a/core/java/src/net/i2p/data/i2cp/I2CPMessageException.java b/core/java/src/net/i2p/data/i2cp/I2CPMessageException.java index 63d13b040..b2e38c21e 100644 --- a/core/java/src/net/i2p/data/i2cp/I2CPMessageException.java +++ b/core/java/src/net/i2p/data/i2cp/I2CPMessageException.java @@ -12,7 +12,7 @@ package net.i2p.data.i2cp; import net.i2p.I2PException; /** - * Represent an error serializing or deserializing an APIMessage + * Represent an error serializing or deserializing a message * * @author jrandom */ diff --git a/core/java/src/net/i2p/data/i2cp/I2CPMessageImpl.java b/core/java/src/net/i2p/data/i2cp/I2CPMessageImpl.java index 3ace5df9d..19b8d1cc3 100644 --- a/core/java/src/net/i2p/data/i2cp/I2CPMessageImpl.java +++ b/core/java/src/net/i2p/data/i2cp/I2CPMessageImpl.java @@ -127,4 +127,15 @@ public abstract class I2CPMessageImpl extends DataStructureImpl implements I2CPM throw new DataFormatException("Error writing the message", ime); } } + + /** + * Return the SessionId for this type of message. + * Most but not all message types include a SessionId. + * The ones that do already define getSessionId(), but some return a SessionId and + * some return a long, so we define a new method here. + * + * @return null always. Extending classes with a SessionId must override. + * @since 0.9.19 + */ + public SessionId sessionId() { return null; } } diff --git a/core/java/src/net/i2p/data/i2cp/MessagePayloadMessage.java b/core/java/src/net/i2p/data/i2cp/MessagePayloadMessage.java index 331ee3dad..4e88f71ba 100644 --- a/core/java/src/net/i2p/data/i2cp/MessagePayloadMessage.java +++ b/core/java/src/net/i2p/data/i2cp/MessagePayloadMessage.java @@ -37,6 +37,16 @@ public class MessagePayloadMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId >= 0 ? new SessionId(_sessionId) : null; + } + /** @param id 0-65535 */ public void setSessionId(long id) { _sessionId = (int) id; diff --git a/core/java/src/net/i2p/data/i2cp/MessageStatusMessage.java b/core/java/src/net/i2p/data/i2cp/MessageStatusMessage.java index d9122bd77..5b77ffc4c 100644 --- a/core/java/src/net/i2p/data/i2cp/MessageStatusMessage.java +++ b/core/java/src/net/i2p/data/i2cp/MessageStatusMessage.java @@ -193,6 +193,16 @@ public class MessageStatusMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId >= 0 ? new SessionId(_sessionId) : null; + } + /** @param id 0-65535 */ public void setSessionId(long id) { _sessionId = (int) id; diff --git a/core/java/src/net/i2p/data/i2cp/ReceiveMessageBeginMessage.java b/core/java/src/net/i2p/data/i2cp/ReceiveMessageBeginMessage.java index 032c1ea8b..ccb96cb60 100644 --- a/core/java/src/net/i2p/data/i2cp/ReceiveMessageBeginMessage.java +++ b/core/java/src/net/i2p/data/i2cp/ReceiveMessageBeginMessage.java @@ -36,6 +36,16 @@ public class ReceiveMessageBeginMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId >= 0 ? new SessionId(_sessionId) : null; + } + /** @param id 0-65535 */ public void setSessionId(long id) { _sessionId = (int) id; diff --git a/core/java/src/net/i2p/data/i2cp/ReceiveMessageEndMessage.java b/core/java/src/net/i2p/data/i2cp/ReceiveMessageEndMessage.java index c405b93a3..5be2bbfeb 100644 --- a/core/java/src/net/i2p/data/i2cp/ReceiveMessageEndMessage.java +++ b/core/java/src/net/i2p/data/i2cp/ReceiveMessageEndMessage.java @@ -35,6 +35,16 @@ public class ReceiveMessageEndMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId >= 0 ? new SessionId(_sessionId) : null; + } + /** @param id 0-65535 */ public void setSessionId(long id) { _sessionId = (int) id; diff --git a/core/java/src/net/i2p/data/i2cp/ReconfigureSessionMessage.java b/core/java/src/net/i2p/data/i2cp/ReconfigureSessionMessage.java index 01130670a..8f80f531b 100644 --- a/core/java/src/net/i2p/data/i2cp/ReconfigureSessionMessage.java +++ b/core/java/src/net/i2p/data/i2cp/ReconfigureSessionMessage.java @@ -33,6 +33,16 @@ public class ReconfigureSessionMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + public void setSessionId(SessionId id) { _sessionId = id; } diff --git a/core/java/src/net/i2p/data/i2cp/ReportAbuseMessage.java b/core/java/src/net/i2p/data/i2cp/ReportAbuseMessage.java index 76311ae6e..cfdbd6cfe 100644 --- a/core/java/src/net/i2p/data/i2cp/ReportAbuseMessage.java +++ b/core/java/src/net/i2p/data/i2cp/ReportAbuseMessage.java @@ -35,6 +35,16 @@ public class ReportAbuseMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + public void setSessionId(SessionId id) { _sessionId = id; } diff --git a/core/java/src/net/i2p/data/i2cp/RequestLeaseSetMessage.java b/core/java/src/net/i2p/data/i2cp/RequestLeaseSetMessage.java index 1e9b2dcf6..15d9b70ec 100644 --- a/core/java/src/net/i2p/data/i2cp/RequestLeaseSetMessage.java +++ b/core/java/src/net/i2p/data/i2cp/RequestLeaseSetMessage.java @@ -45,6 +45,16 @@ public class RequestLeaseSetMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + public void setSessionId(SessionId id) { _sessionId = id; } diff --git a/core/java/src/net/i2p/data/i2cp/RequestVariableLeaseSetMessage.java b/core/java/src/net/i2p/data/i2cp/RequestVariableLeaseSetMessage.java index 160e193dc..3edd06171 100644 --- a/core/java/src/net/i2p/data/i2cp/RequestVariableLeaseSetMessage.java +++ b/core/java/src/net/i2p/data/i2cp/RequestVariableLeaseSetMessage.java @@ -55,6 +55,16 @@ public class RequestVariableLeaseSetMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + public void setSessionId(SessionId id) { _sessionId = id; } diff --git a/core/java/src/net/i2p/data/i2cp/SendMessageMessage.java b/core/java/src/net/i2p/data/i2cp/SendMessageMessage.java index 67a67ffa4..e783c62c8 100644 --- a/core/java/src/net/i2p/data/i2cp/SendMessageMessage.java +++ b/core/java/src/net/i2p/data/i2cp/SendMessageMessage.java @@ -38,6 +38,16 @@ public class SendMessageMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + public void setSessionId(SessionId id) { _sessionId = id; } diff --git a/core/java/src/net/i2p/data/i2cp/SessionStatusMessage.java b/core/java/src/net/i2p/data/i2cp/SessionStatusMessage.java index 5af2a649b..122cf5d1e 100644 --- a/core/java/src/net/i2p/data/i2cp/SessionStatusMessage.java +++ b/core/java/src/net/i2p/data/i2cp/SessionStatusMessage.java @@ -42,6 +42,16 @@ public class SessionStatusMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + public void setSessionId(SessionId id) { _sessionId = id; } diff --git a/router/java/src/net/i2p/router/TunnelManagerFacade.java b/router/java/src/net/i2p/router/TunnelManagerFacade.java index 2a35d0c97..a95fbd1f1 100644 --- a/router/java/src/net/i2p/router/TunnelManagerFacade.java +++ b/router/java/src/net/i2p/router/TunnelManagerFacade.java @@ -146,6 +146,21 @@ public interface TunnelManagerFacade extends Service { * */ public void buildTunnels(Destination client, ClientTunnelSettings settings); + + /** + * Add another destination to the same tunnels. + * Must have same encryption key an a different signing key. + * @throws IllegalArgumentException if not + * @return success + * @since 0.9.19 + */ + public boolean addAlias(Destination dest, ClientTunnelSettings settings, Destination existingClient); + + /** + * Remove another destination to the same tunnels. + * @since 0.9.19 + */ + public void removeAlias(Destination dest); public TunnelPoolSettings getInboundSettings(); public TunnelPoolSettings getOutboundSettings(); diff --git a/router/java/src/net/i2p/router/TunnelPoolSettings.java b/router/java/src/net/i2p/router/TunnelPoolSettings.java index fd718e479..3c47a7762 100644 --- a/router/java/src/net/i2p/router/TunnelPoolSettings.java +++ b/router/java/src/net/i2p/router/TunnelPoolSettings.java @@ -1,11 +1,13 @@ package net.i2p.router; +import java.util.Set; import java.util.Locale; import java.util.Map; import java.util.Properties; import net.i2p.data.Base64; import net.i2p.data.Hash; +import net.i2p.util.ConcurrentHashSet; import net.i2p.util.NativeBigInteger; import net.i2p.util.RandomSource; import net.i2p.util.SystemVersion; @@ -31,6 +33,8 @@ public class TunnelPoolSettings { private final Properties _unknownOptions; private Hash _randomKey; private int _priority; + private final Set _aliases; + private Hash _aliasOf; /** prefix used to override the router's defaults for clients */ // unimplemented @@ -119,6 +123,10 @@ public class TunnelPoolSettings { _randomKey = generateRandomKey(); if (_isExploratory && !_isInbound) _priority = EXPLORATORY_PRIORITY; + if (!_isExploratory) + _aliases = new ConcurrentHashSet(4); + else + _aliases = null; } /** how many tunnels should be available at all times */ @@ -206,6 +214,34 @@ public class TunnelPoolSettings { /** what destination is this a client tunnel for (or null if exploratory) */ public Hash getDestination() { return _destination; } + + /** + * Other destinations that use the same tunnel (or null if exploratory) + * Modifiable, concurrent, not a copy + * @since 0.9.19 + */ + public Set getAliases() { + return _aliases; + } + + /** + * Other destination that this is an alias of (or null). + * If non-null, don't build tunnels. + * @since 0.9.19 + */ + public Hash getAliasOf() { + return _aliasOf; + } + + + /** + * Set other destination that this is an alias of (or null). + * If non-null, don't build tunnels. + * @since 0.9.19 + */ + public void setAliasOf(Hash h) { + _aliasOf = h; + } /** * random key used for peer ordering @@ -235,7 +271,7 @@ public class TunnelPoolSettings { public int getPriority() { return _priority; } public Properties getUnknownOptions() { return _unknownOptions; } - + /** * @param prefix non-null */ diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index fbfa0a560..943555643 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -16,6 +16,7 @@ import java.io.OutputStream; import java.net.Socket; import java.util.concurrent.ConcurrentHashMap; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; @@ -39,6 +40,7 @@ import net.i2p.data.i2cp.SendMessageMessage; import net.i2p.data.i2cp.SendMessageExpiresMessage; import net.i2p.data.i2cp.SessionConfig; import net.i2p.data.i2cp.SessionId; +import net.i2p.data.i2cp.SessionStatusMessage; import net.i2p.router.Job; import net.i2p.router.JobImpl; import net.i2p.router.RouterContext; @@ -51,6 +53,9 @@ import net.i2p.util.SimpleTimer; /** * Bridge the router and the client - managing state for a client. * + * As of release 0.9.19, multiple sessions are supported on a single + * I2CP connection. These sessions share tunnels and some configuration. + * * @author jrandom */ class ClientConnectionRunner { @@ -61,21 +66,16 @@ class ClientConnectionRunner { private final Socket _socket; /** output stream of the socket that I2CP messages bound to the client should be written to */ private OutputStream _out; - /** session ID of the current client */ - private SessionId _sessionId; - /** user's config */ - private SessionConfig _config; + + private final ConcurrentHashMap _sessions; + private String _clientVersion; /** * Mapping of MessageId to Payload, storing messages for retrieval. * Unused for i2cp.fastReceive = "true" (_dontSendMSMOnRecive = true) */ private final Map _messages; - /** lease set request state, or null if there is no request pending on at the moment */ - private LeaseRequestState _leaseRequest; private int _consecutiveLeaseRequestFails; - /** currently allocated leaseSet, or null if none is allocated */ - private LeaseSet _currentLeaseSet; /** * Set of messageIds created but not yet ACCEPTED. * Unused for i2cp.messageReliability = "none" (_dontSendMSM = true) @@ -83,7 +83,7 @@ class ClientConnectionRunner { private final Set _acceptedPending; /** thingy that does stuff */ protected I2CPMessageReader _reader; - /** just for this destination */ + /** Used for all sessions, which must all have the same crypto keys */ private SessionKeyManager _sessionKeyManager; /** * This contains the last 10 MessageIds that have had their (non-ack) status @@ -91,7 +91,6 @@ class ClientConnectionRunner { */ private final List _alreadyProcessed; private ClientWriterRunner _writer; - private Hash _destHashCache; /** are we, uh, dead */ private volatile boolean _dead; /** For outbound traffic. true if i2cp.messageReliability = "none"; @since 0.8.1 */ @@ -108,11 +107,30 @@ class ClientConnectionRunner { private static final int MAX_LEASE_FAILS = 5; private static final int BUF_SIZE = 32*1024; + private static final int MAX_SESSIONS = 4; /** @since 0.9.2 */ private static final String PROP_TAGS = "crypto.tagsToSend"; private static final String PROP_THRESH = "crypto.lowTagThreshold"; + /** + * For multisession + * @since 0.9.19 + */ + private static class SessionParams { + final Destination dest; + final boolean isPrimary; + SessionId sessionId; + SessionConfig config; + LeaseRequestState leaseRequest; + LeaseSet currentLeaseSet; + + SessionParams(Destination d, boolean isPrimary) { + dest = d; + this.isPrimary = isPrimary; + } + } + /** * Create a new runner against the given socket * @@ -124,6 +142,7 @@ class ClientConnectionRunner { _socket = socket; // unused for fastReceive _messages = new ConcurrentHashMap(); + _sessions = new ConcurrentHashMap(4); _alreadyProcessed = new ArrayList(); _acceptedPending = new ConcurrentHashSet(); _messageId = new AtomicInteger(_context.random().nextInt()); @@ -166,8 +185,7 @@ class ClientConnectionRunner { public synchronized void stopRunning() { if (_dead) return; if (_context.router().isAlive() && _log.shouldLog(Log.WARN)) - _log.warn("Stop the I2CP connection! current leaseSet: " - + _currentLeaseSet, new Exception("Stop client connection")); + _log.warn("Stop the I2CP connection!", new Exception("Stop client connection")); _dead = true; // we need these keys to unpublish the leaseSet if (_reader != null) _reader.stopReading(); @@ -178,21 +196,56 @@ class ClientConnectionRunner { if (_sessionKeyManager != null) _sessionKeyManager.shutdown(); _manager.unregisterConnection(this); - if (_currentLeaseSet != null) - _context.netDb().unpublish(_currentLeaseSet); - _leaseRequest = null; + for (SessionParams sp : _sessions.values()) { + LeaseSet ls = sp.currentLeaseSet; + if (ls != null) + _context.netDb().unpublish(ls); + } synchronized (_alreadyProcessed) { _alreadyProcessed.clear(); } - //_config = null; - //_manager = null; + _sessions.clear(); } /** * Current client's config, - * will be null before session is established + * will be null if session not found + * IS subsession aware. + * @since 0.9.19 added hash param */ - public SessionConfig getConfig() { return _config; } + public SessionConfig getConfig(Hash h) { + SessionParams sp = _sessions.get(h); + if (sp == null) + return null; + return sp.config; + } + + /** + * Current client's config, + * will be null if session not found + * IS subsession aware. + * @since 0.9.19 added id param + */ + public SessionConfig getConfig(SessionId id) { + for (SessionParams sp : _sessions.values()) { + if (id.equals(sp.sessionId)) + return sp.config; + } + return null; + } + + /** + * Primary client's config, + * will be null if session not set up + * @since 0.9.19 + */ + public SessionConfig getPrimaryConfig() { + for (SessionParams sp : _sessions.values()) { + if (sp.isPrimary) + return sp.config; + } + return null; + } /** * The client version. @@ -214,41 +267,186 @@ class ClientConnectionRunner { /** current client's sessionkeymanager */ public SessionKeyManager getSessionKeyManager() { return _sessionKeyManager; } - /** currently allocated leaseSet */ - public LeaseSet getLeaseSet() { return _currentLeaseSet; } - void setLeaseSet(LeaseSet ls) { _currentLeaseSet = ls; } + /** + * Currently allocated leaseSet. + * IS subsession aware. Returns primary leaseset only. + * @return leaseSet or null if not yet set or unknown hash + * @since 0.9.19 added hash parameter + */ + public LeaseSet getLeaseSet(Hash h) { + SessionParams sp = _sessions.get(h); + if (sp == null) + return null; + return sp.currentLeaseSet; + } + + /** + * Currently allocated leaseSet. + * IS subsession aware. + */ + void setLeaseSet(LeaseSet ls) { + Hash h = ls.getDestination().calculateHash(); + SessionParams sp = _sessions.get(h); + if (sp == null) + return; + sp.currentLeaseSet = ls; + } /** * Equivalent to getConfig().getDestination().calculateHash(); * will be null before session is established + * Not subsession aware. Returns random hash from the sessions. + * Don't use if you can help it. + * + * @return primary hash or null if not yet set */ - public Hash getDestHash() { return _destHashCache; } + public Hash getDestHash() { + for (Hash h : _sessions.keySet()) { + return h; + } + return null; + } + + /** + * Return the hash for the given ID + * @return hash or null if unknown + * @since 0.9.19 + */ + public Hash getDestHash(SessionId id) { + for (Map.Entry e : _sessions.entrySet()) { + if (id.equals(e.getValue().sessionId)) + return e.getKey(); + } + return null; + } + + /** + * Return the dest for the given ID + * @return dest or null if unknown + * @since 0.9.19 + */ + public Destination getDestination(SessionId id) { + for (SessionParams sp : _sessions.values()) { + if (id.equals(sp.sessionId)) + return sp.dest; + } + return null; + } /** - * @return current client's sessionId or null if not yet set + * Subsession aware. + * + * @param h the local target + * @return current client's sessionId or null if not yet set or not a valid hash + * @since 0.9.19 */ - SessionId getSessionId() { return _sessionId; } + SessionId getSessionId(Hash h) { + SessionParams sp = _sessions.get(h); + if (sp == null) + return null; + return sp.sessionId; + } + + /** + * Subsession aware. + * + * @return all current client's sessionIds, non-null + * @since 0.9.19 + */ + List getSessionIds() { + List rv = new ArrayList(_sessions.size()); + for (SessionParams sp : _sessions.values()) { + SessionId id = sp.sessionId; + if (id != null) + rv.add(id); + } + return rv; + } + + /** + * Subsession aware. + * + * @return all current client's destinations, non-null + * @since 0.9.19 + */ + List getDestinations() { + List rv = new ArrayList(_sessions.size()); + for (SessionParams sp : _sessions.values()) { + rv.add(sp.dest); + } + return rv; + } /** * To be called only by ClientManager. * + * @param hash for the session * @throws IllegalStateException if already set + * @since 0.9.19 added hash param */ - void setSessionId(SessionId id) { - if (_sessionId != null) + void setSessionId(Hash hash, SessionId id) { + if (hash == null) throw new IllegalStateException(); - _sessionId = id; + SessionParams sp = _sessions.get(hash); + if (sp == null || sp.sessionId != null) + throw new IllegalStateException(); + sp.sessionId = id; + } + + /** + * Kill the session. Caller must kill runner if none left. + * + * @since 0.9.19 + */ + void removeSession(SessionId id) { + boolean isPrimary = false; + for (Iterator iter = _sessions.values().iterator(); iter.hasNext(); ) { + SessionParams sp = iter.next(); + if (id.equals(sp.sessionId)) { + if (_log.shouldLog(Log.INFO)) + _log.info("Destroying client session " + id); + iter.remove(); + // Tell client manger + _manager.unregisterSession(id, sp.dest); + LeaseSet ls = sp.currentLeaseSet; + if (ls != null) + _context.netDb().unpublish(ls); + isPrimary = sp.isPrimary; + } + } + if (isPrimary) { + // kill all the others also + for (SessionParams sp : _sessions.values()) { + _manager.unregisterSession(id, sp.dest); + LeaseSet ls = sp.currentLeaseSet; + if (ls != null) + _context.netDb().unpublish(ls); + } + } } - /** data for the current leaseRequest, or null if there is no active leaseSet request */ - LeaseRequestState getLeaseRequest() { return _leaseRequest; } + /** + * Data for the current leaseRequest, or null if there is no active leaseSet request. + * Not subsession aware. Returns primary ID only. + * @since 0.9.19 added hash param + */ + LeaseRequestState getLeaseRequest(Hash h) { + SessionParams sp = _sessions.get(h); + if (sp == null) + return null; + return sp.leaseRequest; + } /** @param req non-null */ public void failLeaseRequest(LeaseRequestState req) { boolean disconnect = false; + Hash h = req.getRequested().getDestination().calculateHash(); + SessionParams sp = _sessions.get(h); + if (sp == null) + return; synchronized (this) { - if (_leaseRequest == req) { - _leaseRequest = null; + if (sp.leaseRequest == req) { + sp.leaseRequest = null; disconnect = ++_consecutiveLeaseRequestFails > MAX_LEASE_FAILS; } } @@ -289,19 +487,34 @@ class ClientConnectionRunner { * @return SessionStatusMessage return code, 1 for success, != 1 for failure */ public int sessionEstablished(SessionConfig config) { - _destHashCache = config.getDestination().calculateHash(); + Destination dest = config.getDestination(); + Hash destHash = dest.calculateHash(); if (_log.shouldLog(Log.DEBUG)) - _log.debug("SessionEstablished called for destination " + _destHashCache.toBase64()); - _config = config; + _log.debug("SessionEstablished called for destination " + destHash); + if (_sessions.size() > MAX_SESSIONS) + return SessionStatusMessage.STATUS_REFUSED; + boolean isPrimary = _sessions.isEmpty(); + if (!isPrimary) { + // all encryption keys must be the same + for (SessionParams sp : _sessions.values()) { + if (!dest.getPublicKey().equals(sp.dest.getPublicKey())) + return SessionStatusMessage.STATUS_INVALID; + } + } + SessionParams sp = new SessionParams(dest, isPrimary); + sp.config = config; + SessionParams old = _sessions.putIfAbsent(destHash, sp); + if (old != null) + return SessionStatusMessage.STATUS_INVALID; // We process a few options here, but most are handled by the tunnel manager. // The ones here can't be changed later. Properties opts = config.getOptions(); - if (opts != null) { + if (isPrimary && opts != null) { _dontSendMSM = "none".equals(opts.getProperty(I2PClient.PROP_RELIABILITY, "").toLowerCase(Locale.US)); _dontSendMSMOnReceive = Boolean.parseBoolean(opts.getProperty(I2PClient.PROP_FAST_RECEIVE)); } // per-destination session key manager to prevent rather easy correlation - if (_sessionKeyManager == null) { + if (isPrimary && _sessionKeyManager == null) { int tags = TransientSessionKeyManager.DEFAULT_TAGS; int thresh = TransientSessionKeyManager.LOW_THRESHOLD; if (opts != null) { @@ -315,10 +528,8 @@ class ClientConnectionRunner { } } _sessionKeyManager = new TransientSessionKeyManager(_context, tags, thresh); - } else { - _log.error("SessionEstablished called for twice for destination " + _destHashCache.toBase64().substring(0,4)); } - return _manager.destinationEstablished(this); + return _manager.destinationEstablished(this, dest); } /** @@ -329,14 +540,21 @@ class ClientConnectionRunner { * * Do not use for status = STATUS_SEND_ACCEPTED; use ackSendMessage() for that. * + * @param dest the client * @param id the router's ID for this message * @param messageNonce the client's ID for this message * @param status see I2CP MessageStatusMessage for success/failure codes */ - void updateMessageDeliveryStatus(MessageId id, long messageNonce, int status) { + void updateMessageDeliveryStatus(Destination dest, MessageId id, long messageNonce, int status) { if (_dead || messageNonce <= 0) return; - _context.jobQueue().addJob(new MessageDeliveryStatusUpdate(id, messageNonce, status)); + SessionParams sp = _sessions.get(dest.calculateHash()); + if (sp == null) + return; + SessionId sid = sp.sessionId; + if (sid == null) + return; // sid = new SessionId(foo) ??? + _context.jobQueue().addJob(new MessageDeliveryStatusUpdate(sid, id, messageNonce, status)); } /** @@ -344,19 +562,23 @@ class ClientConnectionRunner { * updated. This takes care of all the LeaseRequestState stuff (including firing any jobs) */ void leaseSetCreated(LeaseSet ls) { - LeaseRequestState state = null; + Hash h = ls.getDestination().calculateHash(); + SessionParams sp = _sessions.get(h); + if (sp == null) + return; + LeaseRequestState state; synchronized (this) { - state = _leaseRequest; + state = sp.leaseRequest; if (state == null) { if (_log.shouldLog(Log.WARN)) _log.warn("LeaseRequest is null and we've received a new lease?! perhaps this is odd... " + ls); return; } else { state.setIsSuccessful(true); - _currentLeaseSet = ls; + setLeaseSet(ls); if (_log.shouldLog(Log.DEBUG)) _log.debug("LeaseSet created fully: " + state + " / " + ls); - _leaseRequest = null; + sp.leaseRequest = null; _consecutiveLeaseRequestFails = 0; } } @@ -425,12 +647,12 @@ class ClientConnectionRunner { if (_log.shouldLog(Log.DEBUG)) _log.debug("** Receiving message " + id.getMessageId() + " with payload of size " - + payload.getSize() + " for session " + _sessionId.getSessionId()); + + payload.getSize() + " for session " + message.getSessionId()); //long beforeDistribute = _context.clock().now(); // the following blocks as described above - SessionConfig cfg = _config; - if (cfg != null) - _manager.distributeMessage(cfg.getDestination(), dest, payload, + Destination fromDest = getDestination(message.getSessionId()); + if (fromDest != null) + _manager.distributeMessage(fromDest, dest, payload, id, message.getNonce(), expiration, flags); // else log error? //long timeToDistribute = _context.clock().now() - beforeDistribute; @@ -450,11 +672,9 @@ class ClientConnectionRunner { * @param id OUR id for the message * @param nonce HIS id for the message */ - void ackSendMessage(MessageId id, long nonce) { + void ackSendMessage(SessionId sid, MessageId id, long nonce) { if (_dontSendMSM || nonce == 0) return; - SessionId sid = _sessionId; - if (sid == null) return; if (_log.shouldLog(Log.DEBUG)) _log.debug("Acking message send [accepted]" + id + " / " + nonce + " for sessionId " + sid); @@ -476,6 +696,7 @@ class ClientConnectionRunner { /** * Asynchronously deliver the message to the current runner * + * @param fromDest generally null when from remote, non-null if from local */ void receiveMessage(Destination toDest, Destination fromDest, Payload payload) { if (_dead) return; @@ -489,9 +710,9 @@ class ClientConnectionRunner { * Send async abuse message to the client * */ - public void reportAbuse(String reason, int severity) { + public void reportAbuse(Destination dest, String reason, int severity) { if (_dead) return; - _context.jobQueue().addJob(new ReportAbuseJob(_context, this, reason, severity)); + _context.jobQueue().addJob(new ReportAbuseJob(_context, this, dest, reason, severity)); } /** @@ -524,12 +745,16 @@ class ClientConnectionRunner { // so the comparison will always work. int leases = set.getLeaseCount(); // synch so _currentLeaseSet isn't changed out from under us + LeaseSet current = null; synchronized (this) { - if (_currentLeaseSet != null && _currentLeaseSet.getLeaseCount() == leases) { + Destination dest = set.getDestination(); + if (dest != null) + current = getLeaseSet(dest.calculateHash()); + if (current != null && current.getLeaseCount() == leases) { for (int i = 0; i < leases; i++) { - if (! _currentLeaseSet.getLease(i).getTunnelId().equals(set.getLease(i).getTunnelId())) + if (! current.getLease(i).getTunnelId().equals(set.getLease(i).getTunnelId())) break; - if (! _currentLeaseSet.getLease(i).getGateway().equals(set.getLease(i).getGateway())) + if (! current.getLease(i).getGateway().equals(set.getLease(i).getGateway())) break; if (i == leases - 1) { if (_log.shouldLog(Log.INFO)) @@ -542,10 +767,14 @@ class ClientConnectionRunner { } } if (_log.shouldLog(Log.INFO)) - _log.info("Current leaseSet " + _currentLeaseSet + "\nNew leaseSet " + set); - LeaseRequestState state = null; + _log.info("Current leaseSet " + current + "\nNew leaseSet " + set); + Hash h = set.getDestination().calculateHash(); + SessionParams sp = _sessions.get(h); + if (sp == null) + return; + LeaseRequestState state; synchronized (this) { - state = _leaseRequest; + state = sp.leaseRequest; if (state != null) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Already requesting " + state); @@ -562,7 +791,7 @@ class ClientConnectionRunner { // fire onCreated? return; // already requesting } else { - _leaseRequest = state = new LeaseRequestState(onCreateJob, onFailedJob, _context.clock().now() + expirationTime, set); + sp.leaseRequest = state = new LeaseRequestState(onCreateJob, onFailedJob, _context.clock().now() + expirationTime, set); if (_log.shouldLog(Log.DEBUG)) _log.debug("New request: " + state); } @@ -693,6 +922,7 @@ class ClientConnectionRunner { private static final int MAX_REQUEUE = 60; // 30 sec. private class MessageDeliveryStatusUpdate extends JobImpl { + private final SessionId _sessId; private final MessageId _messageId; private final long _messageNonce; private final int _status; @@ -706,8 +936,9 @@ class ClientConnectionRunner { * @param messageNonce the client's ID for this message * @param status see I2CP MessageStatusMessage for success/failure codes */ - public MessageDeliveryStatusUpdate(MessageId id, long messageNonce, int status) { + public MessageDeliveryStatusUpdate(SessionId sid, MessageId id, long messageNonce, int status) { super(ClientConnectionRunner.this._context); + _sessId = sid; _messageId = id; _messageNonce = messageNonce; _status = status; @@ -723,7 +954,7 @@ class ClientConnectionRunner { MessageStatusMessage msg = new MessageStatusMessage(); msg.setMessageId(_messageId.getMessageId()); - msg.setSessionId(_sessionId.getSessionId()); + msg.setSessionId(_sessId.getSessionId()); // has to be >= 0, it is initialized to -1 msg.setNonce(_messageNonce); msg.setSize(0); @@ -734,12 +965,12 @@ class ClientConnectionRunner { // bug requeueing forever? failsafe _log.error("Abandon update for message " + _messageId + " to " + MessageStatusMessage.getStatusString(msg.getStatus()) - + " for session " + _sessionId.getSessionId()); + + " for " + _sessId); } else { if (_log.shouldLog(Log.WARN)) _log.warn("Almost send an update for message " + _messageId + " to " + MessageStatusMessage.getStatusString(msg.getStatus()) - + " for session " + _sessionId.getSessionId() + + " for " + _sessId + " before they knew the messageId! delaying .5s"); _lastTried = _context.clock().now(); requeue(REQUEUE_DELAY); @@ -774,14 +1005,14 @@ class ClientConnectionRunner { if (_log.shouldLog(Log.DEBUG)) _log.info("Updating message status for message " + _messageId + " to " + MessageStatusMessage.getStatusString(msg.getStatus()) - + " for session " + _sessionId.getSessionId() + + " for " + _sessId + " (with nonce=2), retrying after " + (_context.clock().now() - _lastTried)); } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Updating message status for message " + _messageId + " to " + MessageStatusMessage.getStatusString(msg.getStatus()) - + " for session " + _sessionId.getSessionId() + " (with nonce=2)"); + + " for " + _sessId + " (with nonce=2)"); } try { diff --git a/router/java/src/net/i2p/router/client/ClientManager.java b/router/java/src/net/i2p/router/client/ClientManager.java index 9a8850c9b..a875f0f32 100644 --- a/router/java/src/net/i2p/router/client/ClientManager.java +++ b/router/java/src/net/i2p/router/client/ClientManager.java @@ -51,9 +51,11 @@ class ClientManager { protected final List _listeners; // Destination --> ClientConnectionRunner // Locked for adds/removes but not lookups + // If a runner has multiple sessions it will be in here multiple times, one for each dest private final Map _runners; // Same as what's in _runners, but for fast lookup by Hash // Locked for adds/removes but not lookups + // If a runner has multiple sessions it will be in here multiple times, one for each dest private final Map _runnersByHash; // ClientConnectionRunner for clients w/out a Dest yet private final Set _pendingRunners; @@ -203,24 +205,44 @@ class ClientManager { } } + /** + * Remove all sessions for this runner. + */ public void unregisterConnection(ClientConnectionRunner runner) { - _log.warn("Unregistering (dropping) a client connection"); + if (_log.shouldLog(Log.WARN)) + _log.warn("Unregistering (dropping) a client connection"); synchronized (_pendingRunners) { _pendingRunners.remove(runner); } - if ( (runner.getConfig() != null) && (runner.getConfig().getDestination() != null) ) { - // after connection establishment - Destination dest = runner.getConfig().getDestination(); - synchronized (_runners) { - SessionId id = runner.getSessionId(); - if (id != null) - _runnerSessionIds.remove(id); + + List ids = runner.getSessionIds(); + List dests = runner.getDestinations(); + synchronized (_runners) { + for (SessionId id : ids) { + _runnerSessionIds.remove(id); + } + for (Destination dest : dests) { _runners.remove(dest); _runnersByHash.remove(dest.calculateHash()); } } } + /** + * Remove only the following session. Does not remove the runner if it has more. + * + * @since 0.9.19 + */ + public void unregisterSession(SessionId id, Destination dest) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Unregistering client session " + id); + synchronized (_runners) { + _runnerSessionIds.remove(id); + _runners.remove(dest); + _runnersByHash.remove(dest.calculateHash()); + } + } + /** * Add to the clients list. Check for a dup destination. * Side effect: Sets the session ID of the runner. @@ -228,8 +250,7 @@ class ClientManager { * * @return SessionStatusMessage return code, 1 for success, != 1 for failure */ - public int destinationEstablished(ClientConnectionRunner runner) { - Destination dest = runner.getConfig().getDestination(); + public int destinationEstablished(ClientConnectionRunner runner, Destination dest) { if (_log.shouldLog(Log.DEBUG)) _log.debug("DestinationEstablished called for destination " + dest.calculateHash().toBase64()); @@ -244,9 +265,10 @@ class ClientManager { } else { SessionId id = locked_getNextSessionId(); if (id != null) { - runner.setSessionId(id); + Hash h = dest.calculateHash(); + runner.setSessionId(h, id); _runners.put(dest, runner); - _runnersByHash.put(dest.calculateHash(), runner); + _runnersByHash.put(h, runner); rv = SessionStatusMessage.STATUS_CREATED; } else { rv = SessionStatusMessage.STATUS_REFUSED; @@ -310,8 +332,11 @@ class ClientManager { // sender went away return; } - ClientMessage msg = new ClientMessage(toDest, payload, runner.getConfig(), - runner.getConfig().getDestination(), msgId, + SessionConfig config = runner.getConfig(fromDest.calculateHash()); + if (config == null) + return; + ClientMessage msg = new ClientMessage(toDest, payload, config, + fromDest, msgId, messageNonce, expiration, flags); _ctx.clientMessagePool().add(msg, true); } @@ -347,7 +372,7 @@ class ClientManager { public void runJob() { _to.receiveMessage(_toDest, _fromDest, _payload); if (_from != null) { - _from.updateMessageDeliveryStatus(_msgId, _messageNonce, MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL); + _from.updateMessageDeliveryStatus(_fromDest, _msgId, _messageNonce, MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL); } } } @@ -410,7 +435,9 @@ class ClientManager { if (destHash == null) return true; ClientConnectionRunner runner = getRunner(destHash); if (runner == null) return true; - return !Boolean.parseBoolean(runner.getConfig().getOptions().getProperty(ClientManagerFacade.PROP_CLIENT_ONLY)); + SessionConfig config = runner.getConfig(destHash); + if (config == null) return true; + return !Boolean.parseBoolean(config.getOptions().getProperty(ClientManagerFacade.PROP_CLIENT_ONLY)); } /** @@ -437,7 +464,7 @@ class ClientManager { public SessionConfig getClientSessionConfig(Destination dest) { ClientConnectionRunner runner = getRunner(dest); if (runner != null) - return runner.getConfig(); + return runner.getConfig(dest.calculateHash()); else return null; } @@ -475,7 +502,7 @@ class ClientManager { if (_log.shouldLog(Log.DEBUG)) _log.debug("Delivering status " + status + " to " + fromDest.calculateHash() + " for message " + id); - runner.updateMessageDeliveryStatus(id, messageNonce, status); + runner.updateMessageDeliveryStatus(fromDest, id, messageNonce, status); } else { if (_log.shouldLog(Log.WARN)) _log.warn("Cannot deliver status " + status + " to " @@ -499,7 +526,7 @@ class ClientManager { if (dest != null) { ClientConnectionRunner runner = getRunner(dest); if (runner != null) { - runner.reportAbuse(reason, severity); + runner.reportAbuse(dest, reason, severity); } } else { for (Destination d : _runners.keySet()) { @@ -577,21 +604,22 @@ class ClientManager { public void runJob() { ClientConnectionRunner runner; - if (_msg.getDestination() != null) - runner = getRunner(_msg.getDestination()); + Destination dest = _msg.getDestination(); + if (dest != null) + runner = getRunner(dest); else runner = getRunner(_msg.getDestinationHash()); if (runner != null) { //_ctx.statManager().addRateData("client.receiveMessageSize", // _msg.getPayload().getSize(), 0); - runner.receiveMessage(_msg.getDestination(), null, _msg.getPayload()); + runner.receiveMessage(dest, null, _msg.getPayload()); } else { // no client connection... // we should pool these somewhere... if (_log.shouldLog(Log.WARN)) _log.warn("Message received but we don't have a connection to " - + _msg.getDestination() + "/" + _msg.getDestinationHash() + + dest + "/" + _msg.getDestinationHash() + " currently. DROPPED"); } } diff --git a/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java b/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java index 6168ab794..d4e476ad7 100644 --- a/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java +++ b/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java @@ -90,7 +90,7 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade implements Inte for (Destination dest : _manager.getRunnerDestinations()) { ClientConnectionRunner runner = _manager.getRunner(dest); if ( (runner == null) || (runner.getIsDead())) continue; - LeaseSet ls = runner.getLeaseSet(); + LeaseSet ls = runner.getLeaseSet(dest.calculateHash()); if (ls == null) continue; // still building long howLongAgo = _context.clock().now() - ls.getEarliestLeaseDate(); diff --git a/router/java/src/net/i2p/router/client/ClientMessageEventListener.java b/router/java/src/net/i2p/router/client/ClientMessageEventListener.java index 382552ce6..b3f1ad1ca 100644 --- a/router/java/src/net/i2p/router/client/ClientMessageEventListener.java +++ b/router/java/src/net/i2p/router/client/ClientMessageEventListener.java @@ -195,12 +195,13 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi */ private void handleCreateSession(CreateSessionMessage message) { SessionConfig in = message.getSessionConfig(); + Destination dest = in.getDestination(); if (in.verifySignature()) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Signature verified correctly on create session message"); } else { // For now, we do NOT send a SessionStatusMessage - see javadoc above - int itype = in.getDestination().getCertificate().getCertificateType(); + int itype = dest.getCertificate().getCertificateType(); SigType stype = SigType.getByCode(itype); if (stype == null || !stype.isAvailable()) { _log.error("Client requested unsupported signature type " + itype); @@ -217,7 +218,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi if (!checkAuth(inProps)) return; - SessionId id = _runner.getSessionId(); + SessionId id = _runner.getSessionId(dest.calculateHash()); if (id != null) { _runner.disconnectClient("Already have session " + id); return; @@ -226,11 +227,12 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi // Copy over the whole config structure so we don't later corrupt it on // the client side if we change settings or later get a // ReconfigureSessionMessage - SessionConfig cfg = new SessionConfig(in.getDestination()); + SessionConfig cfg = new SessionConfig(dest); cfg.setSignature(in.getSignature()); Properties props = new Properties(); props.putAll(in.getOptions()); cfg.setOptions(props); + boolean isPrimary = _runner.getSessionIds().isEmpty(); int status = _runner.sessionEstablished(cfg); if (status != SessionStatusMessage.STATUS_CREATED) { // For now, we do NOT send a SessionStatusMessage - see javadoc above @@ -246,11 +248,29 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi _runner.disconnectClient(msg); return; } - sendStatusMessage(status); + sendStatusMessage(id, status); if (_log.shouldLog(Log.INFO)) - _log.info("Session " + _runner.getSessionId() + " established for " + _runner.getDestHash()); - startCreateSessionJob(); + _log.info("Session " + id + " established for " + dest.calculateHash()); + if (isPrimary) { + startCreateSessionJob(cfg); + } else { + SessionConfig pcfg = _runner.getPrimaryConfig(); + if (pcfg != null) { + /////////// + // new tunnel name etc. + ClientTunnelSettings settings = new ClientTunnelSettings(dest.calculateHash()); + // all the primary options, then the overrides from the alias + props.putAll(pcfg.getOptions()); + props.putAll(props); + settings.readFromProperties(props); + boolean ok = _context.tunnelManager().addAlias(dest, settings, pcfg.getDestination()); + if (!ok) { + _log.error("Add alias failed"); + // send status message... + } + } + } } /** @@ -296,8 +316,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi * @since 0.9.8 * */ - protected void startCreateSessionJob() { - _context.jobQueue().addJob(new CreateSessionJob(_context, _runner)); + protected void startCreateSessionJob(SessionConfig config) { + _context.jobQueue().addJob(new CreateSessionJob(_context, config)); } /** @@ -311,7 +331,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi long beforeDistribute = _context.clock().now(); MessageId id = _runner.distributeMessage(message); long timeToDistribute = _context.clock().now() - beforeDistribute; - _runner.ackSendMessage(id, message.getNonce()); + // TODO validate session id + _runner.ackSendMessage(message.getSessionId(), id, message.getNonce()); _context.statManager().addRateData("client.distributeTime", timeToDistribute); if ( (timeToDistribute > 50) && (_log.shouldLog(Log.INFO)) ) _log.info("Took too long to distribute the message (which holds up the ack): " + timeToDistribute); @@ -328,7 +349,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi _log.debug("Handling recieve begin: id = " + message.getMessageId()); MessagePayloadMessage msg = new MessagePayloadMessage(); msg.setMessageId(message.getMessageId()); - msg.setSessionId(_runner.getSessionId().getSessionId()); + // TODO validate session id + msg.setSessionId(message.getSessionId()); Payload payload = _runner.getPayload(new MessageId(message.getMessageId())); if (payload == null) { if (_log.shouldLog(Log.WARN)) @@ -357,9 +379,18 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi } private void handleDestroySession(DestroySessionMessage message) { - if (_log.shouldLog(Log.INFO)) - _log.info("Destroying client session " + _runner.getSessionId()); - _runner.stopRunning(); + SessionId id = message.getSessionId(); + SessionConfig cfg = _runner.getConfig(id); + _runner.removeSession(id); + int left = _runner.getSessionIds().size(); + if (left <= 0) { + _runner.stopRunning(); + } else { + if (cfg != null) + _context.tunnelManager().removeAlias(cfg.getDestination()); + if (_log.shouldLog(Log.INFO)) + _log.info("Still " + left + " sessions left"); + } } /** override for testing */ @@ -370,7 +401,13 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi _runner.disconnectClient("Invalid CreateLeaseSetMessage"); return; } - Destination dest = _runner.getConfig().getDestination(); + SessionId id = message.getSessionId(); + SessionConfig config = _runner.getConfig(id); + if (config == null) { + _log.error("Unknown session in CLSM"); + return; + } + Destination dest = config.getDestination(); Destination ndest = message.getLeaseSet().getDestination(); if (!dest.equals(ndest)) { if (_log.shouldLog(Log.ERROR)) @@ -414,8 +451,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi return; } if (_log.shouldLog(Log.INFO)) - _log.info("New lease set granted for destination " - + _runner.getDestHash()); + _log.info("New lease set granted for destination " + dest); // leaseSetCreated takes care of all the LeaseRequestState stuff (including firing any jobs) _runner.leaseSetCreated(message.getLeaseSet()); @@ -423,6 +459,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi /** override for testing */ protected void handleDestLookup(DestLookupMessage message) { + // no session id in DLM _context.jobQueue().addJob(new LookupDestJob(_context, _runner, message.getHash(), _runner.getDestHash())); } @@ -432,10 +469,12 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi * @since 0.9.11 */ protected void handleHostLookup(HostLookupMessage message) { + Hash h = _runner.getDestHash(message.getSessionId()); + if (h == null) + return; // ok? _context.jobQueue().addJob(new LookupDestJob(_context, _runner, message.getReqID(), message.getTimeout(), message.getSessionId(), - message.getHash(), message.getHostname(), - _runner.getDestHash())); + message.getHash(), message.getHostname(), h)); } /** @@ -447,32 +486,37 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi * ClientConnectionRunner.sessionEstablished(). Those can't be changed later. */ private void handleReconfigureSession(ReconfigureSessionMessage message) { + SessionId id = message.getSessionId(); + SessionConfig config = _runner.getConfig(id); + if (config == null) { + _log.error("Unknown session"); + sendStatusMessage(id, SessionStatusMessage.STATUS_INVALID); + //_runner.stopRunning(); // ok? + return; + } if (_log.shouldLog(Log.INFO)) - _log.info("Updating options - old: " + _runner.getConfig() + " new: " + message.getSessionConfig()); - if (!message.getSessionConfig().getDestination().equals(_runner.getConfig().getDestination())) { + _log.info("Updating options - old: " + _runner.getConfig(id) + " new: " + message.getSessionConfig()); + if (!message.getSessionConfig().getDestination().equals(config.getDestination())) { _log.error("Dest mismatch"); - sendStatusMessage(SessionStatusMessage.STATUS_INVALID); + sendStatusMessage(id, SessionStatusMessage.STATUS_INVALID); _runner.stopRunning(); return; } - _runner.getConfig().getOptions().putAll(message.getSessionConfig().getOptions()); - Hash dest = _runner.getDestHash(); + Hash dest = config.getDestination().calculateHash(); + config.getOptions().putAll(message.getSessionConfig().getOptions()); ClientTunnelSettings settings = new ClientTunnelSettings(dest); Properties props = new Properties(); - props.putAll(_runner.getConfig().getOptions()); + props.putAll(config.getOptions()); settings.readFromProperties(props); _context.tunnelManager().setInboundSettings(dest, settings.getInboundSettings()); _context.tunnelManager().setOutboundSettings(dest, settings.getOutboundSettings()); - sendStatusMessage(SessionStatusMessage.STATUS_UPDATED); + sendStatusMessage(id, SessionStatusMessage.STATUS_UPDATED); } - private void sendStatusMessage(int status) { + private void sendStatusMessage(SessionId id, int status) { SessionStatusMessage msg = new SessionStatusMessage(); - SessionId id = _runner.getSessionId(); - if (id == null) - id = ClientManager.UNKNOWN_SESSION_ID; msg.setSessionId(id); msg.setStatus(status); try { diff --git a/router/java/src/net/i2p/router/client/CreateSessionJob.java b/router/java/src/net/i2p/router/client/CreateSessionJob.java index af01a496c..558dba8e7 100644 --- a/router/java/src/net/i2p/router/client/CreateSessionJob.java +++ b/router/java/src/net/i2p/router/client/CreateSessionJob.java @@ -26,25 +26,20 @@ import net.i2p.util.Log; */ class CreateSessionJob extends JobImpl { private final Log _log; - private final ClientConnectionRunner _runner; + private final SessionConfig _config; - public CreateSessionJob(RouterContext context, ClientConnectionRunner runner) { + public CreateSessionJob(RouterContext context, SessionConfig config) { super(context); _log = context.logManager().getLog(CreateSessionJob.class); - _runner = runner; + _config = config; if (_log.shouldLog(Log.DEBUG)) - _log.debug("CreateSessionJob for runner " + _runner + " / config: " + _runner.getConfig()); + _log.debug("CreateSessionJob for config: " + config); } public String getName() { return "Request tunnels for a new client"; } + public void runJob() { - SessionConfig cfg = _runner.getConfig(); - if ( (cfg == null) || (cfg.getDestination() == null) ) { - if (_log.shouldLog(Log.ERROR)) - _log.error("No session config on runner " + _runner); - return; - } - Hash dest = cfg.getDestination().calculateHash(); + Hash dest = _config.getDestination().calculateHash(); if (_log.shouldLog(Log.INFO)) _log.info("Requesting lease set for destination " + dest); ClientTunnelSettings settings = new ClientTunnelSettings(dest); @@ -61,10 +56,10 @@ class CreateSessionJob extends JobImpl { // XXX props.putAll(Router.getInstance().getConfigMap()); // override them by the client's settings - props.putAll(cfg.getOptions()); + props.putAll(_config.getOptions()); // and load 'em up (using anything not yet set as the software defaults) settings.readFromProperties(props); - getContext().tunnelManager().buildTunnels(cfg.getDestination(), settings); + getContext().tunnelManager().buildTunnels(_config.getDestination(), settings); } } diff --git a/router/java/src/net/i2p/router/client/LeaseRequestState.java b/router/java/src/net/i2p/router/client/LeaseRequestState.java index 67968df3b..1fb50620c 100644 --- a/router/java/src/net/i2p/router/client/LeaseRequestState.java +++ b/router/java/src/net/i2p/router/client/LeaseRequestState.java @@ -40,6 +40,7 @@ class LeaseRequestState { /** created lease set from client - FIXME always null */ public LeaseSet getGranted() { return _grantedLeaseSet; } + /** FIXME unused - why? */ public void setGranted(LeaseSet ls) { _grantedLeaseSet = ls; } diff --git a/router/java/src/net/i2p/router/client/MessageReceivedJob.java b/router/java/src/net/i2p/router/client/MessageReceivedJob.java index 0ee148282..7ce5f5cba 100644 --- a/router/java/src/net/i2p/router/client/MessageReceivedJob.java +++ b/router/java/src/net/i2p/router/client/MessageReceivedJob.java @@ -14,6 +14,7 @@ import net.i2p.data.i2cp.I2CPMessageException; import net.i2p.data.i2cp.MessageId; import net.i2p.data.i2cp.MessagePayloadMessage; import net.i2p.data.i2cp.MessageStatusMessage; +import net.i2p.data.i2cp.SessionId; import net.i2p.router.JobImpl; import net.i2p.router.RouterContext; import net.i2p.util.Log; @@ -26,14 +27,20 @@ import net.i2p.util.Log; class MessageReceivedJob extends JobImpl { private final Log _log; private final ClientConnectionRunner _runner; + private final Destination _toDest; private final Payload _payload; private final boolean _sendDirect; + /** + * @param toDest requred to pick session + * @param fromDest ignored, generally null + */ public MessageReceivedJob(RouterContext ctx, ClientConnectionRunner runner, Destination toDest, Destination fromDest, Payload payload, boolean sendDirect) { super(ctx); _log = ctx.logManager().getLog(MessageReceivedJob.class); _runner = runner; + _toDest = toDest; _payload = payload; _sendDirect = sendDirect; } @@ -43,8 +50,8 @@ class MessageReceivedJob extends JobImpl { public void runJob() { if (_runner.isDead()) return; MessageId id = null; - long nextID = _runner.getNextMessageId(); try { + long nextID = _runner.getNextMessageId(); if (_sendDirect) { sendMessage(nextID); } else { @@ -55,7 +62,7 @@ class MessageReceivedJob extends JobImpl { } catch (I2CPMessageException ime) { if (_log.shouldLog(Log.WARN)) _log.warn("Error writing out the message", ime); - if (!_sendDirect) + if (id != null && !_sendDirect) _runner.removePayload(id); } } @@ -69,7 +76,13 @@ class MessageReceivedJob extends JobImpl { // + " (with nonce=1)", new Exception("available")); MessageStatusMessage msg = new MessageStatusMessage(); msg.setMessageId(id.getMessageId()); - msg.setSessionId(_runner.getSessionId().getSessionId()); + SessionId sid = _runner.getSessionId(_toDest.calculateHash()); + if (sid == null) { + if (_log.shouldLog(Log.WARN)) + _log.warn("No session for " + _toDest.calculateHash()); + return; + } + msg.setSessionId(sid.getSessionId()); msg.setSize(size); // has to be >= 0, it is initialized to -1 msg.setNonce(1); @@ -84,7 +97,13 @@ class MessageReceivedJob extends JobImpl { private void sendMessage(long id) throws I2CPMessageException { MessagePayloadMessage msg = new MessagePayloadMessage(); msg.setMessageId(id); - msg.setSessionId(_runner.getSessionId().getSessionId()); + SessionId sid = _runner.getSessionId(_toDest.calculateHash()); + if (sid == null) { + if (_log.shouldLog(Log.WARN)) + _log.warn("No session for " + _toDest.calculateHash()); + return; + } + msg.setSessionId(sid.getSessionId()); msg.setPayload(_payload); _runner.doSend(msg); } diff --git a/router/java/src/net/i2p/router/client/ReportAbuseJob.java b/router/java/src/net/i2p/router/client/ReportAbuseJob.java index 8dd36ac3c..c24e5e9ba 100644 --- a/router/java/src/net/i2p/router/client/ReportAbuseJob.java +++ b/router/java/src/net/i2p/router/client/ReportAbuseJob.java @@ -8,10 +8,12 @@ package net.i2p.router.client; * */ +import net.i2p.data.Destination; import net.i2p.data.i2cp.AbuseReason; import net.i2p.data.i2cp.AbuseSeverity; import net.i2p.data.i2cp.I2CPMessageException; import net.i2p.data.i2cp.ReportAbuseMessage; +import net.i2p.data.i2cp.SessionId; import net.i2p.router.JobImpl; import net.i2p.router.RouterContext; import net.i2p.util.Log; @@ -23,17 +25,22 @@ import net.i2p.util.Log; class ReportAbuseJob extends JobImpl { private final Log _log; private final ClientConnectionRunner _runner; + private final Destination _dest; private final String _reason; private final int _severity; - public ReportAbuseJob(RouterContext context, ClientConnectionRunner runner, String reason, int severity) { + + public ReportAbuseJob(RouterContext context, ClientConnectionRunner runner, + Destination dest, String reason, int severity) { super(context); _log = context.logManager().getLog(ReportAbuseJob.class); _runner = runner; + _dest = dest; _reason = reason; _severity = severity; } public String getName() { return "Report Abuse"; } + public void runJob() { if (_runner.isDead()) return; AbuseReason res = new AbuseReason(); @@ -41,9 +48,11 @@ class ReportAbuseJob extends JobImpl { AbuseSeverity sev = new AbuseSeverity(); sev.setSeverity(_severity); ReportAbuseMessage msg = new ReportAbuseMessage(); - msg.setMessageId(null); msg.setReason(res); - msg.setSessionId(_runner.getSessionId()); + SessionId id = _runner.getSessionId(_dest.calculateHash()); + if (id == null) + return; + msg.setSessionId(id); msg.setSeverity(sev); try { _runner.doSend(msg); diff --git a/router/java/src/net/i2p/router/client/RequestLeaseSetJob.java b/router/java/src/net/i2p/router/client/RequestLeaseSetJob.java index 9b07978f6..f5ec343a0 100644 --- a/router/java/src/net/i2p/router/client/RequestLeaseSetJob.java +++ b/router/java/src/net/i2p/router/client/RequestLeaseSetJob.java @@ -16,6 +16,7 @@ import net.i2p.data.i2cp.I2CPMessage; import net.i2p.data.i2cp.I2CPMessageException; import net.i2p.data.i2cp.RequestLeaseSetMessage; import net.i2p.data.i2cp.RequestVariableLeaseSetMessage; +import net.i2p.data.i2cp.SessionId; import net.i2p.router.JobImpl; import net.i2p.router.RouterContext; import net.i2p.util.Log; @@ -63,13 +64,16 @@ class RequestLeaseSetJob extends JobImpl { // _log.debug("Adding fudge " + fudge); endTime += fudge; + SessionId id = _runner.getSessionId(_requestState.getRequested().getDestination().calculateHash()); + if (id == null) + return; I2CPMessage msg; if (getContext().getProperty(PROP_VARIABLE, DFLT_VARIABLE) && (_runner instanceof QueuedClientConnectionRunner || RequestVariableLeaseSetMessage.isSupported(_runner.getClientVersion()))) { // new style - leases will have individual expirations RequestVariableLeaseSetMessage rmsg = new RequestVariableLeaseSetMessage(); - rmsg.setSessionId(_runner.getSessionId()); + rmsg.setSessionId(id); for (int i = 0; i < requested.getLeaseCount(); i++) { Lease lease = requested.getLease(i); if (lease.getEndDate().getTime() < endTime) { @@ -90,7 +94,7 @@ class RequestLeaseSetJob extends JobImpl { RequestLeaseSetMessage rmsg = new RequestLeaseSetMessage(); Date end = new Date(endTime); rmsg.setEndDate(end); - rmsg.setSessionId(_runner.getSessionId()); + rmsg.setSessionId(id); for (int i = 0; i < requested.getLeaseCount(); i++) { Lease lease = requested.getLease(i); rmsg.addEndpoint(lease.getGateway(), @@ -144,8 +148,7 @@ class RequestLeaseSetJob extends JobImpl { CheckLeaseRequestStatus.this.getContext().statManager().addRateData("client.requestLeaseSetTimeout", 1); if (_log.shouldLog(Log.ERROR)) { long waited = System.currentTimeMillis() - _start; - _log.error("Failed to receive a leaseSet in the time allotted (" + waited + "): " + _requestState + " for " - + _runner.getConfig().getDestination().calculateHash().toBase64()); + _log.error("Failed to receive a leaseSet in the time allotted (" + waited + "): " + _requestState); } if (_requestState.getOnFailed() != null) RequestLeaseSetJob.this.getContext().jobQueue().addJob(_requestState.getOnFailed()); diff --git a/router/java/src/net/i2p/router/dummy/DummyTunnelManagerFacade.java b/router/java/src/net/i2p/router/dummy/DummyTunnelManagerFacade.java index 5de3db1a3..96dceea18 100644 --- a/router/java/src/net/i2p/router/dummy/DummyTunnelManagerFacade.java +++ b/router/java/src/net/i2p/router/dummy/DummyTunnelManagerFacade.java @@ -50,6 +50,8 @@ public class DummyTunnelManagerFacade implements TunnelManagerFacade { public int getOutboundClientTunnelCount(Hash destination) { return 0; } public long getLastParticipatingExpiration() { return -1; } public void buildTunnels(Destination client, ClientTunnelSettings settings) {} + public boolean addAlias(Destination dest, ClientTunnelSettings settings, Destination existingClient) { return false; } + public void removeAlias(Destination dest) {} public TunnelPoolSettings getInboundSettings() { return null; } public TunnelPoolSettings getOutboundSettings() { return null; } public TunnelPoolSettings getInboundSettings(Hash client) { return null; } diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java index 258e01046..4159014f2 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java @@ -8,6 +8,7 @@ import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.Properties; +import java.util.Set; import java.util.TreeSet; import net.i2p.data.Hash; @@ -609,6 +610,12 @@ public class TunnelPool { } if (ls != null) { _context.clientManager().requestLeaseSet(_settings.getDestination(), ls); + Set aliases = _settings.getAliases(); + if (aliases != null && !aliases.isEmpty()) { + for (Hash h : aliases) { + _context.clientManager().requestLeaseSet(h, ls); + } + } } } } diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java index d311d46a9..61fbb1ddd 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java @@ -97,7 +97,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { ctx.statManager().createRateStat("tunnel.testAborted", "Tunnel test could not occur, since there weren't any tunnels to test with", "Tunnels", RATES); } - + /** * Pick a random inbound exploratory tunnel. * Warning - selectInboundExploratoryTunnel(Hash) is preferred. @@ -113,7 +113,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { } return info; } - + /** * Pick a random inbound tunnel from the given destination's pool. * Warning - selectOutboundTunnel(Hash, Hash) is preferred. @@ -132,7 +132,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { " but there isn't a pool?"); return null; } - + /** * Pick a random outbound exploratory tunnel. * Warning - selectOutboundExploratoryTunnel(Hash) is preferred. @@ -148,7 +148,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { } return info; } - + /** * Pick a random outbound tunnel from the given destination's pool. * Warning - selectOutboundTunnel(Hash, Hash) is preferred. @@ -164,7 +164,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { } return null; } - + /** * Pick the inbound exploratory tunnel with the gateway closest to the given hash. * By using this instead of the random selectTunnel(), @@ -184,7 +184,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { } return info; } - + /** * Pick the inbound tunnel with the gateway closest to the given hash * from the given destination's pool. @@ -208,7 +208,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { " but there isn't a pool?"); return null; } - + /** * Pick the outbound exploratory tunnel with the endpoint closest to the given hash. * By using this instead of the random selectTunnel(), @@ -228,7 +228,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { } return info; } - + /** * Pick the outbound tunnel with the endpoint closest to the given hash * from the given destination's pool. @@ -249,7 +249,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { } return null; } - + /** * Expensive (iterates through all tunnels of all pools) and unnecessary. * @deprecated unused @@ -267,7 +267,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { if (info != null) return info; return null; } - + /** @return number of inbound exploratory tunnels */ public int getFreeTunnelCount() { return _inboundExploratory.size(); @@ -304,10 +304,11 @@ public class TunnelPoolManager implements TunnelManagerFacade { return pool.getTunnelCount(); return 0; } - + public int getParticipatingCount() { return _context.tunnelDispatcher().getParticipatingCount(); } + public long getLastParticipatingExpiration() { return _context.tunnelDispatcher().getLastParticipatingExpiration(); } - + /** * @return (number of part. tunnels) / (estimated total number of hops in our expl.+client tunnels) * 100 max. @@ -330,7 +331,6 @@ public class TunnelPoolManager implements TunnelManagerFacade { return Math.min(part / (double) count, 100d); } - public boolean isValidTunnel(Hash client, TunnelInfo tunnel) { if (tunnel.getExpiration() < _context.clock().now()) return false; @@ -386,17 +386,18 @@ public class TunnelPoolManager implements TunnelManagerFacade { pool.setSettings(settings); } } - + public synchronized void restart() { _handler.restart(); _executor.restart(); shutdownExploratory(); startup(); } - + /** * Used only at session startup. * Do not use to change settings. + * Do not use for aliased destinations; use addAlias(). */ public void buildTunnels(Destination client, ClientTunnelSettings settings) { Hash dest = client.calculateHash(); @@ -434,8 +435,68 @@ public class TunnelPoolManager implements TunnelManagerFacade { else outbound.startup(); } - - + + /** + * Add another destination to the same tunnels. + * Must have same encryption key an a different signing key. + * @throws IllegalArgumentException if not + * @return success + * @since 0.9.19 + */ + public boolean addAlias(Destination dest, ClientTunnelSettings settings, Destination existingClient) { + if (dest.getSigningPublicKey().equals(existingClient.getSigningPublicKey())) + throw new IllegalArgumentException("signing key must differ"); + if (!dest.getPublicKey().equals(existingClient.getPublicKey())) + throw new IllegalArgumentException("encryption key mismatch"); + Hash h = dest.calculateHash(); + Hash e = existingClient.calculateHash(); + synchronized(this) { + TunnelPool inbound = _clientInboundPools.get(e); + TunnelPool outbound = _clientOutboundPools.get(e); +/////// gah same tunnel pool or different? + if (inbound == null || outbound == null) + return false; + _clientInboundPools.put(h, inbound); + _clientOutboundPools.put(h, outbound); + } + return true; + } + + /** + * Remove a destination for the same tunnels as another. + * @since 0.9.19 + */ + public void removeAlias(Destination dest) { + Hash h = dest.calculateHash(); + synchronized(this) { + TunnelPool inbound = _clientInboundPools.remove(h); + if (inbound != null) { + Hash p = inbound.getSettings().getAliasOf(); + if (p != null) { + TunnelPool pri = _clientInboundPools.get(p); + if (pri != null) { + Set aliases = pri.getSettings().getAliases(); + if (aliases != null) + aliases.remove(h); + } + } + } + TunnelPool outbound = _clientOutboundPools.remove(h); + if (outbound != null) { + Hash p = inbound.getSettings().getAliasOf(); + if (p != null) { + TunnelPool pri = _clientInboundPools.get(p); + if (pri != null) { + Set aliases = pri.getSettings().getAliases(); + if (aliases != null) + aliases.remove(h); + } + } + } + // TODO if primary already vanished... + } + } + private static class DelayedStartup implements SimpleTimer.TimedEvent { private final TunnelPool pool; @@ -469,7 +530,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { if (outbound != null) outbound.shutdown(); } - + /** queue a recurring test job if appropriate */ void buildComplete(PooledTunnelCreatorConfig cfg) { if (cfg.getLength() > 1 && @@ -518,7 +579,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { _context.jobQueue().addJob(new BootstrapPool(_context, _inboundExploratory)); _context.jobQueue().addJob(new BootstrapPool(_context, _outboundExploratory)); } - + private static class BootstrapPool extends JobImpl { private TunnelPool _pool; public BootstrapPool(RouterContext ctx, TunnelPool pool) { @@ -531,7 +592,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { _pool.buildFallback(); } } - + /** * Cannot be restarted */ @@ -546,7 +607,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { _inboundExploratory.shutdown(); _outboundExploratory.shutdown(); } - + /** list of TunnelPool instances currently in play */ public void listPools(List out) { out.addAll(_clientInboundPools.values());