propagate from branch 'i2p.i2p.zzz.android' (head cbf2d39e1944b9d601558761d0eedcdebfd2f589)

to branch 'i2p.i2p' (head c2393e50afccfd5682a9086f0eec2a0700cda2c9)
This commit is contained in:
zzz
2011-06-30 12:27:00 +00:00
53 changed files with 850 additions and 215 deletions

View File

@ -32,7 +32,7 @@ import net.i2p.client.naming.NamingServiceUpdater;
* @author Ragnarok
*
*/
class DaemonThread extends Thread implements NamingServiceUpdater {
public class DaemonThread extends Thread implements NamingServiceUpdater {
private String[] args;

View File

@ -24,7 +24,6 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.TimerTask;
import net.i2p.I2PAppContext;
@ -32,7 +31,7 @@ import net.i2p.I2PAppContext;
* TimerTask that checks for good/bad up/downloader. Works together
* with the PeerCoordinator to select which Peers get (un)choked.
*/
class PeerCheckerTask extends TimerTask
class PeerCheckerTask implements Runnable
{
private static final long KILOPERSECOND = 1024*(PeerCoordinator.CHECK_PERIOD/1000);
@ -54,8 +53,6 @@ class PeerCheckerTask extends TimerTask
List<Peer> peerList = coordinator.peerList();
if (peerList.isEmpty() || coordinator.halted()) {
coordinator.setRateHistory(0, 0);
if (coordinator.halted())
cancel();
return;
}

View File

@ -21,7 +21,6 @@
package org.klomp.snark;
import java.util.Iterator;
import java.util.TimerTask;
import net.i2p.data.DataHelper;
@ -29,7 +28,7 @@ import net.i2p.data.DataHelper;
* TimerTask that monitors the peers and total up/download speeds.
* Works together with the main Snark class to report periodical statistics.
*/
class PeerMonitorTask extends TimerTask
class PeerMonitorTask implements Runnable
{
final static long MONITOR_PERIOD = 10 * 1000; // Ten seconds.
private static final long KILOPERSECOND = 1024 * (MONITOR_PERIOD / 1000);

View File

@ -32,8 +32,6 @@ import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.StringTokenizer;
import java.util.Timer;
import java.util.TimerTask;
import net.i2p.I2PAppContext;
import net.i2p.client.streaming.I2PServerSocket;

View File

@ -48,14 +48,14 @@ public class SnarkManager implements Snark.CompleteListener {
private final Object _addSnarkLock;
private /* FIXME final FIXME */ File _configFile;
private Properties _config;
private I2PAppContext _context;
private Log _log;
private final I2PAppContext _context;
private final Log _log;
private final List _messages;
private I2PSnarkUtil _util;
private final I2PSnarkUtil _util;
private PeerCoordinatorSet _peerCoordinatorSet;
private ConnectionAcceptor _connectionAcceptor;
private Thread _monitor;
private boolean _running;
private volatile boolean _running;
public static final String PROP_I2CP_HOST = "i2psnark.i2cpHost";
public static final String PROP_I2CP_PORT = "i2psnark.i2cpPort";
@ -1089,7 +1089,7 @@ public class SnarkManager implements Snark.CompleteListener {
// although the user will see the default until then
getBWLimit();
boolean doMagnets = true;
while (true) {
while (_running) {
File dir = getDataDir();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Directory Monitor loop over " + dir.getAbsolutePath());

View File

@ -233,7 +233,7 @@ class HTTPResponseOutputStream extends FilterOutputStream {
// there after the accept().
// Overridden in I2PTunnelHTTPServer, where it does not use the client pool.
try {
I2PTunnelClientBase._executor.execute(new Pusher(pi, out));
I2PTunnelClientBase.getClientExecutor().execute(new Pusher(pi, out));
} catch (RejectedExecutionException ree) {
// shouldn't happen
throw ree;

View File

@ -73,12 +73,12 @@ import net.i2p.util.Log;
* Todo: Most events are not listened to elsewhere, so error propagation is poor
*/
public class I2PTunnel implements Logging, EventDispatcher {
private Log _log;
private EventDispatcherImpl _event;
private I2PAppContext _context;
private final Log _log;
private final EventDispatcherImpl _event;
private final I2PAppContext _context;
private static long __tunnelId = 0;
private long _tunnelId;
private Properties _clientOptions;
private final long _tunnelId;
private final Properties _clientOptions;
private final List<I2PSession> _sessions;
public static final int PACKET_DELAY = 100;

View File

@ -83,11 +83,8 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
* Extending classes may use it for other purposes.
* Not for use by servers, as there is no limit on threads.
*/
static final Executor _executor;
private static volatile ThreadPoolExecutor _executor;
private static int _executorThreadCount;
static {
_executor = new CustomThreadPoolExecutor();
}
public I2PTunnelClientBase(int localPort, Logging l, I2PSocketManager sktMgr,
I2PTunnel tunnel, EventDispatcher notifyThis, long clientId )
@ -107,6 +104,11 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
_context.statManager().createRateStat("i2ptunnel.client.buildRunTime", "How long it takes to run a queued socket into an i2ptunnel runner?", "I2PTunnel", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_log = _context.logManager().getLog(getClass());
synchronized (I2PTunnelClientBase.class) {
if (_executor == null)
_executor = new CustomThreadPoolExecutor();
}
Thread t = new I2PAppThread(this, "Client " + tunnel.listenHost + ':' + localPort);
listenerReady = false;
t.start();
@ -160,6 +162,11 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
_context.statManager().createRateStat("i2ptunnel.client.buildRunTime", "How long it takes to run a queued socket into an i2ptunnel runner?", "I2PTunnel", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_log = _context.logManager().getLog(getClass());
synchronized (I2PTunnelClientBase.class) {
if (_executor == null)
_executor = new CustomThreadPoolExecutor();
}
// normalize path so we can find it
if (pkf != null) {
File keyFile = new File(pkf);
@ -551,6 +558,30 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
}
}
/**
* @return may be null if no class has been instantiated
* @since 0.8.8
*/
static ThreadPoolExecutor getClientExecutor() {
return _executor;
}
/**
* @since 0.8.8
*/
static void killClientExecutor() {
synchronized (I2PTunnelClientBase.class) {
if (_executor != null) {
_executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
_executor.shutdownNow();
_executor = null;
}
// kill the shared client, so that on restart in android
// we won't latch onto the old one
socketManager = null;
}
}
/**
* Manage the connection just opened on the specified socket
*
@ -558,8 +589,16 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
*/
protected void manageConnection(Socket s) {
if (s == null) return;
ThreadPoolExecutor tpe = _executor;
if (tpe == null) {
_log.error("No executor for socket!");
try {
s.close();
} catch (IOException ioe) {}
return;
}
try {
_executor.execute(new BlockingRunner(s));
tpe.execute(new BlockingRunner(s));
} catch (RejectedExecutionException ree) {
// should never happen, we have an unbounded pool and never stop the executor
try {
@ -635,7 +674,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
}
//l.log("Client closed.");
}
return true;
}

View File

@ -67,6 +67,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
protected I2PTunnelTask task = null;
protected boolean bidir = false;
private ThreadPoolExecutor _executor;
private int DEFAULT_LOCALPORT = 4488;
protected int localPort = DEFAULT_LOCALPORT;
@ -259,6 +260,10 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
}
//l.log("Server shut down.");
open = false;
if (_usePool && _executor != null) {
_executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
_executor.shutdownNow();
}
return true;
}
}
@ -283,7 +288,6 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
*/
public void run() {
I2PServerSocket i2pS_S = sockMgr.getServerSocket();
ThreadPoolExecutor executor = null;
if (_log.shouldLog(Log.WARN)) {
if (_usePool)
_log.warn("Starting executor with " + getHandlerCount() + " threads max");
@ -291,7 +295,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
_log.warn("Threads disabled, running blockingHandles inline");
}
if (_usePool) {
executor = new CustomThreadPoolExecutor(getHandlerCount(), "ServerHandler pool " + remoteHost + ':' + remotePort);
_executor = new CustomThreadPoolExecutor(getHandlerCount(), "ServerHandler pool " + remoteHost + ':' + remotePort);
}
while (open) {
try {
@ -299,7 +303,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
if (i2ps == null) throw new I2PException("I2PServerSocket closed");
if (_usePool) {
try {
executor.execute(new Handler(i2ps));
_executor.execute(new Handler(i2ps));
} catch (RejectedExecutionException ree) {
try {
i2ps.close();
@ -328,8 +332,8 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
// ignored, we never set the timeout
}
}
if (executor != null)
executor.shutdownNow();
if (_executor != null)
_executor.shutdownNow();
}
/**

View File

@ -26,7 +26,7 @@ import net.i2p.util.OrderedProperties;
* Warning - this is a singleton. Todo: fix
*/
public class TunnelControllerGroup {
private final Log _log;
private Log _log;
private static TunnelControllerGroup _instance;
static final String DEFAULT_CONFIG_FILE = "i2ptunnel.config";
@ -55,6 +55,7 @@ public class TunnelControllerGroup {
_configFile = configFile;
_sessions = new HashMap(4);
loadControllers(_configFile);
I2PAppContext.getGlobalContext().addShutdownTask(new Shutdown());
}
public static void main(String args[]) {
@ -71,6 +72,34 @@ public class TunnelControllerGroup {
}
}
}
/**
* Warning - destroys the singleton!
* @since 0.8.8
*/
private static class Shutdown implements Runnable {
public void run() {
shutdown();
}
}
/**
* Warning - destroys the singleton!
* Caller must root a new context before calling instance() or main() again.
* Agressively kill and null everything to reduce memory usage in the JVM
* after stopping, and to recognize what must be reinitialized on restart (Android)
*
* @since 0.8.8
*/
public static void shutdown() {
synchronized (TunnelControllerGroup.class) {
if (_instance == null) return;
_instance.unloadControllers();
_instance._log = null;
_instance = null;
}
I2PTunnelClientBase.killClientExecutor();
}
/**
* Load up all of the tunnels configured in the given file (but do not start

View File

@ -37,6 +37,8 @@ public class NewsFetcher implements Runnable, EepGet.StatusListener {
private File _newsFile;
private File _tempFile;
private static NewsFetcher _instance;
private volatile boolean _isRunning;
//public static final synchronized NewsFetcher getInstance() { return _instance; }
public static final synchronized NewsFetcher getInstance(I2PAppContext ctx) {
if (_instance != null)
@ -64,8 +66,14 @@ public class NewsFetcher implements Runnable, EepGet.StatusListener {
_tempFile = new File(_context.getTempDir(), TEMP_NEWS_FILE);
updateLastFetched();
_updateVersion = "";
_isRunning = true;
}
/** @since 0.8.8 */
void shutdown() {
_isRunning = false;
}
private void updateLastFetched() {
if (_newsFile.exists()) {
if (_lastUpdated == 0)
@ -108,7 +116,7 @@ public class NewsFetcher implements Runnable, EepGet.StatusListener {
public void run() {
try { Thread.sleep(INITIAL_DELAY + _context.random().nextLong(INITIAL_DELAY)); } catch (InterruptedException ie) {}
while (true) {
while (_isRunning) {
if (!_updateAvailable) checkForUpdates();
if (shouldFetchNews()) {
fetchNews();

View File

@ -342,10 +342,10 @@ public class RouterConsoleRunner {
}
NewsFetcher fetcher = NewsFetcher.getInstance(I2PAppContext.getGlobalContext());
Thread t = new I2PAppThread(fetcher, "NewsFetcher", true);
t.start();
Thread newsThread = new I2PAppThread(fetcher, "NewsFetcher", true);
newsThread.start();
t = new I2PAppThread(new StatSummarizer(), "StatSummarizer", true);
Thread t = new I2PAppThread(new StatSummarizer(), "StatSummarizer", true);
t.start();
List<RouterContext> contexts = RouterContext.listContexts();
@ -356,6 +356,9 @@ public class RouterConsoleRunner {
t.start();
ctx.addShutdownTask(new PluginStopper(ctx));
}
ctx.addShutdownTask(new NewsShutdown(fetcher, newsThread));
// stat summarizer registers its own hook
ctx.addShutdownTask(new ServerShutdown());
}
}
@ -495,16 +498,31 @@ public class RouterConsoleRunner {
}
}
/*******
public void stopConsole() {
try {
_server.stop();
} catch (InterruptedException ie) {
ie.printStackTrace();
/** @since 0.8.8 */
private class ServerShutdown implements Runnable {
public void run() {
try {
_server.stop();
} catch (InterruptedException ie) {}
}
}
********/
/** @since 0.8.8 */
private static class NewsShutdown implements Runnable {
private final NewsFetcher _fetcher;
private final Thread _newsThread;
public NewsShutdown(NewsFetcher fetcher, Thread t) {
_fetcher = fetcher;
_newsThread = t;
}
public void run() {
_fetcher.shutdown();
_newsThread.interrupt();
}
}
public static Properties webAppProperties() {
return webAppProperties(I2PAppContext.getGlobalContext().getConfigDir().getAbsolutePath());
}

View File

@ -2,6 +2,7 @@ package gnu.crypto.prng;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.I2PAppContext;
import net.i2p.util.Log;
@ -10,6 +11,12 @@ import net.i2p.util.Log;
* fortuna instance that tries to avoid blocking if at all possible by using separate
* filled buffer segments rather than one buffer (and blocking when that buffer's data
* has been eaten)
*
* Note that this class is not fully Thread safe!
* The following methods must be synchronized externally, they are not
* sycned here or in super():
* addRandomByte(), addRandomBytes(), nextByte(), nextBytes(), seed()
*
*/
public class AsyncFortunaStandalone extends FortunaStandalone implements Runnable {
/**
@ -19,25 +26,23 @@ public class AsyncFortunaStandalone extends FortunaStandalone implements Runnabl
private static final int DEFAULT_BUFFERS = 2;
private static final int DEFAULT_BUFSIZE = 256*1024;
private final int _bufferCount;
private final byte asyncBuffers[][];
private final int status[];
private int nextBuf = 0;
private final int _bufferSize;
/** the lock */
private final Object asyncBuffers = new Object();
private final I2PAppContext _context;
private final Log _log;
private volatile boolean _isRunning;
private Thread _refillThread;
private final LinkedBlockingQueue<AsyncBuffer> _fullBuffers;
private final LinkedBlockingQueue<AsyncBuffer> _emptyBuffers;
private AsyncBuffer _currentBuffer;
private static final int STATUS_NEED_FILL = 0;
private static final int STATUS_FILLING = 1;
private static final int STATUS_FILLED = 2;
private static final int STATUS_LIVE = 3;
public AsyncFortunaStandalone(I2PAppContext context) {
super();
_bufferCount = Math.max(context.getProperty("prng.buffers", DEFAULT_BUFFERS), 2);
int bufferSize = Math.max(context.getProperty("prng.bufferSize", DEFAULT_BUFSIZE), 16*1024);
asyncBuffers = new byte[_bufferCount][bufferSize];
status = new int[_bufferCount];
for (int i = 0; i < _bufferCount; i++)
status[i] = STATUS_NEED_FILL;
_bufferSize = Math.max(context.getProperty("prng.bufferSize", DEFAULT_BUFSIZE), 16*1024);
_emptyBuffers = new LinkedBlockingQueue(_bufferCount);
_fullBuffers = new LinkedBlockingQueue(_bufferCount);
_context = context;
context.statManager().createRequiredRateStat("prng.bufferWaitTime", "Delay for random number buffer (ms)", "Encryption", new long[] { 60*1000, 10*60*1000, 60*60*1000 } );
context.statManager().createRequiredRateStat("prng.bufferFillTime", "Time to fill random number buffer (ms)", "Encryption", new long[] { 60*1000, 10*60*1000, 60*60*1000 } );
@ -45,10 +50,27 @@ public class AsyncFortunaStandalone extends FortunaStandalone implements Runnabl
}
public void startup() {
Thread refillThread = new Thread(this, "PRNG");
refillThread.setDaemon(true);
refillThread.setPriority(Thread.MIN_PRIORITY+1);
refillThread.start();
for (int i = 0; i < _bufferCount; i++)
_emptyBuffers.offer(new AsyncBuffer(_bufferSize));
_isRunning = true;
_refillThread = new Thread(this, "PRNG");
_refillThread.setDaemon(true);
_refillThread.setPriority(Thread.MIN_PRIORITY+1);
_refillThread.start();
}
/**
* Note - methods may hang or NPE or throw IllegalStateExceptions after this
* @since 0.8.8
*/
public void shutdown() {
_isRunning = false;
_emptyBuffers.clear();
_fullBuffers.clear();
_refillThread.interrupt();
// unsynchronized to avoid hanging, may NPE elsewhere
_currentBuffer = null;
buffer = null;
}
/** the seed is only propogated once the prng is started with startup() */
@ -63,80 +85,67 @@ public class AsyncFortunaStandalone extends FortunaStandalone implements Runnabl
@Override
protected void allocBuffer() {}
private static class AsyncBuffer {
public final byte[] buffer;
public AsyncBuffer(int size) {
buffer = new byte[size];
}
}
/**
* make the next available filled buffer current, scheduling any unfilled
* buffers for refill, and blocking until at least one buffer is ready
*/
protected void rotateBuffer() {
synchronized (asyncBuffers) {
// wait until we get some filled
AsyncBuffer old = _currentBuffer;
if (old != null)
_emptyBuffers.offer(old);
long before = System.currentTimeMillis();
long waited = 0;
while (status[nextBuf] != STATUS_FILLED) {
//System.out.println(Thread.currentThread().getName() + ": Next PRNG buffer "
// + nextBuf + " isn't ready (" + status[nextBuf] + ")");
//new Exception("source").printStackTrace();
asyncBuffers.notifyAll();
AsyncBuffer nextBuffer = null;
while (nextBuffer == null) {
if (!_isRunning)
throw new IllegalStateException("shutdown");
try {
asyncBuffers.wait();
} catch (InterruptedException ie) {}
waited = System.currentTimeMillis()-before;
nextBuffer = _fullBuffers.take();
} catch (InterruptedException ie) {
continue;
}
}
long waited = System.currentTimeMillis()-before;
_context.statManager().addRateData("prng.bufferWaitTime", waited, 0);
if (waited > 10*1000 && _log.shouldLog(Log.WARN))
_log.warn(Thread.currentThread().getName() + ": Took " + waited
+ "ms for a full PRNG buffer to be found");
//System.out.println(Thread.currentThread().getName() + ": Switching to prng buffer " + nextBuf);
buffer = asyncBuffers[nextBuf];
status[nextBuf] = STATUS_LIVE;
int prev=nextBuf-1;
if (prev<0)
prev = _bufferCount-1;
if (status[prev] == STATUS_LIVE)
status[prev] = STATUS_NEED_FILL;
nextBuf++;
if (nextBuf >= _bufferCount)
nextBuf = 0;
asyncBuffers.notify();
_currentBuffer = nextBuffer;
buffer = nextBuffer.buffer;
}
}
/**
* The refiller thread
*/
public void run() {
while (true) {
int toFill = -1;
while (_isRunning) {
AsyncBuffer aBuff = null;
try {
synchronized (asyncBuffers) {
for (int i = 0; i < _bufferCount; i++) {
if (status[i] == STATUS_NEED_FILL) {
status[i] = STATUS_FILLING;
toFill = i;
break;
}
}
if (toFill == -1) {
//System.out.println(Thread.currentThread().getName() + ": All pending buffers full");
asyncBuffers.wait();
}
}
} catch (InterruptedException ie) {}
aBuff = _emptyBuffers.take();
} catch (InterruptedException ie) {
continue;
}
if (toFill != -1) {
//System.out.println(Thread.currentThread().getName() + ": Filling prng buffer " + toFill);
long before = System.currentTimeMillis();
doFill(asyncBuffers[toFill]);
doFill(aBuff.buffer);
long after = System.currentTimeMillis();
synchronized (asyncBuffers) {
status[toFill] = STATUS_FILLED;
//System.out.println(Thread.currentThread().getName() + ": Prng buffer " + toFill + " filled after " + (after-before));
asyncBuffers.notifyAll();
}
_fullBuffers.offer(aBuff);
_context.statManager().addRateData("prng.bufferFillTime", after - before, 0);
Thread.yield();
long waitTime = (after-before)*5;
if (waitTime <= 0) // somehow postman saw waitTime show up as negative
waitTime = 50;
try { Thread.sleep(waitTime); } catch (InterruptedException ie) {}
}
}
}

View File

@ -1,6 +1,7 @@
package net.i2p;
import java.io.File;
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.Random;
@ -100,7 +101,7 @@ public class I2PAppContext {
private volatile boolean _randomInitialized;
private volatile boolean _keyGeneratorInitialized;
protected volatile boolean _keyRingInitialized; // used in RouterContext
private Set<Runnable> _shutdownTasks;
protected final Set<Runnable> _shutdownTasks;
private File _baseDir;
private File _configDir;
private File _routerDir;
@ -114,6 +115,10 @@ public class I2PAppContext {
* Pull the default context, creating a new one if necessary, else using
* the first one created.
*
* Warning - do not save the returned value, or the value of any methods below,
* in a static field, or you will get the old context if a new router is
* started in the same JVM after the first is shut down,
* e.g. on Android.
*/
public static I2PAppContext getGlobalContext() {
// skip the global lock - _gAC must be volatile
@ -164,8 +169,12 @@ public class I2PAppContext {
private I2PAppContext(boolean doInit, Properties envProps) {
if (doInit) {
synchronized (I2PAppContext.class) {
if (_globalAppContext == null)
if (_globalAppContext == null) {
_globalAppContext = this;
} else {
System.out.println("Warning - New context not replacing old one, you now have a second one");
(new Exception("I did it")).printStackTrace();
}
}
}
_overrideProps = new I2PProperties();
@ -185,7 +194,7 @@ public class I2PAppContext {
_elGamalAESEngineInitialized = false;
_logManagerInitialized = false;
_keyRingInitialized = false;
_shutdownTasks = new ConcurrentHashSet(0);
_shutdownTasks = new ConcurrentHashSet(32);
initializeDirs();
}
@ -843,12 +852,24 @@ public class I2PAppContext {
}
}
/**
* WARNING - Shutdown tasks are not executed in an I2PAppContext.
* You must be in a RouterContext for the tasks to be executed
* at shutdown.
* This method moved from Router in 0.7.1 so that clients
* may use it without depending on router.jar.
* @since 0.7.1
*/
public void addShutdownTask(Runnable task) {
_shutdownTasks.add(task);
}
/**
* @return an unmodifiable Set
* @since 0.7.1
*/
public Set<Runnable> getShutdownTasks() {
return new HashSet(_shutdownTasks);
return Collections.unmodifiableSet(_shutdownTasks);
}
/**

View File

@ -47,13 +47,14 @@ import net.i2p.util.RandomSource;
* @author jrandom
*/
public class DHSessionKeyBuilder {
private static final I2PAppContext _context = I2PAppContext.getGlobalContext();
private static final Log _log;
private static I2PAppContext _context = I2PAppContext.getGlobalContext();
private static Log _log;
private static final int MIN_NUM_BUILDERS;
private static final int MAX_NUM_BUILDERS;
private static final int CALC_DELAY;
private static final LinkedBlockingQueue<DHSessionKeyBuilder> _builders;
private static final Thread _precalcThread;
private static Thread _precalcThread;
private static volatile boolean _isRunning;
// the data of importance
private BigInteger _myPrivateValue;
@ -96,14 +97,46 @@ public class DHSessionKeyBuilder {
if (_log.shouldLog(Log.DEBUG))
_log.debug("DH Precalc (minimum: " + MIN_NUM_BUILDERS + " max: " + MAX_NUM_BUILDERS + ", delay: "
+ CALC_DELAY + ")");
startPrecalc();
}
_precalcThread = new I2PThread(new DHSessionKeyBuilderPrecalcRunner(MIN_NUM_BUILDERS, MAX_NUM_BUILDERS));
_precalcThread.setName("DH Precalc");
_precalcThread.setDaemon(true);
/**
* Caller must synch on class
* @since 0.8.8
*/
private static void startPrecalc() {
_context = I2PAppContext.getGlobalContext();
_log = _context.logManager().getLog(DHSessionKeyBuilder.class);
_precalcThread = new I2PThread(new DHSessionKeyBuilderPrecalcRunner(MIN_NUM_BUILDERS, MAX_NUM_BUILDERS),
"DH Precalc", true);
_precalcThread.setPriority(Thread.MIN_PRIORITY);
_isRunning = true;
_precalcThread.start();
}
/**
* Note that this stops the singleton precalc thread.
* You don't want to do this if there are multiple routers in the JVM.
* Fix this if you care. See Router.shutdown().
* @since 0.8.8
*/
public static void shutdown() {
_isRunning = false;
_precalcThread.interrupt();
_builders.clear();
}
/**
* Only required if shutdown() previously called.
* @since 0.8.8
*/
public static void restart() {
synchronized(DHSessionKeyBuilder.class) {
if (!_isRunning)
startPrecalc();
}
}
/**
* Construct a new DH key builder
* or pulls a prebuilt one from the queue.
@ -475,7 +508,7 @@ public class DHSessionKeyBuilder {
}
public void run() {
while (true) {
while (_isRunning) {
int curSize = 0;
long start = System.currentTimeMillis();

View File

@ -78,6 +78,24 @@ public class ElGamalEngine {
}
/**
* Note that this stops the singleton precalc thread.
* You don't want to do this if there are multiple routers in the JVM.
* Fix this if you care. See Router.shutdown().
* @since 0.8.8
*/
public void shutdown() {
YKGenerator.shutdown();
}
/**
* Only required if shutdown() previously called.
* @since 0.8.8
*/
public static void restart() {
YKGenerator.restart();
}
private final static BigInteger _two = new NativeBigInteger(1, new byte[] { 0x02});
private BigInteger[] getNextYK() {

View File

@ -42,8 +42,9 @@ class YKGenerator {
private static final int MAX_NUM_BUILDERS;
private static final int CALC_DELAY;
private static final LinkedBlockingQueue<BigInteger[]> _values;
private static final Thread _precalcThread;
private static final I2PAppContext ctx;
private static Thread _precalcThread;
private static I2PAppContext ctx;
private static volatile boolean _isRunning;
public final static String PROP_YK_PRECALC_MIN = "crypto.yk.precalc.min";
public final static String PROP_YK_PRECALC_MAX = "crypto.yk.precalc.max";
@ -75,16 +76,47 @@ class YKGenerator {
// _log.debug("ElGamal YK Precalc (minimum: " + MIN_NUM_BUILDERS + " max: " + MAX_NUM_BUILDERS + ", delay: "
// + CALC_DELAY + ")");
startPrecalc();
}
/**
* Caller must synch on class
* @since 0.8.8
*/
private static void startPrecalc() {
ctx = I2PAppContext.getGlobalContext();
ctx.statManager().createRateStat("crypto.YKUsed", "Need a YK from the queue", "Encryption", new long[] { 60*60*1000 });
ctx.statManager().createRateStat("crypto.YKEmpty", "YK queue empty", "Encryption", new long[] { 60*60*1000 });
_precalcThread = new I2PThread(new YKPrecalcRunner(MIN_NUM_BUILDERS, MAX_NUM_BUILDERS));
_precalcThread.setName("YK Precalc");
_precalcThread.setDaemon(true);
_precalcThread = new I2PThread(new YKPrecalcRunner(MIN_NUM_BUILDERS, MAX_NUM_BUILDERS),
"YK Precalc", true);
_precalcThread.setPriority(Thread.MIN_PRIORITY);
_isRunning = true;
_precalcThread.start();
}
/**
* Note that this stops the singleton precalc thread.
* You don't want to do this if there are multiple routers in the JVM.
* Fix this if you care. See Router.shutdown().
* @since 0.8.8
*/
public static void shutdown() {
_isRunning = false;
_precalcThread.interrupt();
_values.clear();
}
/**
* Only required if shutdown() previously called.
* @since 0.8.8
*/
public static void restart() {
synchronized(YKGenerator.class) {
if (!_isRunning)
startPrecalc();
}
}
private static final int getSize() {
return _values.size();
}
@ -161,7 +193,7 @@ class YKGenerator {
}
public void run() {
while (true) {
while (_isRunning) {
int curSize = 0;
//long start = Clock.getInstance().now();
int startSize = getSize();
@ -172,7 +204,7 @@ class YKGenerator {
_checkDelay += 1000;
curSize = startSize;
if (curSize < _minSize) {
for (int i = curSize; i < _maxSize; i++) {
for (int i = curSize; i < _maxSize && _isRunning; i++) {
//long begin = Clock.getInstance().now();
if (!addValues(generateYK()))
break;

View File

@ -83,6 +83,18 @@ public class SDSCache<V extends SimpleDataStructure> {
_log.debug("New SDSCache for " + rvClass + " data size: " + len +
" max: " + size + " max mem: " + (len * size));
I2PAppContext.getGlobalContext().statManager().createRateStat(_statName, "Hit rate", "Router", new long[] { 10*60*1000 });
I2PAppContext.getGlobalContext().addShutdownTask(new Shutdown());
}
/**
* @since 0.8.8
*/
private class Shutdown implements Runnable {
public void run() {
synchronized(_cache) {
_cache.clear();
}
}
}
/**

View File

@ -29,6 +29,8 @@ public class Timestamper implements Runnable {
private boolean _daemon;
private boolean _initialized;
private boolean _wellSynced;
private volatile boolean _isRunning;
private Thread _timestamperThread;
private static final int MIN_QUERY_FREQUENCY = 5*60*1000;
private static final int DEFAULT_QUERY_FREQUENCY = 5*60*1000;
@ -106,10 +108,11 @@ public class Timestamper implements Runnable {
}
private void startTimestamper() {
I2PThread t = new I2PThread(this, "Timestamper");
t.setPriority(I2PThread.MIN_PRIORITY);
t.setDaemon(_daemon);
t.start();
_timestamperThread = new I2PThread(this, "Timestamper", _daemon);
_timestamperThread.setPriority(I2PThread.MIN_PRIORITY);
_isRunning = true;
_timestamperThread.start();
_context.addShutdownTask(new Shutdown());
}
public void waitForInitialization() {
@ -121,6 +124,15 @@ public class Timestamper implements Runnable {
} catch (InterruptedException ie) {}
}
/** @since 0.8.8 */
private class Shutdown implements Runnable {
public void run() {
_isRunning = false;
if (_timestamperThread != null)
_timestamperThread.interrupt();
}
}
public void run() {
try { Thread.sleep(1000); } catch (InterruptedException ie) {}
_log = _context.logManager().getLog(Timestamper.class);
@ -128,7 +140,7 @@ public class Timestamper implements Runnable {
_log.info("Starting timestamper");
boolean lastFailed = false;
try {
while (true) {
while (_isRunning) {
updateConfig();
if (!_disabled) {
// first the servers for our country, if we know what country we're in...

View File

@ -107,8 +107,18 @@ public class DecayingBloomFilter {
context.statManager().createRateStat("router.decayingBloomFilter." + name + ".log10(falsePos)",
"log10 of the false positive rate (must have net.i2p.util.DecayingBloomFilter=DEBUG)",
"Router", new long[] { Math.max(60*1000, durationMs) });
context.addShutdownTask(new Shutdown());
}
/**
* @since 0.8.8
*/
private class Shutdown implements Runnable {
public void run() {
clear();
}
}
public long getCurrentDuplicateCount() { return _currentDuplicates; }
public int getInsertedCount() {

View File

@ -93,8 +93,18 @@ public class DecayingHashSet extends DecayingBloomFilter {
"Size", "Router", new long[] { Math.max(60*1000, durationMs) });
context.statManager().createRateStat("router.decayingHashSet." + name + ".dups",
"1000000 * Duplicates/Size", "Router", new long[] { Math.max(60*1000, durationMs) });
context.addShutdownTask(new Shutdown());
}
/**
* @since 0.8.8
*/
private class Shutdown implements Runnable {
public void run() {
clear();
}
}
/** unsynchronized but only used for logging elsewhere */
@Override
public int getInsertedCount() {

View File

@ -55,6 +55,8 @@ public class EepGet {
protected long _bytesTransferred;
protected long _bytesRemaining;
protected int _currentAttempt;
protected int _responseCode = -1;
protected boolean _shouldWriteErrorToOutput;
protected String _etag;
protected String _lastModified;
protected boolean _encodingChunked;
@ -245,7 +247,14 @@ public class EepGet {
public void transferComplete(long alreadyTransferred, long bytesTransferred, long bytesRemaining, String url, String outputFile, boolean notModified);
public void attemptFailed(String url, long bytesTransferred, long bytesRemaining, int currentAttempt, int numRetries, Exception cause);
public void transferFailed(String url, long bytesTransferred, long bytesRemaining, int currentAttempt);
/**
* Note: Headers are not processed, and this is not called, for most error response codes,
* unless setWriteErrorToOutput() is called before fetch().
* To be changed?
*/
public void headerReceived(String url, int currentAttempt, String key, String val);
public void attempting(String url);
}
protected class CLIStatusListener implements StatusListener {
@ -560,7 +569,7 @@ public class EepGet {
throw new IOException("HTTP response size " + _bytesRemaining + " violates maximum of " + _maxSize + " bytes");
int remaining = (int)_bytesRemaining;
byte buf[] = new byte[1024];
byte buf[] = new byte[8*1024];
while (_keepFetching && ( (remaining > 0) || !strictSize ) && !_aborted) {
int toRead = buf.length;
if (strictSize && toRead > remaining)
@ -654,13 +663,13 @@ public class EepGet {
boolean read = DataHelper.readLine(_proxyIn, buf);
if (!read) throw new IOException("Unable to read the first line");
int responseCode = handleStatus(buf.toString());
_responseCode = handleStatus(buf.toString());
boolean redirect = false;
if (_log.shouldLog(Log.DEBUG))
_log.debug("rc: " + responseCode + " for " + _actualURL);
_log.debug("rc: " + _responseCode + " for " + _actualURL);
boolean rcOk = false;
switch (responseCode) {
switch (_responseCode) {
case 200: // full
if (_outputStream != null)
_out = _outputStream;
@ -693,20 +702,51 @@ public class EepGet {
case 404: // not found
case 409: // bad addr helper
case 503: // no outproxy
_keepFetching = false;
_transferFailed = true;
// maybe we should throw instead of return to get the return code back to the user
return;
if (_alreadyTransferred == 0 && !_shouldWriteErrorToOutput) {
_keepFetching = false;
return;
}
// output the error data to the stream
rcOk = true;
if (_out == null) {
if (_outputStream != null)
_out = _outputStream;
else
_out = new FileOutputStream(_outputFile, true);
}
break;
case 416: // completed (or range out of reach)
_bytesRemaining = 0;
_keepFetching = false;
return;
if (_alreadyTransferred == 0 && !_shouldWriteErrorToOutput) {
_keepFetching = false;
return;
}
// output the error data to the stream
rcOk = true;
if (_out == null) {
if (_outputStream != null)
_out = _outputStream;
else
_out = new FileOutputStream(_outputFile, true);
}
break;
case 504: // gateway timeout
// throw out of doFetch() to fetch() and try again
throw new IOException("HTTP Proxy timeout");
default:
rcOk = false;
_keepFetching = false;
if (_alreadyTransferred == 0 && !_shouldWriteErrorToOutput) {
_keepFetching = false;
} else {
// output the error data to the stream
rcOk = true;
if (_out == null) {
if (_outputStream != null)
_out = _outputStream;
else
_out = new FileOutputStream(_outputFile, true);
}
}
_transferFailed = true;
}
@ -742,7 +782,7 @@ public class EepGet {
increment(lookahead, cur);
if (isEndOfHeaders(lookahead)) {
if (!rcOk)
throw new IOException("Invalid HTTP response code: " + responseCode);
throw new IOException("Invalid HTTP response code: " + _responseCode);
if (_encodingChunked) {
_bytesRemaining = readChunkLength();
}
@ -842,7 +882,7 @@ public class EepGet {
if (val.indexOf("chunked") != -1)
_encodingChunked = true;
} else if (key.equalsIgnoreCase("Content-Type")) {
_contentType=val;
_contentType=val.trim();
} else if (key.equalsIgnoreCase("Location")) {
_redirectLocation=val.trim();
} else {
@ -991,4 +1031,34 @@ public class EepGet {
public String getContentType() {
return _contentType;
}
/**
* The server response (200, etc).
* @return -1 if invalid, or if the proxy never responded,
* or if no proxy was used and the server never responded.
* If a non-proxied request partially succeeded (for example a redirect followed
* by a fail, or a partial fetch followed by a fail), this will
* be the last status code received.
* Note that fetch() may return false even if this returns 200.
*
* @since 0.8.8
*/
public int getStatusCode() {
return _responseCode;
}
/**
* If called (before calling fetch()),
* data from the server or proxy will be written to the
* output file or stream even on an error response code (4xx, 5xx, etc).
* The error data will only be written if no previous data was written
* on an earlier try.
* Caller must of course check getStatusCode() or the
* fetch() return value.
*
* @since 0.8.8
*/
public void setWriteErrorToOutput() {
_shouldWriteErrorToOutput = true;
}
}

View File

@ -44,6 +44,14 @@ public class FortunaRandomSource extends RandomSource implements EntropyHarveste
_haveNextGaussian = false;
}
/**
* Note - methods may hang or NPE or throw IllegalStateExceptions after this
* @since 0.8.8
*/
public void shutdown() {
_fortuna.shutdown();
}
@Override
public synchronized void setSeed(byte buf[]) {
_fortuna.addRandomBytes(buf);

View File

@ -20,7 +20,11 @@ import java.util.concurrent.CopyOnWriteArraySet;
*
*/
public class I2PThread extends Thread {
private static volatile Log _log;
/**
* Non-static to avoid refs to old context in Android.
* Probably should just remove all the logging though.
*/
private volatile Log _log;
private static final Set _listeners = new CopyOnWriteArraySet();
private String _name;
private Exception _createdBy;
@ -61,8 +65,9 @@ public class I2PThread extends Thread {
_createdBy = new Exception("Created by");
}
private static void log(int level, String msg) { log(level, msg, null); }
private static void log(int level, String msg, Throwable t) {
private void log(int level, String msg) { log(level, msg, null); }
private void log(int level, String msg, Throwable t) {
// we cant assume log is created
if (_log == null) _log = new Log(I2PThread.class);
if (_log.shouldLog(level))
@ -85,7 +90,9 @@ public class I2PThread extends Thread {
if (t instanceof OutOfMemoryError)
fireOOM((OutOfMemoryError)t);
}
log(Log.INFO, "Thread finished normally: " + _name);
// This creates a new I2PAppContext after it was deleted
// in Router.finalShutdown() via RouterContext.killGlobalContext()
//log(Log.INFO, "Thread finished normally: " + _name);
}
@Override

View File

@ -2,61 +2,79 @@ package net.i2p.util;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.I2PAppContext;
/**
* Offer a glimpse into the last few console messages generated
*
* Offer a glimpse into the last few console messages generated.
* Maintains two buffers, one normal and one critical.
*/
public class LogConsoleBuffer {
private I2PAppContext _context;
private final List<String> _buffer;
private final List<String> _critBuffer;
private final int lim;
private final LinkedBlockingQueue<String> _buffer;
private final LinkedBlockingQueue<String> _critBuffer;
/**
* Uses default limit from LogManager.
* As of 0.8.8, limit is not checked at runtime.
*
* @param context unused
*/
public LogConsoleBuffer(I2PAppContext context) {
_context = context;
_buffer = new ArrayList();
_critBuffer = new ArrayList();
this(LogManager.DEFAULT_CONSOLEBUFFERSIZE);
}
/**
* @param limit max size of each buffer
* In theory the limit is configurable, but it isn't in the UI,
* so set it at construction.
*
* @since 0.8.8
*/
public LogConsoleBuffer(int limit) {
lim = Math.max(limit, 4);
// Add some extra room to minimize the chance of losing a message,
// since we are doing offer() below.
_buffer = new LinkedBlockingQueue(limit + 4);
_critBuffer = new LinkedBlockingQueue(limit + 4);
}
void add(String msg) {
int lim = _context.logManager().getConsoleBufferSize();
synchronized (_buffer) {
while (_buffer.size() >= lim)
_buffer.remove(0);
_buffer.add(msg);
}
}
void addCritical(String msg) {
int lim = _context.logManager().getConsoleBufferSize();
synchronized (_critBuffer) {
while (_critBuffer.size() >= lim)
_critBuffer.remove(0);
_critBuffer.add(msg);
}
_buffer.poll();
_buffer.offer(msg);
}
/**
* Retrieve the currently bufferd messages, earlier values were generated...
* Only adds to the critical buffer, not to both.
*
*/
void addCritical(String msg) {
while (_critBuffer.size() >= lim)
_critBuffer.poll();
_critBuffer.offer(msg);
}
/**
* Retrieve the currently buffered messages, earlier values were generated...
* earlier. All values are strings with no formatting (as they are written
* in the logs)
*
* @return oldest first
*/
public List<String> getMostRecentMessages() {
synchronized (_buffer) {
return new ArrayList(_buffer);
}
}
/**
* Retrieve the currently bufferd crutucak messages, earlier values were generated...
* Retrieve the currently buffered critical messages, earlier values were generated...
* earlier. All values are strings with no formatting (as they are written
* in the logs)
*
* @return oldest first
*/
public List<String> getMostRecentCriticalMessages() {
synchronized (_critBuffer) {
return new ArrayList(_critBuffer);
}
}
}

View File

@ -129,7 +129,7 @@ public class LogManager {
_log = getLog(LogManager.class);
String location = context.getProperty(CONFIG_LOCATION_PROP, CONFIG_LOCATION_DEFAULT);
setConfig(location);
_consoleBuffer = new LogConsoleBuffer(context);
_consoleBuffer = new LogConsoleBuffer(_consoleBufferSize);
// If we aren't in the router context, delay creating the LogWriter until required,
// so it doesn't create a log directory and log files unless there is output.
// In the router context, we have to rotate to a new log file at startup or the logs.jsp
@ -656,6 +656,9 @@ public class LogManager {
// this could generate out-of-order messages
_writer.flushRecords(false);
_writer.stopWriting();
synchronized (_writer) {
_writer.notifyAll();
}
}
}

View File

@ -36,7 +36,7 @@ class LogWriter implements Runnable {
private File _currentFile;
private final LogManager _manager;
private boolean _write;
private volatile boolean _write;
private static final int MAX_DISKFULL_MESSAGES = 8;
private int _diskFullMessageCount;
@ -55,7 +55,8 @@ class LogWriter implements Runnable {
rotateFile();
while (_write) {
flushRecords();
rereadConfig();
if (_write)
rereadConfig();
}
//System.err.println("Done writing");
} catch (Exception e) {

View File

@ -28,6 +28,12 @@ public class RandomSource extends SecureRandom implements EntropyHarvester {
private final EntropyHarvester _entropyHarvester;
protected final I2PAppContext _context;
/**
* Deprecated - do not instantiate this directly, as you won't get the
* good one (Fortuna). Use getInstance() or
* I2PAppContext.getGlobalContext().random() to get the FortunaRandomSource
* instance.
*/
public RandomSource(I2PAppContext context) {
super();
_context = context;
@ -202,10 +208,4 @@ public class RandomSource extends SecureRandom implements EntropyHarvester {
rs.saveSeed();
}
}
// noop
private static class DummyEntropyHarvester implements EntropyHarvester {
public void feedEntropy(String source, long data, int bitoffset, int bits) {}
public void feedEntropy(String source, byte[] data, int offset, int len) {}
}
}

View File

@ -2,6 +2,7 @@ package net.i2p.util;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadFactory;
@ -48,12 +49,25 @@ public class SimpleScheduler {
_threads = (int) Math.max(MIN_THREADS, Math.min(MAX_THREADS, 1 + (maxMemory / (32*1024*1024))));
_executor = new ScheduledThreadPoolExecutor(_threads, new CustomThreadFactory());
_executor.prestartAllCoreThreads();
// don't bother saving ref to remove hook if somebody else calls stop
_context.addShutdownTask(new Shutdown());
}
/**
* Removes the SimpleScheduler.
* @since 0.8.8
*/
private class Shutdown implements Runnable {
public void run() {
stop();
}
}
/**
* Stops the SimpleScheduler.
* Subsequent executions should not throw a RejectedExecutionException.
*/
public void stop() {
_executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
_executor.shutdownNow();
}

View File

@ -53,16 +53,32 @@ public class SimpleTimer {
executor.setDaemon(true);
executor.start();
}
_context.addShutdownTask(new Shutdown());
}
/**
* @since 0.8.8
*/
private class Shutdown implements Runnable {
public void run() {
removeSimpleTimer();
}
}
/**
* Removes the SimpleTimer.
*/
public void removeSimpleTimer() {
synchronized(_events) {
runn.setAnswer(false);
_events.clear();
_eventTimes.clear();
_events.notifyAll();
}
synchronized (_readyEvents) {
_readyEvents.clear();
_readyEvents.notifyAll();
}
}
/**

View File

@ -3,6 +3,7 @@ package net.i2p.util;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadFactory;
@ -48,12 +49,25 @@ public class SimpleTimer2 {
_threads = (int) Math.max(MIN_THREADS, Math.min(MAX_THREADS, 1 + (maxMemory / (32*1024*1024))));
_executor = new CustomScheduledThreadPoolExecutor(_threads, new CustomThreadFactory());
_executor.prestartAllCoreThreads();
// don't bother saving ref to remove hook if somebody else calls stop
_context.addShutdownTask(new Shutdown());
}
/**
* Removes the SimpleTimer.
* @since 0.8.8
*/
private class Shutdown implements Runnable {
public void run() {
stop();
}
}
/**
* Stops the SimpleTimer.
* Subsequent executions should not throw a RejectedExecutionException.
*/
public void stop() {
_executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
_executor.shutdownNow();
}

View File

@ -266,7 +266,7 @@ lp.i2p=9GCeBnLXaOgCDUrkzxaV3MxB~k4M7AM~-5YROP51QyzRQ~MohC7P20vkfJrIa7CU~5ZSpYLex
amazone.i2p=KloqmDOtO35jYgefz4gmjyaDxSviLGY0A2ZeQyvDKuhVD1BaROvBTLMp2Z2rcC~hVFJkus1UVSFT0fDwQV~is8uYpcfrKal~EUTbV7uWOelwSJFEJBfoIfJ-yj7hfd6kC4Hb0tMudMLnaRmLGVZJ8o2FGTkNmvItZMPXlc6E8DjfxgegvYcdx-BYDaOvJUyD6322xaRyVDJmQc4OjvCgw8fzS-m-3V1eHkfx4UVeoxKy~tuckKXLd7VVVXNijoHG2PHNVXinfMwopUYzPyqsw9n9ucqLZdskXgm9O0BAuCJVJNCVmBQNBzRt8bX-NCypmISU7MN5YYVPumh-duuTPEQsZcks5JixtWlkIfzON~TZ5PaDlUPXvLgdAyKfWbNXBCR9nfE73KkpienPjn4-XAYupkRm1mG80IiojtViGjFAWnyP-awHQGPj0fURPvQvPu~RVTpKcI6x0QaUdd389N-QYjnBA6dJCRx~JrHjUP32GmWM4qNNLwUumo3za6DxAAAA
inproxy.tino.i2p=gNVJ0hpqZafnYkh7KFVHcvHuE~DeeOPZ45T1EFc8ARtd41s4dzKKaADtCtAmQIgQ8UYaiXA1l9eahcGP2rfJ4y3Ap4n3t-kext4UpCjzfwdI-u6s824nUb~ZaNfJIlqhu4sjmN0CK87BwKWU4-fYv3bM7mDDPWFT70ret81nN-SA3dQleKa65EK1T6EjKvAWeWkMhD9KnlsSMCc1OHwKX9Z8rM0Q8uXSYCLiY8VfQDPwOINvpTFUnuToo8GZ3KLQlECIZbE9OVJZ-0ZZEny1Muq~J~mQydFQnnCHB3hvPxjuv0rtO25zsBrAy4oFBrGgaFqy81gfTGDKhLwjaJ4hyYZwjBUNK5K-XLRO6ev03iOcVMvfuEdnCb~3uEUxEGYZTcAFJMUhSswo9CquLe0BodHm-biyki0kPxf~vE-yiCj0OPsUSD6frvqSqpksmAlasBhgQXR9hCJ37qJF1tBT2GaoHlHVhHzMqdNDEUHk-R1WRSeTghRDpwhQpCKb8VThAAAA
kohaar.i2p=2qYXoTui18BY9KtzFiMsdw6-8-rZbeSoEMkVkFzYCP-ztPWp4p7-d-6d9Yjkjo647iqTA2DYcQqIZfI8V9gG7Q0NwpC5AHbbBz~117YPLcm1uf13m7nMiXKy66qzsTC~AzIazw4EUCuCTyubr~SzYrcX3M5c0ccl~ltZrnr233Y5B4zt5-6tkckXYLuOJVXfNhRLOAI-EQ~KGP~MxSWiuItDQW7DFo9-zEzN8J0sdiIHW6XcELDWth02PbAGOyi6OlJFfj53oF5MHPLMnR~o50mvu0wWtlZXR7bOcaFonfcfHdoV-m5Ilj9H5tNBdspg7Gimx3HBW8BoUXkxWoJu403mLHNXOhG2Zw1uK9bx2GJdrkMgvFKyQn9iq2USensMrb7Wf2LgFzc0lI5BsR2BTTp~cB~u1HHhXKlVYSxYKPxpjls6-n7bynIe9NSE-ToAvVOJ7ygW8sKJWxNu3tA-8ZSQ1kB4IRpzT901Nng4lAq4aMYVw8l2Wvo1SgALqUNJAAAA
TheBreton.i2p=hYx0~K9CvPjbRwzPG~DCONlB3TwOiMGc47o5FoMAFUOoA7c3uMPuXS--1OXP3J7VbcfyRkWpQjmLhRXjeYD1HuCCJ5P8uuQ7w3RhTLActjkUkoF9yE9xH-3chudIbyZmlXRgNQaVxLkueK3hFFNODAFryITAtdH51RHl-CvBt7oQHdexL6TySL9YqlMIRHwjTCDv-mLngXHhltesU-RTwiJyJbmcNx8aq2RHcW0AdNve0nRCgNOt6k9lZ2RW3llmE42RVJiKIa9OYCNUMU7BNyklpBbfDxpoEPMO74aMqsNBhQmXXsZfNYfGAEWWvnYV~FMD40xJ~6bnlUTJj0AF-njN7mVOO57ne~l5wm-2Ltke1tomI9z-o4IhylIhzIJQYXVbNZezxf54Z5x~ydTWrvZE1KLDy3aAw8ODYByN00GRHLFuRJYYDRVUT2DcCk90wqySRUAV8fEO8EPe4Bx1KJh4yFJ6lgWfj~1hvDj4DZIDjsghonvKohTMZ1j3Pdi-AAAA
thebreton.i2p=hYx0~K9CvPjbRwzPG~DCONlB3TwOiMGc47o5FoMAFUOoA7c3uMPuXS--1OXP3J7VbcfyRkWpQjmLhRXjeYD1HuCCJ5P8uuQ7w3RhTLActjkUkoF9yE9xH-3chudIbyZmlXRgNQaVxLkueK3hFFNODAFryITAtdH51RHl-CvBt7oQHdexL6TySL9YqlMIRHwjTCDv-mLngXHhltesU-RTwiJyJbmcNx8aq2RHcW0AdNve0nRCgNOt6k9lZ2RW3llmE42RVJiKIa9OYCNUMU7BNyklpBbfDxpoEPMO74aMqsNBhQmXXsZfNYfGAEWWvnYV~FMD40xJ~6bnlUTJj0AF-njN7mVOO57ne~l5wm-2Ltke1tomI9z-o4IhylIhzIJQYXVbNZezxf54Z5x~ydTWrvZE1KLDy3aAw8ODYByN00GRHLFuRJYYDRVUT2DcCk90wqySRUAV8fEO8EPe4Bx1KJh4yFJ6lgWfj~1hvDj4DZIDjsghonvKohTMZ1j3Pdi-AAAA
adab.i2p=3Z9v5Fx92js68wHTweMVAdqGlIieQRH7VtUiF~~jGLpf6P62ohcEqVaiCnLyfcWjXrGwz-uX-CvTTM5rwzfqciuqrXcG-asRF751TQAs1ncj-GHq5W0C-uRBtfaa5NPpfKsxAGlk4ILNvnUIgZrbwkNuDUulmaa-FwxjXPCcVYlzuUlGE9JYikk0nKgeTgm2ALBzLNH~EyM4CjYoIhtBGNPThqBckIR24JDo~zYB9plqpTvBIlA5iuskv0~3siXQW6eqnU9y3gIMy10vJYPY7rpbrR6YpAu2NoCxFCKsghpSqYXGpy19~1ymFtJMGDTj7uzz2OW9lvj8gtCp-4To7A6A-P2HNH0FpAANZmsX9sRaqqjSsp3kMuqg7T2XfB-Z29XkLtiTcl-rfr3xZdyznDg7n4mVW2DOeyoCthVTbf3dM2d9PmANLWS0Iy2KUDAIt8Y0RqqSB1CORZztsoE4sh7u-CjcE24mmbcVG6MHyhsK4PLDYJADquDDCgD5boolAAAA
awup.i2p=y1UIT5rcMxuEyY9xQvXWOvg50sZnS4wjWqdeuKJCajMy2r9sLmL9KBVpJbTyO-QGOk5IHpyGSjEzWxQQOvFBPMtVllmaMaaHfZjrZ9h5D1iIqiz-DogfBDSjNetHhSRHEOxG1xF6Xq1ViWd14hKwTypfQTq8c~40tDChSdYHKHMJN8OWoTnicVw1AINu3aib6d6sAl5F9CKu1i949UFsVpoZw3LfiXcxjU0r7zQPIAltzezBsqJmhy-HbgyMvOKJb6MiGQ3niu5bM1J7TFz2uRKLxwM0QOblS3QOHzRXNFziPSinS5PkBIM2gDEyeZVlMDWBwVmRbG06mO5uVx~64Xc1jyjRvdvnzqITZRY389028A5HteLps2ge1fKYYINCxE5k4rq2BLXd-Utd5VArlcBDTTZVhapgaF8dE4xS3XLRF9GNY2VUTusElN6-dvhrOUNG80edhyBksUY0C5SszaBiWVBlb~WEZ0pTwdjCDf7B7sEsZVhRgqm~k3e62SNFAAAA
china.i2p=Dt1dfJgR0VzvhQyfoXhS1SQ6Rq4I5KwDOndFATTcxRmVejxsHpTa2IaU6ebPr-hMxnxin314qj2V5I-Ew9nfnJkI~HgEzwTD9pLiDjDvLx7UG1vTOu-hyQ2buL4Q3lun~YmdDxhwecU1FBwkrxoNDYY1hsbiAmFUzigUZ70iPSpRj7YPRs-O47fOV7Zz--tJXeBa76BUvguWwATYu7bbteaAXAChIAN2VHQLV6qnQEUKayakMYUX9xd~XQv6S9LXrZ~IQvF6JLqmQZoTGAllHkM6BWisu7WKQ2~uKGFgseXlimmu3gJtwdyk4LrMK-xANGxp29IbNzwL4xcpDiagpnVjEzfRDFzjJoOXWn8dcFwvSyEEPaFfGJu2U~H~kh2~t2Dz7OZ2UuXF4NMdZP5sIxCi1IsStggk6EkcQIffMirgbXDFJbQujc-3uHiOX9E0AkPAWamHr6stCJa9pinlPqSmgGLuh6HrLOpYA6R54hOYMB~jBUWg-DIzN9q9g~D9AAAA

View File

@ -22,8 +22,8 @@ import net.i2p.util.Log;
*
*/
public class I2NPMessageHandler {
private Log _log;
private I2PAppContext _context;
private final Log _log;
private final I2PAppContext _context;
private long _lastReadBegin;
private long _lastReadEnd;
private int _lastSize;

View File

@ -28,8 +28,8 @@ import net.i2p.util.SimpleByteCache;
* @author jrandom
*/
public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPMessage {
private Log _log;
protected I2PAppContext _context;
private final Log _log;
protected final I2PAppContext _context;
private long _expiration;
private long _uniqueId;

View File

@ -280,14 +280,22 @@ public class JobQueue {
void shutdown() {
_alive = false;
_timedJobs.clear();
_readyJobs.clear();
synchronized (_jobLock) {
_timedJobs.clear();
_readyJobs.clear();
_jobLock.notifyAll();
}
// The JobQueueRunners are NOT daemons,
// so they must be stopped.
Job poison = new PoisonJob();
for (int i = 0; i < _queueRunners.size(); i++)
for (JobQueueRunner runner : _queueRunners.values()) {
runner.stopRunning();
_readyJobs.offer(poison);
// TODO interrupt thread for each runner
}
_queueRunners.clear();
_jobStats.clear();
_runnerId = 0;
/********
if (_log.shouldLog(Log.WARN)) {

View File

@ -45,10 +45,12 @@ import net.i2p.stat.RateStat;
import net.i2p.stat.StatManager;
import net.i2p.util.ByteCache;
import net.i2p.util.FileUtil;
import net.i2p.util.FortunaRandomSource;
import net.i2p.util.I2PAppThread;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
import net.i2p.util.SecureFileOutputStream;
import net.i2p.util.SimpleByteCache;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
@ -73,6 +75,8 @@ public class Router {
private I2PThread.OOMEventListener _oomListener;
private ShutdownHook _shutdownHook;
private final I2PThread _gracefulShutdownDetector;
private final RouterWatchdog _watchdog;
private final Thread _watchdogThread;
public final static String PROP_CONFIG_FILE = "router.configLocation";
@ -187,6 +191,19 @@ public class Router {
// Save this in the context for the logger and apps that need it
envProps.setProperty("i2p.systemTimeZone", originalTimeZoneID);
// Make darn sure we don't have a leftover I2PAppContext in the same JVM
// e.g. on Android - see finalShutdown() also
List<RouterContext> contexts = RouterContext.getContexts();
if (contexts.isEmpty()) {
RouterContext.killGlobalContext();
} else if (System.getProperty("java.vendor").contains("Android")) {
System.err.println("Warning: Killing " + contexts.size() + " other routers in this JVM");
contexts.clear();
RouterContext.killGlobalContext();
} else {
System.err.println("Warning: " + contexts.size() + " other routers in this JVM");
}
// The important thing that happens here is the directory paths are set and created
// i2p.dir.router defaults to i2p.dir.config
// i2p.dir.app defaults to i2p.dir.router
@ -257,7 +274,7 @@ public class Router {
_killVMOnEnd = true;
_oomListener = new I2PThread.OOMEventListener() {
public void outOfMemory(OutOfMemoryError oom) {
ByteCache.clearAll();
clearCaches();
_log.log(Log.CRIT, "Thread ran out of memory", oom);
for (int i = 0; i < 5; i++) { // try this 5 times, in case it OOMs
try {
@ -275,11 +292,18 @@ public class Router {
_gracefulShutdownDetector = new I2PAppThread(new GracefulShutdown(), "Graceful shutdown hook", true);
_gracefulShutdownDetector.start();
Thread watchdog = new I2PAppThread(new RouterWatchdog(_context), "RouterWatchdog", true);
watchdog.start();
_watchdog = new RouterWatchdog(_context);
_watchdogThread = new I2PAppThread(_watchdog, "RouterWatchdog", true);
_watchdogThread.start();
}
/** @since 0.8.8 */
private static final void clearCaches() {
ByteCache.clearAll();
SimpleByteCache.clearAll();
}
/**
* Configure the router to kill the JVM when the router shuts down, as well
* as whether to explicitly halt the JVM during the hard fail process.
@ -616,12 +640,15 @@ public class Router {
public void rebuildNewIdentity() {
killKeys();
for (Runnable task : _context.getShutdownTasks()) {
if (_log.shouldLog(Log.WARN))
_log.warn("Running shutdown task " + task.getClass());
try {
task.run();
} catch (Throwable t) {
_log.log(Log.CRIT, "Error running shutdown task", t);
}
}
_context.removeShutdownTasks();
// hard and ugly
if (System.getProperty("wrapper.version") != null)
_log.log(Log.CRIT, "Restarting with new router identity");
@ -632,7 +659,10 @@ public class Router {
private void warmupCrypto() {
_context.random().nextBoolean();
new DHSessionKeyBuilder(); // load the class so it starts the precalc process
// Use restart() to refire the static refiller threads, in case
// we are restarting the router in the same JVM (Android)
DHSessionKeyBuilder.restart();
_context.elGamalEngine().restart();
}
private void startupQueue() {
@ -938,12 +968,15 @@ public class Router {
// Run the shutdown hooks first in case they want to send some goodbye messages
// Maybe we need a delay after this too?
for (Runnable task : _context.getShutdownTasks()) {
if (_log.shouldLog(Log.WARN))
_log.warn("Running shutdown task " + task.getClass());
try {
task.run();
} catch (Throwable t) {
_log.log(Log.CRIT, "Error running shutdown task", t);
}
}
_context.removeShutdownTasks();
try { _context.clientManager().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the client manager", t); }
try { _context.namingService().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the naming service", t); }
try { _context.jobQueue().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the job queue", t); }
@ -953,13 +986,37 @@ public class Router {
try { _context.tunnelDispatcher().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the tunnel dispatcher", t); }
try { _context.netDb().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the networkDb", t); }
try { _context.commSystem().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the comm system", t); }
try { _context.bandwidthLimiter().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the comm system", t); }
try { _context.peerManager().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the peer manager", t); }
try { _context.messageRegistry().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the message registry", t); }
try { _context.messageValidator().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the message validator", t); }
try { _context.inNetMessagePool().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the inbound net pool", t); }
//try { _sessionKeyPersistenceHelper.shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the session key manager", t); }
_context.deleteTempDir();
RouterContext.listContexts().remove(_context);
List<RouterContext> contexts = RouterContext.getContexts();
contexts.remove(_context);
// shut down I2PAppContext tasks here
// If there are multiple routers in the JVM, we don't want to do this
// to the DH or YK tasks, as they are singletons.
if (contexts.isEmpty()) {
try {
DHSessionKeyBuilder.shutdown();
} catch (Throwable t) { _log.log(Log.CRIT, "Error shutting DH", t); }
try {
_context.elGamalEngine().shutdown();
} catch (Throwable t) { _log.log(Log.CRIT, "Error shutting elGamal", t); }
} else {
_log.logAlways(Log.WARN, "Warning - " + contexts.size() + " routers remaining in this JVM, not releasing all resources");
}
try {
((FortunaRandomSource)_context.random()).shutdown();
} catch (Throwable t) { _log.log(Log.CRIT, "Error shutting random()", t); }
// logManager shut down in finalShutdown()
_watchdog.shutdown();
_watchdogThread.interrupt();
finalShutdown(exitCode);
}
@ -970,6 +1027,7 @@ public class Router {
private static final boolean ALLOW_DYNAMIC_KEYS = false;
private void finalShutdown(int exitCode) {
clearCaches();
_log.log(Log.CRIT, "Shutdown(" + exitCode + ") complete" /* , new Exception("Shutdown") */ );
try { _context.logManager().shutdown(); } catch (Throwable t) { }
if (ALLOW_DYNAMIC_KEYS) {
@ -979,6 +1037,20 @@ public class Router {
File f = getPingFile();
f.delete();
if (RouterContext.getContexts().isEmpty())
RouterContext.killGlobalContext();
// Since 0.8.8, mainly for Android
for (Runnable task : _context.getFinalShutdownTasks()) {
System.err.println("Running final shutdown task " + task.getClass());
try {
task.run();
} catch (Throwable t) {
System.err.println("Running final shutdown task " + t);
}
}
_context.getFinalShutdownTasks().clear();
if (_killVMOnEnd) {
try { Thread.sleep(1000); } catch (InterruptedException ie) {}
Runtime.getRuntime().halt(exitCode);
@ -1541,7 +1613,7 @@ private static class CoalesceStatsEvent implements SimpleTimer.TimedEvent {
long used = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
getContext().statManager().addRateData("router.memoryUsed", used, 0);
if (_maxMemory - used < LOW_MEMORY_THRESHOLD)
ByteCache.clearAll();
clearCaches();
getContext().tunnelDispatcher().updateParticipatingStats(COALESCE_TIME);

View File

@ -1,8 +1,11 @@
package net.i2p.router;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import net.i2p.I2PAppContext;
import net.i2p.data.Hash;
@ -55,6 +58,7 @@ public class RouterContext extends I2PAppContext {
private MessageValidator _messageValidator;
private MessageStateMonitor _messageStateMonitor;
private RouterThrottle _throttle;
private final Set<Runnable> _finalShutdownTasks;
private static List<RouterContext> _contexts = new ArrayList(1);
@ -67,7 +71,10 @@ public class RouterContext extends I2PAppContext {
// to init everything. Caller MUST call initAll() afterwards.
// Sorry, this breaks some main() unit tests out there.
//initAll();
if (!_contexts.isEmpty())
System.err.println("Warning - More than one router in this JVM");
_contexts.add(this);
_finalShutdownTasks = new CopyOnWriteArraySet();
}
/**
@ -165,11 +172,37 @@ public class RouterContext extends I2PAppContext {
/**
* Retrieve the list of router contexts currently instantiated in this JVM.
* This will always contain only one item (except when a simulation per the
* MultiRouter is going on), and the list should only be modified when a new
* MultiRouter is going on).
*
* @return an unmodifiable list (as of 0.8.8). May be null or empty.
*/
public static List<RouterContext> listContexts() {
return Collections.unmodifiableList(_contexts);
}
/**
* Same as listContexts() but package private and modifiable.
* The list should only be modified when a new
* context is created or a router is shut down.
*
* @since 0.8.8
*/
public static List<RouterContext> listContexts() { return _contexts; }
static List<RouterContext> getContexts() {
return _contexts;
}
/**
* Kill the global I2PAppContext, so it isn't still around
* when we restart in the same JVM (Android).
* Only do this if there are no other routers in the JVM.
*
* @since 0.8.8
*/
static void killGlobalContext() {
synchronized (I2PAppContext.class) {
_globalAppContext = null;
}
}
/** what router is this context working for? */
public Router router() { return _router; }
@ -402,6 +435,32 @@ public class RouterContext extends I2PAppContext {
}
}
/**
* @since 0.8.8
*/
void removeShutdownTasks() {
_shutdownTasks.clear();
}
/**
* The last thing to be called before router shutdown.
* No context resources, including logging, will be available.
* Only for external threads in the same JVM needing to know when
* the shutdown is complete, like Android.
* @since 0.8.8
*/
public void addFinalShutdownTask(Runnable task) {
_finalShutdownTasks.add(task);
}
/**
* @return the Set
* @since 0.8.8
*/
Set<Runnable> getFinalShutdownTasks() {
return _finalShutdownTasks;
}
/**
* Use this instead of context instanceof RouterContext
* @return true

View File

@ -15,14 +15,21 @@ class RouterWatchdog implements Runnable {
private final Log _log;
private final RouterContext _context;
private int _consecutiveErrors;
private volatile boolean _isRunning;
private static final long MAX_JOB_RUN_LAG = 60*1000;
public RouterWatchdog(RouterContext ctx) {
_context = ctx;
_log = ctx.logManager().getLog(RouterWatchdog.class);
_isRunning = true;
}
/** @since 0.8.8 */
public void shutdown() {
_isRunning = false;
}
public boolean verifyJobQueueLiveliness() {
long when = _context.jobQueue().getLastJobBegin();
if (when < 0)
@ -109,7 +116,7 @@ class RouterWatchdog implements Runnable {
}
public void run() {
while (true) {
while (_isRunning) {
try { Thread.sleep(60*1000); } catch (InterruptedException ie) {}
monitorRouter();
}

View File

@ -359,6 +359,9 @@ class PersistentDataStore extends TransientDataStore {
if (routerInfoFiles.length > 5)
_alreadyWarned = false;
for (int i = 0; i < routerInfoFiles.length; i++) {
// drop out if the router gets killed right after startup
if (!_context.router().isAlive())
break;
Hash key = getRouterInfoHash(routerInfoFiles[i].getName());
if ( (key != null) && (!isKnown(key)) ) {
// Run it inline so we don't clog up the job queue, esp. at startup

View File

@ -52,6 +52,11 @@ class PeerManager {
private static final long REORGANIZE_TIME_MEDIUM = 123*1000;
private static final long REORGANIZE_TIME_LONG = 551*1000;
/**
* Warning - this loads all the profiles in the constructor.
* This may take a long time - 30 seconds or more.
* Instantiate this in a Job or Thread.
*/
public PeerManager(RouterContext context) {
_context = context;
_log = context.logManager().getLog(PeerManager.class);
@ -99,6 +104,14 @@ class PeerManager {
}
}
/** @since 0.8.8 */
void clearProfiles() {
_organizer.clearProfiles();
_capabilitiesByPeer.clear();
for (int i = 0; i < _peersByCapability.length; i++)
_peersByCapability[i].clear();
}
Set selectPeers() {
return _organizer.selectAllPeers();
}
@ -111,6 +124,9 @@ class PeerManager {
_persistenceHelper.writeProfile(prof);
}
/**
* This may take a long time - 30 seconds or more
*/
void loadProfiles() {
Set<PeerProfile> profiles = _persistenceHelper.readProfiles();
for (Iterator<PeerProfile> iter = profiles.iterator(); iter.hasNext();) {

View File

@ -47,8 +47,10 @@ public class PeerManagerFacadeImpl implements PeerManagerFacade {
public void shutdown() {
_log.info("Shutting down the peer manager");
_testJob.stopTesting();
if (_manager != null)
if (_manager != null) {
_manager.storeProfiles();
_manager.clearProfiles();
}
}
public void restart() {

View File

@ -227,6 +227,19 @@ public class ProfileOrganizer {
public boolean isWellIntegrated(Hash peer) { return isX(_wellIntegratedPeers, peer); }
public boolean isFailing(Hash peer) { return isX(_failingPeers, peer); }
/** @since 0.8.8 */
void clearProfiles() {
getReadLock();
try {
_failingPeers.clear();
_fastPeers.clear();
_highCapacityPeers.clear();
_notFailingPeers.clear();
_notFailingPeersList.clear();
_wellIntegratedPeers.clear();
} finally { releaseReadLock(); }
}
/**
* if a peer sends us more than 5 replies in a searchReply that we cannot
* fetch, stop listening to them.

View File

@ -64,6 +64,7 @@ public class FIFOBandwidthLimiter {
/** lifetime counter of tokens available for use but exceeded our maxOutboundBurst size */
private final AtomicLong _totalWastedOutboundBytes = new AtomicLong();
private final FIFOBandwidthRefiller _refiller;
private final Thread _refillerThread;
private long _lastTotalSent;
private long _lastTotalReceived;
@ -91,9 +92,9 @@ public class FIFOBandwidthLimiter {
_lastTotalReceived = _totalAllocatedInboundBytes.get();
_lastStatsUpdated = now();
_refiller = new FIFOBandwidthRefiller(_context, this);
I2PThread t = new I2PThread(_refiller, "BWRefiller", true);
t.setPriority(I2PThread.NORM_PRIORITY-1);
t.start();
_refillerThread = new I2PThread(_refiller, "BWRefiller", true);
_refillerThread.setPriority(I2PThread.NORM_PRIORITY-1);
_refillerThread.start();
}
//public long getAvailableInboundBytes() { return _availableInboundBytes; }
@ -122,6 +123,19 @@ public class FIFOBandwidthLimiter {
public int getInboundBurstKBytesPerSecond() { return _refiller.getInboundBurstKBytesPerSecond(); }
public void reinitialize() {
clear();
_refiller.reinitialize();
}
/** @since 0.8.8 */
public void shutdown() {
_refiller.shutdown();
_refillerThread.interrupt();
clear();
}
/** @since 0.8.8 */
private void clear() {
_pendingInboundRequests.clear();
_pendingOutboundRequests.clear();
_availableInbound.set(0);
@ -134,7 +148,6 @@ public class FIFOBandwidthLimiter {
_unavailableOutboundBurst.set(0);
_inboundUnlimited = false;
_outboundUnlimited = false;
_refiller.reinitialize();
}
public Request createRequest() { return new SimpleRequest(); }

View File

@ -24,6 +24,7 @@ public class FIFOBandwidthRefiller implements Runnable {
private long _lastCheckConfigTime;
/** how frequently do we check the config for updates? */
private long _configCheckPeriodMs = 60*1000;
private volatile boolean _isRunning;
public static final String PROP_INBOUND_BANDWIDTH = "i2np.bandwidth.inboundKBytesPerSecond";
public static final String PROP_OUTBOUND_BANDWIDTH = "i2np.bandwidth.outboundKBytesPerSecond";
@ -67,12 +68,19 @@ public class FIFOBandwidthRefiller implements Runnable {
_context = context;
_log = context.logManager().getLog(FIFOBandwidthRefiller.class);
reinitialize();
_isRunning = true;
}
/** @since 0.8.8 */
public void shutdown() {
_isRunning = false;
}
public void run() {
// bootstrap 'em with nothing
_lastRefillTime = _limiter.now();
List<FIFOBandwidthLimiter.Request> buffer = new ArrayList(2);
while (true) {
while (_isRunning) {
long now = _limiter.now();
if (now >= _lastCheckConfigTime + _configCheckPeriodMs) {
checkConfig();

View File

@ -1089,6 +1089,10 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
}
private static final int MAX_HANDLERS = 4;
/**
* FIXME static queue mixes handlers from different contexts in multirouter JVM
*/
private final static LinkedBlockingQueue<I2NPMessageHandler> _i2npHandlers = new LinkedBlockingQueue(MAX_HANDLERS);
private final static I2NPMessageHandler acquireHandler(RouterContext ctx) {
@ -1129,6 +1133,15 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_dataReadBufs.offer(buf);
}
/** @since 0.8.8 */
static void releaseResources() {
_i2npHandlers.clear();
_dataReadBufs.clear();
synchronized(_bufs) {
_bufs.clear();
}
}
/**
* sizeof(data)+data+pad+crc.
*

View File

@ -702,6 +702,7 @@ public class NTCPTransport extends TransportImpl {
NTCPConnection con = (NTCPConnection)iter.next();
con.close();
}
NTCPConnection.releaseResources();
// will this work?
replaceAddress(null);
}

View File

@ -113,6 +113,11 @@ class PacketHandler {
return rv.toString();
}
/** @since 0.8.8 */
int getHandlerCount() {
return _handlers.length;
}
/** the packet is from a peer we are establishing an outbound con to, but failed validation, so fallback */
private static final short OUTBOUND_FALLBACK = 1;
/** the packet is from a peer we are establishing an inbound con to, but failed validation, so fallback */

View File

@ -147,6 +147,7 @@ class UDPEndpoint {
/**
* Blocking call to receive the next inbound UDP packet from any peer.
* @return null if we have shut down
*/
public UDPPacket receive() {
if (_receiver == null)

View File

@ -58,9 +58,11 @@ class UDPReceiver {
public void shutdown() {
_keepRunning = false;
_inboundQueue.clear();
UDPPacket poison = UDPPacket.acquire(_context, false);
poison.setMessageType(TYPE_POISON);
_inboundQueue.offer(poison);
for (int i = 0; i < _transport.getPacketHandlerCount(); i++) {
UDPPacket poison = UDPPacket.acquire(_context, false);
poison.setMessageType(TYPE_POISON);
_inboundQueue.offer(poison);
}
for (int i = 1; i <= 5 && !_inboundQueue.isEmpty(); i++) {
try {
Thread.sleep(i * 50);

View File

@ -1367,6 +1367,15 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
return "";
}
/** @since 0.8.8 */
int getPacketHandlerCount() {
PacketHandler handler = _handler;
if (handler != null)
return handler.getHandlerCount();
else
return 0;
}
private static final int DROP_INACTIVITY_TIME = 60*1000;
public void failed(OutboundMessageState msg) { failed(msg, true); }

View File

@ -84,7 +84,7 @@ public class RandomIterator<E> implements Iterator<E> {
* <a href="http://www.qbrundage.com/michaelb/pubs/essays/random_number_generation" title="http://www.qbrundage.com/michaelb/pubs/essays/random_number_generation" target="_blank">http://www.qbrundage.com/michaelb/pubs/e&#8230;</a>
* for some implementations, which are faster than java.util.Random.
*/
private static final Random rand = RandomSource.getInstance();
private final Random rand = RandomSource.getInstance();
/** Used to narrow the range to take random indexes from */
private int lower, upper;

View File

@ -120,7 +120,9 @@ public class HTTPMUSocket
return true;
try {
ssdpMultiSock.leaveGroup(ssdpMultiGroup, ssdpMultiIf);
// I2P close it instead of leaving group so the thread dies
//ssdpMultiSock.leaveGroup(ssdpMultiGroup, ssdpMultiIf);
ssdpMultiSock.close();
ssdpMultiSock = null;
}
catch (Exception e) {

View File

@ -65,6 +65,8 @@ public class ThreadCore implements Runnable
//threadObject.destroy();
//threadObject.stop();
setThreadObject(null);
// I2P break Disposer out of sleep()
threadObject.interrupt();
}
}