diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java index c4b37ec4a..07d14220f 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java @@ -38,6 +38,26 @@ public class I2PSocketManagerFactory { public static I2PSocketManager createManager() { return createManager(getHost(), getPort(), System.getProperties()); } + + /** + * Create a socket manager using a brand new destination connected to the + * I2CP router on the local machine on the default port (7654). + * + * @return the newly created socket manager, or null if there were errors + */ + public static I2PSocketManager createManager(Properties opts) { + return createManager(getHost(), getPort(), opts); + } + + /** + * Create a socket manager using a brand new destination connected to the + * I2CP router on the specified host and port + * + * @return the newly created socket manager, or null if there were errors + */ + public static I2PSocketManager createManager(String host, int port) { + return createManager(host, port, System.getProperties()); + } /** * Create a socket manager using a brand new destination connected to the diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index ad9eaafc3..9afa42581 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -71,7 +71,7 @@ public class Connection { private long _lifetimeDupMessageSent; private long _lifetimeDupMessageReceived; - public static final long MAX_RESEND_DELAY = 60*1000; + public static final long MAX_RESEND_DELAY = 30*1000; public static final long MIN_RESEND_DELAY = 10*1000; /** wait up to 5 minutes after disconnection so we can ack/close packets */ diff --git a/core/java/src/net/i2p/util/RandomSource.java b/core/java/src/net/i2p/util/RandomSource.java index cf320d599..414f90839 100644 --- a/core/java/src/net/i2p/util/RandomSource.java +++ b/core/java/src/net/i2p/util/RandomSource.java @@ -42,7 +42,7 @@ public class RandomSource extends SecureRandom { * thats what it has been used for. * */ - public int nextInt(int n) { + public synchronized int nextInt(int n) { if (n == 0) return 0; int val = super.nextInt(n); if (val < 0) val = 0 - val; @@ -54,19 +54,48 @@ public class RandomSource extends SecureRandom { * Like the modified nextInt, nextLong(n) returns a random number from 0 through n, * including 0, excluding n. */ - public long nextLong(long n) { + public synchronized long nextLong(long n) { long v = super.nextLong(); if (v < 0) v = 0 - v; if (v >= n) v = v % n; return v; } - /** synchronized for older versions of kaffe */ - public void nextBytes(byte bytes[]) { - synchronized (this) { - super.nextBytes(bytes); - } - } + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public synchronized boolean nextBoolean() { return super.nextBoolean(); } + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public synchronized void nextBytes(byte buf[]) { super.nextBytes(buf); } + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public synchronized double nextDouble() { return super.nextDouble(); } + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public synchronized float nextFloat() { return super.nextFloat(); } + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public synchronized double nextGaussian() { return super.nextGaussian(); } + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public synchronized int nextInt() { return super.nextInt(); } + /** + * override as synchronized, for those JVMs that don't always pull via + * nextBytes (cough ibm) + */ + public synchronized long nextLong() { return super.nextLong(); } public EntropyHarvester harvester() { return _entropyHarvester; } diff --git a/history.txt b/history.txt index 537d35ffb..d60b90408 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,20 @@ -$Id: history.txt,v 1.169 2005/03/14 22:47:15 jrandom Exp $ +$Id: history.txt,v 1.170 2005/03/17 00:29:55 jrandom Exp $ + +2005-03-17 jrandom + * Update the old speed calculator and associated profile data points to + use a non-tiered moving average of the tunnel test time, avoiding the + freshness issues of the old tiered speed stats. + * Explicitly synchronize all of the methods on the PRNG, rather than just + the feeder methods (sun and kaffe only need the feeder, but it seems ibm + needs all of them synchronized). + * Properly use the tunnel tests as part of the profile stats. + * Don't flood the jobqueue with sequential persist profile tasks, but + instead, inject a brief scheduling delay between them. + * Reduce the TCP connection establishment timeout to 20s (which is still + absurdly excessive) + * Reduced the max resend delay to 30s so we can get some resends in when + dealing with client apps that hang up early (e.g. wget) + * Added more alternative socketManager factories (good call aum!) 2005-03-16 jrandom * Adjust the old speed calculator to include end to end RTT data in its diff --git a/router/java/src/net/i2p/router/OutNetMessage.java b/router/java/src/net/i2p/router/OutNetMessage.java index 808dcecde..cc7ece804 100644 --- a/router/java/src/net/i2p/router/OutNetMessage.java +++ b/router/java/src/net/i2p/router/OutNetMessage.java @@ -89,7 +89,7 @@ public class OutNetMessage { */ public long timestamp(String eventName) { long now = _context.clock().now(); - if (_log.shouldLog(Log.DEBUG)) { + if (_log.shouldLog(Log.INFO)) { // only timestamp if we are debugging synchronized (this) { locked_initTimestamps(); @@ -103,7 +103,7 @@ public class OutNetMessage { return now - _created; } public Map getTimestamps() { - if (_log.shouldLog(Log.DEBUG)) { + if (_log.shouldLog(Log.INFO)) { synchronized (this) { locked_initTimestamps(); return (Map)_timestamps.clone(); @@ -112,7 +112,7 @@ public class OutNetMessage { return Collections.EMPTY_MAP; } public Long getTimestamp(String eventName) { - if (_log.shouldLog(Log.DEBUG)) { + if (_log.shouldLog(Log.INFO)) { synchronized (this) { locked_initTimestamps(); return (Long)_timestamps.get(eventName); @@ -301,7 +301,7 @@ public class OutNetMessage { } private void renderTimestamps(StringBuffer buf) { - if (_log.shouldLog(Log.DEBUG)) { + if (_log.shouldLog(Log.INFO)) { synchronized (this) { long lastWhen = -1; for (int i = 0; i < _timestampOrder.size(); i++) { diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 69ce2378a..5106f0782 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.163 $ $Date: 2005/03/14 22:47:15 $"; + public final static String ID = "$Revision: 1.164 $ $Date: 2005/03/17 00:29:55 $"; public final static String VERSION = "0.5.0.2"; - public final static long BUILD = 4; + public final static long BUILD = 5; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/peermanager/PeerProfile.java b/router/java/src/net/i2p/router/peermanager/PeerProfile.java index c97395823..5d830c625 100644 --- a/router/java/src/net/i2p/router/peermanager/PeerProfile.java +++ b/router/java/src/net/i2p/router/peermanager/PeerProfile.java @@ -19,6 +19,7 @@ public class PeerProfile { private long _lastSentToSuccessfully; private long _lastFailedSend; private long _lastHeardFrom; + private double _tunnelTestResponseTimeAvg; // periodic rates private RateStat _sendSuccessSize = null; private RateStat _sendFailureSize = null; @@ -61,6 +62,7 @@ public class PeerProfile { _integrationValue = 0; _isFailing = false; _consecutiveShitlists = 0; + _tunnelTestResponseTimeAvg = 0.0d; _peer = peer; if (expand) expandProfile(); @@ -213,6 +215,24 @@ public class PeerProfile { * is this peer actively failing (aka not worth touching)? */ public boolean getIsFailing() { return _isFailing; } + + public double getTunnelTestTimeAverage() { return _tunnelTestResponseTimeAvg; } + void setTunnelTestTimeAverage(double avg) { _tunnelTestResponseTimeAvg = avg; } + + void updateTunnelTestTimeAverage(long ms) { + if (_tunnelTestResponseTimeAvg <= 0) + _tunnelTestResponseTimeAvg = 30*1000; // should we instead start at $ms? + + // weighted since we want to let the average grow quickly and shrink slowly + if (ms < _tunnelTestResponseTimeAvg) + _tunnelTestResponseTimeAvg = 0.95*_tunnelTestResponseTimeAvg + .05*(double)ms; + else + _tunnelTestResponseTimeAvg = 0.75*_tunnelTestResponseTimeAvg + .25*(double)ms; + + if ( (_peer != null) && (_log.shouldLog(Log.INFO)) ) + _log.info("Updating tunnel test time for " + _peer.toBase64().substring(0,6) + + " to " + _tunnelTestResponseTimeAvg + " via " + ms); + } /** * when the given peer is performing so poorly that we don't want to bother keeping @@ -256,9 +276,9 @@ public class PeerProfile { if (_tunnelCreateResponseTime == null) _tunnelCreateResponseTime = new RateStat("tunnelCreateResponseTime", "how long it takes to get a tunnel create response from the peer (in milliseconds)", group, new long[] { 10*60*1000l, 30*60*1000l, 60*60*1000l, 24*60*60*1000 } ); if (_tunnelTestResponseTime == null) - _tunnelTestResponseTime = new RateStat("tunnelTestResponseTime", "how long it takes to successfully test a tunnel this peer participates in (in milliseconds)", group, new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000 } ); + _tunnelTestResponseTime = new RateStat("tunnelTestResponseTime", "how long it takes to successfully test a tunnel this peer participates in (in milliseconds)", group, new long[] { 10*60*1000l, 30*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 } ); if (_tunnelTestResponseTimeSlow == null) - _tunnelTestResponseTimeSlow = new RateStat("tunnelTestResponseTimeSlow", "how long it takes to successfully test a peer when the time exceeds 5s", group, new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l, }); + _tunnelTestResponseTimeSlow = new RateStat("tunnelTestResponseTimeSlow", "how long it takes to successfully test a peer when the time exceeds 5s", group, new long[] { 10*60*1000l, 30*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l, }); if (_commError == null) _commError = new RateStat("commErrorRate", "how long between communication errors with the peer (e.g. disconnection)", group, new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000 } ); if (_dbIntroduction == null) diff --git a/router/java/src/net/i2p/router/peermanager/PersistProfilesJob.java b/router/java/src/net/i2p/router/peermanager/PersistProfilesJob.java index 8d9fa3bae..e900d612c 100644 --- a/router/java/src/net/i2p/router/peermanager/PersistProfilesJob.java +++ b/router/java/src/net/i2p/router/peermanager/PersistProfilesJob.java @@ -46,7 +46,7 @@ class PersistProfilesJob extends JobImpl { PersistProfilesJob.this.getContext().jobQueue().addJob(PersistProfilesJob.this); } else { // we've got peers left to persist, so requeue the persist profile job - PersistProfilesJob.this.getContext().jobQueue().addJob(PersistProfileJob.this); + PersistProfilesJob.PersistProfileJob.this.requeue(1000); } } public String getName() { return "Persist profile"; } diff --git a/router/java/src/net/i2p/router/peermanager/ProfileManagerImpl.java b/router/java/src/net/i2p/router/peermanager/ProfileManagerImpl.java index d7fb12500..474588fd3 100644 --- a/router/java/src/net/i2p/router/peermanager/ProfileManagerImpl.java +++ b/router/java/src/net/i2p/router/peermanager/ProfileManagerImpl.java @@ -111,6 +111,7 @@ public class ProfileManagerImpl implements ProfileManager { public void tunnelTestSucceeded(Hash peer, long responseTimeMs) { PeerProfile data = getProfile(peer); if (data == null) return; + data.updateTunnelTestTimeAverage(responseTimeMs); data.getTunnelTestResponseTime().addData(responseTimeMs, responseTimeMs); if (responseTimeMs > getSlowThreshold()) data.getTunnelTestResponseTimeSlow().addData(responseTimeMs, responseTimeMs); diff --git a/router/java/src/net/i2p/router/peermanager/ProfilePersistenceHelper.java b/router/java/src/net/i2p/router/peermanager/ProfilePersistenceHelper.java index c384caab3..18d5e3c36 100644 --- a/router/java/src/net/i2p/router/peermanager/ProfilePersistenceHelper.java +++ b/router/java/src/net/i2p/router/peermanager/ProfilePersistenceHelper.java @@ -109,6 +109,8 @@ class ProfilePersistenceHelper { buf.append("lastFailedSend=").append(profile.getLastSendFailed()).append(NL); buf.append("# Last heard from: when did we last get a message from the peer? (milliseconds from the epoch)").append(NL); buf.append("lastHeardFrom=").append(profile.getLastHeardFrom()).append(NL); + buf.append("# moving average as to how fast the peer replies").append(NL); + buf.append("tunnelTestTimeAverage=").append(profile.getTunnelTestTimeAverage()).append(NL); buf.append(NL); out.write(buf.toString().getBytes()); @@ -178,6 +180,7 @@ class ProfilePersistenceHelper { profile.setLastSendSuccessful(getLong(props, "lastSentToSuccessfully")); profile.setLastSendFailed(getLong(props, "lastFailedSend")); profile.setLastHeardFrom(getLong(props, "lastHeardFrom")); + profile.setTunnelTestTimeAverage(getDouble(props, "tunnelTestTimeAverage")); profile.getTunnelHistory().load(props); profile.getDBHistory().load(props); @@ -214,6 +217,18 @@ class ProfilePersistenceHelper { } return 0; } + + private final static double getDouble(Properties props, String key) { + String val = props.getProperty(key); + if (val != null) { + try { + return Double.parseDouble(val); + } catch (NumberFormatException nfe) { + return 0.0; + } + } + return 0.0; + } private void loadProps(Properties props, File file) { try { diff --git a/router/java/src/net/i2p/router/peermanager/SpeedCalculator.java b/router/java/src/net/i2p/router/peermanager/SpeedCalculator.java index 261cab163..ad78a4ed7 100644 --- a/router/java/src/net/i2p/router/peermanager/SpeedCalculator.java +++ b/router/java/src/net/i2p/router/peermanager/SpeedCalculator.java @@ -38,6 +38,7 @@ public class SpeedCalculator extends Calculator { } public double calc(PeerProfile profile) { + if (true) return calcAverage(profile); long threshold = getEventThreshold(); boolean tunnelTestOnly = getUseTunnelTestOnly(); @@ -109,16 +110,24 @@ public class SpeedCalculator extends Calculator { return rv; } + private double calcAverage(PeerProfile profile) { + double avg = profile.getTunnelTestTimeAverage(); + if (avg == 0) + return 0.0; + else + return (60.0*1000.0) / avg; + } + private double adjust(long period, double value) { switch ((int)period) { case 10*60*1000: return value; case 60*60*1000: - return value * 0.5; + return value * 0.75; case 24*60*60*1000: - return value * 0.001; + return value * 0.1; default: - return value * 0.0001; + return value * 0.01; } } diff --git a/router/java/src/net/i2p/router/transport/tcp/ConnectionBuilder.java b/router/java/src/net/i2p/router/transport/tcp/ConnectionBuilder.java index af06802c1..a9ad1c675 100644 --- a/router/java/src/net/i2p/router/transport/tcp/ConnectionBuilder.java +++ b/router/java/src/net/i2p/router/transport/tcp/ConnectionBuilder.java @@ -76,7 +76,7 @@ public class ConnectionBuilder { private String _error; /** If the connection hasn't been built in 30 seconds, give up */ - public static final int CONNECTION_TIMEOUT = 30*1000; + public static final int CONNECTION_TIMEOUT = 20*1000; public static final int WRITE_BUFFER_SIZE = 2*1024; diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java index deac4229a..9f5217b21 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java @@ -196,6 +196,7 @@ public class TCPTransport extends TransportImpl { newPeer = true; } msgs.add(msg); + msg.timestamp("TCPTransport.outboundMessageReady queued behind " +(msgs.size()-1)); if (newPeer) _connectionLock.notifyAll(); diff --git a/router/java/src/net/i2p/router/tunnel/pool/TestJob.java b/router/java/src/net/i2p/router/tunnel/pool/TestJob.java index b7d10e61b..29c94a33b 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TestJob.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TestJob.java @@ -24,6 +24,8 @@ class TestJob extends JobImpl { private TunnelPool _pool; private PooledTunnelCreatorConfig _cfg; private boolean _found; + private TunnelInfo _outTunnel; + private TunnelInfo _replyTunnel; /** base to randomize the test delay on */ private static final int TEST_DELAY = 60*1000; @@ -50,19 +52,19 @@ class TestJob extends JobImpl { _found = false; // note: testing with exploratory tunnels always, even if the tested tunnel // is a client tunnel (per _cfg.getDestination()) - TunnelInfo replyTunnel = null; - TunnelInfo outTunnel = null; + _replyTunnel = null; + _outTunnel = null; if (_cfg.isInbound()) { - replyTunnel = _cfg; - outTunnel = getContext().tunnelManager().selectOutboundTunnel(); + _replyTunnel = _cfg; + _outTunnel = getContext().tunnelManager().selectOutboundTunnel(); } else { - replyTunnel = getContext().tunnelManager().selectInboundTunnel(); - outTunnel = _cfg; + _replyTunnel = getContext().tunnelManager().selectInboundTunnel(); + _outTunnel = _cfg; } - if ( (replyTunnel == null) || (outTunnel == null) ) { + if ( (_replyTunnel == null) || (_outTunnel == null) ) { if (_log.shouldLog(Log.ERROR)) - _log.error("Insufficient tunnels to test " + _cfg + " with: " + replyTunnel + " / " + outTunnel); + _log.error("Insufficient tunnels to test " + _cfg + " with: " + _replyTunnel + " / " + _outTunnel); getContext().statManager().addRateData("tunnel.testAborted", _cfg.getLength(), 0); scheduleRetest(); } else { @@ -77,15 +79,15 @@ class TestJob extends JobImpl { OnTestReply onReply = new OnTestReply(getContext()); OnTestTimeout onTimeout = new OnTestTimeout(getContext()); getContext().messageRegistry().registerPending(sel, onReply, onTimeout, 3*testPeriod); - sendTest(m, outTunnel, replyTunnel); + sendTest(m); } } - private void sendTest(I2NPMessage m, TunnelInfo outTunnel, TunnelInfo replyTunnel) { + private void sendTest(I2NPMessage m) { if (false) { - getContext().tunnelDispatcher().dispatchOutbound(m, outTunnel.getSendTunnelId(0), - replyTunnel.getReceiveTunnelId(0), - replyTunnel.getPeer(0)); + getContext().tunnelDispatcher().dispatchOutbound(m, _outTunnel.getSendTunnelId(0), + _replyTunnel.getReceiveTunnelId(0), + _replyTunnel.getPeer(0)); } else { // garlic route that DeliveryStatusMessage to ourselves so the endpoints and gateways // can't tell its a test. to simplify this, we encrypt it with a random key and tag, @@ -116,20 +118,35 @@ class TestJob extends JobImpl { getContext().sessionKeyManager().tagsReceived(encryptKey, encryptTags); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Sending garlic test of " + outTunnel + " / " + replyTunnel); - getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnel.getSendTunnelId(0), - replyTunnel.getReceiveTunnelId(0), - replyTunnel.getPeer(0)); + _log.debug("Sending garlic test of " + _outTunnel + " / " + _replyTunnel); + getContext().tunnelDispatcher().dispatchOutbound(msg, _outTunnel.getSendTunnelId(0), + _replyTunnel.getReceiveTunnelId(0), + _replyTunnel.getPeer(0)); } } public void testSuccessful(int ms) { getContext().statManager().addRateData("tunnel.testSuccessLength", _cfg.getLength(), 0); getContext().statManager().addRateData("tunnel.testSuccessTime", ms, 0); + + noteSuccess(ms, _outTunnel); + noteSuccess(ms, _replyTunnel); + scheduleRetest(); } + private void noteSuccess(long ms, TunnelInfo tunnel) { + if (tunnel != null) + for (int i = 0; i < tunnel.getLength(); i++) + getContext().profileManager().tunnelTestSucceeded(tunnel.getPeer(i), ms); + } + private void testFailed(long timeToFail) { + if (_found) { + // ok, not really a /success/, but we did find it, even though slowly + noteSuccess(timeToFail, _outTunnel); + noteSuccess(timeToFail, _replyTunnel); + } if (_pool.getSettings().isExploratory()) getContext().statManager().addRateData("tunnel.testExploratoryFailedTime", timeToFail, timeToFail); else @@ -144,6 +161,8 @@ class TestJob extends JobImpl { /** how long we allow tests to run for before failing them */ private int getTestPeriod() { return 20*1000; } private void scheduleRetest() { + _outTunnel = null; + _replyTunnel = null; int delay = getDelay(); if (_cfg.getExpiration() > getContext().clock().now() + delay) requeue(delay);