Catch uncaught exceptions in ClientConnectionRunner and stop connection

Catch null SessionId in messages and stop connection instead of NPE
Wait for LS in SubSession in connect() so we don't send data w/o
a session ID and leaseset
Log tweaks
This commit is contained in:
zzz
2015-06-17 02:16:06 +00:00
parent bc85543ef2
commit 036b77746b
6 changed files with 78 additions and 23 deletions

View File

@@ -81,7 +81,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
/** this session's Id */ /** this session's Id */
private SessionId _sessionId; private SessionId _sessionId;
/** currently granted lease set, or null */ /** currently granted lease set, or null */
private volatile LeaseSet _leaseSet; protected volatile LeaseSet _leaseSet;
// subsession stuff // subsession stuff
// registered subsessions // registered subsessions
@@ -130,7 +130,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
protected final I2PAppContext _context; protected final I2PAppContext _context;
/** monitor for waiting until a lease set has been granted */ /** monitor for waiting until a lease set has been granted */
private final Object _leaseSetWait = new Object(); protected final Object _leaseSetWait = new Object();
/** /**
* @since 0.9.8 * @since 0.9.8

View File

@@ -101,12 +101,36 @@ class SubSession extends I2PSessionMuxedImpl {
_state = State.OPENING; _state = State.OPENING;
} }
} }
_primary.connect(); boolean success = false;
synchronized(_stateLock) { try {
if (_state != State.OPEN) { _primary.connect();
Thread notifier = new I2PAppThread(_availabilityNotifier, "ClientNotifier " + getPrefix(), true); // wait until we have created a lease set
notifier.start(); int waitcount = 0;
_state = State.OPEN; while (_leaseSet == null) {
if (waitcount++ > 5*60) {
throw new IOException("No tunnels built after waiting 5 minutes. Your network connection may be down, or there is severe network congestion.");
}
synchronized (_leaseSetWait) {
// InterruptedException caught below
_leaseSetWait.wait(1000);
}
}
synchronized(_stateLock) {
if (_state != State.OPEN) {
Thread notifier = new I2PAppThread(_availabilityNotifier, "ClientNotifier " + getPrefix(), true);
notifier.start();
_state = State.OPEN;
}
}
success = true;
} catch (InterruptedException ie) {
throw new I2PSessionException("Interrupted", ie);
} catch (IOException ioe) {
throw new I2PSessionException(getPrefix() + "Cannot connect to the router on " + _hostname + ':' + _portNum, ioe);
} finally {
if (!success) {
_availabilityNotifier.stopNotifying();
changeState(State.CLOSED);
} }
} }
} }

View File

@@ -158,6 +158,20 @@ public class I2CPMessageReader {
} }
public void run() { public void run() {
try {
run2();
} catch (Exception e) {
_log.log(Log.CRIT, "Uncaught I2CP error", e);
_listener.readError(I2CPMessageReader.this, e);
cancelRunner();
}
}
/**
* Called by run()
* @since 0.9.21
*/
protected void run2() {
while (_stayAlive) { while (_stayAlive) {
while (_doRun) { while (_doRun) {
// do read // do read

View File

@@ -42,7 +42,7 @@ public class QueuedI2CPMessageReader extends I2CPMessageReader {
* Pumps messages from the incoming message queue to the listener. * Pumps messages from the incoming message queue to the listener.
*/ */
@Override @Override
public void run() { protected void run2() {
while (_stayAlive) { while (_stayAlive) {
while (_doRun) { while (_doRun) {
// do read // do read

View File

@@ -228,9 +228,12 @@ class ClientConnectionRunner {
* Current client's config, * Current client's config,
* will be null if session not found * will be null if session not found
* IS subsession aware. * IS subsession aware.
* Returns null if id is null.
* @since 0.9.21 added id param * @since 0.9.21 added id param
*/ */
public SessionConfig getConfig(SessionId id) { public SessionConfig getConfig(SessionId id) {
if (id == null)
return null;
for (SessionParams sp : _sessions.values()) { for (SessionParams sp : _sessions.values()) {
if (id.equals(sp.sessionId)) if (id.equals(sp.sessionId))
return sp.config; return sp.config;
@@ -317,6 +320,8 @@ class ClientConnectionRunner {
* @since 0.9.21 * @since 0.9.21
*/ */
public Hash getDestHash(SessionId id) { public Hash getDestHash(SessionId id) {
if (id == null)
return null;
for (Map.Entry<Hash, SessionParams> e : _sessions.entrySet()) { for (Map.Entry<Hash, SessionParams> e : _sessions.entrySet()) {
if (id.equals(e.getValue().sessionId)) if (id.equals(e.getValue().sessionId))
return e.getKey(); return e.getKey();
@@ -330,6 +335,8 @@ class ClientConnectionRunner {
* @since 0.9.21 * @since 0.9.21
*/ */
public Destination getDestination(SessionId id) { public Destination getDestination(SessionId id) {
if (id == null)
return null;
for (SessionParams sp : _sessions.values()) { for (SessionParams sp : _sessions.values()) {
if (id.equals(sp.sessionId)) if (id.equals(sp.sessionId))
return sp.dest; return sp.dest;
@@ -391,6 +398,8 @@ class ClientConnectionRunner {
void setSessionId(Hash hash, SessionId id) { void setSessionId(Hash hash, SessionId id) {
if (hash == null) if (hash == null)
throw new IllegalStateException(); throw new IllegalStateException();
if (id == null)
throw new NullPointerException();
SessionParams sp = _sessions.get(hash); SessionParams sp = _sessions.get(hash);
if (sp == null || sp.sessionId != null) if (sp == null || sp.sessionId != null)
throw new IllegalStateException(); throw new IllegalStateException();
@@ -403,6 +412,8 @@ class ClientConnectionRunner {
* @since 0.9.21 * @since 0.9.21
*/ */
void removeSession(SessionId id) { void removeSession(SessionId id) {
if (id == null)
return;
boolean isPrimary = false; boolean isPrimary = false;
for (Iterator<SessionParams> iter = _sessions.values().iterator(); iter.hasNext(); ) { for (Iterator<SessionParams> iter = _sessions.values().iterator(); iter.hasNext(); ) {
SessionParams sp = iter.next(); SessionParams sp = iter.next();
@@ -813,18 +824,20 @@ class ClientConnectionRunner {
synchronized (this) { synchronized (this) {
state = sp.leaseRequest; state = sp.leaseRequest;
if (state != null) { if (state != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Already requesting " + state);
LeaseSet requested = state.getRequested(); LeaseSet requested = state.getRequested();
LeaseSet granted = state.getGranted(); LeaseSet granted = state.getGranted();
long ours = set.getEarliestLeaseDate(); long ours = set.getEarliestLeaseDate();
if ( ( (requested != null) && (requested.getEarliestLeaseDate() > ours) ) || if ( ( (requested != null) && (requested.getEarliestLeaseDate() > ours) ) ||
( (granted != null) && (granted.getEarliestLeaseDate() > ours) ) ) { ( (granted != null) && (granted.getEarliestLeaseDate() > ours) ) ) {
// theirs is newer // theirs is newer
if (_log.shouldLog(Log.DEBUG))
_log.debug("Already requesting, theirs newer, do nothing: " + state);
} else { } else {
// ours is newer, so wait a few secs and retry // ours is newer, so wait a few secs and retry
set.setDestination(dest); set.setDestination(dest);
_context.simpleTimer2().addEvent(new Rerequest(set, expirationTime, onCreateJob, onFailedJob), 3*1000); _context.simpleTimer2().addEvent(new Rerequest(set, expirationTime, onCreateJob, onFailedJob), 3*1000);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Already requesting, ours newer, wait 3 sec: " + state);
} }
// fire onCreated? // fire onCreated?
return; // already requesting return; // already requesting

View File

@@ -80,7 +80,11 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
* *
*/ */
public void messageReceived(I2CPMessageReader reader, I2CPMessage message) { public void messageReceived(I2CPMessageReader reader, I2CPMessage message) {
if (_runner.isDead()) return; if (_runner.isDead()) {
if (_log.shouldLog(Log.WARN))
_log.warn("Received but runner dead: \n" + message);
return;
}
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Message received: \n" + message); _log.debug("Message received: \n" + message);
int type = message.getType(); int type = message.getType();
@@ -362,8 +366,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
SessionConfig cfg = _runner.getConfig(sid); SessionConfig cfg = _runner.getConfig(sid);
if (cfg == null) { if (cfg == null) {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("SendMessage w/o session"); _log.error("SendMessage w/o session: " + sid);
_runner.disconnectClient("SendMessage w/o session"); _runner.disconnectClient("SendMessage w/o session: " + sid);
return; return;
} }
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@@ -372,10 +376,10 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
MessageId id = _runner.distributeMessage(message); MessageId id = _runner.distributeMessage(message);
long timeToDistribute = _context.clock().now() - beforeDistribute; long timeToDistribute = _context.clock().now() - beforeDistribute;
// TODO validate session id // TODO validate session id
_runner.ackSendMessage(message.getSessionId(), id, message.getNonce()); _runner.ackSendMessage(sid, id, message.getNonce());
_context.statManager().addRateData("client.distributeTime", timeToDistribute); _context.statManager().addRateData("client.distributeTime", timeToDistribute);
if ( (timeToDistribute > 50) && (_log.shouldLog(Log.INFO)) ) if ( (timeToDistribute > 50) && (_log.shouldLog(Log.DEBUG)) )
_log.info("Took too long to distribute the message (which holds up the ack): " + timeToDistribute); _log.debug("Took too long to distribute the message (which holds up the ack): " + timeToDistribute);
} }
@@ -386,7 +390,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
private void handleReceiveBegin(ReceiveMessageBeginMessage message) { private void handleReceiveBegin(ReceiveMessageBeginMessage message) {
if (_runner.isDead()) return; if (_runner.isDead()) return;
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Handling recieve begin: id = " + message.getMessageId()); _log.debug("Handling receive begin: id = " + message.getMessageId());
MessagePayloadMessage msg = new MessagePayloadMessage(); MessagePayloadMessage msg = new MessagePayloadMessage();
msg.setMessageId(message.getMessageId()); msg.setMessageId(message.getMessageId());
// TODO validate session id // TODO validate session id
@@ -409,7 +413,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
} }
/** /**
* The client told us that the message has been recieved completely. This currently * The client told us that the message has been received completely. This currently
* does not do any security checking prior to removing the message from the * does not do any security checking prior to removing the message from the
* pending queue, though it should. * pending queue, though it should.
* *
@@ -443,8 +447,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
SessionConfig cfg = _runner.getConfig(id); SessionConfig cfg = _runner.getConfig(id);
if (cfg == null) { if (cfg == null) {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("CreateLeaseSet w/o session"); _log.error("CreateLeaseSet w/o session: " + id);
_runner.disconnectClient("CreateLeaseSet w/o session"); _runner.disconnectClient("CreateLeaseSet w/o session: " + id);
return; return;
} }
Destination dest = cfg.getDestination(); Destination dest = cfg.getDestination();
@@ -533,9 +537,9 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
SessionConfig cfg = _runner.getConfig(id); SessionConfig cfg = _runner.getConfig(id);
if (cfg == null) { if (cfg == null) {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("ReconfigureSession w/o session"); _log.error("ReconfigureSession w/o session: " + id);
//sendStatusMessage(id, SessionStatusMessage.STATUS_INVALID); //sendStatusMessage(id, SessionStatusMessage.STATUS_INVALID);
_runner.disconnectClient("ReconfigureSession w/o session"); _runner.disconnectClient("ReconfigureSession w/o session: " + id);
return; return;
} }
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))