* FIFOBandwidthRefiller:

- Replace global counters with atomics
      - Use lockless shortcut methods to grant
        requests if we can satisfy immediately
This commit is contained in:
zzz
2010-03-09 17:10:18 +00:00
parent 5b603d6627
commit 78a965dc90
3 changed files with 198 additions and 104 deletions

View File

@@ -4,24 +4,45 @@ import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import net.i2p.I2PAppContext;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
/**
* Concurrent plan:
*
* It's difficult to get rid of the locks on _pendingInboundRequests
* since locked_satisyInboundAvailable() leaves Requests on the head
* of the queue.
*
* When we go to Java 6, we can convert from a locked ArrayList to
* a LinkedBlockingDeque, where locked_sIA will poll() from the
* head of the queue, and if the request is not fully satisfied,
* offerFirst() (i.e. push) it back on the head.
*
* Ditto outbound of course.
*
* In the meantime, for Java 5, we have lockless 'shortcut'
* methods for the common case where we are under the bandwidth limits.
* And the volatile counters are now AtomicIntegers / AtomicLongs.
*
*/
public class FIFOBandwidthLimiter {
private Log _log;
private I2PAppContext _context;
private final List<Request> _pendingInboundRequests;
private final List<Request> _pendingOutboundRequests;
/** how many bytes we can consume for inbound transmission immediately */
private volatile int _availableInbound;
private AtomicInteger _availableInbound = new AtomicInteger();
/** how many bytes we can consume for outbound transmission immediately */
private volatile int _availableOutbound;
private AtomicInteger _availableOutbound = new AtomicInteger();
/** how many bytes we can queue up for bursting */
private volatile int _unavailableInboundBurst;
private AtomicInteger _unavailableInboundBurst = new AtomicInteger();
/** how many bytes we can queue up for bursting */
private volatile int _unavailableOutboundBurst;
private AtomicInteger _unavailableOutboundBurst = new AtomicInteger();
/** how large _unavailableInbound can get */
private int _maxInboundBurst;
/** how large _unavailableInbound can get */
@@ -35,13 +56,13 @@ public class FIFOBandwidthLimiter {
/** shortcut of whether our inbound rate is unlimited */
private boolean _inboundUnlimited;
/** lifetime counter of bytes received */
private volatile long _totalAllocatedInboundBytes;
private AtomicLong _totalAllocatedInboundBytes = new AtomicLong();
/** lifetime counter of bytes sent */
private volatile long _totalAllocatedOutboundBytes;
private AtomicLong _totalAllocatedOutboundBytes = new AtomicLong();
/** lifetime counter of tokens available for use but exceeded our maxInboundBurst size */
private volatile long _totalWastedInboundBytes;
private AtomicLong _totalWastedInboundBytes = new AtomicLong();
/** lifetime counter of tokens available for use but exceeded our maxOutboundBurst size */
private volatile long _totalWastedOutboundBytes;
private AtomicLong _totalWastedOutboundBytes = new AtomicLong();
private FIFOBandwidthRefiller _refiller;
private long _lastTotalSent;
@@ -75,8 +96,8 @@ public class FIFOBandwidthLimiter {
}
_pendingInboundRequests = new ArrayList(16);
_pendingOutboundRequests = new ArrayList(16);
_lastTotalSent = _totalAllocatedOutboundBytes;
_lastTotalReceived = _totalAllocatedInboundBytes;
_lastTotalSent = _totalAllocatedOutboundBytes.get();
_lastTotalReceived = _totalAllocatedInboundBytes.get();
_sendBps = 0;
_recvBps = 0;
_lastStatsUpdated = now();
@@ -90,10 +111,10 @@ public class FIFOBandwidthLimiter {
//public long getAvailableInboundBytes() { return _availableInboundBytes; }
//public long getAvailableOutboundBytes() { return _availableOutboundBytes; }
public long getTotalAllocatedInboundBytes() { return _totalAllocatedInboundBytes; }
public long getTotalAllocatedOutboundBytes() { return _totalAllocatedOutboundBytes; }
public long getTotalWastedInboundBytes() { return _totalWastedInboundBytes; }
public long getTotalWastedOutboundBytes() { return _totalWastedOutboundBytes; }
public long getTotalAllocatedInboundBytes() { return _totalAllocatedInboundBytes.get(); }
public long getTotalAllocatedOutboundBytes() { return _totalAllocatedOutboundBytes.get(); }
public long getTotalWastedInboundBytes() { return _totalWastedInboundBytes.get(); }
public long getTotalWastedOutboundBytes() { return _totalWastedOutboundBytes.get(); }
//public long getMaxInboundBytes() { return _maxInboundBytes; }
//public void setMaxInboundBytes(int numBytes) { _maxInboundBytes = numBytes; }
//public long getMaxOutboundBytes() { return _maxOutboundBytes; }
@@ -116,14 +137,14 @@ public class FIFOBandwidthLimiter {
public void reinitialize() {
_pendingInboundRequests.clear();
_pendingOutboundRequests.clear();
_availableInbound = 0;
_availableOutbound = 0;
_availableInbound.set(0);
_availableOutbound.set(0);
_maxInbound = 0;
_maxOutbound = 0;
_maxInboundBurst = 0;
_maxOutboundBurst = 0;
_unavailableInboundBurst = 0;
_unavailableOutboundBurst = 0;
_unavailableInboundBurst.set(0);
_unavailableOutboundBurst.set(0);
_inboundUnlimited = false;
_outboundUnlimited = false;
_refiller.reinitialize();
@@ -132,58 +153,66 @@ public class FIFOBandwidthLimiter {
public Request createRequest() { return new SimpleRequest(); }
/**
* Request some bytes, blocking until they become available
*
* Request some bytes. Does not block.
*/
public Request requestInbound(int bytesIn, String purpose) { return requestInbound(bytesIn, purpose, null, null); }
public Request requestInbound(int bytesIn, String purpose, CompleteListener lsnr, Object attachment) {
if (_inboundUnlimited) {
_totalAllocatedInboundBytes += bytesIn;
public Request requestInbound(int bytesIn, String purpose) {
// try to satisfy without grabbing the global lock
if (shortcutSatisfyInboundRequest(bytesIn))
return _noop;
}
return requestInbound(bytesIn, purpose, null, null);
}
public Request requestInbound(int bytesIn, String purpose, CompleteListener lsnr, Object attachment) {
SimpleRequest req = new SimpleRequest(bytesIn, 0, purpose, lsnr, attachment);
requestInbound(req, bytesIn, purpose);
return req;
}
public void requestInbound(Request req, int bytesIn, String purpose) {
req.init(bytesIn, 0, purpose);
if (false) { ((SimpleRequest)req).allocateAll(); return; }
int pending = 0;
/**
* The transports don't use this any more, so make it private
* and a SimpleRequest instead of a Request
* So there's no more casting
*/
private void requestInbound(SimpleRequest req, int bytesIn, String purpose) {
// don't init twice - uncomment if we make public again?
//req.init(bytesIn, 0, purpose);
int pending;
synchronized (_pendingInboundRequests) {
pending = _pendingInboundRequests.size();
_pendingInboundRequests.add(req);
}
satisfyInboundRequests(((SimpleRequest)req).satisfiedBuffer);
((SimpleRequest)req).satisfiedBuffer.clear();
satisfyInboundRequests(req.satisfiedBuffer);
req.satisfiedBuffer.clear();
if (pending > 0)
_context.statManager().addRateData("bwLimiter.pendingInboundRequests", pending, pending);
}
/**
* Request some bytes, blocking until they become available
*
*/
public Request requestOutbound(int bytesOut, String purpose) { return requestOutbound(bytesOut, purpose, null, null); }
public Request requestOutbound(int bytesOut, String purpose, CompleteListener lsnr, Object attachment) {
if (_outboundUnlimited) {
_totalAllocatedOutboundBytes += bytesOut;
return _noop;
}
/**
* Request some bytes. Does not block.
*/
public Request requestOutbound(int bytesOut, String purpose) {
// try to satisfy without grabbing the global lock
if (shortcutSatisfyOutboundRequest(bytesOut))
return _noop;
return requestOutbound(bytesOut, purpose, null, null);
}
public Request requestOutbound(int bytesOut, String purpose, CompleteListener lsnr, Object attachment) {
SimpleRequest req = new SimpleRequest(0, bytesOut, purpose, lsnr, attachment);
requestOutbound(req, bytesOut, purpose);
return req;
}
public void requestOutbound(Request req, int bytesOut, String purpose) {
req.init(0, bytesOut, purpose);
//if (false) { ((SimpleRequest)req).allocateAll(); return; }
int pending = 0;
private void requestOutbound(SimpleRequest req, int bytesOut, String purpose) {
// don't init twice - uncomment if we make public again?
//req.init(0, bytesOut, purpose);
int pending;
synchronized (_pendingOutboundRequests) {
pending = _pendingOutboundRequests.size();
_pendingOutboundRequests.add(req);
}
satisfyOutboundRequests(((SimpleRequest)req).satisfiedBuffer);
((SimpleRequest)req).satisfiedBuffer.clear();
satisfyOutboundRequests(req.satisfiedBuffer);
req.satisfiedBuffer.clear();
if (pending > 0)
_context.statManager().addRateData("bwLimiter.pendingOutboundRequests", pending, pending);
}
@@ -200,7 +229,7 @@ public class FIFOBandwidthLimiter {
void setOutboundBurstBytes(int bytes) { _maxOutboundBurst = bytes; }
StringBuilder getStatus() {
StringBuilder rv = new StringBuilder(64);
StringBuilder rv = new StringBuilder(128);
rv.append("Available: ").append(_availableInbound).append('/').append(_availableOutbound).append(' ');
rv.append("Max: ").append(_maxInbound).append('/').append(_maxOutbound).append(' ');
rv.append("Burst: ").append(_unavailableInboundBurst).append('/').append(_unavailableOutboundBurst).append(' ');
@@ -218,61 +247,69 @@ public class FIFOBandwidthLimiter {
final void refillBandwidthQueues(List<Request> buf, long bytesInbound, long bytesOutbound, long maxBurstIn, long maxBurstOut) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Refilling the queues with " + bytesInbound + "/" + bytesOutbound + ": " + getStatus().toString());
_availableInbound += bytesInbound;
_availableOutbound += bytesOutbound;
if (_availableInbound > _maxInbound) {
// Take some care throughout to minimize accesses to the atomics,
// both for efficiency and to not let strange things happen if
// it changes out from under us
// This never had locks before concurrent, anyway
int avi = _availableInbound.addAndGet((int) bytesInbound);
if (avi > _maxInbound) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("available inbound (" + _availableInbound + ") exceeds our inbound burst (" + _maxInbound + "), so no supplement");
_unavailableInboundBurst += _availableInbound - _maxInbound;
_availableInbound = _maxInbound;
if (_unavailableInboundBurst > _maxInboundBurst) {
_totalWastedInboundBytes += _unavailableInboundBurst - _maxInboundBurst;
_unavailableInboundBurst = _maxInboundBurst;
_log.debug("available inbound (" + avi + ") exceeds our inbound burst (" + _maxInbound + "), so no supplement");
int uib = _unavailableInboundBurst.addAndGet(avi - _maxInbound);
_availableInbound.set(_maxInbound);
if (uib > _maxInboundBurst) {
_totalWastedInboundBytes.addAndGet(uib - _maxInboundBurst);
_unavailableInboundBurst.set(_maxInboundBurst);
}
} else {
// try to pull in up to 1/10th of the burst rate, since we refill every 100ms
int want = (int)maxBurstIn;
if (want > (_maxInbound - _availableInbound))
want = _maxInbound - _availableInbound;
if (want > (_maxInbound - avi))
want = _maxInbound - avi;
if (_log.shouldLog(Log.DEBUG))
_log.debug("want to pull " + want + " from the inbound burst (" + _unavailableInboundBurst + ") to supplement " + _availableInbound + " (max: " + _maxInbound + ")");
_log.debug("want to pull " + want + " from the inbound burst (" + _unavailableInboundBurst + ") to supplement " + avi + " (max: " + _maxInbound + ")");
if (want > 0) {
if (want <= _unavailableInboundBurst) {
_availableInbound += want;
_unavailableInboundBurst -= want;
int uib = _unavailableInboundBurst.get();
if (want <= uib) {
_availableInbound.addAndGet(want);
_unavailableInboundBurst.addAndGet(0 - want);
} else {
_availableInbound += _unavailableInboundBurst;
_unavailableInboundBurst = 0;
_availableInbound.addAndGet(uib);
_unavailableInboundBurst.set(0);
}
}
}
if (_availableOutbound > _maxOutbound) {
int avo = _availableOutbound.addAndGet((int) bytesOutbound);
if (avo > _maxOutbound) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("available outbound (" + _availableOutbound + ") exceeds our outbound burst (" + _maxOutbound + "), so no supplement");
_unavailableOutboundBurst += _availableOutbound - _maxOutbound;
_availableOutbound = _maxOutbound;
if (_unavailableOutboundBurst > _maxOutboundBurst) {
_totalWastedOutboundBytes += _unavailableOutboundBurst - _maxOutboundBurst;
_unavailableOutboundBurst = _maxOutboundBurst;
_log.debug("available outbound (" + avo + ") exceeds our outbound burst (" + _maxOutbound + "), so no supplement");
int uob = _unavailableOutboundBurst.getAndAdd(avo - _maxOutbound);
_availableOutbound.set(_maxOutbound);
if (uob > _maxOutboundBurst) {
_totalWastedOutboundBytes.getAndAdd(uob - _maxOutboundBurst);
_unavailableOutboundBurst.set(_maxOutboundBurst);
}
} else {
// try to pull in up to 1/10th of the burst rate, since we refill every 100ms
int want = (int)maxBurstOut;
if (want > (_maxOutbound - _availableOutbound))
want = _maxOutbound - _availableOutbound;
if (want > (_maxOutbound - avo))
want = _maxOutbound - avo;
if (_log.shouldLog(Log.DEBUG))
_log.debug("want to pull " + want + " from the outbound burst (" + _unavailableOutboundBurst + ") to supplement " + _availableOutbound + " (max: " + _maxOutbound + ")");
_log.debug("want to pull " + want + " from the outbound burst (" + _unavailableOutboundBurst + ") to supplement " + avo + " (max: " + _maxOutbound + ")");
if (want > 0) {
if (want <= _unavailableOutboundBurst) {
_availableOutbound += want;
_unavailableOutboundBurst -= want;
int uob = _unavailableOutboundBurst.get();
if (want <= uob) {
_availableOutbound.addAndGet(want);
_unavailableOutboundBurst.addAndGet(0 - want);
} else {
_availableOutbound += _unavailableOutboundBurst;
_unavailableOutboundBurst = 0;
_availableOutbound.addAndGet(uob);
_unavailableOutboundBurst.set(0);
}
}
}
@@ -286,8 +323,8 @@ public class FIFOBandwidthLimiter {
long time = now - _lastStatsUpdated;
// If at least one second has passed
if (time >= 1000) {
long totS = _totalAllocatedOutboundBytes;
long totR = _totalAllocatedInboundBytes;
long totS = _totalAllocatedOutboundBytes.get();
long totR = _totalAllocatedInboundBytes.get();
long sent = totS - _lastTotalSent; // How much we sent meanwhile
long recv = totR - _lastTotalReceived; // How much we received meanwhile
_lastTotalSent = totS;
@@ -337,6 +374,8 @@ public class FIFOBandwidthLimiter {
/**
* Go through the queue, satisfying as many requests as possible (notifying
* each one satisfied that the request has been granted).
*
* @param buffer returned with the satisfied outbound requests only
*/
private final void satisfyRequests(List<Request> buffer) {
buffer.clear();
@@ -350,7 +389,7 @@ public class FIFOBandwidthLimiter {
if (_inboundUnlimited) {
locked_satisfyInboundUnlimited(satisfied);
} else {
if (_availableInbound > 0) {
if (_availableInbound.get() > 0) {
locked_satisfyInboundAvailable(satisfied);
} else {
// no bandwidth available
@@ -370,6 +409,7 @@ public class FIFOBandwidthLimiter {
}
}
/** called from debug logging only */
private long locked_getLongestInboundWait() {
long start = -1;
for (int i = 0; i < _pendingInboundRequests.size(); i++) {
@@ -382,6 +422,8 @@ public class FIFOBandwidthLimiter {
else
return now() - start;
}
/** called from debug logging only */
private long locked_getLongestOutboundWait() {
long start = -1;
for (int i = 0; i < _pendingOutboundRequests.size(); i++) {
@@ -404,7 +446,7 @@ public class FIFOBandwidthLimiter {
while (_pendingInboundRequests.size() > 0) {
SimpleRequest req = (SimpleRequest)_pendingInboundRequests.remove(0);
int allocated = req.getPendingInboundRequested();
_totalAllocatedInboundBytes += allocated;
_totalAllocatedInboundBytes.addAndGet(allocated);
req.allocateBytes(allocated, 0);
satisfied.add(req);
long waited = now() - req.getRequestTime();
@@ -427,7 +469,7 @@ public class FIFOBandwidthLimiter {
*/
private final void locked_satisfyInboundAvailable(List<Request> satisfied) {
for (int i = 0; i < _pendingInboundRequests.size(); i++) {
if (_availableInbound <= 0) break;
if (_availableInbound.get() <= 0) break;
SimpleRequest req = (SimpleRequest)_pendingInboundRequests.get(i);
long waited = now() - req.getRequestTime();
if (req.getAborted()) {
@@ -452,13 +494,14 @@ public class FIFOBandwidthLimiter {
}
// ok, they are really waiting for us to give them stuff
int requested = req.getPendingInboundRequested();
int allocated = 0;
if (_availableInbound > requested)
int avi = _availableInbound.get();
int allocated;
if (avi >= requested)
allocated = requested;
else
allocated = _availableInbound;
_availableInbound -= allocated;
_totalAllocatedInboundBytes += allocated;
allocated = avi;
_availableInbound.addAndGet(0 - allocated);
_totalAllocatedInboundBytes.addAndGet(allocated);
req.allocateBytes(allocated, 0);
satisfied.add(req);
if (req.getPendingInboundRequested() > 0) {
@@ -490,7 +533,7 @@ public class FIFOBandwidthLimiter {
if (_outboundUnlimited) {
locked_satisfyOutboundUnlimited(satisfied);
} else {
if (_availableOutbound > 0) {
if (_availableOutbound.get() > 0) {
locked_satisfyOutboundAvailable(satisfied);
} else {
// no bandwidth available
@@ -518,7 +561,7 @@ public class FIFOBandwidthLimiter {
while (_pendingOutboundRequests.size() > 0) {
SimpleRequest req = (SimpleRequest)_pendingOutboundRequests.remove(0);
int allocated = req.getPendingOutboundRequested();
_totalAllocatedOutboundBytes += allocated;
_totalAllocatedOutboundBytes.addAndGet(allocated);
req.allocateBytes(0, allocated);
satisfied.add(req);
long waited = now() - req.getRequestTime();
@@ -542,7 +585,7 @@ public class FIFOBandwidthLimiter {
*/
private final void locked_satisfyOutboundAvailable(List<Request> satisfied) {
for (int i = 0; i < _pendingOutboundRequests.size(); i++) {
if (_availableOutbound <= 0) break;
if (_availableOutbound.get() <= 0) break;
SimpleRequest req = (SimpleRequest)_pendingOutboundRequests.get(i);
long waited = now() - req.getRequestTime();
if (req.getAborted()) {
@@ -567,13 +610,14 @@ public class FIFOBandwidthLimiter {
}
// ok, they are really waiting for us to give them stuff
int requested = req.getPendingOutboundRequested();
int allocated = 0;
if (_availableOutbound > requested)
int avo = _availableOutbound.get();
int allocated;
if (avo >= requested)
allocated = requested;
else
allocated = _availableOutbound;
_availableOutbound -= allocated;
_totalAllocatedOutboundBytes += allocated;
allocated = avo;
_availableOutbound.addAndGet(0 - allocated);
_totalAllocatedOutboundBytes.addAndGet(allocated);
req.allocateBytes(0, allocated);
satisfied.add(req);
if (req.getPendingOutboundRequested() > 0) {
@@ -618,6 +662,50 @@ public class FIFOBandwidthLimiter {
}
}
/**
* Lockless total satisfaction,
* at some minor risk of exceeding the limits
* and driving the available counter below zero
*
* @param requested number of bytes
* @return satisfaction
* @since 0.7.13
*/
private boolean shortcutSatisfyInboundRequest(int requested) {
boolean rv = _inboundUnlimited ||
(_pendingInboundRequests.isEmpty() &&
_availableInbound.get() >= requested);
if (rv) {
_availableInbound.addAndGet(0 - requested);
_totalAllocatedInboundBytes.addAndGet(requested);
}
if (_log.shouldLog(Log.INFO))
_log.info("IB shortcut for " + requested + "B? " + rv);
return rv;
}
/**
* Lockless total satisfaction,
* at some minor risk of exceeding the limits
* and driving the available counter below zero
*
* @param requested number of bytes
* @return satisfaction
* @since 0.7.13
*/
private boolean shortcutSatisfyOutboundRequest(int requested) {
boolean rv = _outboundUnlimited ||
(_pendingOutboundRequests.isEmpty() &&
_availableOutbound.get() >= requested);
if (rv) {
_availableOutbound.addAndGet(0 - requested);
_totalAllocatedOutboundBytes.addAndGet(requested);
}
if (_log.shouldLog(Log.INFO))
_log.info("OB shortcut for " + requested + "B? " + rv);
return rv;
}
/** @deprecated not worth translating */
public void renderStatusHTML(Writer out) throws IOException {
/*******
@@ -741,7 +829,6 @@ public class FIFOBandwidthLimiter {
void allocateAll() {
_inAllocated = _inTotal;
_outAllocated = _outTotal;
_outAllocated = _outTotal;
if (_lsnr == null)
_allocationsSinceWait++;
if (_log.shouldLog(Log.DEBUG)) _log.debug("allocate all");
@@ -778,6 +865,13 @@ public class FIFOBandwidthLimiter {
public String toString() { return getRequestName(); }
}
/**
* This is somewhat complicated by having both
* inbound and outbound in a single request.
* Making a request unidirectional would
* be a good simplification.
* But NTCP would have to be changed as it puts them on one queue.
*/
public interface Request {
/** describe this particular request */
public String getRequestName();

View File

@@ -302,7 +302,7 @@ public class EventPumper implements Runnable {
public void wantsWrite(NTCPConnection con, byte data[]) {
ByteBuffer buf = ByteBuffer.wrap(data);
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(data.length, "NTCP write", null, null);//con, buf);
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(data.length, "NTCP write");//con, buf);
if (req.getPendingOutboundRequested() > 0) {
if (_log.shouldLog(Log.INFO))
_log.info("queued write on " + con + " for " + data.length);
@@ -471,7 +471,7 @@ public class EventPumper implements Runnable {
buf.get(data);
releaseBuf(buf);
ByteBuffer rbuf = ByteBuffer.wrap(data);
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(read, "NTCP read", null, null); //con, buf);
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(read, "NTCP read"); //con, buf);
if (req.getPendingInboundRequested() > 0) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
if (_log.shouldLog(Log.DEBUG))

View File

@@ -62,7 +62,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
* and already cleared through the bandwidth limiter.
*/
private final LinkedBlockingQueue<ByteBuffer> _writeBufs;
/** Todo: This is only so we can abort() them when we close() ??? */
/** Requests that were not granted immediately */
private final Set<FIFOBandwidthLimiter.Request> _bwRequests;
private boolean _established;
private long _establishedOn;