From 3f9bf28382abb21a1d235b35b793d5d2b79e8bea Mon Sep 17 00:00:00 2001 From: jrandom Date: Tue, 22 Mar 2005 01:38:21 +0000 Subject: [PATCH] 2005-03-21 jrandom * Fixed the tunnel fragmentation handler to deal with multiple fragments in a single message properly (rather than release the buffer into the cache after processing the first one) (duh!) * Added the batching preprocessor which will bundle together multiple small messages inside a single tunnel message by delaying their delivery up to .5s, or whenever the pending data will fill a full message, whichever comes first. This is disabled at the moment, since without the above bugfix widely deployed, lots and lots of messages would fail. * Within each tunnel pool, stick with a randomly selected peer for up to .5s before randomizing and selecting again, instead of randomizing the pool each time a tunnel is needed. --- history.txt | 15 +- .../src/net/i2p/router/RouterVersion.java | 4 +- .../src/net/i2p/router/StatisticsManager.java | 3 + .../router/tunnel/BatchedFragmentTest.java | 184 ++++++++++++ .../router/tunnel/BatchedPreprocessor.java | 193 +++++++++++++ .../tunnel/BatchedRouterPreprocessor.java | 55 ++++ .../i2p/router/tunnel/FragmentHandler.java | 14 +- .../net/i2p/router/tunnel/FragmentTest.java | 207 ++++++++++---- .../i2p/router/tunnel/FragmentedMessage.java | 10 +- .../router/tunnel/TrivialPreprocessor.java | 261 ++++++++---------- .../router/tunnel/TunnelCreatorConfig.java | 3 + .../i2p/router/tunnel/TunnelDispatcher.java | 14 +- .../net/i2p/router/tunnel/TunnelGateway.java | 15 +- .../pool/PooledTunnelCreatorConfig.java | 6 + 14 files changed, 775 insertions(+), 209 deletions(-) create mode 100644 router/java/src/net/i2p/router/tunnel/BatchedFragmentTest.java create mode 100644 router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java create mode 100644 router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java diff --git a/history.txt b/history.txt index 8bfa0c3d5..0c23f1f9e 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,17 @@ -$Id: history.txt,v 1.172 2005/03/18 03:48:00 jrandom Exp $ +$Id: history.txt,v 1.173 2005/03/18 17:34:54 jrandom Exp $ + +2005-03-21 jrandom + * Fixed the tunnel fragmentation handler to deal with multiple fragments + in a single message properly (rather than release the buffer into the + cache after processing the first one) (duh!) + * Added the batching preprocessor which will bundle together multiple + small messages inside a single tunnel message by delaying their delivery + up to .5s, or whenever the pending data will fill a full message, + whichever comes first. This is disabled at the moment, since without the + above bugfix widely deployed, lots and lots of messages would fail. + * Within each tunnel pool, stick with a randomly selected peer for up to + .5s before randomizing and selecting again, instead of randomizing the + pool each time a tunnel is needed. * 2005-03-18 0.5.0.3 released diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index bef5cc0a8..599681fe9 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.166 $ $Date: 2005/03/18 03:48:01 $"; + public final static String ID = "$Revision: 1.167 $ $Date: 2005/03/18 17:34:52 $"; public final static String VERSION = "0.5.0.3"; - public final static long BUILD = 0; + public final static long BUILD = 1; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/StatisticsManager.java b/router/java/src/net/i2p/router/StatisticsManager.java index 4de9eb22e..10152286c 100644 --- a/router/java/src/net/i2p/router/StatisticsManager.java +++ b/router/java/src/net/i2p/router/StatisticsManager.java @@ -112,6 +112,9 @@ public class StatisticsManager implements Service { includeRate("tunnel.buildFailure", stats, new long[] { 60*60*1000 }); includeRate("tunnel.buildSuccess", stats, new long[] { 60*60*1000 }); + + includeRate("tunnel.batchDelaySent", stats, new long[] { 10*60*1000, 60*60*1000 }); + includeRate("tunnel.batchMultipleCount", stats, new long[] { 10*60*1000, 60*60*1000 }); includeRate("router.throttleTunnelProbTestSlow", stats, new long[] { 60*60*1000 }); includeRate("router.throttleTunnelProbTooFast", stats, new long[] { 60*60*1000 }); diff --git a/router/java/src/net/i2p/router/tunnel/BatchedFragmentTest.java b/router/java/src/net/i2p/router/tunnel/BatchedFragmentTest.java new file mode 100644 index 000000000..fd1997950 --- /dev/null +++ b/router/java/src/net/i2p/router/tunnel/BatchedFragmentTest.java @@ -0,0 +1,184 @@ +package net.i2p.router.tunnel; + +import java.util.ArrayList; +import net.i2p.I2PAppContext; +import net.i2p.data.Base64; +import net.i2p.data.Hash; +import net.i2p.data.TunnelId; +import net.i2p.data.i2np.DataMessage; +import net.i2p.data.i2np.I2NPMessage; +import net.i2p.util.Log; + +/** + * Test the batching behavior of the preprocessor with one, two, or three + * messages of various sizes and settings. + * + */ +public class BatchedFragmentTest extends FragmentTest { + + public BatchedFragmentTest() { + super(); + BatchedPreprocessor.DEFAULT_DELAY = 200; + } + + protected TunnelGateway.QueuePreprocessor createPreprocessor(I2PAppContext ctx) { + return new BatchedPreprocessor(ctx); + } + + /** + * Send a small message, wait a second, then send a large message, pushing + * the first one through immediately, with the rest of the large one passed + * after a brief delay. + * + */ + public void runBatched() { + TunnelGateway.Pending pending1 = createPending(10, false, false); + ArrayList messages = new ArrayList(); + messages.add(pending1); + + TunnelGateway.Pending pending2 = createPending(1024, false, false); + + TunnelGateway.QueuePreprocessor pre = createPreprocessor(_context); + SenderImpl sender = new SenderImpl(); + DefragmentedReceiverImpl handleReceiver = new DefragmentedReceiverImpl(pending1.getData(), pending2.getData()); + FragmentHandler handler = new FragmentHandler(_context, handleReceiver); + ReceiverImpl receiver = new ReceiverImpl(handler, 0); + byte msg[] = pending1.getData(); + _log.debug("SEND(" + msg.length + "): " + Base64.encode(msg) + " " + _context.sha().calculateHash(msg).toBase64()); + + boolean keepGoing = true; + boolean alreadyAdded = false; + while (keepGoing) { + keepGoing = pre.preprocessQueue(messages, new SenderImpl(), receiver); + if (keepGoing) { + try { Thread.sleep(150); } catch (InterruptedException ie) {} + + if (!alreadyAdded) { + messages.add(pending2); + alreadyAdded = true; + } + } + } + + if (handleReceiver.receivedOk()) + _log.info("Receive batched ok"); + else + _log.info("Failed to receive batched"); + } + + + /** + * Send a small message, wait a second, then send a large message, pushing + * the first one through immediately, with the rest of the large one passed + * after a brief delay. + * + */ + public void runBatches() { + int success = 0; + //success += testBatched(1, false, false, 1024, false, false); + // this takes a long fucking time + for (int i = 1; i <= 1024; i++) { + success += testBatched(i, false, false, 1024, false, false, 1024, false, false); + success += testBatched(i, true, false, 1024, false, false, 1024, false, false); + success += testBatched(i, true, true, 1024, false, false, 1024, false, false); + success += testBatched(i, false, false, 1024, true, false, 1024, false, false); + success += testBatched(i, true, false, 1024, true, false, 1024, false, false); + success += testBatched(i, true, true, 1024, true, false, 1024, false, false); + success += testBatched(i, false, false, 1024, true, true, 1024, false, false); + success += testBatched(i, true, false, 1024, true, true, 1024, false, false); + success += testBatched(i, true, true, 1024, true, true, 1024, false, false); + + success += testBatched(i, false, false, 1024, false, false, 1024, true, false); + success += testBatched(i, true, false, 1024, false, false, 1024, true, false); + success += testBatched(i, true, true, 1024, false, false, 1024, true, false); + success += testBatched(i, false, false, 1024, true, false, 1024, true, false); + success += testBatched(i, true, false, 1024, true, false, 1024, true, false); + success += testBatched(i, true, true, 1024, true, false, 1024, true, false); + success += testBatched(i, false, false, 1024, true, true, 1024, true, false); + success += testBatched(i, true, false, 1024, true, true, 1024, true, false); + success += testBatched(i, true, true, 1024, true, true, 1024, true, false); + + success += testBatched(i, false, false, 1024, false, false, 1024, true, true); + success += testBatched(i, true, false, 1024, false, false, 1024, true, true); + success += testBatched(i, true, true, 1024, false, false, 1024, true, true); + success += testBatched(i, false, false, 1024, true, false, 1024, true, true); + success += testBatched(i, true, false, 1024, true, false, 1024, true, true); + success += testBatched(i, true, true, 1024, true, false, 1024, true, true); + success += testBatched(i, false, false, 1024, true, true, 1024, true, true); + success += testBatched(i, true, false, 1024, true, true, 1024, true, true); + success += testBatched(i, true, true, 1024, true, true, 1024, true, true); + } + + _log.info("** Batches complete with " + success + " successful runs"); + } + + private int testBatched(int firstSize, boolean firstRouter, boolean firstTunnel, + int secondSize, boolean secondRouter, boolean secondTunnel, + int thirdSize, boolean thirdRouter, boolean thirdTunnel) { + TunnelGateway.Pending pending1 = createPending(firstSize, firstRouter, firstTunnel); + TunnelGateway.Pending pending2 = createPending(secondSize, secondRouter, secondTunnel); + TunnelGateway.Pending pending3 = createPending(thirdSize, thirdRouter, thirdTunnel); + + boolean ok = runBatch(pending1, pending2, pending3); + if (ok) { + _log.info("OK: " + firstSize + "." + firstRouter + "." + firstTunnel + + " " + secondSize + "." + secondRouter + "." + secondTunnel + + " " + thirdSize + "." + thirdRouter + "." + thirdTunnel); + return 1; + } else { + _log.info("FAIL: " + firstSize + "." + firstRouter + "." + firstTunnel + + " " + secondSize + "." + secondRouter + "." + secondTunnel + + " " + thirdSize + "." + thirdRouter + "." + thirdTunnel); + return 0; + } + } + + private boolean runBatch(TunnelGateway.Pending pending1, TunnelGateway.Pending pending2, TunnelGateway.Pending pending3) { + ArrayList messages = new ArrayList(); + messages.add(pending1); + + TunnelGateway.QueuePreprocessor pre = createPreprocessor(_context); + SenderImpl sender = new SenderImpl(); + DefragmentedReceiverImpl handleReceiver = new DefragmentedReceiverImpl(pending1.getData(), pending2.getData(), pending3.getData()); + FragmentHandler handler = new FragmentHandler(_context, handleReceiver); + ReceiverImpl receiver = new ReceiverImpl(handler, 0); + byte msg[] = pending1.getData(); + _log.debug("SEND(" + msg.length + "): " + Base64.encode(msg) + " " + _context.sha().calculateHash(msg).toBase64()); + + boolean keepGoing = true; + int added = 0; + while (keepGoing) { + keepGoing = pre.preprocessQueue(messages, new SenderImpl(), receiver); + if ( (keepGoing) || ((messages.size() == 0) && (added < 2) ) ) { + try { Thread.sleep(150); } catch (InterruptedException ie) {} + + if (added == 0) { + _log.debug("Adding pending2"); + messages.add(pending2); + added++; + keepGoing = true; + } else if (added == 1) { + _log.debug("Adding pending3"); + messages.add(pending3); + added++; + keepGoing = true; + } + } + } + + return handleReceiver.receivedOk(); + } + + + public void runTests() { + //super.runVaried(); + //super.runTests(); + //runBatched(); + runBatches(); + } + + public static void main(String args[]) { + BatchedFragmentTest t = new BatchedFragmentTest(); + t.runTests(); + } +} diff --git a/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java b/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java new file mode 100644 index 000000000..4231197c5 --- /dev/null +++ b/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java @@ -0,0 +1,193 @@ +package net.i2p.router.tunnel; + +import java.util.ArrayList; +import java.util.List; + +import net.i2p.I2PAppContext; +import net.i2p.data.ByteArray; +import net.i2p.util.Log; + +/** + * Batching preprocessor that will briefly delay the sending of a message + * if it doesn't fill up a full tunnel message, in which case it queues up + * an additional flush task. This is a very simple threshold algorithm - + * as soon as there is enough data for a full tunnel message, it is sent. If + * after the delay there still isn't enough data, what is available is sent + * and padded. + * + */ +public class BatchedPreprocessor extends TrivialPreprocessor { + private Log _log; + private long _pendingSince; + + public BatchedPreprocessor(I2PAppContext ctx) { + super(ctx); + _log = ctx.logManager().getLog(BatchedPreprocessor.class); + _pendingSince = 0; + ctx.statManager().createRateStat("tunnel.batchMultipleCount", "How many messages are batched into a tunnel message", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); + ctx.statManager().createRateStat("tunnel.batchDelay", "How many messages were pending when the batching waited", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); + ctx.statManager().createRateStat("tunnel.batchDelaySent", "How many messages were flushed when the batching delay completed", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); + } + + private static final int FULL_SIZE = PREPROCESSED_SIZE + - IV_SIZE + - 1 // 0x00 ending the padding + - 4; // 4 byte checksum + + /* not final or private so the test code can adjust */ + static long DEFAULT_DELAY = 500; + /** wait up to 2 seconds before sending a small message */ + protected long getSendDelay() { return DEFAULT_DELAY; } + + public boolean preprocessQueue(List pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { + if (_log.shouldLog(Log.INFO)) + _log.info("Preprocess queue with " + pending.size() + " to send"); + + if (getSendDelay() <= 0) { + if (_log.shouldLog(Log.INFO)) + _log.info("No batching, send all messages immediately"); + while (pending.size() > 0) { + // loops because sends may be partial + TunnelGateway.Pending msg = (TunnelGateway.Pending)pending.get(0); + send(pending, 0, 0, sender, rec); + if (msg.getOffset() >= msg.getData().length) + pending.remove(0); + } + return false; + } + + while (pending.size() > 0) { + int allocated = 0; + for (int i = 0; i < pending.size(); i++) { + TunnelGateway.Pending msg = (TunnelGateway.Pending)pending.get(i); + int instructionsSize = getInstructionsSize(msg); + instructionsSize += getInstructionAugmentationSize(msg, allocated, instructionsSize); + int curWanted = msg.getData().length - msg.getOffset() + instructionsSize; + allocated += curWanted; + if (allocated >= FULL_SIZE) { + if (allocated - curWanted + instructionsSize >= FULL_SIZE) { + // the instructions alone exceed the size, so we won't get any + // of the message into it. don't include it + i--; + msg = (TunnelGateway.Pending)pending.get(i); + allocated -= curWanted; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Pushback of " + curWanted + " (message " + (i+1) + ")"); + } + if (_pendingSince > 0) + _context.statManager().addRateData("tunnel.batchDelaySent", pending.size(), 0); + _pendingSince = 0; + send(pending, 0, i, sender, rec); + if (_log.shouldLog(Log.INFO)) + _log.info("Allocated=" + allocated + " so we sent " + (i+1) + + " (last complete? " + (msg.getOffset() >= msg.getData().length) + ")"); + + for (int j = 0; j < i; j++) + pending.remove(0); + if (msg.getOffset() >= msg.getData().length) { + // ok, this last message fit perfectly, remove it too + pending.remove(0); + } + if (i > 0) + _context.statManager().addRateData("tunnel.batchMultipleCount", i+1, 0); + allocated = 0; + break; + } + } + + if (allocated > 0) { + // after going through the entire pending list, we still don't + // have enough data to send a full message + + if ( (_pendingSince > 0) && (_pendingSince + getSendDelay() <= _context.clock().now()) ) { + if (_log.shouldLog(Log.INFO)) + _log.info("Passed through pending list, with " + allocated + "/" + pending.size() + + " left to clean up, but we've waited, so flush"); + + // not even a full message, but we want to flush it anyway + + if (pending.size() > 1) + _context.statManager().addRateData("tunnel.batchMultipleCount", pending.size(), 0); + _context.statManager().addRateData("tunnel.batchDelaySent", pending.size(), 0); + + send(pending, 0, pending.size()-1, sender, rec); + pending.clear(); + _pendingSince = 0; + return false; + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("Passed through pending list, with " + allocated + "/"+ pending.size() + + " left to clean up, but we've haven't waited, so don't flush (wait=" + + (_context.clock().now() - _pendingSince) + " / " + _pendingSince + ")"); + _context.statManager().addRateData("tunnel.batchDelay", pending.size(), 0); + if (_pendingSince <= 0) + _pendingSince = _context.clock().now(); + // not yet time to send the delayed flush + return true; + } + } else { + // ok, we sent some, but haven't gone back for another + // pass yet. keep looping + } + } + + if (_log.shouldLog(Log.INFO)) + _log.info("Sent everything on the list (pending=" + pending.size() + ")"); + + // sent everything from the pending list, no need to delayed flush + return false; + } + + /** + * Preprocess the messages from the pending list, grouping items startAt + * through sendThrough (though only part of the last one may be fully + * sent), delivering them through the sender/receiver. + * + * @param startAt first index in pending to send (inclusive) + * @param sendThrough last index in pending to send (inclusive) + */ + protected void send(List pending, int startAt, int sendThrough, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Sending " + startAt + ":" + sendThrough + " out of " + pending.size()); + byte preprocessed[] = _dataCache.acquire().getData(); + ByteArray ivBuf = _ivCache.acquire(); + byte iv[] = ivBuf.getData(); // new byte[IV_SIZE]; + _context.random().nextBytes(iv); + + int offset = 0; + offset = writeFragments(pending, startAt, sendThrough, preprocessed, offset); + // preprocessed[0:offset] now contains the fragments from the pending, + // so we need to format, pad, and rearrange according to the spec to + // generate the final preprocessed data + + preprocess(preprocessed, offset); + + sender.sendPreprocessed(preprocessed, rec); + } + + /** + * Write the fragments out of the pending list onto the target, updating + * each of the Pending message's offsets accordingly. + * + * @return new offset into the target for further bytes to be written + */ + private int writeFragments(List pending, int startAt, int sendThrough, byte target[], int offset) { + for (int i = startAt; i <= sendThrough; i++) { + TunnelGateway.Pending msg = (TunnelGateway.Pending)pending.get(i); + int prevOffset = offset; + if (msg.getOffset() == 0) { + offset = writeFirstFragment(msg, target, offset); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("writing " + msg.getMessageId() + " fragment 0, ending at " + offset + " prev " + prevOffset + + " leaving " + (msg.getData().length - msg.getOffset()) + " bytes for later"); + } else { + offset = writeSubsequentFragment(msg, target, offset); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("writing " + msg.getMessageId() + " fragment " + (msg.getFragmentNumber()-1) + + ", ending at " + offset + " prev " + prevOffset + + " leaving " + (msg.getData().length - msg.getOffset()) + " bytes for later"); + } + } + return offset; + } +} diff --git a/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java b/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java new file mode 100644 index 000000000..b8f179516 --- /dev/null +++ b/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java @@ -0,0 +1,55 @@ +package net.i2p.router.tunnel; + +import java.util.Properties; +import net.i2p.router.RouterContext; + +/** + * Honor the 'batchFrequency' tunnel pool setting or the 'router.batchFrequency' + * router config setting, and track fragmentation. + * + */ +public class BatchedRouterPreprocessor extends BatchedPreprocessor { + private RouterContext _routerContext; + private TunnelCreatorConfig _config; + + /** + * How frequently should we flush non-full messages, in milliseconds + */ + public static final String PROP_BATCH_FREQUENCY = "batchFrequency"; + public static final String PROP_ROUTER_BATCH_FREQUENCY = "router.batchFrequency"; + public static final int DEFAULT_BATCH_FREQUENCY = 0; + + public BatchedRouterPreprocessor(RouterContext ctx) { + this(ctx, null); + } + public BatchedRouterPreprocessor(RouterContext ctx, TunnelCreatorConfig cfg) { + super(ctx); + _routerContext = ctx; + _config = cfg; + } + + /** how long should we wait before flushing */ + protected long getSendDelay() { + String freq = null; + if (_config != null) { + Properties opts = _config.getOptions(); + if (opts != null) + freq = opts.getProperty(PROP_BATCH_FREQUENCY); + } else { + freq = _routerContext.getProperty(PROP_ROUTER_BATCH_FREQUENCY); + } + + if (freq != null) { + try { + return Integer.parseInt(freq); + } catch (NumberFormatException nfe) { + return DEFAULT_BATCH_FREQUENCY; + } + } + return DEFAULT_BATCH_FREQUENCY; + } + + protected void notePreprocessing(long messageId, int numFragments) { + _routerContext.messageHistory().fragmentMessage(messageId, numFragments); + } +} diff --git a/router/java/src/net/i2p/router/tunnel/FragmentHandler.java b/router/java/src/net/i2p/router/tunnel/FragmentHandler.java index bdaa5669a..b6c71fe97 100644 --- a/router/java/src/net/i2p/router/tunnel/FragmentHandler.java +++ b/router/java/src/net/i2p/router/tunnel/FragmentHandler.java @@ -33,8 +33,9 @@ public class FragmentHandler { private int _failed; /** don't wait more than 60s to defragment the partial message */ - private static final long MAX_DEFRAGMENT_TIME = 60*1000; - + static long MAX_DEFRAGMENT_TIME = 60*1000; + private static final ByteCache _cache = ByteCache.getInstance(512, TrivialPreprocessor.PREPROCESSED_SIZE); + public FragmentHandler(I2PAppContext context, DefragmentedReceiver receiver) { _context = context; _log = context.logManager().getLog(FragmentHandler.class); @@ -62,6 +63,7 @@ public class FragmentHandler { if (!ok) { _log.error("Unable to verify preprocessed data (pre.length=" + preprocessed.length + " off=" +offset + " len=" + length, new Exception("failed")); + _cache.release(new ByteArray(preprocessed)); return; } offset += HopProcessor.IV_LENGTH; // skip the IV @@ -83,6 +85,11 @@ public class FragmentHandler { if (_log.shouldLog(Log.ERROR)) _log.error("Corrupt fragment received: offset = " + offset, e); throw e; + } finally { + // each of the FragmentedMessages populated make a copy out of the + // payload, which they release separately, so we can release + // immediately + _cache.release(new ByteArray(preprocessed)); } } @@ -254,6 +261,9 @@ public class FragmentHandler { } offset += size; + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Handling finished message " + msg.getMessageId() + " at offset " + offset); return offset; } diff --git a/router/java/src/net/i2p/router/tunnel/FragmentTest.java b/router/java/src/net/i2p/router/tunnel/FragmentTest.java index 81cee4f75..24b1b705c 100644 --- a/router/java/src/net/i2p/router/tunnel/FragmentTest.java +++ b/router/java/src/net/i2p/router/tunnel/FragmentTest.java @@ -3,6 +3,7 @@ package net.i2p.router.tunnel; import java.util.ArrayList; import net.i2p.I2PAppContext; import net.i2p.data.Base64; +import net.i2p.data.DataHelper; import net.i2p.data.Hash; import net.i2p.data.TunnelId; import net.i2p.data.i2np.DataMessage; @@ -15,12 +16,18 @@ import net.i2p.util.Log; * */ public class FragmentTest { - private I2PAppContext _context; - private Log _log; + protected I2PAppContext _context; + protected Log _log; public FragmentTest() { _context = I2PAppContext.getGlobalContext(); - _log = _context.logManager().getLog(FragmentTest.class); + _log = _context.logManager().getLog(getClass()); + _context.random().nextBoolean(); + FragmentHandler.MAX_DEFRAGMENT_TIME = 10*1000; + } + + protected TunnelGateway.QueuePreprocessor createPreprocessor(I2PAppContext ctx) { + return new TrivialPreprocessor(ctx); } /** @@ -28,23 +35,26 @@ public class FragmentTest { * */ public void runSingle() { - DataMessage m = new DataMessage(_context); - byte data[] = new byte[949]; - _context.random().nextBytes(data); - m.setData(data); - m.setUniqueId(42); - m.setMessageExpiration(_context.clock().now() + 60*1000); + TunnelGateway.Pending pending = createPending(949, false, false); ArrayList messages = new ArrayList(); - TunnelGateway.Pending pending = new TunnelGateway.Pending(m, null, null); messages.add(pending); - - TrivialPreprocessor pre = new TrivialPreprocessor(_context); + + TunnelGateway.QueuePreprocessor pre = createPreprocessor(_context); SenderImpl sender = new SenderImpl(); - FragmentHandler handler = new FragmentHandler(_context, new DefragmentedReceiverImpl(m)); + DefragmentedReceiverImpl handleReceiver = new DefragmentedReceiverImpl(pending.getData()); + FragmentHandler handler = new FragmentHandler(_context, handleReceiver); ReceiverImpl receiver = new ReceiverImpl(handler, 0); - byte msg[] = m.toByteArray(); + byte msg[] = pending.getData(); _log.debug("SEND(" + msg.length + "): " + Base64.encode(msg) + " " + _context.sha().calculateHash(msg).toBase64()); - pre.preprocessQueue(messages, new SenderImpl(), receiver); + + boolean keepGoing = true; + while (keepGoing) { + keepGoing = pre.preprocessQueue(messages, new SenderImpl(), receiver); + if (keepGoing) + try { Thread.sleep(100); } catch (InterruptedException ie) {} + } + if (handleReceiver.receivedOk()) + _log.info("received OK"); } /** @@ -52,23 +62,26 @@ public class FragmentTest { * */ public void runMultiple() { - DataMessage m = new DataMessage(_context); - byte data[] = new byte[2048]; - _context.random().nextBytes(data); - m.setData(data); - m.setUniqueId(42); - m.setMessageExpiration(_context.clock().now() + 60*1000); + TunnelGateway.Pending pending = createPending(2048, false, false); ArrayList messages = new ArrayList(); - TunnelGateway.Pending pending = new TunnelGateway.Pending(m, null, null); messages.add(pending); - TrivialPreprocessor pre = new TrivialPreprocessor(_context); + TunnelGateway.QueuePreprocessor pre = createPreprocessor(_context); SenderImpl sender = new SenderImpl(); - FragmentHandler handler = new FragmentHandler(_context, new DefragmentedReceiverImpl(m)); + DefragmentedReceiverImpl handleReceiver = new DefragmentedReceiverImpl(pending.getData()); + FragmentHandler handler = new FragmentHandler(_context, handleReceiver); ReceiverImpl receiver = new ReceiverImpl(handler, 0); - byte msg[] = m.toByteArray(); + byte msg[] = pending.getData(); _log.debug("SEND(" + msg.length + "): " + Base64.encode(msg) + " " + _context.sha().calculateHash(msg).toBase64()); - pre.preprocessQueue(messages, new SenderImpl(), receiver); + + boolean keepGoing = true; + while (keepGoing) { + keepGoing = pre.preprocessQueue(messages, new SenderImpl(), receiver); + if (keepGoing) + try { Thread.sleep(100); } catch (InterruptedException ie) {} + } + if (handleReceiver.receivedOk()) + _log.info("received OK"); } /** @@ -77,31 +90,88 @@ public class FragmentTest { * */ public void runDelayed() { - DataMessage m = new DataMessage(_context); - byte data[] = new byte[2048]; - _context.random().nextBytes(data); - m.setData(data); - m.setUniqueId(42); - m.setMessageExpiration(_context.clock().now() + 60*1000); + TunnelGateway.Pending pending = createPending(2048, false, false); ArrayList messages = new ArrayList(); - TunnelGateway.Pending pending = new TunnelGateway.Pending(m, null, null); messages.add(pending); - - TrivialPreprocessor pre = new TrivialPreprocessor(_context); + TunnelGateway.QueuePreprocessor pre = createPreprocessor(_context); SenderImpl sender = new SenderImpl(); - FragmentHandler handler = new FragmentHandler(_context, new DefragmentedReceiverImpl(m)); - ReceiverImpl receiver = new ReceiverImpl(handler, 21*1000); - byte msg[] = m.toByteArray(); + FragmentHandler handler = new FragmentHandler(_context, new DefragmentedReceiverImpl(pending.getData())); + ReceiverImpl receiver = new ReceiverImpl(handler, 11*1000); + byte msg[] = pending.getData(); _log.debug("SEND(" + msg.length + "): " + Base64.encode(msg) + " " + _context.sha().calculateHash(msg).toBase64()); - pre.preprocessQueue(messages, new SenderImpl(), receiver); + boolean keepGoing = true; + while (keepGoing) { + keepGoing = pre.preprocessQueue(messages, new SenderImpl(), receiver); + if (keepGoing) + try { Thread.sleep(100); } catch (InterruptedException ie) {} + } } - private class SenderImpl implements TunnelGateway.Sender { + public void runVaried() { + int failures = 0; + for (int i = 0; i <= 4096; i++) { + boolean ok = runVaried(i, false, false); + if (!ok) { _log.error("** processing " + i+ " w/ no router, no tunnel failed"); failures++; } + ok = runVaried(i, true, false); + if (!ok) { _log.error("** processing " + i+ " w/ router, no tunnel failed"); failures++; } + ok = runVaried(i, true, true); + if (!ok) { _log.error("** processing " + i+ " w/ router, tunnel failed"); failures++; } + else _log.info("Tests pass for size " + i); + } + if (failures == 0) + _log.info("** success after all varied tests"); + else + _log.error("** failed " + failures +" varied tests"); + } + + protected boolean runVaried(int size, boolean includeRouter, boolean includeTunnel) { + TunnelGateway.Pending pending = createPending(size, includeRouter, includeTunnel); + ArrayList messages = new ArrayList(); + messages.add(pending); + + DefragmentedReceiverImpl handleReceiver = new DefragmentedReceiverImpl(pending.getData()); + TunnelGateway.QueuePreprocessor pre = createPreprocessor(_context); + SenderImpl sender = new SenderImpl(); + FragmentHandler handler = new FragmentHandler(_context, handleReceiver); + ReceiverImpl receiver = new ReceiverImpl(handler, 0); + byte msg[] = pending.getData(); + _log.debug("SEND(" + msg.length + "): " + Base64.encode(msg) + " " + _context.sha().calculateHash(msg).toBase64()); + + boolean keepGoing = true; + while (keepGoing) { + keepGoing = pre.preprocessQueue(messages, new SenderImpl(), receiver); + if (keepGoing) + try { Thread.sleep(100); } catch (InterruptedException ie) {} + } + + return handleReceiver.receivedOk(); + } + + protected TunnelGateway.Pending createPending(int size, boolean includeRouter, boolean includeTunnel) { + DataMessage m = new DataMessage(_context); + byte data[] = new byte[size]; + _context.random().nextBytes(data); + m.setData(data); + m.setUniqueId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE)); + m.setMessageExpiration(_context.clock().now() + 60*1000); + + Hash toRouter = null; + TunnelId toTunnel = null; + if (includeRouter) { + toRouter = new Hash(new byte[Hash.HASH_LENGTH]); + _context.random().nextBytes(toRouter.getData()); + } + if (includeTunnel) + toTunnel = new TunnelId(_context.random().nextLong(TunnelId.MAX_ID_VALUE)); + return new TunnelGateway.Pending(m, toRouter, toTunnel); + } + + protected class SenderImpl implements TunnelGateway.Sender { public void sendPreprocessed(byte[] preprocessed, TunnelGateway.Receiver receiver) { receiver.receiveEncrypted(preprocessed); } } - private class ReceiverImpl implements TunnelGateway.Receiver { + protected class ReceiverImpl implements TunnelGateway.Receiver { private FragmentHandler _handler; private int _delay; public ReceiverImpl(FragmentHandler handler, int delay) { @@ -114,21 +184,62 @@ public class FragmentTest { } } - private class DefragmentedReceiverImpl implements FragmentHandler.DefragmentedReceiver { - private I2NPMessage _expected; - public DefragmentedReceiverImpl(I2NPMessage expected) { + protected class DefragmentedReceiverImpl implements FragmentHandler.DefragmentedReceiver { + private byte _expected[]; + private byte _expected2[]; + private byte _expected3[]; + private int _received; + public DefragmentedReceiverImpl(byte expected[]) { + this(expected, null); + } + public DefragmentedReceiverImpl(byte expected[], byte expected2[]) { + this(expected, expected2, null); + } + public DefragmentedReceiverImpl(byte expected[], byte expected2[], byte expected3[]) { _expected = expected; + _expected2 = expected2; + _expected3 = expected3; + _received = 0; } public void receiveComplete(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) { - _log.debug("equal? " + _expected.equals(msg)); + boolean ok = false; + byte m[] = msg.toByteArray(); + if ( (_expected != null) && (DataHelper.eq(_expected, m)) ) + ok = true; + if (!ok && (_expected2 != null) && (DataHelper.eq(_expected2, m)) ) + ok = true; + if (!ok && (_expected3 != null) && (DataHelper.eq(_expected3, m)) ) + ok = true; + if (ok) + _received++; + //_log.info("** equal? " + ok); } + public boolean receivedOk() { + if ( (_expected != null) && (_expected2 != null) && (_expected3 != null) ) + return _received == 3; + else if ( (_expected != null) && (_expected2 != null) ) + return _received == 2; + else if ( (_expected != null) || (_expected2 != null) ) + return _received == 1; + else + return _received == 0; + } + } + + public void runTests() { + runVaried(); + _log.info("\n===========================Begin runSingle()\n\n"); + runSingle(); + _log.info("\n===========================Begin runMultiple()\n\n"); + runMultiple(); + _log.info("\n===========================Begin runDelayed() (should have 3 errors)\n\n"); + runDelayed(); + _log.info("\n===========================After runDelayed()\n\n"); } public static void main(String args[]) { FragmentTest t = new FragmentTest(); - t.runSingle(); - t.runMultiple(); - t.runDelayed(); + t.runTests(); } } diff --git a/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java b/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java index 266fe6eec..cf671c604 100644 --- a/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java +++ b/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java @@ -74,7 +74,10 @@ public class FragmentedMessage { _log.debug("Receive message " + messageId + " fragment " + fragmentNum + " with " + length + " bytes (last? " + isLast + ") offset = " + offset); _messageId = messageId; // we should just use payload[] and use an offset/length on it - ByteArray ba = new ByteArray(payload, offset, length); //new byte[length]); + ByteArray ba = _cache.acquire(); //new ByteArray(payload, offset, length); //new byte[length]); + System.arraycopy(payload, offset, ba.getData(), 0, length); + ba.setValid(length); + ba.setOffset(0); //System.arraycopy(payload, offset, ba.getData(), 0, length); if (_log.shouldLog(Log.DEBUG)) _log.debug("fragment[" + fragmentNum + "/" + offset + "/" + length + "]: " @@ -107,7 +110,10 @@ public class FragmentedMessage { if (_log.shouldLog(Log.DEBUG)) _log.debug("Receive message " + messageId + " with " + length + " bytes (last? " + isLast + ") targetting " + toRouter + " / " + toTunnel + " offset=" + offset); _messageId = messageId; - ByteArray ba = new ByteArray(payload, offset, length); // new byte[length]); + ByteArray ba = _cache.acquire(); // new ByteArray(payload, offset, length); // new byte[length]); + System.arraycopy(payload, offset, ba.getData(), 0, length); + ba.setValid(length); + ba.setOffset(0); //System.arraycopy(payload, offset, ba.getData(), 0, length); if (_log.shouldLog(Log.DEBUG)) _log.debug("fragment[0/" + offset + "/" + length + "]: " diff --git a/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java b/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java index a29ea9644..62efac9b9 100644 --- a/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java +++ b/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java @@ -20,19 +20,24 @@ import net.i2p.util.Log; * */ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { - private I2PAppContext _context; + protected I2PAppContext _context; private Log _log; static final int PREPROCESSED_SIZE = 1024; - private static final int IV_SIZE = HopProcessor.IV_LENGTH; - private static final ByteCache _dataCache = ByteCache.getInstance(512, PREPROCESSED_SIZE); - private static final ByteCache _ivCache = ByteCache.getInstance(128, IV_SIZE); + protected static final int IV_SIZE = HopProcessor.IV_LENGTH; + protected static final ByteCache _dataCache = ByteCache.getInstance(512, PREPROCESSED_SIZE); + protected static final ByteCache _ivCache = ByteCache.getInstance(128, IV_SIZE); public TrivialPreprocessor(I2PAppContext ctx) { _context = ctx; _log = ctx.logManager().getLog(TrivialPreprocessor.class); } + /** + * Return true if there were messages remaining, and we should queue up + * a delayed flush to clear them + * + */ public boolean preprocessQueue(List pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { while (pending.size() > 0) { TunnelGateway.Pending msg = (TunnelGateway.Pending)pending.remove(0); @@ -75,13 +80,82 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { * */ private byte[] preprocessFragment(TunnelGateway.Pending msg) { + byte target[] = _dataCache.acquire().getData(); + + int offset = 0; if (msg.getOffset() <= 0) - return preprocessFirstFragment(msg); + offset = writeFirstFragment(msg, target, offset); else - return preprocessSubsequentFragment(msg); + offset = writeSubsequentFragment(msg, target, offset); + + preprocess(target, offset); + return target; } - + /** + * Wrap the preprocessed fragments with the necessary padding / checksums + * to act as a tunnel message. + * + * @param fragmentLength fragments[0:fragmentLength] is used + */ + protected void preprocess(byte fragments[], int fragmentLength) { + ByteArray ivBuf = _ivCache.acquire(); + byte iv[] = ivBuf.getData(); // new byte[IV_SIZE]; + _context.random().nextBytes(iv); + + SHA256EntryCache.CacheEntry cache = _context.sha().cache().acquire(PREPROCESSED_SIZE); + + // payload ready, now H(instructions+payload+IV) + System.arraycopy(iv, 0, fragments, fragmentLength, IV_SIZE); + Hash h = _context.sha().calculateHash(fragments, 0, fragmentLength + IV_SIZE, cache); + //Hash h = _context.sha().calculateHash(target, 0, offset + IV_SIZE); + //_log.debug("before shift: " + Base64.encode(target)); + // now shiiiiiift + int distance = PREPROCESSED_SIZE - fragmentLength; + System.arraycopy(fragments, 0, fragments, distance, fragmentLength); + + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug(msg.getMessageId() + ": fragments begin at " + distance + " (size=" + // + payloadLength + " offset=" + offset +")"); + + java.util.Arrays.fill(fragments, 0, distance, (byte)0x0); + //_log.debug("after shift: " + Base64.encode(target)); + + int offset = 0; + System.arraycopy(iv, 0, fragments, offset, IV_SIZE); + offset += IV_SIZE; + System.arraycopy(h.getData(), 0, fragments, offset, 4); + offset += 4; + //_log.debug("before pad : " + Base64.encode(target)); + + _context.sha().cache().release(cache); + _ivCache.release(ivBuf); + + // fits in a single message, so may be smaller than the full size + int numPadBytes = PREPROCESSED_SIZE // max + - IV_SIZE // hmm.. + - 4 // 4 bytes of the SHA256 + - 1 // the 0x00 after the padding + - fragmentLength; // the size of the fragments (instructions+payload) + + //_log.debug("# pad bytes: " + numPadBytes + " payloadLength: " + payloadLength + " instructions: " + instructionsLength); + + int paddingRemaining = numPadBytes; + while (paddingRemaining > 0) { + byte b = (byte)(_context.random().nextInt() & 0xFF); + if (b != 0x00) { + fragments[offset] = b; + offset++; + paddingRemaining--; + } + } + + fragments[offset] = 0x0; // no more padding + offset++; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Preprocessing beginning of the fragment instructions at " + offset); + } + /** is this a follw up byte? */ private static final byte MASK_IS_SUBSEQUENT = FragmentHandler.MASK_IS_SUBSEQUENT; /** how should this be delivered? shift this 5 the right and get TYPE_* */ @@ -92,24 +166,28 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { private static final byte MASK_EXTENDED = FragmentHandler.MASK_EXTENDED; private static final byte MASK_TUNNEL = (byte)(FragmentHandler.TYPE_TUNNEL << 5); private static final byte MASK_ROUTER = (byte)(FragmentHandler.TYPE_ROUTER << 5); - - private byte[] preprocessFirstFragment(TunnelGateway.Pending msg) { + + protected int writeFirstFragment(TunnelGateway.Pending msg, byte target[], int offset) { boolean fragmented = false; - ByteArray ivBuf = _ivCache.acquire(); - byte iv[] = ivBuf.getData(); // new byte[IV_SIZE]; - _context.random().nextBytes(iv); - - byte target[] = _dataCache.acquire().getData(); //new byte[PREPROCESSED_SIZE]; + int origOffset = offset; int instructionsLength = getInstructionsSize(msg); - int payloadLength = msg.getData().length; - if (payloadLength + instructionsLength + IV_SIZE + 1 + 4 > PREPROCESSED_SIZE) { + int payloadLength = msg.getData().length - msg.getOffset(); + if (offset + payloadLength + instructionsLength + IV_SIZE + 1 + 4 > PREPROCESSED_SIZE) { fragmented = true; instructionsLength += 4; // messageId - payloadLength = PREPROCESSED_SIZE - IV_SIZE - 1 - 4 - instructionsLength; + payloadLength = PREPROCESSED_SIZE - IV_SIZE - 1 - 4 - instructionsLength - offset; + if (payloadLength <= 0) + throw new RuntimeException("Fragment too small! payloadLen=" + payloadLength + + " target.length=" + target.length + " offset="+offset + + " msg.length=" + msg.getData().length + " msg.offset=" + msg.getOffset() + + " instructionsLength=" + instructionsLength + " for " + msg); } - - int offset = 0; + if (payloadLength <= 0) + throw new RuntimeException("Full size too small! payloadLen=" + payloadLength + + " target.length=" + target.length + " offset="+offset + + " msg.length=" + msg.getData().length + " msg.offset=" + msg.getOffset() + + " instructionsLength=" + instructionsLength + " for " + msg); // first fragment, or full message target[offset] = 0x0; @@ -142,89 +220,21 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { DataHelper.toLong(target, offset, 2, payloadLength); offset += 2; //_log.debug("raw data : " + Base64.encode(msg.getData())); - System.arraycopy(msg.getData(), 0, target, offset, payloadLength); + System.arraycopy(msg.getData(), msg.getOffset(), target, offset, payloadLength); if (_log.shouldLog(Log.DEBUG)) _log.debug("initial fragment[" + msg.getMessageId() + "/" + msg.getFragmentNumber()+ "/" + (PREPROCESSED_SIZE - offset - payloadLength) + "/" + payloadLength + "]: " + Base64.encode(target, offset, payloadLength)); offset += payloadLength; - - SHA256EntryCache.CacheEntry cache = _context.sha().cache().acquire(PREPROCESSED_SIZE); - - // payload ready, now H(instructions+payload+IV) - System.arraycopy(iv, 0, target, offset, IV_SIZE); - Hash h = _context.sha().calculateHash(target, 0, offset + IV_SIZE, cache); - //Hash h = _context.sha().calculateHash(target, 0, offset + IV_SIZE); - //_log.debug("before shift: " + Base64.encode(target)); - // now shiiiiiift - int distance = PREPROCESSED_SIZE - offset; - System.arraycopy(target, 0, target, distance, offset); - - if (_log.shouldLog(Log.DEBUG)) - _log.debug(msg.getMessageId() + ": fragments begin at " + distance + " (size=" - + payloadLength + " offset=" + offset +")"); - - java.util.Arrays.fill(target, 0, distance, (byte)0x0); - //_log.debug("after shift: " + Base64.encode(target)); - - offset = 0; - System.arraycopy(iv, 0, target, offset, IV_SIZE); - offset += IV_SIZE; - System.arraycopy(h.getData(), 0, target, offset, 4); - offset += 4; - //_log.debug("before pad : " + Base64.encode(target)); - - _context.sha().cache().release(cache); - _ivCache.release(ivBuf); - - if (!fragmented) { - // fits in a single message, so may be smaller than the full size - int numPadBytes = PREPROCESSED_SIZE // max - - IV_SIZE // hmm.. - - 4 // 4 bytes of the SHA256 - - 1 // the 0x00 after the padding - - payloadLength // the, er, payload - - instructionsLength; // wanna guess? - //_log.debug("# pad bytes: " + numPadBytes + " payloadLength: " + payloadLength + " instructions: " + instructionsLength); - - int paddingRemaining = numPadBytes; - while (paddingRemaining > 0) { - byte b = (byte)(_context.random().nextInt() & 0xFF); - if (b != 0x00) { - target[offset] = b; - offset++; - paddingRemaining--; - } - /* - long rnd = _context.random().nextLong(); - for (long i = 0; i < 8; i++) { - byte b = (byte)(((rnd >>> i * 8l) & 0xFF)); - if (b == 0x00) - continue; - target[offset] = b; - offset++; - paddingRemaining--; - } - */ - } - } - target[offset] = 0x0; // no padding here - offset++; - - msg.setOffset(payloadLength); + msg.setOffset(msg.getOffset() + payloadLength); msg.incrementFragmentNumber(); - return target; + return offset; } - private byte[] preprocessSubsequentFragment(TunnelGateway.Pending msg) { - ByteArray ivBuf = _ivCache.acquire(); + protected int writeSubsequentFragment(TunnelGateway.Pending msg, byte target[], int offset) { boolean isLast = true; - byte iv[] = ivBuf.getData(); // new byte[IV_SIZE]; - _context.random().nextBytes(iv); - - byte target[] = _dataCache.acquire().getData(); // new byte[PREPROCESSED_SIZE]; int instructionsLength = getInstructionsSize(msg); int payloadLength = msg.getData().length - msg.getOffset(); @@ -233,8 +243,6 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { payloadLength = PREPROCESSED_SIZE - IV_SIZE - 1 - 4 - instructionsLength; } - int offset = 0; - // first fragment, or full message target[offset] = 0x0; target[offset] |= MASK_IS_SUBSEQUENT; @@ -259,63 +267,13 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { + Base64.encode(target, offset, payloadLength)); offset += payloadLength; - - SHA256EntryCache.CacheEntry cache = _context.sha().cache().acquire(PREPROCESSED_SIZE); - // payload ready, now H(instructions+payload+IV) - System.arraycopy(iv, 0, target, offset, IV_SIZE); - Hash h = _context.sha().calculateHash(target, 0, offset + IV_SIZE, cache); - //Hash h = _context.sha().calculateHash(target, 0, offset + IV_SIZE); - // now shiiiiiift - int distance = PREPROCESSED_SIZE - offset; - System.arraycopy(target, 0, target, distance, offset); - - if (_log.shouldLog(Log.DEBUG)) - _log.debug(msg.getMessageId() + ": fragments begin at " + distance + " (size=" - + payloadLength + " offset=" + offset +")"); - - offset = 0; - System.arraycopy(iv, 0, target, 0, IV_SIZE); - offset += IV_SIZE; - _ivCache.release(ivBuf); - - System.arraycopy(h.getData(), 0, target, offset, 4); - offset += 4; - - _context.sha().cache().release(cache); - - if (isLast) { - // this is the last message, so may be smaller than the full size - int numPadBytes = PREPROCESSED_SIZE // max - - IV_SIZE // hmm.. - - 4 // 4 bytes of the SHA256 - - 1 // the 0x00 after the padding - - payloadLength // the, er, payload - - instructionsLength; // wanna guess? - - for (int i = 0; i < numPadBytes; i++) { - // wouldn't it be nice if random could write to an array? - byte rnd = (byte)_context.random().nextInt(); - if (rnd != 0x0) { - target[offset] = rnd; - offset++; - } else { - i--; - } - } - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("# pad bytes: " + numPadBytes); - } - target[offset] = 0x0; // end of padding - offset++; - - msg.setOffset(msg.getOffset() + payloadLength); msg.incrementFragmentNumber(); - return target; + msg.setOffset(msg.getOffset() + payloadLength); + return offset; } - - private int getInstructionsSize(TunnelGateway.Pending msg) { + + protected int getInstructionsSize(TunnelGateway.Pending msg) { if (msg.getFragmentNumber() > 0) return 7; int header = 1; @@ -324,7 +282,16 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { if (msg.getToRouter() != null) header += 32; header += 2; + return header; } + protected int getInstructionAugmentationSize(TunnelGateway.Pending msg, int offset, int instructionsSize) { + int payloadLength = msg.getData().length - msg.getOffset(); + if (offset + payloadLength + instructionsSize + IV_SIZE + 1 + 4 > PREPROCESSED_SIZE) { + // requires fragmentation, so include the messageId + return 4; + } + return 0; + } } diff --git a/router/java/src/net/i2p/router/tunnel/TunnelCreatorConfig.java b/router/java/src/net/i2p/router/tunnel/TunnelCreatorConfig.java index 715544e42..cef9c93c1 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelCreatorConfig.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelCreatorConfig.java @@ -2,6 +2,7 @@ package net.i2p.router.tunnel; import java.util.Date; import java.util.Locale; +import java.util.Properties; import java.text.SimpleDateFormat; import net.i2p.data.Base64; @@ -44,6 +45,8 @@ public class TunnelCreatorConfig implements TunnelInfo { /** how many hops are there in the tunnel? */ public int getLength() { return _config.length; } + public Properties getOptions() { return null; } + /** * retrieve the config for the given hop. the gateway is * hop 0. diff --git a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java index 134ed050e..334521772 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java @@ -106,6 +106,16 @@ public class TunnelDispatcher implements Service { "How many messages are sent through a participating tunnel?", "Tunnels", new long[] { 60*10*1000l, 60*60*1000l, 24*60*60*1000l }); } + + private TunnelGateway.QueuePreprocessor createPreprocessor() { + return createPreprocessor(null); + } + private TunnelGateway.QueuePreprocessor createPreprocessor(TunnelCreatorConfig cfg) { + if (true) + return new BatchedRouterPreprocessor(_context, cfg); + else + return new TrivialRouterPreprocessor(_context); + } /** * We are the outbound gateway - we created this tunnel @@ -114,7 +124,7 @@ public class TunnelDispatcher implements Service { if (_log.shouldLog(Log.INFO)) _log.info("Outbound built successfully: " + cfg); if (cfg.getLength() > 1) { - TunnelGateway.QueuePreprocessor preproc = new TrivialRouterPreprocessor(_context); + TunnelGateway.QueuePreprocessor preproc = createPreprocessor(cfg); TunnelGateway.Sender sender = new OutboundSender(_context, cfg); TunnelGateway.Receiver receiver = new OutboundReceiver(_context, cfg); TunnelGateway gw = new TunnelGateway(_context, preproc, sender, receiver); @@ -211,7 +221,7 @@ public class TunnelDispatcher implements Service { public void joinInboundGateway(HopConfig cfg) { if (_log.shouldLog(Log.INFO)) _log.info("Joining as inbound gateway: " + cfg); - TunnelGateway.QueuePreprocessor preproc = new TrivialRouterPreprocessor(_context); + TunnelGateway.QueuePreprocessor preproc = createPreprocessor(); TunnelGateway.Sender sender = new InboundSender(_context, cfg); TunnelGateway.Receiver receiver = new InboundGatewayReceiver(_context, cfg); TunnelGateway gw = new TunnelGateway(_context, preproc, sender, receiver); diff --git a/router/java/src/net/i2p/router/tunnel/TunnelGateway.java b/router/java/src/net/i2p/router/tunnel/TunnelGateway.java index 25642da4a..7ec95790b 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelGateway.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelGateway.java @@ -97,6 +97,8 @@ public class TunnelGateway { for (int i = 0; i < _queue.size(); i++) { Pending m = (Pending)_queue.get(i); if (m.getExpiration() < _lastFlush) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Expire on the queue: " + m); _queue.remove(i); i--; } @@ -175,13 +177,16 @@ public class TunnelGateway { private class DelayedFlush implements SimpleTimer.TimedEvent { public void timeReached() { - long now = _context.clock().now(); + boolean wantRequeue = false; synchronized (_queue) { - if ( (_queue.size() > 0) && (_lastFlush + _flushFrequency < now) ) { - _preprocessor.preprocessQueue(_queue, _sender, _receiver); - _lastFlush = _context.clock().now(); - } + if (_queue.size() > 0) + wantRequeue = _preprocessor.preprocessQueue(_queue, _sender, _receiver); } + + if (wantRequeue) + SimpleTimer.getInstance().addEvent(_delayedFlush, _flushFrequency); + else + _lastFlush = _context.clock().now(); } } } diff --git a/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java b/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java index 91a2b2538..0e634d6ec 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java +++ b/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java @@ -1,5 +1,6 @@ package net.i2p.router.tunnel.pool; +import java.util.Properties; import net.i2p.data.Hash; import net.i2p.router.RouterContext; import net.i2p.router.tunnel.TunnelCreatorConfig; @@ -34,6 +35,11 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig { } } + public Properties getOptions() { + if (_pool == null) return null; + return _pool.getSettings().getUnknownOptions(); + } + /** * The tunnel failed, so stop using it */