From f87e3b52e305ed78cd67874d2052355603cf32af Mon Sep 17 00:00:00 2001 From: zzz Date: Sat, 16 Jul 2011 20:22:00 +0000 Subject: [PATCH] more on resume/accept, untested --- .../net/i2p/i2ptunnel/I2PTunnelIRCClient.java | 12 +++ .../i2p/i2ptunnel/irc/DCCClientManager.java | 98 +++++++++++++++-- .../i2p/i2ptunnel/irc/I2PTunnelDCCClient.java | 18 +++- .../i2p/i2ptunnel/irc/I2PTunnelDCCServer.java | 100 ++++++++++++++++-- 4 files changed, 204 insertions(+), 24 deletions(-) diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java index d5f4d62d2..a07fb8fea 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java @@ -201,18 +201,30 @@ public class I2PTunnelIRCClient extends I2PTunnelClientBase implements DCCHelper } public int resumeOutgoing(int port) { + DCCClientManager tracker = _DCCClientManager; + if (tracker != null) + return tracker.resumeOutgoing(port); return -1; } public int resumeIncoming(int port) { + I2PTunnelDCCServer server = _DCCServer; + if (server != null) + return server.resumeIncoming(port); return -1; } public int acceptOutgoing(int port) { + I2PTunnelDCCServer server = _DCCServer; + if (server != null) + return server.acceptOutgoing(port); return -1; } public int acceptIncoming(int port) { + DCCClientManager tracker = _DCCClientManager; + if (tracker != null) + return tracker.acceptIncoming(port); return -1; } } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/DCCClientManager.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/DCCClientManager.java index 1b24bc0dc..d13151963 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/DCCClientManager.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/DCCClientManager.java @@ -15,10 +15,14 @@ import net.i2p.util.Log; * *
  *
- *                <---  I2PTunnelDCCServer <--------------- I2PTunnelDCCClient <----
+ *                                            direct conn
+ *                <---> I2PTunnelDCCServer <--------------->I2PTunnelDCCClient <---->
  *   originating                                                                     responding
  *   chat client                                                                     chat client
- *                ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
+ *        CHAT    ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
+ *        SEND    ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
+ *        RESUME  <--- I2PTunnelIRCClient <-- IRC server <-- I2TunnelIRCClient <-----
+ *        ACCEPT  ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
  *
  * 
