3 Commits

Author SHA1 Message Date
zzz
5c08658360 Thread I2CP lookups
until we have a nonblocking I2CP lookup API.
Most of the time, this won't be needed, as our cache
will have it, but if additional announces come in over
the lifetime, we'll have to do a I2CP lookup.
Even then, the I2CP cache may have it,
or the router should have the LS, so it should be quick.
2025-05-01 18:40:29 -04:00
zzz
02328bd5d4 Add UDP announce rate stat 2025-04-29 08:59:30 -04:00
zzz
6bc7821f1b Reduce clean interval
To reduce memory usage and get at least two stat updates per 5 minutes
2025-04-28 09:24:00 -04:00
6 changed files with 196 additions and 25 deletions

View File

@ -4,7 +4,11 @@
s/<Ref id=/<Ref refid=/g
- Add interval to stats page
- Add stats to I2P stats subsystem
- Show announce URLs on stats page
- Remove ElGamal support
- Remove support for non-compact announce replies
- Reduce memory usage
- Remove seedless support
2024-04-07 [0.19.0]
- Disable full scrape by default

View File

@ -36,6 +36,13 @@ public class Peer {
hash = address.calculateHash();
}
/**
* @since 0.20.0
*/
public Peer(byte[] id, Hash h) {
hash = h;
}
public void setLeft(long l) {
bytesLeft = l;
lastSeen = System.currentTimeMillis();

View File

@ -22,6 +22,13 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import net.i2p.I2PAppContext;
import net.i2p.client.I2PSession;
@ -51,9 +58,14 @@ public class UDPHandler implements I2PSessionMuxedListener {
private final Log _log;
private final I2PTunnel _tunnel;
private final ZzzOT _zzzot;
private final Cleaner _cleaner;
private final long sipk0, sipk1;
private final Map<Hash, Destination> _destCache;
private final AtomicInteger _announces = new AtomicInteger();
private volatile boolean _running;
private ThreadPoolExecutor _executor;
/** how long to wait before dropping an idle thread */
private static final long HANDLER_KEEPALIVE_MS = 2*60*1000;
// The listen port.
public final int PORT;
@ -67,9 +79,11 @@ public class UDPHandler implements I2PSessionMuxedListener {
private static final int EVENT_COMPLETED = 1;
private static final int EVENT_STARTED = 2;
private static final int EVENT_STOPPED = 3;
// keep it short, we should have the leaseset
private final long LOOKUP_TIMEOUT = 1000;
// keep it short, we should have the leaseset,
// if a new ratchet session was created
private final long LOOKUP_TIMEOUT = 2000;
private final long CLEAN_TIME;
private final long STAT_TIME = 2*60*1000;
private static final byte[] INVALID = DataHelper.getUTF8("Invalid connection ID");
private static final byte[] PROTOCOL = DataHelper.getUTF8("Bad protocol");
private static final byte[] SCRAPE = DataHelper.getUTF8("Scrape unsupported");
@ -81,6 +95,7 @@ public class UDPHandler implements I2PSessionMuxedListener {
_zzzot = zzzot;
CLEAN_TIME = (zzzot.getTorrents().getUDPLifetime() + 60) * 1000;
PORT = port;
_cleaner = new Cleaner();
sipk0 = ctx.random().nextLong();
sipk1 = ctx.random().nextLong();
// the highest-traffic zzzot is running about 3000 announces/minute,
@ -88,16 +103,26 @@ public class UDPHandler implements I2PSessionMuxedListener {
_destCache = new LHMCache<Hash, Destination>(1024);
}
public void start() {
public synchronized void start() {
_running = true;
_executor = new CustomThreadPoolExecutor();
_executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
(new I2PAppThread(new Waiter(), "ZzzOT UDP startup", true)).start();
long[] r = new long[] { 5*60*1000 };
_context.statManager().createRequiredRateStat("plugin.zzzot.announces.udp", "UDP announces per minute", "Plugins", r);
}
/**
* @since 0.20.0
*/
public void stop() {
public synchronized void stop() {
_running = false;
_executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
_executor.shutdownNow();
_executor = null;
_cleaner.cancel();
_context.statManager().removeRateStat("plugin.zzzot.announces.udp");
_announces.set(0);
}
private class Waiter implements Runnable {
@ -112,6 +137,7 @@ public class UDPHandler implements I2PSessionMuxedListener {
I2PSession session = sessions.get(0);
session.addMuxedSessionListener(UDPHandler.this, I2PSession.PROTO_DATAGRAM2, PORT);
session.addMuxedSessionListener(UDPHandler.this, I2PSession.PROTO_DATAGRAM3, PORT);
_cleaner.schedule(STAT_TIME);
if (_log.shouldInfo())
_log.info("got session");
break;
@ -154,6 +180,7 @@ public class UDPHandler implements I2PSessionMuxedListener {
public void reportAbuse(I2PSession arg0, int arg1) {}
public void disconnected(I2PSession arg0) {
_cleaner.cancel();
}
public void errorOccurred(I2PSession arg0, String arg1, Throwable arg2) {
@ -257,11 +284,6 @@ public class UDPHandler implements I2PSessionMuxedListener {
return;
}
// TODO use a waiter
Destination from = lookup(session, fromHash);
if (from == null)
return;
// parse packet
byte[] bih = new byte[InfoHash.LENGTH];
System.arraycopy(data, 16, bih, 0, InfoHash.LENGTH);
@ -286,6 +308,7 @@ public class UDPHandler implements I2PSessionMuxedListener {
Torrents torrents = _zzzot.getTorrents();
Peers peers = torrents.get(ih);
if (peers == null && event != EVENT_STOPPED) {
_announces.incrementAndGet();
peers = new Peers();
Peers p2 = torrents.putIfAbsent(ih, peers);
if (p2 != null)
@ -303,7 +326,7 @@ public class UDPHandler implements I2PSessionMuxedListener {
} else {
Peer p = peers.get(pid);
if (p == null) {
p = new Peer(pid.getData(), from);
p = new Peer(pid.getData(), fromHash);
Peer p2 = peers.putIfAbsent(pid, p);
if (p2 != null)
p = p2;
@ -347,6 +370,17 @@ public class UDPHandler implements I2PSessionMuxedListener {
}
}
Destination from = lookupCache(fromHash);
if (from == null) {
try {
_executor.execute(new Lookup(session, fromHash, fromPort, resp));
} catch (RejectedExecutionException ree) {
if (_log.shouldWarn())
_log.warn("error sending announce reply - thread pool full");
}
return;
}
try {
session.sendMessage(from, resp, I2PSession.PROTO_DATAGRAM_RAW, PORT, fromPort);
if (_log.shouldDebug())
@ -362,10 +396,13 @@ public class UDPHandler implements I2PSessionMuxedListener {
* @param msg non-null
*/
private void sendError(I2PSession session, Hash toHash, int toPort, long transID, byte[] msg) {
// TODO use a waiter
Destination to = lookup(session, toHash);
if (to == null)
Destination to = lookupCache(toHash);
if (to == null) {
if (_log.shouldInfo())
_log.info("don't have cached dest to send error to " + toHash.toBase32());
return;
}
// don't bother looking up via I2CP
sendError(session, to, toPort, transID, msg);
}
@ -393,16 +430,37 @@ public class UDPHandler implements I2PSessionMuxedListener {
* @return null on failure
*/
private Destination lookup(I2PSession session, Hash hash) {
Destination rv;
synchronized(_destCache) {
rv = _destCache.get(hash);
}
Destination rv = lookupCache(hash);
if (rv != null)
return rv;
// TODO use a waiter
return lookupI2CP(session, hash);
}
/**
* Nonblocking.
* @return null on failure
*/
private Destination lookupCache(Hash hash) {
// Test deferred
//if (true) return null;
synchronized(_destCache) {
return _destCache.get(hash);
}
}
/**
* Blocking.
* @return null on failure
*/
private Destination lookupI2CP(I2PSession session, Hash hash) {
Destination rv;
try {
rv = session.lookupDest(hash, LOOKUP_TIMEOUT);
} catch (I2PSessionException ise) {}
} catch (I2PSessionException ise) {
if (_log.shouldWarn())
_log.warn("lookup error", ise);
return null;
}
if (rv == null) {
if (_log.shouldWarn())
_log.warn("lookup failed for response to " + hash.toBase32());
@ -433,4 +491,81 @@ public class UDPHandler implements I2PSessionMuxedListener {
c = SipHashInline.hash24(sipk0, sipk1, buf);
return cid == c;
}
/**
* Update the announce stat and set the announce count to 0
*/
private class Cleaner extends SimpleTimer2.TimedEvent {
public Cleaner() { super(_context.simpleTimer2()); }
public void timeReached() {
long count = _announces.getAndSet(0);
_context.statManager().addRateData("plugin.zzzot.announces.udp", count / (STAT_TIME / (60*1000L)));
schedule(STAT_TIME);
}
}
/**
* Until we have a nonblocking lookup API in I2CP
*
* @since 0.20.0
*/
private class Lookup implements Runnable {
private final I2PSession _session;
private final Hash _hash;
private final int _port;
private final byte[] _msg;
public Lookup(I2PSession sess, Hash h, int port, byte[] msg) {
_session = sess;
_hash = h;
_port = port;
_msg = msg;
}
public void run() {
// blocking
Destination d = lookupI2CP(_session, _hash);
if (d == null) {
if (_log.shouldWarn())
_log.warn("deferred lookup failed for " + _hash.toBase32());
return;
}
try {
_session.sendMessage(d, _msg, I2PSession.PROTO_DATAGRAM_RAW, PORT, _port);
if (_log.shouldDebug())
_log.debug("sent deferred reply to " + _hash.toBase32());
} catch (I2PSessionException ise) {
if (_log.shouldWarn())
_log.warn("error sending deferred reply", ise);
}
}
}
/**
* Until we have a nonblocking lookup API in I2CP
*
* @since 0.20.0
*/
private static class CustomThreadPoolExecutor extends ThreadPoolExecutor {
public CustomThreadPoolExecutor() {
super(0, 25, HANDLER_KEEPALIVE_MS, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(), new CustomThreadFactory());
}
}
/**
* Just to set the name and set Daemon
*
* @since 0.20.0
*/
private static class CustomThreadFactory implements ThreadFactory {
private final AtomicInteger _executorThreadCount = new AtomicInteger();
public Thread newThread(Runnable r) {
Thread rv = Executors.defaultThreadFactory().newThread(r);
rv.setName("ZzzOT lookup " + _executorThreadCount.incrementAndGet());
rv.setDaemon(true);
return rv;
}
}
}

View File

@ -36,8 +36,7 @@ class ZzzOT {
private static final String PROP_INTERVAL = "interval";
private static final String PROP_UDP_LIFETIME = "lifetime";
private static final long CLEAN_TIME = 4*60*1000;
private static final long DEST_CACHE_CLEAN_TIME = 3*60*60*1000;
private static final long CLEAN_TIME = 2*60*1000;
private static final int DEFAULT_INTERVAL = 27*60;
private static final int DEFAULT_UDP_LIFETIME = 20*60;
private static final int MIN_INTERVAL = 15*60;
@ -81,7 +80,7 @@ class ZzzOT {
void start() {
_cleaner.forceReschedule(CLEAN_TIME);
long[] r = new long[] { 5*60*1000 };
_context.statManager().createRequiredRateStat("plugin.zzzot.announces", "Announces per minute", "Plugins", r);
_context.statManager().createRequiredRateStat("plugin.zzzot.announces", "Total announces per minute", "Plugins", r);
_context.statManager().createRequiredRateStat("plugin.zzzot.peers", "Number of peers", "Plugins", r);
_context.statManager().createRequiredRateStat("plugin.zzzot.torrents", "Number of torrents", "Plugins", r);
}

View File

@ -82,7 +82,7 @@ public class ZzzOTController implements ClientApp {
private static final String NAME = "ZzzOT";
private static final String DEFAULT_SITENAME = "ZZZOT";
private static final String PROP_SITENAME = "sitename";
private static final String VERSION = "0.20.0-beta";
private static final String VERSION = "0.20.0-beta2";
private static final String DEFAULT_SHOWFOOTER = "true";
private static final String PROP_SHOWFOOTER = "showfooter";
private static final String DEFAULT_FOOTERTEXT = "Running <a href=\"http://git.idk.i2p/i2p-hackers/i2p.plugins.zzzot\" target=\"_blank\">ZZZOT</a> " + VERSION;
@ -180,6 +180,20 @@ public class ZzzOTController implements ClientApp {
return r.getAvgOrLifetimeAvg();
}
/**
* @return announces per minute, 0 if not running or UDP not enabled
* @since 0.20.0
*/
public static double getUDPAnnounceRate() {
RateStat rs = I2PAppContext.getGlobalContext().statManager().getRate("plugin.zzzot.announces.udp");
if (rs == null)
return 0;
Rate r = rs.getRate(5*60*1000);
if (r == null)
return 0;
return r.getAvgOrLifetimeAvg();
}
/**
* @return false if not running
* @since 0.20.0

View File

@ -20,19 +20,31 @@
<p id="totals">
<b>Torrents:</b> <%=torrents.size()%><br>
<b>Peers:</b> <%=torrents.countPeers()%><br>
<b>Announce Rate:</b> <%=String.format(java.util.Locale.US, "%.1f", ZzzOTController.getAnnounceRate())%> / minute<br>
<%
boolean udp = ZzzOTController.isUDPEnabled();
if (udp) {
%>
<b>Total Announce Rate:</b>
<%
} else {
%>
<b>Announce Rate:</b>
<%
}
%>
<%=String.format(java.util.Locale.US, "%.1f", ZzzOTController.getAnnounceRate())%> / minute<br>
<b>Announce Interval:</b> <%=torrents.getInterval() / 60%> minutes<br>
<%
String host = ZzzOTController.b32();
if (host != null) {
%><b>Announce URL:</b> <a href="http://<%=host%>/a">http://<%=host%>/a</a><br><%
}
boolean udp = ZzzOTController.isUDPEnabled();
%>
<b>UDP Announce Support:</b> <%=udp ? "yes" : "no"%><br>
<%
if (udp) {
%>
<b>UDP Announce Rate:</b> <%=String.format(java.util.Locale.US, "%.1f", ZzzOTController.getUDPAnnounceRate())%> / minute<br>
<b>UDP Connection Lifetime:</b> <%=torrents.getUDPLifetime() / 60%> minutes<br>
<%
if (host != null) {