I2CP: Fix additional connections getting rejected during tunnel open (ticket #1650)

State change cleanups
State checking consolidation
This commit is contained in:
zzz
2015-11-05 21:18:01 +00:00
parent eca234c187
commit c1d77dfe5c
6 changed files with 82 additions and 49 deletions

View File

@@ -102,7 +102,7 @@ class I2CPMessageProducer {
if (_log.shouldLog(Log.DEBUG)) _log.debug("config signed");
msg.setSessionConfig(cfg);
if (_log.shouldLog(Log.DEBUG)) _log.debug("config loaded into message");
session.sendMessage(msg);
session.sendMessage_unchecked(msg);
if (_log.shouldLog(Log.DEBUG)) _log.debug("config message sent");
}
@@ -114,7 +114,7 @@ class I2CPMessageProducer {
if (session.isClosed()) return;
DestroySessionMessage dmsg = new DestroySessionMessage();
dmsg.setSessionId(session.getSessionId());
session.sendMessage(dmsg);
session.sendMessage_unchecked(dmsg);
// use DisconnectMessage only if we fail and drop connection...
// todo: update the code to fire off DisconnectMessage on socket error
//DisconnectMessage msg = new DisconnectMessage();
@@ -371,7 +371,7 @@ class I2CPMessageProducer {
return;
}
msg.setSessionId(sid);
session.sendMessage(msg);
session.sendMessage_unchecked(msg);
}
/**

View File

@@ -209,8 +209,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
VersionComparator.comp(routerVersion, MIN_SUBSESSION_VERSION) >= 0);
synchronized (_stateLock) {
if (_state == State.OPENING) {
_state = State.GOTDATE;
_stateLock.notifyAll();
changeState(State.GOTDATE);
}
}
}
@@ -635,7 +634,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
auth.setProperty(PROP_USER, _options.getProperty(PROP_USER));
auth.setProperty(PROP_PW, _options.getProperty(PROP_PW));
}
sendMessage(new GetDateMessage(CoreVersion.VERSION, auth));
sendMessage_unchecked(new GetDateMessage(CoreVersion.VERSION, auth));
waitForDate();
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Before producer.connect()");
@@ -737,14 +736,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
* Report abuse with regards to the given messageId
*/
public void reportAbuse(int msgId, int severity) throws I2PSessionException {
synchronized (_stateLock) {
if (_state == State.CLOSED)
throw new I2PSessionException("Already closed");
if (_state == State.INIT)
throw new I2PSessionException("Not open, must call connect() first");
if (_state == State.OPENING) // not before GOTDATE
throw new I2PSessionException("Session not open yet");
}
verifyOpen();
_producer.reportAbuse(this, msgId, severity);
}
@@ -1034,6 +1026,39 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
}
}
/**
* Throws I2PSessionException if uninitialized, closed or closing.
* Blocks if opening.
*
* @since 0.9.23
*/
protected void verifyOpen() throws I2PSessionException {
synchronized (_stateLock) {
while (true) {
switch (_state) {
case INIT:
throw new I2PSessionException("Not open, must call connect() first");
case OPENING: // fall thru
case GOTDATE:
try {
_stateLock.wait(5*1000);
continue;
} catch (InterruptedException ie) {
throw new I2PSessionException("Interrupted", ie);
}
case OPEN:
return;
case CLOSING: // fall thru
case CLOSED:
throw new I2PSessionException("Already closed");
}
}
}
}
/**
* Deliver an I2CP message to the router
* As of 0.9.3, may block for several seconds if the write queue to the router is full
@@ -1041,12 +1066,19 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
* @throws I2PSessionException if the message is malformed or there is an error writing it out
*/
void sendMessage(I2CPMessage message) throws I2PSessionException {
synchronized (_stateLock) {
if (_state == State.CLOSED)
throw new I2PSessionException("Already closed");
if (_state == State.INIT)
throw new I2PSessionException("Not open, must call connect() first");
}
verifyOpen();
sendMessage_unchecked(message);
}
/**
* Deliver an I2CP message to the router.
* Does NOT check state. Call only from connect() or other methods that need to
* send messages when not in OPEN state.
*
* @throws I2PSessionException if the message is malformed or there is an error writing it out
* @since 0.9.23
*/
void sendMessage_unchecked(I2CPMessage message) throws I2PSessionException {
if (_queue != null) {
// internal
try {
@@ -1055,11 +1087,13 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
} catch (InterruptedException ie) {
throw new I2PSessionException("Interrupted", ie);
}
} else if (_writer == null) {
// race here
throw new I2PSessionException("Already closed or not open");
} else {
_writer.addMessage(message);
ClientWriterRunner writer = _writer;
if (writer == null) {
throw new I2PSessionException("Already closed or not open");
} else {
writer.addMessage(message);
}
}
}
@@ -1441,11 +1475,11 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
SessionId id = _sessionId;
if (id == null)
id = new SessionId(65535);
sendMessage(new HostLookupMessage(id, h, nonce, maxWait));
sendMessage_unchecked(new HostLookupMessage(id, h, nonce, maxWait));
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Sending DestLookup for " + h);
sendMessage(new DestLookupMessage(h));
sendMessage_unchecked(new DestLookupMessage(h));
}
try {
synchronized (waiter) {
@@ -1533,7 +1567,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
SessionId id = _sessionId;
if (id == null)
id = new SessionId(65535);
sendMessage(new HostLookupMessage(id, name, nonce, maxWait));
sendMessage_unchecked(new HostLookupMessage(id, name, nonce, maxWait));
try {
synchronized (waiter) {
waiter.wait(maxWait);
@@ -1567,7 +1601,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
return null;
}
}
sendMessage(new GetBandwidthLimitsMessage());
sendMessage_unchecked(new GetBandwidthLimitsMessage());
try {
synchronized (_bwReceivedLock) {
_bwReceivedLock.wait(5*1000);

View File

@@ -276,14 +276,7 @@ class I2PSessionImpl2 extends I2PSessionImpl {
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set<SessionTag> tagsSent, long expires)
throws I2PSessionException {
if (_log.shouldLog(Log.DEBUG)) _log.debug("sending message");
synchronized (_stateLock) {
if (_state == State.CLOSED)
throw new I2PSessionException("Already closed");
if (_state == State.INIT)
throw new I2PSessionException("Not open, must call connect() first");
if (_state == State.OPENING || _state == State.GOTDATE) // not before GOTDATE or session
throw new I2PSessionException("Session not open yet");
}
verifyOpen();
updateActivity();
// Sadly there is no way to send something completely uncompressed in a backward-compatible way,

View File

@@ -280,14 +280,7 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 {
* @since 0.9.14
*/
private byte[] prepPayload(byte[] payload, int offset, int size, int proto, int fromPort, int toPort) throws I2PSessionException {
synchronized (_stateLock) {
if (_state == State.CLOSED)
throw new I2PSessionException("Already closed");
if (_state == State.INIT)
throw new I2PSessionException("Not open, must call connect() first");
if (_state == State.OPENING || _state == State.GOTDATE) // not before GOTDATE or session
throw new I2PSessionException("Session not open yet");
}
verifyOpen();
updateActivity();
if (shouldCompress(size))

View File

@@ -122,11 +122,11 @@ public class I2PSimpleSession extends I2PSessionImpl2 {
Properties auth = new OrderedProperties();
auth.setProperty(PROP_USER, opts.getProperty(PROP_USER));
auth.setProperty(PROP_PW, opts.getProperty(PROP_PW));
sendMessage(new GetDateMessage(CoreVersion.VERSION, auth));
sendMessage_unchecked(new GetDateMessage(CoreVersion.VERSION, auth));
} else {
// we must now send a GetDate even in SimpleSession, or we won't know
// what version we are talking with and cannot use HostLookup
sendMessage(new GetDateMessage(CoreVersion.VERSION));
sendMessage_unchecked(new GetDateMessage(CoreVersion.VERSION));
}
waitForDate();
}

View File

@@ -100,7 +100,7 @@ class SubSession extends I2PSessionMuxedImpl {
public void connect() throws I2PSessionException {
synchronized(_stateLock) {
if (_state != State.OPEN) {
_state = State.OPENING;
changeState(State.OPENING);
}
}
boolean success = false;
@@ -121,7 +121,7 @@ class SubSession extends I2PSessionMuxedImpl {
if (_state != State.OPEN) {
Thread notifier = new I2PAppThread(_availabilityNotifier, "ClientNotifier " + getPrefix(), true);
notifier.start();
_state = State.OPEN;
changeState(State.OPEN);
}
}
success = true;
@@ -161,7 +161,20 @@ class SubSession extends I2PSessionMuxedImpl {
message.getType() != CreateSessionMessage.MESSAGE_TYPE &&
message.getType() != CreateLeaseSetMessage.MESSAGE_TYPE)
throw new I2PSessionException("Already closed");
_primary.sendMessage(message);
_primary.sendMessage_unchecked(message);
}
/**
* Deliver an I2CP message to the router.
* Does NOT check state. Call only from connect() or other methods that need to
* send messages when not in OPEN state.
*
* @throws I2PSessionException if the message is malformed or there is an error writing it out
* @since 0.9.23
*/
@Override
void sendMessage_unchecked(I2CPMessage message) throws I2PSessionException {
_primary.sendMessage_unchecked(message);
}
/**