* @@ -31,8 +35,12 @@ public class DCCClientManager extends EventReceiver { private final I2PTunnel _tunnel; private final Log _log; + /** key is the DCC client's local port */ private final ConcurrentHashMap _incoming; + /** key is the DCC client's local port */ private final ConcurrentHashMap _active; + /** key is the DCC client's local port */ + private final ConcurrentHashMap _complete; // list of client tunnels? private static long _id; @@ -50,6 +58,7 @@ public class DCCClientManager extends EventReceiver { _log = tunnel.getContext().logManager().getLog(DCCClientManager.class); _incoming = new ConcurrentHashMap(8); _active = new ConcurrentHashMap(8); + _complete = new ConcurrentHashMap(8); } public boolean close(boolean forced) { @@ -61,18 +70,26 @@ public class DCCClientManager extends EventReceiver { c.stop(); } _active.clear(); + _complete.clear(); return true; } /** * An incoming DCC request * - * @param b32 remote dcc server address - * @param port remote dcc server port + * @param b32 remote dcc server b32 address + * @param port remote dcc server I2P port * @param type ignored - * @return local server port or -1 on error + * @return local DCC client tunnel port or -1 on error */ public int newIncoming(String b32, int port, String type) { + return newIncoming(b32, port, type, 0); + } + + /** + * @param localPort bind to port or 0; if nonzero it will be the rv + */ + private int newIncoming(String b32, int port, String type, int localPort) { expireInbound(); if (_incoming.size() >= MAX_INCOMING_PENDING || _active.size() >= MAX_INCOMING_PENDING) { @@ -83,7 +100,7 @@ public class DCCClientManager extends EventReceiver { try { // Transparent tunnel used for all types... // Do we need to do any filtering for chat? - I2PTunnelDCCClient cTunnel = new I2PTunnelDCCClient(b32, port, l, sockMgr, + I2PTunnelDCCClient cTunnel = new I2PTunnelDCCClient(b32, localPort, port, l, sockMgr, _dispatch, _tunnel, ++_id); cTunnel.attachEventDispatcher(this); int lport = cTunnel.getLocalPort(); @@ -99,6 +116,54 @@ public class DCCClientManager extends EventReceiver { } } + /** + * An outgoing RESUME request + * + * @param port local DCC client tunnel port + * @return remote DCC server i2p port or -1 on error + */ + public int resumeOutgoing(int port) { + Integer lport = Integer.valueOf(port); + I2PTunnelDCCClient tun = _complete.get(lport); + if (tun == null) { + tun = _active.get(lport); + if (tun == null) + // shouldn't happen + tun = _incoming.get(lport); + } + if (tun != null) { + tun.stop(); + return tun.getLocalPort(); + } + return -1; + } + + /** + * An incoming ACCEPT response + * + * @param port remote dcc server I2P port + * @return local DCC client tunnel port or -1 on error + */ + public int acceptIncoming(int port) { + // do a reverse lookup + for (I2PTunnelDCCClient tun : _complete.values()) { + if (tun.getRemotePort() == port) + return newIncoming(tun.getDest(), port, "ACCEPT", tun.getLocalPort()); + } + for (I2PTunnelDCCClient tun : _active.values()) { + if (tun.getRemotePort() == port) + return newIncoming(tun.getDest(), port, "ACCEPT", tun.getLocalPort()); + } + for (I2PTunnelDCCClient tun : _incoming.values()) { + if (tun.getRemotePort() == port) { + // shouldn't happen + tun.stop(); + return newIncoming(tun.getDest(), port, "ACCEPT", tun.getLocalPort()); + } + } + return -1; + } + /** * The EventReceiver callback */ @@ -124,17 +189,23 @@ public class DCCClientManager extends EventReceiver { if (_log.shouldLog(Log.WARN)) _log.warn("Added client tunnel for port " + lport + " pending count now: " + _incoming.size() + - " active count now: " + _active.size()); + " active count now: " + _active.size() + + " complete count now: " + _complete.size()); } } private void connStopped(Integer lport) { - _incoming.remove(lport); - _active.remove(lport); + I2PTunnelDCCClient tun = _incoming.remove(lport); + if (tun != null) + _complete.put(lport, tun); + tun = _active.remove(lport); + if (tun != null) + _complete.put(lport, tun); if (_log.shouldLog(Log.WARN)) _log.warn("Removed client tunnel for port " + lport + " pending count now: " + _incoming.size() + - " active count now: " + _active.size()); + " active count now: " + _active.size() + + " complete count now: " + _complete.size()); } private void expireInbound() { @@ -146,5 +217,12 @@ public class DCCClientManager extends EventReceiver { } } // shouldn't need to expire active + for (Iterator iter = _complete.values().iterator(); iter.hasNext(); ) { + I2PTunnelDCCClient c = iter.next(); + if (c.getExpires() < _tunnel.getContext().clock().now()) { + iter.remove(); + c.stop(); + } + } } } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCClient.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCClient.java index 88d462926..4d70d2ee2 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCClient.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCClient.java @@ -29,21 +29,23 @@ public class I2PTunnelDCCClient extends I2PTunnelClientBase { // delay resolution until connect time private final String _dest; private final int _remotePort; - private final long _expires; + private long _expires; private static final long INBOUND_EXPIRE = 30*60*1000; + private static final long INBOUND_STOP_EXPIRE = 30*60*1000; public static final String CONNECT_START_EVENT = "connectionStarted"; public static final String CONNECT_STOP_EVENT = "connectionStopped"; /** * @param dest the target, presumably b32 + * @param localPort if 0, use any port, get actual port selected with getLocalPort() * @throws IllegalArgumentException if the I2PTunnel does not contain * valid config to contact the router */ - public I2PTunnelDCCClient(String dest, int remotePort, Logging l, + public I2PTunnelDCCClient(String dest, int localPort, int remotePort, Logging l, I2PSocketManager sktMgr, EventDispatcher notifyThis, I2PTunnel tunnel, long clientId) throws IllegalArgumentException { - super(0, l, sktMgr, tunnel, notifyThis, clientId); + super(localPort, l, sktMgr, tunnel, notifyThis, clientId); _dest = dest; _remotePort = remotePort; _expires = tunnel.getContext().clock().now() + INBOUND_EXPIRE; @@ -89,6 +91,14 @@ public class I2PTunnelDCCClient extends I2PTunnelClientBase { return _expires; } + public String getDest() { + return _dest; + } + + public int getRemotePort() { + return _remotePort; + } + /** * Stop listening for new sockets. * We can't call super.close() as it kills all sockets in the sockMgr @@ -112,8 +122,10 @@ public class I2PTunnelDCCClient extends I2PTunnelClientBase { @Override public void run() { + _expires = getTunnel().getContext().clock().now() + INBOUND_STOP_EXPIRE; notifyEvent(CONNECT_START_EVENT, I2PTunnelDCCClient.this); super.run(); + _expires = getTunnel().getContext().clock().now() + INBOUND_STOP_EXPIRE; notifyEvent(CONNECT_STOP_EVENT, Integer.valueOf(getLocalPort())); } } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCServer.java index ee0432208..0c3ec96ff 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCServer.java @@ -6,8 +6,10 @@ import java.net.Socket; import java.net.SocketException; import java.net.UnknownHostException; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import net.i2p.client.streaming.I2PSocket; import net.i2p.client.streaming.I2PSocketManager; @@ -24,10 +26,14 @@ import net.i2p.util.Log; * *
  *
- *                <---  I2PTunnelDCCServer <--------------- I2PTunnelDCCClient <----
+ *                                            direct conn
+ *                <---> I2PTunnelDCCServer <--------------->I2PTunnelDCCClient <---->
  *   originating                                                                     responding
  *   chat client                                                                     chat client
- *                ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
+ *        CHAT    ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
+ *        SEND    ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
+ *        RESUME  <--- I2PTunnelIRCClient <-- IRC server <-- I2TunnelIRCClient <-----
+ *        ACCEPT  ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
  *
  * 
* @@ -35,8 +41,13 @@ import net.i2p.util.Log; */ public class I2PTunnelDCCServer extends I2PTunnelServer { + /** key is the server's local I2P port */ private final ConcurrentHashMap _outgoing; - private final ConcurrentHashMap _active; + /** key is the server's local I2P port */ + private final ConcurrentHashMap _active; + /** key is the server's local I2P port */ + private final ConcurrentHashMap _resume; + private final List _sockList; // list of client tunnels? private static long _id; @@ -71,6 +82,8 @@ public class I2PTunnelDCCServer extends I2PTunnelServer { super(DUMMY, 0, sktMgr, l, notifyThis, tunnel); _outgoing = new ConcurrentHashMap(8); _active = new ConcurrentHashMap(8); + _resume = new ConcurrentHashMap(8); + _sockList = new CopyOnWriteArrayList(); } /** @@ -99,8 +112,11 @@ public class I2PTunnelDCCServer extends I2PTunnelServer { _log.warn("Incoming DCC connection for I2P port " + myPort + " sending to " + local.ia + ':' + local.port); Socket s = new Socket(local.ia, local.port); - new I2PTunnelRunner(s, socket, slock, null, null); - _active.put(Integer.valueOf(myPort), socket); + _sockList.add(socket); + new I2PTunnelRunner(s, socket, slock, null, _sockList); + local.socket = socket; + local.expire = getTunnel().getContext().clock().now() + OUTBOUND_EXPIRE; + _active.put(Integer.valueOf(myPort), local); } catch (SocketException ex) { try { socket.close(); @@ -116,6 +132,12 @@ public class I2PTunnelDCCServer extends I2PTunnelServer { public boolean close(boolean forced) { _outgoing.clear(); _active.clear(); + for (I2PSocket s : _sockList) { + try { + s.close(); + } catch (IOException ioe) {} + } + _sockList.clear(); return super.close(forced); } @@ -128,6 +150,13 @@ public class I2PTunnelDCCServer extends I2PTunnelServer { * @return i2p port or -1 on error */ public int newOutgoing(byte[] ip, int port, String type) { + return newOutgoing(ip, port, type, 0); + } + + /** + * @param port local dcc server I2P port or 0 to pick one at random + */ + private int newOutgoing(byte[] ip, int port, String type, int i2pPort) { expireOutbound(); if (_outgoing.size() >= MAX_OUTGOING_PENDING || _active.size() >= MAX_OUTGOING_ACTIVE) { @@ -141,9 +170,14 @@ public class I2PTunnelDCCServer extends I2PTunnelServer { } catch (UnknownHostException uhe) { return -1; } + int limit = i2pPort > 0 ? 10 : 1; LocalAddress client = new LocalAddress(ia, port, getTunnel().getContext().clock().now() + OUTBOUND_EXPIRE); - for (int i = 0; i < 10; i++) { - int iport = MIN_I2P_PORT + getTunnel().getContext().random().nextInt(1 + MAX_I2P_PORT - MIN_I2P_PORT); + for (int i = 0; i < limit; i++) { + int iport; + if (i2pPort > 0) + iport = i2pPort; + else + iport = MIN_I2P_PORT + getTunnel().getContext().random().nextInt(1 + MAX_I2P_PORT - MIN_I2P_PORT); if (_active.containsKey(Integer.valueOf(iport))) continue; LocalAddress old = _outgoing.putIfAbsent(Integer.valueOf(iport), client); @@ -156,6 +190,48 @@ public class I2PTunnelDCCServer extends I2PTunnelServer { return -1; } + /** + * An incoming RESUME request + * + * @param port local dcc server I2P port + * @return local IRC client DCC port or -1 on error + */ + public int resumeIncoming(int port) { + Integer iport = Integer.valueOf(port); + LocalAddress local = _active.remove(iport); + if (local != null) { + local.expire = getTunnel().getContext().clock().now() + OUTBOUND_EXPIRE; + _resume.put(Integer.valueOf(local.port), local); + return local.port; + } + local = _outgoing.get(iport); + if (local != null) { + // shouldn't happen + local.expire = getTunnel().getContext().clock().now() + OUTBOUND_EXPIRE; + return local.port; + } + return -1; + } + + /** + * An outgoing ACCEPT response + * + * @param port local irc client DCC port + * @return local DCC server i2p port or -1 on error + */ + public int acceptOutgoing(int port) { + // do a reverse lookup + for (Iterator> iter = _resume.entrySet().iterator(); iter.hasNext(); ) { + Map.Entry e = iter.next(); + LocalAddress local = e.getValue(); + if (local.port == port) { + iter.remove(); + return newOutgoing(local.ia.getAddress(), port, "ACCEPT", e.getKey().intValue()); + } + } + return -1; + } + private InetAddress getListenHost(Logging l) { try { return InetAddress.getByName(getTunnel().listenHost); @@ -173,9 +249,10 @@ public class I2PTunnelDCCServer extends I2PTunnelServer { if (a.expire < getTunnel().getContext().clock().now()) iter.remove(); } - for (Iterator iter = _active.values().iterator(); iter.hasNext(); ) { - I2PSocket s = iter.next(); - if (s.isClosed()) + for (Iterator iter = _active.values().iterator(); iter.hasNext(); ) { + LocalAddress a = iter.next(); + I2PSocket s = a.socket; + if (s != null && s.isClosed()) iter.remove(); } } @@ -183,7 +260,8 @@ public class I2PTunnelDCCServer extends I2PTunnelServer { private static class LocalAddress { public final InetAddress ia; public final int port; - public final long expire; + public long expire; + public I2PSocket socket; public LocalAddress(InetAddress a, int p, long exp) { ia = a;