diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java index 1797b376a..d38104858 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java @@ -63,16 +63,12 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { protected SessionKey _key; protected ByteArray _extraBytes; protected byte[] _iv; - protected int _maxQueuedMessages; private long _lastSliceRun; private boolean _closed; private boolean _weInitiated; private long _created; protected RouterContext _context; - public final static String PARAM_MAX_QUEUED_MESSAGES = "i2np.tcp.maxQueuedMessages"; - private final static int DEFAULT_MAX_QUEUED_MESSAGES = 20; - public TCPConnection(RouterContext context, Socket s, boolean locallyInitiated) { _context = context; _log = context.logManager().getLog(TCPConnection.class); @@ -101,7 +97,6 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { _remotePort = s.getPort(); if (_log.shouldLog(Log.INFO)) _log.info("Connected with peer: " + _remoteHost + ":" + _remotePort); - updateMaxQueuedMessages(); } /** how long has this connection been around for, or -1 if it isn't established yet */ @@ -114,21 +109,6 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { protected boolean weInitiatedConnection() { return _weInitiated; } - private void updateMaxQueuedMessages() { - String str = _context.router().getConfigSetting(PARAM_MAX_QUEUED_MESSAGES); - if ( (str != null) && (str.trim().length() > 0) ) { - try { - int max = Integer.parseInt(str); - _maxQueuedMessages = max; - return; - } catch (NumberFormatException nfe) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Invalid max queued messages [" + str + "]"); - } - } - _maxQueuedMessages = DEFAULT_MAX_QUEUED_MESSAGES; - } - public RouterIdentity getRemoteRouterIdentity() { return _remoteIdentity; } int getId() { return _id; } int getPendingMessageCount() { @@ -265,20 +245,28 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { long beforeAdd = _context.clock().now(); StringBuffer pending = new StringBuffer(64); synchronized (_toBeSent) { - if ( (_maxQueuedMessages > 0) && (_toBeSent.size() >= _maxQueuedMessages) ) { - fail = true; - } else { + for (int i = 0; i < _toBeSent.size(); i++) { + OutNetMessage cur = (OutNetMessage)_toBeSent.get(i); + if (cur.getExpiration() < beforeAdd) { + fail = true; + break; + } + } + if (!fail) { _toBeSent.add(msg); - // the ConnectionRunner.processSlice does a wait() until we have messages } totalPending = _toBeSent.size(); pending.append(totalPending).append(": "); - for (int i = 0; i < totalPending; i++) { - OutNetMessage cur = (OutNetMessage)_toBeSent.get(i); - pending.append(cur.getMessageSize()).append(" byte "); - pending.append(cur.getMessageType()).append(" message added"); - pending.append(" added ").append(cur.getLifetime()).append(" ms ago, "); + if (fail) { + for (int i = 0; i < totalPending; i++) { + OutNetMessage cur = (OutNetMessage)_toBeSent.get(i); + pending.append(cur.getMessageSize()).append(" byte "); + pending.append(cur.getMessageType()).append(" message added"); + pending.append(" added ").append(cur.getLifetime()).append(" ms ago, "); + } } + + // the ConnectionRunner.processSlice does a wait() until we have messages _toBeSent.notifyAll(); } long afterAdd = _context.clock().now(); @@ -287,12 +275,12 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { if (fail) { if (_log.shouldLog(Log.ERROR)) - _log.error("too many queued messages to " + _remoteIdentity.getHash().toBase64() + ": " + pending.toString()); + _log.error("messages expired on the queue to " + _remoteIdentity.getHash().toBase64() + ": " + pending.toString()); // do we really want to give them a comm error because they're so.damn.slow reading their stream? _context.profileManager().commErrorOccurred(_remoteIdentity.getHash()); - msg.timestamp("TCPConnection.addMessage exceeded max queued"); + msg.timestamp("TCPConnection.addMessage saw an expired queued message"); _transport.afterSend(msg, false); // should we really be closing a connection if they're that slow? // yeah, i think we should.