* Tunnel RED:

- Complete rework of participating traffic RED.
      Implement an accurate bandwidth tracker in FIFOBandwidthRefiller.
    - Fix drop priority of VTBM at OBEP
    - Lower drop priority of VTBRM at IBGW
    - Raise threshold from 95% to 120%
    - Remove unused things in HopConfig
...needs more testing...
This commit is contained in:
zzz
2011-12-04 19:01:52 +00:00
parent 69cae1a052
commit e9d0d79809
10 changed files with 294 additions and 98 deletions

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 14;
public final static long BUILD = 15;
/** for example "-test" */
public final static String EXTRA = "";

View File

@@ -51,18 +51,20 @@ public class FIFOBandwidthLimiter {
private int _maxInbound;
/** how large _availableOutbound can get - aka our outbound rate during a burst */
private int _maxOutbound;
/** shortcut of whether our outbound rate is unlimited */
/** shortcut of whether our outbound rate is unlimited - UNUSED always false for now */
private boolean _outboundUnlimited;
/** shortcut of whether our inbound rate is unlimited */
/** shortcut of whether our inbound rate is unlimited - UNUSED always false for now */
private boolean _inboundUnlimited;
/** lifetime counter of bytes received */
private final AtomicLong _totalAllocatedInboundBytes = new AtomicLong();
/** lifetime counter of bytes sent */
private final AtomicLong _totalAllocatedOutboundBytes = new AtomicLong();
/** lifetime counter of tokens available for use but exceeded our maxInboundBurst size */
private final AtomicLong _totalWastedInboundBytes = new AtomicLong();
//private final AtomicLong _totalWastedInboundBytes = new AtomicLong();
/** lifetime counter of tokens available for use but exceeded our maxOutboundBurst size */
private final AtomicLong _totalWastedOutboundBytes = new AtomicLong();
//private final AtomicLong _totalWastedOutboundBytes = new AtomicLong();
private final FIFOBandwidthRefiller _refiller;
private final Thread _refillerThread;
@@ -101,25 +103,41 @@ public class FIFOBandwidthLimiter {
//public long getAvailableOutboundBytes() { return _availableOutboundBytes; }
public long getTotalAllocatedInboundBytes() { return _totalAllocatedInboundBytes.get(); }
public long getTotalAllocatedOutboundBytes() { return _totalAllocatedOutboundBytes.get(); }
public long getTotalWastedInboundBytes() { return _totalWastedInboundBytes.get(); }
public long getTotalWastedOutboundBytes() { return _totalWastedOutboundBytes.get(); }
//public long getTotalWastedInboundBytes() { return _totalWastedInboundBytes.get(); }
//public long getTotalWastedOutboundBytes() { return _totalWastedOutboundBytes.get(); }
//public long getMaxInboundBytes() { return _maxInboundBytes; }
//public void setMaxInboundBytes(int numBytes) { _maxInboundBytes = numBytes; }
//public long getMaxOutboundBytes() { return _maxOutboundBytes; }
//public void setMaxOutboundBytes(int numBytes) { _maxOutboundBytes = numBytes; }
public boolean getInboundUnlimited() { return _inboundUnlimited; }
public void setInboundUnlimited(boolean isUnlimited) { _inboundUnlimited = isUnlimited; }
public boolean getOutboundUnlimited() { return _outboundUnlimited; }
public void setOutboundUnlimited(boolean isUnlimited) { _outboundUnlimited = isUnlimited; }
/** @deprecated unused for now, we are always limited */
void setInboundUnlimited(boolean isUnlimited) { _inboundUnlimited = isUnlimited; }
/** @deprecated unused for now, we are always limited */
void setOutboundUnlimited(boolean isUnlimited) { _outboundUnlimited = isUnlimited; }
/** @return smoothed one second rate */
public float getSendBps() { return _sendBps; }
/** @return smoothed one second rate */
public float getReceiveBps() { return _recvBps; }
/** @return smoothed 15 second rate */
public float getSendBps15s() { return _sendBps15s; }
/** @return smoothed 15 second rate */
public float getReceiveBps15s() { return _recvBps15s; }
/** These are the configured maximums, not the current rate */
/** The configured maximum, not the current rate */
public int getOutboundKBytesPerSecond() { return _refiller.getOutboundKBytesPerSecond(); }
/** The configured maximum, not the current rate */
public int getInboundKBytesPerSecond() { return _refiller.getInboundKBytesPerSecond(); }
/** The configured maximum, not the current rate */
public int getOutboundBurstKBytesPerSecond() { return _refiller.getOutboundBurstKBytesPerSecond(); }
/** The configured maximum, not the current rate */
public int getInboundBurstKBytesPerSecond() { return _refiller.getInboundBurstKBytesPerSecond(); }
public void reinitialize() {
@@ -146,10 +164,31 @@ public class FIFOBandwidthLimiter {
_maxOutboundBurst = 0;
_unavailableInboundBurst.set(0);
_unavailableOutboundBurst.set(0);
_inboundUnlimited = false;
_outboundUnlimited = false;
// always limited for now
//_inboundUnlimited = false;
//_outboundUnlimited = false;
}
/**
* We sent a message.
*
* @param size bytes
* @since 0.8.12
*/
public void sentParticipatingMessage(int size) {
_refiller.incrementParticipatingMessageBytes(size);
}
/**
* Out bandwidth. Actual bandwidth, not smoothed, not bucketed.
*
* @return Bps in recent period (a few seconds)
* @since 0.8.12
*/
public int getCurrentParticipatingBandwidth() {
return _refiller.getCurrentParticipatingBandwidth();
}
public Request createRequest() { return new SimpleRequest(); }
/**
@@ -241,6 +280,7 @@ public class FIFOBandwidthLimiter {
* More bytes are available - add them to the queue and satisfy any requests
* we can
*
* @param buf contains satisfied outbound requests, really just to avoid object thrash, not really used
* @param maxBurstIn allow up to this many bytes in from the burst section for this time period (may be negative)
* @param maxBurstOut allow up to this many bytes in from the burst section for this time period (may be negative)
*/
@@ -261,7 +301,7 @@ public class FIFOBandwidthLimiter {
int uib = _unavailableInboundBurst.addAndGet(avi - _maxInbound);
_availableInbound.set(_maxInbound);
if (uib > _maxInboundBurst) {
_totalWastedInboundBytes.addAndGet(uib - _maxInboundBurst);
//_totalWastedInboundBytes.addAndGet(uib - _maxInboundBurst);
_unavailableInboundBurst.set(_maxInboundBurst);
}
} else {
@@ -292,7 +332,7 @@ public class FIFOBandwidthLimiter {
_availableOutbound.set(_maxOutbound);
if (uob > _maxOutboundBurst) {
_totalWastedOutboundBytes.getAndAdd(uob - _maxOutboundBurst);
//_totalWastedOutboundBytes.getAndAdd(uob - _maxOutboundBurst);
_unavailableOutboundBurst.set(_maxOutboundBurst);
}
} else {
@@ -376,7 +416,7 @@ public class FIFOBandwidthLimiter {
* Go through the queue, satisfying as many requests as possible (notifying
* each one satisfied that the request has been granted).
*
* @param buffer returned with the satisfied outbound requests only
* @param buffer Out parameter, returned with the satisfied outbound requests only
*/
private final void satisfyRequests(List<Request> buffer) {
buffer.clear();
@@ -385,6 +425,9 @@ public class FIFOBandwidthLimiter {
satisfyOutboundRequests(buffer);
}
/**
* @param satisfied Out parameter, returned with the satisfied requests added
*/
private final void satisfyInboundRequests(List<Request> satisfied) {
synchronized (_pendingInboundRequests) {
if (_inboundUnlimited) {
@@ -529,6 +572,9 @@ public class FIFOBandwidthLimiter {
}
}
/**
* @param satisfied Out parameter, returned with the satisfied requests added
*/
private final void satisfyOutboundRequests(List<Request> satisfied) {
synchronized (_pendingOutboundRequests) {
if (_outboundUnlimited) {
@@ -895,6 +941,7 @@ public class FIFOBandwidthLimiter {
/** thar be dragons */
public void init(int in, int out, String target);
public void setCompleteListener(CompleteListener lsnr);
/** Only supported if the request is not satisfied */
public void attach(Object obj);
public Object attachment();
public CompleteListener getCompleteListener();
@@ -905,9 +952,8 @@ public class FIFOBandwidthLimiter {
}
private static final NoopRequest _noop = new NoopRequest();
private static class NoopRequest implements Request {
private CompleteListener _lsnr;
private Object _attachment;
public void abort() {}
public boolean getAborted() { return false; }
public int getPendingInboundRequested() { return 0; }
@@ -918,12 +964,13 @@ public class FIFOBandwidthLimiter {
public int getTotalOutboundRequested() { return 0; }
public void waitForNextAllocation() {}
public void init(int in, int out, String target) {}
public CompleteListener getCompleteListener() { return _lsnr; }
public CompleteListener getCompleteListener() { return null; }
public void setCompleteListener(CompleteListener lsnr) {
_lsnr = lsnr;
lsnr.complete(NoopRequest.this);
}
public void attach(Object obj) { _attachment = obj; }
public Object attachment() { return _attachment; }
public void attach(Object obj) {
throw new UnsupportedOperationException("Don't attach to a satisfied request");
}
public Object attachment() { return null; }
}
}

View File

@@ -2,10 +2,22 @@ package net.i2p.router.transport;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.util.Log;
/**
* Thread that runs every 100 ms to "give" bandwidth to
* FIFOBandwidthLimiter.
* Instantiated by FIFOBandwidthLimiter.
*
* As of 0.8.12, this also contains a counter for outbound participating bandwidth.
* This was a good place for it since we needed a 100ms thread for it.
*
* Public only for the properties and defaults.
*/
public class FIFOBandwidthRefiller implements Runnable {
private final Log _log;
private final I2PAppContext _context;
@@ -63,7 +75,7 @@ public class FIFOBandwidthRefiller implements Runnable {
*/
private static final long REPLENISH_FREQUENCY = 100;
public FIFOBandwidthRefiller(I2PAppContext context, FIFOBandwidthLimiter limiter) {
FIFOBandwidthRefiller(I2PAppContext context, FIFOBandwidthLimiter limiter) {
_limiter = limiter;
_context = context;
_log = context.logManager().getLog(FIFOBandwidthRefiller.class);
@@ -72,7 +84,7 @@ public class FIFOBandwidthRefiller implements Runnable {
}
/** @since 0.8.8 */
public void shutdown() {
void shutdown() {
_isRunning = false;
}
@@ -88,6 +100,7 @@ public class FIFOBandwidthRefiller implements Runnable {
_lastCheckConfigTime = now;
}
updateParticipating(now);
boolean updated = updateQueues(buffer, now);
if (updated) {
_lastRefillTime = now;
@@ -97,7 +110,7 @@ public class FIFOBandwidthRefiller implements Runnable {
}
}
public void reinitialize() {
void reinitialize() {
_lastRefillTime = _limiter.now();
checkConfig();
_lastCheckConfigTime = _lastRefillTime;
@@ -105,8 +118,8 @@ public class FIFOBandwidthRefiller implements Runnable {
private boolean updateQueues(List<FIFOBandwidthLimiter.Request> buffer, long now) {
long numMs = (now - _lastRefillTime);
if (_log.shouldLog(Log.INFO))
_log.info("Updating bandwidth after " + numMs + " (status: " + _limiter.getStatus().toString()
if (_log.shouldLog(Log.DEBUG))
_log.debug("Updating bandwidth after " + numMs + " (status: " + _limiter.getStatus().toString()
+ " rate in="
+ _inboundKBytesPerSecond + ", out="
+ _outboundKBytesPerSecond +")");
@@ -120,6 +133,7 @@ public class FIFOBandwidthRefiller implements Runnable {
if (inboundToAdd < 0) inboundToAdd = 0;
if (outboundToAdd < 0) outboundToAdd = 0;
/**** Always limited for now
if (_inboundKBytesPerSecond <= 0) {
_limiter.setInboundUnlimited(true);
inboundToAdd = 0;
@@ -132,15 +146,16 @@ public class FIFOBandwidthRefiller implements Runnable {
} else {
_limiter.setOutboundUnlimited(false);
}
****/
long maxBurstIn = ((_inboundBurstKBytesPerSecond-_inboundKBytesPerSecond)*1024*numMs)/1000;
long maxBurstOut = ((_outboundBurstKBytesPerSecond-_outboundKBytesPerSecond)*1024*numMs)/1000;
_limiter.refillBandwidthQueues(buffer, inboundToAdd, outboundToAdd, maxBurstIn, maxBurstOut);
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Adding " + inboundToAdd + " bytes to inboundAvailable");
_log.debug("Adding " + outboundToAdd + " bytes to outboundAvailable");
}
//if (_log.shouldLog(Log.DEBUG)) {
// _log.debug("Adding " + inboundToAdd + " bytes to inboundAvailable");
// _log.debug("Adding " + outboundToAdd + " bytes to outboundAvailable");
//}
return true;
} else {
if (_log.shouldLog(Log.DEBUG))
@@ -157,17 +172,9 @@ public class FIFOBandwidthRefiller implements Runnable {
updateInboundPeak();
updateOutboundPeak();
if (_inboundKBytesPerSecond <= 0) {
_limiter.setInboundUnlimited(true);
} else {
_limiter.setInboundUnlimited(false);
}
if (_outboundKBytesPerSecond <= 0) {
_limiter.setOutboundUnlimited(true);
} else {
_limiter.setOutboundUnlimited(false);
}
// We are always limited for now
//_limiter.setInboundUnlimited(_inboundKBytesPerSecond <= 0);
//_limiter.setOutboundUnlimited(_outboundKBytesPerSecond <= 0);
}
private void updateInboundRate() {
@@ -185,6 +192,7 @@ public class FIFOBandwidthRefiller implements Runnable {
if (_inboundKBytesPerSecond <= 0)
_inboundKBytesPerSecond = DEFAULT_INBOUND_BANDWIDTH;
}
private void updateOutboundRate() {
int out = _context.getProperty(PROP_OUTBOUND_BANDWIDTH, DEFAULT_OUTBOUND_BANDWIDTH);
if (out != _outboundKBytesPerSecond) {
@@ -276,4 +284,87 @@ public class FIFOBandwidthRefiller implements Runnable {
int getInboundKBytesPerSecond() { return _inboundKBytesPerSecond; }
int getOutboundBurstKBytesPerSecond() { return _outboundBurstKBytesPerSecond; }
int getInboundBurstKBytesPerSecond() { return _inboundBurstKBytesPerSecond; }
/**
* Participating counter stuff below here
* TOTAL_TIME needs to be high enough to get a burst without dropping
* @since 0.8.12
*/
private static final int TOTAL_TIME = 4000;
private static final int PERIODS = TOTAL_TIME / (int) REPLENISH_FREQUENCY;
/** count in current 100 ms period */
private final AtomicInteger _currentParticipating = new AtomicInteger();
private long _lastPartUpdateTime;
private int _lastTotal;
/** the actual length of last total period as coalesced (nominally TOTAL_TIME) */
private long _lastTotalTime;
private int _lastIndex;
/** buffer of count per 100 ms period, last is at _lastIndex, older at higher indexes (wraps) */
private final int[] _counts = new int[PERIODS];
/** the actual length of the period (nominally REPLENISH_FREQUENCY) */
private final long[] _times = new long[PERIODS];
/**
* We sent a message.
*
* @param size bytes
* @since 0.8.12
*/
void incrementParticipatingMessageBytes(int size) {
_currentParticipating.addAndGet(size);
}
/**
* Out bandwidth. Actual bandwidth, not smoothed, not bucketed.
*
* @return Bps in recent period (a few seconds)
* @since 0.8.12
*/
synchronized int getCurrentParticipatingBandwidth() {
int current = _currentParticipating.get();
long totalTime = (_limiter.now() - _lastPartUpdateTime) + _lastTotalTime;
if (totalTime <= 0)
return 0;
// 1000 for ms->seconds in denominator
long bw = 1000l * (current + _lastTotal) / totalTime;
if (bw > Integer.MAX_VALUE)
return 0;
return (int) bw;
}
/**
* Run once every 100 ms
*
* @since 0.8.12
*/
private synchronized void updateParticipating(long now) {
long elapsed = now - _lastPartUpdateTime;
if (elapsed <= 0) {
// glitch in the matrix
_lastPartUpdateTime = now;
return;
}
_lastPartUpdateTime = now;
if (--_lastIndex < 0)
_lastIndex = PERIODS - 1;
_counts[_lastIndex] = _currentParticipating.getAndSet(0);
_times[_lastIndex] = elapsed;
_lastTotal = 0;
_lastTotalTime = 0;
// add up total counts and times
for (int i = 0; i < PERIODS; i++) {
int idx = (_lastIndex + i) % PERIODS;
_lastTotal += _counts[idx];
_lastTotalTime += _times[idx];
if (_lastTotalTime >= TOTAL_TIME)
break;
}
if (_lastIndex == 0 && _lastTotalTime > 0) {
long bw = 1000l * _lastTotal / _lastTotalTime;
_context.statManager().addRateData("tunnel.participatingBandwidthOut", bw);
if (_lastTotal > 0 && _log.shouldLog(Log.INFO))
_log.info(DataHelper.formatSize(_lastTotal) + " bytes out part. tunnels in last " + _lastTotalTime + " ms: " +
DataHelper.formatSize(bw) + " Bps");
}
}
}

View File

@@ -25,13 +25,14 @@ public class HopConfig {
private ByteArray _replyIV;
private long _creation;
private long _expiration;
private Map _options;
//private Map _options;
// these 4 were longs, let's save some space
// 2 billion * 1KB / 10 minutes = 3 GBps in a single tunnel
private int _messagesProcessed;
private int _oldMessagesProcessed;
private int _messagesSent;
private int _oldMessagesSent;
//private int _messagesSent;
//private int _oldMessagesSent;
/** IV length for {@link #getReplyIV} */
public static final int REPLY_IV_LENGTH = 16;
@@ -48,6 +49,7 @@ public class HopConfig {
_receiveTunnel = getTunnel(_receiveTunnelId);
return _receiveTunnel;
}
public void setReceiveTunnelId(byte id[]) { _receiveTunnelId = id; }
public void setReceiveTunnelId(TunnelId id) { _receiveTunnelId = DataHelper.toLong(4, id.getTunnelId()); }
@@ -106,11 +108,13 @@ public class HopConfig {
* would be a Boolean, etc).
*
*/
public Map getOptions() { return _options; }
public void setOptions(Map options) { _options = options; }
//public Map getOptions() { return _options; }
//public void setOptions(Map options) { _options = options; }
/** take note of a message being pumped through this tunnel */
/** "processed" is for incoming and "sent" is for outgoing (could be dropped in between) */
/**
* Take note of a message being pumped through this tunnel.
* "processed" is for incoming and "sent" is for outgoing (could be dropped in between)
*/
public void incrementProcessedMessages() { _messagesProcessed++; }
public int getProcessedMessagesCount() { return _messagesProcessed; }
@@ -121,6 +125,11 @@ public class HopConfig {
return rv;
}
/**
* Take note of a message being pumped through this tunnel.
* "processed" is for incoming and "sent" is for outgoing (could be dropped in between)
*/
/****
public void incrementSentMessages() { _messagesSent++; }
public int getSentMessagesCount() { return _messagesSent; }
@@ -130,7 +139,9 @@ public class HopConfig {
_oldMessagesSent = _messagesSent;
return rv;
}
****/
/** */
@Override
public String toString() {
StringBuilder buf = new StringBuilder(64);

View File

@@ -48,7 +48,8 @@ class InboundGatewayReceiver implements TunnelGateway.Receiver {
// We do this before the preprocessor now (i.e. before fragmentation)
//if (_context.tunnelDispatcher().shouldDropParticipatingMessage("IBGW", encrypted.length))
// return -1;
_config.incrementSentMessages();
//_config.incrementSentMessages();
_context.bandwidthLimiter().sentParticipatingMessage(1024);
TunnelDataMessage msg = new TunnelDataMessage(_context);
msg.setData(encrypted);
msg.setTunnelId(_config.getSendTunnel());

View File

@@ -50,11 +50,18 @@ class OutboundTunnelEndpoint {
+ " to be forwarded on to "
+ (toRouter != null ? toRouter.toBase64().substring(0,4) : "")
+ (toTunnel != null ? ":" + toTunnel.getTunnelId() : ""));
int size = msg.getMessageSize();
// don't drop it if we are the target
if ((!_context.routerHash().equals(toRouter)) &&
_context.tunnelDispatcher().shouldDropParticipatingMessage("OBEP " + msg.getType(), msg.getMessageSize()))
boolean toUs = _context.routerHash().equals(toRouter);
if ((!toUs) &&
_context.tunnelDispatcher().shouldDropParticipatingMessage(TunnelDispatcher.Location.OBEP, msg.getType(), size))
return;
_config.incrementSentMessages();
// this overstates the stat somewhat, but ok for now
//int kb = (size + 1023) / 1024;
//for (int i = 0; i < kb; i++)
// _config.incrementSentMessages();
if (!toUs)
_context.bandwidthLimiter().sentParticipatingMessage(size);
_outDistributor.distribute(msg, toRouter, toTunnel);
}
}

View File

@@ -1,8 +1,5 @@
package net.i2p.router.tunnel;
import java.util.ArrayList;
import java.util.List;
import net.i2p.data.Hash;
import net.i2p.data.TunnelId;
import net.i2p.data.i2np.I2NPMessage;
@@ -38,13 +35,13 @@ class ThrottledPumpedTunnelGateway extends PumpedTunnelGateway {
// for the purpose of estimating outgoing size.
// We assume that it's the outbound bandwidth that is the issue...
int size = Math.max(msg.getMessageSize(), 1024/2);
if (_context.tunnelDispatcher().shouldDropParticipatingMessage("IBGW " + msg.getType(), size)) {
if (_context.tunnelDispatcher().shouldDropParticipatingMessage(TunnelDispatcher.Location.IBGW, msg.getType(), size)) {
// this overstates the stat somewhat, but ok for now
int kb = (size + 1023) / 1024;
for (int i = 0; i < kb; i++)
_config.incrementProcessedMessages();
return;
}
super.add(msg, toRouter,toTunnel);
super.add(msg, toRouter, toTunnel);
}
}

View File

@@ -12,8 +12,12 @@ import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.TunnelId;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.TunnelBuildMessage;
import net.i2p.data.i2np.TunnelBuildReplyMessage;
import net.i2p.data.i2np.TunnelDataMessage;
import net.i2p.data.i2np.TunnelGatewayMessage;
import net.i2p.data.i2np.VariableTunnelBuildMessage;
import net.i2p.data.i2np.VariableTunnelBuildReplyMessage;
import net.i2p.router.JobImpl;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
@@ -41,9 +45,12 @@ public class TunnelDispatcher implements Service {
private BloomFilterIVValidator _validator;
private final LeaveTunnel _leaveJob;
/** what is the date/time we last deliberately dropped a tunnel? **/
private long _lastDropTime;
//private long _lastDropTime;
private final TunnelGatewayPumper _pumper;
/** for shouldDropParticipatingMessage() */
enum Location {OBEP, PARTICIPANT, IBGW}
private static final long[] RATES = { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 };
/** Creates a new instance of TunnelDispatcher */
@@ -198,13 +205,13 @@ public class TunnelDispatcher implements Service {
TunnelGateway gw = new PumpedTunnelGateway(_context, preproc, sender, receiver, _pumper);
TunnelId outId = cfg.getConfig(0).getSendTunnel();
_outboundGateways.put(outId, gw);
_context.statManager().addRateData("tunnel.joinOutboundGateway", 1, 0);
_context.statManager().addRateData("tunnel.joinOutboundGateway", 1);
_context.messageHistory().tunnelJoined("outbound", cfg);
} else {
TunnelGatewayZeroHop gw = new TunnelGatewayZeroHop(_context, cfg);
TunnelId outId = cfg.getConfig(0).getSendTunnel();
_outboundGateways.put(outId, gw);
_context.statManager().addRateData("tunnel.joinOutboundGatewayZeroHop", 1, 0);
_context.statManager().addRateData("tunnel.joinOutboundGatewayZeroHop", 1);
_context.messageHistory().tunnelJoined("outboundZeroHop", cfg);
}
}
@@ -220,13 +227,13 @@ public class TunnelDispatcher implements Service {
TunnelParticipant participant = new TunnelParticipant(_context, new InboundEndpointProcessor(_context, cfg, _validator));
TunnelId recvId = cfg.getConfig(cfg.getLength()-1).getReceiveTunnel();
_participants.put(recvId, participant);
_context.statManager().addRateData("tunnel.joinInboundEndpoint", 1, 0);
_context.statManager().addRateData("tunnel.joinInboundEndpoint", 1);
_context.messageHistory().tunnelJoined("inboundEndpoint", cfg);
} else {
TunnelGatewayZeroHop gw = new TunnelGatewayZeroHop(_context, cfg);
TunnelId recvId = cfg.getConfig(0).getReceiveTunnel();
_inboundGateways.put(recvId, gw);
_context.statManager().addRateData("tunnel.joinInboundEndpointZeroHop", 1, 0);
_context.statManager().addRateData("tunnel.joinInboundEndpointZeroHop", 1);
_context.messageHistory().tunnelJoined("inboundEndpointZeroHop", cfg);
}
}
@@ -243,7 +250,7 @@ public class TunnelDispatcher implements Service {
_participants.put(recvId, participant);
_participatingConfig.put(recvId, cfg);
_context.messageHistory().tunnelJoined("participant", cfg);
_context.statManager().addRateData("tunnel.joinParticipant", 1, 0);
_context.statManager().addRateData("tunnel.joinParticipant", 1);
if (cfg.getExpiration() > _lastParticipatingExpiration)
_lastParticipatingExpiration = cfg.getExpiration();
_leaveJob.add(cfg);
@@ -261,7 +268,7 @@ public class TunnelDispatcher implements Service {
_outboundEndpoints.put(recvId, endpoint);
_participatingConfig.put(recvId, cfg);
_context.messageHistory().tunnelJoined("outboundEndpoint", cfg);
_context.statManager().addRateData("tunnel.joinOutboundEndpoint", 1, 0);
_context.statManager().addRateData("tunnel.joinOutboundEndpoint", 1);
if (cfg.getExpiration() > _lastParticipatingExpiration)
_lastParticipatingExpiration = cfg.getExpiration();
@@ -284,7 +291,7 @@ public class TunnelDispatcher implements Service {
_inboundGateways.put(recvId, gw);
_participatingConfig.put(recvId, cfg);
_context.messageHistory().tunnelJoined("inboundGateway", cfg);
_context.statManager().addRateData("tunnel.joinInboundGateway", 1, 0);
_context.statManager().addRateData("tunnel.joinInboundGateway", 1);
if (cfg.getExpiration() > _lastParticipatingExpiration)
_lastParticipatingExpiration = cfg.getExpiration();
@@ -388,7 +395,7 @@ public class TunnelDispatcher implements Service {
+ recvFrom.toBase64().substring(0,4));
_context.messageHistory().tunnelDispatched(msg.getUniqueId(), msg.getTunnelId(), "participant");
participant.dispatch(msg, recvFrom);
_context.statManager().addRateData("tunnel.dispatchParticipant", 1, 0);
_context.statManager().addRateData("tunnel.dispatchParticipant", 1);
} else {
OutboundTunnelEndpoint endpoint = _outboundEndpoints.get(msg.getTunnelIdObj());
if (endpoint != null) {
@@ -399,7 +406,7 @@ public class TunnelDispatcher implements Service {
_context.messageHistory().tunnelDispatched(msg.getUniqueId(), msg.getTunnelId(), "outbound endpoint");
endpoint.dispatch(msg, recvFrom);
_context.statManager().addRateData("tunnel.dispatchEndpoint", 1, 0);
_context.statManager().addRateData("tunnel.dispatchEndpoint", 1);
} else {
_context.messageHistory().droppedTunnelDataMessageUnknown(msg.getUniqueId(), msg.getTunnelId());
int level = (_context.router().getUptime() > 10*60*1000 ? Log.WARN : Log.DEBUG);
@@ -446,7 +453,7 @@ public class TunnelDispatcher implements Service {
// + msg.getTunnelId().getTunnelId() + " as inbound gateway");
_context.messageHistory().tunnelDispatched(msg.getUniqueId(), msg.getMessage().getUniqueId(), msg.getTunnelId().getTunnelId(), "inbound gateway");
gw.add(msg);
_context.statManager().addRateData("tunnel.dispatchInbound", 1, 0);
_context.statManager().addRateData("tunnel.dispatchInbound", 1);
} else {
_context.messageHistory().droppedTunnelGatewayMessageUnknown(msg.getUniqueId(), msg.getTunnelId().getTunnelId());
int level = (_context.router().getUptime() > 10*60*1000 ? Log.WARN : Log.INFO);
@@ -481,6 +488,7 @@ public class TunnelDispatcher implements Service {
public void dispatchOutbound(I2NPMessage msg, TunnelId outboundTunnel, Hash targetPeer) {
dispatchOutbound(msg, outboundTunnel, null, targetPeer);
}
/**
* We are the outbound tunnel gateway (we created it), so wrap up this message
* with instructions to be forwarded to the targetTunnel on the targetPeer when
@@ -523,9 +531,9 @@ public class TunnelDispatcher implements Service {
_context.messageHistory().tunnelDispatched(msg.getUniqueId(), tid1, tid2, targetPeer, "outbound gateway");
gw.add(msg, targetPeer, targetTunnel);
if (targetTunnel == null)
_context.statManager().addRateData("tunnel.dispatchOutboundPeer", 1, 0);
_context.statManager().addRateData("tunnel.dispatchOutboundPeer", 1);
else
_context.statManager().addRateData("tunnel.dispatchOutboundTunnel", 1, 0);
_context.statManager().addRateData("tunnel.dispatchOutboundTunnel", 1);
} else {
_context.messageHistory().droppedTunnelGatewayMessageUnknown(msg.getUniqueId(), outboundTunnel.getTunnelId());
@@ -561,22 +569,16 @@ public class TunnelDispatcher implements Service {
* and computing the average from that.
*/
public void updateParticipatingStats(int ms) {
List<HopConfig> participating = listParticipatingTunnels();
int size = participating.size();
long count = 0;
long bw = 0;
long bwOut = 0;
//long bwOut = 0;
long tcount = 0;
long tooYoung = _context.clock().now() - 60*1000;
long tooOld = tooYoung - 9*60*1000;
for (int i = 0; i < size; i++) {
HopConfig cfg = participating.get(i);
// rare NPE seen here, guess CHS.values() isn't atomic?
if (cfg == null)
continue;
for (HopConfig cfg : _participatingConfig.values()) {
long c = cfg.getRecentMessagesCount();
bw += c;
bwOut += cfg.getRecentSentMessagesCount();
//bwOut += cfg.getRecentSentMessagesCount();
long created = cfg.getCreation();
if (created > tooYoung || created < tooOld)
continue;
@@ -587,8 +589,9 @@ public class TunnelDispatcher implements Service {
count = count * 30 / tcount;
_context.statManager().addRateData("tunnel.participatingMessageCount", count, ms);
_context.statManager().addRateData("tunnel.participatingBandwidth", bw*1024/(ms/1000), ms);
_context.statManager().addRateData("tunnel.participatingBandwidthOut", bwOut*1024/(ms/1000), ms);
_context.statManager().addRateData("tunnel.participatingTunnels", size, 0);
// moved to FIFOBandwidthRefiller
//_context.statManager().addRateData("tunnel.participatingBandwidthOut", bwOut*1024/(ms/1000), ms);
_context.statManager().addRateData("tunnel.participatingTunnels", tcount);
}
/**
@@ -609,12 +612,22 @@ public class TunnelDispatcher implements Service {
* Also, the OBEP is the earliest identifiable hop in the message's path
* (a plain participant could be earlier or later, but on average is later)
*
* @param type message hop location and type
* @param loc message hop location
* @param type I2NP message type
* @param length the length of the message
*/
public boolean shouldDropParticipatingMessage(String type, int length) {
public boolean shouldDropParticipatingMessage(Location loc, int type, int length) {
if (length <= 0)
return false;
/****
Don't use the tunnel.participatingBandwidth stat any more. It could be up to 3 minutes old.
Also, it counts inbound bandwidth, i.e. before dropping, which resulted in too many drops
during a burst.
We now use the bandwidth limiter to track outbound participating bandwidth
over the last few seconds.
****/
/****
RateStat rs = _context.statManager().getRate("tunnel.participatingBandwidth");
if (rs == null)
return false;
@@ -630,34 +643,45 @@ public class TunnelDispatcher implements Service {
bw = (int) r.getLifetimeAverageValue();
int usedIn = Math.min(_context.router().get1sRateIn(), _context.router().get15sRateIn());
usedIn = Math.min(usedIn, bw);
if (bw < usedIn)
usedIn = bw;
if (usedIn <= 0)
return false;
int usedOut = Math.min(_context.router().get1sRate(true), _context.router().get15sRate(true));
usedOut = Math.min(usedOut, bw);
if (bw < usedOut)
usedOut = bw;
if (usedOut <= 0)
return false;
int used = Math.min(usedIn, usedOut);
****/
int used = _context.bandwidthLimiter().getCurrentParticipatingBandwidth();
int maxKBps = Math.min(_context.bandwidthLimiter().getInboundKBytesPerSecond(),
_context.bandwidthLimiter().getOutboundKBytesPerSecond());
float share = (float) _context.router().getSharePercentage();
// start dropping at 95% of the limit
float maxBps = maxKBps * share * 1024f * 0.95f;
// start dropping at 120% of the limit,
// as we rely on Throttle for long-term bandwidth control by rejecting tunnels
float maxBps = maxKBps * share * (1024f * 1.20f);
float pctDrop = (used - maxBps) / used;
if (pctDrop <= 0)
return false;
// increase the drop probability for OBEP,
// (except lower it for tunnel build messages (type 21)),
// (except lower it for tunnel build messages type 21/22/23/24),
// and lower it for IBGW, for network efficiency
double len = length;
if (type.startsWith("OBEP")) {
if (type.equals("OBEP 21"))
if (loc == Location.OBEP) {
// we don't need to check for VTBRM/TBRM as that happens at tunnel creation
if (type == VariableTunnelBuildMessage.MESSAGE_TYPE || type == TunnelBuildMessage.MESSAGE_TYPE)
len /= 1.5;
else
len *= 1.5;
} else if (type.startsWith("IBGW")) {
len /= 1.5;
} else if (loc == Location.IBGW) {
// we don't need to check for VTBM/TBM as that happens at tunnel creation
if (type == VariableTunnelBuildReplyMessage.MESSAGE_TYPE || type == TunnelBuildReplyMessage.MESSAGE_TYPE)
len /= 1.5 * 1.5 * 1.5;
else
len /= 1.5;
}
// drop in proportion to size w.r.t. a standard 1024-byte message
// this is a little expensive but we want to adjust the curve between 0 and 1
@@ -671,9 +695,9 @@ public class TunnelDispatcher implements Service {
int availBps = (int) (((maxKBps*1024)*share) - used);
_log.warn("Drop part. msg. avail/max/used " + availBps + "/" + (int) maxBps + "/"
+ used + " %Drop = " + pctDrop
+ ' ' + type + ' ' + length);
+ ' ' + loc + ' ' + type + ' ' + length);
}
_context.statManager().addRateData("tunnel.participatingMessageDropped", 1, 0);
_context.statManager().addRateData("tunnel.participatingMessageDropped", 1);
}
return reject;
}

View File

@@ -181,9 +181,11 @@ class TunnelParticipant {
}
private void send(HopConfig config, TunnelDataMessage msg, RouterInfo ri) {
if (_context.tunnelDispatcher().shouldDropParticipatingMessage("TDM", 1024))
if (_context.tunnelDispatcher().shouldDropParticipatingMessage(TunnelDispatcher.Location.PARTICIPANT,
TunnelDataMessage.MESSAGE_TYPE, 1024))
return;
_config.incrementSentMessages();
//_config.incrementSentMessages();
_context.bandwidthLimiter().sentParticipatingMessage(1024);
long oldId = msg.getUniqueId();
long newId = _context.random().nextLong(I2NPMessage.MAX_ID_VALUE);
_context.messageHistory().wrap("TunnelDataMessage", oldId, "TunnelDataMessage", newId);