diff --git a/core/java/src/net/i2p/stat/StatManager.java b/core/java/src/net/i2p/stat/StatManager.java index ffb7d6c52..528421048 100644 --- a/core/java/src/net/i2p/stat/StatManager.java +++ b/core/java/src/net/i2p/stat/StatManager.java @@ -50,7 +50,7 @@ public class StatManager { "tunnel.acceptLoad,tunnel.buildRequestTime,tunnel.rejectOverloaded,tunnel.rejectTimeout" + "tunnel.buildClientExpire,tunnel.buildClientReject,tunnel.buildClientSuccess," + "tunnel.buildExploratoryExpire,tunnel.buildExploratoryReject,tunnel.buildExploratorySuccess," + - "tunnel.buildRatio.*,tunnel.corruptMessage," + + "tunnel.buildRatio.*,tunnel.corruptMessage,tunnel.dropLoad*," + "tunnel.decryptRequestTime,tunnel.fragmentedDropped,tunnel.participatingMessageCount,"+ "tunnel.participatingTunnels,tunnel.testFailedTime,tunnel.testSuccessTime," + "tunnel.participatingBandwidth,udp.sendPacketSize,udp.packetsRetransmitted" ; diff --git a/router/java/src/net/i2p/router/CommSystemFacade.java b/router/java/src/net/i2p/router/CommSystemFacade.java index 57ada6bf2..6d0927c63 100644 --- a/router/java/src/net/i2p/router/CommSystemFacade.java +++ b/router/java/src/net/i2p/router/CommSystemFacade.java @@ -34,6 +34,7 @@ public abstract class CommSystemFacade implements Service { public int countActivePeers() { return 0; } public int countActiveSendPeers() { return 0; } + public boolean haveCapacity() { return true; } public List getMostRecentErrorMessages() { return Collections.EMPTY_LIST; } /** diff --git a/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java b/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java index b80e05694..d20c97846 100644 --- a/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java +++ b/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java @@ -60,6 +60,7 @@ public class CommSystemFacadeImpl extends CommSystemFacade { public int countActivePeers() { return (_manager == null ? 0 : _manager.countActivePeers()); } public int countActiveSendPeers() { return (_manager == null ? 0 : _manager.countActiveSendPeers()); } + public boolean haveCapacity() { return (_manager == null ? false : _manager.haveCapacity()); } /** * Framed average clock skew of connected peers in seconds, or null if we cannot answer. diff --git a/router/java/src/net/i2p/router/transport/Transport.java b/router/java/src/net/i2p/router/transport/Transport.java index 84a37f68e..f95d2dc8f 100644 --- a/router/java/src/net/i2p/router/transport/Transport.java +++ b/router/java/src/net/i2p/router/transport/Transport.java @@ -40,6 +40,7 @@ public interface Transport { public int countActivePeers(); public int countActiveSendPeers(); + public boolean haveCapacity(); public Vector getClockSkews(); public List getMostRecentErrorMessages(); diff --git a/router/java/src/net/i2p/router/transport/TransportImpl.java b/router/java/src/net/i2p/router/transport/TransportImpl.java index ba395b6b5..04a2cde91 100644 --- a/router/java/src/net/i2p/router/transport/TransportImpl.java +++ b/router/java/src/net/i2p/router/transport/TransportImpl.java @@ -31,7 +31,9 @@ import net.i2p.router.Job; import net.i2p.router.JobImpl; import net.i2p.router.MessageSelector; import net.i2p.router.OutNetMessage; +import net.i2p.router.Router; import net.i2p.router.RouterContext; +import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade; import net.i2p.util.Log; /** @@ -78,6 +80,33 @@ public abstract class TransportImpl implements Transport { * How many peers are we actively sending messages to (this minute) */ public int countActiveSendPeers() { return 0; } + + /** Default is 500 for floodfills... */ + public static final int DEFAULT_MAX_CONNECTIONS = 500; + /** ...and 60/120/180/240/300 for BW Tiers K/L/M/N/O */ + public static final int MAX_CONNECTION_FACTOR = 60; + /** Per-transport connection limit */ + public int getMaxConnections() { + String style = getStyle(); + if (style.equals("SSU")) + style = "udp"; + else + style = style.toLowerCase(); + int def = DEFAULT_MAX_CONNECTIONS; + RouterInfo ri = _context.router().getRouterInfo(); + if (ri != null) { + char bw = ri.getBandwidthTier().charAt(0); + if (bw != 'U' && + ! ((FloodfillNetworkDatabaseFacade)_context.netDb()).floodfillEnabled()) + def = MAX_CONNECTION_FACTOR * (1 + bw - Router.CAPABILITY_BW12); + } + return _context.getProperty("i2np." + style + ".maxConnections", def); + } + + /** + * Can we initiate or accept a connection to another peer, saving some margin + */ + public boolean haveCapacity() { return true; } /** * Return our peer clock skews on a transport. diff --git a/router/java/src/net/i2p/router/transport/TransportManager.java b/router/java/src/net/i2p/router/transport/TransportManager.java index 3d49780c3..d993a600d 100644 --- a/router/java/src/net/i2p/router/transport/TransportManager.java +++ b/router/java/src/net/i2p/router/transport/TransportManager.java @@ -151,6 +151,19 @@ public class TransportManager implements TransportEventListener { return peers; } + /** + * Is at least one transport below its connection limit + some margin + * Use for throttling in the router. + * Perhaps we should just use SSU? + */ + public boolean haveCapacity() { + for (int i = 0; i < _transports.size(); i++) { + if (((Transport)_transports.get(i)).haveCapacity()) + return true; + } + return false; + } + /** * Return our peer clock skews on all transports. * Vector composed of Long, each element representing a peer skew in seconds. diff --git a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java index 2f81318d9..d27a95172 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java +++ b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java @@ -39,6 +39,7 @@ public class EventPumper implements Runnable { private List _wantsRegister; private List _wantsConRegister; private NTCPTransport _transport; + private long _expireIdleWriteTime; private static final int BUF_SIZE = 8*1024; private static final int MAX_CACHE_SIZE = 64; @@ -50,6 +51,8 @@ public class EventPumper implements Runnable { * the time to iterate across them to check a few flags shouldn't be a problem. */ private static final long FAILSAFE_ITERATION_FREQ = 2*1000l; + private static final long MIN_EXPIRE_IDLE_TIME = 5*60*1000l; + private static final long MAX_EXPIRE_IDLE_TIME = 15*60*1000l; public EventPumper(RouterContext ctx, NTCPTransport transport) { _context = ctx; @@ -57,6 +60,7 @@ public class EventPumper implements Runnable { _transport = transport; _alive = false; _bufCache = new ArrayList(MAX_CACHE_SIZE); + _expireIdleWriteTime = MAX_EXPIRE_IDLE_TIME; } public void startPumping() { @@ -135,8 +139,12 @@ public class EventPumper implements Runnable { int failsafeWrites = 0; int failsafeCloses = 0; int failsafeInvalid = 0; - // pointless if we do this every 2 seconds? - long expireIdleWriteTime = 10*60*1000l; // + _context.random().nextLong(60*60*1000l); + + // Increase allowed idle time if we are well under allowed connections, otherwise decrease + if (_transport.haveCapacity()) + _expireIdleWriteTime = Math.min(_expireIdleWriteTime + 1000, MAX_EXPIRE_IDLE_TIME); + else + _expireIdleWriteTime = Math.max(_expireIdleWriteTime - 3000, MIN_EXPIRE_IDLE_TIME); for (Iterator iter = all.iterator(); iter.hasNext(); ) { try { SelectionKey key = (SelectionKey)iter.next(); @@ -181,8 +189,8 @@ public class EventPumper implements Runnable { failsafeWrites++; } - if ( con.getTimeSinceSend() > expireIdleWriteTime && - con.getTimeSinceReceive() > expireIdleWriteTime) { + if ( con.getTimeSinceSend() > _expireIdleWriteTime && + con.getTimeSinceReceive() > _expireIdleWriteTime) { // we haven't sent or received anything in a really long time, so lets just close 'er up con.close(); failsafeCloses++; @@ -680,4 +688,5 @@ public class EventPumper implements Runnable { private void expireTimedOut() { _transport.expireTimedOut(); } + public long getIdleTimeout() { return _expireIdleWriteTime; } } diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java index 349669066..7d78f618a 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java @@ -300,18 +300,13 @@ public class NTCPTransport extends TransportImpl { return _slowBid; } - private static final int DEFAULT_MAX_CONNECTIONS = 500; public boolean allowConnection() { - int max = DEFAULT_MAX_CONNECTIONS; - String mc = _context.getProperty("i2np.ntcp.maxConnections"); - if (mc != null) { - try { - max = Integer.parseInt(mc); - } catch (NumberFormatException nfe) {} - } - return countActivePeers() < max; + return countActivePeers() < getMaxConnections(); } + public boolean haveCapacity() { + return countActivePeers() < getMaxConnections() * 4 / 5; + } void sendComplete(OutNetMessage msg) { _finisher.add(msg); } /** async afterSend call, which can take some time w/ jobs, etc */ @@ -581,7 +576,10 @@ public class NTCPTransport extends TransportImpl { long totalRecv = 0; StringBuffer buf = new StringBuffer(512); - buf.append("NTCP connections: ").append(peers.size()).append("
\n"); + buf.append("NTCP connections: ").append(peers.size()); + buf.append(" limit: ").append(getMaxConnections()); + buf.append(" timeout: ").append(DataHelper.formatDuration(_pumper.getIdleTimeout())); + buf.append("
\n"); buf.append("\n"); buf.append(" "); buf.append(" "); diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java index f7ca62cc2..6ab159408 100644 --- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java +++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java @@ -271,6 +271,8 @@ public class EstablishmentManager { _log.warn("Receive session request from blocklisted IP: " + from); return; // drop the packet } + if (!_transport.allowConnection()) + return; // drop the packet state = new InboundEstablishState(_context, from.getIP(), from.getPort(), _transport.getLocalPort()); state.receiveSessionRequest(reader.getSessionRequestReader()); isNew = true; diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index d40c64118..9dd328f7a 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -90,6 +90,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority /** list of RemoteHostId for peers whose packets we want to drop outright */ private List _dropList; + private int _expireTimeout; + private static final int DROPLIST_PERIOD = 10*60*1000; private static final int MAX_DROPLIST_SIZE = 256; @@ -159,6 +161,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _fragments = new OutboundMessageFragments(_context, this, _activeThrottle); _inboundFragments = new InboundMessageFragments(_context, _fragments, this); _flooder = new UDPFlooder(_context, this); + _expireTimeout = EXPIRE_TIMEOUT; _expireEvent = new ExpirePeerEvent(); _testEvent = new PeerTestEvent(); _reachabilityStatus = CommSystemFacade.STATUS_UNKNOWN; @@ -887,6 +890,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority return null; } } + if (!allowConnection()) + return null; if (_log.shouldLog(Log.DEBUG)) _log.debug("bidding on a message to an unestablished peer: " + to.toBase64()); @@ -922,6 +927,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority // in the IntroductionManager a chance to work. public static final int EXPIRE_TIMEOUT = 30*60*1000; private static final int MAX_IDLE_TIME = EXPIRE_TIMEOUT; + private static final int MIN_EXPIRE_TIMEOUT = 10*60*1000; public String getStyle() { return STYLE; } public void send(OutNetMessage msg) { @@ -1264,6 +1270,18 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority return getPeerState(dest) != null; } + public boolean allowConnection() { + synchronized (_peersByIdent) { + return _peersByIdent.size() < getMaxConnections(); + } + } + + public boolean haveCapacity() { + synchronized (_peersByIdent) { + return _peersByIdent.size() < getMaxConnections() * 4 / 5; + } + } + /** * Return our peer clock skews on this transport. * Vector composed of Long, each element representing a peer skew in seconds. @@ -1622,7 +1640,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority int numPeers = 0; StringBuffer buf = new StringBuffer(512); - buf.append("UDP connections: ").append(peers.size()).append("
\n"); + buf.append("UDP connections: ").append(peers.size()); + buf.append(" limit: ").append(getMaxConnections()); + buf.append(" timeout: ").append(DataHelper.formatDuration(_expireTimeout)); + buf.append("
\n"); buf.append("
peerdir
\n"); buf.append("
peer"); if (sortFlags == FLAG_ALPHA) @@ -1951,12 +1972,25 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _expireBuffer = new ArrayList(128); } public void timeReached() { - long inactivityCutoff = _context.clock().now() - EXPIRE_TIMEOUT; + // Increase allowed idle time if we are well under allowed connections, otherwise decrease + if (haveCapacity()) + _expireTimeout = Math.min(_expireTimeout + 15*1000, EXPIRE_TIMEOUT); + else + _expireTimeout = Math.max(_expireTimeout - 45*1000, MIN_EXPIRE_TIMEOUT); + long shortInactivityCutoff = _context.clock().now() - _expireTimeout; + long longInactivityCutoff = _context.clock().now() - EXPIRE_TIMEOUT; + long pingCutoff = _context.clock().now() - (2 * 60*60*1000); _expireBuffer.clear(); synchronized (_expirePeers) { int sz = _expirePeers.size(); for (int i = 0; i < sz; i++) { PeerState peer = (PeerState)_expirePeers.get(i); + long inactivityCutoff; + // if we offered to introduce them, or we used them as introducer in last 2 hours + if (peer.getWeRelayToThemAs() > 0 || peer.getIntroducerTime() > pingCutoff) + inactivityCutoff = longInactivityCutoff; + else + inactivityCutoff = shortInactivityCutoff; if ( (peer.getLastReceiveTime() < inactivityCutoff) && (peer.getLastSendTime() < inactivityCutoff) ) { _expireBuffer.add(peer); _expirePeers.remove(i); diff --git a/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java b/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java index 6ca93d516..e1f040b8b 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java +++ b/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java @@ -214,6 +214,9 @@ class BuildExecutor implements Runnable { } */ + /** Set 1.5 * LOOP_TIME < BuildRequestor.REQUEST_TIMEOUT/4 - margin */ + private static final int LOOP_TIME = 1000; + public void run() { _isRunning = true; List wanted = new ArrayList(8); @@ -316,7 +319,7 @@ class BuildExecutor implements Runnable { //if (_log.shouldLog(Log.DEBUG)) // _log.debug("Nothin' doin (allowed=" + allowed + ", wanted=" + wanted.size() + ", pending=" + pendingRemaining + "), wait for a while"); //if (allowed <= 0) - _currentlyBuilding.wait(2000 + _context.random().nextInt(2*1000)); + _currentlyBuilding.wait((LOOP_TIME/2) + _context.random().nextInt(LOOP_TIME)); //else // wanted <= 0 // _currentlyBuilding.wait(_context.random().nextInt(30*1000)); } diff --git a/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java b/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java index c5121e7c8..3c2a9dd20 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java +++ b/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java @@ -498,8 +498,15 @@ class BuildHandler { } } + /* + * Being a IBGW or OBEP generally leads to more connections, so if we are + * approaching our connection limit (i.e. !haveCapacity()), + * reject this request. + */ if (response == 0 && (isInGW || isOutEnd) && - Boolean.valueOf(_context.getProperty(PROP_REJECT_NONPARTICIPANT)).booleanValue()) { + (Boolean.valueOf(_context.getProperty(PROP_REJECT_NONPARTICIPANT)).booleanValue() || + ! _context.commSystem().haveCapacity())) { + _context.throttle().setTunnelStatus("Rejecting tunnels: Connection limit"); response = TunnelHistory.TUNNEL_REJECT_BANDWIDTH; }