forked from I2P_Developers/i2p.i2p
- Tracking, expiration, closing of DCC tunnels
- I2PTunnelRunner cleanups
This commit is contained in:
@ -59,7 +59,7 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
|
||||
|
||||
private boolean listenerReady = false;
|
||||
|
||||
private ServerSocket ss;
|
||||
protected ServerSocket ss;
|
||||
|
||||
private final Object startLock = new Object();
|
||||
private boolean startRunning = false;
|
||||
|
@ -144,6 +144,10 @@ public class I2PTunnelIRCClient extends I2PTunnelClientBase implements DCCHelper
|
||||
_DCCServer.close(forced);
|
||||
_DCCServer = null;
|
||||
}
|
||||
if (_DCCClientManager != null) {
|
||||
_DCCClientManager.close(forced);
|
||||
_DCCClientManager = null;
|
||||
}
|
||||
}
|
||||
return super.close(forced);
|
||||
}
|
||||
|
@ -20,10 +20,10 @@ import net.i2p.util.I2PAppThread;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErrorListener {
|
||||
private final static Log _log = new Log(I2PTunnelRunner.class);
|
||||
private final Log _log = new Log(I2PTunnelRunner.class);
|
||||
|
||||
private static volatile long __runnerId;
|
||||
private long _runnerId;
|
||||
private final long _runnerId;
|
||||
/**
|
||||
* max bytes streamed in a packet - smaller ones might be filled
|
||||
* up to this size. Larger ones are not split (at least not on
|
||||
@ -34,20 +34,20 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
|
||||
static final int NETWORK_BUFFER_SIZE = MAX_PACKET_SIZE;
|
||||
|
||||
private Socket s;
|
||||
private I2PSocket i2ps;
|
||||
final Object slock, finishLock = new Object();
|
||||
private final Socket s;
|
||||
private final I2PSocket i2ps;
|
||||
private final Object slock, finishLock = new Object();
|
||||
boolean finished = false;
|
||||
HashMap ostreams, sockets;
|
||||
byte[] initialI2PData;
|
||||
byte[] initialSocketData;
|
||||
private HashMap ostreams, sockets;
|
||||
private final byte[] initialI2PData;
|
||||
private final byte[] initialSocketData;
|
||||
/** when the last data was sent/received (or -1 if never) */
|
||||
private long lastActivityOn;
|
||||
/** when the runner started up */
|
||||
private long startedOn;
|
||||
private List sockList;
|
||||
private final long startedOn;
|
||||
private final List sockList;
|
||||
/** if we die before receiving any data, run this job */
|
||||
private Runnable onTimeout;
|
||||
private final Runnable onTimeout;
|
||||
private long totalSent;
|
||||
private long totalReceived;
|
||||
|
||||
@ -56,12 +56,23 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, List sockList) {
|
||||
this(s, i2ps, slock, initialI2PData, null, sockList, null);
|
||||
}
|
||||
|
||||
public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, byte[] initialSocketData, List sockList) {
|
||||
this(s, i2ps, slock, initialI2PData, initialSocketData, sockList, null);
|
||||
}
|
||||
|
||||
public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, List sockList, Runnable onTimeout) {
|
||||
this(s, i2ps, slock, initialI2PData, null, sockList, onTimeout);
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts itself
|
||||
*
|
||||
* @param initialI2PData may be null
|
||||
* @param initialSocketData may be null
|
||||
* @param sockList may be null
|
||||
* @param onTImeout may be null
|
||||
*/
|
||||
public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, byte[] initialSocketData, List sockList, Runnable onTimeout) {
|
||||
this.sockList = sockList;
|
||||
this.s = s;
|
||||
@ -237,11 +248,11 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
|
||||
private class StreamForwarder extends I2PAppThread {
|
||||
|
||||
InputStream in;
|
||||
OutputStream out;
|
||||
String direction;
|
||||
private boolean _toI2P;
|
||||
private ByteCache _cache;
|
||||
private final InputStream in;
|
||||
private final OutputStream out;
|
||||
private final String direction;
|
||||
private final boolean _toI2P;
|
||||
private final ByteCache _cache;
|
||||
|
||||
private StreamForwarder(InputStream in, OutputStream out, boolean toI2P) {
|
||||
this.in = in;
|
||||
|
@ -24,21 +24,22 @@ import net.i2p.util.Log;
|
||||
*
|
||||
* @since 0.8.9
|
||||
*/
|
||||
public class DCCClientManager {
|
||||
|
||||
public class DCCClientManager extends EventReceiver {
|
||||
private final I2PSocketManager sockMgr;
|
||||
private final EventDispatcher _dispatch;
|
||||
private final Logging l;
|
||||
private final I2PTunnel _tunnel;
|
||||
private final Log _log;
|
||||
|
||||
private final ConcurrentHashMap<Integer, I2PAddress> _incoming;
|
||||
private final ConcurrentHashMap<Integer, I2PTunnelDCCClient> _incoming;
|
||||
private final ConcurrentHashMap<Integer, I2PTunnelDCCClient> _active;
|
||||
|
||||
// list of client tunnels?
|
||||
private static long _id;
|
||||
|
||||
private static final int MAX_INCOMING_PENDING = 10;
|
||||
private static final int MAX_INCOMING_ACTIVE = 10;
|
||||
private static final long INBOUND_EXPIRE = 30*60*1000;
|
||||
private static final long ACTIVE_EXPIRE = 60*60*1000;
|
||||
|
||||
public DCCClientManager(I2PSocketManager sktMgr, Logging logging,
|
||||
EventDispatcher dispatch, I2PTunnel tunnel) {
|
||||
@ -48,6 +49,19 @@ public class DCCClientManager {
|
||||
_tunnel = tunnel;
|
||||
_log = tunnel.getContext().logManager().getLog(DCCClientManager.class);
|
||||
_incoming = new ConcurrentHashMap(8);
|
||||
_active = new ConcurrentHashMap(8);
|
||||
}
|
||||
|
||||
public boolean close(boolean forced) {
|
||||
for (I2PTunnelDCCClient c : _incoming.values()) {
|
||||
c.stop();
|
||||
}
|
||||
_incoming.clear();
|
||||
for (I2PTunnelDCCClient c : _active.values()) {
|
||||
c.stop();
|
||||
}
|
||||
_active.clear();
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -60,21 +74,23 @@ public class DCCClientManager {
|
||||
*/
|
||||
public int newIncoming(String b32, int port, String type) {
|
||||
expireInbound();
|
||||
if (_incoming.size() >= MAX_INCOMING_PENDING) {
|
||||
_log.error("Too many incoming DCC, max is " + MAX_INCOMING_PENDING);
|
||||
if (_incoming.size() >= MAX_INCOMING_PENDING ||
|
||||
_active.size() >= MAX_INCOMING_PENDING) {
|
||||
_log.error("Too many incoming DCC, max is " + MAX_INCOMING_PENDING +
|
||||
'/' + MAX_INCOMING_ACTIVE + " pending/active");
|
||||
return -1;
|
||||
}
|
||||
I2PAddress client = new I2PAddress(b32, port, _tunnel.getContext().clock().now() + INBOUND_EXPIRE);
|
||||
try {
|
||||
// Transparent tunnel used for all types...
|
||||
// Do we need to do any filtering for chat?
|
||||
I2PTunnelDCCClient cTunnel = new I2PTunnelDCCClient(b32, port, l, sockMgr,
|
||||
_dispatch, _tunnel, ++_id);
|
||||
cTunnel.attachEventDispatcher(this);
|
||||
int lport = cTunnel.getLocalPort();
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Opened client tunnel at port " + lport +
|
||||
" pointing to " + b32 + ':' + port);
|
||||
_incoming.put(Integer.valueOf(lport), client);
|
||||
_incoming.put(Integer.valueOf(lport), cTunnel);
|
||||
return lport;
|
||||
} catch (IllegalArgumentException uhe) {
|
||||
l.log("Could not find listen host to bind to [" + _tunnel.host + "]");
|
||||
@ -83,23 +99,52 @@ public class DCCClientManager {
|
||||
}
|
||||
}
|
||||
|
||||
private void expireInbound() {
|
||||
for (Iterator<I2PAddress> iter = _incoming.values().iterator(); iter.hasNext(); ) {
|
||||
I2PAddress a = iter.next();
|
||||
if (a.expire < _tunnel.getContext().clock().now())
|
||||
iter.remove();
|
||||
/**
|
||||
* The EventReceiver callback
|
||||
*/
|
||||
public void notifyEvent(String eventName, Object args) {
|
||||
if (eventName.equals(I2PTunnelDCCClient.CONNECT_START_EVENT)) {
|
||||
try {
|
||||
I2PTunnelDCCClient client = (I2PTunnelDCCClient) args;
|
||||
connStarted(client);
|
||||
} catch (ClassCastException cce) {}
|
||||
} else if (eventName.equals(I2PTunnelDCCClient.CONNECT_STOP_EVENT)) {
|
||||
try {
|
||||
Integer port = (Integer) args;
|
||||
connStopped(port);
|
||||
} catch (ClassCastException cce) {}
|
||||
}
|
||||
}
|
||||
|
||||
private static class I2PAddress {
|
||||
public final String dest;
|
||||
public final int port;
|
||||
public final long expire;
|
||||
|
||||
public I2PAddress(String b32, int p, long exp) {
|
||||
dest = b32;
|
||||
port = p;
|
||||
expire = exp;
|
||||
private void connStarted(I2PTunnelDCCClient client) {
|
||||
Integer lport = Integer.valueOf(client.getLocalPort());
|
||||
I2PTunnelDCCClient c = _incoming.remove(lport);
|
||||
if (c != null) {
|
||||
_active.put(lport, client);
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Added client tunnel for port " + lport +
|
||||
" pending count now: " + _incoming.size() +
|
||||
" active count now: " + _active.size());
|
||||
}
|
||||
}
|
||||
|
||||
private void connStopped(Integer lport) {
|
||||
_incoming.remove(lport);
|
||||
_active.remove(lport);
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Removed client tunnel for port " + lport +
|
||||
" pending count now: " + _incoming.size() +
|
||||
" active count now: " + _active.size());
|
||||
}
|
||||
|
||||
private void expireInbound() {
|
||||
for (Iterator<I2PTunnelDCCClient> iter = _incoming.values().iterator(); iter.hasNext(); ) {
|
||||
I2PTunnelDCCClient c = iter.next();
|
||||
if (c.getExpires() < _tunnel.getContext().clock().now()) {
|
||||
iter.remove();
|
||||
c.stop();
|
||||
}
|
||||
}
|
||||
// shouldn't need to expire active
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,76 @@
|
||||
package net.i2p.i2ptunnel.irc;
|
||||
|
||||
/*
|
||||
* free (adj.): unencumbered; not under the control of others Written
|
||||
* by human & jrandom in 2004 and released into the public domain with
|
||||
* no warranty of any kind, either expressed or implied. It probably
|
||||
* won't make your computer catch on fire, or eat your children, but
|
||||
* it might. Use at your own risk.
|
||||
*
|
||||
*/
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
import net.i2p.util.EventDispatcher;
|
||||
|
||||
/**
|
||||
* An implementation of the EventDispatcher interface for
|
||||
* receiving events via in-line notifyEvent() only.
|
||||
* Does not support chaining to additional dispatchers.
|
||||
* Does not support waitEventValue().
|
||||
* Does not support ignoring.
|
||||
*
|
||||
* @since 0.8.9
|
||||
*/
|
||||
public abstract class EventReceiver implements EventDispatcher {
|
||||
|
||||
public EventDispatcher getEventDispatcher() {
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws UnsupportedOperationException always
|
||||
*/
|
||||
public void attachEventDispatcher(EventDispatcher ev) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws UnsupportedOperationException always
|
||||
*/
|
||||
public void detachEventDispatcher(EventDispatcher ev) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
public abstract void notifyEvent(String eventName, Object args);
|
||||
|
||||
/**
|
||||
* @throws UnsupportedOperationException always
|
||||
*/
|
||||
public Object getEventValue(String name) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws UnsupportedOperationException always
|
||||
*/
|
||||
public Set<String> getEvents() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws UnsupportedOperationException always
|
||||
*/
|
||||
public void ignoreEvents() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
public void unIgnoreEvents() {}
|
||||
|
||||
/**
|
||||
* @throws UnsupportedOperationException always
|
||||
*/
|
||||
public Object waitEventValue(String name) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
@ -29,6 +29,11 @@ public class I2PTunnelDCCClient extends I2PTunnelClientBase {
|
||||
// delay resolution until connect time
|
||||
private final String _dest;
|
||||
private final int _remotePort;
|
||||
private final long _expires;
|
||||
|
||||
private static final long INBOUND_EXPIRE = 30*60*1000;
|
||||
public static final String CONNECT_START_EVENT = "connectionStarted";
|
||||
public static final String CONNECT_STOP_EVENT = "connectionStopped";
|
||||
|
||||
/**
|
||||
* @param dest the target, presumably b32
|
||||
@ -41,14 +46,16 @@ public class I2PTunnelDCCClient extends I2PTunnelClientBase {
|
||||
super(0, l, sktMgr, tunnel, notifyThis, clientId);
|
||||
_dest = dest;
|
||||
_remotePort = remotePort;
|
||||
_expires = tunnel.getContext().clock().now() + INBOUND_EXPIRE;
|
||||
|
||||
setName("DCC send -> " + dest + ':' + remotePort);
|
||||
|
||||
startRunning();
|
||||
|
||||
notifyEvent("openClientResult", "ok");
|
||||
}
|
||||
|
||||
/**
|
||||
* Accept one connection only.
|
||||
*/
|
||||
protected void clientConnectionRun(Socket s) {
|
||||
I2PSocket i2ps = null;
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
@ -57,7 +64,8 @@ public class I2PTunnelDCCClient extends I2PTunnelClientBase {
|
||||
if (dest == null) {
|
||||
_log.error("Could not find leaseset for DCC connection to " + _dest + ':' + _remotePort);
|
||||
closeSocket(s);
|
||||
// shutdown?
|
||||
stop();
|
||||
notifyEvent(CONNECT_STOP_EVENT, Integer.valueOf(getLocalPort()));
|
||||
return;
|
||||
}
|
||||
|
||||
@ -65,13 +73,48 @@ public class I2PTunnelDCCClient extends I2PTunnelClientBase {
|
||||
opts.setPort(_remotePort);
|
||||
try {
|
||||
i2ps = createI2PSocket(dest, opts);
|
||||
new I2PTunnelRunner(s, i2ps, sockLock, null, mySockets);
|
||||
new Runner(s, i2ps);
|
||||
} catch (Exception ex) {
|
||||
_log.error("Could not make DCC connection to " + _dest + ':' + _remotePort, ex);
|
||||
closeSocket(s);
|
||||
if (i2ps != null) {
|
||||
try { i2ps.close(); } catch (IOException ioe) {}
|
||||
}
|
||||
notifyEvent(CONNECT_STOP_EVENT, Integer.valueOf(getLocalPort()));
|
||||
}
|
||||
stop();
|
||||
}
|
||||
|
||||
public long getExpires() {
|
||||
return _expires;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop listening for new sockets.
|
||||
* We can't call super.close() as it kills all sockets in the sockMgr
|
||||
*/
|
||||
public void stop() {
|
||||
open = false;
|
||||
try {
|
||||
ss.close();
|
||||
} catch (IOException ioe) {}
|
||||
}
|
||||
|
||||
/**
|
||||
* Just so we can do the callbacks
|
||||
*/
|
||||
private class Runner extends I2PTunnelRunner {
|
||||
|
||||
public Runner(Socket s, I2PSocket i2ps) {
|
||||
// super calls start()
|
||||
super(s, i2ps, sockLock, null, mySockets);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
notifyEvent(CONNECT_START_EVENT, I2PTunnelDCCClient.this);
|
||||
super.run();
|
||||
notifyEvent(CONNECT_STOP_EVENT, Integer.valueOf(getLocalPort()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -36,6 +36,8 @@ import net.i2p.util.Log;
|
||||
public class I2PTunnelDCCServer extends I2PTunnelServer {
|
||||
|
||||
private final ConcurrentHashMap<Integer, LocalAddress> _outgoing;
|
||||
private final ConcurrentHashMap<Integer, I2PSocket> _active;
|
||||
|
||||
// list of client tunnels?
|
||||
private static long _id;
|
||||
|
||||
@ -54,6 +56,7 @@ public class I2PTunnelDCCServer extends I2PTunnelServer {
|
||||
private static final int MAX_OUTGOING_PENDING = 20;
|
||||
private static final int MAX_OUTGOING_ACTIVE = 20;
|
||||
private static final long OUTBOUND_EXPIRE = 30*60*1000;
|
||||
private static final long ACTIVE_EXPIRE = 60*60*1000;
|
||||
|
||||
/**
|
||||
* There's no support for unsolicited incoming I2P connections,
|
||||
@ -67,6 +70,7 @@ public class I2PTunnelDCCServer extends I2PTunnelServer {
|
||||
EventDispatcher notifyThis, I2PTunnel tunnel) {
|
||||
super(DUMMY, 0, sktMgr, l, notifyThis, tunnel);
|
||||
_outgoing = new ConcurrentHashMap(8);
|
||||
_active = new ConcurrentHashMap(8);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -81,11 +85,11 @@ public class I2PTunnelDCCServer extends I2PTunnelServer {
|
||||
try {
|
||||
expireOutbound();
|
||||
int myPort = socket.getLocalPort();
|
||||
// TODO remove, add to active
|
||||
LocalAddress local = _outgoing.get(Integer.valueOf(myPort));
|
||||
// Port is a one-time-use only
|
||||
LocalAddress local = _outgoing.remove(Integer.valueOf(myPort));
|
||||
if (local == null) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Incoming DCC connection for unknown port " + myPort);
|
||||
_log.warn("Rejecting incoming DCC connection for unknown port " + myPort);
|
||||
try {
|
||||
socket.close();
|
||||
} catch (IOException ioe) {}
|
||||
@ -96,6 +100,7 @@ public class I2PTunnelDCCServer extends I2PTunnelServer {
|
||||
" sending to " + local.ia + ':' + local.port);
|
||||
Socket s = new Socket(local.ia, local.port);
|
||||
new I2PTunnelRunner(s, socket, slock, null, null);
|
||||
_active.put(Integer.valueOf(myPort), socket);
|
||||
} catch (SocketException ex) {
|
||||
try {
|
||||
socket.close();
|
||||
@ -107,6 +112,13 @@ public class I2PTunnelDCCServer extends I2PTunnelServer {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean close(boolean forced) {
|
||||
_outgoing.clear();
|
||||
_active.clear();
|
||||
return super.close(forced);
|
||||
}
|
||||
|
||||
/**
|
||||
* An outgoing DCC request
|
||||
*
|
||||
@ -117,8 +129,10 @@ public class I2PTunnelDCCServer extends I2PTunnelServer {
|
||||
*/
|
||||
public int newOutgoing(byte[] ip, int port, String type) {
|
||||
expireOutbound();
|
||||
if (_outgoing.size() >= MAX_OUTGOING_PENDING) {
|
||||
_log.error("Too many outgoing DCC, max is " + MAX_OUTGOING_PENDING);
|
||||
if (_outgoing.size() >= MAX_OUTGOING_PENDING ||
|
||||
_active.size() >= MAX_OUTGOING_ACTIVE) {
|
||||
_log.error("Too many outgoing DCC, max is " + MAX_OUTGOING_PENDING +
|
||||
'/' + MAX_OUTGOING_ACTIVE + " pending/active");
|
||||
return -1;
|
||||
}
|
||||
InetAddress ia;
|
||||
@ -130,6 +144,8 @@ public class I2PTunnelDCCServer extends I2PTunnelServer {
|
||||
LocalAddress client = new LocalAddress(ia, port, getTunnel().getContext().clock().now() + OUTBOUND_EXPIRE);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
int iport = MIN_I2P_PORT + getTunnel().getContext().random().nextInt(1 + MAX_I2P_PORT - MIN_I2P_PORT);
|
||||
if (_active.containsKey(Integer.valueOf(iport)))
|
||||
continue;
|
||||
LocalAddress old = _outgoing.putIfAbsent(Integer.valueOf(iport), client);
|
||||
if (old != null)
|
||||
continue;
|
||||
@ -157,6 +173,11 @@ public class I2PTunnelDCCServer extends I2PTunnelServer {
|
||||
if (a.expire < getTunnel().getContext().clock().now())
|
||||
iter.remove();
|
||||
}
|
||||
for (Iterator<I2PSocket> iter = _active.values().iterator(); iter.hasNext(); ) {
|
||||
I2PSocket s = iter.next();
|
||||
if (s.isClosed())
|
||||
iter.remove();
|
||||
}
|
||||
}
|
||||
|
||||
private static class LocalAddress {
|
||||
|
Reference in New Issue
Block a user