* Streaming lib:

- Move ConEvent from SimpleTimer to SimpleScheduler
      - Move RetransmissionTimer (ResendPacketEvent)
        from SimpleTimer to new SimpleTimer2
      - Move ActivityTimer and Flusher from SimpleTimer to RetransmissionTimer
      - SimpleTimer2 allows specifying "fuzz" to reduce
        timer queue churn further
This commit is contained in:
zzz
2009-02-15 05:11:35 +00:00
parent 7b12f700dd
commit cc3165bf72
11 changed files with 322 additions and 44 deletions

View File

@@ -14,6 +14,7 @@ import net.i2p.data.Destination;
import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
/**
* Maintain the state controlling a streaming connection between two
@@ -69,6 +70,7 @@ public class Connection {
/** how many messages have been resent and not yet ACKed? */
private int _activeResends;
private ConEvent _connectionEvent;
private int _randomWait;
private long _lifetimeBytesSent;
private long _lifetimeBytesReceived;
@@ -124,6 +126,7 @@ public class Connection {
_isInbound = false;
_updatedShareOpts = false;
_connectionEvent = new ConEvent();
_randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage
_context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.chokeSizeEnd", "How many messages were outstanding when we stopped being choked?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
@@ -325,7 +328,8 @@ public class Connection {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Resend in " + timeout + " for " + packet, new Exception("Sent by"));
RetransmissionTimer.getInstance().addEvent(new ResendPacketEvent(packet, timeout + _context.clock().now()), timeout);
// schedules itself
ResendPacketEvent rpe = new ResendPacketEvent(packet, timeout);
}
_context.statManager().getStatLog().addData(Packet.toId(_sendStreamId), "stream.rtt", _options.getRTT(), _options.getWindowSize());
@@ -526,7 +530,7 @@ public class Connection {
if (_receiver != null)
_receiver.destroy();
if (_activityTimer != null)
SimpleTimer.getInstance().removeEvent(_activityTimer);
_activityTimer.cancel();
//_activityTimer = null;
if (_inputStream != null)
_inputStream.streamErrorOccurred(new IOException("disconnected!"));
@@ -822,15 +826,18 @@ public class Connection {
return;
}
long howLong = _options.getInactivityTimeout();
howLong += _context.random().nextInt(30*1000); // randomize it a bit, so both sides don't do it at once
howLong += _randomWait; // randomize it a bit, so both sides don't do it at once
if (_log.shouldLog(Log.DEBUG))
_log.debug("Resetting the inactivity timer to " + howLong, new Exception(toString()));
// this will get rescheduled, and rescheduled, and rescheduled...
RetransmissionTimer.getInstance().removeEvent(_activityTimer);
RetransmissionTimer.getInstance().addEvent(_activityTimer, howLong);
_activityTimer.reschedule(howLong, false); // use the later of current and previous timeout
}
private class ActivityTimer implements SimpleTimer.TimedEvent {
private class ActivityTimer extends SimpleTimer2.TimedEvent {
public ActivityTimer() {
super(RetransmissionTimer.getInstance());
setFuzz(5*1000); // sloppy timer, don't reschedule unless at least 5s later
}
public void timeReached() {
// uh, nothing more to do...
if (!_connected) {
@@ -841,7 +848,7 @@ public class Connection {
long left = getTimeLeft();
if (left > 0) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Inactivity timeout reached, but there is time left (" + left + ")");
RetransmissionTimer.getInstance().addEvent(ActivityTimer.this, left);
schedule(left);
return;
}
// these are either going to time out or cause further rescheduling
@@ -1010,14 +1017,17 @@ public class Connection {
/**
* Coordinate the resends of a given packet
*
*/
public class ResendPacketEvent implements SimpleTimer.TimedEvent {
public class ResendPacketEvent extends SimpleTimer2.TimedEvent {
private PacketLocal _packet;
private long _nextSendTime;
public ResendPacketEvent(PacketLocal packet, long sendTime) {
public ResendPacketEvent(PacketLocal packet, long delay) {
super(RetransmissionTimer.getInstance());
_packet = packet;
_nextSendTime = sendTime;
_nextSendTime = delay + _context.clock().now();
packet.setResendPacketEvent(ResendPacketEvent.this);
schedule(delay);
}
public long getNextSendTime() { return _nextSendTime; }
@@ -1025,6 +1035,10 @@ public class Connection {
/**
* Retransmit the packet if we need to.
*
* ackImmediately() above calls directly in here, so
* we have to use forceReschedule() instead of schedule() below,
* to prevent duplicates in the timer queue.
*
* @param penalize true if this retransmission is caused by a timeout, false if we
* are just sending this packet instead of an ACK
* @return true if the packet was sent, false if it was not
@@ -1057,7 +1071,7 @@ public class Connection {
if (_log.shouldLog(Log.INFO))
_log.info("Delaying resend of " + _packet + " as there are "
+ _activeResends + " active resends already in play");
RetransmissionTimer.getInstance().addEvent(ResendPacketEvent.this, 1000);
forceReschedule(1000);
_nextSendTime = 1000 + _context.clock().now();
return false;
}
@@ -1144,7 +1158,7 @@ public class Connection {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Scheduling resend in " + timeout + "ms for " + _packet);
RetransmissionTimer.getInstance().addEvent(ResendPacketEvent.this, timeout);
forceReschedule(timeout);
}
// acked during resending (... or somethin')

View File

@@ -8,7 +8,7 @@ import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray;
import net.i2p.util.ByteCache;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
/**
* A stream that we can shove data into that fires off those bytes
@@ -200,13 +200,20 @@ public class MessageOutputStream extends OutputStream {
* Flush data that has been enqued but not flushed after a certain
* period of inactivity
*/
private class Flusher implements SimpleTimer.TimedEvent {
private class Flusher extends SimpleTimer2.TimedEvent {
private boolean _enqueued;
public Flusher() {
super(RetransmissionTimer.getInstance());
}
public void enqueue() {
// no need to be overly worried about duplicates - it would just
// push it further out
if (!_enqueued) {
RetransmissionTimer.getInstance().addEvent(_flusher, _passiveFlushDelay);
// Maybe we could just use schedule() here - or even SimpleScheduler - not sure...
// To be safe, use forceReschedule() so we don't get lots of duplicates
// We've seen the queue blow up before, maybe it was this before the rewrite...
// So perhaps it IS wise to be "overly worried" ...
forceReschedule(_passiveFlushDelay);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Enqueueing the flusher for " + _passiveFlushDelay + "ms out");
} else {

View File

@@ -6,7 +6,7 @@ import net.i2p.I2PAppContext;
import net.i2p.data.Destination;
import net.i2p.data.SessionKey;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
/**
* coordinate local attributes about a packet - send time, ack time, number of
@@ -27,7 +27,7 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
private long _cancelledOn;
private volatile int _nackCount;
private volatile boolean _retransmitted;
private SimpleTimer.TimedEvent _resendEvent;
private SimpleTimer2.TimedEvent _resendEvent;
public PacketLocal(I2PAppContext ctx, Destination to) {
this(ctx, to, null);
@@ -93,7 +93,7 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
releasePayload();
notifyAll();
}
SimpleTimer.getInstance().removeEvent(_resendEvent);
_resendEvent.cancel();
}
public void cancelled() {
synchronized (this) {
@@ -101,11 +101,11 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
releasePayload();
notifyAll();
}
SimpleTimer.getInstance().removeEvent(_resendEvent);
_resendEvent.cancel();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Cancelled! " + toString(), new Exception("cancelled"));
}
public SimpleTimer.TimedEvent getResendEvent() { return _resendEvent; }
public SimpleTimer2.TimedEvent getResendEvent() { return _resendEvent; }
/** how long after packet creation was it acked?
* @return how long after packet creation the packet was ACKed in ms
@@ -122,15 +122,15 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
public void incrementNACKs() {
int cnt = ++_nackCount;
SimpleTimer.TimedEvent evt = _resendEvent;
SimpleTimer2.TimedEvent evt = _resendEvent;
if ( (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD) && (evt != null) && (!_retransmitted)) {
_retransmitted = true;
RetransmissionTimer.getInstance().addEvent(evt, 0);
evt.reschedule(0);
}
}
public int getNACKs() { return _nackCount; }
public void setResendPacketEvent(SimpleTimer.TimedEvent evt) { _resendEvent = evt; }
public void setResendPacketEvent(SimpleTimer2.TimedEvent evt) { _resendEvent = evt; }
@Override
public StringBuffer formatAsString() {

View File

@@ -1,12 +1,12 @@
package net.i2p.client.streaming;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
/**
*
*/
public class RetransmissionTimer extends SimpleTimer {
public class RetransmissionTimer extends SimpleTimer2 {
private static final RetransmissionTimer _instance = new RetransmissionTimer();
public static final SimpleTimer getInstance() { return _instance; }
public static final RetransmissionTimer getInstance() { return _instance; }
protected RetransmissionTimer() { super("StreamingTimer"); }
}

View File

@@ -2,7 +2,7 @@ package net.i2p.client.streaming;
import net.i2p.I2PAppContext;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleScheduler;
/**
* Base scheduler
@@ -17,6 +17,6 @@ abstract class SchedulerImpl implements TaskScheduler {
}
protected void reschedule(long msToWait, Connection con) {
SimpleTimer.getInstance().addEvent(con.getConnectionEvent(), msToWait);
SimpleScheduler.getInstance().addEvent(con.getConnectionEvent(), msToWait);
}
}

View File

@@ -7,7 +7,7 @@ import java.util.concurrent.ConcurrentHashMap;
import net.i2p.I2PAppContext;
import net.i2p.data.Destination;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
/**
* Share important TCP Control Block parameters across Connections
@@ -38,11 +38,11 @@ public class TCBShare {
_log = ctx.logManager().getLog(TCBShare.class);
_cache = new ConcurrentHashMap(4);
_cleaner = new CleanEvent();
SimpleTimer.getInstance().addEvent(_cleaner, CLEAN_TIME);
_cleaner.schedule(CLEAN_TIME);
}
public void stop() {
SimpleTimer.getInstance().removeEvent(_cleaner);
_cleaner.cancel();
}
public void updateOptsFromShare(Connection con) {
@@ -124,14 +124,16 @@ public class TCBShare {
}
}
private class CleanEvent implements SimpleTimer.TimedEvent {
public CleanEvent() {}
private class CleanEvent extends SimpleTimer2.TimedEvent {
public CleanEvent() {
super(RetransmissionTimer.getInstance());
}
public void timeReached() {
for (Iterator iter = _cache.keySet().iterator(); iter.hasNext(); ) {
if (_cache.get(iter.next()).isExpired())
iter.remove();
}
SimpleTimer.getInstance().addEvent(CleanEvent.this, CLEAN_TIME);
schedule(CLEAN_TIME);
}
}
}