From 9e16bc203acad926f1d4c7750311694f35c88100 Mon Sep 17 00:00:00 2001 From: jrandom Date: Mon, 13 Dec 2004 13:45:52 +0000 Subject: [PATCH] 2004-12-13 jrandom * Added some error checking on the new client send job (thanks duck!) * Implemented tunnel rejection based on bandwidth usage (rejecting tunnels proportional to the bytes allocated in existing tunnels vs the bytes allowed through the bandwidth limiter). * Enable a new configuration parameter for triggering a tunnel rebuild (tunnel.maxTunnelFailures), where that is the max allowed test failures before killing the tunnel (default 0). * Gather more data that we rank capacity by (now we monitor and balance the data from 10m/30m/60m/1d instead of just 10m/60m/1d). * Fix a truncation/type conversion problem on the long term capacity values (we were ignoring the daily stats outright) --- history.txt | 15 ++- .../net/i2p/router/RouterThrottleImpl.java | 103 +++++++++++++++++- .../src/net/i2p/router/RouterVersion.java | 4 +- .../OutboundClientMessageOneShotJob.java | 18 ++- .../peermanager/CapacityCalculator.java | 34 +++--- .../i2p/router/peermanager/PeerProfile.java | 2 +- .../i2p/router/tunnelmanager/TunnelPool.java | 16 ++- 7 files changed, 167 insertions(+), 25 deletions(-) diff --git a/history.txt b/history.txt index 4b8604ee5..c93c49118 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,17 @@ -$Id: history.txt,v 1.103 2004/12/11 02:05:12 jrandom Exp $ +$Id: history.txt,v 1.104 2004/12/11 04:26:24 jrandom Exp $ + +2004-12-13 jrandom + * Added some error checking on the new client send job (thanks duck!) + * Implemented tunnel rejection based on bandwidth usage (rejecting tunnels + proportional to the bytes allocated in existing tunnels vs the bytes + allowed through the bandwidth limiter). + * Enable a new configuration parameter for triggering a tunnel rebuild + (tunnel.maxTunnelFailures), where that is the max allowed test failures + before killing the tunnel (default 0). + * Gather more data that we rank capacity by (now we monitor and balance the + data from 10m/30m/60m/1d instead of just 10m/60m/1d). + * Fix a truncation/type conversion problem on the long term capacity + values (we were ignoring the daily stats outright) 2004-12-11 jrandom * Fix the missing HTTP timeout, which was caused by the deferred syn used diff --git a/router/java/src/net/i2p/router/RouterThrottleImpl.java b/router/java/src/net/i2p/router/RouterThrottleImpl.java index 7780aa9c2..b6ad5f11b 100644 --- a/router/java/src/net/i2p/router/RouterThrottleImpl.java +++ b/router/java/src/net/i2p/router/RouterThrottleImpl.java @@ -30,6 +30,8 @@ class RouterThrottleImpl implements RouterThrottle { private static int THROTTLE_EVENT_LIMIT = 300; private static final String PROP_MAX_TUNNELS = "router.maxParticipatingTunnels"; + private static final String PROP_DEFAULT_KBPS_THROTTLE = "router.defaultKBpsThrottle"; + private static final String PROP_BANDWIDTH_SHARE_PERCENTAGE = "router.sharePercentage"; public RouterThrottleImpl(RouterContext context) { _context = context; @@ -43,6 +45,7 @@ class RouterThrottleImpl implements RouterThrottle { _context.statManager().createRateStat("router.throttleTunnelMaxExceeded", "How many tunnels we are participating in when we refuse one due to excees?", "Throttle", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("router.throttleTunnelProbTooFast", "How many tunnels beyond the previous 1h average are we participating in when we throttle?", "Throttle", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("router.throttleTunnelProbTestSlow", "How slow are our tunnel tests when our average exceeds the old average and we throttle?", "Throttle", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("router.throttleTunnelBandwidthExceeded", "How much bandwidth is allocated when we refuse due to bandwidth allocation?", "Throttle", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); } public boolean acceptNetworkMessage() { @@ -207,9 +210,12 @@ class RouterThrottleImpl implements RouterThrottle { } } + if (!allowTunnel(bytesAllocated, numTunnels)) { + _context.statManager().addRateData("router.throttleTunnelBandwidthExceeded", (long)bytesAllocated, 0); + return false; + } _context.statManager().addRateData("tunnel.bytesAllocatedAtAccept", (long)bytesAllocated, msg.getTunnelDurationSeconds()*1000); - // todo: um, throttle (include bw usage of the netDb, our own tunnels, the clients, - // and check to see that they are less than the bandwidth limits + if (_log.shouldLog(Log.DEBUG)) _log.debug("Accepting a new tunnel request (now allocating " + bytesAllocated + " bytes across " + numTunnels @@ -217,6 +223,99 @@ class RouterThrottleImpl implements RouterThrottle { return true; } + /** + * with bytesAllocated already accounted for across the numTunnels existing + * tunnels we have agreed to, can we handle another tunnel with our existing + * bandwidth? + * + */ + private boolean allowTunnel(double bytesAllocated, int numTunnels) { + long bytesAllowed = getBytesAllowed(); + + bytesAllowed *= getSharePercentage(); + + double bytesPerTunnel = (numTunnels > 0 ? bytesAllocated / numTunnels : 0); + double toAllocate = (numTunnels > 0 ? bytesPerTunnel * (numTunnels + 1) : 0); + + double pctFull = toAllocate / bytesAllowed; + + double allocatedKBps = toAllocate / (10 * 60 * 1024); + + if (_context.random().nextInt(100) > 100 * pctFull) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Probabalistically allowing the tunnel w/ " + pctFull + " of our " + bytesAllowed + + "bytes/" + allocatedKBps + "KBps allocated through " + numTunnels + " tunnels"); + return true; + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Rejecting the tunnel w/ " + pctFull + " of our " + bytesAllowed + + "bytes allowed (" + toAllocate + "bytes / " + allocatedKBps + + "KBps) through " + numTunnels + " tunnels"); + return false; + } + } + + /** + * What fraction of the bandwidth specified in our bandwidth limits should + * we allow to be consumed by participating tunnels? + * + */ + private double getSharePercentage() { + String pct = _context.getProperty(PROP_BANDWIDTH_SHARE_PERCENTAGE, "0.8"); + if (pct != null) { + try { + return Double.parseDouble(pct); + } catch (NumberFormatException nfe) { + if (_log.shouldLog(Log.INFO)) + _log.info("Unable to get the share percentage"); + } + } + return 0.8; + } + + /** + * BytesPerSecond that we can pass along data + */ + private long getBytesAllowed() { + String kbpsOutStr = _context.getProperty("i2np.bandwidth.outboundKBytesPerSecond"); + long kbpsOut = -1; + if (kbpsOutStr != null) { + try { + kbpsOut = Integer.parseInt(kbpsOutStr); + } catch (NumberFormatException nfe) { + if (_log.shouldLog(Log.INFO)) + _log.info("Unable to get the bytes allowed (outbound)"); + } + } + + String kbpsInStr = _context.getProperty("i2np.bandwidth.inboundKBytesPerSecond"); + long kbpsIn = -1; + if (kbpsInStr != null) { + try { + kbpsIn = Integer.parseInt(kbpsInStr); + } catch (NumberFormatException nfe) { + if (_log.shouldLog(Log.INFO)) + _log.info("Unable to get the bytes allowed (inbound)"); + } + } + + // whats our choke? + long kbps = (kbpsOut > kbpsIn ? kbpsIn : kbpsOut); + + if (kbps <= 0) { + try { + kbps = Integer.parseInt(_context.getProperty(PROP_DEFAULT_KBPS_THROTTLE, "64")); // absurd + } catch (NumberFormatException nfe) { + kbps = 64; + } + } + + return kbps + * 60 // per minute + * 10 // per 10 minute period + * 1024; // bytes; + } + /** dont ever probabalistically throttle tunnels if we have less than this many */ private int getMinThrottleTunnels() { try { diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index cea7089ce..6af926aaa 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.108 $ $Date: 2004/12/11 02:05:13 $"; + public final static String ID = "$Revision: 1.109 $ $Date: 2004/12/11 04:26:24 $"; public final static String VERSION = "0.4.2.3"; - public final static long BUILD = 2; + public final static long BUILD = 3; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java index 635410059..a737a5de1 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java @@ -53,6 +53,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { private MessageId _clientMessageId; private int _clientMessageSize; private Destination _from; + private Destination _to; /** target destination's leaseSet, if known */ private LeaseSet _leaseSet; /** Actual lease the message is being routed through */ @@ -60,6 +61,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { private PayloadGarlicConfig _clove; private long _cloveId; private long _start; + private boolean _finished; /** * final timeout (in milliseconds) that the outbound message will fail in. @@ -114,6 +116,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { _clientMessageId = msg.getMessageId(); _clientMessageSize = msg.getPayload().getSize(); _from = msg.getFromDestination(); + _to = msg.getDestination(); String param = msg.getSenderConfig().getOptions().getProperty(OVERALL_TIMEOUT_MS_PARAM); if (param == null) @@ -132,6 +135,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { _start = getContext().clock().now(); _overallExpiration = timeoutMs + _start; _shouldBundle = getShouldBundle(); + _finished = false; } public String getName() { return "Outbound client message"; } @@ -142,11 +146,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl { buildClove(); if (_log.shouldLog(Log.DEBUG)) _log.debug(getJobId() + ": Clove built"); - Hash to = _clientMessage.getDestination().calculateHash(); long timeoutMs = _overallExpiration - getContext().clock().now(); if (_log.shouldLog(Log.DEBUG)) _log.debug(getJobId() + ": Send outbound client message - sending off leaseSet lookup job"); - getContext().netDb().lookupLeaseSet(to, new SendJob(), new LookupLeaseSetFailedJob(), timeoutMs); + getContext().netDb().lookupLeaseSet(_to.calculateHash(), new SendJob(), new LookupLeaseSetFailedJob(), timeoutMs); } private boolean getShouldBundle() { @@ -194,7 +197,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { * */ private boolean getNextLease() { - _leaseSet = getContext().netDb().lookupLeaseSetLocally(_clientMessage.getDestination().calculateHash()); + _leaseSet = getContext().netDb().lookupLeaseSetLocally(_to.calculateHash()); if (_leaseSet == null) { if (_log.shouldLog(Log.WARN)) _log.warn(getJobId() + ": Lookup locally didn't find the leaseSet"); @@ -278,7 +281,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { GarlicMessage msg = OutboundClientMessageJobHelper.createGarlicMessage(getContext(), token, _overallExpiration, key, _clove, - _clientMessage.getDestination(), + _to, sessKey, tags, true, replyLeaseSet); @@ -338,6 +341,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl { * this is safe to call multiple times (only tells the client once) */ private void dieFatal() { + if (_finished) return; + _finished = true; + long sendTime = getContext().clock().now() - _start; if (_log.shouldLog(Log.WARN)) _log.warn(getJobId() + ": Failed to send the message " + _clientMessageId + " after " @@ -364,7 +370,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { DeliveryInstructions instructions = new DeliveryInstructions(); instructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_DESTINATION); - instructions.setDestination(_clientMessage.getDestination().calculateHash()); + instructions.setDestination(_to.calculateHash()); instructions.setDelayRequested(false); instructions.setDelaySeconds(0); @@ -434,6 +440,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl { public String getName() { return "Send client message successful to a lease"; } public void runJob() { + if (_finished) return; + _finished = true; long sendTime = getContext().clock().now() - _start; if (_log.shouldLog(Log.INFO)) _log.info(OutboundClientMessageOneShotJob.this.getJobId() diff --git a/router/java/src/net/i2p/router/peermanager/CapacityCalculator.java b/router/java/src/net/i2p/router/peermanager/CapacityCalculator.java index af2a9dfd0..bf6d55a46 100644 --- a/router/java/src/net/i2p/router/peermanager/CapacityCalculator.java +++ b/router/java/src/net/i2p/router/peermanager/CapacityCalculator.java @@ -45,18 +45,25 @@ public class CapacityCalculator extends Calculator { private static long ESTIMATE_PERIOD = 60*60*1000; public double calc(PeerProfile profile) { - double capacity = 0; - RateStat acceptStat = profile.getTunnelCreateResponseTime(); RateStat rejectStat = profile.getTunnelHistory().getRejectionRate(); RateStat failedStat = profile.getTunnelHistory().getFailedRate(); - capacity += estimatePartial(acceptStat, rejectStat, failedStat, 10*60*1000); - capacity += estimatePartial(acceptStat, rejectStat, failedStat, 30*60*1000); - capacity += estimatePartial(acceptStat, rejectStat, failedStat, 60*60*1000); - capacity += estimatePartial(acceptStat, rejectStat, failedStat, 24*60*60*1000); + double capacity10m = estimateCapacity(acceptStat, rejectStat, failedStat, 10*60*1000); + double capacity30m = estimateCapacity(acceptStat, rejectStat, failedStat, 30*60*1000); + double capacity60m = estimateCapacity(acceptStat, rejectStat, failedStat, 60*60*1000); + double capacity1d = estimateCapacity(acceptStat, rejectStat, failedStat, 24*60*60*1000); - if (tooOld(profile)) + double capacity = capacity10m * periodWeight(10*60*1000) + + capacity30m * periodWeight(30*60*1000) + + capacity60m * periodWeight(60*60*1000) + + capacity1d * periodWeight(24*60*60*1000); + + // if we actively know they're bad, who cares if they used to be good? + if (capacity10m <= 0) + capacity = 0; + + if (tooOld(profile)) capacity = 1; capacity += profile.getReliabilityBonus(); @@ -74,7 +81,7 @@ public class CapacityCalculator extends Calculator { return true; } - private double estimatePartial(RateStat acceptStat, RateStat rejectStat, RateStat failedStat, int period) { + private double estimateCapacity(RateStat acceptStat, RateStat rejectStat, RateStat failedStat, int period) { Rate curAccepted = acceptStat.getRate(period); Rate curRejected = rejectStat.getRate(period); Rate curFailed = failedStat.getRate(period); @@ -82,25 +89,26 @@ public class CapacityCalculator extends Calculator { long eventCount = 0; if (curAccepted != null) eventCount = curAccepted.getCurrentEventCount() + curAccepted.getLastEventCount(); - double stretch = ESTIMATE_PERIOD / period; + double stretch = ((double)ESTIMATE_PERIOD) / period; double val = eventCount * stretch; long failed = 0; if (curFailed != null) failed = curFailed.getCurrentEventCount() + curFailed.getLastEventCount(); if (failed > 0) { - if ( (period == 10*60*1000) && (curFailed.getCurrentEventCount() > 0) ) + if ( (period <= 10*60*1000) && (curFailed.getCurrentEventCount() > 0) ) return 0.0d; // their tunnels have failed in the last 0-10 minutes else val -= failed * stretch; } - if ( (period == 10*60*1000) && (curRejected.getCurrentEventCount() + curRejected.getLastEventCount() > 0) ) + if ( (period <= 10*60*1000) && (curRejected.getCurrentEventCount() + curRejected.getLastEventCount() > 0) ) { + //System.out.println("10m period has rejected " + (curRejected.getCurrentEventCount() + curRejected.getLastEventCount()) + " times"); return 0.0d; - else + } else val -= stretch * (curRejected.getCurrentEventCount() + curRejected.getLastEventCount()); if (val >= 0) { - return (val + GROWTH_FACTOR) * periodWeight(period); + return (val + GROWTH_FACTOR); } else { // failed too much, don't grow return 0.0d; diff --git a/router/java/src/net/i2p/router/peermanager/PeerProfile.java b/router/java/src/net/i2p/router/peermanager/PeerProfile.java index 563171bff..242782dd8 100644 --- a/router/java/src/net/i2p/router/peermanager/PeerProfile.java +++ b/router/java/src/net/i2p/router/peermanager/PeerProfile.java @@ -248,7 +248,7 @@ public class PeerProfile { if (_dbResponseTime == null) _dbResponseTime = new RateStat("dbResponseTime", "how long it takes to get a db response from the peer (in milliseconds)", group, new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000 } ); if (_tunnelCreateResponseTime == null) - _tunnelCreateResponseTime = new RateStat("tunnelCreateResponseTime", "how long it takes to get a tunnel create response from the peer (in milliseconds)", group, new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000 } ); + _tunnelCreateResponseTime = new RateStat("tunnelCreateResponseTime", "how long it takes to get a tunnel create response from the peer (in milliseconds)", group, new long[] { 10*60*1000l, 30*60*1000l, 60*60*1000l, 24*60*60*1000 } ); if (_tunnelTestResponseTime == null) _tunnelTestResponseTime = new RateStat("tunnelTestResponseTime", "how long it takes to successfully test a tunnel this peer participates in (in milliseconds)", group, new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000 } ); if (_commError == null) diff --git a/router/java/src/net/i2p/router/tunnelmanager/TunnelPool.java b/router/java/src/net/i2p/router/tunnelmanager/TunnelPool.java index 60daddac6..1871e3e96 100644 --- a/router/java/src/net/i2p/router/tunnelmanager/TunnelPool.java +++ b/router/java/src/net/i2p/router/tunnelmanager/TunnelPool.java @@ -529,6 +529,20 @@ class TunnelPool { } private static final int MAX_FAILURES_PER_TUNNEL = 0; + public static final String PROP_MAX_TUNNEL_FAILURES = "tunnel.maxTunnelFailures"; + + private int getMaxTunnelFailures() { + String max = _context.getProperty(PROP_MAX_TUNNEL_FAILURES); + if (max != null) { + try { + return Integer.parseInt(max); + } catch (NumberFormatException nfe) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Max tunnel failures property is invalid [" + max + "]"); + } + } + return MAX_FAILURES_PER_TUNNEL; + } public void tunnelFailed(TunnelId id) { if (!_isLive) return; @@ -536,7 +550,7 @@ class TunnelPool { if (info == null) return; int failures = info.incrementFailures(); - if (failures <= MAX_FAILURES_PER_TUNNEL) { + if (failures <= getMaxTunnelFailures()) { if (_log.shouldLog(Log.INFO)) _log.info("Tunnel " + id + " failure " + failures + ", but not fatal yet"); return;