propagate from branch 'i2p.i2p' (head 07028378508ab46278d193039b97c543d12ee22e)

to branch 'i2p.i2p.zzz.test2' (head 0074b91cb9fe0ed875457dc0bf1989df03fa9e9a)
This commit is contained in:
zzz
2015-05-30 11:16:00 +00:00
101 changed files with 28058 additions and 25253 deletions

View File

@@ -46,6 +46,7 @@ class HTTPResponseOutputStream extends FilterOutputStream {
protected boolean _gzip;
protected long _dataExpected;
protected String _contentType;
private PipedInputStream _pipedInputStream;
private static final int CACHE_SIZE = 8*1024;
private static final ByteCache _cache = ByteCache.getInstance(8, CACHE_SIZE);
@@ -155,6 +156,8 @@ class HTTPResponseOutputStream extends FilterOutputStream {
responseLine = new String(_headerBuffer.getData(), 0, i+1); // includes NL
responseLine = filterResponseLine(responseLine);
responseLine = (responseLine.trim() + "\r\n");
if (_log.shouldLog(Log.INFO))
_log.info("Response: " + responseLine.trim());
out.write(responseLine.getBytes());
} else {
for (int j = lastEnd+1; j < i; j++) {
@@ -246,7 +249,30 @@ class HTTPResponseOutputStream extends FilterOutputStream {
@Override
public void close() throws IOException {
out.close();
if (_log.shouldLog(Log.INFO))
_log.info("Closing " + out + " threaded?? " + shouldCompress(), new Exception("I did it"));
PipedInputStream pi;
synchronized(this) {
// synch with changing out field below
super.close();
pi = _pipedInputStream;
}
// Prevent truncation of gunzipped data as
// I2PTunnelHTTPClientRunner.close() closes the Socket after this.
// Closing pipe only notifies read end, doesn't wait.
// TODO switch to Java 6 InflaterOutputStream and get rid of Pusher thread
if (pi != null) {
for (int i = 0; i < 50; i++) {
if (pi.available() <= 0) {
if (i > 0 && _log.shouldWarn())
_log.warn("Waited " + (i*20) + " for read side to close");
break;
}
try {
Thread.sleep(20);
} catch (InterruptedException ie) {}
}
}
}
protected void beginProcessing() throws IOException {
@@ -254,7 +280,12 @@ class HTTPResponseOutputStream extends FilterOutputStream {
PipedInputStream pi = BigPipedInputStream.getInstance();
PipedOutputStream po = new PipedOutputStream(pi);
Runnable r = new Pusher(pi, out);
out = po;
if (_log.shouldLog(Log.INFO))
_log.info("Starting threaded decompressing pusher to " + out);
synchronized(this) {
out = po;
_pipedInputStream = pi;
}
// TODO we should be able to do this inline somehow
TunnelControllerGroup tcg = TunnelControllerGroup.getInstance();
if (tcg != null) {
@@ -286,6 +317,8 @@ class HTTPResponseOutputStream extends FilterOutputStream {
}
public void run() {
if (_log.shouldLog(Log.INFO))
_log.info("Starting pusher from " + _inRaw + " to: " + _out);
ReusableGZIPInputStream _in = ReusableGZIPInputStream.acquire();
long written = 0;
ByteArray ba = null;
@@ -302,11 +335,11 @@ class HTTPResponseOutputStream extends FilterOutputStream {
_out.flush();
written += read;
}
if (_log.shouldLog(Log.INFO))
_log.info("Decompressed: " + written + ", " + _in.getTotalRead() + "/" + _in.getTotalExpanded());
} catch (IOException ioe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Error decompressing: " + written + ", " + _in.getTotalRead() + "/" + _in.getTotalExpanded(), ioe);
_log.warn("Error decompressing: " + written + ", " + _in.getTotalRead() +
"/" + _in.getTotalExpanded() +
" from " + _inRaw + " to: " + _out, ioe);
} catch (OutOfMemoryError oom) {
_log.error("OOM in HTTP Decompressor", oom);
} finally {
@@ -314,7 +347,8 @@ class HTTPResponseOutputStream extends FilterOutputStream {
_log.info("After decompression, written=" + written +
" read=" + _in.getTotalRead()
+ ", expanded=" + _in.getTotalExpanded() + ", remaining=" + _in.getRemaining()
+ ", finished=" + _in.getFinished());
+ ", finished=" + _in.getFinished() +
" from " + _inRaw + " to: " + _out);
if (ba != null)
_cache.release(ba);
if (_out != null) try {

View File

@@ -77,6 +77,16 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
private volatile ThreadPoolExecutor _executor;
/** this is ONLY for shared clients */
private static I2PSocketManager socketManager;
/**
* Only destroy and replace a static shared client socket manager if it's been connected before
* @since 0.9.20
*/
private enum SocketManagerState { INIT, CONNECTED }
private static SocketManagerState _socketManagerState = SocketManagerState.INIT;
public static final String PROP_USE_SSL = I2PTunnelServer.PROP_USE_SSL;
/**
@@ -239,10 +249,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
connectManager();
}
/** this is ONLY for shared clients */
private static I2PSocketManager socketManager;
/**
* This is ONLY for shared clients.
* As of 0.9.20 this is fast, and does NOT connect the manager to the router.
@@ -283,12 +289,13 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
Log _log = tunnel.getContext().logManager().getLog(I2PTunnelClientBase.class);
if (socketManager != null && !socketManager.isDestroyed()) {
I2PSession s = socketManager.getSession();
if (s.isClosed()) {
if (s.isClosed() && _socketManagerState != SocketManagerState.INIT) {
if (_log.shouldLog(Log.INFO))
_log.info(tunnel.getClientOptions().getProperty("inbound.nickname") + ": Building a new socket manager since the old one closed [s=" + s + "]");
tunnel.removeSession(s);
// make sure the old one is closed
socketManager.destroySocketManager();
_socketManagerState = SocketManagerState.INIT;
// We could be here a LONG time, holding the lock
socketManager = buildSocketManager(tunnel, pkf);
} else {
@@ -424,6 +431,10 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
while (sockMgr.getSession().isClosed()) {
try {
sockMgr.getSession().connect();
synchronized(I2PTunnelClientBase.class) {
if (sockMgr == socketManager)
_socketManagerState = SocketManagerState.CONNECTED;
}
} catch (I2PSessionException ise) {
// shadows instance _log
Log _log = getTunnel().getContext().logManager().getLog(I2PTunnelClientBase.class);