2006-02-15 jrandom

* Merged in the i2p_0_6_1_10_PRE branch to the trunk, so CVS HEAD is no
      longer backwards compatible (and should not be used until 0.6.1.1 is
      out)
This commit is contained in:
jrandom
2006-02-15 05:33:17 +00:00
committed by zzz
parent 1374ea0ea1
commit 113fbc1df3
127 changed files with 2687 additions and 1309 deletions

View File

@@ -123,6 +123,7 @@ public class Connection {
_context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.chokeSizeEnd", "How many messages were outstanding when we stopped being choked?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.fastRetransmit", "How long a packet has been around for if it has been resent per the fast retransmit timer?", "Stream", new long[] { 60*1000, 10*60*1000 });
if (_log.shouldLog(Log.INFO))
_log.info("New connection created with options: " + _options);
}
@@ -377,6 +378,8 @@ public class Connection {
for (int i = 0; i < nacks.length; i++) {
if (nacks[i] == id.longValue()) {
nacked = true;
PacketLocal nackedPacket = (PacketLocal)_outboundPackets.get(id);
nackedPacket.incrementNACKs();
break; // NACKed
}
}
@@ -929,6 +932,13 @@ public class Connection {
public String toString() { return "event on " + Connection.this.toString(); }
}
/**
* If we have been explicitly NACKed three times, retransmit the packet even if
* there are other packets in flight.
*
*/
static final int FAST_RETRANSMIT_THRESHOLD = 3;
/**
* Coordinate the resends of a given packet
*/
@@ -969,8 +979,9 @@ public class Connection {
if (_outboundPackets.containsKey(new Long(_packet.getSequenceNum())))
resend = true;
}
if ( (resend) && (_packet.getAckTime() < 0) ) {
if (!isLowest) {
if ( (resend) && (_packet.getAckTime() <= 0) ) {
boolean fastRetransmit = ( (_packet.getNACKs() >= FAST_RETRANSMIT_THRESHOLD) && (_packet.getNumSends() == 1));
if ( (!isLowest) && (!fastRetransmit) ) {
// we want to resend this packet, but there are already active
// resends in the air and we dont want to make a bad situation
// worse. wait another second
@@ -981,6 +992,10 @@ public class Connection {
_nextSendTime = 1000 + _context.clock().now();
return false;
}
if (fastRetransmit)
_context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime());
// revamp various fields, in case we need to ack more, etc
_inputStream.updateAcks(_packet);
int choke = getOptions().getChoke();

View File

@@ -54,10 +54,10 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
public static final String PROP_SLOW_START_GROWTH_RATE_FACTOR = "i2p.streaming.slowStartGrowthRateFactor";
private static final int TREND_COUNT = 3;
static final int INITIAL_WINDOW_SIZE = 6;
static final int INITIAL_WINDOW_SIZE = 12;
static final int DEFAULT_MAX_SENDS = 8;
static final int MIN_WINDOW_SIZE = 6;
static final int MIN_WINDOW_SIZE = INITIAL_WINDOW_SIZE;
public ConnectionOptions() {
super();
@@ -105,7 +105,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
setRTT(getInt(opts, PROP_INITIAL_RTT, 10*1000));
setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1));
setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 1000));
setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 500));
setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 2000));
setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE));
setMaxResends(getInt(opts, PROP_MAX_RESENDS, DEFAULT_MAX_SENDS));
setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1));
@@ -136,7 +136,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
if (opts.containsKey(PROP_INITIAL_RESEND_DELAY))
setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 1000));
if (opts.containsKey(PROP_INITIAL_ACK_DELAY))
setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 500));
setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 2000));
if (opts.containsKey(PROP_INITIAL_WINDOW_SIZE))
setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE));
if (opts.containsKey(PROP_MAX_RESENDS))

View File

