reduce the max slice time (aka max time to pump out a message + some cleanup) to 60 seconds

close connections to peers who are so slow that they leave messages on the queue to expire
reduce the default max queue size per connection to 10 messages
(as always, this is a configurable param, via "i2np.tcp.maxQueuedMessages" in router.config)
This commit is contained in:
jrandom
2004-06-20 00:44:43 +00:00
committed by zzz
parent baedcdb2c1
commit 95a7938328

View File

@@ -68,7 +68,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
protected RouterContext _context;
public final static String PARAM_MAX_QUEUED_MESSAGES = "i2np.tcp.maxQueuedMessages";
private final static int DEFAULT_MAX_QUEUED_MESSAGES = 20;
private final static int DEFAULT_MAX_QUEUED_MESSAGES = 10;
public TCPConnection(RouterContext context, Socket s, boolean locallyInitiated) {
_context = context;
@@ -267,7 +267,8 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
pending.append(totalPending).append(": ");
for (int i = 0; i < totalPending; i++) {
OutNetMessage cur = (OutNetMessage)_toBeSent.get(i);
pending.append(cur.getMessage().getClass().getName());
pending.append(cur.getMessageSize()).append(" byte ");
pending.append(cur.getMessageType()).append(" message added");
pending.append(" added ").append(cur.getLifetime()).append(" ms ago, ");
}
_toBeSent.notifyAll();
@@ -285,6 +286,9 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
msg.timestamp("TCPConnection.addMessage exceeded max queued");
_transport.afterSend(msg, false);
// should we really be closing a connection if they're that slow?
// yeah, i think we should.
closeConnection();
return;
}
@@ -347,7 +351,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
if (_log.shouldLog(Log.WARN))
_log.warn("Connection closed while the message was sitting on the TCP Connection's queue! too slow by: "
+ (now-msg.getExpiration()) + "ms: " + msg);
_transport.afterSend(msg, false);
_transport.afterSend(msg, false, false);
}
_toBeSent.clear();
}
@@ -395,7 +399,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
public void readError(I2NPMessageReader reader, Exception error) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error reading from stream to " + _remoteIdentity.getHash().toBase64());
_log.error("Error reading from stream to " + _remoteIdentity.getHash().toBase64() + ": " + error.getMessage());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Error reading from stream to " + _remoteIdentity.getHash().toBase64(), error);
}
@@ -406,7 +410,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
* larger messages - perhaps this min-throughput should be implemented on the
* output stream as part of the throttling code? hmmm)
*/
private final static long MAX_SLICE_DURATION = 120*1000;
private final static long MAX_SLICE_DURATION = 60*1000;
/**
* Determine if the connection runner is hanging while running its slices. This can
* occur if there's a broken TCP connection that hasn't timed out yet (3 minutes later..)
@@ -451,11 +455,11 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
synchronized (_toBeSent) {
// loop through, dropping expired messages, waiting until a non-expired
// one is added, or 30 seconds have passed (catchall in case things bork)
// one is added, or 10 seconds have passed (catchall in case things bork)
while (msg == null) {
if (_toBeSent.size() <= 0) {
try {
_toBeSent.wait(30*1000);
_toBeSent.wait(10*1000);
} catch (InterruptedException ie) {}
}
remaining = _toBeSent.size();
@@ -507,8 +511,10 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
_log.debug("Sending " + data.length + " bytes in the slice... to "
+ _remoteIdentity.getHash().toBase64());
long beforeWrite = 0;
try {
synchronized (_out) {
beforeWrite = _context.clock().now();
_out.write(data);
_out.flush();
}
@@ -519,17 +525,21 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
return false;
}
msg.timestamp("TCPConnection.runner.processSlice sent and flushed");
long exp = msg.getMessage().getMessageExpiration().getTime();
long end = _context.clock().now();
long timeLeft = exp - end;
msg.timestamp("TCPConnection.runner.processSlice sent and flushed");
long timeLeft = msg.getMessage().getMessageExpiration().getTime() - end;
if (_log.shouldLog(Log.INFO))
_log.info("Message " + msg.getMessage().getClass().getName()
+ " (expiring in " + timeLeft + "ms) sent to "
+ _remoteIdentity.getHash().toBase64() + " from "
+ _context.routerHash().toBase64()
+ " over connection " + _id + " with " + data.length
+ " bytes in " + (end - start) + "ms");
+ " bytes in " + (end - afterExpire) + "ms (write took "
+ (end - beforeWrite) + "ms, prepare took "
+ (beforeWrite - afterExpire) + "ms)");
if (timeLeft < 10*1000) {
if (_log.shouldLog(Log.DEBUG))
_log.warn("Very little time left... time to send [" + (end-start)
@@ -549,7 +559,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Processing slice - message sent completely: "
+ msg.getMessage().getClass().getName() + " to "
+ msg.getMessageSize() + " byte " + msg.getMessageType() + " message to "
+ _remoteIdentity.getHash().toBase64());
if (end - afterExpire > 1000) {
if (_log.shouldLog(Log.WARN))