diff --git a/apps/routerconsole/java/src/net/i2p/router/web/NewsFetcher.java b/apps/routerconsole/java/src/net/i2p/router/web/NewsFetcher.java index 1f1cd4098..a11b8adc5 100644 --- a/apps/routerconsole/java/src/net/i2p/router/web/NewsFetcher.java +++ b/apps/routerconsole/java/src/net/i2p/router/web/NewsFetcher.java @@ -37,6 +37,8 @@ public class NewsFetcher implements Runnable, EepGet.StatusListener { private File _newsFile; private File _tempFile; private static NewsFetcher _instance; + private volatile boolean _isRunning; + //public static final synchronized NewsFetcher getInstance() { return _instance; } public static final synchronized NewsFetcher getInstance(I2PAppContext ctx) { if (_instance != null) @@ -64,8 +66,14 @@ public class NewsFetcher implements Runnable, EepGet.StatusListener { _tempFile = new File(_context.getTempDir(), TEMP_NEWS_FILE); updateLastFetched(); _updateVersion = ""; + _isRunning = true; } + /** @since 0.8.8 */ + void shutdown() { + _isRunning = false; + } + private void updateLastFetched() { if (_newsFile.exists()) { if (_lastUpdated == 0) @@ -108,7 +116,7 @@ public class NewsFetcher implements Runnable, EepGet.StatusListener { public void run() { try { Thread.sleep(INITIAL_DELAY + _context.random().nextLong(INITIAL_DELAY)); } catch (InterruptedException ie) {} - while (true) { + while (_isRunning) { if (!_updateAvailable) checkForUpdates(); if (shouldFetchNews()) { fetchNews(); diff --git a/apps/routerconsole/java/src/net/i2p/router/web/RouterConsoleRunner.java b/apps/routerconsole/java/src/net/i2p/router/web/RouterConsoleRunner.java index be46bdca0..a436e3e83 100644 --- a/apps/routerconsole/java/src/net/i2p/router/web/RouterConsoleRunner.java +++ b/apps/routerconsole/java/src/net/i2p/router/web/RouterConsoleRunner.java @@ -342,10 +342,10 @@ public class RouterConsoleRunner { } NewsFetcher fetcher = NewsFetcher.getInstance(I2PAppContext.getGlobalContext()); - Thread t = new I2PAppThread(fetcher, "NewsFetcher", true); - t.start(); + Thread newsThread = new I2PAppThread(fetcher, "NewsFetcher", true); + newsThread.start(); - t = new I2PAppThread(new StatSummarizer(), "StatSummarizer", true); + Thread t = new I2PAppThread(new StatSummarizer(), "StatSummarizer", true); t.start(); List contexts = RouterContext.listContexts(); @@ -356,6 +356,9 @@ public class RouterConsoleRunner { t.start(); ctx.addShutdownTask(new PluginStopper(ctx)); } + ctx.addShutdownTask(new NewsShutdown(fetcher, newsThread)); + // stat summarizer registers its own hook + ctx.addShutdownTask(new ServerShutdown()); } } @@ -495,16 +498,31 @@ public class RouterConsoleRunner { } } -/******* - public void stopConsole() { - try { - _server.stop(); - } catch (InterruptedException ie) { - ie.printStackTrace(); + /** @since 0.8.8 */ + private class ServerShutdown implements Runnable { + public void run() { + try { + _server.stop(); + } catch (InterruptedException ie) {} } } -********/ + /** @since 0.8.8 */ + private static class NewsShutdown implements Runnable { + private final NewsFetcher _fetcher; + private final Thread _newsThread; + + public NewsShutdown(NewsFetcher fetcher, Thread t) { + _fetcher = fetcher; + _newsThread = t; + } + + public void run() { + _fetcher.shutdown(); + _newsThread.interrupt(); + } + } + public static Properties webAppProperties() { return webAppProperties(I2PAppContext.getGlobalContext().getConfigDir().getAbsolutePath()); } diff --git a/core/java/src/net/i2p/time/Timestamper.java b/core/java/src/net/i2p/time/Timestamper.java index 2c798bab6..829d7a68a 100644 --- a/core/java/src/net/i2p/time/Timestamper.java +++ b/core/java/src/net/i2p/time/Timestamper.java @@ -29,6 +29,8 @@ public class Timestamper implements Runnable { private boolean _daemon; private boolean _initialized; private boolean _wellSynced; + private volatile boolean _isRunning; + private Thread _timestamperThread; private static final int MIN_QUERY_FREQUENCY = 5*60*1000; private static final int DEFAULT_QUERY_FREQUENCY = 5*60*1000; @@ -106,10 +108,11 @@ public class Timestamper implements Runnable { } private void startTimestamper() { - I2PThread t = new I2PThread(this, "Timestamper"); - t.setPriority(I2PThread.MIN_PRIORITY); - t.setDaemon(_daemon); - t.start(); + _timestamperThread = new I2PThread(this, "Timestamper", _daemon); + _timestamperThread.setPriority(I2PThread.MIN_PRIORITY); + _isRunning = true; + _timestamperThread.start(); + _context.addShutdownTask(new Shutdown()); } public void waitForInitialization() { @@ -121,6 +124,15 @@ public class Timestamper implements Runnable { } catch (InterruptedException ie) {} } + /** @since 0.8.8 */ + private class Shutdown implements Runnable { + public void run() { + _isRunning = false; + if (_timestamperThread != null) + _timestamperThread.interrupt(); + } + } + public void run() { try { Thread.sleep(1000); } catch (InterruptedException ie) {} _log = _context.logManager().getLog(Timestamper.class); @@ -128,7 +140,7 @@ public class Timestamper implements Runnable { _log.info("Starting timestamper"); boolean lastFailed = false; try { - while (true) { + while (_isRunning) { updateConfig(); if (!_disabled) { // first the servers for our country, if we know what country we're in... diff --git a/router/java/src/net/i2p/router/Router.java b/router/java/src/net/i2p/router/Router.java index 03ee24db4..a6eb13ca4 100644 --- a/router/java/src/net/i2p/router/Router.java +++ b/router/java/src/net/i2p/router/Router.java @@ -967,6 +967,7 @@ public class Router { try { _context.tunnelDispatcher().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the tunnel dispatcher", t); } try { _context.netDb().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the networkDb", t); } try { _context.commSystem().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the comm system", t); } + try { _context.bandwidthLimiter().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the comm system", t); } try { _context.peerManager().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the peer manager", t); } try { _context.messageRegistry().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the message registry", t); } try { _context.messageValidator().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the message validator", t); } diff --git a/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java b/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java index bea77fcf4..b8f8a9590 100644 --- a/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java +++ b/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java @@ -64,6 +64,7 @@ public class FIFOBandwidthLimiter { /** lifetime counter of tokens available for use but exceeded our maxOutboundBurst size */ private final AtomicLong _totalWastedOutboundBytes = new AtomicLong(); private final FIFOBandwidthRefiller _refiller; + private final Thread _refillerThread; private long _lastTotalSent; private long _lastTotalReceived; @@ -91,9 +92,9 @@ public class FIFOBandwidthLimiter { _lastTotalReceived = _totalAllocatedInboundBytes.get(); _lastStatsUpdated = now(); _refiller = new FIFOBandwidthRefiller(_context, this); - I2PThread t = new I2PThread(_refiller, "BWRefiller", true); - t.setPriority(I2PThread.NORM_PRIORITY-1); - t.start(); + _refillerThread = new I2PThread(_refiller, "BWRefiller", true); + _refillerThread.setPriority(I2PThread.NORM_PRIORITY-1); + _refillerThread.start(); } //public long getAvailableInboundBytes() { return _availableInboundBytes; } @@ -122,6 +123,19 @@ public class FIFOBandwidthLimiter { public int getInboundBurstKBytesPerSecond() { return _refiller.getInboundBurstKBytesPerSecond(); } public void reinitialize() { + clear(); + _refiller.reinitialize(); + } + + /** @since 0.8.8 */ + public void shutdown() { + _refiller.shutdown(); + _refillerThread.interrupt(); + clear(); + } + + /** @since 0.8.8 */ + private void clear() { _pendingInboundRequests.clear(); _pendingOutboundRequests.clear(); _availableInbound.set(0); @@ -134,7 +148,6 @@ public class FIFOBandwidthLimiter { _unavailableOutboundBurst.set(0); _inboundUnlimited = false; _outboundUnlimited = false; - _refiller.reinitialize(); } public Request createRequest() { return new SimpleRequest(); } diff --git a/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java b/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java index bc8dd1852..7dd9d4128 100644 --- a/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java +++ b/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java @@ -24,6 +24,7 @@ public class FIFOBandwidthRefiller implements Runnable { private long _lastCheckConfigTime; /** how frequently do we check the config for updates? */ private long _configCheckPeriodMs = 60*1000; + private volatile boolean _isRunning; public static final String PROP_INBOUND_BANDWIDTH = "i2np.bandwidth.inboundKBytesPerSecond"; public static final String PROP_OUTBOUND_BANDWIDTH = "i2np.bandwidth.outboundKBytesPerSecond"; @@ -67,12 +68,19 @@ public class FIFOBandwidthRefiller implements Runnable { _context = context; _log = context.logManager().getLog(FIFOBandwidthRefiller.class); reinitialize(); + _isRunning = true; } + + /** @since 0.8.8 */ + public void shutdown() { + _isRunning = false; + } + public void run() { // bootstrap 'em with nothing _lastRefillTime = _limiter.now(); List buffer = new ArrayList(2); - while (true) { + while (_isRunning) { long now = _limiter.now(); if (now >= _lastCheckConfigTime + _configCheckPeriodMs) { checkConfig(); diff --git a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java index 824f4a928..f8b387d68 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java @@ -113,6 +113,11 @@ class PacketHandler { return rv.toString(); } + /** @since 0.8.8 */ + int getHandlerCount() { + return _handlers.length; + } + /** the packet is from a peer we are establishing an outbound con to, but failed validation, so fallback */ private static final short OUTBOUND_FALLBACK = 1; /** the packet is from a peer we are establishing an inbound con to, but failed validation, so fallback */ diff --git a/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java b/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java index f20781927..b59c40d4b 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java @@ -147,6 +147,7 @@ class UDPEndpoint { /** * Blocking call to receive the next inbound UDP packet from any peer. + * @return null if we have shut down */ public UDPPacket receive() { if (_receiver == null) diff --git a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java index ded71f8a2..647a5639f 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java @@ -58,9 +58,11 @@ class UDPReceiver { public void shutdown() { _keepRunning = false; _inboundQueue.clear(); - UDPPacket poison = UDPPacket.acquire(_context, false); - poison.setMessageType(TYPE_POISON); - _inboundQueue.offer(poison); + for (int i = 0; i < _transport.getPacketHandlerCount(); i++) { + UDPPacket poison = UDPPacket.acquire(_context, false); + poison.setMessageType(TYPE_POISON); + _inboundQueue.offer(poison); + } for (int i = 1; i <= 5 && !_inboundQueue.isEmpty(); i++) { try { Thread.sleep(i * 50); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index b8f1d35ee..b49906c43 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -1367,6 +1367,15 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority return ""; } + /** @since 0.8.8 */ + int getPacketHandlerCount() { + PacketHandler handler = _handler; + if (handler != null) + return handler.getHandlerCount(); + else + return 0; + } + private static final int DROP_INACTIVITY_TIME = 60*1000; public void failed(OutboundMessageState msg) { failed(msg, true); } diff --git a/router/java/src/org/cybergarage/upnp/ssdp/HTTPMUSocket.java b/router/java/src/org/cybergarage/upnp/ssdp/HTTPMUSocket.java index 80e589678..21f4f13a4 100644 --- a/router/java/src/org/cybergarage/upnp/ssdp/HTTPMUSocket.java +++ b/router/java/src/org/cybergarage/upnp/ssdp/HTTPMUSocket.java @@ -120,7 +120,9 @@ public class HTTPMUSocket return true; try { - ssdpMultiSock.leaveGroup(ssdpMultiGroup, ssdpMultiIf); + // I2P close it instead of leaving group so the thread dies + //ssdpMultiSock.leaveGroup(ssdpMultiGroup, ssdpMultiIf); + ssdpMultiSock.close(); ssdpMultiSock = null; } catch (Exception e) {