From 25268e7cb253150f380098026ddc0122e0585c6d Mon Sep 17 00:00:00 2001 From: zzz Date: Wed, 24 Jun 2015 19:11:05 +0000 Subject: [PATCH] Transport: Add failsafe to prevent complete SSU stall waiting for bandwidth limiter, root cause unknown --- history.txt | 4 +++ .../src/net/i2p/router/RouterVersion.java | 2 +- .../transport/FIFOBandwidthLimiter.java | 27 ++++++++++++++----- .../i2p/router/transport/udp/UDPReceiver.java | 11 +++++++- .../i2p/router/transport/udp/UDPSender.java | 11 +++++++- .../router/transport/udp/UDPTransport.java | 3 +-- 6 files changed, 46 insertions(+), 12 deletions(-) diff --git a/history.txt b/history.txt index 5ecc00a62..4547341e1 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,7 @@ +2015-06-24 zzz + * Transport: Add failsafe to prevent complete SSU stall waiting + for bandwidth limiter + 2015-06-23 zzz * Console: Fix NPE on /configtunnels * GeoIP: Add countries and flags for Asia/Pacific, Bonaire, St. Barts, diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 7ca1a3ec2..5a58b19bc 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 10; + public final static long BUILD = 11; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java b/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java index b650e5c03..2f891ec64 100644 --- a/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java +++ b/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java @@ -717,8 +717,8 @@ public class FIFOBandwidthLimiter { _availableInbound.addAndGet(0 - requested); _totalAllocatedInboundBytes.addAndGet(requested); } - if (_log.shouldLog(Log.INFO)) - _log.info("IB shortcut for " + requested + "B? " + rv); + //if (_log.shouldLog(Log.INFO)) + // _log.info("IB shortcut for " + requested + "B? " + rv); return rv; } @@ -739,8 +739,8 @@ public class FIFOBandwidthLimiter { _availableOutbound.addAndGet(0 - requested); _totalAllocatedOutboundBytes.addAndGet(requested); } - if (_log.shouldLog(Log.INFO)) - _log.info("OB shortcut for " + requested + "B? " + rv); + //if (_log.shouldLog(Log.INFO)) + // _log.info("OB shortcut for " + requested + "B? " + rv); return rv; } @@ -809,7 +809,12 @@ public class FIFOBandwidthLimiter { public int getTotalRequested() { return _total; } public int getPendingRequested() { return _total - _allocated; } public boolean getAborted() { return _aborted; } - public void abort() { _aborted = true; } + public void abort() { + _aborted = true; + // so isComplete() will return true + _allocated = _total; + notifyAllocation(); + } public CompleteListener getCompleteListener() { return _lsnr; } public void setCompleteListener(CompleteListener lsnr) { @@ -829,6 +834,10 @@ public class FIFOBandwidthLimiter { private boolean isComplete() { return _allocated >= _total; } + /** + * May return without allocating. + * Check getPendingRequested() > 0 in a loop. + */ public void waitForNextAllocation() { _waited = true; _allocationsSinceWait = 0; @@ -838,7 +847,7 @@ public class FIFOBandwidthLimiter { if (isComplete()) complete = true; else - wait(); + wait(100); } } catch (InterruptedException ie) {} if (complete && _lsnr != null) @@ -899,7 +908,11 @@ public class FIFOBandwidthLimiter { public int getTotalRequested(); /** how many bytes were requested and haven't yet been allocated? */ public int getPendingRequested(); - /** block until we are allocated some more bytes */ + /** + * Block until we are allocated some more bytes. + * May return without allocating. + * Check getPendingRequested() > 0 in a loop. + */ public void waitForNextAllocation(); /** we no longer want the data requested (the connection closed) */ public void abort(); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java index bc16de99c..cae3030f2 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java @@ -52,6 +52,7 @@ class UDPReceiver { //_context.statManager().createRateStat("udp.droppedInbound", "How many packet are queued up but not yet received when we drop", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.receiveHolePunch", "How often we receive a NAT hole punch", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.ignorePacketFromDroplist", "Packet lifetime for those dropped on the drop list", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.receiveFailsafe", "limiter stuck?", "udp", new long[] { 24*60*60*1000L }); } /** @@ -265,8 +266,16 @@ class UDPReceiver { //_context.bandwidthLimiter().requestInbound(req, size, "UDP receiver"); FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(size, "UDP receiver"); - while (req.getPendingRequested() > 0) + // failsafe, don't wait forever + int waitCount = 0; + while (req.getPendingRequested() > 0 && waitCount++ < 5) { req.waitForNextAllocation(); + } + if (waitCount >= 5) { + // tell FBL we didn't receive it, but receive it anyway + req.abort(); + _context.statManager().addRateData("udp.receiveFailsafe", 1); + } receive(packet); //_context.statManager().addRateData("udp.receivePacketSize", size); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPSender.java b/router/java/src/net/i2p/router/transport/udp/UDPSender.java index 9c1fdf2ab..7091b4ced 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPSender.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPSender.java @@ -54,6 +54,7 @@ class UDPSender { //_context.statManager().createRateStat("udp.socketSendTime", "How long the actual socket.send took", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.sendBWThrottleTime", "How long the send is blocked by the bandwidth throttle", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.sendACKTime", "How long an ACK packet is blocked for (duration == lifetime)", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.sendFailsafe", "limiter stuck?", "udp", new long[] { 24*60*60*1000L }); // used in RouterWatchdog _context.statManager().createRequiredRateStat("udp.sendException", "Send fails (Windows exception?)", "udp", new long[] { 60*1000, 10*60*1000 }); @@ -230,8 +231,16 @@ class UDPSender { //_context.bandwidthLimiter().requestOutbound(req, size, "UDP sender"); FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(size, 0, "UDP sender"); - while (req.getPendingRequested() > 0) + // failsafe, don't wait forever + int waitCount = 0; + while (req.getPendingRequested() > 0 && waitCount++ < 5) { req.waitForNextAllocation(); + } + if (waitCount >= 5) { + // tell FBL we didn't send it, but send it anyway + req.abort(); + _context.statManager().addRateData("udp.sendFailsafe", 1); + } } long afterBW = _context.clock().now(); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 7429314f6..c616e5a82 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -2261,7 +2261,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority void failed(OutboundMessageState msg, boolean allowPeerFailure) { if (msg == null) return; - int consecutive = 0; OutNetMessage m = msg.getMessage(); if ( allowPeerFailure && (msg.getPeer() != null) && ( (msg.getMaxSends() >= OutboundMessageFragments.MAX_VOLLEYS) || @@ -2273,7 +2272,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority // + " lastReceived: " + recvDelay // + " lastSentFully: " + sendDelay // + " expired? " + msg.isExpired()); - consecutive = msg.getPeer().incrementConsecutiveFailedSends(); + int consecutive = msg.getPeer().incrementConsecutiveFailedSends(); if (_log.shouldLog(Log.INFO)) _log.info("Consecutive failure #" + consecutive + " on " + msg.toString()