2006-01-04 jrandom

* Rather than profile individual tunnels for throughput over their
      lifetime, do so at 1 minute intervals (allowing less frequently active
      tunnels to be more fairly measured).
    * Run the live tunnel load test across two tunnels at a time, by default.
      The load test runs for a random period from 90s to the tunnel lifetime,
      self paced.  This should help gathering data for profiling peers that
      are in exploratory tunnels.
2006-01-03  jrandom
    * Calculate the overall peer throughput across the 3 fastest one minute
      tunnel throughput values, rather than the single fastest throughput.
    * Degrade the profiled throughput data over time (cutting the profiled
      peaks in half once a day, on average)
    * Enable yet another new speed calculation for profiling peers, using the
      peak throughput from individual tunnels that a peer is participating in,
      rather than across all tunnels they are participating in.  This helps
      gather a fairer peer throughput measurement, since it won't allow a slow
      high capacity peer seem to have a higher throughput (pushing a little
      data across many tunnels at once, as opposed to lots of data across a
      single tunnel).  This degrades over time like the other.
    * Add basic OS/2 support to the jbigi code (though we do not bundle a
      precompiled OS/2 library)
This commit is contained in:
jrandom
2006-01-05 02:48:13 +00:00
committed by zzz
parent 23723b56ca
commit c00488afeb
21 changed files with 381 additions and 37 deletions

View File

