From 4dc90ef5da9119d06e2099ea68549f1e1fee0b4e Mon Sep 17 00:00:00 2001 From: zzz Date: Tue, 2 Oct 2012 18:36:06 +0000 Subject: [PATCH] * SSU: - Fix memory leak in _peersByRemoteHost map caused by not removing peers that change IP or port - Send keepalives if firewalled - Handle peers that change ports on an established session - Synchronize adds and drops - Don't use peers with high RTTs in clock skew calculation - Reduce initial RTT/RTO --- history.txt | 18 ++ .../src/net/i2p/router/RouterVersion.java | 2 +- .../router/transport/udp/PacketHandler.java | 76 +++++++- .../i2p/router/transport/udp/PeerState.java | 32 +++- .../router/transport/udp/UDPTransport.java | 181 +++++++++++++----- 5 files changed, 253 insertions(+), 56 deletions(-) diff --git a/history.txt b/history.txt index c52500c4f..80550cb8a 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,21 @@ +2012-10-02 zzz + * I2CP: Delay after sending disconnect message to + help it get through + * i2psnark: Fix delete download message + * i2ptunnel: Fix log message + * NTCP: Only set keepalive if firewalled + * OOMListener: Dump threads on OOM + * PRNG, LogWriter: Use I2PThread to catch OOM + * SimpleByteCache: Fix ABQ/LBQ selection + * SSU: + - Fix memory leak in _peersByRemoteHost map caused by not + removing peers that change IP or port + - Send keepalives if firewalled + - Handle peers that change ports on an established session + - Synchronize adds and drops + - Don't use peers with high RTTs in clock skew calculation + - Reduce initial RTT/RTO + 2012-09-28 zzz * i2psnark: - Fix bugs in rarest-first tracking diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index f6e6df3a5..4e28a2c8d 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 4; + public final static long BUILD = 5; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java index ff707407b..96ceec9c0 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java @@ -1,11 +1,14 @@ package net.i2p.router.transport.udp; import java.util.Date; +import java.util.List; +import java.util.Map; import net.i2p.router.Router; import net.i2p.router.RouterContext; import net.i2p.data.DataHelper; import net.i2p.util.I2PThread; +import net.i2p.util.LHMCache; import net.i2p.util.Log; /** @@ -30,6 +33,8 @@ class PacketHandler { private final IntroductionManager _introManager; private volatile boolean _keepReading; private final Handler[] _handlers; + private final Map _failCache; + private static final Object DUMMY = new Object(); private static final int MIN_NUM_HANDLERS = 1; // unless < 32MB private static final int MAX_NUM_HANDLERS = 1; @@ -46,6 +51,7 @@ class PacketHandler { _inbound = inbound; _testManager = testManager; _introManager = introManager; + _failCache = new LHMCache(24); long maxMemory = Runtime.getRuntime().maxMemory(); if (maxMemory == Long.MAX_VALUE) @@ -143,8 +149,8 @@ class PacketHandler { if (packet == null) break; // keepReading is probably false, or bind failed... packet.received(); - if (_log.shouldLog(Log.INFO)) - _log.info("Received: " + packet); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Received: " + packet); _state = 4; long queueTime = packet.getLifetime(); long handleStart = _context.clock().now(); @@ -294,7 +300,7 @@ class PacketHandler { receivePacket(reader, packet, est, false); } else { if (_log.shouldLog(Log.WARN)) - _log.warn("Validation with existing con failed, and validation as reestablish failed too. DROP"); + _log.warn("Validation with existing con failed, and validation as reestablish failed too. DROP " + packet); _context.statManager().addRateData("udp.droppedInvalidReestablish", packet.getLifetime(), packet.getExpiration()); } return; @@ -327,8 +333,70 @@ class PacketHandler { if (!isValid) { // Note that the vast majority of these are NOT corrupted packets, but // packets for which we don't have the PeerState (i.e. SessionKey) + // Case 1: 48 byte destroy packet, we already closed + // Case 2: 369 byte session created packet, re-tx of one that failed validation + // (peer probably doesn't know his correct external port, esp. on <= 0.9.1 + + // Case 3: + // For peers that change ports, look for an existing session with the same IP + // If we find it, and the packet validates with its mac key, tell the transport + // to change the port, and handle the packet. + // All this since 0.9.3. + RemoteHostId remoteHost = packet.getRemoteHost(); + boolean alreadyFailed; + synchronized(_failCache) { + alreadyFailed = _failCache.get(remoteHost) != null; + } + if (!alreadyFailed) { + // this is slow, that's why we cache it above. + List peers = _transport.getPeerStatesByIP(remoteHost); + if (!peers.isEmpty()) { + StringBuilder buf = new StringBuilder(256); + buf.append("Established peers with this IP: "); + boolean foundSamePort = false; + PeerState state = null; + int newPort = remoteHost.getPort(); + for (PeerState ps : peers) { + boolean valid = false; + long now = _context.clock().now(); + if (_log.shouldLog(Log.WARN)) + buf.append(ps.getRemoteHostId().toString()) + .append(" last sent: ").append(now - ps.getLastSendTime()) + .append(" last rcvd: ").append(now - ps.getLastReceiveTime()); + if (ps.getRemotePort() == newPort) { + foundSamePort = true; + } else if (packet.validate(ps.getCurrentMACKey())) { + packet.decrypt(ps.getCurrentCipherKey()); + reader.initialize(packet); + if (_log.shouldLog(Log.WARN)) + buf.append(" VALID type ").append(reader.readPayloadType()).append("; "); + valid = true; + if (state == null) + state = ps; + } else { + if (_log.shouldLog(Log.WARN)) + buf.append(" INVALID; "); + } + } + if (state != null && !foundSamePort) { + _transport.changePeerPort(state, newPort); + if (_log.shouldLog(Log.WARN)) { + buf.append(" CHANGED PORT TO ").append(newPort).append(" AND HANDLED"); + _log.warn(buf.toString()); + } + handlePacket(reader, packet, state, null, null, true); + return; + } + if (_log.shouldLog(Log.WARN)) + _log.warn(buf.toString()); + } + synchronized(_failCache) { + _failCache.put(remoteHost, DUMMY); + } + } if (_log.shouldLog(Log.WARN)) - _log.warn("Cannot validate rcvd pkt (path): " + packet); + _log.warn("Cannot validate rcvd pkt (path) wasCached? " + alreadyFailed + ": " + packet); + _context.statManager().addRateData("udp.droppedInvalidEstablish", packet.getLifetime(), packet.getExpiration()); switch (peerType) { case INBOUND_FALLBACK: diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 694a2740a..d61f73d59 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -137,11 +137,11 @@ class PeerState { /** what IP is the peer sending and receiving packets on? */ private final byte[] _remoteIP; /** cached IP address */ - private transient InetAddress _remoteIPAddress; + private volatile InetAddress _remoteIPAddress; /** what port is the peer sending and receiving packets on? */ - private final int _remotePort; + private volatile int _remotePort; /** cached RemoteHostId, used to find the peerState by remote info */ - private final RemoteHostId _remoteHostId; + private volatile RemoteHostId _remoteHostId; /** if we need to contact them, do we need to talk to an introducer? */ //private boolean _remoteRequiresIntroduction; @@ -284,8 +284,10 @@ class PeerState { */ public static final int LARGE_MTU = 1484; + /** 600 */ private static final int MIN_RTO = 100 + ACKSender.ACK_FREQUENCY; - private static final int INIT_RTO = 4*1000; + private static final int INIT_RTO = 3*1000; + public static final int INIT_RTT = INIT_RTO / 2; private static final int MAX_RTO = 15*1000; public PeerState(RouterContext ctx, UDPTransport transport, @@ -313,7 +315,7 @@ class PeerState { //_mtuLastChecked = -1; _lastACKSend = -1; _rto = INIT_RTO; - _rtt = INIT_RTO / 2; + _rtt = INIT_RTT; _rttDeviation = _rtt; _inboundMessages = new HashMap(8); _outboundMessages = new ArrayList(32); @@ -325,6 +327,17 @@ class PeerState { _remoteHostId = new RemoteHostId(remoteIP, remotePort); } + /** + * Caller should sync; UDPTransport must remove and add to peersByRemoteHost map + * @since 0.9.3 + */ + public void changePort(int newPort) { + if (newPort != _remotePort) { + _remoteHostId = new RemoteHostId(_remoteIP, newPort); + _remotePort = newPort; + } + } + /** * The peer are we talking to. This should be set as soon as this * state is created if we are initiating a connection, but if we are @@ -342,9 +355,12 @@ class PeerState { * connection is established. */ public SessionKey getCurrentCipherKey() { return _currentCipherKey; } + /** * The pending AES key for verifying packets if we are rekeying the * connection, or null if we are not in the process of rekeying. + * + * @return null always, rekeying unimplemented */ public SessionKey getNextMACKey() { return _nextMACKey; } @@ -352,6 +368,8 @@ class PeerState { * The pending AES key for encrypting/decrypting packets if we are * rekeying the connection, or null if we are not in the process * of rekeying. + * + * @return null always, rekeying unimplemented */ public SessionKey getNextCipherKey() { return _nextCipherKey; } @@ -1213,6 +1231,7 @@ class PeerState { // return MAX_RTO; } + /** @return non-null */ RemoteHostId getRemoteHostId() { return _remoteHostId; } /** @@ -1875,6 +1894,9 @@ class PeerState { buf.append(" consecFail: ").append(_consecutiveFailedSends); buf.append(" recv OK/Dup: ").append(_packetsReceived).append('/').append(_packetsReceivedDuplicate); buf.append(" send OK/Dup: ").append(_packetsTransmitted).append('/').append(_packetsRetransmitted); + buf.append(" IBM: ").append(_inboundMessages.size()); + buf.append(" OBQ: ").append(_outboundQueue.size()); + buf.append(" OBL: ").append(_outboundMessages.size()); return buf.toString(); } } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 3fb2a6b77..d25a12c0c 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -51,6 +51,7 @@ import net.i2p.util.Translate; public class UDPTransport extends TransportImpl implements TimedWeightedPriorityMessageQueue.FailedListener { private final Log _log; private UDPEndpoint _endpoint; + private final Object _addDropLock = new Object(); /** Peer (Hash) to PeerState */ private final Map _peersByIdent; /** RemoteHostId to PeerState */ @@ -404,7 +405,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority */ SessionKey getIntroKey() { return _introKey; } - public int getLocalPort() { return _externalListenPort; } + /** @deprecated unused */ + public int getLocalPort() { + return _endpoint != null ? _endpoint.getListenPort() : -1; + } + public InetAddress getLocalAddress() { return _externalListenHost; } public int getExternalPort() { return _externalListenPort; } @@ -720,6 +725,23 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority PeerState getPeerState(RemoteHostId hostInfo) { return _peersByRemoteHost.get(hostInfo); } + + /** + * Get the states for all peers at the given remote host, ignoring port. + * Used for a last-chance search for a peer that changed port, by PacketHandler. + * @since 0.9.3 + */ + List getPeerStatesByIP(RemoteHostId hostInfo) { + List rv = new ArrayList(4); + byte[] ip = hostInfo.getIP(); + if (ip != null) { + for (PeerState ps : _peersByIdent.values()) { + if (DataHelper.eq(ip, ps.getRemoteIP())) + rv.add(ps); + } + } + return rv; + } /** * get the state for the peer with the given ident, or null @@ -729,6 +751,24 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority return _peersByIdent.get(remotePeer); } + /** + * Remove and add to peersByRemoteHost map + * @since 0.9.3 + */ + public void changePeerPort(PeerState peer, int newPort) { + int oldPort; + synchronized (_addDropLock) { + oldPort = peer.getRemotePort(); + if (oldPort != newPort) { + _peersByRemoteHost.remove(peer.getRemoteHostId()); + peer.changePort(newPort); + _peersByRemoteHost.put(peer.getRemoteHostId(), peer); + } + } + if (_log.shouldLog(Log.WARN) && oldPort != newPort) + _log.warn("Changed port from " + oldPort + " to " + newPort + " for " + peer); + } + /** * For IntroductionManager * @return may be null if not started @@ -799,47 +839,69 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority boolean addRemotePeerState(PeerState peer) { if (_log.shouldLog(Log.INFO)) _log.info("Add remote peer state: " + peer); + synchronized(_addDropLock) { + return locked_addRemotePeerState(peer); + } + } + + private boolean locked_addRemotePeerState(PeerState peer) { Hash remotePeer = peer.getRemotePeer(); long oldEstablishedOn = -1; PeerState oldPeer = null; if (remotePeer != null) { - oldPeer = _peersByIdent.put(remotePeer, peer); - if ( (oldPeer != null) && (oldPeer != peer) ) { - // transfer over the old state/inbound message fragments/etc - peer.loadFrom(oldPeer); - oldEstablishedOn = oldPeer.getKeyEstablishedTime(); - } + oldPeer = _peersByIdent.put(remotePeer, peer); + if ( (oldPeer != null) && (oldPeer != peer) ) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Peer already connected (PBID): old=" + oldPeer + " new=" + peer); + // transfer over the old state/inbound message fragments/etc + peer.loadFrom(oldPeer); + oldEstablishedOn = oldPeer.getKeyEstablishedTime(); + } } + RemoteHostId remoteId = peer.getRemoteHostId(); if (oldPeer != null) { oldPeer.dropOutbound(); _introManager.remove(oldPeer); _expireEvent.remove(oldPeer); + RemoteHostId oldID = oldPeer.getRemoteHostId(); + if (!remoteId.equals(oldID)) { + // leak fix, remove old address + if (_log.shouldLog(Log.WARN)) + _log.warn(remotePeer + " changed address FROM " + oldID + " TO " + remoteId); + PeerState oldPeer2 = _peersByRemoteHost.remove(oldID); + // different ones in the two maps? shouldn't happen + if (oldPeer2 != oldPeer) { + oldPeer2.dropOutbound(); + _introManager.remove(oldPeer2); + _expireEvent.remove(oldPeer2); + } + } } - oldPeer = null; - - RemoteHostId remoteId = peer.getRemoteHostId(); - if (remoteId == null) return false; + // Should always be direct... except maybe for hidden mode? // or do we always know the IP by now? if (remoteId.getIP() == null && _log.shouldLog(Log.WARN)) _log.warn("Add indirect: " + peer); - oldPeer = _peersByRemoteHost.put(remoteId, peer); - if ( (oldPeer != null) && (oldPeer != peer) ) { + // don't do this twice + PeerState oldPeer2 = _peersByRemoteHost.put(remoteId, peer); + if (oldPeer2 != null && oldPeer2 != peer && oldPeer2 != oldPeer) { + // this shouldn't happen, should have been removed above + if (_log.shouldLog(Log.WARN)) + _log.warn("Peer already connected (PBRH): old=" + oldPeer2 + " new=" + peer); // transfer over the old state/inbound message fragments/etc peer.loadFrom(oldPeer); oldEstablishedOn = oldPeer.getKeyEstablishedTime(); + oldPeer2.dropOutbound(); + _introManager.remove(oldPeer2); + _expireEvent.remove(oldPeer2); } - - if (oldPeer != null) { - oldPeer.dropOutbound(); - _introManager.remove(oldPeer); - _expireEvent.remove(oldPeer); - } - - if ( (oldPeer != null) && (_log.shouldLog(Log.WARN)) ) - _log.warn("Peer already connected: old=" + oldPeer + " new=" + peer, new Exception("dup")); + + if (_log.shouldLog(Log.WARN) && _peersByIdent.size() != _peersByRemoteHost.size()) + _log.warn("Size Mismatch after add: " + peer + + " byIDsz = " + _peersByIdent.size() + + " byHostsz = " + _peersByRemoteHost.size()); _activeThrottle.unchoke(peer.getRemotePeer()); markReachable(peer.getRemotePeer(), peer.isInbound()); @@ -996,15 +1058,20 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority */ _log.info(buf.toString(), new Exception("Dropped by")); } - + synchronized(_addDropLock) { + locked_dropPeer(peer, shouldShitlist, why); + } + if (needsRebuild()) + rebuildExternalAddress(); + } + + private void locked_dropPeer(PeerState peer, boolean shouldShitlist, String why) { peer.dropOutbound(); peer.expireInboundMessages(); _introManager.remove(peer); _fragments.dropPeer(peer); PeerState altByIdent = null; - PeerState altByHost = null; - if (peer.getRemotePeer() != null) { dropPeerCapacities(peer); @@ -1018,9 +1085,14 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } RemoteHostId remoteId = peer.getRemoteHostId(); - if (remoteId != null) { - altByHost = _peersByRemoteHost.remove(remoteId); - } + PeerState altByHost = _peersByRemoteHost.remove(remoteId); + + if (altByIdent != altByHost && _log.shouldLog(Log.WARN)) + _log.warn("Mismatch on remove, RHID = " + remoteId + + " byID = " + altByIdent + + " byHost = " + altByHost + + " byIDsz = " + _peersByIdent.size() + + " byHostsz = " + _peersByRemoteHost.size()); // unchoke 'em, but just because we'll never talk again... _activeThrottle.unchoke(peer.getRemotePeer()); @@ -1029,12 +1101,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority // _flooder.removePeer(peer); _expireEvent.remove(peer); - if (needsRebuild()) - rebuildExternalAddress(); - // deal with races to make sure we drop the peers fully - if ( (altByIdent != null) && (peer != altByIdent) ) dropPeer(altByIdent, shouldShitlist, "recurse"); - if ( (altByHost != null) && (peer != altByHost) ) dropPeer(altByHost, shouldShitlist, "recurse"); + if ( (altByIdent != null) && (peer != altByIdent) ) locked_dropPeer(altByIdent, shouldShitlist, "recurse"); + if ( (altByHost != null) && (peer != altByHost) ) locked_dropPeer(altByHost, shouldShitlist, "recurse"); } private boolean needsRebuild() { @@ -1688,7 +1757,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } public boolean allowConnection() { - return _peersByIdent.size() < getMaxConnections(); } @@ -1698,20 +1766,17 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority */ @Override public Vector getClockSkews() { - Vector skews = new Vector(); - Vector peers = new Vector(); - - peers.addAll(_peersByIdent.values()); // If our clock is way off, we may not have many (or any) successful connections, // so try hard in that case to return good data - boolean includeEverybody = _context.router().getUptime() < 10*60*1000 || peers.size() < 10; + boolean includeEverybody = _context.router().getUptime() < 10*60*1000 || _peersByIdent.size() < 10; long now = _context.clock().now(); - for (Iterator iter = peers.iterator(); iter.hasNext(); ) { - PeerState peer = iter.next(); - if ((!includeEverybody) && now - peer.getLastReceiveTime() > 15*60*1000) + for (PeerState peer : _peersByIdent.values()) { + if ((!includeEverybody) && now - peer.getLastReceiveTime() > 5*60*1000) continue; // skip old peers + if (peer.getRTT() > PeerState.INIT_RTT - 250) + continue; // Big RTT makes for a poor calculation skews.addElement(Long.valueOf(peer.getClockSkew() / 1000)); } if (_log.shouldLog(Log.DEBUG)) @@ -2371,6 +2436,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } private class ExpirePeerEvent extends SimpleTimer2.TimedEvent { + // TODO why have separate Set, just use _peersByIdent.values() private final Set _expirePeers; private final List _expireBuffer; private volatile boolean _alive; @@ -2387,9 +2453,13 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _expireTimeout = Math.min(_expireTimeout + 15*1000, EXPIRE_TIMEOUT); else _expireTimeout = Math.max(_expireTimeout - 45*1000, MIN_EXPIRE_TIMEOUT); - long shortInactivityCutoff = _context.clock().now() - _expireTimeout; - long longInactivityCutoff = _context.clock().now() - EXPIRE_TIMEOUT; - long pingCutoff = _context.clock().now() - (2 * 60*60*1000); + long now = _context.clock().now(); + long shortInactivityCutoff = now - _expireTimeout; + long longInactivityCutoff = now - EXPIRE_TIMEOUT; + long pingCutoff = now - (2 * 60*60*1000); + long pingFirewallCutoff = now - (60 * 1000); + boolean shouldPingFirewall = _reachabilityStatus != CommSystemFacade.STATUS_OK; + boolean pingOneOnly = shouldPingFirewall && _externalListenPort == _endpoint.getListenPort(); _expireBuffer.clear(); for (Iterator iter = _expirePeers.iterator(); iter.hasNext(); ) { @@ -2403,7 +2473,23 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if ( (peer.getLastReceiveTime() < inactivityCutoff) && (peer.getLastSendTime() < inactivityCutoff) ) { _expireBuffer.add(peer); iter.remove(); - } + } else if (shouldPingFirewall && + peer.getLastSendTime() < pingFirewallCutoff && + peer.getLastReceiveTime() < pingFirewallCutoff) { + // ping if firewall is mapping the port to keep port the same... + // if the port changes we are screwed + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Pinging for firewall: " + peer); + // don't update or idle time won't be right and peer won't get dropped + // TODO if both sides are firewalled should only one ping + // or else session will stay open forever? + //peer.setLastSendTime(now); + send(_destroyBuilder.buildPing(peer)); + // If external port is different, it may be changing the port for every + // session, so ping all of them. Otherwise only one. + if (pingOneOnly) + shouldPingFirewall = false; + } } for (PeerState peer : _expireBuffer) { @@ -2415,12 +2501,15 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if (_alive) schedule(30*1000); } + public void add(PeerState peer) { _expirePeers.add(peer); } + public void remove(PeerState peer) { _expirePeers.remove(peer); } + public void setIsAlive(boolean isAlive) { _alive = isAlive; if (isAlive) {