diff --git a/history.txt b/history.txt index 4205f2a26..a0a79f67f 100644 --- a/history.txt +++ b/history.txt @@ -1,5 +1,7 @@ 2015-02-06 zzz * NetDB: Reduce max job lag for floodfill + * NTCP: Block IP for a while when incoming connection is dropped before + receiving a message. Possible workaround for tickets #551, #1075, #1411. * Transport: - Decrease DH refiller initial delay and increase buffer size to reduce chance of running out on high-bandwidth routers diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 43dca9240..9319fc01c 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 21; + public final static long BUILD = 22; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java index 3510ef0d9..fbe8a9763 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java +++ b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java @@ -1,6 +1,7 @@ package net.i2p.router.transport.ntcp; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.CancelledKeyException; @@ -20,6 +21,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; import net.i2p.I2PAppContext; +import net.i2p.data.ByteArray; import net.i2p.data.router.RouterAddress; import net.i2p.data.router.RouterIdentity; import net.i2p.router.CommSystemFacade; @@ -29,6 +31,7 @@ import net.i2p.util.Addresses; import net.i2p.util.ConcurrentHashSet; import net.i2p.util.I2PThread; import net.i2p.util.Log; +import net.i2p.util.ObjectCounter; import net.i2p.util.SystemVersion; /** @@ -47,6 +50,7 @@ class EventPumper implements Runnable { private final Queue _wantsRegister = new ConcurrentLinkedQueue(); private final Queue _wantsConRegister = new ConcurrentLinkedQueue(); private final NTCPTransport _transport; + private final ObjectCounter _blockedIPs; private long _expireIdleWriteTime; private boolean _useDirect; @@ -75,6 +79,7 @@ class EventPumper implements Runnable { */ private static final long FAILSAFE_ITERATION_FREQ = 2*1000l; private static final long SELECTOR_LOOP_DELAY = 200; + private static final long BLOCKED_IP_FREQ = 3*60*1000; /** tunnel test now disabled, but this should be long enough to allow an active tunnel to get started */ private static final long MIN_EXPIRE_IDLE_TIME = 135*1000l; @@ -100,11 +105,13 @@ class EventPumper implements Runnable { _log = ctx.logManager().getLog(getClass()); _transport = transport; _expireIdleWriteTime = MAX_EXPIRE_IDLE_TIME; + _blockedIPs = new ObjectCounter(); _context.statManager().createRateStat("ntcp.pumperKeySetSize", "", "ntcp", new long[] {10*60*1000} ); //_context.statManager().createRateStat("ntcp.pumperKeysPerLoop", "", "ntcp", new long[] {10*60*1000} ); _context.statManager().createRateStat("ntcp.pumperLoopsPerSecond", "", "ntcp", new long[] {10*60*1000} ); _context.statManager().createRateStat("ntcp.zeroRead", "", "ntcp", new long[] {10*60*1000} ); _context.statManager().createRateStat("ntcp.zeroReadDrop", "", "ntcp", new long[] {10*60*1000} ); + _context.statManager().createRateStat("ntcp.dropInboundNoMessage", "", "ntcp", new long[] {10*60*1000} ); } public synchronized void startPumping() { @@ -165,6 +172,7 @@ class EventPumper implements Runnable { public void run() { int loopCount = 0; long lastFailsafeIteration = System.currentTimeMillis(); + long lastBlockedIPClear = lastFailsafeIteration; while (_alive && _selector.isOpen()) { try { loopCount++; @@ -194,12 +202,13 @@ class EventPumper implements Runnable { continue; } - if (lastFailsafeIteration + FAILSAFE_ITERATION_FREQ < System.currentTimeMillis()) { + long now = System.currentTimeMillis(); + if (lastFailsafeIteration + FAILSAFE_ITERATION_FREQ < now) { // in the *cough* unthinkable possibility that there are bugs in // the code, lets periodically pass over all NTCP connections and // make sure that anything which should be able to write has been // properly marked as such, etc - lastFailsafeIteration = System.currentTimeMillis(); + lastFailsafeIteration = now; try { Set all = _selector.keys(); _context.statManager().addRateData("ntcp.pumperKeySetSize", all.size()); @@ -254,6 +263,8 @@ class EventPumper implements Runnable { // the data queued to be sent has already passed through // the bw limiter and really just wants to get shoved // out the door asap. + if (_log.shouldLog(Log.INFO)) + _log.info("Failsafe write for " + con); key.interestOps(SelectionKey.OP_WRITE | key.interestOps()); failsafeWrites++; } @@ -279,16 +290,20 @@ class EventPumper implements Runnable { } } else { // another 100% CPU workaround - if ((loopCount % 128) == 127) { - if (_log.shouldLog(Log.WARN)) - _log.warn("EventPumper throttle " + loopCount + " loops in " + - (System.currentTimeMillis() - lastFailsafeIteration) + " ms"); + if ((loopCount % 512) == 511) { + if (_log.shouldLog(Log.INFO)) + _log.info("EventPumper throttle " + loopCount + " loops in " + + (now - lastFailsafeIteration) + " ms"); _context.statManager().addRateData("ntcp.failsafeThrottle", 1); try { Thread.sleep(25); } catch (InterruptedException ie) {} } } + if (lastBlockedIPClear + BLOCKED_IP_FREQ < now) { + _blockedIPs.clear(); + lastBlockedIPClear = now; + } // Clear the cache if the user changes the setting, @@ -509,7 +524,8 @@ class EventPumper implements Runnable { return; } - if (_context.blocklist().isBlocklisted(chan.socket().getInetAddress().getAddress())) { + byte[] ip = chan.socket().getInetAddress().getAddress(); + if (_context.blocklist().isBlocklisted(ip)) { if (_log.shouldLog(Log.WARN)) _log.warn("Receive session request from blocklisted IP: " + chan.socket().getInetAddress()); // need to add this stat first @@ -517,6 +533,18 @@ class EventPumper implements Runnable { try { chan.close(); } catch (IOException ioe) { } return; } + + ByteArray ba = new ByteArray(ip); + int count = _blockedIPs.count(ba); + if (count > 0) { + count = _blockedIPs.increment(ba); + if (_log.shouldLog(Log.WARN)) + _log.warn("Blocking accept of IP with count " + count + ": " + Addresses.toString(ip)); + _context.statManager().addRateData("ntcp.dropInboundNoMessage", count); + try { chan.close(); } catch (IOException ioe) { } + return; + } + // BUGFIX for firewalls. --Sponge if (_context.commSystem().getReachabilityStatus() != CommSystemFacade.STATUS_OK) chan.socket().setKeepAlive(true); @@ -573,9 +601,27 @@ class EventPumper implements Runnable { ByteBuffer buf = acquireBuf(); try { int read = con.getChannel().read(buf); - if (read == -1) { - //if (_log.shouldLog(Log.DEBUG)) _log.debug("EOF on " + con); + if (read < 0) { //_context.statManager().addRateData("ntcp.readEOF", 1); + if (con.isInbound() && con.getMessagesReceived() <= 0) { + InetAddress addr = con.getChannel().socket().getInetAddress(); + int count; + if (addr != null) { + byte[] ip = addr.getAddress(); + ByteArray ba = new ByteArray(ip); + count = _blockedIPs.increment(ba); + if (_log.shouldLog(Log.WARN)) + _log.warn("Blocking IP " + Addresses.toString(ip) + " with count " + count + ": " + con); + } else { + count = 1; + if (_log.shouldLog(Log.WARN)) + _log.warn("EOF on inbound before receiving any: " + con); + } + _context.statManager().addRateData("ntcp.dropInboundNoMessage", count); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("EOF on " + con); + } con.close(); releaseBuf(buf); } else if (read == 0) { @@ -596,7 +642,7 @@ class EventPumper implements Runnable { if (_log.shouldLog(Log.INFO)) _log.info("nothing to read for " + con + ", but stay interested"); } - } else if (read > 0) { + } else { // clear counter for workaround above con.clearZeroRead(); // ZERO COPY. The buffer will be returned in Reader.processRead() @@ -627,8 +673,25 @@ class EventPumper implements Runnable { } catch (IOException ioe) { // common, esp. at outbound connect time releaseBuf(buf); - if (_log.shouldLog(Log.INFO)) - _log.info("error reading on " + con, ioe); + if (con.isInbound() && con.getMessagesReceived() <= 0) { + InetAddress addr = con.getChannel().socket().getInetAddress(); + int count; + if (addr != null) { + byte[] ip = addr.getAddress(); + ByteArray ba = new ByteArray(ip); + count = _blockedIPs.increment(ba); + if (_log.shouldLog(Log.WARN)) + _log.warn("Blocking IP " + Addresses.toString(ip) + " with count " + count + ": " + con); + } else { + count = 1; + if (_log.shouldLog(Log.WARN)) + _log.warn("IOE on inbound before receiving any: " + con); + } + _context.statManager().addRateData("ntcp.dropInboundNoMessage", count); + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("error reading on " + con, ioe); + } if (con.isEstablished()) { _context.statManager().addRateData("ntcp.readError", 1); } else { @@ -739,6 +802,8 @@ class EventPumper implements Runnable { key.interestOps(key.interestOps() | SelectionKey.OP_READ); } catch (CancelledKeyException cke) { // ignore, we remove/etc elsewhere + if (_log.shouldLog(Log.WARN)) + _log.warn("RDE CKE 1", cke); } catch (IllegalArgumentException iae) { // JamVM (Gentoo: jamvm-1.5.4, gnu-classpath-0.98+gmp) // throws @@ -769,6 +834,8 @@ class EventPumper implements Runnable { try { key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); } catch (CancelledKeyException cke) { + if (_log.shouldLog(Log.WARN)) + _log.warn("RDE CKE 2", cke); // ignore } catch (IllegalArgumentException iae) { // see above