From 1293dccf3504b0e83f406f42f55a9fc6f7990a32 Mon Sep 17 00:00:00 2001 From: zzz Date: Wed, 18 Mar 2015 12:59:50 +0000 Subject: [PATCH] I2CP Multisession support and multiple destinations in one tunnel pool. Work in progress. Router-side I2CP mostly done. Client-side I2CP mostly done but undecided on how to handle listeners. Streaming stubbed out but may be wrong, may need multiple socket managers, not clear how to proceed. I2PTunnel not started. Blacklist of DSA-only dests not started. Router leaseset publishing not correct. Not clear whether to have additional tunnel pools with flags, or put the tunnel pools into the client hashmap twice. Client config contains destination, may need to move that to tunnel pool. --- .../client/streaming/I2PSocketManager.java | 23 ++ .../streaming/impl/I2PSocketManagerFull.java | 38 ++ core/java/src/net/i2p/client/I2PSession.java | 25 +- .../src/net/i2p/client/I2PSessionImpl.java | 212 +++++++++- .../src/net/i2p/client/I2PSessionImpl2.java | 32 +- .../net/i2p/client/I2PSessionMuxedImpl.java | 18 + core/java/src/net/i2p/client/SubSession.java | 299 ++++++++++++++ .../i2p/data/i2cp/CreateLeaseSetMessage.java | 5 + .../i2p/data/i2cp/DestroySessionMessage.java | 10 + .../net/i2p/data/i2cp/HostLookupMessage.java | 10 + .../net/i2p/data/i2cp/HostReplyMessage.java | 10 + .../src/net/i2p/data/i2cp/I2CPMessage.java | 17 +- .../i2p/data/i2cp/I2CPMessageException.java | 2 +- .../net/i2p/data/i2cp/I2CPMessageImpl.java | 11 + .../i2p/data/i2cp/MessagePayloadMessage.java | 10 + .../i2p/data/i2cp/MessageStatusMessage.java | 10 + .../data/i2cp/ReceiveMessageBeginMessage.java | 10 + .../data/i2cp/ReceiveMessageEndMessage.java | 10 + .../data/i2cp/ReconfigureSessionMessage.java | 10 + .../net/i2p/data/i2cp/ReportAbuseMessage.java | 10 + .../i2p/data/i2cp/RequestLeaseSetMessage.java | 10 + .../i2cp/RequestVariableLeaseSetMessage.java | 10 + .../net/i2p/data/i2cp/SendMessageMessage.java | 10 + .../i2p/data/i2cp/SessionStatusMessage.java | 10 + .../net/i2p/router/TunnelManagerFacade.java | 15 + .../net/i2p/router/TunnelPoolSettings.java | 38 +- .../router/client/ClientConnectionRunner.java | 367 ++++++++++++++---- .../net/i2p/router/client/ClientManager.java | 74 ++-- .../client/ClientManagerFacadeImpl.java | 2 +- .../client/ClientMessageEventListener.java | 102 +++-- .../i2p/router/client/CreateSessionJob.java | 21 +- .../i2p/router/client/LeaseRequestState.java | 1 + .../i2p/router/client/MessageReceivedJob.java | 27 +- .../net/i2p/router/client/ReportAbuseJob.java | 15 +- .../i2p/router/client/RequestLeaseSetJob.java | 11 +- .../dummy/DummyTunnelManagerFacade.java | 2 + .../i2p/router/tunnel/pool/TunnelPool.java | 7 + .../router/tunnel/pool/TunnelPoolManager.java | 103 ++++- 38 files changed, 1404 insertions(+), 193 deletions(-) create mode 100644 core/java/src/net/i2p/client/SubSession.java diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java index b7a5732d0..52d27889d 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java @@ -5,17 +5,20 @@ package net.i2p.client.streaming; import java.io.IOException; +import java.io.InputStream; import java.io.InterruptedIOException; import java.net.ConnectException; import java.net.NoRouteToHostException; import java.net.ServerSocket; import java.net.Socket; +import java.util.List; import java.util.Properties; import java.util.Set; import net.i2p.I2PAppContext; import net.i2p.I2PException; import net.i2p.client.I2PSession; +import net.i2p.client.I2PSessionException; import net.i2p.data.Destination; @@ -34,6 +37,26 @@ public interface I2PSocketManager { */ public I2PSession getSession(); + /** + * @return a new subsession, non-null + * @param privateKeyStream null for transient, if non-null must have same encryption keys as primary session + * and different signing keys + * @param opts subsession options if any, may be null + * @since 0.9.19 + */ + public I2PSession addSubsession(InputStream privateKeyStream, Properties opts) throws I2PSessionException; + + /** + * @since 0.9.19 + */ + public void removeSubsession(I2PSession session); + + /** + * @return a list of subsessions, non-null, does not include the primary session + * @since 0.9.19 + */ + public List getSubsessions(); + /** * How long should we wait for the client to .accept() a socket before * sending back a NACK/Close? diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java index 41396edb8..c0d25efbb 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java @@ -1,14 +1,17 @@ package net.i2p.client.streaming.impl; import java.io.IOException; +import java.io.InputStream; import java.net.ConnectException; import java.net.NoRouteToHostException; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; import java.util.HashSet; +import java.util.List; import java.util.Properties; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -37,6 +40,7 @@ public class I2PSocketManagerFull implements I2PSocketManager { private final I2PAppContext _context; private final Log _log; private final I2PSession _session; + private final List _subsessions; private final I2PServerSocketFull _serverSocket; private StandardServerSocket _realServerSocket; private final ConnectionOptions _defaultOptions; @@ -80,6 +84,7 @@ public class I2PSocketManagerFull implements I2PSocketManager { public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) { _context = context; _session = session; + _subsessions = new CopyOnWriteArrayList(); _log = _context.logManager().getLog(I2PSocketManagerFull.class); _name = name + " " + (__managerId.incrementAndGet()); @@ -117,6 +122,39 @@ public class I2PSocketManagerFull implements I2PSocketManager { return _session; } +//////////// gahhh we want a socket manager, not a session + /** + * @return a new subsession, non-null + * @param privateKeyStream null for transient, if non-null must have same encryption keys as primary session + * and different signing keys + * @param opts subsession options if any, may be null + * @since 0.9.19 + */ + public I2PSession addSubsession(InputStream privateKeyStream, Properties opts) throws I2PSessionException { + I2PSession rv = _session.addSubsession(privateKeyStream, opts); + _subsessions.add(rv); + return rv; + } + + /** + * Remove the subsession + * + * @since 0.9.19 + */ + public void removeSubsession(I2PSession session) { + _session.removeSubsession(session); + _subsessions.remove(session); + // ... + } + + /** + * @return a list of subsessions, non-null, does not include the primary session + * @since 0.9.19 + */ + public List getSubsessions() { + return _session.getSubsessions(); + } + public ConnectionManager getConnectionManager() { return _connectionManager; } diff --git a/core/java/src/net/i2p/client/I2PSession.java b/core/java/src/net/i2p/client/I2PSession.java index da67e2c02..54f8540f3 100644 --- a/core/java/src/net/i2p/client/I2PSession.java +++ b/core/java/src/net/i2p/client/I2PSession.java @@ -9,6 +9,8 @@ package net.i2p.client; * */ +import java.io.InputStream; +import java.util.List; import java.util.Properties; import java.util.Set; @@ -21,7 +23,7 @@ import net.i2p.data.SigningPrivateKey; /** *

Define the standard means of sending and receiving messages on the * I2P network by using the I2CP (the client protocol). This is done over a - * bidirectional TCP socket and never sends any private keys. + * bidirectional TCP socket. * * End to end encryption in I2PSession was disabled in release 0.6. * @@ -247,6 +249,27 @@ public interface I2PSession { * */ public void destroySession() throws I2PSessionException; + + /** + * @return a new subsession, non-null + * @param privateKeyStream null for transient, if non-null must have same encryption keys as primary session + * and different signing keys + * @param opts subsession options if any, may be null + * @since 0.9.19 + */ + public I2PSession addSubsession(InputStream privateKeyStream, Properties opts) throws I2PSessionException; + + /** + * @return a list of subsessions, non-null, does not include the primary session + * @since 0.9.19 + */ + public void removeSubsession(I2PSession session); + + /** + * @return a list of subsessions, non-null, does not include the primary session + * @since 0.9.19 + */ + public List getSubsessions(); /** * Actually connect the session and start receiving/sending messages diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index 7fd175978..93bc2af78 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -23,6 +23,7 @@ import java.util.Locale; import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -43,6 +44,7 @@ import net.i2p.data.i2cp.I2CPMessage; import net.i2p.data.i2cp.I2CPMessageReader; import net.i2p.data.i2cp.MessagePayloadMessage; import net.i2p.data.i2cp.SessionId; +import net.i2p.data.i2cp.SessionStatusMessage; import net.i2p.internal.I2CPMessageQueue; import net.i2p.internal.InternalClientManager; import net.i2p.internal.QueuedI2CPMessageReader; @@ -76,6 +78,15 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa /** currently granted lease set, or null */ private volatile LeaseSet _leaseSet; + // subsession stuff + // registered subsessions + private final List _subsessions; + // established subsessions + private final ConcurrentHashMap _subsessionMap; + private final Object _subsessionLock = new Object(); + private static final String MIN_SUBSESSION_VERSION = "0.9.19"; + private volatile boolean _routerSupportsSubsessions; + /** hostname of router - will be null if in RouterContext */ protected final String _hostname; /** port num to router - will be 0 if in RouterContext */ @@ -179,6 +190,9 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa TEST_LOOKUP || (routerVersion != null && routerVersion.length() > 0 && VersionComparator.comp(routerVersion, MIN_HOST_LOOKUP_VERSION) >= 0); + _routerSupportsSubsessions = _context.isRouterContext() || + (routerVersion != null && routerVersion.length() > 0 && + VersionComparator.comp(routerVersion, MIN_SUBSESSION_VERSION) >= 0); synchronized (_stateLock) { if (_state == State.OPENING) { _state = State.GOTDATE; @@ -196,18 +210,42 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa */ protected I2PSessionImpl(I2PAppContext context, Properties options, I2PClientMessageHandlerMap handlerMap) { - this(context, options, handlerMap, false); + this(context, options, handlerMap, null, false); } - + + /* + * For extension by SubSession via I2PSessionMuxedImpl and I2PSessionImpl2 + * + * @param destKeyStream stream containing the private key data, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} + * @param options set of options to configure the router with, if null will use System properties + * @since 0.9.19 + */ + protected I2PSessionImpl(I2PSessionImpl primary, InputStream destKeyStream, Properties options) throws I2PSessionException { + this(primary.getContext(), options, primary.getHandlerMap(), primary.getProducer(), true); + _availabilityNotifier = new AvailabilityNotifier(); + try { + readDestination(destKeyStream); + } catch (DataFormatException dfe) { + throw new I2PSessionException("Error reading the destination key stream", dfe); + } catch (IOException ioe) { + throw new I2PSessionException("Error reading the destination key stream", ioe); + } + } + /** * Basic setup of finals * @since 0.9.7 */ private I2PSessionImpl(I2PAppContext context, Properties options, - I2PClientMessageHandlerMap handlerMap, boolean hasDest) { + I2PClientMessageHandlerMap handlerMap, + I2CPMessageProducer producer, + boolean hasDest) { _context = context; _handlerMap = handlerMap; _log = context.logManager().getLog(getClass()); + _subsessions = new CopyOnWriteArrayList(); + _subsessionMap = new ConcurrentHashMap(4); if (options == null) options = (Properties) System.getProperties().clone(); _options = loadConfig(options); @@ -215,7 +253,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa _portNum = getPort(); _fastReceive = Boolean.parseBoolean(_options.getProperty(I2PClient.PROP_FAST_RECEIVE)); if (hasDest) { - _producer = new I2CPMessageProducer(context); + _producer = producer; _availableMessages = new ConcurrentHashMap(); _myDestination = new Destination(); _privateKey = new PrivateKey(); @@ -238,10 +276,10 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa * @param destKeyStream stream containing the private key data, * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param options set of options to configure the router with, if null will use System properties - * @throws I2PSessionException if there is a problem loading the private keys or + * @throws I2PSessionException if there is a problem loading the private keys */ public I2PSessionImpl(I2PAppContext context, InputStream destKeyStream, Properties options) throws I2PSessionException { - this(context, options, new I2PClientMessageHandlerMap(context), true); + this(context, options, new I2PClientMessageHandlerMap(context), new I2CPMessageProducer(context), true); _availabilityNotifier = new AvailabilityNotifier(); try { readDestination(destKeyStream); @@ -251,6 +289,66 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa throw new I2PSessionException("Error reading the destination key stream", ioe); } } + + /** + * Router must be connected or was connected... for now. + * + * @return a new subsession, non-null + * @param privateKeyStream null for transient, if non-null must have same encryption keys as primary session + * and different signing keys + * @param opts subsession options if any, may be null + * @since 0.9.19 + */ + public I2PSession addSubsession(InputStream privateKeyStream, Properties opts) throws I2PSessionException { + if (!_routerSupportsSubsessions) + throw new I2PSessionException("Router does not support subsessions"); + SubSession sub; + synchronized(_subsessionLock) { + if (_subsessions.size() > _subsessionMap.size()) + throw new I2PSessionException("Subsession request already pending"); + sub = new SubSession(this, privateKeyStream, opts); + for (SubSession ss : _subsessions) { + if (ss.getDecryptionKey().equals(sub.getDecryptionKey()) && + ss.getPrivateKey().equals(sub.getPrivateKey())) { + throw new I2PSessionException("Dup subsession"); + } + } + _subsessions.add(sub); + } + + synchronized (_stateLock) { + if (_state == State.OPEN) { + _producer.connect(sub); + } // else will be called in connect() + } + return sub; + } + + /** + * @since 0.9.19 + */ + public void removeSubsession(I2PSession session) { + if (!(session instanceof SubSession)) + return; + synchronized(_subsessionLock) { + _subsessions.remove(session); + SessionId id = ((SubSession) session).getSessionId(); + if (id != null) + _subsessionMap.remove(id); + /// tell the subsession + ///.... + } + } + + /** + * @return a list of subsessions, non-null, does not include the primary session + * @since 0.9.19 + */ + public List getSubsessions() { + synchronized(_subsessionLock) { + return new ArrayList(_subsessions); + } + } /** * Parse the config for anything we know about. @@ -536,6 +634,14 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa startIdleMonitor(); startVerifyUsage(); success = true; + + // now send CreateSessionMessages for all subsessions, one at a time, must wait for each response + synchronized(_subsessionLock) { + for (SubSession ss : _subsessions) { + _producer.connect(ss); + } + } + } catch (InterruptedException ie) { throw new I2PSessionException("Interrupted", ie); } catch (UnknownHostException uhe) { @@ -739,19 +845,80 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa /** * The I2CPMessageEventListener callback. * Recieve notification of some I2CP message and handle it if possible. + * + * We route the message based on message type AND session ID. + * + * The following types never contain a session ID and are not routable to + * a subsession: + * BandwidthLimitsMessage, DestReplyMessage + * + * The following types may not ontain a valid session ID + * even when intended for a subsession, so we must take special care: + * SessionStatusMessage + * * @param reader unused */ public void messageReceived(I2CPMessageReader reader, I2CPMessage message) { - I2CPMessageHandler handler = _handlerMap.getHandler(message.getType()); - if (handler == null) { - if (_log.shouldLog(Log.WARN)) - _log.warn(getPrefix() + "Unknown message or unhandleable message received: type = " - + message.getType()); + int type = message.getType(); + SessionId id = message.sessionId(); + if (id == null || id.equals(_sessionId) || + (_sessionId == null && id != null && type == SessionStatusMessage.MESSAGE_TYPE)) { + // it's for us + I2CPMessageHandler handler = _handlerMap.getHandler(type); + if (handler != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getPrefix() + "Message received of type " + type + + " to be handled by " + handler.getClass().getSimpleName()); + handler.handleMessage(message, this); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn(getPrefix() + "Unknown message or unhandleable message received: type = " + + type); + } } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getPrefix() + "Message received of type " + message.getType() - + " to be handled by " + handler.getClass().getSimpleName()); - handler.handleMessage(message, this); + SubSession sub = _subsessionMap.get(id); + if (sub != null) { + // it's for a subsession + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getPrefix() + "Message received of type " + type + + " to be handled by " + sub); + sub.messageReceived(reader, message); + } else if (id != null && type == SessionStatusMessage.MESSAGE_TYPE) { + // look for a subsession without a session + synchronized (_subsessionLock) { + for (SubSession sess : _subsessions) { + if (sess.getSessionId() == null) { + sub.messageReceived(reader, message); + id = sess.getSessionId(); + if (id != null) { + if (id.equals(_sessionId)) { + // shouldnt happen + sess.setSessionId(null); + if (_log.shouldLog(Log.WARN)) + _log.warn("Dup or our session id " + id); + } else { + SubSession old = _subsessionMap.putIfAbsent(id, sess); + if (old != null) { + // shouldnt happen + sess.setSessionId(null); + if (_log.shouldLog(Log.WARN)) + _log.warn("Dup session id " + id); + } + } + } + return; + } + if (_log.shouldLog(Log.WARN)) + _log.warn(getPrefix() + "No session " + id + " to handle message: type = " + + type); + } + } + } else { + // it's for nobody + if (_log.shouldLog(Log.WARN)) + _log.warn(getPrefix() + "No session " + id + " to handle message: type = " + + type); + } } } @@ -786,6 +953,18 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa */ I2CPMessageProducer getProducer() { return _producer; } + /** + * For Subsessions + * @since 0.9.19 + */ + I2PClientMessageHandlerMap getHandlerMap() { return _handlerMap; } + + /** + * For Subsessions + * @since 0.9.19 + */ + I2PAppContext getContext() { return _context; } + /** * Retrieve the configuration options * @return non-null, if insantiated with null options, this will be the System properties. @@ -803,7 +982,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa /** * Has the session been closed (or not yet connected)? - * False when open and during transitions. Unsynchronized. + * False when open and during transitions. */ public boolean isClosed() { synchronized (_stateLock) { @@ -892,6 +1071,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa if (_availabilityNotifier != null) _availabilityNotifier.stopNotifying(); closeSocket(); + _subsessionMap.clear(); if (_sessionListener != null) _sessionListener.disconnected(this); } diff --git a/core/java/src/net/i2p/client/I2PSessionImpl2.java b/core/java/src/net/i2p/client/I2PSessionImpl2.java index 9f8a5465d..21bb04737 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl2.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl2.java @@ -50,9 +50,9 @@ class I2PSessionImpl2 extends I2PSessionImpl { private static final long REMOVE_EXPIRED_TIME = 63*1000; - /** - * for extension by SimpleSession (no dest) - */ + /** + * for extension by SimpleSession (no dest) + */ protected I2PSessionImpl2(I2PAppContext context, Properties options, I2PClientMessageHandlerMap handlerMap) { super(context, options, handlerMap); @@ -61,15 +61,17 @@ class I2PSessionImpl2 extends I2PSessionImpl { } /** + * for extension by I2PSessionMuxedImpl + * * Create a new session, reading the Destination, PrivateKey, and SigningPrivateKey * from the destKeyStream, and using the specified options to connect to the router * * @param destKeyStream stream containing the private key data, * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param options set of options to configure the router with, if null will use System properties - * @throws I2PSessionException if there is a problem loading the private keys or + * @throws I2PSessionException if there is a problem loading the private keys */ - public I2PSessionImpl2(I2PAppContext ctx, InputStream destKeyStream, Properties options) throws I2PSessionException { + protected I2PSessionImpl2(I2PAppContext ctx, InputStream destKeyStream, Properties options) throws I2PSessionException { super(ctx, destKeyStream, options); _sendingStates = new ConcurrentHashMap(32); _sendMessageNonce = new AtomicLong(); @@ -94,6 +96,26 @@ class I2PSessionImpl2 extends I2PSessionImpl { _context.statManager().createRateStat("i2cp.tx.msgExpanded", "size before compression", "i2cp", new long[] { 30*60*1000 }); } + /* + * For extension by SubSession via I2PSessionMuxedImpl + * + * @param destKeyStream stream containing the private key data, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} + * @param options set of options to configure the router with, if null will use System properties + * @since 0.9.19 + */ + protected I2PSessionImpl2(I2PSessionImpl primary, InputStream destKeyStream, Properties options) throws I2PSessionException { + super(primary, destKeyStream, options); + _sendingStates = new ConcurrentHashMap(32); + _sendMessageNonce = new AtomicLong(); + _noEffort = "none".equals(getOptions().getProperty(I2PClient.PROP_RELIABILITY, "").toLowerCase(Locale.US)); + _context.statManager().createRateStat("i2cp.receiveStatusTime.1", "How long it took to get status=1 back", "i2cp", new long[] { 10*60*1000 }); + _context.statManager().createRateStat("i2cp.receiveStatusTime.4", "How long it took to get status=4 back", "i2cp", new long[] { 10*60*1000 }); + _context.statManager().createRateStat("i2cp.receiveStatusTime.5", "How long it took to get status=5 back", "i2cp", new long[] { 10*60*1000 }); + _context.statManager().createRateStat("i2cp.tx.msgCompressed", "compressed size transferred", "i2cp", new long[] { 30*60*1000 }); + _context.statManager().createRateStat("i2cp.tx.msgExpanded", "size before compression", "i2cp", new long[] { 30*60*1000 }); + } + /** * Fire up a periodic task to check for unclaimed messages * @since 0.9.14 diff --git a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java index 48ca6016e..880670a53 100644 --- a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java @@ -82,6 +82,24 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 { // discards the one in super(), sorry about that... (no it wasn't started yet) _availabilityNotifier = new MuxedAvailabilityNotifier(); } + + /* + * For extension by SubSession + * + * @param destKeyStream stream containing the private key data, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} + * @param options set of options to configure the router with, if null will use System properties + * @since 0.9.19 + */ + protected I2PSessionMuxedImpl(I2PSessionImpl primary, InputStream destKeyStream, Properties options) throws I2PSessionException { + super(primary, destKeyStream, options); + // also stored in _sessionListener but we keep it in _demultipexer + // as well so we don't have to keep casting + _demultiplexer = new I2PSessionDemultiplexer(primary.getContext()); + super.setSessionListener(_demultiplexer); + // discards the one in super(), sorry about that... (no it wasn't started yet) + _availabilityNotifier = new MuxedAvailabilityNotifier(); + } /** listen on all protocols and ports */ @Override diff --git a/core/java/src/net/i2p/client/SubSession.java b/core/java/src/net/i2p/client/SubSession.java new file mode 100644 index 000000000..1e75fa409 --- /dev/null +++ b/core/java/src/net/i2p/client/SubSession.java @@ -0,0 +1,299 @@ +package net.i2p.client; + +/* + * free (adj.): unencumbered; not under the control of others + * Written by jrandom in 2003 and released into the public domain + * with no warranty of any kind, either expressed or implied. + * It probably won't make your computer catch on fire, or eat + * your children, but it might. Use at your own risk. + * + */ + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import net.i2p.I2PAppContext; +import net.i2p.data.Destination; +import net.i2p.data.Hash; +import net.i2p.data.PrivateKey; +import net.i2p.data.SigningPrivateKey; +import net.i2p.data.i2cp.I2CPMessage; +import net.i2p.data.i2cp.SessionId; + +/** + * An additional session using another session's connection. + * + * A subsession uses the same connection to the router as the primary session, + * but has a different Destination. It uses the same tunnels as the primary + * but has its own leaseset. It must use the same encryption keys as the primary + * so that garlic encryption/decryption works. + * + * The message handler map and message producer are reused from primary. + * + * Does NOT reuse the session listener ???? + * + * While the I2CP protocol, in theory, allows for fully independent sessions + * over the same I2CP connection, this is not currently supported by the router. + * + * @since 0.9.19 + */ +class SubSession extends I2PSessionMuxedImpl { + private final I2PSessionMuxedImpl _primary; + + /** + * @param primary must be a I2PSessionMuxedImpl + */ + public SubSession(I2PSession primary, InputStream destKeyStream, Properties options) throws I2PSessionException { + super((I2PSessionMuxedImpl)primary, destKeyStream, options); + _primary = (I2PSessionMuxedImpl) primary; + if (!getDecryptionKey().equals(_primary.getDecryptionKey())) + throw new I2PSessionException("encryption key mismatch"); + if (getPrivateKey().equals(_primary.getPrivateKey())) + throw new I2PSessionException("signing key must differ"); + // state management + } + + /** + * Unsupported in a subsession. + * @throws UnsupportedOperationException always + * @since 0.9.19 + */ + @Override + public I2PSession addSubsession(InputStream destKeyStream, Properties opts) throws I2PSessionException { + throw new UnsupportedOperationException(); + } + + /** + * Unsupported in a subsession. + * Does nothing. + * @since 0.9.19 + */ + @Override + public void removeSubsession(I2PSession session) {} + + /** + * Unsupported in a subsession. + * @return empty list always + * @since 0.9.19 + */ + @Override + public List getSubsessions() { + return Collections.emptyList(); + } + + /** + * Does nothing for now + */ + @Override + public void updateOptions(Properties options) {} + + /** + * Connect to the router and establish a session. This call blocks until + * a session is granted. + * + * Should be threadsafe, other threads will block until complete. + * Disconnect / destroy from another thread may be called simultaneously and + * will (should?) interrupt the connect. + * + * @throws I2PSessionException if there is a configuration error or the router is + * not reachable + */ + @Override + public void connect() throws I2PSessionException { + _primary.connect(); + } + + /** + * Has the session been closed (or not yet connected)? + * False when open and during transitions. + */ + @Override + public boolean isClosed() { + return getSessionId() == null || _primary.isClosed(); + } + + /** + * Deliver an I2CP message to the router + * May block for several seconds if the write queue to the router is full + * + * @throws I2PSessionException if the message is malformed or there is an error writing it out + */ + @Override + void sendMessage(I2CPMessage message) throws I2PSessionException { + if (isClosed()) + throw new I2PSessionException("Already closed"); + _primary.sendMessage(message); + } + + /** + * Pass off the error to the listener + * Misspelled, oh well. + * @param error non-null + */ + @Override + void propogateError(String msg, Throwable error) { + _primary.propogateError(msg, error); + if (_sessionListener != null) _sessionListener.errorOccurred(this, msg, error); + } + + /** + * Tear down the session, and do NOT reconnect. + * + * Blocks if session has not been fully started. + */ + @Override + public void destroySession() { + _primary.destroySession(); + if (_availabilityNotifier != null) + _availabilityNotifier.stopNotifying(); + if (_sessionListener != null) _sessionListener.disconnected(this); + } + + /** + * Will interrupt a connect in progress. + */ + @Override + protected void disconnect() { + _primary.disconnect(); + } + + @Override + protected boolean reconnect() { + return _primary.reconnect(); + } + + /** + * Called by the message handler + * on reception of DestReplyMessage + * + * This will never happen, as the dest reply message does not contain a session ID. + */ + @Override + void destReceived(Destination d) { + _primary.destReceived(d); + } + + /** + * Called by the message handler + * on reception of DestReplyMessage + * + * This will never happen, as the dest reply message does not contain a session ID. + * + * @param h non-null + */ + @Override + void destLookupFailed(Hash h) { + _primary.destLookupFailed(h); + } + + /** + * Called by the message handler + * on reception of HostReplyMessage + * @param d non-null + */ + void destReceived(long nonce, Destination d) { + _primary.destReceived(nonce, d); + } + + /** + * Called by the message handler + * on reception of HostReplyMessage + */ + @Override + void destLookupFailed(long nonce) { + _primary.destLookupFailed(nonce); + } + + /** + * Called by the message handler. + * This will never happen, as the bw limits message does not contain a session ID. + */ + @Override + void bwReceived(int[] i) { + _primary.bwReceived(i); + } + + /** + * Blocking. Waits a max of 10 seconds by default. + * See lookupDest with maxWait parameter to change. + * Implemented in 0.8.3 in I2PSessionImpl; + * previously was available only in I2PSimpleSession. + * Multiple outstanding lookups are now allowed. + * @return null on failure + */ + @Override + public Destination lookupDest(Hash h) throws I2PSessionException { + return _primary.lookupDest(h); + } + + /** + * Blocking. + * @param maxWait ms + * @return null on failure + */ + @Override + public Destination lookupDest(Hash h, long maxWait) throws I2PSessionException { + return _primary.lookupDest(h, maxWait); + } + + /** + * Ask the router to lookup a Destination by host name. + * Blocking. Waits a max of 10 seconds by default. + * + * This only makes sense for a b32 hostname, OR outside router context. + * Inside router context, just query the naming service. + * Outside router context, this does NOT query the context naming service. + * Do that first if you expect a local addressbook. + * + * This will log a warning for non-b32 in router context. + * + * See interface for suggested implementation. + * + * Requires router side to be 0.9.11 or higher. If the router is older, + * this will return null immediately. + */ + @Override + public Destination lookupDest(String name) throws I2PSessionException { + return _primary.lookupDest(name); + } + + /** + * Ask the router to lookup a Destination by host name. + * Blocking. See above for details. + * @param maxWait ms + * @return null on failure + */ + @Override + public Destination lookupDest(String name, long maxWait) throws I2PSessionException { + return _primary.lookupDest(name, maxWait); + } + + /** + * This may not work???????????, as the reply does not contain a session ID, so + * it won't be routed back to us? + */ + @Override + public int[] bandwidthLimits() throws I2PSessionException { + return _primary.bandwidthLimits(); + } + + @Override + protected void updateActivity() { + _primary.updateActivity(); + } + + @Override + public long lastActivity() { + return _primary.lastActivity(); + } + + @Override + public void setReduced() { + _primary.setReduced(); + } +} diff --git a/core/java/src/net/i2p/data/i2cp/CreateLeaseSetMessage.java b/core/java/src/net/i2p/data/i2cp/CreateLeaseSetMessage.java index 6477768c5..0cbb854ff 100644 --- a/core/java/src/net/i2p/data/i2cp/CreateLeaseSetMessage.java +++ b/core/java/src/net/i2p/data/i2cp/CreateLeaseSetMessage.java @@ -38,6 +38,11 @@ public class CreateLeaseSetMessage extends I2CPMessageImpl { return _sessionId; } + @Override + public SessionId sessionId() { + return _sessionId; + } + public void setSessionId(SessionId id) { _sessionId = id; } diff --git a/core/java/src/net/i2p/data/i2cp/DestroySessionMessage.java b/core/java/src/net/i2p/data/i2cp/DestroySessionMessage.java index b67f5fa6f..4275863e1 100644 --- a/core/java/src/net/i2p/data/i2cp/DestroySessionMessage.java +++ b/core/java/src/net/i2p/data/i2cp/DestroySessionMessage.java @@ -32,6 +32,16 @@ public class DestroySessionMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + public void setSessionId(SessionId id) { _sessionId = id; } diff --git a/core/java/src/net/i2p/data/i2cp/HostLookupMessage.java b/core/java/src/net/i2p/data/i2cp/HostLookupMessage.java index 1a8fcfe86..c4405f90a 100644 --- a/core/java/src/net/i2p/data/i2cp/HostLookupMessage.java +++ b/core/java/src/net/i2p/data/i2cp/HostLookupMessage.java @@ -76,6 +76,16 @@ public class HostLookupMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + /** * @return 0 to 2**32 - 1 */ diff --git a/core/java/src/net/i2p/data/i2cp/HostReplyMessage.java b/core/java/src/net/i2p/data/i2cp/HostReplyMessage.java index 37faaa276..b350f225a 100644 --- a/core/java/src/net/i2p/data/i2cp/HostReplyMessage.java +++ b/core/java/src/net/i2p/data/i2cp/HostReplyMessage.java @@ -73,6 +73,16 @@ public class HostReplyMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + /** * @return 0 to 2**32 - 1 */ diff --git a/core/java/src/net/i2p/data/i2cp/I2CPMessage.java b/core/java/src/net/i2p/data/i2cp/I2CPMessage.java index 92c4f6163..58d312013 100644 --- a/core/java/src/net/i2p/data/i2cp/I2CPMessage.java +++ b/core/java/src/net/i2p/data/i2cp/I2CPMessage.java @@ -60,9 +60,20 @@ public interface I2CPMessage extends DataStructure { public void writeMessage(OutputStream out) throws I2CPMessageException, IOException; /** - * Return the unique identifier for this type of APIMessage, as specified in the + * Return the unique identifier for this type of message, as specified in the * network specification document under #ClientAccessLayerMessages - * @return unique identifier for this type of APIMessage + * @return unique identifier for this type of message */ public int getType(); -} \ No newline at end of file + + /** + * Return the SessionId for this type of message. + * Most but not all message types include a SessionId. + * The ones that do already define getSessionId(), but some return a SessionId and + * some return a long, so we define a new method here. + * + * @return SessionId or null if this message type does not include a SessionId + * @since 0.9.19 + */ + public SessionId sessionId(); +} diff --git a/core/java/src/net/i2p/data/i2cp/I2CPMessageException.java b/core/java/src/net/i2p/data/i2cp/I2CPMessageException.java index 63d13b040..b2e38c21e 100644 --- a/core/java/src/net/i2p/data/i2cp/I2CPMessageException.java +++ b/core/java/src/net/i2p/data/i2cp/I2CPMessageException.java @@ -12,7 +12,7 @@ package net.i2p.data.i2cp; import net.i2p.I2PException; /** - * Represent an error serializing or deserializing an APIMessage + * Represent an error serializing or deserializing a message * * @author jrandom */ diff --git a/core/java/src/net/i2p/data/i2cp/I2CPMessageImpl.java b/core/java/src/net/i2p/data/i2cp/I2CPMessageImpl.java index 3ace5df9d..19b8d1cc3 100644 --- a/core/java/src/net/i2p/data/i2cp/I2CPMessageImpl.java +++ b/core/java/src/net/i2p/data/i2cp/I2CPMessageImpl.java @@ -127,4 +127,15 @@ public abstract class I2CPMessageImpl extends DataStructureImpl implements I2CPM throw new DataFormatException("Error writing the message", ime); } } + + /** + * Return the SessionId for this type of message. + * Most but not all message types include a SessionId. + * The ones that do already define getSessionId(), but some return a SessionId and + * some return a long, so we define a new method here. + * + * @return null always. Extending classes with a SessionId must override. + * @since 0.9.19 + */ + public SessionId sessionId() { return null; } } diff --git a/core/java/src/net/i2p/data/i2cp/MessagePayloadMessage.java b/core/java/src/net/i2p/data/i2cp/MessagePayloadMessage.java index 331ee3dad..4e88f71ba 100644 --- a/core/java/src/net/i2p/data/i2cp/MessagePayloadMessage.java +++ b/core/java/src/net/i2p/data/i2cp/MessagePayloadMessage.java @@ -37,6 +37,16 @@ public class MessagePayloadMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId >= 0 ? new SessionId(_sessionId) : null; + } + /** @param id 0-65535 */ public void setSessionId(long id) { _sessionId = (int) id; diff --git a/core/java/src/net/i2p/data/i2cp/MessageStatusMessage.java b/core/java/src/net/i2p/data/i2cp/MessageStatusMessage.java index d9122bd77..5b77ffc4c 100644 --- a/core/java/src/net/i2p/data/i2cp/MessageStatusMessage.java +++ b/core/java/src/net/i2p/data/i2cp/MessageStatusMessage.java @@ -193,6 +193,16 @@ public class MessageStatusMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId >= 0 ? new SessionId(_sessionId) : null; + } + /** @param id 0-65535 */ public void setSessionId(long id) { _sessionId = (int) id; diff --git a/core/java/src/net/i2p/data/i2cp/ReceiveMessageBeginMessage.java b/core/java/src/net/i2p/data/i2cp/ReceiveMessageBeginMessage.java index 032c1ea8b..ccb96cb60 100644 --- a/core/java/src/net/i2p/data/i2cp/ReceiveMessageBeginMessage.java +++ b/core/java/src/net/i2p/data/i2cp/ReceiveMessageBeginMessage.java @@ -36,6 +36,16 @@ public class ReceiveMessageBeginMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId >= 0 ? new SessionId(_sessionId) : null; + } + /** @param id 0-65535 */ public void setSessionId(long id) { _sessionId = (int) id; diff --git a/core/java/src/net/i2p/data/i2cp/ReceiveMessageEndMessage.java b/core/java/src/net/i2p/data/i2cp/ReceiveMessageEndMessage.java index c405b93a3..5be2bbfeb 100644 --- a/core/java/src/net/i2p/data/i2cp/ReceiveMessageEndMessage.java +++ b/core/java/src/net/i2p/data/i2cp/ReceiveMessageEndMessage.java @@ -35,6 +35,16 @@ public class ReceiveMessageEndMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId >= 0 ? new SessionId(_sessionId) : null; + } + /** @param id 0-65535 */ public void setSessionId(long id) { _sessionId = (int) id; diff --git a/core/java/src/net/i2p/data/i2cp/ReconfigureSessionMessage.java b/core/java/src/net/i2p/data/i2cp/ReconfigureSessionMessage.java index 01130670a..8f80f531b 100644 --- a/core/java/src/net/i2p/data/i2cp/ReconfigureSessionMessage.java +++ b/core/java/src/net/i2p/data/i2cp/ReconfigureSessionMessage.java @@ -33,6 +33,16 @@ public class ReconfigureSessionMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + public void setSessionId(SessionId id) { _sessionId = id; } diff --git a/core/java/src/net/i2p/data/i2cp/ReportAbuseMessage.java b/core/java/src/net/i2p/data/i2cp/ReportAbuseMessage.java index 76311ae6e..cfdbd6cfe 100644 --- a/core/java/src/net/i2p/data/i2cp/ReportAbuseMessage.java +++ b/core/java/src/net/i2p/data/i2cp/ReportAbuseMessage.java @@ -35,6 +35,16 @@ public class ReportAbuseMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + public void setSessionId(SessionId id) { _sessionId = id; } diff --git a/core/java/src/net/i2p/data/i2cp/RequestLeaseSetMessage.java b/core/java/src/net/i2p/data/i2cp/RequestLeaseSetMessage.java index 1e9b2dcf6..15d9b70ec 100644 --- a/core/java/src/net/i2p/data/i2cp/RequestLeaseSetMessage.java +++ b/core/java/src/net/i2p/data/i2cp/RequestLeaseSetMessage.java @@ -45,6 +45,16 @@ public class RequestLeaseSetMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + public void setSessionId(SessionId id) { _sessionId = id; } diff --git a/core/java/src/net/i2p/data/i2cp/RequestVariableLeaseSetMessage.java b/core/java/src/net/i2p/data/i2cp/RequestVariableLeaseSetMessage.java index 160e193dc..3edd06171 100644 --- a/core/java/src/net/i2p/data/i2cp/RequestVariableLeaseSetMessage.java +++ b/core/java/src/net/i2p/data/i2cp/RequestVariableLeaseSetMessage.java @@ -55,6 +55,16 @@ public class RequestVariableLeaseSetMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + public void setSessionId(SessionId id) { _sessionId = id; } diff --git a/core/java/src/net/i2p/data/i2cp/SendMessageMessage.java b/core/java/src/net/i2p/data/i2cp/SendMessageMessage.java index 67a67ffa4..e783c62c8 100644 --- a/core/java/src/net/i2p/data/i2cp/SendMessageMessage.java +++ b/core/java/src/net/i2p/data/i2cp/SendMessageMessage.java @@ -38,6 +38,16 @@ public class SendMessageMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + public void setSessionId(SessionId id) { _sessionId = id; } diff --git a/core/java/src/net/i2p/data/i2cp/SessionStatusMessage.java b/core/java/src/net/i2p/data/i2cp/SessionStatusMessage.java index 5af2a649b..122cf5d1e 100644 --- a/core/java/src/net/i2p/data/i2cp/SessionStatusMessage.java +++ b/core/java/src/net/i2p/data/i2cp/SessionStatusMessage.java @@ -42,6 +42,16 @@ public class SessionStatusMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + public void setSessionId(SessionId id) { _sessionId = id; } diff --git a/router/java/src/net/i2p/router/TunnelManagerFacade.java b/router/java/src/net/i2p/router/TunnelManagerFacade.java index 2a35d0c97..a95fbd1f1 100644 --- a/router/java/src/net/i2p/router/TunnelManagerFacade.java +++ b/router/java/src/net/i2p/router/TunnelManagerFacade.java @@ -146,6 +146,21 @@ public interface TunnelManagerFacade extends Service { * */ public void buildTunnels(Destination client, ClientTunnelSettings settings); + + /** + * Add another destination to the same tunnels. + * Must have same encryption key an a different signing key. + * @throws IllegalArgumentException if not + * @return success + * @since 0.9.19 + */ + public boolean addAlias(Destination dest, ClientTunnelSettings settings, Destination existingClient); + + /** + * Remove another destination to the same tunnels. + * @since 0.9.19 + */ + public void removeAlias(Destination dest); public TunnelPoolSettings getInboundSettings(); public TunnelPoolSettings getOutboundSettings(); diff --git a/router/java/src/net/i2p/router/TunnelPoolSettings.java b/router/java/src/net/i2p/router/TunnelPoolSettings.java index fd718e479..3c47a7762 100644 --- a/router/java/src/net/i2p/router/TunnelPoolSettings.java +++ b/router/java/src/net/i2p/router/TunnelPoolSettings.java @@ -1,11 +1,13 @@ package net.i2p.router; +import java.util.Set; import java.util.Locale; import java.util.Map; import java.util.Properties; import net.i2p.data.Base64; import net.i2p.data.Hash; +import net.i2p.util.ConcurrentHashSet; import net.i2p.util.NativeBigInteger; import net.i2p.util.RandomSource; import net.i2p.util.SystemVersion; @@ -31,6 +33,8 @@ public class TunnelPoolSettings { private final Properties _unknownOptions; private Hash _randomKey; private int _priority; + private final Set _aliases; + private Hash _aliasOf; /** prefix used to override the router's defaults for clients */ // unimplemented @@ -119,6 +123,10 @@ public class TunnelPoolSettings { _randomKey = generateRandomKey(); if (_isExploratory && !_isInbound) _priority = EXPLORATORY_PRIORITY; + if (!_isExploratory) + _aliases = new ConcurrentHashSet(4); + else + _aliases = null; } /** how many tunnels should be available at all times */ @@ -206,6 +214,34 @@ public class TunnelPoolSettings { /** what destination is this a client tunnel for (or null if exploratory) */ public Hash getDestination() { return _destination; } + + /** + * Other destinations that use the same tunnel (or null if exploratory) + * Modifiable, concurrent, not a copy + * @since 0.9.19 + */ + public Set getAliases() { + return _aliases; + } + + /** + * Other destination that this is an alias of (or null). + * If non-null, don't build tunnels. + * @since 0.9.19 + */ + public Hash getAliasOf() { + return _aliasOf; + } + + + /** + * Set other destination that this is an alias of (or null). + * If non-null, don't build tunnels. + * @since 0.9.19 + */ + public void setAliasOf(Hash h) { + _aliasOf = h; + } /** * random key used for peer ordering @@ -235,7 +271,7 @@ public class TunnelPoolSettings { public int getPriority() { return _priority; } public Properties getUnknownOptions() { return _unknownOptions; } - + /** * @param prefix non-null */ diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index fbfa0a560..943555643 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -16,6 +16,7 @@ import java.io.OutputStream; import java.net.Socket; import java.util.concurrent.ConcurrentHashMap; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; @@ -39,6 +40,7 @@ import net.i2p.data.i2cp.SendMessageMessage; import net.i2p.data.i2cp.SendMessageExpiresMessage; import net.i2p.data.i2cp.SessionConfig; import net.i2p.data.i2cp.SessionId; +import net.i2p.data.i2cp.SessionStatusMessage; import net.i2p.router.Job; import net.i2p.router.JobImpl; import net.i2p.router.RouterContext; @@ -51,6 +53,9 @@ import net.i2p.util.SimpleTimer; /** * Bridge the router and the client - managing state for a client. * + * As of release 0.9.19, multiple sessions are supported on a single + * I2CP connection. These sessions share tunnels and some configuration. + * * @author jrandom */ class ClientConnectionRunner { @@ -61,21 +66,16 @@ class ClientConnectionRunner { private final Socket _socket; /** output stream of the socket that I2CP messages bound to the client should be written to */ private OutputStream _out; - /** session ID of the current client */ - private SessionId _sessionId; - /** user's config */ - private SessionConfig _config; + + private final ConcurrentHashMap _sessions; + private String _clientVersion; /** * Mapping of MessageId to Payload, storing messages for retrieval. * Unused for i2cp.fastReceive = "true" (_dontSendMSMOnRecive = true) */ private final Map _messages; - /** lease set request state, or null if there is no request pending on at the moment */ - private LeaseRequestState _leaseRequest; private int _consecutiveLeaseRequestFails; - /** currently allocated leaseSet, or null if none is allocated */ - private LeaseSet _currentLeaseSet; /** * Set of messageIds created but not yet ACCEPTED. * Unused for i2cp.messageReliability = "none" (_dontSendMSM = true) @@ -83,7 +83,7 @@ class ClientConnectionRunner { private final Set _acceptedPending; /** thingy that does stuff */ protected I2CPMessageReader _reader; - /** just for this destination */ + /** Used for all sessions, which must all have the same crypto keys */ private SessionKeyManager _sessionKeyManager; /** * This contains the last 10 MessageIds that have had their (non-ack) status @@ -91,7 +91,6 @@ class ClientConnectionRunner { */ private final List _alreadyProcessed; private ClientWriterRunner _writer; - private Hash _destHashCache; /** are we, uh, dead */ private volatile boolean _dead; /** For outbound traffic. true if i2cp.messageReliability = "none"; @since 0.8.1 */ @@ -108,11 +107,30 @@ class ClientConnectionRunner { private static final int MAX_LEASE_FAILS = 5; private static final int BUF_SIZE = 32*1024; + private static final int MAX_SESSIONS = 4; /** @since 0.9.2 */ private static final String PROP_TAGS = "crypto.tagsToSend"; private static final String PROP_THRESH = "crypto.lowTagThreshold"; + /** + * For multisession + * @since 0.9.19 + */ + private static class SessionParams { + final Destination dest; + final boolean isPrimary; + SessionId sessionId; + SessionConfig config; + LeaseRequestState leaseRequest; + LeaseSet currentLeaseSet; + + SessionParams(Destination d, boolean isPrimary) { + dest = d; + this.isPrimary = isPrimary; + } + } + /** * Create a new runner against the given socket * @@ -124,6 +142,7 @@ class ClientConnectionRunner { _socket = socket; // unused for fastReceive _messages = new ConcurrentHashMap(); + _sessions = new ConcurrentHashMap(4); _alreadyProcessed = new ArrayList(); _acceptedPending = new ConcurrentHashSet(); _messageId = new AtomicInteger(_context.random().nextInt()); @@ -166,8 +185,7 @@ class ClientConnectionRunner { public synchronized void stopRunning() { if (_dead) return; if (_context.router().isAlive() && _log.shouldLog(Log.WARN)) - _log.warn("Stop the I2CP connection! current leaseSet: " - + _currentLeaseSet, new Exception("Stop client connection")); + _log.warn("Stop the I2CP connection!", new Exception("Stop client connection")); _dead = true; // we need these keys to unpublish the leaseSet if (_reader != null) _reader.stopReading(); @@ -178,21 +196,56 @@ class ClientConnectionRunner { if (_sessionKeyManager != null) _sessionKeyManager.shutdown(); _manager.unregisterConnection(this); - if (_currentLeaseSet != null) - _context.netDb().unpublish(_currentLeaseSet); - _leaseRequest = null; + for (SessionParams sp : _sessions.values()) { + LeaseSet ls = sp.currentLeaseSet; + if (ls != null) + _context.netDb().unpublish(ls); + } synchronized (_alreadyProcessed) { _alreadyProcessed.clear(); } - //_config = null; - //_manager = null; + _sessions.clear(); } /** * Current client's config, - * will be null before session is established + * will be null if session not found + * IS subsession aware. + * @since 0.9.19 added hash param */ - public SessionConfig getConfig() { return _config; } + public SessionConfig getConfig(Hash h) { + SessionParams sp = _sessions.get(h); + if (sp == null) + return null; + return sp.config; + } + + /** + * Current client's config, + * will be null if session not found + * IS subsession aware. + * @since 0.9.19 added id param + */ + public SessionConfig getConfig(SessionId id) { + for (SessionParams sp : _sessions.values()) { + if (id.equals(sp.sessionId)) + return sp.config; + } + return null; + } + + /** + * Primary client's config, + * will be null if session not set up + * @since 0.9.19 + */ + public SessionConfig getPrimaryConfig() { + for (SessionParams sp : _sessions.values()) { + if (sp.isPrimary) + return sp.config; + } + return null; + } /** * The client version. @@ -214,41 +267,186 @@ class ClientConnectionRunner { /** current client's sessionkeymanager */ public SessionKeyManager getSessionKeyManager() { return _sessionKeyManager; } - /** currently allocated leaseSet */ - public LeaseSet getLeaseSet() { return _currentLeaseSet; } - void setLeaseSet(LeaseSet ls) { _currentLeaseSet = ls; } + /** + * Currently allocated leaseSet. + * IS subsession aware. Returns primary leaseset only. + * @return leaseSet or null if not yet set or unknown hash + * @since 0.9.19 added hash parameter + */ + public LeaseSet getLeaseSet(Hash h) { + SessionParams sp = _sessions.get(h); + if (sp == null) + return null; + return sp.currentLeaseSet; + } + + /** + * Currently allocated leaseSet. + * IS subsession aware. + */ + void setLeaseSet(LeaseSet ls) { + Hash h = ls.getDestination().calculateHash(); + SessionParams sp = _sessions.get(h); + if (sp == null) + return; + sp.currentLeaseSet = ls; + } /** * Equivalent to getConfig().getDestination().calculateHash(); * will be null before session is established + * Not subsession aware. Returns random hash from the sessions. + * Don't use if you can help it. + * + * @return primary hash or null if not yet set */ - public Hash getDestHash() { return _destHashCache; } + public Hash getDestHash() { + for (Hash h : _sessions.keySet()) { + return h; + } + return null; + } + + /** + * Return the hash for the given ID + * @return hash or null if unknown + * @since 0.9.19 + */ + public Hash getDestHash(SessionId id) { + for (Map.Entry e : _sessions.entrySet()) { + if (id.equals(e.getValue().sessionId)) + return e.getKey(); + } + return null; + } + + /** + * Return the dest for the given ID + * @return dest or null if unknown + * @since 0.9.19 + */ + public Destination getDestination(SessionId id) { + for (SessionParams sp : _sessions.values()) { + if (id.equals(sp.sessionId)) + return sp.dest; + } + return null; + } /** - * @return current client's sessionId or null if not yet set + * Subsession aware. + * + * @param h the local target + * @return current client's sessionId or null if not yet set or not a valid hash + * @since 0.9.19 */ - SessionId getSessionId() { return _sessionId; } + SessionId getSessionId(Hash h) { + SessionParams sp = _sessions.get(h); + if (sp == null) + return null; + return sp.sessionId; + } + + /** + * Subsession aware. + * + * @return all current client's sessionIds, non-null + * @since 0.9.19 + */ + List getSessionIds() { + List rv = new ArrayList(_sessions.size()); + for (SessionParams sp : _sessions.values()) { + SessionId id = sp.sessionId; + if (id != null) + rv.add(id); + } + return rv; + } + + /** + * Subsession aware. + * + * @return all current client's destinations, non-null + * @since 0.9.19 + */ + List getDestinations() { + List rv = new ArrayList(_sessions.size()); + for (SessionParams sp : _sessions.values()) { + rv.add(sp.dest); + } + return rv; + } /** * To be called only by ClientManager. * + * @param hash for the session * @throws IllegalStateException if already set + * @since 0.9.19 added hash param */ - void setSessionId(SessionId id) { - if (_sessionId != null) + void setSessionId(Hash hash, SessionId id) { + if (hash == null) throw new IllegalStateException(); - _sessionId = id; + SessionParams sp = _sessions.get(hash); + if (sp == null || sp.sessionId != null) + throw new IllegalStateException(); + sp.sessionId = id; + } + + /** + * Kill the session. Caller must kill runner if none left. + * + * @since 0.9.19 + */ + void removeSession(SessionId id) { + boolean isPrimary = false; + for (Iterator iter = _sessions.values().iterator(); iter.hasNext(); ) { + SessionParams sp = iter.next(); + if (id.equals(sp.sessionId)) { + if (_log.shouldLog(Log.INFO)) + _log.info("Destroying client session " + id); + iter.remove(); + // Tell client manger + _manager.unregisterSession(id, sp.dest); + LeaseSet ls = sp.currentLeaseSet; + if (ls != null) + _context.netDb().unpublish(ls); + isPrimary = sp.isPrimary; + } + } + if (isPrimary) { + // kill all the others also + for (SessionParams sp : _sessions.values()) { + _manager.unregisterSession(id, sp.dest); + LeaseSet ls = sp.currentLeaseSet; + if (ls != null) + _context.netDb().unpublish(ls); + } + } } - /** data for the current leaseRequest, or null if there is no active leaseSet request */ - LeaseRequestState getLeaseRequest() { return _leaseRequest; } + /** + * Data for the current leaseRequest, or null if there is no active leaseSet request. + * Not subsession aware. Returns primary ID only. + * @since 0.9.19 added hash param + */ + LeaseRequestState getLeaseRequest(Hash h) { + SessionParams sp = _sessions.get(h); + if (sp == null) + return null; + return sp.leaseRequest; + } /** @param req non-null */ public void failLeaseRequest(LeaseRequestState req) { boolean disconnect = false; + Hash h = req.getRequested().getDestination().calculateHash(); + SessionParams sp = _sessions.get(h); + if (sp == null) + return; synchronized (this) { - if (_leaseRequest == req) { - _leaseRequest = null; + if (sp.leaseRequest == req) { + sp.leaseRequest = null; disconnect = ++_consecutiveLeaseRequestFails > MAX_LEASE_FAILS; } } @@ -289,19 +487,34 @@ class ClientConnectionRunner { * @return SessionStatusMessage return code, 1 for success, != 1 for failure */ public int sessionEstablished(SessionConfig config) { - _destHashCache = config.getDestination().calculateHash(); + Destination dest = config.getDestination(); + Hash destHash = dest.calculateHash(); if (_log.shouldLog(Log.DEBUG)) - _log.debug("SessionEstablished called for destination " + _destHashCache.toBase64()); - _config = config; + _log.debug("SessionEstablished called for destination " + destHash); + if (_sessions.size() > MAX_SESSIONS) + return SessionStatusMessage.STATUS_REFUSED; + boolean isPrimary = _sessions.isEmpty(); + if (!isPrimary) { + // all encryption keys must be the same + for (SessionParams sp : _sessions.values()) { + if (!dest.getPublicKey().equals(sp.dest.getPublicKey())) + return SessionStatusMessage.STATUS_INVALID; + } + } + SessionParams sp = new SessionParams(dest, isPrimary); + sp.config = config; + SessionParams old = _sessions.putIfAbsent(destHash, sp); + if (old != null) + return SessionStatusMessage.STATUS_INVALID; // We process a few options here, but most are handled by the tunnel manager. // The ones here can't be changed later. Properties opts = config.getOptions(); - if (opts != null) { + if (isPrimary && opts != null) { _dontSendMSM = "none".equals(opts.getProperty(I2PClient.PROP_RELIABILITY, "").toLowerCase(Locale.US)); _dontSendMSMOnReceive = Boolean.parseBoolean(opts.getProperty(I2PClient.PROP_FAST_RECEIVE)); } // per-destination session key manager to prevent rather easy correlation - if (_sessionKeyManager == null) { + if (isPrimary && _sessionKeyManager == null) { int tags = TransientSessionKeyManager.DEFAULT_TAGS; int thresh = TransientSessionKeyManager.LOW_THRESHOLD; if (opts != null) { @@ -315,10 +528,8 @@ class ClientConnectionRunner { } } _sessionKeyManager = new TransientSessionKeyManager(_context, tags, thresh); - } else { - _log.error("SessionEstablished called for twice for destination " + _destHashCache.toBase64().substring(0,4)); } - return _manager.destinationEstablished(this); + return _manager.destinationEstablished(this, dest); } /** @@ -329,14 +540,21 @@ class ClientConnectionRunner { * * Do not use for status = STATUS_SEND_ACCEPTED; use ackSendMessage() for that. * + * @param dest the client * @param id the router's ID for this message * @param messageNonce the client's ID for this message * @param status see I2CP MessageStatusMessage for success/failure codes */ - void updateMessageDeliveryStatus(MessageId id, long messageNonce, int status) { + void updateMessageDeliveryStatus(Destination dest, MessageId id, long messageNonce, int status) { if (_dead || messageNonce <= 0) return; - _context.jobQueue().addJob(new MessageDeliveryStatusUpdate(id, messageNonce, status)); + SessionParams sp = _sessions.get(dest.calculateHash()); + if (sp == null) + return; + SessionId sid = sp.sessionId; + if (sid == null) + return; // sid = new SessionId(foo) ??? + _context.jobQueue().addJob(new MessageDeliveryStatusUpdate(sid, id, messageNonce, status)); } /** @@ -344,19 +562,23 @@ class ClientConnectionRunner { * updated. This takes care of all the LeaseRequestState stuff (including firing any jobs) */ void leaseSetCreated(LeaseSet ls) { - LeaseRequestState state = null; + Hash h = ls.getDestination().calculateHash(); + SessionParams sp = _sessions.get(h); + if (sp == null) + return; + LeaseRequestState state; synchronized (this) { - state = _leaseRequest; + state = sp.leaseRequest; if (state == null) { if (_log.shouldLog(Log.WARN)) _log.warn("LeaseRequest is null and we've received a new lease?! perhaps this is odd... " + ls); return; } else { state.setIsSuccessful(true); - _currentLeaseSet = ls; + setLeaseSet(ls); if (_log.shouldLog(Log.DEBUG)) _log.debug("LeaseSet created fully: " + state + " / " + ls); - _leaseRequest = null; + sp.leaseRequest = null; _consecutiveLeaseRequestFails = 0; } } @@ -425,12 +647,12 @@ class ClientConnectionRunner { if (_log.shouldLog(Log.DEBUG)) _log.debug("** Receiving message " + id.getMessageId() + " with payload of size " - + payload.getSize() + " for session " + _sessionId.getSessionId()); + + payload.getSize() + " for session " + message.getSessionId()); //long beforeDistribute = _context.clock().now(); // the following blocks as described above - SessionConfig cfg = _config; - if (cfg != null) - _manager.distributeMessage(cfg.getDestination(), dest, payload, + Destination fromDest = getDestination(message.getSessionId()); + if (fromDest != null) + _manager.distributeMessage(fromDest, dest, payload, id, message.getNonce(), expiration, flags); // else log error? //long timeToDistribute = _context.clock().now() - beforeDistribute; @@ -450,11 +672,9 @@ class ClientConnectionRunner { * @param id OUR id for the message * @param nonce HIS id for the message */ - void ackSendMessage(MessageId id, long nonce) { + void ackSendMessage(SessionId sid, MessageId id, long nonce) { if (_dontSendMSM || nonce == 0) return; - SessionId sid = _sessionId; - if (sid == null) return; if (_log.shouldLog(Log.DEBUG)) _log.debug("Acking message send [accepted]" + id + " / " + nonce + " for sessionId " + sid); @@ -476,6 +696,7 @@ class ClientConnectionRunner { /** * Asynchronously deliver the message to the current runner * + * @param fromDest generally null when from remote, non-null if from local */ void receiveMessage(Destination toDest, Destination fromDest, Payload payload) { if (_dead) return; @@ -489,9 +710,9 @@ class ClientConnectionRunner { * Send async abuse message to the client * */ - public void reportAbuse(String reason, int severity) { + public void reportAbuse(Destination dest, String reason, int severity) { if (_dead) return; - _context.jobQueue().addJob(new ReportAbuseJob(_context, this, reason, severity)); + _context.jobQueue().addJob(new ReportAbuseJob(_context, this, dest, reason, severity)); } /** @@ -524,12 +745,16 @@ class ClientConnectionRunner { // so the comparison will always work. int leases = set.getLeaseCount(); // synch so _currentLeaseSet isn't changed out from under us + LeaseSet current = null; synchronized (this) { - if (_currentLeaseSet != null && _currentLeaseSet.getLeaseCount() == leases) { + Destination dest = set.getDestination(); + if (dest != null) + current = getLeaseSet(dest.calculateHash()); + if (current != null && current.getLeaseCount() == leases) { for (int i = 0; i < leases; i++) { - if (! _currentLeaseSet.getLease(i).getTunnelId().equals(set.getLease(i).getTunnelId())) + if (! current.getLease(i).getTunnelId().equals(set.getLease(i).getTunnelId())) break; - if (! _currentLeaseSet.getLease(i).getGateway().equals(set.getLease(i).getGateway())) + if (! current.getLease(i).getGateway().equals(set.getLease(i).getGateway())) break; if (i == leases - 1) { if (_log.shouldLog(Log.INFO)) @@ -542,10 +767,14 @@ class ClientConnectionRunner { } } if (_log.shouldLog(Log.INFO)) - _log.info("Current leaseSet " + _currentLeaseSet + "\nNew leaseSet " + set); - LeaseRequestState state = null; + _log.info("Current leaseSet " + current + "\nNew leaseSet " + set); + Hash h = set.getDestination().calculateHash(); + SessionParams sp = _sessions.get(h); + if (sp == null) + return; + LeaseRequestState state; synchronized (this) { - state = _leaseRequest; + state = sp.leaseRequest; if (state != null) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Already requesting " + state); @@ -562,7 +791,7 @@ class ClientConnectionRunner { // fire onCreated? return; // already requesting } else { - _leaseRequest = state = new LeaseRequestState(onCreateJob, onFailedJob, _context.clock().now() + expirationTime, set); + sp.leaseRequest = state = new LeaseRequestState(onCreateJob, onFailedJob, _context.clock().now() + expirationTime, set); if (_log.shouldLog(Log.DEBUG)) _log.debug("New request: " + state); } @@ -693,6 +922,7 @@ class ClientConnectionRunner { private static final int MAX_REQUEUE = 60; // 30 sec. private class MessageDeliveryStatusUpdate extends JobImpl { + private final SessionId _sessId; private final MessageId _messageId; private final long _messageNonce; private final int _status; @@ -706,8 +936,9 @@ class ClientConnectionRunner { * @param messageNonce the client's ID for this message * @param status see I2CP MessageStatusMessage for success/failure codes */ - public MessageDeliveryStatusUpdate(MessageId id, long messageNonce, int status) { + public MessageDeliveryStatusUpdate(SessionId sid, MessageId id, long messageNonce, int status) { super(ClientConnectionRunner.this._context); + _sessId = sid; _messageId = id; _messageNonce = messageNonce; _status = status; @@ -723,7 +954,7 @@ class ClientConnectionRunner { MessageStatusMessage msg = new MessageStatusMessage(); msg.setMessageId(_messageId.getMessageId()); - msg.setSessionId(_sessionId.getSessionId()); + msg.setSessionId(_sessId.getSessionId()); // has to be >= 0, it is initialized to -1 msg.setNonce(_messageNonce); msg.setSize(0); @@ -734,12 +965,12 @@ class ClientConnectionRunner { // bug requeueing forever? failsafe _log.error("Abandon update for message " + _messageId + " to " + MessageStatusMessage.getStatusString(msg.getStatus()) - + " for session " + _sessionId.getSessionId()); + + " for " + _sessId); } else { if (_log.shouldLog(Log.WARN)) _log.warn("Almost send an update for message " + _messageId + " to " + MessageStatusMessage.getStatusString(msg.getStatus()) - + " for session " + _sessionId.getSessionId() + + " for " + _sessId + " before they knew the messageId! delaying .5s"); _lastTried = _context.clock().now(); requeue(REQUEUE_DELAY); @@ -774,14 +1005,14 @@ class ClientConnectionRunner { if (_log.shouldLog(Log.DEBUG)) _log.info("Updating message status for message " + _messageId + " to " + MessageStatusMessage.getStatusString(msg.getStatus()) - + " for session " + _sessionId.getSessionId() + + " for " + _sessId + " (with nonce=2), retrying after " + (_context.clock().now() - _lastTried)); } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Updating message status for message " + _messageId + " to " + MessageStatusMessage.getStatusString(msg.getStatus()) - + " for session " + _sessionId.getSessionId() + " (with nonce=2)"); + + " for " + _sessId + " (with nonce=2)"); } try { diff --git a/router/java/src/net/i2p/router/client/ClientManager.java b/router/java/src/net/i2p/router/client/ClientManager.java index 9a8850c9b..a875f0f32 100644 --- a/router/java/src/net/i2p/router/client/ClientManager.java +++ b/router/java/src/net/i2p/router/client/ClientManager.java @@ -51,9 +51,11 @@ class ClientManager { protected final List _listeners; // Destination --> ClientConnectionRunner // Locked for adds/removes but not lookups + // If a runner has multiple sessions it will be in here multiple times, one for each dest private final Map _runners; // Same as what's in _runners, but for fast lookup by Hash // Locked for adds/removes but not lookups + // If a runner has multiple sessions it will be in here multiple times, one for each dest private final Map _runnersByHash; // ClientConnectionRunner for clients w/out a Dest yet private final Set _pendingRunners; @@ -203,24 +205,44 @@ class ClientManager { } } + /** + * Remove all sessions for this runner. + */ public void unregisterConnection(ClientConnectionRunner runner) { - _log.warn("Unregistering (dropping) a client connection"); + if (_log.shouldLog(Log.WARN)) + _log.warn("Unregistering (dropping) a client connection"); synchronized (_pendingRunners) { _pendingRunners.remove(runner); } - if ( (runner.getConfig() != null) && (runner.getConfig().getDestination() != null) ) { - // after connection establishment - Destination dest = runner.getConfig().getDestination(); - synchronized (_runners) { - SessionId id = runner.getSessionId(); - if (id != null) - _runnerSessionIds.remove(id); + + List ids = runner.getSessionIds(); + List dests = runner.getDestinations(); + synchronized (_runners) { + for (SessionId id : ids) { + _runnerSessionIds.remove(id); + } + for (Destination dest : dests) { _runners.remove(dest); _runnersByHash.remove(dest.calculateHash()); } } } + /** + * Remove only the following session. Does not remove the runner if it has more. + * + * @since 0.9.19 + */ + public void unregisterSession(SessionId id, Destination dest) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Unregistering client session " + id); + synchronized (_runners) { + _runnerSessionIds.remove(id); + _runners.remove(dest); + _runnersByHash.remove(dest.calculateHash()); + } + } + /** * Add to the clients list. Check for a dup destination. * Side effect: Sets the session ID of the runner. @@ -228,8 +250,7 @@ class ClientManager { * * @return SessionStatusMessage return code, 1 for success, != 1 for failure */ - public int destinationEstablished(ClientConnectionRunner runner) { - Destination dest = runner.getConfig().getDestination(); + public int destinationEstablished(ClientConnectionRunner runner, Destination dest) { if (_log.shouldLog(Log.DEBUG)) _log.debug("DestinationEstablished called for destination " + dest.calculateHash().toBase64()); @@ -244,9 +265,10 @@ class ClientManager { } else { SessionId id = locked_getNextSessionId(); if (id != null) { - runner.setSessionId(id); + Hash h = dest.calculateHash(); + runner.setSessionId(h, id); _runners.put(dest, runner); - _runnersByHash.put(dest.calculateHash(), runner); + _runnersByHash.put(h, runner); rv = SessionStatusMessage.STATUS_CREATED; } else { rv = SessionStatusMessage.STATUS_REFUSED; @@ -310,8 +332,11 @@ class ClientManager { // sender went away return; } - ClientMessage msg = new ClientMessage(toDest, payload, runner.getConfig(), - runner.getConfig().getDestination(), msgId, + SessionConfig config = runner.getConfig(fromDest.calculateHash()); + if (config == null) + return; + ClientMessage msg = new ClientMessage(toDest, payload, config, + fromDest, msgId, messageNonce, expiration, flags); _ctx.clientMessagePool().add(msg, true); } @@ -347,7 +372,7 @@ class ClientManager { public void runJob() { _to.receiveMessage(_toDest, _fromDest, _payload); if (_from != null) { - _from.updateMessageDeliveryStatus(_msgId, _messageNonce, MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL); + _from.updateMessageDeliveryStatus(_fromDest, _msgId, _messageNonce, MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL); } } } @@ -410,7 +435,9 @@ class ClientManager { if (destHash == null) return true; ClientConnectionRunner runner = getRunner(destHash); if (runner == null) return true; - return !Boolean.parseBoolean(runner.getConfig().getOptions().getProperty(ClientManagerFacade.PROP_CLIENT_ONLY)); + SessionConfig config = runner.getConfig(destHash); + if (config == null) return true; + return !Boolean.parseBoolean(config.getOptions().getProperty(ClientManagerFacade.PROP_CLIENT_ONLY)); } /** @@ -437,7 +464,7 @@ class ClientManager { public SessionConfig getClientSessionConfig(Destination dest) { ClientConnectionRunner runner = getRunner(dest); if (runner != null) - return runner.getConfig(); + return runner.getConfig(dest.calculateHash()); else return null; } @@ -475,7 +502,7 @@ class ClientManager { if (_log.shouldLog(Log.DEBUG)) _log.debug("Delivering status " + status + " to " + fromDest.calculateHash() + " for message " + id); - runner.updateMessageDeliveryStatus(id, messageNonce, status); + runner.updateMessageDeliveryStatus(fromDest, id, messageNonce, status); } else { if (_log.shouldLog(Log.WARN)) _log.warn("Cannot deliver status " + status + " to " @@ -499,7 +526,7 @@ class ClientManager { if (dest != null) { ClientConnectionRunner runner = getRunner(dest); if (runner != null) { - runner.reportAbuse(reason, severity); + runner.reportAbuse(dest, reason, severity); } } else { for (Destination d : _runners.keySet()) { @@ -577,21 +604,22 @@ class ClientManager { public void runJob() { ClientConnectionRunner runner; - if (_msg.getDestination() != null) - runner = getRunner(_msg.getDestination()); + Destination dest = _msg.getDestination(); + if (dest != null) + runner = getRunner(dest); else runner = getRunner(_msg.getDestinationHash()); if (runner != null) { //_ctx.statManager().addRateData("client.receiveMessageSize", // _msg.getPayload().getSize(), 0); - runner.receiveMessage(_msg.getDestination(), null, _msg.getPayload()); + runner.receiveMessage(dest, null, _msg.getPayload()); } else { // no client connection... // we should pool these somewhere... if (_log.shouldLog(Log.WARN)) _log.warn("Message received but we don't have a connection to " - + _msg.getDestination() + "/" + _msg.getDestinationHash() + + dest + "/" + _msg.getDestinationHash() + " currently. DROPPED"); } } diff --git a/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java b/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java index 6168ab794..d4e476ad7 100644 --- a/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java +++ b/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java @@ -90,7 +90,7 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade implements Inte for (Destination dest : _manager.getRunnerDestinations()) { ClientConnectionRunner runner = _manager.getRunner(dest); if ( (runner == null) || (runner.getIsDead())) continue; - LeaseSet ls = runner.getLeaseSet(); + LeaseSet ls = runner.getLeaseSet(dest.calculateHash()); if (ls == null) continue; // still building long howLongAgo = _context.clock().now() - ls.getEarliestLeaseDate(); diff --git a/router/java/src/net/i2p/router/client/ClientMessageEventListener.java b/router/java/src/net/i2p/router/client/ClientMessageEventListener.java index 382552ce6..b3f1ad1ca 100644 --- a/router/java/src/net/i2p/router/client/ClientMessageEventListener.java +++ b/router/java/src/net/i2p/router/client/ClientMessageEventListener.java @@ -195,12 +195,13 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi */ private void handleCreateSession(CreateSessionMessage message) { SessionConfig in = message.getSessionConfig(); + Destination dest = in.getDestination(); if (in.verifySignature()) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Signature verified correctly on create session message"); } else { // For now, we do NOT send a SessionStatusMessage - see javadoc above - int itype = in.getDestination().getCertificate().getCertificateType(); + int itype = dest.getCertificate().getCertificateType(); SigType stype = SigType.getByCode(itype); if (stype == null || !stype.isAvailable()) { _log.error("Client requested unsupported signature type " + itype); @@ -217,7 +218,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi if (!checkAuth(inProps)) return; - SessionId id = _runner.getSessionId(); + SessionId id = _runner.getSessionId(dest.calculateHash()); if (id != null) { _runner.disconnectClient("Already have session " + id); return; @@ -226,11 +227,12 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi // Copy over the whole config structure so we don't later corrupt it on // the client side if we change settings or later get a // ReconfigureSessionMessage - SessionConfig cfg = new SessionConfig(in.getDestination()); + SessionConfig cfg = new SessionConfig(dest); cfg.setSignature(in.getSignature()); Properties props = new Properties(); props.putAll(in.getOptions()); cfg.setOptions(props); + boolean isPrimary = _runner.getSessionIds().isEmpty(); int status = _runner.sessionEstablished(cfg); if (status != SessionStatusMessage.STATUS_CREATED) { // For now, we do NOT send a SessionStatusMessage - see javadoc above @@ -246,11 +248,29 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi _runner.disconnectClient(msg); return; } - sendStatusMessage(status); + sendStatusMessage(id, status); if (_log.shouldLog(Log.INFO)) - _log.info("Session " + _runner.getSessionId() + " established for " + _runner.getDestHash()); - startCreateSessionJob(); + _log.info("Session " + id + " established for " + dest.calculateHash()); + if (isPrimary) { + startCreateSessionJob(cfg); + } else { + SessionConfig pcfg = _runner.getPrimaryConfig(); + if (pcfg != null) { + /////////// + // new tunnel name etc. + ClientTunnelSettings settings = new ClientTunnelSettings(dest.calculateHash()); + // all the primary options, then the overrides from the alias + props.putAll(pcfg.getOptions()); + props.putAll(props); + settings.readFromProperties(props); + boolean ok = _context.tunnelManager().addAlias(dest, settings, pcfg.getDestination()); + if (!ok) { + _log.error("Add alias failed"); + // send status message... + } + } + } } /** @@ -296,8 +316,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi * @since 0.9.8 * */ - protected void startCreateSessionJob() { - _context.jobQueue().addJob(new CreateSessionJob(_context, _runner)); + protected void startCreateSessionJob(SessionConfig config) { + _context.jobQueue().addJob(new CreateSessionJob(_context, config)); } /** @@ -311,7 +331,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi long beforeDistribute = _context.clock().now(); MessageId id = _runner.distributeMessage(message); long timeToDistribute = _context.clock().now() - beforeDistribute; - _runner.ackSendMessage(id, message.getNonce()); + // TODO validate session id + _runner.ackSendMessage(message.getSessionId(), id, message.getNonce()); _context.statManager().addRateData("client.distributeTime", timeToDistribute); if ( (timeToDistribute > 50) && (_log.shouldLog(Log.INFO)) ) _log.info("Took too long to distribute the message (which holds up the ack): " + timeToDistribute); @@ -328,7 +349,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi _log.debug("Handling recieve begin: id = " + message.getMessageId()); MessagePayloadMessage msg = new MessagePayloadMessage(); msg.setMessageId(message.getMessageId()); - msg.setSessionId(_runner.getSessionId().getSessionId()); + // TODO validate session id + msg.setSessionId(message.getSessionId()); Payload payload = _runner.getPayload(new MessageId(message.getMessageId())); if (payload == null) { if (_log.shouldLog(Log.WARN)) @@ -357,9 +379,18 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi } private void handleDestroySession(DestroySessionMessage message) { - if (_log.shouldLog(Log.INFO)) - _log.info("Destroying client session " + _runner.getSessionId()); - _runner.stopRunning(); + SessionId id = message.getSessionId(); + SessionConfig cfg = _runner.getConfig(id); + _runner.removeSession(id); + int left = _runner.getSessionIds().size(); + if (left <= 0) { + _runner.stopRunning(); + } else { + if (cfg != null) + _context.tunnelManager().removeAlias(cfg.getDestination()); + if (_log.shouldLog(Log.INFO)) + _log.info("Still " + left + " sessions left"); + } } /** override for testing */ @@ -370,7 +401,13 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi _runner.disconnectClient("Invalid CreateLeaseSetMessage"); return; } - Destination dest = _runner.getConfig().getDestination(); + SessionId id = message.getSessionId(); + SessionConfig config = _runner.getConfig(id); + if (config == null) { + _log.error("Unknown session in CLSM"); + return; + } + Destination dest = config.getDestination(); Destination ndest = message.getLeaseSet().getDestination(); if (!dest.equals(ndest)) { if (_log.shouldLog(Log.ERROR)) @@ -414,8 +451,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi return; } if (_log.shouldLog(Log.INFO)) - _log.info("New lease set granted for destination " - + _runner.getDestHash()); + _log.info("New lease set granted for destination " + dest); // leaseSetCreated takes care of all the LeaseRequestState stuff (including firing any jobs) _runner.leaseSetCreated(message.getLeaseSet()); @@ -423,6 +459,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi /** override for testing */ protected void handleDestLookup(DestLookupMessage message) { + // no session id in DLM _context.jobQueue().addJob(new LookupDestJob(_context, _runner, message.getHash(), _runner.getDestHash())); } @@ -432,10 +469,12 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi * @since 0.9.11 */ protected void handleHostLookup(HostLookupMessage message) { + Hash h = _runner.getDestHash(message.getSessionId()); + if (h == null) + return; // ok? _context.jobQueue().addJob(new LookupDestJob(_context, _runner, message.getReqID(), message.getTimeout(), message.getSessionId(), - message.getHash(), message.getHostname(), - _runner.getDestHash())); + message.getHash(), message.getHostname(), h)); } /** @@ -447,32 +486,37 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi * ClientConnectionRunner.sessionEstablished(). Those can't be changed later. */ private void handleReconfigureSession(ReconfigureSessionMessage message) { + SessionId id = message.getSessionId(); + SessionConfig config = _runner.getConfig(id); + if (config == null) { + _log.error("Unknown session"); + sendStatusMessage(id, SessionStatusMessage.STATUS_INVALID); + //_runner.stopRunning(); // ok? + return; + } if (_log.shouldLog(Log.INFO)) - _log.info("Updating options - old: " + _runner.getConfig() + " new: " + message.getSessionConfig()); - if (!message.getSessionConfig().getDestination().equals(_runner.getConfig().getDestination())) { + _log.info("Updating options - old: " + _runner.getConfig(id) + " new: " + message.getSessionConfig()); + if (!message.getSessionConfig().getDestination().equals(config.getDestination())) { _log.error("Dest mismatch"); - sendStatusMessage(SessionStatusMessage.STATUS_INVALID); + sendStatusMessage(id, SessionStatusMessage.STATUS_INVALID); _runner.stopRunning(); return; } - _runner.getConfig().getOptions().putAll(message.getSessionConfig().getOptions()); - Hash dest = _runner.getDestHash(); + Hash dest = config.getDestination().calculateHash(); + config.getOptions().putAll(message.getSessionConfig().getOptions()); ClientTunnelSettings settings = new ClientTunnelSettings(dest); Properties props = new Properties(); - props.putAll(_runner.getConfig().getOptions()); + props.putAll(config.getOptions()); settings.readFromProperties(props); _context.tunnelManager().setInboundSettings(dest, settings.getInboundSettings()); _context.tunnelManager().setOutboundSettings(dest, settings.getOutboundSettings()); - sendStatusMessage(SessionStatusMessage.STATUS_UPDATED); + sendStatusMessage(id, SessionStatusMessage.STATUS_UPDATED); } - private void sendStatusMessage(int status) { + private void sendStatusMessage(SessionId id, int status) { SessionStatusMessage msg = new SessionStatusMessage(); - SessionId id = _runner.getSessionId(); - if (id == null) - id = ClientManager.UNKNOWN_SESSION_ID; msg.setSessionId(id); msg.setStatus(status); try { diff --git a/router/java/src/net/i2p/router/client/CreateSessionJob.java b/router/java/src/net/i2p/router/client/CreateSessionJob.java index af01a496c..558dba8e7 100644 --- a/router/java/src/net/i2p/router/client/CreateSessionJob.java +++ b/router/java/src/net/i2p/router/client/CreateSessionJob.java @@ -26,25 +26,20 @@ import net.i2p.util.Log; */ class CreateSessionJob extends JobImpl { private final Log _log; - private final ClientConnectionRunner _runner; + private final SessionConfig _config; - public CreateSessionJob(RouterContext context, ClientConnectionRunner runner) { + public CreateSessionJob(RouterContext context, SessionConfig config) { super(context); _log = context.logManager().getLog(CreateSessionJob.class); - _runner = runner; + _config = config; if (_log.shouldLog(Log.DEBUG)) - _log.debug("CreateSessionJob for runner " + _runner + " / config: " + _runner.getConfig()); + _log.debug("CreateSessionJob for config: " + config); } public String getName() { return "Request tunnels for a new client"; } + public void runJob() { - SessionConfig cfg = _runner.getConfig(); - if ( (cfg == null) || (cfg.getDestination() == null) ) { - if (_log.shouldLog(Log.ERROR)) - _log.error("No session config on runner " + _runner); - return; - } - Hash dest = cfg.getDestination().calculateHash(); + Hash dest = _config.getDestination().calculateHash(); if (_log.shouldLog(Log.INFO)) _log.info("Requesting lease set for destination " + dest); ClientTunnelSettings settings = new ClientTunnelSettings(dest); @@ -61,10 +56,10 @@ class CreateSessionJob extends JobImpl { // XXX props.putAll(Router.getInstance().getConfigMap()); // override them by the client's settings - props.putAll(cfg.getOptions()); + props.putAll(_config.getOptions()); // and load 'em up (using anything not yet set as the software defaults) settings.readFromProperties(props); - getContext().tunnelManager().buildTunnels(cfg.getDestination(), settings); + getContext().tunnelManager().buildTunnels(_config.getDestination(), settings); } } diff --git a/router/java/src/net/i2p/router/client/LeaseRequestState.java b/router/java/src/net/i2p/router/client/LeaseRequestState.java index 67968df3b..1fb50620c 100644 --- a/router/java/src/net/i2p/router/client/LeaseRequestState.java +++ b/router/java/src/net/i2p/router/client/LeaseRequestState.java @@ -40,6 +40,7 @@ class LeaseRequestState { /** created lease set from client - FIXME always null */ public LeaseSet getGranted() { return _grantedLeaseSet; } + /** FIXME unused - why? */ public void setGranted(LeaseSet ls) { _grantedLeaseSet = ls; } diff --git a/router/java/src/net/i2p/router/client/MessageReceivedJob.java b/router/java/src/net/i2p/router/client/MessageReceivedJob.java index 0ee148282..7ce5f5cba 100644 --- a/router/java/src/net/i2p/router/client/MessageReceivedJob.java +++ b/router/java/src/net/i2p/router/client/MessageReceivedJob.java @@ -14,6 +14,7 @@ import net.i2p.data.i2cp.I2CPMessageException; import net.i2p.data.i2cp.MessageId; import net.i2p.data.i2cp.MessagePayloadMessage; import net.i2p.data.i2cp.MessageStatusMessage; +import net.i2p.data.i2cp.SessionId; import net.i2p.router.JobImpl; import net.i2p.router.RouterContext; import net.i2p.util.Log; @@ -26,14 +27,20 @@ import net.i2p.util.Log; class MessageReceivedJob extends JobImpl { private final Log _log; private final ClientConnectionRunner _runner; + private final Destination _toDest; private final Payload _payload; private final boolean _sendDirect; + /** + * @param toDest requred to pick session + * @param fromDest ignored, generally null + */ public MessageReceivedJob(RouterContext ctx, ClientConnectionRunner runner, Destination toDest, Destination fromDest, Payload payload, boolean sendDirect) { super(ctx); _log = ctx.logManager().getLog(MessageReceivedJob.class); _runner = runner; + _toDest = toDest; _payload = payload; _sendDirect = sendDirect; } @@ -43,8 +50,8 @@ class MessageReceivedJob extends JobImpl { public void runJob() { if (_runner.isDead()) return; MessageId id = null; - long nextID = _runner.getNextMessageId(); try { + long nextID = _runner.getNextMessageId(); if (_sendDirect) { sendMessage(nextID); } else { @@ -55,7 +62,7 @@ class MessageReceivedJob extends JobImpl { } catch (I2CPMessageException ime) { if (_log.shouldLog(Log.WARN)) _log.warn("Error writing out the message", ime); - if (!_sendDirect) + if (id != null && !_sendDirect) _runner.removePayload(id); } } @@ -69,7 +76,13 @@ class MessageReceivedJob extends JobImpl { // + " (with nonce=1)", new Exception("available")); MessageStatusMessage msg = new MessageStatusMessage(); msg.setMessageId(id.getMessageId()); - msg.setSessionId(_runner.getSessionId().getSessionId()); + SessionId sid = _runner.getSessionId(_toDest.calculateHash()); + if (sid == null) { + if (_log.shouldLog(Log.WARN)) + _log.warn("No session for " + _toDest.calculateHash()); + return; + } + msg.setSessionId(sid.getSessionId()); msg.setSize(size); // has to be >= 0, it is initialized to -1 msg.setNonce(1); @@ -84,7 +97,13 @@ class MessageReceivedJob extends JobImpl { private void sendMessage(long id) throws I2CPMessageException { MessagePayloadMessage msg = new MessagePayloadMessage(); msg.setMessageId(id); - msg.setSessionId(_runner.getSessionId().getSessionId()); + SessionId sid = _runner.getSessionId(_toDest.calculateHash()); + if (sid == null) { + if (_log.shouldLog(Log.WARN)) + _log.warn("No session for " + _toDest.calculateHash()); + return; + } + msg.setSessionId(sid.getSessionId()); msg.setPayload(_payload); _runner.doSend(msg); } diff --git a/router/java/src/net/i2p/router/client/ReportAbuseJob.java b/router/java/src/net/i2p/router/client/ReportAbuseJob.java index 8dd36ac3c..c24e5e9ba 100644 --- a/router/java/src/net/i2p/router/client/ReportAbuseJob.java +++ b/router/java/src/net/i2p/router/client/ReportAbuseJob.java @@ -8,10 +8,12 @@ package net.i2p.router.client; * */ +import net.i2p.data.Destination; import net.i2p.data.i2cp.AbuseReason; import net.i2p.data.i2cp.AbuseSeverity; import net.i2p.data.i2cp.I2CPMessageException; import net.i2p.data.i2cp.ReportAbuseMessage; +import net.i2p.data.i2cp.SessionId; import net.i2p.router.JobImpl; import net.i2p.router.RouterContext; import net.i2p.util.Log; @@ -23,17 +25,22 @@ import net.i2p.util.Log; class ReportAbuseJob extends JobImpl { private final Log _log; private final ClientConnectionRunner _runner; + private final Destination _dest; private final String _reason; private final int _severity; - public ReportAbuseJob(RouterContext context, ClientConnectionRunner runner, String reason, int severity) { + + public ReportAbuseJob(RouterContext context, ClientConnectionRunner runner, + Destination dest, String reason, int severity) { super(context); _log = context.logManager().getLog(ReportAbuseJob.class); _runner = runner; + _dest = dest; _reason = reason; _severity = severity; } public String getName() { return "Report Abuse"; } + public void runJob() { if (_runner.isDead()) return; AbuseReason res = new AbuseReason(); @@ -41,9 +48,11 @@ class ReportAbuseJob extends JobImpl { AbuseSeverity sev = new AbuseSeverity(); sev.setSeverity(_severity); ReportAbuseMessage msg = new ReportAbuseMessage(); - msg.setMessageId(null); msg.setReason(res); - msg.setSessionId(_runner.getSessionId()); + SessionId id = _runner.getSessionId(_dest.calculateHash()); + if (id == null) + return; + msg.setSessionId(id); msg.setSeverity(sev); try { _runner.doSend(msg); diff --git a/router/java/src/net/i2p/router/client/RequestLeaseSetJob.java b/router/java/src/net/i2p/router/client/RequestLeaseSetJob.java index 9b07978f6..f5ec343a0 100644 --- a/router/java/src/net/i2p/router/client/RequestLeaseSetJob.java +++ b/router/java/src/net/i2p/router/client/RequestLeaseSetJob.java @@ -16,6 +16,7 @@ import net.i2p.data.i2cp.I2CPMessage; import net.i2p.data.i2cp.I2CPMessageException; import net.i2p.data.i2cp.RequestLeaseSetMessage; import net.i2p.data.i2cp.RequestVariableLeaseSetMessage; +import net.i2p.data.i2cp.SessionId; import net.i2p.router.JobImpl; import net.i2p.router.RouterContext; import net.i2p.util.Log; @@ -63,13 +64,16 @@ class RequestLeaseSetJob extends JobImpl { // _log.debug("Adding fudge " + fudge); endTime += fudge; + SessionId id = _runner.getSessionId(_requestState.getRequested().getDestination().calculateHash()); + if (id == null) + return; I2CPMessage msg; if (getContext().getProperty(PROP_VARIABLE, DFLT_VARIABLE) && (_runner instanceof QueuedClientConnectionRunner || RequestVariableLeaseSetMessage.isSupported(_runner.getClientVersion()))) { // new style - leases will have individual expirations RequestVariableLeaseSetMessage rmsg = new RequestVariableLeaseSetMessage(); - rmsg.setSessionId(_runner.getSessionId()); + rmsg.setSessionId(id); for (int i = 0; i < requested.getLeaseCount(); i++) { Lease lease = requested.getLease(i); if (lease.getEndDate().getTime() < endTime) { @@ -90,7 +94,7 @@ class RequestLeaseSetJob extends JobImpl { RequestLeaseSetMessage rmsg = new RequestLeaseSetMessage(); Date end = new Date(endTime); rmsg.setEndDate(end); - rmsg.setSessionId(_runner.getSessionId()); + rmsg.setSessionId(id); for (int i = 0; i < requested.getLeaseCount(); i++) { Lease lease = requested.getLease(i); rmsg.addEndpoint(lease.getGateway(), @@ -144,8 +148,7 @@ class RequestLeaseSetJob extends JobImpl { CheckLeaseRequestStatus.this.getContext().statManager().addRateData("client.requestLeaseSetTimeout", 1); if (_log.shouldLog(Log.ERROR)) { long waited = System.currentTimeMillis() - _start; - _log.error("Failed to receive a leaseSet in the time allotted (" + waited + "): " + _requestState + " for " - + _runner.getConfig().getDestination().calculateHash().toBase64()); + _log.error("Failed to receive a leaseSet in the time allotted (" + waited + "): " + _requestState); } if (_requestState.getOnFailed() != null) RequestLeaseSetJob.this.getContext().jobQueue().addJob(_requestState.getOnFailed()); diff --git a/router/java/src/net/i2p/router/dummy/DummyTunnelManagerFacade.java b/router/java/src/net/i2p/router/dummy/DummyTunnelManagerFacade.java index 5de3db1a3..96dceea18 100644 --- a/router/java/src/net/i2p/router/dummy/DummyTunnelManagerFacade.java +++ b/router/java/src/net/i2p/router/dummy/DummyTunnelManagerFacade.java @@ -50,6 +50,8 @@ public class DummyTunnelManagerFacade implements TunnelManagerFacade { public int getOutboundClientTunnelCount(Hash destination) { return 0; } public long getLastParticipatingExpiration() { return -1; } public void buildTunnels(Destination client, ClientTunnelSettings settings) {} + public boolean addAlias(Destination dest, ClientTunnelSettings settings, Destination existingClient) { return false; } + public void removeAlias(Destination dest) {} public TunnelPoolSettings getInboundSettings() { return null; } public TunnelPoolSettings getOutboundSettings() { return null; } public TunnelPoolSettings getInboundSettings(Hash client) { return null; } diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java index 258e01046..4159014f2 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java @@ -8,6 +8,7 @@ import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.Properties; +import java.util.Set; import java.util.TreeSet; import net.i2p.data.Hash; @@ -609,6 +610,12 @@ public class TunnelPool { } if (ls != null) { _context.clientManager().requestLeaseSet(_settings.getDestination(), ls); + Set aliases = _settings.getAliases(); + if (aliases != null && !aliases.isEmpty()) { + for (Hash h : aliases) { + _context.clientManager().requestLeaseSet(h, ls); + } + } } } } diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java index d311d46a9..61fbb1ddd 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java @@ -97,7 +97,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { ctx.statManager().createRateStat("tunnel.testAborted", "Tunnel test could not occur, since there weren't any tunnels to test with", "Tunnels", RATES); } - + /** * Pick a random inbound exploratory tunnel. * Warning - selectInboundExploratoryTunnel(Hash) is preferred. @@ -113,7 +113,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { } return info; } - + /** * Pick a random inbound tunnel from the given destination's pool. * Warning - selectOutboundTunnel(Hash, Hash) is preferred. @@ -132,7 +132,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { " but there isn't a pool?"); return null; } - + /** * Pick a random outbound exploratory tunnel. * Warning - selectOutboundExploratoryTunnel(Hash) is preferred. @@ -148,7 +148,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { } return info; } - + /** * Pick a random outbound tunnel from the given destination's pool. * Warning - selectOutboundTunnel(Hash, Hash) is preferred. @@ -164,7 +164,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { } return null; } - + /** * Pick the inbound exploratory tunnel with the gateway closest to the given hash. * By using this instead of the random selectTunnel(), @@ -184,7 +184,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { } return info; } - + /** * Pick the inbound tunnel with the gateway closest to the given hash * from the given destination's pool. @@ -208,7 +208,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { " but there isn't a pool?"); return null; } - + /** * Pick the outbound exploratory tunnel with the endpoint closest to the given hash. * By using this instead of the random selectTunnel(), @@ -228,7 +228,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { } return info; } - + /** * Pick the outbound tunnel with the endpoint closest to the given hash * from the given destination's pool. @@ -249,7 +249,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { } return null; } - + /** * Expensive (iterates through all tunnels of all pools) and unnecessary. * @deprecated unused @@ -267,7 +267,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { if (info != null) return info; return null; } - + /** @return number of inbound exploratory tunnels */ public int getFreeTunnelCount() { return _inboundExploratory.size(); @@ -304,10 +304,11 @@ public class TunnelPoolManager implements TunnelManagerFacade { return pool.getTunnelCount(); return 0; } - + public int getParticipatingCount() { return _context.tunnelDispatcher().getParticipatingCount(); } + public long getLastParticipatingExpiration() { return _context.tunnelDispatcher().getLastParticipatingExpiration(); } - + /** * @return (number of part. tunnels) / (estimated total number of hops in our expl.+client tunnels) * 100 max. @@ -330,7 +331,6 @@ public class TunnelPoolManager implements TunnelManagerFacade { return Math.min(part / (double) count, 100d); } - public boolean isValidTunnel(Hash client, TunnelInfo tunnel) { if (tunnel.getExpiration() < _context.clock().now()) return false; @@ -386,17 +386,18 @@ public class TunnelPoolManager implements TunnelManagerFacade { pool.setSettings(settings); } } - + public synchronized void restart() { _handler.restart(); _executor.restart(); shutdownExploratory(); startup(); } - + /** * Used only at session startup. * Do not use to change settings. + * Do not use for aliased destinations; use addAlias(). */ public void buildTunnels(Destination client, ClientTunnelSettings settings) { Hash dest = client.calculateHash(); @@ -434,8 +435,68 @@ public class TunnelPoolManager implements TunnelManagerFacade { else outbound.startup(); } - - + + /** + * Add another destination to the same tunnels. + * Must have same encryption key an a different signing key. + * @throws IllegalArgumentException if not + * @return success + * @since 0.9.19 + */ + public boolean addAlias(Destination dest, ClientTunnelSettings settings, Destination existingClient) { + if (dest.getSigningPublicKey().equals(existingClient.getSigningPublicKey())) + throw new IllegalArgumentException("signing key must differ"); + if (!dest.getPublicKey().equals(existingClient.getPublicKey())) + throw new IllegalArgumentException("encryption key mismatch"); + Hash h = dest.calculateHash(); + Hash e = existingClient.calculateHash(); + synchronized(this) { + TunnelPool inbound = _clientInboundPools.get(e); + TunnelPool outbound = _clientOutboundPools.get(e); +/////// gah same tunnel pool or different? + if (inbound == null || outbound == null) + return false; + _clientInboundPools.put(h, inbound); + _clientOutboundPools.put(h, outbound); + } + return true; + } + + /** + * Remove a destination for the same tunnels as another. + * @since 0.9.19 + */ + public void removeAlias(Destination dest) { + Hash h = dest.calculateHash(); + synchronized(this) { + TunnelPool inbound = _clientInboundPools.remove(h); + if (inbound != null) { + Hash p = inbound.getSettings().getAliasOf(); + if (p != null) { + TunnelPool pri = _clientInboundPools.get(p); + if (pri != null) { + Set aliases = pri.getSettings().getAliases(); + if (aliases != null) + aliases.remove(h); + } + } + } + TunnelPool outbound = _clientOutboundPools.remove(h); + if (outbound != null) { + Hash p = inbound.getSettings().getAliasOf(); + if (p != null) { + TunnelPool pri = _clientInboundPools.get(p); + if (pri != null) { + Set aliases = pri.getSettings().getAliases(); + if (aliases != null) + aliases.remove(h); + } + } + } + // TODO if primary already vanished... + } + } + private static class DelayedStartup implements SimpleTimer.TimedEvent { private final TunnelPool pool; @@ -469,7 +530,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { if (outbound != null) outbound.shutdown(); } - + /** queue a recurring test job if appropriate */ void buildComplete(PooledTunnelCreatorConfig cfg) { if (cfg.getLength() > 1 && @@ -518,7 +579,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { _context.jobQueue().addJob(new BootstrapPool(_context, _inboundExploratory)); _context.jobQueue().addJob(new BootstrapPool(_context, _outboundExploratory)); } - + private static class BootstrapPool extends JobImpl { private TunnelPool _pool; public BootstrapPool(RouterContext ctx, TunnelPool pool) { @@ -531,7 +592,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { _pool.buildFallback(); } } - + /** * Cannot be restarted */ @@ -546,7 +607,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { _inboundExploratory.shutdown(); _outboundExploratory.shutdown(); } - + /** list of TunnelPool instances currently in play */ public void listPools(List out) { out.addAll(_clientInboundPools.values());