From de21a5ec48120c82e81016c44d0b817165045a2b Mon Sep 17 00:00:00 2001 From: zzz Date: Sat, 15 Nov 2008 20:57:52 +0000 Subject: [PATCH] drop old tcp transport and old tunnel build sources --- .../transport/CommSystemFacadeImpl.java | 35 - .../router/transport/TransportManager.java | 10 - .../transport/tcp/ConnectionBuilder.java | 795 --------------- .../transport/tcp/ConnectionHandler.java | 913 ------------------ .../transport/tcp/ConnectionRunner.java | 218 ----- .../transport/tcp/ConnectionTagManager.java | 107 -- .../router/transport/tcp/MessageHandler.java | 78 -- .../tcp/PersistentConnectionTagManager.java | 200 ---- .../i2p/router/transport/tcp/TCPAddress.java | 177 ---- .../router/transport/tcp/TCPConnection.java | 450 --------- .../tcp/TCPConnectionEstablisher.java | 69 -- .../i2p/router/transport/tcp/TCPListener.java | 338 ------- .../router/transport/tcp/TCPTransport.java | 851 ---------------- .../net/i2p/router/transport/tcp/package.html | 144 --- .../pool/HandleTunnelCreateMessageJob.java | 224 ----- .../pool/TunnelMessageHandlerBuilder.java | 48 - 16 files changed, 4657 deletions(-) delete mode 100644 router/java/src/net/i2p/router/transport/tcp/ConnectionBuilder.java delete mode 100644 router/java/src/net/i2p/router/transport/tcp/ConnectionHandler.java delete mode 100644 router/java/src/net/i2p/router/transport/tcp/ConnectionRunner.java delete mode 100644 router/java/src/net/i2p/router/transport/tcp/ConnectionTagManager.java delete mode 100644 router/java/src/net/i2p/router/transport/tcp/MessageHandler.java delete mode 100644 router/java/src/net/i2p/router/transport/tcp/PersistentConnectionTagManager.java delete mode 100644 router/java/src/net/i2p/router/transport/tcp/TCPAddress.java delete mode 100644 router/java/src/net/i2p/router/transport/tcp/TCPConnection.java delete mode 100644 router/java/src/net/i2p/router/transport/tcp/TCPConnectionEstablisher.java delete mode 100644 router/java/src/net/i2p/router/transport/tcp/TCPListener.java delete mode 100644 router/java/src/net/i2p/router/transport/tcp/TCPTransport.java delete mode 100644 router/java/src/net/i2p/router/transport/tcp/package.html delete mode 100644 router/java/src/net/i2p/router/tunnel/pool/HandleTunnelCreateMessageJob.java delete mode 100644 router/java/src/net/i2p/router/tunnel/pool/TunnelMessageHandlerBuilder.java diff --git a/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java b/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java index 2a0e7c48b..b80e05694 100644 --- a/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java +++ b/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java @@ -26,7 +26,6 @@ import net.i2p.router.OutNetMessage; import net.i2p.router.RouterContext; import net.i2p.router.transport.ntcp.NTCPAddress; import net.i2p.router.transport.ntcp.NTCPTransport; -import net.i2p.router.transport.tcp.TCPTransport; import net.i2p.router.transport.udp.UDPAddress; import net.i2p.util.Log; @@ -159,11 +158,6 @@ public class CommSystemFacadeImpl extends CommSystemFacade { newCreated = true; } - if (!addresses.containsKey(TCPTransport.STYLE)) { - RouterAddress addr = createTCPAddress(); - if (addr != null) - addresses.put(TCPTransport.STYLE, addr); - } if (!addresses.containsKey(NTCPTransport.STYLE)) { RouterAddress addr = createNTCPAddress(_context); if (_log.shouldLog(Log.INFO)) @@ -177,35 +171,6 @@ public class CommSystemFacadeImpl extends CommSystemFacade { return new HashSet(addresses.values()); } - private final static String PROP_I2NP_TCP_HOSTNAME = "i2np.tcp.hostname"; - private final static String PROP_I2NP_TCP_PORT = "i2np.tcp.port"; - private final static String PROP_I2NP_TCP_DISABLED = "i2np.tcp.disable"; - - private RouterAddress createTCPAddress() { - if (!TransportManager.ALLOW_TCP) return null; - RouterAddress addr = new RouterAddress(); - addr.setCost(10); - addr.setExpiration(null); - Properties props = new Properties(); - String name = _context.router().getConfigSetting(PROP_I2NP_TCP_HOSTNAME); - String port = _context.router().getConfigSetting(PROP_I2NP_TCP_PORT); - String disabledStr = _context.router().getConfigSetting(PROP_I2NP_TCP_DISABLED); - boolean disabled = false; - if ( (disabledStr == null) || ("true".equalsIgnoreCase(disabledStr)) ) - return null; - if ( (name == null) || (port == null) ) { - //_log.info("TCP Host/Port not specified in config file - skipping TCP transport"); - return null; - } else { - _log.info("Creating TCP address on " + name + ":" + port); - } - props.setProperty("host", name); - props.setProperty("port", port); - addr.setOptions(props); - addr.setTransportStyle(TCPTransport.STYLE); - return addr; - } - public final static String PROP_I2NP_NTCP_HOSTNAME = "i2np.ntcp.hostname"; public final static String PROP_I2NP_NTCP_PORT = "i2np.ntcp.port"; public final static String PROP_I2NP_NTCP_AUTO_PORT = "i2np.ntcp.autoip"; diff --git a/router/java/src/net/i2p/router/transport/TransportManager.java b/router/java/src/net/i2p/router/transport/TransportManager.java index 41ec2fedf..5979961bb 100644 --- a/router/java/src/net/i2p/router/transport/TransportManager.java +++ b/router/java/src/net/i2p/router/transport/TransportManager.java @@ -28,7 +28,6 @@ import net.i2p.router.CommSystemFacade; import net.i2p.router.OutNetMessage; import net.i2p.router.RouterContext; import net.i2p.router.transport.ntcp.NTCPTransport; -import net.i2p.router.transport.tcp.TCPTransport; import net.i2p.router.transport.udp.UDPTransport; import net.i2p.util.Log; @@ -70,15 +69,6 @@ public class TransportManager implements TransportEventListener { static final boolean ALLOW_TCP = false; private void configTransports() { - String disableTCP = _context.router().getConfigSetting(PROP_DISABLE_TCP); - // Unless overridden by constant or explicit config property, start TCP tranport - if ( !ALLOW_TCP || ((disableTCP != null) && (Boolean.TRUE.toString().equalsIgnoreCase(disableTCP))) ) { - _log.info("Explicitly disabling the TCP transport!"); - } else { - Transport t = new TCPTransport(_context); - t.setListener(this); - _transports.add(t); - } String enableUDP = _context.router().getConfigSetting(PROP_ENABLE_UDP); if (enableUDP == null) enableUDP = DEFAULT_ENABLE_UDP; diff --git a/router/java/src/net/i2p/router/transport/tcp/ConnectionBuilder.java b/router/java/src/net/i2p/router/transport/tcp/ConnectionBuilder.java deleted file mode 100644 index 62ad79c18..000000000 --- a/router/java/src/net/i2p/router/transport/tcp/ConnectionBuilder.java +++ /dev/null @@ -1,795 +0,0 @@ -package net.i2p.router.transport.tcp; - -import java.io.BufferedOutputStream; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; -import java.net.SocketException; -import java.util.Date; -import java.util.Properties; - -import net.i2p.crypto.AESInputStream; -import net.i2p.crypto.AESOutputStream; -import net.i2p.crypto.DHSessionKeyBuilder; -import net.i2p.data.Base64; -import net.i2p.data.ByteArray; -import net.i2p.data.DataFormatException; -import net.i2p.data.DataHelper; -import net.i2p.data.Hash; -import net.i2p.data.RouterAddress; -import net.i2p.data.RouterInfo; -import net.i2p.data.SessionKey; -import net.i2p.data.Signature; -import net.i2p.router.RouterContext; -import net.i2p.router.transport.BandwidthLimitedInputStream; -import net.i2p.router.transport.BandwidthLimitedOutputStream; -import net.i2p.util.I2PThread; -import net.i2p.util.Log; -import net.i2p.util.SimpleTimer; - -/** - * Class responsible for all of the handshaking necessary to establish a - * connection with a peer. - * - */ -public class ConnectionBuilder { - private Log _log; - private RouterContext _context; - private TCPTransport _transport; - /** who we're trying to talk with */ - private RouterInfo _target; - /** who we're actually talking with */ - private RouterInfo _actualPeer; - /** raw socket to the peer */ - private Socket _socket; - /** raw stream to read from the peer */ - private InputStream _rawIn; - /** raw stream to write to the peer */ - private OutputStream _rawOut; - /** secure stream to read from the peer */ - private InputStream _connectionIn; - /** secure stream to write to the peer */ - private OutputStream _connectionOut; - /** protocol version agreed to, or -1 */ - private int _agreedProtocol; - /** IP address the peer says we are at */ - private String _localIP; - /** IP address of the peer we connected to */ - private TCPAddress _remoteAddress; - /** connection tag to identify ourselves, or null if no known tag is available */ - private ByteArray _connectionTag; - /** connection tag to identify ourselves next time */ - private ByteArray _nextConnectionTag; - /** nonce the peer gave us */ - private ByteArray _nonce; - /** key that we will be encrypting comm with */ - private SessionKey _key; - /** initialization vector for the encryption */ - private byte[] _iv; - /** - * Contains a message describing why the connection failed (or null if it - * succeeded). This should include a timestamp of some sort. - */ - private String _error; - - /** If the connection hasn't been built in 30 seconds, give up */ - public static final int CONNECTION_TIMEOUT = 20*1000; - - public static final int WRITE_BUFFER_SIZE = 2*1024; - - public ConnectionBuilder(RouterContext context, TCPTransport transport, RouterInfo info) { - _context = context; - _log = context.logManager().getLog(ConnectionBuilder.class); - _transport = transport; - _target = info; - _error = null; - _agreedProtocol = -1; - } - - /** - * Blocking call to establish a TCP connection to the given peer through a - * brand new socket. - * - * @return fully established but not yet running connection, or null on error - */ - public TCPConnection establishConnection() { - SimpleTimer.getInstance().addEvent(new DieIfTooSlow(), CONNECTION_TIMEOUT); - try { - return doEstablishConnection(); - } catch (Exception e) { // catchall in case the timeout gets us flat footed - if (_socket != null) - fail("Error connecting", e); - return null; - } - } - private TCPConnection doEstablishConnection() { - createSocket(); - if ( (_socket == null) || (_error != null) ) - return null; - - try { _socket.setSoTimeout(CONNECTION_TIMEOUT); } catch (SocketException se) {} - - negotiateProtocol(); - - if ( (_agreedProtocol < 0) || (_error != null) ) - return null; - - boolean ok = false; - if (_connectionTag != null) - ok = connectExistingSession(); - else - ok = connectNewSession(); - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("connection ok? " + ok + " error: " + _error); - - if (ok && (_error == null) ) { - establishComplete(); - - try { _socket.setSoTimeout(0); } catch (SocketException se) {} - - TCPConnection con = new TCPConnection(_context); - con.setInputStream(_connectionIn); - con.setOutputStream(_connectionOut); - con.setSocket(_socket); - con.setRemoteRouterIdentity(_actualPeer.getIdentity()); - con.setRemoteAddress(_remoteAddress); - con.setAttemptedPeer(_target.getIdentity().getHash()); - con.setShownAddress(_localIP); - if (_error == null) { - if (_log.shouldLog(Log.INFO)) - _log.info("Establishment successful! returning the con"); - return con; - } else { - return null; - } - } else { - return null; - } - } - - /** - * Agree on what protocol to communicate with, and set _agreedProtocol - * accordingly. If no common protocols are available, disconnect, set - * _agreedProtocol to -1, and update the _error accordingly. - */ - private void negotiateProtocol() { - ConnectionTagManager mgr = _transport.getTagManager(); - ByteArray tag = mgr.getTag(_target.getIdentity().getHash()); - _key = mgr.getKey(_target.getIdentity().getHash()); - _connectionTag = tag; - boolean ok = sendPreferredProtocol(); - if (!ok) return; - ok = receiveAgreedProtocol(); - if (!ok) return; - } - - - /** - * Send #bytesFollowing + #versions + v1 [+ v2 [etc]] + - * tag? + tagData + properties - */ - private boolean sendPreferredProtocol() { - try { - // #bytesFollowing + #versions + v1 [+ v2 [etc]] + tag? + tagData + properties - ByteArrayOutputStream baos = new ByteArrayOutputStream(64); - DataHelper.writeLong(baos, 1, TCPTransport.SUPPORTED_PROTOCOLS.length); - for (int i = 0; i < TCPTransport.SUPPORTED_PROTOCOLS.length; i++) { - DataHelper.writeLong(baos, 1, TCPTransport.SUPPORTED_PROTOCOLS[i]); - } - if (_connectionTag != null) { - baos.write(ConnectionHandler.FLAG_TAG_FOLLOWING); - baos.write(_connectionTag.getData()); - } else { - baos.write(ConnectionHandler.FLAG_TAG_NOT_FOLLOWING); - } - DataHelper.writeProperties(baos, null); // no options atm - byte line[] = baos.toByteArray(); - DataHelper.writeLong(_rawOut, 2, line.length); - _rawOut.write(line); - _rawOut.flush(); - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("SendProtocol[X]: tag: " - + (_connectionTag != null ? Base64.encode(_connectionTag.getData()) : "none") - + " socket: " + _socket); - - return true; - } catch (IOException ioe) { - fail("Error sending our handshake to " - + _target.getIdentity().calculateHash().toBase64().substring(0,6) - + ": " + ioe.getMessage(), ioe); - return false; - } catch (DataFormatException dfe) { - fail("Error sending our handshake to " - + _target.getIdentity().calculateHash().toBase64().substring(0,6) - + ": " + dfe.getMessage(), dfe); - return false; - } - } - - /** - * Receive #bytesFollowing + versionOk + #bytesIP + IP + tagOk? + nonce + properties - * - */ - private boolean receiveAgreedProtocol() { - try { - // #bytesFollowing + versionOk + #bytesIP + IP + tagOk? + nonce + properties - int numBytes = (int)DataHelper.readLong(_rawIn, 2); - if ( (numBytes <= 0) || (numBytes >= ConnectionHandler.FLAG_TEST) ) - throw new IOException("Invalid number of bytes in response"); - - byte line[] = new byte[numBytes]; - int read = DataHelper.read(_rawIn, line); - if (read != numBytes) { - fail("Handshake too short with " - + _target.getIdentity().calculateHash().toBase64().substring(0,6)); - return false; - } - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("ReadProtocol1[X]: " - + "\nLine: " + Base64.encode(line)); - - ByteArrayInputStream bais = new ByteArrayInputStream(line); - - int version = (int)DataHelper.readLong(bais, 1); - for (int i = 0; i < TCPTransport.SUPPORTED_PROTOCOLS.length; i++) { - if (version == TCPTransport.SUPPORTED_PROTOCOLS[i]) { - _agreedProtocol = version; - break; - } - } - if (_agreedProtocol == ConnectionHandler.FLAG_PROTOCOL_NONE) { - fail("No valid protocol versions to contact " - + _target.getIdentity().calculateHash().toBase64().substring(0,6)); - return false; - } - - int bytesInIP = (int)DataHelper.readLong(bais, 1); - byte ip[] = new byte[bytesInIP]; - DataHelper.read(bais, ip); // ignore return value, this is an array - _localIP = new String(ip); - // if we don't already know our IP address, this may cause us - // to fire up a socket listener, so may take a few seconds. - _transport.ourAddressReceived(_localIP); - - int tagOk = (int)DataHelper.readLong(bais, 1); - if ( (tagOk == ConnectionHandler.FLAG_TAG_OK) && (_connectionTag != null) ) { - // tag is ok - } else { - _connectionTag = null; - _key = null; - } - - byte nonce[] = new byte[4]; - read = DataHelper.read(bais, nonce); - if (read != 4) { - fail("No nonce specified by " - + _target.getIdentity().calculateHash().toBase64().substring(0,6)); - return false; - } - _nonce = new ByteArray(nonce); - - Properties opts = DataHelper.readProperties(bais); - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("ReadProtocol[X]: agreed=" + _agreedProtocol + " nonce: " - + Base64.encode(nonce) + " tag: " - + (_connectionTag != null ? Base64.encode(_connectionTag.getData()) : "none") - + " props: " + opts - + " socket: " + _socket - + "\nLine: " + Base64.encode(line)); - - // we dont care about any of the properties, so we can just - // ignore it, and we're done with this step - return true; - } catch (IOException ioe) { - fail("Error reading the handshake from " - + _target.getIdentity().calculateHash().toBase64().substring(0,6) - + ": " + ioe.getMessage(), ioe); - return false; - } catch (DataFormatException dfe) { - fail("Error reading the handshake from " - + _target.getIdentity().calculateHash().toBase64().substring(0,6) - + ": " + dfe.getMessage(), dfe); - return false; - } - - } - - /** Set the next tag to H(E(nonce + tag, sessionKey)) */ - private void updateNextTagExisting() { - byte pre[] = new byte[48]; - System.arraycopy(_nonce.getData(), 0, pre, 0, 4); - System.arraycopy(_connectionTag.getData(), 0, pre, 4, 32); - _context.aes().encrypt(pre, 0, pre, 0, _key, _iv, pre.length); - Hash h = _context.sha().calculateHash(pre); - _nextConnectionTag = new ByteArray(h.getData()); - } - - /** - * We have a valid tag, so use it to do the handshaking. On error, fail() - * appropriately. - * - * @return true if the connection went ok, or false if it failed. - */ - private boolean connectExistingSession() { - // iv to the SHA256 of the tag appended by the nonce. - byte data[] = new byte[36]; - System.arraycopy(_connectionTag.getData(), 0, data, 0, 32); - System.arraycopy(_nonce.getData(), 0, data, 32, 4); - Hash h = _context.sha().calculateHash(data); - _iv = new byte[16]; - System.arraycopy(h.getData(), 0, _iv, 0, 16); - - updateNextTagExisting(); - - _rawOut = new BufferedOutputStream(_rawOut, ConnectionBuilder.WRITE_BUFFER_SIZE); - - _rawOut = new AESOutputStream(_context, _rawOut, _key, _iv); - _rawIn = new AESInputStream(_context, _rawIn, _key, _iv); - - // send: H(nonce) - try { - h = _context.sha().calculateHash(_nonce.getData()); - h.writeBytes(_rawOut); - _rawOut.flush(); - } catch (IOException ioe) { - fail("Error writing the encrypted nonce to " - + _target.getIdentity().calculateHash().toBase64().substring(0,6) - + ": " + ioe.getMessage()); - return false; - } catch (DataFormatException dfe) { - fail("Error writing the encrypted nonce to " - + _target.getIdentity().calculateHash().toBase64().substring(0,6) - + ": " + dfe.getMessage()); - return false; - } - - // read: H(tag) - try { - Hash readHash = new Hash(); - readHash.readBytes(_rawIn); - - Hash expectedHash = _context.sha().calculateHash(_connectionTag.getData()); - - if (!readHash.equals(expectedHash)) { - fail("Key verification failed with " - + _target.getIdentity().calculateHash().toBase64().substring(0,6)); - return false; - } - } catch (IOException ioe) { - fail("Error reading the initial key verification from " - + _target.getIdentity().calculateHash().toBase64().substring(0,6) - + ": " + ioe.getMessage()); - return false; - } catch (DataFormatException dfe) { - fail("Error reading the initial key verification from " - + _target.getIdentity().calculateHash().toBase64().substring(0,6) - + ": " + dfe.getMessage()); - return false; - } - - // send: routerInfo + currentTime + H(routerInfo + currentTime + nonce + tag) - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(512); - _context.router().getRouterInfo().writeBytes(baos); - DataHelper.writeDate(baos, new Date(_context.clock().now())); - - _rawOut.write(baos.toByteArray()); - - baos.write(_nonce.getData()); - baos.write(_connectionTag.getData()); - Hash verification = _context.sha().calculateHash(baos.toByteArray()); - - verification.writeBytes(_rawOut); - _rawOut.flush(); - } catch (IOException ioe) { - fail("Error writing the verified info to " - + _target.getIdentity().calculateHash().toBase64().substring(0,6) - + ": " + ioe.getMessage()); - return false; - } catch (DataFormatException dfe) { - fail("Error writing the verified info to " - + _target.getIdentity().calculateHash().toBase64().substring(0,6) - + ": " + dfe.getMessage()); - return false; - } - - // read: routerInfo + status + properties - // + H(routerInfo + status + properties + nonce + tag) - try { - RouterInfo peer = new RouterInfo(); - peer.readBytes(_rawIn); - int status = (int)_rawIn.read() & 0xFF; - - Properties props = DataHelper.readProperties(_rawIn); - // ignore these now - - boolean ok = validateStatus(status, props); - if (!ok) return false; - - Hash readHash = new Hash(); - readHash.readBytes(_rawIn); - - // H(routerInfo + status + properties + nonce + tag) - ByteArrayOutputStream baos = new ByteArrayOutputStream(512); - peer.writeBytes(baos); - baos.write(status); - DataHelper.writeProperties(baos, props); - baos.write(_nonce.getData()); - baos.write(_connectionTag.getData()); - Hash expectedHash = _context.sha().calculateHash(baos.toByteArray()); - - if (!expectedHash.equals(readHash)) { - fail("Error verifying info from " - + _target.getIdentity().calculateHash().toBase64().substring(0,6) - + " (claiming to be " - + peer.getIdentity().calculateHash().toBase64().substring(0,6) - + ")"); - return false; - } - - _actualPeer = peer; - - try { - _context.netDb().store(peer.getIdentity().getHash(), peer); - return true; - } catch (IllegalArgumentException iae) { - fail("Peer sent us bad info - " + _target.getIdentity().getHash().toBase64().substring(0,6) - + ": " + iae.getMessage()); - return false; - } - } catch (IOException ioe) { - fail("Error reading the verified info from " - + _target.getIdentity().calculateHash().toBase64().substring(0,6) - + ": " + ioe.getMessage()); - return false; - } catch (DataFormatException dfe) { - fail("Error reading the verified info from " - + _target.getIdentity().calculateHash().toBase64().substring(0,6) - + ": " + dfe.getMessage()); - return false; - } - } - - /** - * We do not have a valid tag, so exchange a new one and then do the - * handshaking. On error, fail() appropriately. - * - * @return true if the connection went ok, or false if it failed. - */ - private boolean connectNewSession() { - DHSessionKeyBuilder builder = null; - try { - builder = DHSessionKeyBuilder.exchangeKeys(_rawIn, _rawOut); - } catch (IOException ioe) { - fail("Error exchanging keys with " - + _target.getIdentity().calculateHash().toBase64().substring(0,6)); - return false; - } - - // load up the key initialize the encrypted streams - _key = builder.getSessionKey(); - byte extra[] = builder.getExtraBytes().getData(); - _iv = new byte[16]; - System.arraycopy(extra, 0, _iv, 0, 16); - byte nextTag[] = new byte[32]; - System.arraycopy(extra, 16, nextTag, 0, 32); - _nextConnectionTag = new ByteArray(nextTag); - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("\nNew session[X]: key=" + _key.toBase64() + " iv=" - + Base64.encode(_iv) + " nonce=" + Base64.encode(_nonce.getData()) - + " socket: " + _socket); - - _rawOut = new BufferedOutputStream(_rawOut, ConnectionBuilder.WRITE_BUFFER_SIZE); - - _rawOut = new AESOutputStream(_context, _rawOut, _key, _iv); - _rawIn = new AESInputStream(_context, _rawIn, _key, _iv); - - // send: H(nonce) - try { - Hash h = _context.sha().calculateHash(_nonce.getData()); - h.writeBytes(_rawOut); - _rawOut.flush(); - } catch (IOException ioe) { - fail("Error writing the verification to " - + _target.getIdentity().calculateHash().toBase64().substring(0,6), ioe); - return false; - } catch (DataFormatException dfe) { - fail("Error writing the verification to " - + _target.getIdentity().calculateHash().toBase64().substring(0,6), dfe); - return false; - } - - // read: H(nextTag) - try { - byte val[] = new byte[32]; - int read = DataHelper.read(_rawIn, val); - if (read != 32) { - fail("Not enough data (" + read + ") to read the verification from " - + _target.getIdentity().calculateHash().toBase64().substring(0,6)); - return false; - } - - Hash expected = _context.sha().calculateHash(_nextConnectionTag.getData()); - if (!DataHelper.eq(expected.getData(), val)) { - fail("Verification failed from " - + _target.getIdentity().calculateHash().toBase64().substring(0,6)); - return false; - } - } catch (IOException ioe) { - fail("Error reading the verification from " - + _target.getIdentity().calculateHash().toBase64().substring(0,6), ioe); - return false; - } - - // our public == X, since we are establishing the connection - byte X[] = builder.getMyPublicValueBytes(); - byte Y[] = builder.getPeerPublicValueBytes(); - - // send: routerInfo + currentTime - // + S(routerInfo + currentTime + nonce + nextTag + X + Y, routerIdent.signingKey) - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(512); - _context.router().getRouterInfo().writeBytes(baos); - DataHelper.writeDate(baos, new Date(_context.clock().now())); - - _rawOut.write(baos.toByteArray()); - - baos.write(_nonce.getData()); - baos.write(_nextConnectionTag.getData()); - baos.write(X); - baos.write(Y); - Signature sig = _context.dsa().sign(baos.toByteArray(), - _context.keyManager().getSigningPrivateKey()); - - sig.writeBytes(_rawOut); - _rawOut.flush(); - } catch (IOException ioe) { - fail("Error sending the info to " - + _target.getIdentity().calculateHash().toBase64().substring(0,6)); - return false; - } catch (DataFormatException dfe) { - fail("Error sending the info to " - + _target.getIdentity().calculateHash().toBase64().substring(0,6)); - return false; - } - - // read: routerInfo + status + properties - // + S(routerInfo + status + properties + nonce + nextTag + X + Y, routerIdent.signingKey) - try { - RouterInfo peer = new RouterInfo(); - peer.readBytes(_rawIn); - int status = (int)_rawIn.read() & 0xFF; - - Properties props = DataHelper.readProperties(_rawIn); - // ignore these now - - boolean ok = validateStatus(status, props); - if (!ok) return false; - - Signature sig = new Signature(); - sig.readBytes(_rawIn); - - // S(routerInfo + status + properties + nonce + nextTag, routerIdent.signingKey) - ByteArrayOutputStream baos = new ByteArrayOutputStream(512); - peer.writeBytes(baos); - baos.write(status); - DataHelper.writeProperties(baos, props); - baos.write(_nonce.getData()); - baos.write(_nextConnectionTag.getData()); - baos.write(X); - baos.write(Y); - ok = _context.dsa().verifySignature(sig, baos.toByteArray(), - peer.getIdentity().getSigningPublicKey()); - - if (!ok) { - fail("Error verifying info from " - + _target.getIdentity().calculateHash().toBase64().substring(0,6) - + " (claiming to be " - + peer.getIdentity().calculateHash().toBase64().substring(0,6) - + ")"); - return false; - } - - _actualPeer = peer; - - try { - _context.netDb().store(peer.getIdentity().getHash(), peer); - return true; - } catch (IllegalArgumentException iae) { - fail("Peer sent us bad info - " + _target.getIdentity().getHash().toBase64().substring(0,6) - + ": " + iae.getMessage()); - return false; - } - } catch (IOException ioe) { - fail("Error reading the verified info from " - + _target.getIdentity().calculateHash().toBase64().substring(0,6) - + ": " + ioe.getMessage()); - return false; - } catch (DataFormatException dfe) { - fail("Error reading the verified info from " - + _target.getIdentity().calculateHash().toBase64().substring(0,6) - + ": " + dfe.getMessage()); - return false; - } - } - - /** - * Is the given status value ok for an existing session? - * - * @return true if ok, false if fail()ed - */ - private boolean validateStatus(int status, Properties props) { - switch (status) { - case -1: // EOF - fail("Error reading the status from " - + _target.getIdentity().calculateHash().toBase64().substring(0,6)); - return false; - case ConnectionHandler.STATUS_OK: - return true; - case ConnectionHandler.STATUS_UNREACHABLE: - fail("According to " - + _target.getIdentity().calculateHash().toBase64().substring(0,6) - + ", we are not reachable on " + _localIP + ":" + _transport.getPort()); - return false; - case ConnectionHandler.STATUS_SKEWED: - fail("According to " - + _target.getIdentity().calculateHash().toBase64().substring(0,6) - + ", our clock is off (they think it is " + props.getProperty("SKEW") + ")"); - return false; - case ConnectionHandler.STATUS_SIGNATURE_FAILED: // (only for new sessions) - fail("Signature failure talking to " - + _target.getIdentity().calculateHash().toBase64().substring(0,6)); - return false; - default: // unknown error - fail("Unknown error [" + status + "] connecting to " - + _target.getIdentity().calculateHash().toBase64().substring(0,6)); - return false; - } - } - - /** - * Finish up the establishment (wrapping the streams, storing the netDb, - * persisting the connection tags, etc) - * - */ - private void establishComplete() { - _connectionIn = new BandwidthLimitedInputStream(_context, _rawIn, _actualPeer.getIdentity()); - OutputStream blos = new BandwidthLimitedOutputStream(_context, _rawOut, _actualPeer.getIdentity()); - _connectionOut = blos; - //_connectionIn = _rawIn; - //_connectionOut = _rawOut; - - Hash peer = _actualPeer.getIdentity().getHash(); - _transport.getTagManager().replaceTag(peer, _nextConnectionTag, _key); - } - - /** - * Build a socket to the peer, and populate _socket, _rawIn, and _rawOut - * accordingly. On error or timeout, close and null them all and - * set _error. - * - */ - private void createSocket() { - CreateSocketRunner r = new CreateSocketRunner(); - I2PThread t = new I2PThread(r); - t.start(); - try { t.join(CONNECTION_TIMEOUT); } catch (InterruptedException ie) {} - if (!r.getCreated()) { - fail("Unable to establish a socket in time to " - + _target.getIdentity().calculateHash().toBase64().substring(0,6)); - } - } - - /** Brief description of why the connection failed (or null if it succeeded) */ - public String getError() { return _error; } - - /** - * Kill the builder, closing all sockets and streams, setting everything - * back to failure states, and setting the given error. - * - */ - private void fail(String error) { - fail(error, null); - } - private void fail(String error, Exception e) { - if (_error == null) { - // only grab the first error - _error = error; - } - - if (_rawIn != null) try { _rawIn.close(); } catch (IOException ioe) {} - if (_rawOut != null) try { _rawOut.close(); } catch (IOException ioe) {} - if (_socket != null) try { _socket.close(); } catch (IOException ioe) {} - - _socket = null; - _rawIn = null; - _rawOut = null; - _agreedProtocol = -1; - _nonce = null; - _connectionTag = null; - _actualPeer = null; - - if (_log.shouldLog(Log.WARN)) - _log.warn(error, e); - } - - /** - * Lookup and establish a connection to the peer, exposing getCreate() == true - * once we are done. This allows for asynchronous timeouts without depending - * upon the interruptability of the socket (since it isn't open yet). - * - */ - private class CreateSocketRunner implements Runnable { - private boolean _created; - public CreateSocketRunner() { - _created = false; - } - - public boolean getCreated() { return _created; } - - public void run() { - if ( (_target == null) || (_transport == null) ) { - fail("Internal error - null while running"); - _log.log(Log.CRIT, "Internal error - target = " + _target + " _transport = " + _transport); - return; - } - RouterAddress addr = _target.getTargetAddress(_transport.getStyle()); - if (addr == null) { - fail("Peer " - + _target.getIdentity().calculateHash().toBase64().substring(0,6) - + " has no TCP addresses"); - return; - } - TCPAddress tcpAddr = new TCPAddress(addr); - if (tcpAddr.getPort() <= 0) { - fail("Peer " - + _target.getIdentity().calculateHash().toBase64().substring(0,6) - + " has an invalid TCP address"); - return; - } - - try { - _socket = new Socket(tcpAddr.getAddress(), tcpAddr.getPort()); - _rawIn = _socket.getInputStream(); - _rawOut = _socket.getOutputStream(); - _error = null; - _remoteAddress = tcpAddr; - _created = true; - } catch (IOException ioe) { - Hash peer = _target.getIdentity().calculateHash(); - String peerName = null; - if (peer == null) - peerName = "unknown"; - else - peerName = peer.toBase64().substring(0,6); - fail("Error contacting " - + peerName - + " on " + tcpAddr.toString() + ": " + ioe.getMessage()); - return; - } - } - } - - /** - * In addition to the socket creation timeout, we have a timed event for - * the overall connection establishment, killing everything if we haven't - * completed a connection yet. - * - */ - private class DieIfTooSlow implements SimpleTimer.TimedEvent { - public void timeReached() { - if ( (_actualPeer == null) && (_error == null) ) { - fail("Took too long to connect with " - + _target.getIdentity().calculateHash().toBase64().substring(0,6)); - } - } - } -} diff --git a/router/java/src/net/i2p/router/transport/tcp/ConnectionHandler.java b/router/java/src/net/i2p/router/transport/tcp/ConnectionHandler.java deleted file mode 100644 index 892106f3d..000000000 --- a/router/java/src/net/i2p/router/transport/tcp/ConnectionHandler.java +++ /dev/null @@ -1,913 +0,0 @@ -package net.i2p.router.transport.tcp; - -import java.io.BufferedOutputStream; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.InetAddress; -import java.net.Socket; -import java.net.SocketException; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Properties; - -import net.i2p.crypto.AESInputStream; -import net.i2p.crypto.AESOutputStream; -import net.i2p.crypto.DHSessionKeyBuilder; -import net.i2p.data.Base64; -import net.i2p.data.ByteArray; -import net.i2p.data.DataFormatException; -import net.i2p.data.DataHelper; -import net.i2p.data.Hash; -import net.i2p.data.RouterInfo; -import net.i2p.data.SessionKey; -import net.i2p.data.Signature; -import net.i2p.router.Router; -import net.i2p.router.RouterContext; -import net.i2p.router.transport.BandwidthLimitedInputStream; -import net.i2p.router.transport.BandwidthLimitedOutputStream; -import net.i2p.util.Log; - -/** - * Class responsible for all of the handshaking necessary to turn a socket into - * a TCPConnection. - * - */ -public class ConnectionHandler { - private RouterContext _context; - private Log _log; - private TCPTransport _transport; - /** who we're actually talking with */ - private RouterInfo _actualPeer; - /** raw socket to the peer */ - private Socket _socket; - /** raw stream to read from the peer */ - private InputStream _rawIn; - /** raw stream to write to the peer */ - private OutputStream _rawOut; - /** secure stream to read from the peer */ - private InputStream _connectionIn; - /** secure stream to write to the peer */ - private OutputStream _connectionOut; - /** protocol version agreed to, or -1 */ - private int _agreedProtocol; - /** - * Contains a message describing why the connection failed (or null if it - * succeeded). This should include a timestamp of some sort. - */ - private String _error; - /** - * If we're handing a reachability test, set this to true once - * we're done - */ - private boolean _testComplete; - /** IP address of the peer who contacted us */ - private String _from; - /** Where we verified their address */ - private TCPAddress _remoteAddress; - /** connection tag to identify ourselves, or null if no known tag is available */ - private ByteArray _connectionTag; - /** connection tag to identify ourselves next time */ - private ByteArray _nextConnectionTag; - /** nonce the peer gave us */ - private ByteArray _nonce; - /** key that we will be encrypting comm with */ - private SessionKey _key; - /** initialization vector for the encryption */ - private byte[] _iv; - - /** for reading/comparing, this is the #bytes sent if we are being tested */ - public static final int FLAG_TEST = 0xFFFF; - /** protocol version sent if no protocols are ok */ - public static final byte FLAG_PROTOCOL_NONE = 0x0; - /** alice is sending a tag to bob */ - public static final byte FLAG_TAG_FOLLOWING = 0x1; - /** alice is not sending a tag to bob */ - public static final byte FLAG_TAG_NOT_FOLLOWING = 0x0; - /** the connection tag is ok (we have an available key for it) */ - public static final byte FLAG_TAG_OK = 0x1; - /** the connection tag is not ok (must go with a full DH) */ - public static final byte FLAG_TAG_NOT_OK = 0x0; - /** dunno why the peer is bad */ - public static final int STATUS_UNKNOWN = -1; - /** the peer's public addresses could not be verified */ - public static final int STATUS_UNREACHABLE = 1; - /** the peer's clock is too far skewed */ - public static final int STATUS_SKEWED = 2; - /** the peer's signature failed (either some crazy corruption or MITM) */ - public static final int STATUS_SIGNATURE_FAILED = 3; - /** the peer is fine */ - public static final int STATUS_OK = 0; - - private static final int MAX_VERSIONS = 255; - - public ConnectionHandler(RouterContext ctx, TCPTransport transport, Socket socket) { - _context = ctx; - _log = ctx.logManager().getLog(ConnectionHandler.class); - _transport = transport; - _socket = socket; - _error = null; - _agreedProtocol = -1; - InetAddress addr = _socket.getInetAddress(); - try { _socket.setSoTimeout(TCPListener.HANDLE_TIMEOUT); } catch (SocketException se) {} - if (addr != null) { - _from = addr.getHostAddress(); - } - } - - /** - * Blocking call to establish a TCP connection over the current socket. - * At this point, no data whatsoever need to have been transmitted over the - * socket - the builder is responsible for all aspects of the handshaking. - * - * @return fully established but not yet running connection, or null on error - */ - public TCPConnection receiveConnection() { - try { - _rawIn = _socket.getInputStream(); - _rawOut = _socket.getOutputStream(); - } catch (IOException ioe) { - fail("Error accessing the socket streams from " + _from, ioe); - return null; - } - - negotiateProtocol(); - - if ( (_agreedProtocol < 0) || (_error != null) ) - return null; - - boolean ok = false; - if ( (_connectionTag != null) && (_key != null) ) - ok = connectExistingSession(); - else - ok = connectNewSession(); - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("connection ok? " + ok + " error: " + _error); - - if (ok && (_error == null) ) { - establishComplete(); - - if (true) - try { _socket.setSoTimeout(ConnectionRunner.DISCONNECT_INACTIVITY_PERIOD); } catch (SocketException se) {} - else - try { _socket.setSoTimeout(0); } catch (SocketException se) {} - - if (_log.shouldLog(Log.INFO)) - _log.info("Establishment ok... building the con"); - - TCPConnection con = new TCPConnection(_context); - con.setInputStream(_connectionIn); - con.setOutputStream(_connectionOut); - con.setSocket(_socket); - con.setRemoteRouterIdentity(_actualPeer.getIdentity()); - con.setRemoteAddress(_remoteAddress); - if (_error == null) { - if (_log.shouldLog(Log.INFO)) - _log.info("Establishment successful! returning the con"); - con.setIsOutbound(false); - return con; - } else { - if (_log.shouldLog(Log.INFO)) - _log.info("Establishment ok but we failed?! error = " + _error); - return null; - } - } else { - return null; - } - } - - /** - * Agree on what protocol to communicate with, and set _agreedProtocol - * accordingly. If no common protocols are available, disconnect, set - * _agreedProtocol to -1, and update the _error accordingly. - */ - private void negotiateProtocol() { - boolean ok = readPreferredProtocol(); - if (!ok) return; - sendAgreedProtocol(); - } - - /** - * Receive #bytesFollowing + #versions + v1 [+ v2 [etc]] + tag? + tagData + properties - * - */ - private boolean readPreferredProtocol() { - try { - int numBytes = (int)DataHelper.readLong(_rawIn, 2); - if (numBytes <= 0) - throw new IOException("Invalid number of bytes in connection"); - // reachability test - if (numBytes == FLAG_TEST) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("ReadProtocol[Y]: test called, handle it"); - handleTest(); - return false; - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("ReadProtocol[Y]: not a test (line len=" + numBytes + ")"); - } - - byte line[] = new byte[numBytes]; - int read = DataHelper.read(_rawIn, line); - if (read != numBytes) { - fail("Handshake too short from " + _from); - return false; - } - - ByteArrayInputStream bais = new ByteArrayInputStream(line); - - int numVersions = (int)DataHelper.readLong(bais, 1); - if ( (numVersions <= 0) || (numVersions > MAX_VERSIONS) ) { - fail("Invalid number of protocol versions from " + _from); - return false; - } - int versions[] = new int[numVersions]; - for (int i = 0; i < numVersions; i++) - versions[i] = (int)DataHelper.readLong(bais, 1); - - for (int i = 0; i < numVersions && _agreedProtocol == -1; i++) { - for (int j = 0; j < TCPTransport.SUPPORTED_PROTOCOLS.length; j++) { - if (versions[i] == TCPTransport.SUPPORTED_PROTOCOLS[j]) { - _agreedProtocol = versions[i]; - break; - } - } - } - - int tag = (int)DataHelper.readLong(bais, 1); - if (tag == FLAG_TAG_FOLLOWING) { - byte tagData[] = new byte[32]; - read = DataHelper.read(bais, tagData); - if (read != 32) - throw new IOException("Not enough data for the tag"); - _connectionTag = new ByteArray(tagData); - _key = _transport.getTagManager().getKey(_connectionTag); - if (_key == null) - _connectionTag = null; - } - - Properties opts = DataHelper.readProperties(bais); - // ignore them - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("ReadProtocol[Y]: agreed=" + _agreedProtocol + " tag: " - + (_connectionTag != null ? Base64.encode(_connectionTag.getData()) : "none")); - return true; - } catch (IOException ioe) { - fail("Error reading the handshake from " + _from - + ": " + ioe.getMessage(), ioe); - return false; - } catch (DataFormatException dfe) { - fail("Error reading the handshake from " + _from - + ": " + dfe.getMessage(), dfe); - return false; - } - } - - /** - * Send #bytesFollowing + versionOk + #bytesIP + IP + tagOk? + nonce + properties - */ - private void sendAgreedProtocol() { - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(128); - if (_agreedProtocol <= 0) - baos.write(FLAG_PROTOCOL_NONE); - else - baos.write(_agreedProtocol); - - byte ip[] = _from.getBytes(); - baos.write(ip.length); - baos.write(ip); - - if (_key != null) - baos.write(FLAG_TAG_OK); - else - baos.write(FLAG_TAG_NOT_OK); - - byte nonce[] = new byte[4]; - _context.random().nextBytes(nonce); - _nonce = new ByteArray(nonce); - baos.write(nonce); - - Properties opts = new Properties(); - opts.setProperty("foo", "bar"); - DataHelper.writeProperties(baos, opts); // no options atm - - byte line[] = baos.toByteArray(); - DataHelper.writeLong(_rawOut, 2, line.length); - _rawOut.write(line); - _rawOut.flush(); - - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("SendProtocol[Y]: agreed=" + _agreedProtocol + " IP: " + _from - + " nonce: " + Base64.encode(nonce) + " tag: " - + (_connectionTag != null ? Base64.encode(_connectionTag.getData()) : " none") - + " props: " + opts - + "\nLine: " + Base64.encode(line)); - - if (_agreedProtocol <= 0) { - fail("Connection from " + _from + " rejected, since no compatible protocols were found"); - return; - } - } catch (IOException ioe) { - fail("Error writing the handshake to " + _from - + ": " + ioe.getMessage(), ioe); - return; - } catch (DataFormatException dfe) { - fail("Error writing the handshake to " + _from - + ": " + dfe.getMessage(), dfe); - return; - } - } - - /** Set the next tag to H(E(nonce + tag, sessionKey)) */ - private void updateNextTagExisting() { - byte pre[] = new byte[48]; - System.arraycopy(_nonce.getData(), 0, pre, 0, 4); - System.arraycopy(_connectionTag.getData(), 0, pre, 4, 32); - _context.aes().encrypt(pre, 0, pre, 0, _key, _iv, pre.length); - Hash h = _context.sha().calculateHash(pre); - _nextConnectionTag = new ByteArray(h.getData()); - } - - /** - * We have a valid tag, so use it to do the handshaking. On error, fail() - * appropriately. - * - * @return true if the connection went ok, or false if it failed. - */ - private boolean connectExistingSession() { - // iv = H(tag+nonce) - byte data[] = new byte[36]; - System.arraycopy(_connectionTag.getData(), 0, data, 0, 32); - System.arraycopy(_nonce.getData(), 0, data, 32, 4); - Hash h = _context.sha().calculateHash(data); - _iv = new byte[16]; - System.arraycopy(h.getData(), 0, _iv, 0, 16); - - updateNextTagExisting(); - - _rawOut = new BufferedOutputStream(_rawOut, ConnectionBuilder.WRITE_BUFFER_SIZE); - - _rawOut = new AESOutputStream(_context, _rawOut, _key, _iv); - _rawIn = new AESInputStream(_context, _rawIn, _key, _iv); - - // read: H(nonce) - try { - Hash readHash = new Hash(); - readHash.readBytes(_rawIn); - - Hash expected = _context.sha().calculateHash(_nonce.getData()); - if (!expected.equals(readHash)) { - fail("Verification hash failed from " + _from); - return false; - } - } catch (IOException ioe) { - fail("Error reading the encrypted nonce from " + _from - + ": " + ioe.getMessage(), ioe); - return false; - } catch (DataFormatException dfe) { - fail("Error reading the encrypted nonce from " + _from - + ": " + dfe.getMessage(), dfe); - return false; - } - - // send: H(tag) - try { - Hash tagHash = _context.sha().calculateHash(_connectionTag.getData()); - tagHash.writeBytes(_rawOut); - _rawOut.flush(); - } catch (IOException ioe) { - fail("Error writing the encrypted tag to " + _from - + ": " + ioe.getMessage(), ioe); - return false; - } catch (DataFormatException dfe) { - fail("Error writing the encrypted tag to " + _from - + ": " + dfe.getMessage(), dfe); - return false; - } - - long clockSkew = 0; - - // read: routerInfo + currentTime + H(routerInfo + currentTime + nonce + tag) - try { - RouterInfo peer = new RouterInfo(); - peer.readBytes(_rawIn); - Date now = DataHelper.readDate(_rawIn); - Hash readHash = new Hash(); - readHash.readBytes(_rawIn); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(512); - peer.writeBytes(baos); - DataHelper.writeDate(baos, now); - baos.write(_nonce.getData()); - baos.write(_connectionTag.getData()); - Hash expectedHash = _context.sha().calculateHash(baos.toByteArray()); - - if (!expectedHash.equals(readHash)) { - fail("Invalid hash read for the info from " + _from); - return false; - } - - _actualPeer = peer; - clockSkew = _context.clock().now() - now.getTime(); - } catch (IOException ioe) { - fail("Error reading the peer info from " + _from - + ": " + ioe.getMessage(), ioe); - return false; - } catch (DataFormatException dfe) { - fail("Error reading the peer info from " + _from - + ": " + dfe.getMessage(), dfe); - return false; - } - - // verify routerInfo - boolean reachable = verifyReachability(); - - // send routerInfo + status + properties + H(routerInfo + status + properties + nonce + tag) - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(512); - _context.router().getRouterInfo().writeBytes(baos); - - Properties props = new Properties(); - - int status = STATUS_UNKNOWN; - if (!reachable) { - status = STATUS_UNREACHABLE; - } else if ( (clockSkew > Router.CLOCK_FUDGE_FACTOR) - || (clockSkew < 0 - Router.CLOCK_FUDGE_FACTOR) ) { - status = STATUS_SKEWED; - SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMddHHmmssSSS"); - props.setProperty("SKEW", fmt.format(new Date(_context.clock().now()))); - } else { - try { - _context.netDb().store(_actualPeer.getIdentity().getHash(), _actualPeer); - status = STATUS_OK; - } catch (IllegalArgumentException iae) { - // bad peer info - status = STATUS_UNKNOWN; - props.setProperty("REASON", "RouterInfoFailed"); - } - } - - baos.write(status); - - DataHelper.writeProperties(baos, props); - byte beginning[] = baos.toByteArray(); - - baos.write(_nonce.getData()); - baos.write(_connectionTag.getData()); - - Hash verification = _context.sha().calculateHash(baos.toByteArray()); - - _rawOut.write(beginning); - verification.writeBytes(_rawOut); - _rawOut.flush(); - - return handleStatus(status, clockSkew); - } catch (IOException ioe) { - fail("Error writing the peer info to " + _from - + ": " + ioe.getMessage(), ioe); - return false; - } catch (DataFormatException dfe) { - fail("Error writing the peer info to " + _from - + ": " + dfe.getMessage(), dfe); - return false; - } - } - - /** - * - * We do not have a valid tag, so DH then do the handshaking. On error, - * fail() appropriately. - * - * @return true if the connection went ok, or false if it failed. - */ - private boolean connectNewSession() { - DHSessionKeyBuilder builder = null; - try { - builder = DHSessionKeyBuilder.exchangeKeys(_rawIn, _rawOut); - if (builder == null) { - fail("Error exchanging the keys with " + _from); - return false; - } - } catch (IOException ioe) { - fail("Error exchanging keys with " + _from); - return false; - } - - // load up the key initialize the encrypted streams - _key = builder.getSessionKey(); - byte extra[] = builder.getExtraBytes().getData(); - _iv = new byte[16]; - System.arraycopy(extra, 0, _iv, 0, 16); - byte nextTag[] = new byte[32]; - System.arraycopy(extra, 16, nextTag, 0, 32); - _nextConnectionTag = new ByteArray(nextTag); - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("\nNew session[Y]: key=" + _key.toBase64() + " iv=" - + Base64.encode(_iv) + " nonce=" + Base64.encode(_nonce.getData()) - + " socket: " + _socket); - - _rawOut = new BufferedOutputStream(_rawOut, ConnectionBuilder.WRITE_BUFFER_SIZE); - - _rawOut = new AESOutputStream(_context, _rawOut, _key, _iv); - _rawIn = new AESInputStream(_context, _rawIn, _key, _iv); - - // read: H(nonce) - try { - Hash h = new Hash(); - h.readBytes(_rawIn); - - Hash expected = _context.sha().calculateHash(_nonce.getData()); - if (!expected.equals(h)) { - fail("Hash after negotiation from " + _from + " does not match"); - return false; - } - } catch (IOException ioe) { - fail("Error reading the hash from " + _from - + ": " + ioe.getMessage(), ioe); - return false; - } catch (DataFormatException dfe) { - fail("Error reading the hash from " + _from - + ": " + dfe.getMessage(), dfe); - return false; - } - - // send: H(nextTag) - try { - Hash h = _context.sha().calculateHash(_nextConnectionTag.getData()); - h.writeBytes(_rawOut); - _rawOut.flush(); - } catch (IOException ioe) { - fail("Error writing the hash to " + _from - + ": " + ioe.getMessage(), ioe); - return false; - } catch (DataFormatException dfe) { - fail("Error writing the hash to " + _from - + ": " + dfe.getMessage(), dfe); - return false; - } - - long clockSkew = 0; - boolean sigOk = false; - - // our public == Y, since we are receiving the connection - byte X[] = builder.getPeerPublicValueBytes(); - byte Y[] = builder.getMyPublicValueBytes(); - - // read: routerInfo + currentTime - // + S(routerInfo + currentTime + nonce + nextTag + X + Y, routerIdent.signingKey) - try { - RouterInfo info = new RouterInfo(); - info.readBytes(_rawIn); - Date now = DataHelper.readDate(_rawIn); - Signature sig = new Signature(); - sig.readBytes(_rawIn); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(512); - info.writeBytes(baos); - DataHelper.writeDate(baos, now); - baos.write(_nonce.getData()); - baos.write(_nextConnectionTag.getData()); - baos.write(X); - baos.write(Y); - - sigOk = _context.dsa().verifySignature(sig, baos.toByteArray(), - info.getIdentity().getSigningPublicKey()); - - clockSkew = _context.clock().now() - now.getTime(); - _actualPeer = info; - } catch (IOException ioe) { - fail("Error reading the info from " + _from - + ": " + ioe.getMessage(), ioe); - return false; - } catch (DataFormatException dfe) { - fail("Error reading the info from " + _from - + ": " + dfe.getMessage(), dfe); - return false; - } - - // verify routerInfo - boolean reachable = verifyReachability(); - - // send: routerInfo + status + properties - // + S(routerInfo + status + properties + nonce + nextTag + X + Y, routerIdent.signingKey) - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(512); - _context.router().getRouterInfo().writeBytes(baos); - - Properties props = new Properties(); - - int status = STATUS_UNKNOWN; - if (!reachable) { - status = STATUS_UNREACHABLE; - } else if ( (clockSkew > Router.CLOCK_FUDGE_FACTOR) - || (clockSkew < 0 - Router.CLOCK_FUDGE_FACTOR) ) { - status = STATUS_SKEWED; - SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMddHHmmssSSS"); - props.setProperty("SKEW", fmt.format(new Date(_context.clock().now()))); - } else if (!sigOk) { - status = STATUS_SIGNATURE_FAILED; - } else { - try { - _context.netDb().store(_actualPeer.getIdentity().getHash(), _actualPeer); - status = STATUS_OK; - } catch (IllegalArgumentException iae) { - // bad peer info - status = STATUS_UNKNOWN; - props.setProperty("REASON", "RouterInfoFailed"); - } - } - - if (_actualPeer.getIdentity().getHash().equals(_context.routerHash())) { - status = STATUS_UNKNOWN; - props.setProperty("REASON", "wtf, talking to myself?"); - } - - baos.write(status); - - DataHelper.writeProperties(baos, props); - byte beginning[] = baos.toByteArray(); - - baos.write(_nonce.getData()); - baos.write(_nextConnectionTag.getData()); - baos.write(X); - baos.write(Y); - - Signature sig = _context.dsa().sign(baos.toByteArray(), - _context.keyManager().getSigningPrivateKey()); - - _rawOut.write(beginning); - sig.writeBytes(_rawOut); - _rawOut.flush(); - - return handleStatus(status, clockSkew); - } catch (IOException ioe) { - fail("Error writing the info to " + _from - + ": " + ioe.getMessage(), ioe); - return false; - } catch (DataFormatException dfe) { - fail("Error writing the info to " + _from - + ": " + dfe.getMessage(), dfe); - return false; - } - } - - /** - * Act according to the status code, failing as necessary and returning - * whether we should continue going or not. - * - * @return true if we should keep going. - */ - private boolean handleStatus(int status, long clockSkew) { - switch (status) { - case STATUS_OK: - return true; - case STATUS_UNREACHABLE: - fail("Peer " + _actualPeer.getIdentity().calculateHash().toBase64().substring(0,6) - + " at " + _from + " is unreachable"); - return false; - case STATUS_SKEWED: - fail("Peer " + _actualPeer.getIdentity().calculateHash().toBase64().substring(0,6) - + " was skewed by " + DataHelper.formatDuration(clockSkew)); - return false; - case STATUS_SIGNATURE_FAILED: - fail("Forged signature on " + _from + " pretending to be " - + _actualPeer.getIdentity().calculateHash().toBase64().substring(0,6)); - return false; - default: - fail("Unknown error verifying " - + _actualPeer.getIdentity().calculateHash().toBase64().substring(0,6) - + ": " + status); - return false; - } - } - - /** - * Can the peer be contacted on their public addresses? If so, - * be sure to set _remoteAddress. We can do this without branching onto - * another thread because we already have a timer killing this handler if - * it takes too long - */ - private boolean verifyReachability() { - if (_actualPeer == null) return false; - _remoteAddress = new TCPAddress(_actualPeer.getTargetAddress(TCPTransport.STYLE)); - if ( (_remoteAddress.getPort() <= 0) || (_remoteAddress.getPort() > 65535) ) - return false; - - TCPAddress testAddress = _remoteAddress; - - // if it is a LAN address, test with that address and not the public one - if (!TCPAddress.isPubliclyRoutable(_from)) { - testAddress = new TCPAddress(_from, _remoteAddress.getPort()); - } - - try { - return verifyReachability(testAddress); - } catch (IOException ioe) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Error verifying " - + _actualPeer.getIdentity().calculateHash().toBase64().substring(0,6) - + "at " + testAddress, ioe); - return false; - } catch (DataFormatException dfe) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Error verifying " - + _actualPeer.getIdentity().calculateHash().toBase64().substring(0,6) - + "at " + testAddress, dfe); - return false; - } - } - - private static boolean verifyReachability(TCPAddress address) throws IOException, DataFormatException { - //if (true) return true; - Socket s = new Socket(address.getAddress(), address.getPort()); - OutputStream out = s.getOutputStream(); - InputStream in = s.getInputStream(); - - try { s.setSoTimeout(TCPListener.HANDLE_TIMEOUT); } catch (SocketException se) {} - - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("Beginning verification of reachability"); - - // send: 0xFFFF + #versions + v1 [+ v2 [etc]] + properties - DataHelper.writeLong(out, 2, FLAG_TEST); - out.write(TCPTransport.SUPPORTED_PROTOCOLS.length); - for (int i = 0; i < TCPTransport.SUPPORTED_PROTOCOLS.length; i++) - out.write(TCPTransport.SUPPORTED_PROTOCOLS[i]); - DataHelper.writeProperties(out, null); - out.flush(); - - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("Verification of reachability request sent"); - - // read: 0xFFFF + versionOk + #bytesIP + IP + currentTime + properties - int flag = (int)DataHelper.readLong(in, 2); - if (flag != FLAG_TEST) - throw new IOException("Unable to verify the peer - invalid response"); - int version = in.read(); - if (version == -1) - throw new IOException("Unable to verify the peer - invalid version"); - if (version == FLAG_PROTOCOL_NONE) - throw new IOException("Unable to verify the peer - no matching version"); - int numBytes = in.read(); - if ( (numBytes == -1) || (numBytes > 32) ) - throw new IOException("Unable to verify the peer - invalid num bytes"); - byte ip[] = new byte[numBytes]; - int read = DataHelper.read(in, ip); - if (read != numBytes) - throw new IOException("Unable to verify the peer - invalid num bytes"); - Date now = DataHelper.readDate(in); - Properties opts = DataHelper.readProperties(in); - - return true; - } - - /** - * The peer contacting us is just testing us. Verify that we are reachable - * by following the protocol, then close the socket. This is called only - * after reading the initial 0xFFFF. - * - */ - private void handleTest() { - try { - // read: #versions + v1 [+ v2 [etc]] + properties - int numVersions = _rawIn.read(); - if (numVersions == -1) throw new IOException("Unable to read versions"); - if (numVersions > MAX_VERSIONS) throw new IOException("Too many versions"); - int versions[] = new int[numVersions]; - for (int i = 0; i < numVersions; i++) { - versions[i] = _rawIn.read(); - if (versions[i] == -1) - throw new IOException("Not enough versions"); - } - Properties opts = DataHelper.readProperties(_rawIn); - - int version = 0; - for (int i = 0; i < versions.length && version == 0; i++) { - for (int j = 0; j < TCPTransport.SUPPORTED_PROTOCOLS.length; j++) { - if (TCPTransport.SUPPORTED_PROTOCOLS[j] == versions[i]) { - version = versions[i]; - break; - } - } - } - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("HandleTest: version=" + version + " opts=" +opts); - - // send: 0xFFFF + versionOk + #bytesIP + IP + currentTime + properties - DataHelper.writeLong(_rawOut, 2, FLAG_TEST); - _rawOut.write(version); - byte ip[] = _from.getBytes(); - _rawOut.write(ip.length); - _rawOut.write(ip); - DataHelper.writeLong(_rawOut, DataHelper.DATE_LENGTH, _context.clock().now()); - //DataHelper.writeDate(_rawOut, new Date(_context.clock().now())); - DataHelper.writeProperties(_rawOut, null); - _rawOut.flush(); - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("HandleTest: result flushed"); - - } catch (IOException ioe) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Unable to verify test connection from " + _from, ioe); - } catch (DataFormatException dfe) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Unable to verify test connection from " + _from, dfe); - } finally { - if (_rawIn != null) try { _rawIn.close(); } catch (IOException ioe) {} - if (_rawOut != null) try { _rawOut.close(); } catch (IOException ioe) {} - if (_socket != null) try { _socket.close(); } catch (IOException ioe) {} - - _socket = null; - _rawIn = null; - _rawOut = null; - _agreedProtocol = -1; - _nonce = null; - _connectionTag = null; - _actualPeer = null; - _testComplete = true; - } - } - - /** - * Finish up the establishment (wrapping the streams, storing the netDb, - * persisting the connection tags, etc) - * - */ - private void establishComplete() { - _connectionIn = new BandwidthLimitedInputStream(_context, _rawIn, _actualPeer.getIdentity()); - OutputStream blos = new BandwidthLimitedOutputStream(_context, _rawOut, _actualPeer.getIdentity()); - _connectionOut = blos; - //_connectionIn = _rawIn; - //_connectionOut = _rawOut; - - Hash peer = _actualPeer.getIdentity().getHash(); - _transport.getTagManager().replaceTag(peer, _nextConnectionTag, _key); - } - - public String getError() { return _error; } - public boolean getTestComplete() { return _testComplete; } - - /** - * Kill the handler, closing all sockets and streams, setting everything - * back to failure states, and setting the given error. - * - */ - private void fail(String error) { - fail(error, null); - } - private void fail(String error, Exception e) { - if (_error == null) // only grab the first error - _error = error; - - if (_rawIn != null) try { _rawIn.close(); } catch (IOException ioe) {} - if (_rawOut != null) try { _rawOut.close(); } catch (IOException ioe) {} - if (_socket != null) try { _socket.close(); } catch (IOException ioe) {} - - _socket = null; - _rawIn = null; - _rawOut = null; - _agreedProtocol = -1; - _nonce = null; - _connectionTag = null; - _actualPeer = null; - - if (_log.shouldLog(Log.WARN)) - _log.warn(error, e); - } - - /** - * Verify the reachability of a peer. - * Usage: ConnectionHandler hostname portNum - */ - public static void main(String args[]) { - if (false) args = new String[] { "dev.i2p.net", "4108" }; - - if ( (args == null) || (args.length != 2) ) { - System.out.println("Usage: ConnectionHandler hostname portNum"); - System.exit(0); - } - - try { - int port = Integer.parseInt(args[1]); - TCPAddress addr = new TCPAddress(args[0], port); - boolean ok = verifyReachability(addr); - if (ok) - System.out.println("Peer is reachable: " + addr.toString()); - else - System.out.println("Peer is not reachable: " + addr.toString()); - } catch (Exception e) { - System.out.println("Peer is not reachable: " + args[0] + ":" + args[1]); - e.printStackTrace(); - } - } -} diff --git a/router/java/src/net/i2p/router/transport/tcp/ConnectionRunner.java b/router/java/src/net/i2p/router/transport/tcp/ConnectionRunner.java deleted file mode 100644 index a9d88b860..000000000 --- a/router/java/src/net/i2p/router/transport/tcp/ConnectionRunner.java +++ /dev/null @@ -1,218 +0,0 @@ -package net.i2p.router.transport.tcp; - -import java.io.IOException; -import java.io.OutputStream; - -import net.i2p.data.DataHelper; -import net.i2p.data.RouterInfo; -import net.i2p.data.i2np.DateMessage; -import net.i2p.data.i2np.I2NPMessage; -import net.i2p.router.OutNetMessage; -import net.i2p.router.RouterContext; -import net.i2p.util.I2PThread; -import net.i2p.util.Log; -import net.i2p.util.SimpleTimer; - -/** - * Push out I2NPMessages across the wire - * - */ -class ConnectionRunner implements Runnable { - private Log _log; - private RouterContext _context; - private TCPConnection _con; - private boolean _keepRunning; - private byte _writeBuffer[]; - private long _lastTimeSend; - private long _lastWriteEnd; - private long _lastWriteBegin; - private long _lastRealActivity; - - private static final long TIME_SEND_FREQUENCY = 60*1000; - /** if we don't send them any real data in a 10 minute period, drop 'em */ - static final int DISCONNECT_INACTIVITY_PERIOD = 10*60*1000; - - public ConnectionRunner(RouterContext ctx, TCPConnection con) { - _context = ctx; - _log = ctx.logManager().getLog(ConnectionRunner.class); - _con = con; - _keepRunning = false; - _lastWriteBegin = ctx.clock().now(); - _lastWriteEnd = _lastWriteBegin; - _lastRealActivity = _lastWriteBegin; - } - - public void startRunning() { - _keepRunning = true; - _writeBuffer = new byte[38*1024]; // expansion factor - _lastTimeSend = -1; - - String name = "TCP " + _context.routerHash().toBase64().substring(0,6) - + " to " - + _con.getRemoteRouterIdentity().calculateHash().toBase64().substring(0,6); - I2PThread t = new I2PThread(this, name); - t.start(); - - long delay = TIME_SEND_FREQUENCY + _context.random().nextInt(60*1000); - SimpleTimer.getInstance().addEvent(new KeepaliveEvent(), delay); - } - - public void stopRunning() { - _keepRunning = false; - } - - public void run() { - while (_keepRunning && !_con.getIsClosed()) { - OutNetMessage msg = _con.getNextMessage(); - if (msg == null) { - if (_keepRunning && !_con.getIsClosed()) - _log.error("next message is null but we should keep running?"); - _con.closeConnection(); - return; - } else { - sendMessage(msg); - } - } - } - - private void sendMessage(OutNetMessage msg) { - byte buf[] = _writeBuffer; - int written = 0; - try { - written = msg.getMessageData(_writeBuffer); - } catch (ArrayIndexOutOfBoundsException aioobe) { - I2NPMessage m = msg.getMessage(); - if (m != null) { - buf = m.toByteArray(); - written = buf.length; - } - } catch (Exception e) { - _log.log(Log.CRIT, "getting the message data", e); - _con.closeConnection(); - return; - } - if (written <= 0) { - if (_log.shouldLog(Log.WARN)) - _log.warn("message " + msg.getMessageType() + "/" + msg.getMessageId() - + " expired before it could be sent"); - - msg.timestamp("ConnectionRunner.sendMessage noData"); - _con.sent(msg, false, 0); - return; - } - - msg.timestamp("ConnectionRunner.sendMessage data"); - - boolean sendTime = false; - if (_lastTimeSend < _context.clock().now() - TIME_SEND_FREQUENCY) - sendTime = true; - - OutputStream out = _con.getOutputStream(); - boolean ok = false; - - if (!DateMessage.class.getName().equals(msg.getMessageType())) - _lastRealActivity = _context.clock().now(); - try { - synchronized (out) { - _lastWriteBegin = _context.clock().now(); - out.write(buf, 0, written); - if (sendTime) { - out.write(buildTimeMessage().toByteArray()); - _lastTimeSend = _context.clock().now(); - } - out.flush(); - _lastWriteEnd = _context.clock().now(); - } - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Just sent message " + msg.getMessageId() + " to " - + msg.getTarget().getIdentity().getHash().toBase64().substring(0,6) - + " writeTime = " + (_lastWriteEnd - _lastWriteBegin) +"ms" - + " lifetime = " + msg.getLifetime() + "ms"); - - ok = true; - } catch (IOException ioe) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Error writing out the message", ioe); - _con.closeConnection(); - } - _con.sent(msg, ok, _lastWriteEnd - _lastWriteBegin); - } - - /** - * Build up a new message to be sent with the current router's time - * - */ - private I2NPMessage buildTimeMessage() { - DateMessage dm = new DateMessage(_context); - dm.setNow(_context.clock().now()); - return dm; - } - - /** - * Every few minutes, send a (tiny) message to the peer if we haven't - * spoken with them recently. This will help kill off any hung - * connections (due to IP address changes, etc). If we don't get any - * messages through in 5 minutes, kill the connection as well. - * - */ - private class KeepaliveEvent implements SimpleTimer.TimedEvent { - public void timeReached() { - if (!_keepRunning) return; - if (_con.getIsClosed()) return; - long now = _context.clock().now(); - long timeSinceWrite = now - _lastWriteEnd; - long timeSinceWriteBegin = now - _lastWriteBegin; - long timeSinceWriteReal = now - _lastRealActivity; - if (timeSinceWrite > 5*TIME_SEND_FREQUENCY) { - TCPTransport t = _con.getTransport(); - String msg = "Connection closed with " - + _con.getRemoteRouterIdentity().getHash().toBase64().substring(0,6) - + " due to " + DataHelper.formatDuration(timeSinceWrite) - + " of inactivity after " - + DataHelper.formatDuration(_con.getLifetime()); - if (_lastWriteBegin > _lastWriteEnd) - msg = msg + " with a message being written for " + - DataHelper.formatDuration(timeSinceWriteBegin); - t.addConnectionErrorMessage(msg); - if (_log.shouldLog(Log.INFO)) - _log.info(msg); - _con.closeConnection(false); - return; - } - if (timeSinceWriteReal > DISCONNECT_INACTIVITY_PERIOD) { - TCPTransport t = _con.getTransport(); - String msg = "Connection closed with " - + _con.getRemoteRouterIdentity().getHash().toBase64().substring(0,6) - + " due to " + DataHelper.formatDuration(timeSinceWriteReal) - + " of inactivity after " - + DataHelper.formatDuration(_con.getLifetime()); - if (_lastWriteBegin > _lastWriteEnd) - msg = msg + " with a message being written for " + - DataHelper.formatDuration(timeSinceWriteBegin); - t.addConnectionErrorMessage(msg); - if (_log.shouldLog(Log.INFO)) - _log.info(msg); - _con.closeConnection(false); - return; - } - - if (_lastTimeSend < _context.clock().now() - 2*TIME_SEND_FREQUENCY) - enqueueTimeMessage(); - long delay = 2*TIME_SEND_FREQUENCY + _context.random().nextInt((int)TIME_SEND_FREQUENCY); - SimpleTimer.getInstance().addEvent(KeepaliveEvent.this, delay); - } - } - - private void enqueueTimeMessage() { - OutNetMessage msg = new OutNetMessage(_context); - RouterInfo ri = _context.netDb().lookupRouterInfoLocally(_con.getRemoteRouterIdentity().getHash()); - if (ri == null) return; - msg.setTarget(ri); - msg.setExpiration(_context.clock().now() + TIME_SEND_FREQUENCY); - msg.setMessage(buildTimeMessage()); - msg.setPriority(100); - _con.addMessage(msg); - if (_log.shouldLog(Log.INFO)) - _log.info("Enqueueing time message to " + _con.getRemoteRouterIdentity().getHash().toBase64().substring(0,6)); - } -} diff --git a/router/java/src/net/i2p/router/transport/tcp/ConnectionTagManager.java b/router/java/src/net/i2p/router/transport/tcp/ConnectionTagManager.java deleted file mode 100644 index a9eca953a..000000000 --- a/router/java/src/net/i2p/router/transport/tcp/ConnectionTagManager.java +++ /dev/null @@ -1,107 +0,0 @@ -package net.i2p.router.transport.tcp; - -import java.util.HashMap; -import java.util.Map; - -import net.i2p.data.ByteArray; -import net.i2p.data.Hash; -import net.i2p.data.SessionKey; -import net.i2p.router.RouterContext; -import net.i2p.util.Log; - -/** - * Organize the tags used to connect with peers. - * - */ -public class ConnectionTagManager { - protected Log _log; - private RouterContext _context; - /** H(routerIdentity) to ByteArray */ - private Map _tagByPeer; - /** ByteArray to H(routerIdentity) */ - private Map _peerByTag; - /** H(routerIdentity) to SessionKey */ - private Map _keyByPeer; - - /** synchronize against this when dealing with the data */ - private Object _lock; - - /** - * Only keep the keys and tags for up to *cough* 10,000 peers (everyone - * else will need to use a full DH rekey). Ok, yeah, 10,000 is absurd for - * the TCP transport anyway, but we need a limit, and this eats up at most - * 1MB (96 bytes per peer). Later we may add another mapping to drop the - * oldest ones first, but who cares for now. - * - */ - public static final int MAX_CONNECTION_TAGS = 10*1000; - - public ConnectionTagManager(RouterContext context) { - _context = context; - _log = context.logManager().getLog(getClass()); - initialize(); - _lock = new Object(); - } - - protected void initialize() { - initializeData(new HashMap(128), new HashMap(128), new HashMap(128)); - } - - protected void initializeData(Map keyByPeer, Map tagByPeer, Map peerByTag) { - _keyByPeer = keyByPeer; - _tagByPeer = tagByPeer; - _peerByTag = peerByTag; - } - - /** Retrieve the associated tag (but do not consume it) */ - public ByteArray getTag(Hash peer) { - synchronized (_lock) { - return (ByteArray)_tagByPeer.get(peer); - } - } - - public SessionKey getKey(Hash peer) { - synchronized (_lock) { // - return (SessionKey)_keyByPeer.get(peer); - } - } - public SessionKey getKey(ByteArray tag) { - synchronized (_lock) { // - Hash peer = (Hash)_peerByTag.get(tag); - if (peer == null) return null; - return (SessionKey)_keyByPeer.get(peer); - } - } - - /** Update the tag associated with a peer, dropping the old one */ - public void replaceTag(Hash peer, ByteArray newTag, SessionKey key) { - synchronized (_lock) { - while (_keyByPeer.size() > MAX_CONNECTION_TAGS) { - Hash rmPeer = (Hash)_keyByPeer.keySet().iterator().next(); - ByteArray tag = (ByteArray)_tagByPeer.remove(peer); - SessionKey oldKey = (SessionKey)_keyByPeer.remove(peer); - rmPeer = (Hash)_peerByTag.remove(tag); - - if (_log.shouldLog(Log.INFO)) - _log.info("Too many tags, dropping the one for " + rmPeer.toBase64().substring(0,6)); - } - _keyByPeer.put(peer, key); - _peerByTag.put(newTag, peer); - _tagByPeer.put(peer, newTag); - - saveTags(_keyByPeer, _tagByPeer); - } - } - - /** - * Save the tags/keys associated with the peer. - * - * @param keyByPeer H(routerIdentity) to SessionKey - * @param tagByPeer H(routerIdentity) to ByteArray - */ - protected void saveTags(Map keyByPeer, Map tagByPeer) { - // noop, in memory only - } - - protected RouterContext getContext() { return _context; } -} diff --git a/router/java/src/net/i2p/router/transport/tcp/MessageHandler.java b/router/java/src/net/i2p/router/transport/tcp/MessageHandler.java deleted file mode 100644 index cc1c1f5d9..000000000 --- a/router/java/src/net/i2p/router/transport/tcp/MessageHandler.java +++ /dev/null @@ -1,78 +0,0 @@ -package net.i2p.router.transport.tcp; - -import net.i2p.data.DataHelper; -import net.i2p.data.Hash; -import net.i2p.data.RouterIdentity; -import net.i2p.data.i2np.DateMessage; -import net.i2p.data.i2np.I2NPMessage; -import net.i2p.data.i2np.I2NPMessageReader; -import net.i2p.router.Router; -import net.i2p.util.Log; - -/** - * Receive messages from a message reader and bounce them off to the transport - * for further enqueueing. - */ -public class MessageHandler implements I2NPMessageReader.I2NPMessageEventListener { - private Log _log; - private TCPTransport _transport; - private TCPConnection _con; - private RouterIdentity _ident; - private Hash _identHash; - - public MessageHandler(TCPTransport transport, TCPConnection con) { - _transport = transport; - _con = con; - _ident = con.getRemoteRouterIdentity(); - _identHash = _ident.calculateHash(); - _log = con.getRouterContext().logManager().getLog(MessageHandler.class); - transport.getContext().statManager().createRateStat("tcp.disconnectAfterSkew", "How skewed a connection became before we killed it?", "TCP", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l } ); - } - - public void disconnected(I2NPMessageReader reader) { - _con.closeConnection(); - } - - public void messageReceived(I2NPMessageReader reader, I2NPMessage message, long msToRead, int size) { - _con.messageReceived(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Just received message " + message.getUniqueId() + " from " - + _identHash.toBase64().substring(0,6) - + " readTime = " + msToRead + "ms type = " + message.getClass().getName()); - if (message instanceof DateMessage) { - DateMessage msg = (DateMessage)message; - timeMessageReceived(msg.getNow()); - // dont propogate the message, its just a fake - return; - } - _transport.messageReceived(message, _ident, _identHash, msToRead, size); - } - - private void timeMessageReceived(long remoteTime) { - long delta = _con.getRouterContext().clock().now() - remoteTime; - if ( (delta > Router.CLOCK_FUDGE_FACTOR) || (delta < 0 - Router.CLOCK_FUDGE_FACTOR) ) { - _con.closeConnection(); - String msg = "Peer " + _identHash.toBase64().substring(0,6) + " is too far skewed (" - + DataHelper.formatDuration(delta) + ") after uptime of " - + DataHelper.formatDuration(_con.getLifetime()); - if (_log.shouldLog(Log.WARN)) - _log.warn(msg); - _transport.addConnectionErrorMessage(msg); - _transport.getContext().statManager().addRateData("tcp.disconnectAfterSkew", delta, _con.getLifetime()); - } else { - int level = Log.DEBUG; - if ( (delta > Router.CLOCK_FUDGE_FACTOR/2) || (delta < 0 - Router.CLOCK_FUDGE_FACTOR/2) ) - level = Log.WARN; - if (_log.shouldLog(level)) - _log.log(level, "Peer " + _identHash.toBase64().substring(0,6) + " is only skewed by (" - + DataHelper.formatDuration(delta) + ") after uptime of " - + DataHelper.formatDuration(_con.getLifetime()) ); - _con.setOffsetReceived(delta); - } - } - - public void readError(I2NPMessageReader reader, Exception error) { - _con.closeConnection(); - } - -} diff --git a/router/java/src/net/i2p/router/transport/tcp/PersistentConnectionTagManager.java b/router/java/src/net/i2p/router/transport/tcp/PersistentConnectionTagManager.java deleted file mode 100644 index 83d647611..000000000 --- a/router/java/src/net/i2p/router/transport/tcp/PersistentConnectionTagManager.java +++ /dev/null @@ -1,200 +0,0 @@ -package net.i2p.router.transport.tcp; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -import net.i2p.data.ByteArray; -import net.i2p.data.DataFormatException; -import net.i2p.data.DataHelper; -import net.i2p.data.Hash; -import net.i2p.data.SessionKey; -import net.i2p.router.RouterContext; -import net.i2p.util.Log; - -/** - * Simple persistent impl writing the connection tags to connectionTag.keys - * (or another file specified via "i2np.tcp.tagFile") - * - */ -public class PersistentConnectionTagManager extends ConnectionTagManager { - private Object _ioLock; - - public PersistentConnectionTagManager(RouterContext context) { - super(context); - _ioLock = new Object(); - } - - public static final String PROP_TAG_FILE = "i2np.tcp.tagFile"; - public static final String DEFAULT_TAG_FILE = "connectionTag.keys"; - - protected void initialize() { - loadTags(); - } - - /** - * Save the tags/keys associated with the peer. - * - * @param keyByPeer H(routerIdentity) to SessionKey - * @param tagByPeer H(routerIdentity) to ByteArray - */ - protected void saveTags(Map keyByPeer, Map tagByPeer) { - byte data[] = prepareData(keyByPeer, tagByPeer); - if (data == null) return; - - synchronized (_ioLock) { - File tagFile = getFile(); - if (tagFile == null) return; - - FileOutputStream fos = null; - try { - fos = new FileOutputStream(tagFile); - fos.write(data); - fos.flush(); - - if (_log.shouldLog(Log.INFO)) - _log.info("Wrote connection tags for " + keyByPeer.size() + " peers"); - } catch (IOException ioe) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Error writing out the tags", ioe); - } finally { - if (fos != null) try { fos.close(); } catch (IOException ioe) {} - } - } - } - - /** - * Get the raw data to be written to disk. - * - * @param keyByPeer H(routerIdentity) to SessionKey - * @param tagByPeer H(routerIdentity) to ByteArray - */ - private byte[] prepareData(Map keyByPeer, Map tagByPeer) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(keyByPeer.size() * 32 * 3 + 32); - try { - for (Iterator iter = keyByPeer.keySet().iterator(); iter.hasNext(); ) { - Hash peer = (Hash)iter.next(); - SessionKey key = (SessionKey)keyByPeer.get(peer); - ByteArray tag = (ByteArray)tagByPeer.get(peer); - - if ( (key == null) || (tag == null) ) continue; - - baos.write(peer.getData()); - baos.write(key.getData()); - baos.write(tag.getData()); - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Wrote connection tag for " + peer.toBase64().substring(0,6)); - } - byte pre[] = baos.toByteArray(); - Hash check = getContext().sha().calculateHash(pre); - baos.write(check.getData()); - return baos.toByteArray(); - } catch (IOException ioe) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Error preparing the tags", ioe); - return null; - } - } - - private void loadTags() { - File tagFile = getFile(); - if ( (tagFile == null) || (tagFile.length() <= 31) ) { - initializeData(new HashMap(), new HashMap(), new HashMap()); - return; - } - - FileInputStream fin = null; - try { - fin = new FileInputStream(tagFile); - byte data[] = getData(tagFile, fin); - if (data == null) { - initializeData(new HashMap(), new HashMap(), new HashMap()); - return; - } - - int entries = data.length / (32 * 3); - Map keyByPeer = new HashMap(entries); - Map tagByPeer = new HashMap(entries); - Map peerByTag = new HashMap(entries); - - for (int i = 0; i < data.length; i += 32*3) { - byte peer[] = new byte[32]; - byte key[] = new byte[32]; - byte tag[] = new byte[32]; - System.arraycopy(data, i, peer, 0, 32); - System.arraycopy(data, i + 32, key, 0, 32); - System.arraycopy(data, i + 64, tag, 0, 32); - - Hash peerData = new Hash(peer); - SessionKey keyData = new SessionKey(key); - ByteArray tagData = new ByteArray(tag); - - keyByPeer.put(peerData, keyData); - tagByPeer.put(peerData, tagData); - peerByTag.put(tagData, peerData); - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Loaded connection tag for " + peerData.toBase64().substring(0,6)); - - if (keyByPeer.size() > ConnectionTagManager.MAX_CONNECTION_TAGS) - break; - } - - if (_log.shouldLog(Log.INFO)) - _log.info("Loaded connection tags for " + keyByPeer.size() + " peers"); - initializeData(keyByPeer, tagByPeer, peerByTag); - } catch (IOException ioe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Connection tag file is corrupt, removing it"); - try { fin.close(); } catch (IOException ioe2) {} - tagFile.delete(); // ignore rv - fin = null; - initializeData(new HashMap(), new HashMap(), new HashMap()); - return; - } finally { - if (fin != null) try { fin.close(); } catch (IOException ioe) {} - } - } - - private byte[] getData(File tagFile, FileInputStream fin) throws IOException { - byte data[] = new byte[(int)tagFile.length() - 32]; - int read = DataHelper.read(fin, data); - if (read != data.length) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Connection tag file is corrupt (too short), removing it"); - try { fin.close(); } catch (IOException ioe) {} - tagFile.delete(); // ignore rv - fin = null; - return null; - } - - Hash readHash = new Hash(); - try { - readHash.readBytes(fin); - } catch (DataFormatException dfe) { - readHash = null; - } - - Hash calcHash = getContext().sha().calculateHash(data); - if ( (readHash == null) || (!calcHash.equals(readHash)) ) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Connection tag file is corrupt, removing it"); - try { fin.close(); } catch (IOException ioe) {} - tagFile.delete(); // ignore rv - fin = null; - return null; - } - - return data; - } - - private File getFile() { - return new File(getContext().getProperty(PROP_TAG_FILE, DEFAULT_TAG_FILE)); - } -} diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPAddress.java b/router/java/src/net/i2p/router/transport/tcp/TCPAddress.java deleted file mode 100644 index 08c07f201..000000000 --- a/router/java/src/net/i2p/router/transport/tcp/TCPAddress.java +++ /dev/null @@ -1,177 +0,0 @@ -package net.i2p.router.transport.tcp; -/* - * free (adj.): unencumbered; not under the control of others - * Written by jrandom in 2003 and released into the public domain - * with no warranty of any kind, either expressed or implied. - * It probably won't make your computer catch on fire, or eat - * your children, but it might. Use at your own risk. - * - */ - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.Properties; - -import net.i2p.data.DataHelper; -import net.i2p.data.RouterAddress; -import net.i2p.router.transport.TransportImpl; -import net.i2p.util.Log; - -/** - * Wrap up an address - */ -public class TCPAddress { - private final static Log _log = new Log(TCPAddress.class); - private int _port; - private String _host; - private InetAddress _addr; - /** Port number used in RouterAddress definitions */ - public final static String PROP_PORT = "port"; - /** Host name used in RouterAddress definitions */ - public final static String PROP_HOST = "host"; - - public TCPAddress(String host, int port) { - try { - if (host != null) { - InetAddress iaddr = InetAddress.getByName(host); - _host = iaddr.getHostAddress(); - _addr = iaddr; - } - _port = port; - } catch (UnknownHostException uhe) { - _host = null; - _port = -1; - _addr = null; - if (_log.shouldLog(Log.WARN)) - _log.warn("Unknown host [" + host + "] for port [" + port + "]", uhe); - } - } - - public TCPAddress() { - _host = null; - _port = -1; - _addr = null; - } - - public TCPAddress(InetAddress addr, int port) { - if (addr != null) - _host = addr.getHostAddress(); - _addr = addr; - _port = port; - } - - public TCPAddress(RouterAddress addr) { - if (addr == null) { - _host = null; - _port = -1; - return; - } - String host = addr.getOptions().getProperty(PROP_HOST); - if (host == null) { - _host = null; - _port = -1; - } else { - try { - InetAddress iaddr = InetAddress.getByName(host.trim()); - _host = iaddr.getHostAddress(); - _addr = iaddr; - - String port = addr.getOptions().getProperty(PROP_PORT); - if ( (port != null) && (port.trim().length() > 0) ) { - try { - _port = Integer.parseInt(port.trim()); - } catch (NumberFormatException nfe) { - _log.error("Invalid port [" + port + "]", nfe); - _port = -1; - } - } else { - _port = -1; - } - } catch (UnknownHostException uhe) { - _host = null; - _port = -1; - } - } - } - - public RouterAddress toRouterAddress() { - if ( (_host == null) || (_port <= 0) ) - return null; - - RouterAddress addr = new RouterAddress(); - - addr.setCost(10); - addr.setExpiration(null); - - Properties props = new Properties(); - props.setProperty(PROP_HOST, _host); - props.setProperty(PROP_PORT, ""+_port); - - addr.setOptions(props); - addr.setTransportStyle(TCPTransport.STYLE); - return addr; - } - - public String getHost() { return _host; } - public void setHost(String host) { _host = host; } - public InetAddress getAddress() { return _addr; } - public void setAddress(InetAddress addr) { _addr = addr; } - public int getPort() { return _port; } - public void setPort(int port) { _port = port; } - - public boolean isPubliclyRoutable() { - return isPubliclyRoutable(_host); - } - public static boolean isPubliclyRoutable(String host) { - if (host == null) return false; - try { - InetAddress addr = InetAddress.getByName(host); - byte quad[] = addr.getAddress(); - if (quad.length != 4) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Refusing IPv6 address (" + host + " / " + addr.getHostAddress() + ") " - + " since not all peers support it, and we don't support restricted routes"); - return false; - } - return TransportImpl.isPubliclyRoutable(quad); - } catch (Throwable t) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Error checking routability", t); - return false; - } - } - - public String toString() { return _host + ":" + _port; } - - public int hashCode() { - int rv = 0; - rv += _port; - if (_addr != null) - rv += _addr.getHostAddress().hashCode(); - else - if (_host != null) rv += _host.trim().hashCode(); - return rv; - } - - public boolean equals(Object val) { - if ( (val != null) && (val instanceof TCPAddress) ) { - TCPAddress addr = (TCPAddress)val; - String hostname = null; - if (addr.getHost() != null) - hostname = addr.getHost().trim(); - String ourHost = getHost(); - if (ourHost != null) - ourHost = ourHost.trim(); - return DataHelper.eq(hostname, ourHost) && getPort() == addr.getPort(); - } - return false; - } - - public boolean equals(RouterAddress addr) { - if (addr == null) return false; - Properties opts = addr.getOptions(); - if (opts == null) return false; - return ( (_host.equals(opts.getProperty(PROP_HOST))) && - (Integer.toString(_port).equals(opts.getProperty(PROP_PORT))) ); - } -} diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java deleted file mode 100644 index 7758f6ffa..000000000 --- a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java +++ /dev/null @@ -1,450 +0,0 @@ -package net.i2p.router.transport.tcp; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; -import java.util.ArrayList; -import java.util.List; - -import net.i2p.data.DataHelper; -import net.i2p.data.Hash; -import net.i2p.data.RouterIdentity; -import net.i2p.data.i2np.I2NPMessageReader; -import net.i2p.router.OutNetMessage; -import net.i2p.router.RouterContext; -import net.i2p.stat.Rate; -import net.i2p.stat.RateStat; -import net.i2p.util.Log; - -/** - * Central choke point for a single TCP connection to a single peer. - * - */ -public class TCPConnection { - private Log _log; - private RouterContext _context; - private RouterIdentity _ident; - private Hash _attemptedPeer; - private TCPAddress _remoteAddress; - private String _shownAddress; - private List _pendingMessages; - private InputStream _in; - private OutputStream _out; - private Socket _socket; - private TCPTransport _transport; - private ConnectionRunner _runner; - private I2NPMessageReader _reader; - private RateStat _sendRate; - private long _started; - private boolean _closed; - private long _lastRead; - private long _lastWrite; - private long _offsetReceived; - private boolean _isOutbound; - - public TCPConnection(RouterContext ctx) { - _context = ctx; - _log = ctx.logManager().getLog(TCPConnection.class); - _pendingMessages = new ArrayList(4); - _ident = null; - _remoteAddress = null; - _shownAddress = null; - _in = null; - _out = null; - _socket = null; - _transport = null; - _started = -1; - _closed = false; - _lastRead = 0; - _lastWrite = 0; - _offsetReceived = 0; - _isOutbound = false; - _runner = new ConnectionRunner(_context, this); - _context.statManager().createRateStat("tcp.probabalisticDropQueueSize", "How many bytes were queued to be sent when a message as dropped probabalistically?", "TCP", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } ); - _context.statManager().createRateStat("tcp.queueSize", "How many bytes were queued on a connection?", "TCP", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } ); - _context.statManager().createRateStat("tcp.sendBps", "How fast are we sending data to a peer?", "TCP", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } ); - } - - /** Who are we talking with (or null if not identified) */ - public RouterIdentity getRemoteRouterIdentity() { return _ident; } - /** What is the peer's TCP address (using the IP address not hostname) */ - public TCPAddress getRemoteAddress() { return _remoteAddress; } - /** Who we initially were trying to contact */ - public Hash getAttemptedPeer() { return _attemptedPeer; } - /** Who are we talking with (or null if not identified) */ - public void setRemoteRouterIdentity(RouterIdentity ident) { _ident = ident; } - /** What is the peer's TCP address (using the IP address not hostname) */ - public void setRemoteAddress(TCPAddress addr) { _remoteAddress = addr; } - /** Who we initially were trying to contact */ - public void setAttemptedPeer(Hash peer) { _attemptedPeer = peer; } - /** What address the peer said we are reachable on */ - public void setShownAddress(String ip) { _shownAddress = ip; } - /** What address the peer said we are reachable on */ - public String getShownAddress() { return _shownAddress; } - /** skew that the other peer has from our clock */ - public long getOffsetReceived() { return _offsetReceived; } - public void setOffsetReceived(long ms) { _offsetReceived = ms; } - public TCPTransport getTransport() { return _transport; } - public boolean getIsOutbound() { return _isOutbound; } - public void setIsOutbound(boolean outbound) { _isOutbound = outbound; } - - /** - * Actually start processing the messages on the connection (and reading - * from the peer, of course). This call should not block. - * - */ - public void runConnection() { - String peer = _ident.calculateHash().toBase64().substring(0,6); - String name = "TCP Read [" + peer + "]"; - - _sendRate = new RateStat("tcp.sendRatePeer", "How many bytes are in the messages sent to " + peer, peer, new long[] { 60*1000, 5*60*1000, 60*60*1000 }); - - _reader = new I2NPMessageReader(_context, _in, new MessageHandler(_transport, this), name); - _reader.startReading(); - _runner.startRunning(); - _started = _context.clock().now(); - } - - /** - * Disconnect from the peer immediately. This stops any related helper - * threads, closes all streams, and fails all pending messages. This can - * be called multiple times safely. - * - */ - public synchronized void closeConnection() { closeConnection(true); } - public synchronized void closeConnection(boolean wasError) { - if (_log.shouldLog(Log.INFO)) { - if (_ident != null) - _log.info("Connection between " + _ident.getHash().toBase64().substring(0,6) - + " and " + _context.routerHash().toBase64().substring(0,6) - + " closed", new Exception("Closed by")); - else - _log.info("Connection between " + _remoteAddress - + " and " + _context.routerHash().toBase64().substring(0,6) - + " closed", new Exception("Closed by")); - } - if (_closed) return; - _closed = true; - synchronized (_pendingMessages) { - _pendingMessages.notifyAll(); - } - if (_runner != null) - _runner.stopRunning(); - if (_reader != null) - _reader.stopReading(); - if (_in != null) try { _in.close(); } catch (IOException ioe) {} - if (_out != null) try { _out.close(); } catch (IOException ioe) {} - if (_socket != null) try { _socket.close(); } catch (IOException ioe) {} - List msgs = clearPendingMessages(); - for (int i = 0; i < msgs.size(); i++) { - OutNetMessage msg = (OutNetMessage)msgs.get(i); - msg.timestamp("closeConnection"); - _transport.afterSend(msg, false, true, -1); - } - if (wasError) { - _context.profileManager().commErrorOccurred(_ident.getHash()); - _transport.addConnectionErrorMessage("Connection closed with " - + _ident.getHash().toBase64().substring(0,6) - + " after " + DataHelper.formatDuration(getLifetime())); - } - _transport.connectionClosed(this); - } - - /** - * Pull off any unsent OutNetMessages from the queue - * - */ - public List clearPendingMessages() { - List rv = null; - synchronized (_pendingMessages) { - rv = new ArrayList(_pendingMessages); - _pendingMessages.clear(); - _pendingMessages.notifyAll(); - } - return rv; - } - /** - * Add the given message to the outbound queue, notifying our - * runners that we want to send it. - * - */ - public void addMessage(OutNetMessage msg) { - msg.timestamp("TCPConnection.addMessage"); - List expired = null; - int remaining = 0; - long remainingSize = 0; - long curSize = msg.getMessageSize(); // so we don't serialize while locked - synchronized (_pendingMessages) { - _pendingMessages.add(msg); - expired = locked_expireOld(); - List throttled = locked_throttle(); - if (expired == null) - expired = throttled; - else if (throttled != null) - expired.addAll(throttled); - for (int i = 0; i < _pendingMessages.size(); i++) { - OutNetMessage cur = (OutNetMessage)_pendingMessages.get(i); - remaining++; - remainingSize += cur.getMessageSize(); - } - remaining = _pendingMessages.size(); - _pendingMessages.notifyAll(); - } - if (expired != null) { - for (int i = 0; i < expired.size(); i++) { - OutNetMessage cur = (OutNetMessage)expired.get(i); - cur.timestamp("TCPConnection.addMessage expired"); - if (_log.shouldLog(Log.WARN)) - _log.warn("Message " + cur.getMessageId() + " expired on the queue to " - + _ident.getHash().toBase64().substring(0,6) - + " (queue size " + remaining + "/" + remainingSize + ") with lifetime " - + cur.getLifetime() + " and size " + cur.getMessageSize()); - sent(cur, false, 0); - } - } - } - - private boolean shouldDropProbabalistically() { - return Boolean.valueOf(_context.getProperty("tcp.dropProbabalistically", "false")).booleanValue(); - } - - /** - * Implement a probabalistic dropping of messages on the queue to the - * peer along the lines of RFC2309. - * - * @return list of OutNetMessages that were expired, or null - */ - private List locked_throttle() { - if (!shouldDropProbabalistically()) return null; - int bytesQueued = 0; - long earliestExpiration = -1; - for (int i = 0; i < _pendingMessages.size(); i++) { - OutNetMessage msg = (OutNetMessage)_pendingMessages.get(i); - bytesQueued += (int)msg.getMessageSize(); - if ( (earliestExpiration < 0) || (msg.getExpiration() < earliestExpiration) ) - earliestExpiration = msg.getExpiration(); - } - - if (bytesQueued > 0) - _context.statManager().addRateData("tcp.queueSize", bytesQueued, _pendingMessages.size()); - - long sendRate = getSendRate(); - long bytesSendableUntilFirstExpire = sendRate * (earliestExpiration - _context.clock().now()) / 1000; - - // pretend that instead of being able to push bytesSendableUntilFirstExpire, - // that we can only push a fraction of that amount, causing us to probabalistically - // drop more than is necessary (leaving a fraction of the queue 'free' for bursts) - long excessQueued = (long)(bytesQueued - ((double)bytesSendableUntilFirstExpire * (1.0-getQueueFreeFactor()))); - if ( (excessQueued > 0) && (_pendingMessages.size() > 1) && (_transport != null) ) - return locked_probabalisticDrop(excessQueued); - else - return null; - } - - /** - * by default, try to keep the queue completely full, but this can be overridden - * with the property 'tcp.queueFreeFactor' - * - */ - public static final double DEFAULT_QUEUE_FREE_FACTOR = 0.0; - - private double getQueueFreeFactor() { - String factor = _context.getProperty("tcp.queueFreeFactor"); - if (factor != null) { - try { - return Double.parseDouble(factor); - } catch (NumberFormatException nfe) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Invalid tcp.queueFreeFactor [" + factor + "]", nfe); - } - } - return DEFAULT_QUEUE_FREE_FACTOR; - } - - /** how many Bps we are sending data to the peer (or 2KBps if we don't know) */ - public long getSendRate() { - if (_sendRate == null) return 2*1024; - _sendRate.coalesceStats(); - Rate r = _sendRate.getRate(60*1000); - if (r == null) { - return 2*1024; - } else if (r.getLastEventCount() <= 2) { - r = _sendRate.getRate(5*60*1000); - if (r.getLastEventCount() <= 2) - r = _sendRate.getRate(60*60*1000); - } - - if (r.getLastEventCount() <= 2) { - return 2*1024; - } else { - long bps = (long)(r.getLastTotalValue() * 1000 / r.getLastTotalEventTime()); - _context.statManager().addRateData("tcp.sendBps", bps, 0); - return bps; - } - } - - /** - * Probabalistically drop messages in relation to their size vs how much - * we've exceeded our target queue usage. - */ - private List locked_probabalisticDrop(long excessBytesQueued) { - List rv = null; - for (int i = 0; i < _pendingMessages.size() && excessBytesQueued > 0; i++) { - OutNetMessage msg = (OutNetMessage)_pendingMessages.get(i); - int p = getDropProbability(msg.getMessageSize(), excessBytesQueued); - if (_context.random().nextInt(100) < p) { - _pendingMessages.remove(i); - i--; - msg.timestamp("Probabalistically dropped due to queue size " + excessBytesQueued); - if (rv == null) - rv = new ArrayList(1); - rv.add(msg); - //sent(msg, false, -1); - _context.statManager().addRateData("tcp.probabalisticDropQueueSize", excessBytesQueued, msg.getLifetime()); - // since we've already dropped down this amount, lets reduce the - // number of additional messages dropped - excessBytesQueued -= msg.getMessageSize(); - } - } - return rv; - } - - private int getDropProbability(long msgSize, long excessBytesQueued) { - if (msgSize > excessBytesQueued) - return 100; - return (int)(100.0*(msgSize/excessBytesQueued)); - } - - private List locked_expireOld() { - long now = _context.clock().now(); - List expired = null; - for (int i = 0; i < _pendingMessages.size(); i++) { - OutNetMessage cur = (OutNetMessage)_pendingMessages.get(i); - if (cur.getExpiration() < now) { - _pendingMessages.remove(i); - if (expired == null) - expired = new ArrayList(1); - expired.add(cur); - i--; - } - } - return expired; - } - - /** - * Blocking call to retrieve the next pending message. As a side effect, - * this fails messages on the queue that have expired, and in turn never - * returns an expired message. - * - * @return next message or null if the connection has been closed. - */ - OutNetMessage getNextMessage() { - OutNetMessage msg = null; - while ( (msg == null) && (!_closed) ) { - List expired = null; - long now = _context.clock().now(); - int queueSize = 0; - synchronized (_pendingMessages) { - queueSize = _pendingMessages.size(); - for (int i = 0; i < _pendingMessages.size(); i++) { - OutNetMessage cur = (OutNetMessage)_pendingMessages.get(i); - if (cur.getExpiration() < now) { - if (expired == null) - expired = new ArrayList(1); - expired.add(cur); - _pendingMessages.remove(i); - i--; - } - } - - if (_pendingMessages.size() > 0) { - msg = (OutNetMessage)_pendingMessages.remove(0); - } else { - if (expired == null) { - try { - _pendingMessages.wait(); - } catch (InterruptedException ie) {} - } - } - } - if (expired != null) { - for (int i = 0; i < expired.size(); i++) { - OutNetMessage cur = (OutNetMessage)expired.get(i); - cur.timestamp("TCPConnection.getNextMessage expired"); - if (_log.shouldLog(Log.WARN)) - _log.warn("Message " + cur.getMessageId() + " expired on the queue to " - + _ident.getHash().toBase64().substring(0,6) - + " (queue size " + queueSize + ") with lifetime " - + cur.getLifetime()); - sent(cur, false, 0); - } - } - } - - if (msg != null) - msg.timestamp("TCPConnection.getNextMessage retrieved"); - return msg; - } - - /** How long has this connection been active for? */ - public long getLifetime() { return (_started <= 0 ? -1 : _context.clock().now() - _started); } - - void setTransport(TCPTransport transport) { _transport = transport; } - - /** - * Configure where this connection should read its data from. - * This should have any necessary bandwidth limiting and - * encryption filters already wrapped in it. - * - */ - void setInputStream(InputStream in) { _in = in; } - /** - * Configure where this connection should write its data to. - * This should have any necessary bandwidth limiting and - * encryption filters already wrapped in it. - * - */ - void setOutputStream(OutputStream out) { _out = out; } - /** - * Configure what underlying socket this connection uses. - * This is only referenced when closing the connection, and - * only if it was set. - */ - void setSocket(Socket socket) { _socket = socket; } - - /** Where this connection should write its data to. */ - OutputStream getOutputStream() { return _out; } - - /** Have we been closed already? */ - boolean getIsClosed() { return _closed; } - RouterContext getRouterContext() { return _context; } - - boolean getIsActive() { - if ( (_lastRead <= 0) || (_lastWrite <= 0) ) return false; - long recent = (_lastRead > _lastWrite ? _lastRead : _lastWrite); - long howLongAgo = _context.clock().now() - recent; - if (howLongAgo < 1*60*1000) - return true; - else - return false; - } - void messageReceived() { - _lastRead = _context.clock().now(); - } - - /** - * The message was sent. - * - * @param msg message in question - * @param ok was the message sent ok? - * @param time how long did it take to write the message? - */ - void sent(OutNetMessage msg, boolean ok, long time) { - _transport.afterSend(msg, ok, true, time); - if (ok) - _sendRate.addData(msg.getMessageSize(), msg.getLifetime()); - if (ok) - _lastWrite = _context.clock().now(); - } -} diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPConnectionEstablisher.java b/router/java/src/net/i2p/router/transport/tcp/TCPConnectionEstablisher.java deleted file mode 100644 index ae2991495..000000000 --- a/router/java/src/net/i2p/router/transport/tcp/TCPConnectionEstablisher.java +++ /dev/null @@ -1,69 +0,0 @@ -package net.i2p.router.transport.tcp; - -import net.i2p.data.Hash; -import net.i2p.data.RouterInfo; -import net.i2p.router.RouterContext; -import net.i2p.util.Log; - -/** - * Build new outbound connections, one at a time. All the heavy lifting is in - * {@link ConnectionBuilder#establishConnection} - * - */ -public class TCPConnectionEstablisher implements Runnable { - private Log _log; - private RouterContext _context; - private TCPTransport _transport; - - public TCPConnectionEstablisher(RouterContext ctx, TCPTransport transport) { - _context = ctx; - _transport = transport; - _log = ctx.logManager().getLog(TCPConnectionEstablisher.class); - } - - public void run() { - while (true) { - try { - loop(); - } catch (Exception e) { - _log.log(Log.CRIT, "wtf, establisher b0rked. send this stack trace to jrandom", e); - } - } - } - - private void loop() { - RouterInfo info = _transport.getNextPeer(); - if (info == null) { - try { Thread.sleep(5*1000); } catch (InterruptedException ie) {} - return; - } - - ConnectionBuilder cb = new ConnectionBuilder(_context, _transport, info); - TCPConnection con = null; - try { - con = cb.establishConnection(); - } catch (Exception e) { - _log.log(Log.CRIT, "Unhandled exception establishing a connection to " - + info.getIdentity().getHash().toBase64(), e); - } - if (con != null) { - con.setIsOutbound(true); - _transport.connectionEstablished(con); - } else { - if (!_context.router().isAlive()) return; - _transport.addConnectionErrorMessage(cb.getError()); - Hash peer = info.getIdentity().getHash(); - _context.profileManager().commErrorOccurred(peer); - // disabling in preparation for dropping tcp, since other transports may work, and - // hence shitlisting is not appropriate - //_context.shitlist().shitlistRouter(peer, "Unable to contact"); - //_context.netDb().fail(peer); - } - - // this removes the _pending block on the address and - // identity we attempted to contact. if the peer changed - // identities, any additional _pending blocks will also have - // been cleared above with .connectionEstablished - _transport.establishmentComplete(info); - } -} diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPListener.java b/router/java/src/net/i2p/router/transport/tcp/TCPListener.java deleted file mode 100644 index 961cb5cb2..000000000 --- a/router/java/src/net/i2p/router/transport/tcp/TCPListener.java +++ /dev/null @@ -1,338 +0,0 @@ -package net.i2p.router.transport.tcp; -/* - * free (adj.): unencumbered; not under the control of others - * Written by jrandom in 2003 and released into the public domain - * with no warranty of any kind, either expressed or implied. - * It probably won't make your computer catch on fire, or eat - * your children, but it might. Use at your own risk. - * - */ - -import java.io.IOException; -import java.net.InetAddress; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.SocketException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.List; - -import net.i2p.router.RouterContext; -import net.i2p.util.I2PThread; -import net.i2p.util.Log; -import net.i2p.util.SimpleTimer; - -/** - * Listen for TCP connections with a listener thread - * - */ -class TCPListener { - private Log _log; - private TCPTransport _transport; - private ServerSocket _socket; - private ListenerRunner _listener; - private RouterContext _context; - /** Client Sockets that have been received but not yet handled (oldest first) */ - private List _pendingSockets; - /** List of SocketHandler runners if we're listening (else an empty list) */ - private List _handlers; - - /** - * How many concurrent connection attempts from peers we will try to - * deal with at once. - */ - private static final int CONCURRENT_HANDLERS = 3; - /** - * When things really suck, how long should we wait between attempts to - * listen to the socket? - */ - private final static int MAX_FAIL_DELAY = 5*60*1000; - /** if we're not making progress in 10s, drop 'em */ - final static int HANDLE_TIMEOUT = 30*1000; - /** id generator for the connections */ - private static volatile int __handlerId = 0; - - - public TCPListener(RouterContext context, TCPTransport transport) { - _context = context; - _log = context.logManager().getLog(TCPListener.class); - _transport = transport; - _pendingSockets = new ArrayList(10); - _handlers = new ArrayList(CONCURRENT_HANDLERS); - _context.statManager().createRateStat("tcp.conReceiveOK", "How long does it take to receive a valid connection", "TCP", new long[] { 60*1000, 5*60*1000, 10*60*1000 }); - _context.statManager().createRateStat("tcp.conReceiveFail", "How long does it take to receive a failed connection", "TCP", new long[] { 60*1000, 5*60*1000, 10*60*1000 }); - _context.statManager().createRateStat("tcp.conUnhandled", "How often do we receive a connection but take too long on other ones to handle it", "TCP", new long[] { 60*1000, 5*60*1000, 10*60*1000 }); - } - - /** Make sure we are listening per the transport's config */ - public void startListening() { - TCPAddress addr = new TCPAddress(_transport.getMyHost(), _transport.getPort()); - - if (addr.getPort() > 0) { - if (_listener != null) { - if ( (_listener.getMyAddress().getPort() == addr.getPort()) && - (_listener.getMyAddress().getHost() == null) ) { - _listener.getMyAddress().setHost(addr.getHost()); - } - if (_log.shouldLog(Log.WARN)) - _log.warn("Not starting another listener on " + addr - + " while already listening on " + _listener.getMyAddress()); - return; - } - - _listener = new ListenerRunner(addr); - Thread t = new I2PThread(_listener, "Listener [" + addr.getPort()+"]"); - t.setDaemon(true); - t.start(); - - for (int i = 0; i < CONCURRENT_HANDLERS; i++) { - SocketHandler handler = new SocketHandler(); - _handlers.add(handler); - Thread th = new I2PThread(handler, "Handler " + addr.getPort() + ": " + i); - th.setDaemon(true); - th.start(); - } - } - } - - public void stopListening() { - if (_listener != null) - _listener.stopListening(); - - for (int i = 0; i < _handlers.size(); i++) { - SocketHandler h = (SocketHandler)_handlers.get(i); - h.stopHandling(); - } - _handlers.clear(); - - if (_socket != null) { - try { - _socket.close(); - _socket = null; - } catch (IOException ioe) {} - } - _listener = null; - } - - private InetAddress getInetAddress(String host) { - try { - return InetAddress.getByName(host); - } catch (UnknownHostException uhe) { - _log.warn("Listen host " + host + " unknown", uhe); - try { - return InetAddress.getLocalHost(); - } catch (UnknownHostException uhe2) { - _log.error("Local host is not reachable", uhe2); - return null; - } - } - } - - class ListenerRunner implements Runnable { - private boolean _isRunning; - private int _nextFailDelay = 1000; - private TCPAddress _myAddress; - public ListenerRunner(TCPAddress address) { - _isRunning = true; - _myAddress = address; - } - public void stopListening() { _isRunning = false; } - - public TCPAddress getMyAddress() { return _myAddress; } - - public void run() { - if (_log.shouldLog(Log.INFO)) - _log.info("Beginning TCP listener on " + _myAddress); - - int curDelay = 0; - while (_isRunning) { - try { - if ( (_transport.shouldListenToAllInterfaces()) || (_myAddress.getHost() == null) ) { - _socket = new ServerSocket(_myAddress.getPort()); - } else { - InetAddress listenAddr = getInetAddress(_myAddress.getHost()); - _socket = new ServerSocket(_myAddress.getPort(), 5, listenAddr); - } - String host = (null == _myAddress.getHost() ? "0.0.0.0" : _myAddress.getHost()); - if (_log.shouldLog(Log.INFO)) - _log.info("Begin looping for host " + host + ":" + _myAddress.getPort()); - curDelay = 0; - loop(); - } catch (IOException ioe) { - if (_isRunning && _context.router().isAlive()) - if (_log.shouldLog(Log.ERROR)) - _log.error("Error listening to tcp connection " + _myAddress.getHost() + ":" - + _myAddress.getPort(), ioe); - } - - if (_socket != null) { - try { _socket.close(); } catch (IOException ioe) {} - _socket = null; - } - - if (_log.shouldLog(Log.WARN)) - _log.warn("Error listening, waiting " + _nextFailDelay + "ms before we try again"); - try { Thread.sleep(_nextFailDelay); } catch (InterruptedException ie) {} - curDelay += _nextFailDelay; - _nextFailDelay *= 5; - if (_nextFailDelay > MAX_FAIL_DELAY) - _nextFailDelay = MAX_FAIL_DELAY; - } - if (_isRunning && _context.router().isAlive()) - if (_log.shouldLog(Log.ERROR)) - _log.error("CANCELING TCP LISTEN. delay = " + curDelay); - _isRunning = false; - } - private void loop() { - while (_isRunning && _context.router().isAlive()) { - try { - if (_log.shouldLog(Log.INFO)) - _log.info("Waiting for a connection on " + _myAddress.getHost() + ":" + _myAddress.getPort()); - - Socket s = _socket.accept(); - if (_log.shouldLog(Log.INFO)) - _log.info("Connection handled on " + _myAddress.getHost() + ":" + _myAddress.getPort() + " with " + s.getInetAddress().toString() + ":" + s.getPort()); - - handle(s); - - } catch (SocketException se) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error handling a connection - closed?", se); - return; - } catch (Throwable t) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error handling a connection", t); - } - } - } - } - - /** - * Just toss it on a queue for our pool of handlers to deal with (but also - * queue up a timeout event in case they're swamped) - * - */ - private void handle(Socket s) { - SimpleTimer.getInstance().addEvent(new CloseUnhandled(s), HANDLE_TIMEOUT); - synchronized (_pendingSockets) { - _pendingSockets.add(s); - _pendingSockets.notifyAll(); - } - } - - /** callback to close an unhandled socket (if the handlers are overwhelmed) */ - private class CloseUnhandled implements SimpleTimer.TimedEvent { - private Socket _cur; - public CloseUnhandled(Socket socket) { - _cur = socket; - } - public void timeReached() { - boolean removed; - synchronized (_pendingSockets) { - removed = _pendingSockets.remove(_cur); - } - if (removed) { - _context.statManager().addRateData("tcp.conUnhandled", 1, 0); - // handlers hadn't taken it yet, so close it - if (_log.shouldLog(Log.WARN)) - _log.warn("Closing unhandled socket " + _cur); - try { _cur.close(); } catch (IOException ioe) {} - } - } - - } - - /** - * Implement a runner for the pool of handlers, pulling sockets out of the - * _pendingSockets queue and synchronously pumping them through a - * TimedHandler. - * - */ - private class SocketHandler implements Runnable { - private boolean _handle; - public SocketHandler() { - _handle = true; - } - public void run () { - while (_handle) { - Socket cur = null; - try { - synchronized (_pendingSockets) { - if (_pendingSockets.size() <= 0) - _pendingSockets.wait(); - else - cur = (Socket)_pendingSockets.remove(0); - } - } catch (InterruptedException ie) {} - - if (cur != null) - handleSocket(cur); - cur = null; - } - } - public void stopHandling() { _handle = false; } - - /** - * blocking call to establish the basic connection, but with a timeout - * in the TimedHandler - */ - private void handleSocket(Socket s) { - TimedHandler h = new TimedHandler(s); - h.handle(); - } - } - - private class TimedHandler implements SimpleTimer.TimedEvent { - private int _handlerId; - private Socket _socket; - private boolean _wasSuccessful; - public TimedHandler(Socket socket) { - _socket = socket; - _wasSuccessful = false; - _handlerId = ++__handlerId; - } - public int getHandlerId() { return _handlerId; } - public void handle() { - SimpleTimer.getInstance().addEvent(TimedHandler.this, HANDLE_TIMEOUT); - ConnectionHandler ch = new ConnectionHandler(_context, _transport, _socket); - TCPConnection con = null; - try { - long before = System.currentTimeMillis(); - con = ch.receiveConnection(); - long duration = System.currentTimeMillis() - before; - if (con != null) - _context.statManager().addRateData("tcp.conReceiveOK", duration, duration); - else - _context.statManager().addRateData("tcp.conReceiveFail", duration, duration); - } catch (Exception e) { - _log.log(Log.CRIT, "Unhandled exception receiving a connection on " + _socket, e); - } - if (con != null) { - _wasSuccessful = true; - _transport.connectionEstablished(con); - } else if (ch.getTestComplete()) { - // not a connection, but we verified the test - _wasSuccessful = true; - } - if (!_wasSuccessful) - _transport.addConnectionErrorMessage(ch.getError()); - } - public boolean wasSuccessful() { return _wasSuccessful; } - - /** - * Called after a timeout period - if we haven't already established the - * connection, close the socket (interrupting any blocking ops) - * - */ - public void timeReached() { - if (wasSuccessful()) { - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("Handle successful"); - } else { - if (_log.shouldLog(Log.WARN)) - _log.warn("Unable to handle in the time allotted"); - try { _socket.close(); } catch (IOException ioe) {} - } - } - } -} diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java deleted file mode 100644 index 1f61eed9d..000000000 --- a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java +++ /dev/null @@ -1,851 +0,0 @@ -package net.i2p.router.transport.tcp; - -import java.io.IOException; -import java.io.Writer; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import net.i2p.data.DataHelper; -import net.i2p.data.Hash; -import net.i2p.data.RouterAddress; -import net.i2p.data.RouterIdentity; -import net.i2p.data.RouterInfo; -import net.i2p.router.OutNetMessage; -import net.i2p.router.RouterContext; -import net.i2p.router.transport.Transport; -import net.i2p.router.transport.TransportBid; -import net.i2p.router.transport.TransportImpl; -import net.i2p.util.I2PThread; -import net.i2p.util.Log; - -/** - * TCP Transport implementation, coordinating the connections - * between peers and the transmission of messages across those - * connections. - * - */ -public class TCPTransport extends TransportImpl { - private final Log _log; - /** Our local TCP address, if known */ - private TCPAddress _myAddress; - /** How we receive connections */ - private TCPListener _listener; - /** Coordinate the agreed connection tags */ - private ConnectionTagManager _tagManager; - - /** H(RouterIdentity) to TCPConnection for fully established connections */ - private Map _connectionsByIdent; - /** TCPAddress::toString() to TCPConnection for fully established connections */ - private Map _connectionsByAddress; - - /** H(RouterIdentity) for not yet established connections */ - private Set _pendingConnectionsByIdent; - /** TCPAddress::toString() for not yet established connections */ - private Set _pendingConnectionsByAddress; - - /** - * H(RouterIdentity) to List of OutNetMessage for messages targetting - * not yet established connections - */ - private Map _pendingMessages; - /** - * Object to lock on when touching the _connection maps or - * the pendingMessages map. In addition, this lock is notified whenever - * a brand new peer is added to the pendingMessages map - */ - private Object _connectionLock; - /** - * List of the most recent connection establishment error messages (where the - * message includes the time) - */ - private List _lastConnectionErrors; - /** All of the operating TCPConnectionEstablisher objects */ - private List _connectionEstablishers; - - private TransportBid _fastBid; - private TransportBid _slowBid; - - /** What is this transport's identifier? */ - public static final String STYLE = "TCP"; - /** Should the TCP listener bind to all interfaces? */ - public static final String BIND_ALL_INTERFACES = "i2np.tcp.bindAllInterfaces"; - /** What host/ip should we be addressed as? */ - public static final String LISTEN_ADDRESS = "i2np.tcp.hostname"; - /** What port number should we listen to? */ - public static final String LISTEN_PORT = "i2np.tcp.port"; - /** Should we allow the transport to listen on a non routable address? */ - public static final String LISTEN_ALLOW_LOCAL = "i2np.tcp.allowLocal"; - /** Keep track of the last 10 error messages wrt establishing a connection */ - public static final int MAX_ERR_MESSAGES = 10; - public static final String PROP_ESTABLISHERS = "i2np.tcp.concurrentEstablishers"; - public static final int DEFAULT_ESTABLISHERS = 3; - - /** Ordered list of supported I2NP protocols */ - public static final int[] SUPPORTED_PROTOCOLS = new int[] { 6 }; // drop < 0.6.1.11 - /** blah, people shouldnt use defaults... */ - public static final int DEFAULT_LISTEN_PORT = 8887; - - /** Creates a new instance of TCPTransport */ - public TCPTransport(RouterContext context) { - super(context); - _log = context.logManager().getLog(TCPTransport.class); - _listener = new TCPListener(context, this); - _myAddress = null; - _tagManager = new PersistentConnectionTagManager(context); - _connectionsByIdent = new HashMap(16); - _connectionsByAddress = new HashMap(16); - _pendingConnectionsByIdent = new HashSet(16); - _pendingConnectionsByAddress = new HashSet(16); - _connectionLock = new Object(); - _pendingMessages = new HashMap(16); - _lastConnectionErrors = new ArrayList(); - _fastBid = new SharedBid(200); - _slowBid = new SharedBid(5000); - - String str = _context.getProperty(PROP_ESTABLISHERS); - int establishers = 0; - if (str != null) { - try { - establishers = Integer.parseInt(str); - } catch (NumberFormatException nfe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Invalid number of connection establishers [" + str + "]"); - establishers = DEFAULT_ESTABLISHERS; - } - } else { - establishers = DEFAULT_ESTABLISHERS; - } - - _connectionEstablishers = new ArrayList(establishers); - for (int i = 0; i < establishers; i++) { - TCPConnectionEstablisher est = new TCPConnectionEstablisher(_context, this); - _connectionEstablishers.add(est); - String name = _context.routerHash().toBase64().substring(0,6) + " Est" + i; - I2PThread t = new I2PThread(est, name); - t.setDaemon(true); - t.start(); - } - } - - public TransportBid bid(RouterInfo toAddress, long dataSize) { - if (false) return null; - - RouterAddress addr = toAddress.getTargetAddress(STYLE); - - if ( (_myAddress != null) && (_myAddress.equals(addr)) ) - return null; // dont talk to yourself - - if (getIsConnected(toAddress.getIdentity())) { - return _fastBid; - } else { - if (addr == null) - return null; - - return _slowBid; - } - } - - private boolean getIsConnected(RouterIdentity ident) { - Hash peer = ident.calculateHash(); - synchronized (_connectionLock) { - return _connectionsByIdent.containsKey(peer); - } - } - - /** - * Called whenever a new message is ready to be sent. This should - * not block. - * - */ - protected void outboundMessageReady() { - OutNetMessage msg = getNextMessage(); - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("Outbound message ready: " + msg); - - if (msg != null) { - if (msg.getTarget() == null) - throw new IllegalStateException("Null target for a ready message?"); - - msg.timestamp("TCPTransport.outboundMessageReady"); - - TCPConnection con = null; - boolean newPeer = false; - Hash peer = msg.getTarget().getIdentity().calculateHash(); - synchronized (_connectionLock) { - con = (TCPConnection)_connectionsByIdent.get(peer); - if (con == null) { - if (_log.shouldLog(Log.DEBUG)) { - StringBuffer buf = new StringBuffer(128); - buf.append("No connections to "); - buf.append(peer.toBase64().substring(0,6)); - buf.append(", but we are connected to "); - for (Iterator iter = _connectionsByIdent.keySet().iterator(); iter.hasNext(); ) { - Hash cur = (Hash)iter.next(); - buf.append(cur.toBase64().substring(0,6)).append(", "); - } - _log.debug(buf.toString()); - } - List msgs = (List)_pendingMessages.get(peer); - if (msgs == null) { - msgs = new ArrayList(4); - _pendingMessages.put(peer, msgs); - newPeer = true; - } - msgs.add(msg); - msg.timestamp("TCPTransport.outboundMessageReady queued behind " +(msgs.size()-1)); - - if (newPeer) - _connectionLock.notifyAll(); - } - } - - if (con != null) - con.addMessage(msg); - } - } - - - /** - * The connection specified has been fully built - */ - void connectionEstablished(TCPConnection con) { - TCPAddress remAddr = con.getRemoteAddress(); - RouterIdentity ident = con.getRemoteRouterIdentity(); - if ( (remAddr == null) || (ident == null) ) { - con.closeConnection(); - return; - } - - List waitingMsgs = null; - List changedMsgs = null; - boolean alreadyConnected = false; - boolean changedIdents = false; - synchronized (_connectionLock) { - if (_connectionsByAddress.containsKey(remAddr.toString())) { - alreadyConnected = true; - } else { - _connectionsByAddress.put(remAddr.toString(), con); - } - - if (_connectionsByIdent.containsKey(ident.calculateHash())) { - alreadyConnected = true; - } else { - _connectionsByIdent.put(ident.calculateHash(), con); - } - - // just drop the _pending connections - the establisher should fail - // them accordingly. - _pendingConnectionsByAddress.remove(remAddr.toString()); - _pendingConnectionsByIdent.remove(ident.calculateHash()); - if ( (con.getAttemptedPeer() != null) && (!ident.getHash().equals(con.getAttemptedPeer())) ) { - changedIdents = true; - _pendingConnectionsByIdent.remove(con.getAttemptedPeer()); - changedMsgs = (List)_pendingMessages.remove(con.getAttemptedPeer()); - } - - if (!alreadyConnected) - waitingMsgs = (List)_pendingMessages.remove(ident.calculateHash()); - - if (_log.shouldLog(Log.DEBUG)) { - StringBuffer buf = new StringBuffer(256); - buf.append("\nConnection to ").append(ident.getHash().toBase64().substring(0,6)); - buf.append(" built. Already connected? "); - buf.append(alreadyConnected); - buf.append("\nconnectionsByAddress: (cur=").append(remAddr.toString()).append(") "); - for (Iterator iter = _connectionsByAddress.keySet().iterator(); iter.hasNext(); ) { - String addr = (String)iter.next(); - buf.append(addr).append(" "); - } - buf.append("\nconnectionsByIdent: "); - for (Iterator iter = _connectionsByIdent.keySet().iterator(); iter.hasNext(); ) { - Hash h = (Hash)iter.next(); - buf.append(h.toBase64().substring(0,6)).append(" "); - } - - _log.debug(buf.toString()); - } - } - - if (changedIdents) { - _context.shitlist().shitlistRouter(con.getAttemptedPeer(), "Changed identities", STYLE); - if (changedMsgs != null) { - for (int i = 0; i < changedMsgs.size(); i++) { - OutNetMessage cur = (OutNetMessage)changedMsgs.get(i); - cur.timestamp("changedIdents"); - afterSend(cur, false, false, 0); - } - } - } - - if (alreadyConnected) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Closing new duplicate"); - con.setTransport(this); - con.closeConnection(); - } else { - con.setTransport(this); - - if (waitingMsgs != null) { - for (int i = 0; i < waitingMsgs.size(); i++) { - con.addMessage((OutNetMessage)waitingMsgs.get(i)); - } - } - - _context.shitlist().unshitlistRouter(ident.calculateHash(), STYLE); - - con.runConnection(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Connection set to run"); - } - } - - void connectionClosed(TCPConnection con) { - Hash remotePeer = null; - if (con == null) return; - RouterIdentity peer = con.getRemoteRouterIdentity(); - if (peer == null) return; - remotePeer = peer.getHash(); - synchronized (_connectionLock) { - TCPConnection cur = (TCPConnection)_connectionsByIdent.remove(remotePeer); - if ( (cur != null) && (cur != con) ) - _connectionsByIdent.put(cur.getRemoteRouterIdentity().getHash(), cur); - cur = (TCPConnection)_connectionsByAddress.remove(con.getRemoteAddress().toString()); - if ( (cur != null) && (cur != con) ) - _connectionsByAddress.put(cur.getRemoteAddress().toString(), cur); - - if (_log.shouldLog(Log.DEBUG)) { - StringBuffer buf = new StringBuffer(256); - buf.append("\nCLOSING ").append(con.getRemoteRouterIdentity().getHash().toBase64().substring(0,6)); - buf.append("."); - if (cur != null) - buf.append("\nconnectionsByAddress: (cur=").append(con.getRemoteAddress().toString()).append(") "); - for (Iterator iter = _connectionsByAddress.keySet().iterator(); iter.hasNext(); ) { - String addr = (String)iter.next(); - buf.append(addr).append(" "); - } - buf.append("\nconnectionsByIdent: "); - for (Iterator iter = _connectionsByIdent.keySet().iterator(); iter.hasNext(); ) { - Hash h = (Hash)iter.next(); - buf.append(h.toBase64().substring(0,6)).append(" "); - } - - _log.debug(buf.toString(), new Exception("Closed by")); - } - } - } - - /** - * Blocking call from when a remote peer tells us what they think our - * IP address is. This may do absolutely nothing, or it may fire up a - * new socket listener after stopping an existing one. - * - * @param address address that the remote host said was ours - */ - void ourAddressReceived(String address) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Address received [" + address + "] our address: [" + _myAddress + "]"); - synchronized (_listener) { // no need to lock on the whole TCPTransport - if (allowAddressUpdate(address)) { - int port = getPort(); - TCPAddress addr = new TCPAddress(address, port); - if (addr.getPort() > 0) { - if (allowAddress(addr)) { - if (_myAddress != null) { - if (addr.getAddress().equals(_myAddress.getAddress())) { - // ignore, since there is no change - if (_log.shouldLog(Log.INFO)) - _log.info("Not updating our local address, as it hasnt changed from " + address); - return; - } - } - if (_log.shouldLog(Log.INFO)) - _log.info("Update our local address to " + address); - updateAddress(addr); - } else { - if (_log.shouldLog(Log.WARN)) - _log.warn("Address received is NOT a valid address! [" + addr + "]"); - } - } else { - if (_log.shouldLog(Log.ERROR)) - _log.error("Address specified is not valid [" + address + ":" + port + "]"); - } - } else { - // either we have explicitly specified our IP address, or - // we are already connected to some people. - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Not allowing address update"); - } - } - } - - public RouterAddress startListening() { - configureLocalAddress(); - _listener.startListening(); - if (_myAddress != null) { - RouterAddress rv = _myAddress.toRouterAddress(); - if (rv != null) - replaceAddress(rv); - return rv; - } else { - return null; - } - } - - public void stopListening() { - _listener.stopListening(); - } - - /** - * Should we listen to all interfaces, or just the one specified in - * our TCPAddress? - * - */ - boolean shouldListenToAllInterfaces() { - String val = getContext().getProperty(BIND_ALL_INTERFACES, "TRUE"); - return Boolean.valueOf(val).booleanValue(); - } - - private SimpleDateFormat _fmt = new SimpleDateFormat("dd MMM HH:mm:ss"); - - /** - * Add the given message to the list of most recent connection - * establishment error messages. A timestamp is prefixed to it before - * being rendered on the router console. - * - */ - void addConnectionErrorMessage(String msg) { - synchronized (_fmt) { - msg = _fmt.format(new Date(_context.clock().now())) + ": " + msg; - } - synchronized (_lastConnectionErrors) { - while (_lastConnectionErrors.size() >= MAX_ERR_MESSAGES) - _lastConnectionErrors.remove(0); - _lastConnectionErrors.add(msg); - } - } - - String getMyHost() { - if (_myAddress != null) - return _myAddress.getHost(); - else - return null; - } - public String getStyle() { return STYLE; } - ConnectionTagManager getTagManager() { return _tagManager; } - - /** - * Initialize the _myAddress var with our local address (if possible) - * - */ - private void configureLocalAddress() { - String addr = _context.getProperty(LISTEN_ADDRESS); - int port = getPort(); - if ( (addr == null) || (addr.trim().length() <= 0) ) { - if (_log.shouldLog(Log.ERROR)) - _log.error("External address is not specified - autodetecting IP (be sure to forward port " + port + ")"); - return; - } - if (port != -1) { - TCPAddress address = new TCPAddress(addr, port); - boolean ok = allowAddress(address); - if (ok) { - _myAddress = address; - } else { - if (_log.shouldLog(Log.ERROR)) - _log.error("External address " + addr + " is not valid"); - } - } else { - if (_log.shouldLog(Log.ERROR)) - _log.error("External port is not valid"); - } - } - - /** - * Is the given address a valid one that we could listen to or contact? - * - */ - boolean allowAddress(TCPAddress address) { - if (address == null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Address is null?!"); - return false; - } - if ( (address.getPort() <= 0) || (address.getPort() > 65535) ) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Port is invalid? " + address.getPort()); - return false; - } - if (!address.isPubliclyRoutable()) { - String allowLocal = _context.getProperty(LISTEN_ALLOW_LOCAL, "false"); - if (Boolean.valueOf(allowLocal).booleanValue()) { - return true; - } else { - if (_log.shouldLog(Log.WARN)) - _log.warn("External address " + address + " is not publicly routable"); - return false; - } - } else { - return true; - } - } - - /** - * Blocking call to unconditionally update our listening address to the - * one specified, updating the routerInfo, etc. - * - */ - private void updateAddress(TCPAddress addr) { - boolean restartListener = true; - if ( (addr.getPort() == getPort()) && (shouldListenToAllInterfaces()) ) - restartListener = false; - - RouterAddress routerAddr = addr.toRouterAddress(); - _myAddress = addr; - - if (restartListener) - _listener.stopListening(); - - replaceAddress(routerAddr); - - _context.router().rebuildRouterInfo(); - - if (_log.shouldLog(Log.INFO)) - _log.info("Updating our local address to include " + addr.toString() - + " and modified our routerInfo to have: " - + _context.router().getRouterInfo().getAddresses()); - - // safe to do multiple times - _listener.startListening(); - } - - /** - * Determine whether we should listen to the peer when they give us what they - * say our IP address is. We should allow a peer to specify our IP address - * if and only if we have not configured our own address explicitly and we - * have no fully established connections. - * - */ - private boolean allowAddressUpdate(String proposedAddress) { - int connectedPeers = countActivePeers(); - boolean addressSpecified = (null != _context.getProperty(LISTEN_ADDRESS)); - if (addressSpecified) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Not allowing address update, sicne we have one specified (#cons=" + connectedPeers + ")"); - return false; - } - if (connectedPeers < 3) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Allowing address update, since the # of connected peers is " + connectedPeers); - return true; - } else if (connectedPeers == 3) { - // ok, now comes the vote: - // if we agree with the majority, allow the update - // otherwise, reject the update - int agreed = countActiveAgreeingPeers(proposedAddress); - if (agreed > 1) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Most common address selected, allowing address update w/ # of connected peers is " + connectedPeers); - return true; - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Proposed address [" + proposedAddress + "] is only used by " + agreed - + ", rejecting address update w/ # of connected peers is " - + connectedPeers); - return false; - } - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Not allowing address update, since the # of connected peers is " + connectedPeers); - return false; - } - } - - /** - * What port should we be reachable on? - * - * @return the port number, or -1 if there is no valid port - */ - int getPort() { - if ( (_myAddress != null) && (_myAddress.getPort() > 0) ) - return _myAddress.getPort(); - - String port = _context.getProperty(LISTEN_PORT, DEFAULT_LISTEN_PORT+""); - if (port != null) { - try { - int portNum = Integer.parseInt(port.trim()); - if ( (portNum >= 1) && (portNum < 65535) ) - return portNum; - } catch (NumberFormatException nfe) { - // fallthrough - } - } - - return -1; - } - - public List getMostRecentErrorMessages() { - return _lastConnectionErrors; - } - - /** - * How many peers can we talk to right now? - * - */ - public int countActivePeers() { - int numActive = 0; - int numInactive = 0; - synchronized (_connectionLock) { - if (_connectionsByIdent.size() <= 0) return 0; - for (Iterator iter = _connectionsByIdent.values().iterator(); iter.hasNext(); ) { - TCPConnection con = (TCPConnection)iter.next(); - if (con.getIsActive()) - numActive++; - else - numInactive++; - } - } - if ( (numInactive > 0) && (_log.shouldLog(Log.DEBUG)) ) - _log.debug("Inactive peers: " + numInactive + " active: " + numActive); - - return numActive; - } - - /** - * How many peers that we are connected to think we are reachable at the given - * address? - * - */ - public int countActiveAgreeingPeers(String address) { - int agreed = 0; - synchronized (_connectionLock) { - if (_connectionsByIdent.size() <= 0) return 0; - for (Iterator iter = _connectionsByIdent.values().iterator(); iter.hasNext(); ) { - TCPConnection con = (TCPConnection)iter.next(); - if (con.getIsActive()) { - String shown = con.getShownAddress(); - if ( (shown != null) && (shown.equals(address)) ) - agreed++; - } - } - } - - return agreed; - } - - /** - * The transport is done sending this message. This exposes the - * superclass's protected method to the current package. - * - * @param msg message in question - * @param sendSuccessful true if the peer received it - * @param msToSend how long it took to transfer the data to the peer - * @param allowRequeue true if we should try other transports if available - */ - public void afterSend(OutNetMessage msg, boolean sendSuccessful, boolean allowRequeue, long msToSend) { - super.afterSend(msg, sendSuccessful, allowRequeue, msToSend); - } - - /** - * Blocking call to retrieve the next peer that we want to establish a - * connection with. - * - */ - RouterInfo getNextPeer() { - while (_context.router().isAlive()) { - synchronized (_connectionLock) { - for (Iterator iter = _pendingMessages.keySet().iterator(); iter.hasNext(); ) { - Hash peer = (Hash)iter.next(); - List msgs = (List)_pendingMessages.get(peer); - if (_pendingConnectionsByIdent.contains(peer)) - continue; // we're already trying to talk to them - - if (msgs.size() <= 0) - continue; // uh... - OutNetMessage msg = (OutNetMessage)msgs.get(0); - RouterAddress addr = msg.getTarget().getTargetAddress(STYLE); - if (addr == null) { - _log.error("Message target has no TCP addresses! " + msg.getTarget()); - iter.remove(); - _context.shitlist().shitlistRouter(peer, "Peer " - + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) - + " has no addresses", STYLE); - _context.netDb().fail(peer); - for (int i = 0; i < msgs.size(); i++) { - OutNetMessage cur = (OutNetMessage)msgs.get(i); - cur.timestamp("no TCP addresses"); - afterSend(cur, false, false, 0); - } - continue; - } - TCPAddress tcpAddr = new TCPAddress(addr); - if (tcpAddr.getPort() <= 0) { - iter.remove(); - _context.shitlist().shitlistRouter(peer, "Peer " - + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) - + " has only invalid addresses", STYLE); - _context.netDb().fail(peer); - for (int i = 0; i < msgs.size(); i++) { - OutNetMessage cur = (OutNetMessage)msgs.get(i); - cur.timestamp("invalid addresses"); - afterSend(cur, false, false, 0); - } - continue; // invalid - } - if (_pendingConnectionsByAddress.contains(tcpAddr.toString())) - continue; // we're already trying to talk to someone at their address - - if (_context.routerHash().equals(peer)) { - _log.error("Message points at us! " + msg.getTarget()); - iter.remove(); - _context.netDb().fail(peer); - for (int i = 0; i < msgs.size(); i++) { - OutNetMessage cur = (OutNetMessage)msgs.get(i); - cur.timestamp("points at us"); - afterSend(cur, false, false, 0); - } - continue; - } - if ( (_myAddress != null) && (_myAddress.equals(tcpAddr)) ) { - _log.error("Message points at our old TCP addresses! " + msg.getTarget()); - iter.remove(); - _context.shitlist().shitlistRouter(peer, "This is our old address...", STYLE); - _context.netDb().fail(peer); - for (int i = 0; i < msgs.size(); i++) { - OutNetMessage cur = (OutNetMessage)msgs.get(i); - cur.timestamp("points at our ip"); - afterSend(cur, false, false, 0); - } - continue; - } - if (!allowAddress(tcpAddr)) { - _log.error("Message points at illegal address! router " - + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) - + " address " + tcpAddr.toString()); - - iter.remove(); - _context.shitlist().shitlistRouter(peer, "Invalid TCP address...", STYLE); - _context.netDb().fail(peer); - for (int i = 0; i < msgs.size(); i++) { - OutNetMessage cur = (OutNetMessage)msgs.get(i); - cur.timestamp("points at an illegal address"); - afterSend(cur, false, false, 0); - } - continue; - } - - // ok, this is someone we can try to contact. mark it as ours. - _pendingConnectionsByIdent.add(peer); - _pendingConnectionsByAddress.add(tcpAddr.toString()); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Add pending connection to: " + peer.toBase64().substring(0,6)); - return msg.getTarget(); - } - - try { - _connectionLock.wait(); - } catch (InterruptedException ie) {} - } - } - return null; - } - - /** Called after an establisher finished (or failed) connecting to the peer */ - void establishmentComplete(RouterInfo info) { - TCPAddress addr = new TCPAddress(info.getTargetAddress(STYLE)); - Hash peer = info.getIdentity().calculateHash(); - List msgs = null; - synchronized (_connectionLock) { - _pendingConnectionsByAddress.remove(addr.toString()); - _pendingConnectionsByIdent.remove(peer); - - msgs = (List)_pendingMessages.remove(peer); - } - - if (msgs != null) { - // messages are only available if the connection failed (since - // connectionEstablished clears them otherwise) - for (int i = 0; i < msgs.size(); i++) { - OutNetMessage msg = (OutNetMessage)msgs.get(i); - msg.timestamp("establishmentComplete(failed)"); - afterSend(msg, false); - } - } - } - - /** Make this stuff pretty (only used in the old console) */ - public void renderStatusHTML(Writer out) throws IOException { - StringBuffer buf = new StringBuffer(1024); - int outbound = 0; - int inbound = 0; - synchronized (_connectionLock) { - long offsetTotal = 0; - buf.append("Connections (").append(_connectionsByIdent.size()).append("):\n"); - - buf.append("Average clock skew, TCP peers: "); - if (_connectionsByIdent.size() > 0) - buf.append(offsetTotal / _connectionsByIdent.size()).append("ms
\n"); - else - buf.append("n/a
\n"); - - buf.append("Connections being built:\n"); - buf.append("Inbound: ").append(inbound).append(", Outbound: ").append(outbound).append("
\n"); - } - - buf.append("Most recent connection errors:"); - - out.write(buf.toString()); - } - - /** - * Cache the bid to reduce object churn - */ - private class SharedBid extends TransportBid { - public SharedBid(int ms) { super(); setLatencyMs(ms); } - public Transport getTransport() { return TCPTransport.this; } - public String toString() { return "TCP bid @ " + getLatencyMs(); } - } -} diff --git a/router/java/src/net/i2p/router/transport/tcp/package.html b/router/java/src/net/i2p/router/transport/tcp/package.html deleted file mode 100644 index c236f03a3..000000000 --- a/router/java/src/net/i2p/router/transport/tcp/package.html +++ /dev/null @@ -1,144 +0,0 @@ - -

-OBSOLETE - see NTCP. -Implements the transport for communicating with other routers via TCP/IP. -

- -

Connection protocol

- -

The protocol used to establish the connection between the peers is -implemented in the {@link net.i2p.router.transport.tcp.ConnectionBuilder} -for "Alice", the initiator, and in -{@link net.i2p.router.transport.tcp.ConnectionHandler} for "Bob", the -receiving peer. (+ implies concatenation)

- -

Common case:

-

1) Alice to Bob:
- #bytesFollowing + #versions + v1 [+ v2 [etc]] + tag? + tagData + properties

-

2) Bob to Alice:
- #bytesFollowing + versionOk + #bytesIP + IP + tagOk? + nonce + properties

- - - -

Whether or not the tagData is specified by Alice and is accepted -by Bob determines which of the scenarios below are used. In addition, the IP -address provided by Bob gives Alice the opportunity to fire up a socket listener -on that interface and include it in her list of reachable addresses. The -properties mappings are left for future expansion.

- -

Connection establishment with a valid tag:

-

With a valid tag and nonce received, both Alice and -Bob load up the previously negotiated sessionKey and set the -iv to the first 16 bytes of H(tag + nonce). The -remainder of the communication is AES256 encrypted per -{@link net.i2p.crypto.AESInputStream} and {@link net.i2p.crypto.AESOutputStream}

- -

3) Alice to Bob:
- H(nonce)

