From 1375d01bdf7c56577aa23b346c8e06ed474ca5b1 Mon Sep 17 00:00:00 2001 From: jrandom Date: Mon, 12 Jul 2004 21:09:05 +0000 Subject: [PATCH] new bandwidth allocation policy and usage to include support for partial allocations (and in turn, partial write(...)) while still keeping the FIFO ordering this will give a much smoother traffic pattern, as instead of waiting 6 seconds to write a 32KB message under a 6KB rate, it'll write 6KB for each of the first 5 seconds, and 2KB the next this also allows people to have small buckets (but again, bucket sizes smaller than the rate just don't make sense) --- .../BandwidthLimitedInputStream.java | 38 +- .../BandwidthLimitedOutputStream.java | 18 +- .../transport/FIFOBandwidthLimiter.java | 349 +++++++++--------- .../transport/FIFOBandwidthRefiller.java | 50 ++- 4 files changed, 250 insertions(+), 205 deletions(-) diff --git a/router/java/src/net/i2p/router/transport/BandwidthLimitedInputStream.java b/router/java/src/net/i2p/router/transport/BandwidthLimitedInputStream.java index 8afea798f..4762c4b55 100644 --- a/router/java/src/net/i2p/router/transport/BandwidthLimitedInputStream.java +++ b/router/java/src/net/i2p/router/transport/BandwidthLimitedInputStream.java @@ -38,36 +38,50 @@ public class BandwidthLimitedInputStream extends FilterInputStream { } public int read() throws IOException { + FIFOBandwidthLimiter.Request req = null; if (_pullFromOutbound) - _context.bandwidthLimiter().requestOutbound(1, _peerSource); + req = _context.bandwidthLimiter().requestOutbound(1, _peerSource); else - _context.bandwidthLimiter().requestInbound(1, _peerSource); + req = _context.bandwidthLimiter().requestInbound(1, _peerSource); + + // since its only a single byte, we dont need to loop + // or check how much was allocated + req.waitForNextAllocation(); return in.read(); } public int read(byte dest[]) throws IOException { - int read = in.read(dest); - if (_pullFromOutbound) - _context.bandwidthLimiter().requestOutbound(read, _peerSource); - else - _context.bandwidthLimiter().requestInbound(read, _peerSource); - return read; + return read(dest, 0, dest.length); } public int read(byte dest[], int off, int len) throws IOException { int read = in.read(dest, off, len); + FIFOBandwidthLimiter.Request req = null; if (_pullFromOutbound) - _context.bandwidthLimiter().requestOutbound(read, _peerSource); + req = _context.bandwidthLimiter().requestOutbound(read, _peerSource); else - _context.bandwidthLimiter().requestInbound(read, _peerSource); + req = _context.bandwidthLimiter().requestInbound(read, _peerSource); + + while ( (req.getPendingInboundRequested() > 0) || + (req.getPendingOutboundRequested() > 0) ) { + // we still haven't been authorized for everything, keep on waiting + req.waitForNextAllocation(); + } return read; } public long skip(long numBytes) throws IOException { long skip = in.skip(numBytes); + FIFOBandwidthLimiter.Request req = null; if (_pullFromOutbound) - _context.bandwidthLimiter().requestOutbound((int)skip, _peerSource); + req = _context.bandwidthLimiter().requestOutbound((int)skip, _peerSource); else - _context.bandwidthLimiter().requestInbound((int)skip, _peerSource); + req = _context.bandwidthLimiter().requestInbound((int)skip, _peerSource); + + while ( (req.getPendingInboundRequested() > 0) || + (req.getPendingOutboundRequested() > 0) ) { + // we still haven't been authorized for everything, keep on waiting + req.waitForNextAllocation(); + } return skip; } } diff --git a/router/java/src/net/i2p/router/transport/BandwidthLimitedOutputStream.java b/router/java/src/net/i2p/router/transport/BandwidthLimitedOutputStream.java index 237b714ed..d9aa56cf5 100644 --- a/router/java/src/net/i2p/router/transport/BandwidthLimitedOutputStream.java +++ b/router/java/src/net/i2p/router/transport/BandwidthLimitedOutputStream.java @@ -33,7 +33,9 @@ public class BandwidthLimitedOutputStream extends FilterOutputStream { public void write(int val) throws IOException { if (_log.shouldLog(Log.DEBUG)) _log.debug("Writing a single byte!", new Exception("Single byte from...")); - _context.bandwidthLimiter().requestOutbound(1, _peerTarget); + FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(1, _peerTarget); + // only a single byte, no need to loop + req.waitForNextAllocation(); out.write(val); } public void write(byte src[]) throws IOException { @@ -47,7 +49,17 @@ public class BandwidthLimitedOutputStream extends FilterOutputStream { if (len + off > src.length) throw new IllegalArgumentException("wtf are you thinking? len=" + len + ", off=" + off + ", data=" + src.length); - _context.bandwidthLimiter().requestOutbound(len, _peerTarget); - out.write(src, off, len); + FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(len, _peerTarget); + + int written = 0; + while (written < len) { + int allocated = len - req.getPendingOutboundRequested(); + int toWrite = allocated - written; + if (toWrite > 0) { + out.write(src, off + written, toWrite); + written += toWrite; + } + req.waitForNextAllocation(); + } } } diff --git a/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java b/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java index c6eb8f157..aadb8a322 100644 --- a/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java +++ b/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java @@ -15,14 +15,14 @@ public class FIFOBandwidthLimiter { private I2PAppContext _context; private List _pendingInboundRequests; private List _pendingOutboundRequests; - private volatile long _availableInboundBytes; - private volatile long _availableOutboundBytes; + private volatile int _availableInboundBytes; + private volatile int _availableOutboundBytes; private boolean _outboundUnlimited; private boolean _inboundUnlimited; private volatile long _totalAllocatedInboundBytes; private volatile long _totalAllocatedOutboundBytes; - private long _maxInboundBytes; - private long _maxOutboundBytes; + private int _maxInboundBytes; + private int _maxOutboundBytes; private FIFOBandwidthRefiller _refiller; private static int __id = 0; @@ -45,9 +45,9 @@ public class FIFOBandwidthLimiter { public long getTotalAllocatedInboundBytes() { return _totalAllocatedInboundBytes; } public long getTotalAllocatedOutboundBytes() { return _totalAllocatedOutboundBytes; } public long getMaxInboundBytes() { return _maxInboundBytes; } - public void setMaxInboundBytes(long numBytes) { _maxInboundBytes = numBytes; } + public void setMaxInboundBytes(int numBytes) { _maxInboundBytes = numBytes; } public long getMaxOutboundBytes() { return _maxOutboundBytes; } - public void setMaxOutboundBytes(long numBytes) { _maxOutboundBytes = numBytes; } + public void setMaxOutboundBytes(int numBytes) { _maxOutboundBytes = numBytes; } public boolean getInboundUnlimited() { return _inboundUnlimited; } public void setInboundUnlimited(boolean isUnlimited) { _inboundUnlimited = isUnlimited; } public boolean getOutboundUnlimited() { return _outboundUnlimited; } @@ -67,81 +67,25 @@ public class FIFOBandwidthLimiter { * Request some bytes, blocking until they become available * */ - public void requestInbound(int bytesIn, String purpose) { - addInboundRequest(new SimpleRequest(bytesIn, 0, purpose)); + public Request requestInbound(int bytesIn, String purpose) { + SimpleRequest req = new SimpleRequest(bytesIn, 0, purpose); + synchronized (_pendingInboundRequests) { + _pendingInboundRequests.add(req); + } + satisfyInboundRequests(); + return req; } /** * Request some bytes, blocking until they become available * */ - public void requestOutbound(int bytesOut, String purpose) { - addOutboundRequest(new SimpleRequest(0, bytesOut, purpose)); - } - - /** - * Add the request to the queue, blocking the requesting thread until - * bandwidth is available (and all requests for bandwidth ahead of it have - * been granted). Once sufficient bandwidth is available, this call will - * return and request.grantRequest() will have been called. - * - */ - private final void addInboundRequest(BandwidthRequest request) { - synchronized (_pendingInboundRequests) { - if ( (_pendingInboundRequests.size() <= 0) && - ( (request.getRequestedInboundBytes() <= _availableInboundBytes) || (_inboundUnlimited) ) ) { - // the queue is empty and there are sufficient bytes, grant 'em - if (!_inboundUnlimited) - _availableInboundBytes -= request.getRequestedInboundBytes(); - _totalAllocatedInboundBytes += request.getRequestedInboundBytes(); - if (_log.shouldLog(Log.INFO)) - _log.info("Granting inbound request " + request.getRequestName() + " immediately for " - + request.getRequestedInboundBytes()); - request.grantRequest(); - return; - } else { - _pendingInboundRequests.add(request); - } - } - synchronized (request.getAvailabilityMonitor()) { - while (!request.alreadyGranted()) { - try { - request.getAvailabilityMonitor().wait(); - } catch (InterruptedException ie) {} - } - } - } - - /** - * Add the request to the queue, blocking the requesting thread until - * bandwidth is available (and all requests for bandwidth ahead of it have - * been granted). Once sufficient bandwidth is available, this call will - * return and request.grantRequest() will have been called. - * - */ - private final void addOutboundRequest(BandwidthRequest request) { + public Request requestOutbound(int bytesOut, String purpose) { + SimpleRequest req = new SimpleRequest(0, bytesOut, purpose); synchronized (_pendingOutboundRequests) { - if ( (_pendingOutboundRequests.size() <= 0) && - ( (request.getRequestedOutboundBytes() <= _availableOutboundBytes) || (_outboundUnlimited) ) ) { - // the queue is empty and there are sufficient bytes, grant 'em - if (!_outboundUnlimited) - _availableOutboundBytes -= request.getRequestedOutboundBytes(); - _totalAllocatedOutboundBytes += request.getRequestedOutboundBytes(); - if (_log.shouldLog(Log.INFO)) - _log.info("Granting outbound request " + request.getRequestName() + " immediately for " - + request.getRequestedOutboundBytes()); - request.grantRequest(); - return; - } else { - _pendingOutboundRequests.add(request); - } - } - synchronized (request.getAvailabilityMonitor()) { - while (!request.alreadyGranted()) { - try { - request.getAvailabilityMonitor().wait(); - } catch (InterruptedException ie) {} - } + _pendingOutboundRequests.add(req); } + satisfyOutboundRequests(); + return req; } /** @@ -170,80 +114,140 @@ public class FIFOBandwidthLimiter { } private final void satisfyInboundRequests() { + List satisfied = null; synchronized (_pendingInboundRequests) { while (_pendingInboundRequests.size() > 0) { - BandwidthRequest req = (BandwidthRequest)_pendingInboundRequests.get(0); - if ( (req.getRequestedInboundBytes() <= _availableInboundBytes) || (_inboundUnlimited) ) { - _pendingInboundRequests.remove(0); - if (!_inboundUnlimited) - _availableInboundBytes -= req.getRequestedInboundBytes(); - _totalAllocatedInboundBytes += req.getRequestedInboundBytes(); - if (_log.shouldLog(Log.INFO)) - _log.info("Granting inbound request " + req.getRequestName() + " for " - + req.getRequestedInboundBytes() + " bytes (waited " + SimpleRequest req = (SimpleRequest)_pendingInboundRequests.get(0); + if (_inboundUnlimited) { + int allocated = req.getPendingInboundRequested(); + _totalAllocatedInboundBytes += allocated; + req.allocateBytes(allocated, 0); + if (satisfied == null) + satisfied = new ArrayList(2); + satisfied.add(req); + if (_log.shouldLog(Log.INFO)) + _log.info("Granting inbound request " + req.getRequestName() + " fully for " + + req.getTotalInboundRequested() + " bytes (waited " + (_context.clock().now() - req.getRequestTime()) + "ms) pending " + _pendingInboundRequests.size()); - // i hate nested synchronization - synchronized (req.getAvailabilityMonitor()) { - req.grantRequest(); - req.getAvailabilityMonitor().notifyAll(); - } + // if we're unlimited, we always grant it fully, so there's no need to keep it around + _pendingInboundRequests.remove(0); + } else if (_availableInboundBytes > 0) { + int requested = req.getPendingInboundRequested(); + int allocated = 0; + if (_availableInboundBytes > requested) + allocated = requested; + else + allocated = _availableInboundBytes; + _availableInboundBytes -= allocated; + req.allocateBytes(allocated, 0); + if (satisfied == null) + satisfied = new ArrayList(2); + satisfied.add(req); + if (req.getPendingInboundRequested() > 0) { + if (_log.shouldLog(Log.INFO)) + _log.info("Allocating " + allocated + " bytes inbound as a partial grant to " + + req.getRequestName() + " (wanted " + + req.getTotalInboundRequested() + " bytes, waited " + + (_context.clock().now() - req.getRequestTime()) + + "ms) pending " + _pendingInboundRequests.size()); + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("Allocating " + allocated + " bytes inbound to finish the partial grant to " + + req.getRequestName() + " (total " + + req.getTotalInboundRequested() + " bytes, waited " + + (_context.clock().now() - req.getRequestTime()) + + "ms) pending " + _pendingInboundRequests.size()); + _pendingInboundRequests.remove(0); + } } else { - // there isn't sufficient bandwidth for the first request, - // so since we're a FIFO limiter, everyone waits. If we were a - // best fit or ASAP limiter, we'd continue on iterating to see - // if anyone would be satisfied with the current availability + // no bandwidth available if (_log.shouldLog(Log.WARN)) _log.warn("Still denying the first inbound request (" + req.getRequestName() + " for " - + req.getRequestedInboundBytes() + " bytes (available " + + req.getTotalInboundRequested() + " bytes (available " + _availableInboundBytes + "/" + _availableOutboundBytes + " in/out) (waited " + (_context.clock().now() - req.getRequestTime()) + "ms so far) pending " + (_pendingInboundRequests.size())); - return; + break; } } - //if (_log.shouldLog(Log.INFO)) - // _log.info("Nothing pending"); + } + + if (satisfied != null) { + for (int i = 0; i < satisfied.size(); i++) { + SimpleRequest req = (SimpleRequest)satisfied.get(i); + req.notifyAllocation(); + } } } private final void satisfyOutboundRequests() { + List satisfied = null; synchronized (_pendingOutboundRequests) { while (_pendingOutboundRequests.size() > 0) { - BandwidthRequest req = (BandwidthRequest)_pendingOutboundRequests.get(0); - if ( (req.getRequestedOutboundBytes() <= _availableOutboundBytes) || (_outboundUnlimited) ) { - _pendingOutboundRequests.remove(0); - if (!_outboundUnlimited) - _availableOutboundBytes -= req.getRequestedOutboundBytes(); - _totalAllocatedOutboundBytes += req.getRequestedOutboundBytes(); - if (_log.shouldLog(Log.INFO)) - _log.info("Granting outbound request " + req.getRequestName() + " for " - + req.getRequestedOutboundBytes() + " bytes (waited " + SimpleRequest req = (SimpleRequest)_pendingOutboundRequests.get(0); + if (_outboundUnlimited) { + int allocated = req.getPendingOutboundRequested(); + _totalAllocatedOutboundBytes += allocated; + req.allocateBytes(0, allocated); + if (satisfied == null) + satisfied = new ArrayList(2); + satisfied.add(req); + if (_log.shouldLog(Log.INFO)) + _log.info("Granting outbound request " + req.getRequestName() + " fully for " + + req.getTotalOutboundRequested() + " bytes (waited " + (_context.clock().now() - req.getRequestTime()) - + "ms) pending " + (_pendingOutboundRequests.size()-1)); - // i hate nested synchronization - synchronized (req.getAvailabilityMonitor()) { - req.grantRequest(); - req.getAvailabilityMonitor().notifyAll(); - } + + "ms) pending " + _pendingOutboundRequests.size()); + // if we're unlimited, we always grant it fully, so there's no need to keep it around + _pendingOutboundRequests.remove(0); + } else if (_availableOutboundBytes > 0) { + int requested = req.getPendingOutboundRequested(); + int allocated = 0; + if (_availableOutboundBytes > requested) + allocated = requested; + else + allocated = _availableOutboundBytes; + _availableOutboundBytes -= allocated; + req.allocateBytes(0, allocated); + if (satisfied == null) + satisfied = new ArrayList(2); + satisfied.add(req); + if (req.getPendingOutboundRequested() > 0) { + if (_log.shouldLog(Log.INFO)) + _log.info("Allocating " + allocated + " bytes outbound as a partial grant to " + + req.getRequestName() + " (wanted " + + req.getTotalOutboundRequested() + " bytes, waited " + + (_context.clock().now() - req.getRequestTime()) + + "ms) pending " + _pendingOutboundRequests.size()); + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("Allocating " + allocated + " bytes outbound to finish the partial grant to " + + req.getRequestName() + " (total " + + req.getTotalOutboundRequested() + " bytes, waited " + + (_context.clock().now() - req.getRequestTime()) + + "ms) pending " + _pendingOutboundRequests.size()); + _pendingOutboundRequests.remove(0); + } } else { - // there isn't sufficient bandwidth for the first request, - // so since we're a FIFO limiter, everyone waits. If we were a - // best fit or ASAP limiter, we'd continue on iterating to see - // if anyone would be satisfied with the current availability + // no bandwidth available if (_log.shouldLog(Log.WARN)) _log.warn("Still denying the first outbound request (" + req.getRequestName() + " for " - + req.getRequestedOutboundBytes() + " bytes (available " + + req.getTotalOutboundRequested() + " bytes (available " + _availableInboundBytes + "/" + _availableOutboundBytes + " in/out) (waited " + (_context.clock().now() - req.getRequestTime()) + "ms so far) pending " + (_pendingOutboundRequests.size())); - return; + break; } } - //if (_log.shouldLog(Log.INFO)) - // _log.info("Nothing pending"); + } + + if (satisfied != null) { + for (int i = 0; i < satisfied.size(); i++) { + SimpleRequest req = (SimpleRequest)satisfied.get(i); + req.notifyAllocation(); + } } } @@ -256,20 +260,22 @@ public class FIFOBandwidthLimiter { buf.append("
  • Inbound requests:
      "); synchronized (_pendingInboundRequests) { for (int i = 0; i < _pendingInboundRequests.size(); i++) { - BandwidthRequest req = (BandwidthRequest)_pendingInboundRequests.get(i); + Request req = (Request)_pendingInboundRequests.get(i); buf.append("
    1. ").append(req.getRequestName()).append(" for "); - buf.append(req.getRequestedInboundBytes()).append(" bytes "); - buf.append("requested ").append(now-req.getRequestTime()); + buf.append(req.getTotalInboundRequested()).append(" bytes "); + buf.append("requested (").append(req.getPendingInboundRequested()).append(" pending) as of "); + buf.append(now-req.getRequestTime()); buf.append("ms ago
    2. \n"); } } buf.append("
  • Outbound requests:
      \n"); synchronized (_pendingOutboundRequests) { for (int i = 0; i < _pendingOutboundRequests.size(); i++) { - BandwidthRequest req = (BandwidthRequest)_pendingOutboundRequests.get(i); + Request req = (Request)_pendingOutboundRequests.get(i); buf.append("
    1. ").append(req.getRequestName()).append(" for "); - buf.append(req.getRequestedOutboundBytes()).append(" bytes "); - buf.append("requested ").append(now-req.getRequestTime()); + buf.append(req.getTotalOutboundRequested()).append(" bytes "); + buf.append("requested (").append(req.getPendingOutboundRequested()).append(" pending) as of "); + buf.append(now-req.getRequestTime()); buf.append("ms ago
    2. \n"); } } @@ -278,67 +284,66 @@ public class FIFOBandwidthLimiter { } private static long __requestId = 0; - private final class SimpleRequest implements BandwidthRequest { - private boolean _alreadyGranted; - private int _in; - private int _out; + private final class SimpleRequest implements Request { + private int _inAllocated; + private int _inTotal; + private int _outAllocated; + private int _outTotal; private long _requestId; private long _requestTime; private String _target; public SimpleRequest(int in, int out, String target) { - _in = in; - _out = out; + _inTotal = in; + _outTotal = out; + _inAllocated = 0; + _outAllocated = 0; _target = target; - _alreadyGranted = false; _requestId = ++__requestId; _requestTime = _context.clock().now(); } - public boolean alreadyGranted() { return _alreadyGranted; } public Object getAvailabilityMonitor() { return SimpleRequest.this; } public String getRequestName() { return "Req" + _requestId + " to " + _target; } - public int getRequestedInboundBytes() { return _in; } - public int getRequestedOutboundBytes() { return _out; } - public void grantRequest() { _alreadyGranted = true; } public long getRequestTime() { return _requestTime; } + public int getTotalOutboundRequested() { return _outTotal; } + public int getPendingOutboundRequested() { return _outTotal - _outAllocated; } + public int getTotalInboundRequested() { return _inTotal; } + public int getPendingInboundRequested() { return _inTotal - _inAllocated; } + public void waitForNextAllocation() { + if ( (_outAllocated >= _outTotal) && + (_inAllocated >= _inTotal) ) + return; + try { + synchronized (SimpleRequest.this) { + SimpleRequest.this.wait(); + } + } catch (InterruptedException ie) {} + } + void allocateBytes(int in, int out) { + _inAllocated += in; + _outAllocated += out; + } + void notifyAllocation() { + synchronized (SimpleRequest.this) { + SimpleRequest.this.notifyAll(); + } + } } - - /** - * Defines a request for bandwidth allocation - */ - private interface BandwidthRequest { - /** - * how can we summarize this request (in case we want to display a list - * of 'whats pending') - */ + + public interface Request { + /** describe this particular request */ public String getRequestName(); - /** - * How many bytes are we going to send away from the router - */ - public int getRequestedOutboundBytes(); - /** - * How many bytes are we going to read from the network - */ - public int getRequestedInboundBytes(); - /** - * Lock unique to this request that will be wait() & notified upon - * during the queueing - */ - public Object getAvailabilityMonitor(); - /** - * When was the bandwidth requested? - */ + /** when was the request made? */ public long getRequestTime(); - /** - * must return true only if grantRequest has been called, else - * false - */ - public boolean alreadyGranted(); - /** - * flag this request to tell it that it has been or is about to be - * allocated sufficient bytes. This should NOT be used as the notification - * itself - */ - public void grantRequest(); + /** how many outbound bytes were requested? */ + public int getTotalOutboundRequested(); + /** how many outbound bytes were requested and haven't yet been allocated? */ + public int getPendingOutboundRequested(); + /** how many inbound bytes were requested? */ + public int getTotalInboundRequested(); + /** how many inbound bytes were requested and haven't yet been allocated? */ + public int getPendingInboundRequested(); + /** block until we are allocated some more bytes */ + public void waitForNextAllocation(); } } diff --git a/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java b/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java index f3b1c1c9d..855f46b11 100644 --- a/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java +++ b/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java @@ -8,9 +8,9 @@ class FIFOBandwidthRefiller implements Runnable { private I2PAppContext _context; private FIFOBandwidthLimiter _limiter; /** how many KBps do we want to allow? */ - private long _inboundKBytesPerSecond; + private int _inboundKBytesPerSecond; /** how many KBps do we want to allow? */ - private long _outboundKBytesPerSecond; + private int _outboundKBytesPerSecond; /** how frequently do we want to replenish the available queues? */ private long _replenishFrequency; /** when did we last replenish the queue? */ @@ -27,13 +27,13 @@ class FIFOBandwidthRefiller implements Runnable { public static final String PROP_REPLENISH_FREQUENCY = "i2np.bandwidth.replenishFrequencyMs"; /** For now, until there is some tuning and safe throttling, we set the floor at 6KBps inbound */ - public static final long MIN_INBOUND_BANDWIDTH = 6; + public static final int MIN_INBOUND_BANDWIDTH = 6; /** For now, until there is some tuning and safe throttling, we set the floor at 6KBps outbound */ - public static final long MIN_OUTBOUND_BANDWIDTH = 6; + public static final int MIN_OUTBOUND_BANDWIDTH = 6; /** For now, until there is some tuning and safe throttling, we set the floor at a 10 second burst */ - public static final long MIN_INBOUND_BANDWIDTH_PEAK = 60; + public static final int MIN_INBOUND_BANDWIDTH_PEAK = 6; /** For now, until there is some tuning and safe throttling, we set the floor at a 10 second burst */ - public static final long MIN_OUTBOUND_BANDWIDTH_PEAK = 60; + public static final int MIN_OUTBOUND_BANDWIDTH_PEAK = 6; /** Updating the bandwidth more than once a second is silly. once every 2 or 5 seconds is less so. */ public static final long MIN_REPLENISH_FREQUENCY = 1000; @@ -134,7 +134,7 @@ class FIFOBandwidthRefiller implements Runnable { (!(inBwStr.equals(String.valueOf(_inboundKBytesPerSecond)))) ) { // bandwidth was specified *and* changed try { - long in = Long.parseLong(inBwStr); + int in = Integer.parseInt(inBwStr); if ( (in <= 0) || (in > MIN_INBOUND_BANDWIDTH) ) _inboundKBytesPerSecond = in; else @@ -157,7 +157,7 @@ class FIFOBandwidthRefiller implements Runnable { (!(outBwStr.equals(String.valueOf(_outboundKBytesPerSecond)))) ) { // bandwidth was specified *and* changed try { - long out = Long.parseLong(outBwStr); + int out = Integer.parseInt(outBwStr); if ( (out <= 0) || (out >= MIN_OUTBOUND_BANDWIDTH) ) _outboundKBytesPerSecond = out; else @@ -180,11 +180,18 @@ class FIFOBandwidthRefiller implements Runnable { (!(inBwStr.equals(String.valueOf(_limiter.getMaxInboundBytes())))) ) { // peak bw was specified *and* changed try { - long in = Long.parseLong(inBwStr); - if (in >= MIN_INBOUND_BANDWIDTH_PEAK) - _limiter.setMaxInboundBytes(in * 1024); - else - _limiter.setMaxInboundBytes(MIN_INBOUND_BANDWIDTH_PEAK * 1024); + int in = Integer.parseInt(inBwStr); + if (in >= MIN_INBOUND_BANDWIDTH_PEAK) { + if (in < _inboundKBytesPerSecond) + _limiter.setMaxInboundBytes(_inboundKBytesPerSecond * 1024); + else + _limiter.setMaxInboundBytes(in * 1024); + } else { + if (MIN_INBOUND_BANDWIDTH_PEAK < _inboundKBytesPerSecond) + _limiter.setMaxInboundBytes(_inboundKBytesPerSecond * 1024); + else + _limiter.setMaxInboundBytes(MIN_INBOUND_BANDWIDTH_PEAK * 1024); + } } catch (NumberFormatException nfe) { if (_log.shouldLog(Log.WARN)) _log.warn("Invalid inbound bandwidth burst limit [" + inBwStr @@ -203,11 +210,18 @@ class FIFOBandwidthRefiller implements Runnable { (!(outBwStr.equals(String.valueOf(_limiter.getMaxOutboundBytes())))) ) { // peak bw was specified *and* changed try { - long out = Long.parseLong(outBwStr); - if (out >= MIN_OUTBOUND_BANDWIDTH_PEAK) - _limiter.setMaxOutboundBytes(out * 1024); - else - _limiter.setMaxOutboundBytes(MIN_OUTBOUND_BANDWIDTH_PEAK * 1024); + int out = Integer.parseInt(outBwStr); + if (out >= MIN_OUTBOUND_BANDWIDTH_PEAK) { + if (out < _outboundKBytesPerSecond) + _limiter.setMaxOutboundBytes(_outboundKBytesPerSecond * 1024); + else + _limiter.setMaxOutboundBytes(out * 1024); + } else { + if (MIN_OUTBOUND_BANDWIDTH_PEAK < _outboundKBytesPerSecond) + _limiter.setMaxOutboundBytes(_outboundKBytesPerSecond * 1024); + else + _limiter.setMaxOutboundBytes(MIN_OUTBOUND_BANDWIDTH_PEAK * 1024); + } } catch (NumberFormatException nfe) { if (_log.shouldLog(Log.WARN)) _log.warn("Invalid outbound bandwidth burst limit [" + outBwStr