From 91e98ba447ecbaf12ec635c0ee7d5f5fb08ee7d8 Mon Sep 17 00:00:00 2001 From: zzz Date: Sat, 18 Apr 2015 19:01:23 +0000 Subject: [PATCH] I2CP Multisession Work in progress: Fix NPE in requestLeaseSet() Fix setting new session ID in SessionStatusMessage Fix subsession support detection Streaming: one socket manager, multiple connection managers. Change data structure for subessions in socket manager Subsession cleanup on destroy I2PTunnel: add DSA subsession for non-DSA shared client Javadocs --- .../i2p/i2ptunnel/I2PTunnelClientBase.java | 36 ++++++++++++++++++ .../streaming/impl/ConnectionManager.java | 3 +- .../streaming/impl/I2PSocketManagerFull.java | 37 +++++++++++++++---- .../src/net/i2p/client/I2PSessionImpl.java | 8 +++- .../router/client/ClientConnectionRunner.java | 27 +++++++++----- .../net/i2p/router/client/ClientManager.java | 16 ++++++-- .../client/ClientManagerFacadeImpl.java | 10 +++++ .../client/ClientMessageEventListener.java | 3 ++ .../i2p/router/client/LeaseRequestState.java | 3 ++ 9 files changed, 120 insertions(+), 23 deletions(-) diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java index a2c408973..5ca28ce36 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java @@ -31,6 +31,7 @@ import net.i2p.client.streaming.I2PSocket; import net.i2p.client.streaming.I2PSocketManager; import net.i2p.client.streaming.I2PSocketManagerFactory; import net.i2p.client.streaming.I2PSocketOptions; +import net.i2p.crypto.SigType; import net.i2p.data.Destination; import net.i2p.util.EventDispatcher; import net.i2p.util.I2PAppThread; @@ -291,6 +292,10 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna socketManager.destroySocketManager(); // We could be here a LONG time, holding the lock socketManager = buildSocketManager(tunnel, pkf); + // FIXME may not be the right place for this + I2PSession sub = addSubsession(tunnel); + if (sub != null && _log.shouldLog(Log.WARN)) + _log.warn("Added subsession " + sub); } else { if (_log.shouldLog(Log.INFO)) _log.info(tunnel.getClientOptions().getProperty("inbound.nickname") + ": Not building a new socket manager since the old one is open [s=" + s + "]"); @@ -303,10 +308,41 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna if (_log.shouldLog(Log.INFO)) _log.info(tunnel.getClientOptions().getProperty("inbound.nickname") + ": Building a new socket manager since there is no other one"); socketManager = buildSocketManager(tunnel, pkf); + I2PSession sub = addSubsession(tunnel); + if (sub != null && _log.shouldLog(Log.WARN)) + _log.warn("Added subsession " + sub); } return socketManager; } + /** + * Add a subsession to a shared client if necessary. + * + * @since 0.9.20 + */ + protected static synchronized I2PSession addSubsession(I2PTunnel tunnel) { + I2PSession sess = socketManager.getSession(); + if (sess.getMyDestination().getSigType() == SigType.DSA_SHA1) + return null; + Properties props = new Properties(); + props.putAll(tunnel.getClientOptions()); + String name = props.getProperty("inbound.nickname"); + if (name != null) + props.setProperty("inbound.nickname", name + " (DSA)"); + name = props.getProperty("outbound.nickname"); + if (name != null) + props.setProperty("outbound.nickname", name + " (DSA)"); + // TODO set sig type in props? + try { + return socketManager.addSubsession(null, props); + } catch (I2PSessionException ise) { + Log log = tunnel.getContext().logManager().getLog(I2PTunnelClientBase.class); + if (log.shouldLog(Log.WARN)) + log.warn("Failed to add subssession", ise); + return null; + } + } + /** * Kill the shared client, so that on restart in android * we won't latch onto the old one diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java index 950670efc..e298a9321 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java @@ -429,7 +429,8 @@ class ConnectionManager { // try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {} try { Thread.sleep(remaining/4); } catch (InterruptedException ie) {} } else { - con = new Connection(_context, this, _schedulerChooser, _timer, _outboundQueue, _conPacketHandler, opts, false); + con = new Connection(_context, this, _schedulerChooser, _timer, + _outboundQueue, _conPacketHandler, opts, false); con.setRemotePeer(peer); assignReceiveStreamId(con); break; // stop looping as a psuedo-wait diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java index 26a7c6e7e..ff9be1351 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java @@ -11,7 +11,7 @@ import java.util.HashSet; import java.util.List; import java.util.Properties; import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -40,7 +40,7 @@ public class I2PSocketManagerFull implements I2PSocketManager { private final I2PAppContext _context; private final Log _log; private final I2PSession _session; - private final List _subsessions; + private final ConcurrentHashMap _subsessions; private final I2PServerSocketFull _serverSocket; private StandardServerSocket _realServerSocket; private final ConnectionOptions _defaultOptions; @@ -84,7 +84,7 @@ public class I2PSocketManagerFull implements I2PSocketManager { public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) { _context = context; _session = session; - _subsessions = new CopyOnWriteArrayList(); + _subsessions = new ConcurrentHashMap(4); _log = _context.logManager().getLog(I2PSocketManagerFull.class); _name = name + " " + (__managerId.incrementAndGet()); @@ -125,7 +125,6 @@ public class I2PSocketManagerFull implements I2PSocketManager { return _session; } -//////////// gahhh we want a socket manager, not a session /** * @return a new subsession, non-null * @param privateKeyStream null for transient, if non-null must have same encryption keys as primary session @@ -135,7 +134,17 @@ public class I2PSocketManagerFull implements I2PSocketManager { */ public I2PSession addSubsession(InputStream privateKeyStream, Properties opts) throws I2PSessionException { I2PSession rv = _session.addSubsession(privateKeyStream, opts); - _subsessions.add(rv); + ConnectionOptions defaultOptions = new ConnectionOptions(opts); + ConnectionManager connectionManager = new ConnectionManager(_context, rv, defaultOptions); + ConnectionManager old = _subsessions.putIfAbsent(rv, connectionManager); + if (old != null) { + // shouldn't happen + _session.removeSubsession(rv); + connectionManager.shutdown(); + throw new I2PSessionException("dup"); + } + if (_log.shouldLog(Log.WARN)) + _log.warn("Added subsession " + rv); return rv; } @@ -146,8 +155,15 @@ public class I2PSocketManagerFull implements I2PSocketManager { */ public void removeSubsession(I2PSession session) { _session.removeSubsession(session); - _subsessions.remove(session); - // ... + ConnectionManager cm = _subsessions.remove(session); + if (cm != null) { + cm.shutdown(); + if (_log.shouldLog(Log.WARN)) + _log.warn("Removeed subsession " + session); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("Subsession not found to remove " + session); + } } /** @@ -335,6 +351,7 @@ public class I2PSocketManagerFull implements I2PSocketManager { if (_log.shouldLog(Log.INFO)) _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0,6) + " with options: " + opts); +// fixme pick the subsession here // the following blocks unless connect delay > 0 Connection con = _connectionManager.connect(peer, opts); if (con == null) @@ -419,6 +436,12 @@ public class I2PSocketManagerFull implements I2PSocketManager { } _connectionManager.setAllowIncomingConnections(false); _connectionManager.shutdown(); + if (!_subsessions.isEmpty()) { + for (I2PSession sess : _subsessions.keySet()) { + removeSubsession(sess); + } + } + // should we destroy the _session too? // yes, since the old lib did (and SAM wants it to, and i dont know why not) if ( (_session != null) && (!_session.isClosed()) ) { diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index e711f04c4..2a0f9ed4a 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -267,6 +267,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa } _routerSupportsFastReceive = _context.isRouterContext(); _routerSupportsHostLookup = _context.isRouterContext(); + _routerSupportsSubsessions = _context.isRouterContext(); } /** @@ -338,7 +339,10 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa if (id != null) _subsessionMap.remove(id); /// tell the subsession - ///.... + try { + // doesn't really throw + session.destroySession(); + } catch (I2PSessionException ise) {} } } @@ -893,7 +897,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa synchronized (_subsessionLock) { for (SubSession sess : _subsessions) { if (sess.getSessionId() == null) { - sub.messageReceived(reader, message); + sess.messageReceived(reader, message); id = sess.getSessionId(); if (id != null) { if (id.equals(_sessionId)) { diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index 79fa79b2c..3acdfdadc 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -723,13 +723,15 @@ class ClientConnectionRunner { * within the timeout specified, queue up the onFailedJob. This call does not * block. * + * @param h the Destination's hash * @param set LeaseSet with requested leases - this object must be updated to contain the * signed version (as well as any changed/added/removed Leases) + * The LeaseSet contains Leases and destination only, it is unsigned. * @param expirationTime ms to wait before failing * @param onCreateJob Job to run after the LeaseSet is authorized, null OK * @param onFailedJob Job to run after the timeout passes without receiving authorization, null OK */ - void requestLeaseSet(LeaseSet set, long expirationTime, Job onCreateJob, Job onFailedJob) { + void requestLeaseSet(Hash h, LeaseSet set, long expirationTime, Job onCreateJob, Job onFailedJob) { if (_dead) { if (_log.shouldLog(Log.WARN)) _log.warn("Requesting leaseSet from a dead client: " + set); @@ -737,6 +739,12 @@ class ClientConnectionRunner { _context.jobQueue().addJob(onFailedJob); return; } + SessionParams sp = _sessions.get(h); + if (sp == null) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Requesting leaseSet for an unknown sesssion"); + return; + } // We can't use LeaseSet.equals() here because the dest, keys, and sig on // the new LeaseSet are null. So we compare leases one by one. // In addition, the client rewrites the expiration time of all the leases to @@ -748,10 +756,9 @@ class ClientConnectionRunner { int leases = set.getLeaseCount(); // synch so _currentLeaseSet isn't changed out from under us LeaseSet current = null; + Destination dest = sp.dest; synchronized (this) { - Destination dest = set.getDestination(); - if (dest != null) - current = getLeaseSet(dest.calculateHash()); + current = sp.currentLeaseSet; if (current != null && current.getLeaseCount() == leases) { for (int i = 0; i < leases; i++) { if (! current.getLease(i).getTunnelId().equals(set.getLease(i).getTunnelId())) @@ -770,10 +777,6 @@ class ClientConnectionRunner { } if (_log.shouldLog(Log.INFO)) _log.info("Current leaseSet " + current + "\nNew leaseSet " + set); - Hash h = set.getDestination().calculateHash(); - SessionParams sp = _sessions.get(h); - if (sp == null) - return; LeaseRequestState state; synchronized (this) { state = sp.leaseRequest; @@ -788,12 +791,15 @@ class ClientConnectionRunner { // theirs is newer } else { // ours is newer, so wait a few secs and retry + set.setDestination(dest); _context.simpleTimer2().addEvent(new Rerequest(set, expirationTime, onCreateJob, onFailedJob), 3*1000); } // fire onCreated? return; // already requesting } else { - sp.leaseRequest = state = new LeaseRequestState(onCreateJob, onFailedJob, _context.clock().now() + expirationTime, set); + set.setDestination(dest); + sp.leaseRequest = state = new LeaseRequestState(onCreateJob, onFailedJob, + _context.clock().now() + expirationTime, set); if (_log.shouldLog(Log.DEBUG)) _log.debug("New request: " + state); } @@ -807,6 +813,7 @@ class ClientConnectionRunner { private final Job _onCreate; private final Job _onFailed; + /** @param ls dest must be set */ public Rerequest(LeaseSet ls, long expirationTime, Job onCreate, Job onFailed) { _ls = ls; _expirationTime = expirationTime; @@ -815,7 +822,7 @@ class ClientConnectionRunner { } public void timeReached() { - requestLeaseSet(_ls, _expirationTime, _onCreate, _onFailed); + requestLeaseSet(_ls.getDestination().calculateHash(), _ls, _expirationTime, _onCreate, _onFailed); } } diff --git a/router/java/src/net/i2p/router/client/ClientManager.java b/router/java/src/net/i2p/router/client/ClientManager.java index 10bfdf7e4..6174973e4 100644 --- a/router/java/src/net/i2p/router/client/ClientManager.java +++ b/router/java/src/net/i2p/router/client/ClientManager.java @@ -392,7 +392,8 @@ class ClientManager { * * @param dest Destination from which the LeaseSet's authorization should be requested * @param set LeaseSet with requested leases - this object must be updated to contain the - * signed version (as well as any changed/added/removed Leases) + * signed version (as well as any changed/added/removed Leases). + * The LeaseSet contains Leases only; it is unsigned and does not have the destination set. * @param timeout ms to wait before failing * @param onCreateJob Job to run after the LeaseSet is authorized * @param onFailedJob Job to run after the timeout passes without receiving authorization @@ -405,15 +406,24 @@ class ClientManager { + dest.calculateHash().toBase64() + ". disconnected?"); _ctx.jobQueue().addJob(onFailedJob); } else { - runner.requestLeaseSet(set, timeout, onCreateJob, onFailedJob); + runner.requestLeaseSet(dest.calculateHash(), set, timeout, onCreateJob, onFailedJob); } } + /** + * Request that a particular client authorize the Leases contained in the + * LeaseSet. + * + * @param dest Destination from which the LeaseSet's authorization should be requested + * @param ls LeaseSet with requested leases - this object must be updated to contain the + * signed version (as well as any changed/added/removed Leases). + * The LeaseSet contains Leases only; it is unsigned and does not have the destination set. + */ public void requestLeaseSet(Hash dest, LeaseSet ls) { ClientConnectionRunner runner = getRunner(dest); if (runner != null) { // no need to fire off any jobs... - runner.requestLeaseSet(ls, REQUEST_LEASESET_TIMEOUT, null, null); + runner.requestLeaseSet(dest, ls, REQUEST_LEASESET_TIMEOUT, null, null); } } diff --git a/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java b/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java index d4e476ad7..cc82592d5 100644 --- a/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java +++ b/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java @@ -115,6 +115,7 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade implements Inte * @param dest Destination from which the LeaseSet's authorization should be requested * @param set LeaseSet with requested leases - this object must be updated to contain the * signed version (as well as any changed/added/removed Leases) + * The LeaseSet contains Leases only; it is unsigned and does not have the destination set. * @param timeout ms to wait before failing * @param onCreateJob Job to run after the LeaseSet is authorized * @param onFailedJob Job to run after the timeout passes without receiving authorization @@ -126,6 +127,15 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade implements Inte _log.error("Null manager on requestLeaseSet!"); } + /** + * Request that a particular client authorize the Leases contained in the + * LeaseSet. + * + * @param dest Destination from which the LeaseSet's authorization should be requested + * @param ls LeaseSet with requested leases - this object must be updated to contain the + * signed version (as well as any changed/added/removed Leases). + * The LeaseSet contains Leases only; it is unsigned and does not have the destination set. + */ public void requestLeaseSet(Hash dest, LeaseSet set) { if (_manager != null) _manager.requestLeaseSet(dest, set); diff --git a/router/java/src/net/i2p/router/client/ClientMessageEventListener.java b/router/java/src/net/i2p/router/client/ClientMessageEventListener.java index 320ce235d..e12205e83 100644 --- a/router/java/src/net/i2p/router/client/ClientMessageEventListener.java +++ b/router/java/src/net/i2p/router/client/ClientMessageEventListener.java @@ -251,6 +251,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi props.putAll(in.getOptions()); cfg.setOptions(props); boolean isPrimary = _runner.getSessionIds().isEmpty(); + // this sets the session id int status = _runner.sessionEstablished(cfg); if (status != SessionStatusMessage.STATUS_CREATED) { // For now, we do NOT send a SessionStatusMessage - see javadoc above @@ -266,6 +267,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi _runner.disconnectClient(msg); return; } + // get the new session ID + id = _runner.getSessionId(dest.calculateHash()); sendStatusMessage(id, status); if (_log.shouldLog(Log.INFO)) diff --git a/router/java/src/net/i2p/router/client/LeaseRequestState.java b/router/java/src/net/i2p/router/client/LeaseRequestState.java index 27072db6f..2fb6b9760 100644 --- a/router/java/src/net/i2p/router/client/LeaseRequestState.java +++ b/router/java/src/net/i2p/router/client/LeaseRequestState.java @@ -30,6 +30,9 @@ class LeaseRequestState { /** * @param expiration absolute time, when the request expires (not when the LS expires) + * @param requested LeaseSet with requested leases - this object must be updated to contain the + * signed version (as well as any changed/added/removed Leases) + * The LeaseSet contains Leases and destination only, it is unsigned. */ public LeaseRequestState(Job onGranted, Job onFailed, long expiration, LeaseSet requested) { _onGranted = onGranted;