From 57801202fd9c990fd28fc83a5e36036856e8526c Mon Sep 17 00:00:00 2001 From: jrandom Date: Fri, 25 Jun 2004 18:14:12 +0000 Subject: [PATCH] flush the protocol flag explicitly make the tcp connection handler nonblocking by adding another (very short lived) thread - this prevents a peer connecting to us that is very slow (or unconnectable) from forcing other cons to timeout completely ripped out the fscking bandwidth limiter until i get it more reliable gave threads more explicit names (for the sim) logging --- .../tcp/RestrictiveTCPConnection.java | 10 ++- .../router/transport/tcp/SocketCreator.java | 12 +++- .../router/transport/tcp/TCPConnection.java | 5 +- .../i2p/router/transport/tcp/TCPListener.java | 71 +++++++++++++------ .../router/transport/tcp/TCPTransport.java | 7 +- 5 files changed, 73 insertions(+), 32 deletions(-) diff --git a/router/java/src/net/i2p/router/transport/tcp/RestrictiveTCPConnection.java b/router/java/src/net/i2p/router/transport/tcp/RestrictiveTCPConnection.java index 147982ab4..770e9a5bf 100644 --- a/router/java/src/net/i2p/router/transport/tcp/RestrictiveTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/tcp/RestrictiveTCPConnection.java @@ -223,7 +223,7 @@ class RestrictiveTCPConnection extends TCPConnection { SocketCreator creator = new SocketCreator(peer.getHost(), peer.getPort(), false); I2PThread sockCreator = new I2PThread(creator); sockCreator.setDaemon(true); - sockCreator.setName("PeerCallback"); + sockCreator.setName("PeerCallback:" + _transport.getListenPort()); sockCreator.setPriority(I2PThread.MIN_PRIORITY); sockCreator.start(); @@ -294,8 +294,12 @@ class RestrictiveTCPConnection extends TCPConnection { if (_log.shouldLog(Log.INFO)) _log.info("TCP connection " + _id + " established with " + _remoteIdentity.getHash().toBase64()); - _in = new AESInputStream(_context, new BandwidthLimitedInputStream(_context, _in, _remoteIdentity), _key, _iv); - _out = new AESOutputStream(_context, new BufferedOutputStream(new BandwidthLimitedOutputStream(_context, _out, _remoteIdentity), BUF_SIZE), _key, _iv); + //_in = new AESInputStream(_context, new BandwidthLimitedInputStream(_context, _in, _remoteIdentity), _key, _iv); + //_out = new AESOutputStream(_context, new BufferedOutputStream(new BandwidthLimitedOutputStream(_context, _out, _remoteIdentity), BUF_SIZE), _key, _iv); + //_in = new BandwidthLimitedInputStream(_context, _in, _remoteIdentity); + //_out = new BufferedOutputStream(new BandwidthLimitedOutputStream(_context, _out, _remoteIdentity), BUF_SIZE); + _in = new AESInputStream(_context, _in, _key, _iv); + _out = new AESOutputStream(_context, _out, _key, _iv); _socket.setSoTimeout(0); success = _context.clock().now(); established(); diff --git a/router/java/src/net/i2p/router/transport/tcp/SocketCreator.java b/router/java/src/net/i2p/router/transport/tcp/SocketCreator.java index 61497c0ab..787ce9b63 100644 --- a/router/java/src/net/i2p/router/transport/tcp/SocketCreator.java +++ b/router/java/src/net/i2p/router/transport/tcp/SocketCreator.java @@ -1,6 +1,7 @@ package net.i2p.router.transport.tcp; import java.io.IOException; +import java.io.OutputStream; import java.net.Socket; import java.net.UnknownHostException; @@ -31,6 +32,8 @@ class SocketCreator implements Runnable { /** the first byte sent and received must be 0x2A */ public final static int I2P_FLAG = 0x2A; + /** sent if we arent trying to talk */ + private final static int NOT_I2P_FLAG = 0x2B; public void run() { if (_keepOpen) { @@ -45,7 +48,9 @@ class SocketCreator implements Runnable { _socket = new Socket(_host, _port); if (_log.shouldLog(Log.DEBUG)) _log.debug("Socket created"); - _socket.getOutputStream().write(I2P_FLAG); + OutputStream os = _socket.getOutputStream(); + os.write(I2P_FLAG); + os.flush(); if (_log.shouldLog(Log.DEBUG)) _log.debug("I2P flag sent"); int val = _socket.getInputStream().read(); @@ -86,6 +91,11 @@ class SocketCreator implements Runnable { _socket = new Socket(_host, _port); if (_log.shouldLog(Log.DEBUG)) _log.debug("Socket created (but we're not sending the flag, since we're just testing them)"); + + OutputStream os = _socket.getOutputStream(); + os.write(NOT_I2P_FLAG); + os.flush(); + int val = _socket.getInputStream().read(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Value read: [" + val + "] == flag? [" + I2P_FLAG + "]"); diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java index a2b1209da..61a800916 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java @@ -244,7 +244,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { t.setName("Run Conn [" + _id + "]"); t.setDaemon(true); t.start(); - _reader = new I2NPMessageReader(_context, _in, this, "TCP Read [" + _id + "]"); + _reader = new I2NPMessageReader(_context, _in, this, "TCP Read [" + _id + ":" + _transport.getListenPort() + "]"); _reader.startReading(); } @@ -351,7 +351,8 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { OutNetMessage msg = (OutNetMessage)iter.next(); msg.timestamp("TCPTransport.closeConnection caused fail"); if (_log.shouldLog(Log.WARN)) - _log.warn("Connection closed while the message was sitting on the TCP Connection's queue! too slow by: " + _log.warn("Connection closed to " + _remoteIdentity.getHash().toBase64() + + " while the message was sitting on the TCP Connection's queue! too slow by: " + (now-msg.getExpiration()) + "ms: " + msg); _transport.afterSend(msg, false, false); } diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPListener.java b/router/java/src/net/i2p/router/transport/tcp/TCPListener.java index 0d5d349e9..781ffbd3c 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPListener.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPListener.java @@ -9,6 +9,7 @@ package net.i2p.router.transport.tcp; */ import java.io.IOException; +import java.io.OutputStream; import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; @@ -124,26 +125,8 @@ class TCPListener { if (_log.shouldLog(Log.INFO)) _log.info("Connection handled on " + _myAddress.getHost() + ":" + _myAddress.getPort() + " with " + s.getInetAddress().toString() + ":" + s.getPort()); - TimedHandler h = new TimedHandler(s); - I2PThread t = new I2PThread(h); - t.setDaemon(true); - t.start(); - synchronized (h) { - h.wait(HANDLE_TIMEOUT); - } - if (h.wasSuccessful()) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Handle successful"); - } else { - if (h.receivedIdentByte()) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Unable to handle in the time allotted"); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Peer didn't send the ident byte, so either they were testing us, or portscanning"); - } - try { s.close(); } catch (IOException ioe) {} - } + handle(s); + } catch (SocketException se) { _log.error("Error handling a connection - closed?", se); return; @@ -154,8 +137,48 @@ class TCPListener { } } + private void handle(Socket s) { + I2PThread t = new I2PThread(new BlockingHandler(s)); + t.setDaemon(true); + t.setName("BlockingHandler:"+_transport.getListenPort()); + t.start(); + } + + private class BlockingHandler implements Runnable { + private Socket _handledSocket; + public BlockingHandler(Socket socket) { + _handledSocket = socket; + } + public void run() { + TimedHandler h = new TimedHandler(_handledSocket); + I2PThread t = new I2PThread(h); + t.setDaemon(true); + t.start(); + try { + synchronized (h) { + h.wait(HANDLE_TIMEOUT); + } + } catch (InterruptedException ie) { + // got through early... + } + if (h.wasSuccessful()) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Handle successful"); + } else { + if (h.receivedIdentByte()) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Unable to handle in the time allotted"); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Peer didn't send the ident byte, so either they were testing us, or portscanning"); + } + try { _handledSocket.close(); } catch (IOException ioe) {} + } + } + } + /** if we're not making progress in 30s, drop 'em */ - private final static long HANDLE_TIMEOUT = 30*1000; + private final static long HANDLE_TIMEOUT = 10*1000; private static volatile int __handlerId = 0; private class TimedHandler implements Runnable { @@ -170,9 +193,11 @@ class TCPListener { _receivedIdentByte = false; } public void run() { - Thread.currentThread().setName("TimedHandler"+_handlerId); + Thread.currentThread().setName("TimedHandler"+_handlerId + ':' + _transport.getListenPort()); try { - _socket.getOutputStream().write(SocketCreator.I2P_FLAG); + OutputStream os = _socket.getOutputStream(); + os.write(SocketCreator.I2P_FLAG); + os.flush(); if (_log.shouldLog(Log.DEBUG)) _log.debug("listener: I2P flag sent"); int val = _socket.getInputStream().read(); diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java index 5ccbecdda..b5e49a601 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java @@ -101,6 +101,7 @@ public class TCPTransport extends TransportImpl { boolean getListenAddressIsValid() { return _listenAddressIsValid; } SigningPrivateKey getMySigningKey() { return _context.keyManager().getSigningPrivateKey(); } + int getListenPort() { return _listenPort; } /** fetch all of our TCP listening addresses */ TCPAddress[] getMyAddresses() { @@ -235,7 +236,7 @@ public class TCPTransport extends TransportImpl { SocketCreator creator = new SocketCreator(host, port); I2PThread sockCreator = new I2PThread(creator); sockCreator.setDaemon(true); - sockCreator.setName("SocketCreator"); + sockCreator.setName("SocketCreator_:" + _listenPort); sockCreator.setPriority(I2PThread.MIN_PRIORITY); sockCreator.start(); @@ -432,7 +433,7 @@ public class TCPTransport extends TransportImpl { } if (_log.shouldLog(Log.INFO)) - _log.info("Connection established with " + ident); + _log.info("Connection established with " + ident + " after " + (afterEstablish-start) + "ms"); if (target != null) { if (!target.getIdentity().equals(ident)) { _context.statManager().updateFrequency("tcp.acceptFailureFrequency"); @@ -574,7 +575,7 @@ public class TCPTransport extends TransportImpl { public int getId() { return _id; } public void run() { - Thread.currentThread().setName("Conn Establisher" + _id); + Thread.currentThread().setName("Conn Establisher" + _id + ':' + _listenPort); while (_running) { try {