Compare commits
3 Commits
zzzot-0.20
...
zzzot-0.20
Author | SHA1 | Date | |
---|---|---|---|
5c08658360 | |||
02328bd5d4 | |||
6bc7821f1b |
@ -4,7 +4,11 @@
|
||||
s/<Ref id=/<Ref refid=/g
|
||||
- Add interval to stats page
|
||||
- Add stats to I2P stats subsystem
|
||||
- Show announce URLs on stats page
|
||||
- Remove ElGamal support
|
||||
- Remove support for non-compact announce replies
|
||||
- Reduce memory usage
|
||||
- Remove seedless support
|
||||
|
||||
2024-04-07 [0.19.0]
|
||||
- Disable full scrape by default
|
||||
|
@ -36,6 +36,13 @@ public class Peer {
|
||||
hash = address.calculateHash();
|
||||
}
|
||||
|
||||
/**
|
||||
* @since 0.20.0
|
||||
*/
|
||||
public Peer(byte[] id, Hash h) {
|
||||
hash = h;
|
||||
}
|
||||
|
||||
public void setLeft(long l) {
|
||||
bytesLeft = l;
|
||||
lastSeen = System.currentTimeMillis();
|
||||
|
@ -22,6 +22,13 @@ import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import net.i2p.I2PAppContext;
|
||||
import net.i2p.client.I2PSession;
|
||||
@ -51,9 +58,14 @@ public class UDPHandler implements I2PSessionMuxedListener {
|
||||
private final Log _log;
|
||||
private final I2PTunnel _tunnel;
|
||||
private final ZzzOT _zzzot;
|
||||
private final Cleaner _cleaner;
|
||||
private final long sipk0, sipk1;
|
||||
private final Map<Hash, Destination> _destCache;
|
||||
private final AtomicInteger _announces = new AtomicInteger();
|
||||
private volatile boolean _running;
|
||||
private ThreadPoolExecutor _executor;
|
||||
/** how long to wait before dropping an idle thread */
|
||||
private static final long HANDLER_KEEPALIVE_MS = 2*60*1000;
|
||||
|
||||
// The listen port.
|
||||
public final int PORT;
|
||||
@ -67,9 +79,11 @@ public class UDPHandler implements I2PSessionMuxedListener {
|
||||
private static final int EVENT_COMPLETED = 1;
|
||||
private static final int EVENT_STARTED = 2;
|
||||
private static final int EVENT_STOPPED = 3;
|
||||
// keep it short, we should have the leaseset
|
||||
private final long LOOKUP_TIMEOUT = 1000;
|
||||
// keep it short, we should have the leaseset,
|
||||
// if a new ratchet session was created
|
||||
private final long LOOKUP_TIMEOUT = 2000;
|
||||
private final long CLEAN_TIME;
|
||||
private final long STAT_TIME = 2*60*1000;
|
||||
private static final byte[] INVALID = DataHelper.getUTF8("Invalid connection ID");
|
||||
private static final byte[] PROTOCOL = DataHelper.getUTF8("Bad protocol");
|
||||
private static final byte[] SCRAPE = DataHelper.getUTF8("Scrape unsupported");
|
||||
@ -81,6 +95,7 @@ public class UDPHandler implements I2PSessionMuxedListener {
|
||||
_zzzot = zzzot;
|
||||
CLEAN_TIME = (zzzot.getTorrents().getUDPLifetime() + 60) * 1000;
|
||||
PORT = port;
|
||||
_cleaner = new Cleaner();
|
||||
sipk0 = ctx.random().nextLong();
|
||||
sipk1 = ctx.random().nextLong();
|
||||
// the highest-traffic zzzot is running about 3000 announces/minute,
|
||||
@ -88,16 +103,26 @@ public class UDPHandler implements I2PSessionMuxedListener {
|
||||
_destCache = new LHMCache<Hash, Destination>(1024);
|
||||
}
|
||||
|
||||
public void start() {
|
||||
public synchronized void start() {
|
||||
_running = true;
|
||||
_executor = new CustomThreadPoolExecutor();
|
||||
_executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
|
||||
(new I2PAppThread(new Waiter(), "ZzzOT UDP startup", true)).start();
|
||||
long[] r = new long[] { 5*60*1000 };
|
||||
_context.statManager().createRequiredRateStat("plugin.zzzot.announces.udp", "UDP announces per minute", "Plugins", r);
|
||||
}
|
||||
|
||||
/**
|
||||
* @since 0.20.0
|
||||
*/
|
||||
public void stop() {
|
||||
public synchronized void stop() {
|
||||
_running = false;
|
||||
_executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
|
||||
_executor.shutdownNow();
|
||||
_executor = null;
|
||||
_cleaner.cancel();
|
||||
_context.statManager().removeRateStat("plugin.zzzot.announces.udp");
|
||||
_announces.set(0);
|
||||
}
|
||||
|
||||
private class Waiter implements Runnable {
|
||||
@ -112,6 +137,7 @@ public class UDPHandler implements I2PSessionMuxedListener {
|
||||
I2PSession session = sessions.get(0);
|
||||
session.addMuxedSessionListener(UDPHandler.this, I2PSession.PROTO_DATAGRAM2, PORT);
|
||||
session.addMuxedSessionListener(UDPHandler.this, I2PSession.PROTO_DATAGRAM3, PORT);
|
||||
_cleaner.schedule(STAT_TIME);
|
||||
if (_log.shouldInfo())
|
||||
_log.info("got session");
|
||||
break;
|
||||
@ -154,6 +180,7 @@ public class UDPHandler implements I2PSessionMuxedListener {
|
||||
public void reportAbuse(I2PSession arg0, int arg1) {}
|
||||
|
||||
public void disconnected(I2PSession arg0) {
|
||||
_cleaner.cancel();
|
||||
}
|
||||
|
||||
public void errorOccurred(I2PSession arg0, String arg1, Throwable arg2) {
|
||||
@ -257,11 +284,6 @@ public class UDPHandler implements I2PSessionMuxedListener {
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO use a waiter
|
||||
Destination from = lookup(session, fromHash);
|
||||
if (from == null)
|
||||
return;
|
||||
|
||||
// parse packet
|
||||
byte[] bih = new byte[InfoHash.LENGTH];
|
||||
System.arraycopy(data, 16, bih, 0, InfoHash.LENGTH);
|
||||
@ -286,6 +308,7 @@ public class UDPHandler implements I2PSessionMuxedListener {
|
||||
Torrents torrents = _zzzot.getTorrents();
|
||||
Peers peers = torrents.get(ih);
|
||||
if (peers == null && event != EVENT_STOPPED) {
|
||||
_announces.incrementAndGet();
|
||||
peers = new Peers();
|
||||
Peers p2 = torrents.putIfAbsent(ih, peers);
|
||||
if (p2 != null)
|
||||
@ -303,7 +326,7 @@ public class UDPHandler implements I2PSessionMuxedListener {
|
||||
} else {
|
||||
Peer p = peers.get(pid);
|
||||
if (p == null) {
|
||||
p = new Peer(pid.getData(), from);
|
||||
p = new Peer(pid.getData(), fromHash);
|
||||
Peer p2 = peers.putIfAbsent(pid, p);
|
||||
if (p2 != null)
|
||||
p = p2;
|
||||
@ -347,6 +370,17 @@ public class UDPHandler implements I2PSessionMuxedListener {
|
||||
}
|
||||
}
|
||||
|
||||
Destination from = lookupCache(fromHash);
|
||||
if (from == null) {
|
||||
try {
|
||||
_executor.execute(new Lookup(session, fromHash, fromPort, resp));
|
||||
} catch (RejectedExecutionException ree) {
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("error sending announce reply - thread pool full");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
session.sendMessage(from, resp, I2PSession.PROTO_DATAGRAM_RAW, PORT, fromPort);
|
||||
if (_log.shouldDebug())
|
||||
@ -362,10 +396,13 @@ public class UDPHandler implements I2PSessionMuxedListener {
|
||||
* @param msg non-null
|
||||
*/
|
||||
private void sendError(I2PSession session, Hash toHash, int toPort, long transID, byte[] msg) {
|
||||
// TODO use a waiter
|
||||
Destination to = lookup(session, toHash);
|
||||
if (to == null)
|
||||
Destination to = lookupCache(toHash);
|
||||
if (to == null) {
|
||||
if (_log.shouldInfo())
|
||||
_log.info("don't have cached dest to send error to " + toHash.toBase32());
|
||||
return;
|
||||
}
|
||||
// don't bother looking up via I2CP
|
||||
sendError(session, to, toPort, transID, msg);
|
||||
}
|
||||
|
||||
@ -393,16 +430,37 @@ public class UDPHandler implements I2PSessionMuxedListener {
|
||||
* @return null on failure
|
||||
*/
|
||||
private Destination lookup(I2PSession session, Hash hash) {
|
||||
Destination rv;
|
||||
synchronized(_destCache) {
|
||||
rv = _destCache.get(hash);
|
||||
}
|
||||
Destination rv = lookupCache(hash);
|
||||
if (rv != null)
|
||||
return rv;
|
||||
// TODO use a waiter
|
||||
return lookupI2CP(session, hash);
|
||||
}
|
||||
|
||||
/**
|
||||
* Nonblocking.
|
||||
* @return null on failure
|
||||
*/
|
||||
private Destination lookupCache(Hash hash) {
|
||||
// Test deferred
|
||||
//if (true) return null;
|
||||
synchronized(_destCache) {
|
||||
return _destCache.get(hash);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Blocking.
|
||||
* @return null on failure
|
||||
*/
|
||||
private Destination lookupI2CP(I2PSession session, Hash hash) {
|
||||
Destination rv;
|
||||
try {
|
||||
rv = session.lookupDest(hash, LOOKUP_TIMEOUT);
|
||||
} catch (I2PSessionException ise) {}
|
||||
} catch (I2PSessionException ise) {
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("lookup error", ise);
|
||||
return null;
|
||||
}
|
||||
if (rv == null) {
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("lookup failed for response to " + hash.toBase32());
|
||||
@ -433,4 +491,81 @@ public class UDPHandler implements I2PSessionMuxedListener {
|
||||
c = SipHashInline.hash24(sipk0, sipk1, buf);
|
||||
return cid == c;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the announce stat and set the announce count to 0
|
||||
*/
|
||||
private class Cleaner extends SimpleTimer2.TimedEvent {
|
||||
public Cleaner() { super(_context.simpleTimer2()); }
|
||||
public void timeReached() {
|
||||
long count = _announces.getAndSet(0);
|
||||
_context.statManager().addRateData("plugin.zzzot.announces.udp", count / (STAT_TIME / (60*1000L)));
|
||||
schedule(STAT_TIME);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Until we have a nonblocking lookup API in I2CP
|
||||
*
|
||||
* @since 0.20.0
|
||||
*/
|
||||
private class Lookup implements Runnable {
|
||||
private final I2PSession _session;
|
||||
private final Hash _hash;
|
||||
private final int _port;
|
||||
private final byte[] _msg;
|
||||
|
||||
public Lookup(I2PSession sess, Hash h, int port, byte[] msg) {
|
||||
_session = sess;
|
||||
_hash = h;
|
||||
_port = port;
|
||||
_msg = msg;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
// blocking
|
||||
Destination d = lookupI2CP(_session, _hash);
|
||||
if (d == null) {
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("deferred lookup failed for " + _hash.toBase32());
|
||||
return;
|
||||
}
|
||||
try {
|
||||
_session.sendMessage(d, _msg, I2PSession.PROTO_DATAGRAM_RAW, PORT, _port);
|
||||
if (_log.shouldDebug())
|
||||
_log.debug("sent deferred reply to " + _hash.toBase32());
|
||||
} catch (I2PSessionException ise) {
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("error sending deferred reply", ise);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Until we have a nonblocking lookup API in I2CP
|
||||
*
|
||||
* @since 0.20.0
|
||||
*/
|
||||
private static class CustomThreadPoolExecutor extends ThreadPoolExecutor {
|
||||
public CustomThreadPoolExecutor() {
|
||||
super(0, 25, HANDLER_KEEPALIVE_MS, TimeUnit.MILLISECONDS,
|
||||
new SynchronousQueue<Runnable>(), new CustomThreadFactory());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Just to set the name and set Daemon
|
||||
*
|
||||
* @since 0.20.0
|
||||
*/
|
||||
private static class CustomThreadFactory implements ThreadFactory {
|
||||
private final AtomicInteger _executorThreadCount = new AtomicInteger();
|
||||
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread rv = Executors.defaultThreadFactory().newThread(r);
|
||||
rv.setName("ZzzOT lookup " + _executorThreadCount.incrementAndGet());
|
||||
rv.setDaemon(true);
|
||||
return rv;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -36,8 +36,7 @@ class ZzzOT {
|
||||
|
||||
private static final String PROP_INTERVAL = "interval";
|
||||
private static final String PROP_UDP_LIFETIME = "lifetime";
|
||||
private static final long CLEAN_TIME = 4*60*1000;
|
||||
private static final long DEST_CACHE_CLEAN_TIME = 3*60*60*1000;
|
||||
private static final long CLEAN_TIME = 2*60*1000;
|
||||
private static final int DEFAULT_INTERVAL = 27*60;
|
||||
private static final int DEFAULT_UDP_LIFETIME = 20*60;
|
||||
private static final int MIN_INTERVAL = 15*60;
|
||||
@ -81,7 +80,7 @@ class ZzzOT {
|
||||
void start() {
|
||||
_cleaner.forceReschedule(CLEAN_TIME);
|
||||
long[] r = new long[] { 5*60*1000 };
|
||||
_context.statManager().createRequiredRateStat("plugin.zzzot.announces", "Announces per minute", "Plugins", r);
|
||||
_context.statManager().createRequiredRateStat("plugin.zzzot.announces", "Total announces per minute", "Plugins", r);
|
||||
_context.statManager().createRequiredRateStat("plugin.zzzot.peers", "Number of peers", "Plugins", r);
|
||||
_context.statManager().createRequiredRateStat("plugin.zzzot.torrents", "Number of torrents", "Plugins", r);
|
||||
}
|
||||
|
@ -82,7 +82,7 @@ public class ZzzOTController implements ClientApp {
|
||||
private static final String NAME = "ZzzOT";
|
||||
private static final String DEFAULT_SITENAME = "ZZZOT";
|
||||
private static final String PROP_SITENAME = "sitename";
|
||||
private static final String VERSION = "0.20.0-beta";
|
||||
private static final String VERSION = "0.20.0-beta2";
|
||||
private static final String DEFAULT_SHOWFOOTER = "true";
|
||||
private static final String PROP_SHOWFOOTER = "showfooter";
|
||||
private static final String DEFAULT_FOOTERTEXT = "Running <a href=\"http://git.idk.i2p/i2p-hackers/i2p.plugins.zzzot\" target=\"_blank\">ZZZOT</a> " + VERSION;
|
||||
@ -180,6 +180,20 @@ public class ZzzOTController implements ClientApp {
|
||||
return r.getAvgOrLifetimeAvg();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return announces per minute, 0 if not running or UDP not enabled
|
||||
* @since 0.20.0
|
||||
*/
|
||||
public static double getUDPAnnounceRate() {
|
||||
RateStat rs = I2PAppContext.getGlobalContext().statManager().getRate("plugin.zzzot.announces.udp");
|
||||
if (rs == null)
|
||||
return 0;
|
||||
Rate r = rs.getRate(5*60*1000);
|
||||
if (r == null)
|
||||
return 0;
|
||||
return r.getAvgOrLifetimeAvg();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return false if not running
|
||||
* @since 0.20.0
|
||||
|
@ -20,19 +20,31 @@
|
||||
<p id="totals">
|
||||
<b>Torrents:</b> <%=torrents.size()%><br>
|
||||
<b>Peers:</b> <%=torrents.countPeers()%><br>
|
||||
<b>Announce Rate:</b> <%=String.format(java.util.Locale.US, "%.1f", ZzzOTController.getAnnounceRate())%> / minute<br>
|
||||
<%
|
||||
boolean udp = ZzzOTController.isUDPEnabled();
|
||||
if (udp) {
|
||||
%>
|
||||
<b>Total Announce Rate:</b>
|
||||
<%
|
||||
} else {
|
||||
%>
|
||||
<b>Announce Rate:</b>
|
||||
<%
|
||||
}
|
||||
%>
|
||||
<%=String.format(java.util.Locale.US, "%.1f", ZzzOTController.getAnnounceRate())%> / minute<br>
|
||||
<b>Announce Interval:</b> <%=torrents.getInterval() / 60%> minutes<br>
|
||||
<%
|
||||
String host = ZzzOTController.b32();
|
||||
if (host != null) {
|
||||
%><b>Announce URL:</b> <a href="http://<%=host%>/a">http://<%=host%>/a</a><br><%
|
||||
}
|
||||
boolean udp = ZzzOTController.isUDPEnabled();
|
||||
%>
|
||||
<b>UDP Announce Support:</b> <%=udp ? "yes" : "no"%><br>
|
||||
<%
|
||||
if (udp) {
|
||||
%>
|
||||
<b>UDP Announce Rate:</b> <%=String.format(java.util.Locale.US, "%.1f", ZzzOTController.getUDPAnnounceRate())%> / minute<br>
|
||||
<b>UDP Connection Lifetime:</b> <%=torrents.getUDPLifetime() / 60%> minutes<br>
|
||||
<%
|
||||
if (host != null) {
|
||||
|
Reference in New Issue
Block a user