Rework locking and state management of NTCP connections

trac ticket #972
	up version to -4
This commit is contained in:
zab2
2013-07-20 17:37:46 +00:00
parent 02b92ac3fe
commit 97c1676bcb
6 changed files with 44 additions and 42 deletions

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 3;
public final static long BUILD = 4;
/** for example "-test" */
public final static String EXTRA = "";

View File

@@ -59,6 +59,9 @@ import net.i2p.util.Log;
*
*/
class EstablishState {
public static final VerifiedEstablishState VERIFIED = new VerifiedEstablishState();
private final RouterContext _context;
private final Log _log;
@@ -107,6 +110,17 @@ class EstablishState {
private boolean _verified;
private boolean _confirmWritten;
private boolean _failedBySkew;
private EstablishState() {
_context = null;
_log = null;
_X = null;
_hX_xor_bobIdentHash = null;
_curDecrypted = null;
_dh = null;
_transport = null;
_con = null;
}
public EstablishState(RouterContext ctx, NTCPTransport transport, NTCPConnection con) {
_context = ctx;
@@ -773,6 +787,10 @@ class EstablishState {
return false;
}
}
private static class VerifiedEstablishState extends EstablishState {
@Override public boolean isComplete() { return true; }
}
/** @deprecated unused */
/*********

View File

@@ -749,8 +749,10 @@ class EventPumper implements Runnable {
if (!_wantsWrite.isEmpty()) {
for (Iterator<NTCPConnection> iter = _wantsWrite.iterator(); iter.hasNext(); ) {
con = iter.next();
iter.remove();
SelectionKey key = con.getKey();
if (key == null)
continue;
iter.remove();
try {
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
} catch (CancelledKeyException cke) {

View File

@@ -82,7 +82,6 @@ class NTCPConnection {
/** Requests that were not granted immediately */
private final Set<FIFOBandwidthLimiter.Request> _bwInRequests;
private final Set<FIFOBandwidthLimiter.Request> _bwOutRequests;
private boolean _established;
private long _establishedOn;
private EstablishState _establishState;
private final NTCPTransport _transport;
@@ -209,6 +208,7 @@ class NTCPConnection {
_inboundListener = new InboundListener();
_outboundListener = new OutboundListener();
initialize();
_establishState = new EstablishState(ctx, transport, this);
}
private void initialize() {
@@ -232,7 +232,7 @@ class NTCPConnection {
public void setChannel(SocketChannel chan) { _chan = chan; }
public void setKey(SelectionKey key) { _conKey = key; }
public boolean isInbound() { return _isInbound; }
public boolean isEstablished() { return _established; }
public synchronized boolean isEstablished() { return _establishState.isComplete(); }
/**
* @since IPv6
@@ -269,12 +269,11 @@ class NTCPConnection {
System.arraycopy(prevReadEnd, prevReadEnd.length - BLOCK_SIZE, _prevReadBlock, 0, BLOCK_SIZE);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Inbound established, prevWriteEnd: " + Base64.encode(prevWriteEnd) + " prevReadEnd: " + Base64.encode(prevReadEnd));
_established = true;
_establishedOn = System.currentTimeMillis();
_transport.inboundEstablished(this);
_establishState = null;
_nextMetaTime = System.currentTimeMillis() + (META_FREQUENCY / 2) + _context.random().nextInt(META_FREQUENCY);
_nextInfoTime = System.currentTimeMillis() + (INFO_FREQUENCY / 2) + _context.random().nextInt(INFO_FREQUENCY);
_establishState = EstablishState.VERIFIED;
}
/** @return seconds */
@@ -282,7 +281,7 @@ class NTCPConnection {
/** @return milliseconds */
public long getUptime() {
if (!_established)
if (!isEstablished())
return getTimeSinceCreated();
else
return System.currentTimeMillis()-_establishedOn;
@@ -333,7 +332,7 @@ class NTCPConnection {
_closed = true;
if (_chan != null) try { _chan.close(); } catch (IOException ioe) { }
if (_conKey != null) _conKey.cancel();
_establishState = null;
_establishState = EstablishState.VERIFIED;
_transport.removeCon(this);
_transport.getReader().connectionClosed(this);
_transport.getWriter().connectionClosed(this);
@@ -409,7 +408,7 @@ class NTCPConnection {
//_context.statManager().addRateData("ntcp.sendQueueSize", enqueued);
boolean noOutbound = (_currentOutbound == null);
//if (_log.shouldLog(Log.DEBUG)) _log.debug("messages enqueued on " + toString() + ": " + enqueued + " new one: " + msg.getMessageId() + " of " + msg.getMessageType());
if (_established && noOutbound)
if (isEstablished() && noOutbound)
_transport.getWriter().wantsWrite(this, "enqueued");
}
@@ -541,9 +540,8 @@ class NTCPConnection {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Outbound established, prevWriteEnd: " + Base64.encode(prevWriteEnd) + " prevReadEnd: " + Base64.encode(prevReadEnd));
_established = true;
_establishedOn = System.currentTimeMillis();
_establishState = null;
_establishState = EstablishState.VERIFIED;
_transport.markReachable(getRemotePeer().calculateHash(), false);
//_context.banlist().unbanlistRouter(getRemotePeer().calculateHash(), NTCPTransport.STYLE);
boolean msgs = !_outbound.isEmpty();
@@ -693,15 +691,7 @@ class NTCPConnection {
return;
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("prepare next write w/ isInbound? " + _isInbound + " established? " + _established);
if (!_isInbound && !_established) {
if (_establishState == null) {
// shouldn't happen
_establishState = new EstablishState(_context, _transport, this);
_establishState.prepareOutbound();
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("prepare next write, but we have already prepared the first outbound and we are not yet established..." + toString());
}
if (!_isInbound && !isEstablished()) {
return;
}
@@ -1535,7 +1525,7 @@ class NTCPConnection {
return "NTCP conn " +
(_isInbound ? "from " : "to ") +
(_remotePeer == null ? "unknown" : _remotePeer.calculateHash().toBase64().substring(0,6)) +
(_established ? "" : " not established") +
(isEstablished() ? "" : " not established") +
" created " + DataHelper.formatDuration(getTimeSinceCreated()) + " ago";
}
}

View File

@@ -231,6 +231,7 @@ public class NTCPTransport extends TransportImpl {
con.setChannel(channel);
channel.configureBlocking(false);
_pumper.registerConnect(con);
con.getEstablishState().prepareOutbound();
} catch (IOException ioe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error opening a channel", ioe);

View File

@@ -132,30 +132,24 @@ class Reader {
* Return read buffers back to the pool as we process them.
*/
private void processRead(NTCPConnection con) {
if (con.isClosed())
return;
ByteBuffer buf = null;
while (!con.isClosed() && !con.isEstablished() && ( (buf = con.getNextReadBuf()) != null) ) {
while(true) {
synchronized(con) {
if (con.isClosed())
return;
if (con.isEstablished())
break;
}
if ((buf = con.getNextReadBuf()) == null)
return;
EstablishState est = con.getEstablishState();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Processing read buffer as an establishment for " + con + " with [" + est + "]");
if (est == null) {
EventPumper.releaseBuf(buf);
if (!con.isEstablished()) {
// establish state is only removed when the connection is fully established,
// yet if that happens, con.isEstablished() should return true...
throw new RuntimeException("connection was not established, yet the establish state is null for " + con);
} else {
// hmm, there shouldn't be a race here - only one reader should
// be running on a con at a time...
_log.error("no establishment state but " + con + " is established... race?");
break;
}
}
if (est.isComplete()) {
// why is it complete yet !con.isEstablished?
_log.error("establishment state [" + est + "] is complete, yet the connection isn't established? "
+ con.isEstablished() + " (inbound? " + con.isInbound() + " " + con + ")");
_log.error("establishment state [" + est + "] is complete, yet the connection isn't established? "
+ con.isEstablished() + " (inbound? " + con.isInbound() + " " + con + ")");
EventPumper.releaseBuf(buf);
break;
}
@@ -172,9 +166,6 @@ class Reader {
if (est.isComplete() && est.getExtraBytes() != null)
con.recvEncryptedI2NP(ByteBuffer.wrap(est.getExtraBytes()));
}
// catch race?
if (!con.isEstablished())
return;
while (!con.isClosed() && (buf = con.getNextReadBuf()) != null) {
// decrypt the data and push it into an i2np message
if (_log.shouldLog(Log.DEBUG))