Streaming read timeout fixes:

i2ptunnel:
- Better timeout handling when reading headers in HTTP server (improved fix for ticket #723)
  Enforce total header timeout with new readLine()
- Prep for returning specific HTTP errors to client on request timeout and header errors,
  instead of just closing socket... further work to be in i2p.i2p.zzz.test2 branch
Streaming:
- Fix read timeout on input stream - was waiting too long, often twice as long as timeout, or more
  Enforce total timeout even when notify()ed
- Fix read() returning 0 on read timeout instead of -1 (possible fix for ticket #335)
  This prevents passing partial headers to server on timeout
- Fix javadocs for read timeout to match current behavior
- Fix StandardSocket SoTimeout to account for differences with I2PSocket readTimeout
- log tweaks
This commit is contained in:
zzz
2015-04-04 17:00:57 +00:00
parent 351a1a8d27
commit 706ee243a5
8 changed files with 181 additions and 38 deletions

View File

@@ -4,6 +4,7 @@
package net.i2p.i2ptunnel; package net.i2p.i2ptunnel;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.EOFException;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@@ -11,6 +12,7 @@ import java.io.OutputStream;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.Socket; import java.net.Socket;
import java.net.SocketException; import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@@ -63,9 +65,12 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
private static final String SERVER_HEADER = "Server"; private static final String SERVER_HEADER = "Server";
private static final String X_POWERED_BY_HEADER = "X-Powered-By"; private static final String X_POWERED_BY_HEADER = "X-Powered-By";
private static final String[] SERVER_SKIPHEADERS = {SERVER_HEADER, X_POWERED_BY_HEADER}; private static final String[] SERVER_SKIPHEADERS = {SERVER_HEADER, X_POWERED_BY_HEADER};
/** timeout for first request line */
private static final long HEADER_TIMEOUT = 15*1000; private static final long HEADER_TIMEOUT = 15*1000;
/** total timeout for the request and all the headers */
private static final long TOTAL_HEADER_TIMEOUT = 2 * HEADER_TIMEOUT; private static final long TOTAL_HEADER_TIMEOUT = 2 * HEADER_TIMEOUT;
private static final long START_INTERVAL = (60 * 1000) * 3; private static final long START_INTERVAL = (60 * 1000) * 3;
private static final int MAX_LINE_LENGTH = 8*1024;
private long _startedOn = 0L; private long _startedOn = 0L;
private ConnThrottler _postThrottler; private ConnThrottler _postThrottler;
@@ -206,12 +211,9 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
long afterAccept = getTunnel().getContext().clock().now(); long afterAccept = getTunnel().getContext().clock().now();
// The headers _should_ be in the first packet, but // The headers _should_ be in the first packet, but
// may not be, depending on the client-side options // may not be, depending on the client-side options
socket.setReadTimeout(HEADER_TIMEOUT);
InputStream in = socket.getInputStream();
StringBuilder command = new StringBuilder(128); StringBuilder command = new StringBuilder(128);
Map<String, List<String>> headers = readHeaders(in, command, Map<String, List<String>> headers = readHeaders(socket, null, command,
CLIENT_SKIPHEADERS, getTunnel().getContext()); CLIENT_SKIPHEADERS, getTunnel().getContext());
long afterHeaders = getTunnel().getContext().clock().now(); long afterHeaders = getTunnel().getContext().clock().now();
@@ -435,7 +437,7 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
//Change headers to protect server identity //Change headers to protect server identity
StringBuilder command = new StringBuilder(128); StringBuilder command = new StringBuilder(128);
Map<String, List<String>> headers = readHeaders(serverin, command, Map<String, List<String>> headers = readHeaders(null, serverin, command,
SERVER_SKIPHEADERS, _ctx); SERVER_SKIPHEADERS, _ctx);
String modifiedHeaders = formatHeaders(headers, command); String modifiedHeaders = formatHeaders(headers, command);
compressedOut.write(modifiedHeaders.getBytes()); compressedOut.write(modifiedHeaders.getBytes());
@@ -671,15 +673,33 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
} }
} }
protected static Map<String, List<String>> readHeaders(InputStream in, StringBuilder command, /**
* From I2P to server: socket non-null, in null.
* From server to I2P: socket null, in non-null.
*
* @param socket if null, use in as InputStream
* @param in if null, use socket.getInputStream() as InputStream
* @param command out parameter, first line
* @param command out parameter, first line
* @throws SocketTimeoutException if timeout is reached before newline
* @throws EOFException if EOF is reached before newline
* @throws LineTooLongException if too long
* @throws IOException on other errors in the underlying stream
*/
private static Map<String, List<String>> readHeaders(I2PSocket socket, InputStream in, StringBuilder command,
String[] skipHeaders, I2PAppContext ctx) throws IOException { String[] skipHeaders, I2PAppContext ctx) throws IOException {
HashMap<String, List<String>> headers = new HashMap<String, List<String>>(); HashMap<String, List<String>> headers = new HashMap<String, List<String>>();
StringBuilder buf = new StringBuilder(128); StringBuilder buf = new StringBuilder(128);
// slowloris / darkloris // slowloris / darkloris
long expire = ctx.clock().now() + TOTAL_HEADER_TIMEOUT; long expire = ctx.clock().now() + TOTAL_HEADER_TIMEOUT;
boolean ok = DataHelper.readLine(in, command); if (socket != null) {
if (!ok) throw new IOException("EOF reached while reading the HTTP command [" + command.toString() + "]"); readLine(socket, command, HEADER_TIMEOUT);
} else {
boolean ok = DataHelper.readLine(in, command);
if (!ok)
throw new IOException("EOF reached before the end of the headers [" + buf.toString() + "]");
}
//if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
// _log.debug("Read the http command [" + command.toString() + "]"); // _log.debug("Read the http command [" + command.toString() + "]");
@@ -703,14 +723,19 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
if (++i > MAX_HEADERS) if (++i > MAX_HEADERS)
throw new IOException("Too many header lines - max " + MAX_HEADERS); throw new IOException("Too many header lines - max " + MAX_HEADERS);
buf.setLength(0); buf.setLength(0);
ok = DataHelper.readLine(in, buf); if (socket != null) {
if (!ok) throw new IOException("EOF reached before the end of the headers [" + buf.toString() + "]"); readLine(socket, buf, expire - ctx.clock().now());
} else {
boolean ok = DataHelper.readLine(in, buf);
if (!ok)
throw new IOException("EOF reached before the end of the headers [" + buf.toString() + "]");
}
if ( (buf.length() == 0) || if ( (buf.length() == 0) ||
((buf.charAt(0) == '\n') || (buf.charAt(0) == '\r')) ) { ((buf.charAt(0) == '\n') || (buf.charAt(0) == '\r')) ) {
// end of headers reached // end of headers reached
return headers; return headers;
} else { } else {
if (ctx.clock().now() > expire) if (ctx.clock().now() >= expire)
throw new IOException("Headers took too long [" + buf.toString() + "]"); throw new IOException("Headers took too long [" + buf.toString() + "]");
int split = buf.indexOf(":"); int split = buf.indexOf(":");
if (split <= 0) throw new IOException("Invalid HTTP header, missing colon [" + buf.toString() + "]"); if (split <= 0) throw new IOException("Invalid HTTP header, missing colon [" + buf.toString() + "]");
@@ -752,5 +777,59 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
} }
} }
} }
/**
* Read a line teriminated by newline, with a total read timeout.
*
* Warning - strips \n but not \r
* Warning - 8KB line length limit as of 0.7.13, @throws IOException if exceeded
* Warning - not UTF-8
*
* @param buf output
* @param timeout throws SocketTimeoutException immediately if zero or negative
* @throws SocketTimeoutException if timeout is reached before newline
* @throws EOFException if EOF is reached before newline
* @throws LineTooLongException if too long
* @throws IOException on other errors in the underlying stream
* @since 0.9.19 modified from DataHelper
*/
private static void readLine(I2PSocket socket, StringBuilder buf, long timeout) throws IOException {
if (timeout <= 0)
throw new SocketTimeoutException();
long expires = System.currentTimeMillis() + timeout;
InputStream in = socket.getInputStream();
int c;
int i = 0;
socket.setReadTimeout(timeout);
while ( (c = in.read()) != -1) {
if (++i > MAX_LINE_LENGTH)
throw new LineTooLongException("Line too long - max " + MAX_LINE_LENGTH);
if (c == '\n')
break;
long newTimeout = expires - System.currentTimeMillis();
if (newTimeout <= 0)
throw new SocketTimeoutException();
buf.append((char)c);
if (newTimeout != timeout) {
timeout = newTimeout;
socket.setReadTimeout(timeout);
}
}
if (c == -1) {
if (System.currentTimeMillis() >= expires)
throw new SocketTimeoutException();
else
throw new EOFException();
}
}
/**
* @since 0.9.19
*/
private static class LineTooLongException extends IOException {
public LineTooLongException(String s) {
super(s);
}
}
} }

