Merge branch 'prop168' into 'master'

Tunnels: Implement proposal 168 - Part 1

See merge request i2p-hackers/i2p.i2p!235
This commit is contained in:
zzz
2025-04-16 10:24:19 +00:00
8 changed files with 254 additions and 24 deletions

View File

@ -345,7 +345,16 @@ public class RouterThrottleImpl implements RouterThrottle {
return TUNNEL_ACCEPT;
}
static final int DEFAULT_MESSAGES_PER_TUNNEL_ESTIMATE = 40; // .067KBps
/**
* This is the estimated number of 1 KB tunnel messages that we'll see in
* the 10 minute lifetime of an exploratory tunnel. We use it as a baseline
* minimum for estimating tunnel bandwidth, if accepted.
*
* 40 KB in 10 minutes equals 67 Bps.
*
* @since public since 0.9.66, was package private
*/
public static final int DEFAULT_MESSAGES_PER_TUNNEL_ESTIMATE = 40; // .067KBps
/** also limited to 90% - see below */
private static final int MIN_AVAILABLE_BPS = 4*1024; // always leave at least 4KBps free when allowing
private static final String LIMIT_STR = _x("Rejecting tunnels: Bandwidth limit");

View File

@ -30,6 +30,7 @@ public class HopConfig {
private int _oldMessagesProcessed;
//private int _messagesSent;
//private int _oldMessagesSent;
private int _allocatedBW;
public HopConfig() {
_creation = -1;
@ -156,6 +157,22 @@ public class HopConfig {
return rv;
}
/**
* @return Bps
* @since 0.9.66
*/
public int getAllocatedBW() {
return _allocatedBW;
}
/**
* @param bw Bps
* @since 0.9.66
*/
public void setAllocatedBW(int bw) {
_allocatedBW = bw;
}
/**
* Take note of a message being pumped through this tunnel.
* "processed" is for incoming and "sent" is for outgoing (could be dropped in between)

View File

@ -6,6 +6,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
@ -91,6 +92,7 @@ public class TunnelDispatcher implements Service {
//private long _lastDropTime;
private final TunnelGatewayPumper _pumper;
private final Object _joinParticipantLock = new Object();
private final AtomicInteger _allocatedBW = new AtomicInteger();
/** for shouldDropParticipatingMessage() */
enum Location {OBEP, PARTICIPANT, IBGW}
@ -326,6 +328,7 @@ public class TunnelDispatcher implements Service {
if (cfg.getExpiration() > _lastParticipatingExpiration)
_lastParticipatingExpiration = cfg.getExpiration();
_leaveJob.add(cfg);
_allocatedBW.addAndGet(cfg.getAllocatedBW());
return true;
}
@ -353,6 +356,7 @@ public class TunnelDispatcher implements Service {
if (cfg.getExpiration() > _lastParticipatingExpiration)
_lastParticipatingExpiration = cfg.getExpiration();
_leaveJob.add(cfg);
_allocatedBW.addAndGet(cfg.getAllocatedBW());
return true;
}
@ -384,9 +388,17 @@ public class TunnelDispatcher implements Service {
if (cfg.getExpiration() > _lastParticipatingExpiration)
_lastParticipatingExpiration = cfg.getExpiration();
_leaveJob.add(cfg);
_allocatedBW.addAndGet(cfg.getAllocatedBW());
return true;
}
/**
* @since 0.9.66
*/
public int getAllocatedBW() {
return _allocatedBW.get();
}
public int getParticipatingCount() {
return _participatingConfig.size();
}
@ -958,6 +970,7 @@ public class TunnelDispatcher implements Service {
if (_log.shouldLog(Log.INFO))
_log.info("Expiring " + cur);
remove(cur);
_allocatedBW.addAndGet(0 - cur.getAllocatedBW());
} else {
if (exp < nextTime)
nextTime = exp;

View File

@ -30,12 +30,15 @@ import net.i2p.router.Job;
import net.i2p.router.JobImpl;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.router.RouterThrottleImpl;
import net.i2p.router.networkdb.kademlia.MessageWrapper;
import net.i2p.router.peermanager.TunnelHistory;
import net.i2p.router.tunnel.HopConfig;
import net.i2p.router.tunnel.TunnelDispatcher;
import static net.i2p.router.tunnel.pool.BuildExecutor.Result.*;
import net.i2p.router.util.CDQEntry;
import net.i2p.stat.Rate;
import net.i2p.stat.RateStat;
import net.i2p.util.Log;
/**
@ -102,6 +105,14 @@ class BuildHandler implements Runnable {
private static final long JOB_LAG_LIMIT_TUNNEL = 350;
/**
* This is the baseline minimum for estimating tunnel bandwidth, if accepted.
* We use an estimate of 40 messages (1 KB each) in 10 minutes.
*
* 40 KB in 10 minutes equals 67 Bps.
*/
private static final int DEFAULT_BW_PER_TUNNEL_ESTIMATE = RouterThrottleImpl.DEFAULT_MESSAGES_PER_TUNNEL_ESTIMATE * 1024 / (10*60);
public BuildHandler(RouterContext ctx, TunnelPoolManager manager, BuildExecutor exec) {
_context = ctx;
_log = ctx.logManager().getLog(getClass());
@ -311,7 +322,7 @@ class BuildHandler implements Runnable {
_log.info(msg.getUniqueId() + ": Handling the reply after " + rtt + ", delayed " + delay + " waiting for " + cfg);
List<Integer> order = cfg.getReplyOrder();
int statuses[] = _buildReplyHandler.decrypt(msg, cfg, order);
BuildReplyHandler.Result statuses[] = _buildReplyHandler.decrypt(msg, cfg, order);
if (statuses != null) {
boolean allAgree = true;
// For each peer in the tunnel
@ -331,7 +342,7 @@ class BuildHandler implements Runnable {
return;
}
int howBad = statuses[record];
int howBad = statuses[record].code;
// Look up routerInfo
RouterInfo ri = _context.netDb().lookupRouterInfoLocally(peer);
@ -351,6 +362,15 @@ class BuildHandler implements Runnable {
if (howBad == 0) {
// w3wt
_context.profileManager().tunnelJoined(peer, rtt);
Properties props = statuses[record].props;
if (props != null) {
String avail = props.getProperty(BuildRequestor.PROP_AVAIL_BW);
if (avail != null) {
if (_log.shouldWarn())
_log.warn(msg.getUniqueId() + ": peer replied available: " + avail + "KBps");
// TODO
}
}
} else {
allAgree = false;
switch (howBad) {
@ -901,6 +921,60 @@ class BuildHandler implements Runnable {
}
}
// BW params
int avail = 0;
if (response == 0) {
Properties props = req.readOptions();
if (props != null) {
int min = 0;
int rqu = 0;
String smin = props.getProperty(BuildRequestor.PROP_MIN_BW);
if (smin != null) {
try {
min = 1000 * Integer.parseInt(smin);
} catch (NumberFormatException nfe) {
response = TunnelHistory.TUNNEL_REJECT_BANDWIDTH;
}
}
String sreq = props.getProperty(BuildRequestor.PROP_REQ_BW);
if (sreq != null) {
try {
rqu = 1000 * Integer.parseInt(sreq);
} catch (NumberFormatException nfe) {
response = TunnelHistory.TUNNEL_REJECT_BANDWIDTH;
}
}
if ((min > 0 || rqu > 0) && response == 0) {
int share = 1000 * TunnelDispatcher.getShareBandwidth(_context);
int max = share / 20;
if (min > max) {
response = TunnelHistory.TUNNEL_REJECT_BANDWIDTH;
} else {
RateStat stat = _context.statManager().getRate("tunnel.participatingBandwidth");
if (stat != null) {
Rate rate = stat.getRate(10*60*1000);
if (rate != null) {
int used = (int) rate.getAvgOrLifetimeAvg();
avail = Math.min(max, (share - used) / 4);
if (min > avail) {
if (_log.shouldWarn())
_log.warn("REJECT Part tunnel: min: " + min + " req: " + rqu + " avail: " + avail);
response = TunnelHistory.TUNNEL_REJECT_BANDWIDTH;
} else {
if (_log.shouldWarn())
_log.warn("ACCEPT Part tunnel: min: " + min + " req: " + rqu + " avail: " + avail);
if (min > 0 && rqu > 4 * min)
rqu = 4 * min;
if (rqu > 0 && rqu < avail)
avail = rqu;
}
}
}
}
}
}
}
HopConfig cfg = null;
if (response == 0) {
cfg = new HopConfig();
@ -928,6 +1002,10 @@ class BuildHandler implements Runnable {
cfg.setSendTo(nextPeer);
cfg.setSendTunnelId(nextId);
}
if (avail > 0)
cfg.setAllocatedBW(avail);
else
cfg.setAllocatedBW(DEFAULT_BW_PER_TUNNEL_ESTIMATE);
// now "actually" join
boolean success;
@ -996,8 +1074,13 @@ class BuildHandler implements Runnable {
}
EncryptedBuildRecord reply;
if (isEC) {
// TODO options
Properties props = EmptyProperties.INSTANCE;
Properties props;
if (avail > 0) {
props = new Properties();
props.setProperty(BuildRequestor.PROP_AVAIL_BW, Integer.toString(avail / 1000));
} else {
props = EmptyProperties.INSTANCE;
}
if (state.msg.getType() == ShortTunnelBuildMessage.MESSAGE_TYPE) {
reply = BuildResponseRecord.createShort(_context, response, req.getChaChaReplyKey(), req.getChaChaReplyAD(), props, ourSlot);
} else {

View File

@ -1,10 +1,12 @@
package net.i2p.router.tunnel.pool;
import java.util.List;
import java.util.Properties;
import net.i2p.I2PAppContext;
import net.i2p.crypto.ChaCha20;
import net.i2p.data.Base64;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.SessionKey;
@ -14,6 +16,7 @@ import net.i2p.data.i2np.OutboundTunnelBuildReplyMessage;
import net.i2p.data.i2np.ShortEncryptedBuildRecord;
import net.i2p.data.i2np.ShortTunnelBuildReplyMessage;
import net.i2p.data.i2np.TunnelBuildReplyMessage;
import net.i2p.router.peermanager.TunnelHistory;
import net.i2p.router.tunnel.TunnelCreatorConfig;
import net.i2p.util.Log;
import net.i2p.util.SimpleByteCache;
@ -29,6 +32,11 @@ class BuildReplyHandler {
private final I2PAppContext ctx;
private final Log log;
// cached results
private static final Result RESULT_NG = new Result(-1, null);
private static final Result RESULT_OK = new Result(0, null);
private static final Result RESULT_BW = new Result(TunnelHistory.TUNNEL_REJECT_BANDWIDTH, null);
/**
* @since 0.9.8 (methods were static before)
*/
@ -37,6 +45,40 @@ class BuildReplyHandler {
log = ctx.logManager().getLog(BuildReplyHandler.class);
}
/**
* This contains the result of decrypting the build request
* or reply record, including the decrypted properties field,
* if present.
*
* For requests, the code is 0 if the decrypt was successful
* or -1 if the record was not found or the decrypt failed.
*
* For replies, the code is from the reply record, 0-255.
* The code is usually 0 for ACCEPT or 30 for REJECT_BANDWIDTH.
* The code is -1 if the record was not found or the decrypt failed.
*
* The Properties is usually null, but if non-null, will contain
* bandwidth request/reply options as specified in proposal 168,
* or other options to be defined later.
*
* If the code is -1, or it's a non-ECIES build record,
* or the properties field in the record was empty,
* the returned properties here will be null.
*
* @since 0.9.66
*/
public static class Result {
public final int code;
public final Properties props;
/**
* @param c 0-255 or -1
* @param p may be null
*/
public Result(int c, Properties p) {
code = c; props = p;
}
}
/**
* Decrypt the tunnel build reply records. This overwrites the contents of the reply.
* Thread safe (no state).
@ -45,15 +87,15 @@ class BuildReplyHandler {
* Do not call this more than once for a given message.
*
* @return status for the records (in record order), or null if the replies were not valid. Fake records
* always have 0 as their value
* always have 0 as their value. If the array is non-null, all entries in the array are non-null.
*/
public int[] decrypt(TunnelBuildReplyMessage reply, TunnelCreatorConfig cfg, List<Integer> recordOrder) {
public Result[] decrypt(TunnelBuildReplyMessage reply, TunnelCreatorConfig cfg, List<Integer> recordOrder) {
if (reply.getRecordCount() != recordOrder.size()) {
// somebody messed with us
log.error("Corrupted build reply, expected " + recordOrder.size() + " records, got " + reply.getRecordCount());
return null;
}
int rv[] = new int[reply.getRecordCount()];
Result rv[] = new Result[reply.getRecordCount()];
for (int i = 0; i < rv.length; i++) {
int hop = recordOrder.get(i).intValue();
if (BuildMessageGenerator.isBlank(cfg, hop)) {
@ -67,7 +109,7 @@ class BuildReplyHandler {
// get stored hash put here by BuildMessageGenerator
Hash h2 = cfg.getBlankHash();
if (h2 != null && DataHelper.eq(h1, h2.getData())) {
rv[i] = 0;
rv[i] = RESULT_OK;
} else {
if (log.shouldWarn())
log.warn("IBEP record corrupt on " + cfg);
@ -75,19 +117,19 @@ class BuildReplyHandler {
return null;
}
} else {
rv[i] = 0;
rv[i] = RESULT_OK;
}
} else {
int ok = decryptRecord(reply, cfg, i, hop);
if (ok == -1) {
Result res = decryptRecord(reply, cfg, i, hop);
if (res.code == -1) {
if (log.shouldLog(Log.WARN))
log.warn(reply.getUniqueId() + ": decrypt record " + i + "/" + hop + " fail: " + cfg);
return null;
} else {
if (log.shouldLog(Log.DEBUG))
log.debug(reply.getUniqueId() + ": decrypt record " + i + "/" + hop + " success: " + ok + " for " + cfg);
log.debug(reply.getUniqueId() + ": decrypt record " + i + "/" + hop + " code: " + res.code + " for " + cfg);
}
rv[i] = ok;
rv[i] = res;
}
}
return rv;
@ -102,13 +144,13 @@ class BuildReplyHandler {
*
* @return the status 0-255, or -1 on decrypt failure
*/
private int decryptRecord(TunnelBuildReplyMessage reply, TunnelCreatorConfig cfg, int recordNum, int hop) {
private Result decryptRecord(TunnelBuildReplyMessage reply, TunnelCreatorConfig cfg, int recordNum, int hop) {
EncryptedBuildRecord rec = reply.getRecord(recordNum);
int type = reply.getType();
if (rec == null) {
if (log.shouldWarn())
log.warn("Missing record " + recordNum);
return -1;
return RESULT_NG;
}
byte[] data = rec.getData();
int start = cfg.getLength() - 1;
@ -152,6 +194,7 @@ class BuildReplyHandler {
// ok, all of the layered encryption is stripped, so lets verify it
// (formatted per BuildResponseRecord.create)
int rv;
Properties props = null;
if (isEC) {
// For last iteration, do ChaCha instead
SessionKey replyKey = cfg.getChaChaReplyKey(hop);
@ -167,11 +210,26 @@ class BuildReplyHandler {
if (!ok) {
if (log.shouldWarn())
log.warn(reply.getUniqueId() + ": chacha reply decrypt fail on " + recordNum + "/" + hop);
return -1;
return RESULT_NG;
}
// reply properties TODO
// this handles both standard records in a build reply message and short records in a OTBRM
rv = data[rec.length() - 17] & 0xff;
// reply properties
if (rv == 0) {
// The mapping is at the beginning of the record
// check to see if non-empty before parsing
if (data[0] != 0 || data[1] != 0) {
props = new Properties();
try {
DataHelper.fromProperties(data, 0, props);
} catch (DataFormatException dfe) {
if (log.shouldWarn())
log.warn(reply.getUniqueId() + ": error reading properties", dfe);
props = null;
}
}
}
} else {
// don't cache the result
//Hash h = ctx.sha().calculateHash(data, off + Hash.HASH_LENGTH, TunnelBuildReplyMessage.RECORD_SIZE-Hash.HASH_LENGTH);
@ -184,13 +242,20 @@ class BuildReplyHandler {
Base64.encode(data, 0, Hash.HASH_LENGTH) + " expected\n" +
"Record: " + Base64.encode(data, Hash.HASH_LENGTH, TunnelBuildReplyMessage.RECORD_SIZE-Hash.HASH_LENGTH));
SimpleByteCache.release(h);
return -1;
return RESULT_NG;
}
SimpleByteCache.release(h);
rv = data[TunnelBuildReplyMessage.RECORD_SIZE - 1] & 0xff;
}
if (log.shouldLog(Log.DEBUG))
log.debug(reply.getUniqueId() + ": Verified: " + rv + " for record " + recordNum + "/" + hop);
return rv;
if (props == null) {
// return cached
if (rv == 0)
return RESULT_OK;
if (rv == TunnelHistory.TUNNEL_REJECT_BANDWIDTH)
return RESULT_BW;
}
return new Result(rv, props);
}
}

View File

@ -3,6 +3,7 @@ package net.i2p.router.tunnel.pool;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.StringTokenizer;
import net.i2p.crypto.EncType;
@ -479,6 +480,22 @@ abstract class BuildRequestor {
if (log.shouldLog(Log.DEBUG))
log.debug("Build order: " + order + " for " + cfg);
// BW properties
Properties props;
int bw = 0;
int variance = 0;
if (!useShortTBM || pool.getSettings().isExploratory()) {
props = EmptyProperties.INSTANCE;
} else {
bw = pool.getAvgBWPerTunnel();
if (bw > 7000) {
props = new Properties();
variance = 4 * bw / 10;
} else {
props = EmptyProperties.INSTANCE;
}
}
for (int i = 0; i < msg.getRecordCount(); i++) {
int hop = order.get(i).intValue();
@ -504,9 +521,20 @@ abstract class BuildRequestor {
else
log.debug(cfg.getReplyMessageId() + ": record " + i + "/" + hop + " empty");
}
// BW properties TODO
if (key != null && variance > 0) {
// BW properties, randomize per-hop
int min = bw - ctx.random().nextInt(variance);
int req = bw + variance + ctx.random().nextInt(variance);
if (log.shouldWarn())
log.warn("Pool: " + pool + " min: " + min + " avg: " + bw + " req: " + req);
props.setProperty(PROP_MIN_BW, Integer.toString(min / 1000));
props.setProperty(PROP_REQ_BW, Integer.toString(req / 1000));
// TOOO if (i == 0 && pool.getSettings().isExploratory()) set PROP_MAX_BW
}
Properties p = key != null ? props : EmptyProperties.INSTANCE;
BuildMessageGenerator.createRecord(i, hop, msg, cfg, replyRouter, replyTunnel,
ctx, key, EmptyProperties.INSTANCE);
ctx, key, p);
}
BuildMessageGenerator.layeredEncrypt(ctx, msg, cfg, order);
//if (useShortTBM && log.shouldWarn())

View File

@ -141,6 +141,21 @@ public class TunnelPool {
return _rateName;
}
/**
* @return average bandwidth per configured tunnel in Bps
* @since 0.9.66
*/
public int getAvgBWPerTunnel() {
RateStat stat = _context.statManager().getRate(_rateName);
if (stat == null)
return 0;
Rate rate = stat.getRate(5*60*1000);
if (rate == null)
return 0;
int count = _settings.isInbound() ? _settings.getQuantity() : _settings.getTotalQuantity();
return (int) (((float) rate.getAvgOrLifetimeAvg()) / count);
}
private void refreshSettings() {
if (!_settings.isExploratory())
return; // don't override client specified settings

View File

@ -216,13 +216,13 @@ public class BuildMessageTestStandalone extends TestCase {
}
}
int statuses[] = (new BuildReplyHandler(ctx)).decrypt(reply, cfg, order);
BuildReplyHandler.Result statuses[] = (new BuildReplyHandler(ctx)).decrypt(reply, cfg, order);
if (statuses == null) throw new RuntimeException("bar");
boolean allAgree = true;
for (int i = 1; i < cfg.getLength(); i++) {
Hash peer = cfg.getPeer(i);
int record = order.get(i).intValue();
if (statuses[record] != 0)
if (statuses[record].code != 0)
allAgree = false;
}