From a4546e1045091cb3473e15e13f0ccc4dbd53c1b2 Mon Sep 17 00:00:00 2001 From: zzz Date: Mon, 21 Dec 2015 17:19:40 +0000 Subject: [PATCH] SSU: Hand all messages pending after establishment to the outbound queue at once, for efficiency. This allows more aggressive combining of fragments in packets, and ensures the priority queue works as designed. Don't sort outbound messages by size, keep priority order instead. Log tweaks --- history.txt | 5 ++ .../src/net/i2p/router/RouterVersion.java | 2 +- .../transport/udp/EstablishmentManager.java | 32 ++++++++-- .../udp/OutboundMessageFragments.java | 34 ++++++++-- .../transport/udp/OutboundMessageState.java | 2 +- .../router/transport/udp/UDPTransport.java | 64 ++++++++++++++++++- 6 files changed, 123 insertions(+), 16 deletions(-) diff --git a/history.txt b/history.txt index 185feeb26..f340136d1 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,8 @@ +2015-12-21 zzz + * SSU: Hand all messages pending after establishment to the + outbound queue at once, for efficiency. + Don't sort outbound messages by size, keep priority order instead. + 2015-12-20 zzz * BuildHandler: Additional fixes (ticket #1738) * CertUtil: Add methods to export private keys diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 8ab24107c..80cfb9c84 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 17; + public final static long BUILD = 18; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java index 77fdd78ba..0fd9c83af 100644 --- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java +++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java @@ -770,7 +770,7 @@ class EstablishmentManager { // so it needs to be caught in InNetMessagePool. dsm.setMessageExpiration(_context.clock().now() + DATA_MESSAGE_TIMEOUT); dsm.setMessageId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE)); - _transport.send(dsm, peer); + // sent below // just do this inline //_context.simpleTimer2().addEvent(new PublishToNewInbound(peer), 0); @@ -780,8 +780,14 @@ class EstablishmentManager { // ok, we are fine with them, send them our latest info //if (_log.shouldLog(Log.INFO)) // _log.info("Publishing to the peer after confirm plus delay (without banlist): " + peer); - sendOurInfo(peer, true); + // bundle the two messages together for efficiency + DatabaseStoreMessage dbsm = getOurInfo(); + List msgs = new ArrayList(2); + msgs.add(dsm); + msgs.add(dbsm); + _transport.send(msgs, peer); } else { + _transport.send(dsm, peer); // nuh uh. if (_log.shouldLog(Log.WARN)) _log.warn("NOT publishing to the peer after confirm plus delay (WITH banlist): " + (hash != null ? hash.toString() : "unknown")); @@ -828,12 +834,14 @@ class EstablishmentManager { _transport.setIP(remote.calculateHash(), state.getSentIP()); _context.statManager().addRateData("udp.outboundEstablishTime", state.getLifetime(), 0); + DatabaseStoreMessage dbsm = null; if (!state.isFirstMessageOurDSM()) { - sendOurInfo(peer, false); + dbsm = getOurInfo(); } else if (_log.shouldLog(Log.INFO)) { _log.info("Skipping publish: " + state); } + List msgs = new ArrayList(8); OutNetMessage msg; while ((msg = state.getNextQueuedMessage()) != null) { if (now - Router.CLOCK_FUDGE_FACTOR > msg.getExpiration()) { @@ -841,21 +849,33 @@ class EstablishmentManager { _transport.failed(msg, "Took too long to establish, but it was established"); } else { msg.timestamp("session fully established and sent"); - _transport.send(msg); + msgs.add(msg); } } + _transport.send(dbsm, msgs, peer); return peer; } +/**** private void sendOurInfo(PeerState peer, boolean isInbound) { if (_log.shouldLog(Log.INFO)) _log.info("Publishing to the peer after confirm: " + (isInbound ? " inbound con from " + peer : "outbound con to " + peer)); - + DatabaseStoreMessage m = getOurInfo(); + _transport.send(m, peer); + } +****/ + + /** + * A database store message with our router info + * @return non-null + * @since 0.9.24 split from sendOurInfo() + */ + private DatabaseStoreMessage getOurInfo() { DatabaseStoreMessage m = new DatabaseStoreMessage(_context); m.setEntry(_context.router().getRouterInfo()); m.setMessageExpiration(_context.clock().now() + DATA_MESSAGE_TIMEOUT); - _transport.send(m, peer); + return m; } /** the relay tag is a 4-byte field in the protocol */ diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index d9cdfe01a..b0350e2bd 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -172,11 +172,12 @@ class OutboundMessageFragments { } /** - * short circuit the OutNetMessage, letting us send the establish - * complete message reliably + * Short circuit the OutNetMessage, letting us send the establish + * complete message reliably. + * If you have multiple messages, use the list variant, + * so the messages may be bundled efficiently. */ - public void add(OutboundMessageState state) { - PeerState peer = state.getPeer(); + public void add(OutboundMessageState state, PeerState peer) { if (peer == null) throw new RuntimeException("null peer for " + state); peer.add(state); @@ -184,6 +185,22 @@ class OutboundMessageFragments { //_context.statManager().addRateData("udp.outboundActiveCount", active, 0); } + /** + * Short circuit the OutNetMessage, letting us send multiple messages + * reliably and efficiently. + * @since 0.9.24 + */ + public void add(List states, PeerState peer) { + if (peer == null) + throw new RuntimeException("null peer"); + int sz = states.size(); + for (int i = 0; i < sz; i++) { + peer.add(states.get(i)); + } + add(peer); + //_context.statManager().addRateData("udp.outboundActiveCount", active, 0); + } + /** * Add the peer to the list of peers wanting to transmit something. * This wakes up the packet pusher if it is sleeping. @@ -400,8 +417,10 @@ class OutboundMessageFragments { int fragmentsToSend = toSend.size(); // sort by size, biggest first // don't bother unless more than one state (fragments are already sorted within a state) - if (fragmentsToSend > 1 && states.size() > 1) - Collections.sort(toSend, new FragmentComparator()); + // This puts the DeliveryStatusMessage after the DatabaseStoreMessage, don't do it for now. + // It also undoes the ordering of the priority queue in PeerState. + //if (fragmentsToSend > 1 && states.size() > 1) + // Collections.sort(toSend, new FragmentComparator()); List sendNext = new ArrayList(Math.min(toSend.size(), 4)); List rv = new ArrayList(toSend.size()); @@ -490,6 +509,7 @@ class OutboundMessageFragments { * Biggest first * @since 0.9.16 */ +/**** private static class FragmentComparator implements Comparator, Serializable { public int compare(Fragment l, Fragment r) { @@ -497,7 +517,9 @@ class OutboundMessageFragments { return r.state.fragmentSize(r.num) - l.state.fragmentSize(l.num); } } +****/ + /** throttle */ public interface ActiveThrottle { public void choke(Hash peer); public void unchoke(Hash peer); diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java index b700aa80d..de804ae9f 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java @@ -305,7 +305,7 @@ class OutboundMessageState implements CDPQEntry { buf.append(i).append(' '); } } - buf.append(" to: ").append(_peer.toString()); + //buf.append(" to: ").append(_peer.toString()); return buf.toString(); } } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 9eb45b6b7..503fb0fef 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -1828,14 +1828,74 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } /** - * "injected" message from the EstablishmentManager + * "injected" message from the EstablishmentManager. + * If you have multiple messages, use the list variant, + * so the messages may be bundled efficiently. + * + * @param peer all messages MUST be going to this peer */ void send(I2NPMessage msg, PeerState peer) { try { OutboundMessageState state = new OutboundMessageState(_context, msg, peer); if (_log.shouldLog(Log.DEBUG)) _log.debug("Injecting a data message to a new peer: " + peer); - _fragments.add(state); + _fragments.add(state, peer); + } catch (IllegalArgumentException iae) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Shouldnt happen", new Exception("I did it")); + } + } + + /** + * "injected" message from the EstablishmentManager, + * plus pending messages to send, + * so the messages may be bundled efficiently. + * Called at end of outbound establishment. + * + * @param msg may be null if nothing to inject + * @param msgs non-null, may be empty + * @param peer all messages MUST be going to this peer + * @since 0.9.24 + */ + void send(I2NPMessage msg, List msgs, PeerState peer) { + try { + int sz = msgs.size(); + List states = new ArrayList(sz + 1); + if (msg != null) { + OutboundMessageState state = new OutboundMessageState(_context, msg, peer); + states.add(state); + } + for (int i = 0; i < sz; i++) { + OutboundMessageState state = new OutboundMessageState(_context, msgs.get(i), peer); + states.add(state); + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Injecting " + states.size() + " data messages to a new peer: " + peer); + _fragments.add(states, peer); + } catch (IllegalArgumentException iae) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Shouldnt happen", new Exception("I did it")); + } + } + + /** + * "injected" messages from the EstablishmentManager. + * Called at end of inbound establishment. + * + * @param peer all messages MUST be going to this peer + * @since 0.9.24 + */ + void send(List msgs, PeerState peer) { + try { + int sz = msgs.size(); + List states = new ArrayList(sz); + for (int i = 0; i < sz; i++) { + OutboundMessageState state = new OutboundMessageState(_context, msgs.get(i), peer); + states.add(state); + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Injecting " + sz + " data messages to a new peer: " + peer); + _fragments.add(states, peer); } catch (IllegalArgumentException iae) { if (_log.shouldLog(Log.WARN)) _log.warn("Shouldnt happen", new Exception("I did it"));