I2CP Multisession - Work in progress:

Start availability notifier in subsession
Availability notifier cleanup
Various log tweaks added while chasing this down
Better subsession state management
I2PSocketManagerFull verifies subsession to force connect()
Successfully tested
This commit is contained in:
zzz
2015-04-19 19:05:53 +00:00
parent b8c8d5b447
commit b2872e6110
9 changed files with 95 additions and 19 deletions

View File

@@ -891,4 +891,12 @@ class ConnectionManager {
if (req != null)
req.pong(payload);
}
/**
* @since 0.9.20
*/
@Override
public String toString() {
return "ConnectionManager for " + _session;
}
}

View File

@@ -388,11 +388,16 @@ public class I2PSocketManagerFull implements I2PSocketManager {
}
private void verifySession() throws I2PException {
verifySession(_connectionManager);
}
/** @since 0.9.20 */
private void verifySession(ConnectionManager cm) throws I2PException {
if (_isDestroyed.get())
throw new I2PException("Session was closed");
if (!_connectionManager.getSession().isClosed())
if (!cm.getSession().isClosed())
return;
_connectionManager.getSession().connect();
cm.getSession().connect();
}
/**
@@ -411,7 +416,6 @@ public class I2PSocketManagerFull implements I2PSocketManager {
*/
public I2PSocket connect(Destination peer, I2PSocketOptions options)
throws I2PException, NoRouteToHostException {
verifySession();
if (options == null)
options = _defaultOptions;
ConnectionOptions opts = null;
@@ -437,6 +441,7 @@ public class I2PSocketManagerFull implements I2PSocketManager {
}
}
}
verifySession(cm);
// the following blocks unless connect delay > 0
Connection con = cm.connect(peer, opts);
if (con == null)

View File