-

4) Bob to Alice:
- H(tag)

-

5) If the hashes are not correct, disconnect immediately and do not - consume the tag

-

6) Alice to Bob:
- routerInfo + currentTime + H(routerInfo + currentTime + nonce + tag)

-

7) Bob should now verify that he can establish a connection to her through one of the - routerAddresses specified in her RouterInfo. The testing process is described below.

-

8) Bob to Alice:
- routerInfo + status + properties + H(routerInfo + status + properties + nonce + tag)

-

9) If the status is ok, both Alice and Bob consume the - tagData, updating the next tag to be H(E(nonce + tag, sessionKey)) - (with nonce+tag padded with 12 bytes of 0x0 at the end). - Otherwise, both sides disconnect and do not consume the tag. In addition, on error the - properties mapping has a more detailed reason under the key "MESSAGE".

- - - -

Connection establishment without a vald tag:

- -

3) Alice to Bob
- X

-

4) Bob to Alice
- Y

-

5) Both sides complete the Diffie-Hellman exchange, setting the - sessionKey to the first 32 bytes of the result (e.g. (X^y mod p)), - iv to the next 16 bytes, and the nextTag to the 32 - bytes after that. The rest of the data is AES256 encrypted with those settings per - {@link net.i2p.crypto.AESInputStream} and {@link net.i2p.crypto.AESOutputStream}

