diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java index e209e86515..e35908ab56 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java @@ -19,6 +19,8 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Properties; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.GZIPOutputStream; import javax.net.ssl.SSLException; @@ -55,11 +57,13 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { public static final String OPT_REJECT_REFERER = "rejectReferer"; public static final String OPT_REJECT_USER_AGENTS = "rejectUserAgents"; public static final String OPT_USER_AGENTS = "userAgentRejectList"; + public static final String OPT_KEEPALIVE = "keepalive.i2p"; public static final int DEFAULT_POST_WINDOW = 5*60; public static final int DEFAULT_POST_BAN_TIME = 20*60; public static final int DEFAULT_POST_TOTAL_BAN_TIME = 10*60; public static final int DEFAULT_POST_MAX = 6; public static final int DEFAULT_POST_TOTAL_MAX = 20; + private static final boolean DEFAULT_KEEPALIVE = true; /** what Host: should we seem to be to the webserver? */ private String _spoofHost; @@ -83,8 +87,8 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { PROXY_HEADER, PROXY_CONN_HEADER}; /** timeout for first request line */ private static final long HEADER_TIMEOUT = 15*1000; - /** total timeout for the request and all the headers */ - private static final long TOTAL_HEADER_TIMEOUT = 2 * HEADER_TIMEOUT; + /** timeout for the rest of the request headers */ + private static final long HEADER_FINISH_TIMEOUT = HEADER_TIMEOUT; private static final long START_INTERVAL = (60 * 1000) * 3; private static final int MAX_LINE_LENGTH = 8*1024; /** ridiculously long, just to prevent OOM DOS @since 0.7.13 */ @@ -98,7 +102,8 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { // Set a relatively short timeout for GET/HEAD, // and a long failsafe timeout for POST/CONNECT, since the user // could be POSTing a massive file - private static final int SERVER_READ_TIMEOUT_GET = 5*60*1000; + private static final int SERVER_READ_TIMEOUT_GET = 60*1000; + private static final int SERVER_READ_TIMEOUT_MEDIUM = 5*60*1000; private static final int SERVER_READ_TIMEOUT_POST = 4*60*60*1000; private long _startedOn = 0L; @@ -299,6 +304,9 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { } catch (IOException ioe) {} return; } + // We don't know if this is GET or POST or what, set a huge + // timeout and rely on the server to do the actual timeout + socket.setReadTimeout(SERVER_READ_TIMEOUT_POST); Socket s = getSocket(socket.getPeerDestination().calculateHash(), 443); Runnable t = new I2PTunnelRunner(s, socket, slock, null, null, null, (I2PTunnelRunner.FailCallback) null); @@ -307,6 +315,17 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { } long afterAccept = getTunnel().getContext().clock().now(); + int requestCount = 0; + boolean keepalive = getBooleanOption(OPT_KEEPALIVE, DEFAULT_KEEPALIVE); + + // indent + do { + // indent + + if (requestCount > 0) { + if (_log.shouldInfo()) + _log.info("Keepalive, awaiting request #" + requestCount); + } // The headers _should_ be in the first packet, but // may not be, depending on the client-side options @@ -316,27 +335,35 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { try { // catch specific exceptions thrown, to return a good // error to the client + // Add 10s to client-side timeout so the client will timeout first and minimize races + long timeout = requestCount > 0 ? I2PTunnelHTTPClient.BROWSER_KEEPALIVE_TIMEOUT + 10*1000 : HEADER_TIMEOUT; headers = readHeaders(socket, null, command, - CLIENT_SKIPHEADERS, getTunnel().getContext()); + CLIENT_SKIPHEADERS, getTunnel().getContext(), timeout); } catch (SocketTimeoutException ste) { - try { - sendError(socket, ERR_REQUEST_TIMEOUT); - } catch (IOException ioe) { - } finally { - try { socket.close(); } catch (IOException ioe) {} + if (requestCount > 0) { + if (_log.shouldDebug()) + _log.debug("Timeout awaiting request #" + requestCount); + } else { + try { + sendError(socket, ERR_REQUEST_TIMEOUT); + } catch (IOException ioe) {} + if (_log.shouldLog(Log.WARN)) + _log.warn("Error in the HTTP request from " + peerB32, ste); } - if (_log.shouldLog(Log.WARN)) - _log.warn("Error in the HTTP request from " + peerB32, ste); + try { socket.close(); } catch (IOException ioe) {} return; } catch (EOFException eofe) { - try { - sendError(socket, ERR_BAD_REQUEST); - } catch (IOException ioe) { - } finally { - try { socket.close(); } catch (IOException ioe) {} + if (requestCount > 0) { + if (_log.shouldDebug()) + _log.debug("Client closed awaiting request #" + requestCount); + } else { + try { + sendError(socket, ERR_BAD_REQUEST); + } catch (IOException ioe) {} + if (_log.shouldLog(Log.WARN)) + _log.warn("Error in the HTTP request from " + peerB32, eofe); } - if (_log.shouldLog(Log.WARN)) - _log.warn("Error in the HTTP request from " + peerB32, eofe); + try { socket.close(); } catch (IOException ioe) {} return; } catch (LineTooLongException ltle) { try { @@ -518,9 +545,35 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { setEntry(headers, "Host", spoofHost); // Force Connection: close, unless websocket + boolean upgrade = false; String conn = getEntryOrNull(headers, "Connection"); - if (conn == null || !conn.toLowerCase(Locale.US).contains("upgrade")) + if (conn == null) { setEntry(headers, "Connection", "close"); + } else { + String connlc = conn.toLowerCase(Locale.US); + if (connlc.contains("upgrade")) { + upgrade = true; + keepalive = false; + } else { + if (!connlc.contains("keep-alive")) + keepalive = false; + setEntry(headers, "Connection", "close"); + } + } + + // HTTP Persistent Connections (RFC 2616) + // for the I2P socket. + // Keep it very simple. + // Will be set to false for non-GET/HEAD, non-HTTP/1.1, + // Connection: close, InternalSocket, + // or after analysis of the response headers in CompressedOutputStream, + // or on errors in I2PTunnelRunner. + // We do NOT support keepalive on the server socket. + String cmd = command.toString().trim(); + if (!cmd.endsWith(" HTTP/1.1") || + !(cmd.startsWith("GET ") || cmd.startsWith("HEAD "))) { + keepalive = false; + } // we keep the enc sent by the browser before clobbering it, since it may have // been x-i2p-gzip @@ -554,33 +607,62 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { String modifiedHeader = formatHeaders(headers, command); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Modified header: [" + modifiedHeader + "]"); + _log.debug("Modified headers: [\n" + modifiedHeader + "]"); - // Set a relatively short timeout for GET/HEAD, - // and a long failsafe timeout for POST/CONNECT, since the user - // could be POSTing a massive file - if (modifiedHeader.startsWith("GET ") || modifiedHeader.startsWith("HEAD ")) - s.setSoTimeout(SERVER_READ_TIMEOUT_GET); - else - s.setSoTimeout(SERVER_READ_TIMEOUT_POST); - boolean compress = allowGZIP && useGZIP; - Runnable t = new CompressedRequestor(s, socket, modifiedHeader, getTunnel().getContext(), _log, compress); + // waiter is notified when the thread is done + AtomicInteger waiter = keepalive ? new AtomicInteger() : null; + Runnable t = new CompressedRequestor(s, socket, modifiedHeader, getTunnel().getContext(), + _log, compress, upgrade, _clientExecutor, keepalive, waiter); // run in the unlimited client pool //t.start(); _clientExecutor.execute(t); long afterHandle = getTunnel().getContext().clock().now(); - long timeToHandle = afterHandle - afterAccept; - getTunnel().getContext().statManager().addRateData("i2ptunnel.httpserver.blockingHandleTime", timeToHandle); - if ( (timeToHandle > 1000) && (_log.shouldLog(Log.WARN)) ) - _log.warn("Took a while to handle the request for " + remoteHost + ':' + remotePort + - " from: " + peerB32 + - " [" + timeToHandle + - ", read headers: " + (afterHeaders-afterAccept) + - ", socket create: " + (afterSocket-afterHeaders) + - ", start runners: " + (afterHandle-afterSocket) + - "]"); + if (requestCount == 0) { + long timeToHandle = afterHandle - afterAccept; + getTunnel().getContext().statManager().addRateData("i2ptunnel.httpserver.blockingHandleTime", timeToHandle); + if ( (timeToHandle > 1000) && (_log.shouldLog(Log.WARN)) ) + _log.warn("Took a while to handle the request for " + remoteHost + ':' + remotePort + + " from: " + peerB32 + + " [" + timeToHandle + + ", read headers: " + (afterHeaders-afterAccept) + + ", socket create: " + (afterSocket-afterHeaders) + + ", start runners: " + (afterHandle-afterSocket) + + "]"); + } + + if (keepalive) { + // wait for the response to finish, then determine + // if we can receive another request on this socket + if (_log.shouldDebug()) + _log.debug("Waiting for response " + requestCount + " to finish"); + try { + synchronized(waiter) { + if (waiter.get() == 0) + waiter.wait(30*1000); + } + } catch (InterruptedException ie) { + if (_log.shouldWarn()) + _log.warn("Interrupted waiting for response to finish"); + break; + } + if (_log.shouldInfo()) { + long timeToWait = getTunnel().getContext().clock().now() - afterAccept; + _log.info("Waited " + timeToWait + " for response " + requestCount + " to complete, code: " + waiter); + } + // 0: not done; 1: not keepalive-able response; 2: keepalive + if (waiter.get() != 2) + break; + } + + // go around again + requestCount++; + + // indent + } while (keepalive); + // indent + } catch (SocketException ex) { int port = socket.getLocalPort(); try { @@ -636,47 +718,68 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { // shadows _log in super() private final Log _log; private final boolean _shouldCompress; + private final boolean _upgrade; + private final ThreadPoolExecutor _tpe; + private boolean _keepalive; + private final AtomicInteger _waiter; private static final int BUF_SIZE = 8*1024; /** * @param shouldCompress if false, don't compress, just filter server headers + * @param waiter to notify when done; will set value to 1: not keepalive-able response, or 2: keepalive */ public CompressedRequestor(Socket webserver, I2PSocket browser, String headers, - I2PAppContext ctx, Log log, boolean shouldCompress) { + I2PAppContext ctx, Log log, boolean shouldCompress, boolean upgrade, + ThreadPoolExecutor tpe, boolean keepalive, AtomicInteger waiter) { _webserver = webserver; _browser = browser; _headers = headers; _ctx = ctx; _log = log; _shouldCompress = shouldCompress; + _upgrade = upgrade; + _tpe = tpe; + _keepalive = keepalive; + _waiter = waiter; } public void run() { OutputStream serverout = null; OutputStream browserout = null; + CompressedResponseOutputStream compressedout = null; InputStream browserin = null; InputStream serverin = null; Sender s = null; + Sender sender = null; IOException ioex = null; try { serverout = _webserver.getOutputStream(); - if (_log.shouldLog(Log.INFO)) - _log.info("request headers: " + _headers); serverout.write(DataHelper.getUTF8(_headers)); browserin = _browser.getInputStream(); - // Don't spin off a thread for this except for POSTs and PUTs - // TODO Upgrade: + // Don't spin off a thread for this except for POSTs and PUTs and Connection: Upgrade // beware interference with Shoutcast, etc.? - if ((!(_headers.startsWith("GET ") || _headers.startsWith("HEAD "))) || + boolean isHead = _headers.startsWith("HEAD "); + boolean isGet = _headers.startsWith("GET "); + boolean isPost = _headers.startsWith("POST "); + if (!(isGet || isHead) || + _upgrade || browserin.available() > 0) { // just in case - I2PAppThread sender = new I2PAppThread(new Sender(serverout, browserin, "server: browser to server", _log), - Thread.currentThread().getName() + "hcs"); - sender.start(); - } else { - // todo - half close? reduce MessageInputStream buffer size? + // Unless this is POST, set a huge + // timeout and rely on the server to do the actual timeout + _browser.setReadTimeout(isPost ? + SERVER_READ_TIMEOUT_MEDIUM : // medium + SERVER_READ_TIMEOUT_POST); // long + _keepalive = false; + sender = new Sender(serverout, browserin, "server: browser to server", _log); + // run in the unlimited client pool + _tpe.execute(sender); } + int timeout = (isGet || isHead) ? + SERVER_READ_TIMEOUT_GET : // short + SERVER_READ_TIMEOUT_POST; // long + _webserver.setSoTimeout(timeout); browserout = _browser.getOutputStream(); // NPE seen here in 0.7-7, caused by addition of socket.close() in the // catch (IOException ioe) block above in blockingHandle() ??? @@ -700,33 +803,48 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { //Change headers to protect server identity StringBuilder command = new StringBuilder(128); Map> headers = readHeaders(null, serverin, command, - SERVER_SKIPHEADERS, _ctx); + SERVER_SKIPHEADERS, _ctx, timeout); String modifiedHeaders = formatHeaders(headers, command); + // after the headers, set a short timeout + _webserver.setSoTimeout(SERVER_READ_TIMEOUT_GET); if (_shouldCompress) { - CompressedResponseOutputStream compressedOut = new CompressedResponseOutputStream(browserout); - compressedOut.write(DataHelper.getUTF8(modifiedHeaders)); - s = new Sender(compressedOut, serverin, "server: server to browser compressor", _log); + compressedout = new CompressedResponseOutputStream(browserout, _keepalive); + compressedout.write(DataHelper.getUTF8(modifiedHeaders)); + s = new Sender(compressedout, serverin, "server: server to browser compressor", _log); + browserout = compressedout; } else { browserout.write(DataHelper.getUTF8(modifiedHeaders)); s = new Sender(browserout, serverin, "server: server to browser uncompressed", _log); } + if (_log.shouldInfo()) + _log.info("Running server-to-browser: compressed? " + _shouldCompress + " keepalive? " + _keepalive); s.run(); // same thread } catch (SSLException she) { _log.error("SSL error", she); try { - if (browserout == null) - browserout = _browser.getOutputStream(); - browserout.write(ERR_UNAVAILABLE.getBytes("UTF-8")); + if (_browser.getLocalPort() == 443) { + _browser.reset(); + } else { + if (browserout == null) + browserout = _browser.getOutputStream(); + browserout.write(ERR_UNAVAILABLE.getBytes("UTF-8")); + } } catch (IOException ioe) {} + _keepalive = false; } catch (IOException ioe) { if (_log.shouldLog(Log.WARN)) _log.warn("error compressing", ioe); - ioex = ioe; + ioex = ioe; + _keepalive = false; } finally { - if (ioex == null && s != null) + if (ioex == null && s != null) { ioex = s.getFailure(); + if (ioex == null && sender != null) + ioex = sender.getFailure(); + } if (ioex != null) { + _keepalive = false; // Reset propagation, simplified from I2PTunnelRunner boolean i2pReset = false; if (ioex instanceof I2PSocketException) { @@ -739,12 +857,6 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { try { _webserver.setSoLinger(true, 0); } catch (IOException ioe) {} - try { - _webserver.close(); - } catch (IOException ioe) {} - try { - _browser.close(); - } catch (IOException ioe) {} } } if (!i2pReset && ioex instanceof SocketException) { @@ -756,16 +868,34 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { try { _browser.reset(); } catch (IOException ioe) {} - try { - _webserver.close(); - } catch (IOException ioe) {} } } } - if (browserout != null) try { browserout.close(); } catch (IOException ioe) {} + if (_waiter != null) { + synchronized(_waiter) { + _waiter.set(_keepalive ? 2 : 1); + _waiter.notify(); + } + } + if (browserout != null) { + try { + if (_keepalive) { + if (compressedout != null) + compressedout.finish(); + else + browserout.flush(); + } else { + browserout.close(); + } + } catch (IOException ioe) {} + } if (serverout != null) try { serverout.close(); } catch (IOException ioe) {} - if (browserin != null) try { browserin.close(); } catch (IOException ioe) {} + if (!_keepalive && browserin != null) try { browserin.close(); } catch (IOException ioe) {} if (serverin != null) try { serverin.close(); } catch (IOException ioe) {} + try { _webserver.close(); } catch (IOException ioe) {} + if (!_keepalive) try { _browser.close(); } catch (IOException ioe) {} + if (_log.shouldInfo()) + _log.info("Finished server-to-browser: compressed? " + _shouldCompress + " keepalive? " + _keepalive); } } } @@ -778,6 +908,9 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { private final Log _log; private IOException _failure; + /** + * Caller MUST close streams + */ public Sender(OutputStream out, InputStream in, String name, Log log) { _out = out; _in = in; @@ -792,16 +925,12 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { DataHelper.copy(_in, _out); if (_log.shouldDebug()) _log.debug(_name + ": Done sending"); - //_out.flush(); } catch (IOException ioe) { if (_log.shouldLog(Log.DEBUG)) _log.debug(_name + " Error sending", ioe); synchronized(this) { _failure = ioe; } - } finally { - if (_out != null) try { _out.close(); } catch (IOException ioe) {} - if (_in != null) try { _in.close(); } catch (IOException ioe) {} } } @@ -821,28 +950,32 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { private static class CompressedResponseOutputStream extends HTTPResponseOutputStream { private InternalGZIPOutputStream _gzipOut; - public CompressedResponseOutputStream(OutputStream o) { - super(o); - _dataExpected = -1; + public CompressedResponseOutputStream(OutputStream o, boolean keepalive) { + super(o, false, keepalive, false, null); + } + + /** + * Finish gzipping but don't close the output stream, + * if keepalive is true. + * + * @since 0.9.62 + */ + public void finish() throws IOException { + if (getKeepAliveOut()) { + if (_gzipOut != null) + _gzipOut.finish(); + else + flush(); + } else { + close(); + } } - /** - * Overridden to peek at response code. Always returns line. - */ - @Override - protected String filterResponseLine(String line) { - String[] s = DataHelper.split(line, " ", 3); - if (s.length > 1 && - (s[1].startsWith("3") || s[1].startsWith("5"))) - _dataExpected = 0; - return line; - } - /** * Don't compress small responses or images. * Don't compress things that are already compressed. - * Compression is inline but decompression on the client side - * creates a new thread. + * Compression is inline, and decompression on the client side is now also, + * but it's still CPU. */ @Override protected boolean shouldCompress() { @@ -995,15 +1128,15 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { * @since public since 0.9.57 for SOCKS */ public static Map> readHeaders(I2PSocket socket, InputStream in, StringBuilder command, - String[] skipHeaders, I2PAppContext ctx) throws IOException { + String[] skipHeaders, I2PAppContext ctx, long initialTimeout) throws IOException { HashMap> headers = new HashMap>(); StringBuilder buf = new StringBuilder(128); // slowloris / darkloris - long expire = ctx.clock().now() + TOTAL_HEADER_TIMEOUT; + long expire = ctx.clock().now() + initialTimeout + HEADER_FINISH_TIMEOUT; if (socket != null) { try { - readLine(socket, command, HEADER_TIMEOUT); + readLine(socket, command, initialTimeout); } catch (LineTooLongException ltle) { // convert for first line throw new RequestTooLongException("Request too long - max " + MAX_LINE_LENGTH); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKS5Server.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKS5Server.java index 4836baee4f..7a35e77db5 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKS5Server.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKS5Server.java @@ -670,7 +670,7 @@ class SOCKS5Server extends SOCKSServer { pout.flush(); // eat the response and headers buf.setLength(0); - I2PTunnelHTTPServer.readHeaders(destSock, null, buf, _skipHeaders, _context); + I2PTunnelHTTPServer.readHeaders(destSock, null, buf, _skipHeaders, _context, 15*1000); String[] f = DataHelper.split(buf.toString(), " ", 2); if (f.length < 2) throw new IOException("Bad response from proxy"); diff --git a/apps/i2ptunnel/java/test/junit/net/i2p/i2ptunnel/I2PTunnelHTTPServerTest.java b/apps/i2ptunnel/java/test/junit/net/i2p/i2ptunnel/I2PTunnelHTTPServerTest.java index 7132a7da2f..b8340b140c 100644 --- a/apps/i2ptunnel/java/test/junit/net/i2p/i2ptunnel/I2PTunnelHTTPServerTest.java +++ b/apps/i2ptunnel/java/test/junit/net/i2p/i2ptunnel/I2PTunnelHTTPServerTest.java @@ -28,7 +28,7 @@ public class I2PTunnelHTTPServerTest extends TestCase { headerString += "BLAH: something\r\n"; headerString += "\r\n"; InputStream in = fillInputStream(headerString); - Map> headers = I2PTunnelHTTPServer.readHeaders(null, in, new StringBuilder(128), new String[0], I2PAppContext.getGlobalContext()); + Map> headers = I2PTunnelHTTPServer.readHeaders(null, in, new StringBuilder(128), new String[0], I2PAppContext.getGlobalContext(), 15*1000); assertEquals(headers.size(), 1); //One header } @@ -38,7 +38,7 @@ public class I2PTunnelHTTPServerTest extends TestCase { headerString += "someHeader: oh my, duplication!\r\n"; headerString += "\r\n"; InputStream in = fillInputStream(headerString); - Map> headers = I2PTunnelHTTPServer.readHeaders(null, in, new StringBuilder(128), new String[0], I2PAppContext.getGlobalContext()); + Map> headers = I2PTunnelHTTPServer.readHeaders(null, in, new StringBuilder(128), new String[0], I2PAppContext.getGlobalContext(), 15*1000); assertEquals(headers.size(), 1); assertEquals(headers.get("someHeader").size(), 2); } @@ -51,7 +51,7 @@ public class I2PTunnelHTTPServerTest extends TestCase { headerString += "\r\n"; InputStream in = fillInputStream(headerString); StringBuilder builder = new StringBuilder(128); - Map> headers = I2PTunnelHTTPServer.readHeaders(null, in, builder, new String[0], I2PAppContext.getGlobalContext()); + Map> headers = I2PTunnelHTTPServer.readHeaders(null, in, builder, new String[0], I2PAppContext.getGlobalContext(), 15*1000); String result = I2PTunnelHTTPServer.formatHeaders(headers, builder); int first = result.indexOf("abc"); assertTrue(first >= 0);