From e4ee5e301633ff39df346913dd2fae6b962d2531 Mon Sep 17 00:00:00 2001 From: zzz Date: Wed, 16 Nov 2011 01:00:08 +0000 Subject: [PATCH] * NTCP: Reduce log level for race (ticket #392) * NTCPConnection: Concurrent PrepBufs * OutNetMessage: Remove some fields and methods used only in NTCP debugging --- history.txt | 7 ++ .../src/net/i2p/router/OutNetMessage.java | 22 +--- .../i2p/router/transport/TransportImpl.java | 5 +- .../router/transport/ntcp/NTCPConnection.java | 119 +++++++----------- .../router/transport/ntcp/NTCPTransport.java | 2 - 5 files changed, 63 insertions(+), 92 deletions(-) diff --git a/history.txt b/history.txt index 5377be76b..4390b3ba2 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,10 @@ +2011-11-16 zzz + * Console: Add Jetty version to logs page + * NTCP: Reduce log level for race (ticket #392) + * NTCPConnection: Concurrent PrepBufs + * OutNetMessage: Remove some fields and methods used only in NTCP debugging + * Router: Move router.ping file from temp directory to config directory + 2011-11-14 zzz * Console: Remove % chart at bottom of tunnels.jsp * Profiles: Only use same-country metric for countries with diff --git a/router/java/src/net/i2p/router/OutNetMessage.java b/router/java/src/net/i2p/router/OutNetMessage.java index 749baeb67..07ed4fbc4 100644 --- a/router/java/src/net/i2p/router/OutNetMessage.java +++ b/router/java/src/net/i2p/router/OutNetMessage.java @@ -48,7 +48,6 @@ public class OutNetMessage { private MessageSelector _replySelector; private Set _failedTransports; private long _sendBegin; - private long _transmitBegin; //private Exception _createdBy; private final long _created; /** for debugging, contains a mapping of even name to Long (e.g. "begin sending", "handleOutbound", etc) */ @@ -58,9 +57,6 @@ public class OutNetMessage { * (some JVMs have less than 10ms resolution, so the Long above doesn't guarantee order) */ private List _timestampOrder; - private int _queueSize; - private long _prepareBegin; - private long _prepareEnd; private Object _preparationBuf; public OutNetMessage(RouterContext context) { @@ -247,14 +243,13 @@ public class OutNetMessage { /** when did the sending process begin */ public long getSendBegin() { return _sendBegin; } + public void beginSend() { _sendBegin = _context.clock().now(); } - public void beginTransmission() { _transmitBegin = _context.clock().now(); } - public void beginPrepare() { _prepareBegin = _context.clock().now(); } - public void prepared() { prepared(null); } + public void prepared(Object buf) { - _prepareEnd = _context.clock().now(); _preparationBuf = buf; } + public Object releasePreparationBuffer() { Object rv = _preparationBuf; _preparationBuf = null; @@ -262,18 +257,13 @@ public class OutNetMessage { } public long getCreated() { return _created; } + /** time since the message was created */ public long getLifetime() { return _context.clock().now() - _created; } + /** time the transport tries to send the message (including any queueing) */ public long getSendTime() { return _context.clock().now() - _sendBegin; } - /** time during which the i2np message is actually in flight */ - public long getTransmissionTime() { return _context.clock().now() - _transmitBegin; } - /** how long it took to prepare the i2np message for transmission (including serialization and transport layer encryption) */ - public long getPreparationTime() { return _prepareEnd - _prepareBegin; } - /** number of messages ahead of this one going to the targetted peer when it is first enqueued */ - public int getQueueSize() { return _queueSize; } - public void setQueueSize(int size) { _queueSize = size; } - + /** * We've done what we need to do with the data from this message, though * we may keep the object around for a while to use its ID, jobs, etc. diff --git a/router/java/src/net/i2p/router/transport/TransportImpl.java b/router/java/src/net/i2p/router/transport/TransportImpl.java index 9d63dfe07..1ee7a2573 100644 --- a/router/java/src/net/i2p/router/transport/TransportImpl.java +++ b/router/java/src/net/i2p/router/transport/TransportImpl.java @@ -213,8 +213,7 @@ public abstract class TransportImpl implements Transport { if (_log.shouldLog(Log.WARN)) _log.warn("afterSend slow: [success=" + sendSuccessful + "] " + msg.getMessageSize() + " byte " + msg.getMessageType() + " " + msg.getMessageId() + " to " - + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + " took " + msToSend - + "/" + msg.getTransmissionTime()); + + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + " took " + msToSend); } //if (true) // _log.error("(not error) I2NP message sent? " + sendSuccessful + " " + msg.getMessageId() + " after " + msToSend + "/" + msg.getTransmissionTime()); @@ -225,7 +224,7 @@ public abstract class TransportImpl implements Transport { if (!sendSuccessful) level = Log.INFO; if (_log.shouldLog(level)) - _log.log(level, "afterSend slow (" + lifetime + "/" + msToSend + "/" + msg.getTransmissionTime() + "): [success=" + sendSuccessful + "] " + msg.getMessageSize() + " byte " + _log.log(level, "afterSend slow (" + lifetime + "/" + msToSend + "): [success=" + sendSuccessful + "] " + msg.getMessageSize() + " byte " + msg.getMessageType() + " " + msg.getMessageId() + " from " + _context.routerHash().toBase64().substring(0,6) + " to " + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + ": " + msg.toString()); } else { diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java index e4d2d694c..673663761 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java @@ -70,7 +70,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { private boolean _established; private long _establishedOn; private EstablishState _establishState; - private NTCPTransport _transport; + private final NTCPTransport _transport; private final boolean _isInbound; private boolean _closed; private NTCPAddress _remAddr; @@ -80,7 +80,10 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { * pending unprepared OutNetMessage instances */ private final LinkedBlockingQueue _outbound; - /** current prepared OutNetMessage, or null - synchronize on _outbound to modify */ + /** + * current prepared OutNetMessage, or null - synchronize on _outbound to modify + * FIXME why do we need this??? + */ private OutNetMessage _currentOutbound; private SessionKey _sessionKey; /** encrypted block of the current I2NP message being read */ @@ -290,12 +293,13 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { } _consecutiveBacklog = 0; int enqueued = 0; - if (FAST_LARGE) + //if (FAST_LARGE) bufferedPrepare(msg); boolean noOutbound = false; _outbound.offer(msg); enqueued = _outbound.size(); - msg.setQueueSize(enqueued); + // although stat description says ahead of this one, not including this one... + _context.statManager().addRateData("ntcp.sendQueueSize", enqueued); noOutbound = (_currentOutbound == null); if (_log.shouldLog(Log.DEBUG)) _log.debug("messages enqueued on " + toString() + ": " + enqueued + " new one: " + msg.getMessageId() + " of " + msg.getMessageType()); if (_established && noOutbound) @@ -437,6 +441,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _transport.getWriter().wantsWrite(this, "outbound established"); } + /** // Time vs space tradeoff: // on slow GCing jvms, the mallocs in the following preparation can cause the // write to get congested, taking up a substantial portion of the Writer's @@ -450,6 +455,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { // just the currently transmitting one. // // hmm. + */ private static final boolean FAST_LARGE = true; // otherwise, SLOW_SMALL /** @@ -605,6 +611,11 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { if (msg == null) return; } else { + // FIXME + // This is a linear search to implement a priority queue, O(n**2) + // Also race with unsynchronized removal in close() above + // Either implement a real (concurrent?) priority queue or just comment out all of this, + // as it isn't clear how effective the priorities on a per-connection basis are. int slot = 0; // only for logging Iterator it = _outbound.iterator(); for (int i = 0; it.hasNext() && i < 75; i++) { //arbitrary bound @@ -627,11 +638,15 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _currentOutbound = msg; } - msg.beginTransmission(); //long begin = System.currentTimeMillis(); PrepBuffer buf = (PrepBuffer)msg.releasePreparationBuffer(); - if (buf == null) - throw new RuntimeException("buf is null for " + msg); + if (buf == null) { + // race, see ticket #392 + //throw new RuntimeException("buf is null for " + msg); + if (_log.shouldLog(Log.WARN)) + _log.warn("Null prep buf for " + msg); + return; + } _context.aes().encrypt(buf.unencrypted, 0, buf.encrypted, 0, _sessionKey, _prevWriteEnd, 0, buf.unencryptedLength); System.arraycopy(buf.encrypted, buf.encrypted.length-16, _prevWriteEnd, 0, _prevWriteEnd.length); //long encryptedTime = System.currentTimeMillis(); @@ -669,7 +684,6 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { private void bufferedPrepare(OutNetMessage msg) { //if (!_isInbound && !_established) // return; - msg.beginPrepare(); //long begin = System.currentTimeMillis(); PrepBuffer buf = acquireBuf(); //long alloc = System.currentTimeMillis(); @@ -710,66 +724,34 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { // + " serialize=" + (serialized-alloc) + " crc=" + (crced-serialized)); } - private static final int MIN_PREP_BUFS = 5; - private static int NUM_PREP_BUFS = 5; - private static int __liveBufs = 0; - private static int __consecutiveExtra; - private final static List _bufs = new ArrayList(NUM_PREP_BUFS); - private PrepBuffer acquireBuf() { - synchronized (_bufs) { - if (!_bufs.isEmpty()) { - PrepBuffer b = (PrepBuffer)_bufs.remove(0); - b.acquired(); - return b; - } - } - PrepBuffer b = new PrepBuffer(); - b.init(); - NUM_PREP_BUFS = ++__liveBufs; - if (_log.shouldLog(Log.DEBUG)) - _log.debug("creating a new prep buffer with " + __liveBufs + " live"); - _context.statManager().addRateData("ntcp.prepBufCache", NUM_PREP_BUFS, 0); - b.acquired(); + private static int NUM_PREP_BUFS = 6; + + private final static LinkedBlockingQueue _bufs = new LinkedBlockingQueue(NUM_PREP_BUFS); + + /** + * @return initialized buffer + */ + private static PrepBuffer acquireBuf() { + PrepBuffer b = _bufs.poll(); + if (b == null) + b = new PrepBuffer(); return b; } - private void releaseBuf(PrepBuffer buf) { + + private static void releaseBuf(PrepBuffer buf) { buf.init(); - long lifetime = buf.lifetime(); - int extra = 0; - boolean cached = false; - synchronized (_bufs) { - if (_bufs.size() < NUM_PREP_BUFS) { - extra = _bufs.size(); - _bufs.add(buf); - cached = true; - if (extra > 5) { - __consecutiveExtra++; - if (__consecutiveExtra >= 20) { - NUM_PREP_BUFS = Math.max(NUM_PREP_BUFS - 1, MIN_PREP_BUFS); - __consecutiveExtra = 0; - } - } - } else { - buf.unencrypted = null; - buf.base = null; - buf.pad = null; - buf.crc = null; - --__liveBufs; - } - } - if (cached && _log.shouldLog(Log.DEBUG)) - _log.debug("releasing cached buffer with " + __liveBufs + " live after " + lifetime); + _bufs.offer(buf); } + private static class PrepBuffer { - byte unencrypted[]; + final byte unencrypted[]; int unencryptedLength; - byte base[]; + final byte base[]; int baseLength; - byte pad[]; + final byte pad[]; int padLength; - Adler32 crc; + final Adler32 crc; byte encrypted[]; - private long acquiredOn; PrepBuffer() { unencrypted = new byte[BUFFER_SIZE]; @@ -777,6 +759,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { pad = new byte[16]; crc = new Adler32(); } + private void init() { unencryptedLength = 0; baseLength = 0; @@ -784,8 +767,6 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { encrypted = null; crc.reset(); } - public void acquired() { acquiredOn = System.currentTimeMillis(); } - public long lifetime() { return System.currentTimeMillis()-acquiredOn; } } /** @@ -906,13 +887,10 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { if (msg != null) { _lastSendTime = System.currentTimeMillis(); _context.statManager().addRateData("ntcp.sendTime", msg.getSendTime(), msg.getSendTime()); - _context.statManager().addRateData("ntcp.transmitTime", msg.getTransmissionTime(), msg.getTransmissionTime()); - _context.statManager().addRateData("ntcp.sendQueueSize", msg.getQueueSize(), msg.getLifetime()); if (_log.shouldLog(Log.INFO)) { _log.info("I2NP message " + _messagesWritten + "/" + msg.getMessageId() + " sent after " - + msg.getSendTime() + "/" + msg.getTransmissionTime() + "/" - + msg.getPreparationTime() + "/" + msg.getLifetime() - + " queued after " + msg.getQueueSize() + + msg.getSendTime() + "/" + + msg.getLifetime() + " with " + buf.capacity() + " bytes (uid=" + System.identityHashCode(msg)+" on " + toString() + ")"); } _messagesWritten++; @@ -1112,8 +1090,9 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { public long getReadTime() { return _curReadState.getReadTime(); } private static class DataBuf { - byte data[]; - ByteArrayInputStream bais; + final byte data[]; + final ByteArrayInputStream bais; + public DataBuf() { data = new byte[BUFFER_SIZE]; bais = new ByteArrayInputStream(data); @@ -1139,9 +1118,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { static void releaseResources() { _i2npHandlers.clear(); _dataReadBufs.clear(); - synchronized(_bufs) { - _bufs.clear(); - } + _bufs.clear(); } /** diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java index 8d943fd07..8ad1ea8c3 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java @@ -73,7 +73,6 @@ public class NTCPTransport extends TransportImpl { _log = ctx.logManager().getLog(getClass()); _context.statManager().createRateStat("ntcp.sendTime", "Total message lifetime when sent completely", "ntcp", RATES); - _context.statManager().createRateStat("ntcp.transmitTime", "How long after message preparation before the message was fully sent", "ntcp", RATES); _context.statManager().createRateStat("ntcp.sendQueueSize", "How many messages were ahead of the current one on the connection's queue when it was first added", "ntcp", RATES); _context.statManager().createRateStat("ntcp.receiveTime", "How long it takes to receive an inbound message", "ntcp", RATES); _context.statManager().createRateStat("ntcp.receiveSize", "How large the received message was", "ntcp", RATES); @@ -122,7 +121,6 @@ public class NTCPTransport extends TransportImpl { _context.statManager().createRateStat("ntcp.outboundFailedIOEImmediate", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.invalidOutboundSkew", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.noBidTooLargeI2NP", "send size", "ntcp", RATES); - _context.statManager().createRateStat("ntcp.prepBufCache", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.queuedRecv", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.read", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.readEOF", "", "ntcp", RATES);