From a997a4604092b4a0e976b95ed75746910daf094c Mon Sep 17 00:00:00 2001 From: jrandom Date: Thu, 17 Mar 2005 22:12:51 +0000 Subject: [PATCH] 2005-03-17 jrandom * Update the old speed calculator and associated profile data points to use a non-tiered moving average of the tunnel test time, avoiding the freshness issues of the old tiered speed stats. * Explicitly synchronize all of the methods on the PRNG, rather than just the feeder methods (sun and kaffe only need the feeder, but it seems ibm needs all of them synchronized). * Properly use the tunnel tests as part of the profile stats. * Don't flood the jobqueue with sequential persist profile tasks, but instead, inject a brief scheduling delay between them. * Reduce the TCP connection establishment timeout to 20s (which is still absurdly excessive) * Reduced the max resend delay to 30s so we can get some resends in when dealing with client apps that hang up early (e.g. wget) * Added more alternative socketManager factories (good call aum!) --- .../streaming/I2PSocketManagerFactory.java | 20 +++++++ .../net/i2p/client/streaming/Connection.java | 2 +- core/java/src/net/i2p/util/RandomSource.java | 45 +++++++++++++--- history.txt | 18 ++++++- .../src/net/i2p/router/OutNetMessage.java | 8 +-- .../src/net/i2p/router/RouterVersion.java | 4 +- .../i2p/router/peermanager/PeerProfile.java | 24 ++++++++- .../peermanager/PersistProfilesJob.java | 2 +- .../peermanager/ProfileManagerImpl.java | 1 + .../peermanager/ProfilePersistenceHelper.java | 15 ++++++ .../router/peermanager/SpeedCalculator.java | 15 ++++-- .../transport/tcp/ConnectionBuilder.java | 2 +- .../router/transport/tcp/TCPTransport.java | 1 + .../net/i2p/router/tunnel/pool/TestJob.java | 53 +++++++++++++------ 14 files changed, 170 insertions(+), 40 deletions(-) diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java index c4b37ec4a..07d14220f 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java @@ -38,6 +38,26 @@ public class I2PSocketManagerFactory { public static I2PSocketManager createManager() { return createManager(getHost(), getPort(), System.getProperties()); } + + /** + * Create a socket manager using a brand new destination connected to the + * I2CP router on the local machine on the default port (7654). + * + * @return the newly created socket manager, or null if there were errors + */ + public static I2PSocketManager createManager(Properties opts) { + return createManager(getHost(), getPort(), opts); + } + + /** + * Create a socket manager using a brand new destination connected to the + * I2CP router on the specified host and port + * + * @return the newly created socket manager, or null if there were errors + */ + public static I2PSocketManager createManager(String host, int port) { + return createManager(host, port, System.getProperties()); + } /** * Create a socket manager using a brand new destination connected to the diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index ad9eaafc3..9afa42581 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -71,7 +71,7 @@ public class Connection { private long _lifetimeDupMessageSent; private long _lifetimeDupMessageReceived; - public static final long MAX_RESEND_DELAY = 60*1000; + public static final long MAX_RESEND_DELAY = 30*1000; public static final long MIN_RESEND_DELAY = 10*1000; /** wait up to 5 minutes after disconnection so we can ack/close packets */ diff --git a/core/java/src/net/i2p/util/RandomSource.java b/core/java/src/net/i2p/util/RandomSource.java index cf320d599..414f90839 100644 --- a/core/java/src/net/i2p/util/RandomSource.java +++ b/core/java/src/net/i2p/util/RandomSource.java @@ -42,7 +42,7 @@ public class RandomSource extends SecureRandom { * thats what it has been used for. * */ - public int nextInt(int n) { + public synchronized int nextInt(int n) { if (n == 0) return 0; int val = super.nextInt(n); if (val < 0) val = 0 - val; @@ -54,19 +54,48 @@ public class RandomSource extends SecureRandom { * Like the modified nextInt, nextLong(n) returns a random number from 0 through n, * including 0, excluding n. */ - public long nextLong(long n) { + public synchronized long nextLong(long n) { long v = super.nextLong(); if (v < 0) v = 0 - v; if (v >= n) v = v % n; return v; } - /** synchronized for older versions of kaffe */ - public void nextBytes(byte bytes[]) { - synchronized (this) { - super.nextBytes(bytes); - } - } + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public synchronized boolean nextBoolean() { return super.nextBoolean(); } + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public synchronized void nextBytes(byte buf[]) { super.nextBytes(buf); } + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public synchronized double nextDouble() { return super.nextDouble(); } + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public synchronized float nextFloat() { return super.nextFloat(); } + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public synchronized double nextGaussian() { return super.nextGaussian(); } + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public synchronized int nextInt() { return super.nextInt(); } + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public synchronized long nextLong() { return super.nextLong(); } public EntropyHarvester harvester() { return _entropyHarvester; } diff --git a/history.txt b/history.txt index 537d35ffb..d60b90408 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,20 @@ -$Id: history.txt,v 1.169 2005/03/14 22:47:15 jrandom Exp $ +$Id: history.txt,v 1.170 2005/03/17 00:29:55 jrandom Exp $ + +2005-03-17 jrandom + * Update the old speed calculator and associated profile data points to + use a non-tiered moving average of the tunnel test time, avoiding the + freshness issues of the old tiered speed stats. + * Explicitly synchronize all of the methods on the PRNG, rather than just + the feeder methods (sun and kaffe only need the feeder, but it seems ibm + needs all of them synchronized). + * Properly use the tunnel tests as part of the profile stats. + * Don't flood the jobqueue with sequential persist profile tasks, but + instead, inject a brief scheduling delay between them. + * Reduce the TCP connection establishment timeout to 20s (which is still + absurdly excessive) + * Reduced the max resend delay to 30s so we can get some resends in when + dealing with client apps that hang up early (e.g. wget) + * Added more alternative socketManager factories (good call aum!) 2005-03-16 jrandom * Adjust the old speed calculator to include end to end RTT data in its diff --git a/router/java/src/net/i2p/router/OutNetMessage.java b/router/java/src/net/i2p/router/OutNetMessage.java index 808dcecde..cc7ece804 100644 --- a/router/java/src/net/i2p/router/OutNetMessage.java +++ b/router/java/src/net/i2p/router/OutNetMessage.java @@ -89,7 +89,7 @@ public class OutNetMessage { */ public long timestamp(String eventName) { long now = _context.clock().now(); - if (_log.shouldLog(Log.DEBUG)) { + if (_log.shouldLog(Log.INFO)) { // only timestamp if we are debugging synchronized (this) { locked_initTimestamps(); @@ -103,7 +103,7 @@ public class OutNetMessage { return now - _created; } public Map getTimestamps() { - if (_log.shouldLog(Log.DEBUG)) { + if (_log.shouldLog(Log.INFO)) { synchronized (this) { locked_initTimestamps(); return (Map)_timestamps.clone(); @@ -112,7 +112,7 @@ public class OutNetMessage { return Collections.EMPTY_MAP; } public Long getTimestamp(String eventName) { - if (_log.shouldLog(Log.DEBUG)) { + if (_log.shouldLog(Log.INFO)) { synchronized (this) { locked_initTimestamps(); return (Long)_timestamps.get(eventName); @@ -301,7 +301,7 @@ public class OutNetMessage { } private void renderTimestamps(StringBuffer buf) { - if (_log.shouldLog(Log.DEBUG)) { + if (_log.shouldLog(Log.INFO)) { synchronized (this) { long lastWhen = -1; for (int i = 0; i < _timestampOrder.size(); i++) { diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 69ce2378a..5106f0782 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.163 $ $Date: 2005/03/14 22:47:15 $"; + public final static String ID = "$Revision: 1.164 $ $Date: 2005/03/17 00:29:55 $"; public final static String VERSION = "0.5.0.2"; - public final static long BUILD = 4; + public final static long BUILD = 5; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/peermanager/PeerProfile.java b/router/java/src/net/i2p/router/peermanager/PeerProfile.java index c97395823..5d830c625 100644 --- a/router/java/src/net/i2p/router/peermanager/PeerProfile.java +++ b/router/java/src/net/i2p/router/peermanager/PeerProfile.java @@ -19,6 +19,7 @@ public class PeerProfile { private long _lastSentToSuccessfully; private long _lastFailedSend; private long _lastHeardFrom; + private double _tunnelTestResponseTimeAvg; // periodic rates private RateStat _sendSuccessSize = null; private RateStat _sendFailureSize = null; @@ -61,6 +62,7 @@ public class PeerProfile { _integrationValue = 0; _isFailing = false; _consecutiveShitlists = 0; + _tunnelTestResponseTimeAvg = 0.0d; _peer = peer; if (expand) expandProfile(); @@ -213,6 +215,24 @@ public class PeerProfile { * is this peer actively failing (aka not worth touching)? */ public boolean getIsFailing() { return _isFailing; } + + public double getTunnelTestTimeAverage() { return _tunnelTestResponseTimeAvg; } + void setTunnelTestTimeAverage(double avg) { _tunnelTestResponseTimeAvg = avg; } + + void updateTunnelTestTimeAverage(long ms) { + if (_tunnelTestResponseTimeAvg <= 0) + _tunnelTestResponseTimeAvg = 30*1000; // should we instead start at $ms? + + // weighted since we want to let the average grow quickly and shrink slowly + if (ms < _tunnelTestResponseTimeAvg) + _tunnelTestResponseTimeAvg = 0.95*_tunnelTestResponseTimeAvg + .05*(double)ms; + else + _tunnelTestResponseTimeAvg = 0.75*_tunnelTestResponseTimeAvg + .25*(double)ms; + + if ( (_peer != null) && (_log.shouldLog(Log.INFO)) ) + _log.info("Updating tunnel test time for " + _peer.toBase64().substring(0,6) + + " to " + _tunnelTestResponseTimeAvg + " via " + ms); + } /** * when the given peer is performing so poorly that we don't want to bother keeping @@ -256,9 +276,9 @@ public class PeerProfile { if (_tunnelCreateResponseTime == null) _tunnelCreateResponseTime = new RateStat("tunnelCreateResponseTime", "how long it takes to get a tunnel create response from the peer (in milliseconds)", group, new long[] { 10*60*1000l, 30*60*1000l, 60*60*1000l, 24*60*60*1000 } ); if (_tunnelTestResponseTime == null) - _tunnelTestResponseTime = new RateStat("tunnelTestResponseTime", "how long it takes to successfully test a tunnel this peer participates in (in milliseconds)", group, new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000 } ); + _tunnelTestResponseTime = new RateStat("tunnelTestResponseTime", "how long it takes to successfully test a tunnel this peer participates in (in milliseconds)", group, new long[] { 10*60*1000l, 30*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 } ); if (_tunnelTestResponseTimeSlow == null) - _tunnelTestResponseTimeSlow = new RateStat("tunnelTestResponseTimeSlow", "how long it takes to successfully test a peer when the time exceeds 5s", group, new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l, }); + _tunnelTestResponseTimeSlow = new RateStat("tunnelTestResponseTimeSlow", "how long it takes to successfully test a peer when the time exceeds 5s", group, new long[] { 10*60*1000l, 30*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l, }); if (_commError == null) _commError = new RateStat("commErrorRate", "how long between communication errors with the peer (e.g. disconnection)", group, new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000 } ); if (_dbIntroduction == null) diff --git a/router/java/src/net/i2p/router/peermanager/PersistProfilesJob.java b/router/java/src/net/i2p/router/peermanager/PersistProfilesJob.java index 8d9fa3bae..e900d612c 100644 --- a/router/java/src/net/i2p/router/peermanager/PersistProfilesJob.java +++ b/router/java/src/net/i2p/router/peermanager/PersistProfilesJob.java @@ -46,7 +46,7 @@ class PersistProfilesJob extends JobImpl { PersistProfilesJob.this.getContext().jobQueue().addJob(PersistProfilesJob.this); } else { // we've got peers left to persist, so requeue the persist profile job - PersistProfilesJob.this.getContext().jobQueue().addJob(PersistProfileJob.this); + PersistProfilesJob.PersistProfileJob.this.requeue(1000); } } public String getName() { return "Persist profile"; } diff --git a/router/java/src/net/i2p/router/peermanager/ProfileManagerImpl.java b/router/java/src/net/i2p/router/peermanager/ProfileManagerImpl.java index d7fb12500..474588fd3 100644 --- a/router/java/src/net/i2p/router/peermanager/ProfileManagerImpl.java +++ b/router/java/src/net/i2p/router/peermanager/ProfileManagerImpl.java @@ -111,6 +111,7 @@ public class ProfileManagerImpl implements ProfileManager { public void tunnelTestSucceeded(Hash peer, long responseTimeMs) { PeerProfile data = getProfile(peer); if (data == null) return; + data.updateTunnelTestTimeAverage(responseTimeMs); data.getTunnelTestResponseTime().addData(responseTimeMs, responseTimeMs); if (responseTimeMs > getSlowThreshold()) data.getTunnelTestResponseTimeSlow().addData(responseTimeMs, responseTimeMs); diff --git a/router/java/src/net/i2p/router/peermanager/ProfilePersistenceHelper.java b/router/java/src/net/i2p/router/peermanager/ProfilePersistenceHelper.java index c384caab3..18d5e3c36 100644 --- a/router/java/src/net/i2p/router/peermanager/ProfilePersistenceHelper.java +++ b/router/java/src/net/i2p/router/peermanager/ProfilePersistenceHelper.java @@ -109,6 +109,8 @@ class ProfilePersistenceHelper { buf.append("lastFailedSend=").append(profile.getLastSendFailed()).append(NL); buf.append("# Last heard from: when did we last get a message from the peer? (milliseconds from the epoch)").append(NL); buf.append("lastHeardFrom=").append(profile.getLastHeardFrom()).append(NL); + buf.append("# moving average as to how fast the peer replies").append(NL); + buf.append("tunnelTestTimeAverage=").append(profile.getTunnelTestTimeAverage()).append(NL); buf.append(NL); out.write(buf.toString().getBytes()); @@ -178,6 +180,7 @@ class ProfilePersistenceHelper { profile.setLastSendSuccessful(getLong(props, "lastSentToSuccessfully")); profile.setLastSendFailed(getLong(props, "lastFailedSend")); profile.setLastHeardFrom(getLong(props, "lastHeardFrom")); + profile.setTunnelTestTimeAverage(getDouble(props, "tunnelTestTimeAverage")); profile.getTunnelHistory().load(props); profile.getDBHistory().load(props); @@ -214,6 +217,18 @@ class ProfilePersistenceHelper { } return 0; } + + private final static double getDouble(Properties props, String key) { + String val = props.getProperty(key); + if (val != null) { + try { + return Double.parseDouble(val); + } catch (NumberFormatException nfe) { + return 0.0; + } + } + return 0.0; + } private void loadProps(Properties props, File file) { try { diff --git a/router/java/src/net/i2p/router/peermanager/SpeedCalculator.java b/router/java/src/net/i2p/router/peermanager/SpeedCalculator.java index 261cab163..ad78a4ed7 100644 --- a/router/java/src/net/i2p/router/peermanager/SpeedCalculator.java +++ b/router/java/src/net/i2p/router/peermanager/SpeedCalculator.java @@ -38,6 +38,7 @@ public class SpeedCalculator extends Calculator { } public double calc(PeerProfile profile) { + if (true) return calcAverage(profile); long threshold = getEventThreshold(); boolean tunnelTestOnly = getUseTunnelTestOnly(); @@ -109,16 +110,24 @@ public class SpeedCalculator extends Calculator { return rv; } + private double calcAverage(PeerProfile profile) { + double avg = profile.getTunnelTestTimeAverage(); + if (avg == 0) + return 0.0; + else + return (60.0*1000.0) / avg; + } + private double adjust(long period, double value) { switch ((int)period) { case 10*60*1000: return value; case 60*60*1000: - return value * 0.5; + return value * 0.75; case 24*60*60*1000: - return value * 0.001; + return value * 0.1; default: - return value * 0.0001; + return value * 0.01; } } diff --git a/router/java/src/net/i2p/router/transport/tcp/ConnectionBuilder.java b/router/java/src/net/i2p/router/transport/tcp/ConnectionBuilder.java index af06802c1..a9ad1c675 100644 --- a/router/java/src/net/i2p/router/transport/tcp/ConnectionBuilder.java +++ b/router/java/src/net/i2p/router/transport/tcp/ConnectionBuilder.java @@ -76,7 +76,7 @@ public class ConnectionBuilder { private String _error; /** If the connection hasn't been built in 30 seconds, give up */ - public static final int CONNECTION_TIMEOUT = 30*1000; + public static final int CONNECTION_TIMEOUT = 20*1000; public static final int WRITE_BUFFER_SIZE = 2*1024; diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java index deac4229a..9f5217b21 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java @@ -196,6 +196,7 @@ public class TCPTransport extends TransportImpl { newPeer = true; } msgs.add(msg); + msg.timestamp("TCPTransport.outboundMessageReady queued behind " +(msgs.size()-1)); if (newPeer) _connectionLock.notifyAll(); diff --git a/router/java/src/net/i2p/router/tunnel/pool/TestJob.java b/router/java/src/net/i2p/router/tunnel/pool/TestJob.java index b7d10e61b..29c94a33b 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TestJob.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TestJob.java @@ -24,6 +24,8 @@ class TestJob extends JobImpl { private TunnelPool _pool; private PooledTunnelCreatorConfig _cfg; private boolean _found; + private TunnelInfo _outTunnel; + private TunnelInfo _replyTunnel; /** base to randomize the test delay on */ private static final int TEST_DELAY = 60*1000; @@ -50,19 +52,19 @@ class TestJob extends JobImpl { _found = false; // note: testing with exploratory tunnels always, even if the tested tunnel // is a client tunnel (per _cfg.getDestination()) - TunnelInfo replyTunnel = null; - TunnelInfo outTunnel = null; + _replyTunnel = null; + _outTunnel = null; if (_cfg.isInbound()) { - replyTunnel = _cfg; - outTunnel = getContext().tunnelManager().selectOutboundTunnel(); + _replyTunnel = _cfg; + _outTunnel = getContext().tunnelManager().selectOutboundTunnel(); } else { - replyTunnel = getContext().tunnelManager().selectInboundTunnel(); - outTunnel = _cfg; + _replyTunnel = getContext().tunnelManager().selectInboundTunnel(); + _outTunnel = _cfg; } - if ( (replyTunnel == null) || (outTunnel == null) ) { + if ( (_replyTunnel == null) || (_outTunnel == null) ) { if (_log.shouldLog(Log.ERROR)) - _log.error("Insufficient tunnels to test " + _cfg + " with: " + replyTunnel + " / " + outTunnel); + _log.error("Insufficient tunnels to test " + _cfg + " with: " + _replyTunnel + " / " + _outTunnel); getContext().statManager().addRateData("tunnel.testAborted", _cfg.getLength(), 0); scheduleRetest(); } else { @@ -77,15 +79,15 @@ class TestJob extends JobImpl { OnTestReply onReply = new OnTestReply(getContext()); OnTestTimeout onTimeout = new OnTestTimeout(getContext()); getContext().messageRegistry().registerPending(sel, onReply, onTimeout, 3*testPeriod); - sendTest(m, outTunnel, replyTunnel); + sendTest(m); } } - private void sendTest(I2NPMessage m, TunnelInfo outTunnel, TunnelInfo replyTunnel) { + private void sendTest(I2NPMessage m) { if (false) { - getContext().tunnelDispatcher().dispatchOutbound(m, outTunnel.getSendTunnelId(0), - replyTunnel.getReceiveTunnelId(0), - replyTunnel.getPeer(0)); + getContext().tunnelDispatcher().dispatchOutbound(m, _outTunnel.getSendTunnelId(0), + _replyTunnel.getReceiveTunnelId(0), + _replyTunnel.getPeer(0)); } else { // garlic route that DeliveryStatusMessage to ourselves so the endpoints and gateways // can't tell its a test. to simplify this, we encrypt it with a random key and tag, @@ -116,20 +118,35 @@ class TestJob extends JobImpl { getContext().sessionKeyManager().tagsReceived(encryptKey, encryptTags); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Sending garlic test of " + outTunnel + " / " + replyTunnel); - getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnel.getSendTunnelId(0), - replyTunnel.getReceiveTunnelId(0), - replyTunnel.getPeer(0)); + _log.debug("Sending garlic test of " + _outTunnel + " / " + _replyTunnel); + getContext().tunnelDispatcher().dispatchOutbound(msg, _outTunnel.getSendTunnelId(0), + _replyTunnel.getReceiveTunnelId(0), + _replyTunnel.getPeer(0)); } } public void testSuccessful(int ms) { getContext().statManager().addRateData("tunnel.testSuccessLength", _cfg.getLength(), 0); getContext().statManager().addRateData("tunnel.testSuccessTime", ms, 0); + + noteSuccess(ms, _outTunnel); + noteSuccess(ms, _replyTunnel); + scheduleRetest(); } + private void noteSuccess(long ms, TunnelInfo tunnel) { + if (tunnel != null) + for (int i = 0; i < tunnel.getLength(); i++) + getContext().profileManager().tunnelTestSucceeded(tunnel.getPeer(i), ms); + } + private void testFailed(long timeToFail) { + if (_found) { + // ok, not really a /success/, but we did find it, even though slowly + noteSuccess(timeToFail, _outTunnel); + noteSuccess(timeToFail, _replyTunnel); + } if (_pool.getSettings().isExploratory()) getContext().statManager().addRateData("tunnel.testExploratoryFailedTime", timeToFail, timeToFail); else @@ -144,6 +161,8 @@ class TestJob extends JobImpl { /** how long we allow tests to run for before failing them */ private int getTestPeriod() { return 20*1000; } private void scheduleRetest() { + _outTunnel = null; + _replyTunnel = null; int delay = getDelay(); if (_cfg.getExpiration() > getContext().clock().now() + delay) requeue(delay);