diff --git a/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java b/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java index 751d629a5..37c173c75 100644 --- a/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java +++ b/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java @@ -13,6 +13,36 @@ import net.i2p.util.Log; * after the delay there still isn't enough data, what is available is sent * and padded. * + * As explained in the tunnel document, the preprocessor has a lot of + * potential flexibility in delay, padding, or even reordering. + * We keep things relatively simple for now. + * + * However much of the efficiency results from the clients selecting + * the correct MTU in the streaming lib such that the maximum-size + * streaming lib message fits in an integral number of tunnel messages. + * See ConnectionOptions in the streaming lib for details. + * + * Aside from obvious goals of minimizing delay and padding, we also + * want to minimize the number of tunnel messages a message occupies, + * to minimize the impact of a router dropping a tunnel message. + * So there is some benefit in starting a message in a new tunnel message, + * especially if it will fit perfectly if we do that (a 964 or 1956 byte + * message, for example). + * + * An idea for the future... + * + * If we are in the middle of a tunnel msg and starting a new i2np msg, + * and this one won't fit, + * let's look to see if we have somthing that would fit instead + * by reordering: + * if (allocated > 0 && msg.getFragment == 0) { + * for (j = i+1, j < pending.size(); j++) { + * if it will fit and it is fragment 0 { + * msg = pending.remove(j) + * pending.add(0, msg) + * } + * } + * } */ public class BatchedPreprocessor extends TrivialPreprocessor { private Log _log; @@ -24,13 +54,18 @@ public class BatchedPreprocessor extends TrivialPreprocessor { _log = ctx.logManager().getLog(BatchedPreprocessor.class); _name = name; _pendingSince = 0; - ctx.statManager().createRateStat("tunnel.batchMultipleCount", "How many messages are batched into a tunnel message", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); - ctx.statManager().createRateStat("tunnel.batchDelay", "How many messages were pending when the batching waited", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); - ctx.statManager().createRateStat("tunnel.batchDelaySent", "How many messages were flushed when the batching delay completed", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); - ctx.statManager().createRateStat("tunnel.batchCount", "How many groups of messages were flushed together", "Tunnels", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); - ctx.statManager().createRateStat("tunnel.batchDelayAmount", "How long we should wait before flushing the batch", "Tunnels", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); - ctx.statManager().createRateStat("tunnel.batchFlushRemaining", "How many messages remain after flushing", "Tunnels", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); - ctx.statManager().createRateStat("tunnel.writeDelay", "How long after a message reaches the gateway is it processed (lifetime is size)", "Tunnels", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); + ctx.statManager().createRateStat("tunnel.batchMultipleCount", "How many messages are batched into a tunnel message", "Tunnels", new long[] { 10*60*1000, 60*60*1000 }); + ctx.statManager().createRateStat("tunnel.batchDelay", "How many messages were pending when the batching waited", "Tunnels", new long[] { 10*60*1000, 60*60*1000 }); + ctx.statManager().createRateStat("tunnel.batchDelaySent", "How many messages were flushed when the batching delay completed", "Tunnels", new long[] { 10*60*1000, 60*60*1000 }); + ctx.statManager().createRateStat("tunnel.batchCount", "How many groups of messages were flushed together", "Tunnels", new long[] { 10*60*1000, 60*60*1000 }); + ctx.statManager().createRateStat("tunnel.batchDelayAmount", "How long we should wait before flushing the batch", "Tunnels", new long[] { 10*60*1000, 60*60*1000 }); + ctx.statManager().createRateStat("tunnel.batchFlushRemaining", "How many messages remain after flushing", "Tunnels", new long[] { 10*60*1000, 60*60*1000 }); + ctx.statManager().createRateStat("tunnel.writeDelay", "How long after a message reaches the gateway is it processed (lifetime is size)", "Tunnels", new long[] { 10*60*1000, 60*60*1000 }); + ctx.statManager().createRateStat("tunnel.batchSmallFragments", "How many outgoing pad bytes are in small fragments?", + "Tunnels", new long[] { 10*60*1000l, 60*60*1000l }); + ctx.statManager().createRateStat("tunnel.batchFullFragments", "How many outgoing tunnel messages use the full data area?", + "Tunnels", new long[] { 10*60*1000l, 60*60*1000l }); + ctx.statManager().createRateStat("tunnel.batchFragmentation", "Avg. number of fragments per msg", "Tunnels", new long[] { 10*60*1000, 60*60*1000 }); } private static final int FULL_SIZE = PREPROCESSED_SIZE @@ -38,11 +73,11 @@ public class BatchedPreprocessor extends TrivialPreprocessor { - 1 // 0x00 ending the padding - 4; // 4 byte checksum - private static final boolean DISABLE_BATCHING = false; + //private static final boolean DISABLE_BATCHING = false; /* not final or private so the test code can adjust */ static long DEFAULT_DELAY = 100; - /** wait up to 2 seconds before sending a small message */ + /** Wait up to this long before sending (flushing) a small tunnel message */ protected long getSendDelay() { return DEFAULT_DELAY; } /** if we have 50 messages queued that are too small, flush them anyway */ @@ -71,11 +106,11 @@ public class BatchedPreprocessor extends TrivialPreprocessor { timingBuf = new StringBuilder(128); timingBuf.append("Preprocess with " + pending.size() + " to send. "); } - if (DISABLE_BATCHING) { - if (_log.shouldLog(Log.INFO)) - _log.info("Disabled batching, pushing " + pending + " immediately"); - return super.preprocessQueue(pending, sender, rec); - } + //if (DISABLE_BATCHING) { + // if (_log.shouldLog(Log.INFO)) + // _log.info("Disabled batching, pushing " + pending + " immediately"); + // return super.preprocessQueue(pending, sender, rec); + //} long start = System.currentTimeMillis(); int batchCount = 0; @@ -112,6 +147,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor { long beforeSend = System.currentTimeMillis(); _pendingSince = 0; send(pending, 0, i, sender, rec); + _context.statManager().addRateData("tunnel.batchFullFragments", 1, 0); long afterSend = System.currentTimeMillis(); if (_log.shouldLog(Log.INFO)) _log.info("Allocated=" + allocated + " so we sent " + (i+1) @@ -126,6 +162,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor { if (timingBuf != null) timingBuf.append(" sent " + cur); notePreprocessing(cur.getMessageId(), cur.getFragmentNumber(), cur.getData().length, cur.getMessageIds(), "flushed allocated"); + _context.statManager().addRateData("tunnel.batchFragmentation", cur.getFragmentNumber() + 1, 0); _context.statManager().addRateData("tunnel.writeDelay", cur.getLifetime(), cur.getData().length); } if (msg.getOffset() >= msg.getData().length) { @@ -134,6 +171,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor { if (timingBuf != null) timingBuf.append(" sent perfect fit " + cur).append("."); notePreprocessing(cur.getMessageId(), cur.getFragmentNumber(), msg.getData().length, msg.getMessageIds(), "flushed tail, remaining: " + pending); + _context.statManager().addRateData("tunnel.batchFragmentation", cur.getFragmentNumber() + 1, 0); _context.statManager().addRateData("tunnel.writeDelay", cur.getLifetime(), cur.getData().length); } if (i > 0) @@ -169,6 +207,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor { _context.statManager().addRateData("tunnel.batchDelaySent", pending.size(), 0); send(pending, 0, pending.size()-1, sender, rec); + _context.statManager().addRateData("tunnel.batchSmallFragments", FULL_SIZE - allocated, 0); int beforeSize = pending.size(); for (int i = 0; i < pending.size(); i++) { @@ -176,6 +215,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor { if (cur.getOffset() >= cur.getData().length) { pending.remove(i); notePreprocessing(cur.getMessageId(), cur.getFragmentNumber(), cur.getData().length, cur.getMessageIds(), "flushed remaining"); + _context.statManager().addRateData("tunnel.batchFragmentation", cur.getFragmentNumber() + 1, 0); _context.statManager().addRateData("tunnel.writeDelay", cur.getLifetime(), cur.getData().length); i--; } @@ -330,10 +370,15 @@ public class BatchedPreprocessor extends TrivialPreprocessor { + " leaving " + (msg.getData().length - msg.getOffset()) + " bytes for later"); } else { offset = writeSubsequentFragment(msg, target, offset); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("writing " + msg.getMessageId() + " fragment " + (msg.getFragmentNumber()-1) + if (_log.shouldLog(Log.DEBUG)) { + int frag = msg.getFragmentNumber(); + int later = msg.getData().length - msg.getOffset(); + if (later > 0) + frag--; + _log.debug("writing " + msg.getMessageId() + " fragment " + frag + ", ending at " + offset + " prev " + prevOffset - + " leaving " + (msg.getData().length - msg.getOffset()) + " bytes for later"); + + " leaving " + later + " bytes for later"); + } } } return offset; diff --git a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java index 3cb4fc9ab..681dadec1 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java @@ -528,6 +528,9 @@ public class TunnelDispatcher implements Service { long tooOld = tooYoung - 9*60*1000; for (int i = 0; i < size; i++) { HopConfig cfg = participating.get(i); + // rare NPE seen here, guess CHS.values() isn't atomic? + if (cfg == null) + continue; long c = cfg.getRecentMessagesCount(); bw += c; bwOut += cfg.getRecentSentMessagesCount();