@@ -50,7 +50,7 @@ class MessageHandler implements I2PSessionMuxedListener {
* @param size size of the message
*/
public void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromPort, int toPort) {
byte data[] = null;
byte data[];
try {
data = session.receiveMessage(msgId);
} catch (I2PSessionException ise) {
@@ -59,7 +59,17 @@ class MessageHandler implements I2PSessionMuxedListener {
_log.warn("Error receiving the message", ise);
return;
}
if (data == null) return;
if (data == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("Received null data on " + session + " proto: " + proto +
" fromPort: " + fromPort + " toPort: " + toPort);
return;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received " + data.length + " bytes on " + session +
" (" + _manager + ')' +
" proto: " + proto +
" fromPort: " + fromPort + " toPort: " + toPort);
Packet packet = new Packet();
try {
packet.readPacket(data, 0, data.length);

View File

@@ -498,6 +498,27 @@ class MessageInputStream extends InputStream {
@Override
public void close() {
synchronized (_dataLock) {
if (_log.shouldLog(Log.DEBUG)) {
StringBuilder buf = new StringBuilder(128);
buf.append("close(), ready bytes: ");
long available = 0;
for (int i = 0; i < _readyDataBlocks.size(); i++)
available += _readyDataBlocks.get(i).getValid();
available -= _readyDataBlockIndex;
buf.append(available);
buf.append(" blocks: ").append(_readyDataBlocks.size());
buf.append(" not ready blocks: ");
long notAvailable = 0;
for (Long id : _notYetReadyBlocks.keySet()) {
ByteArray ba = _notYetReadyBlocks.get(id);
buf.append(id).append(" ");
if (ba != null)
notAvailable += ba.getValid();
}
buf.append("not ready bytes: ").append(notAvailable);
buf.append(" highest ready block: ").append(_highestReadyBlockId);
_log.debug(buf.toString());
}
//while (_readyDataBlocks.size() > 0)
// _cache.release((ByteArray)_readyDataBlocks.remove(0));
_readyDataBlocks.clear();

View File

@@ -74,7 +74,9 @@ public class I2PSessionDemultiplexer implements I2PSessionMuxedListener {
* (Streaming lib)
*/
public void addListener(I2PSessionListener l, int proto, int port) {
_listeners.put(key(proto, port), new NoPortsListener(l));
I2PSessionListener old = _listeners.put(key(proto, port), new NoPortsListener(l));
if (old != null && _log.shouldLog(Log.WARN))
_log.warn("Listener " + l + " replaces " + old + " for proto: " + proto + " port: " + port);
}
/**
@@ -82,7 +84,9 @@ public class I2PSessionDemultiplexer implements I2PSessionMuxedListener {
* UDP perhaps
*/
public void addMuxedListener(I2PSessionMuxedListener l, int proto, int port) {
_listeners.put(key(proto, port), l);
I2PSessionListener old = _listeners.put(key(proto, port), l);
if (old != null && _log.shouldLog(Log.WARN))
_log.warn("Listener " + l + " replaces " + old + " for proto: " + proto + " port: " + port);
}
public void removeListener(int proto, int port) {

View File

@@ -139,7 +139,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
CLOSED
}
private State _state = State.CLOSED;
protected State _state = State.CLOSED;
protected final Object _stateLock = new Object();
/**

View File

@@ -326,9 +326,9 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 {
protected class MuxedAvailabilityNotifier extends AvailabilityNotifier {
private final LinkedBlockingQueue<MsgData> _msgs;
private volatile boolean _alive = false;
private volatile boolean _alive;
private static final int POISON_SIZE = -99999;
private final AtomicBoolean stopping = new AtomicBoolean(false);
private final AtomicBoolean stopping = new AtomicBoolean();
public MuxedAvailabilityNotifier() {
_msgs = new LinkedBlockingQueue<MsgData>();
@@ -336,12 +336,12 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 {
@Override
public void stopNotifying() {
boolean again = true;
synchronized (stopping) {
if( !stopping.getAndSet(true)) {
if (_alive == true) {
_msgs.clear();
if (_alive) {
// System.out.println("I2PSessionMuxedImpl.stopNotifying()");
_msgs.clear();
boolean again = true;
while(again) {
try {
_msgs.put(new MsgData(0, POISON_SIZE, 0, 0, 0));
@@ -351,8 +351,8 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 {
continue;
}
}
_alive = false;
}
_alive = false;
stopping.set(false);
}
// stopping.notifyAll();
@@ -366,17 +366,24 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 {
try {
_msgs.put(new MsgData((int)(msgId & 0xffffffff), size, proto, fromPort, toPort));
} catch (InterruptedException ie) {}
if (!_alive && _log.shouldLog(Log.WARN))
_log.warn(getPrefix() + "message available but notifier not running");
}
@Override
public void run() {
MsgData msg;
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "starting muxed availability notifier");
_msgs.clear();
_alive=true;
while (_alive) {
MsgData msg;
try {
msg = _msgs.take();
} catch (InterruptedException ie) {
_log.debug("I2PSessionMuxedImpl.run() InterruptedException " + String.valueOf(_msgs.size()) + " Messages, Alive " + _alive);
if (_log.shouldLog(Log.DEBUG))
_log.debug("I2PSessionMuxedImpl.run() InterruptedException " +
String.valueOf(_msgs.size()) + " Messages, Alive " + _alive);
continue;
}
if (msg.size == POISON_SIZE) {

View File

@@ -33,7 +33,7 @@ class MessagePayloadMessageHandler extends HandlerImpl {
public void handleMessage(I2CPMessage message, I2PSessionImpl session) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handle message " + message);
_log.debug("Handle message " + message + " for session " + session);
try {
MessagePayloadMessage msg = (MessagePayloadMessage) message;
long id = msg.getMessageId();

View File

@@ -22,8 +22,11 @@ import net.i2p.data.Destination;
import net.i2p.data.Hash;
import net.i2p.data.PrivateKey;
import net.i2p.data.SigningPrivateKey;
import net.i2p.data.i2cp.CreateLeaseSetMessage;
import net.i2p.data.i2cp.CreateSessionMessage;
import net.i2p.data.i2cp.I2CPMessage;
import net.i2p.data.i2cp.SessionId;
import net.i2p.util.I2PAppThread;
/**
* An additional session using another session's connection.
@@ -105,7 +108,19 @@ class SubSession extends I2PSessionMuxedImpl {
*/
@Override
public void connect() throws I2PSessionException {
synchronized(_stateLock) {
if (_state != State.OPEN) {
_state = State.OPENING;
}
}
_primary.connect();
synchronized(_stateLock) {
if (_state != State.OPEN) {
Thread notifier = new I2PAppThread(_availabilityNotifier, "ClientNotifier " + getPrefix(), true);
notifier.start();
_state = State.OPEN;
}
}
}
/**
@@ -115,7 +130,7 @@ class SubSession extends I2PSessionMuxedImpl {
@Override
public boolean isClosed() {
// FIXME
return /* getSessionId() == null || */ _primary.isClosed();
return super.isClosed() || _primary.isClosed();
}
/**
@@ -126,7 +141,12 @@ class SubSession extends I2PSessionMuxedImpl {
*/
@Override
void sendMessage(I2CPMessage message) throws I2PSessionException {
if (isClosed())
// workaround for now, as primary will send out our CreateSession
// from his connect, while we are still closed.
// If we did it in connect() we wouldn't need this
if (isClosed() &&
message.getType() != CreateSessionMessage.MESSAGE_TYPE &&
message.getType() != CreateLeaseSetMessage.MESSAGE_TYPE)
throw new I2PSessionException("Already closed");
_primary.sendMessage(message);
}
@@ -153,6 +173,7 @@ class SubSession extends I2PSessionMuxedImpl {
if (_availabilityNotifier != null)
_availabilityNotifier.stopNotifying();
if (_sessionListener != null) _sessionListener.disconnected(this);
changeState(State.CLOSED);
}
/**