From 1503ee2dfa6d61df502511e978b3c44cbdcb2a28 Mon Sep 17 00:00:00 2001 From: jrandom Date: Wed, 29 Dec 2004 15:53:28 +0000 Subject: [PATCH] 2004-12-28 jrandom * Cleaned up the resending and choking algorithm in the streaming lib. * Removed the read timeout override for I2PTunnel's httpclient, allowing it to use the default for the streaming lib. * Revised ack triggers in the streaming lib. * Logging. --- .../i2p/i2ptunnel/I2PTunnelHTTPClient.java | 5 +-- .../net/i2p/client/streaming/Connection.java | 44 ++++++++++++++----- .../streaming/ConnectionDataReceiver.java | 6 +-- .../streaming/ConnectionPacketHandler.java | 9 ++-- .../net/i2p/client/streaming/PacketLocal.java | 10 ++++- .../net/i2p/client/streaming/PacketQueue.java | 3 ++ .../src/net/i2p/client/I2PSessionImpl2.java | 16 ++++--- history.txt | 9 +++- .../src/net/i2p/router/RouterVersion.java | 4 +- .../router/transport/tcp/TCPConnection.java | 10 ++++- .../i2p/router/tunnelmanager/TunnelPool.java | 2 +- 11 files changed, 81 insertions(+), 37 deletions(-) diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java index 373c9fb7d..5610bc213 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java @@ -147,7 +147,6 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable private static final int DEFAULT_READ_TIMEOUT = 60*1000; - /** * create the default options (using the default timeout, etc) * @@ -156,8 +155,8 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable Properties defaultOpts = getTunnel().getClientOptions(); if (!defaultOpts.contains(I2PSocketOptions.PROP_READ_TIMEOUT)) defaultOpts.setProperty(I2PSocketOptions.PROP_READ_TIMEOUT, ""+DEFAULT_READ_TIMEOUT); - if (!defaultOpts.contains("i2p.streaming.inactivityTimeout")) - defaultOpts.setProperty("i2p.streaming.inactivityTimeout", ""+DEFAULT_READ_TIMEOUT); + //if (!defaultOpts.contains("i2p.streaming.inactivityTimeout")) + // defaultOpts.setProperty("i2p.streaming.inactivityTimeout", ""+DEFAULT_READ_TIMEOUT); I2PSocketOptions opts = sockMgr.buildOptions(defaultOpts); if (!defaultOpts.containsKey(I2PSocketOptions.PROP_CONNECT_TIMEOUT)) opts.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 0edb615df..cc7b052e2 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -72,7 +72,7 @@ public class Connection { private long _lifetimeDupMessageReceived; public static final long MAX_RESEND_DELAY = 60*1000; - public static final long MIN_RESEND_DELAY = 40*1000; + public static final long MIN_RESEND_DELAY = 30*1000; /** wait up to 5 minutes after disconnection so we can ack/close packets */ public static int DISCONNECT_TIMEOUT = 5*60*1000; @@ -146,20 +146,25 @@ public class Connection { synchronized (_outboundPackets) { if (!started) _context.statManager().addRateData("stream.chokeSizeBegin", _outboundPackets.size(), timeoutMs); + if (!_connected) + return false; started = true; - if (_outboundPackets.size() >= _options.getWindowSize()) { + if ( (_outboundPackets.size() >= _options.getWindowSize()) || (_activeResends > 0) ) { if (writeExpire > 0) { if (timeLeft <= 0) { _log.error("Outbound window is full of " + _outboundPackets.size() + + " with " + _activeResends + " active resends" + " and we've waited too long (" + writeExpire + "ms)"); return false; } if (_log.shouldLog(Log.DEBUG)) - _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _options.getWindowSize() + "), waiting " + timeLeft); + _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _options.getWindowSize() + "/" + + _activeResends + "), waiting " + timeLeft); try { _outboundPackets.wait(timeLeft); } catch (InterruptedException ie) {} } else { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Outbound window is full (" + _outboundPackets.size() + "), waiting indefinitely"); + _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends + + "), waiting indefinitely"); try { _outboundPackets.wait(); } catch (InterruptedException ie) {} } } else { @@ -242,7 +247,7 @@ public class Connection { if (timeout > MAX_RESEND_DELAY) timeout = MAX_RESEND_DELAY; if (_log.shouldLog(Log.DEBUG)) - _log.debug("Resend in " + timeout + " for " + packet); + _log.debug("Resend in " + timeout + " for " + packet, new Exception("Sent by")); SimpleTimer.getInstance().addEvent(new ResendPacketEvent(packet), timeout); } @@ -688,6 +693,14 @@ public class Connection { default: if (_log.shouldLog(Log.WARN)) _log.warn("Closing connection due to inactivity"); + if (_log.shouldLog(Log.DEBUG)) { + StringBuffer buf = new StringBuffer(128); + buf.append("last sent was: ").append(_context.clock().now() - _lastSendTime); + buf.append("ms ago, last received was: ").append(_context.clock().now()-_lastReceivedOn); + buf.append("ms ago, inactivity timeout is: ").append(_options.getInactivityTimeout()); + _log.debug(buf.toString()); + } + disconnect(true); break; } @@ -752,10 +765,8 @@ public class Connection { */ private class ResendPacketEvent implements SimpleTimer.TimedEvent { private PacketLocal _packet; - private boolean _currentIsActiveResend; public ResendPacketEvent(PacketLocal packet) { _packet = packet; - _currentIsActiveResend = false; packet.setResendPacketEvent(ResendPacketEvent.this); } @@ -763,7 +774,7 @@ public class Connection { if (_packet.getAckTime() > 0) return; - if (!_connected) { + if (_resetSent || _resetReceived) { _packet.cancelled(); return; } @@ -771,12 +782,15 @@ public class Connection { //if (_log.shouldLog(Log.DEBUG)) // _log.debug("Resend period reached for " + _packet); boolean resend = false; + boolean isLowest = false; synchronized (_outboundPackets) { + if (_packet.getSequenceNum() == _highestAckedThrough + 1) + isLowest = true; if (_outboundPackets.containsKey(new Long(_packet.getSequenceNum()))) resend = true; } if ( (resend) && (_packet.getAckTime() < 0) ) { - if ( (_activeResends > 0) && (!_currentIsActiveResend) ) { + if (!isLowest) { // we want to resend this packet, but there are already active // resends in the air and we dont want to make a bad situation // worse. wait another second @@ -808,7 +822,6 @@ public class Connection { if (numSends == 2) { // first resend for this packet _activeResends++; - _currentIsActiveResend = true; } // in case things really suck, the other side may have lost thier @@ -828,6 +841,17 @@ public class Connection { + (_context.clock().now() - _packet.getCreatedOn()) + "ms)"); _outboundQueue.enqueue(_packet); + _lastSendTime = _context.clock().now(); + + // acked during resending (... or somethin') + if ( (_packet.getAckTime() > 0) && (_packet.getNumSends() > 1) ) { + _activeResends--; + synchronized (_outboundPackets) { + _outboundPackets.notifyAll(); + } + return; + } + if (numSends > _options.getMaxResends()) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Too many resends"); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java index 1554d98aa..4e95325cc 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java @@ -114,9 +114,9 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { con.sendPacket(packet); long sent = System.currentTimeMillis(); - if ( (built-before > 1000) && (_log.shouldLog(Log.WARN)) ) + if ( (built-before > 5*1000) && (_log.shouldLog(Log.WARN)) ) _log.warn("wtf, took " + (built-before) + "ms to build a packet: " + packet); - if ( (sent-built> 1000) && (_log.shouldLog(Log.WARN)) ) + if ( (sent-built> 5*1000) && (_log.shouldLog(Log.WARN)) ) _log.warn("wtf, took " + (sent-built) + "ms to send a packet: " + packet); return packet; } @@ -165,7 +165,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { // packets sent, otherwise the other side could receive the CLOSE prematurely, // since this ACK could arrive before the unacked payload message. if (con.getOutputStream().getClosed() && - ( (size > 0) || (con.getUnackedPacketsSent() <= 0) ) ) { + ( (size > 0) || (con.getUnackedPacketsSent() <= 0) || (packet.getSequenceNum() > 0) ) ) { packet.setFlag(Packet.FLAG_CLOSE); con.setCloseSentOn(_context.clock().now()); if (_log.shouldLog(Log.DEBUG)) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 36b1d1e98..99d724d6d 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -56,7 +56,7 @@ public class ConnectionPacketHandler { long ready = con.getInputStream().getHighestReadyBockId(); int available = con.getOptions().getInboundBufferSize() - con.getInputStream().getTotalReadySize(); int allowedBlocks = available/con.getOptions().getMaxMessageSize(); - if (packet.getSequenceNum() > ready + allowedBlocks) { + if ( (packet.getPayloadSize() > 0) && (packet.getSequenceNum() > ready + allowedBlocks) ) { if (_log.shouldLog(Log.WARN)) _log.warn("Inbound buffer exceeded on connection " + con + " (" + ready + "/"+ (ready+allowedBlocks) + "/" + available @@ -106,14 +106,11 @@ public class ConnectionPacketHandler { con.incrementDupMessagesReceived(1); // take note of congestion - //con.getOptions().setResendDelay(con.getOptions().getResendDelay()*2); - //con.getOptions().setWindowSize(con.getOptions().getWindowSize()/2); if (_log.shouldLog(Log.WARN)) _log.warn("congestion.. dup " + packet); SimpleTimer.getInstance().addEvent(new AckDup(con), con.getOptions().getSendAckDelay()); - //con.incrementUnackedPacketsReceived(); //con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); - fastAck = true; + //fastAck = true; } else { if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) { //con.incrementUnackedPacketsReceived(); @@ -128,7 +125,7 @@ public class ConnectionPacketHandler { fastAck = fastAck || ack(con, packet.getAckThrough(), packet.getNacks(), packet, isNew); con.eventOccurred(); if (fastAck) { - if (con.getLastSendTime() + 1000 < _context.clock().now()) { + if (con.getLastSendTime() + 2000 < _context.clock().now()) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Fast ack for dup " + packet); con.ackImmediately(); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index aeac85cbd..3dd11ae22 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -69,6 +69,8 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat public void prepare() { if (_connection != null) _connection.getInputStream().updateAcks(this); + if (_numSends > 0) // so we can debug to differentiate resends + setOptionalDelay(_numSends * 1000); } public long getCreatedOn() { return _createdOn; } @@ -110,10 +112,14 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat public String toString() { String str = super.toString(); + + if ( (_tagsSent != null) && (_tagsSent.size() > 0) ) + str = str + " with tags"; + if (_ackOn > 0) - return str + " ack after " + getAckTime(); + return str + " ack after " + getAckTime() + (_numSends <= 1 ? "" : " sent " + _numSends + " times"); else - return str; + return str + (_numSends <= 1 ? "" : " sent " + _numSends + " times"); } public void waitForAccept(int maxWaitMs) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java index 5947524ed..8002fe4f8 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java @@ -74,6 +74,9 @@ class PacketQueue { if ( (writeTime > 1000) && (_log.shouldLog(Log.WARN)) ) _log.warn("took " + writeTime + "ms to write the packet: " + packet); + // last chance to short circuit... + if (packet.getAckTime() > 0) return; + // this should not block! begin = _context.clock().now(); sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent); diff --git a/core/java/src/net/i2p/client/I2PSessionImpl2.java b/core/java/src/net/i2p/client/I2PSessionImpl2.java index 27494d8f7..44389e2f1 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl2.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl2.java @@ -105,6 +105,8 @@ class I2PSessionImpl2 extends I2PSessionImpl { } return compressed; } + + private static final int NUM_TAGS = 50; private boolean sendBestEffort(Destination dest, byte payload[], SessionKey keyUsed, Set tagsSent) throws I2PSessionException { @@ -119,14 +121,14 @@ class I2PSessionImpl2 extends I2PSessionImpl { if ( (tagsSent == null) || (tagsSent.size() <= 0) ) { if (oldTags < 10) { - sentTags = createNewTags(50); + sentTags = createNewTags(NUM_TAGS); if (_log.shouldLog(Log.DEBUG)) - _log.debug("** sendBestEffort only had " + oldTags + " with " + availTimeLeft + ", adding 50"); + _log.debug("** sendBestEffort only had " + oldTags + " with " + availTimeLeft + ", adding " + NUM_TAGS); } else if (availTimeLeft < 2 * 60 * 1000) { // if we have > 10 tags, but they expire in under 2 minutes, we want more - sentTags = createNewTags(50); + sentTags = createNewTags(NUM_TAGS); if (_log.shouldLog(Log.DEBUG)) - _log.debug(getPrefix() + "Tags expiring in " + availTimeLeft + ", adding 50 new ones"); + _log.debug(getPrefix() + "Tags expiring in " + availTimeLeft + ", adding " + NUM_TAGS + " new ones"); //_log.error("** sendBestEffort available time left " + availTimeLeft); } else { if (_log.shouldLog(Log.DEBUG)) @@ -240,11 +242,11 @@ class I2PSessionImpl2 extends I2PSessionImpl { SessionTag tag = _context.sessionKeyManager().consumeNextAvailableTag(dest.getPublicKey(), key); Set sentTags = null; if (_context.sessionKeyManager().getAvailableTags(dest.getPublicKey(), key) < 10) { - sentTags = createNewTags(50); + sentTags = createNewTags(NUM_TAGS); } else if (_context.sessionKeyManager().getAvailableTimeLeft(dest.getPublicKey(), key) < 2 * 60 * 1000) { // if we have > 10 tags, but they expire in under 30 seconds, we want more - sentTags = createNewTags(50); - if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Tags are almost expired, adding 50 new ones"); + sentTags = createNewTags(NUM_TAGS); + if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Tags are almost expired, adding " + NUM_TAGS + " new ones"); } SessionKey newKey = null; if (false) // rekey diff --git a/history.txt b/history.txt index 1b356cd6d..47a9ddac4 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,11 @@ -$Id: history.txt,v 1.117 2004/12/21 11:32:50 jrandom Exp $ +$Id: history.txt,v 1.118 2004/12/21 13:23:03 jrandom Exp $ + +2004-12-28 jrandom + * Cleaned up the resending and choking algorithm in the streaming lib. + * Removed the read timeout override for I2PTunnel's httpclient, allowing + it to use the default for the streaming lib. + * Revised ack triggers in the streaming lib. + * Logging. * 2004-12-21 0.4.2.5 released diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 75e753573..b8a796ff2 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.122 $ $Date: 2004/12/21 11:32:50 $"; + public final static String ID = "$Revision: 1.123 $ $Date: 2004/12/21 13:23:03 $"; public final static String VERSION = "0.4.2.5"; - public final static long BUILD = 0; + public final static long BUILD = 1; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java index e1197cae0..77c89f742 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java @@ -168,10 +168,16 @@ public class TCPConnection { msg.timestamp("TCPConnection.addMessage"); List expired = null; int remaining = 0; + long remainingSize = 0; synchronized (_pendingMessages) { _pendingMessages.add(msg); expired = locked_expireOld(); locked_throttle(); + for (int i = 0; i < _pendingMessages.size(); i++) { + OutNetMessage cur = (OutNetMessage)_pendingMessages.get(i); + remaining++; + remainingSize += cur.getMessageSize(); + } remaining = _pendingMessages.size(); _pendingMessages.notifyAll(); } @@ -182,8 +188,8 @@ public class TCPConnection { if (_log.shouldLog(Log.WARN)) _log.warn("Message " + cur.getMessageId() + " expired on the queue to " + _ident.getHash().toBase64().substring(0,6) - + " (queue size " + remaining + ") with lifetime " - + cur.getLifetime()); + + " (queue size " + remaining + "/" + remainingSize + ") with lifetime " + + cur.getLifetime() + " and size " + cur.getMessageSize()); sent(cur, false, 0); } } diff --git a/router/java/src/net/i2p/router/tunnelmanager/TunnelPool.java b/router/java/src/net/i2p/router/tunnelmanager/TunnelPool.java index 65b144de3..abbc0a03f 100644 --- a/router/java/src/net/i2p/router/tunnelmanager/TunnelPool.java +++ b/router/java/src/net/i2p/router/tunnelmanager/TunnelPool.java @@ -559,7 +559,7 @@ class TunnelPool { } if (_log.shouldLog(Log.WARN)) - _log.warn("Tunnel " + id + " marked as not ready, since it /failed/", new Exception("Failed tunnel")); + _log.warn("Tunnel " + id + " marked as not ready, since it /failed/: " + info.toString(), new Exception("Failed tunnel")); _context.messageHistory().tunnelFailed(info.getTunnelId()); info.setIsReady(false); Hash us = _context.routerHash();