Merge branch 'i2ptunnel-keepalive-server' into 'master'

Implement server-side HTTP persistence (keepalive) for the i2p socket

See merge request i2p-hackers/i2p.i2p!181
This commit is contained in:
zzz
2024-02-14 10:48:20 +00:00
3 changed files with 231 additions and 98 deletions

View File

@ -19,6 +19,8 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.GZIPOutputStream;
import javax.net.ssl.SSLException;
@ -55,11 +57,13 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
public static final String OPT_REJECT_REFERER = "rejectReferer";
public static final String OPT_REJECT_USER_AGENTS = "rejectUserAgents";
public static final String OPT_USER_AGENTS = "userAgentRejectList";
public static final String OPT_KEEPALIVE = "keepalive.i2p";
public static final int DEFAULT_POST_WINDOW = 5*60;
public static final int DEFAULT_POST_BAN_TIME = 20*60;
public static final int DEFAULT_POST_TOTAL_BAN_TIME = 10*60;
public static final int DEFAULT_POST_MAX = 6;
public static final int DEFAULT_POST_TOTAL_MAX = 20;
private static final boolean DEFAULT_KEEPALIVE = true;
/** what Host: should we seem to be to the webserver? */
private String _spoofHost;
@ -83,8 +87,8 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
PROXY_HEADER, PROXY_CONN_HEADER};
/** timeout for first request line */
private static final long HEADER_TIMEOUT = 15*1000;
/** total timeout for the request and all the headers */
private static final long TOTAL_HEADER_TIMEOUT = 2 * HEADER_TIMEOUT;
/** timeout for the rest of the request headers */
private static final long HEADER_FINISH_TIMEOUT = HEADER_TIMEOUT;
private static final long START_INTERVAL = (60 * 1000) * 3;
private static final int MAX_LINE_LENGTH = 8*1024;
/** ridiculously long, just to prevent OOM DOS @since 0.7.13 */
@ -98,7 +102,8 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
// Set a relatively short timeout for GET/HEAD,
// and a long failsafe timeout for POST/CONNECT, since the user
// could be POSTing a massive file
private static final int SERVER_READ_TIMEOUT_GET = 5*60*1000;
private static final int SERVER_READ_TIMEOUT_GET = 60*1000;
private static final int SERVER_READ_TIMEOUT_MEDIUM = 5*60*1000;
private static final int SERVER_READ_TIMEOUT_POST = 4*60*60*1000;
private long _startedOn = 0L;
@ -299,6 +304,9 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
} catch (IOException ioe) {}
return;
}
// We don't know if this is GET or POST or what, set a huge
// timeout and rely on the server to do the actual timeout
socket.setReadTimeout(SERVER_READ_TIMEOUT_POST);
Socket s = getSocket(socket.getPeerDestination().calculateHash(), 443);
Runnable t = new I2PTunnelRunner(s, socket, slock, null, null,
null, (I2PTunnelRunner.FailCallback) null);
@ -307,6 +315,17 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
}
long afterAccept = getTunnel().getContext().clock().now();
int requestCount = 0;
boolean keepalive = getBooleanOption(OPT_KEEPALIVE, DEFAULT_KEEPALIVE);
// indent
do {
// indent
if (requestCount > 0) {
if (_log.shouldInfo())
_log.info("Keepalive, awaiting request #" + requestCount);
}
// The headers _should_ be in the first packet, but
// may not be, depending on the client-side options
@ -316,27 +335,35 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
try {
// catch specific exceptions thrown, to return a good
// error to the client
// Add 10s to client-side timeout so the client will timeout first and minimize races
long timeout = requestCount > 0 ? I2PTunnelHTTPClient.BROWSER_KEEPALIVE_TIMEOUT + 10*1000 : HEADER_TIMEOUT;
headers = readHeaders(socket, null, command,
CLIENT_SKIPHEADERS, getTunnel().getContext());
CLIENT_SKIPHEADERS, getTunnel().getContext(), timeout);
} catch (SocketTimeoutException ste) {
try {
sendError(socket, ERR_REQUEST_TIMEOUT);
} catch (IOException ioe) {
} finally {
try { socket.close(); } catch (IOException ioe) {}
if (requestCount > 0) {
if (_log.shouldDebug())
_log.debug("Timeout awaiting request #" + requestCount);
} else {
try {
sendError(socket, ERR_REQUEST_TIMEOUT);
} catch (IOException ioe) {}
if (_log.shouldLog(Log.WARN))
_log.warn("Error in the HTTP request from " + peerB32, ste);
}
if (_log.shouldLog(Log.WARN))
_log.warn("Error in the HTTP request from " + peerB32, ste);
try { socket.close(); } catch (IOException ioe) {}
return;
} catch (EOFException eofe) {
try {
sendError(socket, ERR_BAD_REQUEST);
} catch (IOException ioe) {
} finally {
try { socket.close(); } catch (IOException ioe) {}
if (requestCount > 0) {
if (_log.shouldDebug())
_log.debug("Client closed awaiting request #" + requestCount);
} else {
try {
sendError(socket, ERR_BAD_REQUEST);
} catch (IOException ioe) {}
if (_log.shouldLog(Log.WARN))
_log.warn("Error in the HTTP request from " + peerB32, eofe);
}
if (_log.shouldLog(Log.WARN))
_log.warn("Error in the HTTP request from " + peerB32, eofe);
try { socket.close(); } catch (IOException ioe) {}
return;
} catch (LineTooLongException ltle) {
try {
@ -518,9 +545,35 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
setEntry(headers, "Host", spoofHost);
// Force Connection: close, unless websocket
boolean upgrade = false;
String conn = getEntryOrNull(headers, "Connection");
if (conn == null || !conn.toLowerCase(Locale.US).contains("upgrade"))
if (conn == null) {
setEntry(headers, "Connection", "close");
} else {
String connlc = conn.toLowerCase(Locale.US);
if (connlc.contains("upgrade")) {
upgrade = true;
keepalive = false;
} else {
if (!connlc.contains("keep-alive"))
keepalive = false;
setEntry(headers, "Connection", "close");
}
}
// HTTP Persistent Connections (RFC 2616)
// for the I2P socket.
// Keep it very simple.
// Will be set to false for non-GET/HEAD, non-HTTP/1.1,
// Connection: close, InternalSocket,
// or after analysis of the response headers in CompressedOutputStream,
// or on errors in I2PTunnelRunner.
// We do NOT support keepalive on the server socket.
String cmd = command.toString().trim();
if (!cmd.endsWith(" HTTP/1.1") ||
!(cmd.startsWith("GET ") || cmd.startsWith("HEAD "))) {
keepalive = false;
}
// we keep the enc sent by the browser before clobbering it, since it may have
// been x-i2p-gzip
@ -554,33 +607,62 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
String modifiedHeader = formatHeaders(headers, command);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Modified header: [" + modifiedHeader + "]");
_log.debug("Modified headers: [\n" + modifiedHeader + "]");
// Set a relatively short timeout for GET/HEAD,
// and a long failsafe timeout for POST/CONNECT, since the user
// could be POSTing a massive file
if (modifiedHeader.startsWith("GET ") || modifiedHeader.startsWith("HEAD "))
s.setSoTimeout(SERVER_READ_TIMEOUT_GET);
else
s.setSoTimeout(SERVER_READ_TIMEOUT_POST);
boolean compress = allowGZIP && useGZIP;
Runnable t = new CompressedRequestor(s, socket, modifiedHeader, getTunnel().getContext(), _log, compress);
// waiter is notified when the thread is done
AtomicInteger waiter = keepalive ? new AtomicInteger() : null;
Runnable t = new CompressedRequestor(s, socket, modifiedHeader, getTunnel().getContext(),
_log, compress, upgrade, _clientExecutor, keepalive, waiter);
// run in the unlimited client pool
//t.start();
_clientExecutor.execute(t);
long afterHandle = getTunnel().getContext().clock().now();
long timeToHandle = afterHandle - afterAccept;
getTunnel().getContext().statManager().addRateData("i2ptunnel.httpserver.blockingHandleTime", timeToHandle);
if ( (timeToHandle > 1000) && (_log.shouldLog(Log.WARN)) )
_log.warn("Took a while to handle the request for " + remoteHost + ':' + remotePort +
" from: " + peerB32 +
" [" + timeToHandle +
", read headers: " + (afterHeaders-afterAccept) +
", socket create: " + (afterSocket-afterHeaders) +
", start runners: " + (afterHandle-afterSocket) +
"]");
if (requestCount == 0) {
long timeToHandle = afterHandle - afterAccept;
getTunnel().getContext().statManager().addRateData("i2ptunnel.httpserver.blockingHandleTime", timeToHandle);
if ( (timeToHandle > 1000) && (_log.shouldLog(Log.WARN)) )
_log.warn("Took a while to handle the request for " + remoteHost + ':' + remotePort +
" from: " + peerB32 +
" [" + timeToHandle +
", read headers: " + (afterHeaders-afterAccept) +
", socket create: " + (afterSocket-afterHeaders) +
", start runners: " + (afterHandle-afterSocket) +
"]");
}
if (keepalive) {
// wait for the response to finish, then determine
// if we can receive another request on this socket
if (_log.shouldDebug())
_log.debug("Waiting for response " + requestCount + " to finish");
try {
synchronized(waiter) {
if (waiter.get() == 0)
waiter.wait(30*1000);
}
} catch (InterruptedException ie) {
if (_log.shouldWarn())
_log.warn("Interrupted waiting for response to finish");
break;
}
if (_log.shouldInfo()) {
long timeToWait = getTunnel().getContext().clock().now() - afterAccept;
_log.info("Waited " + timeToWait + " for response " + requestCount + " to complete, code: " + waiter);
}
// 0: not done; 1: not keepalive-able response; 2: keepalive
if (waiter.get() != 2)
break;
}
// go around again
requestCount++;
// indent
} while (keepalive);
// indent
} catch (SocketException ex) {
int port = socket.getLocalPort();
try {
@ -636,47 +718,68 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
// shadows _log in super()
private final Log _log;
private final boolean _shouldCompress;
private final boolean _upgrade;
private final ThreadPoolExecutor _tpe;
private boolean _keepalive;
private final AtomicInteger _waiter;
private static final int BUF_SIZE = 8*1024;
/**
* @param shouldCompress if false, don't compress, just filter server headers
* @param waiter to notify when done; will set value to 1: not keepalive-able response, or 2: keepalive
*/
public CompressedRequestor(Socket webserver, I2PSocket browser, String headers,
I2PAppContext ctx, Log log, boolean shouldCompress) {
I2PAppContext ctx, Log log, boolean shouldCompress, boolean upgrade,
ThreadPoolExecutor tpe, boolean keepalive, AtomicInteger waiter) {
_webserver = webserver;
_browser = browser;
_headers = headers;
_ctx = ctx;
_log = log;
_shouldCompress = shouldCompress;
_upgrade = upgrade;
_tpe = tpe;
_keepalive = keepalive;
_waiter = waiter;
}
public void run() {
OutputStream serverout = null;
OutputStream browserout = null;
CompressedResponseOutputStream compressedout = null;
InputStream browserin = null;
InputStream serverin = null;
Sender s = null;
Sender sender = null;
IOException ioex = null;
try {
serverout = _webserver.getOutputStream();
if (_log.shouldLog(Log.INFO))
_log.info("request headers: " + _headers);
serverout.write(DataHelper.getUTF8(_headers));
browserin = _browser.getInputStream();
// Don't spin off a thread for this except for POSTs and PUTs
// TODO Upgrade:
// Don't spin off a thread for this except for POSTs and PUTs and Connection: Upgrade
// beware interference with Shoutcast, etc.?
if ((!(_headers.startsWith("GET ") || _headers.startsWith("HEAD "))) ||
boolean isHead = _headers.startsWith("HEAD ");
boolean isGet = _headers.startsWith("GET ");
boolean isPost = _headers.startsWith("POST ");
if (!(isGet || isHead) ||
_upgrade ||
browserin.available() > 0) { // just in case
I2PAppThread sender = new I2PAppThread(new Sender(serverout, browserin, "server: browser to server", _log),
Thread.currentThread().getName() + "hcs");
sender.start();
} else {
// todo - half close? reduce MessageInputStream buffer size?
// Unless this is POST, set a huge
// timeout and rely on the server to do the actual timeout
_browser.setReadTimeout(isPost ?
SERVER_READ_TIMEOUT_MEDIUM : // medium
SERVER_READ_TIMEOUT_POST); // long
_keepalive = false;
sender = new Sender(serverout, browserin, "server: browser to server", _log);
// run in the unlimited client pool
_tpe.execute(sender);
}
int timeout = (isGet || isHead) ?
SERVER_READ_TIMEOUT_GET : // short
SERVER_READ_TIMEOUT_POST; // long
_webserver.setSoTimeout(timeout);
browserout = _browser.getOutputStream();
// NPE seen here in 0.7-7, caused by addition of socket.close() in the
// catch (IOException ioe) block above in blockingHandle() ???
@ -700,33 +803,48 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
//Change headers to protect server identity
StringBuilder command = new StringBuilder(128);
Map<String, List<String>> headers = readHeaders(null, serverin, command,
SERVER_SKIPHEADERS, _ctx);
SERVER_SKIPHEADERS, _ctx, timeout);
String modifiedHeaders = formatHeaders(headers, command);
// after the headers, set a short timeout
_webserver.setSoTimeout(SERVER_READ_TIMEOUT_GET);
if (_shouldCompress) {
CompressedResponseOutputStream compressedOut = new CompressedResponseOutputStream(browserout);
compressedOut.write(DataHelper.getUTF8(modifiedHeaders));
s = new Sender(compressedOut, serverin, "server: server to browser compressor", _log);
compressedout = new CompressedResponseOutputStream(browserout, _keepalive);
compressedout.write(DataHelper.getUTF8(modifiedHeaders));
s = new Sender(compressedout, serverin, "server: server to browser compressor", _log);
browserout = compressedout;
} else {
browserout.write(DataHelper.getUTF8(modifiedHeaders));
s = new Sender(browserout, serverin, "server: server to browser uncompressed", _log);
}
if (_log.shouldInfo())
_log.info("Running server-to-browser: compressed? " + _shouldCompress + " keepalive? " + _keepalive);
s.run(); // same thread
} catch (SSLException she) {
_log.error("SSL error", she);
try {
if (browserout == null)
browserout = _browser.getOutputStream();
browserout.write(ERR_UNAVAILABLE.getBytes("UTF-8"));
if (_browser.getLocalPort() == 443) {
_browser.reset();
} else {
if (browserout == null)
browserout = _browser.getOutputStream();
browserout.write(ERR_UNAVAILABLE.getBytes("UTF-8"));
}
} catch (IOException ioe) {}
_keepalive = false;
} catch (IOException ioe) {
if (_log.shouldLog(Log.WARN))
_log.warn("error compressing", ioe);
ioex = ioe;
ioex = ioe;
_keepalive = false;
} finally {
if (ioex == null && s != null)
if (ioex == null && s != null) {
ioex = s.getFailure();
if (ioex == null && sender != null)
ioex = sender.getFailure();
}
if (ioex != null) {
_keepalive = false;
// Reset propagation, simplified from I2PTunnelRunner
boolean i2pReset = false;
if (ioex instanceof I2PSocketException) {
@ -739,12 +857,6 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
try {
_webserver.setSoLinger(true, 0);
} catch (IOException ioe) {}
try {
_webserver.close();
} catch (IOException ioe) {}
try {
_browser.close();
} catch (IOException ioe) {}
}
}
if (!i2pReset && ioex instanceof SocketException) {
@ -756,16 +868,34 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
try {
_browser.reset();
} catch (IOException ioe) {}
try {
_webserver.close();
} catch (IOException ioe) {}
}
}
}
if (browserout != null) try { browserout.close(); } catch (IOException ioe) {}
if (_waiter != null) {
synchronized(_waiter) {
_waiter.set(_keepalive ? 2 : 1);
_waiter.notify();
}
}
if (browserout != null) {
try {
if (_keepalive) {
if (compressedout != null)
compressedout.finish();
else
browserout.flush();
} else {
browserout.close();
}
} catch (IOException ioe) {}
}
if (serverout != null) try { serverout.close(); } catch (IOException ioe) {}
if (browserin != null) try { browserin.close(); } catch (IOException ioe) {}
if (!_keepalive && browserin != null) try { browserin.close(); } catch (IOException ioe) {}
if (serverin != null) try { serverin.close(); } catch (IOException ioe) {}
try { _webserver.close(); } catch (IOException ioe) {}
if (!_keepalive) try { _browser.close(); } catch (IOException ioe) {}
if (_log.shouldInfo())
_log.info("Finished server-to-browser: compressed? " + _shouldCompress + " keepalive? " + _keepalive);
}
}
}
@ -778,6 +908,9 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
private final Log _log;
private IOException _failure;
/**
* Caller MUST close streams
*/
public Sender(OutputStream out, InputStream in, String name, Log log) {
_out = out;
_in = in;
@ -792,16 +925,12 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
DataHelper.copy(_in, _out);
if (_log.shouldDebug())
_log.debug(_name + ": Done sending");
//_out.flush();
} catch (IOException ioe) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(_name + " Error sending", ioe);
synchronized(this) {
_failure = ioe;
}
} finally {
if (_out != null) try { _out.close(); } catch (IOException ioe) {}
if (_in != null) try { _in.close(); } catch (IOException ioe) {}
}
}
@ -821,28 +950,32 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
private static class CompressedResponseOutputStream extends HTTPResponseOutputStream {
private InternalGZIPOutputStream _gzipOut;
public CompressedResponseOutputStream(OutputStream o) {
super(o);
_dataExpected = -1;
public CompressedResponseOutputStream(OutputStream o, boolean keepalive) {
super(o, false, keepalive, false, null);
}
/**
* Finish gzipping but don't close the output stream,
* if keepalive is true.
*
* @since 0.9.62
*/
public void finish() throws IOException {
if (getKeepAliveOut()) {
if (_gzipOut != null)
_gzipOut.finish();
else
flush();
} else {
close();
}
}
/**
* Overridden to peek at response code. Always returns line.
*/
@Override
protected String filterResponseLine(String line) {
String[] s = DataHelper.split(line, " ", 3);
if (s.length > 1 &&
(s[1].startsWith("3") || s[1].startsWith("5")))
_dataExpected = 0;
return line;
}
/**
* Don't compress small responses or images.
* Don't compress things that are already compressed.
* Compression is inline but decompression on the client side
* creates a new thread.
* Compression is inline, and decompression on the client side is now also,
* but it's still CPU.
*/
@Override
protected boolean shouldCompress() {
@ -995,15 +1128,15 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
* @since public since 0.9.57 for SOCKS
*/
public static Map<String, List<String>> readHeaders(I2PSocket socket, InputStream in, StringBuilder command,
String[] skipHeaders, I2PAppContext ctx) throws IOException {
String[] skipHeaders, I2PAppContext ctx, long initialTimeout) throws IOException {
HashMap<String, List<String>> headers = new HashMap<String, List<String>>();
StringBuilder buf = new StringBuilder(128);
// slowloris / darkloris
long expire = ctx.clock().now() + TOTAL_HEADER_TIMEOUT;
long expire = ctx.clock().now() + initialTimeout + HEADER_FINISH_TIMEOUT;
if (socket != null) {
try {
readLine(socket, command, HEADER_TIMEOUT);
readLine(socket, command, initialTimeout);
} catch (LineTooLongException ltle) {
// convert for first line
throw new RequestTooLongException("Request too long - max " + MAX_LINE_LENGTH);

View File

@ -670,7 +670,7 @@ class SOCKS5Server extends SOCKSServer {
pout.flush();
// eat the response and headers
buf.setLength(0);
I2PTunnelHTTPServer.readHeaders(destSock, null, buf, _skipHeaders, _context);
I2PTunnelHTTPServer.readHeaders(destSock, null, buf, _skipHeaders, _context, 15*1000);
String[] f = DataHelper.split(buf.toString(), " ", 2);
if (f.length < 2)
throw new IOException("Bad response from proxy");

View File

@ -28,7 +28,7 @@ public class I2PTunnelHTTPServerTest extends TestCase {
headerString += "BLAH: something\r\n";
headerString += "\r\n";
InputStream in = fillInputStream(headerString);
Map<String, List<String>> headers = I2PTunnelHTTPServer.readHeaders(null, in, new StringBuilder(128), new String[0], I2PAppContext.getGlobalContext());
Map<String, List<String>> headers = I2PTunnelHTTPServer.readHeaders(null, in, new StringBuilder(128), new String[0], I2PAppContext.getGlobalContext(), 15*1000);
assertEquals(headers.size(), 1); //One header
}
@ -38,7 +38,7 @@ public class I2PTunnelHTTPServerTest extends TestCase {
headerString += "someHeader: oh my, duplication!\r\n";
headerString += "\r\n";
InputStream in = fillInputStream(headerString);
Map<String, List<String>> headers = I2PTunnelHTTPServer.readHeaders(null, in, new StringBuilder(128), new String[0], I2PAppContext.getGlobalContext());
Map<String, List<String>> headers = I2PTunnelHTTPServer.readHeaders(null, in, new StringBuilder(128), new String[0], I2PAppContext.getGlobalContext(), 15*1000);
assertEquals(headers.size(), 1);
assertEquals(headers.get("someHeader").size(), 2);
}
@ -51,7 +51,7 @@ public class I2PTunnelHTTPServerTest extends TestCase {
headerString += "\r\n";
InputStream in = fillInputStream(headerString);
StringBuilder builder = new StringBuilder(128);
Map<String, List<String>> headers = I2PTunnelHTTPServer.readHeaders(null, in, builder, new String[0], I2PAppContext.getGlobalContext());
Map<String, List<String>> headers = I2PTunnelHTTPServer.readHeaders(null, in, builder, new String[0], I2PAppContext.getGlobalContext(), 15*1000);
String result = I2PTunnelHTTPServer.formatHeaders(headers, builder);
int first = result.indexOf("abc");
assertTrue(first >= 0);