* I2PTunnel:

- For clients, use a common thread pool that expires idle threads
        rather than keeping 5 accept() threads for each client.
        This also removes the configurable (30s default) max wait time
        for a socket, this may have to be restored.
      - Use pool for HTTP decompression also.
This commit is contained in:
zzz
2010-12-04 18:47:22 +00:00
parent 04ea1fb9ca
commit eadf472dd0
2 changed files with 61 additions and 141 deletions

View File

@@ -16,6 +16,7 @@ import java.io.OutputStream;
import java.io.PipedInputStream; import java.io.PipedInputStream;
import java.io.PipedOutputStream; import java.io.PipedOutputStream;
import java.util.zip.GZIPInputStream; import java.util.zip.GZIPInputStream;
import java.util.concurrent.RejectedExecutionException;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray; import net.i2p.data.ByteArray;
@@ -228,7 +229,15 @@ class HTTPResponseOutputStream extends FilterOutputStream {
//out.flush(); //out.flush();
PipedInputStream pi = new PipedInputStream(); PipedInputStream pi = new PipedInputStream();
PipedOutputStream po = new PipedOutputStream(pi); PipedOutputStream po = new PipedOutputStream(pi);
new I2PAppThread(new Pusher(pi, out), "HTTP decompressor").start(); // Run in the client thread pool, as there should be an unused thread
// there after the accept().
// Overridden in I2PTunnelHTTPServer, where it does not use the client pool.
try {
I2PTunnelClientBase._executor.execute(new Pusher(pi, out));
} catch (RejectedExecutionException ree) {
// shouldn't happen
throw ree;
}
out = po; out = po;
} }

View File

