diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java
index 40566fe41..014d91e9b 100644
--- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java
@@ -12,7 +12,6 @@ import java.net.ConnectException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;
-import java.net.SocketTimeoutException;
import java.util.Iterator;
import java.util.Properties;
@@ -220,9 +219,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
if (_log.shouldLog(Log.ERROR))
_log.error("Error accepting", ce);
// not killing the server..
- } catch(SocketTimeoutException ste) {
- // ignored, we never set the timeout
- }
+ }
}
}
}
diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java
index 7c9927395..726d462ce 100644
--- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java
+++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java
@@ -2,7 +2,6 @@ package net.i2p.client.streaming;
import java.net.ConnectException;
-import java.net.SocketTimeoutException;
import net.i2p.I2PException;
/**
@@ -10,40 +9,26 @@ import net.i2p.I2PException;
*
*/
public interface I2PServerSocket {
+ /**
+ * Closes the socket.
+ */
+ public void close() throws I2PException;
- /**
- * Closes the socket.
- */
- public void close() throws I2PException;
+ /**
+ * Waits for the next socket connecting. If a remote user tried to make a
+ * connection and the local application wasn't .accept()ing new connections,
+ * they should get refused (if .accept() doesnt occur in some small period)
+ *
+ * @return a connected I2PSocket
+ *
+ * @throws I2PException if there is a problem with reading a new socket
+ * from the data available (aka the I2PSession closed, etc)
+ * @throws ConnectException if the I2PServerSocket is closed
+ */
+ public I2PSocket accept() throws I2PException, ConnectException;
- /**
- * Waits for the next socket connecting. If a remote user tried to make a
- * connection and the local application wasn't .accept()ing new connections,
- * they should get refused (if .accept() doesnt occur in some small period)
- *
- * @return a connected I2PSocket
- *
- * @throws I2PException if there is a problem with reading a new socket
- * from the data available (aka the I2PSession closed, etc)
- * @throws ConnectException if the I2PServerSocket is closed
- * @throws SocketTimeoutException
- */
- public I2PSocket accept() throws I2PException, ConnectException, SocketTimeoutException;
-
- /**
- * Set Sock Option accept timeout
- * @param x
- */
- public void setSoTimeout(long x);
-
- /**
- * Get Sock Option accept timeout
- * @return timeout
- */
- public long getSoTimeout();
-
- /**
- * Access the manager which is coordinating the server socket
- */
- public I2PSocketManager getManager();
+ /**
+ * Access the manager which is coordinating the server socket
+ */
+ public I2PSocketManager getManager();
}
diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java
index 2e3dfdb6b..965ba31bf 100644
--- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java
+++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java
@@ -17,159 +17,134 @@ import net.i2p.util.Log;
*
*/
class I2PServerSocketImpl implements I2PServerSocket {
-
- private final static Log _log = new Log(I2PServerSocketImpl.class);
- private I2PSocketManager mgr;
- /** list of sockets waiting for the client to accept them */
- private List pendingSockets = Collections.synchronizedList(new ArrayList(4));
- /** have we been closed */
- private volatile boolean closing = false;
- /** lock on this when accepting a pending socket, and wait on it for notification of acceptance */
- private Object socketAcceptedLock = new Object();
- /** lock on this when adding a new socket to the pending list, and wait on it accordingly */
- private Object socketAddedLock = new Object();
-
- /**
- * Set Sock Option accept timeout stub, does nothing
- * @param x
- */
- public void setSoTimeout(long x) {
- }
-
- /**
- * Get Sock Option accept timeout stub, does nothing
- * @return timeout
- */
- public long getSoTimeout() {
- return -1;
- }
-
- public I2PServerSocketImpl(I2PSocketManager mgr) {
- this.mgr = mgr;
- }
-
- /**
- * Waits for the next socket connecting. If a remote user tried to make a
- * connection and the local application wasn't .accept()ing new connections,
- * they should get refused (if .accept() doesnt occur in some small period -
- * currently 5 seconds)
- *
- * @return a connected I2PSocket
- *
- * @throws I2PException if there is a problem with reading a new socket
- * from the data available (aka the I2PSession closed, etc)
- * @throws ConnectException if the I2PServerSocket is closed
- */
- public I2PSocket accept() throws I2PException, ConnectException {
- if(_log.shouldLog(Log.DEBUG)) {
- _log.debug("accept() called, pending: " + pendingSockets.size());
- }
- I2PSocket ret = null;
-
- while((ret == null) && (!closing)) {
- while(pendingSockets.size() <= 0) {
- if(closing) {
- throw new ConnectException("I2PServerSocket closed");
- }
- try {
- synchronized(socketAddedLock) {
- socketAddedLock.wait();
- }
- } catch(InterruptedException ie) {
- }
- }
- synchronized(pendingSockets) {
- if(pendingSockets.size() > 0) {
- ret = (I2PSocket)pendingSockets.remove(0);
- }
- }
- if(ret != null) {
- synchronized(socketAcceptedLock) {
- socketAcceptedLock.notifyAll();
- }
- }
- }
-
- if(_log.shouldLog(Log.DEBUG)) {
- _log.debug("TIMING: handed out accept result " + ret.hashCode());
- }
- return ret;
- }
-
- /**
- * Make the socket available and wait until the client app accepts it, or until
- * the given timeout elapses. This doesn't have any limits on the queue size -
- * perhaps it should add some choking (e.g. after 5 waiting for accept, refuse)
- *
- * @param timeoutMs how long to wait until accept
- * @return true if the socket was accepted, false if the timeout expired
- * or the socket was closed
- */
- public boolean addWaitForAccept(I2PSocket s, long timeoutMs) {
- if(_log.shouldLog(Log.DEBUG)) {
- _log.debug("addWaitForAccept [new socket arrived [" + s.toString() + "], pending: " + pendingSockets.size());
- }
- if(closing) {
- if(_log.shouldLog(Log.WARN)) {
- _log.warn("Already closing the socket");
- }
- return false;
- }
-
- Clock clock = I2PAppContext.getGlobalContext().clock();
- long start = clock.now();
- long end = start + timeoutMs;
- pendingSockets.add(s);
- synchronized(socketAddedLock) {
- socketAddedLock.notifyAll();
- }
-
- // keep looping until the socket has been grabbed by the accept()
- // (or the expiration passes, or the socket is closed)
- while(pendingSockets.contains(s)) {
- long now = clock.now();
- if(now >= end) {
- if(_log.shouldLog(Log.INFO)) {
- _log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms) for socket " + s.toString());
- }
- pendingSockets.remove(s);
- return false;
- }
- if(closing) {
- if(_log.shouldLog(Log.WARN)) {
- _log.warn("Server socket closed while waiting for accept");
- }
- pendingSockets.remove(s);
- return false;
- }
- long remaining = end - now;
- try {
- synchronized(socketAcceptedLock) {
- socketAcceptedLock.wait(remaining);
- }
- } catch(InterruptedException ie) {
- }
- }
- long now = clock.now();
- if(_log.shouldLog(Log.DEBUG)) {
- _log.info("Socket accepted after " + (now - start) + "ms for socket " + s.toString());
- }
- return true;
- }
-
- public void close() {
- closing = true;
- // let anyone .accept()ing know to fsck off
- synchronized(socketAddedLock) {
- socketAddedLock.notifyAll();
- }
- // let anyone addWaitForAccept()ing know to fsck off
- synchronized(socketAcceptedLock) {
- socketAcceptedLock.notifyAll();
- }
- }
-
- public I2PSocketManager getManager() {
- return mgr;
- }
+ private final static Log _log = new Log(I2PServerSocketImpl.class);
+ private I2PSocketManager mgr;
+ /** list of sockets waiting for the client to accept them */
+ private List pendingSockets = Collections.synchronizedList(new ArrayList(4));
+
+ /** have we been closed */
+ private volatile boolean closing = false;
+
+ /** lock on this when accepting a pending socket, and wait on it for notification of acceptance */
+ private Object socketAcceptedLock = new Object();
+ /** lock on this when adding a new socket to the pending list, and wait on it accordingly */
+ private Object socketAddedLock = new Object();
+
+ public I2PServerSocketImpl(I2PSocketManager mgr) {
+ this.mgr = mgr;
+ }
+
+ /**
+ * Waits for the next socket connecting. If a remote user tried to make a
+ * connection and the local application wasn't .accept()ing new connections,
+ * they should get refused (if .accept() doesnt occur in some small period -
+ * currently 5 seconds)
+ *
+ * @return a connected I2PSocket
+ *
+ * @throws I2PException if there is a problem with reading a new socket
+ * from the data available (aka the I2PSession closed, etc)
+ * @throws ConnectException if the I2PServerSocket is closed
+ */
+ public I2PSocket accept() throws I2PException, ConnectException {
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("accept() called, pending: " + pendingSockets.size());
+
+ I2PSocket ret = null;
+
+ while ( (ret == null) && (!closing) ){
+ while (pendingSockets.size() <= 0) {
+ if (closing) throw new ConnectException("I2PServerSocket closed");
+ try {
+ synchronized(socketAddedLock) {
+ socketAddedLock.wait();
+ }
+ } catch (InterruptedException ie) {}
+ }
+ synchronized (pendingSockets) {
+ if (pendingSockets.size() > 0) {
+ ret = (I2PSocket)pendingSockets.remove(0);
+ }
+ }
+ if (ret != null) {
+ synchronized (socketAcceptedLock) {
+ socketAcceptedLock.notifyAll();
+ }
+ }
+ }
+
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("TIMING: handed out accept result " + ret.hashCode());
+ return ret;
+ }
+
+ /**
+ * Make the socket available and wait until the client app accepts it, or until
+ * the given timeout elapses. This doesn't have any limits on the queue size -
+ * perhaps it should add some choking (e.g. after 5 waiting for accept, refuse)
+ *
+ * @param timeoutMs how long to wait until accept
+ * @return true if the socket was accepted, false if the timeout expired
+ * or the socket was closed
+ */
+ public boolean addWaitForAccept(I2PSocket s, long timeoutMs) {
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("addWaitForAccept [new socket arrived [" + s.toString() + "], pending: " + pendingSockets.size());
+
+ if (closing) {
+ if (_log.shouldLog(Log.WARN))
+ _log.warn("Already closing the socket");
+ return false;
+ }
+
+ Clock clock = I2PAppContext.getGlobalContext().clock();
+ long start = clock.now();
+ long end = start + timeoutMs;
+ pendingSockets.add(s);
+ synchronized (socketAddedLock) {
+ socketAddedLock.notifyAll();
+ }
+
+ // keep looping until the socket has been grabbed by the accept()
+ // (or the expiration passes, or the socket is closed)
+ while (pendingSockets.contains(s)) {
+ long now = clock.now();
+ if (now >= end) {
+ if (_log.shouldLog(Log.INFO))
+ _log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms) for socket " + s.toString());
+ pendingSockets.remove(s);
+ return false;
+ }
+ if (closing) {
+ if (_log.shouldLog(Log.WARN))
+ _log.warn("Server socket closed while waiting for accept");
+ pendingSockets.remove(s);
+ return false;
+ }
+ long remaining = end - now;
+ try {
+ synchronized (socketAcceptedLock) {
+ socketAcceptedLock.wait(remaining);
+ }
+ } catch (InterruptedException ie) {}
+ }
+ long now = clock.now();
+ if (_log.shouldLog(Log.DEBUG))
+ _log.info("Socket accepted after " + (now-start) + "ms for socket " + s.toString());
+ return true;
+ }
+
+ public void close() {
+ closing = true;
+ // let anyone .accept()ing know to fsck off
+ synchronized (socketAddedLock) {
+ socketAddedLock.notifyAll();
+ }
+ // let anyone addWaitForAccept()ing know to fsck off
+ synchronized (socketAcceptedLock) {
+ socketAcceptedLock.notifyAll();
+ }
+ }
+
+ public I2PSocketManager getManager() { return mgr; }
}
diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java
index 9f12be6c4..c8b566190 100644
--- a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java
+++ b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java
@@ -5,7 +5,6 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
-import java.net.SocketTimeoutException;
import java.util.Properties;
import net.i2p.I2PAppContext;
@@ -21,203 +20,173 @@ import net.i2p.util.Log;
*
*/
public class StreamSinkServer {
-
- private Log _log;
- private String _sinkDir;
- private String _destFile;
- private String _i2cpHost;
- private int _i2cpPort;
- private int _handlers;
-
- /**
- * Create but do not start the streaming server.
- *
- * @param sinkDir Directory to store received files in
- * @param ourDestFile filename to write our binary destination to
- */
- public StreamSinkServer(String sinkDir, String ourDestFile) {
- this(sinkDir, ourDestFile, null, -1, 3);
- }
-
- public StreamSinkServer(String sinkDir, String ourDestFile, String i2cpHost, int i2cpPort, int handlers) {
- _sinkDir = sinkDir;
- _destFile = ourDestFile;
- _i2cpHost = i2cpHost;
- _i2cpPort = i2cpPort;
- _handlers = handlers;
- _log = I2PAppContext.getGlobalContext().logManager().getLog(StreamSinkServer.class);
- }
-
- /**
- * Actually fire up the server - this call blocks forever (or until the server
- * socket closes)
- *
- */
- public void runServer() {
- I2PSocketManager mgr = null;
- if(_i2cpHost != null) {
- mgr = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, new Properties());
- } else {
- mgr = I2PSocketManagerFactory.createManager();
- }
- Destination dest = mgr.getSession().getMyDestination();
- if(_log.shouldLog(Log.INFO)) {
- _log.info("Listening for connections on: " + dest.calculateHash().toBase64());
- }
- FileOutputStream fos = null;
- try {
- fos = new FileOutputStream(_destFile);
- dest.writeBytes(fos);
- } catch(IOException ioe) {
- _log.error("Error writing out our destination to " + _destFile, ioe);
- return;
- } catch(DataFormatException dfe) {
- _log.error("Error formatting the destination", dfe);
- return;
- } finally {
- if(fos != null) {
- try {
- fos.close();
- } catch(IOException ioe) {
- }
- }
- }
-
- I2PServerSocket sock = mgr.getServerSocket();
- startup(sock);
- }
-
- public void startup(I2PServerSocket sock) {
- for(int i = 0; i < _handlers; i++) {
- I2PThread t = new I2PThread(new ClientRunner(sock));
- t.setName("Handler " + i);
- t.setDaemon(false);
- t.start();
- }
- }
-
- /**
- * Actually deal with a client - pull anything they send us and write it to a file.
- *
- */
- private class ClientRunner implements Runnable {
-
- private I2PServerSocket _socket;
-
- public ClientRunner(I2PServerSocket socket) {
- _socket = socket;
- }
-
- public void run() {
- while(true) {
- try {
- I2PSocket socket = _socket.accept();
- if(socket != null) {
- handle(socket);
- }
- } catch(I2PException ie) {
- _log.error("Error accepting connection", ie);
- return;
- } catch(ConnectException ce) {
- _log.error("Connection already dropped", ce);
- return;
- } catch(SocketTimeoutException ste) {
- // ignored
- }
- }
- }
-
- private void handle(I2PSocket sock) {
- FileOutputStream fos = null;
- try {
- File sink = new File(_sinkDir);
- if(!sink.exists()) {
- sink.mkdirs();
- }
- File cur = File.createTempFile("clientSink", ".dat", sink);
- fos = new FileOutputStream(cur);
- if(_log.shouldLog(Log.DEBUG)) {
- _log.debug("Writing to " + cur.getAbsolutePath());
- }
- } catch(IOException ioe) {
- _log.error("Error creating sink", ioe);
- return;
- }
-
- long start = System.currentTimeMillis();
- try {
- InputStream in = sock.getInputStream();
- byte buf[] = new byte[4096];
- long written = 0;
- int read = 0;
- while((read = in.read(buf)) != -1) {
- //_fos.write(buf, 0, read);
- written += read;
- if(_log.shouldLog(Log.DEBUG)) {
- _log.debug("read and wrote " + read + " (" + written + ")");
- }
- }
- fos.write(("written: [" + written + "]\n").getBytes());
- long lifetime = System.currentTimeMillis() - start;
- _log.info("Got EOF from client socket [written=" + written + " lifetime=" + lifetime + "]");
- } catch(IOException ioe) {
- _log.error("Error writing the sink", ioe);
- } finally {
- if(fos != null) {
- try {
- fos.close();
- } catch(IOException ioe) {
- }
- }
- if(sock != null) {
- try {
- sock.close();
- } catch(IOException ioe) {
- }
- }
- _log.debug("Client socket closed");
- }
- }
- }
-
- /**
- * Fire up the streaming server. Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [numHandlers]
- *
- * - sinkDir: Directory to store received files in
- * - ourDestFile: filename to write our binary destination to
- * - numHandlers: how many concurrent connections to handle
- *
- */
- public static void main(String args[]) {
- StreamSinkServer server = null;
- switch(args.length) {
- case 0:
- server = new StreamSinkServer("dataDir", "server.key", "localhost", 7654, 3);
- break;
- case 2:
- server = new StreamSinkServer(args[0], args[1]);
- break;
- case 4:
- case 5:
- int handlers = 3;
- if(args.length == 5) {
- try {
- handlers = Integer.parseInt(args[4]);
- } catch(NumberFormatException nfe) {
- }
- }
- try {
- int port = Integer.parseInt(args[1]);
- server = new StreamSinkServer(args[2], args[3], args[0], port, handlers);
- } catch(NumberFormatException nfe) {
- System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]");
- }
- break;
- default:
- System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]");
- }
- if(server != null) {
- server.runServer();
- }
- }
+ private Log _log;
+ private String _sinkDir;
+ private String _destFile;
+ private String _i2cpHost;
+ private int _i2cpPort;
+ private int _handlers;
+
+ /**
+ * Create but do not start the streaming server.
+ *
+ * @param sinkDir Directory to store received files in
+ * @param ourDestFile filename to write our binary destination to
+ */
+ public StreamSinkServer(String sinkDir, String ourDestFile) {
+ this(sinkDir, ourDestFile, null, -1, 3);
+ }
+ public StreamSinkServer(String sinkDir, String ourDestFile, String i2cpHost, int i2cpPort, int handlers) {
+ _sinkDir = sinkDir;
+ _destFile = ourDestFile;
+ _i2cpHost = i2cpHost;
+ _i2cpPort = i2cpPort;
+ _handlers = handlers;
+ _log = I2PAppContext.getGlobalContext().logManager().getLog(StreamSinkServer.class);
+ }
+
+ /**
+ * Actually fire up the server - this call blocks forever (or until the server
+ * socket closes)
+ *
+ */
+ public void runServer() {
+ I2PSocketManager mgr = null;
+ if (_i2cpHost != null)
+ mgr = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, new Properties());
+ else
+ mgr = I2PSocketManagerFactory.createManager();
+ Destination dest = mgr.getSession().getMyDestination();
+ if (_log.shouldLog(Log.INFO))
+ _log.info("Listening for connections on: " + dest.calculateHash().toBase64());
+ FileOutputStream fos = null;
+ try {
+ fos = new FileOutputStream(_destFile);
+ dest.writeBytes(fos);
+ } catch (IOException ioe) {
+ _log.error("Error writing out our destination to " + _destFile, ioe);
+ return;
+ } catch (DataFormatException dfe) {
+ _log.error("Error formatting the destination", dfe);
+ return;
+ } finally {
+ if (fos != null) try { fos.close(); } catch (IOException ioe) {}
+ }
+
+ I2PServerSocket sock = mgr.getServerSocket();
+ startup(sock);
+ }
+
+ public void startup(I2PServerSocket sock) {
+ for (int i = 0; i < _handlers; i++) {
+ I2PThread t = new I2PThread(new ClientRunner(sock));
+ t.setName("Handler " + i);
+ t.setDaemon(false);
+ t.start();
+ }
+ }
+
+ /**
+ * Actually deal with a client - pull anything they send us and write it to a file.
+ *
+ */
+ private class ClientRunner implements Runnable {
+ private I2PServerSocket _socket;
+ public ClientRunner(I2PServerSocket socket) {
+ _socket = socket;
+ }
+ public void run() {
+ while (true) {
+ try {
+ I2PSocket socket = _socket.accept();
+ if (socket != null)
+ handle(socket);
+ } catch (I2PException ie) {
+ _log.error("Error accepting connection", ie);
+ return;
+ } catch (ConnectException ce) {
+ _log.error("Connection already dropped", ce);
+ return;
+ }
+ }
+ }
+
+ private void handle(I2PSocket sock) {
+ FileOutputStream fos = null;
+ try {
+ File sink = new File(_sinkDir);
+ if (!sink.exists())
+ sink.mkdirs();
+ File cur = File.createTempFile("clientSink", ".dat", sink);
+ fos = new FileOutputStream(cur);
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("Writing to " + cur.getAbsolutePath());
+ } catch (IOException ioe) {
+ _log.error("Error creating sink", ioe);
+ return;
+ }
+
+ long start = System.currentTimeMillis();
+ try {
+ InputStream in = sock.getInputStream();
+ byte buf[] = new byte[4096];
+ long written = 0;
+ int read = 0;
+ while ( (read = in.read(buf)) != -1) {
+ //_fos.write(buf, 0, read);
+ written += read;
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("read and wrote " + read + " (" + written + ")");
+ }
+ fos.write(("written: [" + written + "]\n").getBytes());
+ long lifetime = System.currentTimeMillis() - start;
+ _log.info("Got EOF from client socket [written=" + written + " lifetime=" + lifetime + "]");
+ } catch (IOException ioe) {
+ _log.error("Error writing the sink", ioe);
+ } finally {
+ if (fos != null) try { fos.close(); } catch (IOException ioe) {}
+ if (sock != null) try { sock.close(); } catch (IOException ioe) {}
+ _log.debug("Client socket closed");
+ }
+ }
+ }
+
+ /**
+ * Fire up the streaming server. Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [numHandlers]
+ *
+ * - sinkDir: Directory to store received files in
+ * - ourDestFile: filename to write our binary destination to
+ * - numHandlers: how many concurrent connections to handle
+ *
+ */
+ public static void main(String args[]) {
+ StreamSinkServer server = null;
+ switch (args.length) {
+ case 0:
+ server = new StreamSinkServer("dataDir", "server.key", "localhost", 7654, 3);
+ break;
+ case 2:
+ server = new StreamSinkServer(args[0], args[1]);
+ break;
+ case 4:
+ case 5:
+ int handlers = 3;
+ if (args.length == 5) {
+ try {
+ handlers = Integer.parseInt(args[4]);
+ } catch (NumberFormatException nfe) {}
+ }
+ try {
+ int port = Integer.parseInt(args[1]);
+ server = new StreamSinkServer(args[2], args[3], args[0], port, handlers);
+ } catch (NumberFormatException nfe) {
+ System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]");
+ }
+ break;
+ default:
+ System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]");
+ }
+ if (server != null)
+ server.runServer();
+ }
}
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java
index f05ae1c8c..4960f1a22 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java
@@ -1,6 +1,5 @@
package net.i2p.client.streaming;
-import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
index 08d794877..dcc93c5ec 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
@@ -21,459 +21,393 @@ import net.i2p.util.SimpleTimer;
*
*/
public class ConnectionManager {
+ private I2PAppContext _context;
+ private Log _log;
+ private I2PSession _session;
+ private MessageHandler _messageHandler;
+ private PacketHandler _packetHandler;
+ private ConnectionHandler _connectionHandler;
+ private PacketQueue _outboundQueue;
+ private SchedulerChooser _schedulerChooser;
+ private ConnectionPacketHandler _conPacketHandler;
+ /** Inbound stream ID (Long) to Connection map */
+ private Map _connectionByInboundId;
+ /** Ping ID (Long) to PingRequest */
+ private Map _pendingPings;
+ private boolean _allowIncoming;
+ private int _maxConcurrentStreams;
+ private ConnectionOptions _defaultOptions;
+ private volatile int _numWaiting;
+ private Object _connectionLock;
+
+ public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) {
+ _context = context;
+ _log = context.logManager().getLog(ConnectionManager.class);
+ _connectionByInboundId = new HashMap(32);
+ _pendingPings = new HashMap(4);
+ _connectionLock = new Object();
+ _messageHandler = new MessageHandler(context, this);
+ _packetHandler = new PacketHandler(context, this);
+ _connectionHandler = new ConnectionHandler(context, this);
+ _schedulerChooser = new SchedulerChooser(context);
+ _conPacketHandler = new ConnectionPacketHandler(context);
+ _session = session;
+ session.setSessionListener(_messageHandler);
+ _outboundQueue = new PacketQueue(context, session, this);
+ _allowIncoming = false;
+ _maxConcurrentStreams = maxConcurrent;
+ _defaultOptions = defaultOptions;
+ _numWaiting = 0;
+ _context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
+ _context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
+ _context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
+ _context.statManager().createRateStat("stream.con.lifetimeBytesReceived", "How many bytes do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
+ _context.statManager().createRateStat("stream.con.lifetimeDupMessagesSent", "How many duplicate messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
+ _context.statManager().createRateStat("stream.con.lifetimeDupMessagesReceived", "How many duplicate messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
+ _context.statManager().createRateStat("stream.con.lifetimeRTT", "What is the final RTT when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
+ _context.statManager().createRateStat("stream.con.lifetimeCongestionSeenAt", "When was the last congestion seen at when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
+ _context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
+ _context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
+ }
+
+ Connection getConnectionByInboundId(long id) {
+ synchronized (_connectionLock) {
+ return (Connection)_connectionByInboundId.get(new Long(id));
+ }
+ }
+ /**
+ * not guaranteed to be unique, but in case we receive more than one packet
+ * on an inbound connection that we havent ack'ed yet...
+ */
+ Connection getConnectionByOutboundId(long id) {
+ synchronized (_connectionLock) {
+ for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
+ Connection con = (Connection)iter.next();
+ if (DataHelper.eq(con.getSendStreamId(), id))
+ return con;
+ }
+ }
+ return null;
+ }
+
+ public void setAllowIncomingConnections(boolean allow) {
+ _connectionHandler.setActive(allow);
+ }
+ /** should we acceot connections, or just reject everyone? */
+ public boolean getAllowIncomingConnections() {
+ return _connectionHandler.getActive();
+ }
+
+ /**
+ * Create a new connection based on the SYN packet we received.
+ *
+ * @return created Connection with the packet's data already delivered to
+ * it, or null if the syn's streamId was already taken
+ */
+ public Connection receiveConnection(Packet synPacket) {
+ Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions));
+ long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
+ boolean reject = false;
+ int active = 0;
+ int total = 0;
+ synchronized (_connectionLock) {
+ total = _connectionByInboundId.size();
+ for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
+ if ( ((Connection)iter.next()).getIsConnected() )
+ active++;
+ }
+ if (locked_tooManyStreams()) {
+ reject = true;
+ } else {
+ while (true) {
+ Connection oldCon = (Connection)_connectionByInboundId.put(new Long(receiveId), con);
+ if (oldCon == null) {
+ break;
+ } else {
+ _connectionByInboundId.put(new Long(receiveId), oldCon);
+ // receiveId already taken, try another
+ receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
+ }
+ }
+ }
+ }
+
+ _context.statManager().addRateData("stream.receiveActive", active, total);
+
+ if (reject) {
+ if (_log.shouldLog(Log.WARN))
+ _log.warn("Refusing connection since we have exceeded our max of "
+ + _maxConcurrentStreams + " connections");
+ PacketLocal reply = new PacketLocal(_context, synPacket.getOptionalFrom());
+ reply.setFlag(Packet.FLAG_RESET);
+ reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
+ reply.setAckThrough(synPacket.getSequenceNum());
+ reply.setSendStreamId(synPacket.getReceiveStreamId());
+ reply.setReceiveStreamId(0);
+ reply.setOptionalFrom(_session.getMyDestination());
+ // this just sends the packet - no retries or whatnot
+ _outboundQueue.enqueue(reply);
+ return null;
+ }
+
+ con.setReceiveStreamId(receiveId);
+ try {
+ con.getPacketHandler().receivePacket(synPacket, con);
+ } catch (I2PException ie) {
+ synchronized (_connectionLock) {
+ _connectionByInboundId.remove(new Long(receiveId));
+ }
+ return null;
+ }
+
+ _context.statManager().addRateData("stream.connectionReceived", 1, 0);
+ return con;
+ }
+
+ private static final long DEFAULT_STREAM_DELAY_MAX = 10*1000;
+
+ /**
+ * Build a new connection to the given peer. This blocks if there is no
+ * connection delay, otherwise it returns immediately.
+ *
+ * @return new connection, or null if we have exceeded our limit
+ */
+ public Connection connect(Destination peer, ConnectionOptions opts) {
+ Connection con = null;
+ long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
+ long expiration = _context.clock().now() + opts.getConnectTimeout();
+ if (opts.getConnectTimeout() <= 0)
+ expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX;
+ _numWaiting++;
+ while (true) {
+ long remaining = expiration - _context.clock().now();
+ if (remaining <= 0) {
+ if (_log.shouldLog(Log.WARN))
+ _log.warn("Refusing to connect since we have exceeded our max of "
+ + _maxConcurrentStreams + " connections");
+ _numWaiting--;
+ return null;
+ }
+ boolean reject = false;
+ synchronized (_connectionLock) {
+ if (locked_tooManyStreams()) {
+ // allow a full buffer of pending/waiting streams
+ if (_numWaiting > _maxConcurrentStreams) {
+ if (_log.shouldLog(Log.WARN))
+ _log.warn("Refusing connection since we have exceeded our max of "
+ + _maxConcurrentStreams + " and there are " + _numWaiting
+ + " waiting already");
+ _numWaiting--;
+ return null;
+ }
+
+ // no remaining streams, lets wait a bit
+ try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {}
+ } else {
+ con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts);
+ con.setRemotePeer(peer);
+
+ while (_connectionByInboundId.containsKey(new Long(receiveId))) {
+ receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
+ }
+ _connectionByInboundId.put(new Long(receiveId), con);
+ break; // stop looping as a psuedo-wait
+ }
+ }
+ }
- private I2PAppContext _context;
- private Log _log;
- private I2PSession _session;
- private MessageHandler _messageHandler;
- private PacketHandler _packetHandler;
- private ConnectionHandler _connectionHandler;
- private PacketQueue _outboundQueue;
- private SchedulerChooser _schedulerChooser;
- private ConnectionPacketHandler _conPacketHandler;
- /** Inbound stream ID (Long) to Connection map */
- private Map _connectionByInboundId;
- /** Ping ID (Long) to PingRequest */
- private Map _pendingPings;
- private boolean _allowIncoming;
- private int _maxConcurrentStreams;
- private ConnectionOptions _defaultOptions;
- private volatile int _numWaiting;
- private Object _connectionLock;
- private long SoTimeout;
+ // ok we're in...
+ con.setReceiveStreamId(receiveId);
+ con.eventOccurred();
+
+ _log.debug("Connect() conDelay = " + opts.getConnectDelay());
+ if (opts.getConnectDelay() <= 0) {
+ con.waitForConnect();
+ }
+ if (_numWaiting > 0)
+ _numWaiting--;
+
+ _context.statManager().addRateData("stream.connectionCreated", 1, 0);
+ return con;
+ }
- public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) {
- _context = context;
- _log = context.logManager().getLog(ConnectionManager.class);
- _connectionByInboundId = new HashMap(32);
- _pendingPings = new HashMap(4);
- _connectionLock = new Object();
- _messageHandler = new MessageHandler(context, this);
- _packetHandler = new PacketHandler(context, this);
- _connectionHandler = new ConnectionHandler(context, this);
- _schedulerChooser = new SchedulerChooser(context);
- _conPacketHandler = new ConnectionPacketHandler(context);
- _session = session;
- session.setSessionListener(_messageHandler);
- _outboundQueue = new PacketQueue(context, session, this);
- _allowIncoming = false;
- _maxConcurrentStreams = maxConcurrent;
- _defaultOptions = defaultOptions;
- _numWaiting = 0;
- /** Socket timeout for accept() */
- SoTimeout = -1;
+ private boolean locked_tooManyStreams() {
+ if (_maxConcurrentStreams <= 0) return false;
+ if (_connectionByInboundId.size() < _maxConcurrentStreams) return false;
+ int active = 0;
+ for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
+ Connection con = (Connection)iter.next();
+ if (con.getIsConnected())
+ active++;
+ }
+
+ if ( (_connectionByInboundId.size() > 100) && (_log.shouldLog(Log.INFO)) )
+ _log.info("More than 100 connections! " + active
+ + " total: " + _connectionByInboundId.size());
- _context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
- _context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
- _context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
- _context.statManager().createRateStat("stream.con.lifetimeBytesReceived", "How many bytes do we receive on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
- _context.statManager().createRateStat("stream.con.lifetimeDupMessagesSent", "How many duplicate messages do we send on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
- _context.statManager().createRateStat("stream.con.lifetimeDupMessagesReceived", "How many duplicate messages do we receive on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
- _context.statManager().createRateStat("stream.con.lifetimeRTT", "What is the final RTT when a stream closes?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
- _context.statManager().createRateStat("stream.con.lifetimeCongestionSeenAt", "When was the last congestion seen at when a stream closes?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
- _context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
- _context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
- }
+ return (active >= _maxConcurrentStreams);
+ }
+
+ public MessageHandler getMessageHandler() { return _messageHandler; }
+ public PacketHandler getPacketHandler() { return _packetHandler; }
+ public ConnectionHandler getConnectionHandler() { return _connectionHandler; }
+ public I2PSession getSession() { return _session; }
+ public PacketQueue getPacketQueue() { return _outboundQueue; }
+
+ /**
+ * Something b0rked hard, so kill all of our connections without mercy.
+ * Don't bother sending close packets.
+ *
+ */
+ public void disconnectAllHard() {
+ synchronized (_connectionLock) {
+ for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
+ Connection con = (Connection)iter.next();
+ con.disconnect(false, false);
+ }
+ _connectionByInboundId.clear();
+ _connectionLock.notifyAll();
+ }
+ }
+
+ /**
+ * Drop the (already closed) connection on the floor.
+ *
+ */
+ public void removeConnection(Connection con) {
+ boolean removed = false;
+ synchronized (_connectionLock) {
+ Object o = _connectionByInboundId.remove(new Long(con.getReceiveStreamId()));
+ removed = (o == con);
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("Connection removed? " + removed + " remaining: "
+ + _connectionByInboundId.size() + ": " + con);
+ if (!removed && _log.shouldLog(Log.DEBUG))
+ _log.debug("Failed to remove " + con +"\n" + _connectionByInboundId.values());
+ _connectionLock.notifyAll();
+ }
+ if (removed) {
+ _context.statManager().addRateData("stream.con.lifetimeMessagesSent", con.getLastSendId(), con.getLifetime());
+ _context.statManager().addRateData("stream.con.lifetimeMessagesReceived", con.getHighestAckedThrough(), con.getLifetime());
+ _context.statManager().addRateData("stream.con.lifetimeBytesSent", con.getLifetimeBytesSent(), con.getLifetime());
+ _context.statManager().addRateData("stream.con.lifetimeBytesReceived", con.getLifetimeBytesReceived(), con.getLifetime());
+ _context.statManager().addRateData("stream.con.lifetimeDupMessagesSent", con.getLifetimeDupMessagesSent(), con.getLifetime());
+ _context.statManager().addRateData("stream.con.lifetimeDupMessagesReceived", con.getLifetimeDupMessagesReceived(), con.getLifetime());
+ _context.statManager().addRateData("stream.con.lifetimeRTT", con.getOptions().getRTT(), con.getLifetime());
+ _context.statManager().addRateData("stream.con.lifetimeCongestionSeenAt", con.getLastCongestionSeenAt(), con.getLifetime());
+ _context.statManager().addRateData("stream.con.lifetimeSendWindowSize", con.getOptions().getWindowSize(), con.getLifetime());
+ }
+ }
+
+ /** return a set of Connection objects */
+ public Set listConnections() {
+ synchronized (_connectionLock) {
+ return new HashSet(_connectionByInboundId.values());
+ }
+ }
+
+ public boolean ping(Destination peer, long timeoutMs) {
+ return ping(peer, timeoutMs, true);
+ }
+ public boolean ping(Destination peer, long timeoutMs, boolean blocking) {
+ return ping(peer, timeoutMs, blocking, null, null, null);
+ }
+ public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) {
+ Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1);
+ PacketLocal packet = new PacketLocal(_context, peer);
+ packet.setSendStreamId(id.longValue());
+ packet.setFlag(Packet.FLAG_ECHO);
+ packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
+ packet.setOptionalFrom(_session.getMyDestination());
+ if ( (keyToUse != null) && (tagsToSend != null) ) {
+ packet.setKeyUsed(keyToUse);
+ packet.setTagsSent(tagsToSend);
+ }
+
+ PingRequest req = new PingRequest(peer, packet, notifier);
+
+ synchronized (_pendingPings) {
+ _pendingPings.put(id, req);
+ }
+
+ _outboundQueue.enqueue(packet);
+ packet.releasePayload();
+
+ if (blocking) {
+ synchronized (req) {
+ if (!req.pongReceived())
+ try { req.wait(timeoutMs); } catch (InterruptedException ie) {}
+ }
+
+ synchronized (_pendingPings) {
+ _pendingPings.remove(id);
+ }
+ } else {
+ SimpleTimer.getInstance().addEvent(new PingFailed(id, notifier), timeoutMs);
+ }
+
+ boolean ok = req.pongReceived();
+ return ok;
+ }
- Connection getConnectionByInboundId(long id) {
- synchronized(_connectionLock) {
- return (Connection)_connectionByInboundId.get(new Long(id));
- }
- }
-
- /**
- * not guaranteed to be unique, but in case we receive more than one packet
- * on an inbound connection that we havent ack'ed yet...
- */
- Connection getConnectionByOutboundId(long id) {
- synchronized(_connectionLock) {
- for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) {
- Connection con = (Connection)iter.next();
- if(DataHelper.eq(con.getSendStreamId(), id)) {
- return con;
- }
- }
- }
- return null;
- }
-
- /**
- * Set the socket accept() timeout.
- * @param x
- */
- public void MsetSoTimeout(long x) {
- SoTimeout = x;
- }
-
- /**
- * Get the socket accept() timeout.
- * @return
- */
- public long MgetSoTimeout() {
- return SoTimeout;
- }
-
- public void setAllowIncomingConnections(boolean allow) {
- _connectionHandler.setActive(allow);
- }
-
- /** should we acceot connections, or just reject everyone? */
- public boolean getAllowIncomingConnections() {
- return _connectionHandler.getActive();
- }
-
- /**
- * Create a new connection based on the SYN packet we received.
- *
- * @return created Connection with the packet's data already delivered to
- * it, or null if the syn's streamId was already taken
- */
- public Connection receiveConnection(Packet synPacket) {
- Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions));
- long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1;
- boolean reject = false;
- int active = 0;
- int total = 0;
- synchronized(_connectionLock) {
- total = _connectionByInboundId.size();
- for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) {
- if(((Connection)iter.next()).getIsConnected()) {
- active++;
- }
- }
- if(locked_tooManyStreams()) {
- reject = true;
- } else {
- while(true) {
- Connection oldCon = (Connection)_connectionByInboundId.put(new Long(receiveId), con);
- if(oldCon == null) {
- break;
- } else {
- _connectionByInboundId.put(new Long(receiveId), oldCon);
- // receiveId already taken, try another
- receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1;
- }
- }
- }
- }
-
- _context.statManager().addRateData("stream.receiveActive", active, total);
-
- if(reject) {
- if(_log.shouldLog(Log.WARN)) {
- _log.warn("Refusing connection since we have exceeded our max of " + _maxConcurrentStreams + " connections");
- }
- PacketLocal reply = new PacketLocal(_context, synPacket.getOptionalFrom());
- reply.setFlag(Packet.FLAG_RESET);
- reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
- reply.setAckThrough(synPacket.getSequenceNum());
- reply.setSendStreamId(synPacket.getReceiveStreamId());
- reply.setReceiveStreamId(0);
- reply.setOptionalFrom(_session.getMyDestination());
- // this just sends the packet - no retries or whatnot
- _outboundQueue.enqueue(reply);
- return null;
- }
-
- con.setReceiveStreamId(receiveId);
- try {
- con.getPacketHandler().receivePacket(synPacket, con);
- } catch(I2PException ie) {
- synchronized(_connectionLock) {
- _connectionByInboundId.remove(new Long(receiveId));
- }
- return null;
- }
-
- _context.statManager().addRateData("stream.connectionReceived", 1, 0);
- return con;
- }
- private static final long DEFAULT_STREAM_DELAY_MAX = 10 * 1000;
-
- /**
- * Build a new connection to the given peer. This blocks if there is no
- * connection delay, otherwise it returns immediately.
- *
- * @return new connection, or null if we have exceeded our limit
- */
- public Connection connect(Destination peer, ConnectionOptions opts) {
- Connection con = null;
- long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1;
- long expiration = _context.clock().now() + opts.getConnectTimeout();
- if(opts.getConnectTimeout() <= 0) {
- expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX;
- }
- _numWaiting++;
- while(true) {
- long remaining = expiration - _context.clock().now();
- if(remaining <= 0) {
- if(_log.shouldLog(Log.WARN)) {
- _log.warn("Refusing to connect since we have exceeded our max of " + _maxConcurrentStreams + " connections");
- }
- _numWaiting--;
- return null;
- }
- boolean reject = false;
- synchronized(_connectionLock) {
- if(locked_tooManyStreams()) {
- // allow a full buffer of pending/waiting streams
- if(_numWaiting > _maxConcurrentStreams) {
- if(_log.shouldLog(Log.WARN)) {
- _log.warn("Refusing connection since we have exceeded our max of " + _maxConcurrentStreams + " and there are " + _numWaiting + " waiting already");
- }
- _numWaiting--;
- return null;
- }
-
- // no remaining streams, lets wait a bit
- try {
- _connectionLock.wait(remaining);
- } catch(InterruptedException ie) {
- }
- } else {
- con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts);
- con.setRemotePeer(peer);
-
- while(_connectionByInboundId.containsKey(new Long(receiveId))) {
- receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1;
- }
- _connectionByInboundId.put(new Long(receiveId), con);
- break; // stop looping as a psuedo-wait
- }
- }
- }
-
- // ok we're in...
- con.setReceiveStreamId(receiveId);
- con.eventOccurred();
-
- _log.debug("Connect() conDelay = " + opts.getConnectDelay());
- if(opts.getConnectDelay() <= 0) {
- con.waitForConnect();
- }
- if(_numWaiting > 0) {
- _numWaiting--;
- }
- _context.statManager().addRateData("stream.connectionCreated", 1, 0);
- return con;
- }
-
- private boolean locked_tooManyStreams() {
- if(_maxConcurrentStreams <= 0) {
- return false;
- }
- if(_connectionByInboundId.size() < _maxConcurrentStreams) {
- return false;
- }
- int active = 0;
- for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) {
- Connection con = (Connection)iter.next();
- if(con.getIsConnected()) {
- active++;
- }
- }
-
- if((_connectionByInboundId.size() > 100) && (_log.shouldLog(Log.INFO))) {
- _log.info("More than 100 connections! " + active + " total: " + _connectionByInboundId.size());
- }
- return (active >= _maxConcurrentStreams);
- }
-
- public MessageHandler getMessageHandler() {
- return _messageHandler;
- }
-
- public PacketHandler getPacketHandler() {
- return _packetHandler;
- }
-
- public ConnectionHandler getConnectionHandler() {
- return _connectionHandler;
- }
-
- public I2PSession getSession() {
- return _session;
- }
-
- public PacketQueue getPacketQueue() {
- return _outboundQueue;
- }
-
- /**
- * Something b0rked hard, so kill all of our connections without mercy.
- * Don't bother sending close packets.
- *
- */
- public void disconnectAllHard() {
- synchronized(_connectionLock) {
- for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) {
- Connection con = (Connection)iter.next();
- con.disconnect(false, false);
- }
- _connectionByInboundId.clear();
- _connectionLock.notifyAll();
- }
- }
-
- /**
- * Drop the (already closed) connection on the floor.
- *
- */
- public void removeConnection(Connection con) {
- boolean removed = false;
- synchronized(_connectionLock) {
- Object o = _connectionByInboundId.remove(new Long(con.getReceiveStreamId()));
- removed = (o == con);
- if(_log.shouldLog(Log.DEBUG)) {
- _log.debug("Connection removed? " + removed + " remaining: " + _connectionByInboundId.size() + ": " + con);
- }
- if(!removed && _log.shouldLog(Log.DEBUG)) {
- _log.debug("Failed to remove " + con + "\n" + _connectionByInboundId.values());
- }
- _connectionLock.notifyAll();
- }
- if(removed) {
- _context.statManager().addRateData("stream.con.lifetimeMessagesSent", con.getLastSendId(), con.getLifetime());
- _context.statManager().addRateData("stream.con.lifetimeMessagesReceived", con.getHighestAckedThrough(), con.getLifetime());
- _context.statManager().addRateData("stream.con.lifetimeBytesSent", con.getLifetimeBytesSent(), con.getLifetime());
- _context.statManager().addRateData("stream.con.lifetimeBytesReceived", con.getLifetimeBytesReceived(), con.getLifetime());
- _context.statManager().addRateData("stream.con.lifetimeDupMessagesSent", con.getLifetimeDupMessagesSent(), con.getLifetime());
- _context.statManager().addRateData("stream.con.lifetimeDupMessagesReceived", con.getLifetimeDupMessagesReceived(), con.getLifetime());
- _context.statManager().addRateData("stream.con.lifetimeRTT", con.getOptions().getRTT(), con.getLifetime());
- _context.statManager().addRateData("stream.con.lifetimeCongestionSeenAt", con.getLastCongestionSeenAt(), con.getLifetime());
- _context.statManager().addRateData("stream.con.lifetimeSendWindowSize", con.getOptions().getWindowSize(), con.getLifetime());
- }
- }
-
- /** return a set of Connection objects */
- public Set listConnections() {
- synchronized(_connectionLock) {
- return new HashSet(_connectionByInboundId.values());
- }
- }
-
- public boolean ping(Destination peer, long timeoutMs) {
- return ping(peer, timeoutMs, true);
- }
-
- public boolean ping(Destination peer, long timeoutMs, boolean blocking) {
- return ping(peer, timeoutMs, blocking, null, null, null);
- }
-
- public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) {
- Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1);
- PacketLocal packet = new PacketLocal(_context, peer);
- packet.setSendStreamId(id.longValue());
- packet.setFlag(Packet.FLAG_ECHO);
- packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
- packet.setOptionalFrom(_session.getMyDestination());
- if((keyToUse != null) && (tagsToSend != null)) {
- packet.setKeyUsed(keyToUse);
- packet.setTagsSent(tagsToSend);
- }
-
- PingRequest req = new PingRequest(peer, packet, notifier);
-
- synchronized(_pendingPings) {
- _pendingPings.put(id, req);
- }
-
- _outboundQueue.enqueue(packet);
- packet.releasePayload();
-
- if(blocking) {
- synchronized(req) {
- if(!req.pongReceived()) {
- try {
- req.wait(timeoutMs);
- } catch(InterruptedException ie) {
- }
- }
- }
-
- synchronized(_pendingPings) {
- _pendingPings.remove(id);
- }
- } else {
- SimpleTimer.getInstance().addEvent(new PingFailed(id, notifier), timeoutMs);
- }
-
- boolean ok = req.pongReceived();
- return ok;
- }
-
- interface PingNotifier {
-
- public void pingComplete(boolean ok);
- }
-
- private class PingFailed implements SimpleTimer.TimedEvent {
-
- private Long _id;
- private PingNotifier _notifier;
-
- public PingFailed(Long id, PingNotifier notifier) {
- _id = id;
- _notifier = notifier;
- }
-
- public void timeReached() {
- boolean removed = false;
- synchronized(_pendingPings) {
- Object o = _pendingPings.remove(_id);
- if(o != null) {
- removed = true;
- }
- }
- if(removed) {
- if(_notifier != null) {
- _notifier.pingComplete(false);
- }
- if(_log.shouldLog(Log.INFO)) {
- _log.info("Ping failed");
- }
- }
- }
- }
-
- private class PingRequest {
-
- private boolean _ponged;
- private Destination _peer;
- private PacketLocal _packet;
- private PingNotifier _notifier;
-
- public PingRequest(Destination peer, PacketLocal packet, PingNotifier notifier) {
- _ponged = false;
- _peer = peer;
- _packet = packet;
- _notifier = notifier;
- }
-
- public void pong() {
- _log.debug("Ping successful");
- _context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent());
- synchronized(ConnectionManager.PingRequest.this) {
- _ponged = true;
- ConnectionManager.PingRequest.this.notifyAll();
- }
- if(_notifier != null) {
- _notifier.pingComplete(true);
- }
- }
-
- public boolean pongReceived() {
- return _ponged;
- }
- }
-
- void receivePong(long pingId) {
- PingRequest req = null;
- synchronized(_pendingPings) {
- req = (PingRequest)_pendingPings.remove(new Long(pingId));
- }
- if(req != null) {
- req.pong();
- }
- }
+ interface PingNotifier {
+ public void pingComplete(boolean ok);
+ }
+
+ private class PingFailed implements SimpleTimer.TimedEvent {
+ private Long _id;
+ private PingNotifier _notifier;
+ public PingFailed(Long id, PingNotifier notifier) {
+ _id = id;
+ _notifier = notifier;
+ }
+
+ public void timeReached() {
+ boolean removed = false;
+ synchronized (_pendingPings) {
+ Object o = _pendingPings.remove(_id);
+ if (o != null)
+ removed = true;
+ }
+ if (removed) {
+ if (_notifier != null)
+ _notifier.pingComplete(false);
+ if (_log.shouldLog(Log.INFO))
+ _log.info("Ping failed");
+ }
+ }
+ }
+
+ private class PingRequest {
+ private boolean _ponged;
+ private Destination _peer;
+ private PacketLocal _packet;
+ private PingNotifier _notifier;
+ public PingRequest(Destination peer, PacketLocal packet, PingNotifier notifier) {
+ _ponged = false;
+ _peer = peer;
+ _packet = packet;
+ _notifier = notifier;
+ }
+ public void pong() {
+ _log.debug("Ping successful");
+ _context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent());
+ synchronized (ConnectionManager.PingRequest.this) {
+ _ponged = true;
+ ConnectionManager.PingRequest.this.notifyAll();
+ }
+ if (_notifier != null)
+ _notifier.pingComplete(true);
+ }
+ public boolean pongReceived() { return _ponged; }
+ }
+
+ void receivePong(long pingId) {
+ PingRequest req = null;
+ synchronized (_pendingPings) {
+ req = (PingRequest)_pendingPings.remove(new Long(pingId));
+ }
+ if (req != null)
+ req.pong();
+ }
}
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java
index b85459f63..b1a4175f2 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java
@@ -1,8 +1,5 @@
package net.i2p.client.streaming;
-import java.net.SocketTimeoutException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import net.i2p.I2PException;
/**
@@ -10,46 +7,17 @@ import net.i2p.I2PException;
*
*/
public class I2PServerSocketFull implements I2PServerSocket {
-
- private I2PSocketManagerFull _socketManager;
-
- /**
- *
- * @param mgr
- */
- public I2PServerSocketFull(I2PSocketManagerFull mgr) {
- _socketManager = mgr;
- }
-
- /**
- *
- * @return
- * @throws net.i2p.I2PException
- * @throws SocketTimeoutException
- */
- public I2PSocket accept() throws I2PException, SocketTimeoutException {
- return _socketManager.receiveSocket();
- }
-
- public long getSoTimeout() {
- return _socketManager.getConnectionManager().MgetSoTimeout();
- }
-
- public void setSoTimeout(long x) {
- _socketManager.getConnectionManager().MsetSoTimeout(x);
- }
- /**
- * Close the connection.
- */
- public void close() {
- _socketManager.getConnectionManager().setAllowIncomingConnections(false);
- }
-
- /**
- *
- * @return _socketManager
- */
- public I2PSocketManager getManager() {
- return _socketManager;
- }
+ private I2PSocketManagerFull _socketManager;
+
+ public I2PServerSocketFull(I2PSocketManagerFull mgr) {
+ _socketManager = mgr;
+ }
+
+ public I2PSocket accept() throws I2PException {
+ return _socketManager.receiveSocket();
+ }
+
+ public void close() { _socketManager.getConnectionManager().setAllowIncomingConnections(false); }
+
+ public I2PSocketManager getManager() { return _socketManager; }
}
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java
index 842cf791b..61dd48757 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java
@@ -11,139 +11,119 @@ import net.i2p.data.Destination;
*
*/
public class I2PSocketFull implements I2PSocket {
-
- private Connection _connection;
- private I2PSocket.SocketErrorListener _listener;
- private Destination _remotePeer;
- private Destination _localPeer;
-
- public I2PSocketFull(Connection con) {
- _connection = con;
- if(con != null) {
- _remotePeer = con.getRemotePeer();
- _localPeer = con.getSession().getMyDestination();
- }
- }
-
-
- public void close() throws IOException {
- Connection c = _connection;
- if(c == null) {
- return;
- }
- if(c.getIsConnected()) {
- OutputStream out = c.getOutputStream();
- if(out != null) {
- try {
- out.close();
- } catch(IOException ioe) {
- // ignore any write error, as we want to keep on and kill the
- // con (thanks Complication!)
- }
- }
- c.disconnect(true);
- } else {
- //throw new IOException("Not connected");
- }
- destroy();
- }
-
- Connection getConnection() {
- return _connection;
- }
-
- public InputStream getInputStream() {
- Connection c = _connection;
- if(c != null) {
- return c.getInputStream();
- } else {
- return null;
- }
- }
-
- public I2PSocketOptions getOptions() {
- Connection c = _connection;
- if(c != null) {
- return c.getOptions();
- } else {
- return null;
- }
- }
-
- public OutputStream getOutputStream() throws IOException {
- Connection c = _connection;
- if(c != null) {
- return c.getOutputStream();
- } else {
- return null;
- }
- }
-
- public Destination getPeerDestination() {
- return _remotePeer;
- }
-
- public long getReadTimeout() {
- I2PSocketOptions opts = getOptions();
- if(opts != null) {
- return opts.getReadTimeout();
- } else {
- return -1;
- }
- }
-
- public Destination getThisDestination() {
- return _localPeer;
- }
-
- public void setOptions(I2PSocketOptions options) {
- Connection c = _connection;
- if(c == null) {
- return;
- }
- if(options instanceof ConnectionOptions) {
- c.setOptions((ConnectionOptions)options);
- } else {
- c.setOptions(new ConnectionOptions(options));
- }
- }
-
- public void setReadTimeout(long ms) {
- Connection c = _connection;
- if(c == null) {
- return;
- }
- c.getInputStream().setReadTimeout((int)ms);
- c.getOptions().setReadTimeout(ms);
- }
-
- public void setSocketErrorListener(I2PSocket.SocketErrorListener lsnr) {
- _listener = lsnr;
- }
-
- public boolean isClosed() {
- Connection c = _connection;
- return ((c == null) ||
- (!c.getIsConnected()) ||
- (c.getResetReceived()) ||
- (c.getResetSent()));
- }
-
- void destroy() {
- Connection c = _connection;
- _connection = null;
- _listener = null;
- if(c != null) {
- c.disconnectComplete();
- }
- }
-
- public String toString() {
- Connection c = _connection;
- if(c == null) {
- return super.toString();
- } else {
- return c.toString();
- }
- }
+ private Connection _connection;
+ private I2PSocket.SocketErrorListener _listener;
+ private Destination _remotePeer;
+ private Destination _localPeer;
+
+ public I2PSocketFull(Connection con) {
+ _connection = con;
+ if (con != null) {
+ _remotePeer = con.getRemotePeer();
+ _localPeer = con.getSession().getMyDestination();
+ }
+ }
+
+ public void close() throws IOException {
+ Connection c = _connection;
+ if (c == null) return;
+ if (c.getIsConnected()) {
+ OutputStream out = c.getOutputStream();
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException ioe) {
+ // ignore any write error, as we want to keep on and kill the
+ // con (thanks Complication!)
+ }
+ }
+ c.disconnect(true);
+ } else {
+ //throw new IOException("Not connected");
+ }
+ destroy();
+ }
+
+ Connection getConnection() { return _connection; }
+
+ public InputStream getInputStream() {
+ Connection c = _connection;
+ if (c != null)
+ return c.getInputStream();
+ else
+ return null;
+ }
+
+ public I2PSocketOptions getOptions() {
+ Connection c = _connection;
+ if (c != null)
+ return c.getOptions();
+ else
+ return null;
+ }
+
+ public OutputStream getOutputStream() throws IOException {
+ Connection c = _connection;
+ if (c != null)
+ return c.getOutputStream();
+ else
+ return null;
+ }
+
+ public Destination getPeerDestination() { return _remotePeer; }
+
+ public long getReadTimeout() {
+ I2PSocketOptions opts = getOptions();
+ if (opts != null)
+ return opts.getReadTimeout();
+ else
+ return -1;
+ }
+
+ public Destination getThisDestination() { return _localPeer; }
+
+ public void setOptions(I2PSocketOptions options) {
+ Connection c = _connection;
+ if (c == null) return;
+
+ if (options instanceof ConnectionOptions)
+ c.setOptions((ConnectionOptions)options);
+ else
+ c.setOptions(new ConnectionOptions(options));
+ }
+
+ public void setReadTimeout(long ms) {
+ Connection c = _connection;
+ if (c == null) return;
+
+ c.getInputStream().setReadTimeout((int)ms);
+ c.getOptions().setReadTimeout(ms);
+ }
+
+ public void setSocketErrorListener(I2PSocket.SocketErrorListener lsnr) {
+ _listener = lsnr;
+ }
+
+ public boolean isClosed() {
+ Connection c = _connection;
+ return ((c == null) ||
+ (!c.getIsConnected()) ||
+ (c.getResetReceived()) ||
+ (c.getResetSent()));
+ }
+
+ void destroy() {
+ Connection c = _connection;
+ _connection = null;
+ _listener = null;
+ if (c != null)
+ c.disconnectComplete();
+ }
+ public String toString() {
+ Connection c = _connection;
+ if (c == null)
+ return super.toString();
+ else
+ return c.toString();
+ }
}
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java
index b0d1c841a..7384a4972 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java
@@ -1,7 +1,6 @@
package net.i2p.client.streaming;
import java.net.NoRouteToHostException;
-import java.net.SocketTimeoutException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Properties;
@@ -14,6 +13,7 @@ import net.i2p.client.I2PSessionException;
import net.i2p.data.Destination;
import net.i2p.util.Log;
+
/**
* Centralize the coordination and multiplexing of the local client's streaming.
* There should be one I2PSocketManager for each I2PSession, and if an application
@@ -23,317 +23,219 @@ import net.i2p.util.Log;
*
*/
public class I2PSocketManagerFull implements I2PSocketManager {
+ private I2PAppContext _context;
+ private Log _log;
+ private I2PSession _session;
+ private I2PServerSocketFull _serverSocket;
+ private ConnectionOptions _defaultOptions;
+ private long _acceptTimeout;
+ private String _name;
+ private int _maxStreams;
+ private static int __managerId = 0;
+ private ConnectionManager _connectionManager;
+
+ /**
+ * How long to wait for the client app to accept() before sending back CLOSE?
+ * This includes the time waiting in the queue. Currently set to 5 seconds.
+ */
+ private static final long ACCEPT_TIMEOUT_DEFAULT = 5*1000;
+
+ public I2PSocketManagerFull() {
+ _context = null;
+ _session = null;
+ }
+ public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) {
+ this();
+ init(context, session, opts, name);
+ }
+
+ /** how many streams will we allow at once? */
+ public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams";
+
+ /**
+ *
+ */
+ public void init(I2PAppContext context, I2PSession session, Properties opts, String name) {
+ _context = context;
+ _session = session;
+ _log = _context.logManager().getLog(I2PSocketManagerFull.class);
+
+ _maxStreams = -1;
+ try {
+ String num = (opts != null ? opts.getProperty(PROP_MAX_STREAMS, "-1") : "-1");
+ _maxStreams = Integer.parseInt(num);
+ } catch (NumberFormatException nfe) {
+ if (_log.shouldLog(Log.WARN))
+ _log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe);
+ _maxStreams = -1;
+ }
+ _name = name + " " + (++__managerId);
+ _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
+ _defaultOptions = new ConnectionOptions(opts);
+ _connectionManager = new ConnectionManager(_context, _session, _maxStreams, _defaultOptions);
+ _serverSocket = new I2PServerSocketFull(this);
+
+ if (_log.shouldLog(Log.INFO)) {
+ _log.info("Socket manager created. \ndefault options: " + _defaultOptions
+ + "\noriginal properties: " + opts);
+ }
+ }
- private I2PAppContext _context;
- private Log _log;
- private I2PSession _session;
- private I2PServerSocketFull _serverSocket;
- private ConnectionOptions _defaultOptions;
- private long _acceptTimeout;
- private String _name;
- private int _maxStreams;
- private static int __managerId = 0;
- private ConnectionManager _connectionManager;
- /**
- * How long to wait for the client app to accept() before sending back CLOSE?
- * This includes the time waiting in the queue. Currently set to 5 seconds.
- */
- private static final long ACCEPT_TIMEOUT_DEFAULT = 5 * 1000;
+ public I2PSocketOptions buildOptions() { return buildOptions(null); }
+ public I2PSocketOptions buildOptions(Properties opts) {
+ ConnectionOptions curOpts = new ConnectionOptions(_defaultOptions);
+ curOpts.setProperties(opts);
+ return curOpts;
+ }
+
+ public I2PSession getSession() {
+ return _session;
+ }
+
+ public ConnectionManager getConnectionManager() {
+ return _connectionManager;
+ }
- /**
- *
- */
- public I2PSocketManagerFull() {
- _context = null;
- _session = null;
- }
+ public I2PSocket receiveSocket() throws I2PException {
+ verifySession();
+ Connection con = _connectionManager.getConnectionHandler().accept(-1);
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("receiveSocket() called: " + con);
+ if (con != null) {
+ I2PSocketFull sock = new I2PSocketFull(con);
+ con.setSocket(sock);
+ return sock;
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Ping the specified peer, returning true if they replied to the ping within
+ * the timeout specified, false otherwise. This call blocks.
+ *
+ */
+ public boolean ping(Destination peer, long timeoutMs) {
+ return _connectionManager.ping(peer, timeoutMs);
+ }
- /**
- *
- * @param context
- * @param session
- * @param opts
- * @param name
- */
- public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) {
- this();
- init(context, session, opts, name);
- }
- /** how many streams will we allow at once? */
- public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams";
+ /**
+ * How long should we wait for the client to .accept() a socket before
+ * sending back a NACK/Close?
+ *
+ * @param ms milliseconds to wait, maximum
+ */
+ public void setAcceptTimeout(long ms) { _acceptTimeout = ms; }
+ public long getAcceptTimeout() { return _acceptTimeout; }
- /**
- *
- *
- * @param context
- * @param session
- * @param opts
- * @param name
- */
- public void init(I2PAppContext context, I2PSession session, Properties opts, String name) {
- _context = context;
- _session = session;
- _log = _context.logManager().getLog(I2PSocketManagerFull.class);
+ public void setDefaultOptions(I2PSocketOptions options) {
+ _defaultOptions = new ConnectionOptions((ConnectionOptions) options);
+ }
- _maxStreams = -1;
- try {
- String num = (opts != null ? opts.getProperty(PROP_MAX_STREAMS, "-1") : "-1");
- _maxStreams = Integer.parseInt(num);
- } catch(NumberFormatException nfe) {
- if(_log.shouldLog(Log.WARN)) {
- _log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe);
- }
- _maxStreams = -1;
- }
- _name = name + " " + (++__managerId);
- _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
- _defaultOptions = new ConnectionOptions(opts);
- _connectionManager = new ConnectionManager(_context, _session, _maxStreams, _defaultOptions);
- _serverSocket = new I2PServerSocketFull(this);
+ public I2PSocketOptions getDefaultOptions() {
+ return _defaultOptions;
+ }
- if(_log.shouldLog(Log.INFO)) {
- _log.info("Socket manager created. \ndefault options: " + _defaultOptions + "\noriginal properties: " + opts);
- }
- }
+ public I2PServerSocket getServerSocket() {
+ _connectionManager.setAllowIncomingConnections(true);
+ return _serverSocket;
+ }
- /**
- *
- * @return
- */
- public I2PSocketOptions buildOptions() {
- return buildOptions(null);
- }
+ private void verifySession() throws I2PException {
+ if (!_connectionManager.getSession().isClosed())
+ return;
+ _connectionManager.getSession().connect();
+ }
+
+ /**
+ * Create a new connected socket (block until the socket is created)
+ *
+ * @param peer Destination to connect to
+ * @param options I2P socket options to be used for connecting
+ *
+ * @throws NoRouteToHostException if the peer is not found or not reachable
+ * @throws I2PException if there is some other I2P-related problem
+ */
+ public I2PSocket connect(Destination peer, I2PSocketOptions options)
+ throws I2PException, NoRouteToHostException {
+ verifySession();
+ if (options == null)
+ options = _defaultOptions;
+ ConnectionOptions opts = null;
+ if (options instanceof ConnectionOptions)
+ opts = new ConnectionOptions((ConnectionOptions)options);
+ else
+ opts = new ConnectionOptions(options);
+
+ if (_log.shouldLog(Log.INFO))
+ _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0,6)
+ + " with options: " + opts);
+ Connection con = _connectionManager.connect(peer, opts);
+ if (con == null)
+ throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")");
+ I2PSocketFull socket = new I2PSocketFull(con);
+ con.setSocket(socket);
+ if (con.getConnectionError() != null) {
+ con.disconnect(false);
+ throw new NoRouteToHostException(con.getConnectionError());
+ }
+ return socket;
+ }
- /**
- *
- * @param opts
- * @return
- */
- public I2PSocketOptions buildOptions(Properties opts) {
- ConnectionOptions curOpts = new ConnectionOptions(_defaultOptions);
- curOpts.setProperties(opts);
- return curOpts;
- }
+ /**
+ * Create a new connected socket (block until the socket is created)
+ *
+ * @param peer Destination to connect to
+ *
+ * @throws NoRouteToHostException if the peer is not found or not reachable
+ * @throws I2PException if there is some other I2P-related problem
+ */
+ public I2PSocket connect(Destination peer) throws I2PException, NoRouteToHostException {
+ return connect(peer, _defaultOptions);
+ }
- /**
- *
- * @return
- */
- public I2PSession getSession() {
- return _session;
- }
+ /**
+ * Destroy the socket manager, freeing all the associated resources. This
+ * method will block untill all the managed sockets are closed.
+ *
+ */
+ public void destroySocketManager() {
+ _connectionManager.disconnectAllHard();
+ _connectionManager.setAllowIncomingConnections(false);
+ // should we destroy the _session too?
+ // yes, since the old lib did (and SAM wants it to, and i dont know why not)
+ if ( (_session != null) && (!_session.isClosed()) ) {
+ try {
+ _session.destroySession();
+ } catch (I2PSessionException ise) {
+ _log.warn("Unable to destroy the session", ise);
+ }
+ }
+ }
- /**
- *
- * @return
- */
- public ConnectionManager getConnectionManager() {
- return _connectionManager;
- }
+ /**
+ * Retrieve a set of currently connected I2PSockets, either initiated locally or remotely.
+ *
+ */
+ public Set listSockets() {
+ Set connections = _connectionManager.listConnections();
+ Set rv = new HashSet(connections.size());
+ for (Iterator iter = connections.iterator(); iter.hasNext(); ) {
+ Connection con = (Connection)iter.next();
+ if (con.getSocket() != null)
+ rv.add(con.getSocket());
+ }
+ return rv;
+ }
- /**
- *
- * @return
- * @throws net.i2p.I2PException
- * @throws java.net.SocketTimeoutException
- */
- public I2PSocket receiveSocket() throws I2PException, SocketTimeoutException {
- verifySession();
- Connection con = _connectionManager.getConnectionHandler().accept(_connectionManager.MgetSoTimeout());
- if(_log.shouldLog(Log.DEBUG)) {
- _log.debug("receiveSocket() called: " + con);
- }
- if(con != null) {
- I2PSocketFull sock = new I2PSocketFull(con);
- con.setSocket(sock);
- return sock;
- } else {
- if(_connectionManager.MgetSoTimeout() == -1) {
- return null;
- }
- throw new SocketTimeoutException("I2PSocket timed out");
- }
- }
-
- /**
- * Ping the specified peer, returning true if they replied to the ping within
- * the timeout specified, false otherwise. This call blocks.
- *
- *
- * @param peer
- * @param timeoutMs
- * @return
- */
- public boolean ping(Destination peer, long timeoutMs) {
- return _connectionManager.ping(peer, timeoutMs);
- }
-
- /**
- * How long should we wait for the client to .accept() a socket before
- * sending back a NACK/Close?
- *
- * @param ms milliseconds to wait, maximum
- */
- public void setAcceptTimeout(long ms) {
- _acceptTimeout = ms;
- }
-
- /**
- *
- * @return
- */
- public long getAcceptTimeout() {
- return _acceptTimeout;
- }
-
- /**
- *
- * @param options
- */
- public void setDefaultOptions(I2PSocketOptions options) {
- _defaultOptions = new ConnectionOptions((ConnectionOptions)options);
- }
-
- /**
- *
- * @return
- */
- public I2PSocketOptions getDefaultOptions() {
- return _defaultOptions;
- }
-
- /**
- *
- * @return
- */
- public I2PServerSocket getServerSocket() {
- _connectionManager.setAllowIncomingConnections(true);
- return _serverSocket;
- }
-
- private void verifySession() throws I2PException {
- if(!_connectionManager.getSession().isClosed()) {
- return;
- }
- _connectionManager.getSession().connect();
- }
-
- /**
- * Create a new connected socket (block until the socket is created)
- *
- * @param peer Destination to connect to
- * @param options I2P socket options to be used for connecting
- *
- * @throws NoRouteToHostException if the peer is not found or not reachable
- * @throws I2PException if there is some other I2P-related problem
- */
- public I2PSocket connect(Destination peer, I2PSocketOptions options)
- throws I2PException, NoRouteToHostException {
- verifySession();
- if(options == null) {
- options = _defaultOptions;
- }
- ConnectionOptions opts = null;
- if(options instanceof ConnectionOptions) {
- opts = new ConnectionOptions((ConnectionOptions)options);
- } else {
- opts = new ConnectionOptions(options);
- }
- if(_log.shouldLog(Log.INFO)) {
- _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0, 6) + " with options: " + opts);
- }
- Connection con = _connectionManager.connect(peer, opts);
- if(con == null) {
- throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")");
- }
- I2PSocketFull socket = new I2PSocketFull(con);
- con.setSocket(socket);
- if(con.getConnectionError() != null) {
- con.disconnect(false);
- throw new NoRouteToHostException(con.getConnectionError());
- }
- return socket;
- }
-
- /**
- * Create a new connected socket (block until the socket is created)
- *
- * @param peer Destination to connect to
- *
- * @return
- * @throws NoRouteToHostException if the peer is not found or not reachable
- * @throws I2PException if there is some other I2P-related problem
- */
- public I2PSocket connect(Destination peer) throws I2PException, NoRouteToHostException {
- return connect(peer, _defaultOptions);
- }
-
- /**
- * Destroy the socket manager, freeing all the associated resources. This
- * method will block untill all the managed sockets are closed.
- *
- */
- public void destroySocketManager() {
- _connectionManager.disconnectAllHard();
- _connectionManager.setAllowIncomingConnections(false);
- // should we destroy the _session too?
- // yes, since the old lib did (and SAM wants it to, and i dont know why not)
- if((_session != null) && (!_session.isClosed())) {
- try {
- _session.destroySession();
- } catch(I2PSessionException ise) {
- _log.warn("Unable to destroy the session", ise);
- }
- }
- }
-
- /**
- * Retrieve a set of currently connected I2PSockets, either initiated locally or remotely.
- *
- *
- * @return
- */
- public Set listSockets() {
- Set connections = _connectionManager.listConnections();
- Set rv = new HashSet(connections.size());
- for(Iterator iter = connections.iterator(); iter.hasNext();) {
- Connection con = (Connection)iter.next();
- if(con.getSocket() != null) {
- rv.add(con.getSocket());
- }
- }
- return rv;
- }
-
- /**
- *
- * @return
- */
- public String getName() {
- return _name;
- }
-
- /**
- *
- * @param name
- */
- public void setName(String name) {
- _name = name;
- }
-
- /**
- *
- * @param lsnr
- */
- public void addDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
- _connectionManager.getMessageHandler().addDisconnectListener(lsnr);
- }
-
- /**
- *
- * @param lsnr
- */
- public void removeDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
- _connectionManager.getMessageHandler().removeDisconnectListener(lsnr);
- }
+ public String getName() { return _name; }
+ public void setName(String name) { _name = name; }
+
+
+ public void addDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
+ _connectionManager.getMessageHandler().addDisconnectListener(lsnr);
+ }
+ public void removeDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
+ _connectionManager.getMessageHandler().removeDisconnectListener(lsnr);
+ }
}
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java b/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java
index c52c373b1..0ea0c83d7 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java
@@ -5,7 +5,7 @@ import net.i2p.util.SimpleTimer;
/**
*
*/
-public class RetransmissionTimer extends SimpleTimer {
+class RetransmissionTimer extends SimpleTimer {
private static final RetransmissionTimer _instance = new RetransmissionTimer();
public static final SimpleTimer getInstance() { return _instance; }
protected RetransmissionTimer() { super("StreamingTimer"); }
diff --git a/core/java/src/net/i2p/util/Executor.java b/core/java/src/net/i2p/util/Executor.java
index c5955c999..e3c1b6fbf 100644
--- a/core/java/src/net/i2p/util/Executor.java
+++ b/core/java/src/net/i2p/util/Executor.java
@@ -5,59 +5,42 @@ import java.util.List;
import net.i2p.I2PAppContext;
class Executor implements Runnable {
+ private I2PAppContext _context;
+ private Log _log;
+ private List _readyEvents;
+ public Executor(I2PAppContext ctx, Log log, List events) {
+ _context = ctx;
+ _readyEvents = events;
+ }
+ public void run() {
+ while (true) {
+ SimpleTimer.TimedEvent evt = null;
+ synchronized (_readyEvents) {
+ if (_readyEvents.size() <= 0)
+ try { _readyEvents.wait(); } catch (InterruptedException ie) {}
+ if (_readyEvents.size() > 0)
+ evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0);
+ }
- private I2PAppContext _context;
- private Log _log;
- private List _readyEvents;
- private SimpleStore runn;
-
- public Executor(I2PAppContext ctx, Log log, List events, SimpleStore x) {
- _context = ctx;
- _readyEvents = events;
- runn = x;
- }
-
- public void run() {
- while(runn.getAnswer()) {
- SimpleTimer.TimedEvent evt = null;
- synchronized(_readyEvents) {
- if(_readyEvents.size() <= 0) {
- try {
- _readyEvents.wait();
- } catch(InterruptedException ie) {
- }
- }
- if(_readyEvents.size() > 0) {
- evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0);
- }
- }
-
- if(evt != null) {
- long before = _context.clock().now();
- try {
- evt.timeReached();
- } catch(Throwable t) {
- log("wtf, event borked: " + evt, t);
- }
- long time = _context.clock().now() - before;
- if((time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN))) {
- _log.warn("wtf, event execution took " + time + ": " + evt);
- }
- }
- }
- }
-
- /**
- *
- * @param msg
- * @param t
- */
- private void log(String msg, Throwable t) {
- synchronized(this) {
- if(_log == null) {
- _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class);
- }
- }
- _log.log(Log.CRIT, msg, t);
- }
+ if (evt != null) {
+ long before = _context.clock().now();
+ try {
+ evt.timeReached();
+ } catch (Throwable t) {
+ log("wtf, event borked: " + evt, t);
+ }
+ long time = _context.clock().now() - before;
+ if ( (time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN)) )
+ _log.warn("wtf, event execution took " + time + ": " + evt);
+ }
+ }
+ }
+
+ private void log(String msg, Throwable t) {
+ synchronized (this) {
+ if (_log == null)
+ _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class);
+ }
+ _log.log(Log.CRIT, msg, t);
+ }
}
diff --git a/core/java/src/net/i2p/util/SimpleStore.java b/core/java/src/net/i2p/util/SimpleStore.java
deleted file mode 100644
index b73a8e7eb..000000000
--- a/core/java/src/net/i2p/util/SimpleStore.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * This is free software, do as you please.
- */
-
-package net.i2p.util;
-
-/**
- *
- * @author sponge
- */
-public class SimpleStore {
-
- private boolean answer;
-
- SimpleStore(boolean x) {
- answer=x;
- }
-
- /**
- * set the answer
- *
- * @param x
- */
- public void setAnswer(boolean x) {
- answer = x;
- }
- /**
- *
- * @return boolean
- */
- public boolean getAnswer() {
- return answer;
- }
-
-}
diff --git a/core/java/src/net/i2p/util/SimpleTimer.java b/core/java/src/net/i2p/util/SimpleTimer.java
index 5595fbd5c..9543f72c5 100644
--- a/core/java/src/net/i2p/util/SimpleTimer.java
+++ b/core/java/src/net/i2p/util/SimpleTimer.java
@@ -16,262 +16,211 @@ import net.i2p.I2PAppContext;
*
*/
public class SimpleTimer {
+ private static final SimpleTimer _instance = new SimpleTimer();
+ public static SimpleTimer getInstance() { return _instance; }
+ private I2PAppContext _context;
+ private Log _log;
+ /** event time (Long) to event (TimedEvent) mapping */
+ private TreeMap _events;
+ /** event (TimedEvent) to event time (Long) mapping */
+ private Map _eventTimes;
+ private List _readyEvents;
+
+ protected SimpleTimer() { this("SimpleTimer"); }
+ protected SimpleTimer(String name) {
+ _context = I2PAppContext.getGlobalContext();
+ _log = _context.logManager().getLog(SimpleTimer.class);
+ _events = new TreeMap();
+ _eventTimes = new HashMap(256);
+ _readyEvents = new ArrayList(4);
+ I2PThread runner = new I2PThread(new SimpleTimerRunner());
+ runner.setName(name);
+ runner.setDaemon(true);
+ runner.start();
+ for (int i = 0; i < 3; i++) {
+ I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents));
+ executor.setName(name + "Executor " + i);
+ executor.setDaemon(true);
+ executor.start();
+ }
+ }
+
+ public void reschedule(TimedEvent event, long timeoutMs) {
+ addEvent(event, timeoutMs, false);
+ }
+
+ /**
+ * Queue up the given event to be fired no sooner than timeoutMs from now.
+ * However, if this event is already scheduled, the event will be scheduled
+ * for the earlier of the two timeouts, which may be before this stated
+ * timeout. If this is not the desired behavior, call removeEvent first.
+ *
+ */
+ public void addEvent(TimedEvent event, long timeoutMs) { addEvent(event, timeoutMs, true); }
+ /**
+ * @param useEarliestTime if its already scheduled, use the earlier of the
+ * two timeouts, else use the later
+ */
+ public void addEvent(TimedEvent event, long timeoutMs, boolean useEarliestTime) {
+ int totalEvents = 0;
+ long now = System.currentTimeMillis();
+ long eventTime = now + timeoutMs;
+ Long time = new Long(eventTime);
+ synchronized (_events) {
+ // remove the old scheduled position, then reinsert it
+ Long oldTime = (Long)_eventTimes.get(event);
+ if (oldTime != null) {
+ if (useEarliestTime) {
+ if (oldTime.longValue() < eventTime) {
+ _events.notifyAll();
+ return; // already scheduled for sooner than requested
+ } else {
+ _events.remove(oldTime);
+ }
+ } else {
+ if (oldTime.longValue() > eventTime) {
+ _events.notifyAll();
+ return; // already scheduled for later than the given period
+ } else {
+ _events.remove(oldTime);
+ }
+ }
+ }
+ while (_events.containsKey(time))
+ time = new Long(time.longValue() + 1);
+ _events.put(time, event);
+ _eventTimes.put(event, time);
+
+ if ( (_events.size() != _eventTimes.size()) ) {
+ _log.error("Skewed events: " + _events.size() + " for " + _eventTimes.size());
+ for (Iterator iter = _eventTimes.keySet().iterator(); iter.hasNext(); ) {
+ TimedEvent evt = (TimedEvent)iter.next();
+ Long when = (Long)_eventTimes.get(evt);
+ TimedEvent cur = (TimedEvent)_events.get(when);
+ if (cur != evt) {
+ _log.error("event " + evt + " @ " + when + ": " + cur);
+ }
+ }
+ }
+
+ totalEvents = _events.size();
+ _events.notifyAll();
+ }
+ if (time.longValue() > eventTime + 100) {
+ if (_log.shouldLog(Log.WARN))
+ _log.warn("Lots of timer congestion, had to push " + event + " back "
+ + (time.longValue()-eventTime) + "ms (# events: " + totalEvents + ")");
+ }
+ long timeToAdd = System.currentTimeMillis() - now;
+ if (timeToAdd > 50) {
+ if (_log.shouldLog(Log.WARN))
+ _log.warn("timer contention: took " + timeToAdd + "ms to add a job with " + totalEvents + " queued");
+ }
+
+ }
+
+ public boolean removeEvent(TimedEvent evt) {
+ if (evt == null) return false;
+ synchronized (_events) {
+ Long when = (Long)_eventTimes.remove(evt);
+ if (when != null)
+ _events.remove(when);
+ return null != when;
+ }
+ }
+
+ /**
+ * Simple interface for events to be queued up and notified on expiration
+ */
+ public interface TimedEvent {
+ /**
+ * the time requested has been reached (this call should NOT block,
+ * otherwise the whole SimpleTimer gets backed up)
+ *
+ */
+ public void timeReached();
+ }
+
+ private long _occurredTime;
+ private long _occurredEventCount;
+ private TimedEvent _recentEvents[] = new TimedEvent[5];
+
+ private class SimpleTimerRunner implements Runnable {
+ public void run() {
+ List eventsToFire = new ArrayList(1);
+ while (true) {
+ try {
+ synchronized (_events) {
+ //if (_events.size() <= 0)
+ // _events.wait();
+ //if (_events.size() > 100)
+ // _log.warn("> 100 events! " + _events.values());
+ long now = System.currentTimeMillis();
+ long nextEventDelay = -1;
+ Object nextEvent = null;
+ while (true) {
+ if (_events.size() <= 0) break;
+ Long when = (Long)_events.firstKey();
+ if (when.longValue() <= now) {
+ TimedEvent evt = (TimedEvent)_events.remove(when);
+ if (evt != null) {
+ _eventTimes.remove(evt);
+ eventsToFire.add(evt);
+ }
+ } else {
+ nextEventDelay = when.longValue() - now;
+ nextEvent = _events.get(when);
+ break;
+ }
+ }
+ if (eventsToFire.size() <= 0) {
+ if (nextEventDelay != -1) {
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("Next event in " + nextEventDelay + ": " + nextEvent);
+ _events.wait(nextEventDelay);
+ } else {
+ _events.wait();
+ }
+ }
+ }
+ } catch (ThreadDeath td) {
+ return; // die
+ } catch (InterruptedException ie) {
+ // ignore
+ } catch (Throwable t) {
+ if (_log != null) {
+ _log.log(Log.CRIT, "Uncaught exception in the SimpleTimer!", t);
+ } else {
+ System.err.println("Uncaught exception in SimpleTimer");
+ t.printStackTrace();
+ }
+ }
+
+ long now = System.currentTimeMillis();
+ now = now - (now % 1000);
- private static final SimpleTimer _instance = new SimpleTimer();
+ synchronized (_readyEvents) {
+ for (int i = 0; i < eventsToFire.size(); i++)
+ _readyEvents.add(eventsToFire.get(i));
+ _readyEvents.notifyAll();
+ }
- public static SimpleTimer getInstance() {
- return _instance;
- }
- private I2PAppContext _context;
- private Log _log;
- /** event time (Long) to event (TimedEvent) mapping */
- private TreeMap _events;
- /** event (TimedEvent) to event time (Long) mapping */
- private Map _eventTimes;
- private List _readyEvents;
- private SimpleStore runn;
+ if (_occurredTime == now) {
+ _occurredEventCount += eventsToFire.size();
+ } else {
+ _occurredTime = now;
+ if (_occurredEventCount > 2500) {
+ StringBuffer buf = new StringBuffer(128);
+ buf.append("Too many simpleTimerJobs (").append(_occurredEventCount);
+ buf.append(") in a second!");
+ _log.log(Log.WARN, buf.toString());
+ }
+ _occurredEventCount = 0;
+ }
- /**
- *
- */
- protected SimpleTimer() {
- this("SimpleTimer");
- }
-
- /**
- *
- * @param name
- */
- protected SimpleTimer(String name) {
- runn = new SimpleStore(true);
- _context = I2PAppContext.getGlobalContext();
- _log = _context.logManager().getLog(SimpleTimer.class);
- _events = new TreeMap();
- _eventTimes = new HashMap(256);
- _readyEvents = new ArrayList(4);
- I2PThread runner = new I2PThread(new SimpleTimerRunner());
- runner.setName(name);
- runner.setDaemon(true);
- runner.start();
- for(int i = 0; i < 3; i++) {
- I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents, runn));
- executor.setName(name + "Executor " + i);
- executor.setDaemon(true);
- executor.start();
- }
- }
-
- /**
- * Removes the SimpleTimer.
- */
- public void removeSimpleTimer() {
- synchronized(_events) {
- runn.setAnswer(false);
- _events.notifyAll();
- }
- }
-
- /**
- *
- * @param event
- * @param timeoutMs
- */
- public void reschedule(TimedEvent event, long timeoutMs) {
- addEvent(event, timeoutMs, false);
- }
-
- /**
- * Queue up the given event to be fired no sooner than timeoutMs from now.
- * However, if this event is already scheduled, the event will be scheduled
- * for the earlier of the two timeouts, which may be before this stated
- * timeout. If this is not the desired behavior, call removeEvent first.
- *
- * @param event
- * @param timeoutMs
- */
- public void addEvent(TimedEvent event, long timeoutMs) {
- addEvent(event, timeoutMs, true);
- }
-
- /**
- * @param event
- * @param timeoutMs
- * @param useEarliestTime if its already scheduled, use the earlier of the
- * two timeouts, else use the later
- */
- public void addEvent(TimedEvent event, long timeoutMs, boolean useEarliestTime) {
- int totalEvents = 0;
- long now = System.currentTimeMillis();
- long eventTime = now + timeoutMs;
- Long time = new Long(eventTime);
- synchronized(_events) {
- // remove the old scheduled position, then reinsert it
- Long oldTime = (Long)_eventTimes.get(event);
- if(oldTime != null) {
- if(useEarliestTime) {
- if(oldTime.longValue() < eventTime) {
- _events.notifyAll();
- return; // already scheduled for sooner than requested
- } else {
- _events.remove(oldTime);
- }
- } else {
- if(oldTime.longValue() > eventTime) {
- _events.notifyAll();
- return; // already scheduled for later than the given period
- } else {
- _events.remove(oldTime);
- }
- }
- }
- while(_events.containsKey(time)) {
- time = new Long(time.longValue() + 1);
- }
- _events.put(time, event);
- _eventTimes.put(event, time);
-
- if((_events.size() != _eventTimes.size())) {
- _log.error("Skewed events: " + _events.size() + " for " + _eventTimes.size());
- for(Iterator iter = _eventTimes.keySet().iterator(); iter.hasNext();) {
- TimedEvent evt = (TimedEvent)iter.next();
- Long when = (Long)_eventTimes.get(evt);
- TimedEvent cur = (TimedEvent)_events.get(when);
- if(cur != evt) {
- _log.error("event " + evt + " @ " + when + ": " + cur);
- }
- }
- }
-
- totalEvents = _events.size();
- _events.notifyAll();
- }
- if(time.longValue() > eventTime + 100) {
- if(_log.shouldLog(Log.WARN)) {
- _log.warn("Lots of timer congestion, had to push " + event + " back " + (time.longValue() - eventTime) + "ms (# events: " + totalEvents + ")");
- }
- }
- long timeToAdd = System.currentTimeMillis() - now;
- if(timeToAdd > 50) {
- if(_log.shouldLog(Log.WARN)) {
- _log.warn("timer contention: took " + timeToAdd + "ms to add a job with " + totalEvents + " queued");
- }
- }
-
- }
-
- /**
- *
- * @param evt
- * @return
- */
- public boolean removeEvent(TimedEvent evt) {
- if(evt == null) {
- return false;
- }
- synchronized(_events) {
- Long when = (Long)_eventTimes.remove(evt);
- if(when != null) {
- _events.remove(when);
- }
- return null != when;
- }
- }
-
- /**
- * Simple interface for events to be queued up and notified on expiration
- */
- public interface TimedEvent {
-
- /**
- * the time requested has been reached (this call should NOT block,
- * otherwise the whole SimpleTimer gets backed up)
- *
- */
- public void timeReached();
- }
- private long _occurredTime;
- private long _occurredEventCount;
- // not used
- // private TimedEvent _recentEvents[] = new TimedEvent[5];
- private class SimpleTimerRunner implements Runnable {
-
- public void run() {
- List eventsToFire = new ArrayList(1);
- while(runn.getAnswer()) {
- try {
- synchronized(_events) {
- //if (_events.size() <= 0)
- // _events.wait();
- //if (_events.size() > 100)
- // _log.warn("> 100 events! " + _events.values());
- long now = System.currentTimeMillis();
- long nextEventDelay = -1;
- Object nextEvent = null;
- while(runn.getAnswer()) {
- if(_events.size() <= 0) {
- break;
- }
- Long when = (Long)_events.firstKey();
- if(when.longValue() <= now) {
- TimedEvent evt = (TimedEvent)_events.remove(when);
- if(evt != null) {
- _eventTimes.remove(evt);
- eventsToFire.add(evt);
- }
- } else {
- nextEventDelay = when.longValue() - now;
- nextEvent = _events.get(when);
- break;
- }
- }
- if(eventsToFire.size() <= 0) {
- if(nextEventDelay != -1) {
- if(_log.shouldLog(Log.DEBUG)) {
- _log.debug("Next event in " + nextEventDelay + ": " + nextEvent);
- }
- _events.wait(nextEventDelay);
- } else {
- _events.wait();
- }
- }
- }
- } catch(InterruptedException ie) {
- // ignore
- } catch(Throwable t) {
- if(_log != null) {
- _log.log(Log.CRIT, "Uncaught exception in the SimpleTimer!", t);
- } else {
- System.err.println("Uncaught exception in SimpleTimer");
- t.printStackTrace();
- }
- }
-
- long now = System.currentTimeMillis();
- now = now - (now % 1000);
-
- synchronized(_readyEvents) {
- for(int i = 0; i < eventsToFire.size(); i++) {
- _readyEvents.add(eventsToFire.get(i));
- }
- _readyEvents.notifyAll();
- }
-
- if(_occurredTime == now) {
- _occurredEventCount += eventsToFire.size();
- } else {
- _occurredTime = now;
- if(_occurredEventCount > 2500) {
- StringBuffer buf = new StringBuffer(128);
- buf.append("Too many simpleTimerJobs (").append(_occurredEventCount);
- buf.append(") in a second!");
- _log.log(Log.WARN, buf.toString());
- }
- _occurredEventCount = 0;
- }
-
- eventsToFire.clear();
- }
- }
- }
+ eventsToFire.clear();
+ }
+ }
+ }
}