forked from I2P_Developers/i2p.i2p
Sync fix for NTCPConnection._currentOutbound
This commit is contained in:
@@ -97,7 +97,7 @@ class NTCPConnection {
|
||||
//private final CoDelPriorityBlockingQueue<OutNetMessage> _outbound;
|
||||
private final PriBlockingQueue<OutNetMessage> _outbound;
|
||||
/**
|
||||
* current prepared OutNetMessage, or null - synchronize on _outbound to modify
|
||||
* current prepared OutNetMessage, or null - synchronize on _outbound to modify or read
|
||||
* FIXME why do we need this???
|
||||
*/
|
||||
private OutNetMessage _currentOutbound;
|
||||
@@ -302,12 +302,21 @@ class NTCPConnection {
|
||||
|
||||
public long getMessagesSent() { return _messagesWritten; }
|
||||
public long getMessagesReceived() { return _messagesRead; }
|
||||
public long getOutboundQueueSize() {
|
||||
int queued = _outbound.size();
|
||||
if (_currentOutbound != null)
|
||||
queued++;
|
||||
public long getOutboundQueueSize() {
|
||||
int queued;
|
||||
synchronized(_outbound) {
|
||||
queued = _outbound.size();
|
||||
if (getCurrentOutbound() != null)
|
||||
queued++;
|
||||
}
|
||||
return queued;
|
||||
}
|
||||
|
||||
private OutNetMessage getCurrentOutbound() {
|
||||
synchronized(_outbound) {
|
||||
return _currentOutbound;
|
||||
}
|
||||
}
|
||||
|
||||
/** @return milliseconds */
|
||||
public long getTimeSinceSend() { return System.currentTimeMillis()-_lastSendTime; }
|
||||
@@ -390,7 +399,7 @@ class NTCPConnection {
|
||||
_transport.afterSend(msg, false, allowRequeue, msg.getLifetime());
|
||||
}
|
||||
|
||||
OutNetMessage msg = _currentOutbound;
|
||||
OutNetMessage msg = getCurrentOutbound();
|
||||
if (msg != null) {
|
||||
Object buf = msg.releasePreparationBuffer();
|
||||
if (buf != null)
|
||||
@@ -434,7 +443,7 @@ class NTCPConnection {
|
||||
//int enqueued = _outbound.size();
|
||||
// although stat description says ahead of this one, not including this one...
|
||||
//_context.statManager().addRateData("ntcp.sendQueueSize", enqueued);
|
||||
boolean noOutbound = (_currentOutbound == null);
|
||||
boolean noOutbound = (getCurrentOutbound() == null);
|
||||
//if (_log.shouldLog(Log.DEBUG)) _log.debug("messages enqueued on " + toString() + ": " + enqueued + " new one: " + msg.getMessageId() + " of " + msg.getMessageType());
|
||||
if (isEstablished() && noOutbound)
|
||||
_transport.getWriter().wantsWrite(this, "enqueued");
|
||||
@@ -468,7 +477,7 @@ class NTCPConnection {
|
||||
int size = _outbound.size();
|
||||
if (_log.shouldLog(Log.WARN)) {
|
||||
int writeBufs = _writeBufs.size();
|
||||
boolean currentOutboundSet = _currentOutbound != null;
|
||||
boolean currentOutboundSet = getCurrentOutbound() != null;
|
||||
try {
|
||||
_log.warn("Too backlogged: size is " + size
|
||||
+ ", wantsWrite? " + (0 != (_conKey.interestOps()&SelectionKey.OP_WRITE))
|
||||
@@ -1072,8 +1081,7 @@ class NTCPConnection {
|
||||
_log.info("I2NP meta message sent completely");
|
||||
}
|
||||
|
||||
boolean msgs = ((!_outbound.isEmpty()) || (_currentOutbound != null));
|
||||
if (msgs) // push through the bw limiter to reach _writeBufs
|
||||
if (getOutboundQueueSize() > 0) // push through the bw limiter to reach _writeBufs
|
||||
_transport.getWriter().wantsWrite(this, "write completed");
|
||||
|
||||
// this is not necessary, EventPumper.processWrite() handles this
|
||||
|
Reference in New Issue
Block a user