View File

@@ -37,22 +37,22 @@ public interface I2PSocketOptions {
/** /**
* What is the longest we'll block on the input stream while waiting * What is the longest we'll block on the input stream while waiting
* for more data. If this value is exceeded, the read() throws * for more data. If this value is exceeded, the read() throws
* InterruptedIOException * InterruptedIOException - FIXME doesn't really, returns -1 or 0 instead.
* *
* WARNING: Default -1 (unlimited), which is probably not what you want. * WARNING: Default -1 (unlimited), which is probably not what you want.
* *
* @return timeout in ms * @return timeout in ms, 0 for nonblocking, -1 for forever
*/ */
public long getReadTimeout(); public long getReadTimeout();
/** /**
* What is the longest we'll block on the input stream while waiting * What is the longest we'll block on the input stream while waiting
* for more data. If this value is exceeded, the read() throws * for more data. If this value is exceeded, the read() throws
* InterruptedIOException * InterruptedIOException - FIXME doesn't really, returns -1 or 0 instead.
* *
* WARNING: Default -1 (unlimited), which is probably not what you want. * WARNING: Default -1 (unlimited), which is probably not what you want.
* *
* @param ms timeout in ms * @param ms timeout in ms, 0 for nonblocking, -1 for forever
*/ */
public void setReadTimeout(long ms); public void setReadTimeout(long ms);

View File

@@ -51,6 +51,8 @@ class I2PSocketFull implements I2PSocket {
} }
Connection c = _connection; Connection c = _connection;
if (c == null) return; if (c == null) return;
if (log.shouldLog(Log.INFO))
log.info("close() called, connected? " + c.getIsConnected() + " : " + c);
if (c.getIsConnected()) { if (c.getIsConnected()) {
MessageInputStream in = c.getInputStream(); MessageInputStream in = c.getInputStream();
in.close(); in.close();
@@ -136,6 +138,8 @@ class I2PSocketFull implements I2PSocket {
Connection c = _connection; Connection c = _connection;
if (c == null) return; if (c == null) return;
if (ms > Integer.MAX_VALUE)
ms = Integer.MAX_VALUE;
c.getInputStream().setReadTimeout((int)ms); c.getInputStream().setReadTimeout((int)ms);
c.getOptions().setReadTimeout(ms); c.getOptions().setReadTimeout(ms);
} }

View File

@@ -144,9 +144,11 @@ class I2PSocketOptionsImpl implements I2PSocketOptions {
/** /**
* What is the longest we'll block on the input stream while waiting * What is the longest we'll block on the input stream while waiting
* for more data. If this value is exceeded, the read() throws * for more data. If this value is exceeded, the read() throws
* InterruptedIOException * InterruptedIOException - FIXME doesn't really, returns -1 or 0 instead.
* *
* WARNING: Default -1 (unlimited), which is probably not what you want. * WARNING: Default -1 (unlimited), which is probably not what you want.
*
* @return timeout in ms, 0 for nonblocking, -1 for forever
*/ */
public long getReadTimeout() { public long getReadTimeout() {
return _readTimeout; return _readTimeout;
@@ -155,9 +157,11 @@ class I2PSocketOptionsImpl implements I2PSocketOptions {
/** /**
* What is the longest we'll block on the input stream while waiting * What is the longest we'll block on the input stream while waiting
* for more data. If this value is exceeded, the read() throws * for more data. If this value is exceeded, the read() throws
* InterruptedIOException * InterruptedIOException - FIXME doesn't really, returns -1 or 0 instead.
* *
* WARNING: Default -1 (unlimited), which is probably not what you want. * WARNING: Default -1 (unlimited), which is probably not what you want.
*
* @param ms timeout in ms, 0 for nonblocking, -1 for forever
*/ */
public void setReadTimeout(long ms) { public void setReadTimeout(long ms) {
_readTimeout = ms; _readTimeout = ms;

View File

@@ -170,10 +170,15 @@ class MessageInputStream extends InputStream {
/** /**
* how long a read() call should block (if less than 0, block indefinitely, * how long a read() call should block (if less than 0, block indefinitely,
* but if it is 0, do not block at all) * but if it is 0, do not block at all)
* @return how long read calls should block, 0 or less indefinitely block * @return how long read calls should block, 0 for nonblocking, negative to indefinitely block
*/ */
public int getReadTimeout() { return _readTimeout; } public int getReadTimeout() { return _readTimeout; }
/**
* how long a read() call should block (if less than 0, block indefinitely,
* but if it is 0, do not block at all)
* @param timeout how long read calls should block, 0 for nonblocking, negative to indefinitely block
*/
public void setReadTimeout(int timeout) { public void setReadTimeout(int timeout) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Changing read timeout from " + _readTimeout + " to " + timeout); _log.debug("Changing read timeout from " + _readTimeout + " to " + timeout);
@@ -230,7 +235,8 @@ class MessageInputStream extends InputStream {
*/ */
public boolean messageReceived(long messageId, ByteArray payload) { public boolean messageReceived(long messageId, ByteArray payload) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("received " + messageId + " with " + (payload != null ? payload.getValid()+"" : "no payload")); _log.debug("received msg ID " + messageId + " with " +
(payload != null ? payload.getValid() + " bytes" : "no payload"));
synchronized (_dataLock) { synchronized (_dataLock) {
if (messageId <= _highestReadyBlockId) { if (messageId <= _highestReadyBlockId) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
@@ -261,6 +267,10 @@ class MessageInputStream extends InputStream {
cur++; cur++;
_highestReadyBlockId++; _highestReadyBlockId++;
} }
// FIXME Javadocs for setReadTimeout() say we will throw
// an InterruptedIOException.
// Java throws a SocketTimeoutException.
// We do neither.
} else { } else {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Message is out of order: " + messageId); _log.info("Message is out of order: " + messageId);
@@ -275,23 +285,41 @@ class MessageInputStream extends InputStream {
return true; return true;
} }
/**
* On a read timeout, this returns -1
* (doesn't throw SocketTimeoutException like Socket)
* (doesn't throw InterruptedIOException like our javadocs say)
*/
public int read() throws IOException { public int read() throws IOException {
int read = read(_oneByte, 0, 1); int read = read(_oneByte, 0, 1);
if (read < 0) if (read <= 0)
return -1; return -1;
return _oneByte[0] & 0xff; return _oneByte[0] & 0xff;
} }
/**
* On a read timeout, this returns 0
* (doesn't throw SocketTimeoutException like Socket)
* (doesn't throw InterruptedIOException like our javadocs say)
*/
@Override @Override
public int read(byte target[]) throws IOException { public int read(byte target[]) throws IOException {
return read(target, 0, target.length); return read(target, 0, target.length);
} }
/**
* On a read timeout, this returns 0
* (doesn't throw SocketTimeoutException like Socket)
* (doesn't throw InterruptedIOException like our javadocs say)
*/
@Override @Override
public int read(byte target[], int offset, int length) throws IOException { public int read(byte target[], int offset, int length) throws IOException {
long expiration = -1; int readTimeout = _readTimeout;
if (_readTimeout > 0) long expiration;
expiration = _readTimeout + System.currentTimeMillis(); if (readTimeout > 0)
expiration = readTimeout + System.currentTimeMillis();
else
expiration = -1;
synchronized (_dataLock) { synchronized (_dataLock) {
if (_locallyClosed) throw new IOException("Already locally closed"); if (_locallyClosed) throw new IOException("Already locally closed");
throwAnyError(); throwAnyError();
@@ -310,10 +338,10 @@ class MessageInputStream extends InputStream {
+ "] got EOF after " + _readTotal + " " + toString()); + "] got EOF after " + _readTotal + " " + toString());
return -1; return -1;
} else { } else {
if (_readTimeout < 0) { if (readTimeout < 0) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("read(...," + offset+", " + length+ ")[" + i _log.debug("read(...," + offset+", " + length+ ")[" + i
+ ") with no timeout: " + toString()); + "] with no timeout: " + toString());
try { try {
_dataLock.wait(); _dataLock.wait();
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
@@ -323,14 +351,14 @@ class MessageInputStream extends InputStream {
} }
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("read(...," + offset+", " + length+ ")[" + i _log.debug("read(...," + offset+", " + length+ ")[" + i
+ ") with no timeout complete: " + toString()); + "] with no timeout complete: " + toString());
throwAnyError(); throwAnyError();
} else if (_readTimeout > 0) { } else if (readTimeout > 0) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("read(...," + offset+", " + length+ ")[" + i _log.debug("read(...," + offset+", " + length+ ")[" + i
+ ") with timeout: " + _readTimeout + ": " + toString()); + "] with timeout: " + readTimeout + ": " + toString());
try { try {
_dataLock.wait(_readTimeout); _dataLock.wait(readTimeout);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
IOException ioe2 = new InterruptedIOException("Interrupted read"); IOException ioe2 = new InterruptedIOException("Interrupted read");
ioe2.initCause(ie); ioe2.initCause(ie);
@@ -338,21 +366,30 @@ class MessageInputStream extends InputStream {
} }
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("read(...," + offset+", " + length+ ")[" + i _log.debug("read(...," + offset+", " + length+ ")[" + i
+ ") with timeout complete: " + _readTimeout + ": " + toString()); + "] with timeout complete: " + readTimeout + ": " + toString());
throwAnyError(); throwAnyError();
} else { // readTimeout == 0 } else { // readTimeout == 0
// noop, don't block // noop, don't block
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("read(...," + offset+", " + length+ ")[" + i _log.debug("read(...," + offset+", " + length+ ")[" + i
+ ") with nonblocking setup: " + toString()); + "] with nonblocking setup: " + toString());
return i; return i;
} }
if (_readyDataBlocks.isEmpty()) { if (_readyDataBlocks.isEmpty()) {
if ( (_readTimeout > 0) && (expiration < System.currentTimeMillis()) ) { if (readTimeout > 0) {
if (_log.shouldLog(Log.INFO)) long remaining = expiration - System.currentTimeMillis();
_log.info("read(...," + offset+", " + length+ ")[" + i if (remaining <= 0) {
+ ") expired: " + toString()); // FIXME Javadocs for setReadTimeout() say we will throw
return i; // an InterruptedIOException.
// Java throws a SocketTimeoutException.
// We do neither.
if (_log.shouldLog(Log.INFO))
_log.info("read(...," + offset+", " + length+ ")[" + i
+ "] expired: " + toString());
return i;
} else {
readTimeout = (int) remaining;
}
} }
} }
} }

View File

@@ -193,7 +193,15 @@ class StandardSocket extends Socket {
I2PSocketOptions opts = _socket.getOptions(); I2PSocketOptions opts = _socket.getOptions();
if (opts == null) if (opts == null)
return 0; return 0;
return (int) opts.getReadTimeout(); long rv = opts.getReadTimeout();
// Java Socket: 0 is forever, and we don't exactly have nonblocking
if (rv > Integer.MAX_VALUE)
rv = Integer.MAX_VALUE;
else if (rv < 0)
rv = 0;
else if (rv == 0)
rv = 1;
return (int) rv;
} }
/** /**
@@ -309,6 +317,9 @@ class StandardSocket extends Socket {
I2PSocketOptions opts = _socket.getOptions(); I2PSocketOptions opts = _socket.getOptions();
if (opts == null) if (opts == null)
throw new SocketException("No options"); throw new SocketException("No options");
// Java Socket: 0 is forever
if (timeout == 0)
timeout = -1;
opts.setReadTimeout(timeout); opts.setReadTimeout(timeout);
} }

View File

@@ -1,3 +1,11 @@
2015-04-04 zzz
* i2ptunnel:
- Better timeout handling when reading headers in HTTP server (ticket #723)
- Fix NoSuchElementException processing proxyList caused by 03-31 checkin
* Streaming:
- Fix read timeout on input stream (ticket #723)
- Fix read() returning 0 instead of -1 on read timeout (ticket #335)
2015-04-03 zzz 2015-04-03 zzz
* i2ptunnel: Fix stopping tunnel on bad args when starting * i2ptunnel: Fix stopping tunnel on bad args when starting
* wrapper.config: Remove old mortbay Jetty parameters * wrapper.config: Remove old mortbay Jetty parameters

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */ /** deprecated */
public final static String ID = "Monotone"; public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION; public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 16; public final static long BUILD = 17;
/** for example "-test" */ /** for example "-test" */
public final static String EXTRA = "-rc"; public final static String EXTRA = "-rc";