NTCP: Block IP for a while when an incoming connection is dropped before
receiving a message. Possible workaround for tickets #551, #1075, #1411.
Root cause of problem not yet found.
- Increase threshold for loop throttle; this probably isn't the root cause of the problem.
- Log tweaks
This commit is contained in:
zzz
2015-02-06 15:09:45 +00:00
parent b0d09d28f4
commit b72271f9a4
3 changed files with 82 additions and 13 deletions

View File

@@ -1,5 +1,7 @@
2015-02-06 zzz 2015-02-06 zzz
* NetDB: Reduce max job lag for floodfill * NetDB: Reduce max job lag for floodfill
* NTCP: Block IP for a while when incoming connection is dropped before
receiving a message. Possible workaround for tickets #551, #1075, #1411.
* Transport: * Transport:
- Decrease DH refiller initial delay and increase buffer size - Decrease DH refiller initial delay and increase buffer size
to reduce chance of running out on high-bandwidth routers to reduce chance of running out on high-bandwidth routers

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */ /** deprecated */
public final static String ID = "Monotone"; public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION; public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 21; public final static long BUILD = 22;
/** for example "-test" */ /** for example "-test" */
public final static String EXTRA = ""; public final static String EXTRA = "";

View File

@@ -1,6 +1,7 @@
package net.i2p.router.transport.ntcp; package net.i2p.router.transport.ntcp;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException; import java.nio.channels.CancelledKeyException;
@@ -20,6 +21,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray;
import net.i2p.data.router.RouterAddress; import net.i2p.data.router.RouterAddress;
import net.i2p.data.router.RouterIdentity; import net.i2p.data.router.RouterIdentity;
import net.i2p.router.CommSystemFacade; import net.i2p.router.CommSystemFacade;
@@ -29,6 +31,7 @@ import net.i2p.util.Addresses;
import net.i2p.util.ConcurrentHashSet; import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.I2PThread; import net.i2p.util.I2PThread;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.ObjectCounter;
import net.i2p.util.SystemVersion; import net.i2p.util.SystemVersion;
/** /**
@@ -47,6 +50,7 @@ class EventPumper implements Runnable {
private final Queue<ServerSocketChannel> _wantsRegister = new ConcurrentLinkedQueue<ServerSocketChannel>(); private final Queue<ServerSocketChannel> _wantsRegister = new ConcurrentLinkedQueue<ServerSocketChannel>();
private final Queue<NTCPConnection> _wantsConRegister = new ConcurrentLinkedQueue<NTCPConnection>(); private final Queue<NTCPConnection> _wantsConRegister = new ConcurrentLinkedQueue<NTCPConnection>();
private final NTCPTransport _transport; private final NTCPTransport _transport;
private final ObjectCounter<ByteArray> _blockedIPs;
private long _expireIdleWriteTime; private long _expireIdleWriteTime;
private boolean _useDirect; private boolean _useDirect;
@@ -75,6 +79,7 @@ class EventPumper implements Runnable {
*/ */
private static final long FAILSAFE_ITERATION_FREQ = 2*1000l; private static final long FAILSAFE_ITERATION_FREQ = 2*1000l;
private static final long SELECTOR_LOOP_DELAY = 200; private static final long SELECTOR_LOOP_DELAY = 200;
private static final long BLOCKED_IP_FREQ = 3*60*1000;
/** tunnel test now disabled, but this should be long enough to allow an active tunnel to get started */ /** tunnel test now disabled, but this should be long enough to allow an active tunnel to get started */
private static final long MIN_EXPIRE_IDLE_TIME = 135*1000l; private static final long MIN_EXPIRE_IDLE_TIME = 135*1000l;
@@ -100,11 +105,13 @@ class EventPumper implements Runnable {
_log = ctx.logManager().getLog(getClass()); _log = ctx.logManager().getLog(getClass());
_transport = transport; _transport = transport;
_expireIdleWriteTime = MAX_EXPIRE_IDLE_TIME; _expireIdleWriteTime = MAX_EXPIRE_IDLE_TIME;
_blockedIPs = new ObjectCounter<ByteArray>();
_context.statManager().createRateStat("ntcp.pumperKeySetSize", "", "ntcp", new long[] {10*60*1000} ); _context.statManager().createRateStat("ntcp.pumperKeySetSize", "", "ntcp", new long[] {10*60*1000} );
//_context.statManager().createRateStat("ntcp.pumperKeysPerLoop", "", "ntcp", new long[] {10*60*1000} ); //_context.statManager().createRateStat("ntcp.pumperKeysPerLoop", "", "ntcp", new long[] {10*60*1000} );
_context.statManager().createRateStat("ntcp.pumperLoopsPerSecond", "", "ntcp", new long[] {10*60*1000} ); _context.statManager().createRateStat("ntcp.pumperLoopsPerSecond", "", "ntcp", new long[] {10*60*1000} );
_context.statManager().createRateStat("ntcp.zeroRead", "", "ntcp", new long[] {10*60*1000} ); _context.statManager().createRateStat("ntcp.zeroRead", "", "ntcp", new long[] {10*60*1000} );
_context.statManager().createRateStat("ntcp.zeroReadDrop", "", "ntcp", new long[] {10*60*1000} ); _context.statManager().createRateStat("ntcp.zeroReadDrop", "", "ntcp", new long[] {10*60*1000} );
_context.statManager().createRateStat("ntcp.dropInboundNoMessage", "", "ntcp", new long[] {10*60*1000} );
} }
public synchronized void startPumping() { public synchronized void startPumping() {
@@ -165,6 +172,7 @@ class EventPumper implements Runnable {
public void run() { public void run() {
int loopCount = 0; int loopCount = 0;
long lastFailsafeIteration = System.currentTimeMillis(); long lastFailsafeIteration = System.currentTimeMillis();
long lastBlockedIPClear = lastFailsafeIteration;
while (_alive && _selector.isOpen()) { while (_alive && _selector.isOpen()) {
try { try {
loopCount++; loopCount++;
@@ -194,12 +202,13 @@ class EventPumper implements Runnable {
continue; continue;
} }
if (lastFailsafeIteration + FAILSAFE_ITERATION_FREQ < System.currentTimeMillis()) { long now = System.currentTimeMillis();
if (lastFailsafeIteration + FAILSAFE_ITERATION_FREQ < now) {
// in the *cough* unthinkable possibility that there are bugs in // in the *cough* unthinkable possibility that there are bugs in
// the code, lets periodically pass over all NTCP connections and // the code, lets periodically pass over all NTCP connections and
// make sure that anything which should be able to write has been // make sure that anything which should be able to write has been
// properly marked as such, etc // properly marked as such, etc
lastFailsafeIteration = System.currentTimeMillis(); lastFailsafeIteration = now;
try { try {
Set<SelectionKey> all = _selector.keys(); Set<SelectionKey> all = _selector.keys();
_context.statManager().addRateData("ntcp.pumperKeySetSize", all.size()); _context.statManager().addRateData("ntcp.pumperKeySetSize", all.size());
@@ -254,6 +263,8 @@ class EventPumper implements Runnable {
// the data queued to be sent has already passed through // the data queued to be sent has already passed through
// the bw limiter and really just wants to get shoved // the bw limiter and really just wants to get shoved
// out the door asap. // out the door asap.
if (_log.shouldLog(Log.INFO))
_log.info("Failsafe write for " + con);
key.interestOps(SelectionKey.OP_WRITE | key.interestOps()); key.interestOps(SelectionKey.OP_WRITE | key.interestOps());
failsafeWrites++; failsafeWrites++;
} }
@@ -279,16 +290,20 @@ class EventPumper implements Runnable {
} }
} else { } else {
// another 100% CPU workaround // another 100% CPU workaround
if ((loopCount % 128) == 127) { if ((loopCount % 512) == 511) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.INFO))
_log.warn("EventPumper throttle " + loopCount + " loops in " + _log.info("EventPumper throttle " + loopCount + " loops in " +
(System.currentTimeMillis() - lastFailsafeIteration) + " ms"); (now - lastFailsafeIteration) + " ms");
_context.statManager().addRateData("ntcp.failsafeThrottle", 1); _context.statManager().addRateData("ntcp.failsafeThrottle", 1);
try { try {
Thread.sleep(25); Thread.sleep(25);
} catch (InterruptedException ie) {} } catch (InterruptedException ie) {}
} }
} }
if (lastBlockedIPClear + BLOCKED_IP_FREQ < now) {
_blockedIPs.clear();
lastBlockedIPClear = now;
}
// Clear the cache if the user changes the setting, // Clear the cache if the user changes the setting,
@@ -509,7 +524,8 @@ class EventPumper implements Runnable {
return; return;
} }
if (_context.blocklist().isBlocklisted(chan.socket().getInetAddress().getAddress())) { byte[] ip = chan.socket().getInetAddress().getAddress();
if (_context.blocklist().isBlocklisted(ip)) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Receive session request from blocklisted IP: " + chan.socket().getInetAddress()); _log.warn("Receive session request from blocklisted IP: " + chan.socket().getInetAddress());
// need to add this stat first // need to add this stat first
@@ -517,6 +533,18 @@ class EventPumper implements Runnable {
try { chan.close(); } catch (IOException ioe) { } try { chan.close(); } catch (IOException ioe) { }
return; return;
} }
ByteArray ba = new ByteArray(ip);
int count = _blockedIPs.count(ba);
if (count > 0) {
count = _blockedIPs.increment(ba);
if (_log.shouldLog(Log.WARN))
_log.warn("Blocking accept of IP with count " + count + ": " + Addresses.toString(ip));
_context.statManager().addRateData("ntcp.dropInboundNoMessage", count);
try { chan.close(); } catch (IOException ioe) { }
return;
}
// BUGFIX for firewalls. --Sponge // BUGFIX for firewalls. --Sponge
if (_context.commSystem().getReachabilityStatus() != CommSystemFacade.STATUS_OK) if (_context.commSystem().getReachabilityStatus() != CommSystemFacade.STATUS_OK)
chan.socket().setKeepAlive(true); chan.socket().setKeepAlive(true);
@@ -573,9 +601,27 @@ class EventPumper implements Runnable {
ByteBuffer buf = acquireBuf(); ByteBuffer buf = acquireBuf();
try { try {
int read = con.getChannel().read(buf); int read = con.getChannel().read(buf);
if (read == -1) { if (read < 0) {
//if (_log.shouldLog(Log.DEBUG)) _log.debug("EOF on " + con);
//_context.statManager().addRateData("ntcp.readEOF", 1); //_context.statManager().addRateData("ntcp.readEOF", 1);
if (con.isInbound() && con.getMessagesReceived() <= 0) {
InetAddress addr = con.getChannel().socket().getInetAddress();
int count;
if (addr != null) {
byte[] ip = addr.getAddress();
ByteArray ba = new ByteArray(ip);
count = _blockedIPs.increment(ba);
if (_log.shouldLog(Log.WARN))
_log.warn("Blocking IP " + Addresses.toString(ip) + " with count " + count + ": " + con);
} else {
count = 1;
if (_log.shouldLog(Log.WARN))
_log.warn("EOF on inbound before receiving any: " + con);
}
_context.statManager().addRateData("ntcp.dropInboundNoMessage", count);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("EOF on " + con);
}
con.close(); con.close();
releaseBuf(buf); releaseBuf(buf);
} else if (read == 0) { } else if (read == 0) {
@@ -596,7 +642,7 @@ class EventPumper implements Runnable {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("nothing to read for " + con + ", but stay interested"); _log.info("nothing to read for " + con + ", but stay interested");
} }
} else if (read > 0) { } else {
// clear counter for workaround above // clear counter for workaround above
con.clearZeroRead(); con.clearZeroRead();
// ZERO COPY. The buffer will be returned in Reader.processRead() // ZERO COPY. The buffer will be returned in Reader.processRead()
@@ -627,8 +673,25 @@ class EventPumper implements Runnable {
} catch (IOException ioe) { } catch (IOException ioe) {
// common, esp. at outbound connect time // common, esp. at outbound connect time
releaseBuf(buf); releaseBuf(buf);
if (_log.shouldLog(Log.INFO)) if (con.isInbound() && con.getMessagesReceived() <= 0) {
_log.info("error reading on " + con, ioe); InetAddress addr = con.getChannel().socket().getInetAddress();
int count;
if (addr != null) {
byte[] ip = addr.getAddress();
ByteArray ba = new ByteArray(ip);
count = _blockedIPs.increment(ba);
if (_log.shouldLog(Log.WARN))
_log.warn("Blocking IP " + Addresses.toString(ip) + " with count " + count + ": " + con);
} else {
count = 1;
if (_log.shouldLog(Log.WARN))
_log.warn("IOE on inbound before receiving any: " + con);
}
_context.statManager().addRateData("ntcp.dropInboundNoMessage", count);
} else {
if (_log.shouldLog(Log.INFO))
_log.info("error reading on " + con, ioe);
}
if (con.isEstablished()) { if (con.isEstablished()) {
_context.statManager().addRateData("ntcp.readError", 1); _context.statManager().addRateData("ntcp.readError", 1);
} else { } else {
@@ -739,6 +802,8 @@ class EventPumper implements Runnable {
key.interestOps(key.interestOps() | SelectionKey.OP_READ); key.interestOps(key.interestOps() | SelectionKey.OP_READ);
} catch (CancelledKeyException cke) { } catch (CancelledKeyException cke) {
// ignore, we remove/etc elsewhere // ignore, we remove/etc elsewhere
if (_log.shouldLog(Log.WARN))
_log.warn("RDE CKE 1", cke);
} catch (IllegalArgumentException iae) { } catch (IllegalArgumentException iae) {
// JamVM (Gentoo: jamvm-1.5.4, gnu-classpath-0.98+gmp) // JamVM (Gentoo: jamvm-1.5.4, gnu-classpath-0.98+gmp)
// throws // throws
@@ -769,6 +834,8 @@ class EventPumper implements Runnable {
try { try {
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
} catch (CancelledKeyException cke) { } catch (CancelledKeyException cke) {
if (_log.shouldLog(Log.WARN))
_log.warn("RDE CKE 2", cke);
// ignore // ignore
} catch (IllegalArgumentException iae) { } catch (IllegalArgumentException iae) {
// see above // see above