diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java index d637901f5..217ae341e 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java @@ -97,7 +97,7 @@ class NTCPConnection { //private final CoDelPriorityBlockingQueue _outbound; private final PriBlockingQueue _outbound; /** - * current prepared OutNetMessage, or null - synchronize on _outbound to modify + * current prepared OutNetMessage, or null - synchronize on _outbound to modify or read * FIXME why do we need this??? */ private OutNetMessage _currentOutbound; @@ -302,12 +302,21 @@ class NTCPConnection { public long getMessagesSent() { return _messagesWritten; } public long getMessagesReceived() { return _messagesRead; } - public long getOutboundQueueSize() { - int queued = _outbound.size(); - if (_currentOutbound != null) - queued++; + public long getOutboundQueueSize() { + int queued; + synchronized(_outbound) { + queued = _outbound.size(); + if (getCurrentOutbound() != null) + queued++; + } return queued; } + + private OutNetMessage getCurrentOutbound() { + synchronized(_outbound) { + return _currentOutbound; + } + } /** @return milliseconds */ public long getTimeSinceSend() { return System.currentTimeMillis()-_lastSendTime; } @@ -390,7 +399,7 @@ class NTCPConnection { _transport.afterSend(msg, false, allowRequeue, msg.getLifetime()); } - OutNetMessage msg = _currentOutbound; + OutNetMessage msg = getCurrentOutbound(); if (msg != null) { Object buf = msg.releasePreparationBuffer(); if (buf != null) @@ -434,7 +443,7 @@ class NTCPConnection { //int enqueued = _outbound.size(); // although stat description says ahead of this one, not including this one... //_context.statManager().addRateData("ntcp.sendQueueSize", enqueued); - boolean noOutbound = (_currentOutbound == null); + boolean noOutbound = (getCurrentOutbound() == null); //if (_log.shouldLog(Log.DEBUG)) _log.debug("messages enqueued on " + toString() + ": " + enqueued + " new one: " + msg.getMessageId() + " of " + msg.getMessageType()); if (isEstablished() && noOutbound) _transport.getWriter().wantsWrite(this, "enqueued"); @@ -468,7 +477,7 @@ class NTCPConnection { int size = _outbound.size(); if (_log.shouldLog(Log.WARN)) { int writeBufs = _writeBufs.size(); - boolean currentOutboundSet = _currentOutbound != null; + boolean currentOutboundSet = getCurrentOutbound() != null; try { _log.warn("Too backlogged: size is " + size + ", wantsWrite? " + (0 != (_conKey.interestOps()&SelectionKey.OP_WRITE)) @@ -1072,8 +1081,7 @@ class NTCPConnection { _log.info("I2NP meta message sent completely"); } - boolean msgs = ((!_outbound.isEmpty()) || (_currentOutbound != null)); - if (msgs) // push through the bw limiter to reach _writeBufs + if (getOutboundQueueSize() > 0) // push through the bw limiter to reach _writeBufs _transport.getWriter().wantsWrite(this, "write completed"); // this is not necessary, EventPumper.processWrite() handles this