diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java index 1bacea50f..8ca0e982c 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java @@ -25,12 +25,22 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { _connection = con; } + /** + * This tells the flusher in MessageOutputStream whether to flush. + * It won't flush if this returns true. + * It was: return con.getUnackedPacketsSent() > 0; + * But then, for data that fills more than one packet, the last part of + * the data isn't sent until all the previous packets are acked. Which is very slow. + * + * So let's send data along unless the outbound window is full. + * + * @return !flush + */ public boolean writeInProcess() { Connection con = _connection; if (con != null) - return con.getUnackedPacketsSent() > 0; - else - return false; + return con.getUnackedPacketsSent() >= con.getOptions().getWindowSize(); + return false; } /** diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java index 972b08139..7d1d4827f 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java @@ -41,6 +41,10 @@ class ConnectionHandler { } public boolean getActive() { return _active; } + /** + * Non-SYN packets with a zero SendStreamID may also be queued here so + * that they don't get thrown away while the SYN packet before it is queued. + */ public void receiveNewSyn(Packet packet) { if (!_active) { if (_log.shouldLog(Log.WARN)) @@ -59,6 +63,8 @@ class ConnectionHandler { /** * Receive an incoming connection (built from a received SYN) + * Non-SYN packets with a zero SendStreamID may also be queued here so + * that they don't get thrown away while the SYN packet before it is queued. * * @param timeoutMs max amount of time to wait for a connection (if less * than 1ms, wait indefinitely) @@ -111,14 +117,40 @@ class ConnectionHandler { if (syn != null) { // deal with forged / invalid syn packets - Connection con = _manager.receiveConnection(syn); - if (con != null) - return con; + + // Handle both SYN and non-SYN packets in the queue + if (syn.isFlagSet(Packet.FLAG_SYNCHRONIZE)) { + Connection con = _manager.receiveConnection(syn); + if (con != null) + return con; + } else { + reReceivePacket(syn); + // ... and keep looping + } } // keep looping... } } - + + /** + * We found a non-SYN packet that was queued in the syn queue, + * check to see if it has a home now, else drop it ... + */ + private void reReceivePacket(Packet packet) { + Connection con = _manager.getConnectionByOutboundId(packet.getReceiveStreamId()); + if (con != null) { + // Send it through the packet handler again + if (_log.shouldLog(Log.WARN)) + _log.warn("Found con for queued non-syn packet: " + packet); + _manager.getPacketHandler().receivePacket(packet); + } else { + // goodbye + if (_log.shouldLog(Log.WARN)) + _log.warn("Did not find con for queued non-syn packet, dropping: " + packet); + packet.releasePayload(); + } + } + private void sendReset(Packet packet) { boolean ok = packet.verifySignature(_context, packet.getOptionalFrom(), null); if (!ok) { @@ -152,8 +184,12 @@ class ConnectionHandler { } if (removed) { - // timeout - send RST - sendReset(_synPacket); + if (_synPacket.isFlagSet(Packet.FLAG_SYNCHRONIZE)) + // timeout - send RST + sendReset(_synPacket); + else + // non-syn packet got stranded on the syn queue, send it to the con + reReceivePacket(_synPacket); } else { // handled. noop } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 0753765d0..28146ce72 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -108,9 +108,12 @@ public class ConnectionPacketHandler { boolean isNew = false; boolean allowAck = true; + // We allow the SendStreamID to be 0 so that the originator can send + // multiple packets before he gets the first ACK back. + // If we want to limit the number of packets we receive without a + // SendStreamID, do it in PacketHandler.receiveUnknownCon(). if ( (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) && - ( (packet.getSendStreamId() <= 0) || - (packet.getReceiveStreamId() <= 0) ) ) + (packet.getReceiveStreamId() <= 0) ) allowAck = false; if (allowAck) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 952f1f793..4a19d5e09 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -43,10 +43,15 @@ public class MessageOutputStream extends OutputStream { private long _sendPeriodBytes; private int _sendBps; + private static final int DEFAULT_PASSIVE_FLUSH_DELAY = 250; + public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver) { this(ctx, receiver, Packet.MAX_PAYLOAD_SIZE); } public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver, int bufSize) { + this(ctx, receiver, bufSize, DEFAULT_PASSIVE_FLUSH_DELAY); + } + public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver, int bufSize, int passiveFlushDelay) { super(); _dataCache = ByteCache.getInstance(128, bufSize); _context = ctx; @@ -57,7 +62,7 @@ public class MessageOutputStream extends OutputStream { _written = 0; _closed = false; _writeTimeout = -1; - _passiveFlushDelay = 500; + _passiveFlushDelay = passiveFlushDelay; _nextBufferSize = -1; _sendPeriodBeginTime = ctx.clock().now(); _sendPeriodBytes = 0; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index 529eb7894..1f2b35902 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -31,6 +31,8 @@ public class PacketHandler { _lastDelay = _context.random().nextInt(30*1000); } +/** what is the point of this ? */ +/***** private boolean choke(Packet packet) { if (true) return true; //if ( (_dropped == 0) && true ) { //&& (_manager.getSent() <= 0) ) { @@ -45,9 +47,7 @@ public class PacketHandler { return false; } else { // if (true) return true; // no lag, just drop - /* - int delay = _context.random().nextInt(5*1000); - */ + // int delay = _context.random().nextInt(5*1000); int delay = _context.random().nextInt(1*1000); int delayFactor = _context.random().nextInt(100); if (delayFactor > 80) { @@ -85,10 +85,11 @@ public class PacketHandler { receivePacketDirect(_packet); } } +*****/ void receivePacket(Packet packet) { - boolean ok = choke(packet); - if (ok) + //boolean ok = choke(packet); + //if (ok) receivePacketDirect(packet); } @@ -239,19 +240,20 @@ public class PacketHandler { } packet.releasePayload(); } else { - //if (_log.shouldLog(Log.DEBUG) && !packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) - // _log.debug("Packet received on an unknown stream (and not an ECHO or SYN): " + packet); + if (_log.shouldLog(Log.WARN) && !packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) + _log.warn("Packet received on an unknown stream (and not an ECHO or SYN): " + packet); if (sendId <= 0) { Connection con = _manager.getConnectionByOutboundId(packet.getReceiveStreamId()); if (con != null) { if ( (con.getHighestAckedThrough() <= 5) && (packet.getSequenceNum() <= 5) ) { - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("Received additional packets before the syn on " + con + ": " + packet); + if (_log.shouldLog(Log.WARN)) + _log.warn("Received additional packet w/o SendStreamID after the syn on " + con + ": " + packet); receiveKnownCon(con, packet); return; } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("hrmph, received while ack of syn was in flight on " + con + ": " + packet + " acked: " + con.getAckedPackets()); + if (_log.shouldLog(Log.WARN)) + _log.warn("hrmph, received while ack of syn was in flight on " + con + ": " + packet + " acked: " + con.getAckedPackets()); + // allow unlimited packets without a SendStreamID for now receiveKnownCon(con, packet); return; } @@ -261,8 +263,17 @@ public class PacketHandler { if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) { _manager.getConnectionHandler().receiveNewSyn(packet); } else { + // We can get here on the 2nd+ packet if the 1st (SYN) packet + // is still on the _synQueue in the ConnectionHandler, and + // ConnectionManager.receiveConnection() hasn't run yet to put + // the StreamID on the getConnectionByOutboundId list. + // Then the 2nd packet gets discarded and has to be retransmitted. + // + // We fix this by putting this packet on the syn queue too! + // Then ConnectionHandler.accept() will check the connection list + // and call receivePacket() above instead of receiveConnection(). if (_log.shouldLog(Log.WARN)) { - _log.warn("Packet belongs to no other cons: " + packet); + _log.warn("Packet belongs to no other cons, putting on the syn queue: " + packet); } if (_log.shouldLog(Log.DEBUG)) { StringBuffer buf = new StringBuffer(128); @@ -274,7 +285,8 @@ public class PacketHandler { _log.debug("connections: " + buf.toString() + " sendId: " + (sendId > 0 ? Packet.toId(sendId) : " unknown")); } - packet.releasePayload(); + //packet.releasePayload(); + _manager.getConnectionHandler().receiveNewSyn(packet); } } } diff --git a/history.txt b/history.txt index 736fa8d33..3badded12 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,16 @@ +2008-11-11 zzz + * Streaming - Fix several bugs and improve performance + when the initial data is larger than one MTU, + e.g. HTTP GETs with large URLs, CGI params or cookies, + or large HTTP POSTS: + - Don't reject additional packets received without a + send stream ID (i.e. sent before the SYN ACK was received) + - Put unknown non-SYN packets on the SYN queue also + so they won't be rejected + - Reduce flusher delay to 250ms (was 500) + - Flush unless window is full (was window is non-empty) + * NetDb: Fix a deadlock caused by last checkin + 2008-11-09 zzz * build.xml: Build speedups: - Don't distclean in the updaterRouter target diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 8d9108c84..d066d26a8 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -17,7 +17,7 @@ import net.i2p.CoreVersion; public class RouterVersion { public final static String ID = "$Revision: 1.548 $ $Date: 2008-06-07 23:00:00 $"; public final static String VERSION = "0.6.4"; - public final static long BUILD = 9; + public final static long BUILD = 10; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID);