* Streaming:

- Allow at least 3 packets and up to half the window to be active resends
     instead of just 1, to reduce stall time after a packet drop
   - Increase fast retransmit threshold back to 3 to reduce retransmissions
   - Don't fast retransmit if we recently retransmitted it already
   - Allow double the window as long as gaps are less than the window
   - Don't set the MSS in a resent packet (saves 2 bytes)
   - Remove redundant calls to updateAcks()
   - Update activity timer when resending a packet
   - Reset unackedPacketsReceived counter at all places where acks are sent
     so it isn't wrong
   - Fix some places where the activeResends count could become wrong
   - Prevent storm of CLOSE packets
   - Never resend the whole packet in ackImmediately(), just send an ack
   - Cancel flusher timer in MessageOutputStream when closed
   - Move some createRateStats to ConnectionManager to reduce repeated calls
   - Cleanups, javadocs, logging, volatile, finals
This commit is contained in:
zzz
2012-06-29 14:53:53 +00:00
parent ebb6609a2b
commit 4092f61898
18 changed files with 363 additions and 244 deletions

View File

@@ -34,19 +34,19 @@ class Connection {
private boolean _resetReceived; private boolean _resetReceived;
private boolean _resetSent; private boolean _resetSent;
private long _resetSentOn; private long _resetSentOn;
private boolean _connected; private volatile boolean _connected;
private boolean _hardDisconnected; private boolean _hardDisconnected;
private final MessageInputStream _inputStream; private final MessageInputStream _inputStream;
private final MessageOutputStream _outputStream; private final MessageOutputStream _outputStream;
private final SchedulerChooser _chooser; private final SchedulerChooser _chooser;
private long _nextSendTime; private volatile long _nextSendTime;
private long _ackedPackets; private long _ackedPackets;
private final long _createdOn; private final long _createdOn;
private long _closeSentOn; private long _closeSentOn;
private long _closeReceivedOn; private long _closeReceivedOn;
private int _unackedPacketsReceived; private int _unackedPacketsReceived;
private long _congestionWindowEnd; private long _congestionWindowEnd;
private long _highestAckedThrough; private volatile long _highestAckedThrough;
private boolean _isInbound; private boolean _isInbound;
private boolean _updatedShareOpts; private boolean _updatedShareOpts;
/** Packet ID (Long) to PacketLocal for sent but unacked packets */ /** Packet ID (Long) to PacketLocal for sent but unacked packets */
@@ -60,11 +60,11 @@ class Connection {
private String _connectionError; private String _connectionError;
private long _disconnectScheduledOn; private long _disconnectScheduledOn;
private long _lastReceivedOn; private long _lastReceivedOn;
private ActivityTimer _activityTimer; private final ActivityTimer _activityTimer;
/** window size when we last saw congestion */ /** window size when we last saw congestion */
private int _lastCongestionSeenAt; private int _lastCongestionSeenAt;
private long _lastCongestionTime; private long _lastCongestionTime;
private long _lastCongestionHighestUnacked; private volatile long _lastCongestionHighestUnacked;
private boolean _ackSinceCongestion; private boolean _ackSinceCongestion;
/** Notify this on connection (or connection failure) */ /** Notify this on connection (or connection failure) */
private final Object _connectLock; private final Object _connectLock;
@@ -96,7 +96,9 @@ class Connection {
} }
****/ ****/
/** */ /**
* @param opts may be null
*/
public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser,
SimpleTimer2 timer, SimpleTimer2 timer,
PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts) { PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts) {
@@ -138,10 +140,7 @@ class Connection {
_resetSentOn = -1; _resetSentOn = -1;
_connectionEvent = new ConEvent(); _connectionEvent = new ConEvent();
_randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage
_context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); // all createRateStats in ConnectionManager
_context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.chokeSizeEnd", "How many messages were outstanding when we stopped being choked?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.fastRetransmit", "How long a packet has been around for if it has been resent per the fast retransmit timer?", "Stream", new long[] { 60*1000, 10*60*1000 });
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("New connection created with options: " + _options); _log.info("New connection created with options: " + _options);
} }
@@ -169,7 +168,6 @@ class Connection {
* will return false after 5 minutes even if timeoutMs is <= 0. * will return false after 5 minutes even if timeoutMs is <= 0.
*/ */
boolean packetSendChoke(long timeoutMs) { boolean packetSendChoke(long timeoutMs) {
// if (false) return true; // <--- what the fuck??
long start = _context.clock().now(); long start = _context.clock().now();
long writeExpire = start + timeoutMs; // only used if timeoutMs > 0 long writeExpire = start + timeoutMs; // only used if timeoutMs > 0
boolean started = false; boolean started = false;
@@ -187,19 +185,26 @@ class Connection {
if (!_connected) if (!_connected)
return false; return false;
started = true; started = true;
if ( (_outboundPackets.size() >= _options.getWindowSize()) || (_activeResends > 0) || // Try to keep things moving even during NACKs and retransmissions...
(_lastSendId.get() - _highestAckedThrough > _options.getWindowSize()) ) { // Limit unacked packets to the window
// Limit active resends to half the window
// Limit (highest-lowest) to twice the window (if far end doesn't like it, it can send a choke)
int unacked = _outboundPackets.size();
int wsz = _options.getWindowSize();
if (unacked >= wsz ||
_activeResends >= (wsz + 1) / 2 ||
_lastSendId.get() - _highestAckedThrough >= Math.max(MAX_WINDOW_SIZE, 2 * wsz)) {
if (timeoutMs > 0) { if (timeoutMs > 0) {
if (timeLeft <= 0) { if (timeLeft <= 0) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Outbound window is full of " + _outboundPackets.size() _log.info("Outbound window is full " + unacked
+ " with " + _activeResends + " active resends" + " unacked with " + _activeResends + " active resends"
+ " and we've waited too long (" + (0-(timeLeft - timeoutMs)) + "ms): " + " and we've waited too long (" + (0-(timeLeft - timeoutMs)) + "ms): "
+ toString()); + toString());
return false; return false;
} }
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _options.getWindowSize() + "/" _log.debug("Outbound window is full (" + unacked + "/" + wsz + "/"
+ _activeResends + "), waiting " + timeLeft); + _activeResends + "), waiting " + timeLeft);
try { _outboundPackets.wait(Math.min(timeLeft,250l)); } catch (InterruptedException ie) { if (_log.shouldLog(Log.DEBUG)) _log.debug("InterruptedException while Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends +")"); return false;} try { _outboundPackets.wait(Math.min(timeLeft,250l)); } catch (InterruptedException ie) { if (_log.shouldLog(Log.DEBUG)) _log.debug("InterruptedException while Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends +")"); return false;}
} else { } else {
@@ -223,6 +228,12 @@ class Connection {
void ackImmediately() { void ackImmediately() {
PacketLocal packet = null; PacketLocal packet = null;
/*** why would we do this?
was it to force a congestion indication at the other end?
an expensive way to do that...
One big user was via SchedulerClosing to resend a CLOSE packet,
but why do that either...
synchronized (_outboundPackets) { synchronized (_outboundPackets) {
if (!_outboundPackets.isEmpty()) { if (!_outboundPackets.isEmpty()) {
// ordered, so pick the lowest to retransmit // ordered, so pick the lowest to retransmit
@@ -239,6 +250,7 @@ class Connection {
} }
ResendPacketEvent evt = (ResendPacketEvent)packet.getResendEvent(); ResendPacketEvent evt = (ResendPacketEvent)packet.getResendEvent();
if (evt != null) { if (evt != null) {
// fixme should we set a flag and reschedule instead? or synch?
boolean sent = evt.retransmit(false); boolean sent = evt.retransmit(false);
if (sent) { if (sent) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@@ -251,7 +263,9 @@ class Connection {
} }
} }
} }
***/
// if we don't have anything to retransmit, send a small ack // if we don't have anything to retransmit, send a small ack
// this calls sendPacket() below
packet = _receiver.send(null, 0, 0); packet = _receiver.send(null, 0, 0);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("sending new ack: " + packet); _log.debug("sending new ack: " + packet);
@@ -281,11 +295,15 @@ class Connection {
reply.setReceiveStreamId(_receiveStreamId); reply.setReceiveStreamId(_receiveStreamId);
reply.setOptionalFrom(_connectionManager.getSession().getMyDestination()); reply.setOptionalFrom(_connectionManager.getSession().getMyDestination());
// this just sends the packet - no retries or whatnot // this just sends the packet - no retries or whatnot
_outboundQueue.enqueue(reply); if (_outboundQueue.enqueue(reply)) {
_unackedPacketsReceived = 0;
_lastSendTime = _context.clock().now();
resetActivityTimer();
}
} }
/** /**
* Flush any data that we can * Flush any data that we can. Non-blocking.
*/ */
void sendAvailable() { void sendAvailable() {
// this grabs the data, builds a packet, and queues it up via sendPacket // this grabs the data, builds a packet, and queues it up via sendPacket
@@ -301,7 +319,6 @@ class Connection {
if (packet == null) return; if (packet == null) return;
setNextSendTime(-1); setNextSendTime(-1);
_unackedPacketsReceived = 0;
if (_options.getRequireFullySigned()) { if (_options.getRequireFullySigned()) {
packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
packet.setFlag(Packet.FLAG_SIGNATURE_REQUESTED); packet.setFlag(Packet.FLAG_SIGNATURE_REQUESTED);
@@ -328,8 +345,8 @@ class Connection {
(packet.getSequenceNum() % 8 == 0)) { (packet.getSequenceNum() % 8 == 0)) {
packet.setOptionalDelay(0); packet.setOptionalDelay(0);
packet.setFlag(Packet.FLAG_DELAY_REQUESTED); packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
_log.debug("Requesting no ack delay for packet " + packet); // _log.debug("Requesting no ack delay for packet " + packet);
} else { } else {
// This is somewhat of a waste of time, unless the RTT < 4000, // This is somewhat of a waste of time, unless the RTT < 4000,
// since the other end limits it to getSendAckDelay() // since the other end limits it to getSendAckDelay()
@@ -358,10 +375,12 @@ class Connection {
// warning, getStatLog() can be null // warning, getStatLog() can be null
//_context.statManager().getStatLog().addData(Packet.toId(_sendStreamId), "stream.rtt", _options.getRTT(), _options.getWindowSize()); //_context.statManager().getStatLog().addData(Packet.toId(_sendStreamId), "stream.rtt", _options.getRTT(), _options.getWindowSize());
_lastSendTime = _context.clock().now(); if (_outboundQueue.enqueue(packet)) {
_outboundQueue.enqueue(packet); _unackedPacketsReceived = 0;
resetActivityTimer(); _lastSendTime = _context.clock().now();
resetActivityTimer();
}
/* /*
if (ackOnly) { if (ackOnly) {
// ACK only, don't schedule this packet for retries // ACK only, don't schedule this packet for retries
@@ -397,6 +416,7 @@ class Connection {
* @return List of packets acked or null * @return List of packets acked or null
*/ */
List<PacketLocal> ackPackets(long ackThrough, long nacks[]) { List<PacketLocal> ackPackets(long ackThrough, long nacks[]) {
// FIXME synch this part too?
if (ackThrough < _highestAckedThrough) { if (ackThrough < _highestAckedThrough) {
// dupack which won't tell us anything // dupack which won't tell us anything
} else { } else {
@@ -415,16 +435,17 @@ class Connection {
List<PacketLocal> acked = null; List<PacketLocal> acked = null;
synchronized (_outboundPackets) { synchronized (_outboundPackets) {
for (Iterator<Long> iter = _outboundPackets.keySet().iterator(); iter.hasNext(); ) { for (Map.Entry<Long, PacketLocal> e : _outboundPackets.entrySet()) {
Long id = iter.next(); long id = e.getKey().longValue();
if (id.longValue() <= ackThrough) { if (id <= ackThrough) {
boolean nacked = false; boolean nacked = false;
if (nacks != null) { if (nacks != null) {
// linear search since its probably really tiny // linear search since its probably really tiny
for (int i = 0; i < nacks.length; i++) { for (int i = 0; i < nacks.length; i++) {
if (nacks[i] == id.longValue()) { if (nacks[i] == id) {
nacked = true; nacked = true;
PacketLocal nackedPacket = _outboundPackets.get(id); PacketLocal nackedPacket = e.getValue();
// this will do a fast retransmit if appropriate
nackedPacket.incrementNACKs(); nackedPacket.incrementNACKs();
break; // NACKed break; // NACKed
} }
@@ -433,11 +454,27 @@ class Connection {
if (!nacked) { // aka ACKed if (!nacked) { // aka ACKed
if (acked == null) if (acked == null)
acked = new ArrayList(1); acked = new ArrayList(1);
PacketLocal ackedPacket = _outboundPackets.get(id); PacketLocal ackedPacket = e.getValue();
ackedPacket.ackReceived(); ackedPacket.ackReceived();
acked.add(ackedPacket); acked.add(ackedPacket);
} }
} else { } else {
// TODO
// we do not currently do an "implicit nack" of the packets higher
// than ackThrough, so those will not be fast retransmitted
// we could incrementNACK them here... but we may need to set the fastRettransmit
// threshold back to 3 for that.
// this will do a fast retransmit if appropriate
// This doesn't work because every packet has an ACK in it, so we hit the
// FAST_TRANSMIT threshold in a heartbeat and retransmit everything,
// even with the threshold at 3. (we never set the NO_ACK field in the header)
// Also, we may need to track that we
// have the same ackThrough for 3 or 4 consecutive times.
// See https://secure.wikimedia.org/wikipedia/en/wiki/Fast_retransmit
//if (_log.shouldLog(Log.INFO))
// _log.info("ACK thru " + ackThrough + " implicitly NACKs " + id);
//PacketLocal nackedPacket = e.getValue();
//nackedPacket.incrementNACKs();
break; // _outboundPackets is ordered break; // _outboundPackets is ordered
} }
} }
@@ -465,31 +502,33 @@ class Connection {
return acked; return acked;
} }
private long _occurredTime; //private long _occurredTime;
private long _occurredEventCount; //private long _occurredEventCount;
void eventOccurred() { void eventOccurred() {
long now = System.currentTimeMillis(); //long now = System.currentTimeMillis();
TaskScheduler sched = _chooser.getScheduler(this); TaskScheduler sched = _chooser.getScheduler(this);
now = now - now % 1000; //now = now - now % 1000;
if (_occurredTime == now) { //if (_occurredTime == now) {
_occurredEventCount++; // _occurredEventCount++;
} else { //} else {
_occurredTime = now; // _occurredTime = now;
if ( (_occurredEventCount > 1000) && (_log.shouldLog(Log.WARN)) ) { // if ( (_occurredEventCount > 1000) && (_log.shouldLog(Log.WARN)) ) {
_log.warn("More than 1000 events (" + _occurredEventCount + ") in a second on " // _log.warn("More than 1000 events (" + _occurredEventCount + ") in a second on "
+ toString() + ": scheduler = " + sched); // + toString() + ": scheduler = " + sched);
} // }
_occurredEventCount = 0; // _occurredEventCount = 0;
} //}
long before = System.currentTimeMillis(); long before = System.currentTimeMillis();
sched.eventOccurred(this); sched.eventOccurred(this);
long elapsed = System.currentTimeMillis() - before; long elapsed = System.currentTimeMillis() - before;
if ( (elapsed > 1000) && (_log.shouldLog(Log.WARN)) ) // 250 and warn for debugging
_log.warn("Took " + elapsed + "ms to pump through " + sched); if ( (elapsed > 250) && (_log.shouldLog(Log.WARN)) )
_log.warn("Took " + elapsed + "ms to pump through " + sched + " on " + toString());
} }
void resetReceived() { void resetReceived() {
@@ -498,12 +537,8 @@ class Connection {
SimpleScheduler.getInstance().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT); SimpleScheduler.getInstance().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT);
} }
_resetReceived = true; _resetReceived = true;
MessageOutputStream mos = _outputStream; _outputStream.streamErrorOccurred(new IOException("Reset received"));
MessageInputStream mis = _inputStream; _inputStream.streamErrorOccurred(new IOException("Reset received"));
if (mos != null)
mos.streamErrorOccurred(new IOException("Reset received"));
if (mis != null)
mis.streamErrorOccurred(new IOException("Reset received"));
_connectionError = "Connection reset"; _connectionError = "Connection reset";
synchronized (_connectLock) { _connectLock.notifyAll(); } synchronized (_connectLock) { _connectLock.notifyAll(); }
} }
@@ -556,15 +591,10 @@ class Connection {
s.destroy2(); s.destroy2();
_socket = null; _socket = null;
} }
if (_outputStream != null) _outputStream.destroy();
_outputStream.destroy(); _receiver.destroy();
if (_receiver != null) _activityTimer.cancel();
_receiver.destroy(); _inputStream.streamErrorOccurred(new IOException("disconnected!"));
if (_activityTimer != null)
_activityTimer.cancel();
//_activityTimer = null;
if (_inputStream != null)
_inputStream.streamErrorOccurred(new IOException("disconnected!"));
if (_disconnectScheduledOn < 0) { if (_disconnectScheduledOn < 0) {
_disconnectScheduledOn = _context.clock().now(); _disconnectScheduledOn = _context.clock().now();
@@ -656,11 +686,7 @@ class Connection {
* @return Last time we sent data * @return Last time we sent data
*/ */
public long getLastSendTime() { return _lastSendTime; } public long getLastSendTime() { return _lastSendTime; }
/** Set the time we sent data.
* @param when The time we sent data
*/
public void setLastSendTime(long when) { _lastSendTime = when; }
/** What was the last packet Id sent to the peer? /** What was the last packet Id sent to the peer?
* @return The last sent packet ID * @return The last sent packet ID
*/ */
@@ -795,10 +821,9 @@ class Connection {
public long getCongestionWindowEnd() { return _congestionWindowEnd; } public long getCongestionWindowEnd() { return _congestionWindowEnd; }
public void setCongestionWindowEnd(long endMsg) { _congestionWindowEnd = endMsg; } public void setCongestionWindowEnd(long endMsg) { _congestionWindowEnd = endMsg; }
/** @return the highest outbound packet we have recieved an ack for */ /** @return the highest outbound packet we have recieved an ack for */
public long getHighestAckedThrough() { return _highestAckedThrough; } public long getHighestAckedThrough() { return _highestAckedThrough; }
/** @deprecated unused */
public void setHighestAckedThrough(long msgNum) { _highestAckedThrough = msgNum; }
public long getLastActivityOn() { public long getLastActivityOn() {
return (_lastSendTime > _lastReceivedOn ? _lastSendTime : _lastReceivedOn); return (_lastSendTime > _lastReceivedOn ? _lastSendTime : _lastReceivedOn);
@@ -878,17 +903,12 @@ class Connection {
} }
private void resetActivityTimer() { private void resetActivityTimer() {
if (_options.getInactivityTimeout() <= 0) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Resetting the inactivity timer, but its gone!", new Exception("where did it go?"));
return;
}
if (_activityTimer == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Resetting the inactivity timer, but its gone!", new Exception("where did it go?"));
return;
}
long howLong = _options.getInactivityTimeout(); long howLong = _options.getInactivityTimeout();
if (howLong <= 0) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Resetting the inactivity timer, but its gone!", new Exception("where did it go?"));
return;
}
howLong += _randomWait; // randomize it a bit, so both sides don't do it at once howLong += _randomWait; // randomize it a bit, so both sides don't do it at once
//if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
// _log.debug("Resetting the inactivity timer to " + howLong); // _log.debug("Resetting the inactivity timer to " + howLong);
@@ -983,12 +1003,12 @@ class Connection {
} }
/** stream that the local peer receives data on /** stream that the local peer receives data on
* @return the inbound message stream * @return the inbound message stream, non-null
*/ */
public MessageInputStream getInputStream() { return _inputStream; } public MessageInputStream getInputStream() { return _inputStream; }
/** stream that the local peer sends data to the remote peer on /** stream that the local peer sends data to the remote peer on
* @return the outbound message stream * @return the outbound message stream, non-null
*/ */
public MessageOutputStream getOutputStream() { return _outputStream; } public MessageOutputStream getOutputStream() { return _outputStream; }
@@ -1032,12 +1052,10 @@ class Connection {
*/ */
buf.append("unacked in: ").append(getUnackedPacketsReceived()); buf.append("unacked in: ").append(getUnackedPacketsReceived());
int missing = 0; int missing = 0;
if (_inputStream != null) { long nacks[] = _inputStream.getNacks();
long nacks[] = _inputStream.getNacks(); if (nacks != null) {
if (nacks != null) { missing = nacks.length;
missing = nacks.length; buf.append(" [").append(missing).append(" missing]");
buf.append(" [").append(missing).append(" missing]");
}
} }
if (getResetSent()) if (getResetSent())
@@ -1053,8 +1071,7 @@ class Connection {
if (getCloseReceivedOn() > 0) if (getCloseReceivedOn() > 0)
buf.append(" close received ").append(DataHelper.formatDuration(_context.clock().now() - getCloseReceivedOn())).append(" ago"); buf.append(" close received ").append(DataHelper.formatDuration(_context.clock().now() - getCloseReceivedOn())).append(" ago");
buf.append(" sent: ").append(1 + _lastSendId.get()); buf.append(" sent: ").append(1 + _lastSendId.get());
if (_inputStream != null) buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing);
buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing);
buf.append(" maxWin ").append(getOptions().getMaxWindowSize()); buf.append(" maxWin ").append(getOptions().getMaxWindowSize());
buf.append(" MTU ").append(getOptions().getMaxMessageSize()); buf.append(" MTU ").append(getOptions().getMaxMessageSize());
@@ -1086,14 +1103,15 @@ class Connection {
* there are other packets in flight. 3 takes forever, let's try 2. * there are other packets in flight. 3 takes forever, let's try 2.
* *
*/ */
static final int FAST_RETRANSMIT_THRESHOLD = 2; static final int FAST_RETRANSMIT_THRESHOLD = 3;
/** /**
* Coordinate the resends of a given packet * Coordinate the resends of a given packet
*/ */
class ResendPacketEvent extends SimpleTimer2.TimedEvent { class ResendPacketEvent extends SimpleTimer2.TimedEvent {
private PacketLocal _packet; private final PacketLocal _packet;
private long _nextSendTime; private long _nextSendTime;
public ResendPacketEvent(PacketLocal packet, long delay) { public ResendPacketEvent(PacketLocal packet, long delay) {
super(_timer); super(_timer);
_packet = packet; _packet = packet;
@@ -1111,6 +1129,8 @@ class Connection {
* we have to use forceReschedule() instead of schedule() below, * we have to use forceReschedule() instead of schedule() below,
* to prevent duplicates in the timer queue. * to prevent duplicates in the timer queue.
* *
* don't synchronize this, deadlock with ackPackets->ackReceived->SimpleTimer2.cancel
*
* @param penalize true if this retransmission is caused by a timeout, false if we * @param penalize true if this retransmission is caused by a timeout, false if we
* are just sending this packet instead of an ACK * are just sending this packet instead of an ACK
* @return true if the packet was sent, false if it was not * @return true if the packet was sent, false if it was not
@@ -1131,7 +1151,12 @@ class Connection {
boolean resend = false; boolean resend = false;
boolean isLowest = false; boolean isLowest = false;
synchronized (_outboundPackets) { synchronized (_outboundPackets) {
if (_packet.getSequenceNum() == _highestAckedThrough + 1) // allow appx. half the window to be "lowest" and be active resends, minimum of 3
// Note: we should really pick the N lowest, not the lowest one + N more who
// happen to get here next, as the timers get out-of-order esp. after fast retx
if (_packet.getSequenceNum() == _highestAckedThrough + 1 ||
_packet.getNumSends() > 1 ||
_activeResends < Math.max(3, (_options.getWindowSize() + 1) / 2))
isLowest = true; isLowest = true;
if (_outboundPackets.containsKey(Long.valueOf(_packet.getSequenceNum()))) if (_outboundPackets.containsKey(Long.valueOf(_packet.getSequenceNum())))
resend = true; resend = true;
@@ -1145,24 +1170,28 @@ class Connection {
// BUG? seq# = 0, activeResends = 0, loop forever - why? // BUG? seq# = 0, activeResends = 0, loop forever - why?
// also seen with seq# > 0. Is the _activeResends count reliable? // also seen with seq# > 0. Is the _activeResends count reliable?
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Delaying resend of " + _packet + " as there are " _log.info("Delaying resend of " + _packet + " with "
+ _activeResends + " active resends already in play"); + _activeResends + " active resend, "
forceReschedule(1000); + _outboundPackets.size() + " unacked, window size = " + _options.getWindowSize());
_nextSendTime = 1000 + _context.clock().now(); forceReschedule(1333);
_nextSendTime = 1333 + _context.clock().now();
return false; return false;
} }
// It's the lowest, or it's fast retransmit time. Resend the packet.
if (fastRetransmit) if (fastRetransmit)
_context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime()); _context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime());
// revamp various fields, in case we need to ack more, etc // revamp various fields, in case we need to ack more, etc
_inputStream.updateAcks(_packet); // updateAcks done in enqueue()
//_inputStream.updateAcks(_packet);
int choke = getOptions().getChoke(); int choke = getOptions().getChoke();
_packet.setOptionalDelay(choke); _packet.setOptionalDelay(choke);
if (choke > 0) if (choke > 0)
_packet.setFlag(Packet.FLAG_DELAY_REQUESTED); _packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
// this seems unnecessary to send the MSS again: // this seems unnecessary to send the MSS again:
_packet.setOptionalMaxSize(getOptions().getMaxMessageSize()); //_packet.setOptionalMaxSize(getOptions().getMaxMessageSize());
// bugfix release 0.7.8, we weren't dividing by 1000 // bugfix release 0.7.8, we weren't dividing by 1000
_packet.setResendDelay(getOptions().getResendDelay() / 1000); _packet.setResendDelay(getOptions().getResendDelay() / 1000);
if (_packet.getReceiveStreamId() <= 0) if (_packet.getReceiveStreamId() <= 0)
@@ -1186,7 +1215,7 @@ class Connection {
getOptions().setWindowSize(newWindowSize); getOptions().setWindowSize(newWindowSize);
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Congestion resending packet " + _packet.getSequenceNum() + ": new windowSize " + newWindowSize _log.warn("Congestion, resending packet " + _packet.getSequenceNum() + " (new windowSize " + newWindowSize
+ "/" + getOptions().getWindowSize() + ") for " + Connection.this.toString()); + "/" + getOptions().getWindowSize() + ") for " + Connection.this.toString());
windowAdjusted(); windowAdjusted();
@@ -1195,10 +1224,6 @@ class Connection {
int numSends = _packet.getNumSends() + 1; int numSends = _packet.getNumSends() + 1;
if (numSends == 2) {
// first resend for this packet
_activeResends++;
}
// in case things really suck, the other side may have lost thier // in case things really suck, the other side may have lost thier
// session tags (e.g. they restarted), so jump back to ElGamal. // session tags (e.g. they restarted), so jump back to ElGamal.
@@ -1225,27 +1250,34 @@ class Connection {
// set this before enqueue() as it passes it on to the router // set this before enqueue() as it passes it on to the router
_nextSendTime = timeout + _context.clock().now(); _nextSendTime = timeout + _context.clock().now();
if (_log.shouldLog(Log.INFO)) if (_outboundQueue.enqueue(_packet)) {
_log.info("Resend packet " + _packet + " time " + numSends + // first resend for this packet ?
if (numSends == 2)
_activeResends++;
if (_log.shouldLog(Log.INFO))
_log.info("Resent packet " +
(fastRetransmit ? "(fast) " : "(timeout) ") +
_packet +
" next resend in " + timeout + "ms" +
" activeResends: " + _activeResends + " activeResends: " + _activeResends +
" (wsize " " (wsize "
+ newWindowSize + " lifetime " + newWindowSize + " lifetime "
+ (_context.clock().now() - _packet.getCreatedOn()) + "ms)"); + (_context.clock().now() - _packet.getCreatedOn()) + "ms)");
_outboundQueue.enqueue(_packet); _unackedPacketsReceived = 0;
_lastSendTime = _context.clock().now(); _lastSendTime = _context.clock().now();
// timer reset added 0.9.1
resetActivityTimer();
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Scheduling resend in " + timeout + "ms for " + _packet);
forceReschedule(timeout); forceReschedule(timeout);
} }
// acked during resending (... or somethin') // acked during resending (... or somethin') ????????????
if ( (_packet.getAckTime() > 0) && (_packet.getNumSends() > 1) ) { if ( (_packet.getAckTime() > 0) && (_packet.getNumSends() > 1) ) {
_activeResends--; _activeResends--;
synchronized (_outboundPackets) { synchronized (_outboundPackets) {
_outboundPackets.notifyAll(); _outboundPackets.notifyAll();
} }
return true;
} }
return true; return true;

View File

@@ -10,7 +10,12 @@ import net.i2p.util.Log;
* do NOT block, but they also do not necessary imply immediate * do NOT block, but they also do not necessary imply immediate
* delivery, or even the generation of a new packet. This class * delivery, or even the generation of a new packet. This class
* is the only one that builds useful outbound Packet objects. * is the only one that builds useful outbound Packet objects.
* *<p>
* MessageOutputStream -> ConnectionDataReceiver -> Connection -> PacketQueue -> I2PSession
*<p>
* There's one of these per MessageOutputStream.
* It stores no state. It sends everything to the Connection unless
* the Connection is closed,
*/ */
class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
private final I2PAppContext _context; private final I2PAppContext _context;
@@ -82,7 +87,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
if (_log.shouldLog(Log.INFO) && !doSend) if (_log.shouldLog(Log.INFO) && !doSend)
_log.info("writeData called: size="+size + " doSend=" + doSend _log.info("writeData called: size="+size + " doSend=" + doSend
+ " unackedReceived: " + con.getUnackedPacketsReceived() + " unackedReceived: " + con.getUnackedPacketsReceived()
+ " con: " + con, new Exception("write called by")); + " con: " + con /* , new Exception("write called by") */ );
if (doSend) { if (doSend) {
PacketLocal packet = send(buf, off, size); PacketLocal packet = send(buf, off, size);
@@ -111,6 +116,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
public PacketLocal send(byte buf[], int off, int size) { public PacketLocal send(byte buf[], int off, int size) {
return send(buf, off, size, false); return send(buf, off, size, false);
} }
/** /**
* @param buf data to be sent - may be null * @param buf data to be sent - may be null
* @param off offset into the buffer to start writing from * @param off offset into the buffer to start writing from
@@ -120,22 +126,20 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
* @return the packet sent * @return the packet sent
*/ */
public PacketLocal send(byte buf[], int off, int size, boolean forceIncrement) { public PacketLocal send(byte buf[], int off, int size, boolean forceIncrement) {
Connection con = _connection; //long before = System.currentTimeMillis();
//if (con == null) return null; PacketLocal packet = buildPacket(buf, off, size, forceIncrement);
long before = System.currentTimeMillis(); //long built = System.currentTimeMillis();
PacketLocal packet = buildPacket(con, buf, off, size, forceIncrement); _connection.sendPacket(packet);
long built = System.currentTimeMillis(); //long sent = System.currentTimeMillis();
con.sendPacket(packet);
long sent = System.currentTimeMillis();
if ( (built-before > 5*1000) && (_log.shouldLog(Log.WARN)) ) //if ( (built-before > 5*1000) && (_log.shouldLog(Log.WARN)) )
_log.warn("wtf, took " + (built-before) + "ms to build a packet: " + packet); // _log.warn("wtf, took " + (built-before) + "ms to build a packet: " + packet);
if ( (sent-built> 5*1000) && (_log.shouldLog(Log.WARN)) ) //if ( (sent-built> 5*1000) && (_log.shouldLog(Log.WARN)) )
_log.warn("wtf, took " + (sent-built) + "ms to send a packet: " + packet); // _log.warn("wtf, took " + (sent-built) + "ms to send a packet: " + packet);
return packet; return packet;
} }
private boolean isAckOnly(Connection con, int size) { private static boolean isAckOnly(Connection con, int size) {
boolean ackOnly = ( (size <= 0) && // no data boolean ackOnly = ( (size <= 0) && // no data
(con.getLastSendId() >= 0) && // not a SYN (con.getLastSendId() >= 0) && // not a SYN
( (!con.getOutputStream().getClosed()) || // not a CLOSE ( (!con.getOutputStream().getClosed()) || // not a CLOSE
@@ -144,7 +148,16 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
return ackOnly; return ackOnly;
} }
private PacketLocal buildPacket(Connection con, byte buf[], int off, int size, boolean forceIncrement) { /**
* @param buf data to be sent - may be null
* @param off offset into the buffer to start writing from
* @param size how many bytes of the buffer to write (may be 0)
* @param forceIncrement even if the buffer is empty, increment the packetId
* so we get an ACK back
* @return the packet to be sent
*/
private PacketLocal buildPacket(byte buf[], int off, int size, boolean forceIncrement) {
Connection con = _connection;
if (size > Packet.MAX_PAYLOAD_SIZE) throw new IllegalArgumentException("size is too large (" + size + ")"); if (size > Packet.MAX_PAYLOAD_SIZE) throw new IllegalArgumentException("size is too large (" + size + ")");
boolean ackOnly = isAckOnly(con, size); boolean ackOnly = isAckOnly(con, size);
boolean isFirst = (con.getAckedPackets() <= 0) && (con.getUnackedPacketsSent() <= 0); boolean isFirst = (con.getAckedPackets() <= 0) && (con.getUnackedPacketsSent() <= 0);
@@ -164,7 +177,8 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
packet.setSendStreamId(con.getSendStreamId()); packet.setSendStreamId(con.getSendStreamId());
packet.setReceiveStreamId(con.getReceiveStreamId()); packet.setReceiveStreamId(con.getReceiveStreamId());
con.getInputStream().updateAcks(packet); // not needed here, handled in PacketQueue.enqueue()
//con.getInputStream().updateAcks(packet);
// note that the optional delay is usually rewritten in Connection.sendPacket() // note that the optional delay is usually rewritten in Connection.sendPacket()
int choke = con.getOptions().getChoke(); int choke = con.getOptions().getChoke();
packet.setOptionalDelay(choke); packet.setOptionalDelay(choke);
@@ -195,6 +209,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
// don't set the closed flag if this is a plain ACK and there are outstanding // don't set the closed flag if this is a plain ACK and there are outstanding
// packets sent, otherwise the other side could receive the CLOSE prematurely, // packets sent, otherwise the other side could receive the CLOSE prematurely,
// since this ACK could arrive before the unacked payload message. // since this ACK could arrive before the unacked payload message.
// TODO if the only unacked packet is the CLOSE packet and it didn't have any data...
if (con.getOutputStream().getClosed() && if (con.getOutputStream().getClosed() &&
( (size > 0) || (con.getUnackedPacketsSent() <= 0) || (packet.getSequenceNum() > 0) ) ) { ( (size > 0) || (con.getUnackedPacketsSent() <= 0) || (packet.getSequenceNum() > 0) ) ) {
packet.setFlag(Packet.FLAG_CLOSE); packet.setFlag(Packet.FLAG_CLOSE);

View File

@@ -75,6 +75,7 @@ class ConnectionManager {
/** Socket timeout for accept() */ /** Socket timeout for accept() */
_soTimeout = -1; _soTimeout = -1;
// Stats for this class
_context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
@@ -85,6 +86,14 @@ class ConnectionManager {
_context.statManager().createRateStat("stream.con.lifetimeCongestionSeenAt", "When was the last congestion seen at when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeCongestionSeenAt", "When was the last congestion seen at when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
// Stats for Connection
_context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.chokeSizeEnd", "How many messages were outstanding when we stopped being choked?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.fastRetransmit", "How long a packet has been around for if it has been resent per the fast retransmit timer?", "Stream", new long[] { 60*1000, 10*60*1000 });
// Stats for PacketQueue
_context.statManager().createRateStat("stream.con.sendMessageSize", "Size of a message sent on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.con.sendDuplicateSize", "Size of a message resent on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
} }
Connection getConnectionByInboundId(long id) { Connection getConnectionByInboundId(long id) {
@@ -420,13 +429,11 @@ class ConnectionManager {
if (removed) { if (removed) {
_context.statManager().addRateData("stream.con.lifetimeMessagesSent", 1+con.getLastSendId(), con.getLifetime()); _context.statManager().addRateData("stream.con.lifetimeMessagesSent", 1+con.getLastSendId(), con.getLifetime());
MessageInputStream stream = con.getInputStream(); MessageInputStream stream = con.getInputStream();
if (stream != null) {
long rcvd = 1 + stream.getHighestBlockId(); long rcvd = 1 + stream.getHighestBlockId();
long nacks[] = stream.getNacks(); long nacks[] = stream.getNacks();
if (nacks != null) if (nacks != null)
rcvd -= nacks.length; rcvd -= nacks.length;
_context.statManager().addRateData("stream.con.lifetimeMessagesReceived", rcvd, con.getLifetime()); _context.statManager().addRateData("stream.con.lifetimeMessagesReceived", rcvd, con.getLifetime());
}
_context.statManager().addRateData("stream.con.lifetimeBytesSent", con.getLifetimeBytesSent(), con.getLifetime()); _context.statManager().addRateData("stream.con.lifetimeBytesSent", con.getLifetimeBytesSent(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeBytesReceived", con.getLifetimeBytesReceived(), con.getLifetime()); _context.statManager().addRateData("stream.con.lifetimeBytesReceived", con.getLifetimeBytesReceived(), con.getLifetime());
_context.statManager().addRateData("stream.con.lifetimeDupMessagesSent", con.getLifetimeDupMessagesSent(), con.getLifetime()); _context.statManager().addRateData("stream.con.lifetimeDupMessagesSent", con.getLifetimeDupMessagesSent(), con.getLifetime());

View File

@@ -98,6 +98,10 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
public static final int DEFAULT_INITIAL_ACK_DELAY = 2*1000; public static final int DEFAULT_INITIAL_ACK_DELAY = 2*1000;
static final int MIN_WINDOW_SIZE = 1; static final int MIN_WINDOW_SIZE = 1;
private static final boolean DEFAULT_ANSWER_PINGS = true; private static final boolean DEFAULT_ANSWER_PINGS = true;
private static final int DEFAULT_INACTIVITY_TIMEOUT = 90*1000;
private static final int DEFAULT_INACTIVITY_ACTION = INACTIVITY_ACTION_SEND;
/** /**
* If PROTO is enforced, we cannot communicate with destinations earlier than version 0.7.1. * If PROTO is enforced, we cannot communicate with destinations earlier than version 0.7.1.
* @since 0.9.1 * @since 0.9.1
@@ -302,6 +306,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
_maxTotalConnsPerDay = opts.getMaxTotalConnsPerDay(); _maxTotalConnsPerDay = opts.getMaxTotalConnsPerDay();
} }
/** called by super's constructor */
@Override @Override
protected void init(Properties opts) { protected void init(Properties opts) {
super.init(opts); super.init(opts);
@@ -318,8 +323,8 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
setMaxResends(getInt(opts, PROP_MAX_RESENDS, DEFAULT_MAX_SENDS)); setMaxResends(getInt(opts, PROP_MAX_RESENDS, DEFAULT_MAX_SENDS));
// handled in super() // handled in super()
//setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); //setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1));
setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 90*1000)); setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, DEFAULT_INACTIVITY_TIMEOUT));
setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_SEND)); setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, DEFAULT_INACTIVITY_ACTION));
setInboundBufferSize(getMaxMessageSize() * (Connection.MAX_WINDOW_SIZE + 2)); setInboundBufferSize(getMaxMessageSize() * (Connection.MAX_WINDOW_SIZE + 2));
setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR, 1)); setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR, 1));
setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 1)); setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 1));
@@ -367,9 +372,9 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
//if (opts.containsKey(PROP_WRITE_TIMEOUT)) //if (opts.containsKey(PROP_WRITE_TIMEOUT))
// setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); // setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1));
if (opts.containsKey(PROP_INACTIVITY_TIMEOUT)) if (opts.containsKey(PROP_INACTIVITY_TIMEOUT))
setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 90*1000)); setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, DEFAULT_INACTIVITY_TIMEOUT));
if (opts.containsKey(PROP_INACTIVITY_ACTION)) if (opts.containsKey(PROP_INACTIVITY_ACTION))
setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_SEND)); setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, DEFAULT_INACTIVITY_ACTION));
setInboundBufferSize(getMaxMessageSize() * (Connection.MAX_WINDOW_SIZE + 2)); setInboundBufferSize(getMaxMessageSize() * (Connection.MAX_WINDOW_SIZE + 2));
if (opts.contains(PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR)) if (opts.contains(PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR))
setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR, 2)); setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR, 2));

View File

@@ -12,7 +12,13 @@ import net.i2p.util.SimpleTimer;
/** /**
* Receive a packet for a particular connection - placing the data onto the * Receive a packet for a particular connection - placing the data onto the
* queue, marking packets as acked, updating various fields, etc. * queue, marking packets as acked, updating various fields, etc.
* *<p>
* I2PSession -> MessageHandler -> PacketHandler -> ConnectionPacketHandler -> MessageInputStream
*<p>
* One of these is instantiated per-Destination
* (i.e. per-ConnectionManager, not per-Connection).
* It doesn't store any state.
*/ */
class ConnectionPacketHandler { class ConnectionPacketHandler {
private final I2PAppContext _context; private final I2PAppContext _context;
@@ -94,19 +100,24 @@ class ConnectionPacketHandler {
} }
} }
long ready = con.getInputStream().getHighestReadyBockId(); if (packet.getPayloadSize() > 0) {
int available = con.getOptions().getInboundBufferSize() - con.getInputStream().getTotalReadySize(); // Here, for the purposes of calculating whether the input stream is full,
int allowedBlocks = available/con.getOptions().getMaxMessageSize(); // we assume all the not-ready blocks are the max message size.
if ( (packet.getPayloadSize() > 0) && (packet.getSequenceNum() > ready + allowedBlocks) ) { // This prevents us from getting DoSed by accepting unlimited out-of-order small messages
if (_log.shouldLog(Log.WARN)) long ready = con.getInputStream().getHighestReadyBockId();
_log.warn("Inbound buffer exceeded on connection " + con + " (" int available = con.getOptions().getInboundBufferSize() - con.getInputStream().getTotalReadySize();
+ ready + "/"+ (ready+allowedBlocks) + "/" + available int allowedBlocks = available/con.getOptions().getMaxMessageSize();
+ ": dropping " + packet); if (packet.getSequenceNum() > ready + allowedBlocks) {
ack(con, packet.getAckThrough(), packet.getNacks(), null, false, choke); if (_log.shouldLog(Log.WARN))
con.getOptions().setChoke(61*1000); _log.warn("Inbound buffer exceeded on connection " + con + " ("
packet.releasePayload(); + ready + "/"+ (ready+allowedBlocks) + "/" + available
con.ackImmediately(); + ": dropping " + packet);
return; ack(con, packet.getAckThrough(), packet.getNacks(), null, false, choke);
con.getOptions().setChoke(61*1000);
packet.releasePayload();
con.ackImmediately();
return;
}
} }
con.getOptions().setChoke(0); con.getOptions().setChoke(0);
@@ -513,12 +524,14 @@ class ConnectionPacketHandler {
} }
private class AckDup implements SimpleTimer.TimedEvent { private class AckDup implements SimpleTimer.TimedEvent {
private long _created; private final long _created;
private Connection _con; private final Connection _con;
public AckDup(Connection con) { public AckDup(Connection con) {
_created = _context.clock().now(); _created = _context.clock().now();
_con = con; _con = con;
} }
public void timeReached() { public void timeReached() {
if (_con.getLastSendTime() <= _created) { if (_con.getLastSendTime() <= _created) {
if (_con.getResetReceived() || _con.getResetSent()) { if (_con.getResetReceived() || _con.getResetSent()) {

View File

@@ -13,7 +13,8 @@ import net.i2p.util.Log;
/** /**
* Receive raw information from the I2PSession and turn it into * Receive raw information from the I2PSession and turn it into
* Packets, if we can. * Packets, if we can.
* *<p>
* I2PSession -> MessageHandler -> PacketHandler -> ConnectionPacketHandler -> MessageInputStream
*/ */
class MessageHandler implements I2PSessionMuxedListener { class MessageHandler implements I2PSessionMuxedListener {
private final ConnectionManager _manager; private final ConnectionManager _manager;

View File

@@ -16,6 +16,11 @@ import net.i2p.util.Log;
/** /**
* Stream that can be given messages out of order * Stream that can be given messages out of order
* yet present them in order. * yet present them in order.
*<p>
* I2PSession -> MessageHandler -> PacketHandler -> ConnectionPacketHandler -> MessageInputStream
*<p>
* This buffers unlimited data via messageReceived() -
* limiting / blocking is done in ConnectionPacketHandler.receivePacket().
* *
*/ */
class MessageInputStream extends InputStream { class MessageInputStream extends InputStream {
@@ -113,6 +118,9 @@ class MessageInputStream extends InputStream {
} }
} }
/**
* Adds the ack-through and nack fields to a packet we are building for transmission
*/
public void updateAcks(PacketLocal packet) { public void updateAcks(PacketLocal packet) {
synchronized (_dataLock) { synchronized (_dataLock) {
packet.setAckThrough(_highestBlockId); packet.setAckThrough(_highestBlockId);
@@ -126,6 +134,7 @@ class MessageInputStream extends InputStream {
* *
* @return block IDs greater than the highest ready block ID, or null if there aren't any. * @return block IDs greater than the highest ready block ID, or null if there aren't any.
*/ */
/***
public long[] getOutOfOrderBlocks() { public long[] getOutOfOrderBlocks() {
long blocks[] = null; long blocks[] = null;
synchronized (_dataLock) { synchronized (_dataLock) {
@@ -140,15 +149,18 @@ class MessageInputStream extends InputStream {
Arrays.sort(blocks); Arrays.sort(blocks);
return blocks; return blocks;
} }
***/
/** how many blocks have we received that we still have holes before? /** how many blocks have we received that we still have holes before?
* @return Count of blocks received that still have holes * @return Count of blocks received that still have holes
*/ */
/***
public int getOutOfOrderBlockCount() { public int getOutOfOrderBlockCount() {
synchronized (_dataLock) { synchronized (_dataLock) {
return _notYetReadyBlocks.size(); return _notYetReadyBlocks.size();
} }
} }
***/
/** /**
* how long a read() call should block (if less than 0, block indefinitely, * how long a read() call should block (if less than 0, block indefinitely,
@@ -205,9 +217,9 @@ class MessageInputStream extends InputStream {
* @return true if this is a new packet, false if it is a dup * @return true if this is a new packet, false if it is a dup
*/ */
public boolean messageReceived(long messageId, ByteArray payload) { public boolean messageReceived(long messageId, ByteArray payload) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("received " + messageId + " with " + (payload != null ? payload.getValid()+"" : "no payload"));
synchronized (_dataLock) { synchronized (_dataLock) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("received " + messageId + " with " + (payload != null ? payload.getValid()+"" : "no payload"));
if (messageId <= _highestReadyBlockId) { if (messageId <= _highestReadyBlockId) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("ignoring dup message " + messageId); _log.debug("ignoring dup message " + messageId);
@@ -237,7 +249,6 @@ class MessageInputStream extends InputStream {
cur++; cur++;
_highestReadyBlockId++; _highestReadyBlockId++;
} }
_dataLock.notifyAll();
} else { } else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("message is out of order: " + messageId); _log.debug("message is out of order: " + messageId);
@@ -245,8 +256,8 @@ class MessageInputStream extends InputStream {
_notYetReadyBlocks.put(Long.valueOf(messageId), new ByteArray(null)); _notYetReadyBlocks.put(Long.valueOf(messageId), new ByteArray(null));
else else
_notYetReadyBlocks.put(Long.valueOf(messageId), payload); _notYetReadyBlocks.put(Long.valueOf(messageId), payload);
_dataLock.notifyAll();
} }
_dataLock.notifyAll();
} }
return true; return true;
} }
@@ -278,7 +289,7 @@ class MessageInputStream extends InputStream {
while (_readyDataBlocks.isEmpty()) { while (_readyDataBlocks.isEmpty()) {
if (_locallyClosed) if (_locallyClosed)
throw new IOException("Already closed, you wanker"); throw new IOException("Already closed");
if ( (_notYetReadyBlocks.isEmpty()) && (_closeReceived) ) { if ( (_notYetReadyBlocks.isEmpty()) && (_closeReceived) ) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
@@ -360,7 +371,7 @@ class MessageInputStream extends InputStream {
@Override @Override
public int available() throws IOException { public int available() throws IOException {
if (_locallyClosed) throw new IOException("Already closed, you wanker"); if (_locallyClosed) throw new IOException("Already closed");
throwAnyError(); throwAnyError();
int numBytes = 0; int numBytes = 0;
synchronized (_dataLock) { synchronized (_dataLock) {
@@ -384,6 +395,7 @@ class MessageInputStream extends InputStream {
* *
* @return Count of bytes waiting to be read * @return Count of bytes waiting to be read
*/ */
/***
public int getTotalQueuedSize() { public int getTotalQueuedSize() {
synchronized (_dataLock) { synchronized (_dataLock) {
if (_locallyClosed) return 0; if (_locallyClosed) return 0;
@@ -401,7 +413,11 @@ class MessageInputStream extends InputStream {
return numBytes; return numBytes;
} }
} }
***/
/**
* Same as available() but doesn't throw IOE
*/
public int getTotalReadySize() { public int getTotalReadySize() {
synchronized (_dataLock) { synchronized (_dataLock) {
if (_locallyClosed) return 0; if (_locallyClosed) return 0;

View File

@@ -14,6 +14,8 @@ import net.i2p.util.SimpleTimer2;
* A stream that we can shove data into that fires off those bytes * A stream that we can shove data into that fires off those bytes
* on flush or when the buffer is full. It also blocks according * on flush or when the buffer is full. It also blocks according
* to the data receiver's needs. * to the data receiver's needs.
*<p>
* MessageOutputStream -> ConnectionDataReceiver -> Connection -> PacketQueue -> I2PSession
*/ */
class MessageOutputStream extends OutputStream { class MessageOutputStream extends OutputStream {
private final I2PAppContext _context; private final I2PAppContext _context;
@@ -21,17 +23,17 @@ class MessageOutputStream extends OutputStream {
private byte _buf[]; private byte _buf[];
private int _valid; private int _valid;
private final Object _dataLock; private final Object _dataLock;
private DataReceiver _dataReceiver; private final DataReceiver _dataReceiver;
private IOException _streamError; private IOException _streamError;
private boolean _closed; private volatile boolean _closed;
private long _written; private long _written;
private int _writeTimeout; private int _writeTimeout;
private ByteCache _dataCache; private ByteCache _dataCache;
private final Flusher _flusher; private final Flusher _flusher;
private long _lastFlushed; private long _lastFlushed;
private long _lastBuffered; private volatile long _lastBuffered;
/** if we enqueue data but don't flush it in this period, flush it passively */ /** if we enqueue data but don't flush it in this period, flush it passively */
private int _passiveFlushDelay; private final int _passiveFlushDelay;
/** /**
* if we are changing the buffer size during operation, set this to the new * if we are changing the buffer size during operation, set this to the new
* buffer size, and next time we are flushing, update the _buf array to the new * buffer size, and next time we are flushing, update the _buf array to the new
@@ -39,9 +41,9 @@ class MessageOutputStream extends OutputStream {
*/ */
private volatile int _nextBufferSize; private volatile int _nextBufferSize;
// rate calc helpers // rate calc helpers
private long _sendPeriodBeginTime; //private long _sendPeriodBeginTime;
private long _sendPeriodBytes; //private long _sendPeriodBytes;
private int _sendBps; //private int _sendBps;
/** /**
* Since this is less than i2ptunnel's i2p.streaming.connectDelay default of 1000, * Since this is less than i2ptunnel's i2p.streaming.connectDelay default of 1000,
@@ -73,11 +75,11 @@ class MessageOutputStream extends OutputStream {
_writeTimeout = -1; _writeTimeout = -1;
_passiveFlushDelay = passiveFlushDelay; _passiveFlushDelay = passiveFlushDelay;
_nextBufferSize = -1; _nextBufferSize = -1;
_sendPeriodBeginTime = ctx.clock().now(); //_sendPeriodBeginTime = ctx.clock().now();
_context.statManager().createRateStat("stream.sendBps", "How fast we pump data through the stream", "Stream", new long[] { 60*1000, 5*60*1000, 60*60*1000 }); //_context.statManager().createRateStat("stream.sendBps", "How fast we pump data through the stream", "Stream", new long[] { 60*1000, 5*60*1000, 60*60*1000 });
_flusher = new Flusher(timer); _flusher = new Flusher(timer);
if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
_log.debug("MessageOutputStream created"); // _log.debug("MessageOutputStream created");
} }
public void setWriteTimeout(int ms) { public void setWriteTimeout(int ms) {
@@ -131,15 +133,9 @@ class MessageOutputStream extends OutputStream {
remaining -= toWrite; remaining -= toWrite;
cur += toWrite; cur += toWrite;
_valid = _buf.length; _valid = _buf.length;
// avoid NPE from race with destroy()
DataReceiver rcvr = _dataReceiver;
if (rcvr == null) {
throwAnyError();
return;
}
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("write() direct valid = " + _valid); _log.info("write() direct valid = " + _valid);
ws = rcvr.writeData(_buf, 0, _valid); ws = _dataReceiver.writeData(_buf, 0, _valid);
_written += _valid; _written += _valid;
_valid = 0; _valid = 0;
throwAnyError(); throwAnyError();
@@ -167,17 +163,18 @@ class MessageOutputStream extends OutputStream {
_log.info("After waitForAccept of " + ws); _log.info("After waitForAccept of " + ws);
} }
} else { } else {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.DEBUG))
_log.info("Queued " + len + " without sending to the receiver"); _log.debug("Queued " + len + " without sending to the receiver");
} }
} }
long elapsed = _context.clock().now() - begin; long elapsed = _context.clock().now() - begin;
if ( (elapsed > 10*1000) && (_log.shouldLog(Log.INFO)) ) if ( (elapsed > 10*1000) && (_log.shouldLog(Log.INFO)) )
_log.info("wtf, took " + elapsed + "ms to write to the stream?", new Exception("foo")); _log.info("wtf, took " + elapsed + "ms to write to the stream?", new Exception("foo"));
throwAnyError(); throwAnyError();
updateBps(len); //updateBps(len);
} }
/****
private void updateBps(int len) { private void updateBps(int len) {
long now = _context.clock().now(); long now = _context.clock().now();
int periods = (int)Math.floor((now - _sendPeriodBeginTime) / 1000d); int periods = (int)Math.floor((now - _sendPeriodBeginTime) / 1000d);
@@ -191,7 +188,9 @@ class MessageOutputStream extends OutputStream {
_sendPeriodBytes += len; _sendPeriodBytes += len;
} }
} }
****/
/** */
public void write(int b) throws IOException { public void write(int b) throws IOException {
write(new byte[] { (byte)b }, 0, 1); write(new byte[] { (byte)b }, 0, 1);
throwAnyError(); throwAnyError();
@@ -240,14 +239,15 @@ class MessageOutputStream extends OutputStream {
_enqueued = true; _enqueued = true;
} }
public void timeReached() { public void timeReached() {
if (_closed)
return;
_enqueued = false; _enqueued = false;
DataReceiver rec = _dataReceiver;
long timeLeft = (_lastBuffered + _passiveFlushDelay - _context.clock().now()); long timeLeft = (_lastBuffered + _passiveFlushDelay - _context.clock().now());
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("flusher time reached: left = " + timeLeft); _log.debug("flusher time reached: left = " + timeLeft);
if (timeLeft > 0) if (timeLeft > 0)
enqueue(); enqueue();
else if ( (rec != null) && (rec.writeInProcess()) ) else if (_dataReceiver.writeInProcess())
enqueue(); // don't passive flush if there is a write being done (unacked outbound) enqueue(); // don't passive flush if there is a write being done (unacked outbound)
else else
doFlush(); doFlush();
@@ -261,10 +261,8 @@ class MessageOutputStream extends OutputStream {
if ( (_valid > 0) && (flushTime <= _context.clock().now()) ) { if ( (_valid > 0) && (flushTime <= _context.clock().now()) ) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("doFlush() valid = " + _valid); _log.info("doFlush() valid = " + _valid);
// avoid NPE from race with destroy() if (_buf != null) {
DataReceiver rcvr = _dataReceiver; ws = _dataReceiver.writeData(_buf, 0, _valid);
if ( (_buf != null) && (rcvr != null) ) {
ws = rcvr.writeData(_buf, 0, _valid);
_written += _valid; _written += _valid;
_valid = 0; _valid = 0;
_lastFlushed = _context.clock().now(); _lastFlushed = _context.clock().now();
@@ -317,25 +315,18 @@ class MessageOutputStream extends OutputStream {
if (_log.shouldLog(Log.INFO) && _valid > 0) if (_log.shouldLog(Log.INFO) && _valid > 0)
_log.info("flush() valid = " + _valid); _log.info("flush() valid = " + _valid);
// avoid NPE from race with destroy()
DataReceiver rcvr = _dataReceiver;
synchronized (_dataLock) { synchronized (_dataLock) {
if (_buf == null) { if (_buf == null) {
_dataLock.notifyAll(); _dataLock.notifyAll();
throw new IOException("closed (buffer went away)"); throw new IOException("closed (buffer went away)");
} }
if (rcvr == null) {
_dataLock.notifyAll();
throwAnyError();
return;
}
// if valid == 0 return ??? - no, this could flush a CLOSE packet too. // if valid == 0 return ??? - no, this could flush a CLOSE packet too.
// Yes, flush here, inside the data lock, and do all the waitForCompletion() stuff below // Yes, flush here, inside the data lock, and do all the waitForCompletion() stuff below
// (disabled) // (disabled)
if (!wait_for_accept_only) { if (!wait_for_accept_only) {
ws = rcvr.writeData(_buf, 0, _valid); ws = _dataReceiver.writeData(_buf, 0, _valid);
_written += _valid; _written += _valid;
_valid = 0; _valid = 0;
locked_updateBufferSize(); locked_updateBufferSize();
@@ -347,7 +338,7 @@ class MessageOutputStream extends OutputStream {
// Skip all the waitForCompletion() stuff below, which is insanity, as of 0.8.1 // Skip all the waitForCompletion() stuff below, which is insanity, as of 0.8.1
// must do this outside the data lock // must do this outside the data lock
if (wait_for_accept_only) { if (wait_for_accept_only) {
flushAvailable(rcvr, true); flushAvailable(_dataReceiver, true);
return; return;
} }
@@ -387,6 +378,7 @@ class MessageOutputStream extends OutputStream {
} }
// setting _closed before flush() will force flush() to send a CLOSE packet // setting _closed before flush() will force flush() to send a CLOSE packet
_closed = true; _closed = true;
_flusher.cancel();
// In 0.8.1 we rewrote flush() to only wait for accept into the window, // In 0.8.1 we rewrote flush() to only wait for accept into the window,
// not "completion" (i.e. ack from the far end). // not "completion" (i.e. ack from the far end).
@@ -415,10 +407,11 @@ class MessageOutputStream extends OutputStream {
/** /**
* nonblocking close - * nonblocking close -
* Use outside of this package is deprecated, should be made package local * Only for use inside package
*/ */
public void closeInternal() { public void closeInternal() {
_closed = true; _closed = true;
_flusher.cancel();
if (_streamError == null) if (_streamError == null)
_streamError = new IOException("Closed internally"); _streamError = new IOException("Closed internally");
clearData(true); clearData(true);
@@ -429,12 +422,10 @@ class MessageOutputStream extends OutputStream {
if (_log.shouldLog(Log.INFO) && _valid > 0) if (_log.shouldLog(Log.INFO) && _valid > 0)
_log.info("clearData() valid = " + _valid); _log.info("clearData() valid = " + _valid);
// avoid NPE from race with destroy()
DataReceiver rcvr = _dataReceiver;
synchronized (_dataLock) { synchronized (_dataLock) {
// flush any data, but don't wait for it // flush any data, but don't wait for it
if ( (rcvr != null) && (_valid > 0) && shouldFlush) if (_valid > 0 && shouldFlush)
rcvr.writeData(_buf, 0, _valid); _dataReceiver.writeData(_buf, 0, _valid);
_written += _valid; _written += _valid;
_valid = 0; _valid = 0;
@@ -503,15 +494,15 @@ class MessageOutputStream extends OutputStream {
throw new InterruptedIOException("Flush available timed out (" + _writeTimeout + "ms)"); throw new InterruptedIOException("Flush available timed out (" + _writeTimeout + "ms)");
} }
long afterAccept = System.currentTimeMillis(); long afterAccept = System.currentTimeMillis();
if ( (afterAccept - afterBuild > 1000) && (_log.shouldLog(Log.DEBUG)) ) if ( (afterAccept - afterBuild > 1000) && (_log.shouldLog(Log.INFO)) )
_log.debug("Took " + (afterAccept-afterBuild) + "ms to accept a packet? " + ws); _log.info("Took " + (afterAccept-afterBuild) + "ms to accept a packet? " + ws);
return; return;
} }
void destroy() { void destroy() {
_dataReceiver = null; _closed = true;
_flusher.cancel();
synchronized (_dataLock) { synchronized (_dataLock) {
_closed = true;
_dataLock.notifyAll(); _dataLock.notifyAll();
} }
} }

View File

@@ -395,6 +395,7 @@ class Packet {
DataHelper.toLong(buffer, cur, 4, _ackThrough > 0 ? _ackThrough : 0); DataHelper.toLong(buffer, cur, 4, _ackThrough > 0 ? _ackThrough : 0);
cur += 4; cur += 4;
if (_nacks != null) { if (_nacks != null) {
// if max win is ever > 255, limit to 255
DataHelper.toLong(buffer, cur, 1, _nacks.length); DataHelper.toLong(buffer, cur, 1, _nacks.length);
cur++; cur++;
for (int i = 0; i < _nacks.length; i++) { for (int i = 0; i < _nacks.length; i++) {
@@ -461,7 +462,7 @@ class Packet {
* @return How large the current packet would be * @return How large the current packet would be
* @throws IllegalStateException * @throws IllegalStateException
*/ */
public int writtenSize() throws IllegalStateException { private int writtenSize() {
int size = 0; int size = 0;
size += 4; // _sendStreamId.length; size += 4; // _sendStreamId.length;
size += 4; // _receiveStreamId.length; size += 4; // _receiveStreamId.length;
@@ -469,6 +470,7 @@ class Packet {
size += 4; // ackThrough size += 4; // ackThrough
if (_nacks != null) { if (_nacks != null) {
size++; // nacks length size++; // nacks length
// if max win is ever > 255, limit to 255
size += 4 * _nacks.length; size += 4 * _nacks.length;
} else { } else {
size++; // nacks length size++; // nacks length
@@ -671,10 +673,11 @@ class Packet {
buf.append(toId(_sendStreamId)); buf.append(toId(_sendStreamId));
//buf.append("<-->"); //buf.append("<-->");
buf.append(toId(_receiveStreamId)).append(": #").append(_sequenceNum); buf.append(toId(_receiveStreamId)).append(": #").append(_sequenceNum);
if (_sequenceNum < 10) //if (_sequenceNum < 10)
buf.append(" \t"); // so the tab lines up right // buf.append(" \t"); // so the tab lines up right
else //else
buf.append('\t'); // buf.append('\t');
buf.append(' ');
buf.append(toFlagString()); buf.append(toFlagString());
buf.append(" ACK ").append(getAckThrough()); buf.append(" ACK ").append(getAckThrough());
if (_nacks != null) { if (_nacks != null) {

View File

@@ -2,7 +2,6 @@ package net.i2p.client.streaming;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.Date; import java.util.Date;
import java.util.Iterator;
import java.util.Set; import java.util.Set;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
@@ -13,7 +12,8 @@ import net.i2p.util.Log;
/** /**
* receive a packet and dispatch it correctly to the connection specified, * receive a packet and dispatch it correctly to the connection specified,
* the server socket, or queue a reply RST packet. * the server socket, or queue a reply RST packet.
* *<p>
* I2PSession -> MessageHandler -> PacketHandler -> ConnectionPacketHandler -> MessageInputStream
*/ */
class PacketHandler { class PacketHandler {
private final ConnectionManager _manager; private final ConnectionManager _manager;
@@ -86,6 +86,7 @@ class PacketHandler {
} }
*****/ *****/
/** */
void receivePacket(Packet packet) { void receivePacket(Packet packet) {
//boolean ok = choke(packet); //boolean ok = choke(packet);
//if (ok) //if (ok)
@@ -202,15 +203,13 @@ class PacketHandler {
// someone is sending us a packet on the wrong stream // someone is sending us a packet on the wrong stream
// It isn't a SYN so it isn't likely to have a FROM to send a reset back to // It isn't a SYN so it isn't likely to have a FROM to send a reset back to
if (_log.shouldLog(Log.ERROR)) { if (_log.shouldLog(Log.ERROR)) {
Set cons = _manager.listConnections();
StringBuilder buf = new StringBuilder(512); StringBuilder buf = new StringBuilder(512);
buf.append("Received a packet on the wrong stream: "); buf.append("Received a packet on the wrong stream: ");
buf.append(packet); buf.append(packet);
buf.append("\nthis connection:\n"); buf.append("\nthis connection:\n");
buf.append(con); buf.append(con);
buf.append("\nall connections:"); buf.append("\nall connections:");
for (Iterator iter = cons.iterator(); iter.hasNext();) { for (Connection cur : _manager.listConnections()) {
Connection cur = (Connection)iter.next();
buf.append('\n').append(cur); buf.append('\n').append(cur);
} }
_log.error(buf.toString(), new Exception("Wrong stream")); _log.error(buf.toString(), new Exception("Wrong stream"));
@@ -299,9 +298,7 @@ class PacketHandler {
} }
if (_log.shouldLog(Log.DEBUG)) { if (_log.shouldLog(Log.DEBUG)) {
StringBuilder buf = new StringBuilder(128); StringBuilder buf = new StringBuilder(128);
Set cons = _manager.listConnections(); for (Connection con : _manager.listConnections()) {
for (Iterator iter = cons.iterator(); iter.hasNext(); ) {
Connection con = (Connection)iter.next();
buf.append(con.toString()).append(" "); buf.append(con.toString()).append(" ");
} }
_log.debug("connections: " + buf.toString() + " sendId: " _log.debug("connections: " + buf.toString() + " sendId: "

View File

@@ -16,11 +16,11 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
private final I2PAppContext _context; private final I2PAppContext _context;
private final Log _log; private final Log _log;
private final Connection _connection; private final Connection _connection;
private Destination _to; private final Destination _to;
private SessionKey _keyUsed; private SessionKey _keyUsed;
private Set _tagsSent; private Set _tagsSent;
private final long _createdOn; private final long _createdOn;
private int _numSends; private volatile int _numSends;
private long _lastSend; private long _lastSend;
private long _acceptedOn; private long _acceptedOn;
private long _ackOn; private long _ackOn;
@@ -45,7 +45,6 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
} }
public Destination getTo() { return _to; } public Destination getTo() { return _to; }
public void setTo(Destination to) { _to = to; }
/** /**
* @deprecated should always return null * @deprecated should always return null
@@ -72,6 +71,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
public void setTagsSent(Set tags) { public void setTagsSent(Set tags) {
if (tags != null && !tags.isEmpty()) if (tags != null && !tags.isEmpty())
_log.error("Who is sending tags thru the streaming lib? " + tags.size()); _log.error("Who is sending tags thru the streaming lib? " + tags.size());
/****
if ( (_tagsSent != null) && (!_tagsSent.isEmpty()) && (!tags.isEmpty()) ) { if ( (_tagsSent != null) && (!_tagsSent.isEmpty()) && (!tags.isEmpty()) ) {
//int old = _tagsSent.size(); //int old = _tagsSent.size();
//_tagsSent.addAll(tags); //_tagsSent.addAll(tags);
@@ -80,6 +80,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
} else { } else {
_tagsSent = tags; _tagsSent = tags;
} }
****/
} }
public boolean shouldSign() { public boolean shouldSign() {
@@ -142,10 +143,15 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
/** @return null if not bound */ /** @return null if not bound */
public Connection getConnection() { return _connection; } public Connection getConnection() { return _connection; }
/**
* Will force a fast restransmit on the 3rd call (FAST_RETRANSMIT_THRESHOLD)
* but only if it's the lowest unacked (see Connection.ResendPacketEvent)
*/
public void incrementNACKs() { public void incrementNACKs() {
int cnt = ++_nackCount; int cnt = ++_nackCount;
SimpleTimer2.TimedEvent evt = _resendEvent; SimpleTimer2.TimedEvent evt = _resendEvent;
if ( (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD) && (evt != null) && (!_retransmitted)) { if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null && (!_retransmitted) &&
(_numSends == 1 || _lastSend < _context.clock().now() + 4*1000)) { // Don't fast retx if we recently resent it
_retransmitted = true; _retransmitted = true;
evt.reschedule(0); evt.reschedule(0);
} }
@@ -162,8 +168,11 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
if (con != null) if (con != null)
buf.append(" rtt ").append(con.getOptions().getRTT()); buf.append(" rtt ").append(con.getOptions().getRTT());
if ( (_tagsSent != null) && (!_tagsSent.isEmpty()) ) //if ( (_tagsSent != null) && (!_tagsSent.isEmpty()) )
buf.append(" with tags"); // buf.append(" with tags");
if (_nackCount > 0)
buf.append(" nacked ").append(_nackCount).append(" times");
if (_ackOn > 0) if (_ackOn > 0)
buf.append(" ack after ").append(getAckTime()); buf.append(" ack after ").append(getAckTime());
@@ -200,8 +209,6 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
* @param maxWaitMs MessageOutputStream is the only caller, generally with -1 * @param maxWaitMs MessageOutputStream is the only caller, generally with -1
*/ */
public void waitForAccept(int maxWaitMs) { public void waitForAccept(int maxWaitMs) {
if (_connection == null)
throw new IllegalStateException("Cannot wait for accept with no connection");
long before = _context.clock().now(); long before = _context.clock().now();
int queued = _connection.getUnackedPacketsSent(); int queued = _connection.getUnackedPacketsSent();
int window = _connection.getOptions().getWindowSize(); int window = _connection.getOptions().getWindowSize();
@@ -216,7 +223,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
int afterQueued = _connection.getUnackedPacketsSent(); int afterQueued = _connection.getUnackedPacketsSent();
if ( (after - before > 1000) && (_log.shouldLog(Log.DEBUG)) ) if ( (after - before > 1000) && (_log.shouldLog(Log.DEBUG)) )
_log.debug("Took " + (after-before) + "ms to get " _log.debug("Took " + (after-before) + "ms to get "
+ (accepted ? " accepted" : " rejected") + (accepted ? "accepted" : "rejected")
+ (_cancelledOn > 0 ? " and CANCELLED" : "") + (_cancelledOn > 0 ? " and CANCELLED" : "")
+ ", queued behind " + queued +" with a window size of " + window + ", queued behind " + queued +" with a window size of " + window
+ ", finally accepted with " + afterQueued + " queued: " + ", finally accepted with " + afterQueued + " queued: "

View File

@@ -12,7 +12,8 @@ import net.i2p.util.Log;
* Well, thats the theory at least... in practice we just * Well, thats the theory at least... in practice we just
* send them immediately with no blocking, since the * send them immediately with no blocking, since the
* mode=bestEffort doesnt block in the SDK. * mode=bestEffort doesnt block in the SDK.
* *<p>
* MessageOutputStream -> ConnectionDataReceiver -> Connection -> PacketQueue -> I2PSession
*/ */
class PacketQueue { class PacketQueue {
private final I2PAppContext _context; private final I2PAppContext _context;
@@ -26,16 +27,17 @@ class PacketQueue {
_session = session; _session = session;
_connectionManager = mgr; _connectionManager = mgr;
_log = context.logManager().getLog(PacketQueue.class); _log = context.logManager().getLog(PacketQueue.class);
_context.statManager().createRateStat("stream.con.sendMessageSize", "Size of a message sent on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); // all createRateStats in ConnectionManager
_context.statManager().createRateStat("stream.con.sendDuplicateSize", "Size of a message resent on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
} }
/** /**
* Add a new packet to be sent out ASAP * Add a new packet to be sent out ASAP
* *
* keys and tags disabled since dropped in I2PSession * keys and tags disabled since dropped in I2PSession
* @return true if sent
*/ */
public void enqueue(PacketLocal packet) { public boolean enqueue(PacketLocal packet) {
// this updates the ack/nack field
packet.prepare(); packet.prepare();
//SessionKey keyUsed = packet.getKeyUsed(); //SessionKey keyUsed = packet.getKeyUsed();
@@ -52,7 +54,7 @@ class PacketQueue {
if (packet.getAckTime() > 0) { if (packet.getAckTime() > 0) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Not resending " + packet); _log.debug("Not resending " + packet);
return; return false;
} else { } else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending... " + packet); _log.debug("Sending... " + packet);
@@ -76,7 +78,7 @@ class PacketQueue {
_log.warn("took " + writeTime + "ms to write the packet: " + packet); _log.warn("took " + writeTime + "ms to write the packet: " + packet);
// last chance to short circuit... // last chance to short circuit...
if (packet.getAckTime() > 0) return; if (packet.getAckTime() > 0) return false;
// this should not block! // this should not block!
begin = _context.clock().now(); begin = _context.clock().now();
@@ -158,6 +160,7 @@ class PacketQueue {
// reset // reset
packet.releasePayload(); packet.releasePayload();
} }
return sent;
} }
} }

View File

@@ -15,7 +15,7 @@ class SchedulerChooser {
private final Log _log; private final Log _log;
private final TaskScheduler _nullScheduler; private final TaskScheduler _nullScheduler;
/** list of TaskScheduler objects */ /** list of TaskScheduler objects */
private final List _schedulers; private final List<TaskScheduler> _schedulers;
public SchedulerChooser(I2PAppContext context) { public SchedulerChooser(I2PAppContext context) {
_context = context; _context = context;
@@ -26,7 +26,7 @@ class SchedulerChooser {
public TaskScheduler getScheduler(Connection con) { public TaskScheduler getScheduler(Connection con) {
for (int i = 0; i < _schedulers.size(); i++) { for (int i = 0; i < _schedulers.size(); i++) {
TaskScheduler scheduler = (TaskScheduler)_schedulers.get(i); TaskScheduler scheduler = _schedulers.get(i);
if (scheduler.accept(con)) { if (scheduler.accept(con)) {
//if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
// _log.debug("Scheduling for " + con + " with " + scheduler.getClass().getName()); // _log.debug("Scheduling for " + con + " with " + scheduler.getClass().getName());
@@ -50,11 +50,7 @@ class SchedulerChooser {
} }
private class NullScheduler implements TaskScheduler { private class NullScheduler implements TaskScheduler {
private final Log _log;
public NullScheduler() {
_log = _context.logManager().getLog(NullScheduler.class);
}
public void eventOccurred(Connection con) { public void eventOccurred(Connection con) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Yell at jrandom: Event occurred on " + con, new Exception("source")); _log.warn("Yell at jrandom: Event occurred on " + con, new Exception("source"));

View File

@@ -45,18 +45,25 @@ class SchedulerClosing extends SchedulerImpl {
} }
public void eventOccurred(Connection con) { public void eventOccurred(Connection con) {
if (con.getNextSendTime() <= 0) long nextSend = con.getNextSendTime();
con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); long now = _context.clock().now();
long remaining = con.getNextSendTime() - _context.clock().now(); long remaining;
if (nextSend <= 0) {
remaining = con.getOptions().getSendAckDelay();
nextSend = now + remaining;
con.setNextSendTime(nextSend);
} else {
remaining = nextSend - now;
}
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Event occurred w/ remaining: " + remaining + " on " + con); _log.debug("Event occurred w/ remaining: " + remaining + " on " + con);
if (remaining <= 0) { if (remaining <= 0) {
if (con.getCloseSentOn() <= 0) { if (con.getCloseSentOn() <= 0) {
con.sendAvailable(); con.sendAvailable();
con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
} else { } else {
con.ackImmediately(); //con.ackImmediately();
} }
con.setNextSendTime(now + con.getOptions().getSendAckDelay());
} else { } else {
//if (remaining < 5*1000) //if (remaining < 5*1000)
// remaining = 5*1000; // remaining = 5*1000;

View File

@@ -19,4 +19,9 @@ abstract class SchedulerImpl implements TaskScheduler {
protected void reschedule(long msToWait, Connection con) { protected void reschedule(long msToWait, Connection con) {
SimpleScheduler.getInstance().addEvent(con.getConnectionEvent(), msToWait); SimpleScheduler.getInstance().addEvent(con.getConnectionEvent(), msToWait);
} }
@Override
public String toString() {
return getClass().getSimpleName();
}
} }

View File

@@ -130,8 +130,8 @@ class TCBShare {
super(timer); super(timer);
} }
public void timeReached() { public void timeReached() {
for (Iterator iter = _cache.keySet().iterator(); iter.hasNext(); ) { for (Iterator<Entry> iter = _cache.values().iterator(); iter.hasNext(); ) {
if (_cache.get(iter.next()).isExpired()) if (iter.next().isExpired())
iter.remove(); iter.remove();
} }
schedule(CLEAN_TIME); schedule(CLEAN_TIME);

View File

@@ -1,3 +1,24 @@
2012-06-29 zzz
* HTTP Proxy: Change the error code for unknown host from 404 to 500
* SimpleTimer: Fix logging
* Streaming:
- Allow at least 3 packets and up to half the window to be active resends
instead of just 1, to reduce stall time after a packet drop
- Increase fast retransmit threshold back to 3 to reduce retransmissions
- Don't fast retransmit if we recently retransmitted it already
- Allow double the window as long as gaps are less than the window
- Don't set the MSS in a resent packet (saves 2 bytes)
- Remove redundant calls to updateAcks()
- Update activity timer when resending a packet
- Reset unackedPacketsReceived counter at all places where acks are sent
so it isn't wrong
- Fix some places where the activeResends count could become wrong
- Prevent storm of CLOSE packets
- Never resend the whole packet in ackImmediately(), just send an ack
- Cancel flusher timer in MessageOutputStream when closed
- Move some createRateStats to ConnectionManager to reduce repeated calls
- Cleanups, javadocs, logging, volatile, finals
2012-06-24 zzz 2012-06-24 zzz
* ElGamalAESEngine: Fix bad size estimate when tags are included, * ElGamalAESEngine: Fix bad size estimate when tags are included,
resulting in trailing zeros after the padding resulting in trailing zeros after the padding

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */ /** deprecated */
public final static String ID = "Monotone"; public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION; public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 19; public final static long BUILD = 20;
/** for example "-test" */ /** for example "-test" */
public final static String EXTRA = ""; public final static String EXTRA = "";