diff --git a/apps/i2psnark/java/src/org/klomp/snark/ConnectionAcceptor.java b/apps/i2psnark/java/src/org/klomp/snark/ConnectionAcceptor.java index 68808c8b8..130e10133 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/ConnectionAcceptor.java +++ b/apps/i2psnark/java/src/org/klomp/snark/ConnectionAcceptor.java @@ -125,7 +125,13 @@ public class ConnectionAcceptor implements Runnable if (socketChanged) { continue; } else { - Snark.debug("Null socket accepted, but socket wasn't changed?", Snark.ERROR); + I2PServerSocket ss = I2PSnarkUtil.instance().getServerSocket(); + if (ss != serverSocket) { + serverSocket = ss; + } else { + Snark.debug("Null socket accepted, but socket wasn't changed?", Snark.ERROR); + try { Thread.sleep(10*1000); } catch (InterruptedException ie) {} + } } } else { Thread t = new I2PThread(new Handler(socket), "Connection-" + socket); diff --git a/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java b/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java index 175af64f3..adda93abf 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java +++ b/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java @@ -52,7 +52,8 @@ public class SnarkManager implements Snark.CompleteListener { while (_messages.size() > MAX_MESSAGES) _messages.remove(0); } - _log.info("MSG: " + message); + if (_log.shouldLog(Log.INFO)) + _log.info("MSG: " + message); } /** newest last */ diff --git a/history.txt b/history.txt index ef8a56ebc..c4910e402 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,18 @@ -$Id: history.txt,v 1.392 2006/01/18 01:37:53 cervantes Exp $ +$Id: history.txt,v 1.393 2006/01/19 23:40:24 complication Exp $ + +2006-01-22 jrandom + * New tunnel build process - does not use the new crypto or new peer + selection strategies. However, it does drop the fallback tunnel + procedure, except for tunnels who are configured to allow them, or for + the exploratory pool during bootstrapping or after a catastrophic + failure. This new process prefers to fail rather than use too-short + tunnels, so while it can do some pretty aggressive tunnel rebuilding, + it may expose more tunnel failures to the user. + * Always prefer normal tunnels to fallback tunnels. + * Potential fix for a bug while changing i2cp settings on I2PSnark (thanks + bar!) + * Do all of the netDb entry writing in a separate thread, avoiding + duplicates and batching them up. 2006-01-19 Complication * Explain better where eepsite's destkey can be found diff --git a/router/java/src/net/i2p/router/Router.java b/router/java/src/net/i2p/router/Router.java index bce455adc..fab4d2b85 100644 --- a/router/java/src/net/i2p/router/Router.java +++ b/router/java/src/net/i2p/router/Router.java @@ -364,7 +364,12 @@ public class Router { public static final char CAPABILITY_UNREACHABLE = 'U'; public static final String PROP_FORCE_UNREACHABLE = "router.forceUnreachable"; + public static final char CAPABILITY_NEW_TUNNEL = 'T'; + public void addReachabilityCapability(RouterInfo ri) { + // routers who can understand TunnelBuildMessages + ////ri.addCapability(CAPABILITY_NEW_TUNNEL); + String forceUnreachable = _context.getProperty(PROP_FORCE_UNREACHABLE); if ( (forceUnreachable != null) && ("true".equalsIgnoreCase(forceUnreachable)) ) { ri.addCapability(CAPABILITY_UNREACHABLE); diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index f7c2a2cf9..536b00636 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.336 $ $Date: 2006/01/18 01:38:49 $"; + public final static String ID = "$Revision: 1.337 $ $Date: 2006/01/19 23:40:25 $"; public final static String VERSION = "0.6.1.9"; - public final static long BUILD = 5; + public final static long BUILD = 6; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java b/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java index 44df7d1a8..796a098a9 100644 --- a/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java +++ b/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java @@ -74,7 +74,7 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade { startup(); } - private static final long MAX_TIME_TO_REBUILD = 5*60*1000; + private static final long MAX_TIME_TO_REBUILD = 10*60*1000; public boolean verifyClientLiveliness() { if (_manager == null) return true; boolean lively = true; diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/PersistentDataStore.java b/router/java/src/net/i2p/router/networkdb/kademlia/PersistentDataStore.java index ee28d7c51..eaa4d7cdb 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/PersistentDataStore.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/PersistentDataStore.java @@ -14,6 +14,7 @@ import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.FilenameFilter; import java.io.IOException; +import java.util.*; import net.i2p.data.DataFormatException; import net.i2p.data.DataStructure; @@ -22,6 +23,7 @@ import net.i2p.data.LeaseSet; import net.i2p.data.RouterInfo; import net.i2p.router.JobImpl; import net.i2p.router.RouterContext; +import net.i2p.util.I2PThread; import net.i2p.util.Log; /** @@ -33,6 +35,7 @@ class PersistentDataStore extends TransientDataStore { private Log _log; private String _dbDir; private KademliaNetworkDatabaseFacade _facade; + private Writer _writer; private final static int READ_DELAY = 60*1000; @@ -42,6 +45,12 @@ class PersistentDataStore extends TransientDataStore { _dbDir = dbDir; _facade = facade; _context.jobQueue().addJob(new ReadJob()); + ctx.statManager().createRateStat("netDb.writeClobber", "How often we clobber a pending netDb write", "Network Database", new long[] { 60*1000, 10*60*1000 }); + ctx.statManager().createRateStat("netDb.writePending", "How many pending writes are there", "Network Database", new long[] { 60*1000, 10*60*1000 }); + _writer = new Writer(); + I2PThread writer = new I2PThread(_writer, "DBWriter"); + writer.setDaemon(true); + writer.start(); } public void restart() { @@ -56,7 +65,7 @@ class PersistentDataStore extends TransientDataStore { public void put(Hash key, DataStructure data) { if ( (data == null) || (key == null) ) return; super.put(key, data); - _context.jobQueue().addJob(new WriteJob(key, data)); + _writer.queue(key, data); } public int countLeaseSets() { @@ -103,61 +112,99 @@ class PersistentDataStore extends TransientDataStore { } } - private class WriteJob extends JobImpl { - private Hash _key; - private DataStructure _data; - public WriteJob(Hash key, DataStructure data) { - super(PersistentDataStore.this._context); - _key = key; - _data = data; + private class Writer implements Runnable { + private Map _keys; + private List _keyOrder; + public Writer() { + _keys = new HashMap(64); + _keyOrder = new ArrayList(64); } - public String getName() { return "DB Writer Job"; } - public void runJob() { - _log.info("Writing key " + _key); - FileOutputStream fos = null; - File dbFile = null; - try { - String filename = null; - File dbDir = getDbDir(); - - if (_data instanceof LeaseSet) - filename = getLeaseSetName(_key); - else if (_data instanceof RouterInfo) - filename = getRouterInfoName(_key); - else - throw new IOException("We don't know how to write objects of type " + _data.getClass().getName()); - - dbFile = new File(dbDir, filename); - long dataPublishDate = getPublishDate(); - if (dbFile.lastModified() < dataPublishDate) { - // our filesystem is out of date, lets replace it - fos = new FileOutputStream(dbFile); - try { - _data.writeBytes(fos); - fos.close(); - dbFile.setLastModified(dataPublishDate); - } catch (DataFormatException dfe) { - _log.error("Error writing out malformed object as " + _key + ": " - + _data, dfe); - dbFile.delete(); + public void queue(Hash key, DataStructure data) { + boolean exists = false; + int pending = 0; + synchronized (_keys) { + pending = _keys.size(); + exists = (null != _keys.put(key, data)); + if (!exists) + _keyOrder.add(key); + _keys.notifyAll(); + } + if (exists) + _context.statManager().addRateData("netDb.writeClobber", pending, 0); + _context.statManager().addRateData("netDb.writePending", pending, 0); + } + public void run() { + Hash key = null; + DataStructure data = null; + while (true) { // hmm, probably want a shutdown handle... though this is a daemon thread + try { + synchronized (_keys) { + if (_keyOrder.size() <= 0) { + _keys.wait(); + } else { + key = (Hash)_keyOrder.remove(0); + data = (DataStructure)_keys.remove(key); + } } - } else { - // we've already written the file, no need to waste our time - } - } catch (IOException ioe) { - _log.error("Error writing out the object", ioe); - } finally { - if (fos != null) try { fos.close(); } catch (IOException ioe) {} + } catch (InterruptedException ie) {} + + if ( (key != null) && (data != null) ) + write(key, data); + key = null; + data = null; + try { Thread.sleep(10*1000); } catch (InterruptedException ie) {} } } - private long getPublishDate() { - if (_data instanceof RouterInfo) { - return ((RouterInfo)_data).getPublished(); - } else if (_data instanceof LeaseSet) { - return ((LeaseSet)_data).getEarliestLeaseDate(); + } + + private void write(Hash key, DataStructure data) { + _log.info("Writing key " + key); + FileOutputStream fos = null; + File dbFile = null; + try { + String filename = null; + File dbDir = getDbDir(); + + if (data instanceof LeaseSet) + filename = getLeaseSetName(key); + else if (data instanceof RouterInfo) + filename = getRouterInfoName(key); + else + throw new IOException("We don't know how to write objects of type " + data.getClass().getName()); + + dbFile = new File(dbDir, filename); + long dataPublishDate = getPublishDate(data); + if (dbFile.lastModified() < dataPublishDate) { + // our filesystem is out of date, lets replace it + fos = new FileOutputStream(dbFile); + try { + data.writeBytes(fos); + fos.close(); + dbFile.setLastModified(dataPublishDate); + } catch (DataFormatException dfe) { + _log.error("Error writing out malformed object as " + key + ": " + + data, dfe); + dbFile.delete(); + } } else { - return -1; + // we've already written the file, no need to waste our time + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Not writing " + key.toBase64() + ", as its up to date on disk (file mod-publish=" + + (dbFile.lastModified()-dataPublishDate) + ")"); } + } catch (IOException ioe) { + _log.error("Error writing out the object", ioe); + } finally { + if (fos != null) try { fos.close(); } catch (IOException ioe) {} + } + } + private long getPublishDate(DataStructure data) { + if (data instanceof RouterInfo) { + return ((RouterInfo)data).getPublished(); + } else if (data instanceof LeaseSet) { + return ((LeaseSet)data).getEarliestLeaseDate(); + } else { + return -1; } } diff --git a/router/java/src/net/i2p/router/peermanager/IsFailingCalculator.java b/router/java/src/net/i2p/router/peermanager/IsFailingCalculator.java index f2607bea3..1391bd8a1 100644 --- a/router/java/src/net/i2p/router/peermanager/IsFailingCalculator.java +++ b/router/java/src/net/i2p/router/peermanager/IsFailingCalculator.java @@ -30,6 +30,7 @@ public class IsFailingCalculator extends Calculator { public boolean calcBoolean(PeerProfile profile) { // have we failed in the last 119 seconds? + /* if ( (profile.getCommError().getRate(60*1000).getCurrentEventCount() > 0) || (profile.getCommError().getRate(60*1000).getLastEventCount() > 0) || (profile.getCommError().getRate(10*60*1000).getCurrentEventCount() > 0) ) { @@ -38,7 +39,7 @@ public class IsFailingCalculator extends Calculator { + " is failing because it had a comm error recently "); return true; } else { - + */ //if ( (profile.getDBHistory().getFailedLookupRate().getRate(60*1000).getCurrentEventCount() > 0) || // (profile.getDBHistory().getFailedLookupRate().getRate(60*1000).getLastEventCount() > 0) ) { // // are they overloaded (or disconnected)? @@ -76,6 +77,6 @@ public class IsFailingCalculator extends Calculator { return true; return false; - } + //} } } diff --git a/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java b/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java new file mode 100644 index 000000000..d6781830e --- /dev/null +++ b/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java @@ -0,0 +1,251 @@ +package net.i2p.router.tunnel.pool; + +import java.util.*; +import net.i2p.router.Job; +import net.i2p.router.JobImpl; +import net.i2p.router.RouterContext; +import net.i2p.router.tunnel.TunnelCreatorConfig; +import net.i2p.util.Log; + +/** + * Single threaded controller of the tunnel creation process, spanning all tunnel pools. + * Essentially, this loops across the pools, sees which want to build tunnels, and fires + * off the necessary activities if the load allows. If nothing wants to build any tunnels, + * it waits for a short period before looping again (or until it is told that something + * changed, such as a tunnel failed, new client started up, or tunnel creation was aborted). + * + */ +class BuildExecutor implements Runnable { + private RouterContext _context; + private Log _log; + private TunnelPoolManager _manager; + /** list of TunnelCreatorConfig elements of tunnels currently being built */ + private List _currentlyBuilding; + private boolean _isRunning; + + public BuildExecutor(RouterContext ctx, TunnelPoolManager mgr) { + _context = ctx; + _log = ctx.logManager().getLog(getClass()); + _manager = mgr; + _currentlyBuilding = new ArrayList(10); + _context.statManager().createRateStat("tunnel.concurrentBuilds", "How many builds are going at once", "Tunnels", new long[] { 60*1000, 5*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("tunnel.concurrentBuildsLagged", "How many builds are going at once when we reject further builds, due to job lag (period is lag)", "Tunnels", new long[] { 60*1000, 5*60*1000, 60*60*1000 }); + } + + private int allowed() { + StringBuffer buf = null; + if (_log.shouldLog(Log.DEBUG)) { + buf = new StringBuffer(128); + buf.append("Allowed: "); + } + int allowed = 20; + String prop = _context.getProperty("router.tunnelConcurrentBuilds"); + if (prop != null) + try { allowed = Integer.valueOf(prop).intValue(); } catch (NumberFormatException nfe) {} + + int concurrent = 0; + synchronized (_currentlyBuilding) { + concurrent = _currentlyBuilding.size(); + allowed -= concurrent; + if (buf != null) + buf.append(allowed).append(" ").append(_currentlyBuilding.toString()); + } + if (buf != null) + _log.debug(buf.toString()); + + _context.statManager().addRateData("tunnel.concurrentBuilds", concurrent, 0); + + long lag = _context.jobQueue().getMaxLag(); + if ( (lag > 2000) && (_context.router().getUptime() > 5*60*1000) ) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Too lagged [" + lag + "], don't allow building"); + _context.statManager().addRateData("tunnel.concurrentBuildsLagged", concurrent, lag); + return 0; // if we have a job heavily blocking our jobqueue, ssllloowww dddooowwwnnn + } + //if (isOverloaded()) + // return 0; + + return allowed; + } + + public void run() { + _isRunning = true; + List wanted = new ArrayList(8); + List pools = new ArrayList(8); + + while (!_manager.isShutdown()) { + try { + _manager.listPools(pools); + for (int i = 0; i < pools.size(); i++) { + TunnelPool pool = (TunnelPool)pools.get(i); + int howMany = pool.countHowManyToBuild(); + for (int j = 0; j < howMany; j++) + wanted.add(pool); + } + + int allowed = allowed(); + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Allowed: " + allowed + " wanted: " + wanted); + + // zero hop ones can run inline + allowed = buildZeroHopTunnels(wanted, allowed); + + if ( (allowed > 0) && (wanted.size() > 0) ) { + Collections.shuffle(wanted, _context.random()); + for (int i = 0; (i < allowed) && (wanted.size() > 0); i++) { + TunnelPool pool = (TunnelPool)wanted.remove(0); + //if (pool.countWantedTunnels() <= 0) + // continue; + PooledTunnelCreatorConfig cfg = pool.configureNewTunnel(); + if (cfg != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Configuring new tunnel " + i + " for " + pool + ": " + cfg); + synchronized (_currentlyBuilding) { + _currentlyBuilding.add(cfg); + } + buildTunnel(pool, cfg); + if (cfg.getLength() <= 1) + i--; //0hop, we can keep going, as there's no worry about throttling + } else { + i--; + } + } + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Nothin' doin, wait for a while"); + try { + synchronized (_currentlyBuilding) { + if (allowed <= 0) + _currentlyBuilding.wait(_context.random().nextInt(5*1000)); + else // wanted <= 0 + _currentlyBuilding.wait(_context.random().nextInt(30*1000)); + } + } catch (InterruptedException ie) { + // someone wanted to build something + } + } + + wanted.clear(); + pools.clear(); + } catch (Exception e) { + if (_log.shouldLog(Log.CRIT)) + _log.log(Log.CRIT, "B0rked in the tunnel builder", e); + } + } + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Done building"); + _isRunning = false; + } + + /** + * iterate over the 0hop tunnels, running them all inline regardless of how many are allowed + * @return number of tunnels allowed after processing these zero hop tunnels (almost always the same as before) + */ + private int buildZeroHopTunnels(List wanted, int allowed) { + for (int i = 0; i < wanted.size(); i++) { + TunnelPool pool = (TunnelPool)wanted.get(0); + if (pool.getSettings().getLength() == 0) { + PooledTunnelCreatorConfig cfg = pool.configureNewTunnel(); + if (cfg != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Configuring short tunnel " + i + " for " + pool + ": " + cfg); + synchronized (_currentlyBuilding) { + _currentlyBuilding.add(cfg); + } + buildTunnel(pool, cfg); + if (cfg.getLength() > 1) { + allowed--; // oops... shouldn't have done that, but hey, its not that bad... + } + wanted.remove(i); + i--; + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Configured a null tunnel"); + } + } + } + return allowed; + } + + public boolean isRunning() { return _isRunning; } + + void buildTunnel(TunnelPool pool, PooledTunnelCreatorConfig cfg) { + // old style here, replace with the new crypto stuff later + CompleteJob onCreated = new CompleteJob(_context, cfg, new SuccessJob(_context, cfg, pool), pool); + CompleteJob onFailed = new CompleteJob(_context, cfg, null, pool); + RequestTunnelJob j = new RequestTunnelJob(_context, cfg, onCreated, onFailed, cfg.getLength()-1, false, cfg.getDestination()==null); + if (cfg.getLength() <= 1) // length == 1 ==> hops = 0, so do it inline (as its immediate) + j.runJob(); + else + j.runJob(); // always inline, as this is on its own thread so it can block + //_context.jobQueue().addJob(j); + } + + public void buildComplete(PooledTunnelCreatorConfig cfg, TunnelPool pool) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Build complete for " + cfg); + pool.buildComplete(cfg); + synchronized (_currentlyBuilding) { + _currentlyBuilding.remove(cfg); + _currentlyBuilding.notifyAll(); + } + } + + public void repoll() { + synchronized (_currentlyBuilding) { _currentlyBuilding.notifyAll(); } + } + + + private class CompleteJob extends JobImpl { + private PooledTunnelCreatorConfig _cfg; + private TunnelPool _pool; + private Job _onRun; + public CompleteJob(RouterContext ctx, PooledTunnelCreatorConfig cfg, Job onRun, TunnelPool pool) { + super(ctx); + _cfg = cfg; + _onRun = onRun; + _pool = pool; + } + public String getName() { return "Tunnel create complete"; } + public void runJob() { + if (_onRun != null) + _onRun.runJob(); + //getContext().jobQueue().addJob(_onRun); + buildComplete(_cfg, _pool); + } + } + private class SuccessJob extends JobImpl { + private PooledTunnelCreatorConfig _cfg; + private TunnelPool _pool; + public SuccessJob(RouterContext ctx, PooledTunnelCreatorConfig cfg, TunnelPool pool) { + super(ctx); + _cfg = cfg; + _pool = pool; + } + public String getName() { return "Tunnel create successful"; } + public void runJob() { + _log.debug("Created successfully: " + _cfg); + if (_cfg.isInbound()) { + getContext().tunnelDispatcher().joinInbound(_cfg); + } else { + getContext().tunnelDispatcher().joinOutbound(_cfg); + } + + _pool.addTunnel(_cfg); + _pool.getManager().buildComplete(_cfg); + TestJob testJob = (_cfg.getLength() > 1 ? new TestJob(getContext(), _cfg, _pool) : null); + //RebuildJob rebuildJob = new RebuildJob(getContext(), _cfg, _pool); + ExpireJob expireJob = new ExpireJob(getContext(), _cfg, _pool); + _cfg.setTunnelPool(_pool); + _cfg.setTestJob(testJob); + //_cfg.setRebuildJob(rebuildJob); + _cfg.setExpireJob(expireJob); + if (_cfg.getLength() > 1) // no need to test 0 hop tunnels + getContext().jobQueue().addJob(testJob); + //getContext().jobQueue().addJob(rebuildJob); // always try to rebuild (ignored if too many) + getContext().jobQueue().addJob(expireJob); + } + } +} diff --git a/router/java/src/net/i2p/router/tunnel/pool/ClientPeerSelector.java b/router/java/src/net/i2p/router/tunnel/pool/ClientPeerSelector.java index b8c996eac..d4d03e056 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/ClientPeerSelector.java +++ b/router/java/src/net/i2p/router/tunnel/pool/ClientPeerSelector.java @@ -14,6 +14,8 @@ class ClientPeerSelector extends TunnelPeerSelector { int length = getLength(ctx, settings); if (length < 0) return null; + if ( (length == 0) && (settings.getLength()+settings.getLengthVariance() > 0) ) + return null; HashSet matches = new HashSet(length); if (length > 0) { diff --git a/router/java/src/net/i2p/router/tunnel/pool/ExploratoryPeerSelector.java b/router/java/src/net/i2p/router/tunnel/pool/ExploratoryPeerSelector.java index 5e1d0a57f..0e11a561d 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/ExploratoryPeerSelector.java +++ b/router/java/src/net/i2p/router/tunnel/pool/ExploratoryPeerSelector.java @@ -30,7 +30,11 @@ class ExploratoryPeerSelector extends TunnelPeerSelector { Set exclude = getExclude(ctx, settings.isInbound(), settings.isExploratory()); exclude.add(ctx.routerHash()); HashSet matches = new HashSet(length); - ctx.profileOrganizer().selectNotFailingPeers(length, exclude, matches, false); + boolean exploreHighCap = Boolean.valueOf(ctx.getProperty("router.exploreHighCapacity", "false")).booleanValue(); + if (exploreHighCap) + ctx.profileOrganizer().selectHighCapacityPeers(length, exclude, matches); + else + ctx.profileOrganizer().selectNotFailingPeers(length, exclude, matches, false); if (l.shouldLog(Log.DEBUG)) l.debug("profileOrganizer.selectNotFailing(" + length + ") found " + matches); diff --git a/router/java/src/net/i2p/router/tunnel/pool/HandleTunnelCreateMessageJob.java b/router/java/src/net/i2p/router/tunnel/pool/HandleTunnelCreateMessageJob.java index afa3dd7d0..6c90c0d96 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/HandleTunnelCreateMessageJob.java +++ b/router/java/src/net/i2p/router/tunnel/pool/HandleTunnelCreateMessageJob.java @@ -128,11 +128,16 @@ public class HandleTunnelCreateMessageJob extends JobImpl { if (_log.shouldLog(Log.INFO)) _log.info("join as inbound tunnel gateway pointing at " + _request.getNextRouter().toBase64().substring(0,4) + ":" - + _request.getNextTunnelId().getTunnelId() + + _request.getNextTunnelId() + " (nonce=" + _request.getNonce() + ")"); // serve as the inbound tunnel gateway cfg.setSendTo(_request.getNextRouter()); - cfg.setSendTunnelId(DataHelper.toLong(4, _request.getNextTunnelId().getTunnelId())); + TunnelId id = _request.getNextTunnelId(); + if (id == null) { + sendRejection(TunnelHistory.TUNNEL_REJECT_CRIT); + return; + } + cfg.setSendTunnelId(DataHelper.toLong(4, id.getTunnelId())); getContext().tunnelDispatcher().joinInboundGateway(cfg); } else if (_request.getNextRouter() == null) { if (_log.shouldLog(Log.INFO)) @@ -143,11 +148,16 @@ public class HandleTunnelCreateMessageJob extends JobImpl { if (_log.shouldLog(Log.INFO)) _log.info("join as tunnel participant pointing at " + _request.getNextRouter().toBase64().substring(0,4) + ":" - + _request.getNextTunnelId().getTunnelId() + + _request.getNextTunnelId() + " (nonce=" + _request.getNonce() + ")"); // serve as a general participant cfg.setSendTo(_request.getNextRouter()); - cfg.setSendTunnelId(DataHelper.toLong(4, _request.getNextTunnelId().getTunnelId())); + TunnelId id = _request.getNextTunnelId(); + if (id == null) { + sendRejection(TunnelHistory.TUNNEL_REJECT_CRIT); + return; + } + cfg.setSendTunnelId(DataHelper.toLong(4, id.getTunnelId())); getContext().tunnelDispatcher().joinParticipant(cfg); } diff --git a/router/java/src/net/i2p/router/tunnel/pool/OnCreatedJob.java b/router/java/src/net/i2p/router/tunnel/pool/OnCreatedJob.java deleted file mode 100644 index 62d431730..000000000 --- a/router/java/src/net/i2p/router/tunnel/pool/OnCreatedJob.java +++ /dev/null @@ -1,46 +0,0 @@ -package net.i2p.router.tunnel.pool; - -import net.i2p.router.JobImpl; -import net.i2p.router.RouterContext; -import net.i2p.util.Log; - -/** - * The tunnel is fully built, so now add it to our handler, to the pool, and - * build the necessary test and rebuilding jobs. - * - */ -class OnCreatedJob extends JobImpl { - private Log _log; - private TunnelPool _pool; - private PooledTunnelCreatorConfig _cfg; - - public OnCreatedJob(RouterContext ctx, TunnelPool pool, PooledTunnelCreatorConfig cfg) { - super(ctx); - _log = ctx.logManager().getLog(OnCreatedJob.class); - _pool = pool; - _cfg = cfg; - } - public String getName() { return "Tunnel built"; } - public void runJob() { - _log.debug("Created successfully: " + _cfg); - if (_cfg.isInbound()) { - getContext().tunnelDispatcher().joinInbound(_cfg); - } else { - getContext().tunnelDispatcher().joinOutbound(_cfg); - } - - _pool.getManager().buildComplete(_cfg); - _pool.addTunnel(_cfg); - TestJob testJob = (_cfg.getLength() > 1 ? new TestJob(getContext(), _cfg, _pool) : null); - RebuildJob rebuildJob = new RebuildJob(getContext(), _cfg, _pool); - ExpireJob expireJob = new ExpireJob(getContext(), _cfg, _pool); - _cfg.setTunnelPool(_pool); - _cfg.setTestJob(testJob); - _cfg.setRebuildJob(rebuildJob); - _cfg.setExpireJob(expireJob); - if (_cfg.getLength() > 1) // no need to test 0 hop tunnels - getContext().jobQueue().addJob(testJob); - getContext().jobQueue().addJob(rebuildJob); // always try to rebuild (ignored if too many) - getContext().jobQueue().addJob(expireJob); - } -} \ No newline at end of file diff --git a/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java b/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java index 0cd5457d3..698807143 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java +++ b/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java @@ -13,7 +13,6 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig { private TunnelPool _pool; private boolean _failed; private TestJob _testJob; - private RebuildJob _rebuildJob; private Job _expireJob; /** Creates a new instance of PooledTunnelCreatorConfig */ @@ -48,11 +47,6 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig { // selected again. _expireJob is left to do its thing, in case there // are any straggling messages coming down the tunnel _pool.tunnelFailed(this); - if (_rebuildJob != null) { - // rebuild asap (_rebuildJob will be null if we were just a stopgap) - _rebuildJob.getTiming().setStartAfter(_context.clock().now() + 10*1000); - _context.jobQueue().addJob(_rebuildJob); - } if (_testJob != null) // just in case... _context.jobQueue().removeJob(_testJob); } @@ -61,6 +55,5 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig { public TunnelPool getTunnelPool() { return _pool; } public void setTestJob(TestJob job) { _testJob = job; } - public void setRebuildJob(RebuildJob job) { _rebuildJob = job; } public void setExpireJob(Job job) { _expireJob = job; } } diff --git a/router/java/src/net/i2p/router/tunnel/pool/RebuildJob.java b/router/java/src/net/i2p/router/tunnel/pool/RebuildJob.java deleted file mode 100644 index fd4a6eda3..000000000 --- a/router/java/src/net/i2p/router/tunnel/pool/RebuildJob.java +++ /dev/null @@ -1,61 +0,0 @@ -package net.i2p.router.tunnel.pool; - -import net.i2p.router.JobImpl; -import net.i2p.router.RouterContext; -import net.i2p.router.tunnel.TunnelCreatorConfig; - -/** - * Build a new tunnel to replace the existing one before it expires. This job - * should be removed (or scheduled to run immediately) if the tunnel fails. - * If an exploratory tunnel build at a random time between 3 1/2 and 4 minutes early; - * else if only one tunnel in pool build 4 minutes early; - * otherwise build at a random time between 2 and 4 minutes early. - * Five build attempts in parallel if an exploratory tunnel. - */ -class RebuildJob extends JobImpl { - private TunnelPool _pool; - private TunnelCreatorConfig _cfg; - - public RebuildJob(RouterContext ctx, TunnelCreatorConfig cfg, TunnelPool pool) { - super(ctx); - _pool = pool; - _cfg = cfg; - long rebuildOn; - if (_pool.getSettings().isExploratory()) { - rebuildOn = cfg.getExpiration() - (((pool.getSettings().getRebuildPeriod() * 7) / 2)); - rebuildOn -= ctx.random().nextInt(pool.getSettings().getRebuildPeriod() / 2); - } else if ((pool.getSettings().getQuantity() + pool.getSettings().getBackupQuantity()) == 1) { - rebuildOn = cfg.getExpiration() - (pool.getSettings().getRebuildPeriod() * 4); - } else { - rebuildOn = cfg.getExpiration() - (pool.getSettings().getRebuildPeriod() * 2); - rebuildOn -= ctx.random().nextInt(pool.getSettings().getRebuildPeriod() * 2); - } - getTiming().setStartAfter(rebuildOn); - } - public String getName() { - if (_pool.getSettings().isExploratory()) { - if (_pool.getSettings().isInbound()) { - return "Rebuild exploratory inbound tunnel"; - } else { - return "Rebuild exploratory outbound tunnel"; - } - } else { - StringBuffer rv = new StringBuffer(32); - if (_pool.getSettings().isInbound()) - rv.append("Rebuild inbound client tunnel for "); - else - rv.append("Rebuild outbound client tunnel for "); - if (_pool.getSettings().getDestinationNickname() != null) - rv.append(_pool.getSettings().getDestinationNickname()); - else - rv.append(_pool.getSettings().getDestination().toBase64().substring(0,4)); - return rv.toString(); - } - } - public void runJob() { - if (_pool.getSettings().isExploratory()) - _pool.refreshBuilders(4, 4); - else - _pool.refreshBuilders(1, 4); - } -} diff --git a/router/java/src/net/i2p/router/tunnel/pool/RequestTunnelJob.java b/router/java/src/net/i2p/router/tunnel/pool/RequestTunnelJob.java index 0b8186e25..8d5c6bec5 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/RequestTunnelJob.java +++ b/router/java/src/net/i2p/router/tunnel/pool/RequestTunnelJob.java @@ -45,8 +45,9 @@ public class RequestTunnelJob extends JobImpl { private boolean _isFake; private boolean _isExploratory; - static final int HOP_REQUEST_TIMEOUT = 20*1000; - private static final int LOOKUP_TIMEOUT = 10*1000; + static final int HOP_REQUEST_TIMEOUT_CLIENT = 15*1000; + static final int HOP_REQUEST_TIMEOUT_EXPLORATORY = 10*1000; + private static final int LOOKUP_TIMEOUT = 5*1000; public RequestTunnelJob(RouterContext ctx, TunnelCreatorConfig cfg, Job onCreated, Job onFailed, int hop, boolean isFake, boolean isExploratory) { super(ctx); @@ -58,7 +59,7 @@ public class RequestTunnelJob extends JobImpl { _currentPeer = null; _lookups = 0; _lastSendTime = 0; - _isFake = isFake; + _isFake = isFake || (cfg.getLength() <= 1); _isExploratory = isExploratory; ctx.statManager().createRateStat("tunnel.receiveRejectionProbabalistic", "How often we are rejected probabalistically?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); @@ -77,6 +78,8 @@ public class RequestTunnelJob extends JobImpl { ctx.statManager().createRateStat("tunnel.buildExploratorySuccess3Hop", "How often we succeed building a 3 hop exploratory tunnel?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); ctx.statManager().createRateStat("tunnel.buildPartialTime", "How long a non-exploratory request took to be accepted?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); ctx.statManager().createRateStat("tunnel.buildExploratoryPartialTime", "How long an exploratory request took to be accepted?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); + ctx.statManager().createRateStat("tunnel.buildExploratoryTimeout", "How often a request for an exploratory peer times out?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); + ctx.statManager().createRateStat("tunnel.buildClientTimeout", "How often a request for an exploratory peer times out?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); if (_log.shouldLog(Log.DEBUG)) _log.debug("Requesting hop " + hop + " in " + cfg); @@ -136,7 +139,7 @@ public class RequestTunnelJob extends JobImpl { else getContext().jobQueue().addJob(_onCreated); } - getContext().statManager().addRateData("tunnel.buildSuccess", 1, 0); + //getContext().statManager().addRateData("tunnel.buildSuccess", 1, 0); } } else { // outbound tunnel, we're the gateway and hence the last person asked @@ -176,7 +179,8 @@ public class RequestTunnelJob extends JobImpl { _onCreated.runJob(); else getContext().jobQueue().addJob(_onCreated); - getContext().statManager().addRateData("tunnel.buildSuccess", 1, 0); + if (_config.getLength() > 1) + getContext().statManager().addRateData("tunnel.buildSuccess", 1, 0); } } } @@ -301,15 +305,18 @@ public class RequestTunnelJob extends JobImpl { getContext().jobQueue().addJob(_onCreated); if (_isExploratory) { int i = _config.getLength(); - getContext().statManager().addRateData("tunnel.buildExploratorySuccess", 1, 0); + if (i > 1) + getContext().statManager().addRateData("tunnel.buildExploratorySuccess", 1, 0); if (i == 2) getContext().statManager().addRateData("tunnel.buildExploratorySuccess1Hop", 1, 0); else if (i == 3) getContext().statManager().addRateData("tunnel.buildExploratorySuccess2Hop", 1, 0); else if (i == 4) getContext().statManager().addRateData("tunnel.buildExploratorySuccess3Hop", 1, 0); - } else - getContext().statManager().addRateData("tunnel.buildSuccess", 1, 0); + } else { + if (_config.getLength() > 1) + getContext().statManager().addRateData("tunnel.buildSuccess", 1, 0); + } } } @@ -357,6 +364,10 @@ public class RequestTunnelJob extends JobImpl { if (_log.shouldLog(Log.WARN)) _log.warn("request timeout: " + _config + " at hop " + _currentHop + " with nonce " + _nonce); + if (_isExploratory) + getContext().statManager().addRateData("tunnel.buildExploratoryTimeout", 1, 0); + else + getContext().statManager().addRateData("tunnel.buildClientTimeout", 1, 0); peerFail(0); } } @@ -369,7 +380,7 @@ public class RequestTunnelJob extends JobImpl { public ReplySelector(long nonce) { _nonce = nonce; _nonceFound = false; - _expiration = getContext().clock().now() + HOP_REQUEST_TIMEOUT; + _expiration = getContext().clock().now() + (_isExploratory ? HOP_REQUEST_TIMEOUT_EXPLORATORY : HOP_REQUEST_TIMEOUT_CLIENT); } public boolean continueMatching() { return (!_nonceFound) && (getContext().clock().now() < _expiration); diff --git a/router/java/src/net/i2p/router/tunnel/pool/SendGarlicMessageJob.java b/router/java/src/net/i2p/router/tunnel/pool/SendGarlicMessageJob.java index 48a075d6e..2348f241f 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/SendGarlicMessageJob.java +++ b/router/java/src/net/i2p/router/tunnel/pool/SendGarlicMessageJob.java @@ -35,8 +35,6 @@ class SendGarlicMessageJob extends JobImpl { private SessionKey _sentKey; private Set _sentTags; - private static final int TIMEOUT = RequestTunnelJob.HOP_REQUEST_TIMEOUT; - public SendGarlicMessageJob(RouterContext ctx, I2NPMessage payload, RouterInfo target, MessageSelector selector, ReplyJob onReply, Job onTimeout, SessionKey sentKey, Set sentTags) { super(ctx); _log = ctx.logManager().getLog(SendGarlicMessageJob.class); @@ -61,14 +59,15 @@ class SendGarlicMessageJob extends JobImpl { payload.setRecipient(_target); payload.setDeliveryInstructions(instructions); payload.setRequestAck(false); - payload.setExpiration(getContext().clock().now() + RequestTunnelJob.HOP_REQUEST_TIMEOUT); + payload.setExpiration(_payload.getMessageExpiration()); + int timeout = (int)(payload.getExpiration() - getContext().clock().now()); GarlicMessage msg = GarlicMessageBuilder.buildMessage(getContext(), payload, _sentKey, _sentTags); // so we will look for the reply - OutNetMessage dummyMessage = getContext().messageRegistry().registerPending(_replySelector, _onReply, _onTimeout, TIMEOUT); + OutNetMessage dummyMessage = getContext().messageRegistry().registerPending(_replySelector, _onReply, _onTimeout, timeout); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Scheduling timeout job (" + _onTimeout + ") to be run in " + TIMEOUT + "ms"); + _log.debug("Scheduling timeout job (" + _onTimeout + ") to be run in " + timeout + "ms"); // now find an outbound tunnel and send 'er off TunnelInfo out = getContext().tunnelManager().selectOutboundTunnel(); diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelBuilder.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelBuilder.java deleted file mode 100644 index 1d0a5adba..000000000 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelBuilder.java +++ /dev/null @@ -1,116 +0,0 @@ -package net.i2p.router.tunnel.pool; - -import java.util.ArrayList; -import java.util.List; - -import net.i2p.data.Hash; -import net.i2p.router.RouterContext; -import net.i2p.router.JobImpl; -import net.i2p.router.tunnel.HopConfig; -import net.i2p.router.TunnelPoolSettings; -import net.i2p.util.Log; - -/** - * - */ -public class TunnelBuilder { - /** - * Build a new tunnel per the pool's wishes (using the preferred length, - * peers, ordering, etc). After the tunnel is built, it is added to the - * pool as well as the dispatcher, and the necessary test and maintenance - * jobs are built. This call does not block. - * - */ - public void buildTunnel(RouterContext ctx, TunnelPool pool) { - buildTunnel(ctx, pool, false); - } - public void buildTunnel(RouterContext ctx, TunnelPool pool, boolean zeroHop) { - if (!pool.isAlive()) { - pool.getManager().buildComplete(); - return; - } - // this is probably overkill (ya think?) - pool.refreshSettings(); - - PooledTunnelCreatorConfig cfg = configTunnel(ctx, pool, zeroHop); - if (cfg == null) { - pool.getManager().buildComplete(); - return; - } - OnCreatedJob onCreated = new OnCreatedJob(ctx, pool, cfg); - RetryJob onFailed= new RetryJob(ctx, pool); - // queue up a job to request the endpoint to join the tunnel, which then - // requeues up another job for earlier hops, etc, until it reaches the - // gateway. after the gateway is confirmed, onCreated is fired - RequestTunnelJob req = new RequestTunnelJob(ctx, cfg, onCreated, onFailed, cfg.getLength()-1, zeroHop, pool.getSettings().isExploratory()); - if (zeroHop || (cfg.getLength() <= 1) ) // lets get it done inline, as we /need/ it asap - req.runJob(); - else - ctx.jobQueue().addJob(req); - } - - private PooledTunnelCreatorConfig configTunnel(RouterContext ctx, TunnelPool pool, boolean zeroHop) { - Log log = ctx.logManager().getLog(TunnelBuilder.class); - TunnelPoolSettings settings = pool.getSettings(); - long expiration = ctx.clock().now() + settings.getDuration(); - List peers = null; - - if (zeroHop) { - peers = new ArrayList(1); - peers.add(ctx.routerHash()); - if (log.shouldLog(Log.WARN)) - log.warn("Building failsafe tunnel for " + pool); - } else { - peers = pool.getSelector().selectPeers(ctx, settings); - } - if ( (peers == null) || (peers.size() <= 0) ) { - // no inbound or outbound tunnels to send the request through, and - // the pool is refusing 0 hop tunnels - if (peers == null) { - if (log.shouldLog(Log.ERROR)) - log.error("No peers to put in the new tunnel! selectPeers returned null! boo, hiss! fake=" + zeroHop); - } else { - if (log.shouldLog(Log.ERROR)) - log.error("No peers to put in the new tunnel! selectPeers returned an empty list?! fake=" + zeroHop); - } - return null; - } - - PooledTunnelCreatorConfig cfg = new PooledTunnelCreatorConfig(ctx, peers.size(), settings.isInbound(), settings.getDestination()); - // peers[] is ordered endpoint first, but cfg.getPeer() is ordered gateway first - for (int i = 0; i < peers.size(); i++) { - int j = peers.size() - 1 - i; - cfg.setPeer(j, (Hash)peers.get(i)); - HopConfig hop = cfg.getConfig(j); - hop.setExpiration(expiration); - hop.setIVKey(ctx.keyGenerator().generateSessionKey()); - hop.setLayerKey(ctx.keyGenerator().generateSessionKey()); - // tunnelIds will be updated during building, and as the creator, we - // don't need to worry about prev/next hop - } - cfg.setExpiration(expiration); - - Log l = ctx.logManager().getLog(TunnelBuilder.class); - if (l.shouldLog(Log.DEBUG)) - l.debug("Config contains " + peers + ": " + cfg); - return cfg; - } - - /** - * If the building fails, try, try again. - * - */ - private class RetryJob extends JobImpl { - private TunnelPool _pool; - public RetryJob(RouterContext ctx, TunnelPool pool) { - super(ctx); - _pool = pool; - } - public String getName() { return "Tunnel create failed"; } - public void runJob() { - // yikes, nothing left, lets get some backup (if we're allowed) - _pool.getManager().buildComplete(); - _pool.refreshBuilders(1, 4); - } - } -} diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java index f676c2e5d..c7ba3edf0 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java @@ -15,6 +15,7 @@ import net.i2p.router.JobImpl; import net.i2p.router.RouterContext; import net.i2p.router.TunnelPoolSettings; import net.i2p.router.TunnelInfo; +import net.i2p.router.tunnel.HopConfig; import net.i2p.util.Log; /** @@ -26,164 +27,50 @@ public class TunnelPool { private TunnelPoolSettings _settings; private ArrayList _tunnels; private TunnelPeerSelector _peerSelector; - private TunnelBuilder _builder; private TunnelPoolManager _manager; private boolean _alive; private long _lifetimeProcessed; - private int _buildsThisMinute; - private long _currentMinute; - private RefreshJob _refreshJob; private TunnelInfo _lastSelected; private long _lastSelectionPeriod; + private int _expireSkew; - /** - * Only 10 builds per minute per pool, even if we have failing tunnels, - * etc. On overflow, the necessary additional tunnels are built by the - * RefreshJob - */ - private static final int MAX_BUILDS_PER_MINUTE = 10; - - public TunnelPool(RouterContext ctx, TunnelPoolManager mgr, TunnelPoolSettings settings, TunnelPeerSelector sel, TunnelBuilder builder) { + public TunnelPool(RouterContext ctx, TunnelPoolManager mgr, TunnelPoolSettings settings, TunnelPeerSelector sel) { _context = ctx; _log = ctx.logManager().getLog(TunnelPool.class); _manager = mgr; _settings = settings; _tunnels = new ArrayList(settings.getLength() + settings.getBackupQuantity()); _peerSelector = sel; - _builder = builder; _alive = false; _lastSelectionPeriod = 0; _lastSelected = null; _lifetimeProcessed = 0; - _buildsThisMinute = 0; - _currentMinute = ctx.clock().now(); - _refreshJob = new RefreshJob(ctx); + _expireSkew = _context.random().nextInt(90*1000); refreshSettings(); } public void startup() { _alive = true; - _refreshJob.getTiming().setStartAfter(_context.clock().now() + 60*1000); - _context.jobQueue().addJob(_refreshJob); - int added = refreshBuilders(0, 0); - if (added <= 0) { + _manager.getExecutor().repoll(); + if (_settings.isInbound() && (_settings.getDestination() != null) ) { // we just reconnected and didn't require any new tunnel builders. // however, we /do/ want a leaseSet, so build one LeaseSet ls = null; synchronized (_tunnels) { - if (_settings.isInbound() && (_settings.getDestination() != null) ) - ls = locked_buildNewLeaseSet(); + ls = locked_buildNewLeaseSet(); } if (ls != null) _context.clientManager().requestLeaseSet(_settings.getDestination(), ls); } } + public void shutdown() { _alive = false; _lastSelectionPeriod = 0; _lastSelected = null; } - /** - * Return number of tunnels expiring greater than - * timeFactor * RebuildPeriod from now - * - */ - private int countUsableTunnels(int timeFactor) { - int valid = 0; - synchronized (_tunnels) { - for (int i = 0; i < _tunnels.size(); i++) { - TunnelInfo info = (TunnelInfo)_tunnels.get(i); - if (info.getExpiration() > _context.clock().now() + (timeFactor * _settings.getRebuildPeriod())) - valid++; - } - } - return valid; - } - - /** - * Fire up as many buildTunnel tasks as necessary, returning how many - * were added. - * Build maxBuild tunnels (0 = unlimited), use timeFactor * RebuildPeriod. - * Fire off up to six extra jobs if an exploratory tunnel is - * requested by RebuildJob or tunnelFailed (maxBuild > 1). - * Throttle builds to a maximum per minute; reduce maximum if job lag is high, - * or if we have network errors which indicate we are disconnected from the network. - * Override pool length setting and build a 1-hop tunnel if time is short. - * - */ - int refreshBuilders(int maxBuild, int timeFactor) { - if ( (_settings.getDestination() != null) && (!_context.clientManager().isLocal(_settings.getDestination())) ) - _alive = false; - if (!_alive) return 0; - // only start up new build tasks if we need more of 'em - int baseTarget = _settings.getQuantity() + _settings.getBackupQuantity(); - int target = baseTarget; - int usableTunnels = countUsableTunnels(timeFactor); - if (_settings.isExploratory() && target > 0 && maxBuild > 1) - target+= 6; - - if ( (target > usableTunnels) ) - if ( (target > usableTunnels) && (_log.shouldLog(Log.INFO)) ) - _log.info(toString() + ": refreshing builders, previously had " + usableTunnels - + ", want a total of " + target + ", creating " - + (target-usableTunnels) + " new ones (" + maxBuild + " max)."); - - if (target > usableTunnels) { - long minute = _context.clock().now(); - minute = minute - (minute % 60*1000); - if (_currentMinute < minute) { - _currentMinute = minute; - _buildsThisMinute = 0; - } - int build = (target - usableTunnels); - if (maxBuild > 0 && build > maxBuild) - build = maxBuild; - int buildThrottle = MAX_BUILDS_PER_MINUTE; - long lag = _context.jobQueue().getMaxLag(); - int netErrors = (int) _context.statManager().getRate("udp.sendException").getRate(60*1000).getLastEventCount(); - if (lag > 3 * 1000 || netErrors > 5) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Throttling tunnel builds lag = " + lag + "; net errors = " + netErrors); - if (_settings.isExploratory()) - buildThrottle = 3; - else - buildThrottle = 1; - } else if (lag > 1 * 1000) { - if (_settings.isExploratory()) - buildThrottle = 5; - else - buildThrottle = 2; - } - if (build > (buildThrottle - _buildsThisMinute)) - build = (buildThrottle - _buildsThisMinute); - if (build <= 0) return 0; - - if ((_settings.isExploratory() && baseTarget > countUsableTunnels(1)) || - ((!_settings.isExploratory()) && baseTarget > countUsableTunnels(0)) || - ((!_settings.isExploratory()) && countUsableTunnels(1) == 0)) - _settings.setLengthOverride(1); - else - _settings.setLengthOverride(0); - - int wanted = build; - build = _manager.allocateBuilds(build); - - if ( (wanted != build) && (_log.shouldLog(Log.ERROR)) ) - _log.error("Wanted to build " + wanted + " tunnels, but throttled down to " - + build + ", due to concurrent requests (cpu overload?)"); - - for (int i = 0; i < build; i++) - _builder.buildTunnel(_context, this); - _buildsThisMinute += build; - - return build; - } else { - return 0; - } - } - TunnelPoolManager getManager() { return _manager; } void refreshSettings() { @@ -222,6 +109,8 @@ public class TunnelPool { */ public TunnelInfo selectTunnel() { return selectTunnel(true); } private TunnelInfo selectTunnel(boolean allowRecurseOnFail) { + boolean avoidZeroHop = ((getSettings().getLength() + getSettings().getLengthVariance()) > 0); + long period = curPeriod(); synchronized (_tunnels) { if (_lastSelectionPeriod == period) { @@ -237,8 +126,21 @@ public class TunnelPool { if (_log.shouldLog(Log.WARN)) _log.warn(toString() + ": No tunnels to select from"); } else { - // pick 'em randomly Collections.shuffle(_tunnels, _context.random()); + + // if there are nonzero hop tunnels and the zero hop tunnels are fallbacks, + // avoid the zero hop tunnels + if (avoidZeroHop) { + for (int i = 0; i < _tunnels.size(); i++) { + TunnelInfo info = (TunnelInfo)_tunnels.get(i); + if ( (info.getLength() > 1) && (info.getExpiration() > _context.clock().now()) ) { + _lastSelected = info; + return info; + } + } + } + // ok, either we are ok using zero hop tunnels, or only fallback tunnels remain. pick 'em + // randomly for (int i = 0; i < _tunnels.size(); i++) { TunnelInfo info = (TunnelInfo)_tunnels.get(i); if (info.getExpiration() > _context.clock().now()) { @@ -287,16 +189,18 @@ public class TunnelPool { } } + /** list of tunnelInfo instances of tunnels currently being built */ + public List listPending() { synchronized (_inProgress) { return new ArrayList(_inProgress); } } + int getTunnelCount() { synchronized (_tunnels) { return _tunnels.size(); } } - public TunnelBuilder getBuilder() { return _builder; } public TunnelPoolSettings getSettings() { return _settings; } public void setSettings(TunnelPoolSettings settings) { _settings = settings; if (_settings != null) { if (_log.shouldLog(Log.INFO)) _log.info(toString() + ": Settings updated on the pool: " + settings); - refreshBuilders(1, 4); // to start/stop new sequences, in case the quantities changed + _manager.getExecutor().repoll(); // in case we need more } } public TunnelPeerSelector getSelector() { return _peerSelector; } @@ -309,7 +213,7 @@ public class TunnelPool { public void addTunnel(TunnelInfo info) { if (_log.shouldLog(Log.DEBUG)) - _log.debug(toString() + ": Adding tunnel " + info); + _log.debug(toString() + ": Adding tunnel " + info, new Exception("Creator")); LeaseSet ls = null; synchronized (_tunnels) { _tunnels.add(info); @@ -356,6 +260,8 @@ public class TunnelPool { } } + _manager.getExecutor().repoll(); + boolean connected = true; if ( (_settings.getDestination() != null) && (!_context.clientManager().isLocal(_settings.getDestination())) ) connected = false; @@ -381,31 +287,14 @@ public class TunnelPool { } } + _manager.tunnelFailed(); + _lifetimeProcessed += cfg.getProcessedMessagesCount(); if (_settings.isInbound() && (_settings.getDestination() != null) ) { if (ls != null) { _context.clientManager().requestLeaseSet(_settings.getDestination(), ls); - if (_settings.isExploratory()) - refreshBuilders(3, 4); - else - refreshBuilders(1, 4); - } else { - if (_log.shouldLog(Log.WARN)) - _log.warn(toString() + ": unable to build a new leaseSet on failure (" + remaining - + " remaining), request a new tunnel"); - if (remaining < _settings.getBackupQuantity() + _settings.getQuantity()) - if (!buildFallback()) - if (_settings.isExploratory()) - refreshBuilders(3, 4); - else - refreshBuilders(1, 4); } - } else { - if (_settings.isExploratory()) - refreshBuilders(3, 4); - else - refreshBuilders(1, 4); } } @@ -421,13 +310,6 @@ public class TunnelPool { } if (ls != null) { _context.clientManager().requestLeaseSet(_settings.getDestination(), ls); - } else { - if (_log.shouldLog(Log.WARN)) - _log.warn(toString() + ": unable to build a new leaseSet on expire (" + remaining - + " remaining), request a new tunnel"); - if ( (remaining < _settings.getBackupQuantity() + _settings.getQuantity()) - && (_settings.getAllowZeroHop()) ) - buildFallback(); } } } @@ -438,17 +320,28 @@ public class TunnelPool { */ boolean buildFallback() { int quantity = _settings.getBackupQuantity() + _settings.getQuantity(); - int usable = countUsableTunnels(1); - if (usable >= quantity) return false; + int usable = 0; + synchronized (_tunnels) { + usable = _tunnels.size(); + } + if (usable > 0) + return false; - if ( (usable == 0) && (_settings.getAllowZeroHop()) ) { + if (_settings.getAllowZeroHop()) { + if ( (_settings.getLength() + _settings.getLengthVariance() > 0) && + (_settings.getDestination() != null) && + (_context.profileOrganizer().countActivePeers() > 0) ) { + // if it is a client tunnel pool and our variance doesn't allow 0 hop, prefer failure to + // 0 hop operation (unless our router is offline) + return false; + } if (_log.shouldLog(Log.INFO)) _log.info(toString() + ": building a fallback tunnel (usable: " + usable + " needed: " + quantity + ")"); - _builder.buildTunnel(_context, this, true); + + // runs inline, since its 0hop + _manager.getExecutor().buildTunnel(this, configureNewTunnel(true)); return true; } - //else - // _builder.buildTunnel(_context, this); return false; } @@ -507,6 +400,163 @@ public class TunnelPool { public long getLifetimeProcessed() { return _lifetimeProcessed; } + /** + * Gather the data to see how many tunnels to build, and then actually compute that value (delegated to + * the countHowManyToBuild function below) + * + */ + public int countHowManyToBuild() { + int wanted = getSettings().getBackupQuantity() + getSettings().getQuantity(); + + boolean allowZeroHop = ((getSettings().getLength() + getSettings().getLengthVariance()) <= 0); + + long expireAfter = _context.clock().now() + (2 * _settings.getRebuildPeriod()); + expireAfter += _expireSkew; + + long earliestExpire = -1; + int live = 0; + int fallback = 0; + int usable = 0; + synchronized (_tunnels) { + boolean enough = _tunnels.size() > wanted; + for (int i = 0; i < _tunnels.size(); i++) { + TunnelInfo info = (TunnelInfo)_tunnels.get(i); + if (info.getExpiration() > expireAfter) { + if (allowZeroHop || (info.getLength() > 1)) { + usable++; + if ( (info.getExpiration() < earliestExpire) || (earliestExpire < 0) ) + earliestExpire = info.getExpiration(); + } + } + live++; + if ( (info.getLength() <= 1) && (info.getExpiration() > expireAfter) ) + fallback++; + } + } + + if (usable < wanted) { + // if we are short on tunnels, build fast + earliestExpire = 0; + } + + int inProgress = 0; + synchronized (_inProgress) { + inProgress = _inProgress.size(); + for (int i = 0; i < _inProgress.size(); i++) { + PooledTunnelCreatorConfig cfg = (PooledTunnelCreatorConfig)_inProgress.get(i); + if (cfg.getLength() <= 1) + fallback++; + } + } + + return countHowManyToBuild(allowZeroHop, earliestExpire, usable, wanted, inProgress, fallback); + } + + /** + * This is the big scary function determining how many new tunnels we want to try to build at this + * point in time, as used by the BuildExecutor + * + * @param allowZeroHop do we normally allow zero hop tunnels? If true, treat fallback tunnels like normal ones + * @param earliestExpire how soon do some of our usable tunnels expire, or, if we are missing tunnels, -1 + * @param usable how many tunnels will be around for a while (may include fallback tunnels) + * @param standardAmount how many tunnels we want to have, in general + * @param inProgress how many tunnels are being built for this pool right now (may include fallback tunnels) + * @param fallback how many zero hop tunnels do we have, or are being built + */ + private int countHowManyToBuild(boolean allowZeroHop, long earliestExpire, int usable, int standardAmount, + int inProgress, int fallback) { + int howMany = 0; + if (allowZeroHop) + howMany = standardAmount - usable; + else + howMany = standardAmount - (usable - fallback); + + int concurrentBuildWeight = 1; + if (howMany > 0) { + long now = _context.clock().now(); + if (earliestExpire - now < 60*1000) + concurrentBuildWeight = 4; // right before expiration, allow up to 4x quantity tunnels to be pending + else if (earliestExpire - now < 120*1000) + concurrentBuildWeight = 3; // allow up to 3x quantity tunnels to be pending from 1-2m + else if (earliestExpire - now < 180*1000) + concurrentBuildWeight = 2; // allow up to 2x quantity tunnels to be pending from 2-3m + + // e.g. we want 3 tunnels, but only have 1 usable, we'd want 2 more. however, if the tunnels + // expire in 90 seconds, we'd act like we wanted 6 (and assume 4 would fail building). + howMany = (howMany * concurrentBuildWeight) - inProgress; + } + + int rv = howMany; + // ok, we're actually swamped with tunnels, so lets hold off on replacing the + // fallback ones for a bit + if ( (usable + inProgress + fallback > 2*standardAmount) && (howMany > 0) ) + rv = 0; + + if (allowZeroHop && (rv > standardAmount)) + rv = standardAmount; + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Count: rv: " + rv + " howMany " + howMany + " concurrentWeight " + concurrentBuildWeight + + " allow? " + allowZeroHop + " usable " + usable + + " std " + standardAmount + " inProgress " + inProgress + " fallback " + fallback + + " for " + toString()); + + if (rv < 0) + return 0; + return rv; + } + + PooledTunnelCreatorConfig configureNewTunnel() { return configureNewTunnel(false); } + private PooledTunnelCreatorConfig configureNewTunnel(boolean forceZeroHop) { + TunnelPoolSettings settings = getSettings(); + List peers = null; + long expiration = _context.clock().now() + settings.getDuration(); + + if (!forceZeroHop) { + peers = _peerSelector.selectPeers(_context, settings); + if ( (peers == null) || (peers.size() <= 0) ) { + // no inbound or outbound tunnels to send the request through, and + // the pool is refusing 0 hop tunnels + if (peers == null) { + if (_log.shouldLog(Log.ERROR)) + _log.error("No peers to put in the new tunnel! selectPeers returned null! boo, hiss!"); + } else { + if (_log.shouldLog(Log.ERROR)) + _log.error("No peers to put in the new tunnel! selectPeers returned an empty list?!"); + } + return null; + } + } else { + peers = new ArrayList(1); + peers.add(_context.routerHash()); + } + PooledTunnelCreatorConfig cfg = new PooledTunnelCreatorConfig(_context, peers.size(), settings.isInbound(), settings.getDestination()); + // peers[] is ordered endpoint first, but cfg.getPeer() is ordered gateway first + for (int i = 0; i < peers.size(); i++) { + int j = peers.size() - 1 - i; + cfg.setPeer(j, (Hash)peers.get(i)); + HopConfig hop = cfg.getConfig(j); + hop.setExpiration(expiration); + hop.setIVKey(_context.keyGenerator().generateSessionKey()); + hop.setLayerKey(_context.keyGenerator().generateSessionKey()); + // tunnelIds will be updated during building, and as the creator, we + // don't need to worry about prev/next hop + } + cfg.setExpiration(expiration); + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Config contains " + peers + ": " + cfg); + synchronized (_inProgress) { + _inProgress.add(cfg); + } + return cfg; + } + + private List _inProgress = new ArrayList(); + void buildComplete(PooledTunnelCreatorConfig cfg) { + synchronized (_inProgress) { _inProgress.remove(cfg); } + } + public String toString() { if (_settings.isExploratory()) { if (_settings.isInbound()) @@ -527,27 +577,4 @@ public class TunnelPool { } } - - /** - * We choke the # of rebuilds per pool per minute, so we need this to - * make sure to build enough tunnels. - * - */ - private class RefreshJob extends JobImpl { - public RefreshJob(RouterContext ctx) { - super(ctx); - } - public String getName() { return "Refresh " + TunnelPool.this.toString(); } - public void runJob() { - if (!_alive) return; - int added; - if (_settings.isExploratory()) - added = refreshBuilders(0, 2); - else - added = refreshBuilders(0, 1); - if ( (added > 0) && (_log.shouldLog(Log.WARN)) ) - _log.warn(added + " additional parallel rebuild(s) for " + TunnelPool.this.toString()); - requeue(30*1000); - } - } } diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java index 126b10d83..664d00feb 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java @@ -24,6 +24,7 @@ import net.i2p.router.TunnelManagerFacade; import net.i2p.router.TunnelPoolSettings; import net.i2p.router.tunnel.HopConfig; import net.i2p.router.tunnel.TunnelCreatorConfig; +import net.i2p.util.I2PThread; import net.i2p.util.Log; /** @@ -38,17 +39,9 @@ public class TunnelPoolManager implements TunnelManagerFacade { private Map _clientOutboundPools; private TunnelPool _inboundExploratory; private TunnelPool _outboundExploratory; - /** how many build requests are in process */ - private int _outstandingBuilds; - /** max # of concurrent build requests */ - private int _maxOutstandingBuilds; private LoadTestManager _loadTestManager; - - private static final String PROP_MAX_OUTSTANDING_BUILDS = "router.tunnel.maxConcurrentBuilds"; - private static final int DEFAULT_MAX_OUTSTANDING_BUILDS = 20; - - private static final String PROP_THROTTLE_CONCURRENT_TUNNELS = "router.tunnel.shouldThrottle"; - private static final boolean DEFAULT_THROTTLE_CONCURRENT_TUNNELS = false; + private BuildExecutor _executor; + private boolean _isShutdown; public TunnelPoolManager(RouterContext ctx) { _context = ctx; @@ -62,19 +55,14 @@ public class TunnelPoolManager implements TunnelManagerFacade { _clientInboundPools = new HashMap(4); _clientOutboundPools = new HashMap(4); - _outstandingBuilds = 0; - _maxOutstandingBuilds = DEFAULT_MAX_OUTSTANDING_BUILDS; - String max = ctx.getProperty(PROP_MAX_OUTSTANDING_BUILDS, String.valueOf(DEFAULT_MAX_OUTSTANDING_BUILDS)); - if (max != null) { - try { - _maxOutstandingBuilds = Integer.parseInt(max); - } catch (NumberFormatException nfe) { - _maxOutstandingBuilds = DEFAULT_MAX_OUTSTANDING_BUILDS; - } - } - _loadTestManager = new LoadTestManager(_context); + _isShutdown = false; + _executor = new BuildExecutor(ctx, this); + I2PThread execThread = new I2PThread(_executor, "BuildExecutor"); + execThread.setDaemon(true); + execThread.start(); + ctx.statManager().createRateStat("tunnel.testSuccessTime", "How long do successful tunnel tests take?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l }); @@ -254,6 +242,8 @@ public class TunnelPoolManager implements TunnelManagerFacade { } public void buildTunnels(Destination client, ClientTunnelSettings settings) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Building tunnels for the client " + client.calculateHash().toBase64() + ": " + settings); Hash dest = client.calculateHash(); settings.getInboundSettings().setDestination(dest); settings.getOutboundSettings().setDestination(dest); @@ -264,7 +254,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { inbound = (TunnelPool)_clientInboundPools.get(dest); if (inbound == null) { inbound = new TunnelPool(_context, this, settings.getInboundSettings(), - new ClientPeerSelector(), new TunnelBuilder()); + new ClientPeerSelector()); _clientInboundPools.put(dest, inbound); } else { inbound.setSettings(settings.getInboundSettings()); @@ -274,7 +264,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { outbound = (TunnelPool)_clientOutboundPools.get(dest); if (outbound == null) { outbound = new TunnelPool(_context, this, settings.getOutboundSettings(), - new ClientPeerSelector(), new TunnelBuilder()); + new ClientPeerSelector()); _clientOutboundPools.put(dest, outbound); } else { outbound.setSettings(settings.getOutboundSettings()); @@ -306,70 +296,34 @@ public class TunnelPoolManager implements TunnelManagerFacade { outbound.shutdown(); } - /** - * Check to make sure we can build this many new tunnels (throttled so - * we don't build too many at a time across all pools). - * - * @param wanted how many tunnels the pool wants to build - * @return how many are allowed to be built - */ - int allocateBuilds(int wanted) { - boolean shouldThrottle = shouldThrottleTunnels(); - if (!shouldThrottle) return wanted; - - synchronized (this) { - if (_outstandingBuilds >= _maxOutstandingBuilds) { - // ok, as a failsafe, always let one through - // nah, its failsafe for a reason. fix the cause. - //_outstandingBuilds++; - //return 1; - return 0; - } - if (_outstandingBuilds + wanted < _maxOutstandingBuilds) { - _outstandingBuilds += wanted; - return wanted; - } else { - int allowed = _maxOutstandingBuilds - _outstandingBuilds; - _outstandingBuilds = _maxOutstandingBuilds; - return allowed; - } - } - } - - private boolean shouldThrottleTunnels() { - Boolean rv = Boolean.valueOf(_context.getProperty(PROP_THROTTLE_CONCURRENT_TUNNELS, ""+DEFAULT_THROTTLE_CONCURRENT_TUNNELS)); - return rv.booleanValue(); - } - void buildComplete(TunnelCreatorConfig cfg) { buildComplete(); _loadTestManager.addTunnelTestCandidate(cfg); } - void buildComplete() { - synchronized (this) { - if (_outstandingBuilds > 0) - _outstandingBuilds--; - } - } - + void buildComplete() {} private static final String PROP_LOAD_TEST = "router.loadTest"; public void startup() { - TunnelBuilder builder = new TunnelBuilder(); + _isShutdown = false; + if (!_executor.isRunning()) { + I2PThread t = new I2PThread(_executor, "BuildExecutor"); + t.setDaemon(true); + t.start(); + } ExploratoryPeerSelector selector = new ExploratoryPeerSelector(); TunnelPoolSettings inboundSettings = new TunnelPoolSettings(); inboundSettings.setIsExploratory(true); inboundSettings.setIsInbound(true); - _inboundExploratory = new TunnelPool(_context, this, inboundSettings, selector, builder); + _inboundExploratory = new TunnelPool(_context, this, inboundSettings, selector); _inboundExploratory.startup(); try { Thread.sleep(3*1000); } catch (InterruptedException ie) {} TunnelPoolSettings outboundSettings = new TunnelPoolSettings(); outboundSettings.setIsExploratory(true); outboundSettings.setIsInbound(false); - _outboundExploratory = new TunnelPool(_context, this, outboundSettings, selector, builder); + _outboundExploratory = new TunnelPool(_context, this, outboundSettings, selector); _outboundExploratory.startup(); // try to build up longer tunnels @@ -399,8 +353,26 @@ public class TunnelPoolManager implements TunnelManagerFacade { _inboundExploratory.shutdown(); if (_outboundExploratory != null) _outboundExploratory.shutdown(); + _isShutdown = true; } + /** list of TunnelPool instances currently in play */ + void listPools(List out) { + synchronized (_clientInboundPools) { + out.addAll(_clientInboundPools.values()); + } + synchronized (_clientOutboundPools) { + out.addAll(_clientOutboundPools.values()); + } + if (_inboundExploratory != null) + out.add(_inboundExploratory); + if (_outboundExploratory != null) + out.add(_outboundExploratory); + } + void tunnelFailed() { _executor.repoll(); } + BuildExecutor getExecutor() { return _executor; } + boolean isShutdown() { return _isShutdown; } + public void renderStatusHTML(Writer out) throws IOException { out.write("