@@ -17,6 +17,13 @@ import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadFactory;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.I2PException; import net.i2p.I2PException;
@@ -64,35 +71,24 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
private String handlerName; private String handlerName;
private String privKeyFile; private String privKeyFile;
// private Object conLock = new Object();
/** List of Socket for those accept()ed but not yet started up */
protected final List _waitingSockets = new ArrayList(4); // FIXME should be final and use a factory. FIXME
/** How many connections will we allow to be in the process of being built at once? */
private int _numConnectionBuilders;
/** How long will we allow sockets to sit in the _waitingSockets map before killing them? */
private int _maxWaitTime;
/**
* How many concurrent connections this I2PTunnel instance will allow to be
* in the process of connecting (or if less than 1, there is no limit)?
*/
public static final String PROP_NUM_CONNECTION_BUILDERS = "i2ptunnel.numConnectionBuilders";
/**
* How long will we let a socket wait after being accept()ed without getting
* pumped through a connection builder (in milliseconds). If this time is
* reached, the socket is unceremoniously closed and discarded. If the max
* wait time is less than 1, there is no limit.
*
*/
public static final String PROP_MAX_WAIT_TIME = "i2ptunnel.maxWaitTime";
private static final int DEFAULT_NUM_CONNECTION_BUILDERS = 5;
private static final int DEFAULT_MAX_WAIT_TIME = 30*1000;
// true if we are chained from a server. // true if we are chained from a server.
private boolean chained = false; private boolean chained = false;
/** how long to wait before dropping an idle thread */
private static final long HANDLER_KEEPALIVE_MS = 2*60*1000;
/**
* We keep a static pool of socket handlers for all clients,
* as there is no need for isolation on the client side.
* Extending classes may use it for other purposes.
* Not for use by servers, as there is no limit on threads.
*/
static final Executor _executor;
private static int _executorThreadCount;
static {
_executor = new CustomThreadPoolExecutor();
}
public I2PTunnelClientBase(int localPort, Logging l, I2PSocketManager sktMgr, public I2PTunnelClientBase(int localPort, Logging l, I2PSocketManager sktMgr,
I2PTunnel tunnel, EventDispatcher notifyThis, long clientId ) I2PTunnel tunnel, EventDispatcher notifyThis, long clientId )
throws IllegalArgumentException { throws IllegalArgumentException {
@@ -111,8 +107,7 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
_context.statManager().createRateStat("i2ptunnel.client.buildRunTime", "How long it takes to run a queued socket into an i2ptunnel runner?", "I2PTunnel", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("i2ptunnel.client.buildRunTime", "How long it takes to run a queued socket into an i2ptunnel runner?", "I2PTunnel", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_log = _context.logManager().getLog(getClass()); _log = _context.logManager().getLog(getClass());
Thread t = new I2PAppThread(this); Thread t = new I2PAppThread(this, "Client " + tunnel.listenHost + ':' + localPort);
t.setName("Client " + _clientId);
listenerReady = false; listenerReady = false;
t.start(); t.start();
open = true; open = true;
@@ -126,8 +121,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
} }
} }
configurePool(tunnel);
if (open && listenerReady) { if (open && listenerReady) {
l.log("Client ready, listening on " + tunnel.listenHost + ':' + localPort); l.log("Client ready, listening on " + tunnel.listenHost + ':' + localPort);
notifyEvent("openBaseClientResult", "ok"); notifyEvent("openBaseClientResult", "ok");
@@ -136,6 +129,7 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
notifyEvent("openBaseClientResult", "error"); notifyEvent("openBaseClientResult", "error");
} }
} }
public I2PTunnelClientBase(int localPort, boolean ownDest, Logging l, public I2PTunnelClientBase(int localPort, boolean ownDest, Logging l,
EventDispatcher notifyThis, String handlerName, EventDispatcher notifyThis, String handlerName,
I2PTunnel tunnel) throws IllegalArgumentException { I2PTunnel tunnel) throws IllegalArgumentException {
@@ -212,8 +206,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
} }
} }
configurePool(tunnel);
if (open && listenerReady) { if (open && listenerReady) {
if (openNow) if (openNow)
l.log("Client ready, listening on " + tunnel.listenHost + ':' + localPort); l.log("Client ready, listening on " + tunnel.listenHost + ':' + localPort);
@@ -226,37 +218,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
} }
} }
/**
* build and configure the pool handling accept()ed but not yet
* established connections
*
*/
private void configurePool(I2PTunnel tunnel) {
//_waitingSockets = new ArrayList(4);
Properties opts = tunnel.getClientOptions();
String maxWait = opts.getProperty(PROP_MAX_WAIT_TIME, DEFAULT_MAX_WAIT_TIME+"");
try {
_maxWaitTime = Integer.parseInt(maxWait);
} catch (NumberFormatException nfe) {
_maxWaitTime = DEFAULT_MAX_WAIT_TIME;
}
String numBuild = opts.getProperty(PROP_NUM_CONNECTION_BUILDERS, DEFAULT_NUM_CONNECTION_BUILDERS+"");
try {
_numConnectionBuilders = Integer.parseInt(numBuild);
} catch (NumberFormatException nfe) {
_numConnectionBuilders = DEFAULT_NUM_CONNECTION_BUILDERS;
}
for (int i = 0; i < _numConnectionBuilders; i++) {
String name = "ClientBuilder" + _clientId + '.' + i;
I2PAppThread b = new I2PAppThread(new TunnelConnectionBuilder(), name);
b.setDaemon(true);
b.start();
}
}
/** /**
* Sets the this.sockMgr field if it is null, or if we want a new one * Sets the this.sockMgr field if it is null, or if we want a new one
* *
@@ -543,7 +504,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
synchronized (this) { synchronized (this) {
notifyAll(); notifyAll();
} }
synchronized (_waitingSockets) { _waitingSockets.notifyAll(); }
return; return;
} }
ss = new ServerSocket(localPort, 0, addr); ss = new ServerSocket(localPort, 0, addr);
@@ -572,12 +532,9 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
} }
} }
while (true) { while (open) {
Socket s = ss.accept(); Socket s = ss.accept();
long before = System.currentTimeMillis();
manageConnection(s); manageConnection(s);
long total = System.currentTimeMillis() - before;
_context.statManager().addRateData("i2ptunnel.client.manageTime", total, total);
} }
} catch (IOException ex) { } catch (IOException ex) {
if (open) { if (open) {
@@ -592,9 +549,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
notifyAll(); notifyAll();
} }
} }
synchronized (_waitingSockets) {
_waitingSockets.notifyAll();
}
} }
/** /**
@@ -604,24 +558,38 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
*/ */
protected void manageConnection(Socket s) { protected void manageConnection(Socket s) {
if (s == null) return; if (s == null) return;
if (_numConnectionBuilders <= 0) { try {
new I2PAppThread(new BlockingRunner(s), "Clinet run").start(); _executor.execute(new BlockingRunner(s));
return; } catch (RejectedExecutionException ree) {
// should never happen, we have an unbounded pool and never stop the executor
try {
s.close();
} catch (IOException ioe) {}
} }
}
if (_maxWaitTime > 0)
SimpleScheduler.getInstance().addEvent(new CloseEvent(s), _maxWaitTime);
synchronized (_waitingSockets) { /**
_waitingSockets.add(s); * Not really needed for now but in case we want to add some hooks like afterExecute().
_waitingSockets.notifyAll(); */
private static class CustomThreadPoolExecutor extends ThreadPoolExecutor {
public CustomThreadPoolExecutor() {
super(0, Integer.MAX_VALUE, HANDLER_KEEPALIVE_MS, TimeUnit.MILLISECONDS,
new SynchronousQueue(), new CustomThreadFactory());
}
}
/** just to set the name and set Daemon */
private static class CustomThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread rv = Executors.defaultThreadFactory().newThread(r);
rv.setName("I2PTunnel Client Runner " + (++_executorThreadCount));
rv.setDaemon(true);
return rv;
} }
} }
/** /**
* Blocking runner, used during the connection establishment whenever we * Blocking runner, used during the connection establishment
* are not using the queued builders.
*
*/ */
private class BlockingRunner implements Runnable { private class BlockingRunner implements Runnable {
private Socket _s; private Socket _s;
@@ -631,32 +599,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
} }
} }
/**
* Remove and close the socket from the waiting list, if it is still there.
*
*/
private class CloseEvent implements SimpleTimer.TimedEvent {
private Socket _s;
public CloseEvent(Socket s) { _s = s; }
public void timeReached() {
int remaining = 0;
boolean stillWaiting = false;
synchronized (_waitingSockets) {
stillWaiting = _waitingSockets.remove(_s);
remaining = _waitingSockets.size();
}
if (stillWaiting) {
try { _s.close(); } catch (IOException ioe) {}
if (_log.shouldLog(Log.INFO)) {
_context.statManager().addRateData("i2ptunnel.client.closeBacklog", remaining, 0);
_log.info("Closed a waiting socket because of backlog");
}
} else {
_context.statManager().addRateData("i2ptunnel.client.closeNoBacklog", remaining, 0);
}
}
}
public boolean close(boolean forced) { public boolean close(boolean forced) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("close() called: forced = " + forced + " open = " + open + " sockMgr = " + sockMgr); _log.info("close() called: forced = " + forced + " open = " + open + " sockMgr = " + sockMgr);
@@ -694,7 +636,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
//l.log("Client closed."); //l.log("Client closed.");
} }
synchronized (_waitingSockets) { _waitingSockets.notifyAll(); }
return true; return true;
} }
@@ -706,36 +647,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
} }
} }
/**
* Pool runner pulling sockets off the waiting list and pushing them
* through clientConnectionRun. This dies when the I2PTunnel instance
* is closed.
*
*/
private class TunnelConnectionBuilder implements Runnable {
public void run() {
Socket s = null;
while (open) {
try {
synchronized (_waitingSockets) {
if (_waitingSockets.isEmpty())
_waitingSockets.wait();
else
s = (Socket)_waitingSockets.remove(0);
}
} catch (InterruptedException ie) {}
if (s != null) {
long before = System.currentTimeMillis();
clientConnectionRun(s);
long total = System.currentTimeMillis() - before;
_context.statManager().addRateData("i2ptunnel.client.buildRunTime", total, 0);
}
s = null;
}
}
}
/** /**
* Manage a connection in a separate thread. This only works if * Manage a connection in a separate thread. This only works if
* you do not override manageConnection() * you do not override manageConnection()