-

6) Alice to Bob
- H(nonce)

-

7) Bob to Alice
- H(nextTag)

-

8) If they disagree, disconnect immediately and do not persist the tags or keys

-

9) Alice to Bob
- routerInfo + currentTime - + S(routerInfo + currentTime + nonce + nextTag, routerIdent.signingKey)

-

10) Bob should now verify that he can establish a connection to her through one of the - routerAddresses specified in her RouterInfo. The testing process is described below.

-

11) Bob to Alice
- routerInfo + status + properties - + S(routerInfo + status + properties + nonce + nextTag, routerIdent.signingKey)

-

12) If the signature matches on both sides and status is ok, both sides - save the sessionKey negotiated as well as the nextTag. - Otherwise, the keys and tags are discarded and both sides drop the connection.

- - - -

Peer testing

-

As mentioned in steps 7 and 10 above, Bob should verify that Alice is reachable -to prevent a restricted route from being formed (he may decide not to do this once -I2P supports restricted routes)

- -

1) Bob to Alice
- 0xFFFF + #versions + v1 [+ v2 [etc]] + properties

-

2) Alice to Bob
- 0xFFFF + versionOk + #bytesIP + IP + currentTime + properties

-

3) Both sides close the socket

- - diff --git a/router/java/src/net/i2p/router/tunnel/pool/HandleTunnelCreateMessageJob.java b/router/java/src/net/i2p/router/tunnel/pool/HandleTunnelCreateMessageJob.java deleted file mode 100644 index 34fbca63b..000000000 --- a/router/java/src/net/i2p/router/tunnel/pool/HandleTunnelCreateMessageJob.java +++ /dev/null @@ -1,224 +0,0 @@ -package net.i2p.router.tunnel.pool; - -import net.i2p.data.Certificate; -import net.i2p.data.DataHelper; -import net.i2p.data.Hash; -import net.i2p.data.RouterIdentity; -import net.i2p.data.RouterInfo; -import net.i2p.data.TunnelId; -import net.i2p.data.i2np.DeliveryInstructions; -import net.i2p.data.i2np.GarlicMessage; -import net.i2p.data.i2np.I2NPMessage; -import net.i2p.data.i2np.TunnelCreateMessage; -import net.i2p.data.i2np.TunnelCreateStatusMessage; -import net.i2p.data.i2np.TunnelGatewayMessage; -import net.i2p.router.HandlerJobBuilder; -import net.i2p.router.Job; -import net.i2p.router.JobImpl; -import net.i2p.router.Router; -import net.i2p.router.RouterContext; -import net.i2p.router.message.GarlicMessageBuilder; -import net.i2p.router.message.PayloadGarlicConfig; -import net.i2p.router.message.SendMessageDirectJob; -import net.i2p.router.peermanager.TunnelHistory; -import net.i2p.router.tunnel.HopConfig; -import net.i2p.util.Log; - -/** - * Receive a request to join a tunnel, and if we aren't overloaded (per the - * throttle), join it (updating the tunnelDispatcher), then send back the - * agreement. Even if we are overloaded, send back a reply stating how - * overloaded we are. - * - */ -public class HandleTunnelCreateMessageJob extends JobImpl { - private Log _log; - private TunnelCreateMessage _request; - private boolean _alreadySearched; - - /** job builder to redirect all tunnelCreateMessages through this job type */ - static class Builder implements HandlerJobBuilder { - private RouterContext _ctx; - public Builder(RouterContext ctx) { _ctx = ctx; } - public Job createJob(I2NPMessage receivedMessage, RouterIdentity from, Hash fromHash) { - return new HandleTunnelCreateMessageJob(_ctx, (TunnelCreateMessage)receivedMessage); - } - } - - - public HandleTunnelCreateMessageJob(RouterContext ctx, TunnelCreateMessage msg) { - super(ctx); - _log = ctx.logManager().getLog(HandleTunnelCreateMessageJob.class); - _request = msg; - _alreadySearched = false; - } - - private static final int STATUS_DEFERRED = 10000; - - public String getName() { return "Handle tunnel join request"; } - public void runJob() { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("handle join request: " + _request); - int status = shouldAccept(); - if (status == STATUS_DEFERRED) { - return; - } else if (status > 0) { - if (_log.shouldLog(Log.WARN)) - _log.warn("reject(" + status + ") join request: " + _request); - sendRejection(status); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("accept join request: " + _request); - accept(); - } - } - - /** don't accept requests to join for 15 minutes or more */ - public static final int MAX_DURATION_SECONDS = 15*60; - - private int shouldAccept() { - // Should not see any initiation requests in hidden mode - if ("true".equalsIgnoreCase(getContext().getProperty(Router.PROP_HIDDEN, "false"))) { - return TunnelHistory.TUNNEL_REJECT_CRIT; - } - - if (_request.getDurationSeconds() >= MAX_DURATION_SECONDS) - return TunnelHistory.TUNNEL_REJECT_CRIT; - Hash nextRouter = _request.getNextRouter(); - if (nextRouter != null) { - RouterInfo ri = getContext().netDb().lookupRouterInfoLocally(nextRouter); - if (ri == null) { - if (_alreadySearched) // only search once - return TunnelHistory.TUNNEL_REJECT_TRANSIENT_OVERLOAD; - getContext().netDb().lookupRouterInfo(nextRouter, new DeferredAccept(getContext(), true), new DeferredAccept(getContext(), false), 5*1000); - _alreadySearched = true; - return STATUS_DEFERRED; - } - } - return getContext().throttle().acceptTunnelRequest(); - } - - private class DeferredAccept extends JobImpl { - private boolean _shouldAccept; - public DeferredAccept(RouterContext ctx, boolean shouldAccept) { - super(ctx); - _shouldAccept = shouldAccept; - } - public void runJob() { - HandleTunnelCreateMessageJob.this.runJob(); - } - private static final String NAME_OK = "Deferred tunnel accept"; - private static final String NAME_REJECT = "Deferred tunnel reject"; - public String getName() { return _shouldAccept ? NAME_OK : NAME_REJECT; } - } - - private void accept() { - byte recvId[] = new byte[4]; - getContext().random().nextBytes(recvId); - - HopConfig cfg = new HopConfig(); - long expiration = _request.getDurationSeconds()*1000 + getContext().clock().now(); - cfg.setCreation(getContext().clock().now()); - cfg.setExpiration(expiration); - cfg.setIVKey(_request.getIVKey()); - cfg.setLayerKey(_request.getLayerKey()); - cfg.setOptions(_request.getOptions()); - cfg.setReceiveTunnelId(recvId); - - if (_request.getIsGateway()) { - if (_log.shouldLog(Log.INFO)) - _log.info("join as inbound tunnel gateway pointing at " - + _request.getNextRouter().toBase64().substring(0,4) + ":" - + _request.getNextTunnelId() - + " (nonce=" + _request.getNonce() + ")"); - // serve as the inbound tunnel gateway - cfg.setSendTo(_request.getNextRouter()); - TunnelId id = _request.getNextTunnelId(); - if (id == null) { - sendRejection(TunnelHistory.TUNNEL_REJECT_CRIT); - return; - } - cfg.setSendTunnelId(DataHelper.toLong(4, id.getTunnelId())); - getContext().tunnelDispatcher().joinInboundGateway(cfg); - } else if (_request.getNextRouter() == null) { - if (_log.shouldLog(Log.INFO)) - _log.info("join as outbound tunnel endpoint (nonce=" + _request.getNonce() + ")"); - // serve as the outbound tunnel endpoint - getContext().tunnelDispatcher().joinOutboundEndpoint(cfg); - } else { - if (_log.shouldLog(Log.INFO)) - _log.info("join as tunnel participant pointing at " - + _request.getNextRouter().toBase64().substring(0,4) + ":" - + _request.getNextTunnelId() - + " (nonce=" + _request.getNonce() + ")"); - // serve as a general participant - cfg.setSendTo(_request.getNextRouter()); - TunnelId id = _request.getNextTunnelId(); - if (id == null) { - sendRejection(TunnelHistory.TUNNEL_REJECT_CRIT); - return; - } - cfg.setSendTunnelId(DataHelper.toLong(4, id.getTunnelId())); - getContext().tunnelDispatcher().joinParticipant(cfg); - } - - sendAcceptance(recvId); - } - - private static final byte[] REJECTION_TUNNEL_ID = new byte[] { (byte)0xFF, (byte)0xFF, (byte)0xFF, (byte)0xFF }; - private static final int REPLY_TIMEOUT = 30*1000; - private static final int REPLY_PRIORITY = 500; - - private void sendAcceptance(byte recvId[]) { - sendReply(recvId, TunnelCreateStatusMessage.STATUS_SUCCESS); - } - private void sendRejection(int severity) { - sendReply(REJECTION_TUNNEL_ID, severity); - } - private void sendReply(byte recvId[], int status) { - TunnelCreateStatusMessage reply = new TunnelCreateStatusMessage(getContext()); - reply.setNonce(_request.getNonce()); - reply.setReceiveTunnelId(new TunnelId(DataHelper.fromLong(recvId, 0, 4))); - reply.setStatus(status); - - GarlicMessage msg = createReply(reply); - if (msg == null) - throw new RuntimeException("wtf, couldn't create reply? to " + _request); - - TunnelGatewayMessage gw = new TunnelGatewayMessage(getContext()); - gw.setMessage(msg); - gw.setTunnelId(_request.getReplyTunnel()); - gw.setMessageExpiration(msg.getMessageExpiration()); - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("sending (" + status + ") to the tunnel " - + _request.getReplyGateway().toBase64().substring(0,4) + ":" - + _request.getReplyTunnel() + " wrt " + _request); - SendMessageDirectJob job = new SendMessageDirectJob(getContext(), gw, _request.getReplyGateway(), - REPLY_TIMEOUT, REPLY_PRIORITY); - // run it inline (adds to the outNetPool if it has the router info, otherwise queue a lookup) - job.runJob(); - //getContext().jobQueue().addJob(job); - } - - private GarlicMessage createReply(TunnelCreateStatusMessage reply) { - DeliveryInstructions instructions = new DeliveryInstructions(); - instructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_LOCAL); - - PayloadGarlicConfig cfg = new PayloadGarlicConfig(); - cfg.setPayload(reply); - cfg.setCertificate(new Certificate(Certificate.CERTIFICATE_TYPE_NULL, null)); - cfg.setDeliveryInstructions(instructions); - cfg.setRequestAck(false); - cfg.setExpiration(getContext().clock().now() + REPLY_TIMEOUT); - cfg.setId(getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE)); - - GarlicMessage msg = GarlicMessageBuilder.buildMessage(getContext(), cfg, - null, // we dont care about the tags - null, // or keys sent - null, // and we don't know what public key to use - _request.getReplyKey(), _request.getReplyTag()); - return msg; - } - -} diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelMessageHandlerBuilder.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelMessageHandlerBuilder.java deleted file mode 100644 index 8b5262dbd..000000000 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelMessageHandlerBuilder.java +++ /dev/null @@ -1,48 +0,0 @@ -package net.i2p.router.tunnel.pool; - -import net.i2p.data.Hash; -import net.i2p.data.RouterIdentity; -import net.i2p.data.i2np.I2NPMessage; -import net.i2p.data.i2np.TunnelDataMessage; -import net.i2p.data.i2np.TunnelGatewayMessage; -import net.i2p.router.HandlerJobBuilder; -import net.i2p.router.Job; -import net.i2p.router.JobImpl; -import net.i2p.router.RouterContext; - -/** - * - */ -public class TunnelMessageHandlerBuilder implements HandlerJobBuilder { - private RouterContext _context; - - public TunnelMessageHandlerBuilder(RouterContext ctx) { - _context = ctx; - } - - public Job createJob(I2NPMessage receivedMessage, RouterIdentity from, Hash fromHash) { - if ( (fromHash == null) && (from != null) ) - fromHash = from.calculateHash(); - return new HandleJob(_context, receivedMessage, fromHash); - } - - private class HandleJob extends JobImpl { - private I2NPMessage _msg; - private Hash _from; - public HandleJob(RouterContext ctx, I2NPMessage msg, Hash from) { - super(ctx); - _msg = msg; - _from = from; - } - public void runJob() { - if (_msg instanceof TunnelGatewayMessage) { - getContext().tunnelDispatcher().dispatch((TunnelGatewayMessage)_msg); - } else if (_msg instanceof TunnelDataMessage) { - getContext().tunnelDispatcher().dispatch((TunnelDataMessage)_msg, _from); - } - } - - public String getName() { return "Dispatch tunnel message"; } - } - -}