Make activeResends field atomic

This commit is contained in:
zab2
2013-07-09 13:55:09 +00:00
parent b486ae5c26
commit 67859f67b0
2 changed files with 10 additions and 9 deletions

View File

@@ -6,6 +6,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import net.i2p.I2PAppContext;
@@ -69,7 +70,7 @@ class Connection {
/** Notify this on connection (or connection failure) */
private final Object _connectLock;
/** how many messages have been resent and not yet ACKed? */
private int _activeResends;
private final AtomicInteger _activeResends = new AtomicInteger(0);
private final ConEvent _connectionEvent;
private final int _randomWait;
private int _localPort;
@@ -197,7 +198,7 @@ class Connection {
int unacked = _outboundPackets.size();
int wsz = _options.getWindowSize();
if (unacked >= wsz ||
_activeResends >= (wsz + 1) / 2 ||
_activeResends.get() >= (wsz + 1) / 2 ||
_lastSendId.get() - _highestAckedThrough >= Math.max(MAX_WINDOW_SIZE, 2 * wsz)) {
if (timeoutMs > 0) {
if (timeLeft <= 0) {
@@ -489,16 +490,16 @@ class Connection {
_outboundPackets.remove(Long.valueOf(p.getSequenceNum()));
_ackedPackets++;
if (p.getNumSends() > 1) {
_activeResends--;
_activeResends.decrementAndGet();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Active resend of " + p + " successful, # active left: " + _activeResends);
}
}
}
if ( (_outboundPackets.isEmpty()) && (_activeResends != 0) ) {
if ( (_outboundPackets.isEmpty()) && (_activeResends.get() != 0) ) {
if (_log.shouldLog(Log.INFO))
_log.info("All outbound packets acked, clearing " + _activeResends);
_activeResends = 0;
_activeResends.set(0);
}
_outboundPackets.notifyAll();
}
@@ -1172,7 +1173,7 @@ class Connection {
// happen to get here next, as the timers get out-of-order esp. after fast retx
if (_packet.getSequenceNum() == _highestAckedThrough + 1 ||
_packet.getNumSends() > 1 ||
_activeResends < Math.max(3, (_options.getWindowSize() + 1) / 2))
_activeResends.get() < Math.max(3, (_options.getWindowSize() + 1) / 2))
isLowest = true;
if (_outboundPackets.containsKey(Long.valueOf(_packet.getSequenceNum())))
resend = true;
@@ -1269,7 +1270,7 @@ class Connection {
if (_outboundQueue.enqueue(_packet)) {
// first resend for this packet ?
if (numSends == 2)
_activeResends++;
_activeResends.incrementAndGet();
if (_log.shouldLog(Log.INFO))
_log.info("Resent packet " +
(fastRetransmit ? "(fast) " : "(timeout) ") +
@@ -1290,7 +1291,7 @@ class Connection {
// acked during resending (... or somethin') ????????????
if ( (_packet.getAckTime() > 0) && (_packet.getNumSends() > 1) ) {
_activeResends--;
_activeResends.decrementAndGet();
synchronized (_outboundPackets) {
_outboundPackets.notifyAll();
}