From 5c0865836031a87aa0788afb7ff8df5bd922edca Mon Sep 17 00:00:00 2001 From: zzz Date: Thu, 1 May 2025 18:40:29 -0400 Subject: [PATCH] Thread I2CP lookups until we have a nonblocking I2CP lookup API. Most of the time, this won't be needed, as our cache will have it, but if additional announces come in over the lifetime, we'll have to do a I2CP lookup. Even then, the I2CP cache may have it, or the router should have the LS, so it should be quick. --- src/java/net/i2p/zzzot/Peer.java | 7 + src/java/net/i2p/zzzot/UDPHandler.java | 148 +++++++++++++++++--- src/java/net/i2p/zzzot/ZzzOTController.java | 2 +- 3 files changed, 137 insertions(+), 20 deletions(-) diff --git a/src/java/net/i2p/zzzot/Peer.java b/src/java/net/i2p/zzzot/Peer.java index b36a131..19b0b8a 100644 --- a/src/java/net/i2p/zzzot/Peer.java +++ b/src/java/net/i2p/zzzot/Peer.java @@ -36,6 +36,13 @@ public class Peer { hash = address.calculateHash(); } + /** + * @since 0.20.0 + */ + public Peer(byte[] id, Hash h) { + hash = h; + } + public void setLeft(long l) { bytesLeft = l; lastSeen = System.currentTimeMillis(); diff --git a/src/java/net/i2p/zzzot/UDPHandler.java b/src/java/net/i2p/zzzot/UDPHandler.java index 1e7a2c8..324def3 100644 --- a/src/java/net/i2p/zzzot/UDPHandler.java +++ b/src/java/net/i2p/zzzot/UDPHandler.java @@ -22,6 +22,12 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import net.i2p.I2PAppContext; @@ -57,6 +63,9 @@ public class UDPHandler implements I2PSessionMuxedListener { private final Map _destCache; private final AtomicInteger _announces = new AtomicInteger(); private volatile boolean _running; + private ThreadPoolExecutor _executor; + /** how long to wait before dropping an idle thread */ + private static final long HANDLER_KEEPALIVE_MS = 2*60*1000; // The listen port. public final int PORT; @@ -70,8 +79,9 @@ public class UDPHandler implements I2PSessionMuxedListener { private static final int EVENT_COMPLETED = 1; private static final int EVENT_STARTED = 2; private static final int EVENT_STOPPED = 3; - // keep it short, we should have the leaseset - private final long LOOKUP_TIMEOUT = 1000; + // keep it short, we should have the leaseset, + // if a new ratchet session was created + private final long LOOKUP_TIMEOUT = 2000; private final long CLEAN_TIME; private final long STAT_TIME = 2*60*1000; private static final byte[] INVALID = DataHelper.getUTF8("Invalid connection ID"); @@ -93,8 +103,10 @@ public class UDPHandler implements I2PSessionMuxedListener { _destCache = new LHMCache(1024); } - public void start() { + public synchronized void start() { _running = true; + _executor = new CustomThreadPoolExecutor(); + _executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); (new I2PAppThread(new Waiter(), "ZzzOT UDP startup", true)).start(); long[] r = new long[] { 5*60*1000 }; _context.statManager().createRequiredRateStat("plugin.zzzot.announces.udp", "UDP announces per minute", "Plugins", r); @@ -103,8 +115,11 @@ public class UDPHandler implements I2PSessionMuxedListener { /** * @since 0.20.0 */ - public void stop() { + public synchronized void stop() { _running = false; + _executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); + _executor.shutdownNow(); + _executor = null; _cleaner.cancel(); _context.statManager().removeRateStat("plugin.zzzot.announces.udp"); _announces.set(0); @@ -269,11 +284,6 @@ public class UDPHandler implements I2PSessionMuxedListener { return; } - // TODO use a waiter - Destination from = lookup(session, fromHash); - if (from == null) - return; - // parse packet byte[] bih = new byte[InfoHash.LENGTH]; System.arraycopy(data, 16, bih, 0, InfoHash.LENGTH); @@ -316,7 +326,7 @@ public class UDPHandler implements I2PSessionMuxedListener { } else { Peer p = peers.get(pid); if (p == null) { - p = new Peer(pid.getData(), from); + p = new Peer(pid.getData(), fromHash); Peer p2 = peers.putIfAbsent(pid, p); if (p2 != null) p = p2; @@ -360,6 +370,17 @@ public class UDPHandler implements I2PSessionMuxedListener { } } + Destination from = lookupCache(fromHash); + if (from == null) { + try { + _executor.execute(new Lookup(session, fromHash, fromPort, resp)); + } catch (RejectedExecutionException ree) { + if (_log.shouldWarn()) + _log.warn("error sending announce reply - thread pool full"); + } + return; + } + try { session.sendMessage(from, resp, I2PSession.PROTO_DATAGRAM_RAW, PORT, fromPort); if (_log.shouldDebug()) @@ -375,10 +396,13 @@ public class UDPHandler implements I2PSessionMuxedListener { * @param msg non-null */ private void sendError(I2PSession session, Hash toHash, int toPort, long transID, byte[] msg) { - // TODO use a waiter - Destination to = lookup(session, toHash); - if (to == null) + Destination to = lookupCache(toHash); + if (to == null) { + if (_log.shouldInfo()) + _log.info("don't have cached dest to send error to " + toHash.toBase32()); return; + } + // don't bother looking up via I2CP sendError(session, to, toPort, transID, msg); } @@ -406,16 +430,37 @@ public class UDPHandler implements I2PSessionMuxedListener { * @return null on failure */ private Destination lookup(I2PSession session, Hash hash) { - Destination rv; - synchronized(_destCache) { - rv = _destCache.get(hash); - } + Destination rv = lookupCache(hash); if (rv != null) return rv; - // TODO use a waiter + return lookupI2CP(session, hash); + } + + /** + * Nonblocking. + * @return null on failure + */ + private Destination lookupCache(Hash hash) { + // Test deferred + //if (true) return null; + synchronized(_destCache) { + return _destCache.get(hash); + } + } + + /** + * Blocking. + * @return null on failure + */ + private Destination lookupI2CP(I2PSession session, Hash hash) { + Destination rv; try { rv = session.lookupDest(hash, LOOKUP_TIMEOUT); - } catch (I2PSessionException ise) {} + } catch (I2PSessionException ise) { + if (_log.shouldWarn()) + _log.warn("lookup error", ise); + return null; + } if (rv == null) { if (_log.shouldWarn()) _log.warn("lookup failed for response to " + hash.toBase32()); @@ -458,4 +503,69 @@ public class UDPHandler implements I2PSessionMuxedListener { schedule(STAT_TIME); } } + + /** + * Until we have a nonblocking lookup API in I2CP + * + * @since 0.20.0 + */ + private class Lookup implements Runnable { + private final I2PSession _session; + private final Hash _hash; + private final int _port; + private final byte[] _msg; + + public Lookup(I2PSession sess, Hash h, int port, byte[] msg) { + _session = sess; + _hash = h; + _port = port; + _msg = msg; + } + + public void run() { + // blocking + Destination d = lookupI2CP(_session, _hash); + if (d == null) { + if (_log.shouldWarn()) + _log.warn("deferred lookup failed for " + _hash.toBase32()); + return; + } + try { + _session.sendMessage(d, _msg, I2PSession.PROTO_DATAGRAM_RAW, PORT, _port); + if (_log.shouldDebug()) + _log.debug("sent deferred reply to " + _hash.toBase32()); + } catch (I2PSessionException ise) { + if (_log.shouldWarn()) + _log.warn("error sending deferred reply", ise); + } + } + } + + /** + * Until we have a nonblocking lookup API in I2CP + * + * @since 0.20.0 + */ + private static class CustomThreadPoolExecutor extends ThreadPoolExecutor { + public CustomThreadPoolExecutor() { + super(0, 25, HANDLER_KEEPALIVE_MS, TimeUnit.MILLISECONDS, + new SynchronousQueue(), new CustomThreadFactory()); + } + } + + /** + * Just to set the name and set Daemon + * + * @since 0.20.0 + */ + private static class CustomThreadFactory implements ThreadFactory { + private final AtomicInteger _executorThreadCount = new AtomicInteger(); + + public Thread newThread(Runnable r) { + Thread rv = Executors.defaultThreadFactory().newThread(r); + rv.setName("ZzzOT lookup " + _executorThreadCount.incrementAndGet()); + rv.setDaemon(true); + return rv; + } + } } diff --git a/src/java/net/i2p/zzzot/ZzzOTController.java b/src/java/net/i2p/zzzot/ZzzOTController.java index bd3d736..cdcfc4a 100644 --- a/src/java/net/i2p/zzzot/ZzzOTController.java +++ b/src/java/net/i2p/zzzot/ZzzOTController.java @@ -82,7 +82,7 @@ public class ZzzOTController implements ClientApp { private static final String NAME = "ZzzOT"; private static final String DEFAULT_SITENAME = "ZZZOT"; private static final String PROP_SITENAME = "sitename"; - private static final String VERSION = "0.20.0-beta"; + private static final String VERSION = "0.20.0-beta2"; private static final String DEFAULT_SHOWFOOTER = "true"; private static final String PROP_SHOWFOOTER = "showfooter"; private static final String DEFAULT_FOOTERTEXT = "Running ZZZOT " + VERSION;