@@ -33,7 +33,8 @@ public class ConnectionPacketHandler {
void receivePacket(Packet packet, Connection con) throws I2PException {
boolean ok = verifyPacket(packet, con);
if (!ok) {
if ( (!packet.isFlagSet(Packet.FLAG_RESET)) && (_log.shouldLog(Log.ERROR)) )
boolean isTooFast = con.getSendStreamId() <= 0;
if ( (!packet.isFlagSet(Packet.FLAG_RESET)) && (!isTooFast) && (_log.shouldLog(Log.ERROR)) )
_log.error("Packet does NOT verify: " + packet + " on " + con);
packet.releasePayload();
return;
@@ -45,6 +46,7 @@ public class ConnectionPacketHandler {
if (_log.shouldLog(Log.WARN))
_log.warn("Received a data packet after hard disconnect: " + packet + " on " + con);
con.sendReset();
con.disconnect(false);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Received a packet after hard disconnect, ignoring: " + packet + " on " + con);
@@ -59,6 +61,7 @@ public class ConnectionPacketHandler {
_log.warn("Received new data when we've sent them data and all of our data is acked: "
+ packet + " on " + con + "");
con.sendReset();
con.disconnect(false);
packet.releasePayload();
return;
}
@@ -365,8 +368,8 @@ public class ConnectionPacketHandler {
if (packet.getSequenceNum() < MAX_INITIAL_PACKETS) {
return true;
} else {
if (_log.shouldLog(Log.ERROR))
_log.error("Packet without RST or SYN where we dont know stream ID: "
if (_log.shouldLog(Log.WARN))
_log.warn("Packet without RST or SYN where we dont know stream ID: "
+ packet);
return false;
}

View File

@@ -111,7 +111,7 @@ public class PacketHandler {
private static final SimpleDateFormat _fmt = new SimpleDateFormat("HH:mm:ss.SSS");
void displayPacket(Packet packet, String prefix, String suffix) {
if (!_log.shouldLog(Log.DEBUG)) return;
if (!_log.shouldLog(Log.INFO)) return;
StringBuffer buf = new StringBuffer(256);
synchronized (_fmt) {
buf.append(_fmt.format(new Date()));
@@ -120,7 +120,10 @@ public class PacketHandler {
buf.append(packet.toString());
if (suffix != null)
buf.append(" ").append(suffix);
System.out.println(buf.toString());
String str = buf.toString();
System.out.println(str);
if (_log.shouldLog(Log.DEBUG))
_log.debug(str);
}
private void receiveKnownCon(Connection con, Packet packet) {
@@ -162,7 +165,7 @@ public class PacketHandler {
} else {
if ( (con.getSendStreamId() <= 0) ||
(DataHelper.eq(con.getSendStreamId(), packet.getReceiveStreamId())) ||
(packet.getSequenceNum() <= 5) ) { // its in flight from the first batch
(packet.getSequenceNum() <= ConnectionOptions.MIN_WINDOW_SIZE) ) { // its in flight from the first batch
long oldId = con.getSendStreamId();
if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) {
if (oldId <= 0) {
@@ -259,15 +262,17 @@ public class PacketHandler {
_manager.getConnectionHandler().receiveNewSyn(packet);
} else {
if (_log.shouldLog(Log.WARN)) {
_log.warn("Packet belongs to no other cons: " + packet);
}
if (_log.shouldLog(Log.DEBUG)) {
StringBuffer buf = new StringBuffer(128);
Set cons = _manager.listConnections();
for (Iterator iter = cons.iterator(); iter.hasNext(); ) {
Connection con = (Connection)iter.next();
buf.append(con.toString()).append(" ");
}
_log.warn("Packet belongs to no other cons: " + packet + " connections: "
+ buf.toString() + " sendId: "
+ (sendId > 0 ? Packet.toId(sendId) : " unknown"));
_log.debug("connections: " + buf.toString() + " sendId: "
+ (sendId > 0 ? Packet.toId(sendId) : " unknown"));
}
packet.releasePayload();
}

View File

@@ -25,6 +25,8 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
private long _acceptedOn;
private long _ackOn;
private long _cancelledOn;
private volatile int _nackCount;
private volatile boolean _retransmitted;
private SimpleTimer.TimedEvent _resendEvent;
public PacketLocal(I2PAppContext ctx, Destination to) {
@@ -38,6 +40,8 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
_connection = con;
_lastSend = -1;
_cancelledOn = -1;
_nackCount = 0;
_retransmitted = false;
}
public Destination getTo() { return _to; }
@@ -113,6 +117,16 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
public int getNumSends() { return _numSends; }
public long getLastSend() { return _lastSend; }
public Connection getConnection() { return _connection; }
public void incrementNACKs() {
int cnt = ++_nackCount;
SimpleTimer.TimedEvent evt = _resendEvent;
if ( (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD) && (evt != null) && (!_retransmitted)) {
_retransmitted = true;
RetransmissionTimer.getInstance().addEvent(evt, 0);
}
}
public int getNACKs() { return _nackCount; }
public void setResendPacketEvent(SimpleTimer.TimedEvent evt) { _resendEvent = evt; }