Exploratory tunnels (config):

\n"); renderPool(out, _inboundExploratory, _outboundExploratory); @@ -439,8 +411,13 @@ public class TunnelPoolManager implements TunnelManagerFacade { RateStat rs = _context.statManager().getRate("tunnel.participatingMessageCount"); if (rs != null) processed = (long)rs.getRate(10*60*1000).getLifetimeTotalValue(); + int inactive = 0; for (int i = 0; i < participating.size(); i++) { HopConfig cfg = (HopConfig)participating.get(i); + if (cfg.getProcessedMessagesCount() <= 0) { + inactive++; + continue; + } out.write(""); if (cfg.getReceiveTunnel() != null) out.write("" + cfg.getReceiveTunnel().getTunnelId() +""); @@ -468,6 +445,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { processed += cfg.getProcessedMessagesCount(); } out.write("\n"); + out.write("Inactive participating tunnels: " + inactive + "
\n"); out.write("Lifetime bandwidth usage: " + processed + "KB
\n"); } @@ -512,9 +490,22 @@ public class TunnelPoolManager implements TunnelManagerFacade { else processedOut += info.getProcessedMessagesCount(); } - if (live <= 0) - out.write("No tunnels, waiting for the grace period to end\n"); out.write("\n"); + List pending = in.listPending(); + for (int i = 0; i < pending.size(); i++) { + TunnelInfo info = (TunnelInfo)pending.get(i); + out.write("In progress: " + info.toString() + "
\n"); + } + live += pending.size(); + pending = outPool.listPending(); + for (int i = 0; i < pending.size(); i++) { + TunnelInfo info = (TunnelInfo)pending.get(i); + out.write("In progress: " + info.toString() + "
\n"); + } + live += pending.size(); + + if (live <= 0) + out.write("No tunnels, waiting for the grace period to end
\n"); out.write("Lifetime bandwidth usage: " + processedIn + "KB in, " + processedOut + "KB out
"); } }