From b0313bd6bf7ac422fd8fed64ddb13a1b1761a82d Mon Sep 17 00:00:00 2001 From: sponge Date: Sat, 27 Sep 2008 16:00:06 +0000 Subject: [PATCH] disapproval of revision '7ed18fd4c3a5430150a2d76bfe202bc491115974' --- .../net/i2p/i2ptunnel/I2PTunnelServer.java | 3 - .../i2p/client/streaming/I2PServerSocket.java | 17 +- .../client/streaming/I2PServerSocketImpl.java | 55 ++----- .../client/streaming/StreamSinkServer.java | 55 ++----- .../client/streaming/ConnectionHandler.java | 1 - .../client/streaming/ConnectionManager.java | 148 +++++------------- .../client/streaming/I2PServerSocketFull.java | 38 +---- .../i2p/client/streaming/I2PSocketFull.java | 62 +++----- .../streaming/I2PSocketManagerFull.java | 146 +++-------------- .../client/streaming/RetransmissionTimer.java | 2 +- core/java/src/net/i2p/util/Executor.java | 31 +--- core/java/src/net/i2p/util/SimpleStore.java | 35 ----- core/java/src/net/i2p/util/SimpleTimer.java | 93 +++-------- 13 files changed, 146 insertions(+), 540 deletions(-) delete mode 100644 core/java/src/net/i2p/util/SimpleStore.java diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java index ce7b230f2..014d91e9b 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java @@ -12,7 +12,6 @@ import java.net.ConnectException; import java.net.InetAddress; import java.net.Socket; import java.net.SocketException; -import java.net.SocketTimeoutException; import java.util.Iterator; import java.util.Properties; @@ -220,8 +219,6 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { if (_log.shouldLog(Log.ERROR)) _log.error("Error accepting", ce); // not killing the server.. - } catch(SocketTimeoutException ste) { - // ignored, we never set the timeout } } } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java index a925b5354..726d462ce 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java @@ -2,7 +2,6 @@ package net.i2p.client.streaming; import java.net.ConnectException; -import java.net.SocketTimeoutException; import net.i2p.I2PException; /** @@ -10,7 +9,6 @@ import net.i2p.I2PException; * */ public interface I2PServerSocket { - /** * Closes the socket. */ @@ -26,21 +24,8 @@ public interface I2PServerSocket { * @throws I2PException if there is a problem with reading a new socket * from the data available (aka the I2PSession closed, etc) * @throws ConnectException if the I2PServerSocket is closed - * @throws SocketTimeoutException - */ - public I2PSocket accept() throws I2PException, ConnectException, SocketTimeoutException; - - /** - * Set Sock Option accept timeout - * @param x - */ - public void setSoTimeout(long x); - - /** - * Get Sock Option accept timeout - * @return timeout */ - public long getSoTimeout(); + public I2PSocket accept() throws I2PException, ConnectException; /** * Access the manager which is coordinating the server socket diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java index c9b16b1b2..965ba31bf 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java @@ -17,33 +17,19 @@ import net.i2p.util.Log; * */ class I2PServerSocketImpl implements I2PServerSocket { - private final static Log _log = new Log(I2PServerSocketImpl.class); private I2PSocketManager mgr; /** list of sockets waiting for the client to accept them */ private List pendingSockets = Collections.synchronizedList(new ArrayList(4)); + /** have we been closed */ private volatile boolean closing = false; + /** lock on this when accepting a pending socket, and wait on it for notification of acceptance */ private Object socketAcceptedLock = new Object(); /** lock on this when adding a new socket to the pending list, and wait on it accordingly */ private Object socketAddedLock = new Object(); - /** - * Set Sock Option accept timeout stub, does nothing - * @param x - */ - public void setSoTimeout(long x) { - } - - /** - * Get Sock Option accept timeout stub, does nothing - * @return timeout - */ - public long getSoTimeout() { - return -1; - } - public I2PServerSocketImpl(I2PSocketManager mgr) { this.mgr = mgr; } @@ -61,22 +47,19 @@ class I2PServerSocketImpl implements I2PServerSocket { * @throws ConnectException if the I2PServerSocket is closed */ public I2PSocket accept() throws I2PException, ConnectException { - if(_log.shouldLog(Log.DEBUG)) { + if (_log.shouldLog(Log.DEBUG)) _log.debug("accept() called, pending: " + pendingSockets.size()); - } + I2PSocket ret = null; while ( (ret == null) && (!closing) ){ while (pendingSockets.size() <= 0) { - if(closing) { - throw new ConnectException("I2PServerSocket closed"); - } + if (closing) throw new ConnectException("I2PServerSocket closed"); try { synchronized(socketAddedLock) { socketAddedLock.wait(); } - } catch(InterruptedException ie) { - } + } catch (InterruptedException ie) {} } synchronized (pendingSockets) { if (pendingSockets.size() > 0) { @@ -90,9 +73,8 @@ class I2PServerSocketImpl implements I2PServerSocket { } } - if(_log.shouldLog(Log.DEBUG)) { + if (_log.shouldLog(Log.DEBUG)) _log.debug("TIMING: handed out accept result " + ret.hashCode()); - } return ret; } @@ -106,13 +88,12 @@ class I2PServerSocketImpl implements I2PServerSocket { * or the socket was closed */ public boolean addWaitForAccept(I2PSocket s, long timeoutMs) { - if(_log.shouldLog(Log.DEBUG)) { + if (_log.shouldLog(Log.DEBUG)) _log.debug("addWaitForAccept [new socket arrived [" + s.toString() + "], pending: " + pendingSockets.size()); - } + if (closing) { - if(_log.shouldLog(Log.WARN)) { + if (_log.shouldLog(Log.WARN)) _log.warn("Already closing the socket"); - } return false; } @@ -129,16 +110,14 @@ class I2PServerSocketImpl implements I2PServerSocket { while (pendingSockets.contains(s)) { long now = clock.now(); if (now >= end) { - if(_log.shouldLog(Log.INFO)) { + if (_log.shouldLog(Log.INFO)) _log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms) for socket " + s.toString()); - } pendingSockets.remove(s); return false; } if (closing) { - if(_log.shouldLog(Log.WARN)) { + if (_log.shouldLog(Log.WARN)) _log.warn("Server socket closed while waiting for accept"); - } pendingSockets.remove(s); return false; } @@ -147,13 +126,11 @@ class I2PServerSocketImpl implements I2PServerSocket { synchronized (socketAcceptedLock) { socketAcceptedLock.wait(remaining); } - } catch(InterruptedException ie) { - } + } catch (InterruptedException ie) {} } long now = clock.now(); - if(_log.shouldLog(Log.DEBUG)) { + if (_log.shouldLog(Log.DEBUG)) _log.info("Socket accepted after " + (now-start) + "ms for socket " + s.toString()); - } return true; } @@ -169,7 +146,5 @@ class I2PServerSocketImpl implements I2PServerSocket { } } - public I2PSocketManager getManager() { - return mgr; - } + public I2PSocketManager getManager() { return mgr; } } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java index ad8d07989..c8b566190 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java @@ -5,7 +5,6 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.ConnectException; -import java.net.SocketTimeoutException; import java.util.Properties; import net.i2p.I2PAppContext; @@ -21,7 +20,6 @@ import net.i2p.util.Log; * */ public class StreamSinkServer { - private Log _log; private String _sinkDir; private String _destFile; @@ -38,7 +36,6 @@ public class StreamSinkServer { public StreamSinkServer(String sinkDir, String ourDestFile) { this(sinkDir, ourDestFile, null, -1, 3); } - public StreamSinkServer(String sinkDir, String ourDestFile, String i2cpHost, int i2cpPort, int handlers) { _sinkDir = sinkDir; _destFile = ourDestFile; @@ -55,15 +52,13 @@ public class StreamSinkServer { */ public void runServer() { I2PSocketManager mgr = null; - if(_i2cpHost != null) { + if (_i2cpHost != null) mgr = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, new Properties()); - } else { + else mgr = I2PSocketManagerFactory.createManager(); - } Destination dest = mgr.getSession().getMyDestination(); - if(_log.shouldLog(Log.INFO)) { + if (_log.shouldLog(Log.INFO)) _log.info("Listening for connections on: " + dest.calculateHash().toBase64()); - } FileOutputStream fos = null; try { fos = new FileOutputStream(_destFile); @@ -75,12 +70,7 @@ public class StreamSinkServer { _log.error("Error formatting the destination", dfe); return; } finally { - if(fos != null) { - try { - fos.close(); - } catch(IOException ioe) { - } - } + if (fos != null) try { fos.close(); } catch (IOException ioe) {} } I2PServerSocket sock = mgr.getServerSocket(); @@ -101,28 +91,22 @@ public class StreamSinkServer { * */ private class ClientRunner implements Runnable { - private I2PServerSocket _socket; - public ClientRunner(I2PServerSocket socket) { _socket = socket; } - public void run() { while (true) { try { I2PSocket socket = _socket.accept(); - if(socket != null) { + if (socket != null) handle(socket); - } } catch (I2PException ie) { _log.error("Error accepting connection", ie); return; } catch (ConnectException ce) { _log.error("Connection already dropped", ce); return; - } catch(SocketTimeoutException ste) { - // ignored } } } @@ -131,14 +115,12 @@ public class StreamSinkServer { FileOutputStream fos = null; try { File sink = new File(_sinkDir); - if(!sink.exists()) { + if (!sink.exists()) sink.mkdirs(); - } File cur = File.createTempFile("clientSink", ".dat", sink); fos = new FileOutputStream(cur); - if(_log.shouldLog(Log.DEBUG)) { + if (_log.shouldLog(Log.DEBUG)) _log.debug("Writing to " + cur.getAbsolutePath()); - } } catch (IOException ioe) { _log.error("Error creating sink", ioe); return; @@ -153,28 +135,17 @@ public class StreamSinkServer { while ( (read = in.read(buf)) != -1) { //_fos.write(buf, 0, read); written += read; - if(_log.shouldLog(Log.DEBUG)) { + if (_log.shouldLog(Log.DEBUG)) _log.debug("read and wrote " + read + " (" + written + ")"); } - } fos.write(("written: [" + written + "]\n").getBytes()); long lifetime = System.currentTimeMillis() - start; _log.info("Got EOF from client socket [written=" + written + " lifetime=" + lifetime + "]"); } catch (IOException ioe) { _log.error("Error writing the sink", ioe); } finally { - if(fos != null) { - try { - fos.close(); - } catch(IOException ioe) { - } - } - if(sock != null) { - try { - sock.close(); - } catch(IOException ioe) { - } - } + if (fos != null) try { fos.close(); } catch (IOException ioe) {} + if (sock != null) try { sock.close(); } catch (IOException ioe) {} _log.debug("Client socket closed"); } } @@ -203,8 +174,7 @@ public class StreamSinkServer { if (args.length == 5) { try { handlers = Integer.parseInt(args[4]); - } catch(NumberFormatException nfe) { - } + } catch (NumberFormatException nfe) {} } try { int port = Integer.parseInt(args[1]); @@ -216,8 +186,7 @@ public class StreamSinkServer { default: System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]"); } - if(server != null) { + if (server != null) server.runServer(); } } -} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java index f05ae1c8c..4960f1a22 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java @@ -1,6 +1,5 @@ package net.i2p.client.streaming; -import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.List; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index d2223c181..dcc93c5ec 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -21,7 +21,6 @@ import net.i2p.util.SimpleTimer; * */ public class ConnectionManager { - private I2PAppContext _context; private Log _log; private I2PSession _session; @@ -40,7 +39,6 @@ public class ConnectionManager { private ConnectionOptions _defaultOptions; private volatile int _numWaiting; private Object _connectionLock; - private long SoTimeout; public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) { _context = context; @@ -60,9 +58,6 @@ public class ConnectionManager { _maxConcurrentStreams = maxConcurrent; _defaultOptions = defaultOptions; _numWaiting = 0; - /** Socket timeout for accept() */ - SoTimeout = -1; - _context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); @@ -80,7 +75,6 @@ public class ConnectionManager { return (Connection)_connectionByInboundId.get(new Long(id)); } } - /** * not guaranteed to be unique, but in case we receive more than one packet * on an inbound connection that we havent ack'ed yet... @@ -89,34 +83,16 @@ public class ConnectionManager { synchronized (_connectionLock) { for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { Connection con = (Connection)iter.next(); - if(DataHelper.eq(con.getSendStreamId(), id)) { + if (DataHelper.eq(con.getSendStreamId(), id)) return con; } } - } return null; } - /** - * Set the socket accept() timeout. - * @param x - */ - public void MsetSoTimeout(long x) { - SoTimeout = x; - } - - /** - * Get the socket accept() timeout. - * @return - */ - public long MgetSoTimeout() { - return SoTimeout; - } - public void setAllowIncomingConnections(boolean allow) { _connectionHandler.setActive(allow); } - /** should we acceot connections, or just reject everyone? */ public boolean getAllowIncomingConnections() { return _connectionHandler.getActive(); @@ -137,10 +113,9 @@ public class ConnectionManager { synchronized (_connectionLock) { total = _connectionByInboundId.size(); for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { - if(((Connection)iter.next()).getIsConnected()) { + if ( ((Connection)iter.next()).getIsConnected() ) active++; } - } if (locked_tooManyStreams()) { reject = true; } else { @@ -160,9 +135,9 @@ public class ConnectionManager { _context.statManager().addRateData("stream.receiveActive", active, total); if (reject) { - if(_log.shouldLog(Log.WARN)) { - _log.warn("Refusing connection since we have exceeded our max of " + _maxConcurrentStreams + " connections"); - } + if (_log.shouldLog(Log.WARN)) + _log.warn("Refusing connection since we have exceeded our max of " + + _maxConcurrentStreams + " connections"); PacketLocal reply = new PacketLocal(_context, synPacket.getOptionalFrom()); reply.setFlag(Packet.FLAG_RESET); reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); @@ -188,6 +163,7 @@ public class ConnectionManager { _context.statManager().addRateData("stream.connectionReceived", 1, 0); return con; } + private static final long DEFAULT_STREAM_DELAY_MAX = 10*1000; /** @@ -200,16 +176,15 @@ public class ConnectionManager { Connection con = null; long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; long expiration = _context.clock().now() + opts.getConnectTimeout(); - if(opts.getConnectTimeout() <= 0) { + if (opts.getConnectTimeout() <= 0) expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX; - } _numWaiting++; while (true) { long remaining = expiration - _context.clock().now(); if (remaining <= 0) { - if(_log.shouldLog(Log.WARN)) { - _log.warn("Refusing to connect since we have exceeded our max of " + _maxConcurrentStreams + " connections"); - } + if (_log.shouldLog(Log.WARN)) + _log.warn("Refusing to connect since we have exceeded our max of " + + _maxConcurrentStreams + " connections"); _numWaiting--; return null; } @@ -218,18 +193,16 @@ public class ConnectionManager { if (locked_tooManyStreams()) { // allow a full buffer of pending/waiting streams if (_numWaiting > _maxConcurrentStreams) { - if(_log.shouldLog(Log.WARN)) { - _log.warn("Refusing connection since we have exceeded our max of " + _maxConcurrentStreams + " and there are " + _numWaiting + " waiting already"); - } + if (_log.shouldLog(Log.WARN)) + _log.warn("Refusing connection since we have exceeded our max of " + + _maxConcurrentStreams + " and there are " + _numWaiting + + " waiting already"); _numWaiting--; return null; } // no remaining streams, lets wait a bit - try { - _connectionLock.wait(remaining); - } catch(InterruptedException ie) { - } + try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {} } else { con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); con.setRemotePeer(peer); @@ -251,53 +224,35 @@ public class ConnectionManager { if (opts.getConnectDelay() <= 0) { con.waitForConnect(); } - if(_numWaiting > 0) { + if (_numWaiting > 0) _numWaiting--; - } + _context.statManager().addRateData("stream.connectionCreated", 1, 0); return con; } private boolean locked_tooManyStreams() { - if(_maxConcurrentStreams <= 0) { - return false; - } - if(_connectionByInboundId.size() < _maxConcurrentStreams) { - return false; - } + if (_maxConcurrentStreams <= 0) return false; + if (_connectionByInboundId.size() < _maxConcurrentStreams) return false; int active = 0; for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { Connection con = (Connection)iter.next(); - if(con.getIsConnected()) { + if (con.getIsConnected()) active++; } - } - if((_connectionByInboundId.size() > 100) && (_log.shouldLog(Log.INFO))) { - _log.info("More than 100 connections! " + active + " total: " + _connectionByInboundId.size()); - } + if ( (_connectionByInboundId.size() > 100) && (_log.shouldLog(Log.INFO)) ) + _log.info("More than 100 connections! " + active + + " total: " + _connectionByInboundId.size()); + return (active >= _maxConcurrentStreams); } - public MessageHandler getMessageHandler() { - return _messageHandler; - } - - public PacketHandler getPacketHandler() { - return _packetHandler; - } - - public ConnectionHandler getConnectionHandler() { - return _connectionHandler; - } - - public I2PSession getSession() { - return _session; - } - - public PacketQueue getPacketQueue() { - return _outboundQueue; - } + public MessageHandler getMessageHandler() { return _messageHandler; } + public PacketHandler getPacketHandler() { return _packetHandler; } + public ConnectionHandler getConnectionHandler() { return _connectionHandler; } + public I2PSession getSession() { return _session; } + public PacketQueue getPacketQueue() { return _outboundQueue; } /** * Something b0rked hard, so kill all of our connections without mercy. @@ -324,12 +279,11 @@ public class ConnectionManager { synchronized (_connectionLock) { Object o = _connectionByInboundId.remove(new Long(con.getReceiveStreamId())); removed = (o == con); - if(_log.shouldLog(Log.DEBUG)) { - _log.debug("Connection removed? " + removed + " remaining: " + _connectionByInboundId.size() + ": " + con); - } - if(!removed && _log.shouldLog(Log.DEBUG)) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Connection removed? " + removed + " remaining: " + + _connectionByInboundId.size() + ": " + con); + if (!removed && _log.shouldLog(Log.DEBUG)) _log.debug("Failed to remove " + con +"\n" + _connectionByInboundId.values()); - } _connectionLock.notifyAll(); } if (removed) { @@ -355,11 +309,9 @@ public class ConnectionManager { public boolean ping(Destination peer, long timeoutMs) { return ping(peer, timeoutMs, true); } - public boolean ping(Destination peer, long timeoutMs, boolean blocking) { return ping(peer, timeoutMs, blocking, null, null, null); } - public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) { Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1); PacketLocal packet = new PacketLocal(_context, peer); @@ -383,12 +335,8 @@ public class ConnectionManager { if (blocking) { synchronized (req) { - if(!req.pongReceived()) { - try { - req.wait(timeoutMs); - } catch(InterruptedException ie) { - } - } + if (!req.pongReceived()) + try { req.wait(timeoutMs); } catch (InterruptedException ie) {} } synchronized (_pendingPings) { @@ -403,15 +351,12 @@ public class ConnectionManager { } interface PingNotifier { - public void pingComplete(boolean ok); } private class PingFailed implements SimpleTimer.TimedEvent { - private Long _id; private PingNotifier _notifier; - public PingFailed(Long id, PingNotifier notifier) { _id = id; _notifier = notifier; @@ -421,35 +366,29 @@ public class ConnectionManager { boolean removed = false; synchronized (_pendingPings) { Object o = _pendingPings.remove(_id); - if(o != null) { + if (o != null) removed = true; } - } if (removed) { - if(_notifier != null) { + if (_notifier != null) _notifier.pingComplete(false); - } - if(_log.shouldLog(Log.INFO)) { + if (_log.shouldLog(Log.INFO)) _log.info("Ping failed"); } } } - } private class PingRequest { - private boolean _ponged; private Destination _peer; private PacketLocal _packet; private PingNotifier _notifier; - public PingRequest(Destination peer, PacketLocal packet, PingNotifier notifier) { _ponged = false; _peer = peer; _packet = packet; _notifier = notifier; } - public void pong() { _log.debug("Ping successful"); _context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent()); @@ -457,14 +396,10 @@ public class ConnectionManager { _ponged = true; ConnectionManager.PingRequest.this.notifyAll(); } - if(_notifier != null) { + if (_notifier != null) _notifier.pingComplete(true); } - } - - public boolean pongReceived() { - return _ponged; - } + public boolean pongReceived() { return _ponged; } } void receivePong(long pingId) { @@ -472,8 +407,7 @@ public class ConnectionManager { synchronized (_pendingPings) { req = (PingRequest)_pendingPings.remove(new Long(pingId)); } - if(req != null) { + if (req != null) req.pong(); } } -} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java index 68f8c0045..b1a4175f2 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java @@ -1,8 +1,5 @@ package net.i2p.client.streaming; -import java.net.SocketTimeoutException; -import java.util.logging.Level; -import java.util.logging.Logger; import net.i2p.I2PException; /** @@ -10,46 +7,17 @@ import net.i2p.I2PException; * */ public class I2PServerSocketFull implements I2PServerSocket { - private I2PSocketManagerFull _socketManager; - /** - * - * @param mgr - */ public I2PServerSocketFull(I2PSocketManagerFull mgr) { _socketManager = mgr; } - /** - * - * @return - * @throws net.i2p.I2PException - * @throws SocketTimeoutException - */ - public I2PSocket accept() throws I2PException, SocketTimeoutException { + public I2PSocket accept() throws I2PException { return _socketManager.receiveSocket(); } - public long getSoTimeout() { - return _socketManager.getConnectionManager().MgetSoTimeout(); - } + public void close() { _socketManager.getConnectionManager().setAllowIncomingConnections(false); } - public void setSoTimeout(long x) { - _socketManager.getConnectionManager().MsetSoTimeout(x); - } - /** - * Close the connection. - */ - public void close() { - _socketManager.getConnectionManager().setAllowIncomingConnections(false); - } - - /** - * - * @return _socketManager - */ - public I2PSocketManager getManager() { - return _socketManager; - } + public I2PSocketManager getManager() { return _socketManager; } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java index 2ea85270f..61dd48757 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java @@ -11,7 +11,6 @@ import net.i2p.data.Destination; * */ public class I2PSocketFull implements I2PSocket { - private Connection _connection; private I2PSocket.SocketErrorListener _listener; private Destination _remotePeer; @@ -25,12 +24,9 @@ public class I2PSocketFull implements I2PSocket { } } - public void close() throws IOException { Connection c = _connection; - if(c == null) { - return; - } + if (c == null) return; if (c.getIsConnected()) { OutputStream out = c.getOutputStream(); if (out != null) { @@ -48,71 +44,58 @@ public class I2PSocketFull implements I2PSocket { destroy(); } - Connection getConnection() { - return _connection; - } + Connection getConnection() { return _connection; } public InputStream getInputStream() { Connection c = _connection; - if(c != null) { + if (c != null) return c.getInputStream(); - } else { + else return null; } - } public I2PSocketOptions getOptions() { Connection c = _connection; - if(c != null) { + if (c != null) return c.getOptions(); - } else { + else return null; } - } public OutputStream getOutputStream() throws IOException { Connection c = _connection; - if(c != null) { + if (c != null) return c.getOutputStream(); - } else { + else return null; } - } - public Destination getPeerDestination() { - return _remotePeer; - } + public Destination getPeerDestination() { return _remotePeer; } public long getReadTimeout() { I2PSocketOptions opts = getOptions(); - if(opts != null) { + if (opts != null) return opts.getReadTimeout(); - } else { + else return -1; } - } - public Destination getThisDestination() { - return _localPeer; - } + public Destination getThisDestination() { return _localPeer; } public void setOptions(I2PSocketOptions options) { Connection c = _connection; - if(c == null) { - return; - } - if(options instanceof ConnectionOptions) { + if (c == null) return; + + if (options instanceof ConnectionOptions) c.setOptions((ConnectionOptions)options); - } else { + else c.setOptions(new ConnectionOptions(options)); } - } public void setReadTimeout(long ms) { Connection c = _connection; - if(c == null) { - return; - } + if (c == null) return; + c.getInputStream().setReadTimeout((int)ms); c.getOptions().setReadTimeout(ms); } @@ -133,17 +116,14 @@ public class I2PSocketFull implements I2PSocket { Connection c = _connection; _connection = null; _listener = null; - if(c != null) { + if (c != null) c.disconnectComplete(); } - } - public String toString() { Connection c = _connection; - if(c == null) { + if (c == null) return super.toString(); - } else { + else return c.toString(); } } -} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java index 5e79db0ed..7384a4972 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java @@ -1,7 +1,6 @@ package net.i2p.client.streaming; import java.net.NoRouteToHostException; -import java.net.SocketTimeoutException; import java.util.HashSet; import java.util.Iterator; import java.util.Properties; @@ -14,6 +13,7 @@ import net.i2p.client.I2PSessionException; import net.i2p.data.Destination; import net.i2p.util.Log; + /** * Centralize the coordination and multiplexing of the local client's streaming. * There should be one I2PSocketManager for each I2PSession, and if an application @@ -23,7 +23,6 @@ import net.i2p.util.Log; * */ public class I2PSocketManagerFull implements I2PSocketManager { - private I2PAppContext _context; private Log _log; private I2PSession _session; @@ -34,41 +33,27 @@ public class I2PSocketManagerFull implements I2PSocketManager { private int _maxStreams; private static int __managerId = 0; private ConnectionManager _connectionManager; + /** * How long to wait for the client app to accept() before sending back CLOSE? * This includes the time waiting in the queue. Currently set to 5 seconds. */ private static final long ACCEPT_TIMEOUT_DEFAULT = 5*1000; - /** - * - */ public I2PSocketManagerFull() { _context = null; _session = null; } - - /** - * - * @param context - * @param session - * @param opts - * @param name - */ public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) { this(); init(context, session, opts, name); } + /** how many streams will we allow at once? */ public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams"; /** * - * - * @param context - * @param session - * @param opts - * @param name */ public void init(I2PAppContext context, I2PSession session, Properties opts, String name) { _context = context; @@ -80,9 +65,8 @@ public class I2PSocketManagerFull implements I2PSocketManager { String num = (opts != null ? opts.getProperty(PROP_MAX_STREAMS, "-1") : "-1"); _maxStreams = Integer.parseInt(num); } catch (NumberFormatException nfe) { - if(_log.shouldLog(Log.WARN)) { + if (_log.shouldLog(Log.WARN)) _log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe); - } _maxStreams = -1; } _name = name + " " + (++__managerId); @@ -92,77 +76,44 @@ public class I2PSocketManagerFull implements I2PSocketManager { _serverSocket = new I2PServerSocketFull(this); if (_log.shouldLog(Log.INFO)) { - _log.info("Socket manager created. \ndefault options: " + _defaultOptions + "\noriginal properties: " + opts); + _log.info("Socket manager created. \ndefault options: " + _defaultOptions + + "\noriginal properties: " + opts); } } - /** - * - * @return - */ - public I2PSocketOptions buildOptions() { - return buildOptions(null); - } - - /** - * - * @param opts - * @return - */ + public I2PSocketOptions buildOptions() { return buildOptions(null); } public I2PSocketOptions buildOptions(Properties opts) { ConnectionOptions curOpts = new ConnectionOptions(_defaultOptions); curOpts.setProperties(opts); return curOpts; } - /** - * - * @return - */ public I2PSession getSession() { return _session; } - /** - * - * @return - */ public ConnectionManager getConnectionManager() { return _connectionManager; } - /** - * - * @return - * @throws net.i2p.I2PException - * @throws java.net.SocketTimeoutException - */ - public I2PSocket receiveSocket() throws I2PException, SocketTimeoutException { + public I2PSocket receiveSocket() throws I2PException { verifySession(); - Connection con = _connectionManager.getConnectionHandler().accept(_connectionManager.MgetSoTimeout()); - if(_log.shouldLog(Log.DEBUG)) { + Connection con = _connectionManager.getConnectionHandler().accept(-1); + if (_log.shouldLog(Log.DEBUG)) _log.debug("receiveSocket() called: " + con); - } if (con != null) { I2PSocketFull sock = new I2PSocketFull(con); con.setSocket(sock); return sock; } else { - if(_connectionManager.MgetSoTimeout() == -1) { return null; } - throw new SocketTimeoutException("I2PSocket timed out"); - } } /** * Ping the specified peer, returning true if they replied to the ping within * the timeout specified, false otherwise. This call blocks. * - * - * @param peer - * @param timeoutMs - * @return */ public boolean ping(Destination peer, long timeoutMs) { return _connectionManager.ping(peer, timeoutMs); @@ -174,47 +125,25 @@ public class I2PSocketManagerFull implements I2PSocketManager { * * @param ms milliseconds to wait, maximum */ - public void setAcceptTimeout(long ms) { - _acceptTimeout = ms; - } + public void setAcceptTimeout(long ms) { _acceptTimeout = ms; } + public long getAcceptTimeout() { return _acceptTimeout; } - /** - * - * @return - */ - public long getAcceptTimeout() { - return _acceptTimeout; - } - - /** - * - * @param options - */ public void setDefaultOptions(I2PSocketOptions options) { _defaultOptions = new ConnectionOptions((ConnectionOptions) options); } - /** - * - * @return - */ public I2PSocketOptions getDefaultOptions() { return _defaultOptions; } - /** - * - * @return - */ public I2PServerSocket getServerSocket() { _connectionManager.setAllowIncomingConnections(true); return _serverSocket; } private void verifySession() throws I2PException { - if(!_connectionManager.getSession().isClosed()) { + if (!_connectionManager.getSession().isClosed()) return; - } _connectionManager.getSession().connect(); } @@ -230,22 +159,20 @@ public class I2PSocketManagerFull implements I2PSocketManager { public I2PSocket connect(Destination peer, I2PSocketOptions options) throws I2PException, NoRouteToHostException { verifySession(); - if(options == null) { + if (options == null) options = _defaultOptions; - } ConnectionOptions opts = null; - if(options instanceof ConnectionOptions) { + if (options instanceof ConnectionOptions) opts = new ConnectionOptions((ConnectionOptions)options); - } else { + else opts = new ConnectionOptions(options); - } - if(_log.shouldLog(Log.INFO)) { - _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0, 6) + " with options: " + opts); - } + + if (_log.shouldLog(Log.INFO)) + _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0,6) + + " with options: " + opts); Connection con = _connectionManager.connect(peer, opts); - if(con == null) { + if (con == null) throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")"); - } I2PSocketFull socket = new I2PSocketFull(con); con.setSocket(socket); if (con.getConnectionError() != null) { @@ -260,7 +187,6 @@ public class I2PSocketManagerFull implements I2PSocketManager { * * @param peer Destination to connect to * - * @return * @throws NoRouteToHostException if the peer is not found or not reachable * @throws I2PException if there is some other I2P-related problem */ @@ -290,49 +216,25 @@ public class I2PSocketManagerFull implements I2PSocketManager { /** * Retrieve a set of currently connected I2PSockets, either initiated locally or remotely. * - * - * @return */ public Set listSockets() { Set connections = _connectionManager.listConnections(); Set rv = new HashSet(connections.size()); for (Iterator iter = connections.iterator(); iter.hasNext(); ) { Connection con = (Connection)iter.next(); - if(con.getSocket() != null) { + if (con.getSocket() != null) rv.add(con.getSocket()); } - } return rv; } - /** - * - * @return - */ - public String getName() { - return _name; - } + public String getName() { return _name; } + public void setName(String name) { _name = name; } - /** - * - * @param name - */ - public void setName(String name) { - _name = name; - } - /** - * - * @param lsnr - */ public void addDisconnectListener(I2PSocketManager.DisconnectListener lsnr) { _connectionManager.getMessageHandler().addDisconnectListener(lsnr); } - - /** - * - * @param lsnr - */ public void removeDisconnectListener(I2PSocketManager.DisconnectListener lsnr) { _connectionManager.getMessageHandler().removeDisconnectListener(lsnr); } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java b/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java index c52c373b1..0ea0c83d7 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java @@ -5,7 +5,7 @@ import net.i2p.util.SimpleTimer; /** * */ -public class RetransmissionTimer extends SimpleTimer { +class RetransmissionTimer extends SimpleTimer { private static final RetransmissionTimer _instance = new RetransmissionTimer(); public static final SimpleTimer getInstance() { return _instance; } protected RetransmissionTimer() { super("StreamingTimer"); } diff --git a/core/java/src/net/i2p/util/Executor.java b/core/java/src/net/i2p/util/Executor.java index bdb728f97..e3c1b6fbf 100644 --- a/core/java/src/net/i2p/util/Executor.java +++ b/core/java/src/net/i2p/util/Executor.java @@ -5,32 +5,22 @@ import java.util.List; import net.i2p.I2PAppContext; class Executor implements Runnable { - private I2PAppContext _context; private Log _log; private List _readyEvents; - private SimpleStore runn; - - public Executor(I2PAppContext ctx, Log log, List events, SimpleStore x) { + public Executor(I2PAppContext ctx, Log log, List events) { _context = ctx; _readyEvents = events; - runn = x; } - public void run() { - while(runn.getAnswer()) { + while (true) { SimpleTimer.TimedEvent evt = null; synchronized (_readyEvents) { - if(_readyEvents.size() <= 0) { - try { - _readyEvents.wait(); - } catch(InterruptedException ie) { - } - } - if(_readyEvents.size() > 0) { + if (_readyEvents.size() <= 0) + try { _readyEvents.wait(); } catch (InterruptedException ie) {} + if (_readyEvents.size() > 0) evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0); } - } if (evt != null) { long before = _context.clock().now(); @@ -40,24 +30,17 @@ class Executor implements Runnable { log("wtf, event borked: " + evt, t); } long time = _context.clock().now() - before; - if((time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN))) { + if ( (time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN)) ) _log.warn("wtf, event execution took " + time + ": " + evt); } } } - } - /** - * - * @param msg - * @param t - */ private void log(String msg, Throwable t) { synchronized (this) { - if(_log == null) { + if (_log == null) _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class); } - } _log.log(Log.CRIT, msg, t); } } diff --git a/core/java/src/net/i2p/util/SimpleStore.java b/core/java/src/net/i2p/util/SimpleStore.java deleted file mode 100644 index b73a8e7eb..000000000 --- a/core/java/src/net/i2p/util/SimpleStore.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * This is free software, do as you please. - */ - -package net.i2p.util; - -/** - * - * @author sponge - */ -public class SimpleStore { - - private boolean answer; - - SimpleStore(boolean x) { - answer=x; - } - - /** - * set the answer - * - * @param x - */ - public void setAnswer(boolean x) { - answer = x; - } - /** - * - * @return boolean - */ - public boolean getAnswer() { - return answer; - } - -} diff --git a/core/java/src/net/i2p/util/SimpleTimer.java b/core/java/src/net/i2p/util/SimpleTimer.java index e5725a921..9543f72c5 100644 --- a/core/java/src/net/i2p/util/SimpleTimer.java +++ b/core/java/src/net/i2p/util/SimpleTimer.java @@ -16,12 +16,8 @@ import net.i2p.I2PAppContext; * */ public class SimpleTimer { - private static final SimpleTimer _instance = new SimpleTimer(); - - public static SimpleTimer getInstance() { - return _instance; - } + public static SimpleTimer getInstance() { return _instance; } private I2PAppContext _context; private Log _log; /** event time (Long) to event (TimedEvent) mapping */ @@ -29,21 +25,9 @@ public class SimpleTimer { /** event (TimedEvent) to event time (Long) mapping */ private Map _eventTimes; private List _readyEvents; - private SimpleStore runn; - - /** - * - */ - protected SimpleTimer() { - this("SimpleTimer"); - } - /** - * - * @param name - */ + protected SimpleTimer() { this("SimpleTimer"); } protected SimpleTimer(String name) { - runn = new SimpleStore(true); _context = I2PAppContext.getGlobalContext(); _log = _context.logManager().getLog(SimpleTimer.class); _events = new TreeMap(); @@ -54,28 +38,13 @@ public class SimpleTimer { runner.setDaemon(true); runner.start(); for (int i = 0; i < 3; i++) { - I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents, runn)); + I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents)); executor.setName(name + "Executor " + i); executor.setDaemon(true); executor.start(); } } - /** - * Removes the SimpleTimer. - */ - public void removeSimpleTimer() { - synchronized(_events) { - runn.setAnswer(false); - _events.notifyAll(); - } - } - - /** - * - * @param event - * @param timeoutMs - */ public void reschedule(TimedEvent event, long timeoutMs) { addEvent(event, timeoutMs, false); } @@ -86,16 +55,9 @@ public class SimpleTimer { * for the earlier of the two timeouts, which may be before this stated * timeout. If this is not the desired behavior, call removeEvent first. * - * @param event - * @param timeoutMs */ - public void addEvent(TimedEvent event, long timeoutMs) { - addEvent(event, timeoutMs, true); - } - + public void addEvent(TimedEvent event, long timeoutMs) { addEvent(event, timeoutMs, true); } /** - * @param event - * @param timeoutMs * @param useEarliestTime if its already scheduled, use the earlier of the * two timeouts, else use the later */ @@ -124,9 +86,8 @@ public class SimpleTimer { } } } - while(_events.containsKey(time)) { + while (_events.containsKey(time)) time = new Long(time.longValue() + 1); - } _events.put(time, event); _eventTimes.put(event, time); @@ -146,33 +107,24 @@ public class SimpleTimer { _events.notifyAll(); } if (time.longValue() > eventTime + 100) { - if(_log.shouldLog(Log.WARN)) { - _log.warn("Lots of timer congestion, had to push " + event + " back " + (time.longValue() - eventTime) + "ms (# events: " + totalEvents + ")"); - } + if (_log.shouldLog(Log.WARN)) + _log.warn("Lots of timer congestion, had to push " + event + " back " + + (time.longValue()-eventTime) + "ms (# events: " + totalEvents + ")"); } long timeToAdd = System.currentTimeMillis() - now; if (timeToAdd > 50) { - if(_log.shouldLog(Log.WARN)) { + if (_log.shouldLog(Log.WARN)) _log.warn("timer contention: took " + timeToAdd + "ms to add a job with " + totalEvents + " queued"); } - } } - /** - * - * @param evt - * @return - */ public boolean removeEvent(TimedEvent evt) { - if(evt == null) { - return false; - } + if (evt == null) return false; synchronized (_events) { Long when = (Long)_eventTimes.remove(evt); - if(when != null) { + if (when != null) _events.remove(when); - } return null != when; } } @@ -181,7 +133,6 @@ public class SimpleTimer { * Simple interface for events to be queued up and notified on expiration */ public interface TimedEvent { - /** * the time requested has been reached (this call should NOT block, * otherwise the whole SimpleTimer gets backed up) @@ -189,15 +140,15 @@ public class SimpleTimer { */ public void timeReached(); } + private long _occurredTime; private long _occurredEventCount; - // not used - // private TimedEvent _recentEvents[] = new TimedEvent[5]; + private TimedEvent _recentEvents[] = new TimedEvent[5]; + private class SimpleTimerRunner implements Runnable { - public void run() { List eventsToFire = new ArrayList(1); - while(runn.getAnswer()) { + while (true) { try { synchronized (_events) { //if (_events.size() <= 0) @@ -207,10 +158,8 @@ public class SimpleTimer { long now = System.currentTimeMillis(); long nextEventDelay = -1; Object nextEvent = null; - while(runn.getAnswer()) { - if(_events.size() <= 0) { - break; - } + while (true) { + if (_events.size() <= 0) break; Long when = (Long)_events.firstKey(); if (when.longValue() <= now) { TimedEvent evt = (TimedEvent)_events.remove(when); @@ -226,15 +175,16 @@ public class SimpleTimer { } if (eventsToFire.size() <= 0) { if (nextEventDelay != -1) { - if(_log.shouldLog(Log.DEBUG)) { + if (_log.shouldLog(Log.DEBUG)) _log.debug("Next event in " + nextEventDelay + ": " + nextEvent); - } _events.wait(nextEventDelay); } else { _events.wait(); } } } + } catch (ThreadDeath td) { + return; // die } catch (InterruptedException ie) { // ignore } catch (Throwable t) { @@ -250,9 +200,8 @@ public class SimpleTimer { now = now - (now % 1000); synchronized (_readyEvents) { - for(int i = 0; i < eventsToFire.size(); i++) { + for (int i = 0; i < eventsToFire.size(); i++) _readyEvents.add(eventsToFire.get(i)); - } _readyEvents.notifyAll(); }