diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionHandler.java index 5ac1468ce..8c872a30d 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionHandler.java @@ -76,6 +76,11 @@ class ConnectionHandler { } return; } + if (_manager.wasRecentlyClosed(packet.getSendStreamId())) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Dropping packet for recently closed stream: " + packet); + return; + } if (_log.shouldLog(Log.INFO)) _log.info("Receive new SYN: " + packet + ": timeout in " + _acceptTimeout); // also check if expiration of the head is long past for overload detection with peek() ? diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java index 5924e818c..805234f4a 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java @@ -16,6 +16,7 @@ import net.i2p.data.Hash; import net.i2p.data.SessionKey; import net.i2p.util.ConcurrentHashSet; import net.i2p.util.ConvertToHash; +import net.i2p.util.LHMCache; import net.i2p.util.Log; import net.i2p.util.SimpleTimer2; @@ -38,7 +39,7 @@ class ConnectionManager { /** Inbound stream ID (Long) to Connection map */ private final ConcurrentHashMap _connectionByInboundId; /** Ping ID (Long) to PingRequest */ - private final Map _pendingPings; + private final ConcurrentHashMap _pendingPings; private volatile boolean _throttlersInitialized; private final ConnectionOptions _defaultOptions; private final AtomicInteger _numWaiting = new AtomicInteger(); @@ -48,6 +49,9 @@ class ConnectionManager { private volatile ConnThrottler _dayThrottler; /** since 0.9, each manager instantiates its own timer */ private final SimpleTimer2 _timer; + private final Map _recentlyClosed; + private static final Object DUMMY = new Object(); + /** cache of the property to detect changes */ private static volatile String _currentBlacklist = ""; private static final Set _globalBlacklist = new ConcurrentHashSet(); @@ -82,6 +86,7 @@ class ConnectionManager { int protocol = defaultOptions.getEnforceProtocol() ? I2PSession.PROTO_STREAMING : I2PSession.PROTO_ANY; _session.addMuxedSessionListener(_messageHandler, protocol, defaultOptions.getLocalPort()); _outboundQueue = new PacketQueue(_context, _session, this); + _recentlyClosed = new LHMCache(32); /** Socket timeout for accept() */ _soTimeout = -1; @@ -109,6 +114,7 @@ class ConnectionManager { Connection getConnectionByInboundId(long id) { return _connectionByInboundId.get(Long.valueOf(id)); } + /** * not guaranteed to be unique, but in case we receive more than one packet * on an inbound connection that we havent ack'ed yet... @@ -120,6 +126,18 @@ class ConnectionManager { } return null; } + + /** + * Was this conn recently closed? + * @since 0.9.12 + */ + public boolean wasRecentlyClosed(long inboundID) { + synchronized(_recentlyClosed) { + // use get() instead of containsKey() to update LRU access order, + // as we may get additional packets with the same ID + return _recentlyClosed.get(Long.valueOf(inboundID)) != null; + } + } /** * Set the socket accept() timeout. @@ -195,7 +213,6 @@ class ConnectionManager { opts.setLocalPort(synPacket.getLocalPort()); Connection con = new Connection(_context, this, _schedulerChooser, _timer, _outboundQueue, _conPacketHandler, opts, true); _tcbShare.updateOptsFromShare(con); - long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; boolean reject = false; int active = 0; int total = 0; @@ -220,15 +237,7 @@ class ConnectionManager { (synPacket.getOptionalFrom() == null ? "" : ": " + synPacket.getOptionalFrom().calculateHash().toBase64())); reject = true; } else { - while (true) { - Connection oldCon = _connectionByInboundId.putIfAbsent(Long.valueOf(receiveId), con); - if (oldCon == null) { - break; - } else { - // receiveId already taken, try another - receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; - } - } + assignReceiveStreamId(con); } } @@ -269,7 +278,6 @@ class ConnectionManager { return null; } - con.setReceiveStreamId(receiveId); // finally, we know enough that we can log the packet with the conn filled in if (I2PSocketManagerFull.pcapWriter != null && _context.getBooleanProperty(I2PSocketManagerFull.PROP_PCAP)) @@ -278,7 +286,7 @@ class ConnectionManager { // This validates the packet, and sets the con's SendStreamID and RemotePeer con.getPacketHandler().receivePacket(synPacket, con); } catch (I2PException ie) { - _connectionByInboundId.remove(Long.valueOf(receiveId)); + _connectionByInboundId.remove(Long.valueOf(con.getReceiveStreamId())); return null; } @@ -324,6 +332,46 @@ class ConnectionManager { return true; } + /** + * Pick a new random stream ID for the con and assign it, + * taking care to avoid duplicates, and put it in the connection table. + * + * @since 0.9.12 consolidated from receiveConnection() and connect() + */ + private void assignReceiveStreamId(Connection con) { + long receiveId; + synchronized(_recentlyClosed) { + Long rcvID; + do { + receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; + rcvID = Long.valueOf(receiveId); + } while (_recentlyClosed.containsKey(rcvID) || + _pendingPings.containsKey(rcvID) || + _connectionByInboundId.putIfAbsent(rcvID, con) != null); + } + con.setReceiveStreamId(receiveId); + } + + /** + * Pick a new random stream ID for a ping and assign it, + * taking care to avoid duplicates, and return it. + * + * @since 0.9.12 + */ + private long assignPingId(PingRequest req) { + long receiveId; + synchronized(_recentlyClosed) { + Long rcvID; + do { + receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; + rcvID = Long.valueOf(receiveId); + } while (_recentlyClosed.containsKey(rcvID) || + _connectionByInboundId.containsKey(rcvID) || + _pendingPings.putIfAbsent(rcvID, req) != null); + } + return receiveId; + } + private static final long DEFAULT_STREAM_DELAY_MAX = 10*1000; /** @@ -336,10 +384,12 @@ class ConnectionManager { */ public Connection connect(Destination peer, ConnectionOptions opts) { Connection con = null; - long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; - long expiration = _context.clock().now() + opts.getConnectTimeout(); - if (opts.getConnectTimeout() <= 0) - expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX; + long expiration = _context.clock().now(); + long tmout = opts.getConnectTimeout(); + if (tmout <= 0) + expiration += DEFAULT_STREAM_DELAY_MAX; + else + expiration += tmout; _numWaiting.incrementAndGet(); while (true) { long remaining = expiration - _context.clock().now(); @@ -368,18 +418,13 @@ class ConnectionManager { } else { con = new Connection(_context, this, _schedulerChooser, _timer, _outboundQueue, _conPacketHandler, opts, false); con.setRemotePeer(peer); - - while (_connectionByInboundId.containsKey(Long.valueOf(receiveId))) { - receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; - } - _connectionByInboundId.put(Long.valueOf(receiveId), con); + assignReceiveStreamId(con); break; // stop looping as a psuedo-wait } } // ok we're in... - con.setReceiveStreamId(receiveId); con.eventOccurred(); if (_log.shouldLog(Log.DEBUG)) @@ -554,6 +599,10 @@ class ConnectionManager { con.disconnect(false, false); iter.remove(); } + synchronized(_recentlyClosed) { + _recentlyClosed.clear(); + } + _pendingPings.clear(); } /** @@ -577,6 +626,11 @@ class ConnectionManager { */ public void removeConnection(Connection con) { + Long rcvID = Long.valueOf(con.getReceiveStreamId()); + synchronized(_recentlyClosed) { + _recentlyClosed.put(rcvID, DUMMY); + } + Object o = _connectionByInboundId.remove(Long.valueOf(con.getReceiveStreamId())); boolean removed = (o == con); if (_log.shouldLog(Log.DEBUG)) @@ -650,9 +704,10 @@ class ConnectionManager { */ public boolean ping(Destination peer, int fromPort, int toPort, long timeoutMs, boolean blocking, PingNotifier notifier) { - Long id = Long.valueOf(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1); + PingRequest req = new PingRequest(notifier); + long id = assignPingId(req); PacketLocal packet = new PacketLocal(_context, peer); - packet.setSendStreamId(id.longValue()); + packet.setSendStreamId(id); packet.setFlag(Packet.FLAG_ECHO | Packet.FLAG_NO_ACK | Packet.FLAG_SIGNATURE_INCLUDED); @@ -669,10 +724,6 @@ class ConnectionManager { } - PingRequest req = new PingRequest(notifier); - - _pendingPings.put(id, req); - _outboundQueue.enqueue(packet); packet.releasePayload(); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketHandler.java index 1deef1f22..b2983114f 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketHandler.java @@ -207,7 +207,7 @@ class PacketHandler { if (!con.getResetSent()) { // someone is sending us a packet on the wrong stream // It isn't a SYN so it isn't likely to have a FROM to send a reset back to - if (_log.shouldLog(Log.ERROR)) { + if (_log.shouldLog(Log.WARN)) { StringBuilder buf = new StringBuilder(512); buf.append("Received a packet on the wrong stream: "); buf.append(packet); @@ -217,7 +217,7 @@ class PacketHandler { for (Connection cur : _manager.listConnections()) { buf.append('\n').append(cur); } - _log.error(buf.toString(), new Exception("Wrong stream")); + _log.warn(buf.toString(), new Exception("Wrong stream")); } } packet.releasePayload(); @@ -288,8 +288,11 @@ class PacketHandler { } } else { // if it has a send ID, it's almost certainly for a recently removed connection. - if (_log.shouldLog(Log.WARN)) - _log.warn("Dropping pkt w/ send ID but no con found, recently disconnected? " + packet); + if (_log.shouldLog(Log.WARN)) { + boolean recent = _manager.wasRecentlyClosed(packet.getSendStreamId()); + _log.warn("Dropping pkt w/ send ID but no con found, recently disconnected? " + + recent + ' ' + packet); + } // don't bother sending reset packet.releasePayload(); return; diff --git a/history.txt b/history.txt index 026654d18..bb2a19538 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,8 @@ +2014-03-12 zzz + * Console: Handle ISO-639-2 language codes (ticket #1229) + * Streaming: Track recently closed connections (ticket #1161) + * Wrapper: Fix failed restarts on ARM (ticket #1230) + 2014-03-08 zzz * PeerManager: Restore profileOrganizer.sameCountryBonus advanced config, inadvertently removed in 0.9.10 diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index e1f87e077..8ab24107c 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 16; + public final static long BUILD = 17; /** for example "-test" */ public final static String EXTRA = "";