From e9e550fb55e9aade89eeb99a3948ed532002bb54 Mon Sep 17 00:00:00 2001 From: zab2 Date: Sun, 7 Jul 2013 19:15:08 +0000 Subject: [PATCH] cleanup, sync, more logging --- .../net/i2p/client/streaming/PacketLocal.java | 52 +++++++++++++------ 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index 97666db6f..5dcc4e512 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -1,7 +1,9 @@ package net.i2p.client.streaming; import java.io.IOException; +import java.util.Collections; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import net.i2p.I2PAppContext; import net.i2p.data.Destination; @@ -21,14 +23,14 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { private final Connection _connection; private final Destination _to; private SessionKey _keyUsed; - private Set _tagsSent; private final long _createdOn; private volatile int _numSends; - private long _lastSend; + private volatile long _lastSend; private long _acceptedOn; - private long _ackOn; + /** LOCKING: this */ + private long _ackOn; private long _cancelledOn; - private volatile int _nackCount; + private final AtomicInteger _nackCount = new AtomicInteger(0); private volatile boolean _retransmitted; private SimpleTimer2.TimedEvent _resendEvent; @@ -66,7 +68,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { /** * @deprecated should always return null or an empty set */ - public Set getTagsSent() { return _tagsSent; } + public Set getTagsSent() { return Collections.EMPTY_SET; } /** * @deprecated I2PSession throws out the tags @@ -111,9 +113,10 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { _lastSend = _context.clock().now(); } public void ackReceived() { + final long now = _context.clock().now(); synchronized (this) { if (_ackOn <= 0) - _ackOn = _context.clock().now(); + _ackOn = now; releasePayload(); notifyAll(); } @@ -134,7 +137,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { /** how long after packet creation was it acked? * @return how long after packet creation the packet was ACKed in ms */ - public int getAckTime() { + public synchronized int getAckTime() { if (_ackOn <= 0) return -1; else @@ -151,15 +154,28 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { * but only if it's the lowest unacked (see Connection.ResendPacketEvent) */ public void incrementNACKs() { - int cnt = ++_nackCount; + final int cnt = _nackCount.incrementAndGet(); SimpleTimer2.TimedEvent evt = _resendEvent; if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null && (!_retransmitted) && (_numSends == 1 || _lastSend < _context.clock().now() + 4*1000)) { // Don't fast retx if we recently resent it _retransmitted = true; evt.reschedule(0); + // shouldn't ^^^ be clock.now() - 4000 ??? --zab + + if (_log.shouldLog(Log.DEBUG)) { + final String log = String.format("%s nacks and retransmits. Criteria: nacks=%d, retransmitted=%b,"+ + " numSends=%d, lastSend=%d, now=%d", + toString(), cnt, _retransmitted, _numSends, _lastSend, _context.clock().now()); + _log.debug(log); + } + } else if (_log.shouldLog(Log.DEBUG)) { + final String log = String.format("%s nack but no retransmit. Criteria: nacks=%d, retransmitted=%b,"+ + " numSends=%d, lastSend=%d, now=%d", + toString(), cnt, _retransmitted, _numSends, _lastSend, _context.clock().now()); + _log.debug(log); } } - public int getNACKs() { return _nackCount; } + public int getNACKs() { return _nackCount.get(); } public void setResendPacketEvent(SimpleTimer2.TimedEvent evt) { _resendEvent = evt; } @@ -173,12 +189,14 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { //if ( (_tagsSent != null) && (!_tagsSent.isEmpty()) ) // buf.append(" with tags"); + final int nackCount = _nackCount.get(); + if (nackCount > 0) + buf.append(" nacked ").append(nackCount).append(" times"); - if (_nackCount > 0) - buf.append(" nacked ").append(_nackCount).append(" times"); - - if (_ackOn > 0) - buf.append(" ack after ").append(getAckTime()); + synchronized(this) { + if (_ackOn > 0) + buf.append(" ack after ").append(getAckTime()); + } if (_numSends > 1) buf.append(" sent ").append(_numSends).append(" times"); @@ -256,9 +274,9 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { releasePayload(); } - public boolean writeAccepted() { return _acceptedOn > 0 && _cancelledOn <= 0; } - public boolean writeFailed() { return _cancelledOn > 0; } - public boolean writeSuccessful() { return _ackOn > 0 && _cancelledOn <= 0; } + public synchronized boolean writeAccepted() { return _acceptedOn > 0 && _cancelledOn <= 0; } + public synchronized boolean writeFailed() { return _cancelledOn > 0; } + public synchronized boolean writeSuccessful() { return _ackOn > 0 && _cancelledOn <= 0; } /** Generate a pcap/tcpdump-compatible format, * so we can use standard debugging tools.