diff --git a/core/java/src/net/i2p/client/impl/I2CPMessageProducer.java b/core/java/src/net/i2p/client/impl/I2CPMessageProducer.java index bcdd0b4ea..dd6783f43 100644 --- a/core/java/src/net/i2p/client/impl/I2CPMessageProducer.java +++ b/core/java/src/net/i2p/client/impl/I2CPMessageProducer.java @@ -102,7 +102,7 @@ class I2CPMessageProducer { if (_log.shouldLog(Log.DEBUG)) _log.debug("config signed"); msg.setSessionConfig(cfg); if (_log.shouldLog(Log.DEBUG)) _log.debug("config loaded into message"); - session.sendMessage(msg); + session.sendMessage_unchecked(msg); if (_log.shouldLog(Log.DEBUG)) _log.debug("config message sent"); } @@ -114,7 +114,7 @@ class I2CPMessageProducer { if (session.isClosed()) return; DestroySessionMessage dmsg = new DestroySessionMessage(); dmsg.setSessionId(session.getSessionId()); - session.sendMessage(dmsg); + session.sendMessage_unchecked(dmsg); // use DisconnectMessage only if we fail and drop connection... // todo: update the code to fire off DisconnectMessage on socket error //DisconnectMessage msg = new DisconnectMessage(); @@ -371,7 +371,7 @@ class I2CPMessageProducer { return; } msg.setSessionId(sid); - session.sendMessage(msg); + session.sendMessage_unchecked(msg); } /** diff --git a/core/java/src/net/i2p/client/impl/I2PSessionImpl.java b/core/java/src/net/i2p/client/impl/I2PSessionImpl.java index 4307924de..046683ea4 100644 --- a/core/java/src/net/i2p/client/impl/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/impl/I2PSessionImpl.java @@ -209,8 +209,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2 VersionComparator.comp(routerVersion, MIN_SUBSESSION_VERSION) >= 0); synchronized (_stateLock) { if (_state == State.OPENING) { - _state = State.GOTDATE; - _stateLock.notifyAll(); + changeState(State.GOTDATE); } } } @@ -635,7 +634,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2 auth.setProperty(PROP_USER, _options.getProperty(PROP_USER)); auth.setProperty(PROP_PW, _options.getProperty(PROP_PW)); } - sendMessage(new GetDateMessage(CoreVersion.VERSION, auth)); + sendMessage_unchecked(new GetDateMessage(CoreVersion.VERSION, auth)); waitForDate(); if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Before producer.connect()"); @@ -737,14 +736,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2 * Report abuse with regards to the given messageId */ public void reportAbuse(int msgId, int severity) throws I2PSessionException { - synchronized (_stateLock) { - if (_state == State.CLOSED) - throw new I2PSessionException("Already closed"); - if (_state == State.INIT) - throw new I2PSessionException("Not open, must call connect() first"); - if (_state == State.OPENING) // not before GOTDATE - throw new I2PSessionException("Session not open yet"); - } + verifyOpen(); _producer.reportAbuse(this, msgId, severity); } @@ -1034,6 +1026,39 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2 } } + /** + * Throws I2PSessionException if uninitialized, closed or closing. + * Blocks if opening. + * + * @since 0.9.23 + */ + protected void verifyOpen() throws I2PSessionException { + synchronized (_stateLock) { + while (true) { + switch (_state) { + case INIT: + throw new I2PSessionException("Not open, must call connect() first"); + + case OPENING: // fall thru + case GOTDATE: + try { + _stateLock.wait(5*1000); + continue; + } catch (InterruptedException ie) { + throw new I2PSessionException("Interrupted", ie); + } + + case OPEN: + return; + + case CLOSING: // fall thru + case CLOSED: + throw new I2PSessionException("Already closed"); + } + } + } + } + /** * Deliver an I2CP message to the router * As of 0.9.3, may block for several seconds if the write queue to the router is full @@ -1041,12 +1066,19 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2 * @throws I2PSessionException if the message is malformed or there is an error writing it out */ void sendMessage(I2CPMessage message) throws I2PSessionException { - synchronized (_stateLock) { - if (_state == State.CLOSED) - throw new I2PSessionException("Already closed"); - if (_state == State.INIT) - throw new I2PSessionException("Not open, must call connect() first"); - } + verifyOpen(); + sendMessage_unchecked(message); + } + + /** + * Deliver an I2CP message to the router. + * Does NOT check state. Call only from connect() or other methods that need to + * send messages when not in OPEN state. + * + * @throws I2PSessionException if the message is malformed or there is an error writing it out + * @since 0.9.23 + */ + void sendMessage_unchecked(I2CPMessage message) throws I2PSessionException { if (_queue != null) { // internal try { @@ -1055,11 +1087,13 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2 } catch (InterruptedException ie) { throw new I2PSessionException("Interrupted", ie); } - } else if (_writer == null) { - // race here - throw new I2PSessionException("Already closed or not open"); } else { - _writer.addMessage(message); + ClientWriterRunner writer = _writer; + if (writer == null) { + throw new I2PSessionException("Already closed or not open"); + } else { + writer.addMessage(message); + } } } @@ -1441,11 +1475,11 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2 SessionId id = _sessionId; if (id == null) id = new SessionId(65535); - sendMessage(new HostLookupMessage(id, h, nonce, maxWait)); + sendMessage_unchecked(new HostLookupMessage(id, h, nonce, maxWait)); } else { if (_log.shouldLog(Log.INFO)) _log.info("Sending DestLookup for " + h); - sendMessage(new DestLookupMessage(h)); + sendMessage_unchecked(new DestLookupMessage(h)); } try { synchronized (waiter) { @@ -1533,7 +1567,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2 SessionId id = _sessionId; if (id == null) id = new SessionId(65535); - sendMessage(new HostLookupMessage(id, name, nonce, maxWait)); + sendMessage_unchecked(new HostLookupMessage(id, name, nonce, maxWait)); try { synchronized (waiter) { waiter.wait(maxWait); @@ -1567,7 +1601,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2 return null; } } - sendMessage(new GetBandwidthLimitsMessage()); + sendMessage_unchecked(new GetBandwidthLimitsMessage()); try { synchronized (_bwReceivedLock) { _bwReceivedLock.wait(5*1000); diff --git a/core/java/src/net/i2p/client/impl/I2PSessionImpl2.java b/core/java/src/net/i2p/client/impl/I2PSessionImpl2.java index e110889be..0cf397d79 100644 --- a/core/java/src/net/i2p/client/impl/I2PSessionImpl2.java +++ b/core/java/src/net/i2p/client/impl/I2PSessionImpl2.java @@ -276,14 +276,7 @@ class I2PSessionImpl2 extends I2PSessionImpl { public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expires) throws I2PSessionException { if (_log.shouldLog(Log.DEBUG)) _log.debug("sending message"); - synchronized (_stateLock) { - if (_state == State.CLOSED) - throw new I2PSessionException("Already closed"); - if (_state == State.INIT) - throw new I2PSessionException("Not open, must call connect() first"); - if (_state == State.OPENING || _state == State.GOTDATE) // not before GOTDATE or session - throw new I2PSessionException("Session not open yet"); - } + verifyOpen(); updateActivity(); // Sadly there is no way to send something completely uncompressed in a backward-compatible way, diff --git a/core/java/src/net/i2p/client/impl/I2PSessionMuxedImpl.java b/core/java/src/net/i2p/client/impl/I2PSessionMuxedImpl.java index 6ee2b1e11..07003ae2d 100644 --- a/core/java/src/net/i2p/client/impl/I2PSessionMuxedImpl.java +++ b/core/java/src/net/i2p/client/impl/I2PSessionMuxedImpl.java @@ -280,14 +280,7 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 { * @since 0.9.14 */ private byte[] prepPayload(byte[] payload, int offset, int size, int proto, int fromPort, int toPort) throws I2PSessionException { - synchronized (_stateLock) { - if (_state == State.CLOSED) - throw new I2PSessionException("Already closed"); - if (_state == State.INIT) - throw new I2PSessionException("Not open, must call connect() first"); - if (_state == State.OPENING || _state == State.GOTDATE) // not before GOTDATE or session - throw new I2PSessionException("Session not open yet"); - } + verifyOpen(); updateActivity(); if (shouldCompress(size)) diff --git a/core/java/src/net/i2p/client/impl/I2PSimpleSession.java b/core/java/src/net/i2p/client/impl/I2PSimpleSession.java index dd59ea463..010121b1c 100644 --- a/core/java/src/net/i2p/client/impl/I2PSimpleSession.java +++ b/core/java/src/net/i2p/client/impl/I2PSimpleSession.java @@ -122,11 +122,11 @@ public class I2PSimpleSession extends I2PSessionImpl2 { Properties auth = new OrderedProperties(); auth.setProperty(PROP_USER, opts.getProperty(PROP_USER)); auth.setProperty(PROP_PW, opts.getProperty(PROP_PW)); - sendMessage(new GetDateMessage(CoreVersion.VERSION, auth)); + sendMessage_unchecked(new GetDateMessage(CoreVersion.VERSION, auth)); } else { // we must now send a GetDate even in SimpleSession, or we won't know // what version we are talking with and cannot use HostLookup - sendMessage(new GetDateMessage(CoreVersion.VERSION)); + sendMessage_unchecked(new GetDateMessage(CoreVersion.VERSION)); } waitForDate(); } diff --git a/core/java/src/net/i2p/client/impl/SubSession.java b/core/java/src/net/i2p/client/impl/SubSession.java index 51c5f0b16..93a115bff 100644 --- a/core/java/src/net/i2p/client/impl/SubSession.java +++ b/core/java/src/net/i2p/client/impl/SubSession.java @@ -100,7 +100,7 @@ class SubSession extends I2PSessionMuxedImpl { public void connect() throws I2PSessionException { synchronized(_stateLock) { if (_state != State.OPEN) { - _state = State.OPENING; + changeState(State.OPENING); } } boolean success = false; @@ -121,7 +121,7 @@ class SubSession extends I2PSessionMuxedImpl { if (_state != State.OPEN) { Thread notifier = new I2PAppThread(_availabilityNotifier, "ClientNotifier " + getPrefix(), true); notifier.start(); - _state = State.OPEN; + changeState(State.OPEN); } } success = true; @@ -161,7 +161,20 @@ class SubSession extends I2PSessionMuxedImpl { message.getType() != CreateSessionMessage.MESSAGE_TYPE && message.getType() != CreateLeaseSetMessage.MESSAGE_TYPE) throw new I2PSessionException("Already closed"); - _primary.sendMessage(message); + _primary.sendMessage_unchecked(message); + } + + /** + * Deliver an I2CP message to the router. + * Does NOT check state. Call only from connect() or other methods that need to + * send messages when not in OPEN state. + * + * @throws I2PSessionException if the message is malformed or there is an error writing it out + * @since 0.9.23 + */ + @Override + void sendMessage_unchecked(I2CPMessage message) throws I2PSessionException { + _primary.sendMessage_unchecked(message); } /**