Transport: Add failsafe to prevent complete SSU stall waiting

for bandwidth limiter, root cause unknown
This commit is contained in:
zzz
2015-06-24 19:11:05 +00:00
parent 355b2a1528
commit 25268e7cb2
6 changed files with 46 additions and 12 deletions

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 10;
public final static long BUILD = 11;
/** for example "-test" */
public final static String EXTRA = "";

View File

@@ -717,8 +717,8 @@ public class FIFOBandwidthLimiter {
_availableInbound.addAndGet(0 - requested);
_totalAllocatedInboundBytes.addAndGet(requested);
}
if (_log.shouldLog(Log.INFO))
_log.info("IB shortcut for " + requested + "B? " + rv);
//if (_log.shouldLog(Log.INFO))
// _log.info("IB shortcut for " + requested + "B? " + rv);
return rv;
}
@@ -739,8 +739,8 @@ public class FIFOBandwidthLimiter {
_availableOutbound.addAndGet(0 - requested);
_totalAllocatedOutboundBytes.addAndGet(requested);
}
if (_log.shouldLog(Log.INFO))
_log.info("OB shortcut for " + requested + "B? " + rv);
//if (_log.shouldLog(Log.INFO))
// _log.info("OB shortcut for " + requested + "B? " + rv);
return rv;
}
@@ -809,7 +809,12 @@ public class FIFOBandwidthLimiter {
public int getTotalRequested() { return _total; }
public int getPendingRequested() { return _total - _allocated; }
public boolean getAborted() { return _aborted; }
public void abort() { _aborted = true; }
public void abort() {
_aborted = true;
// so isComplete() will return true
_allocated = _total;
notifyAllocation();
}
public CompleteListener getCompleteListener() { return _lsnr; }
public void setCompleteListener(CompleteListener lsnr) {
@@ -829,6 +834,10 @@ public class FIFOBandwidthLimiter {
private boolean isComplete() { return _allocated >= _total; }
/**
* May return without allocating.
* Check getPendingRequested() > 0 in a loop.
*/
public void waitForNextAllocation() {
_waited = true;
_allocationsSinceWait = 0;
@@ -838,7 +847,7 @@ public class FIFOBandwidthLimiter {
if (isComplete())
complete = true;
else
wait();
wait(100);
}
} catch (InterruptedException ie) {}
if (complete && _lsnr != null)
@@ -899,7 +908,11 @@ public class FIFOBandwidthLimiter {
public int getTotalRequested();
/** how many bytes were requested and haven't yet been allocated? */
public int getPendingRequested();
/** block until we are allocated some more bytes */
/**
* Block until we are allocated some more bytes.
* May return without allocating.
* Check getPendingRequested() > 0 in a loop.
*/
public void waitForNextAllocation();
/** we no longer want the data requested (the connection closed) */
public void abort();

View File

@@ -52,6 +52,7 @@ class UDPReceiver {
//_context.statManager().createRateStat("udp.droppedInbound", "How many packet are queued up but not yet received when we drop", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receiveHolePunch", "How often we receive a NAT hole punch", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.ignorePacketFromDroplist", "Packet lifetime for those dropped on the drop list", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receiveFailsafe", "limiter stuck?", "udp", new long[] { 24*60*60*1000L });
}
/**
@@ -265,8 +266,16 @@ class UDPReceiver {
//_context.bandwidthLimiter().requestInbound(req, size, "UDP receiver");
FIFOBandwidthLimiter.Request req =
_context.bandwidthLimiter().requestInbound(size, "UDP receiver");
while (req.getPendingRequested() > 0)
// failsafe, don't wait forever
int waitCount = 0;
while (req.getPendingRequested() > 0 && waitCount++ < 5) {
req.waitForNextAllocation();
}
if (waitCount >= 5) {
// tell FBL we didn't receive it, but receive it anyway
req.abort();
_context.statManager().addRateData("udp.receiveFailsafe", 1);
}
receive(packet);
//_context.statManager().addRateData("udp.receivePacketSize", size);

View File

@@ -54,6 +54,7 @@ class UDPSender {
//_context.statManager().createRateStat("udp.socketSendTime", "How long the actual socket.send took", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendBWThrottleTime", "How long the send is blocked by the bandwidth throttle", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendACKTime", "How long an ACK packet is blocked for (duration == lifetime)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendFailsafe", "limiter stuck?", "udp", new long[] { 24*60*60*1000L });
// used in RouterWatchdog
_context.statManager().createRequiredRateStat("udp.sendException", "Send fails (Windows exception?)", "udp", new long[] { 60*1000, 10*60*1000 });
@@ -230,8 +231,16 @@ class UDPSender {
//_context.bandwidthLimiter().requestOutbound(req, size, "UDP sender");
FIFOBandwidthLimiter.Request req =
_context.bandwidthLimiter().requestOutbound(size, 0, "UDP sender");
while (req.getPendingRequested() > 0)
// failsafe, don't wait forever
int waitCount = 0;
while (req.getPendingRequested() > 0 && waitCount++ < 5) {
req.waitForNextAllocation();
}
if (waitCount >= 5) {
// tell FBL we didn't send it, but send it anyway
req.abort();
_context.statManager().addRateData("udp.sendFailsafe", 1);
}
}
long afterBW = _context.clock().now();

View File

@@ -2261,7 +2261,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
void failed(OutboundMessageState msg, boolean allowPeerFailure) {
if (msg == null) return;
int consecutive = 0;
OutNetMessage m = msg.getMessage();
if ( allowPeerFailure && (msg.getPeer() != null) &&
( (msg.getMaxSends() >= OutboundMessageFragments.MAX_VOLLEYS) ||
@@ -2273,7 +2272,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
// + " lastReceived: " + recvDelay
// + " lastSentFully: " + sendDelay
// + " expired? " + msg.isExpired());
consecutive = msg.getPeer().incrementConsecutiveFailedSends();
int consecutive = msg.getPeer().incrementConsecutiveFailedSends();
if (_log.shouldLog(Log.INFO))
_log.info("Consecutive failure #" + consecutive
+ " on " + msg.toString()