From eadf472dd085271f688fb2f321e5e978e44006ea Mon Sep 17 00:00:00 2001 From: zzz Date: Sat, 4 Dec 2010 18:47:22 +0000 Subject: [PATCH] * I2PTunnel: - For clients, use a common thread pool that expires idle threads rather than keeping 5 accept() threads for each client. This also removes the configurable (30s default) max wait time for a socket, this may have to be restored. - Use pool for HTTP decompression also. --- .../i2ptunnel/HTTPResponseOutputStream.java | 11 +- .../i2p/i2ptunnel/I2PTunnelClientBase.java | 191 +++++------------- 2 files changed, 61 insertions(+), 141 deletions(-) diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java index 890a4f141..7fc808643 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java @@ -16,6 +16,7 @@ import java.io.OutputStream; import java.io.PipedInputStream; import java.io.PipedOutputStream; import java.util.zip.GZIPInputStream; +import java.util.concurrent.RejectedExecutionException; import net.i2p.I2PAppContext; import net.i2p.data.ByteArray; @@ -228,7 +229,15 @@ class HTTPResponseOutputStream extends FilterOutputStream { //out.flush(); PipedInputStream pi = new PipedInputStream(); PipedOutputStream po = new PipedOutputStream(pi); - new I2PAppThread(new Pusher(pi, out), "HTTP decompressor").start(); + // Run in the client thread pool, as there should be an unused thread + // there after the accept(). + // Overridden in I2PTunnelHTTPServer, where it does not use the client pool. + try { + I2PTunnelClientBase._executor.execute(new Pusher(pi, out)); + } catch (RejectedExecutionException ree) { + // shouldn't happen + throw ree; + } out = po; } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java index 2dcb2af15..af89cb2a2 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java @@ -17,6 +17,13 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Properties; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ThreadFactory; import net.i2p.I2PAppContext; import net.i2p.I2PException; @@ -64,35 +71,24 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna private String handlerName; private String privKeyFile; - // private Object conLock = new Object(); - - /** List of Socket for those accept()ed but not yet started up */ - protected final List _waitingSockets = new ArrayList(4); // FIXME should be final and use a factory. FIXME - /** How many connections will we allow to be in the process of being built at once? */ - private int _numConnectionBuilders; - /** How long will we allow sockets to sit in the _waitingSockets map before killing them? */ - private int _maxWaitTime; - - /** - * How many concurrent connections this I2PTunnel instance will allow to be - * in the process of connecting (or if less than 1, there is no limit)? - */ - public static final String PROP_NUM_CONNECTION_BUILDERS = "i2ptunnel.numConnectionBuilders"; - /** - * How long will we let a socket wait after being accept()ed without getting - * pumped through a connection builder (in milliseconds). If this time is - * reached, the socket is unceremoniously closed and discarded. If the max - * wait time is less than 1, there is no limit. - * - */ - public static final String PROP_MAX_WAIT_TIME = "i2ptunnel.maxWaitTime"; - - private static final int DEFAULT_NUM_CONNECTION_BUILDERS = 5; - private static final int DEFAULT_MAX_WAIT_TIME = 30*1000; - // true if we are chained from a server. private boolean chained = false; + /** how long to wait before dropping an idle thread */ + private static final long HANDLER_KEEPALIVE_MS = 2*60*1000; + + /** + * We keep a static pool of socket handlers for all clients, + * as there is no need for isolation on the client side. + * Extending classes may use it for other purposes. + * Not for use by servers, as there is no limit on threads. + */ + static final Executor _executor; + private static int _executorThreadCount; + static { + _executor = new CustomThreadPoolExecutor(); + } + public I2PTunnelClientBase(int localPort, Logging l, I2PSocketManager sktMgr, I2PTunnel tunnel, EventDispatcher notifyThis, long clientId ) throws IllegalArgumentException { @@ -111,8 +107,7 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna _context.statManager().createRateStat("i2ptunnel.client.buildRunTime", "How long it takes to run a queued socket into an i2ptunnel runner?", "I2PTunnel", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _log = _context.logManager().getLog(getClass()); - Thread t = new I2PAppThread(this); - t.setName("Client " + _clientId); + Thread t = new I2PAppThread(this, "Client " + tunnel.listenHost + ':' + localPort); listenerReady = false; t.start(); open = true; @@ -126,8 +121,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna } } - configurePool(tunnel); - if (open && listenerReady) { l.log("Client ready, listening on " + tunnel.listenHost + ':' + localPort); notifyEvent("openBaseClientResult", "ok"); @@ -136,6 +129,7 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna notifyEvent("openBaseClientResult", "error"); } } + public I2PTunnelClientBase(int localPort, boolean ownDest, Logging l, EventDispatcher notifyThis, String handlerName, I2PTunnel tunnel) throws IllegalArgumentException { @@ -212,8 +206,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna } } - configurePool(tunnel); - if (open && listenerReady) { if (openNow) l.log("Client ready, listening on " + tunnel.listenHost + ':' + localPort); @@ -226,37 +218,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna } } - /** - * build and configure the pool handling accept()ed but not yet - * established connections - * - */ - private void configurePool(I2PTunnel tunnel) { - //_waitingSockets = new ArrayList(4); - - Properties opts = tunnel.getClientOptions(); - String maxWait = opts.getProperty(PROP_MAX_WAIT_TIME, DEFAULT_MAX_WAIT_TIME+""); - try { - _maxWaitTime = Integer.parseInt(maxWait); - } catch (NumberFormatException nfe) { - _maxWaitTime = DEFAULT_MAX_WAIT_TIME; - } - - String numBuild = opts.getProperty(PROP_NUM_CONNECTION_BUILDERS, DEFAULT_NUM_CONNECTION_BUILDERS+""); - try { - _numConnectionBuilders = Integer.parseInt(numBuild); - } catch (NumberFormatException nfe) { - _numConnectionBuilders = DEFAULT_NUM_CONNECTION_BUILDERS; - } - - for (int i = 0; i < _numConnectionBuilders; i++) { - String name = "ClientBuilder" + _clientId + '.' + i; - I2PAppThread b = new I2PAppThread(new TunnelConnectionBuilder(), name); - b.setDaemon(true); - b.start(); - } - } - /** * Sets the this.sockMgr field if it is null, or if we want a new one * @@ -543,7 +504,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna synchronized (this) { notifyAll(); } - synchronized (_waitingSockets) { _waitingSockets.notifyAll(); } return; } ss = new ServerSocket(localPort, 0, addr); @@ -572,12 +532,9 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna } } - while (true) { + while (open) { Socket s = ss.accept(); - long before = System.currentTimeMillis(); manageConnection(s); - long total = System.currentTimeMillis() - before; - _context.statManager().addRateData("i2ptunnel.client.manageTime", total, total); } } catch (IOException ex) { if (open) { @@ -592,9 +549,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna notifyAll(); } } - synchronized (_waitingSockets) { - _waitingSockets.notifyAll(); - } } /** @@ -604,24 +558,38 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna */ protected void manageConnection(Socket s) { if (s == null) return; - if (_numConnectionBuilders <= 0) { - new I2PAppThread(new BlockingRunner(s), "Clinet run").start(); - return; + try { + _executor.execute(new BlockingRunner(s)); + } catch (RejectedExecutionException ree) { + // should never happen, we have an unbounded pool and never stop the executor + try { + s.close(); + } catch (IOException ioe) {} } - - if (_maxWaitTime > 0) - SimpleScheduler.getInstance().addEvent(new CloseEvent(s), _maxWaitTime); + } - synchronized (_waitingSockets) { - _waitingSockets.add(s); - _waitingSockets.notifyAll(); + /** + * Not really needed for now but in case we want to add some hooks like afterExecute(). + */ + private static class CustomThreadPoolExecutor extends ThreadPoolExecutor { + public CustomThreadPoolExecutor() { + super(0, Integer.MAX_VALUE, HANDLER_KEEPALIVE_MS, TimeUnit.MILLISECONDS, + new SynchronousQueue(), new CustomThreadFactory()); + } + } + + /** just to set the name and set Daemon */ + private static class CustomThreadFactory implements ThreadFactory { + public Thread newThread(Runnable r) { + Thread rv = Executors.defaultThreadFactory().newThread(r); + rv.setName("I2PTunnel Client Runner " + (++_executorThreadCount)); + rv.setDaemon(true); + return rv; } } /** - * Blocking runner, used during the connection establishment whenever we - * are not using the queued builders. - * + * Blocking runner, used during the connection establishment */ private class BlockingRunner implements Runnable { private Socket _s; @@ -631,32 +599,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna } } - /** - * Remove and close the socket from the waiting list, if it is still there. - * - */ - private class CloseEvent implements SimpleTimer.TimedEvent { - private Socket _s; - public CloseEvent(Socket s) { _s = s; } - public void timeReached() { - int remaining = 0; - boolean stillWaiting = false; - synchronized (_waitingSockets) { - stillWaiting = _waitingSockets.remove(_s); - remaining = _waitingSockets.size(); - } - if (stillWaiting) { - try { _s.close(); } catch (IOException ioe) {} - if (_log.shouldLog(Log.INFO)) { - _context.statManager().addRateData("i2ptunnel.client.closeBacklog", remaining, 0); - _log.info("Closed a waiting socket because of backlog"); - } - } else { - _context.statManager().addRateData("i2ptunnel.client.closeNoBacklog", remaining, 0); - } - } - } - public boolean close(boolean forced) { if (_log.shouldLog(Log.INFO)) _log.info("close() called: forced = " + forced + " open = " + open + " sockMgr = " + sockMgr); @@ -694,7 +636,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna //l.log("Client closed."); } - synchronized (_waitingSockets) { _waitingSockets.notifyAll(); } return true; } @@ -706,36 +647,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna } } - /** - * Pool runner pulling sockets off the waiting list and pushing them - * through clientConnectionRun. This dies when the I2PTunnel instance - * is closed. - * - */ - private class TunnelConnectionBuilder implements Runnable { - public void run() { - Socket s = null; - while (open) { - try { - synchronized (_waitingSockets) { - if (_waitingSockets.isEmpty()) - _waitingSockets.wait(); - else - s = (Socket)_waitingSockets.remove(0); - } - } catch (InterruptedException ie) {} - - if (s != null) { - long before = System.currentTimeMillis(); - clientConnectionRun(s); - long total = System.currentTimeMillis() - before; - _context.statManager().addRateData("i2ptunnel.client.buildRunTime", total, 0); - } - s = null; - } - } - } - /** * Manage a connection in a separate thread. This only works if * you do not override manageConnection()