forked from I2P_Developers/i2p.i2p
propagate from branch 'i2p.i2p.zzz.test2' (head 0feb2e6806927f68c7333aaa0892de185bb2629c)
to branch 'i2p.i2p' (head 0482fa843cb1e9d7ec281440056eef3a0ab07bdb)
This commit is contained in:
@@ -528,9 +528,9 @@ public class TrackerClient implements Runnable {
|
||||
!snark.isChecking() &&
|
||||
info.getSeedCount() > 100 &&
|
||||
coordinator.getPeerCount() <= 0 &&
|
||||
_util.getContext().clock().now() > _startedOn + 2*60*60*1000 &&
|
||||
_util.getContext().clock().now() > _startedOn + 30*60*1000 &&
|
||||
snark.getTotalLength() > 0 &&
|
||||
uploaded >= snark.getTotalLength() * 5 / 4) {
|
||||
uploaded >= snark.getTotalLength() / 2) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Auto stopping " + snark.getBaseName());
|
||||
snark.setAutoStoppable(false);
|
||||
|
@@ -21,6 +21,7 @@ import net.i2p.I2PAppContext;
|
||||
import net.i2p.data.ByteArray;
|
||||
import net.i2p.util.BigPipedInputStream;
|
||||
import net.i2p.util.ByteCache;
|
||||
import net.i2p.util.I2PAppThread;
|
||||
import net.i2p.util.Log;
|
||||
import net.i2p.util.ReusableGZIPInputStream;
|
||||
|
||||
@@ -251,16 +252,27 @@ class HTTPResponseOutputStream extends FilterOutputStream {
|
||||
//out.flush();
|
||||
PipedInputStream pi = BigPipedInputStream.getInstance();
|
||||
PipedOutputStream po = new PipedOutputStream(pi);
|
||||
// Run in the client thread pool, as there should be an unused thread
|
||||
// there after the accept().
|
||||
// Overridden in I2PTunnelHTTPServer, where it does not use the client pool.
|
||||
try {
|
||||
I2PTunnelClientBase.getClientExecutor().execute(new Pusher(pi, out));
|
||||
} catch (RejectedExecutionException ree) {
|
||||
// shouldn't happen
|
||||
throw ree;
|
||||
}
|
||||
Runnable r = new Pusher(pi, out);
|
||||
out = po;
|
||||
// TODO we should be able to do this inline somehow
|
||||
TunnelControllerGroup tcg = TunnelControllerGroup.getInstance();
|
||||
if (tcg != null) {
|
||||
// Run in the client thread pool, as there should be an unused thread
|
||||
// there after the accept().
|
||||
// Overridden in I2PTunnelHTTPServer, where it does not use the client pool.
|
||||
try {
|
||||
tcg.getClientExecutor().execute(r);
|
||||
} catch (RejectedExecutionException ree) {
|
||||
// shouldn't happen
|
||||
throw ree;
|
||||
}
|
||||
} else {
|
||||
// Fallback in case TCG.getInstance() is null, never instantiated
|
||||
// and we were not started by TCG.
|
||||
// Maybe a plugin loaded before TCG? Should be rare.
|
||||
Thread t = new I2PAppThread(r, "Pusher");
|
||||
t.start();
|
||||
}
|
||||
}
|
||||
|
||||
private class Pusher implements Runnable {
|
||||
|
@@ -122,9 +122,11 @@ public class I2PTunnelClient extends I2PTunnelClientBase {
|
||||
int port = addr.getPort();
|
||||
i2ps = createI2PSocket(clientDest, port);
|
||||
i2ps.setReadTimeout(readTimeout);
|
||||
Thread t = new I2PTunnelRunner(s, i2ps, sockLock, null, null, mySockets,
|
||||
I2PTunnelRunner t = new I2PTunnelRunner(s, i2ps, sockLock, null, null, mySockets,
|
||||
(I2PTunnelRunner.FailCallback) null);
|
||||
t.start();
|
||||
// we are called from an unlimited thread pool, so run inline
|
||||
//t.start();
|
||||
t.run();
|
||||
} catch (Exception ex) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Error connecting", ex);
|
||||
|
@@ -16,12 +16,8 @@ import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import javax.net.ssl.SSLServerSocket;
|
||||
@@ -77,18 +73,7 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
|
||||
// true if we are chained from a server.
|
||||
private boolean chained;
|
||||
|
||||
/** how long to wait before dropping an idle thread */
|
||||
private static final long HANDLER_KEEPALIVE_MS = 2*60*1000;
|
||||
|
||||
/**
|
||||
* We keep a static pool of socket handlers for all clients,
|
||||
* as there is no need for isolation on the client side.
|
||||
* Extending classes may use it for other purposes.
|
||||
* Not for use by servers, as there is no limit on threads.
|
||||
*/
|
||||
private static volatile ThreadPoolExecutor _executor;
|
||||
private static int _executorThreadCount;
|
||||
private static final Object _executorLock = new Object();
|
||||
private volatile ThreadPoolExecutor _executor;
|
||||
|
||||
public static final String PROP_USE_SSL = I2PTunnelServer.PROP_USE_SSL;
|
||||
|
||||
@@ -116,11 +101,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
|
||||
_context.statManager().createRateStat("i2ptunnel.client.buildRunTime", "How long it takes to run a queued socket into an i2ptunnel runner?", "I2PTunnel", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
|
||||
_log = _context.logManager().getLog(getClass());
|
||||
|
||||
synchronized (_executorLock) {
|
||||
if (_executor == null)
|
||||
_executor = new CustomThreadPoolExecutor();
|
||||
}
|
||||
|
||||
Thread t = new I2PAppThread(this, "Client " + tunnel.listenHost + ':' + localPort);
|
||||
t.start();
|
||||
open = true;
|
||||
@@ -184,11 +164,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
|
||||
_context.statManager().createRateStat("i2ptunnel.client.buildRunTime", "How long it takes to run a queued socket into an i2ptunnel runner?", "I2PTunnel", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
|
||||
_log = _context.logManager().getLog(getClass());
|
||||
|
||||
synchronized (_executorLock) {
|
||||
if (_executor == null)
|
||||
_executor = new CustomThreadPoolExecutor();
|
||||
}
|
||||
|
||||
// normalize path so we can find it
|
||||
if (pkf != null) {
|
||||
File keyFile = new File(pkf);
|
||||
@@ -361,6 +336,16 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
|
||||
return socketManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Kill the shared client, so that on restart in android
|
||||
* we won't latch onto the old one
|
||||
*
|
||||
* @since 0.9.18
|
||||
*/
|
||||
protected static synchronized void killSharedClient() {
|
||||
socketManager = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* This may take a LONG time.
|
||||
*
|
||||
@@ -653,6 +638,16 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
|
||||
}
|
||||
}
|
||||
|
||||
TunnelControllerGroup tcg = TunnelControllerGroup.getInstance();
|
||||
if (tcg != null) {
|
||||
_executor = tcg.getClientExecutor();
|
||||
} else {
|
||||
// Fallback in case TCG.getInstance() is null, never instantiated
|
||||
// and we were not started by TCG.
|
||||
// Maybe a plugin loaded before TCG? Should be rare.
|
||||
// Never shut down.
|
||||
_executor = new TunnelControllerGroup.CustomThreadPoolExecutor();
|
||||
}
|
||||
while (open) {
|
||||
Socket s = ss.accept();
|
||||
manageConnection(s);
|
||||
@@ -672,30 +667,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return may be null if no class has been instantiated
|
||||
* @since 0.8.8
|
||||
*/
|
||||
static ThreadPoolExecutor getClientExecutor() {
|
||||
return _executor;
|
||||
}
|
||||
|
||||
/**
|
||||
* @since 0.8.8
|
||||
*/
|
||||
static void killClientExecutor() {
|
||||
synchronized (_executorLock) {
|
||||
if (_executor != null) {
|
||||
_executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
|
||||
_executor.shutdownNow();
|
||||
_executor = null;
|
||||
}
|
||||
// kill the shared client, so that on restart in android
|
||||
// we won't latch onto the old one
|
||||
socketManager = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Manage the connection just opened on the specified socket
|
||||
*
|
||||
@@ -721,26 +692,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Not really needed for now but in case we want to add some hooks like afterExecute().
|
||||
*/
|
||||
private static class CustomThreadPoolExecutor extends ThreadPoolExecutor {
|
||||
public CustomThreadPoolExecutor() {
|
||||
super(0, Integer.MAX_VALUE, HANDLER_KEEPALIVE_MS, TimeUnit.MILLISECONDS,
|
||||
new SynchronousQueue<Runnable>(), new CustomThreadFactory());
|
||||
}
|
||||
}
|
||||
|
||||
/** just to set the name and set Daemon */
|
||||
private static class CustomThreadFactory implements ThreadFactory {
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread rv = Executors.defaultThreadFactory().newThread(r);
|
||||
rv.setName("I2PTunnel Client Runner " + (++_executorThreadCount));
|
||||
rv.setDaemon(true);
|
||||
return rv;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Blocking runner, used during the connection establishment
|
||||
*/
|
||||
@@ -822,7 +773,10 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
|
||||
|
||||
/**
|
||||
* Manage a connection in a separate thread. This only works if
|
||||
* you do not override manageConnection()
|
||||
* you do not override manageConnection().
|
||||
*
|
||||
* This is run in a thread from an unlimited-size thread pool,
|
||||
* so it may block or run indefinitely.
|
||||
*/
|
||||
protected abstract void clientConnectionRun(Socket s);
|
||||
}
|
||||
|
@@ -292,7 +292,9 @@ public class I2PTunnelConnectClient extends I2PTunnelHTTPClientBase implements R
|
||||
response = SUCCESS_RESPONSE;
|
||||
OnTimeout onTimeout = new OnTimeout(s, s.getOutputStream(), targetRequest, usingWWWProxy, currentProxy, requestId);
|
||||
Thread t = new I2PTunnelRunner(s, i2ps, sockLock, data, response, mySockets, onTimeout);
|
||||
t.start();
|
||||
// we are called from an unlimited thread pool, so run inline
|
||||
//t.start();
|
||||
t.run();
|
||||
} catch (IOException ex) {
|
||||
_log.info(getPrefix(requestId) + "Error trying to connect", ex);
|
||||
handleClientException(ex, out, targetRequest, usingWWWProxy, currentProxy, requestId);
|
||||
|
@@ -972,7 +972,9 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
|
||||
response = null;
|
||||
}
|
||||
Thread t = new I2PTunnelOutproxyRunner(s, outSocket, sockLock, data, response, onTimeout);
|
||||
t.start();
|
||||
// we are called from an unlimited thread pool, so run inline
|
||||
//t.start();
|
||||
t.run();
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1091,6 +1093,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
|
||||
sktOpts.setPort(remotePort);
|
||||
I2PSocket i2ps = createI2PSocket(clientDest, sktOpts);
|
||||
OnTimeout onTimeout = new OnTimeout(s, s.getOutputStream(), targetRequest, usingWWWProxy, currentProxy, requestId);
|
||||
Thread t;
|
||||
if (method.toUpperCase(Locale.US).equals("CONNECT")) {
|
||||
byte[] data;
|
||||
byte[] response;
|
||||
@@ -1101,13 +1104,14 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
|
||||
data = null;
|
||||
response = SUCCESS_RESPONSE;
|
||||
}
|
||||
Thread t = new I2PTunnelRunner(s, i2ps, sockLock, data, response, mySockets, onTimeout);
|
||||
t.start();
|
||||
t = new I2PTunnelRunner(s, i2ps, sockLock, data, response, mySockets, onTimeout);
|
||||
} else {
|
||||
byte[] data = newRequest.toString().getBytes("ISO-8859-1");
|
||||
Thread t = new I2PTunnelHTTPClientRunner(s, i2ps, sockLock, data, mySockets, onTimeout);
|
||||
t.start();
|
||||
t = new I2PTunnelHTTPClientRunner(s, i2ps, sockLock, data, mySockets, onTimeout);
|
||||
}
|
||||
// we are called from an unlimited thread pool, so run inline
|
||||
//t.start();
|
||||
t.run();
|
||||
} catch(IOException ex) {
|
||||
if(_log.shouldLog(Log.INFO)) {
|
||||
_log.info(getPrefix(requestId) + "Error trying to connect", ex);
|
||||
|
@@ -86,7 +86,8 @@ public class I2PTunnelHTTPClientRunner extends I2PTunnelRunner {
|
||||
// ignore
|
||||
}
|
||||
t1.join(30*1000);
|
||||
t2.join(30*1000);
|
||||
// t2 = fromI2P now run inline
|
||||
//t2.join(30*1000);
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -302,16 +302,16 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Modified header: [" + modifiedHeader + "]");
|
||||
|
||||
Runnable t;
|
||||
if (allowGZIP && useGZIP) {
|
||||
I2PAppThread req = new I2PAppThread(
|
||||
new CompressedRequestor(s, socket, modifiedHeader, getTunnel().getContext(), _log),
|
||||
Thread.currentThread().getName()+".hc");
|
||||
req.start();
|
||||
t = new CompressedRequestor(s, socket, modifiedHeader, getTunnel().getContext(), _log);
|
||||
} else {
|
||||
Thread t = new I2PTunnelRunner(s, socket, slock, null, modifiedHeader.getBytes(),
|
||||
t = new I2PTunnelRunner(s, socket, slock, null, modifiedHeader.getBytes(),
|
||||
null, (I2PTunnelRunner.FailCallback) null);
|
||||
t.start();
|
||||
}
|
||||
// run in the unlimited client pool
|
||||
//t.start();
|
||||
_clientExecutor.execute(t);
|
||||
|
||||
long afterHandle = getTunnel().getContext().clock().now();
|
||||
long timeToHandle = afterHandle - afterAccept;
|
||||
|
@@ -136,8 +136,11 @@ public class I2PTunnelIRCClient extends I2PTunnelClientBase {
|
||||
DCCHelper dcc = _dccEnabled ? new DCC(s.getLocalAddress().getAddress()) : null;
|
||||
Thread in = new I2PAppThread(new IrcInboundFilter(s,i2ps, expectedPong, _log, dcc), "IRC Client " + _clientId + " in", true);
|
||||
in.start();
|
||||
Thread out = new I2PAppThread(new IrcOutboundFilter(s,i2ps, expectedPong, _log, dcc), "IRC Client " + _clientId + " out", true);
|
||||
out.start();
|
||||
//Thread out = new I2PAppThread(new IrcOutboundFilter(s,i2ps, expectedPong, _log, dcc), "IRC Client " + _clientId + " out", true);
|
||||
Runnable out = new IrcOutboundFilter(s,i2ps, expectedPong, _log, dcc);
|
||||
// we are called from an unlimited thread pool, so run inline
|
||||
//out.start();
|
||||
out.run();
|
||||
} catch (Exception ex) {
|
||||
// generally NoRouteToHostException
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
|
@@ -140,7 +140,9 @@ public class I2PTunnelIRCServer extends I2PTunnelServer implements Runnable {
|
||||
Socket s = getSocket(socket.getPeerDestination().calculateHash(), socket.getLocalPort());
|
||||
Thread t = new I2PTunnelRunner(s, socket, slock, null, modifiedRegistration.getBytes(),
|
||||
null, (I2PTunnelRunner.FailCallback) null);
|
||||
t.start();
|
||||
// run in the unlimited client pool
|
||||
//t.start();
|
||||
_clientExecutor.execute(t);
|
||||
} catch (SocketException ex) {
|
||||
try {
|
||||
// Send a response so the user doesn't just see a disconnect
|
||||
|
@@ -62,8 +62,6 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
private long totalSent;
|
||||
private long totalReceived;
|
||||
|
||||
private static final AtomicLong __forwarderId = new AtomicLong();
|
||||
|
||||
/**
|
||||
* For use in new constructor
|
||||
* @since 0.9.14
|
||||
@@ -268,9 +266,10 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
in = new BufferedInputStream(in, 2*NETWORK_BUFFER_SIZE);
|
||||
StreamForwarder toI2P = new StreamForwarder(in, i2pout, true);
|
||||
StreamForwarder fromI2P = new StreamForwarder(i2pin, out, false);
|
||||
// TODO can we run one of these inline and save a thread?
|
||||
toI2P.start();
|
||||
fromI2P.start();
|
||||
// We are already a thread, so run the second one inline
|
||||
//fromI2P.start();
|
||||
fromI2P.run();
|
||||
synchronized (finishLock) {
|
||||
while (!finished) {
|
||||
finishLock.wait();
|
||||
@@ -384,7 +383,8 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
// ignore
|
||||
}
|
||||
t1.join(30*1000);
|
||||
t2.join(30*1000);
|
||||
// t2 = fromI2P now run inline
|
||||
//t2.join(30*1000);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -426,7 +426,7 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
_toI2P = toI2P;
|
||||
direction = (toI2P ? "toI2P" : "fromI2P");
|
||||
_cache = ByteCache.getInstance(32, NETWORK_BUFFER_SIZE);
|
||||
setName("StreamForwarder " + _runnerId + '.' + __forwarderId.incrementAndGet());
|
||||
setName("StreamForwarder " + _runnerId + '.' + direction);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@@ -80,6 +80,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
|
||||
protected I2PTunnelTask task;
|
||||
protected boolean bidir;
|
||||
private ThreadPoolExecutor _executor;
|
||||
protected volatile ThreadPoolExecutor _clientExecutor;
|
||||
private final Map<Integer, InetSocketAddress> _socketMap = new ConcurrentHashMap<Integer, InetSocketAddress>(4);
|
||||
|
||||
/** unused? port should always be specified */
|
||||
@@ -470,6 +471,16 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
|
||||
if (_usePool) {
|
||||
_executor = new CustomThreadPoolExecutor(getHandlerCount(), "ServerHandler pool " + remoteHost + ':' + remotePort);
|
||||
}
|
||||
TunnelControllerGroup tcg = TunnelControllerGroup.getInstance();
|
||||
if (tcg != null) {
|
||||
_clientExecutor = tcg.getClientExecutor();
|
||||
} else {
|
||||
// Fallback in case TCG.getInstance() is null, never instantiated
|
||||
// and we were not started by TCG.
|
||||
// Maybe a plugin loaded before TCG? Should be rare.
|
||||
// Never shut down.
|
||||
_clientExecutor = new TunnelControllerGroup.CustomThreadPoolExecutor();
|
||||
}
|
||||
while (open) {
|
||||
try {
|
||||
I2PServerSocket ci2pss = i2pss;
|
||||
@@ -563,6 +574,17 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This is run in a thread from a limited-size thread pool via Handler.run(),
|
||||
* except for a standard server (this class, no extension, as determined in getUsePool()),
|
||||
* it is run directly in the acceptor thread (see run()).
|
||||
*
|
||||
* In either case, this method and any overrides must spawn a thread and return quickly.
|
||||
* If blocking while reading the headers (as in HTTP and IRC), the thread pool
|
||||
* may be exhausted.
|
||||
*
|
||||
* See PROP_USE_POOL, DEFAULT_USE_POOL, PROP_HANDLER_COUNT, DEFAULT_HANDLER_COUNT
|
||||
*/
|
||||
protected void blockingHandle(I2PSocket socket) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Incoming connection to '" + toString() + "' port " + socket.getLocalPort() +
|
||||
@@ -577,7 +599,9 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
|
||||
afterSocket = getTunnel().getContext().clock().now();
|
||||
Thread t = new I2PTunnelRunner(s, socket, slock, null, null,
|
||||
null, (I2PTunnelRunner.FailCallback) null);
|
||||
t.start();
|
||||
// run in the unlimited client pool
|
||||
//t.start();
|
||||
_clientExecutor.execute(t);
|
||||
|
||||
long afterHandle = getTunnel().getContext().clock().now();
|
||||
long timeToHandle = afterHandle - afterAccept;
|
||||
|
@@ -425,7 +425,7 @@ public class TunnelController implements Logging {
|
||||
// We use _sessions AND the tunnel sessions as
|
||||
// _sessions will be null for delay-open tunnels - see acquire().
|
||||
// We want the current sessions.
|
||||
Set<I2PSession> sessions = new HashSet(_tunnel.getSessions());
|
||||
Set<I2PSession> sessions = new HashSet<I2PSession>(_tunnel.getSessions());
|
||||
if (_sessions != null)
|
||||
sessions.addAll(_sessions);
|
||||
return sessions;
|
||||
|
@@ -9,6 +9,13 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import net.i2p.I2PAppContext;
|
||||
import net.i2p.app.*;
|
||||
@@ -48,6 +55,21 @@ public class TunnelControllerGroup implements ClientApp {
|
||||
*/
|
||||
private final Map<I2PSession, Set<TunnelController>> _sessions;
|
||||
|
||||
/**
|
||||
* We keep a pool of socket handlers for all clients,
|
||||
* as there is no need for isolation on the client side.
|
||||
* Extending classes may use it for other purposes.
|
||||
*
|
||||
* May also be used by servers, carefully,
|
||||
* as there is no limit on threads.
|
||||
*/
|
||||
private ThreadPoolExecutor _executor;
|
||||
private static final AtomicLong _executorThreadCount = new AtomicLong();
|
||||
private final Object _executorLock = new Object();
|
||||
/** how long to wait before dropping an idle thread */
|
||||
private static final long HANDLER_KEEPALIVE_MS = 2*60*1000;
|
||||
|
||||
|
||||
/**
|
||||
* In I2PAppContext will instantiate if necessary and always return non-null.
|
||||
* As of 0.9.4, when in RouterContext, will return null (except in Android)
|
||||
@@ -206,8 +228,7 @@ public class TunnelControllerGroup implements ClientApp {
|
||||
if (_instance == this)
|
||||
_instance = null;
|
||||
}
|
||||
/// fixme static
|
||||
I2PTunnelClientBase.killClientExecutor();
|
||||
killClientExecutor();
|
||||
changeState(STOPPED);
|
||||
}
|
||||
|
||||
@@ -500,4 +521,59 @@ public class TunnelControllerGroup implements ClientApp {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return non-null
|
||||
* @since 0.8.8 Moved from I2PTunnelClientBase in 0.9.18
|
||||
*/
|
||||
ThreadPoolExecutor getClientExecutor() {
|
||||
synchronized (_executorLock) {
|
||||
if (_executor == null)
|
||||
_executor = new CustomThreadPoolExecutor();
|
||||
}
|
||||
return _executor;
|
||||
}
|
||||
|
||||
/**
|
||||
* @since 0.8.8 Moved from I2PTunnelClientBase in 0.9.18
|
||||
*/
|
||||
private void killClientExecutor() {
|
||||
synchronized (_executorLock) {
|
||||
if (_executor != null) {
|
||||
_executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
|
||||
_executor.shutdownNow();
|
||||
_executor = null;
|
||||
}
|
||||
}
|
||||
// kill the shared client, so that on restart in android
|
||||
// we won't latch onto the old one
|
||||
I2PTunnelClientBase.killSharedClient();
|
||||
}
|
||||
|
||||
/**
|
||||
* Not really needed for now but in case we want to add some hooks like afterExecute().
|
||||
* Package private for fallback in case TCG.getInstance() is null, never instantiated
|
||||
* but a plugin still needs it... should be rare.
|
||||
*
|
||||
* @since 0.9.18 Moved from I2PTunnelClientBase
|
||||
*/
|
||||
static class CustomThreadPoolExecutor extends ThreadPoolExecutor {
|
||||
public CustomThreadPoolExecutor() {
|
||||
super(0, Integer.MAX_VALUE, HANDLER_KEEPALIVE_MS, TimeUnit.MILLISECONDS,
|
||||
new SynchronousQueue<Runnable>(), new CustomThreadFactory());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Just to set the name and set Daemon
|
||||
* @since 0.9.18 Moved from I2PTunnelClientBase
|
||||
*/
|
||||
private static class CustomThreadFactory implements ThreadFactory {
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread rv = Executors.defaultThreadFactory().newThread(r);
|
||||
rv.setName("I2PTunnel Client Runner " + _executorThreadCount.incrementAndGet());
|
||||
rv.setDaemon(true);
|
||||
return rv;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -76,7 +76,9 @@ public class I2PTunnelDCCClient extends I2PTunnelClientBase {
|
||||
try {
|
||||
i2ps = createI2PSocket(dest, opts);
|
||||
Thread t = new Runner(s, i2ps);
|
||||
t.start();
|
||||
// we are called from an unlimited thread pool, so run inline
|
||||
//t.start();
|
||||
t.run();
|
||||
} catch (Exception ex) {
|
||||
_log.error("Could not make DCC connection to " + _dest + ':' + _remotePort, ex);
|
||||
closeSocket(s);
|
||||
|
@@ -111,7 +111,9 @@ public class I2PTunnelDCCServer extends I2PTunnelServer {
|
||||
_sockList.add(socket);
|
||||
Thread t = new I2PTunnelRunner(s, socket, slock, null, null, _sockList,
|
||||
(I2PTunnelRunner.FailCallback) null);
|
||||
t.start();
|
||||
// run in the unlimited client pool
|
||||
//t.start();
|
||||
_clientExecutor.execute(t);
|
||||
local.socket = socket;
|
||||
local.expire = getTunnel().getContext().clock().now() + OUTBOUND_EXPIRE;
|
||||
_active.put(Integer.valueOf(myPort), local);
|
||||
|
@@ -55,9 +55,12 @@ public class I2PSOCKSIRCTunnel extends I2PSOCKSTunnel {
|
||||
Thread in = new I2PAppThread(new IrcInboundFilter(clientSock, destSock, expectedPong, _log),
|
||||
"SOCKS IRC Client " + id + " in", true);
|
||||
in.start();
|
||||
Thread out = new I2PAppThread(new IrcOutboundFilter(clientSock, destSock, expectedPong, _log),
|
||||
"SOCKS IRC Client " + id + " out", true);
|
||||
out.start();
|
||||
//Thread out = new I2PAppThread(new IrcOutboundFilter(clientSock, destSock, expectedPong, _log),
|
||||
// "SOCKS IRC Client " + id + " out", true);
|
||||
Runnable out = new IrcOutboundFilter(clientSock, destSock, expectedPong, _log);
|
||||
// we are called from an unlimited thread pool, so run inline
|
||||
//out.start();
|
||||
out.run();
|
||||
} catch (SOCKSException e) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Error from SOCKS connection", e);
|
||||
|
@@ -56,7 +56,9 @@ public class I2PSOCKSTunnel extends I2PTunnelClientBase {
|
||||
I2PSocket destSock = serv.getDestinationI2PSocket(this);
|
||||
Thread t = new I2PTunnelRunner(clientSock, destSock, sockLock, null, null, mySockets,
|
||||
(I2PTunnelRunner.FailCallback) null);
|
||||
t.start();
|
||||
// we are called from an unlimited thread pool, so run inline
|
||||
//t.start();
|
||||
t.run();
|
||||
} catch (SOCKSException e) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Error from SOCKS connection", e);
|
||||
|
@@ -148,6 +148,13 @@ public class NetDbRenderer {
|
||||
else
|
||||
buf.append(dest.toBase64().substring(0, 6));
|
||||
buf.append(")<br>\n");
|
||||
String b32 = dest.toBase32();
|
||||
buf.append("<a href=\"http://").append(b32).append("\">").append(b32).append("</a><br>\n");
|
||||
String host = _context.namingService().reverseLookup(dest);
|
||||
if (host == null) {
|
||||
buf.append("<a href=\"/susidns/addressbook.jsp?book=private&destination=")
|
||||
.append(dest.toBase64()).append("#add\">").append(_("Add to local addressbook")).append("</a><br>\n");
|
||||
}
|
||||
} else {
|
||||
buf.append(" (").append(_("Destination")).append(' ');
|
||||
String host = _context.namingService().reverseLookup(dest);
|
||||
|
BIN
apps/susimail/src/icons/drive_edit.png
Normal file
BIN
apps/susimail/src/icons/drive_edit.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 714 B |
@@ -88,6 +88,10 @@ class Mail {
|
||||
error = "";
|
||||
}
|
||||
|
||||
/**
|
||||
* This may or may not contain the body also.
|
||||
* @return may be null
|
||||
*/
|
||||
public synchronized ReadBuffer getHeader() {
|
||||
return header;
|
||||
}
|
||||
@@ -103,6 +107,10 @@ class Mail {
|
||||
return header != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* This contains the header also.
|
||||
* @return may be null
|
||||
*/
|
||||
public synchronized ReadBuffer getBody() {
|
||||
return body;
|
||||
}
|
||||
|
@@ -116,6 +116,7 @@ public class WebMail extends HttpServlet
|
||||
private static final String LOGOUT = "logout";
|
||||
private static final String RELOAD = "reload";
|
||||
private static final String SAVE = "save";
|
||||
private static final String SAVE_AS = "saveas";
|
||||
private static final String REFRESH = "refresh";
|
||||
private static final String CONFIGURE = "configure";
|
||||
private static final String NEW = "new";
|
||||
@@ -1298,6 +1299,33 @@ public class WebMail extends HttpServlet
|
||||
return isRaw;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Process save-as link in message view
|
||||
*
|
||||
* @param sessionObject
|
||||
* @param request
|
||||
* @return If true, we sent the file or 404, do not send any other response
|
||||
* @since 0.9.18
|
||||
*/
|
||||
private static boolean processSaveAsLink(SessionObject sessionObject, RequestWrapper request, HttpServletResponse response)
|
||||
{
|
||||
String str = request.getParameter(SAVE_AS);
|
||||
if( str == null )
|
||||
return false;
|
||||
Mail mail = sessionObject.mailCache.getMail( sessionObject.showUIDL, MailCache.FetchMode.ALL );
|
||||
if( mail != null ) {
|
||||
if (sendMailSaveAs(sessionObject, mail, response))
|
||||
return true;
|
||||
}
|
||||
// error if we get here
|
||||
sessionObject.error += _("Message not found.");
|
||||
try {
|
||||
response.sendError(404, _("Message not found."));
|
||||
} catch (IOException ioe) {}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param hashCode
|
||||
* @return the part or null
|
||||
@@ -1631,6 +1659,10 @@ public class WebMail extends HttpServlet
|
||||
// download or raw view sent, or 404
|
||||
return;
|
||||
}
|
||||
if (processSaveAsLink(sessionObject, request, response)) {
|
||||
// download or sent, or 404
|
||||
return;
|
||||
}
|
||||
// If the last message has just been deleted then
|
||||
// sessionObject.state = STATE_LIST and
|
||||
// sessionObject.showUIDL = null
|
||||
@@ -1790,7 +1822,7 @@ public class WebMail extends HttpServlet
|
||||
name = part.name;
|
||||
else
|
||||
name = "part" + part.hashCode();
|
||||
String name2 = name.replace( "\\.", "_" );
|
||||
String name2 = sanitizeFilename(name);
|
||||
response.setContentType( "application/zip; name=\"" + name2 + ".zip\"" );
|
||||
response.addHeader( "Content-Disposition:", "attachment; filename=\"" + name2 + ".zip\"" );
|
||||
ZipEntry entry = new ZipEntry( name );
|
||||
@@ -1809,6 +1841,54 @@ public class WebMail extends HttpServlet
|
||||
}
|
||||
return shown;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send the mail to be saved by the browser
|
||||
*
|
||||
* @param sessionObject
|
||||
* @param response
|
||||
* @return success
|
||||
* @since 0.9.18
|
||||
*/
|
||||
private static boolean sendMailSaveAs(SessionObject sessionObject, Mail mail,
|
||||
HttpServletResponse response)
|
||||
{
|
||||
ReadBuffer content = mail.getBody();
|
||||
|
||||
if(content == null)
|
||||
return false;
|
||||
String name = mail.subject != null ? sanitizeFilename(mail.subject) : "message";
|
||||
try {
|
||||
response.setContentType("message/rfc822");
|
||||
response.setContentLength(content.length);
|
||||
// cache-control?
|
||||
response.addHeader( "Content-Disposition:", "attachment; filename=\"" + name + ".eml\"" );
|
||||
response.getOutputStream().write(content.content, content.offset, content.length);
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert the UTF-8 to ISO-8859-1 suitable for inclusion in a header.
|
||||
* This will result in a bunch of ??? for non-Western languages.
|
||||
*
|
||||
* @param sessionObject
|
||||
* @param response
|
||||
* @return success
|
||||
* @since 0.9.18
|
||||
*/
|
||||
private static String sanitizeFilename(String name) {
|
||||
try {
|
||||
name = new String(name.getBytes("ISO-8859-1"), "ISO-8859-1");
|
||||
} catch( UnsupportedEncodingException uee ) {}
|
||||
// strip control chars?
|
||||
name = name.replace('"', '_');
|
||||
return name;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param sessionObject
|
||||
* @param request
|
||||
@@ -2255,7 +2335,8 @@ public class WebMail extends HttpServlet
|
||||
out.println( button( NEW, _("New") ) + spacer +
|
||||
button( REPLY, _("Reply") ) +
|
||||
button( REPLYALL, _("Reply All") ) +
|
||||
button( FORWARD, _("Forward") ) + spacer);
|
||||
button( FORWARD, _("Forward") ) + spacer +
|
||||
button( SAVE_AS, _("Save As") ) + spacer);
|
||||
if (sessionObject.reallyDelete)
|
||||
out.println(button2(DELETE, _("Delete")));
|
||||
else
|
||||
|
Reference in New Issue
Block a user