From 48cdf17a4f63829c28b653f83ae10d01c3f7e752 Mon Sep 17 00:00:00 2001 From: jrandom Date: Thu, 28 Oct 2004 02:03:38 +0000 Subject: [PATCH] * revamped locking to block on flush and close until all of the packets through that point have been ACKed, throwing an InterruptedIOException if there was a writeTimeout or an IOException if the con failed * revamped the ack/nack field settings to ack as much as possible * handle some strange timeout/resend errors on connection * pass 1/2rtt as the packet 'optional delay' field, and use that to schedule the ack time (the 'last' messages in a window set the optional delay to 0, asking for immediate ack of all received) * increase the optional delay to 2 bytes (#ms to delay) * inject random failures and delays if configured to do so in PacketHandler.choke * fix up the window size adjustment (increment on ack, /= 2 on resend) * use the highest RTT in the new RTT calculation so that we fit more in (via SACK) * fix up the SACK handling (duh) * revise the resend time calculation --- .../net/i2p/client/streaming/Connection.java | 105 ++++++++++++----- .../streaming/ConnectionDataReceiver.java | 33 ++++-- .../client/streaming/ConnectionHandler.java | 15 ++- .../client/streaming/ConnectionManager.java | 10 +- .../client/streaming/ConnectionOptions.java | 10 +- .../streaming/ConnectionPacketHandler.java | 86 +++++++------- .../client/streaming/MessageInputStream.java | 30 +++-- .../client/streaming/MessageOutputStream.java | 107 ++++++++++++++---- .../src/net/i2p/client/streaming/Packet.java | 50 +++++--- .../i2p/client/streaming/PacketHandler.java | 98 ++++++++++++---- .../net/i2p/client/streaming/PacketLocal.java | 59 +++++++++- .../net/i2p/client/streaming/PacketQueue.java | 11 +- 12 files changed, 454 insertions(+), 160 deletions(-) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index ffa840f42..cbea38847 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -92,9 +92,9 @@ public class Connection { * * @return true if the packet should be sent */ - boolean packetSendChoke() { + boolean packetSendChoke(long timeoutMs) { if (false) return true; - long writeExpire = _options.getWriteTimeout(); + long writeExpire = timeoutMs; while (true) { long timeLeft = writeExpire - _context.clock().now(); synchronized (_outboundPackets) { @@ -130,7 +130,7 @@ public class Connection { void sendAvailable() { // this grabs the data, builds a packet, and queues it up via sendPacket try { - _outputStream.flushAvailable(_receiver); + _outputStream.flushAvailable(_receiver, false); } catch (IOException ioe) { if (_log.shouldLog(Log.ERROR)) _log.error("Error flushing available", ioe); @@ -149,12 +149,28 @@ public class Connection { if ( (packet.getSequenceNum() == 0) && (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) ) { ackOnly = true; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("No resend for " + packet); } else { + int remaining = 0; synchronized (_outboundPackets) { _outboundPackets.put(new Long(packet.getSequenceNum()), packet); + remaining = _options.getWindowSize() - _outboundPackets.size() ; _outboundPackets.notifyAll(); } - SimpleTimer.getInstance().addEvent(new ResendPacketEvent(packet), _options.getRTT()*2); + if (remaining < 0) + remaining = 0; + if (packet.isFlagSet(Packet.FLAG_CLOSE) || (remaining < 2)) { + packet.setOptionalDelay(0); + } else { + int delay = _options.getRTT() / 2; + packet.setOptionalDelay(delay); + _log.debug("Requesting ack delay of " + delay + "ms for packet " + packet); + } + packet.setFlag(Packet.FLAG_DELAY_REQUESTED); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Resend in " + (_options.getRTT()*2) + " for " + packet); + SimpleTimer.getInstance().addEvent(new ResendPacketEvent(packet), (_options.getRTT()*2 < 5000 ? 5000 : _options.getRTT()*2)); } _lastSendTime = _context.clock().now(); @@ -167,7 +183,7 @@ public class Connection { // ACKs don't get ACKed, but pings do. if (packet.getTagsSent().size() > 0) { _log.warn("Sending a ping since the ACK we just sent has " + packet.getTagsSent().size() + " tags"); - _connectionManager.ping(_remotePeer, _options.getRTT()*2, false, packet.getKeyUsed(), packet.getTagsSent()); + _connectionManager.ping(_remotePeer, 30*1000, false, packet.getKeyUsed(), packet.getTagsSent()); } } } @@ -178,13 +194,17 @@ public class Connection { for (Iterator iter = _outboundPackets.keySet().iterator(); iter.hasNext(); ) { Long id = (Long)iter.next(); if (id.longValue() <= ackThrough) { + boolean nacked = false; if (nacks != null) { // linear search since its probably really tiny - for (int i = 0; i < nacks.length; i++) - if (nacks[i] == id.longValue()) - continue; // NACKed - } else { - // ACKed + for (int i = 0; i < nacks.length; i++) { + if (nacks[i] == id.longValue()) { + nacked = true; + break; // NACKed + } + } + } + if (!nacked) { // aka ACKed if (acked == null) acked = new ArrayList(1); PacketLocal ackedPacket = (PacketLocal)_outboundPackets.get(id); @@ -231,16 +251,15 @@ public class Connection { if (cleanDisconnect) { // send close packets and schedule stuff... - try { - _outputStream.close(); - _inputStream.close(); - } catch (IOException ioe) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Error on clean disconnect", ioe); - } + _outputStream.closeInternal(); + _inputStream.close(); } else { doClose(); synchronized (_outboundPackets) { + for (Iterator iter = _outboundPackets.values().iterator(); iter.hasNext(); ) { + PacketLocal pl = (PacketLocal)iter.next(); + pl.cancelled(); + } _outboundPackets.clear(); _outboundPackets.notifyAll(); } @@ -297,10 +316,25 @@ public class Connection { */ public long getNextSendTime() { return _nextSendTime; } public void setNextSendTime(long when) { - if (_nextSendTime > 0) - if (_log.shouldLog(Log.DEBUG)) - _log.debug("set next send time to " + (when-_nextSendTime) + "ms after it was before ("+when+")"); - _nextSendTime = when; + if (_nextSendTime >= 0) { + if (when < _nextSendTime) + _nextSendTime = when; + } else { + _nextSendTime = when; + } + + if (_nextSendTime >= 0) { + long max = _context.clock().now() + _options.getSendAckDelay(); + if (max < _nextSendTime) + _nextSendTime = max; + } + + if (_log.shouldLog(Log.DEBUG) && false) { + if (_nextSendTime <= 0) + _log.debug("set next send time to an unknown time", new Exception(toString())); + else + _log.debug("set next send time to " + (_nextSendTime-_context.clock().now()) + "ms from now", new Exception(toString())); + } } public long getAckedPackets() { return _ackedPackets; } @@ -346,6 +380,12 @@ public class Connection { buf.append("] "); } buf.append("unacked inbound? ").append(getUnackedPacketsReceived()); + buf.append(" [high ").append(_inputStream.getHighestBlockId()); + long nacks[] = _inputStream.getNacks(); + if (nacks != null) + for (int i = 0; i < nacks.length; i++) + buf.append(" ").append(nacks[i]); + buf.append("]"); buf.append("]"); return buf.toString(); } @@ -360,6 +400,8 @@ public class Connection { } public void timeReached() { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Resend period reached for " + _packet); boolean resend = false; synchronized (_outboundPackets) { if (_outboundPackets.containsKey(new Long(_packet.getSequenceNum()))) @@ -367,18 +409,26 @@ public class Connection { } if ( (resend) && (_packet.getAckTime() < 0) ) { // revamp various fields, in case we need to ack more, etc - _packet.setAckThrough(getInputStream().getHighestBlockId()); - _packet.setNacks(getInputStream().getNacks()); + _inputStream.updateAcks(_packet); _packet.setOptionalDelay(getOptions().getChoke()); _packet.setOptionalMaxSize(getOptions().getMaxMessageSize()); _packet.setResendDelay(getOptions().getResendDelay()); _packet.setReceiveStreamId(_receiveStreamId); _packet.setSendStreamId(_sendStreamId); + // shrink the window + int newWindowSize = getOptions().getWindowSize(); + newWindowSize /= 2; + if (newWindowSize <= 0) + newWindowSize = 1; + getOptions().setWindowSize(newWindowSize); + int numSends = _packet.getNumSends() + 1; if (_log.shouldLog(Log.WARN)) - _log.warn("Resend packet " + _packet + " time " + numSends + " on " + Connection.this); + _log.warn("Resend packet " + _packet + " time " + numSends + " (wsize " + + newWindowSize + " lifetime " + + (_context.clock().now() - _packet.getCreatedOn()) + "ms)"); _outboundQueue.enqueue(_packet); if (numSends > _options.getMaxResends()) { @@ -387,14 +437,15 @@ public class Connection { disconnect(false); } else { //long timeout = _options.getResendDelay() << numSends; - long timeout = _options.getRTT() << numSends; + long timeout = _options.getRTT() << (numSends-1); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Scheduling resend in " + timeout + "ms"); + _log.debug("Scheduling resend in " + timeout + "ms for " + _packet); SimpleTimer.getInstance().addEvent(ResendPacketEvent.this, timeout); } } else { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Packet acked before resend: " + _packet + " on " + Connection.this); + _log.debug("Packet acked before resend (resend="+ resend + "): " + + _packet + " on " + Connection.this); } } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java index d98ff0d95..1dc8747e0 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java @@ -1,6 +1,7 @@ package net.i2p.client.streaming; import java.io.InterruptedIOException; +import java.io.IOException; import net.i2p.I2PAppContext; import net.i2p.util.Log; @@ -11,16 +12,16 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { private I2PAppContext _context; private Log _log; private Connection _connection; + private MessageOutputStream.WriteStatus _dummyStatus; public ConnectionDataReceiver(I2PAppContext ctx, Connection con) { _context = ctx; _log = ctx.logManager().getLog(ConnectionDataReceiver.class); _connection = con; + _dummyStatus = new DummyStatus(); } - public void writeData(byte[] buf, int off, int size) throws InterruptedIOException { - if (!_connection.packetSendChoke()) - throw new InterruptedIOException("Timeout expired waiting to write"); + public MessageOutputStream.WriteStatus writeData(byte[] buf, int off, int size) { boolean doSend = true; if ( (size <= 0) && (_connection.getLastSendId() >= 0) ) { if (_connection.getOutputStream().getClosed()) { @@ -45,15 +46,18 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { + " con: " + _connection, new Exception("write called by")); if (doSend) { - send(buf, off, size); + PacketLocal packet = send(buf, off, size); + return packet; } else { - //_connection.flushPackets(); + return _dummyStatus; } } - public void send(byte buf[], int off, int size) { + + public PacketLocal send(byte buf[], int off, int size) { PacketLocal packet = buildPacket(buf, off, size); _connection.sendPacket(packet); + return packet; } private boolean isAckOnly(int size) { @@ -67,7 +71,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { private PacketLocal buildPacket(byte buf[], int off, int size) { boolean ackOnly = isAckOnly(size); - PacketLocal packet = new PacketLocal(_context, _connection.getRemotePeer()); + PacketLocal packet = new PacketLocal(_context, _connection.getRemotePeer(), _connection); byte data[] = new byte[size]; if (size > 0) System.arraycopy(buf, off, data, 0, size); @@ -79,8 +83,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { packet.setSendStreamId(_connection.getSendStreamId()); packet.setReceiveStreamId(_connection.getReceiveStreamId()); - packet.setAckThrough(_connection.getInputStream().getHighestBlockId()); - packet.setNacks(_connection.getInputStream().getNacks()); + _connection.getInputStream().updateAcks(packet); packet.setOptionalDelay(_connection.getOptions().getChoke()); packet.setOptionalMaxSize(_connection.getOptions().getMaxMessageSize()); packet.setResendDelay(_connection.getOptions().getResendDelay()); @@ -103,10 +106,18 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { if (_log.shouldLog(Log.DEBUG)) _log.debug("Closed is set for a new packet on " + _connection + ": " + packet); } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Closed is not set for a new packet on " + _connection + ": " + packet); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Closed is not set for a new packet on " + _connection + ": " + packet); } return packet; } + + private static final class DummyStatus implements MessageOutputStream.WriteStatus { + public final void waitForAccept(int maxWaitMs) { return; } + public final void waitForCompletion(int maxWaitMs) { return; } + public final boolean writeAccepted() { return true; } + public final boolean writeFailed() { return false; } + public final boolean writeSuccessful() { return true; } + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java index 9bee67a1e..4c4277985 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java @@ -77,7 +77,20 @@ class ConnectionHandler { } if (syn != null) { - return _manager.receiveConnection(syn); + // deal with forged / invalid syn packets + Connection con = _manager.receiveConnection(syn); + if (con != null) { + return con; + } else if (timeoutMs > 0) { + long remaining = expiration - _context.clock().now(); + if (remaining <= 0) { + return null; + } else { + return accept(remaining); + } + } else { + return accept(timeoutMs); + } } else { return null; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index 1348bf44e..f888e06c3 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -7,6 +7,7 @@ import java.util.Map; import java.util.Set; import net.i2p.I2PAppContext; +import net.i2p.I2PException; import net.i2p.client.I2PSession; import net.i2p.data.ByteArray; import net.i2p.data.Destination; @@ -90,7 +91,14 @@ public class ConnectionManager { } con.setReceiveStreamId(receiveId); - con.getPacketHandler().receivePacket(synPacket, con); + try { + con.getPacketHandler().receivePacket(synPacket, con); + } catch (I2PException ie) { + synchronized (_connectionLock) { + _connectionByInboundId.remove(new ByteArray(receiveId)); + } + return null; + } return con; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index e72926d6f..cac9fd199 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -52,10 +52,10 @@ public class ConnectionOptions extends I2PSocketOptions { setConnectDelay(2*1000); setProfile(PROFILE_BULK); setMaxMessageSize(Packet.MAX_PAYLOAD_SIZE); - setRTT(5*1000); + setRTT(30*1000); setReceiveWindow(1); setResendDelay(5*1000); - setSendAckDelay(1*1000); + setSendAckDelay(2*1000); setWindowSize(1); setMaxResends(10); setWriteTimeout(-1); @@ -102,7 +102,11 @@ public class ConnectionOptions extends I2PSocketOptions { * What to set the round trip time estimate to (in milliseconds) */ public int getRTT() { return _rtt; } - public void setRTT(int ms) { _rtt = ms; } + public void setRTT(int ms) { + _rtt = ms; + if (_rtt > 60*1000) + _rtt = 60*1000; + } /** How long after sending a packet will we wait before resending? */ public int getResendDelay() { return _resendDelay; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index a788ffa18..82193aa1b 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -3,6 +3,7 @@ package net.i2p.client.streaming; import java.util.List; import net.i2p.I2PAppContext; +import net.i2p.I2PException; import net.i2p.data.DataHelper; import net.i2p.data.Destination; import net.i2p.util.Log; @@ -25,7 +26,7 @@ public class ConnectionPacketHandler { } /** distribute a packet to the connection specified */ - void receivePacket(Packet packet, Connection con) { + void receivePacket(Packet packet, Connection con) throws I2PException { boolean ok = verifyPacket(packet, con); if (!ok) return; boolean isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload()); @@ -36,15 +37,17 @@ public class ConnectionPacketHandler { if (isNew) { con.incrementUnackedPacketsReceived(); - long nextTime = con.getNextSendTime(); - if (nextTime <= 0) { - con.setNextSendTime(con.getOptions().getSendAckDelay() + _context.clock().now()); + if (packet.isFlagSet(Packet.FLAG_DELAY_REQUESTED) && (packet.getOptionalDelay() <= 0) ) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Scheduling ack in " + con.getOptions().getSendAckDelay() + "ms for received packet " + packet); + _log.debug("Scheduling immediate ack for " + packet); + con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); } else { + int delay = con.getOptions().getSendAckDelay(); + if (packet.isFlagSet(Packet.FLAG_DELAY_REQUESTED)) // delayed ACK requested + delay += packet.getOptionalDelay(); + con.setNextSendTime(delay + _context.clock().now()); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Ack is already scheduled in " + (nextTime-_context.clock().now()) - + "ms, though we just received " + packet); + _log.debug("Scheduling ack in " + delay + "ms for received packet " + packet); } } else { if (packet.getSequenceNum() > 0) { @@ -54,9 +57,15 @@ public class ConnectionPacketHandler { if (_log.shouldLog(Log.WARN)) _log.warn("congestion.. dup " + packet); con.incrementUnackedPacketsReceived(); + con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("ACK only packet received: " + packet); + if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) { + con.incrementUnackedPacketsReceived(); + con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("ACK only packet received: " + packet); + } } } @@ -65,14 +74,14 @@ public class ConnectionPacketHandler { if ( (acked != null) && (acked.size() > 0) ) { if (_log.shouldLog(Log.DEBUG)) _log.debug(acked.size() + " of our packets acked with " + packet); - // use the lowest RTT, since these would likely be bunched together, - // waiting for the most recent packet received before sending the ACK - int lowestRtt = -1; + // use the highest RTT, since these would likely be bunched together, + // and the highest rtt lets us set our resend delay properly + int highestRTT = -1; for (int i = 0; i < acked.size(); i++) { PacketLocal p = (PacketLocal)acked.get(i); - if ( (lowestRtt < 0) || (p.getAckTime() < lowestRtt) ) { + if (p.getAckTime() > highestRTT) { //if (p.getNumSends() <= 1) - lowestRtt = p.getAckTime(); + highestRTT = p.getAckTime(); } if (p.getNumSends() > 1) @@ -88,24 +97,25 @@ public class ConnectionPacketHandler { if (_log.shouldLog(Log.DEBUG)) _log.debug("Packet acked after " + p.getAckTime() + "ms: " + p); } - if (lowestRtt > 0) { + if (highestRTT > 0) { int oldRTT = con.getOptions().getRTT(); - int newRTT = (int)(RTT_DAMPENING*oldRTT + (1-RTT_DAMPENING)*lowestRtt); + int newRTT = (int)(RTT_DAMPENING*oldRTT + (1-RTT_DAMPENING)*highestRTT); con.getOptions().setRTT(newRTT); } } - boolean fastAck = adjustWindow(con, isNew, packet.getSequenceNum(), numResends); + boolean fastAck = adjustWindow(con, isNew, packet.getSequenceNum(), numResends, (acked != null ? acked.size() : 0)); con.eventOccurred(); if (fastAck) { if (con.getLastSendTime() + con.getOptions().getRTT() < _context.clock().now()) { - _log.error("Fast ack for dup " + packet); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Fast ack for dup " + packet); con.ackImmediately(); } } } - private boolean adjustWindow(Connection con, boolean isNew, long sequenceNum, int numResends) { + private boolean adjustWindow(Connection con, boolean isNew, long sequenceNum, int numResends, int acked) { if ( (!isNew) && (sequenceNum > 0) ) { // dup real packet int oldSize = con.getOptions().getWindowSize(); @@ -115,22 +125,17 @@ public class ConnectionPacketHandler { con.getOptions().setWindowSize(oldSize); return true; } else if (numResends > 0) { - int newWindowSize = con.getOptions().getWindowSize(); - newWindowSize /= 2; // >>>= numResends; - if (newWindowSize <= 0) - newWindowSize = 1; - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Shrink the window to " + newWindowSize + " (#resends: " + numResends - + ") for " + con); - con.getOptions().setWindowSize(newWindowSize); + // window sizes are shrunk on resend, not on ack } else { - // new packet that ack'ed uncongested data, or an empty ack - int newWindowSize = con.getOptions().getWindowSize(); - newWindowSize += 1; //acked.size(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("New window size " + newWindowSize + " (#resends: " + numResends - + ") for " + con); - con.getOptions().setWindowSize(newWindowSize); + if (acked > 0) { + // new packet that ack'ed uncongested data, or an empty ack + int newWindowSize = con.getOptions().getWindowSize(); + newWindowSize += 1; // acked; // 1 + if (_log.shouldLog(Log.DEBUG)) + _log.debug("New window size " + newWindowSize + " (#resends: " + numResends + + ") for " + con); + con.getOptions().setWindowSize(newWindowSize); + } } return false; } @@ -141,12 +146,12 @@ public class ConnectionPacketHandler { * @return true if the packet is ok for this connection, false if we shouldn't * continue processing. */ - private boolean verifyPacket(Packet packet, Connection con) { + private boolean verifyPacket(Packet packet, Connection con) throws I2PException { if (packet.isFlagSet(Packet.FLAG_RESET)) { verifyReset(packet, con); return false; } else { - boolean sigOk = verifySignature(packet, con); + verifySignature(packet, con); if (con.getSendStreamId() == null) { if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) { @@ -204,9 +209,9 @@ public class ConnectionPacketHandler { /** * Verify the signature if necessary. * - * @return false only if the signature was required and it was invalid + * @throws I2PException if the signature was necessary and it was invalid */ - private boolean verifySignature(Packet packet, Connection con) { + private void verifySignature(Packet packet, Connection con) throws I2PException { // verify the signature if necessary if (con.getOptions().getRequireFullySigned() || packet.isFlagSet(Packet.FLAG_SYNCHRONIZE) || @@ -217,11 +222,8 @@ public class ConnectionPacketHandler { from = packet.getOptionalFrom(); boolean sigOk = packet.verifySignature(_context, from, null); if (!sigOk) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Received unsigned / forged packet: " + packet); - return false; + throw new I2PException("Received unsigned / forged packet: " + packet); } } - return true; } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java index 3069ca03e..433103487 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java @@ -90,17 +90,20 @@ public class MessageInputStream extends InputStream { * */ public long[] getNacks() { - List ids = null; synchronized (_dataLock) { - for (long i = _highestReadyBlockId + 1; i < _highestBlockId; i++) { - Long l = new Long(i); - if (_notYetReadyBlocks.containsKey(l)) { - // ACK - } else { - if (ids == null) - ids = new ArrayList(4); - ids.add(l); - } + return locked_getNacks(); + } + } + private long[] locked_getNacks() { + List ids = null; + for (long i = _highestReadyBlockId + 1; i < _highestBlockId; i++) { + Long l = new Long(i); + if (_notYetReadyBlocks.containsKey(l)) { + // ACK + } else { + if (ids == null) + ids = new ArrayList(4); + ids.add(l); } } if (ids != null) { @@ -113,6 +116,13 @@ public class MessageInputStream extends InputStream { } } + public void updateAcks(PacketLocal packet) { + synchronized (_dataLock) { + packet.setAckThrough(_highestBlockId); + packet.setNacks(locked_getNacks()); + } + } + /** * Ascending list of block IDs greater than the highest * ready block ID, or null if there aren't any. diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 6b62083e3..784a8de7b 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -1,6 +1,7 @@ package net.i2p.client.streaming; import java.io.IOException; +import java.io.InterruptedIOException; import java.io.OutputStream; import net.i2p.I2PAppContext; @@ -19,6 +20,7 @@ public class MessageOutputStream extends OutputStream { private IOException _streamError; private boolean _closed; private long _written; + private int _writeTimeout; public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver) { this(ctx, receiver, Packet.MAX_PAYLOAD_SIZE); @@ -32,8 +34,12 @@ public class MessageOutputStream extends OutputStream { _dataLock = new Object(); _written = 0; _closed = false; + _writeTimeout = -1; } + public void setWriteTimeout(int ms) { _writeTimeout = ms; } + public int getWriteTimeout() { return _writeTimeout; } + public void write(byte b[]) throws IOException { write(b, 0, b.length); } @@ -41,10 +47,15 @@ public class MessageOutputStream extends OutputStream { public void write(byte b[], int off, int len) throws IOException { //if (_log.shouldLog(Log.DEBUG)) // _log.debug("write(b[], " + off + ", " + len + ")"); - synchronized (_dataLock) { - int cur = off; - int remaining = len; - while (remaining > 0) { + int cur = off; + int remaining = len; + while (remaining > 0) { + WriteStatus ws = null; + // we do any waiting outside the synchronized() block because we + // want to allow other threads to flushAvailable() whenever they want. + // this is the only method that *adds* to the _buf, and all + // code that reads from it is synchronized + synchronized (_dataLock) { if (_valid + remaining < _buf.length) { // simply buffer the data, no flush System.arraycopy(b, cur, _buf, _valid, remaining); @@ -52,8 +63,6 @@ public class MessageOutputStream extends OutputStream { cur += remaining; _written += remaining; remaining = 0; - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("write(...): appending valid = " + _valid + " remaining=" + remaining); } else { // buffer whatever we can fit then flush, // repeating until we've pushed all of the @@ -63,19 +72,24 @@ public class MessageOutputStream extends OutputStream { remaining -= toWrite; cur += toWrite; _valid = _buf.length; - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("write(...): flushing valid = " + _valid + " remaining=" + remaining); - // this blocks until the packet is ack window is open. it - // also throws InterruptedIOException if the write timeout - // expires - _dataReceiver.writeData(_buf, 0, _valid); - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("write(...): flushing complete valid = " + _valid + " remaining=" + remaining); + ws = _dataReceiver.writeData(_buf, 0, _valid); _written += _valid; _valid = 0; throwAnyError(); } } + if (ws != null) { + // ok, we've actually added a new packet - lets wait until + // its accepted into the queue before moving on (so that we + // dont fill our buffer instantly) + ws.waitForAccept(_writeTimeout); + if (!ws.writeAccepted()) { + if (_writeTimeout > 0) + throw new InterruptedIOException("Write not accepted within timeout"); + else + throw new IOException("Write not accepted into the queue"); + } + } } throwAnyError(); } @@ -86,19 +100,19 @@ public class MessageOutputStream extends OutputStream { } public void flush() throws IOException { + WriteStatus ws = null; synchronized (_dataLock) { - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("flush(): valid = " + _valid); - // this blocks until the packet is ack window is open. it - // also throws InterruptedIOException if the write timeout - // expires - _dataReceiver.writeData(_buf, 0, _valid); + ws = _dataReceiver.writeData(_buf, 0, _valid); _written += _valid; - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("flush(): valid = " + _valid + " complete"); _valid = 0; _dataLock.notifyAll(); } + + ws.waitForCompletion(_writeTimeout); + if (ws.writeFailed() && (_writeTimeout > 0) ) + throw new InterruptedIOException("Timed out during write"); + else if (ws.writeFailed()) + throw new IOException("Write failed"); throwAnyError(); } @@ -107,6 +121,19 @@ public class MessageOutputStream extends OutputStream { flush(); _log.debug("Output stream closed after writing " + _written); } + public void closeInternal() { + _closed = true; + _streamError = new IOException("Closed internally"); + synchronized (_dataLock) { + // flush any data, but don't wait for it + if (_valid > 0) { + _dataReceiver.writeData(_buf, 0, _valid); + _written += _valid; + _valid = 0; + } + _dataLock.notifyAll(); + } + } public boolean getClosed() { return _closed; } @@ -126,17 +153,49 @@ public class MessageOutputStream extends OutputStream { * called whenever the engine wants to push more data to the * peer * + * @return true if the data was flushed */ void flushAvailable(DataReceiver target) throws IOException { + flushAvailable(target, true); + } + void flushAvailable(DataReceiver target, boolean blocking) throws IOException { + WriteStatus ws = null; synchronized (_dataLock) { - target.writeData(_buf, 0, _valid); + ws = target.writeData(_buf, 0, _valid); _written += _valid; _valid = 0; _dataLock.notifyAll(); } + if (blocking) { + ws.waitForAccept(_writeTimeout); + if (ws.writeFailed()) + throw new IOException("Flush available failed"); + else if (!ws.writeAccepted()) + throw new InterruptedIOException("Flush available timed out"); + } + return; } public interface DataReceiver { - public void writeData(byte buf[], int off, int size) throws IOException; + /** + * Nonblocking write + */ + public WriteStatus writeData(byte buf[], int off, int size); + } + + public interface WriteStatus { + /** wait until the data written either fails or succeeds */ + public void waitForCompletion(int maxWaitMs); + /** + * wait until the data written is accepted into the outbound pool, + * which we throttle rather than accept arbitrary data and queue + */ + public void waitForAccept(int maxWaitMs); + /** was the write accepted? aka did the socket not close? */ + public boolean writeAccepted(); + /** did the write fail? */ + public boolean writeFailed(); + /** did the write succeed? */ + public boolean writeSuccessful(); } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index 292f75c32..7df101e09 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -133,6 +133,7 @@ public class Packet { public static final int FLAG_ECHO = (1 << 9); public static final int DEFAULT_MAX_SIZE = 32*1024; + private static final int MAX_DELAY_REQUEST = 65535; /** what stream is this packet a part of? */ public byte[] getSendStreamId() { @@ -236,9 +237,14 @@ public class Packet { * set) */ public int getOptionalDelay() { return _optionDelay; } - public void setOptionalDelay(int delayMs) { - setFlag(FLAG_DELAY_REQUESTED, delayMs > 0); - _optionDelay = delayMs; + public void setOptionalDelay(int delayMs) { + setFlag(FLAG_DELAY_REQUESTED, delayMs > 0); + if (delayMs > MAX_DELAY_REQUEST) + _optionDelay = MAX_DELAY_REQUEST; + else if (delayMs < 0) + _optionDelay = 0; + else + _optionDelay = delayMs; } /** @@ -298,7 +304,7 @@ public class Packet { int optionSize = 0; if (isFlagSet(FLAG_DELAY_REQUESTED)) - optionSize += 1; + optionSize += 2; if (isFlagSet(FLAG_FROM_INCLUDED)) optionSize += _optionFrom.size(); if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED)) @@ -310,8 +316,8 @@ public class Packet { cur += 2; if (isFlagSet(FLAG_DELAY_REQUESTED)) { - DataHelper.toLong(buffer, cur, 1, _optionDelay > 0 ? _optionDelay : 0); - cur++; + DataHelper.toLong(buffer, cur, 2, _optionDelay > 0 ? _optionDelay : 0); + cur += 2; } if (isFlagSet(FLAG_FROM_INCLUDED)) { cur += _optionFrom.writeBytes(buffer, cur); @@ -361,7 +367,7 @@ public class Packet { size += 2; // flags if (isFlagSet(FLAG_DELAY_REQUESTED)) - size += 1; + size += 2; if (isFlagSet(FLAG_FROM_INCLUDED)) size += _optionFrom.size(); if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED)) @@ -428,8 +434,8 @@ public class Packet { // ok now lets go back and deal with the options if (isFlagSet(FLAG_DELAY_REQUESTED)) { - _optionDelay = (int)DataHelper.fromLong(buffer, cur, 1); - cur++; + _optionDelay = (int)DataHelper.fromLong(buffer, cur, 2); + cur += 2; } if (isFlagSet(FLAG_FROM_INCLUDED)) { _optionFrom = new Destination(); @@ -458,10 +464,20 @@ public class Packet { if (!isFlagSet(FLAG_SIGNATURE_INCLUDED)) return false; if (_optionSignature == null) return false; + int size = writtenSize(); + if (buffer == null) - buffer = new byte[writtenSize()]; - int size = writePacket(buffer, 0, false); - return ctx.dsa().verifySignature(_optionSignature, buffer, 0, size, from.getSigningPublicKey()); + buffer = new byte[size]; + int written = writePacket(buffer, 0, false); + if (written != size) { + ctx.logManager().getLog(Packet.class).error("Written " + written + " size " + size + " for " + toString(), new Exception("moo")); + return false; + } + boolean ok = ctx.dsa().verifySignature(_optionSignature, buffer, 0, size, from.getSigningPublicKey()); + if (!ok) { + ctx.logManager().getLog(Packet.class).error("Signature failed with sig " + Base64.encode(_optionSignature.getData()), new Exception("moo")); + } + return ok; } /** @@ -485,7 +501,7 @@ public class Packet { + 1 // resendDelay + 2 // flags + 2 // optionSize - + (isFlagSet(FLAG_DELAY_REQUESTED) ? 1 : 0) + + (isFlagSet(FLAG_DELAY_REQUESTED) ? 2 : 0) + (isFlagSet(FLAG_FROM_INCLUDED) ? _optionFrom.size() : 0) + (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED) ? 2 : 0); System.arraycopy(_optionSignature.getData(), 0, buffer, signatureOffset, Signature.SIGNATURE_BYTES); @@ -497,7 +513,11 @@ public class Packet { buf.append(toId(_sendStreamId)); //buf.append("<-->"); buf.append(toId(_receiveStreamId)).append(": #").append(_sequenceNum); - buf.append(" ").append(toFlagString()); + if (_sequenceNum < 10) + buf.append(" \t"); // so the tab lines up right + else + buf.append('\t'); + buf.append(toFlagString()); buf.append(" ACK ").append(_ackThrough); if (_nacks != null) { buf.append(" NACK"); @@ -520,7 +540,7 @@ public class Packet { private final String toFlagString() { StringBuffer buf = new StringBuffer(32); if (isFlagSet(FLAG_CLOSE)) buf.append(" CLOSE"); - if (isFlagSet(FLAG_DELAY_REQUESTED)) buf.append(" DELAY"); + if (isFlagSet(FLAG_DELAY_REQUESTED)) buf.append(" DELAY ").append(_optionDelay); if (isFlagSet(FLAG_ECHO)) buf.append(" ECHO"); if (isFlagSet(FLAG_FROM_INCLUDED)) buf.append(" FROM"); if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED)) buf.append(" MS"); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index 3ae1a2430..5f3246ff5 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -6,9 +6,11 @@ import java.util.Set; import java.text.SimpleDateFormat; import net.i2p.I2PAppContext; +import net.i2p.I2PException; import net.i2p.data.Base64; import net.i2p.data.DataHelper; import net.i2p.util.Log; +import net.i2p.util.SimpleTimer; /** * receive a packet and dispatch it correctly to the connection specified, @@ -19,35 +21,71 @@ public class PacketHandler { private ConnectionManager _manager; private I2PAppContext _context; private Log _log; + private int _lastDelay; public PacketHandler(I2PAppContext ctx, ConnectionManager mgr) { _manager = mgr; _context = ctx; _log = ctx.logManager().getLog(PacketHandler.class); + _lastDelay = _context.random().nextInt(30*1000); } private boolean choke(Packet packet) { if (false) { - // artificial choke: 2% random drop and a 1s - // random delay + // artificial choke: 2% random drop and a 0-30s + // random tiered delay from 0-30s if (_context.random().nextInt(100) >= 98) { - _log.error("DROP: " + packet); + displayPacket(packet, "DROP"); return false; } else { - int delay = _context.random().nextInt(1000); - try { Thread.sleep(delay); } catch (InterruptedException ie) {} - _log.debug("OK : " + packet + " delay = " + delay); - return true; + /* + int delay = _context.random().nextInt(5*1000); + */ + int delay = _context.random().nextInt(6*1000); + int delayFactor = _context.random().nextInt(100); + if (delayFactor > 80) { + if (delayFactor > 98) + delay *= 5; + else if (delayFactor > 95) + delay *= 4; + else if (delayFactor > 90) + delay *= 3; + else + delay *= 2; + } + + if (_context.random().nextInt(100) >= 20) + delay = _lastDelay; + + _lastDelay = delay; + SimpleTimer.getInstance().addEvent(new Reinject(packet, delay), delay); + return false; } } else { return true; } } + private class Reinject implements SimpleTimer.TimedEvent { + private Packet _packet; + private int _delay; + public Reinject(Packet packet, int delay) { + _packet = packet; + _delay = delay; + } + public void timeReached() { + _log.debug("Reinjecting after " + _delay + ": " + _packet); + receivePacketDirect(_packet); + } + } + void receivePacket(Packet packet) { boolean ok = choke(packet); - if (!ok) return; - + if (ok) + receivePacketDirect(packet); + } + + private void receivePacketDirect(Packet packet) { if (_log.shouldLog(Log.DEBUG)) _log.debug("packet received: " + packet); @@ -58,21 +96,20 @@ public class PacketHandler { Connection con = (sendId != null ? _manager.getConnectionByInboundId(sendId) : null); if (con != null) { receiveKnownCon(con, packet); - displayPacket(packet, con); + displayPacket(packet, "RECV"); } else { receiveUnknownCon(packet, sendId); - displayPacket(packet, null); + displayPacket(packet, "UNKN"); } } - private void displayPacket(Packet packet, Connection con) { - if (_log.shouldLog(Log.DEBUG)) { - SimpleDateFormat fmt = new SimpleDateFormat("hh:mm:ss.SSS"); - String now = fmt.format(new Date()); - String msg = packet + (con != null ? " on " + con : " on unknown con"); - //_log.debug(msg); - System.out.println(now + ": " + msg); + private static final SimpleDateFormat _fmt = new SimpleDateFormat("hh:mm:ss.SSS"); + static void displayPacket(Packet packet, String prefix) { + String msg = null; + synchronized (_fmt) { + msg = _fmt.format(new Date()) + ": " + prefix + " " + packet.toString(); } + System.out.println(msg); } private void receiveKnownCon(Connection con, Packet packet) { @@ -81,19 +118,36 @@ public class PacketHandler { // the packet's receive stream ID also matches what we expect if (_log.shouldLog(Log.DEBUG)) _log.debug("receive valid: " + packet); - con.getPacketHandler().receivePacket(packet, con); + try { + con.getPacketHandler().receivePacket(packet, con); + } catch (I2PException ie) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Received forged packet for " + con, ie); + } } else { if (packet.isFlagSet(Packet.FLAG_RESET)) { // refused if (_log.shouldLog(Log.DEBUG)) _log.debug("receive reset: " + packet); - con.getPacketHandler().receivePacket(packet, con); + try { + con.getPacketHandler().receivePacket(packet, con); + } catch (I2PException ie) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Received forged reset for " + con, ie); + } } else if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) { if ( (con.getSendStreamId() == null) || (DataHelper.eq(con.getSendStreamId(), packet.getReceiveStreamId())) ) { + byte oldId[] =con.getSendStreamId(); // con fully established, w00t con.setSendStreamId(packet.getReceiveStreamId()); - con.getPacketHandler().receivePacket(packet, con); + try { + con.getPacketHandler().receivePacket(packet, con); + } catch (I2PException ie) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Received forged syn for " + con, ie); + con.setSendStreamId(oldId); + } } else { if (_log.shouldLog(Log.WARN)) _log.warn("Receive a syn packet with the wrong IDs: " + packet); @@ -146,7 +200,7 @@ public class PacketHandler { Set cons = _manager.listConnections(); for (Iterator iter = cons.iterator(); iter.hasNext(); ) { Connection con = (Connection)iter.next(); - buf.append(Base64.encode(con.getReceiveStreamId())).append(" "); + buf.append(con.toString()).append(" "); } _log.warn("Packet belongs to no other cons: " + packet + " connections: " + buf.toString() + " sendId: " diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index 8af6eb713..d53f8223f 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -10,21 +10,29 @@ import net.i2p.data.SessionKey; * coordinate local attributes about a packet - send time, ack time, number of * retries, etc. */ -public class PacketLocal extends Packet { +public class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { private I2PAppContext _context; + private Connection _connection; private Destination _to; private SessionKey _keyUsed; private Set _tagsSent; private long _createdOn; private int _numSends; private long _lastSend; + private long _acceptedOn; private long _ackOn; + private long _cancelledOn; public PacketLocal(I2PAppContext ctx, Destination to) { + this(ctx, to, null); + } + public PacketLocal(I2PAppContext ctx, Destination to, Connection con) { _context = ctx; _createdOn = ctx.clock().now(); _to = to; + _connection = con; _lastSend = -1; + _cancelledOn = -1; } public Destination getTo() { return _to; } @@ -50,15 +58,31 @@ public class PacketLocal extends Packet { isFlagSet(FLAG_CLOSE); } + /** last minute update of ack fields, just before write/sign */ + public void prepare() { + if (_connection != null) + _connection.getInputStream().updateAcks(this); + } + public long getCreatedOn() { return _createdOn; } public void incrementSends() { _numSends++; _lastSend = _context.clock().now(); } public void ackReceived() { - if (_ackOn <= 0) - _ackOn = _context.clock().now(); + synchronized (this) { + if (_ackOn <= 0) + _ackOn = _context.clock().now(); + notifyAll(); + } } + public void cancelled() { + synchronized (this) { + _cancelledOn = _context.clock().now(); + notifyAll(); + } + } + /** how long after packet creation was it acked? */ public int getAckTime() { if (_ackOn <= 0) @@ -68,6 +92,7 @@ public class PacketLocal extends Packet { } public int getNumSends() { return _numSends; } public long getLastSend() { return _lastSend; } + public Connection getConnection() { return _connection; } public String toString() { String str = super.toString(); @@ -76,4 +101,32 @@ public class PacketLocal extends Packet { else return str; } + + public void waitForAccept(int maxWaitMs) { + if (_connection == null) + throw new IllegalStateException("Cannot wait for accept with no connection"); + long expiration = _context.clock().now()+maxWaitMs; + boolean accepted = _connection.packetSendChoke(maxWaitMs); + if (accepted) + _acceptedOn = _context.clock().now(); + else + _acceptedOn = -1; + } + + public void waitForCompletion(int maxWaitMs) { + long expiration = _context.clock().now()+maxWaitMs; + while ((maxWaitMs <= 0) || (expiration < _context.clock().now())) { + synchronized (this) { + if (_ackOn > 0) + return; + if (_cancelledOn > 0) + return; + try { wait(); } catch (InterruptedException ie) {} + } + } + } + + public boolean writeAccepted() { return _acceptedOn > 0 && _cancelledOn <= 0; } + public boolean writeFailed() { return _cancelledOn > 0; } + public boolean writeSuccessful() { return _ackOn > 0 && _cancelledOn <= 0; } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java index 7489811c9..b62117f61 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java @@ -29,6 +29,7 @@ class PacketQueue { * Add a new packet to be sent out ASAP */ public void enqueue(PacketLocal packet) { + packet.prepare(); int size = 0; if (packet.shouldSign()) size = packet.writeSignedPacket(_buf, 0, _context, _session.getPrivateKey()); @@ -42,8 +43,12 @@ class PacketQueue { if (tagsSent == null) tagsSent = new HashSet(); try { + // cache this from before sendMessage + String conStr = packet.getConnection() + ""; // this should not block! + long begin = _context.clock().now(); boolean sent = _session.sendMessage(packet.getTo(), _buf, 0, size, keyUsed, tagsSent); + long end = _context.clock().now(); if (!sent) { if (_log.shouldLog(Log.WARN)) _log.warn("Send failed for " + packet); @@ -55,13 +60,17 @@ class PacketQueue { String msg = "SEND " + packet + (tagsSent.size() > 0 ? " with " + tagsSent.size() + " tags" : "") - + " send # " + packet.getNumSends(); + + " send # " + packet.getNumSends() + + " sendTime: " + (end-begin) + + " con: " + conStr; _log.debug(msg); } + PacketHandler.displayPacket(packet, "SEND"); } } catch (I2PSessionException ise) { if (_log.shouldLog(Log.WARN)) _log.warn("Unable to send the packet " + packet, ise); } } + }