diff --git a/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java b/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java index 4691f2ae3..ffe67b9e9 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java +++ b/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java @@ -26,20 +26,21 @@ import net.i2p.util.Log; * changed, such as a tunnel failed, new client started up, or tunnel creation was aborted). * * Note that 10 minute tunnel expiration is hardcoded in here. + * + * As of 0.8.11, inbound request handling is done in a separate thread. */ class BuildExecutor implements Runnable { private final ArrayList _recentBuildIds = new ArrayList(100); private final RouterContext _context; private final Log _log; private final TunnelPoolManager _manager; - /** list of TunnelCreatorConfig elements of tunnels currently being built */ + /** Notify lock */ private final Object _currentlyBuilding; /** indexed by ptcc.getReplyMessageId() */ private final ConcurrentHashMap _currentlyBuildingMap; /** indexed by ptcc.getReplyMessageId() */ private final ConcurrentHashMap _recentlyBuildingMap; private boolean _isRunning; - private final BuildHandler _handler; private boolean _repoll; private static final int MAX_CONCURRENT_BUILDS = 10; /** accept replies up to a minute after we gave up on them */ @@ -63,7 +64,7 @@ class BuildExecutor implements Runnable { _context.statManager().createRequiredRateStat("tunnel.buildRequestTime", "Time to build a tunnel request (ms)", "Tunnels", new long[] { 60*1000, 10*60*1000 }); _context.statManager().createRateStat("tunnel.buildConfigTime", "Time to build a tunnel request (ms)", "Tunnels", new long[] { 60*1000, 10*60*1000 }); _context.statManager().createRateStat("tunnel.buildRequestZeroHopTime", "How long it takes to build a zero hop tunnel", "Tunnels", new long[] { 60*1000, 10*60*1000 }); - _context.statManager().createRateStat("tunnel.pendingRemaining", "How many inbound requests are pending after a pass (period is how long the pass takes)?", "Tunnels", new long[] { 60*1000, 10*60*1000 }); + //_context.statManager().createRateStat("tunnel.pendingRemaining", "How many inbound requests are pending after a pass (period is how long the pass takes)?", "Tunnels", new long[] { 60*1000, 10*60*1000 }); _context.statManager().createRateStat("tunnel.buildFailFirstHop", "How often we fail to build a OB tunnel because we can't contact the first hop", "Tunnels", new long[] { 60*1000, 10*60*1000 }); _context.statManager().createRateStat("tunnel.buildReplySlow", "Build reply late, but not too late", "Tunnels", new long[] { 10*60*1000 }); @@ -81,8 +82,6 @@ class BuildExecutor implements Runnable { statMgr.createRateStat("tunnel.tierAgreeUnknown", "Agreed joins from unknown", "Tunnels", new long[] { 60*1000, 10*60*1000 }); statMgr.createRateStat("tunnel.tierRejectUnknown", "Rejected joins from unknown", "Tunnels", new long[] { 60*1000, 10*60*1000 }); statMgr.createRateStat("tunnel.tierExpireUnknown", "Expired joins from unknown", "Tunnels", new long[] { 60*1000, 10*60*1000 }); - - _handler = new BuildHandler(ctx, this); } private int allowed() { @@ -266,8 +265,6 @@ class BuildExecutor implements Runnable { List wanted = new ArrayList(MAX_CONCURRENT_BUILDS); List pools = new ArrayList(8); - int pendingRemaining = 0; - //long loopBegin = 0; //long afterBuildZeroHop = 0; long afterBuildReal = 0; @@ -276,7 +273,7 @@ class BuildExecutor implements Runnable { while (!_manager.isShutdown()){ //loopBegin = System.currentTimeMillis(); try { - _repoll = pendingRemaining > 0; // resets repoll to false unless there are inbound requeusts pending + _repoll = false; // resets repoll to false unless there are inbound requeusts pending _manager.listPools(pools); for (int i = 0; i < pools.size(); i++) { TunnelPool pool = pools.get(i); @@ -308,7 +305,7 @@ class BuildExecutor implements Runnable { synchronized (_currentlyBuilding) { if (!_repoll) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("No tunnel to build with (allowed=" + allowed + ", wanted=" + wanted.size() + ", pending=" + pendingRemaining + "), wait for a while"); + _log.debug("No tunnel to build with (allowed=" + allowed + ", wanted=" + wanted.size() + "), wait for a while"); try { _currentlyBuilding.wait(1*1000+_context.random().nextInt(1*1000)); } catch (InterruptedException ie) {} @@ -369,12 +366,6 @@ class BuildExecutor implements Runnable { afterBuildReal = System.currentTimeMillis(); - pendingRemaining = _handler.handleInboundRequests(); - afterHandleInbound = System.currentTimeMillis(); - - if (pendingRemaining > 0) - _context.statManager().addRateData("tunnel.pendingRemaining", pendingRemaining, afterHandleInbound-afterBuildReal); - //if (_log.shouldLog(Log.DEBUG)) // _log.debug("build loop complete, tot=" + (afterHandleInbound-loopBegin) + // " inReply=" + (afterHandleInboundReplies-beforeHandleInboundReplies) + @@ -387,7 +378,6 @@ class BuildExecutor implements Runnable { wanted.clear(); pools.clear(); } catch (RuntimeException e) { - if (_log.shouldLog(Log.CRIT)) _log.log(Log.CRIT, "B0rked in the tunnel builder", e); } } @@ -568,6 +558,4 @@ class BuildExecutor implements Runnable { } return rv; } - - public int getInboundBuildQueueSize() { return _handler.getInboundBuildQueueSize(); } } diff --git a/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java b/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java index 6a77a5e57..7857482eb 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java +++ b/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java @@ -39,26 +39,31 @@ import net.i2p.util.Log; * and updating stats. * * Replies are handled immediately on reception; requests are queued. + * As of 0.8.11 the request queue is handled in a separate thread, + * it used to be called from the BuildExecutor thread loop. * * Note that 10 minute tunnel expiration is hardcoded in here. */ -class BuildHandler { +class BuildHandler implements Runnable { private final RouterContext _context; private final Log _log; + private final TunnelPoolManager _manager; private final BuildExecutor _exec; private final Job _buildMessageHandlerJob; private final Job _buildReplyMessageHandlerJob; private final LinkedBlockingQueue _inboundBuildMessages; private final BuildMessageProcessor _processor; private final ParticipatingThrottler _throttler; + private boolean _isRunning; /** TODO these may be too high, review and adjust */ - private static final int MIN_QUEUE = 12; - private static final int MAX_QUEUE = 96; + private static final int MIN_QUEUE = 18; + private static final int MAX_QUEUE = 192; - public BuildHandler(RouterContext ctx, BuildExecutor exec) { + public BuildHandler(RouterContext ctx, TunnelPoolManager manager, BuildExecutor exec) { _context = ctx; _log = ctx.logManager().getLog(getClass()); + _manager = manager; _exec = exec; // Queue size = 12 * share BW / 48K int sz = Math.min(MAX_QUEUE, Math.max(MIN_QUEUE, TunnelDispatcher.getShareBandwidth(ctx) * MIN_QUEUE / 48)); @@ -82,7 +87,7 @@ class BuildHandler { _context.statManager().createRequiredRateStat("tunnel.dropLoadBacklog", "Pending request count when dropped", "Tunnels", new long[] { 60*1000, 10*60*1000 }); _context.statManager().createRequiredRateStat("tunnel.dropLoadProactive", "Delay estimate when dropped (ms)", "Tunnels", new long[] { 60*1000, 10*60*1000 }); _context.statManager().createRequiredRateStat("tunnel.dropLoadProactiveAbort", "Allowed requests during load", "Tunnels", new long[] { 60*1000, 10*60*1000 }); - _context.statManager().createRateStat("tunnel.handleRemaining", "How many pending inbound requests were left on the queue after one pass?", "Tunnels", new long[] { 60*1000, 10*60*1000 }); + //_context.statManager().createRateStat("tunnel.handleRemaining", "How many pending inbound requests were left on the queue after one pass?", "Tunnels", new long[] { 60*1000, 10*60*1000 }); _context.statManager().createRateStat("tunnel.buildReplyTooSlow", "How often a tunnel build reply came back after we had given up waiting for it?", "Tunnels", new long[] { 60*1000, 10*60*1000 }); _context.statManager().createRateStat("tunnel.receiveRejectionProbabalistic", "How often we are rejected probabalistically?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); @@ -105,18 +110,37 @@ class BuildHandler { ctx.inNetMessagePool().registerHandlerJobBuilder(VariableTunnelBuildReplyMessage.MESSAGE_TYPE, tbrmhjb); } - private static final int MAX_HANDLE_AT_ONCE = 2; private static final int NEXT_HOP_LOOKUP_TIMEOUT = 15*1000; /** - * Blocking call to handle a few of the pending inbound requests, returning how many - * requests remain after this pass. This is called by BuildExecutor. + * Thread to handle inbound requests + * @since 0.8.11 */ - int handleInboundRequests() { - for (int i = 0; i < MAX_HANDLE_AT_ONCE; ) { - BuildMessageState state = _inboundBuildMessages.poll(); - if (state == null) - return 0; + public void run() { + _isRunning = true; + while (!_manager.isShutdown()) { + try { + handleInboundRequest(); + } catch (Exception e) { + _log.log(Log.CRIT, "B0rked in the tunnel handler", e); + } + } + if (_log.shouldLog(Log.WARN)) + _log.warn("Done handling"); + _isRunning = false; + } + + /** + * Blocking call to handle a single inbound request + */ + private void handleInboundRequest() { + BuildMessageState state = null; + + try { + state = _inboundBuildMessages.take(); + } catch (InterruptedException ie) { + return; + } long dropBefore = System.currentTimeMillis() - (BuildRequestor.REQUEST_TIMEOUT/4); if (state.recvTime <= dropBefore) { if (_log.shouldLog(Log.WARN)) @@ -124,23 +148,19 @@ class BuildHandler { + ", since we received it a long time ago: " + (System.currentTimeMillis() - state.recvTime)); _context.statManager().addRateData("tunnel.dropLoadDelay", System.currentTimeMillis() - state.recvTime, 0); _context.throttle().setTunnelStatus(_x("Dropping tunnel requests: Too slow")); - continue; + return; } + handleRequest(state); - i++; - long beforeHandle = System.currentTimeMillis(); - long actualTime = handleRequest(state); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Handle took " + (System.currentTimeMillis()-beforeHandle) + "/" + actualTime + - " (" + i + " with " + _inboundBuildMessages.size() + " remaining)"); - } - - int remaining = _inboundBuildMessages.size(); - if (remaining > 0) - _context.statManager().addRateData("tunnel.handleRemaining", remaining, 0); - return remaining; + //int remaining = _inboundBuildMessages.size(); + //if (remaining > 0) + // _context.statManager().addRateData("tunnel.handleRemaining", remaining, 0); + //return remaining; } + /** + * Blocking call to handle a single inbound reply + */ private void handleReply(BuildReplyMessageState state) { // search through the tunnels for a reply long replyMessageId = state.msg.getUniqueId(); @@ -157,6 +177,9 @@ class BuildHandler { } } + /** + * Blocking call to handle a single inbound reply + */ private void handleReply(TunnelBuildReplyMessage msg, PooledTunnelCreatorConfig cfg, long delay) { long requestedOn = cfg.getExpiration() - 10*60*1000; long rtt = _context.clock().now() - requestedOn; @@ -797,6 +820,7 @@ class BuildHandler { recvTime = System.currentTimeMillis(); } } + /** replies for outbound tunnels that we have created */ private static class BuildReplyMessageState { final TunnelBuildReplyMessage msg; @@ -806,6 +830,7 @@ class BuildHandler { recvTime = System.currentTimeMillis(); } } + /** replies for inbound tunnels we have created */ private static class BuildEndMessageState { final TunnelBuildMessage msg; diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java index a29d42679..05fa2bc7e 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java @@ -26,6 +26,7 @@ import net.i2p.router.TunnelInfo; import net.i2p.router.TunnelManagerFacade; import net.i2p.router.TunnelPoolSettings; import net.i2p.router.tunnel.HopConfig; +import net.i2p.router.tunnel.TunnelDispatcher; import net.i2p.stat.RateStat; import net.i2p.util.I2PThread; import net.i2p.util.Log; @@ -34,7 +35,8 @@ import net.i2p.util.SimpleScheduler; import net.i2p.util.SimpleTimer; /** - * + * Manage all the exploratory and client tunnel pools. + * Run the tunnel builder and handler threads. */ public class TunnelPoolManager implements TunnelManagerFacade { private final RouterContext _context; @@ -46,8 +48,12 @@ public class TunnelPoolManager implements TunnelManagerFacade { private TunnelPool _inboundExploratory; private TunnelPool _outboundExploratory; private final BuildExecutor _executor; + private final BuildHandler _handler; private boolean _isShutdown; private static final long[] RATES = { 60*1000, 10*60*1000l, 60*60*1000l }; + + private static final int MIN_KBPS_TWO_HANDLERS = 512; + private static final int MIN_KBPS_THREE_HANDLERS = 1024; public TunnelPoolManager(RouterContext ctx) { _context = ctx; @@ -63,9 +69,21 @@ public class TunnelPoolManager implements TunnelManagerFacade { _clientOutboundPools = new ConcurrentHashMap(4); _executor = new BuildExecutor(ctx, this); - I2PThread execThread = new I2PThread(_executor, "BuildExecutor"); - execThread.setDaemon(true); + I2PThread execThread = new I2PThread(_executor, "BuildExecutor", true); execThread.start(); + _handler = new BuildHandler(ctx, this, _executor); + int numHandlerThreads; + int share = TunnelDispatcher.getShareBandwidth(ctx); + if (share >= MIN_KBPS_THREE_HANDLERS) + numHandlerThreads = 3; + else if (share >= MIN_KBPS_TWO_HANDLERS) + numHandlerThreads = 2; + else + numHandlerThreads = 1; + for (int i = 1; i <= numHandlerThreads; i++) { + I2PThread hThread = new I2PThread(_handler, "BuildHandler " + i + '/' + numHandlerThreads, true); + hThread.start(); + } // The following are for TestJob ctx.statManager().createRequiredRateStat("tunnel.testFailedTime", "Time for tunnel test failure (ms)", "Tunnels", @@ -550,7 +568,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { BuildExecutor getExecutor() { return _executor; } boolean isShutdown() { return _isShutdown; } - public int getInboundBuildQueueSize() { return _executor.getInboundBuildQueueSize(); } + public int getInboundBuildQueueSize() { return _handler.getInboundBuildQueueSize(); } /** @deprecated moved to routerconsole */ public void renderStatusHTML(Writer out) throws IOException {