I2CP Multisession support and multiple destinations in one tunnel pool.

Work in progress.
Router-side I2CP mostly done.
Client-side I2CP mostly done but undecided on how to handle
listeners.
Streaming stubbed out but may be wrong, may need multiple socket managers,
not clear how to proceed.
I2PTunnel not started.
Blacklist of DSA-only dests not started.
Router leaseset publishing not correct. Not clear whether to have
additional tunnel pools with flags, or put the tunnel pools into
the client hashmap twice. Client config contains destination,
may need to move that to tunnel pool.
This commit is contained in:
zzz
2015-03-18 12:59:50 +00:00
parent 91fe62eee3
commit 1293dccf35
38 changed files with 1404 additions and 193 deletions

View File

@@ -5,17 +5,20 @@
package net.i2p.client.streaming;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.data.Destination;
@@ -34,6 +37,26 @@ public interface I2PSocketManager {
*/
public I2PSession getSession();
/**
* @return a new subsession, non-null
* @param privateKeyStream null for transient, if non-null must have same encryption keys as primary session
* and different signing keys
* @param opts subsession options if any, may be null
* @since 0.9.19
*/
public I2PSession addSubsession(InputStream privateKeyStream, Properties opts) throws I2PSessionException;
/**
* @since 0.9.19
*/
public void removeSubsession(I2PSession session);
/**
* @return a list of subsessions, non-null, does not include the primary session
* @since 0.9.19
*/
public List<I2PSession> getSubsessions();
/**
* How long should we wait for the client to .accept() a socket before
* sending back a NACK/Close?

View File

@@ -1,14 +1,17 @@
package net.i2p.client.streaming.impl;
import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -37,6 +40,7 @@ public class I2PSocketManagerFull implements I2PSocketManager {
private final I2PAppContext _context;
private final Log _log;
private final I2PSession _session;
private final List<I2PSession> _subsessions;
private final I2PServerSocketFull _serverSocket;
private StandardServerSocket _realServerSocket;
private final ConnectionOptions _defaultOptions;
@@ -80,6 +84,7 @@ public class I2PSocketManagerFull implements I2PSocketManager {
public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) {
_context = context;
_session = session;
_subsessions = new CopyOnWriteArrayList<I2PSession>();
_log = _context.logManager().getLog(I2PSocketManagerFull.class);
_name = name + " " + (__managerId.incrementAndGet());
@@ -117,6 +122,39 @@ public class I2PSocketManagerFull implements I2PSocketManager {
return _session;
}
//////////// gahhh we want a socket manager, not a session
/**
* @return a new subsession, non-null
* @param privateKeyStream null for transient, if non-null must have same encryption keys as primary session
* and different signing keys
* @param opts subsession options if any, may be null
* @since 0.9.19
*/
public I2PSession addSubsession(InputStream privateKeyStream, Properties opts) throws I2PSessionException {
I2PSession rv = _session.addSubsession(privateKeyStream, opts);
_subsessions.add(rv);
return rv;
}
/**
* Remove the subsession
*
* @since 0.9.19
*/
public void removeSubsession(I2PSession session) {
_session.removeSubsession(session);
_subsessions.remove(session);
// ...
}
/**
* @return a list of subsessions, non-null, does not include the primary session
* @since 0.9.19
*/
public List<I2PSession> getSubsessions() {
return _session.getSubsessions();
}
public ConnectionManager getConnectionManager() {
return _connectionManager;
}

View File

@@ -9,6 +9,8 @@ package net.i2p.client;
*
*/
import java.io.InputStream;
import java.util.List;
import java.util.Properties;
import java.util.Set;
@@ -21,7 +23,7 @@ import net.i2p.data.SigningPrivateKey;
/**
* <p>Define the standard means of sending and receiving messages on the
* I2P network by using the I2CP (the client protocol). This is done over a
* bidirectional TCP socket and never sends any private keys.
* bidirectional TCP socket.
*
* End to end encryption in I2PSession was disabled in release 0.6.
*
@@ -247,6 +249,27 @@ public interface I2PSession {
*
*/
public void destroySession() throws I2PSessionException;
/**
* @return a new subsession, non-null
* @param privateKeyStream null for transient, if non-null must have same encryption keys as primary session
* and different signing keys
* @param opts subsession options if any, may be null
* @since 0.9.19
*/
public I2PSession addSubsession(InputStream privateKeyStream, Properties opts) throws I2PSessionException;
/**
* @return a list of subsessions, non-null, does not include the primary session
* @since 0.9.19
*/
public void removeSubsession(I2PSession session);
/**
* @return a list of subsessions, non-null, does not include the primary session
* @since 0.9.19
*/
public List<I2PSession> getSubsessions();
/**
* Actually connect the session and start receiving/sending messages

View File

@@ -23,6 +23,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
@@ -43,6 +44,7 @@ import net.i2p.data.i2cp.I2CPMessage;
import net.i2p.data.i2cp.I2CPMessageReader;
import net.i2p.data.i2cp.MessagePayloadMessage;
import net.i2p.data.i2cp.SessionId;
import net.i2p.data.i2cp.SessionStatusMessage;
import net.i2p.internal.I2CPMessageQueue;
import net.i2p.internal.InternalClientManager;
import net.i2p.internal.QueuedI2CPMessageReader;
@@ -76,6 +78,15 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
/** currently granted lease set, or null */
private volatile LeaseSet _leaseSet;
// subsession stuff
// registered subsessions
private final List<SubSession> _subsessions;
// established subsessions
private final ConcurrentHashMap<SessionId, SubSession> _subsessionMap;
private final Object _subsessionLock = new Object();
private static final String MIN_SUBSESSION_VERSION = "0.9.19";
private volatile boolean _routerSupportsSubsessions;
/** hostname of router - will be null if in RouterContext */
protected final String _hostname;
/** port num to router - will be 0 if in RouterContext */
@@ -179,6 +190,9 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
TEST_LOOKUP ||
(routerVersion != null && routerVersion.length() > 0 &&
VersionComparator.comp(routerVersion, MIN_HOST_LOOKUP_VERSION) >= 0);
_routerSupportsSubsessions = _context.isRouterContext() ||
(routerVersion != null && routerVersion.length() > 0 &&
VersionComparator.comp(routerVersion, MIN_SUBSESSION_VERSION) >= 0);
synchronized (_stateLock) {
if (_state == State.OPENING) {
_state = State.GOTDATE;
@@ -196,18 +210,42 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
*/
protected I2PSessionImpl(I2PAppContext context, Properties options,
I2PClientMessageHandlerMap handlerMap) {
this(context, options, handlerMap, false);
this(context, options, handlerMap, null, false);
}
/*
* For extension by SubSession via I2PSessionMuxedImpl and I2PSessionImpl2
*
* @param destKeyStream stream containing the private key data,
* format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
* @param options set of options to configure the router with, if null will use System properties
* @since 0.9.19
*/
protected I2PSessionImpl(I2PSessionImpl primary, InputStream destKeyStream, Properties options) throws I2PSessionException {
this(primary.getContext(), options, primary.getHandlerMap(), primary.getProducer(), true);
_availabilityNotifier = new AvailabilityNotifier();
try {
readDestination(destKeyStream);
} catch (DataFormatException dfe) {
throw new I2PSessionException("Error reading the destination key stream", dfe);
} catch (IOException ioe) {
throw new I2PSessionException("Error reading the destination key stream", ioe);
}
}
/**
* Basic setup of finals
* @since 0.9.7
*/
private I2PSessionImpl(I2PAppContext context, Properties options,
I2PClientMessageHandlerMap handlerMap, boolean hasDest) {
I2PClientMessageHandlerMap handlerMap,
I2CPMessageProducer producer,
boolean hasDest) {
_context = context;
_handlerMap = handlerMap;
_log = context.logManager().getLog(getClass());
_subsessions = new CopyOnWriteArrayList<SubSession>();
_subsessionMap = new ConcurrentHashMap<SessionId, SubSession>(4);
if (options == null)
options = (Properties) System.getProperties().clone();
_options = loadConfig(options);
@@ -215,7 +253,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
_portNum = getPort();
_fastReceive = Boolean.parseBoolean(_options.getProperty(I2PClient.PROP_FAST_RECEIVE));
if (hasDest) {
_producer = new I2CPMessageProducer(context);
_producer = producer;
_availableMessages = new ConcurrentHashMap<Long, MessagePayloadMessage>();
_myDestination = new Destination();
_privateKey = new PrivateKey();
@@ -238,10 +276,10 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
* @param destKeyStream stream containing the private key data,
* format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
* @param options set of options to configure the router with, if null will use System properties
* @throws I2PSessionException if there is a problem loading the private keys or
* @throws I2PSessionException if there is a problem loading the private keys
*/
public I2PSessionImpl(I2PAppContext context, InputStream destKeyStream, Properties options) throws I2PSessionException {
this(context, options, new I2PClientMessageHandlerMap(context), true);
this(context, options, new I2PClientMessageHandlerMap(context), new I2CPMessageProducer(context), true);
_availabilityNotifier = new AvailabilityNotifier();
try {
readDestination(destKeyStream);
@@ -251,6 +289,66 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
throw new I2PSessionException("Error reading the destination key stream", ioe);
}
}
/**
* Router must be connected or was connected... for now.
*
* @return a new subsession, non-null
* @param privateKeyStream null for transient, if non-null must have same encryption keys as primary session
* and different signing keys
* @param opts subsession options if any, may be null
* @since 0.9.19
*/
public I2PSession addSubsession(InputStream privateKeyStream, Properties opts) throws I2PSessionException {
if (!_routerSupportsSubsessions)
throw new I2PSessionException("Router does not support subsessions");
SubSession sub;
synchronized(_subsessionLock) {
if (_subsessions.size() > _subsessionMap.size())
throw new I2PSessionException("Subsession request already pending");
sub = new SubSession(this, privateKeyStream, opts);
for (SubSession ss : _subsessions) {
if (ss.getDecryptionKey().equals(sub.getDecryptionKey()) &&
ss.getPrivateKey().equals(sub.getPrivateKey())) {
throw new I2PSessionException("Dup subsession");
}
}
_subsessions.add(sub);
}
synchronized (_stateLock) {
if (_state == State.OPEN) {
_producer.connect(sub);
} // else will be called in connect()
}
return sub;
}
/**
* @since 0.9.19
*/
public void removeSubsession(I2PSession session) {
if (!(session instanceof SubSession))
return;
synchronized(_subsessionLock) {
_subsessions.remove(session);
SessionId id = ((SubSession) session).getSessionId();
if (id != null)
_subsessionMap.remove(id);
/// tell the subsession
///....
}
}
/**
* @return a list of subsessions, non-null, does not include the primary session
* @since 0.9.19
*/
public List<I2PSession> getSubsessions() {
synchronized(_subsessionLock) {
return new ArrayList<I2PSession>(_subsessions);
}
}
/**
* Parse the config for anything we know about.
@@ -536,6 +634,14 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
startIdleMonitor();
startVerifyUsage();
success = true;
// now send CreateSessionMessages for all subsessions, one at a time, must wait for each response
synchronized(_subsessionLock) {
for (SubSession ss : _subsessions) {
_producer.connect(ss);
}
}
} catch (InterruptedException ie) {
throw new I2PSessionException("Interrupted", ie);
} catch (UnknownHostException uhe) {
@@ -739,19 +845,80 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
/**
* The I2CPMessageEventListener callback.
* Recieve notification of some I2CP message and handle it if possible.
*
* We route the message based on message type AND session ID.
*
* The following types never contain a session ID and are not routable to
* a subsession:
* BandwidthLimitsMessage, DestReplyMessage
*
* The following types may not ontain a valid session ID
* even when intended for a subsession, so we must take special care:
* SessionStatusMessage
*
* @param reader unused
*/
public void messageReceived(I2CPMessageReader reader, I2CPMessage message) {
I2CPMessageHandler handler = _handlerMap.getHandler(message.getType());
if (handler == null) {
if (_log.shouldLog(Log.WARN))
_log.warn(getPrefix() + "Unknown message or unhandleable message received: type = "
+ message.getType());
int type = message.getType();
SessionId id = message.sessionId();
if (id == null || id.equals(_sessionId) ||
(_sessionId == null && id != null && type == SessionStatusMessage.MESSAGE_TYPE)) {
// it's for us
I2CPMessageHandler handler = _handlerMap.getHandler(type);
if (handler != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "Message received of type " + type
+ " to be handled by " + handler.getClass().getSimpleName());
handler.handleMessage(message, this);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn(getPrefix() + "Unknown message or unhandleable message received: type = "
+ type);
}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "Message received of type " + message.getType()
+ " to be handled by " + handler.getClass().getSimpleName());
handler.handleMessage(message, this);
SubSession sub = _subsessionMap.get(id);
if (sub != null) {
// it's for a subsession
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "Message received of type " + type
+ " to be handled by " + sub);
sub.messageReceived(reader, message);
} else if (id != null && type == SessionStatusMessage.MESSAGE_TYPE) {
// look for a subsession without a session
synchronized (_subsessionLock) {
for (SubSession sess : _subsessions) {
if (sess.getSessionId() == null) {
sub.messageReceived(reader, message);
id = sess.getSessionId();
if (id != null) {
if (id.equals(_sessionId)) {
// shouldnt happen
sess.setSessionId(null);
if (_log.shouldLog(Log.WARN))
_log.warn("Dup or our session id " + id);
} else {
SubSession old = _subsessionMap.putIfAbsent(id, sess);
if (old != null) {
// shouldnt happen
sess.setSessionId(null);
if (_log.shouldLog(Log.WARN))
_log.warn("Dup session id " + id);
}
}
}
return;
}
if (_log.shouldLog(Log.WARN))
_log.warn(getPrefix() + "No session " + id + " to handle message: type = "
+ type);
}
}
} else {
// it's for nobody
if (_log.shouldLog(Log.WARN))
_log.warn(getPrefix() + "No session " + id + " to handle message: type = "
+ type);
}
}
}
@@ -786,6 +953,18 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
*/
I2CPMessageProducer getProducer() { return _producer; }
/**
* For Subsessions
* @since 0.9.19
*/
I2PClientMessageHandlerMap getHandlerMap() { return _handlerMap; }
/**
* For Subsessions
* @since 0.9.19
*/
I2PAppContext getContext() { return _context; }
/**
* Retrieve the configuration options
* @return non-null, if insantiated with null options, this will be the System properties.
@@ -803,7 +982,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
/**
* Has the session been closed (or not yet connected)?
* False when open and during transitions. Unsynchronized.
* False when open and during transitions.
*/
public boolean isClosed() {
synchronized (_stateLock) {
@@ -892,6 +1071,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
if (_availabilityNotifier != null)
_availabilityNotifier.stopNotifying();
closeSocket();
_subsessionMap.clear();
if (_sessionListener != null) _sessionListener.disconnected(this);
}

View File

@@ -50,9 +50,9 @@ class I2PSessionImpl2 extends I2PSessionImpl {
private static final long REMOVE_EXPIRED_TIME = 63*1000;
/**
* for extension by SimpleSession (no dest)
*/
/**
* for extension by SimpleSession (no dest)
*/
protected I2PSessionImpl2(I2PAppContext context, Properties options,
I2PClientMessageHandlerMap handlerMap) {
super(context, options, handlerMap);
@@ -61,15 +61,17 @@ class I2PSessionImpl2 extends I2PSessionImpl {
}
/**
* for extension by I2PSessionMuxedImpl
*
* Create a new session, reading the Destination, PrivateKey, and SigningPrivateKey
* from the destKeyStream, and using the specified options to connect to the router
*
* @param destKeyStream stream containing the private key data,
* format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
* @param options set of options to configure the router with, if null will use System properties
* @throws I2PSessionException if there is a problem loading the private keys or
* @throws I2PSessionException if there is a problem loading the private keys
*/
public I2PSessionImpl2(I2PAppContext ctx, InputStream destKeyStream, Properties options) throws I2PSessionException {
protected I2PSessionImpl2(I2PAppContext ctx, InputStream destKeyStream, Properties options) throws I2PSessionException {
super(ctx, destKeyStream, options);
_sendingStates = new ConcurrentHashMap<Long, MessageState>(32);
_sendMessageNonce = new AtomicLong();
@@ -94,6 +96,26 @@ class I2PSessionImpl2 extends I2PSessionImpl {
_context.statManager().createRateStat("i2cp.tx.msgExpanded", "size before compression", "i2cp", new long[] { 30*60*1000 });
}
/*
* For extension by SubSession via I2PSessionMuxedImpl
*
* @param destKeyStream stream containing the private key data,
* format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
* @param options set of options to configure the router with, if null will use System properties
* @since 0.9.19
*/
protected I2PSessionImpl2(I2PSessionImpl primary, InputStream destKeyStream, Properties options) throws I2PSessionException {
super(primary, destKeyStream, options);
_sendingStates = new ConcurrentHashMap<Long, MessageState>(32);
_sendMessageNonce = new AtomicLong();
_noEffort = "none".equals(getOptions().getProperty(I2PClient.PROP_RELIABILITY, "").toLowerCase(Locale.US));
_context.statManager().createRateStat("i2cp.receiveStatusTime.1", "How long it took to get status=1 back", "i2cp", new long[] { 10*60*1000 });
_context.statManager().createRateStat("i2cp.receiveStatusTime.4", "How long it took to get status=4 back", "i2cp", new long[] { 10*60*1000 });
_context.statManager().createRateStat("i2cp.receiveStatusTime.5", "How long it took to get status=5 back", "i2cp", new long[] { 10*60*1000 });
_context.statManager().createRateStat("i2cp.tx.msgCompressed", "compressed size transferred", "i2cp", new long[] { 30*60*1000 });
_context.statManager().createRateStat("i2cp.tx.msgExpanded", "size before compression", "i2cp", new long[] { 30*60*1000 });
}
/**
* Fire up a periodic task to check for unclaimed messages
* @since 0.9.14

View File

@@ -82,6 +82,24 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 {
// discards the one in super(), sorry about that... (no it wasn't started yet)
_availabilityNotifier = new MuxedAvailabilityNotifier();
}
/*
* For extension by SubSession
*
* @param destKeyStream stream containing the private key data,
* format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
* @param options set of options to configure the router with, if null will use System properties
* @since 0.9.19
*/
protected I2PSessionMuxedImpl(I2PSessionImpl primary, InputStream destKeyStream, Properties options) throws I2PSessionException {
super(primary, destKeyStream, options);
// also stored in _sessionListener but we keep it in _demultipexer
// as well so we don't have to keep casting
_demultiplexer = new I2PSessionDemultiplexer(primary.getContext());
super.setSessionListener(_demultiplexer);
// discards the one in super(), sorry about that... (no it wasn't started yet)
_availabilityNotifier = new MuxedAvailabilityNotifier();
}
/** listen on all protocols and ports */
@Override

View File

@@ -0,0 +1,299 @@
package net.i2p.client;
/*
* free (adj.): unencumbered; not under the control of others
* Written by jrandom in 2003 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import net.i2p.I2PAppContext;
import net.i2p.data.Destination;
import net.i2p.data.Hash;
import net.i2p.data.PrivateKey;
import net.i2p.data.SigningPrivateKey;
import net.i2p.data.i2cp.I2CPMessage;
import net.i2p.data.i2cp.SessionId;
/**
* An additional session using another session's connection.
*
* A subsession uses the same connection to the router as the primary session,
* but has a different Destination. It uses the same tunnels as the primary
* but has its own leaseset. It must use the same encryption keys as the primary
* so that garlic encryption/decryption works.
*
* The message handler map and message producer are reused from primary.
*
* Does NOT reuse the session listener ????
*
* While the I2CP protocol, in theory, allows for fully independent sessions
* over the same I2CP connection, this is not currently supported by the router.
*
* @since 0.9.19
*/
class SubSession extends I2PSessionMuxedImpl {
private final I2PSessionMuxedImpl _primary;
/**
* @param primary must be a I2PSessionMuxedImpl
*/
public SubSession(I2PSession primary, InputStream destKeyStream, Properties options) throws I2PSessionException {
super((I2PSessionMuxedImpl)primary, destKeyStream, options);
_primary = (I2PSessionMuxedImpl) primary;
if (!getDecryptionKey().equals(_primary.getDecryptionKey()))
throw new I2PSessionException("encryption key mismatch");
if (getPrivateKey().equals(_primary.getPrivateKey()))
throw new I2PSessionException("signing key must differ");
// state management
}
/**
* Unsupported in a subsession.
* @throws UnsupportedOperationException always
* @since 0.9.19
*/
@Override
public I2PSession addSubsession(InputStream destKeyStream, Properties opts) throws I2PSessionException {
throw new UnsupportedOperationException();
}
/**
* Unsupported in a subsession.
* Does nothing.
* @since 0.9.19
*/
@Override
public void removeSubsession(I2PSession session) {}
/**
* Unsupported in a subsession.
* @return empty list always
* @since 0.9.19
*/
@Override
public List<I2PSession> getSubsessions() {
return Collections.emptyList();
}
/**
* Does nothing for now
*/
@Override
public void updateOptions(Properties options) {}
/**
* Connect to the router and establish a session. This call blocks until
* a session is granted.
*
* Should be threadsafe, other threads will block until complete.
* Disconnect / destroy from another thread may be called simultaneously and
* will (should?) interrupt the connect.
*
* @throws I2PSessionException if there is a configuration error or the router is
* not reachable
*/
@Override
public void connect() throws I2PSessionException {
_primary.connect();
}
/**
* Has the session been closed (or not yet connected)?
* False when open and during transitions.
*/
@Override
public boolean isClosed() {
return getSessionId() == null || _primary.isClosed();
}
/**
* Deliver an I2CP message to the router
* May block for several seconds if the write queue to the router is full
*
* @throws I2PSessionException if the message is malformed or there is an error writing it out
*/
@Override
void sendMessage(I2CPMessage message) throws I2PSessionException {
if (isClosed())
throw new I2PSessionException("Already closed");
_primary.sendMessage(message);
}
/**
* Pass off the error to the listener
* Misspelled, oh well.
* @param error non-null
*/
@Override
void propogateError(String msg, Throwable error) {
_primary.propogateError(msg, error);
if (_sessionListener != null) _sessionListener.errorOccurred(this, msg, error);
}
/**
* Tear down the session, and do NOT reconnect.
*
* Blocks if session has not been fully started.
*/
@Override
public void destroySession() {
_primary.destroySession();
if (_availabilityNotifier != null)
_availabilityNotifier.stopNotifying();
if (_sessionListener != null) _sessionListener.disconnected(this);
}
/**
* Will interrupt a connect in progress.
*/
@Override
protected void disconnect() {
_primary.disconnect();
}
@Override
protected boolean reconnect() {
return _primary.reconnect();
}
/**
* Called by the message handler
* on reception of DestReplyMessage
*
* This will never happen, as the dest reply message does not contain a session ID.
*/
@Override
void destReceived(Destination d) {
_primary.destReceived(d);
}
/**
* Called by the message handler
* on reception of DestReplyMessage
*
* This will never happen, as the dest reply message does not contain a session ID.
*
* @param h non-null
*/
@Override
void destLookupFailed(Hash h) {
_primary.destLookupFailed(h);
}
/**
* Called by the message handler
* on reception of HostReplyMessage
* @param d non-null
*/
void destReceived(long nonce, Destination d) {
_primary.destReceived(nonce, d);
}
/**
* Called by the message handler
* on reception of HostReplyMessage
*/
@Override
void destLookupFailed(long nonce) {
_primary.destLookupFailed(nonce);
}
/**
* Called by the message handler.
* This will never happen, as the bw limits message does not contain a session ID.
*/
@Override
void bwReceived(int[] i) {
_primary.bwReceived(i);
}
/**
* Blocking. Waits a max of 10 seconds by default.
* See lookupDest with maxWait parameter to change.
* Implemented in 0.8.3 in I2PSessionImpl;
* previously was available only in I2PSimpleSession.
* Multiple outstanding lookups are now allowed.
* @return null on failure
*/
@Override
public Destination lookupDest(Hash h) throws I2PSessionException {
return _primary.lookupDest(h);
}
/**
* Blocking.
* @param maxWait ms
* @return null on failure
*/
@Override
public Destination lookupDest(Hash h, long maxWait) throws I2PSessionException {
return _primary.lookupDest(h, maxWait);
}
/**
* Ask the router to lookup a Destination by host name.
* Blocking. Waits a max of 10 seconds by default.
*
* This only makes sense for a b32 hostname, OR outside router context.
* Inside router context, just query the naming service.
* Outside router context, this does NOT query the context naming service.
* Do that first if you expect a local addressbook.
*
* This will log a warning for non-b32 in router context.
*
* See interface for suggested implementation.
*
* Requires router side to be 0.9.11 or higher. If the router is older,
* this will return null immediately.
*/
@Override
public Destination lookupDest(String name) throws I2PSessionException {
return _primary.lookupDest(name);
}
/**
* Ask the router to lookup a Destination by host name.
* Blocking. See above for details.
* @param maxWait ms
* @return null on failure
*/
@Override
public Destination lookupDest(String name, long maxWait) throws I2PSessionException {
return _primary.lookupDest(name, maxWait);
}
/**
* This may not work???????????, as the reply does not contain a session ID, so
* it won't be routed back to us?
*/
@Override
public int[] bandwidthLimits() throws I2PSessionException {
return _primary.bandwidthLimits();
}
@Override
protected void updateActivity() {
_primary.updateActivity();
}
@Override
public long lastActivity() {
return _primary.lastActivity();
}
@Override
public void setReduced() {
_primary.setReduced();
}
}

View File

@@ -38,6 +38,11 @@ public class CreateLeaseSetMessage extends I2CPMessageImpl {
return _sessionId;
}
@Override
public SessionId sessionId() {
return _sessionId;
}
public void setSessionId(SessionId id) {
_sessionId = id;
}

View File

@@ -32,6 +32,16 @@ public class DestroySessionMessage extends I2CPMessageImpl {
return _sessionId;
}
/**
* Return the SessionId for this message.
*
* @since 0.9.19
*/
@Override
public SessionId sessionId() {
return _sessionId;
}
public void setSessionId(SessionId id) {
_sessionId = id;
}

View File

@@ -76,6 +76,16 @@ public class HostLookupMessage extends I2CPMessageImpl {
return _sessionId;
}
/**
* Return the SessionId for this message.
*
* @since 0.9.19
*/
@Override
public SessionId sessionId() {
return _sessionId;
}
/**
* @return 0 to 2**32 - 1
*/

View File

@@ -73,6 +73,16 @@ public class HostReplyMessage extends I2CPMessageImpl {
return _sessionId;
}
/**
* Return the SessionId for this message.
*
* @since 0.9.19
*/
@Override
public SessionId sessionId() {
return _sessionId;
}
/**
* @return 0 to 2**32 - 1
*/

View File

@@ -60,9 +60,20 @@ public interface I2CPMessage extends DataStructure {
public void writeMessage(OutputStream out) throws I2CPMessageException, IOException;
/**
* Return the unique identifier for this type of APIMessage, as specified in the
* Return the unique identifier for this type of message, as specified in the
* network specification document under #ClientAccessLayerMessages
* @return unique identifier for this type of APIMessage
* @return unique identifier for this type of message
*/
public int getType();
}
/**
* Return the SessionId for this type of message.
* Most but not all message types include a SessionId.
* The ones that do already define getSessionId(), but some return a SessionId and
* some return a long, so we define a new method here.
*
* @return SessionId or null if this message type does not include a SessionId
* @since 0.9.19
*/
public SessionId sessionId();
}

View File

@@ -12,7 +12,7 @@ package net.i2p.data.i2cp;
import net.i2p.I2PException;
/**
* Represent an error serializing or deserializing an APIMessage
* Represent an error serializing or deserializing a message
*
* @author jrandom
*/

View File

@@ -127,4 +127,15 @@ public abstract class I2CPMessageImpl extends DataStructureImpl implements I2CPM
throw new DataFormatException("Error writing the message", ime);
}
}
/**
* Return the SessionId for this type of message.
* Most but not all message types include a SessionId.
* The ones that do already define getSessionId(), but some return a SessionId and
* some return a long, so we define a new method here.
*
* @return null always. Extending classes with a SessionId must override.
* @since 0.9.19
*/
public SessionId sessionId() { return null; }
}

View File

@@ -37,6 +37,16 @@ public class MessagePayloadMessage extends I2CPMessageImpl {
return _sessionId;
}
/**
* Return the SessionId for this message.
*
* @since 0.9.19
*/
@Override
public SessionId sessionId() {
return _sessionId >= 0 ? new SessionId(_sessionId) : null;
}
/** @param id 0-65535 */
public void setSessionId(long id) {
_sessionId = (int) id;

View File

@@ -193,6 +193,16 @@ public class MessageStatusMessage extends I2CPMessageImpl {
return _sessionId;
}
/**
* Return the SessionId for this message.
*
* @since 0.9.19
*/
@Override
public SessionId sessionId() {
return _sessionId >= 0 ? new SessionId(_sessionId) : null;
}
/** @param id 0-65535 */
public void setSessionId(long id) {
_sessionId = (int) id;

View File

@@ -36,6 +36,16 @@ public class ReceiveMessageBeginMessage extends I2CPMessageImpl {
return _sessionId;
}
/**
* Return the SessionId for this message.
*
* @since 0.9.19
*/
@Override
public SessionId sessionId() {
return _sessionId >= 0 ? new SessionId(_sessionId) : null;
}
/** @param id 0-65535 */
public void setSessionId(long id) {
_sessionId = (int) id;

View File

@@ -35,6 +35,16 @@ public class ReceiveMessageEndMessage extends I2CPMessageImpl {
return _sessionId;
}
/**
* Return the SessionId for this message.
*
* @since 0.9.19
*/
@Override
public SessionId sessionId() {
return _sessionId >= 0 ? new SessionId(_sessionId) : null;
}
/** @param id 0-65535 */
public void setSessionId(long id) {
_sessionId = (int) id;

View File

@@ -33,6 +33,16 @@ public class ReconfigureSessionMessage extends I2CPMessageImpl {
return _sessionId;
}
/**
* Return the SessionId for this message.
*
* @since 0.9.19
*/
@Override
public SessionId sessionId() {
return _sessionId;
}
public void setSessionId(SessionId id) {
_sessionId = id;
}

View File

@@ -35,6 +35,16 @@ public class ReportAbuseMessage extends I2CPMessageImpl {
return _sessionId;
}
/**
* Return the SessionId for this message.
*
* @since 0.9.19
*/
@Override
public SessionId sessionId() {
return _sessionId;
}
public void setSessionId(SessionId id) {
_sessionId = id;
}

View File

@@ -45,6 +45,16 @@ public class RequestLeaseSetMessage extends I2CPMessageImpl {
return _sessionId;
}
/**
* Return the SessionId for this message.
*
* @since 0.9.19
*/
@Override
public SessionId sessionId() {
return _sessionId;
}
public void setSessionId(SessionId id) {
_sessionId = id;
}

View File

@@ -55,6 +55,16 @@ public class RequestVariableLeaseSetMessage extends I2CPMessageImpl {
return _sessionId;
}
/**
* Return the SessionId for this message.
*
* @since 0.9.19
*/
@Override
public SessionId sessionId() {
return _sessionId;
}
public void setSessionId(SessionId id) {
_sessionId = id;
}

View File

@@ -38,6 +38,16 @@ public class SendMessageMessage extends I2CPMessageImpl {
return _sessionId;
}
/**
* Return the SessionId for this message.
*
* @since 0.9.19
*/
@Override
public SessionId sessionId() {
return _sessionId;
}
public void setSessionId(SessionId id) {
_sessionId = id;
}

View File

@@ -42,6 +42,16 @@ public class SessionStatusMessage extends I2CPMessageImpl {
return _sessionId;
}
/**
* Return the SessionId for this message.
*
* @since 0.9.19
*/
@Override
public SessionId sessionId() {
return _sessionId;
}
public void setSessionId(SessionId id) {
_sessionId = id;
}

View File

@@ -146,6 +146,21 @@ public interface TunnelManagerFacade extends Service {
*
*/
public void buildTunnels(Destination client, ClientTunnelSettings settings);
/**
* Add another destination to the same tunnels.
* Must have same encryption key an a different signing key.
* @throws IllegalArgumentException if not
* @return success
* @since 0.9.19
*/
public boolean addAlias(Destination dest, ClientTunnelSettings settings, Destination existingClient);
/**
* Remove another destination to the same tunnels.
* @since 0.9.19
*/
public void removeAlias(Destination dest);
public TunnelPoolSettings getInboundSettings();
public TunnelPoolSettings getOutboundSettings();

View File

@@ -1,11 +1,13 @@
package net.i2p.router;
import java.util.Set;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import net.i2p.data.Base64;
import net.i2p.data.Hash;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.NativeBigInteger;
import net.i2p.util.RandomSource;
import net.i2p.util.SystemVersion;
@@ -31,6 +33,8 @@ public class TunnelPoolSettings {
private final Properties _unknownOptions;
private Hash _randomKey;
private int _priority;
private final Set<Hash> _aliases;
private Hash _aliasOf;
/** prefix used to override the router's defaults for clients */
// unimplemented
@@ -119,6 +123,10 @@ public class TunnelPoolSettings {
_randomKey = generateRandomKey();
if (_isExploratory && !_isInbound)
_priority = EXPLORATORY_PRIORITY;
if (!_isExploratory)
_aliases = new ConcurrentHashSet<Hash>(4);
else
_aliases = null;
}
/** how many tunnels should be available at all times */
@@ -206,6 +214,34 @@ public class TunnelPoolSettings {
/** what destination is this a client tunnel for (or null if exploratory) */
public Hash getDestination() { return _destination; }
/**
* Other destinations that use the same tunnel (or null if exploratory)
* Modifiable, concurrent, not a copy
* @since 0.9.19
*/
public Set<Hash> getAliases() {
return _aliases;
}
/**
* Other destination that this is an alias of (or null).
* If non-null, don't build tunnels.
* @since 0.9.19
*/
public Hash getAliasOf() {
return _aliasOf;
}
/**
* Set other destination that this is an alias of (or null).
* If non-null, don't build tunnels.
* @since 0.9.19
*/
public void setAliasOf(Hash h) {
_aliasOf = h;
}
/**
* random key used for peer ordering
@@ -235,7 +271,7 @@ public class TunnelPoolSettings {
public int getPriority() { return _priority; }
public Properties getUnknownOptions() { return _unknownOptions; }
/**
* @param prefix non-null
*/

View File

@@ -16,6 +16,7 @@ import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.ConcurrentHashMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -39,6 +40,7 @@ import net.i2p.data.i2cp.SendMessageMessage;
import net.i2p.data.i2cp.SendMessageExpiresMessage;
import net.i2p.data.i2cp.SessionConfig;
import net.i2p.data.i2cp.SessionId;
import net.i2p.data.i2cp.SessionStatusMessage;
import net.i2p.router.Job;
import net.i2p.router.JobImpl;
import net.i2p.router.RouterContext;
@@ -51,6 +53,9 @@ import net.i2p.util.SimpleTimer;
/**
* Bridge the router and the client - managing state for a client.
*
* As of release 0.9.19, multiple sessions are supported on a single
* I2CP connection. These sessions share tunnels and some configuration.
*
* @author jrandom
*/
class ClientConnectionRunner {
@@ -61,21 +66,16 @@ class ClientConnectionRunner {
private final Socket _socket;
/** output stream of the socket that I2CP messages bound to the client should be written to */
private OutputStream _out;
/** session ID of the current client */
private SessionId _sessionId;
/** user's config */
private SessionConfig _config;
private final ConcurrentHashMap<Hash, SessionParams> _sessions;
private String _clientVersion;
/**
* Mapping of MessageId to Payload, storing messages for retrieval.
* Unused for i2cp.fastReceive = "true" (_dontSendMSMOnRecive = true)
*/
private final Map<MessageId, Payload> _messages;
/** lease set request state, or null if there is no request pending on at the moment */
private LeaseRequestState _leaseRequest;
private int _consecutiveLeaseRequestFails;
/** currently allocated leaseSet, or null if none is allocated */
private LeaseSet _currentLeaseSet;
/**
* Set of messageIds created but not yet ACCEPTED.
* Unused for i2cp.messageReliability = "none" (_dontSendMSM = true)
@@ -83,7 +83,7 @@ class ClientConnectionRunner {
private final Set<MessageId> _acceptedPending;
/** thingy that does stuff */
protected I2CPMessageReader _reader;
/** just for this destination */
/** Used for all sessions, which must all have the same crypto keys */
private SessionKeyManager _sessionKeyManager;
/**
* This contains the last 10 MessageIds that have had their (non-ack) status
@@ -91,7 +91,6 @@ class ClientConnectionRunner {
*/
private final List<MessageId> _alreadyProcessed;
private ClientWriterRunner _writer;
private Hash _destHashCache;
/** are we, uh, dead */
private volatile boolean _dead;
/** For outbound traffic. true if i2cp.messageReliability = "none"; @since 0.8.1 */
@@ -108,11 +107,30 @@ class ClientConnectionRunner {
private static final int MAX_LEASE_FAILS = 5;
private static final int BUF_SIZE = 32*1024;
private static final int MAX_SESSIONS = 4;
/** @since 0.9.2 */
private static final String PROP_TAGS = "crypto.tagsToSend";
private static final String PROP_THRESH = "crypto.lowTagThreshold";
/**
* For multisession
* @since 0.9.19
*/
private static class SessionParams {
final Destination dest;
final boolean isPrimary;
SessionId sessionId;
SessionConfig config;
LeaseRequestState leaseRequest;
LeaseSet currentLeaseSet;
SessionParams(Destination d, boolean isPrimary) {
dest = d;
this.isPrimary = isPrimary;
}
}
/**
* Create a new runner against the given socket
*
@@ -124,6 +142,7 @@ class ClientConnectionRunner {
_socket = socket;
// unused for fastReceive
_messages = new ConcurrentHashMap<MessageId, Payload>();
_sessions = new ConcurrentHashMap<Hash, SessionParams>(4);
_alreadyProcessed = new ArrayList<MessageId>();
_acceptedPending = new ConcurrentHashSet<MessageId>();
_messageId = new AtomicInteger(_context.random().nextInt());
@@ -166,8 +185,7 @@ class ClientConnectionRunner {
public synchronized void stopRunning() {
if (_dead) return;
if (_context.router().isAlive() && _log.shouldLog(Log.WARN))
_log.warn("Stop the I2CP connection! current leaseSet: "
+ _currentLeaseSet, new Exception("Stop client connection"));
_log.warn("Stop the I2CP connection!", new Exception("Stop client connection"));
_dead = true;
// we need these keys to unpublish the leaseSet
if (_reader != null) _reader.stopReading();
@@ -178,21 +196,56 @@ class ClientConnectionRunner {
if (_sessionKeyManager != null)
_sessionKeyManager.shutdown();
_manager.unregisterConnection(this);
if (_currentLeaseSet != null)
_context.netDb().unpublish(_currentLeaseSet);
_leaseRequest = null;
for (SessionParams sp : _sessions.values()) {
LeaseSet ls = sp.currentLeaseSet;
if (ls != null)
_context.netDb().unpublish(ls);
}
synchronized (_alreadyProcessed) {
_alreadyProcessed.clear();
}
//_config = null;
//_manager = null;
_sessions.clear();
}
/**
* Current client's config,
* will be null before session is established
* will be null if session not found
* IS subsession aware.
* @since 0.9.19 added hash param
*/
public SessionConfig getConfig() { return _config; }
public SessionConfig getConfig(Hash h) {
SessionParams sp = _sessions.get(h);
if (sp == null)
return null;
return sp.config;
}
/**
* Current client's config,
* will be null if session not found
* IS subsession aware.
* @since 0.9.19 added id param
*/
public SessionConfig getConfig(SessionId id) {
for (SessionParams sp : _sessions.values()) {
if (id.equals(sp.sessionId))
return sp.config;
}
return null;
}
/**
* Primary client's config,
* will be null if session not set up
* @since 0.9.19
*/
public SessionConfig getPrimaryConfig() {
for (SessionParams sp : _sessions.values()) {
if (sp.isPrimary)
return sp.config;
}
return null;
}
/**
* The client version.
@@ -214,41 +267,186 @@ class ClientConnectionRunner {
/** current client's sessionkeymanager */
public SessionKeyManager getSessionKeyManager() { return _sessionKeyManager; }
/** currently allocated leaseSet */
public LeaseSet getLeaseSet() { return _currentLeaseSet; }
void setLeaseSet(LeaseSet ls) { _currentLeaseSet = ls; }
/**
* Currently allocated leaseSet.
* IS subsession aware. Returns primary leaseset only.
* @return leaseSet or null if not yet set or unknown hash
* @since 0.9.19 added hash parameter
*/
public LeaseSet getLeaseSet(Hash h) {
SessionParams sp = _sessions.get(h);
if (sp == null)
return null;
return sp.currentLeaseSet;
}
/**
* Currently allocated leaseSet.
* IS subsession aware.
*/
void setLeaseSet(LeaseSet ls) {
Hash h = ls.getDestination().calculateHash();
SessionParams sp = _sessions.get(h);
if (sp == null)
return;
sp.currentLeaseSet = ls;
}
/**
* Equivalent to getConfig().getDestination().calculateHash();
* will be null before session is established
* Not subsession aware. Returns random hash from the sessions.
* Don't use if you can help it.
*
* @return primary hash or null if not yet set
*/
public Hash getDestHash() { return _destHashCache; }
public Hash getDestHash() {
for (Hash h : _sessions.keySet()) {
return h;
}
return null;
}
/**
* Return the hash for the given ID
* @return hash or null if unknown
* @since 0.9.19
*/
public Hash getDestHash(SessionId id) {
for (Map.Entry<Hash, SessionParams> e : _sessions.entrySet()) {
if (id.equals(e.getValue().sessionId))
return e.getKey();
}
return null;
}
/**
* Return the dest for the given ID
* @return dest or null if unknown
* @since 0.9.19
*/
public Destination getDestination(SessionId id) {
for (SessionParams sp : _sessions.values()) {
if (id.equals(sp.sessionId))
return sp.dest;
}
return null;
}
/**
* @return current client's sessionId or null if not yet set
* Subsession aware.
*
* @param h the local target
* @return current client's sessionId or null if not yet set or not a valid hash
* @since 0.9.19
*/
SessionId getSessionId() { return _sessionId; }
SessionId getSessionId(Hash h) {
SessionParams sp = _sessions.get(h);
if (sp == null)
return null;
return sp.sessionId;
}
/**
* Subsession aware.
*
* @return all current client's sessionIds, non-null
* @since 0.9.19
*/
List<SessionId> getSessionIds() {
List<SessionId> rv = new ArrayList<SessionId>(_sessions.size());
for (SessionParams sp : _sessions.values()) {
SessionId id = sp.sessionId;
if (id != null)
rv.add(id);
}
return rv;
}
/**
* Subsession aware.
*
* @return all current client's destinations, non-null
* @since 0.9.19
*/
List<Destination> getDestinations() {
List<Destination> rv = new ArrayList<Destination>(_sessions.size());
for (SessionParams sp : _sessions.values()) {
rv.add(sp.dest);
}
return rv;
}
/**
* To be called only by ClientManager.
*
* @param hash for the session
* @throws IllegalStateException if already set
* @since 0.9.19 added hash param
*/
void setSessionId(SessionId id) {
if (_sessionId != null)
void setSessionId(Hash hash, SessionId id) {
if (hash == null)
throw new IllegalStateException();
_sessionId = id;
SessionParams sp = _sessions.get(hash);
if (sp == null || sp.sessionId != null)
throw new IllegalStateException();
sp.sessionId = id;
}
/**
* Kill the session. Caller must kill runner if none left.
*
* @since 0.9.19
*/
void removeSession(SessionId id) {
boolean isPrimary = false;
for (Iterator<SessionParams> iter = _sessions.values().iterator(); iter.hasNext(); ) {
SessionParams sp = iter.next();
if (id.equals(sp.sessionId)) {
if (_log.shouldLog(Log.INFO))
_log.info("Destroying client session " + id);
iter.remove();
// Tell client manger
_manager.unregisterSession(id, sp.dest);
LeaseSet ls = sp.currentLeaseSet;
if (ls != null)
_context.netDb().unpublish(ls);
isPrimary = sp.isPrimary;
}
}
if (isPrimary) {
// kill all the others also
for (SessionParams sp : _sessions.values()) {
_manager.unregisterSession(id, sp.dest);
LeaseSet ls = sp.currentLeaseSet;
if (ls != null)
_context.netDb().unpublish(ls);
}
}
}
/** data for the current leaseRequest, or null if there is no active leaseSet request */
LeaseRequestState getLeaseRequest() { return _leaseRequest; }
/**
* Data for the current leaseRequest, or null if there is no active leaseSet request.
* Not subsession aware. Returns primary ID only.
* @since 0.9.19 added hash param
*/
LeaseRequestState getLeaseRequest(Hash h) {
SessionParams sp = _sessions.get(h);
if (sp == null)
return null;
return sp.leaseRequest;
}
/** @param req non-null */
public void failLeaseRequest(LeaseRequestState req) {
boolean disconnect = false;
Hash h = req.getRequested().getDestination().calculateHash();
SessionParams sp = _sessions.get(h);
if (sp == null)
return;
synchronized (this) {
if (_leaseRequest == req) {
_leaseRequest = null;
if (sp.leaseRequest == req) {
sp.leaseRequest = null;
disconnect = ++_consecutiveLeaseRequestFails > MAX_LEASE_FAILS;
}
}
@@ -289,19 +487,34 @@ class ClientConnectionRunner {
* @return SessionStatusMessage return code, 1 for success, != 1 for failure
*/
public int sessionEstablished(SessionConfig config) {
_destHashCache = config.getDestination().calculateHash();
Destination dest = config.getDestination();
Hash destHash = dest.calculateHash();
if (_log.shouldLog(Log.DEBUG))
_log.debug("SessionEstablished called for destination " + _destHashCache.toBase64());
_config = config;
_log.debug("SessionEstablished called for destination " + destHash);
if (_sessions.size() > MAX_SESSIONS)
return SessionStatusMessage.STATUS_REFUSED;
boolean isPrimary = _sessions.isEmpty();
if (!isPrimary) {
// all encryption keys must be the same
for (SessionParams sp : _sessions.values()) {
if (!dest.getPublicKey().equals(sp.dest.getPublicKey()))
return SessionStatusMessage.STATUS_INVALID;
}
}
SessionParams sp = new SessionParams(dest, isPrimary);
sp.config = config;
SessionParams old = _sessions.putIfAbsent(destHash, sp);
if (old != null)
return SessionStatusMessage.STATUS_INVALID;
// We process a few options here, but most are handled by the tunnel manager.
// The ones here can't be changed later.
Properties opts = config.getOptions();
if (opts != null) {
if (isPrimary && opts != null) {
_dontSendMSM = "none".equals(opts.getProperty(I2PClient.PROP_RELIABILITY, "").toLowerCase(Locale.US));
_dontSendMSMOnReceive = Boolean.parseBoolean(opts.getProperty(I2PClient.PROP_FAST_RECEIVE));
}
// per-destination session key manager to prevent rather easy correlation
if (_sessionKeyManager == null) {
if (isPrimary && _sessionKeyManager == null) {
int tags = TransientSessionKeyManager.DEFAULT_TAGS;
int thresh = TransientSessionKeyManager.LOW_THRESHOLD;
if (opts != null) {
@@ -315,10 +528,8 @@ class ClientConnectionRunner {
}
}
_sessionKeyManager = new TransientSessionKeyManager(_context, tags, thresh);
} else {
_log.error("SessionEstablished called for twice for destination " + _destHashCache.toBase64().substring(0,4));
}
return _manager.destinationEstablished(this);
return _manager.destinationEstablished(this, dest);
}
/**
@@ -329,14 +540,21 @@ class ClientConnectionRunner {
*
* Do not use for status = STATUS_SEND_ACCEPTED; use ackSendMessage() for that.
*
* @param dest the client
* @param id the router's ID for this message
* @param messageNonce the client's ID for this message
* @param status see I2CP MessageStatusMessage for success/failure codes
*/
void updateMessageDeliveryStatus(MessageId id, long messageNonce, int status) {
void updateMessageDeliveryStatus(Destination dest, MessageId id, long messageNonce, int status) {
if (_dead || messageNonce <= 0)
return;
_context.jobQueue().addJob(new MessageDeliveryStatusUpdate(id, messageNonce, status));
SessionParams sp = _sessions.get(dest.calculateHash());
if (sp == null)
return;
SessionId sid = sp.sessionId;
if (sid == null)
return; // sid = new SessionId(foo) ???
_context.jobQueue().addJob(new MessageDeliveryStatusUpdate(sid, id, messageNonce, status));
}
/**
@@ -344,19 +562,23 @@ class ClientConnectionRunner {
* updated. This takes care of all the LeaseRequestState stuff (including firing any jobs)
*/
void leaseSetCreated(LeaseSet ls) {
LeaseRequestState state = null;
Hash h = ls.getDestination().calculateHash();
SessionParams sp = _sessions.get(h);
if (sp == null)
return;
LeaseRequestState state;
synchronized (this) {
state = _leaseRequest;
state = sp.leaseRequest;
if (state == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("LeaseRequest is null and we've received a new lease?! perhaps this is odd... " + ls);
return;
} else {
state.setIsSuccessful(true);
_currentLeaseSet = ls;
setLeaseSet(ls);
if (_log.shouldLog(Log.DEBUG))
_log.debug("LeaseSet created fully: " + state + " / " + ls);
_leaseRequest = null;
sp.leaseRequest = null;
_consecutiveLeaseRequestFails = 0;
}
}
@@ -425,12 +647,12 @@ class ClientConnectionRunner {
if (_log.shouldLog(Log.DEBUG))
_log.debug("** Receiving message " + id.getMessageId() + " with payload of size "
+ payload.getSize() + " for session " + _sessionId.getSessionId());
+ payload.getSize() + " for session " + message.getSessionId());
//long beforeDistribute = _context.clock().now();
// the following blocks as described above
SessionConfig cfg = _config;
if (cfg != null)
_manager.distributeMessage(cfg.getDestination(), dest, payload,
Destination fromDest = getDestination(message.getSessionId());
if (fromDest != null)
_manager.distributeMessage(fromDest, dest, payload,
id, message.getNonce(), expiration, flags);
// else log error?
//long timeToDistribute = _context.clock().now() - beforeDistribute;
@@ -450,11 +672,9 @@ class ClientConnectionRunner {
* @param id OUR id for the message
* @param nonce HIS id for the message
*/
void ackSendMessage(MessageId id, long nonce) {
void ackSendMessage(SessionId sid, MessageId id, long nonce) {
if (_dontSendMSM || nonce == 0)
return;
SessionId sid = _sessionId;
if (sid == null) return;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Acking message send [accepted]" + id + " / " + nonce + " for sessionId "
+ sid);
@@ -476,6 +696,7 @@ class ClientConnectionRunner {
/**
* Asynchronously deliver the message to the current runner
*
* @param fromDest generally null when from remote, non-null if from local
*/
void receiveMessage(Destination toDest, Destination fromDest, Payload payload) {
if (_dead) return;
@@ -489,9 +710,9 @@ class ClientConnectionRunner {
* Send async abuse message to the client
*
*/
public void reportAbuse(String reason, int severity) {
public void reportAbuse(Destination dest, String reason, int severity) {
if (_dead) return;
_context.jobQueue().addJob(new ReportAbuseJob(_context, this, reason, severity));
_context.jobQueue().addJob(new ReportAbuseJob(_context, this, dest, reason, severity));
}
/**
@@ -524,12 +745,16 @@ class ClientConnectionRunner {
// so the comparison will always work.
int leases = set.getLeaseCount();
// synch so _currentLeaseSet isn't changed out from under us
LeaseSet current = null;
synchronized (this) {
if (_currentLeaseSet != null && _currentLeaseSet.getLeaseCount() == leases) {
Destination dest = set.getDestination();
if (dest != null)
current = getLeaseSet(dest.calculateHash());
if (current != null && current.getLeaseCount() == leases) {
for (int i = 0; i < leases; i++) {
if (! _currentLeaseSet.getLease(i).getTunnelId().equals(set.getLease(i).getTunnelId()))
if (! current.getLease(i).getTunnelId().equals(set.getLease(i).getTunnelId()))
break;
if (! _currentLeaseSet.getLease(i).getGateway().equals(set.getLease(i).getGateway()))
if (! current.getLease(i).getGateway().equals(set.getLease(i).getGateway()))
break;
if (i == leases - 1) {
if (_log.shouldLog(Log.INFO))
@@ -542,10 +767,14 @@ class ClientConnectionRunner {
}
}
if (_log.shouldLog(Log.INFO))
_log.info("Current leaseSet " + _currentLeaseSet + "\nNew leaseSet " + set);
LeaseRequestState state = null;
_log.info("Current leaseSet " + current + "\nNew leaseSet " + set);
Hash h = set.getDestination().calculateHash();
SessionParams sp = _sessions.get(h);
if (sp == null)
return;
LeaseRequestState state;
synchronized (this) {
state = _leaseRequest;
state = sp.leaseRequest;
if (state != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Already requesting " + state);
@@ -562,7 +791,7 @@ class ClientConnectionRunner {
// fire onCreated?
return; // already requesting
} else {
_leaseRequest = state = new LeaseRequestState(onCreateJob, onFailedJob, _context.clock().now() + expirationTime, set);
sp.leaseRequest = state = new LeaseRequestState(onCreateJob, onFailedJob, _context.clock().now() + expirationTime, set);
if (_log.shouldLog(Log.DEBUG))
_log.debug("New request: " + state);
}
@@ -693,6 +922,7 @@ class ClientConnectionRunner {
private static final int MAX_REQUEUE = 60; // 30 sec.
private class MessageDeliveryStatusUpdate extends JobImpl {
private final SessionId _sessId;
private final MessageId _messageId;
private final long _messageNonce;
private final int _status;
@@ -706,8 +936,9 @@ class ClientConnectionRunner {
* @param messageNonce the client's ID for this message
* @param status see I2CP MessageStatusMessage for success/failure codes
*/
public MessageDeliveryStatusUpdate(MessageId id, long messageNonce, int status) {
public MessageDeliveryStatusUpdate(SessionId sid, MessageId id, long messageNonce, int status) {
super(ClientConnectionRunner.this._context);
_sessId = sid;
_messageId = id;
_messageNonce = messageNonce;
_status = status;
@@ -723,7 +954,7 @@ class ClientConnectionRunner {
MessageStatusMessage msg = new MessageStatusMessage();
msg.setMessageId(_messageId.getMessageId());
msg.setSessionId(_sessionId.getSessionId());
msg.setSessionId(_sessId.getSessionId());
// has to be >= 0, it is initialized to -1
msg.setNonce(_messageNonce);
msg.setSize(0);
@@ -734,12 +965,12 @@ class ClientConnectionRunner {
// bug requeueing forever? failsafe
_log.error("Abandon update for message " + _messageId + " to "
+ MessageStatusMessage.getStatusString(msg.getStatus())
+ " for session " + _sessionId.getSessionId());
+ " for " + _sessId);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Almost send an update for message " + _messageId + " to "
+ MessageStatusMessage.getStatusString(msg.getStatus())
+ " for session " + _sessionId.getSessionId()
+ " for " + _sessId
+ " before they knew the messageId! delaying .5s");
_lastTried = _context.clock().now();
requeue(REQUEUE_DELAY);
@@ -774,14 +1005,14 @@ class ClientConnectionRunner {
if (_log.shouldLog(Log.DEBUG))
_log.info("Updating message status for message " + _messageId + " to "
+ MessageStatusMessage.getStatusString(msg.getStatus())
+ " for session " + _sessionId.getSessionId()
+ " for " + _sessId
+ " (with nonce=2), retrying after "
+ (_context.clock().now() - _lastTried));
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Updating message status for message " + _messageId + " to "
+ MessageStatusMessage.getStatusString(msg.getStatus())
+ " for session " + _sessionId.getSessionId() + " (with nonce=2)");
+ " for " + _sessId + " (with nonce=2)");
}
try {

View File

@@ -51,9 +51,11 @@ class ClientManager {
protected final List<ClientListenerRunner> _listeners;
// Destination --> ClientConnectionRunner
// Locked for adds/removes but not lookups
// If a runner has multiple sessions it will be in here multiple times, one for each dest
private final Map<Destination, ClientConnectionRunner> _runners;
// Same as what's in _runners, but for fast lookup by Hash
// Locked for adds/removes but not lookups
// If a runner has multiple sessions it will be in here multiple times, one for each dest
private final Map<Hash, ClientConnectionRunner> _runnersByHash;
// ClientConnectionRunner for clients w/out a Dest yet
private final Set<ClientConnectionRunner> _pendingRunners;
@@ -203,24 +205,44 @@ class ClientManager {
}
}
/**
* Remove all sessions for this runner.
*/
public void unregisterConnection(ClientConnectionRunner runner) {
_log.warn("Unregistering (dropping) a client connection");
if (_log.shouldLog(Log.WARN))
_log.warn("Unregistering (dropping) a client connection");
synchronized (_pendingRunners) {
_pendingRunners.remove(runner);
}
if ( (runner.getConfig() != null) && (runner.getConfig().getDestination() != null) ) {
// after connection establishment
Destination dest = runner.getConfig().getDestination();
synchronized (_runners) {
SessionId id = runner.getSessionId();
if (id != null)
_runnerSessionIds.remove(id);
List<SessionId> ids = runner.getSessionIds();
List<Destination> dests = runner.getDestinations();
synchronized (_runners) {
for (SessionId id : ids) {
_runnerSessionIds.remove(id);
}
for (Destination dest : dests) {
_runners.remove(dest);
_runnersByHash.remove(dest.calculateHash());
}
}
}
/**
* Remove only the following session. Does not remove the runner if it has more.
*
* @since 0.9.19
*/
public void unregisterSession(SessionId id, Destination dest) {
if (_log.shouldLog(Log.WARN))
_log.warn("Unregistering client session " + id);
synchronized (_runners) {
_runnerSessionIds.remove(id);
_runners.remove(dest);
_runnersByHash.remove(dest.calculateHash());
}
}
/**
* Add to the clients list. Check for a dup destination.
* Side effect: Sets the session ID of the runner.
@@ -228,8 +250,7 @@ class ClientManager {
*
* @return SessionStatusMessage return code, 1 for success, != 1 for failure
*/
public int destinationEstablished(ClientConnectionRunner runner) {
Destination dest = runner.getConfig().getDestination();
public int destinationEstablished(ClientConnectionRunner runner, Destination dest) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("DestinationEstablished called for destination " + dest.calculateHash().toBase64());
@@ -244,9 +265,10 @@ class ClientManager {
} else {
SessionId id = locked_getNextSessionId();
if (id != null) {
runner.setSessionId(id);
Hash h = dest.calculateHash();
runner.setSessionId(h, id);
_runners.put(dest, runner);
_runnersByHash.put(dest.calculateHash(), runner);
_runnersByHash.put(h, runner);
rv = SessionStatusMessage.STATUS_CREATED;
} else {
rv = SessionStatusMessage.STATUS_REFUSED;
@@ -310,8 +332,11 @@ class ClientManager {
// sender went away
return;
}
ClientMessage msg = new ClientMessage(toDest, payload, runner.getConfig(),
runner.getConfig().getDestination(), msgId,
SessionConfig config = runner.getConfig(fromDest.calculateHash());
if (config == null)
return;
ClientMessage msg = new ClientMessage(toDest, payload, config,
fromDest, msgId,
messageNonce, expiration, flags);
_ctx.clientMessagePool().add(msg, true);
}
@@ -347,7 +372,7 @@ class ClientManager {
public void runJob() {
_to.receiveMessage(_toDest, _fromDest, _payload);
if (_from != null) {
_from.updateMessageDeliveryStatus(_msgId, _messageNonce, MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL);
_from.updateMessageDeliveryStatus(_fromDest, _msgId, _messageNonce, MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL);
}
}
}
@@ -410,7 +435,9 @@ class ClientManager {
if (destHash == null) return true;
ClientConnectionRunner runner = getRunner(destHash);
if (runner == null) return true;
return !Boolean.parseBoolean(runner.getConfig().getOptions().getProperty(ClientManagerFacade.PROP_CLIENT_ONLY));
SessionConfig config = runner.getConfig(destHash);
if (config == null) return true;
return !Boolean.parseBoolean(config.getOptions().getProperty(ClientManagerFacade.PROP_CLIENT_ONLY));
}
/**
@@ -437,7 +464,7 @@ class ClientManager {
public SessionConfig getClientSessionConfig(Destination dest) {
ClientConnectionRunner runner = getRunner(dest);
if (runner != null)
return runner.getConfig();
return runner.getConfig(dest.calculateHash());
else
return null;
}
@@ -475,7 +502,7 @@ class ClientManager {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Delivering status " + status + " to "
+ fromDest.calculateHash() + " for message " + id);
runner.updateMessageDeliveryStatus(id, messageNonce, status);
runner.updateMessageDeliveryStatus(fromDest, id, messageNonce, status);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Cannot deliver status " + status + " to "
@@ -499,7 +526,7 @@ class ClientManager {
if (dest != null) {
ClientConnectionRunner runner = getRunner(dest);
if (runner != null) {
runner.reportAbuse(reason, severity);
runner.reportAbuse(dest, reason, severity);
}
} else {
for (Destination d : _runners.keySet()) {
@@ -577,21 +604,22 @@ class ClientManager {
public void runJob() {
ClientConnectionRunner runner;
if (_msg.getDestination() != null)
runner = getRunner(_msg.getDestination());
Destination dest = _msg.getDestination();
if (dest != null)
runner = getRunner(dest);
else
runner = getRunner(_msg.getDestinationHash());
if (runner != null) {
//_ctx.statManager().addRateData("client.receiveMessageSize",
// _msg.getPayload().getSize(), 0);
runner.receiveMessage(_msg.getDestination(), null, _msg.getPayload());
runner.receiveMessage(dest, null, _msg.getPayload());
} else {
// no client connection...
// we should pool these somewhere...
if (_log.shouldLog(Log.WARN))
_log.warn("Message received but we don't have a connection to "
+ _msg.getDestination() + "/" + _msg.getDestinationHash()
+ dest + "/" + _msg.getDestinationHash()
+ " currently. DROPPED");
}
}

View File

@@ -90,7 +90,7 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade implements Inte
for (Destination dest : _manager.getRunnerDestinations()) {
ClientConnectionRunner runner = _manager.getRunner(dest);
if ( (runner == null) || (runner.getIsDead())) continue;
LeaseSet ls = runner.getLeaseSet();
LeaseSet ls = runner.getLeaseSet(dest.calculateHash());
if (ls == null)
continue; // still building
long howLongAgo = _context.clock().now() - ls.getEarliestLeaseDate();

View File

@@ -195,12 +195,13 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
*/
private void handleCreateSession(CreateSessionMessage message) {
SessionConfig in = message.getSessionConfig();
Destination dest = in.getDestination();
if (in.verifySignature()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Signature verified correctly on create session message");
} else {
// For now, we do NOT send a SessionStatusMessage - see javadoc above
int itype = in.getDestination().getCertificate().getCertificateType();
int itype = dest.getCertificate().getCertificateType();
SigType stype = SigType.getByCode(itype);
if (stype == null || !stype.isAvailable()) {
_log.error("Client requested unsupported signature type " + itype);
@@ -217,7 +218,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
if (!checkAuth(inProps))
return;
SessionId id = _runner.getSessionId();
SessionId id = _runner.getSessionId(dest.calculateHash());
if (id != null) {
_runner.disconnectClient("Already have session " + id);
return;
@@ -226,11 +227,12 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
// Copy over the whole config structure so we don't later corrupt it on
// the client side if we change settings or later get a
// ReconfigureSessionMessage
SessionConfig cfg = new SessionConfig(in.getDestination());
SessionConfig cfg = new SessionConfig(dest);
cfg.setSignature(in.getSignature());
Properties props = new Properties();
props.putAll(in.getOptions());
cfg.setOptions(props);
boolean isPrimary = _runner.getSessionIds().isEmpty();
int status = _runner.sessionEstablished(cfg);
if (status != SessionStatusMessage.STATUS_CREATED) {
// For now, we do NOT send a SessionStatusMessage - see javadoc above
@@ -246,11 +248,29 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
_runner.disconnectClient(msg);
return;
}
sendStatusMessage(status);
sendStatusMessage(id, status);
if (_log.shouldLog(Log.INFO))
_log.info("Session " + _runner.getSessionId() + " established for " + _runner.getDestHash());
startCreateSessionJob();
_log.info("Session " + id + " established for " + dest.calculateHash());
if (isPrimary) {
startCreateSessionJob(cfg);
} else {
SessionConfig pcfg = _runner.getPrimaryConfig();
if (pcfg != null) {
///////////
// new tunnel name etc.
ClientTunnelSettings settings = new ClientTunnelSettings(dest.calculateHash());
// all the primary options, then the overrides from the alias
props.putAll(pcfg.getOptions());
props.putAll(props);
settings.readFromProperties(props);
boolean ok = _context.tunnelManager().addAlias(dest, settings, pcfg.getDestination());
if (!ok) {
_log.error("Add alias failed");
// send status message...
}
}
}
}
/**
@@ -296,8 +316,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
* @since 0.9.8
*
*/
protected void startCreateSessionJob() {
_context.jobQueue().addJob(new CreateSessionJob(_context, _runner));
protected void startCreateSessionJob(SessionConfig config) {
_context.jobQueue().addJob(new CreateSessionJob(_context, config));
}
/**
@@ -311,7 +331,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
long beforeDistribute = _context.clock().now();
MessageId id = _runner.distributeMessage(message);
long timeToDistribute = _context.clock().now() - beforeDistribute;
_runner.ackSendMessage(id, message.getNonce());
// TODO validate session id
_runner.ackSendMessage(message.getSessionId(), id, message.getNonce());
_context.statManager().addRateData("client.distributeTime", timeToDistribute);
if ( (timeToDistribute > 50) && (_log.shouldLog(Log.INFO)) )
_log.info("Took too long to distribute the message (which holds up the ack): " + timeToDistribute);
@@ -328,7 +349,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
_log.debug("Handling recieve begin: id = " + message.getMessageId());
MessagePayloadMessage msg = new MessagePayloadMessage();
msg.setMessageId(message.getMessageId());
msg.setSessionId(_runner.getSessionId().getSessionId());
// TODO validate session id
msg.setSessionId(message.getSessionId());
Payload payload = _runner.getPayload(new MessageId(message.getMessageId()));
if (payload == null) {
if (_log.shouldLog(Log.WARN))
@@ -357,9 +379,18 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
}
private void handleDestroySession(DestroySessionMessage message) {
if (_log.shouldLog(Log.INFO))
_log.info("Destroying client session " + _runner.getSessionId());
_runner.stopRunning();
SessionId id = message.getSessionId();
SessionConfig cfg = _runner.getConfig(id);
_runner.removeSession(id);
int left = _runner.getSessionIds().size();
if (left <= 0) {
_runner.stopRunning();
} else {
if (cfg != null)
_context.tunnelManager().removeAlias(cfg.getDestination());
if (_log.shouldLog(Log.INFO))
_log.info("Still " + left + " sessions left");
}
}
/** override for testing */
@@ -370,7 +401,13 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
_runner.disconnectClient("Invalid CreateLeaseSetMessage");
return;
}
Destination dest = _runner.getConfig().getDestination();
SessionId id = message.getSessionId();
SessionConfig config = _runner.getConfig(id);
if (config == null) {
_log.error("Unknown session in CLSM");
return;
}
Destination dest = config.getDestination();
Destination ndest = message.getLeaseSet().getDestination();
if (!dest.equals(ndest)) {
if (_log.shouldLog(Log.ERROR))
@@ -414,8 +451,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
return;
}
if (_log.shouldLog(Log.INFO))
_log.info("New lease set granted for destination "
+ _runner.getDestHash());
_log.info("New lease set granted for destination " + dest);
// leaseSetCreated takes care of all the LeaseRequestState stuff (including firing any jobs)
_runner.leaseSetCreated(message.getLeaseSet());
@@ -423,6 +459,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
/** override for testing */
protected void handleDestLookup(DestLookupMessage message) {
// no session id in DLM
_context.jobQueue().addJob(new LookupDestJob(_context, _runner, message.getHash(),
_runner.getDestHash()));
}
@@ -432,10 +469,12 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
* @since 0.9.11
*/
protected void handleHostLookup(HostLookupMessage message) {
Hash h = _runner.getDestHash(message.getSessionId());
if (h == null)
return; // ok?
_context.jobQueue().addJob(new LookupDestJob(_context, _runner, message.getReqID(),
message.getTimeout(), message.getSessionId(),
message.getHash(), message.getHostname(),
_runner.getDestHash()));
message.getHash(), message.getHostname(), h));
}
/**
@@ -447,32 +486,37 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
* ClientConnectionRunner.sessionEstablished(). Those can't be changed later.
*/
private void handleReconfigureSession(ReconfigureSessionMessage message) {
SessionId id = message.getSessionId();
SessionConfig config = _runner.getConfig(id);
if (config == null) {
_log.error("Unknown session");
sendStatusMessage(id, SessionStatusMessage.STATUS_INVALID);
//_runner.stopRunning(); // ok?
return;
}
if (_log.shouldLog(Log.INFO))
_log.info("Updating options - old: " + _runner.getConfig() + " new: " + message.getSessionConfig());
if (!message.getSessionConfig().getDestination().equals(_runner.getConfig().getDestination())) {
_log.info("Updating options - old: " + _runner.getConfig(id) + " new: " + message.getSessionConfig());
if (!message.getSessionConfig().getDestination().equals(config.getDestination())) {
_log.error("Dest mismatch");
sendStatusMessage(SessionStatusMessage.STATUS_INVALID);
sendStatusMessage(id, SessionStatusMessage.STATUS_INVALID);
_runner.stopRunning();
return;
}
_runner.getConfig().getOptions().putAll(message.getSessionConfig().getOptions());
Hash dest = _runner.getDestHash();
Hash dest = config.getDestination().calculateHash();
config.getOptions().putAll(message.getSessionConfig().getOptions());
ClientTunnelSettings settings = new ClientTunnelSettings(dest);
Properties props = new Properties();
props.putAll(_runner.getConfig().getOptions());
props.putAll(config.getOptions());
settings.readFromProperties(props);
_context.tunnelManager().setInboundSettings(dest,
settings.getInboundSettings());
_context.tunnelManager().setOutboundSettings(dest,
settings.getOutboundSettings());
sendStatusMessage(SessionStatusMessage.STATUS_UPDATED);
sendStatusMessage(id, SessionStatusMessage.STATUS_UPDATED);
}
private void sendStatusMessage(int status) {
private void sendStatusMessage(SessionId id, int status) {
SessionStatusMessage msg = new SessionStatusMessage();
SessionId id = _runner.getSessionId();
if (id == null)
id = ClientManager.UNKNOWN_SESSION_ID;
msg.setSessionId(id);
msg.setStatus(status);
try {

View File

@@ -26,25 +26,20 @@ import net.i2p.util.Log;
*/
class CreateSessionJob extends JobImpl {
private final Log _log;
private final ClientConnectionRunner _runner;
private final SessionConfig _config;
public CreateSessionJob(RouterContext context, ClientConnectionRunner runner) {
public CreateSessionJob(RouterContext context, SessionConfig config) {
super(context);
_log = context.logManager().getLog(CreateSessionJob.class);
_runner = runner;
_config = config;
if (_log.shouldLog(Log.DEBUG))
_log.debug("CreateSessionJob for runner " + _runner + " / config: " + _runner.getConfig());
_log.debug("CreateSessionJob for config: " + config);
}
public String getName() { return "Request tunnels for a new client"; }
public void runJob() {
SessionConfig cfg = _runner.getConfig();
if ( (cfg == null) || (cfg.getDestination() == null) ) {
if (_log.shouldLog(Log.ERROR))
_log.error("No session config on runner " + _runner);
return;
}
Hash dest = cfg.getDestination().calculateHash();
Hash dest = _config.getDestination().calculateHash();
if (_log.shouldLog(Log.INFO))
_log.info("Requesting lease set for destination " + dest);
ClientTunnelSettings settings = new ClientTunnelSettings(dest);
@@ -61,10 +56,10 @@ class CreateSessionJob extends JobImpl {
// XXX props.putAll(Router.getInstance().getConfigMap());
// override them by the client's settings
props.putAll(cfg.getOptions());
props.putAll(_config.getOptions());
// and load 'em up (using anything not yet set as the software defaults)
settings.readFromProperties(props);
getContext().tunnelManager().buildTunnels(cfg.getDestination(), settings);
getContext().tunnelManager().buildTunnels(_config.getDestination(), settings);
}
}

View File

@@ -40,6 +40,7 @@ class LeaseRequestState {
/** created lease set from client - FIXME always null */
public LeaseSet getGranted() { return _grantedLeaseSet; }
/** FIXME unused - why? */
public void setGranted(LeaseSet ls) { _grantedLeaseSet = ls; }

View File

@@ -14,6 +14,7 @@ import net.i2p.data.i2cp.I2CPMessageException;
import net.i2p.data.i2cp.MessageId;
import net.i2p.data.i2cp.MessagePayloadMessage;
import net.i2p.data.i2cp.MessageStatusMessage;
import net.i2p.data.i2cp.SessionId;
import net.i2p.router.JobImpl;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
@@ -26,14 +27,20 @@ import net.i2p.util.Log;
class MessageReceivedJob extends JobImpl {
private final Log _log;
private final ClientConnectionRunner _runner;
private final Destination _toDest;
private final Payload _payload;
private final boolean _sendDirect;
/**
* @param toDest requred to pick session
* @param fromDest ignored, generally null
*/
public MessageReceivedJob(RouterContext ctx, ClientConnectionRunner runner, Destination toDest,
Destination fromDest, Payload payload, boolean sendDirect) {
super(ctx);
_log = ctx.logManager().getLog(MessageReceivedJob.class);
_runner = runner;
_toDest = toDest;
_payload = payload;
_sendDirect = sendDirect;
}
@@ -43,8 +50,8 @@ class MessageReceivedJob extends JobImpl {
public void runJob() {
if (_runner.isDead()) return;
MessageId id = null;
long nextID = _runner.getNextMessageId();
try {
long nextID = _runner.getNextMessageId();
if (_sendDirect) {
sendMessage(nextID);
} else {
@@ -55,7 +62,7 @@ class MessageReceivedJob extends JobImpl {
} catch (I2CPMessageException ime) {
if (_log.shouldLog(Log.WARN))
_log.warn("Error writing out the message", ime);
if (!_sendDirect)
if (id != null && !_sendDirect)
_runner.removePayload(id);
}
}
@@ -69,7 +76,13 @@ class MessageReceivedJob extends JobImpl {
// + " (with nonce=1)", new Exception("available"));
MessageStatusMessage msg = new MessageStatusMessage();
msg.setMessageId(id.getMessageId());
msg.setSessionId(_runner.getSessionId().getSessionId());
SessionId sid = _runner.getSessionId(_toDest.calculateHash());
if (sid == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("No session for " + _toDest.calculateHash());
return;
}
msg.setSessionId(sid.getSessionId());
msg.setSize(size);
// has to be >= 0, it is initialized to -1
msg.setNonce(1);
@@ -84,7 +97,13 @@ class MessageReceivedJob extends JobImpl {
private void sendMessage(long id) throws I2CPMessageException {
MessagePayloadMessage msg = new MessagePayloadMessage();
msg.setMessageId(id);
msg.setSessionId(_runner.getSessionId().getSessionId());
SessionId sid = _runner.getSessionId(_toDest.calculateHash());
if (sid == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("No session for " + _toDest.calculateHash());
return;
}
msg.setSessionId(sid.getSessionId());
msg.setPayload(_payload);
_runner.doSend(msg);
}

View File

@@ -8,10 +8,12 @@ package net.i2p.router.client;
*
*/
import net.i2p.data.Destination;
import net.i2p.data.i2cp.AbuseReason;
import net.i2p.data.i2cp.AbuseSeverity;
import net.i2p.data.i2cp.I2CPMessageException;
import net.i2p.data.i2cp.ReportAbuseMessage;
import net.i2p.data.i2cp.SessionId;
import net.i2p.router.JobImpl;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
@@ -23,17 +25,22 @@ import net.i2p.util.Log;
class ReportAbuseJob extends JobImpl {
private final Log _log;
private final ClientConnectionRunner _runner;
private final Destination _dest;
private final String _reason;
private final int _severity;
public ReportAbuseJob(RouterContext context, ClientConnectionRunner runner, String reason, int severity) {
public ReportAbuseJob(RouterContext context, ClientConnectionRunner runner,
Destination dest, String reason, int severity) {
super(context);
_log = context.logManager().getLog(ReportAbuseJob.class);
_runner = runner;
_dest = dest;
_reason = reason;
_severity = severity;
}
public String getName() { return "Report Abuse"; }
public void runJob() {
if (_runner.isDead()) return;
AbuseReason res = new AbuseReason();
@@ -41,9 +48,11 @@ class ReportAbuseJob extends JobImpl {
AbuseSeverity sev = new AbuseSeverity();
sev.setSeverity(_severity);
ReportAbuseMessage msg = new ReportAbuseMessage();
msg.setMessageId(null);
msg.setReason(res);
msg.setSessionId(_runner.getSessionId());
SessionId id = _runner.getSessionId(_dest.calculateHash());
if (id == null)
return;
msg.setSessionId(id);
msg.setSeverity(sev);
try {
_runner.doSend(msg);

View File

@@ -16,6 +16,7 @@ import net.i2p.data.i2cp.I2CPMessage;
import net.i2p.data.i2cp.I2CPMessageException;
import net.i2p.data.i2cp.RequestLeaseSetMessage;
import net.i2p.data.i2cp.RequestVariableLeaseSetMessage;
import net.i2p.data.i2cp.SessionId;
import net.i2p.router.JobImpl;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
@@ -63,13 +64,16 @@ class RequestLeaseSetJob extends JobImpl {
// _log.debug("Adding fudge " + fudge);
endTime += fudge;
SessionId id = _runner.getSessionId(_requestState.getRequested().getDestination().calculateHash());
if (id == null)
return;
I2CPMessage msg;
if (getContext().getProperty(PROP_VARIABLE, DFLT_VARIABLE) &&
(_runner instanceof QueuedClientConnectionRunner ||
RequestVariableLeaseSetMessage.isSupported(_runner.getClientVersion()))) {
// new style - leases will have individual expirations
RequestVariableLeaseSetMessage rmsg = new RequestVariableLeaseSetMessage();
rmsg.setSessionId(_runner.getSessionId());
rmsg.setSessionId(id);
for (int i = 0; i < requested.getLeaseCount(); i++) {
Lease lease = requested.getLease(i);
if (lease.getEndDate().getTime() < endTime) {
@@ -90,7 +94,7 @@ class RequestLeaseSetJob extends JobImpl {
RequestLeaseSetMessage rmsg = new RequestLeaseSetMessage();
Date end = new Date(endTime);
rmsg.setEndDate(end);
rmsg.setSessionId(_runner.getSessionId());
rmsg.setSessionId(id);
for (int i = 0; i < requested.getLeaseCount(); i++) {
Lease lease = requested.getLease(i);
rmsg.addEndpoint(lease.getGateway(),
@@ -144,8 +148,7 @@ class RequestLeaseSetJob extends JobImpl {
CheckLeaseRequestStatus.this.getContext().statManager().addRateData("client.requestLeaseSetTimeout", 1);
if (_log.shouldLog(Log.ERROR)) {
long waited = System.currentTimeMillis() - _start;
_log.error("Failed to receive a leaseSet in the time allotted (" + waited + "): " + _requestState + " for "
+ _runner.getConfig().getDestination().calculateHash().toBase64());
_log.error("Failed to receive a leaseSet in the time allotted (" + waited + "): " + _requestState);
}
if (_requestState.getOnFailed() != null)
RequestLeaseSetJob.this.getContext().jobQueue().addJob(_requestState.getOnFailed());

View File

@@ -50,6 +50,8 @@ public class DummyTunnelManagerFacade implements TunnelManagerFacade {
public int getOutboundClientTunnelCount(Hash destination) { return 0; }
public long getLastParticipatingExpiration() { return -1; }
public void buildTunnels(Destination client, ClientTunnelSettings settings) {}
public boolean addAlias(Destination dest, ClientTunnelSettings settings, Destination existingClient) { return false; }
public void removeAlias(Destination dest) {}
public TunnelPoolSettings getInboundSettings() { return null; }
public TunnelPoolSettings getOutboundSettings() { return null; }
public TunnelPoolSettings getInboundSettings(Hash client) { return null; }

View File

@@ -8,6 +8,7 @@ import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import net.i2p.data.Hash;
@@ -609,6 +610,12 @@ public class TunnelPool {
}
if (ls != null) {
_context.clientManager().requestLeaseSet(_settings.getDestination(), ls);
Set<Hash> aliases = _settings.getAliases();
if (aliases != null && !aliases.isEmpty()) {
for (Hash h : aliases) {
_context.clientManager().requestLeaseSet(h, ls);
}
}
}
}
}

View File

@@ -97,7 +97,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
ctx.statManager().createRateStat("tunnel.testAborted", "Tunnel test could not occur, since there weren't any tunnels to test with", "Tunnels",
RATES);
}
/**
* Pick a random inbound exploratory tunnel.
* Warning - selectInboundExploratoryTunnel(Hash) is preferred.
@@ -113,7 +113,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
}
return info;
}
/**
* Pick a random inbound tunnel from the given destination's pool.
* Warning - selectOutboundTunnel(Hash, Hash) is preferred.
@@ -132,7 +132,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
" but there isn't a pool?");
return null;
}
/**
* Pick a random outbound exploratory tunnel.
* Warning - selectOutboundExploratoryTunnel(Hash) is preferred.
@@ -148,7 +148,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
}
return info;
}
/**
* Pick a random outbound tunnel from the given destination's pool.
* Warning - selectOutboundTunnel(Hash, Hash) is preferred.
@@ -164,7 +164,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
}
return null;
}
/**
* Pick the inbound exploratory tunnel with the gateway closest to the given hash.
* By using this instead of the random selectTunnel(),
@@ -184,7 +184,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
}
return info;
}
/**
* Pick the inbound tunnel with the gateway closest to the given hash
* from the given destination's pool.
@@ -208,7 +208,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
" but there isn't a pool?");
return null;
}
/**
* Pick the outbound exploratory tunnel with the endpoint closest to the given hash.
* By using this instead of the random selectTunnel(),
@@ -228,7 +228,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
}
return info;
}
/**
* Pick the outbound tunnel with the endpoint closest to the given hash
* from the given destination's pool.
@@ -249,7 +249,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
}
return null;
}
/**
* Expensive (iterates through all tunnels of all pools) and unnecessary.
* @deprecated unused
@@ -267,7 +267,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
if (info != null) return info;
return null;
}
/** @return number of inbound exploratory tunnels */
public int getFreeTunnelCount() {
return _inboundExploratory.size();
@@ -304,10 +304,11 @@ public class TunnelPoolManager implements TunnelManagerFacade {
return pool.getTunnelCount();
return 0;
}
public int getParticipatingCount() { return _context.tunnelDispatcher().getParticipatingCount(); }
public long getLastParticipatingExpiration() { return _context.tunnelDispatcher().getLastParticipatingExpiration(); }
/**
* @return (number of part. tunnels) / (estimated total number of hops in our expl.+client tunnels)
* 100 max.
@@ -330,7 +331,6 @@ public class TunnelPoolManager implements TunnelManagerFacade {
return Math.min(part / (double) count, 100d);
}
public boolean isValidTunnel(Hash client, TunnelInfo tunnel) {
if (tunnel.getExpiration() < _context.clock().now())
return false;
@@ -386,17 +386,18 @@ public class TunnelPoolManager implements TunnelManagerFacade {
pool.setSettings(settings);
}
}
public synchronized void restart() {
_handler.restart();
_executor.restart();
shutdownExploratory();
startup();
}
/**
* Used only at session startup.
* Do not use to change settings.
* Do not use for aliased destinations; use addAlias().
*/
public void buildTunnels(Destination client, ClientTunnelSettings settings) {
Hash dest = client.calculateHash();
@@ -434,8 +435,68 @@ public class TunnelPoolManager implements TunnelManagerFacade {
else
outbound.startup();
}
/**
* Add another destination to the same tunnels.
* Must have same encryption key an a different signing key.
* @throws IllegalArgumentException if not
* @return success
* @since 0.9.19
*/
public boolean addAlias(Destination dest, ClientTunnelSettings settings, Destination existingClient) {
if (dest.getSigningPublicKey().equals(existingClient.getSigningPublicKey()))
throw new IllegalArgumentException("signing key must differ");
if (!dest.getPublicKey().equals(existingClient.getPublicKey()))
throw new IllegalArgumentException("encryption key mismatch");
Hash h = dest.calculateHash();
Hash e = existingClient.calculateHash();
synchronized(this) {
TunnelPool inbound = _clientInboundPools.get(e);
TunnelPool outbound = _clientOutboundPools.get(e);
/////// gah same tunnel pool or different?
if (inbound == null || outbound == null)
return false;
_clientInboundPools.put(h, inbound);
_clientOutboundPools.put(h, outbound);
}
return true;
}
/**
* Remove a destination for the same tunnels as another.
* @since 0.9.19
*/
public void removeAlias(Destination dest) {
Hash h = dest.calculateHash();
synchronized(this) {
TunnelPool inbound = _clientInboundPools.remove(h);
if (inbound != null) {
Hash p = inbound.getSettings().getAliasOf();
if (p != null) {
TunnelPool pri = _clientInboundPools.get(p);
if (pri != null) {
Set<Hash> aliases = pri.getSettings().getAliases();
if (aliases != null)
aliases.remove(h);
}
}
}
TunnelPool outbound = _clientOutboundPools.remove(h);
if (outbound != null) {
Hash p = inbound.getSettings().getAliasOf();
if (p != null) {
TunnelPool pri = _clientInboundPools.get(p);
if (pri != null) {
Set<Hash> aliases = pri.getSettings().getAliases();
if (aliases != null)
aliases.remove(h);
}
}
}
// TODO if primary already vanished...
}
}
private static class DelayedStartup implements SimpleTimer.TimedEvent {
private final TunnelPool pool;
@@ -469,7 +530,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
if (outbound != null)
outbound.shutdown();
}
/** queue a recurring test job if appropriate */
void buildComplete(PooledTunnelCreatorConfig cfg) {
if (cfg.getLength() > 1 &&
@@ -518,7 +579,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
_context.jobQueue().addJob(new BootstrapPool(_context, _inboundExploratory));
_context.jobQueue().addJob(new BootstrapPool(_context, _outboundExploratory));
}
private static class BootstrapPool extends JobImpl {
private TunnelPool _pool;
public BootstrapPool(RouterContext ctx, TunnelPool pool) {
@@ -531,7 +592,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
_pool.buildFallback();
}
}
/**
* Cannot be restarted
*/
@@ -546,7 +607,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
_inboundExploratory.shutdown();
_outboundExploratory.shutdown();
}
/** list of TunnelPool instances currently in play */
public void listPools(List<TunnelPool> out) {
out.addAll(_clientInboundPools.values());