I2PTunnel client-side locking fixes (ticket #815)

Checkin of patches from Oct. 2013, based on 0.9.8.1.
Had some issues back then, and not tested recently.
Prop from i2p.i2p to follow.
This commit is contained in:
zzz
2015-03-27 14:16:41 +00:00
parent d7c3ffa4de
commit 3a8ce64c84
6 changed files with 168 additions and 130 deletions

View File

@@ -707,13 +707,13 @@ public class I2PTunnel extends EventDispatcherImpl implements Logging {
if (portNum <= 0)
throw new IllegalArgumentException(getPrefix() + "Bad port " + args[0]);
I2PTunnelTask task;
ownDest = !isShared;
try {
String privateKeyFile = null;
if (args.length >= 4)
privateKeyFile = args[3];
task = new I2PTunnelClient(portNum, args[1], l, ownDest, this, this, privateKeyFile);
I2PTunnelClientBase task = new I2PTunnelClient(portNum, args[1], l, ownDest, this, this, privateKeyFile);
task.startRunning();
addtask(task);
notifyEvent("clientTaskId", Integer.valueOf(task.getId()));
} catch (IllegalArgumentException iae) {
@@ -786,10 +786,10 @@ public class I2PTunnel extends EventDispatcherImpl implements Logging {
}
}
I2PTunnelTask task;
ownDest = !isShared;
try {
task = new I2PTunnelHTTPClient(clientPort, l, ownDest, proxy, this, this);
I2PTunnelClientBase task = new I2PTunnelHTTPClient(clientPort, l, ownDest, proxy, this, this);
task.startRunning();
addtask(task);
notifyEvent("httpclientTaskId", Integer.valueOf(task.getId()));
} catch (IllegalArgumentException iae) {
@@ -855,10 +855,10 @@ public class I2PTunnel extends EventDispatcherImpl implements Logging {
}
}
I2PTunnelTask task;
ownDest = !isShared;
try {
task = new I2PTunnelConnectClient(_port, l, ownDest, proxy, this, this);
I2PTunnelClientBase task = new I2PTunnelConnectClient(_port, l, ownDest, proxy, this, this);
task.startRunning();
addtask(task);
} catch (IllegalArgumentException iae) {
String msg = "Invalid I2PTunnel configuration to create a CONNECT client connecting to the router at " + host + ':'+ port +
@@ -917,13 +917,13 @@ public class I2PTunnel extends EventDispatcherImpl implements Logging {
}
}
I2PTunnelTask task;
ownDest = !isShared;
try {
String privateKeyFile = null;
if (args.length >= 4)
privateKeyFile = args[3];
task = new I2PTunnelIRCClient(_port, args[1], l, ownDest, this, this, privateKeyFile);
I2PTunnelClientBase task = new I2PTunnelIRCClient(_port, args[1], l, ownDest, this, this, privateKeyFile);
task.startRunning();
addtask(task);
notifyEvent("ircclientTaskId", Integer.valueOf(task.getId()));
} catch (IllegalArgumentException iae) {
@@ -977,7 +977,8 @@ public class I2PTunnel extends EventDispatcherImpl implements Logging {
ownDest = !isShared;
try {
I2PTunnelTask task = new I2PSOCKSTunnel(_port, l, ownDest, this, this, null);
I2PTunnelClientBase task = new I2PSOCKSTunnel(_port, l, ownDest, this, this, null);
task.startRunning();
addtask(task);
notifyEvent("sockstunnelTaskId", Integer.valueOf(task.getId()));
} catch (IllegalArgumentException iae) {
@@ -1024,7 +1025,8 @@ public class I2PTunnel extends EventDispatcherImpl implements Logging {
if (args.length == 3)
privateKeyFile = args[2];
try {
I2PTunnelTask task = new I2PSOCKSIRCTunnel(_port, l, ownDest, this, this, privateKeyFile);
I2PTunnelClientBase task = new I2PSOCKSIRCTunnel(_port, l, ownDest, this, this, privateKeyFile);
task.startRunning();
addtask(task);
notifyEvent("sockstunnelTaskId", Integer.valueOf(task.getId()));
} catch (IllegalArgumentException iae) {
@@ -1437,9 +1439,11 @@ public class I2PTunnel extends EventDispatcherImpl implements Logging {
*/
private void runPing(String allargs, Logging l) {
if (allargs.length() != 0) {
I2PTunnelTask task;
// pings always use the main destination
task = new I2Ping(allargs, l, false, this, this);
I2PTunnelTask task = new I2Ping(allargs, l, false, this, this);
// TODO
//I2PTunnelClientBase task = new I2Ping(allargs, l, false, this, this);
//task.startRunning();
addtask(task);
notifyEvent("pingTaskId", Integer.valueOf(task.getId()));
} else {

View File

@@ -32,11 +32,6 @@ public class I2PTunnelClient extends I2PTunnelClientBase {
"Standard client on " + tunnel.listenHost + ':' + localPort,
tunnel, pkf);
if (waitEventValue("openBaseClientResult").equals("error")) {
notifyEvent("openClientResult", "error");
return;
}
StringTokenizer tok = new StringTokenizer(destinations, ", ");
dests = new ArrayList(1);
while (tok.hasMoreTokens()) {

View File

@@ -26,6 +26,7 @@ import java.util.concurrent.ThreadFactory;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.client.streaming.I2PSocket;
import net.i2p.client.streaming.I2PSocketManager;
import net.i2p.client.streaming.I2PSocketManagerFactory;
@@ -52,6 +53,7 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
protected Destination dest = null;
private int localPort;
private final String _handlerName;
private boolean listenerReady = false;
@@ -96,6 +98,7 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
chained = true;
sockMgr = sktMgr;
_clientId = clientId;
_handlerName = "chained";
this.localPort = localPort;
this.l = l;
_ownDest = true; // == ! shared client
@@ -111,31 +114,14 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
_executor = new CustomThreadPoolExecutor();
}
Thread t = new I2PAppThread(this, "Client " + tunnel.listenHost + ':' + localPort);
t.start();
open = true;
synchronized (this) {
while (!listenerReady && open) {
try {
wait();
} catch (InterruptedException e) {
// ignore
}
}
}
if (open && listenerReady) {
l.log("Client ready, listening on " + tunnel.listenHost + ':' + localPort);
notifyEvent("openBaseClientResult", "ok");
} else {
l.log("Client error for " + tunnel.listenHost + ':' + localPort + ", check logs");
notifyEvent("openBaseClientResult", "error");
}
startup();
}
/**
* The main constructor.
* This may take a LONG time if building and starting a new manager.
*
* As of 0.9.20 this is fast, and does NOT connect the manager to the router,
* or open the local socket. You MUST call startRunning() for that.
*
* @param localPort if 0, use any port, get actual port selected with getLocalPort()
* @throws IllegalArgumentException if the I2CP configuration is b0rked so
@@ -149,7 +135,9 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
/**
* Use this to build a client with a persistent private key.
* This may take a LONG time if building and starting a new manager.
*
* As of 0.9.20 this is fast, and does NOT connect the manager to the router,
* or open the local socket. You MUST call startRunning() for that.
*
* @param localPort if 0, use any port, get actual port selected with getLocalPort()
* @param pkf Path to the private key file, or null to generate a transient key
@@ -165,7 +153,7 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
this.localPort = localPort;
this.l = l;
_ownDest = ownDest; // == ! shared client
_handlerName = handlerName;
_context = tunnel.getContext();
_context.statManager().createRateStat("i2ptunnel.client.closeBacklog", "How many pending sockets remain when we close one due to backlog?", "I2PTunnel", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
@@ -194,53 +182,12 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
if (!dccEnabled)
tunnel.getClientOptions().setProperty("i2cp.dontPublishLeaseSet", "true");
boolean openNow = !Boolean.parseBoolean(tunnel.getClientOptions().getProperty("i2cp.delayOpen"));
if (openNow) {
while (sockMgr == null) {
verifySocketManager();
if (sockMgr == null) {
_log.error("Unable to connect to router and build tunnels for " + handlerName);
// FIXME there is a loop in buildSocketManager(), do we really need another one here?
// no matter, buildSocketManager() now throws an IllegalArgumentException
try { Thread.sleep(10*1000); } catch (InterruptedException ie) {}
}
}
// can't be null unless we limit the loop above
//if (sockMgr == null) {
// l.log("Invalid I2CP configuration");
// throw new IllegalArgumentException("Socket manager could not be created");
//}
l.log("Tunnels ready for client: " + handlerName);
} // else delay creating session until createI2PSocket() is called
Thread t = new I2PAppThread(this);
t.setName("Client " + _clientId);
t.start();
open = true;
synchronized (this) {
while (!listenerReady && open) {
try {
wait();
} catch (InterruptedException e) {
// ignore
}
}
}
if (open && listenerReady) {
if (openNow)
l.log("Client ready, listening on " + tunnel.listenHost + ':' + localPort);
else
l.log("Client ready, listening on " + tunnel.listenHost + ':' + localPort + ", delaying tunnel open until required");
notifyEvent("openBaseClientResult", "ok");
} else {
l.log("Client error for " + tunnel.listenHost + ':' + localPort + ", check logs");
notifyEvent("openBaseClientResult", "error");
}
}
/**
* Create the manager if it doesn't exist, AND connect it to the router and
* build tunnels.
*
* Sets the this.sockMgr field if it is null, or if we want a new one.
* This may take a LONG time if building a new manager.
*
@@ -272,6 +219,7 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
this.sockMgr = getSocketManager();
}
}
connectManager();
}
/** this is ONLY for shared clients */
@@ -280,7 +228,8 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
/**
* This is ONLY for shared clients.
* This may take a LONG time if building a new manager.
* As of 0.9.20 this is fast, and does NOT connect the manager to the router.
* Call verifySocketManager() for that.
*
* @return non-null
* @throws IllegalArgumentException if the I2CP configuration is b0rked so
@@ -292,7 +241,8 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
/**
* This is ONLY for shared clients.
* This may take a LONG time if building a new manager.
* As of 0.9.20 this is fast, and does NOT connect the manager to the router.
* Call verifySocketManager() for that.
*
* @return non-null
* @throws IllegalArgumentException if the I2CP configuration is b0rked so
@@ -304,7 +254,8 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
/**
* This is ONLY for shared clients.
* This may take a LONG time if building a new manager.
* As of 0.9.20 this is fast, and does NOT connect the manager to the router.
* Call verifySocketManager() for that.
*
* @return non-null
* @throws IllegalArgumentException if the I2CP configuration is b0rked so
@@ -335,7 +286,10 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
}
/**
* This may take a LONG time.
* For NON-SHARED clients (ownDest = true).
*
* As of 0.9.20 this is fast, and does NOT connect the manager to the router.
* Call verifySocketManager() for that.
*
* @return non-null
* @throws IllegalArgumentException if the I2CP configuration is b0rked so
@@ -345,7 +299,8 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
return buildSocketManager(getTunnel(), this.privKeyFile, this.l);
}
/**
* This may take a LONG time.
* As of 0.9.20 this is fast, and does NOT connect the manager to the router.
* Call verifySocketManager() for that.
*
* @return non-null
* @throws IllegalArgumentException if the I2CP configuration is b0rked so
@@ -359,7 +314,8 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
private static final int MAX_RETRIES = 4;
/**
* This may take a LONG time.
* As of 0.9.20 this is fast, and does NOT connect the manager to the router.
* Call verifySocketManager() for that.
*
* @param pkf absolute path or null
* @return non-null
@@ -371,7 +327,8 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
}
/**
* This may take a LONG time.
* As of 0.9.20 this is fast, and does NOT connect the manager to the router.
* Call verifySocketManager() for that.
*
* @param pkf absolute path or null
* @return non-null
@@ -388,20 +345,22 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
try {
portNum = Integer.parseInt(tunnel.port);
} catch (NumberFormatException nfe) {
_log.log(Log.CRIT, "Invalid port specified [" + tunnel.port + "], reverting to " + portNum);
throw new IllegalArgumentException("Invalid port specified [" + tunnel.port + "]", nfe);
}
}
I2PSocketManager sockManager = null;
// FIXME: Can't stop a tunnel from the UI while it's in this loop (no session yet)
int retries = 0;
while (sockManager == null) {
if (pkf != null) {
// Persistent client dest
FileInputStream fis = null;
try {
if (pkf != null) {
// Persistent client dest
fis = new FileInputStream(pkf);
sockManager = I2PSocketManagerFactory.createManager(fis, tunnel.host, portNum, props);
sockManager = I2PSocketManagerFactory.createDisconnectedManager(fis, tunnel.host, portNum, props);
} else {
sockManager = I2PSocketManagerFactory.createDisconnectedManager(null, tunnel.host, portNum, props);
}
} catch (I2PSessionException ise) {
throw new IllegalArgumentException("Can't create socket manager", ise);
} catch (IOException ioe) {
if (log != null)
log.log("Error opening key file " + ioe);
@@ -411,13 +370,36 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
if (fis != null)
try { fis.close(); } catch (IOException ioe) {}
}
} else {
sockManager = I2PSocketManagerFactory.createManager(tunnel.host, portNum, props);
sockManager.setName("Client");
if (_log.shouldLog(Log.INFO))
_log.info(tunnel.getClientOptions().getProperty("inbound.nickname") + ": Built a new socket manager [s=" + sockManager.getSession() + "]");
tunnel.addSession(sockManager.getSession());
return sockManager;
}
if (sockManager == null) {
// try to make this error sensible as it will happen... sadly we can't get to the listenPort, only the listenHost
String msg = "Unable to connect to the router at " + tunnel.host + ':' + portNum +
/**
* Warning, blocks while connecting to router and building tunnels;
* This may take a LONG time.
*
* @throws IllegalArgumentException if the I2CP configuration is b0rked so
* badly that we cant create a socketManager
* @since 0.9.20
*/
private void connectManager() {
// shadows instance _log
Log _log = getTunnel().getContext().logManager().getLog(I2PTunnelClientBase.class);
Logging log = this.l;
int retries = 0;
while (sockMgr.getSession().isClosed()) {
try {
sockMgr.getSession().connect();
} catch (I2PSessionException ise) {
// try to make this error sensible as it will happen...
String portNum = getTunnel().port;
if (portNum == null)
portNum = "7654";
String msg = "Unable to connect to the router at " + getTunnel().host + ':' + portNum +
" and build tunnels for the client";
if (++retries < MAX_RETRIES) {
if (log != null)
@@ -434,11 +416,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
try { Thread.sleep(RETRY_DELAY); } catch (InterruptedException ie) {}
}
}
sockManager.setName("Client");
if (_log.shouldLog(Log.INFO))
_log.info(tunnel.getClientOptions().getProperty("inbound.nickname") + ": Built a new socket manager [s=" + sockManager.getSession() + "]");
tunnel.addSession(sockManager.getSession());
return sockManager;
}
public final int getLocalPort() {
@@ -457,11 +434,65 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
}
/**
* Actually start working on incoming connections. *Must* be
* Actually open the local socket and start working on incoming connections. *Must* be
* called by derived classes after initialization.
*
* (this wasn't actually true until 0.9.20)
*
* This will be fast if i2cp.delayOpen is true, but could take
* a LONG TIME if it is false, as it connects to the router and builds tunnels.
*
*/
public void startRunning() {
boolean openNow = !Boolean.parseBoolean(getTunnel().getClientOptions().getProperty("i2cp.delayOpen"));
if (openNow) {
while (sockMgr == null) {
verifySocketManager();
if (sockMgr == null) {
_log.error("Unable to connect to router and build tunnels for " + _handlerName);
// FIXME there is a loop in buildSocketManager(), do we really need another one here?
// no matter, buildSocketManager() now throws an IllegalArgumentException
try { Thread.sleep(10*1000); } catch (InterruptedException ie) {}
}
}
// can't be null unless we limit the loop above
//if (sockMgr == null) {
// l.log("Invalid I2CP configuration");
// throw new IllegalArgumentException("Socket manager could not be created");
//}
l.log("Tunnels ready for client: " + _handlerName);
} // else delay creating session until createI2PSocket() is called
startup();
}
private void startup() {
// prevent JVM exit when running outside the router
boolean isDaemon = getTunnel().getContext().isRouterContext();
Thread t = new I2PAppThread(this, "Client " + _clientId, isDaemon);
t.start();
open = true;
synchronized (this) {
while (!listenerReady && open) {
try {
wait();
} catch (InterruptedException e) {
// ignore
}
}
}
if (open && listenerReady) {
boolean openNow = !Boolean.parseBoolean(getTunnel().getClientOptions().getProperty("i2cp.delayOpen"));
if (openNow)
l.log("Client ready, listening on " + getTunnel().listenHost + ':' + localPort);
else
l.log("Client ready, listening on " + getTunnel().listenHost + ':' + localPort + ", delaying tunnel open until required");
notifyEvent("openBaseClientResult", "ok");
} else {
l.log("Client error for " + getTunnel().listenHost + ':' + localPort + ", check logs");
notifyEvent("openBaseClientResult", "error");
}
synchronized (startLock) {
startRunning = true;
startLock.notify();
@@ -521,8 +552,25 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
* @return a new I2PSocket
*/
public I2PSocket createI2PSocket(Destination dest) throws I2PException, ConnectException, NoRouteToHostException, InterruptedIOException {
return createI2PSocket(dest, 0);
}
/**
* Create a new I2PSocket towards to the specified destination,
* adding it to the list of connections actually managed by this
* tunnel.
*
* @param dest The destination to connect to
* @param port The destination port to connect to 0 - 65535
* @return a new I2PSocket
* @since 0.9.20
*/
public I2PSocket createI2PSocket(Destination dest, int port)
throws I2PException, ConnectException, NoRouteToHostException, InterruptedIOException {
verifySocketManager();
return createI2PSocket(dest, getDefaultOptions());
I2PSocketOptions opts = getDefaultOptions();
opts.setPort(port);
return createI2PSocket(dest, opts);
}
/**

View File

@@ -106,11 +106,6 @@ public class I2PTunnelConnectClient extends I2PTunnelHTTPClientBase implements R
I2PTunnel tunnel) throws IllegalArgumentException {
super(localPort, ownDest, l, notifyThis, "HTTPS Proxy on " + tunnel.listenHost + ':' + localPort + " #" + (++__clientId), tunnel);
if (waitEventValue("openBaseClientResult").equals("error")) {
notifyEvent("openConnectClientResult", "error");
return;
}
if (wwwProxy != null) {
StringTokenizer tok = new StringTokenizer(wwwProxy, ", ");
while (tok.hasMoreTokens())

View File

@@ -206,10 +206,6 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
_proxyNonce = Long.toString(_context.random().nextLong());
//proxyList = new ArrayList(); // We won't use outside of i2p
if(waitEventValue("openBaseClientResult").equals("error")) {
notifyEvent("openHTTPClientResult", "error");
return;
}
if(wwwProxy != null) {
StringTokenizer tok = new StringTokenizer(wwwProxy, ", ");

View File

@@ -246,7 +246,7 @@ public class TunnelControllerGroup implements ClientApp {
for (int i = 0; i < _controllers.size(); i++) {
TunnelController controller = _controllers.get(i);
if (controller.getStartOnLoad())
controller.startTunnel();
controller.startTunnelBackground();
}
}
}