From 6e077ee621217d88bbe1dc0c475c297eafb3ac8e Mon Sep 17 00:00:00 2001 From: zzz Date: Mon, 11 Jun 2012 19:38:33 +0000 Subject: [PATCH] * i2psnark: - Reduce TrackerClient threads - Reduce delay between peer adds for faster startup - Thread the announces and reduce timeout when stopping --- .../src/org/klomp/snark/I2PSnarkUtil.java | 32 +- .../java/src/org/klomp/snark/PeerState.java | 8 +- .../java/src/org/klomp/snark/Snark.java | 4 - .../src/org/klomp/snark/SnarkManager.java | 4 +- .../src/org/klomp/snark/TrackerClient.java | 317 +++++++++++++----- .../org/klomp/snark/web/I2PSnarkServlet.java | 4 +- history.txt | 3 + .../src/net/i2p/router/RouterVersion.java | 2 +- 8 files changed, 276 insertions(+), 98 deletions(-) diff --git a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java index f4238dae4..9be365eb1 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java +++ b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java @@ -68,6 +68,8 @@ public class I2PSnarkUtil { private List _openTrackers; private DHT _dht; + private static final int EEPGET_CONNECT_TIMEOUT = 45*1000; + private static final int EEPGET_CONNECT_TIMEOUT_SHORT = 5*1000; public static final int DEFAULT_STARTUP_DELAY = 3; public static final boolean DEFAULT_USE_OPENTRACKERS = true; public static final String DEFAULT_OPENTRACKERS = "http://tracker.welterde.i2p/a"; @@ -306,11 +308,24 @@ public class I2PSnarkUtil { } /** - * fetch the given URL, returning the file it is stored in, or null on error + * Fetch the given URL, returning the file it is stored in, or null on error. + * No retries. */ public File get(String url) { return get(url, true, 0); } + + /** + * @param rewrite if true, convert http://KEY.i2p/foo/announce to http://i2p/KEY/foo/announce + */ public File get(String url, boolean rewrite) { return get(url, rewrite, 0); } + + /** + * @param retries if < 0, set timeout to a few seconds + */ public File get(String url, int retries) { return get(url, true, retries); } + + /** + * @param retries if < 0, set timeout to a few seconds + */ public File get(String url, boolean rewrite, int retries) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Fetching [" + url + "] proxy=" + _proxyHost + ":" + _proxyPort + ": " + _shouldProxy); @@ -331,12 +346,21 @@ public class I2PSnarkUtil { //_log.debug("Rewritten url [" + fetchURL + "]"); //EepGet get = new EepGet(_context, _shouldProxy, _proxyHost, _proxyPort, retries, out.getAbsolutePath(), fetchURL); // Use our tunnel for announces and .torrent fetches too! Make sure we're connected first... - if (!connected()) { - if (!connect()) + int timeout; + if (retries < 0) { + if (!connected()) return null; + timeout = EEPGET_CONNECT_TIMEOUT_SHORT; + retries = 0; + } else { + timeout = EEPGET_CONNECT_TIMEOUT; + if (!connected()) { + if (!connect()) + return null; + } } EepGet get = new I2PSocketEepGet(_context, _manager, retries, out.getAbsolutePath(), fetchURL); - if (get.fetch()) { + if (get.fetch(timeout)) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Fetch successful [" + url + "]: size=" + out.length()); return out; diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerState.java b/apps/i2psnark/java/src/org/klomp/snark/PeerState.java index f41bf1b57..bbe6ff932 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerState.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerState.java @@ -231,8 +231,8 @@ class PeerState implements DataLoader return; } - if (_log.shouldLog(Log.INFO)) - _log.info("Queueing (" + piece + ", " + begin + ", " + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Queueing (" + piece + ", " + begin + ", " + length + ")" + " to " + peer); // don't load the data into mem now, let PeerConnectionOut do it @@ -267,8 +267,8 @@ class PeerState implements DataLoader return null; } - if (_log.shouldLog(Log.INFO)) - _log.info("Sending (" + piece + ", " + begin + ", " + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Sending (" + piece + ", " + begin + ", " + length + ")" + " to " + peer); return pieceBytes; } diff --git a/apps/i2psnark/java/src/org/klomp/snark/Snark.java b/apps/i2psnark/java/src/org/klomp/snark/Snark.java index f5fe35e31..f7a990185 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Snark.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Snark.java @@ -567,10 +567,6 @@ public class Snark fatal("Could not reopen storage", ioe); } } - TrackerClient newClient = new TrackerClient(_util, meta, additionalTrackerURL, coordinator, this); - if (!trackerclient.halted()) - trackerclient.halt(); - trackerclient = newClient; trackerclient.start(); } else { debug("NOT starting TrackerClient???", NOTICE); diff --git a/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java b/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java index 26023cab5..b23c67c13 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java +++ b/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java @@ -1596,8 +1596,10 @@ public class SnarkManager implements Snark.CompleteListener { Set names = listTorrentFiles(); for (Iterator iter = names.iterator(); iter.hasNext(); ) { Snark snark = getTorrent((String)iter.next()); - if ( (snark != null) && (!snark.isStopped()) ) + if ( (snark != null) && (!snark.isStopped()) ) { snark.stopTorrent(); + try { Thread.sleep(50); } catch (InterruptedException ie) {} + } } } } diff --git a/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java b/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java index 01b6a829b..708e29f77 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java +++ b/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java @@ -35,18 +35,32 @@ import java.util.Random; import java.util.Set; import net.i2p.I2PAppContext; +import net.i2p.data.DataHelper; import net.i2p.data.Hash; +import net.i2p.util.Clock; import net.i2p.util.I2PAppThread; import net.i2p.util.Log; +import net.i2p.util.SimpleTimer2; /** * Informs metainfo tracker of events and gets new peers for peer * coordinator. * + * start() creates a thread and starts it. + * At the end of each run, a TimedEvent is queued on the SimpleTimer2 queue. + * The TimedEvent creates a new thread and starts it, so it does not + * clog SimpleTimer2. + * + * The thread runs one pass through the trackers, the PEX, and the DHT, + * then queues a new TimedEvent and exits. + * + * Thus there are only threads that are actively announcing, not one thread per torrent forever. + * + * start() may be called again after halt(). + * * @author Mark Wielaard (mark@klomp.org) */ -public class TrackerClient extends I2PAppThread -{ +public class TrackerClient implements Runnable { private final Log _log = I2PAppContext.getGlobalContext().logManager().getLog(TrackerClient.class); private static final String NO_EVENT = ""; private static final String STARTED_EVENT = "started"; @@ -56,25 +70,39 @@ public class TrackerClient extends I2PAppThread private final static int SLEEP = 5; // 5 minutes. private final static int DELAY_MIN = 2000; // 2 secs. - private final static int DELAY_MUL = 1500; // 1.5 secs. + private final static int DELAY_RAND = 6*1000; private final static int MAX_REGISTER_FAILS = 10; // * INITIAL_SLEEP = 15m to register private final static int INITIAL_SLEEP = 90*1000; private final static int MAX_CONSEC_FAILS = 5; // slow down after this private final static int LONG_SLEEP = 30*60*1000; // sleep a while after lots of fails - private I2PSnarkUtil _util; + private final I2PSnarkUtil _util; private final MetaInfo meta; + private final String infoHash; + private final String peerID; private final String additionalTrackerURL; private final PeerCoordinator coordinator; private final Snark snark; private final int port; + private final String _threadName; - private boolean stop; - private boolean started; + private volatile boolean stop = true; + private volatile boolean started; + private volatile boolean _initialized; + private volatile int _runCount; + // running thread so it can be interrupted + private volatile Thread _thread; + // queued event so it can be cancelled + private volatile SimpleTimer2.TimedEvent _event; + // these 2 used in loop() + private volatile boolean runStarted; + private volatile int consecutiveFails; - private List trackers; + private final List trackers; /** + * Call start() to start it. + * * @param meta null if in magnet mode * @param additionalTrackerURL may be null, from the ?tr= param in magnet mode, otherwise ignored */ @@ -84,7 +112,7 @@ public class TrackerClient extends I2PAppThread super(); // Set unique name. String id = urlencode(snark.getID()); - setName("TrackerClient " + id.substring(id.length() - 12)); + _threadName = "TrackerClient " + id.substring(id.length() - 12); _util = util; this.meta = meta; this.additionalTrackerURL = additionalTrackerURL; @@ -92,12 +120,22 @@ public class TrackerClient extends I2PAppThread this.snark = snark; this.port = 6881; //(port == -1) ? 9 : port; + this.infoHash = urlencode(snark.getInfoHash()); + this.peerID = urlencode(snark.getID()); + this.trackers = new ArrayList(2); } - @Override - public void start() { - if (stop) throw new RuntimeException("Dont rerun me, create a copy"); - super.start(); + public synchronized void start() { + if (!stop) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Already started: " + _threadName); + return; + } + stop = false; + consecutiveFails = 0; + runStarted = false; + _thread = new I2PAppThread(this, _threadName + " #" + (++_runCount), true); + _thread.start(); started = true; } @@ -107,10 +145,47 @@ public class TrackerClient extends I2PAppThread /** * Interrupts this Thread to stop it. */ - public void halt() - { - stop = true; - this.interrupt(); + public synchronized void halt() { + boolean wasStopped = stop; + if (wasStopped) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Already stopped: " + _threadName); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("Stopping: " + _threadName); + stop = true; + } + SimpleTimer2.TimedEvent e = _event; + if (e != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Cancelling next announce " + _threadName); + e.cancel(); + _event = null; + } + Thread t = _thread; + if (t != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Interrupting " + t.getName()); + t.interrupt(); + } + if (!wasStopped) + unannounce(); + } + + private void queueLoop(long delay) { + _event = new Runner(delay); + } + + private class Runner extends SimpleTimer2.TimedEvent { + public Runner(long delay) { + super(SimpleTimer2.getInstance(), delay); + } + + public void timeReached() { + _event = null; + _thread = new I2PAppThread(TrackerClient.this, _threadName + " #" + (++_runCount), true); + _thread.start(); + } } private boolean verifyConnected() { @@ -123,20 +198,51 @@ public class TrackerClient extends I2PAppThread return !stop && _util.connected(); } - @Override - public void run() - { - String infoHash = urlencode(snark.getInfoHash()); - String peerID = urlencode(snark.getID()); - + /** + * Setup the first time only, + * then one pass (usually) through the trackers, PEX, and DHT. + * This will take several seconds to several minutes. + */ + public void run() { + long begin = Clock.getInstance().now(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Start " + Thread.currentThread().getName()); + try { + if (!_initialized) { + setup(); + // FIXME dht + if (trackers.isEmpty()) { + stop = true; + return; + } + _initialized = true; + // FIXME only when starting everybody at once, not for a single torrent + long delay = I2PAppContext.getGlobalContext().random().nextInt(30*1000); + try { + Thread.sleep(delay); + } catch (InterruptedException ie) {} + } + loop(); + } finally { + // don't hold ref + _thread = null; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Finish " + Thread.currentThread().getName() + + " after " + DataHelper.formatDuration(Clock.getInstance().now() - begin)); + } + } + /** + * Do this one time only (not every time it is started). + * @since 0.9.1 + */ + public void setup() { // Construct the list of trackers for this torrent, // starting with the primary one listed in the metainfo, // followed by the secondary open trackers // It's painful, but try to make sure if an open tracker is also // the primary tracker, that we don't add it twice. // todo: check for b32 matches as well - trackers = new ArrayList(2); String primary = null; if (meta != null) primary = meta.getAnnounce(); @@ -192,58 +298,32 @@ public class TrackerClient extends I2PAppThread this.snark.stopTorrent(); return; } + } - long uploaded = coordinator.getUploaded(); - long downloaded = coordinator.getDownloaded(); - long left = coordinator.getLeft(); - - boolean completed = (left == 0); + /** + * Announce to all the trackers, get peers from PEX and DHT, then queue up a SimpleTimer2 event. + * This will take several seconds to several minutes. + * @since 0.9.1 + */ + private void loop() { try { - if (!verifyConnected()) return; - boolean runStarted = false; - boolean firstTime = true; - int consecutiveFails = 0; Random r = I2PAppContext.getGlobalContext().random(); while(!stop) { + if (!verifyConnected()) { + stop = true; + return; + } + // Local DHT tracker announce if (_util.getDHT() != null) _util.getDHT().announce(snark.getInfoHash()); - try - { - // Sleep some minutes... - // Sleep the minimum interval for all the trackers, but 60s minimum - // except for the first time... - int delay; - int random = r.nextInt(120*1000); - if (firstTime) { - delay = r.nextInt(30*1000); - firstTime = false; - } else if (completed && runStarted) - delay = 3*SLEEP*60*1000 + random; - else if (snark.getTrackerProblems() != null && ++consecutiveFails < MAX_CONSEC_FAILS) - delay = INITIAL_SLEEP; - else - // sleep a while, when we wake up we will contact only the trackers whose intervals have passed - delay = SLEEP*60*1000 + random; - if (delay > 0) - Thread.sleep(delay); - } - catch(InterruptedException interrupt) - { - // ignore - } - - if (stop) - break; - - if (!verifyConnected()) return; - - uploaded = coordinator.getUploaded(); - downloaded = coordinator.getDownloaded(); - left = coordinator.getLeft(); // -1 in magnet mode + long uploaded = coordinator.getUploaded(); + long downloaded = coordinator.getDownloaded(); + long left = coordinator.getLeft(); // -1 in magnet mode + boolean completed = (left == 0); // First time we got a complete download? String event; @@ -303,7 +383,7 @@ public class TrackerClient extends I2PAppThread // FIXME if id == us || dest == us continue; // only delay if we actually make an attempt to add peer if(coordinator.addPeer(cur) && it.hasNext()) { - int delay = (DELAY_MUL * r.nextInt(10)) + DELAY_MIN; + int delay = r.nextInt(DELAY_RAND) + DELAY_MIN; try { Thread.sleep(delay); } catch (InterruptedException ie) {} } } @@ -354,7 +434,7 @@ public class TrackerClient extends I2PAppThread while ((!stop) && it.hasNext() && coordinator.needOutboundPeers()) { Peer cur = it.next(); if (coordinator.addPeer(cur) && it.hasNext()) { - int delay = (DELAY_MUL * r.nextInt(10)) + DELAY_MIN; + int delay = r.nextInt(DELAY_RAND) + DELAY_MIN; try { Thread.sleep(delay); } catch (InterruptedException ie) {} } } @@ -390,7 +470,7 @@ public class TrackerClient extends I2PAppThread while ((!stop) && it.hasNext() && coordinator.needOutboundPeers()) { Peer cur = it.next(); if (coordinator.addPeer(cur) && it.hasNext()) { - int delay = (DELAY_MUL * r.nextInt(10)) + DELAY_MIN; + int delay = r.nextInt(DELAY_RAND) + DELAY_MIN; try { Thread.sleep(delay); } catch (InterruptedException ie) {} } } @@ -400,8 +480,36 @@ public class TrackerClient extends I2PAppThread // we could try and total the unique peers but that's too hard for now snark.setTrackerSeenPeers(maxSeenPeers); + + if (stop) + return; + if (!runStarted) _util.debug(" Retrying in one minute...", Snark.DEBUG); + + try { + // Sleep some minutes... + // Sleep the minimum interval for all the trackers, but 60s minimum + int delay; + int random = r.nextInt(120*1000); + if (completed && runStarted) + delay = 3*SLEEP*60*1000 + random; + else if (snark.getTrackerProblems() != null && ++consecutiveFails < MAX_CONSEC_FAILS) + delay = INITIAL_SLEEP; + else + // sleep a while, when we wake up we will contact only the trackers whose intervals have passed + delay = SLEEP*60*1000 + random; + + if (delay > 20*1000) { + // put ourselves on SimpleTimer2 + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Requeueing in " + DataHelper.formatDuration(delay) + ": " + Thread.currentThread().getName()); + queueLoop(delay); + return; + } else if (delay > 0) { + Thread.sleep(delay); + } + } catch(InterruptedException interrupt) {} } // *** end of while loop } // try catch (Throwable t) @@ -410,26 +518,61 @@ public class TrackerClient extends I2PAppThread if (t instanceof OutOfMemoryError) throw (OutOfMemoryError)t; } - finally - { - // Local DHT tracker unannounce - if (_util.getDHT() != null) - _util.getDHT().unannounce(snark.getInfoHash()); + } + + /** + * Creates a thread for each tracker in parallel if tunnel is still open + * @since 0.9.1 + */ + private void unannounce() { + // Local DHT tracker unannounce + if (_util.getDHT() != null) + _util.getDHT().unannounce(snark.getInfoHash()); + int i = 0; + for (Tracker tr : trackers) { + if (_util.connected() && + tr.started && (!tr.stop) && tr.trackerProblems == null) { + try { + (new I2PAppThread(new Unannouncer(tr), _threadName + " Unannounce " + (++i), true)).start(); + } catch (OutOfMemoryError oom) { + // probably ran out of threads, ignore + tr.reset(); + } + } else { + tr.reset(); + } + } + } + + /** + * Send "stopped" to a single tracker + * @since 0.9.1 + */ + private class Unannouncer implements Runnable { + private final Tracker tr; + + public Unannouncer(Tracker tr) { + this.tr = tr; + } + + public void run() { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Running unannounce " + _threadName + " to " + tr.announce); + long uploaded = coordinator.getUploaded(); + long downloaded = coordinator.getDownloaded(); + long left = coordinator.getLeft(); try { - // try to contact everybody we can // Don't try to restart I2CP connection just to say goodbye - for (Iterator iter = trackers.iterator(); iter.hasNext(); ) { - if (!_util.connected()) return; - Tracker tr = (Tracker)iter.next(); - if (tr.started && (!tr.stop) && tr.trackerProblems == null) - doRequest(tr, infoHash, peerID, uploaded, + if (_util.connected()) { + if (tr.started && (!tr.stop) && tr.trackerProblems == null) + doRequest(tr, infoHash, peerID, uploaded, downloaded, left, STOPPED_EVENT); - } + } } catch(IOException ioe) { /* ignored */ } - } - + tr.reset(); + } } private TrackerInfo doRequest(Tracker tr, String infoHash, @@ -467,7 +610,8 @@ public class TrackerClient extends I2PAppThread _util.debug("Sending TrackerClient request: " + s, Snark.INFO); tr.lastRequestTime = System.currentTimeMillis(); - File fetched = _util.get(s); + // Don't wait for a response to stopped. + File fetched = _util.get(s, true, event.equals(STOPPED_EVENT) ? -1 : 0); if (fetched == null) { throw new IOException("Error fetching " + s); } @@ -556,6 +700,13 @@ public class TrackerClient extends I2PAppThread announce = a; isPrimary = p; interval = INITIAL_SLEEP; + } + + /** + * Call before restarting + * @since 0.9.1 + */ + public void reset() { lastRequestTime = 0; trackerProblems = null; stop = false; diff --git a/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java b/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java index 6ba8172be..c96385027 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java +++ b/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java @@ -743,8 +743,10 @@ public class I2PSnarkServlet extends DefaultServlet { List snarks = getSortedSnarks(req); for (int i = 0; i < snarks.size(); i++) { Snark snark = (Snark)snarks.get(i); - if (!snark.isStopped()) + if (!snark.isStopped()) { _manager.stopTorrent(snark, false); + try { Thread.sleep(50); } catch (InterruptedException ie) {} + } } if (_manager.util().connected()) { // Give the stopped announces time to get out diff --git a/history.txt b/history.txt index f1c8d2438..40d960fd3 100644 --- a/history.txt +++ b/history.txt @@ -4,6 +4,9 @@ - Sort magnets and downloads first - Fix sorting problem when torrent dir is a symlink - Reduce max file idle time + - Reduce TrackerClient threads + - Reduce delay between peer adds for faster startup + - Thread the announces and reduce timeout when stopping * NativeBigInteger: Workaround for Raspberry Pi to load the correct lib 2012-06-08 zzz diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index e36cd3b14..7f4edb975 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 12; + public final static long BUILD = 13; /** for example "-test" */ public final static String EXTRA = "";