diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java index e298a9321..3c9808945 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java @@ -891,4 +891,12 @@ class ConnectionManager { if (req != null) req.pong(payload); } + + /** + * @since 0.9.20 + */ + @Override + public String toString() { + return "ConnectionManager for " + _session; + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java index ff3a09514..8122ab50c 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java @@ -388,11 +388,16 @@ public class I2PSocketManagerFull implements I2PSocketManager { } private void verifySession() throws I2PException { + verifySession(_connectionManager); + } + + /** @since 0.9.20 */ + private void verifySession(ConnectionManager cm) throws I2PException { if (_isDestroyed.get()) throw new I2PException("Session was closed"); - if (!_connectionManager.getSession().isClosed()) + if (!cm.getSession().isClosed()) return; - _connectionManager.getSession().connect(); + cm.getSession().connect(); } /** @@ -411,7 +416,6 @@ public class I2PSocketManagerFull implements I2PSocketManager { */ public I2PSocket connect(Destination peer, I2PSocketOptions options) throws I2PException, NoRouteToHostException { - verifySession(); if (options == null) options = _defaultOptions; ConnectionOptions opts = null; @@ -437,6 +441,7 @@ public class I2PSocketManagerFull implements I2PSocketManager { } } } + verifySession(cm); // the following blocks unless connect delay > 0 Connection con = cm.connect(peer, opts); if (con == null) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageHandler.java index 9da5568c4..9577e0e5b 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageHandler.java @@ -50,7 +50,7 @@ class MessageHandler implements I2PSessionMuxedListener { * @param size size of the message */ public void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromPort, int toPort) { - byte data[] = null; + byte data[]; try { data = session.receiveMessage(msgId); } catch (I2PSessionException ise) { @@ -59,7 +59,17 @@ class MessageHandler implements I2PSessionMuxedListener { _log.warn("Error receiving the message", ise); return; } - if (data == null) return; + if (data == null) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Received null data on " + session + " proto: " + proto + + " fromPort: " + fromPort + " toPort: " + toPort); + return; + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Received " + data.length + " bytes on " + session + + " (" + _manager + ')' + + " proto: " + proto + + " fromPort: " + fromPort + " toPort: " + toPort); Packet packet = new Packet(); try { packet.readPacket(data, 0, data.length); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java index b8ea9a0a2..402db0159 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java @@ -498,6 +498,27 @@ class MessageInputStream extends InputStream { @Override public void close() { synchronized (_dataLock) { + if (_log.shouldLog(Log.DEBUG)) { + StringBuilder buf = new StringBuilder(128); + buf.append("close(), ready bytes: "); + long available = 0; + for (int i = 0; i < _readyDataBlocks.size(); i++) + available += _readyDataBlocks.get(i).getValid(); + available -= _readyDataBlockIndex; + buf.append(available); + buf.append(" blocks: ").append(_readyDataBlocks.size()); + buf.append(" not ready blocks: "); + long notAvailable = 0; + for (Long id : _notYetReadyBlocks.keySet()) { + ByteArray ba = _notYetReadyBlocks.get(id); + buf.append(id).append(" "); + if (ba != null) + notAvailable += ba.getValid(); + } + buf.append("not ready bytes: ").append(notAvailable); + buf.append(" highest ready block: ").append(_highestReadyBlockId); + _log.debug(buf.toString()); + } //while (_readyDataBlocks.size() > 0) // _cache.release((ByteArray)_readyDataBlocks.remove(0)); _readyDataBlocks.clear(); diff --git a/core/java/src/net/i2p/client/I2PSessionDemultiplexer.java b/core/java/src/net/i2p/client/I2PSessionDemultiplexer.java index 1f07a32fc..ea1ef2437 100644 --- a/core/java/src/net/i2p/client/I2PSessionDemultiplexer.java +++ b/core/java/src/net/i2p/client/I2PSessionDemultiplexer.java @@ -74,7 +74,9 @@ public class I2PSessionDemultiplexer implements I2PSessionMuxedListener { * (Streaming lib) */ public void addListener(I2PSessionListener l, int proto, int port) { - _listeners.put(key(proto, port), new NoPortsListener(l)); + I2PSessionListener old = _listeners.put(key(proto, port), new NoPortsListener(l)); + if (old != null && _log.shouldLog(Log.WARN)) + _log.warn("Listener " + l + " replaces " + old + " for proto: " + proto + " port: " + port); } /** @@ -82,7 +84,9 @@ public class I2PSessionDemultiplexer implements I2PSessionMuxedListener { * UDP perhaps */ public void addMuxedListener(I2PSessionMuxedListener l, int proto, int port) { - _listeners.put(key(proto, port), l); + I2PSessionListener old = _listeners.put(key(proto, port), l); + if (old != null && _log.shouldLog(Log.WARN)) + _log.warn("Listener " + l + " replaces " + old + " for proto: " + proto + " port: " + port); } public void removeListener(int proto, int port) { diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index 6990f2a51..35baaf51c 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -139,7 +139,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa CLOSED } - private State _state = State.CLOSED; + protected State _state = State.CLOSED; protected final Object _stateLock = new Object(); /** diff --git a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java index 880670a53..e62fc3e58 100644 --- a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java @@ -326,9 +326,9 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 { protected class MuxedAvailabilityNotifier extends AvailabilityNotifier { private final LinkedBlockingQueue _msgs; - private volatile boolean _alive = false; + private volatile boolean _alive; private static final int POISON_SIZE = -99999; - private final AtomicBoolean stopping = new AtomicBoolean(false); + private final AtomicBoolean stopping = new AtomicBoolean(); public MuxedAvailabilityNotifier() { _msgs = new LinkedBlockingQueue(); @@ -336,12 +336,12 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 { @Override public void stopNotifying() { - boolean again = true; synchronized (stopping) { if( !stopping.getAndSet(true)) { - if (_alive == true) { + _msgs.clear(); + if (_alive) { // System.out.println("I2PSessionMuxedImpl.stopNotifying()"); - _msgs.clear(); + boolean again = true; while(again) { try { _msgs.put(new MsgData(0, POISON_SIZE, 0, 0, 0)); @@ -351,8 +351,8 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 { continue; } } + _alive = false; } - _alive = false; stopping.set(false); } // stopping.notifyAll(); @@ -366,17 +366,24 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 { try { _msgs.put(new MsgData((int)(msgId & 0xffffffff), size, proto, fromPort, toPort)); } catch (InterruptedException ie) {} + if (!_alive && _log.shouldLog(Log.WARN)) + _log.warn(getPrefix() + "message available but notifier not running"); } @Override public void run() { - MsgData msg; + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getPrefix() + "starting muxed availability notifier"); + _msgs.clear(); _alive=true; while (_alive) { + MsgData msg; try { msg = _msgs.take(); } catch (InterruptedException ie) { - _log.debug("I2PSessionMuxedImpl.run() InterruptedException " + String.valueOf(_msgs.size()) + " Messages, Alive " + _alive); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("I2PSessionMuxedImpl.run() InterruptedException " + + String.valueOf(_msgs.size()) + " Messages, Alive " + _alive); continue; } if (msg.size == POISON_SIZE) { diff --git a/core/java/src/net/i2p/client/MessagePayloadMessageHandler.java b/core/java/src/net/i2p/client/MessagePayloadMessageHandler.java index 49cc2c45b..39dc99403 100644 --- a/core/java/src/net/i2p/client/MessagePayloadMessageHandler.java +++ b/core/java/src/net/i2p/client/MessagePayloadMessageHandler.java @@ -33,7 +33,7 @@ class MessagePayloadMessageHandler extends HandlerImpl { public void handleMessage(I2CPMessage message, I2PSessionImpl session) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Handle message " + message); + _log.debug("Handle message " + message + " for session " + session); try { MessagePayloadMessage msg = (MessagePayloadMessage) message; long id = msg.getMessageId(); diff --git a/core/java/src/net/i2p/client/SubSession.java b/core/java/src/net/i2p/client/SubSession.java index 9dcf04e72..2a2882335 100644 --- a/core/java/src/net/i2p/client/SubSession.java +++ b/core/java/src/net/i2p/client/SubSession.java @@ -22,8 +22,11 @@ import net.i2p.data.Destination; import net.i2p.data.Hash; import net.i2p.data.PrivateKey; import net.i2p.data.SigningPrivateKey; +import net.i2p.data.i2cp.CreateLeaseSetMessage; +import net.i2p.data.i2cp.CreateSessionMessage; import net.i2p.data.i2cp.I2CPMessage; import net.i2p.data.i2cp.SessionId; +import net.i2p.util.I2PAppThread; /** * An additional session using another session's connection. @@ -105,7 +108,19 @@ class SubSession extends I2PSessionMuxedImpl { */ @Override public void connect() throws I2PSessionException { + synchronized(_stateLock) { + if (_state != State.OPEN) { + _state = State.OPENING; + } + } _primary.connect(); + synchronized(_stateLock) { + if (_state != State.OPEN) { + Thread notifier = new I2PAppThread(_availabilityNotifier, "ClientNotifier " + getPrefix(), true); + notifier.start(); + _state = State.OPEN; + } + } } /** @@ -115,7 +130,7 @@ class SubSession extends I2PSessionMuxedImpl { @Override public boolean isClosed() { // FIXME - return /* getSessionId() == null || */ _primary.isClosed(); + return super.isClosed() || _primary.isClosed(); } /** @@ -126,7 +141,12 @@ class SubSession extends I2PSessionMuxedImpl { */ @Override void sendMessage(I2CPMessage message) throws I2PSessionException { - if (isClosed()) + // workaround for now, as primary will send out our CreateSession + // from his connect, while we are still closed. + // If we did it in connect() we wouldn't need this + if (isClosed() && + message.getType() != CreateSessionMessage.MESSAGE_TYPE && + message.getType() != CreateLeaseSetMessage.MESSAGE_TYPE) throw new I2PSessionException("Already closed"); _primary.sendMessage(message); } @@ -153,6 +173,7 @@ class SubSession extends I2PSessionMuxedImpl { if (_availabilityNotifier != null) _availabilityNotifier.stopNotifying(); if (_sessionListener != null) _sessionListener.disconnected(this); + changeState(State.CLOSED); } /**