diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java index e2ef2938b..a65f34530 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java @@ -12,8 +12,10 @@ import java.net.ServerSocket; import java.net.Socket; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Properties; import net.i2p.I2PException; @@ -26,6 +28,7 @@ import net.i2p.client.streaming.I2PSocketOptions; import net.i2p.data.Destination; import net.i2p.util.EventDispatcher; import net.i2p.util.I2PThread; +import net.i2p.util.SimpleTimer; import net.i2p.util.Log; public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runnable { @@ -58,7 +61,30 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna private String handlerName; private Object conLock = new Object(); - private int pendingConnections = 0; + + /** List of Socket for those accept()ed but not yet started up */ + private List _waitingSockets; + /** How many connections will we allow to be in the process of being built at once? */ + private int _numConnectionBuilders; + /** How long will we allow sockets to sit in the _waitingSockets map before killing them? */ + private int _maxWaitTime; + + /** + * How many concurrent connections this I2PTunnel instance will allow to be + * in the process of connecting (or if less than 1, there is no limit)? + */ + public static final String PROP_NUM_CONNECTION_BUILDERS = "i2ptunnel.numConnectionBuilders"; + /** + * How long will we let a socket wait after being accept()ed without getting + * pumped through a connection builder (in milliseconds). If this time is + * reached, the socket is unceremoniously closed and discarded. If the max + * wait time is less than 1, there is no limit. + * + */ + public static final String PROP_MAX_WAIT_TIME = "i2ptunnel.maxWaitTime"; + + private static final int DEFAULT_NUM_CONNECTION_BUILDERS = 5; + private static final int DEFAULT_MAX_WAIT_TIME = 30*1000; //public I2PTunnelClientBase(int localPort, boolean ownDest, // Logging l) { @@ -108,6 +134,8 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna } } + configurePool(tunnel); + if (open && listenerReady) { l.log("Ready! Port " + getLocalPort()); notifyEvent("openBaseClientResult", "ok"); @@ -116,6 +144,37 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna notifyEvent("openBaseClientResult", "error"); } } + + /** + * build and configure the pool handling accept()ed but not yet + * established connections + * + */ + private void configurePool(I2PTunnel tunnel) { + _waitingSockets = new ArrayList(4); + + Properties opts = tunnel.getClientOptions(); + String maxWait = opts.getProperty(PROP_MAX_WAIT_TIME, DEFAULT_MAX_WAIT_TIME+""); + try { + _maxWaitTime = Integer.parseInt(maxWait); + } catch (NumberFormatException nfe) { + _maxWaitTime = DEFAULT_MAX_WAIT_TIME; + } + + String numBuild = opts.getProperty(PROP_NUM_CONNECTION_BUILDERS, DEFAULT_NUM_CONNECTION_BUILDERS+""); + try { + _numConnectionBuilders = Integer.parseInt(numBuild); + } catch (NumberFormatException nfe) { + _numConnectionBuilders = DEFAULT_NUM_CONNECTION_BUILDERS; + } + + for (int i = 0; i < _numConnectionBuilders; i++) { + String name = "ClientBuilder" + _clientId + '.' + i; + I2PThread b = new I2PThread(new TunnelConnectionBuilder(), name); + b.setDaemon(true); + b.start(); + } + } private static I2PSocketManager socketManager; @@ -250,6 +309,7 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna synchronized (this) { notifyAll(); } + synchronized (_waitingSockets) { _waitingSockets.notifyAll(); } return; } ss = new ServerSocket(localPort, 0, addr); @@ -292,6 +352,9 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna notifyAll(); } } + synchronized (_waitingSockets) { + _waitingSockets.notifyAll(); + } } /** @@ -300,16 +363,52 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna * @param s Socket to take care of */ protected void manageConnection(Socket s) { - boolean useBlocking = false; - synchronized (conLock) { - pendingConnections++; - if (pendingConnections > 5) - useBlocking = true; + if (s == null) return; + if (_numConnectionBuilders <= 0) { + new I2PThread(new BlockingRunner(s), "Clinet run").start(); + return; + } + + if (_maxWaitTime > 0) + SimpleTimer.getInstance().addEvent(new CloseEvent(s), _maxWaitTime); + + synchronized (_waitingSockets) { + _waitingSockets.add(s); + _waitingSockets.notifyAll(); + } + } + + /** + * Blocking runner, used during the connection establishment whenever we + * are not using the queued builders. + * + */ + private class BlockingRunner implements Runnable { + private Socket _s; + public BlockingRunner(Socket s) { _s = s; } + public void run() { + clientConnectionRun(_s); + } + } + + /** + * Remove and close the socket from the waiting list, if it is still there. + * + */ + private class CloseEvent implements SimpleTimer.TimedEvent { + private Socket _s; + public CloseEvent(Socket s) { _s = s; } + public void timeReached() { + boolean stillWaiting = false; + synchronized (_waitingSockets) { + stillWaiting = _waitingSockets.remove(_s); + } + if (stillWaiting) { + try { _s.close(); } catch (IOException ioe) {} + if (_log.shouldLog(Log.INFO)) + _log.info("Closed a waiting socket because of backlog"); + } } - if (useBlocking) - clientConnectionRun(s); - else - new ClientConnectionRunner(s, handlerName); } public boolean close(boolean forced) { @@ -341,8 +440,10 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna } l.log("Client closed."); open = false; - return true; } + + synchronized (_waitingSockets) { _waitingSockets.notifyAll(); } + return true; } public static void closeSocket(Socket s) { @@ -352,22 +453,29 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna _log.error("Could not close socket", ex); } } - - private static volatile long __runnerId = 0; - public class ClientConnectionRunner extends I2PThread { - private Socket s; - - public ClientConnectionRunner(Socket s, String name) { - this.s = s; - setName(name + '.' + (++__runnerId)); - start(); - } - - public void run() { - clientConnectionRun(s); - synchronized (conLock) { - pendingConnections--; + /** + * Pool runner pulling sockets off the waiting list and pushing them + * through clientConnectionRun. This dies when the I2PTunnel instance + * is closed. + * + */ + private class TunnelConnectionBuilder implements Runnable { + public void run() { + Socket s = null; + while (open) { + try { + synchronized (_waitingSockets) { + if (_waitingSockets.size() <= 0) + _waitingSockets.wait(); + else + s = (Socket)_waitingSockets.remove(0); + } + } catch (InterruptedException ie) {} + + if (s != null) + clientConnectionRun(s); + s = null; } } } diff --git a/history.txt b/history.txt index 2ff4b0ce9..157ca6e8a 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,15 @@ -$Id: history.txt,v 1.120 2004/12/29 15:06:43 jrandom Exp $ +$Id: history.txt,v 1.121 2004/12/29 17:16:42 jrandom Exp $ + +2004-12-30 jrandom + * Revised the I2PTunnel client and httpclient connection establishment + throttles. There is now a pool of threads that build the I2PSocket + connections with a default size of 5, configurable via the I2PTunnel + client option 'i2ptunnel.numConnectionBuilders' (if set to 0, it will + not throttle the number of concurrent builders, but will launch a thread + per socket during establishment). In addition, sockets accepted but + not yet allocated to one of the connection builders will be destroyed + after 30 seconds, configurable via 'i2ptunnel.maxWaitTime' (if set to + 0, it will wait indefinitely). 2004-12-29 jrandom * Imported Ragnarok's addressbook source (2.0.2) which is built but not diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 337727400..17a3bb461 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.125 $ $Date: 2004/12/29 15:06:44 $"; + public final static String ID = "$Revision: 1.126 $ $Date: 2004/12/29 17:16:42 $"; public final static String VERSION = "0.4.2.5"; - public final static long BUILD = 3; + public final static long BUILD = 4; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/transport/tcp/ConnectionRunner.java b/router/java/src/net/i2p/router/transport/tcp/ConnectionRunner.java index cd1c57262..f4809fb7b 100644 --- a/router/java/src/net/i2p/router/transport/tcp/ConnectionRunner.java +++ b/router/java/src/net/i2p/router/transport/tcp/ConnectionRunner.java @@ -163,11 +163,14 @@ class ConnectionRunner implements Runnable { long timeSinceWrite = _context.clock().now() - _lastWrite; if (timeSinceWrite > 5*TIME_SEND_FREQUENCY) { TCPTransport t = _con.getTransport(); - t.addConnectionErrorMessage("Connection closed with " - + _con.getRemoteRouterIdentity().getHash().toBase64().substring(0,6) - + " due to " + DataHelper.formatDuration(timeSinceWrite) - + " of inactivity after " - + DataHelper.formatDuration(_con.getLifetime())); + String msg = "Connection closed with " + + _con.getRemoteRouterIdentity().getHash().toBase64().substring(0,6) + + " due to " + DataHelper.formatDuration(timeSinceWrite) + + " of inactivity after " + + DataHelper.formatDuration(_con.getLifetime()); + t.addConnectionErrorMessage(msg); + if (_log.shouldLog(Log.INFO)) + _log.info(msg); _con.closeConnection(false); return; } @@ -187,5 +190,7 @@ class ConnectionRunner implements Runnable { msg.setMessage(buildTimeMessage()); msg.setPriority(100); _con.addMessage(msg); + if (_log.shouldLog(Log.INFO)) + _log.info("Enqueueing time message to " + _con.getRemoteRouterIdentity().getHash().toBase64().substring(0,6)); } }