@@ -295,7 +295,8 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
buf.setLength(0); buf.setLength(0);
ok = DataHelper.readLine(in, buf); ok = DataHelper.readLine(in, buf);
if (!ok) throw new IOException("EOF reached before the end of the headers [" + buf.toString() + "]"); if (!ok) throw new IOException("EOF reached before the end of the headers [" + buf.toString() + "]");
if ( (buf.length() <= 1) && ( (buf.charAt(0) == '\n') || (buf.charAt(0) == '\r') ) ) { if ( (buf.length() == 0) ||
((buf.charAt(0) == '\n') || (buf.charAt(0) == '\r')) ) {
// end of headers reached // end of headers reached
return headers; return headers;
} else { } else {

View File

@@ -105,13 +105,14 @@ public class NativeBigInteger extends BigInteger {
private final static String JBIGI_OPTIMIZATION_PENTIUM4 = "pentium4"; private final static String JBIGI_OPTIMIZATION_PENTIUM4 = "pentium4";
private static final boolean _isWin = System.getProperty("os.name").startsWith("Win"); private static final boolean _isWin = System.getProperty("os.name").startsWith("Win");
private static final boolean _isOS2 = System.getProperty("os.name").startsWith("OS/2");
private static final boolean _isMac = System.getProperty("os.name").startsWith("Mac"); private static final boolean _isMac = System.getProperty("os.name").startsWith("Mac");
private static final boolean _isLinux = System.getProperty("os.name").toLowerCase().indexOf("linux") != -1; private static final boolean _isLinux = System.getProperty("os.name").toLowerCase().indexOf("linux") != -1;
private static final boolean _isFreebsd = System.getProperty("os.name").toLowerCase().indexOf("freebsd") != -1; private static final boolean _isFreebsd = System.getProperty("os.name").toLowerCase().indexOf("freebsd") != -1;
private static final boolean _isNix = !(_isWin || _isMac); private static final boolean _isNix = !(_isWin || _isMac || _isOS2);
/* libjbigi.so vs jbigi.dll */ /* libjbigi.so vs jbigi.dll */
private static final String _libPrefix = (_isWin ? "" : "lib"); private static final String _libPrefix = (_isWin || _isOS2 ? "" : "lib");
private static final String _libSuffix = (_isWin ? ".dll" : _isMac ? ".jnilib" : ".so"); private static final String _libSuffix = (_isWin || _isOS2 ? ".dll" : _isMac ? ".jnilib" : ".so");
private final static String sCPUType; //The CPU Type to optimize for (one of the above strings) private final static String sCPUType; //The CPU Type to optimize for (one of the above strings)
@@ -254,7 +255,8 @@ public class NativeBigInteger extends BigInteger {
*/ */
public static void main(String args[]) { public static void main(String args[]) {
runModPowTest(100); runModPowTest(100);
runDoubleValueTest(100); // i2p doesn't care about the double values
//runDoubleValueTest(100);
} }
/* the sample numbers are elG generator/prime so we can test with reasonable numbers */ /* the sample numbers are elG generator/prime so we can test with reasonable numbers */
@@ -440,8 +442,10 @@ public class NativeBigInteger extends BigInteger {
if (_doLog && !_nativeOk) if (_doLog && !_nativeOk)
System.err.println("INFO: Native BigInteger library jbigi not loaded - using pure java"); System.err.println("INFO: Native BigInteger library jbigi not loaded - using pure java");
}catch(Exception e){ }catch(Exception e){
if (_doLog) if (_doLog) {
System.err.println("INFO: Native BigInteger library jbigi not loaded, reason: '"+e.getMessage()+"' - using pure java"); System.err.println("INFO: Native BigInteger library jbigi not loaded, reason: '"+e.getMessage()+"' - using pure java");
e.printStackTrace();
}
} }
} }
@@ -489,7 +493,8 @@ public class NativeBigInteger extends BigInteger {
} }
private static final boolean loadFromResource(String resourceName) { private static final boolean loadFromResource(String resourceName) {
if (resourceName == null) return false; if (resourceName == null) return false;
URL resource = NativeBigInteger.class.getClassLoader().getResource(resourceName); //URL resource = NativeBigInteger.class.getClassLoader().getResource(resourceName);
URL resource = ClassLoader.getSystemResource(resourceName);
if (resource == null) { if (resource == null) {
if (_doLog) if (_doLog)
System.err.println("NOTICE: Resource name [" + resourceName + "] was not found"); System.err.println("NOTICE: Resource name [" + resourceName + "] was not found");
@@ -561,6 +566,8 @@ public class NativeBigInteger extends BigInteger {
return "jbigi-freebsd"+sAppend; // The convention on freebsd... return "jbigi-freebsd"+sAppend; // The convention on freebsd...
if(_isMac) if(_isMac)
return "jbigi-osx"+sAppend; return "jbigi-osx"+sAppend;
if(_isOS2)
return "jbigi-os2"+sAppend;
throw new RuntimeException("Dont know jbigi library name for os type '"+System.getProperty("os.name")+"'"); throw new RuntimeException("Dont know jbigi library name for os type '"+System.getProperty("os.name")+"'");
} }
} }

View File

@@ -1,4 +1,28 @@
$Id: history.txt,v 1.376 2005/12/31 18:40:23 jrandom Exp $ $Id: history.txt,v 1.377 2006/01/01 12:23:29 jrandom Exp $
2006-01-04 jrandom
* Rather than profile individual tunnels for throughput over their
lifetime, do so at 1 minute intervals (allowing less frequently active
tunnels to be more fairly measured).
* Run the live tunnel load test across two tunnels at a time, by default.
The load test runs for a random period from 90s to the tunnel lifetime,
self paced. This should help gathering data for profiling peers that
are in exploratory tunnels.
2006-01-03 jrandom
* Calculate the overall peer throughput across the 3 fastest one minute
tunnel throughput values, rather than the single fastest throughput.
* Degrade the profiled throughput data over time (cutting the profiled
peaks in half once a day, on average)
* Enable yet another new speed calculation for profiling peers, using the
peak throughput from individual tunnels that a peer is participating in,
rather than across all tunnels they are participating in. This helps
gather a fairer peer throughput measurement, since it won't allow a slow
high capacity peer seem to have a higher throughput (pushing a little
data across many tunnels at once, as opposed to lots of data across a
single tunnel). This degrades over time like the other.
* Add basic OS/2 support to the jbigi code (though we do not bundle a
precompiled OS/2 library)
2006-01-01 jrandom 2006-01-01 jrandom
* Disable multifile torrent creation in I2PSnark's web UI for the moment * Disable multifile torrent creation in I2PSnark's web UI for the moment

View File

@@ -5,6 +5,7 @@ import java.util.*;
import net.i2p.util.*; import net.i2p.util.*;
import net.i2p.data.*; import net.i2p.data.*;
import net.i2p.data.i2np.*; import net.i2p.data.i2np.*;
import net.i2p.router.TunnelInfo;
import net.i2p.router.message.*; import net.i2p.router.message.*;
import net.i2p.router.tunnel.*; import net.i2p.router.tunnel.*;
import net.i2p.router.tunnel.pool.*; import net.i2p.router.tunnel.pool.*;
@@ -80,8 +81,8 @@ public class LoadTestManager {
} }
} }
/** 10 peers at a time */ /** 1 peer at a time */
private static final int CONCURRENT_PEERS = 0; private static final int CONCURRENT_PEERS = 1;
/** 4 messages per peer at a time */ /** 4 messages per peer at a time */
private static final int CONCURRENT_MESSAGES = 4; private static final int CONCURRENT_MESSAGES = 4;
@@ -360,6 +361,11 @@ public class LoadTestManager {
_context.statManager().addRateData("test.rtt", period, count); _context.statManager().addRateData("test.rtt", period, count);
if (period > 2000) if (period > 2000)
_context.statManager().addRateData("test.rttHigh", period, count); _context.statManager().addRateData("test.rttHigh", period, count);
TunnelInfo info = _cfg.getOutbound();
if (info != null)
info.incrementVerifiedBytesTransferred(5*1024);
// the inbound tunnel is incremented by the tunnel management system itself,
// so don't double count it here
return true; return true;
} }
return false; return false;
@@ -390,7 +396,7 @@ public class LoadTestManager {
} else { } else {
int hop = 0; int hop = 0;
TunnelInfo info = tunnel.getOutbound(); TunnelInfo info = tunnel.getOutbound();
for (int i = 0; (info != null) && (i < info.getLength()-1); i++) { for (int i = 0; (info != null) && (i < info.getLength()); i++) {
Hash peer = info.getPeer(i); Hash peer = info.getPeer(i);
if ( (peer != null) && (peer.equals(_context.routerHash())) ) if ( (peer != null) && (peer.equals(_context.routerHash())) )
continue; continue;
@@ -409,7 +415,7 @@ public class LoadTestManager {
hop++; hop++;
} }
info = tunnel.getInbound(); info = tunnel.getInbound();
for (int i = 0; (info != null) && (i < info.getLength()-1); i++) { for (int i = 0; (info != null) && (i < info.getLength()); i++) {
Hash peer = info.getPeer(i); Hash peer = info.getPeer(i);
if ( (peer != null) && (peer.equals(_context.routerHash())) ) if ( (peer != null) && (peer.equals(_context.routerHash())) )
continue; continue;
@@ -585,14 +591,16 @@ public class LoadTestManager {
private boolean wantToTest(LoadTestTunnelConfig cfg) { private boolean wantToTest(LoadTestTunnelConfig cfg) {
// wait 10 minutes before testing anything // wait 10 minutes before testing anything
if (_context.router().getUptime() <= 10*60*1000) return false; if (_context.router().getUptime() <= 10*60*1000) return false;
if (bandwidthOverloaded()) return false;
if (TEST_LIVE_TUNNELS && _active.size() < getConcurrency()) { if (TEST_LIVE_TUNNELS && _active.size() < getConcurrency()) {
// length == #hops+1 (as it includes the creator) // length == #hops+1 (as it includes the creator)
if (cfg.getLength() < 2) if (cfg.getLength() < 2)
return false; return false;
// only load test the client tunnels // only load test the client tunnels
if (cfg.getTunnel().getDestination() == null) // XXX why?
return false; ////if (cfg.getTunnel().getDestination() == null)
//// return false;
_active.add(cfg); _active.add(cfg);
return true; return true;
} else { } else {
@@ -600,6 +608,20 @@ public class LoadTestManager {
} }
} }
private boolean bandwidthOverloaded() {
int msgLoadBps = CONCURRENT_MESSAGES
* 5 // message size
/ 10; // 10 seconds before timeout & retransmission
msgLoadBps *= 2; // buffer
if (_context.bandwidthLimiter().getSendBps()/1024d + (double)msgLoadBps >= _context.bandwidthLimiter().getOutboundKBytesPerSecond())
return true;
if (_context.bandwidthLimiter().getReceiveBps()/1024d + (double)msgLoadBps >= _context.bandwidthLimiter().getInboundKBytesPerSecond())
return true;
if (_context.throttle().getMessageDelay() > 1000)
return true;
return false;
}
private class CreatedJob extends JobImpl { private class CreatedJob extends JobImpl {
private LoadTestTunnelConfig _cfg; private LoadTestTunnelConfig _cfg;
public CreatedJob(RouterContext ctx, LoadTestTunnelConfig cfg) { public CreatedJob(RouterContext ctx, LoadTestTunnelConfig cfg) {
@@ -619,6 +641,9 @@ public class LoadTestManager {
runTest(_cfg); runTest(_cfg);
} }
} }
private long TEST_PERIOD_MAX = 10*60*1000;
private long TEST_PERIOD_MIN = 90*1000;
private class Expire extends JobImpl { private class Expire extends JobImpl {
private LoadTestTunnelConfig _cfg; private LoadTestTunnelConfig _cfg;
private boolean _removeFromDispatcher; private boolean _removeFromDispatcher;
@@ -629,13 +654,22 @@ public class LoadTestManager {
super(ctx); super(ctx);
_cfg = cfg; _cfg = cfg;
_removeFromDispatcher = removeFromDispatcher; _removeFromDispatcher = removeFromDispatcher;
getTiming().setStartAfter(cfg.getExpiration()+60*1000); long duration = ctx.random().nextLong(TEST_PERIOD_MAX);
if (duration < TEST_PERIOD_MIN)
duration += TEST_PERIOD_MIN;
long expiration = duration + ctx.clock().now();
if (expiration > cfg.getExpiration()+60*1000)
expiration = cfg.getExpiration()+60*1000;
getTiming().setStartAfter(expiration);
} }
public String getName() { return "expire test tunnel"; } public String getName() { return "expire test tunnel"; }
public void runJob() { public void runJob() {
if (_removeFromDispatcher) if (_removeFromDispatcher)
getContext().tunnelDispatcher().remove(_cfg.getTunnel()); getContext().tunnelDispatcher().remove(_cfg.getTunnel());
_cfg.logComplete(); _cfg.logComplete();
TunnelInfo info = _cfg.getOutbound();
if (info != null)
info.incrementVerifiedBytesTransferred(0); // just to wrap up the test data
_active.remove(_cfg); _active.remove(_cfg);
} }
} }

View File

@@ -68,6 +68,18 @@ public interface ProfileManager {
*/ */
void tunnelDataPushed(Hash peer, long rtt, int size); void tunnelDataPushed(Hash peer, long rtt, int size);
/**
* Note that the peer is participating in a tunnel that pushed the given amount of data
* over the last minute.
*/
void tunnelDataPushed1m(Hash peer, int size);
/**
* Note that we were able to push the given amount of data through a tunnel
* that the peer is participating in
*/
void tunnelLifetimePushed(Hash peer, long lifetime, long size);
/** /**
* Note that the peer participated in a tunnel that failed. Its failure may not have * Note that the peer participated in a tunnel that failed. Its failure may not have
* been the peer's fault however. * been the peer's fault however.

View File

@@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
* *
*/ */
public class RouterVersion { public class RouterVersion {
public final static String ID = "$Revision: 1.323 $ $Date: 2005/12/31 18:40:22 $"; public final static String ID = "$Revision: 1.324 $ $Date: 2006/01/01 12:23:29 $";
public final static String VERSION = "0.6.1.8"; public final static String VERSION = "0.6.1.8";
public final static long BUILD = 7; public final static long BUILD = 8;
public static void main(String args[]) { public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID); System.out.println("Router ID: " + RouterVersion.ID);

View File

@@ -48,4 +48,9 @@ public interface TunnelInfo {
public void testSuccessful(int responseTime); public void testSuccessful(int responseTime);
public long getProcessedMessagesCount(); public long getProcessedMessagesCount();
/** we know for sure that this many bytes travelled through the tunnel in its lifetime */
public long getVerifiedBytesTransferred();
/** we know for sure that the given number of bytes were sent down the tunnel fully */
public void incrementVerifiedBytesTransferred(int numBytes);
} }

View File

@@ -575,6 +575,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
getContext().profileManager().tunnelTestSucceeded(_outTunnel.getPeer(i), sendTime); getContext().profileManager().tunnelTestSucceeded(_outTunnel.getPeer(i), sendTime);
getContext().profileManager().tunnelDataPushed(_outTunnel.getPeer(i), sendTime, size); getContext().profileManager().tunnelDataPushed(_outTunnel.getPeer(i), sendTime, size);
} }
_outTunnel.incrementVerifiedBytesTransferred(size);
} }
if (_inTunnel != null) if (_inTunnel != null)
for (int i = 0; i < _inTunnel.getLength(); i++) for (int i = 0; i < _inTunnel.getLength(); i++)

View File

@@ -376,7 +376,8 @@ class SearchJob extends JobImpl {
+ msg.getReplyTunnel() + "]"); + msg.getReplyTunnel() + "]");
SearchMessageSelector sel = new SearchMessageSelector(getContext(), router, _expiration, _state); SearchMessageSelector sel = new SearchMessageSelector(getContext(), router, _expiration, _state);
SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(getContext(), router, _state, _facade, this); SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(getContext(), router, _state, _facade,
this, outTunnel, inTunnel);
getContext().messageRegistry().registerPending(sel, reply, new FailedJob(getContext(), router), timeout); getContext().messageRegistry().registerPending(sel, reply, new FailedJob(getContext(), router), timeout);
getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnelId, router.getIdentity().getHash()); getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnelId, router.getIdentity().getHash());

View File

@@ -10,6 +10,7 @@ import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.JobImpl; import net.i2p.router.JobImpl;
import net.i2p.router.ReplyJob; import net.i2p.router.ReplyJob;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.router.TunnelInfo;
import net.i2p.util.Log; import net.i2p.util.Log;
/** /**
@@ -23,16 +24,27 @@ class SearchUpdateReplyFoundJob extends JobImpl implements ReplyJob {
private SearchState _state; private SearchState _state;
private KademliaNetworkDatabaseFacade _facade; private KademliaNetworkDatabaseFacade _facade;
private SearchJob _job; private SearchJob _job;
private TunnelInfo _outTunnel;
private TunnelInfo _replyTunnel;
private long _sentOn;
public SearchUpdateReplyFoundJob(RouterContext context, RouterInfo peer, public SearchUpdateReplyFoundJob(RouterContext context, RouterInfo peer,
SearchState state, KademliaNetworkDatabaseFacade facade, SearchState state, KademliaNetworkDatabaseFacade facade,
SearchJob job) { SearchJob job) {
this(context, peer, state, facade, job, null, null);
}
public SearchUpdateReplyFoundJob(RouterContext context, RouterInfo peer,
SearchState state, KademliaNetworkDatabaseFacade facade,
SearchJob job, TunnelInfo outTunnel, TunnelInfo replyTunnel) {
super(context); super(context);
_log = context.logManager().getLog(SearchUpdateReplyFoundJob.class); _log = context.logManager().getLog(SearchUpdateReplyFoundJob.class);
_peer = peer.getIdentity().getHash(); _peer = peer.getIdentity().getHash();
_state = state; _state = state;
_facade = facade; _facade = facade;
_job = job; _job = job;
_outTunnel = outTunnel;
_replyTunnel = replyTunnel;
_sentOn = System.currentTimeMillis();
} }
public String getName() { return "Update Reply Found for Kademlia Search"; } public String getName() { return "Update Reply Found for Kademlia Search"; }
@@ -42,6 +54,21 @@ class SearchUpdateReplyFoundJob extends JobImpl implements ReplyJob {
_log.info(getJobId() + ": Reply from " + _peer.toBase64() _log.info(getJobId() + ": Reply from " + _peer.toBase64()
+ " with message " + message.getClass().getName()); + " with message " + message.getClass().getName());
long howLong = System.currentTimeMillis() - _sentOn;
// assume requests are 1KB (they're almost always much smaller, but tunnels have a fixed size)
int msgSize = 1024;
if (_replyTunnel != null) {
for (int i = 0; i < _replyTunnel.getLength(); i++)
getContext().profileManager().tunnelDataPushed(_replyTunnel.getPeer(i), howLong, msgSize);
_replyTunnel.incrementVerifiedBytesTransferred(msgSize);
}
if (_outTunnel != null) {
for (int i = 0; i < _outTunnel.getLength(); i++)
getContext().profileManager().tunnelDataPushed(_outTunnel.getPeer(i), howLong, msgSize);
_outTunnel.incrementVerifiedBytesTransferred(msgSize);
}
if (message instanceof DatabaseStoreMessage) { if (message instanceof DatabaseStoreMessage) {
long timeToReply = _state.dataFound(_peer); long timeToReply = _state.dataFound(_peer);

View File

@@ -363,6 +363,7 @@ class StoreJob extends JobImpl {
_log.warn("sent a " + _msgSize + "byte netDb message through tunnel " + _sendThrough + " after " + howLong); _log.warn("sent a " + _msgSize + "byte netDb message through tunnel " + _sendThrough + " after " + howLong);
for (int i = 0; i < _sendThrough.getLength(); i++) for (int i = 0; i < _sendThrough.getLength(); i++)
getContext().profileManager().tunnelDataPushed(_sendThrough.getPeer(i), howLong, _msgSize); getContext().profileManager().tunnelDataPushed(_sendThrough.getPeer(i), howLong, _msgSize);
_sendThrough.incrementVerifiedBytesTransferred(_msgSize);
} }
if (_state.getCompleteCount() >= getRedundancy()) { if (_state.getCompleteCount() >= getRedundancy()) {

View File

@@ -234,13 +234,95 @@ public class PeerProfile {
+ " to " + _tunnelTestResponseTimeAvg + " via " + ms); + " to " + _tunnelTestResponseTimeAvg + " via " + ms);
} }
/** bytes per minute */ /** keep track of the fastest 3 throughputs */
private volatile double _peakThroughput; private static final int THROUGHPUT_COUNT = 3;
/**
* fastest 1 minute throughput, in bytes per minute, ordered with fastest
* first. this is not synchronized, as we don't *need* perfection, and we only
* reorder/insert values on coallesce
*/
private final double _peakThroughput[] = new double[THROUGHPUT_COUNT];
private volatile long _peakThroughputCurrentTotal; private volatile long _peakThroughputCurrentTotal;
public double getPeakThroughputKBps() { return _peakThroughput / (60d*1024d); } public double getPeakThroughputKBps() {
public void setPeakThroughputKBps(double kBps) { _peakThroughput = kBps*60; } double rv = 0;
for (int i = 0; i < THROUGHPUT_COUNT; i++)
rv += _peakThroughput[i];
rv /= (60d*1024d*(double)THROUGHPUT_COUNT);
return rv;
}
public void setPeakThroughputKBps(double kBps) {
_peakThroughput[0] = kBps*60*1024;
//for (int i = 0; i < THROUGHPUT_COUNT; i++)
// _peakThroughput[i] = kBps*60;
}
void dataPushed(int size) { _peakThroughputCurrentTotal += size; } void dataPushed(int size) { _peakThroughputCurrentTotal += size; }
private final double _peakTunnelThroughput[] = new double[THROUGHPUT_COUNT];
/** the tunnel pushed that much data in its lifetime */
void tunnelDataTransferred(long tunnelByteLifetime) {
double lowPeak = _peakTunnelThroughput[THROUGHPUT_COUNT-1];
if (tunnelByteLifetime > lowPeak) {
synchronized (_peakTunnelThroughput) {
for (int i = 0; i < THROUGHPUT_COUNT; i++) {
if (tunnelByteLifetime > _peakTunnelThroughput[i]) {
for (int j = THROUGHPUT_COUNT-1; j > i; j--)
_peakTunnelThroughput[j] = _peakTunnelThroughput[j-1];
_peakTunnelThroughput[i] = tunnelByteLifetime;
break;
}
}
}
}
}
public double getPeakTunnelThroughputKBps() {
double rv = 0;
for (int i = 0; i < THROUGHPUT_COUNT; i++)
rv += _peakTunnelThroughput[i];
rv /= (10d*60d*1024d*(double)THROUGHPUT_COUNT);
return rv;
}
public void setPeakTunnelThroughputKBps(double kBps) {
_peakTunnelThroughput[0] = kBps*60d*10d*1024d;
}
/** total number of bytes pushed through a single tunnel in a 1 minute period */
private final double _peakTunnel1mThroughput[] = new double[THROUGHPUT_COUNT];
/** the tunnel pushed that much data in a 1 minute period */
void dataPushed1m(int size) {
double lowPeak = _peakTunnel1mThroughput[THROUGHPUT_COUNT-1];
if (size > lowPeak) {
synchronized (_peakTunnel1mThroughput) {
for (int i = 0; i < THROUGHPUT_COUNT; i++) {
if (size > _peakTunnel1mThroughput[i]) {
for (int j = THROUGHPUT_COUNT-1; j > i; j--)
_peakTunnel1mThroughput[j] = _peakTunnel1mThroughput[j-1];
_peakTunnel1mThroughput[i] = size;
break;
}
}
}
if (_log.shouldLog(Log.WARN) ) {
StringBuffer buf = new StringBuffer(128);
buf.append("Updating 1m throughput after ").append(size).append(" to ");
for (int i = 0; i < THROUGHPUT_COUNT; i++)
buf.append(_peakTunnel1mThroughput[i]).append(',');
buf.append(" for ").append(_peer.toBase64());
_log.warn(buf.toString());
}
}
}
public double getPeakTunnel1mThroughputKBps() {
double rv = 0;
for (int i = 0; i < THROUGHPUT_COUNT; i++)
rv += _peakTunnel1mThroughput[i];
rv /= (60d*1024d*(double)THROUGHPUT_COUNT);
return rv;
}
public void setPeakTunnel1mThroughputKBps(double kBps) {
_peakTunnel1mThroughput[0] = kBps*60*1024;
}
/** /**
* when the given peer is performing so poorly that we don't want to bother keeping * when the given peer is performing so poorly that we don't want to bother keeping
* extensive stats on them, call this to discard excess data points. Specifically, * extensive stats on them, call this to discard excess data points. Specifically,
@@ -307,19 +389,67 @@ public class PeerProfile {
_dbIntroduction.setStatLog(_context.statManager().getStatLog()); _dbIntroduction.setStatLog(_context.statManager().getStatLog());
_expanded = true; _expanded = true;
} }
/** once a day, on average, cut the measured throughtput values in half */
private static final long DROP_PERIOD_MINUTES = 24*60;
private long _lastCoalesceDate = System.currentTimeMillis(); private long _lastCoalesceDate = System.currentTimeMillis();
private void coalesceThroughput() { private void coalesceThroughput() {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
long measuredPeriod = now - _lastCoalesceDate; long measuredPeriod = now - _lastCoalesceDate;
if (measuredPeriod >= 60*1000) { if (measuredPeriod >= 60*1000) {
long tot = _peakThroughputCurrentTotal; long tot = _peakThroughputCurrentTotal;
double peak = _peakThroughput; double lowPeak = _peakThroughput[THROUGHPUT_COUNT-1];
if (tot >= peak) if (tot > lowPeak) {
_peakThroughput = tot; for (int i = 0; i < THROUGHPUT_COUNT; i++) {
if (tot > _peakThroughput[i]) {
for (int j = THROUGHPUT_COUNT-1; j > i; j--)
_peakThroughput[j] = _peakThroughput[j-1];
_peakThroughput[i] = tot;
break;
}
}
if (false && _log.shouldLog(Log.WARN) ) {
StringBuffer buf = new StringBuffer(128);
buf.append("Updating throughput after ").append(tot).append(" to ");
for (int i = 0; i < THROUGHPUT_COUNT; i++)
buf.append(_peakThroughput[i]).append(',');
buf.append(" for ").append(_peer.toBase64());
_log.warn(buf.toString());
}
} else {
if (_context.random().nextLong(DROP_PERIOD_MINUTES*2) <= 0) {
for (int i = 0; i < THROUGHPUT_COUNT; i++)
_peakThroughput[i] /= 2;
if (false && _log.shouldLog(Log.WARN) ) {
StringBuffer buf = new StringBuffer(128);
buf.append("Degrading the throughput measurements to ");
for (int i = 0; i < THROUGHPUT_COUNT; i++)
buf.append(_peakThroughput[i]).append(',');
buf.append(" for ").append(_peer.toBase64());
_log.warn(buf.toString());
}
}
}
// we degrade the tunnel throughput here too, regardless of the current
// activity
if (_context.random().nextLong(DROP_PERIOD_MINUTES*2) <= 0) {
for (int i = 0; i < THROUGHPUT_COUNT; i++) {
_peakTunnelThroughput[i] /= 2;
_peakTunnel1mThroughput[i] /= 2;
}
if (_log.shouldLog(Log.WARN) ) {
StringBuffer buf = new StringBuffer(128);
buf.append("Degrading the tunnel throughput measurements to ");
for (int i = 0; i < THROUGHPUT_COUNT; i++)
buf.append(_peakTunnel1mThroughput[i]).append(',');
buf.append(" for ").append(_peer.toBase64());
_log.warn(buf.toString());
}
}
_peakThroughputCurrentTotal = 0; _peakThroughputCurrentTotal = 0;
if ( (tot > 0) && _log.shouldLog(Log.WARN) )
_log.warn("updating throughput after " + tot + " to " + (_peakThroughput/60d) + " for " + _peer.toBase64());
_lastCoalesceDate = now; _lastCoalesceDate = now;
} }
} }

View File

@@ -124,6 +124,23 @@ public class ProfileManagerImpl implements ProfileManager {
if (data != null) if (data != null)
data.dataPushed(size); // ignore rtt, as we are averaging over a minute data.dataPushed(size); // ignore rtt, as we are averaging over a minute
} }
public void tunnelDataPushed1m(Hash peer, int size) {
if (_context.routerHash().equals(peer))
return;
PeerProfile data = getProfile(peer);
if (data != null)
data.dataPushed1m(size);
}
public void tunnelLifetimePushed(Hash peer, long lifetime, long size) {
if (_context.routerHash().equals(peer))
return;
PeerProfile data = getProfile(peer);
if (data != null)
data.tunnelDataTransferred(size);
}
private int getSlowThreshold() { private int getSlowThreshold() {
// perhaps we should have this compare vs. tunnel.testSuccessTime? // perhaps we should have this compare vs. tunnel.testSuccessTime?

View File

@@ -124,7 +124,7 @@ class ProfileOrganizerRenderer {
} }
buf.append("</table>"); buf.append("</table>");
buf.append("<i>Definitions:<ul>"); buf.append("<i>Definitions:<ul>");
buf.append("<li><b>speed</b>: peak throughput (bytes per second) over a 1 minute period that the peer has sustained</li>"); buf.append("<li><b>speed</b>: peak throughput (bytes per second) over a 1 minute period that the peer has sustained in a single tunnel</li>");
buf.append("<li><b>capacity</b>: how many tunnels can we ask them to join in an hour?</li>"); buf.append("<li><b>capacity</b>: how many tunnels can we ask them to join in an hour?</li>");
buf.append("<li><b>integration</b>: how many new peers have they told us about lately?</li>"); buf.append("<li><b>integration</b>: how many new peers have they told us about lately?</li>");
buf.append("<li><b>failing?</b>: is the peer currently swamped (and if possible we should avoid nagging them)?</li>"); buf.append("<li><b>failing?</b>: is the peer currently swamped (and if possible we should avoid nagging them)?</li>");

View File

@@ -123,6 +123,8 @@ class ProfilePersistenceHelper {
buf.append("# moving average as to how fast the peer replies").append(NL); buf.append("# moving average as to how fast the peer replies").append(NL);
buf.append("tunnelTestTimeAverage=").append(profile.getTunnelTestTimeAverage()).append(NL); buf.append("tunnelTestTimeAverage=").append(profile.getTunnelTestTimeAverage()).append(NL);
buf.append("tunnelPeakThroughput=").append(profile.getPeakThroughputKBps()).append(NL); buf.append("tunnelPeakThroughput=").append(profile.getPeakThroughputKBps()).append(NL);
buf.append("tunnelPeakTunnelThroughput=").append(profile.getPeakTunnelThroughputKBps()).append(NL);
buf.append("tunnelPeakTunnel1mThroughput=").append(profile.getPeakTunnel1mThroughputKBps()).append(NL);
buf.append(NL); buf.append(NL);
out.write(buf.toString().getBytes()); out.write(buf.toString().getBytes());
@@ -209,6 +211,8 @@ class ProfilePersistenceHelper {
profile.setLastHeardFrom(getLong(props, "lastHeardFrom")); profile.setLastHeardFrom(getLong(props, "lastHeardFrom"));
profile.setTunnelTestTimeAverage(getDouble(props, "tunnelTestTimeAverage")); profile.setTunnelTestTimeAverage(getDouble(props, "tunnelTestTimeAverage"));
profile.setPeakThroughputKBps(getDouble(props, "tunnelPeakThroughput")); profile.setPeakThroughputKBps(getDouble(props, "tunnelPeakThroughput"));
profile.setPeakTunnelThroughputKBps(getDouble(props, "tunnelPeakTunnelThroughput"));
profile.setPeakTunnel1mThroughputKBps(getDouble(props, "tunnelPeakTunnel1mThroughput"));
profile.getTunnelHistory().load(props); profile.getTunnelHistory().load(props);
profile.getDBHistory().load(props); profile.getDBHistory().load(props);

View File

@@ -38,8 +38,14 @@ public class SpeedCalculator extends Calculator {
} }
public double calc(PeerProfile profile) { public double calc(PeerProfile profile) {
if (true) return profile.getPeakThroughputKBps()*1024d; if (true) // measures 1 minute throughput of individual tunnels
if (true) return calcAverage(profile); return profile.getPeakTunnel1mThroughputKBps()*1024d;
if (true) // measures throughput of individual tunnels
return profile.getPeakTunnelThroughputKBps()*1024d;
if (true) // measures throughput across all tunnels
return profile.getPeakThroughputKBps()*1024d;
if (true)
return calcAverage(profile);
long threshold = getEventThreshold(); long threshold = getEventThreshold();
boolean tunnelTestOnly = getUseTunnelTestOnly(); boolean tunnelTestOnly = getUseTunnelTestOnly();

View File

@@ -83,6 +83,7 @@ public class InboundEndpointProcessor {
_log.debug("Received a " + length + "byte message through tunnel " + _config); _log.debug("Received a " + length + "byte message through tunnel " + _config);
for (int i = 0; i < _config.getLength(); i++) for (int i = 0; i < _config.getLength(); i++)
ctx.profileManager().tunnelDataPushed(_config.getPeer(i), rtt, length); ctx.profileManager().tunnelDataPushed(_config.getPeer(i), rtt, length);
_config.incrementVerifiedBytesTransferred(length);
} }
return true; return true;

View File

@@ -9,6 +9,7 @@ import net.i2p.data.Base64;
import net.i2p.data.Hash; import net.i2p.data.Hash;
import net.i2p.data.TunnelId; import net.i2p.data.TunnelId;
import net.i2p.router.TunnelInfo; import net.i2p.router.TunnelInfo;
import net.i2p.router.RouterContext;
/** /**
* Coordinate the info that the tunnel creator keeps track of, including what * Coordinate the info that the tunnel creator keeps track of, including what
@@ -16,6 +17,7 @@ import net.i2p.router.TunnelInfo;
* *
*/ */
public class TunnelCreatorConfig implements TunnelInfo { public class TunnelCreatorConfig implements TunnelInfo {
protected RouterContext _context;
/** only necessary for client tunnels */ /** only necessary for client tunnels */
private Hash _destination; private Hash _destination;
/** gateway first */ /** gateway first */
@@ -25,11 +27,13 @@ public class TunnelCreatorConfig implements TunnelInfo {
private long _expiration; private long _expiration;
private boolean _isInbound; private boolean _isInbound;
private long _messagesProcessed; private long _messagesProcessed;
private volatile long _verifiedBytesTransferred;
public TunnelCreatorConfig(int length, boolean isInbound) { public TunnelCreatorConfig(RouterContext ctx, int length, boolean isInbound) {
this(length, isInbound, null); this(ctx, length, isInbound, null);
} }
public TunnelCreatorConfig(int length, boolean isInbound, Hash destination) { public TunnelCreatorConfig(RouterContext ctx, int length, boolean isInbound, Hash destination) {
_context = ctx;
if (length <= 0) if (length <= 0)
throw new IllegalArgumentException("0 length? 0 hop tunnels are 1 length!"); throw new IllegalArgumentException("0 length? 0 hop tunnels are 1 length!");
_config = new HopConfig[length]; _config = new HopConfig[length];
@@ -40,6 +44,7 @@ public class TunnelCreatorConfig implements TunnelInfo {
_isInbound = isInbound; _isInbound = isInbound;
_destination = destination; _destination = destination;
_messagesProcessed = 0; _messagesProcessed = 0;
_verifiedBytesTransferred = 0;
} }
/** how many hops are there in the tunnel? */ /** how many hops are there in the tunnel? */
@@ -84,6 +89,45 @@ public class TunnelCreatorConfig implements TunnelInfo {
public void incrementProcessedMessages() { _messagesProcessed++; } public void incrementProcessedMessages() { _messagesProcessed++; }
public long getProcessedMessagesCount() { return _messagesProcessed; } public long getProcessedMessagesCount() { return _messagesProcessed; }
public void incrementVerifiedBytesTransferred(int bytes) {
_verifiedBytesTransferred += bytes;
_peakThroughputCurrentTotal += bytes;
long now = System.currentTimeMillis();
long timeSince = now - _peakThroughputLastCoallesce;
if (timeSince >= 60*1000) {
long tot = _peakThroughputCurrentTotal;
double normalized = (double)tot * 60d*1000d / (double)timeSince;
_peakThroughputLastCoallesce = now;
_peakThroughputCurrentTotal = 0;
for (int i = 0; i < _peers.length; i++)
_context.profileManager().tunnelDataPushed1m(_peers[i], (int)normalized);
}
}
public long getVerifiedBytesTransferred() { return _verifiedBytesTransferred; }
private static final int THROUGHPUT_COUNT = 3;
/**
* fastest 1 minute throughput, in bytes per minute, ordered with fastest
* first.
*/
private final double _peakThroughput[] = new double[THROUGHPUT_COUNT];
private volatile long _peakThroughputCurrentTotal;
private volatile long _peakThroughputLastCoallesce = System.currentTimeMillis();
public double getPeakThroughputKBps() {
double rv = 0;
for (int i = 0; i < THROUGHPUT_COUNT; i++)
rv += _peakThroughput[i];
rv /= (60d*1024d*(double)THROUGHPUT_COUNT);
return rv;
}
public void setPeakThroughputKBps(double kBps) {
_peakThroughput[0] = kBps*60*1024;
//for (int i = 0; i < THROUGHPUT_COUNT; i++)
// _peakThroughput[i] = kBps*60;
}
public String toString() { public String toString() {
// H0:1235-->H1:2345-->H2:2345 // H0:1235-->H1:2345-->H2:2345
StringBuffer buf = new StringBuffer(128); StringBuffer buf = new StringBuffer(128);

View File

@@ -84,6 +84,8 @@ public class TunnelParticipant {
+ " for " + msg); + " for " + msg);
_config.incrementProcessedMessages(); _config.incrementProcessedMessages();
send(_config, msg, ri); send(_config, msg, ri);
if (_config != null)
incrementThroughput(_config.getReceiveFrom());
} else { } else {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Lookup the nextHop (" + _config.getSendTo().toBase64().substring(0,4) _log.warn("Lookup the nextHop (" + _config.getSendTo().toBase64().substring(0,4)
@@ -98,6 +100,30 @@ public class TunnelParticipant {
} }
} }
private int _periodMessagesTransferred;
private long _lastCoallesced = System.currentTimeMillis();
/**
* take note that the peers specified were able to push us data. hmm, is this safe?
* this could be easily gamed to get us to rank some peer of their choosing as quite
* fast. That peer would have to actually be quite fast, but having a remote peer
* influence who we spend our time profiling is dangerous, so this will be disabled for
* now.
*/
private void incrementThroughput(Hash prev) {
if (true) return;
long now = System.currentTimeMillis();
long timeSince = now - _lastCoallesced;
if (timeSince >= 60*1000) {
int amount = 1024 * _periodMessagesTransferred;
int normalized = (int)((double)amount * 60d*1000d / (double)timeSince);
_periodMessagesTransferred = 0;
_lastCoallesced = now;
_context.profileManager().tunnelDataPushed1m(prev, normalized);
} else {
_periodMessagesTransferred++;
}
}
public int getCompleteCount() { public int getCompleteCount() {
if (_handler != null) if (_handler != null)
return _handler.getCompleteCount(); return _handler.getCompleteCount();

View File

@@ -10,7 +10,6 @@ import net.i2p.router.tunnel.TunnelCreatorConfig;
* *
*/ */
public class PooledTunnelCreatorConfig extends TunnelCreatorConfig { public class PooledTunnelCreatorConfig extends TunnelCreatorConfig {
private RouterContext _context;
private TunnelPool _pool; private TunnelPool _pool;
private boolean _failed; private boolean _failed;
private TestJob _testJob; private TestJob _testJob;
@@ -23,8 +22,7 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig {
this(ctx, length, isInbound, null); this(ctx, length, isInbound, null);
} }
public PooledTunnelCreatorConfig(RouterContext ctx, int length, boolean isInbound, Hash destination) { public PooledTunnelCreatorConfig(RouterContext ctx, int length, boolean isInbound, Hash destination) {
super(length, isInbound, destination); super(ctx, length, isInbound, destination);
_context = ctx;
_failed = false; _failed = false;
_pool = null; _pool = null;
} }

View File

@@ -339,6 +339,11 @@ public class TunnelPool {
_lifetimeProcessed += info.getProcessedMessagesCount(); _lifetimeProcessed += info.getProcessedMessagesCount();
long lifetimeConfirmed = info.getVerifiedBytesTransferred();
long lifetime = 10*60*1000;
for (int i = 0; i < info.getLength(); i++)
_context.profileManager().tunnelLifetimePushed(info.getPeer(i), lifetime, lifetimeConfirmed);
if (_settings.isInbound() && (_settings.getDestination() != null) ) { if (_settings.isInbound() && (_settings.getDestination() != null) ) {
if (ls != null) { if (ls != null) {
_context.clientManager().requestLeaseSet(_settings.getDestination(), ls); _context.clientManager().requestLeaseSet(_settings.getDestination(), ls);