diff --git a/router/java/src/net/i2p/data/i2np/I2NPMessageReader.java b/router/java/src/net/i2p/data/i2np/I2NPMessageReader.java index f55dffbc7..bb9fa3bab 100644 --- a/router/java/src/net/i2p/data/i2np/I2NPMessageReader.java +++ b/router/java/src/net/i2p/data/i2np/I2NPMessageReader.java @@ -114,6 +114,9 @@ public class I2NPMessageReader { public void run() { while (_stayAlive) { while (_doRun) { + while (!_context.throttle().acceptNetworkMessage()) { + try { Thread.sleep(500 + _context.random().nextInt(512)); } catch (InterruptedException ie) {} + } // do read try { I2NPMessage msg = _handler.readMessage(_stream); diff --git a/router/java/src/net/i2p/router/RouterContext.java b/router/java/src/net/i2p/router/RouterContext.java index 5c56f2706..9b69834a7 100644 --- a/router/java/src/net/i2p/router/RouterContext.java +++ b/router/java/src/net/i2p/router/RouterContext.java @@ -49,6 +49,7 @@ public class RouterContext extends I2PAppContext { private Shitlist _shitlist; private MessageValidator _messageValidator; private MessageStateMonitor _messageStateMonitor; + private RouterThrottle _throttle; private Calculator _isFailingCalc; private Calculator _integrationCalc; private Calculator _speedCalc; @@ -83,6 +84,7 @@ public class RouterContext extends I2PAppContext { _statPublisher = new StatisticsManager(this); _shitlist = new Shitlist(this); _messageValidator = new MessageValidator(this); + _throttle = new RouterThrottleImpl(this); _isFailingCalc = new IsFailingCalculator(this); _integrationCalc = new IntegrationCalculator(this); _speedCalc = new SpeedCalculator(this); @@ -188,6 +190,11 @@ public class RouterContext extends I2PAppContext { * well as other criteria for "validity". */ public MessageValidator messageValidator() { return _messageValidator; } + /** + * Component to coordinate our accepting/rejecting of requests under load + * + */ + public RouterThrottle throttle() { return _throttle; } /** how do we rank the failure of profiles? */ public Calculator isFailingCalculator() { return _isFailingCalc; } diff --git a/router/java/src/net/i2p/router/RouterThrottle.java b/router/java/src/net/i2p/router/RouterThrottle.java new file mode 100644 index 000000000..45d8bf9f5 --- /dev/null +++ b/router/java/src/net/i2p/router/RouterThrottle.java @@ -0,0 +1,34 @@ +package net.i2p.router; + +import net.i2p.data.Hash; +import net.i2p.data.i2np.TunnelCreateMessage; + +/** + * Gatekeeper for deciding whether to throttle the further processing + * of messages through the router. This is seperate from the bandwidth + * limiting which simply makes sure the bytes transferred dont exceed the + * bytes allowed (though the router throttle should take into account the + * current bandwidth usage and limits when determining whether to accept or + * reject certain activities, such as tunnels) + * + */ +public interface RouterThrottle { + /** + * Should we accept any more data from the network for any sort of message, + * taking into account our current load, or should we simply slow down? + * + */ + public boolean acceptNetworkMessage(); + /** + * Should we accept the request to participate in the given tunnel, + * taking into account our current load and bandwidth usage commitments? + * + */ + public boolean acceptTunnelRequest(TunnelCreateMessage msg); + /** + * Should we accept the netDb lookup message, replying either with the + * value or some closer peers, or should we simply drop it due to overload? + * + */ + public boolean acceptNetDbLookupRequest(Hash key); +} diff --git a/router/java/src/net/i2p/router/RouterThrottleImpl.java b/router/java/src/net/i2p/router/RouterThrottleImpl.java new file mode 100644 index 000000000..6cbc28468 --- /dev/null +++ b/router/java/src/net/i2p/router/RouterThrottleImpl.java @@ -0,0 +1,82 @@ +package net.i2p.router; + +import net.i2p.data.Hash; +import net.i2p.data.i2np.TunnelCreateMessage; +import net.i2p.util.Log; + +/** + * Simple throttle that basically stops accepting messages or nontrivial + * requests if the jobQueue lag is too large. + * + */ +class RouterThrottleImpl implements RouterThrottle { + private RouterContext _context; + private Log _log; + + /** + * arbitrary hard limit of 5 seconds - if its taking this long to get + * to a job, we're congested. + * + */ + private static int JOB_LAG_LIMIT = 5000; + + public RouterThrottleImpl(RouterContext context) { + _context = context; + _log = context.logManager().getLog(RouterThrottleImpl.class); + _context.statManager().createRateStat("router.throttleNetworkCause", "How lagged the jobQueue was when an I2NP was throttled", "Throttle", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("router.throttleNetDbCause", "How lagged the jobQueue was when a networkDb request was throttled", "Throttle", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("router.throttleTunnelCause", "How lagged the jobQueue was when a tunnel request was throttled", "Throttle", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("tunnel.bytesAllocatedAtAccept", "How many bytes had been 'allocated' for participating tunnels when we accepted a request?", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); + } + + public boolean acceptNetworkMessage() { + long lag = _context.jobQueue().getMaxLag(); + if (lag > JOB_LAG_LIMIT) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Throttling network reader, as the job lag is " + lag); + _context.statManager().addRateData("router.throttleNetworkCause", lag, lag); + return false; + } else { + return true; + } + } + + public boolean acceptNetDbLookupRequest(Hash key) { + long lag = _context.jobQueue().getMaxLag(); + if (lag > JOB_LAG_LIMIT) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Refusing netDb request, as the job lag is " + lag); + _context.statManager().addRateData("router.throttleNetDbCause", lag, lag); + return false; + } else { + return true; + } + } + public boolean acceptTunnelRequest(TunnelCreateMessage msg) { + long lag = _context.jobQueue().getMaxLag(); + if (lag > JOB_LAG_LIMIT) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Refusing tunnel request, as the job lag is " + lag); + _context.statManager().addRateData("router.throttleTunnelCause", lag, lag); + return false; + } else { + // ok, we're not hosed, but can we handle the bandwidth requirements + // of another tunnel? + double msgsPerTunnel = _context.statManager().getRate("tunnel.participatingMessagesProcessed").getRate(10*60*1000).getAverageValue(); + double bytesPerMsg = _context.statManager().getRate("tunnel.relayMessageSize").getRate(10*60*1000).getAverageValue(); + double bytesPerTunnel = msgsPerTunnel * bytesPerMsg; + + + int numTunnels = _context.tunnelManager().getParticipatingCount(); + double bytesAllocated = (numTunnels + 1) * bytesPerTunnel; + + _context.statManager().addRateData("tunnel.bytesAllocatedAtAccept", (long)bytesAllocated, msg.getTunnelDurationSeconds()*1000); + // todo: um, throttle (include bw usage of the netDb, our own tunnels, the clients, + // and check to see that they are less than the bandwidth limits + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Accepting a new tunnel request (now allocating " + bytesAllocated + " bytes across " + numTunnels + " tunnels"); + return true; + } + } +} diff --git a/router/java/src/net/i2p/router/StatisticsManager.java b/router/java/src/net/i2p/router/StatisticsManager.java index 709473694..7ef180850 100644 --- a/router/java/src/net/i2p/router/StatisticsManager.java +++ b/router/java/src/net/i2p/router/StatisticsManager.java @@ -106,7 +106,7 @@ public class StatisticsManager implements Service { includeRate("crypto.garlic.decryptFail", stats, new long[] { 60*60*1000, 24*60*60*1000 }); includeRate("tunnel.unknownTunnelTimeLeft", stats, new long[] { 60*60*1000, 24*60*60*1000 }); includeRate("jobQueue.readyJobs", stats, new long[] { 60*1000, 60*60*1000 }); - includeRate("jobQueue.droppedJobs", stats, new long[] { 60*60*1000, 24*60*60*1000 }); + //includeRate("jobQueue.droppedJobs", stats, new long[] { 60*60*1000, 24*60*60*1000 }); includeRate("inNetPool.dropped", stats, new long[] { 60*60*1000, 24*60*60*1000 }); includeRate("tunnel.participatingTunnels", stats, new long[] { 5*60*1000, 60*60*1000 }); includeRate("tunnel.testSuccessTime", stats, new long[] { 60*60*1000l, 24*60*60*1000l }); @@ -114,6 +114,7 @@ public class StatisticsManager implements Service { includeRate("tunnel.inboundMessagesProcessed", stats, new long[] { 10*60*1000, 60*60*1000 }); includeRate("tunnel.participatingMessagesProcessed", stats, new long[] { 10*60*1000, 60*60*1000 }); includeRate("tunnel.expiredAfterAcceptTime", stats, new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); + includeRate("tunnel.bytesAllocatedAtAccept", stats, new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); includeRate("netDb.lookupsReceived", stats, new long[] { 5*60*1000, 60*60*1000 }); includeRate("netDb.lookupsHandled", stats, new long[] { 5*60*1000, 60*60*1000 }); includeRate("netDb.lookupsMatched", stats, new long[] { 5*60*1000, 60*60*1000 }); @@ -121,16 +122,17 @@ public class StatisticsManager implements Service { includeRate("netDb.successPeers", stats, new long[] { 60*60*1000 }); includeRate("netDb.failedPeers", stats, new long[] { 60*60*1000 }); includeRate("netDb.searchCount", stats, new long[] { 3*60*60*1000}); - includeRate("inNetMessage.timeToDiscard", stats, new long[] { 5*60*1000, 10*60*1000, 60*60*1000 }); - includeRate("outNetMessage.timeToDiscard", stats, new long[] { 5*60*1000, 10*60*1000, 60*60*1000 }); + //includeRate("inNetMessage.timeToDiscard", stats, new long[] { 5*60*1000, 10*60*1000, 60*60*1000 }); + //includeRate("outNetMessage.timeToDiscard", stats, new long[] { 5*60*1000, 10*60*1000, 60*60*1000 }); + includeRate("router.throttleNetworkCause", stats, new long[] { 10*60*1000, 60*60*1000 }); includeRate("transport.receiveMessageSize", stats, new long[] { 5*60*1000, 60*60*1000 }); - includeRate("transport.sendMessageSize", stats, new long[] { 5*60*1000, 60*60*1000 }); - includeRate("transport.sendMessageSmall", stats, new long[] { 5*60*1000, 60*60*1000 }); - includeRate("transport.sendMessageMedium", stats, new long[] { 5*60*1000, 60*60*1000 }); - includeRate("transport.sendMessageLarge", stats, new long[] { 5*60*1000, 60*60*1000 }); - includeRate("transport.receiveMessageSmall", stats, new long[] { 5*60*1000, 60*60*1000 }); - includeRate("transport.receiveMessageMedium", stats, new long[] { 5*60*1000, 60*60*1000 }); - includeRate("transport.receiveMessageLarge", stats, new long[] { 5*60*1000, 60*60*1000 }); + //includeRate("transport.sendMessageSize", stats, new long[] { 5*60*1000, 60*60*1000 }); + //includeRate("transport.sendMessageSmall", stats, new long[] { 5*60*1000, 60*60*1000 }); + //includeRate("transport.sendMessageMedium", stats, new long[] { 5*60*1000, 60*60*1000 }); + //includeRate("transport.sendMessageLarge", stats, new long[] { 5*60*1000, 60*60*1000 }); + //includeRate("transport.receiveMessageSmall", stats, new long[] { 5*60*1000, 60*60*1000 }); + //includeRate("transport.receiveMessageMedium", stats, new long[] { 5*60*1000, 60*60*1000 }); + //includeRate("transport.receiveMessageLarge", stats, new long[] { 5*60*1000, 60*60*1000 }); includeRate("client.sendAckTime", stats, new long[] { 60*60*1000, 24*60*60*1000l }, true); stats.setProperty("stat_uptime", DataHelper.formatDuration(_context.router().getUptime())); stats.setProperty("stat__rateKey", "avg;maxAvg;pctLifetime;[sat;satLim;maxSat;maxSatLim;][num;lifetimeFreq;maxFreq]"); diff --git a/router/java/src/net/i2p/router/networkdb/DatabaseLookupMessageHandler.java b/router/java/src/net/i2p/router/networkdb/DatabaseLookupMessageHandler.java index ab42e2d37..52dee9a6e 100644 --- a/router/java/src/net/i2p/router/networkdb/DatabaseLookupMessageHandler.java +++ b/router/java/src/net/i2p/router/networkdb/DatabaseLookupMessageHandler.java @@ -16,6 +16,7 @@ import net.i2p.data.i2np.SourceRouteBlock; import net.i2p.router.HandlerJobBuilder; import net.i2p.router.Job; import net.i2p.router.RouterContext; +import net.i2p.util.Log; /** * Build a HandleDatabaseLookupMessageJob whenever a DatabaseLookupMessage arrives @@ -23,14 +24,24 @@ import net.i2p.router.RouterContext; */ public class DatabaseLookupMessageHandler implements HandlerJobBuilder { private RouterContext _context; + private Log _log; public DatabaseLookupMessageHandler(RouterContext context) { _context = context; + _log = context.logManager().getLog(DatabaseLookupMessageHandler.class); _context.statManager().createRateStat("netDb.lookupsReceived", "How many netDb lookups have we received?", "Network Database", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + _context.statManager().createRateStat("netDb.lookupsDropped", "How many netDb lookups did we drop due to throttling?", "Network Database", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); } public Job createJob(I2NPMessage receivedMessage, RouterIdentity from, Hash fromHash, SourceRouteBlock replyBlock) { _context.statManager().addRateData("netDb.lookupsReceived", 1, 0); - // ignore the reply block for the moment - return new HandleDatabaseLookupMessageJob(_context, (DatabaseLookupMessage)receivedMessage, from, fromHash); + + if (_context.throttle().acceptNetDbLookupRequest(((DatabaseLookupMessage)receivedMessage).getSearchKey())) { + return new HandleDatabaseLookupMessageJob(_context, (DatabaseLookupMessage)receivedMessage, from, fromHash); + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("Dropping lookup request as throttled"); + _context.statManager().addRateData("netDb.lookupsDropped", 1, 1); + return null; + } } } diff --git a/router/java/src/net/i2p/router/tunnelmanager/HandleTunnelCreateMessageJob.java b/router/java/src/net/i2p/router/tunnelmanager/HandleTunnelCreateMessageJob.java index 7ef558e13..85ccc5fed 100644 --- a/router/java/src/net/i2p/router/tunnelmanager/HandleTunnelCreateMessageJob.java +++ b/router/java/src/net/i2p/router/tunnelmanager/HandleTunnelCreateMessageJob.java @@ -39,6 +39,7 @@ public class HandleTunnelCreateMessageJob extends JobImpl { RouterIdentity from, Hash fromHash, SourceRouteBlock replyBlock) { super(ctx); _log = ctx.logManager().getLog(HandleTunnelCreateMessageJob.class); + ctx.statManager().createRateStat("tunnel.rejectOverloaded", "How many tunnels did we deny due to throttling?", "Tunnels", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); _message = receivedMessage; _from = from; _fromHash = fromHash; @@ -83,8 +84,13 @@ public class HandleTunnelCreateMessageJob extends JobImpl { } private boolean isOverloaded() { - // hmmm.... - return false; + boolean shouldAccept = _context.throttle().acceptTunnelRequest(_message); + if (!shouldAccept) { + _context.statManager().addRateData("tunnel.rejectOverloaded", 1, 1); + if (_log.shouldLog(Log.INFO)) + _log.info("Refusing tunnel request due to overload"); + } + return !shouldAccept; } private class TestJob extends JobImpl {