From 467b082e82af8a1f4d7f3d4246b7eee013d1224f Mon Sep 17 00:00:00 2001 From: zzz Date: Sat, 4 Dec 2010 02:03:08 +0000 Subject: [PATCH] * I2PTunnel: - Limit server blockingHandle threads - Run standard server blockingHandles inline --- .../net/i2p/i2ptunnel/I2PTunnelServer.java | 175 +++++++++++------- 1 file changed, 108 insertions(+), 67 deletions(-) diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java index bce682eef..ab013f536 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java @@ -16,6 +16,12 @@ import java.net.SocketException; import java.net.SocketTimeoutException; import java.util.Iterator; import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ThreadFactory; import net.i2p.I2PAppContext; import net.i2p.I2PException; @@ -47,12 +53,19 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { /** default timeout to 3 minutes - override if desired */ protected long readTimeout = DEFAULT_READ_TIMEOUT; - private static final boolean DEFAULT_USE_POOL = false; + /** do we use threads? default true (ignored for standard servers, always false) */ + private static final String PROP_USE_POOL = "i2ptunnel.usePool"; + private static final boolean DEFAULT_USE_POOL = true; protected static volatile long __serverId = 0; + /** max number of threads - this many slowlorisses will DOS this server, but too high could OOM the JVM */ private static final String PROP_HANDLER_COUNT = "i2ptunnel.blockingHandlerCount"; - private static final int DEFAULT_HANDLER_COUNT = 10; - - + private static final int DEFAULT_HANDLER_COUNT = 40; + /** min number of threads */ + private static final int MIN_HANDLERS = 0; + /** how many accept()s to queue waiting for a thread */ + private static final int HANDLER_QUEUE_SIZE = 99; + /** how long to wait before dropping an idle thread */ + private static final long HANDLER_KEEPALIVE_MS = 10*1000; protected I2PTunnelTask task = null; protected boolean bidir = false; @@ -68,7 +81,6 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { super("Server at " + host + ':' + port, notifyThis, tunnel); _log = tunnel.getContext().logManager().getLog(getClass()); ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decode(privData)); - setUsePool(tunnel); init(host, port, bais, privData, l); } @@ -80,7 +92,6 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { EventDispatcher notifyThis, I2PTunnel tunnel) { super("Server at " + host + ':' + port, notifyThis, tunnel); _log = tunnel.getContext().logManager().getLog(getClass()); - setUsePool(tunnel); FileInputStream fis = null; try { fis = new FileInputStream(privkey); @@ -101,19 +112,9 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { public I2PTunnelServer(InetAddress host, int port, InputStream privData, String privkeyname, Logging l, EventDispatcher notifyThis, I2PTunnel tunnel) { super("Server at " + host + ':' + port, notifyThis, tunnel); _log = tunnel.getContext().logManager().getLog(getClass()); - setUsePool(tunnel); init(host, port, privData, privkeyname, l); } - - private void setUsePool(I2PTunnel tunnel) { - String usePool = tunnel.getClientOptions().getProperty("i2ptunnel.usePool"); - if (usePool != null) - _usePool = "true".equalsIgnoreCase(usePool); - else - _usePool = DEFAULT_USE_POOL; - } - private static final int RETRY_DELAY = 20*1000; private static final int MAX_RETRIES = 4; @@ -145,6 +146,16 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { return; } + // extending classes default to threaded, but for a standard server, we can't get slowlorissed + _usePool = !getClass().equals(I2PTunnelServer.class); + if (_usePool) { + String usePool = getTunnel().getClientOptions().getProperty(PROP_USE_POOL); + if (usePool != null) + _usePool = "true".equalsIgnoreCase(usePool); + else + _usePool = DEFAULT_USE_POOL; + } + // Todo: Can't stop a tunnel from the UI while it's in this loop (no session yet) int retries = 0; while (sockMgr == null) { @@ -201,8 +212,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { * */ public void startRunning() { - Thread t = new I2PAppThread(this); - t.setName("Server " + (++__serverId)); + Thread t = new I2PAppThread(this, "Server " + remoteHost + ':' + remotePort, true); t.start(); } @@ -238,7 +248,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { } return false; } - l.log("Stopping tunnels for server at " + getTunnel().listenHost + ':' + this.remotePort); + l.log("Stopping tunnels for server at " + this.remoteHost + ':' + this.remotePort); try { if (i2pss != null) i2pss.close(); getTunnel().removeSession(sockMgr.getSession()); @@ -261,74 +271,105 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { rv = Integer.parseInt(cnt); if (rv <= 0) rv = DEFAULT_HANDLER_COUNT; - } catch (NumberFormatException nfe) { - rv = DEFAULT_HANDLER_COUNT; - } + } catch (NumberFormatException nfe) {} } return rv; } /** - * If usePool is set, this starts the Handler pool and exits. - * Otherwise, this is the accept() loop, and it - * hands each I2P socket to a new thread. - * Note that there is no option to handle each socket in-line, which - * might be appropriate for standard servers that are not filtering headers, - * and are thus unlikely to block. + * If usePool is set, this starts the executor pool. + * Then, do the accept() loop, and either + * hands each I2P socket to the executor or runs it in-line. */ public void run() { I2PServerSocket i2pS_S = sockMgr.getServerSocket(); - if (shouldUsePool()) { - int handlers = getHandlerCount(); - for (int i = 0; i < handlers; i++) { - I2PAppThread handler = new I2PAppThread(new Handler(i2pS_S), "Handle Server " + i); - handler.start(); - } - } else { - while (true) { - try { - final I2PSocket i2ps = i2pS_S.accept(); - if (i2ps == null) throw new I2PException("I2PServerSocket closed"); - new I2PAppThread(new Runnable() { public void run() { blockingHandle(i2ps); } }).start(); - } catch (I2PException ipe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error accepting - KILLING THE TUNNEL SERVER", ipe); - return; - } catch (ConnectException ce) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error accepting", ce); - // not killing the server.. - } catch(SocketTimeoutException ste) { - // ignored, we never set the timeout + ThreadPoolExecutor executor = null; + if (_log.shouldLog(Log.WARN)) { + if (_usePool) + _log.warn("Starting executor with " + getHandlerCount() + " threads max"); + else + _log.warn("Threads disabled, running blockingHandles inline"); + } + if (_usePool) { + executor = new CustomThreadPoolExecutor(getHandlerCount(), "ServerHandler pool " + remoteHost + ':' + remotePort); + } + while (open) { + try { + final I2PSocket i2ps = i2pS_S.accept(); + if (i2ps == null) throw new I2PException("I2PServerSocket closed"); + if (_usePool) { + try { + executor.execute(new Handler(i2ps)); + } catch (RejectedExecutionException ree) { + if (open && _log.shouldLog(Log.ERROR)) + _log.error("ServerHandler queue full for " + remoteHost + ':' + remotePort + + "; increase " + PROP_HANDLER_COUNT + '?', ree); + } + } else { + // use only for standard servers that can't get slowlorissed! Not for http or irc + blockingHandle(i2ps); } + } catch (I2PException ipe) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error accepting - KILLING THE TUNNEL SERVER", ipe); + return; + } catch (ConnectException ce) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error accepting", ce); + // not killing the server.. + try { + Thread.currentThread().sleep(500); + } catch (InterruptedException ie) {} + } catch(SocketTimeoutException ste) { + // ignored, we never set the timeout } } + if (executor != null) + executor.shutdownNow(); } + /** + * Not really needed for now but in case we want to add some hooks like afterExecute(). + * By default, sets up a pool of 0-10 threads with a max queue of 90 and + * an idle time expiration of 5 seconds. + */ + private static class CustomThreadPoolExecutor extends ThreadPoolExecutor { + public CustomThreadPoolExecutor(int max, String name) { + super(MIN_HANDLERS, max, HANDLER_KEEPALIVE_MS, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(HANDLER_QUEUE_SIZE), new CustomThreadFactory(name)); + } + } + + /** just to set the name and set Daemon */ + private static class CustomThreadFactory implements ThreadFactory { + private String _name; + + public CustomThreadFactory(String name) { + _name = name; + } + + public Thread newThread(Runnable r) { + Thread rv = Executors.defaultThreadFactory().newThread(r); + rv.setName(_name); + rv.setDaemon(true); + return rv; + } + } + public boolean shouldUsePool() { return _usePool; } /** - * Minor thread pool to pull off the accept() concurrently. - * This is NOT used by default; enable and set number of Handlers - * in the options. + * Run the blockingHandler. */ private class Handler implements Runnable { - private I2PServerSocket _serverSocket; - public Handler(I2PServerSocket serverSocket) { - _serverSocket = serverSocket; + private I2PSocket _i2ps; + + public Handler(I2PSocket socket) { + _i2ps = socket; } + public void run() { - while (open) { - try { - blockingHandle(_serverSocket.accept()); - } catch (I2PException ex) { - _log.error("Error while waiting for I2PConnections", ex); - return; - } catch (IOException ex) { - _log.error("Error while waiting for I2PConnections", ex); - return; - } - } + blockingHandle(_i2ps); } }