* I2CP/I2PTunnel locking fixes (partial fixes for tickets 650. 815, 946, 947, 953):

- I2PSocketManagerFactory: New createDisconnectedManager(), javadocs
   - I2PSessionImpl: Rewrite state management and locking, prevent multiple
     connect() calls, but allow disconnect() to interrupt connect()
   - I2PSimpleSession: Changes to match I2PSessionImpl
   - I2PTunnelServer: Don't connect in constructor, use createDisconnectedManager()
     for a final manager, finals and cleanups
   Lightly tested.
   Todo: I2PTunnelClientBase
This commit is contained in:
zzz
2013-07-10 18:54:25 +00:00
parent 9a1e1a92ca
commit f3c4a26483
4 changed files with 312 additions and 200 deletions

View File

@@ -80,12 +80,10 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
protected I2CPMessageReader _reader;
/** writer message queue */
protected ClientWriterRunner _writer;
/** where we pipe our messages */
protected /* FIXME final FIXME */OutputStream _out;
/**
* Used for internal connections to the router.
* If this is set, _socket, _writer, and _out will be null.
* If this is set, _socket and _writer will be null.
* @since 0.8.3
*/
protected I2CPMessageQueue _queue;
@@ -111,22 +109,24 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
/** monitor for waiting until a lease set has been granted */
private final Object _leaseSetWait = new Object();
/** whether the session connection has already been closed (or not yet opened) */
protected volatile boolean _closed;
/**
* @since 0.9.8
*/
protected enum State {
OPENING,
OPEN,
CLOSING,
CLOSED
}
/** whether the session connection is in the process of being closed */
protected volatile boolean _closing;
protected State _state = State.CLOSED;
protected final Object _stateLock = new Object();
/** have we received the current date from the router yet? */
private volatile boolean _dateReceived;
/** lock that we wait upon, that the SetDateMessageHandler notifies */
private final Object _dateReceivedLock = new Object();
/** whether the session connection is in the process of being opened */
protected volatile boolean _opening;
/** monitor for waiting until opened */
private final Object _openingWait = new Object();
/**
* thread that we tell when new messages are available who then tells us
* to fetch them. The point of this is so that the fetch doesn't block the
@@ -168,7 +168,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
public static final int LISTEN_PORT = 7654;
private static final int BUF_SIZE = 32*1024;
/**
* for extension by SimpleSession (no dest)
*/
@@ -183,7 +183,6 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
private I2PSessionImpl(I2PAppContext context, Properties options, boolean hasDest) {
_context = context;
_log = context.logManager().getLog(getClass());
_closed = true;
if (options == null)
options = (Properties) System.getProperties().clone();
_options = loadConfig(options);
@@ -351,17 +350,13 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
return _leaseSet;
}
void setOpening(boolean ls) {
_opening = ls;
synchronized (_openingWait) {
_openingWait.notifyAll();
protected void changeState(State state) {
synchronized (_stateLock) {
_state = state;
_stateLock.notifyAll();
}
}
boolean getOpening() {
return _opening;
}
/**
* Load up the destKeyFile for our Destination, PrivateKey, and SigningPrivateKey
*
@@ -378,12 +373,39 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
* Connect to the router and establish a session. This call blocks until
* a session is granted.
*
* Should be threadsafe, other threads will block until complete.
* Disconnect / destroy from another thread may be called simultaneously and
* will (should?) interrupt the connect.
*
* @throws I2PSessionException if there is a configuration error or the router is
* not reachable
*/
public void connect() throws I2PSessionException {
setOpening(true);
_closed = false;
synchronized(_stateLock) {
boolean wasOpening = false;
boolean loop = true;
while (loop) {
switch (_state) {
case CLOSED:
if (wasOpening)
throw new I2PSessionException("connect by other thread failed");
loop = false;
break;
case OPENING:
wasOpening = true;
try {
_stateLock.wait();
} catch (InterruptedException ie) {}
break;
case CLOSING:
throw new I2PSessionException("close in progress");
case OPEN:
return;
}
}
changeState(State.OPENING);
}
_availabilityNotifier.stopNotifying();
if ( (_options != null) &&
@@ -392,29 +414,33 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
_log.error("I2CP guaranteed delivery mode has been removed, using best effort.");
}
boolean success = false;
long startConnect = _context.clock().now();
try {
// If we are in the router JVM, connect using the interal queue
if (_context.isRouterContext()) {
// _socket, _out, and _writer remain null
InternalClientManager mgr = _context.internalClientManager();
if (mgr == null)
throw new I2PSessionException("Router is not ready for connections");
// the following may throw an I2PSessionException
_queue = mgr.connect();
_reader = new QueuedI2CPMessageReader(_queue, this);
} else {
if (Boolean.parseBoolean(_options.getProperty(PROP_ENABLE_SSL)))
_socket = I2CPSSLSocketFactory.createSocket(_context, _hostname, _portNum);
else
_socket = new Socket(_hostname, _portNum);
// _socket.setSoTimeout(1000000); // Uhmmm we could really-really use a real timeout, and handle it.
_out = _socket.getOutputStream();
_out.write(I2PClient.PROTOCOL_BYTE);
_out.flush();
_writer = new ClientWriterRunner(_out, this);
InputStream in = new BufferedInputStream(_socket.getInputStream(), BUF_SIZE);
_reader = new I2CPMessageReader(in, this);
// protect w/ closeSocket()
synchronized(_stateLock) {
// If we are in the router JVM, connect using the interal queue
if (_context.isRouterContext()) {
// _socket and _writer remain null
InternalClientManager mgr = _context.internalClientManager();
if (mgr == null)
throw new I2PSessionException("Router is not ready for connections");
// the following may throw an I2PSessionException
_queue = mgr.connect();
_reader = new QueuedI2CPMessageReader(_queue, this);
} else {
if (Boolean.parseBoolean(_options.getProperty(PROP_ENABLE_SSL)))
_socket = I2CPSSLSocketFactory.createSocket(_context, _hostname, _portNum);
else
_socket = new Socket(_hostname, _portNum);
// _socket.setSoTimeout(1000000); // Uhmmm we could really-really use a real timeout, and handle it.
OutputStream out = _socket.getOutputStream();
out.write(I2PClient.PROTOCOL_BYTE);
out.flush();
_writer = new ClientWriterRunner(out, this);
InputStream in = new BufferedInputStream(_socket.getInputStream(), BUF_SIZE);
_reader = new I2CPMessageReader(in, this);
}
}
Thread notifier = new I2PAppThread(_availabilityNotifier, "ClientNotifier " + getPrefix(), true);
notifier.start();
@@ -466,15 +492,13 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
+ "ms - ready to participate in the network!");
startIdleMonitor();
startVerifyUsage();
setOpening(false);
success = true;
} catch (UnknownHostException uhe) {
_closed = true;
setOpening(false);
throw new I2PSessionException(getPrefix() + "Cannot connect to the router on " + _hostname + ':' + _portNum, uhe);
} catch (IOException ioe) {
_closed = true;
setOpening(false);
throw new I2PSessionException(getPrefix() + "Cannot connect to the router on " + _hostname + ':' + _portNum, ioe);
} finally {
changeState(success ? State.OPEN : State.CLOSED);
}
}
@@ -570,8 +594,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
* Needs work.
*/
protected class AvailabilityNotifier implements Runnable {
private final List _pendingIds;
private final List _pendingSizes;
private final List<Long> _pendingIds;
private final List<Integer> _pendingSizes;
private volatile boolean _alive;
public AvailabilityNotifier() {
@@ -606,8 +630,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
}
}
if (!_pendingIds.isEmpty()) {
msgId = (Long)_pendingIds.remove(0);
size = (Integer)_pendingSizes.remove(0);
msgId = _pendingIds.remove(0);
size = _pendingSizes.remove(0);
}
}
if ( (msgId != null) && (size != null) ) {
@@ -695,8 +719,11 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
/** configure the listener */
public void setSessionListener(I2PSessionListener lsnr) { _sessionListener = lsnr; }
/** has the session been closed (or not yet connected)? */
public boolean isClosed() { return _closed; }
/**
* Has the session been closed (or not yet connected)?
* False when open and during transitions. Unsynchronized.
*/
public boolean isClosed() { return _state == State.CLOSED; }
/**
* Deliver an I2CP message to the router
@@ -756,21 +783,16 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
/**
* Tear down the session, and do NOT reconnect.
*
* Blocks if session has not been fully started.
* Will interrupt an open in progress.
*/
public void destroySession(boolean sendDisconnect) {
while (_opening) {
synchronized (_openingWait) {
try {
_openingWait.wait(1000);
} catch (InterruptedException ie) { // nop
}
}
synchronized(_stateLock) {
if (_state == State.CLOSING || _state == State.CLOSED)
return;
changeState(State.CLOSING);
}
if (_closed) return;
if (_log.shouldLog(Log.INFO)) _log.info(getPrefix() + "Destroy the session", new Exception("DestroySession()"));
_closing = true; // we use this to prevent a race
if (sendDisconnect && _producer != null) { // only null if overridden by I2PSimpleSession
try {
_producer.disconnect(this);
@@ -783,19 +805,27 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
// SimpleSession does not initialize
if (_availabilityNotifier != null)
_availabilityNotifier.stopNotifying();
_closed = true;
_closing = false;
closeSocket();
if (_sessionListener != null) _sessionListener.disconnected(this);
}
/**
* Close the socket carefully
*
* Close the socket carefully.
*/
private void closeSocket() {
synchronized(_stateLock) {
changeState(State.CLOSING);
locked_closeSocket();
changeState(State.CLOSED);
}
}
/**
* Close the socket carefully.
* Caller must change state.
*/
private void locked_closeSocket() {
if (_log.shouldLog(Log.INFO)) _log.info(getPrefix() + "Closing the socket", new Exception("closeSocket"));
_closed = true;
if (_reader != null) {
_reader.stopReading();
_reader = null;
@@ -830,8 +860,15 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
disconnect();
}
/**
* Will interrupt a connect in progress.
*/
protected void disconnect() {
if (_closed || _closing) return;
synchronized(_stateLock) {
if (_state == State.CLOSING || _state == State.CLOSED)
return;
changeState(State.CLOSING);
}
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Disconnect() called", new Exception("Disconnect"));
if (shouldReconnect()) {
if (reconnect()) {
@@ -842,11 +879,11 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
}
if (_log.shouldLog(Log.ERROR))
_log.error(getPrefix() + "Disconned from the router, and not trying to reconnect further. I hope you're not hoping anything else will happen");
_log.error(getPrefix() + "Disconned from the router, and not trying to reconnect");
if (_sessionListener != null) _sessionListener.disconnected(this);
_closed = true;
closeSocket();
changeState(State.CLOSED);
}
private final static int MAX_RECONNECT_DELAY = 320*1000;
@@ -970,7 +1007,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
if (rv != null)
return rv;
}
if (_closed)
if (isClosed())
return null;
LookupWaiter waiter = new LookupWaiter(h);
_pendingLookups.offer(waiter);
@@ -996,7 +1033,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
* @return null on failure
*/
public int[] bandwidthLimits() throws I2PSessionException {
if (_closed)
if (isClosed())
return null;
sendMessage(new GetBandwidthLimitsMessage());
try {

View File

@@ -8,6 +8,7 @@ package net.i2p.client;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Properties;
@@ -45,45 +46,50 @@ class I2PSimpleSession extends I2PSessionImpl2 {
* Connect to the router and establish a session. This call blocks until
* a session is granted.
*
* NOT threadsafe, do not call from multiple threads.
*
* @throws I2PSessionException if there is a configuration error or the router is
* not reachable
*/
@Override
public void connect() throws I2PSessionException {
_closed = false;
changeState(State.OPENING);
boolean success = false;
try {
// If we are in the router JVM, connect using the interal queue
if (_context.isRouterContext()) {
// _socket, _out, and _writer remain null
InternalClientManager mgr = _context.internalClientManager();
if (mgr == null)
throw new I2PSessionException("Router is not ready for connections");
// the following may throw an I2PSessionException
_queue = mgr.connect();
_reader = new QueuedI2CPMessageReader(_queue, this);
} else {
if (Boolean.parseBoolean(getOptions().getProperty(PROP_ENABLE_SSL)))
_socket = I2CPSSLSocketFactory.createSocket(_context, _hostname, _portNum);
else
_socket = new Socket(_hostname, _portNum);
_out = _socket.getOutputStream();
_out.write(I2PClient.PROTOCOL_BYTE);
_out.flush();
_writer = new ClientWriterRunner(_out, this);
InputStream in = new BufferedInputStream(_socket.getInputStream(), BUF_SIZE);
_reader = new I2CPMessageReader(in, this);
// protect w/ closeSocket()
synchronized(_stateLock) {
// If we are in the router JVM, connect using the interal queue
if (_context.isRouterContext()) {
// _socket and _writer remain null
InternalClientManager mgr = _context.internalClientManager();
if (mgr == null)
throw new I2PSessionException("Router is not ready for connections");
// the following may throw an I2PSessionException
_queue = mgr.connect();
_reader = new QueuedI2CPMessageReader(_queue, this);
} else {
if (Boolean.parseBoolean(getOptions().getProperty(PROP_ENABLE_SSL)))
_socket = I2CPSSLSocketFactory.createSocket(_context, _hostname, _portNum);
else
_socket = new Socket(_hostname, _portNum);
OutputStream out = _socket.getOutputStream();
out.write(I2PClient.PROTOCOL_BYTE);
out.flush();
_writer = new ClientWriterRunner(out, this);
InputStream in = new BufferedInputStream(_socket.getInputStream(), BUF_SIZE);
_reader = new I2CPMessageReader(in, this);
}
}
// we do not receive payload messages, so we do not need an AvailabilityNotifier
// ... or an Idle timer, or a VerifyUsage
_reader.startReading();
success = true;
} catch (UnknownHostException uhe) {
_closed = true;
throw new I2PSessionException(getPrefix() + "Cannot connect to the router on " + _hostname + ':' + _portNum, uhe);
} catch (IOException ioe) {
_closed = true;
throw new I2PSessionException(getPrefix() + "Cannot connect to the router on " + _hostname + ':' + _portNum, ioe);
} finally {
changeState(success ? State.OPEN : State.CLOSED);
}
}