* I2PTunnel:

- Limit server blockingHandle threads
      - Run standard server blockingHandles inline
This commit is contained in:
zzz
2010-12-04 02:03:08 +00:00
parent 5ab813179d
commit 467b082e82

View File

@@ -16,6 +16,12 @@ import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadFactory;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
@@ -47,12 +53,19 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
/** default timeout to 3 minutes - override if desired */
protected long readTimeout = DEFAULT_READ_TIMEOUT;
private static final boolean DEFAULT_USE_POOL = false;
/** do we use threads? default true (ignored for standard servers, always false) */
private static final String PROP_USE_POOL = "i2ptunnel.usePool";
private static final boolean DEFAULT_USE_POOL = true;
protected static volatile long __serverId = 0;
/** max number of threads - this many slowlorisses will DOS this server, but too high could OOM the JVM */
private static final String PROP_HANDLER_COUNT = "i2ptunnel.blockingHandlerCount";
private static final int DEFAULT_HANDLER_COUNT = 10;
private static final int DEFAULT_HANDLER_COUNT = 40;
/** min number of threads */
private static final int MIN_HANDLERS = 0;
/** how many accept()s to queue waiting for a thread */
private static final int HANDLER_QUEUE_SIZE = 99;
/** how long to wait before dropping an idle thread */
private static final long HANDLER_KEEPALIVE_MS = 10*1000;
protected I2PTunnelTask task = null;
protected boolean bidir = false;
@@ -68,7 +81,6 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
super("Server at " + host + ':' + port, notifyThis, tunnel);
_log = tunnel.getContext().logManager().getLog(getClass());
ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decode(privData));
setUsePool(tunnel);
init(host, port, bais, privData, l);
}
@@ -80,7 +92,6 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
EventDispatcher notifyThis, I2PTunnel tunnel) {
super("Server at " + host + ':' + port, notifyThis, tunnel);
_log = tunnel.getContext().logManager().getLog(getClass());
setUsePool(tunnel);
FileInputStream fis = null;
try {
fis = new FileInputStream(privkey);
@@ -101,19 +112,9 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
public I2PTunnelServer(InetAddress host, int port, InputStream privData, String privkeyname, Logging l, EventDispatcher notifyThis, I2PTunnel tunnel) {
super("Server at " + host + ':' + port, notifyThis, tunnel);
_log = tunnel.getContext().logManager().getLog(getClass());
setUsePool(tunnel);
init(host, port, privData, privkeyname, l);
}
private void setUsePool(I2PTunnel tunnel) {
String usePool = tunnel.getClientOptions().getProperty("i2ptunnel.usePool");
if (usePool != null)
_usePool = "true".equalsIgnoreCase(usePool);
else
_usePool = DEFAULT_USE_POOL;
}
private static final int RETRY_DELAY = 20*1000;
private static final int MAX_RETRIES = 4;
@@ -145,6 +146,16 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
return;
}
// extending classes default to threaded, but for a standard server, we can't get slowlorissed
_usePool = !getClass().equals(I2PTunnelServer.class);
if (_usePool) {
String usePool = getTunnel().getClientOptions().getProperty(PROP_USE_POOL);
if (usePool != null)
_usePool = "true".equalsIgnoreCase(usePool);
else
_usePool = DEFAULT_USE_POOL;
}
// Todo: Can't stop a tunnel from the UI while it's in this loop (no session yet)
int retries = 0;
while (sockMgr == null) {
@@ -201,8 +212,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
*
*/
public void startRunning() {
Thread t = new I2PAppThread(this);
t.setName("Server " + (++__serverId));
Thread t = new I2PAppThread(this, "Server " + remoteHost + ':' + remotePort, true);
t.start();
}
@@ -238,7 +248,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
}
return false;
}
l.log("Stopping tunnels for server at " + getTunnel().listenHost + ':' + this.remotePort);
l.log("Stopping tunnels for server at " + this.remoteHost + ':' + this.remotePort);
try {
if (i2pss != null) i2pss.close();
getTunnel().removeSession(sockMgr.getSession());
@@ -261,35 +271,44 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
rv = Integer.parseInt(cnt);
if (rv <= 0)
rv = DEFAULT_HANDLER_COUNT;
} catch (NumberFormatException nfe) {
rv = DEFAULT_HANDLER_COUNT;
}
} catch (NumberFormatException nfe) {}
}
return rv;
}
/**
* If usePool is set, this starts the Handler pool and exits.
* Otherwise, this is the accept() loop, and it
* hands each I2P socket to a new thread.
* Note that there is no option to handle each socket in-line, which
* might be appropriate for standard servers that are not filtering headers,
* and are thus unlikely to block.
* If usePool is set, this starts the executor pool.
* Then, do the accept() loop, and either
* hands each I2P socket to the executor or runs it in-line.
*/
public void run() {
I2PServerSocket i2pS_S = sockMgr.getServerSocket();
if (shouldUsePool()) {
int handlers = getHandlerCount();
for (int i = 0; i < handlers; i++) {
I2PAppThread handler = new I2PAppThread(new Handler(i2pS_S), "Handle Server " + i);
handler.start();
ThreadPoolExecutor executor = null;
if (_log.shouldLog(Log.WARN)) {
if (_usePool)
_log.warn("Starting executor with " + getHandlerCount() + " threads max");
else
_log.warn("Threads disabled, running blockingHandles inline");
}
} else {
while (true) {
if (_usePool) {
executor = new CustomThreadPoolExecutor(getHandlerCount(), "ServerHandler pool " + remoteHost + ':' + remotePort);
}
while (open) {
try {
final I2PSocket i2ps = i2pS_S.accept();
if (i2ps == null) throw new I2PException("I2PServerSocket closed");
new I2PAppThread(new Runnable() { public void run() { blockingHandle(i2ps); } }).start();
if (_usePool) {
try {
executor.execute(new Handler(i2ps));
} catch (RejectedExecutionException ree) {
if (open && _log.shouldLog(Log.ERROR))
_log.error("ServerHandler queue full for " + remoteHost + ':' + remotePort +
"; increase " + PROP_HANDLER_COUNT + '?', ree);
}
} else {
// use only for standard servers that can't get slowlorissed! Not for http or irc
blockingHandle(i2ps);
}
} catch (I2PException ipe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error accepting - KILLING THE TUNNEL SERVER", ipe);
@@ -298,37 +317,59 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
if (_log.shouldLog(Log.ERROR))
_log.error("Error accepting", ce);
// not killing the server..
try {
Thread.currentThread().sleep(500);
} catch (InterruptedException ie) {}
} catch(SocketTimeoutException ste) {
// ignored, we never set the timeout
}
}
if (executor != null)
executor.shutdownNow();
}
/**
* Not really needed for now but in case we want to add some hooks like afterExecute().
* By default, sets up a pool of 0-10 threads with a max queue of 90 and
* an idle time expiration of 5 seconds.
*/
private static class CustomThreadPoolExecutor extends ThreadPoolExecutor {
public CustomThreadPoolExecutor(int max, String name) {
super(MIN_HANDLERS, max, HANDLER_KEEPALIVE_MS, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(HANDLER_QUEUE_SIZE), new CustomThreadFactory(name));
}
}
/** just to set the name and set Daemon */
private static class CustomThreadFactory implements ThreadFactory {
private String _name;
public CustomThreadFactory(String name) {
_name = name;
}
public Thread newThread(Runnable r) {
Thread rv = Executors.defaultThreadFactory().newThread(r);
rv.setName(_name);
rv.setDaemon(true);
return rv;
}
}
public boolean shouldUsePool() { return _usePool; }
/**
* Minor thread pool to pull off the accept() concurrently.
* This is NOT used by default; enable and set number of Handlers
* in the options.
* Run the blockingHandler.
*/
private class Handler implements Runnable {
private I2PServerSocket _serverSocket;
public Handler(I2PServerSocket serverSocket) {
_serverSocket = serverSocket;
private I2PSocket _i2ps;
public Handler(I2PSocket socket) {
_i2ps = socket;
}
public void run() {
while (open) {
try {
blockingHandle(_serverSocket.accept());
} catch (I2PException ex) {
_log.error("Error while waiting for I2PConnections", ex);
return;
} catch (IOException ex) {
_log.error("Error while waiting for I2PConnections", ex);
return;
}
}
blockingHandle(_i2ps);
}
}