From 1ac18ba10e625a246b830e071725140e3ba3df82 Mon Sep 17 00:00:00 2001 From: jrandom Date: Wed, 29 Dec 2004 20:06:43 +0000 Subject: [PATCH] 2004-12-29 jrandom * Add in a new keepalive event on each TCP connection, proactively sending a (tiny) time message every minute or two, as well as killing the connection if no message has been fully sent within 5 minutes or so. This should help deal with hung connections from IP address changes. --- history.txt | 8 ++- .../src/net/i2p/router/RouterVersion.java | 4 +- .../transport/tcp/ConnectionRunner.java | 51 ++++++++++++++++++- .../router/transport/tcp/TCPConnection.java | 1 + 4 files changed, 60 insertions(+), 4 deletions(-) diff --git a/history.txt b/history.txt index 47a9ddac4..691d7a15b 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,10 @@ -$Id: history.txt,v 1.118 2004/12/21 13:23:03 jrandom Exp $ +$Id: history.txt,v 1.119 2004/12/29 10:53:30 jrandom Exp $ + +2004-12-29 jrandom + * Add in a new keepalive event on each TCP connection, proactively sending + a (tiny) time message every minute or two, as well as killing the + connection if no message has been fully sent within 5 minutes or so. + This should help deal with hung connections from IP address changes. 2004-12-28 jrandom * Cleaned up the resending and choking algorithm in the streaming lib. diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index b8a796ff2..1e1b45724 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.123 $ $Date: 2004/12/21 13:23:03 $"; + public final static String ID = "$Revision: 1.124 $ $Date: 2004/12/29 10:53:28 $"; public final static String VERSION = "0.4.2.5"; - public final static long BUILD = 1; + public final static long BUILD = 2; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/transport/tcp/ConnectionRunner.java b/router/java/src/net/i2p/router/transport/tcp/ConnectionRunner.java index c3a736778..6832c2539 100644 --- a/router/java/src/net/i2p/router/transport/tcp/ConnectionRunner.java +++ b/router/java/src/net/i2p/router/transport/tcp/ConnectionRunner.java @@ -4,7 +4,9 @@ import java.io.IOException; import java.io.OutputStream; import java.util.Date; +import net.i2p.data.DataHelper; import net.i2p.data.DataFormatException; +import net.i2p.data.RouterInfo; import net.i2p.data.i2np.I2NPMessage; import net.i2p.data.i2np.DeliveryStatusMessage; import net.i2p.router.OutNetMessage; @@ -12,6 +14,7 @@ import net.i2p.router.Router; import net.i2p.router.RouterContext; import net.i2p.util.I2PThread; import net.i2p.util.Log; +import net.i2p.util.SimpleTimer; /** * Push out I2NPMessages across the wire @@ -24,6 +27,7 @@ class ConnectionRunner implements Runnable { private boolean _keepRunning; private byte _writeBuffer[]; private long _lastTimeSend; + private long _lastWrite; private static final long TIME_SEND_FREQUENCY = 60*1000; @@ -32,6 +36,7 @@ class ConnectionRunner implements Runnable { _log = ctx.logManager().getLog(ConnectionRunner.class); _con = con; _keepRunning = false; + _lastWrite = ctx.clock().now(); } public void startRunning() { @@ -44,6 +49,9 @@ class ConnectionRunner implements Runnable { + _con.getRemoteRouterIdentity().calculateHash().toBase64().substring(0,6); I2PThread t = new I2PThread(this, name); t.start(); + + long delay = TIME_SEND_FREQUENCY + _context.random().nextInt(60*1000); + SimpleTimer.getInstance().addEvent(new KeepaliveEvent(), delay); } public void stopRunning() { @@ -54,7 +62,7 @@ class ConnectionRunner implements Runnable { while (_keepRunning && !_con.getIsClosed()) { OutNetMessage msg = _con.getNextMessage(); if (msg == null) { - if (_keepRunning) + if (_keepRunning && !_con.getIsClosed()) _log.error("next message is null but we should keep running?"); _con.closeConnection(); return; @@ -124,6 +132,7 @@ class ConnectionRunner implements Runnable { _con.closeConnection(); } _con.sent(msg, ok, after - before); + _lastWrite = after; } /** @@ -139,4 +148,44 @@ class ConnectionRunner implements Runnable { tm.setUniqueId(0); return tm; } + + /** + * Every few minutes, send a (tiny) message to the peer if we haven't + * spoken with them recently. This will help kill off any hung + * connections (due to IP address changes, etc). If we don't get any + * messages through in 5 minutes, kill the connection as well. + * + */ + private class KeepaliveEvent implements SimpleTimer.TimedEvent { + public void timeReached() { + if (!_keepRunning) return; + if (_con.getIsClosed()) return; + long timeSinceWrite = _context.clock().now() - _lastWrite; + if (timeSinceWrite > 5*TIME_SEND_FREQUENCY) { + TCPTransport t = _con.getTransport(); + t.addConnectionErrorMessage("Connection closed with " + + _con.getRemoteRouterIdentity().getHash().toBase64().substring(0,6) + + " due to " + DataHelper.formatDuration(timeSinceWrite) + + " of inactivity after " + + DataHelper.formatDuration(_con.getLifetime())); + _con.closeConnection(); + return; + } + if (_lastTimeSend < _context.clock().now() - 2*TIME_SEND_FREQUENCY) + enqueueTimeMessage(); + long delay = 2*TIME_SEND_FREQUENCY + _context.random().nextInt((int)TIME_SEND_FREQUENCY); + SimpleTimer.getInstance().addEvent(KeepaliveEvent.this, delay); + } + } + + private void enqueueTimeMessage() { + OutNetMessage msg = new OutNetMessage(_context); + RouterInfo ri = _context.netDb().lookupRouterInfoLocally(_con.getRemoteRouterIdentity().getHash()); + if (ri == null) return; + msg.setTarget(ri); + msg.setExpiration(_context.clock().now() + TIME_SEND_FREQUENCY); + msg.setMessage(buildTimeMessage()); + msg.setPriority(100); + _con.addMessage(msg); + } } diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java index 77c89f742..79e87bfcf 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java @@ -86,6 +86,7 @@ public class TCPConnection { /** skew that the other peer has from our clock */ public long getOffsetReceived() { return _offsetReceived; } public void setOffsetReceived(long ms) { _offsetReceived = ms; } + public TCPTransport getTransport() { return _transport; } /** * Actually start processing the messages on the connection (and reading