diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java index 9e74f5fe4..373c9fb7d 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java @@ -407,7 +407,8 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable opts.setProperty("i2p.streaming.inactivityTimeoutAction", ""+1); I2PSocket i2ps = createI2PSocket(dest, getDefaultOptions(opts)); byte[] data = newRequest.toString().getBytes("ISO-8859-1"); - I2PTunnelRunner runner = new I2PTunnelRunner(s, i2ps, sockLock, data, mySockets); + Runnable onTimeout = new OnTimeout(s, s.getOutputStream(), targetRequest, usingWWWProxy, currentProxy, requestId); + I2PTunnelRunner runner = new I2PTunnelRunner(s, i2ps, sockLock, data, mySockets, onTimeout); } catch (SocketException ex) { _log.info(getPrefix(requestId) + "Error trying to connect", ex); l.log(ex.getMessage()); @@ -437,6 +438,30 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable } } + private class OnTimeout implements Runnable { + private Socket _socket; + private OutputStream _out; + private String _target; + private boolean _usingProxy; + private String _wwwProxy; + private long _requestId; + public OnTimeout(Socket s, OutputStream out, String target, boolean usingProxy, String wwwProxy, long id) { + _socket = s; + _out = out; + _target = target; + _usingProxy = usingProxy; + _wwwProxy = wwwProxy; + _requestId = id; + } + public void run() { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Timeout occured requesting " + _target); + handleHTTPClientException(new RuntimeException("Timeout"), _out, + _target, _usingProxy, _wwwProxy, _requestId); + closeSocket(_socket); + } + } + private static void writeErrorMessage(byte[] errMessage, OutputStream out, String targetRequest, boolean usingWWWProxy, String wwwProxy) throws IOException { if (out != null) { diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java index c4cdc9a7a..6b169f6ca 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java @@ -46,15 +46,23 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL /** when the runner started up */ private long startedOn; private List sockList; + /** if we die before receiving any data, run this job */ + private Runnable onTimeout; + private long totalSent; + private long totalReceived; private volatile long __forwarderId; public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialData, List sockList) { + this(s, i2ps, slock, initialData, sockList, null); + } + public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialData, List sockList, Runnable onTimeout) { this.sockList = sockList; this.s = s; this.i2ps = i2ps; this.slock = slock; this.initialData = initialData; + this.onTimeout = onTimeout; lastActivityOn = -1; startedOn = Clock.getInstance().now(); if (_log.shouldLog(Log.INFO)) @@ -97,7 +105,6 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL } public void run() { - boolean closedCleanly = false; try { InputStream in = s.getInputStream(); OutputStream out = s.getOutputStream(); // = new BufferedOutputStream(s.getOutputStream(), NETWORK_BUFFER_SIZE); @@ -113,8 +120,8 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL if (_log.shouldLog(Log.DEBUG)) _log.debug("Initial data " + (initialData != null ? initialData.length : 0) + " written, starting forwarders"); - Thread t1 = new StreamForwarder(in, i2pout, "toI2P"); - Thread t2 = new StreamForwarder(i2pin, out, "fromI2P"); + Thread t1 = new StreamForwarder(in, i2pout, true); + Thread t2 = new StreamForwarder(i2pin, out, false); synchronized (finishLock) { while (!finished) { finishLock.wait(); @@ -122,12 +129,21 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL } if (_log.shouldLog(Log.DEBUG)) _log.debug("At least one forwarder completed, closing and joining"); + + // this task is useful for the httpclient + if (onTimeout != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("runner has a timeout job, totalReceived = " + totalReceived + + " totalSent = " + totalSent + " job = " + onTimeout); + if ( (totalSent <= 0) && (totalReceived <= 0) ) + onTimeout.run(); + } + // now one connection is dead - kill the other as well. s.close(); i2ps.close(); t1.join(30*1000); t2.join(30*1000); - closedCleanly = true; } catch (InterruptedException ex) { if (_log.shouldLog(Log.ERROR)) _log.error("Interrupted", ex); @@ -140,21 +156,20 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL } finally { removeRef(); try { - if ( (s != null) && (!closedCleanly) ) + if (s != null) s.close(); } catch (IOException ex) { if (_log.shouldLog(Log.ERROR)) _log.error("Could not close java socket", ex); } - try { - if (i2ps != null) { - if (!closedCleanly) - i2ps.close(); - i2ps.setSocketErrorListener(null); + if (i2ps != null) { + try { + i2ps.close(); + } catch (IOException ex) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Could not close I2PSocket", ex); } - } catch (IOException ex) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Could not close I2PSocket", ex); + i2ps.setSocketErrorListener(null); } } } @@ -181,12 +196,14 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL InputStream in; OutputStream out; String direction; + private boolean _toI2P; private ByteCache _cache; - private StreamForwarder(InputStream in, OutputStream out, String dir) { + private StreamForwarder(InputStream in, OutputStream out, boolean toI2P) { this.in = in; this.out = out; - direction = dir; + _toI2P = toI2P; + direction = (toI2P ? "toI2P" : "fromI2P"); _cache = ByteCache.getInstance(256, NETWORK_BUFFER_SIZE); setName("StreamForwarder " + _runnerId + "." + (++__forwarderId)); start(); @@ -207,6 +224,10 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL int len; while ((len = in.read(buffer)) != -1) { out.write(buffer, 0, len); + if (_toI2P) + totalSent += len; + else + totalReceived += len; if (len > 0) updateActivity(); @@ -259,10 +280,10 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL _log.warn(direction + ": Error closing input stream", ex); } try { - out.close(); - } catch (IOException ex) { + out.flush(); + } catch (IOException ioe) { if (_log.shouldLog(Log.WARN)) - _log.warn(direction + ": Error closing output stream", ex); + _log.warn(direction + ": Error flushing to close", ioe); } synchronized (finishLock) { finished = true; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 52cf028ac..e8b3d877f 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -304,8 +304,8 @@ public class Connection { _ackedPackets++; if (p.getNumSends() > 1) { _activeResends--; - if (_log.shouldLog(Log.WARN)) - _log.warn("Active resend of " + p + " successful, # active left: " + _activeResends); + if (_log.shouldLog(Log.INFO)) + _log.info("Active resend of " + p + " successful, # active left: " + _activeResends); } } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index fae7cd0e3..2a17c7a69 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -157,7 +157,7 @@ public class PacketHandler { } else { // someone is sending us a packet on the wrong stream if (_log.shouldLog(Log.WARN)) - _log.warn("Received a packet on the wrong stream: " + packet); + _log.warn("Received a packet on the wrong stream: " + packet + " connection: " + con); } } } diff --git a/history.txt b/history.txt index 59be21185..4b8604ee5 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,12 @@ -$Id: history.txt,v 1.102 2004/12/08 16:08:11 jrandom Exp $ +$Id: history.txt,v 1.103 2004/12/11 02:05:12 jrandom Exp $ + +2004-12-11 jrandom + * Fix the missing HTTP timeout, which was caused by the deferred syn used + by default. This, in turn, meant the I2PSocket creation doesn't fail + on .connect, but is unable to transfer any data in any direction. We now + detect that condition for the I2PTunnelHTTPClient and throw up the right + error page. + * Logging 2004-12-11 jrandom * Use a simpler and less memory intensive job for processing outbound diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 016ee7f2c..cea7089ce 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.107 $ $Date: 2004/12/08 16:08:10 $"; + public final static String ID = "$Revision: 1.108 $ $Date: 2004/12/11 02:05:13 $"; public final static String VERSION = "0.4.2.3"; - public final static long BUILD = 1; + public final static long BUILD = 2; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID);