forked from I2P_Developers/i2p.i2p
* Streaming: Track recently closed connections (ticket #1161)
Consolidate and synchronize code for selecting a random stream ID
This commit is contained in:
@@ -76,6 +76,11 @@ class ConnectionHandler {
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (_manager.wasRecentlyClosed(packet.getSendStreamId())) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Dropping packet for recently closed stream: " + packet);
|
||||
return;
|
||||
}
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Receive new SYN: " + packet + ": timeout in " + _acceptTimeout);
|
||||
// also check if expiration of the head is long past for overload detection with peek() ?
|
||||
|
@@ -16,6 +16,7 @@ import net.i2p.data.Hash;
|
||||
import net.i2p.data.SessionKey;
|
||||
import net.i2p.util.ConcurrentHashSet;
|
||||
import net.i2p.util.ConvertToHash;
|
||||
import net.i2p.util.LHMCache;
|
||||
import net.i2p.util.Log;
|
||||
import net.i2p.util.SimpleTimer2;
|
||||
|
||||
@@ -38,7 +39,7 @@ class ConnectionManager {
|
||||
/** Inbound stream ID (Long) to Connection map */
|
||||
private final ConcurrentHashMap<Long, Connection> _connectionByInboundId;
|
||||
/** Ping ID (Long) to PingRequest */
|
||||
private final Map<Long, PingRequest> _pendingPings;
|
||||
private final ConcurrentHashMap<Long, PingRequest> _pendingPings;
|
||||
private volatile boolean _throttlersInitialized;
|
||||
private final ConnectionOptions _defaultOptions;
|
||||
private final AtomicInteger _numWaiting = new AtomicInteger();
|
||||
@@ -48,6 +49,9 @@ class ConnectionManager {
|
||||
private volatile ConnThrottler _dayThrottler;
|
||||
/** since 0.9, each manager instantiates its own timer */
|
||||
private final SimpleTimer2 _timer;
|
||||
private final Map<Long, Object> _recentlyClosed;
|
||||
private static final Object DUMMY = new Object();
|
||||
|
||||
/** cache of the property to detect changes */
|
||||
private static volatile String _currentBlacklist = "";
|
||||
private static final Set<Hash> _globalBlacklist = new ConcurrentHashSet<Hash>();
|
||||
@@ -82,6 +86,7 @@ class ConnectionManager {
|
||||
int protocol = defaultOptions.getEnforceProtocol() ? I2PSession.PROTO_STREAMING : I2PSession.PROTO_ANY;
|
||||
_session.addMuxedSessionListener(_messageHandler, protocol, defaultOptions.getLocalPort());
|
||||
_outboundQueue = new PacketQueue(_context, _session, this);
|
||||
_recentlyClosed = new LHMCache<Long, Object>(32);
|
||||
/** Socket timeout for accept() */
|
||||
_soTimeout = -1;
|
||||
|
||||
@@ -109,6 +114,7 @@ class ConnectionManager {
|
||||
Connection getConnectionByInboundId(long id) {
|
||||
return _connectionByInboundId.get(Long.valueOf(id));
|
||||
}
|
||||
|
||||
/**
|
||||
* not guaranteed to be unique, but in case we receive more than one packet
|
||||
* on an inbound connection that we havent ack'ed yet...
|
||||
@@ -121,6 +127,18 @@ class ConnectionManager {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Was this conn recently closed?
|
||||
* @since 0.9.12
|
||||
*/
|
||||
public boolean wasRecentlyClosed(long inboundID) {
|
||||
synchronized(_recentlyClosed) {
|
||||
// use get() instead of containsKey() to update LRU access order,
|
||||
// as we may get additional packets with the same ID
|
||||
return _recentlyClosed.get(Long.valueOf(inboundID)) != null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the socket accept() timeout.
|
||||
* @param x
|
||||
@@ -195,7 +213,6 @@ class ConnectionManager {
|
||||
opts.setLocalPort(synPacket.getLocalPort());
|
||||
Connection con = new Connection(_context, this, _schedulerChooser, _timer, _outboundQueue, _conPacketHandler, opts, true);
|
||||
_tcbShare.updateOptsFromShare(con);
|
||||
long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
|
||||
boolean reject = false;
|
||||
int active = 0;
|
||||
int total = 0;
|
||||
@@ -220,15 +237,7 @@ class ConnectionManager {
|
||||
(synPacket.getOptionalFrom() == null ? "" : ": " + synPacket.getOptionalFrom().calculateHash().toBase64()));
|
||||
reject = true;
|
||||
} else {
|
||||
while (true) {
|
||||
Connection oldCon = _connectionByInboundId.putIfAbsent(Long.valueOf(receiveId), con);
|
||||
if (oldCon == null) {
|
||||
break;
|
||||
} else {
|
||||
// receiveId already taken, try another
|
||||
receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
|
||||
}
|
||||
}
|
||||
assignReceiveStreamId(con);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -269,7 +278,6 @@ class ConnectionManager {
|
||||
return null;
|
||||
}
|
||||
|
||||
con.setReceiveStreamId(receiveId);
|
||||
// finally, we know enough that we can log the packet with the conn filled in
|
||||
if (I2PSocketManagerFull.pcapWriter != null &&
|
||||
_context.getBooleanProperty(I2PSocketManagerFull.PROP_PCAP))
|
||||
@@ -278,7 +286,7 @@ class ConnectionManager {
|
||||
// This validates the packet, and sets the con's SendStreamID and RemotePeer
|
||||
con.getPacketHandler().receivePacket(synPacket, con);
|
||||
} catch (I2PException ie) {
|
||||
_connectionByInboundId.remove(Long.valueOf(receiveId));
|
||||
_connectionByInboundId.remove(Long.valueOf(con.getReceiveStreamId()));
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -324,6 +332,46 @@ class ConnectionManager {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Pick a new random stream ID for the con and assign it,
|
||||
* taking care to avoid duplicates, and put it in the connection table.
|
||||
*
|
||||
* @since 0.9.12 consolidated from receiveConnection() and connect()
|
||||
*/
|
||||
private void assignReceiveStreamId(Connection con) {
|
||||
long receiveId;
|
||||
synchronized(_recentlyClosed) {
|
||||
Long rcvID;
|
||||
do {
|
||||
receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
|
||||
rcvID = Long.valueOf(receiveId);
|
||||
} while (_recentlyClosed.containsKey(rcvID) ||
|
||||
_pendingPings.containsKey(rcvID) ||
|
||||
_connectionByInboundId.putIfAbsent(rcvID, con) != null);
|
||||
}
|
||||
con.setReceiveStreamId(receiveId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Pick a new random stream ID for a ping and assign it,
|
||||
* taking care to avoid duplicates, and return it.
|
||||
*
|
||||
* @since 0.9.12
|
||||
*/
|
||||
private long assignPingId(PingRequest req) {
|
||||
long receiveId;
|
||||
synchronized(_recentlyClosed) {
|
||||
Long rcvID;
|
||||
do {
|
||||
receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
|
||||
rcvID = Long.valueOf(receiveId);
|
||||
} while (_recentlyClosed.containsKey(rcvID) ||
|
||||
_connectionByInboundId.containsKey(rcvID) ||
|
||||
_pendingPings.putIfAbsent(rcvID, req) != null);
|
||||
}
|
||||
return receiveId;
|
||||
}
|
||||
|
||||
private static final long DEFAULT_STREAM_DELAY_MAX = 10*1000;
|
||||
|
||||
/**
|
||||
@@ -336,10 +384,12 @@ class ConnectionManager {
|
||||
*/
|
||||
public Connection connect(Destination peer, ConnectionOptions opts) {
|
||||
Connection con = null;
|
||||
long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
|
||||
long expiration = _context.clock().now() + opts.getConnectTimeout();
|
||||
if (opts.getConnectTimeout() <= 0)
|
||||
expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX;
|
||||
long expiration = _context.clock().now();
|
||||
long tmout = opts.getConnectTimeout();
|
||||
if (tmout <= 0)
|
||||
expiration += DEFAULT_STREAM_DELAY_MAX;
|
||||
else
|
||||
expiration += tmout;
|
||||
_numWaiting.incrementAndGet();
|
||||
while (true) {
|
||||
long remaining = expiration - _context.clock().now();
|
||||
@@ -368,18 +418,13 @@ class ConnectionManager {
|
||||
} else {
|
||||
con = new Connection(_context, this, _schedulerChooser, _timer, _outboundQueue, _conPacketHandler, opts, false);
|
||||
con.setRemotePeer(peer);
|
||||
|
||||
while (_connectionByInboundId.containsKey(Long.valueOf(receiveId))) {
|
||||
receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
|
||||
}
|
||||
_connectionByInboundId.put(Long.valueOf(receiveId), con);
|
||||
assignReceiveStreamId(con);
|
||||
break; // stop looping as a psuedo-wait
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// ok we're in...
|
||||
con.setReceiveStreamId(receiveId);
|
||||
con.eventOccurred();
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
@@ -554,6 +599,10 @@ class ConnectionManager {
|
||||
con.disconnect(false, false);
|
||||
iter.remove();
|
||||
}
|
||||
synchronized(_recentlyClosed) {
|
||||
_recentlyClosed.clear();
|
||||
}
|
||||
_pendingPings.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -577,6 +626,11 @@ class ConnectionManager {
|
||||
*/
|
||||
public void removeConnection(Connection con) {
|
||||
|
||||
Long rcvID = Long.valueOf(con.getReceiveStreamId());
|
||||
synchronized(_recentlyClosed) {
|
||||
_recentlyClosed.put(rcvID, DUMMY);
|
||||
}
|
||||
|
||||
Object o = _connectionByInboundId.remove(Long.valueOf(con.getReceiveStreamId()));
|
||||
boolean removed = (o == con);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
@@ -650,9 +704,10 @@ class ConnectionManager {
|
||||
*/
|
||||
public boolean ping(Destination peer, int fromPort, int toPort, long timeoutMs,
|
||||
boolean blocking, PingNotifier notifier) {
|
||||
Long id = Long.valueOf(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1);
|
||||
PingRequest req = new PingRequest(notifier);
|
||||
long id = assignPingId(req);
|
||||
PacketLocal packet = new PacketLocal(_context, peer);
|
||||
packet.setSendStreamId(id.longValue());
|
||||
packet.setSendStreamId(id);
|
||||
packet.setFlag(Packet.FLAG_ECHO |
|
||||
Packet.FLAG_NO_ACK |
|
||||
Packet.FLAG_SIGNATURE_INCLUDED);
|
||||
@@ -669,10 +724,6 @@ class ConnectionManager {
|
||||
}
|
||||
|
||||
|
||||
PingRequest req = new PingRequest(notifier);
|
||||
|
||||
_pendingPings.put(id, req);
|
||||
|
||||
_outboundQueue.enqueue(packet);
|
||||
packet.releasePayload();
|
||||
|
||||
|
@@ -207,7 +207,7 @@ class PacketHandler {
|
||||
if (!con.getResetSent()) {
|
||||
// someone is sending us a packet on the wrong stream
|
||||
// It isn't a SYN so it isn't likely to have a FROM to send a reset back to
|
||||
if (_log.shouldLog(Log.ERROR)) {
|
||||
if (_log.shouldLog(Log.WARN)) {
|
||||
StringBuilder buf = new StringBuilder(512);
|
||||
buf.append("Received a packet on the wrong stream: ");
|
||||
buf.append(packet);
|
||||
@@ -217,7 +217,7 @@ class PacketHandler {
|
||||
for (Connection cur : _manager.listConnections()) {
|
||||
buf.append('\n').append(cur);
|
||||
}
|
||||
_log.error(buf.toString(), new Exception("Wrong stream"));
|
||||
_log.warn(buf.toString(), new Exception("Wrong stream"));
|
||||
}
|
||||
}
|
||||
packet.releasePayload();
|
||||
@@ -288,8 +288,11 @@ class PacketHandler {
|
||||
}
|
||||
} else {
|
||||
// if it has a send ID, it's almost certainly for a recently removed connection.
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Dropping pkt w/ send ID but no con found, recently disconnected? " + packet);
|
||||
if (_log.shouldLog(Log.WARN)) {
|
||||
boolean recent = _manager.wasRecentlyClosed(packet.getSendStreamId());
|
||||
_log.warn("Dropping pkt w/ send ID but no con found, recently disconnected? " +
|
||||
recent + ' ' + packet);
|
||||
}
|
||||
// don't bother sending reset
|
||||
packet.releasePayload();
|
||||
return;
|
||||
|
@@ -1,3 +1,8 @@
|
||||
2014-03-12 zzz
|
||||
* Console: Handle ISO-639-2 language codes (ticket #1229)
|
||||
* Streaming: Track recently closed connections (ticket #1161)
|
||||
* Wrapper: Fix failed restarts on ARM (ticket #1230)
|
||||
|
||||
2014-03-08 zzz
|
||||
* PeerManager: Restore profileOrganizer.sameCountryBonus advanced config,
|
||||
inadvertently removed in 0.9.10
|
||||
|
@@ -18,7 +18,7 @@ public class RouterVersion {
|
||||
/** deprecated */
|
||||
public final static String ID = "Monotone";
|
||||
public final static String VERSION = CoreVersion.VERSION;
|
||||
public final static long BUILD = 16;
|
||||
public final static long BUILD = 17;
|
||||
|
||||
/** for example "-test" */
|
||||
public final static String EXTRA = "";
|
||||
|
Reference in New Issue
Block a user