diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java
index f5ea487e7..e8eb3eed4 100644
--- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java
@@ -96,7 +96,7 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
_log.error("Error while closing the received i2p con", ex);
}
} catch (IOException ex) {
- _log.error("Error while waiting for I2PConnections", ex);
+ _log.error("Error while handling for I2PConnections", ex);
}
long afterHandle = I2PAppContext.getGlobalContext().clock().now();
diff --git a/apps/i2ptunnel/jsp/editClient.jsp b/apps/i2ptunnel/jsp/editClient.jsp
index 12a3d6aa8..31516add1 100644
--- a/apps/i2ptunnel/jsp/editClient.jsp
+++ b/apps/i2ptunnel/jsp/editClient.jsp
@@ -93,7 +93,7 @@ if (curTunnel >= 0) {
others:
-
+
<% } else if ("0.0.0.0".equals(clientInterface)) { %>
@@ -102,7 +102,7 @@ if (curTunnel >= 0) {
others:
-
+
<% } else { %>
diff --git a/history.txt b/history.txt
index 476f8c0c2..f1a4144e4 100644
--- a/history.txt
+++ b/history.txt
@@ -1,4 +1,14 @@
-$Id: history.txt,v 1.198 2005/04/24 13:42:05 jrandom Exp $
+$Id: history.txt,v 1.199 2005/04/25 21:59:23 smeghead Exp $
+
+2005-04-28 jrandom
+ * More fixes for the I2PTunnel "other" interface handling (thanks nelgin!)
+ * Add back the code to handle bids from multiple transports (though there
+ is still only one transport enabled by default)
+ * Adjust the router's queueing of outbound client messages when under
+ heavy load by running the preparatory job in the client's I2CP handler
+ thread, thereby blocking additional outbound messages when the router is
+ hosed.
+ * No need to validate or persist a netDb entry if we already have it
2005-04-25 smeghead
* Added button to router console for manual update checks.
diff --git a/news.xml b/news.xml
index cdb2b7ef0..5bd3f823c 100644
--- a/news.xml
+++ b/news.xml
@@ -1,15 +1,18 @@
-
+
+ anonannouncement="http://i2p/NF2RLVUxVulR3IqK0sGJR0dHQcGXAzwa6rEO4WAWYXOHw-DoZhKnlbf1nzHXwMEJoex5nFTyiNMqxJMWlY54cvU~UenZdkyQQeUSBZXyuSweflUXFqKN-y8xIoK2w9Ylq1k8IcrAFDsITyOzjUKoOPfVq34rKNDo7fYyis4kT5bAHy~2N1EVMs34pi2RFabATIOBk38Qhab57Umpa6yEoE~rbyR~suDRvD7gjBvBiIKFqhFueXsR2uSrPB-yzwAGofTXuklofK3DdKspciclTVzqbDjsk5UXfu2nTrC1agkhLyqlOfjhyqC~t1IXm-Vs2o7911k7KKLGjB4lmH508YJ7G9fLAUyjuB-wwwhejoWqvg7oWvqo4oIok8LG6ECR71C3dzCvIjY2QcrhoaazA9G4zcGMm6NKND-H4XY6tUWhpB~5GefB3YczOqMbHq4wi0O9MzBFrOJEOs3X4hwboKWANf7DT5PZKJZ5KorQPsYRSq0E3wSOsFCSsdVCKUGsAAAA/pipermail/i2p/2005-April/000709.html"
+ publicannouncement="http://dev.i2p.net/pipermail/i2p/2005-April/000709.html" />
-0.5.0.7 release available! It is backwards compatible, but should
-help eepsite authors keep their sites reachable, so give it a whirl!
+No news to report, beyond the usual
+status notes
+(non anon)
+and meeting logs
+(non anon)
diff --git a/router/java/src/net/i2p/router/ClientMessagePool.java b/router/java/src/net/i2p/router/ClientMessagePool.java
index 8f680bf84..e2e67dc99 100644
--- a/router/java/src/net/i2p/router/ClientMessagePool.java
+++ b/router/java/src/net/i2p/router/ClientMessagePool.java
@@ -58,7 +58,11 @@ public class ClientMessagePool {
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Adding message for remote delivery");
- _context.jobQueue().addJob(new OutboundClientMessageOneShotJob(_context, msg));
+ OutboundClientMessageOneShotJob j = new OutboundClientMessageOneShotJob(_context, msg);
+ if (true) // blocks the I2CP reader for a nontrivial period of time
+ j.runJob();
+ else
+ _context.jobQueue().addJob(j);
}
}
diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java
index bdf023fde..6d7b27da3 100644
--- a/router/java/src/net/i2p/router/RouterVersion.java
+++ b/router/java/src/net/i2p/router/RouterVersion.java
@@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
- public final static String ID = "$Revision: 1.189 $ $Date: 2005/04/20 15:14:19 $";
+ public final static String ID = "$Revision: 1.190 $ $Date: 2005/04/24 13:42:04 $";
public final static String VERSION = "0.5.0.7";
- public final static long BUILD = 1;
+ public final static long BUILD = 2;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);
diff --git a/router/java/src/net/i2p/router/client/ClientMessageEventListener.java b/router/java/src/net/i2p/router/client/ClientMessageEventListener.java
index 8807dc1a3..a0c8c10a9 100644
--- a/router/java/src/net/i2p/router/client/ClientMessageEventListener.java
+++ b/router/java/src/net/i2p/router/client/ClientMessageEventListener.java
@@ -42,6 +42,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
_context = context;
_log = _context.logManager().getLog(ClientMessageEventListener.class);
_runner = runner;
+ _context.statManager().createRateStat("client.distributeTime", "How long it took to inject the client message into the router", "ClientMessages", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
}
/**
@@ -162,6 +163,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
MessageId id = _runner.distributeMessage(message);
long timeToDistribute = _context.clock().now() - beforeDistribute;
_runner.ackSendMessage(id, message.getNonce());
+ _context.statManager().addRateData("client.distributeTime", timeToDistribute, timeToDistribute);
if ( (timeToDistribute > 50) && (_log.shouldLog(Log.WARN)) )
_log.warn("Took too long to distribute the message (which holds up the ack): " + timeToDistribute);
}
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java
index 17cd2da04..b00c391b6 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java
@@ -545,13 +545,17 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
public LeaseSet store(Hash key, LeaseSet leaseSet) throws IllegalArgumentException {
if (!_initialized) return null;
+ LeaseSet rv = (LeaseSet)_ds.get(key);
+
+ if ( (rv != null) && (rv.equals(leaseSet)) ) {
+ // if it hasn't changed, no need to do anything
+ return rv;
+ }
+
String err = validate(key, leaseSet);
if (err != null)
throw new IllegalArgumentException("Invalid store attempt - " + err);
- LeaseSet rv = null;
- if (_ds.isKnown(key))
- rv = (LeaseSet)_ds.get(key);
_ds.put(key, leaseSet);
synchronized (_lastSent) {
if (!_lastSent.containsKey(key))
@@ -629,14 +633,17 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
public RouterInfo store(Hash key, RouterInfo routerInfo) throws IllegalArgumentException {
if (!_initialized) return null;
+ RouterInfo rv = (RouterInfo)_ds.get(key);
+
+ if ( (rv != null) && (rv.equals(routerInfo)) ) {
+ // no need to validate
+ return rv;
+ }
+
String err = validate(key, routerInfo);
if (err != null)
throw new IllegalArgumentException("Invalid store attempt - " + err);
- RouterInfo rv = null;
- if (_ds.isKnown(key))
- rv = (RouterInfo)_ds.get(key);
-
if (_log.shouldLog(Log.INFO))
_log.info("RouterInfo " + key.toBase64() + " is stored with "
+ routerInfo.getOptions().size() + " options on "
@@ -648,7 +655,8 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
if (!_lastSent.containsKey(key))
_lastSent.put(key, new Long(0));
}
- _kb.add(key);
+ if (rv == null)
+ _kb.add(key);
return rv;
}
diff --git a/router/java/src/net/i2p/router/transport/GetBidsJob.java b/router/java/src/net/i2p/router/transport/GetBidsJob.java
index b1f572755..659099f53 100644
--- a/router/java/src/net/i2p/router/transport/GetBidsJob.java
+++ b/router/java/src/net/i2p/router/transport/GetBidsJob.java
@@ -8,6 +8,7 @@ package net.i2p.router.transport;
*
*/
+import java.util.List;
import net.i2p.data.Hash;
import net.i2p.router.JobImpl;
import net.i2p.router.MessageSelector;
@@ -58,18 +59,28 @@ public class GetBidsJob extends JobImpl {
return;
}
- TransportBid bid = facade.getBid(msg);
- if (bid == null) {
- // only shitlist them if we couldnt even try a single transport
- if (msg.getFailedTransports().size() <= 0) {
- if (log.shouldLog(Log.WARN))
- log.warn("No bids available for the message " + msg);
- context.shitlist().shitlistRouter(to, "No bids");
- context.netDb().fail(to);
- }
+ List bids = facade.getBids(msg);
+
+ if ( (bids == null) || (bids.size() <= 0) ) {
+ context.shitlist().shitlistRouter(to, "No bids after " + (bids != null ? bids.size() + " tries" : "0 tries"));
+ context.netDb().fail(to);
fail(context, msg);
} else {
- bid.getTransport().send(msg);
+ int lowestCost = -1;
+ TransportBid winner = null;
+ for (int i = 0; i < bids.size(); i++) {
+ TransportBid bid = (TransportBid)bids.get(i);
+ if ( (lowestCost < 0) || (bid.getLatencyMs() < lowestCost) ) {
+ winner = bid;
+ lowestCost = bid.getLatencyMs();
+ }
+ }
+ if (winner != null) {
+ if (log.shouldLog(Log.INFO))
+ log.info("Winning bid: " + winner + " out of " + bids);
+
+ winner.getTransport().send(msg);
+ }
}
}
diff --git a/router/java/src/net/i2p/router/transport/TransportManager.java b/router/java/src/net/i2p/router/transport/TransportManager.java
index eea745ae2..744aaf60c 100644
--- a/router/java/src/net/i2p/router/transport/TransportManager.java
+++ b/router/java/src/net/i2p/router/transport/TransportManager.java
@@ -30,7 +30,7 @@ public class TransportManager implements TransportEventListener {
private RouterContext _context;
private final static String PROP_DISABLE_TCP = "i2np.tcp.disable";
- private static final boolean ENABLE_UDP = false;
+ private final static String PROP_ENABLE_UDP = "i2np.udp.enable";
public TransportManager(RouterContext context) {
_context = context;
@@ -59,7 +59,8 @@ public class TransportManager implements TransportEventListener {
t.setListener(this);
_transports.add(t);
}
- if (ENABLE_UDP) {
+ String enableUDP = _context.router().getConfigSetting(PROP_ENABLE_UDP);
+ if ( (enableUDP != null) && (Boolean.valueOf(enableUDP).booleanValue())) {
UDPTransport udp = new UDPTransport(_context);
udp.setListener(this);
_transports.add(udp);
@@ -75,6 +76,7 @@ public class TransportManager implements TransportEventListener {
_log.debug("Transport " + i + " (" + t.getStyle() + ") started");
}
_log.debug("Done start listening on transports");
+ _context.router().rebuildRouterInfo();
}
public void restart() {
@@ -116,17 +118,20 @@ public class TransportManager implements TransportEventListener {
return rv;
}
- public List getBids(OutNetMessage msg) {
- List rv = new ArrayList(1);
- rv.add(getBid(msg));
- return rv;
- }
public TransportBid getBid(OutNetMessage msg) {
+ List bids = getBids(msg);
+ if ( (bids == null) || (bids.size() <= 0) )
+ return null;
+ else
+ return (TransportBid)bids.get(0);
+ }
+ public List getBids(OutNetMessage msg) {
if (msg == null)
throw new IllegalArgumentException("Null message? no bidding on a null outNetMessage!");
if (_context.router().getRouterInfo().equals(msg.getTarget()))
throw new IllegalArgumentException("WTF, bids for a message bound to ourselves?");
+ List rv = new ArrayList(_transports.size());
Set failedTransports = msg.getFailedTransports();
for (int i = 0; i < _transports.size(); i++) {
Transport t = (Transport)_transports.get(i);
@@ -141,13 +146,13 @@ public class TransportManager implements TransportEventListener {
if (bid != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Transport " + t.getStyle() + " bid: " + bid);
- return bid;
+ rv.add(bid);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Transport " + t.getStyle() + " did not produce a bid");
}
}
- return null;
+ return rv;
}
public void messageReceived(I2NPMessage message, RouterIdentity fromRouter, Hash fromRouterHash) {
diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java
index 289312c92..553a2b28e 100644
--- a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java
+++ b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java
@@ -137,16 +137,18 @@ public class TCPTransport extends TransportImpl {
public TransportBid bid(RouterInfo toAddress, long dataSize) {
RouterAddress addr = toAddress.getTargetAddress(STYLE);
- if (addr == null)
- return null;
if ( (_myAddress != null) && (_myAddress.equals(addr)) )
return null; // dont talk to yourself
- if (getIsConnected(toAddress.getIdentity()))
+ if (getIsConnected(toAddress.getIdentity())) {
return _fastBid;
- else
+ } else {
+ if (addr == null)
+ return null;
+
return _slowBid;
+ }
}
private boolean getIsConnected(RouterIdentity ident) {
@@ -381,7 +383,10 @@ public class TCPTransport extends TransportImpl {
configureLocalAddress();
_listener.startListening();
if (_myAddress != null) {
- return _myAddress.toRouterAddress();
+ RouterAddress rv = _myAddress.toRouterAddress();
+ if (rv != null)
+ replaceAddress(rv);
+ return rv;
} else {
return null;
}
@@ -826,5 +831,6 @@ public class TCPTransport extends TransportImpl {
public SharedBid(int ms) { _ms = ms; }
public int getLatency() { return _ms; }
public Transport getTransport() { return TCPTransport.this; }
+ public String toString() { return "TCP bid @ " + _ms; }
}
}
diff --git a/router/java/src/net/i2p/router/transport/udp/ACKSender.java b/router/java/src/net/i2p/router/transport/udp/ACKSender.java
index fd7b380a8..7b344b496 100644
--- a/router/java/src/net/i2p/router/transport/udp/ACKSender.java
+++ b/router/java/src/net/i2p/router/transport/udp/ACKSender.java
@@ -1,41 +1,104 @@
package net.i2p.router.transport.udp;
+import java.util.ArrayList;
import java.util.List;
import net.i2p.router.RouterContext;
+import net.i2p.util.I2PThread;
import net.i2p.util.Log;
/**
- * Blocking thread that pulls peers off the inboundFragment pool and
- * sends them any outstanding ACKs. The logic of what peers get ACKed when
- * is determined by the {@link InboundMessageFragments#getNextPeerToACK }
+ * Blocking thread that is given peers by the inboundFragment pool, sending out
+ * any outstanding ACKs.
*
*/
public class ACKSender implements Runnable {
private RouterContext _context;
private Log _log;
- private InboundMessageFragments _fragments;
private UDPTransport _transport;
private PacketBuilder _builder;
+ /** list of peers (PeerState) who we have received data from but not yet ACKed to */
+ private List _peersToACK;
+ private boolean _alive;
- public ACKSender(RouterContext ctx, InboundMessageFragments fragments, UDPTransport transport) {
+ /** how frequently do we want to send ACKs to a peer? */
+ private static final int ACK_FREQUENCY = 400;
+
+ public ACKSender(RouterContext ctx, UDPTransport transport) {
_context = ctx;
_log = ctx.logManager().getLog(ACKSender.class);
- _fragments = fragments;
_transport = transport;
+ _peersToACK = new ArrayList(4);
_builder = new PacketBuilder(_context);
+ _alive = true;
_context.statManager().createRateStat("udp.sendACKCount", "how many ack messages were sent to a peer", "udp", new long[] { 60*1000, 60*60*1000 });
+ _context.statManager().createRateStat("udp.ackFrequency", "how long ago did we send an ACK to this peer?", "udp", new long[] { 60*1000, 60*60*1000 });
+ _context.statManager().createRateStat("udp.sendACKRemaining", "when we ack a peer, how many peers are left waiting to ack?", "udp", new long[] { 60*1000, 60*60*1000 });
+ }
+
+ public void ackPeer(PeerState peer) {
+ synchronized (_peersToACK) {
+ if (!_peersToACK.contains(peer))
+ _peersToACK.add(peer);
+ _peersToACK.notifyAll();
+ }
+ }
+
+ public void startup() {
+ _alive = true;
+ I2PThread t = new I2PThread(this, "UDP ACK sender");
+ t.setDaemon(true);
+ t.start();
+ }
+
+ public void shutdown() {
+ _alive = false;
+ synchronized (_peersToACK) {
+ _peersToACK.clear();
+ _peersToACK.notifyAll();
+ }
}
public void run() {
- while (_fragments.isAlive()) {
- PeerState peer = _fragments.getNextPeerToACK();
+ while (_alive) {
+ PeerState peer = null;
+ long now = _context.clock().now();
+ long remaining = -1;
+ try {
+ synchronized (_peersToACK) {
+ for (int i = 0; i < _peersToACK.size(); i++) {
+ PeerState cur = (PeerState)_peersToACK.get(i);
+ long delta = cur.getWantedACKSendSince() + ACK_FREQUENCY - now;
+ if ( (delta < 0) || (cur.unsentACKThresholdReached()) ) {
+ _peersToACK.remove(i);
+ peer = cur;
+ break;
+ }
+ }
+
+ if (peer == null) {
+ if (_peersToACK.size() <= 0)
+ _peersToACK.wait();
+ else
+ _peersToACK.wait(50);
+ } else {
+ remaining = _peersToACK.size();
+ }
+ }
+ } catch (InterruptedException ie) {}
+
if (peer != null) {
+ long lastSend = peer.getLastACKSend();
+ long wanted = peer.getWantedACKSendSince();
List acks = peer.retrieveACKs();
if ( (acks != null) && (acks.size() > 0) ) {
_context.statManager().addRateData("udp.sendACKCount", acks.size(), 0);
+ _context.statManager().addRateData("udp.sendACKRemaining", remaining, 0);
+ now = _context.clock().now();
+ _context.statManager().addRateData("udp.ackFrequency", now-lastSend, now-wanted);
_context.statManager().getStatLog().addData(peer.getRemoteHostString(), "udp.peer.sendACKCount", acks.size(), 0);
UDPPacket ack = _builder.buildACK(peer, acks);
+ ack.markType(1);
if (_log.shouldLog(Log.INFO))
_log.info("Sending ACK for " + acks);
_transport.send(ack);
diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java
index 72d6349eb..cbb75c91a 100644
--- a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java
+++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java
@@ -11,13 +11,16 @@ import net.i2p.util.I2PThread;
import net.i2p.util.Log;
/**
- * Organize the received data message fragments, allowing its
- * {@link MessageReceiver} to pull off completed messages and its
- * {@link ACKSender} to pull off peers who need to receive an ACK for
- * these messages. In addition, it drops failed fragments and keeps a
+ * Organize the received data message fragments, feeding completed messages
+ * to the {@link MessageReceiver} and telling the {@link ACKSender} of new
+ * peers to ACK. In addition, it drops failed fragments and keeps a
* minimal list of the most recently completed messages (even though higher
* up in the router we have full blown replay detection, its nice to have a
- * basic line of defense here)
+ * basic line of defense here).
+ *
+ * TODO: add in some sensible code to drop expired fragments from peers we
+ * don't hear from again (either a periodic culling for expired peers, or
+ * a scheduled event)
*
*/
public class InboundMessageFragments {
@@ -25,21 +28,15 @@ public class InboundMessageFragments {
private Log _log;
/** Map of peer (Hash) to a Map of messageId (Long) to InboundMessageState objects */
private Map _inboundMessages;
- /** list of peers (PeerState) who we have received data from but not yet ACKed to */
- private List _unsentACKs;
- /** list of messages (InboundMessageState) fully received but not interpreted yet */
- private List _completeMessages;
/** list of message IDs recently received, so we can ignore in flight dups */
private DecayingBloomFilter _recentlyCompletedMessages;
private OutboundMessageFragments _outbound;
private UDPTransport _transport;
- /** this can be broken down further, but to start, OneBigLock does the trick */
- private Object _stateLock;
+ private ACKSender _ackSender;
+ private MessageReceiver _messageReceiver;
private boolean _alive;
private static final int RECENTLY_COMPLETED_SIZE = 100;
- /** how frequently do we want to send ACKs to a peer? */
- private static final int ACK_FREQUENCY = 200;
/** decay the recently completed every 2 minutes */
private static final int DECAY_PERIOD = 120*1000;
@@ -47,17 +44,16 @@ public class InboundMessageFragments {
_context = ctx;
_log = ctx.logManager().getLog(InboundMessageFragments.class);
_inboundMessages = new HashMap(64);
- _unsentACKs = new ArrayList(64);
- _completeMessages = new ArrayList(64);
_outbound = outbound;
_transport = transport;
+ _ackSender = new ACKSender(_context, _transport);
+ _messageReceiver = new MessageReceiver(_context, _transport);
_context.statManager().createRateStat("udp.receivedCompleteTime", "How long it takes to receive a full message", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.receivedCompleteFragments", "How many fragments go in a fully received message", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.receivedACKs", "How many messages were ACKed at a time", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.ignoreRecentDuplicate", "Take note that we received a packet for a recently completed message", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.receiveMessagePeriod", "How long it takes to pull the message fragments out of a packet", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.receiveACKPeriod", "How long it takes to pull the ACKs out of a packet", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
- _stateLock = this;
}
public void startup() {
@@ -66,25 +62,18 @@ public class InboundMessageFragments {
// array size (currently its tuned for 10 minute rates for the
// messageValidator)
_recentlyCompletedMessages = new DecayingBloomFilter(_context, DECAY_PERIOD, 8);
-
- I2PThread t = new I2PThread(new ACKSender(_context, this, _transport), "UDP ACK sender");
- t.setDaemon(true);
- t.start();
-
- t = new I2PThread(new MessageReceiver(_context, this, _transport), "UDP message receiver");
- t.setDaemon(true);
- t.start();
+ _ackSender.startup();
+ _messageReceiver.startup();
}
public void shutdown() {
_alive = false;
if (_recentlyCompletedMessages != null)
_recentlyCompletedMessages.stopDecaying();
_recentlyCompletedMessages = null;
- synchronized (_stateLock) {
- _completeMessages.clear();
- _unsentACKs.clear();
+ _ackSender.shutdown();
+ _messageReceiver.shutdown();
+ synchronized (_inboundMessages) {
_inboundMessages.clear();
- _stateLock.notifyAll();
}
}
public boolean isAlive() { return _alive; }
@@ -112,7 +101,7 @@ public class InboundMessageFragments {
private void receiveMessages(PeerState from, UDPPacketReader.DataReader data) {
int fragments = data.readFragmentCount();
if (fragments <= 0) return;
- synchronized (_stateLock) {
+ synchronized (_inboundMessages) {
Map messages = (Map)_inboundMessages.get(from.getRemotePeer());
if (messages == null) {
messages = new HashMap(fragments);
@@ -125,8 +114,7 @@ public class InboundMessageFragments {
if (_recentlyCompletedMessages.isKnown(messageId.longValue())) {
_context.statManager().addRateData("udp.ignoreRecentDuplicate", 1, 0);
from.messageFullyReceived(messageId);
- if (!_unsentACKs.contains(from))
- _unsentACKs.add(from);
+ _ackSender.ackPeer(from);
if (_log.shouldLog(Log.WARN))
_log.warn("Message received is a dup: " + messageId + " dups: "
+ _recentlyCompletedMessages.getCurrentDuplicateCount() + " out of "
@@ -148,14 +136,15 @@ public class InboundMessageFragments {
if (state.isComplete()) {
messageComplete = true;
messages.remove(messageId);
+ if (messages.size() <= 0)
+ _inboundMessages.remove(from.getRemotePeer());
_recentlyCompletedMessages.add(messageId.longValue());
- _completeMessages.add(state);
+ _messageReceiver.receiveMessage(state);
from.messageFullyReceived(messageId);
- if (!_unsentACKs.contains(from))
- _unsentACKs.add(from);
+ _ackSender.ackPeer(from);
if (_log.shouldLog(Log.INFO))
_log.info("Message received completely! " + state);
@@ -165,6 +154,8 @@ public class InboundMessageFragments {
} else if (state.isExpired()) {
messageExpired = true;
messages.remove(messageId);
+ if (messages.size() <= 0)
+ _inboundMessages.remove(from.getRemotePeer());
if (_log.shouldLog(Log.WARN))
_log.warn("Message expired while only being partially read: " + state);
state.releaseResources();
@@ -173,8 +164,6 @@ public class InboundMessageFragments {
if (!fragmentOK)
break;
}
-
- _stateLock.notifyAll();
}
}
@@ -200,50 +189,4 @@ public class InboundMessageFragments {
else
from.dataReceived();
}
-
- /**
- * Blocking call to pull off the next fully received message
- *
- */
- public InboundMessageState receiveNextMessage() {
- while (_alive) {
- try {
- synchronized (_stateLock) {
- if (_completeMessages.size() > 0)
- return (InboundMessageState)_completeMessages.remove(0);
- _stateLock.wait();
- }
- } catch (InterruptedException ie) {}
- }
- return null;
- }
-
- /**
- * Pull off the peer who we next want to send ACKs/NACKs to.
- * This call blocks, and only returns null on shutdown.
- *
- */
- public PeerState getNextPeerToACK() {
- while (_alive) {
- try {
- long now = _context.clock().now();
- synchronized (_stateLock) {
- for (int i = 0; i < _unsentACKs.size(); i++) {
- PeerState peer = (PeerState)_unsentACKs.get(i);
- if ( (peer.getLastACKSend() + ACK_FREQUENCY <= now) ||
- (peer.unsentACKThresholdReached()) ) {
- _unsentACKs.remove(i);
- peer.setLastACKSend(now);
- return peer;
- }
- }
- if (_unsentACKs.size() > 0)
- _stateLock.wait(_context.random().nextInt(100));
- else
- _stateLock.wait();
- }
- } catch (InterruptedException ie) {}
- }
- return null;
- }
}
diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java
index 7167a9978..a03bf47eb 100644
--- a/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java
+++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java
@@ -96,7 +96,7 @@ public class InboundMessageState {
if (_fragments != null)
for (int i = 0; i < _fragments.length; i++)
_fragmentCache.release(_fragments[i]);
- _fragments = null;
+ //_fragments = null;
}
public ByteArray[] getFragments() {
@@ -107,10 +107,11 @@ public class InboundMessageState {
public String toString() {
StringBuffer buf = new StringBuffer(32);
buf.append("Message: ").append(_messageId);
- if (isComplete()) {
- buf.append(" completely received with ");
- buf.append(getCompleteSize()).append(" bytes");
- }
+ //if (isComplete()) {
+ // buf.append(" completely received with ");
+ // buf.append(getCompleteSize()).append(" bytes");
+ //}
+ buf.append(" lifetime: ").append(getLifetime());
return buf.toString();
}
}
diff --git a/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java b/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java
index 8a709f5a1..36c85c7aa 100644
--- a/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java
+++ b/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java
@@ -1,5 +1,8 @@
package net.i2p.router.transport.udp;
+import java.util.ArrayList;
+import java.util.List;
+
import net.i2p.data.Base64;
import net.i2p.data.ByteArray;
import net.i2p.data.DataFormatException;
@@ -7,6 +10,7 @@ import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.I2NPMessageImpl;
import net.i2p.data.i2np.I2NPMessageException;
import net.i2p.router.RouterContext;
+import net.i2p.util.I2PThread;
import net.i2p.util.Log;
/**
@@ -17,28 +21,62 @@ import net.i2p.util.Log;
public class MessageReceiver implements Runnable {
private RouterContext _context;
private Log _log;
- private InboundMessageFragments _fragments;
private UDPTransport _transport;
+ /** list of messages (InboundMessageState) fully received but not interpreted yet */
+ private List _completeMessages;
+ private boolean _alive;
- public MessageReceiver(RouterContext ctx, InboundMessageFragments frag, UDPTransport transport) {
+ public MessageReceiver(RouterContext ctx, UDPTransport transport) {
_context = ctx;
_log = ctx.logManager().getLog(MessageReceiver.class);
- _fragments = frag;
_transport = transport;
+ _completeMessages = new ArrayList(16);
+ _alive = true;
}
+ public void startup() {
+ _alive = true;
+ I2PThread t = new I2PThread(this, "UDP message receiver");
+ t.setDaemon(true);
+ t.start();
+ }
+ public void shutdown() {
+ _alive = false;
+ synchronized (_completeMessages) {
+ _completeMessages.clear();
+ _completeMessages.notifyAll();
+ }
+ }
+
+ public void receiveMessage(InboundMessageState state) {
+ synchronized (_completeMessages) {
+ _completeMessages.add(state);
+ _completeMessages.notifyAll();
+ }
+ }
+
public void run() {
- while (_fragments.isAlive()) {
- InboundMessageState message = _fragments.receiveNextMessage();
- if (message == null) continue;
+ InboundMessageState message = null;
+ while (_alive) {
+ try {
+ synchronized (_completeMessages) {
+ if (_completeMessages.size() > 0)
+ message = (InboundMessageState)_completeMessages.remove(0);
+ else
+ _completeMessages.wait();
+ }
+ } catch (InterruptedException ie) {}
- int size = message.getCompleteSize();
- if (_log.shouldLog(Log.INFO))
- _log.info("Full message received (" + message.getMessageId() + ") after " + message.getLifetime()
- + "... todo: parse and plop it onto InNetMessagePool");
- I2NPMessage msg = readMessage(message);
- if (msg != null)
- _transport.messageReceived(msg, null, message.getFrom(), message.getLifetime(), size);
+ if (message != null) {
+ int size = message.getCompleteSize();
+ if (_log.shouldLog(Log.INFO))
+ _log.info("Full message received (" + message.getMessageId() + ") after " + message.getLifetime()
+ + "... todo: parse and plop it onto InNetMessagePool");
+ I2NPMessage msg = readMessage(message);
+ if (msg != null)
+ _transport.messageReceived(msg, null, message.getFrom(), message.getLifetime(), size);
+ message = null;
+ }
}
}
diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java
index 0f9167988..de2330f04 100644
--- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java
+++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java
@@ -34,9 +34,9 @@ public class OutboundMessageFragments {
/** if we can handle more messages explicitly, set this to true */
private boolean _allowExcess;
- private static final int MAX_ACTIVE = 64;
+ private static final int MAX_ACTIVE = 16;
// don't send a packet more than 10 times
- private static final int MAX_VOLLEYS = 10;
+ static final int MAX_VOLLEYS = 10;
public OutboundMessageFragments(RouterContext ctx, UDPTransport transport) {
_context = ctx;
@@ -54,6 +54,7 @@ public class OutboundMessageFragments {
_context.statManager().createRateStat("udp.sendFailed", "How many fragments were in a message that couldn't be delivered", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.sendAggressiveFailed", "How many volleys was a packet sent before we gave up", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.outboundActiveCount", "How many messages are in the active pool when a new one is added", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
+ _context.statManager().createRateStat("udp.sendRejected", "What volley are we on when the peer was throttled (time == message lifetime)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
}
public void startup() { _alive = true; }
@@ -130,6 +131,11 @@ public class OutboundMessageFragments {
_activeMessages.remove(i);
_transport.succeeded(state.getMessage());
state.releaseResources();
+ if (i < _nextPacketMessage) {
+ _nextPacketMessage--;
+ if (_nextPacketMessage < 0)
+ _nextPacketMessage = 0;
+ }
i--;
} else if (state.isExpired()) {
_activeMessages.remove(i);
@@ -144,6 +150,11 @@ public class OutboundMessageFragments {
_log.warn("Unable to send an expired direct message: " + state);
}
state.releaseResources();
+ if (i < _nextPacketMessage) {
+ _nextPacketMessage--;
+ if (_nextPacketMessage < 0)
+ _nextPacketMessage = 0;
+ }
i--;
} else if (state.getPushCount() > MAX_VOLLEYS) {
_activeMessages.remove(i);
@@ -160,6 +171,11 @@ public class OutboundMessageFragments {
_log.warn("Unable to send a direct message after too many volleys: " + state);
}
state.releaseResources();
+ if (i < _nextPacketMessage) {
+ _nextPacketMessage--;
+ if (_nextPacketMessage < 0)
+ _nextPacketMessage = 0;
+ }
i--;
}
}
@@ -214,6 +230,7 @@ public class OutboundMessageFragments {
int fragmentSize = state.fragmentSize(currentFragment);
if (peer.allocateSendingBytes(fragmentSize)) {
+ state.incrementCurrentFragment();
if (_log.shouldLog(Log.INFO))
_log.info("Allocation of " + fragmentSize + " allowed with "
+ peer.getSendWindowBytesRemaining()
@@ -223,24 +240,26 @@ public class OutboundMessageFragments {
if (state.justBeganVolley() && (state.getPushCount() > 0) && (state.getFragmentCount() > 1)) {
peer.messageRetransmitted();
- if (_log.shouldLog(Log.ERROR))
- _log.error("Retransmitting " + state + " to " + peer);
+ if (_log.shouldLog(Log.WARN))
+ _log.warn("Retransmitting " + state + " to " + peer);
}
// for fairness, we move on in a round robin
- _nextPacketMessage = i + 1;
+ //_nextPacketMessage = i + 1;
if (currentFragment >= state.getFragmentCount() - 1) {
// this is the last fragment
_context.statManager().addRateData("udp.sendVolleyTime", state.getLifetime(), state.getFragmentCount());
if (state.getPeer() != null) {
int rto = state.getPeer().getRTO() * state.getPushCount();
- //_log.error("changed volley, rto=" + rto + " volley="+ state.getPushCount());
state.setNextSendTime(now + rto);
} else {
- _log.error("changed volley, unknown peer");
+ if (_log.shouldLog(Log.ERROR))
+ _log.error("changed volley, unknown peer");
state.setNextSendTime(now + 1000 + _context.random().nextInt(2000));
}
+ // only move on in round robin after sending a full volley
+ _nextPacketMessage = (i + 1) % _activeMessages.size();
} else {
if (peer.getSendWindowBytesRemaining() > 0)
state.setNextSendTime(now);
@@ -249,6 +268,7 @@ public class OutboundMessageFragments {
}
break;
} else {
+ _context.statManager().addRateData("udp.sendRejected", state.getPushCount(), state.getLifetime());
if (_log.shouldLog(Log.WARN))
_log.warn("Allocation of " + fragmentSize + " rejected w/ wsize=" + peer.getSendWindowBytes()
+ " available=" + peer.getSendWindowBytesRemaining()
@@ -330,6 +350,11 @@ public class OutboundMessageFragments {
// either the message was a short circuit after establishment,
// or it was received from who we sent it to. yay!
_activeMessages.remove(i);
+ if (i < _nextPacketMessage) {
+ _nextPacketMessage--;
+ if (_nextPacketMessage < 0)
+ _nextPacketMessage = 0;
+ }
_activeMessages.notifyAll();
break;
} else {
@@ -346,8 +371,6 @@ public class OutboundMessageFragments {
_context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime(), state.getLifetime());
_context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount(), state.getLifetime());
_context.statManager().addRateData("udp.sendConfirmVolley", numSends, state.getFragmentCount());
- if ( (numSends > 1) && (state.getPeer() != null) )
- state.getPeer().congestionOccurred();
_transport.succeeded(state.getMessage());
int numFragments = state.getFragmentCount();
if (state.getPeer() != null) {
@@ -359,8 +382,8 @@ public class OutboundMessageFragments {
state.releaseResources();
return numFragments;
} else {
- if (_log.shouldLog(Log.ERROR))
- _log.error("Received an ACK for a message not pending: " + messageId);
+ if (_log.shouldLog(Log.WARN))
+ _log.warn("Received an ACK for a message not pending: " + messageId);
return 0;
}
}
@@ -386,6 +409,11 @@ public class OutboundMessageFragments {
state.acked(ackedFragments);
if (state.isComplete()) {
_activeMessages.remove(i);
+ if (i < _nextPacketMessage) {
+ _nextPacketMessage--;
+ if (_nextPacketMessage < 0)
+ _nextPacketMessage = 0;
+ }
_activeMessages.notifyAll();
}
break;
diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java
index 1f73553ea..221032357 100644
--- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java
+++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java
@@ -174,6 +174,17 @@ public class OutboundMessageState {
return _fragmentSize;
}
+ public void incrementCurrentFragment() {
+ int cur = _nextSendFragment;
+ _fragmentSends[cur]++;
+ _maxSends = _fragmentSends[cur];
+ _nextSendFragment++;
+ if (_nextSendFragment >= _fragmentSends.length) {
+ _nextSendFragment = 0;
+ _pushCount++;
+ }
+ }
+
/**
* Pick a fragment that we still need to send. Current implementation
* picks the fragment which has been sent the least (randomly choosing
@@ -183,15 +194,7 @@ public class OutboundMessageState {
*/
public int pickNextFragment() {
if (true) {
- int rv = _nextSendFragment;
- _fragmentSends[rv]++;
- _maxSends = _fragmentSends[rv];
- _nextSendFragment++;
- if (_nextSendFragment >= _fragmentSends.length) {
- _nextSendFragment = 0;
- _pushCount++;
- }
- return rv;
+ return _nextSendFragment;
}
short minValue = -1;
int minIndex = -1;
diff --git a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java
index 92ba1989e..d6c3eda8c 100644
--- a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java
+++ b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java
@@ -29,7 +29,7 @@ public class PacketHandler {
private InboundMessageFragments _inbound;
private boolean _keepReading;
- private static final int NUM_HANDLERS = 1;
+ private static final int NUM_HANDLERS = 3;
public PacketHandler(RouterContext ctx, UDPTransport transport, UDPEndpoint endpoint, EstablishmentManager establisher, InboundMessageFragments inbound) {
_context = ctx;
diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java
index d84c0f2f3..984f6d0cb 100644
--- a/router/java/src/net/i2p/router/transport/udp/PeerState.java
+++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java
@@ -65,12 +65,14 @@ public class PeerState {
private long _lastReceiveTime;
/** how many consecutive messages have we sent and not received an ACK to */
private int _consecutiveFailedSends;
- /** when did we last have a failed send */
- private long _lastFailedSendMinute;
+ /** when did we last have a failed send (beginning of period) */
+ private long _lastFailedSendPeriod;
/** list of messageIds (Long) that we have received but not yet sent */
private List _currentACKs;
/** when did we last send ACKs to the peer? */
- private long _lastACKSend;
+ private volatile long _lastACKSend;
+ /** when did we decide we need to ACK to this peer? */
+ private volatile long _wantACKSendSince;
/** have we received a packet with the ECN bit set in the current second? */
private boolean _currentSecondECNReceived;
/**
@@ -79,17 +81,17 @@ public class PeerState {
*/
private boolean _remoteWantsPreviousACKs;
/** how many bytes should we send to the peer in a second */
- private int _sendWindowBytes;
+ private volatile int _sendWindowBytes;
/** how many bytes can we send to the peer in the current second */
- private int _sendWindowBytesRemaining;
+ private volatile int _sendWindowBytesRemaining;
private long _lastSendRefill;
- private long _lastCongestionOccurred;
+ private volatile long _lastCongestionOccurred;
/**
* when sendWindowBytes is below this, grow the window size quickly,
* but after we reach it, grow it slowly
*
*/
- private int _slowStartThreshold;
+ private volatile int _slowStartThreshold;
/** what IP is the peer sending and receiving packets on? */
private byte[] _remoteIP;
/** cached IP address */
@@ -116,19 +118,19 @@ public class PeerState {
/** when did we last check the MTU? */
private long _mtuLastChecked;
/** current round trip time estimate */
- private int _rtt;
+ private volatile int _rtt;
/** smoothed mean deviation in the rtt */
- private int _rttDeviation;
+ private volatile int _rttDeviation;
/** current retransmission timeout */
- private int _rto;
+ private volatile int _rto;
private long _messagesReceived;
private long _messagesSent;
- private static final int DEFAULT_SEND_WINDOW_BYTES = 16*1024;
+ private static final int DEFAULT_SEND_WINDOW_BYTES = 8*1024;
private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES;
private static final int MAX_SEND_WINDOW_BYTES = 1024*1024;
- private static final int DEFAULT_MTU = 1492;
+ private static final int DEFAULT_MTU = 1472;
public PeerState(I2PAppContext ctx) {
_context = ctx;
@@ -306,11 +308,11 @@ public class PeerState {
/** when did we last receive a packet from them? */
public void setLastReceiveTime(long when) { _lastReceiveTime = when; }
public int incrementConsecutiveFailedSends() {
- long now = _context.clock().now()/60*1000;
- if (_lastFailedSendMinute == now) {
+ long now = _context.clock().now()/(10*1000);
+ if (_lastFailedSendPeriod >= now) {
// ignore... too fast
} else {
- _lastFailedSendMinute = now;
+ _lastFailedSendPeriod = now;
_consecutiveFailedSends++;
}
return _consecutiveFailedSends;
@@ -372,6 +374,8 @@ public class PeerState {
/** we received the message specified completely */
public void messageFullyReceived(Long messageId) {
synchronized (_currentACKs) {
+ if (_wantACKSendSince <= 0)
+ _wantACKSendSince = _context.clock().now();
if (!_currentACKs.contains(messageId))
_currentACKs.add(messageId);
}
@@ -383,17 +387,21 @@ public class PeerState {
* the data through.
*
*/
- public void congestionOccurred() {
+ private boolean congestionOccurred() {
long now = _context.clock().now();
- if (_lastCongestionOccurred + 2000 > now)
- return; // only shrink once every other second
+ if (_lastCongestionOccurred + 10*1000 > now)
+ return false; // only shrink once every 10 seconds
_lastCongestionOccurred = now;
- _sendWindowBytes /= 2;
+ //if (true)
+ // _sendWindowBytes -= 10000;
+ //else
+ _sendWindowBytes = (_sendWindowBytes*2) / 3;
if (_sendWindowBytes < MINIMUM_WINDOW_BYTES)
_sendWindowBytes = MINIMUM_WINDOW_BYTES;
if (_sendWindowBytes < _slowStartThreshold)
_slowStartThreshold = _sendWindowBytes;
+ return true;
}
/** pull off the ACKs (Long) to send to the peer */
@@ -404,19 +412,21 @@ public class PeerState {
if (_currentACKs.size() < threshold) {
rv = new ArrayList(_currentACKs);
_currentACKs.clear();
+ _wantACKSendSince = -1;
} else {
rv = new ArrayList(threshold);
for (int i = 0; i < threshold; i++)
rv.add(_currentACKs.remove(0));
}
}
+ _lastACKSend = _context.clock().now();
return rv;
}
/** we sent a message which was ACKed containing the given # of bytes */
public void messageACKed(int bytesACKed, long lifetime, int numSends) {
_consecutiveFailedSends = 0;
- _lastFailedSendMinute = -1;
+ _lastFailedSendPeriod = -1;
if (_sendWindowBytes <= _slowStartThreshold) {
_sendWindowBytes += bytesACKed;
} else {
@@ -449,6 +459,7 @@ public class PeerState {
}
/** we are resending a packet, so lets jack up the rto */
public void messageRetransmitted() {
+ congestionOccurred();
//_rto *= 2;
}
/** how long does it usually take to get a message ACKed? */
@@ -477,6 +488,7 @@ public class PeerState {
/** when did we last send an ACK to the peer? */
public long getLastACKSend() { return _lastACKSend; }
public void setLastACKSend(long when) { _lastACKSend = when; }
+ public long getWantedACKSendSince() { return _wantACKSendSince; }
public boolean unsentACKThresholdReached() {
int threshold = countMaxACKs();
synchronized (_currentACKs) {
diff --git a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java
index 4d340f19a..05a2baeaa 100644
--- a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java
+++ b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java
@@ -31,6 +31,7 @@ public class UDPPacket {
private long _expiration;
private byte[] _data;
private ByteArray _dataBuf;
+ private int _markedType;
private static final List _packetCache;
static {
@@ -72,6 +73,7 @@ public class UDPPacket {
_data = _dataBuf.getData();
_packet = new DatagramPacket(_data, MAX_PACKET_SIZE);
_initializeTime = _context.clock().now();
+ _markedType = -1;
}
public void initialize(short priority, long expiration, InetAddress host, int port) {
@@ -92,8 +94,12 @@ public class UDPPacket {
public DatagramPacket getPacket() { return _packet; }
public short getPriority() { return _priority; }
public long getExpiration() { return _expiration; }
+ public long getBegin() { return _initializeTime; }
public long getLifetime() { return _context.clock().now() - _initializeTime; }
public void resetBegin() { _initializeTime = _context.clock().now(); }
+ /** flag this packet as a particular type for accounting purposes */
+ public void markType(int type) { _markedType = type; }
+ public int getMarkedType() { return _markedType; }
/**
* Validate the packet against the MAC specified, returning true if the
@@ -173,6 +179,7 @@ public class UDPPacket {
rv._log = ctx.logManager().getLog(UDPPacket.class);
rv.resetBegin();
Arrays.fill(rv._data, (byte)0x00);
+ rv._markedType = -1;
return rv;
}
}
diff --git a/router/java/src/net/i2p/router/transport/udp/UDPSender.java b/router/java/src/net/i2p/router/transport/udp/UDPSender.java
index bc7a231ea..c7b5fcaee 100644
--- a/router/java/src/net/i2p/router/transport/udp/UDPSender.java
+++ b/router/java/src/net/i2p/router/transport/udp/UDPSender.java
@@ -25,7 +25,7 @@ public class UDPSender {
private boolean _keepRunning;
private Runner _runner;
- private static final int MAX_QUEUED = 64;
+ private static final int MAX_QUEUED = 4;
public UDPSender(RouterContext ctx, DatagramSocket socket, String name) {
_context = ctx;
@@ -35,7 +35,11 @@ public class UDPSender {
_runner = new Runner();
_name = name;
_context.statManager().createRateStat("udp.pushTime", "How long a UDP packet takes to get pushed out", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
+ _context.statManager().createRateStat("udp.sendQueueSize", "How many packets are queued on the UDP sender", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.sendPacketSize", "How large packets sent are", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
+ _context.statManager().createRateStat("udp.socketSendTime", "How long the actual socket.send took", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
+ _context.statManager().createRateStat("udp.sendBWThrottleTime", "How long the send is blocked by the bandwidth throttle", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
+ _context.statManager().createRateStat("udp.sendACKTime", "How long an ACK packet is blocked for (duration == lifetime)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
}
public void startup() {
@@ -83,6 +87,7 @@ public class UDPSender {
}
} catch (InterruptedException ie) {}
}
+ _context.statManager().addRateData("udp.sendQueueSize", remaining, packet.getLifetime());
return remaining;
}
@@ -97,6 +102,7 @@ public class UDPSender {
size = _outboundQueue.size();
_outboundQueue.notifyAll();
}
+ _context.statManager().addRateData("udp.sendQueueSize", size, packet.getLifetime());
return size;
}
@@ -112,6 +118,7 @@ public class UDPSender {
UDPPacket packet = getNextPacket();
if (packet != null) {
+ long acquireTime = _context.clock().now();
int size = packet.getPacket().getLength();
if (size > 0) {
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(size, "UDP sender");
@@ -119,6 +126,8 @@ public class UDPSender {
req.waitForNextAllocation();
}
+ long afterBW = _context.clock().now();
+
if (_log.shouldLog(Log.DEBUG)) {
int len = packet.getPacket().getLength();
//if (len > 128)
@@ -127,10 +136,16 @@ public class UDPSender {
}
try {
+ long before = _context.clock().now();
synchronized (Runner.this) {
// synchronization lets us update safely
_socket.send(packet.getPacket());
}
+ long sendTime = _context.clock().now() - before;
+ _context.statManager().addRateData("udp.socketSendTime", sendTime, packet.getLifetime());
+ _context.statManager().addRateData("udp.sendBWThrottleTime", afterBW - acquireTime, acquireTime - packet.getBegin());
+ if (packet.getMarkedType() == 1)
+ _context.statManager().addRateData("udp.sendACKTime", afterBW - acquireTime, packet.getLifetime());
_context.statManager().addRateData("udp.pushTime", packet.getLifetime(), packet.getLifetime());
_context.statManager().addRateData("udp.sendPacketSize", packet.getPacket().getLength(), packet.getLifetime());
} catch (IOException ioe) {
diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
index 040795275..2ebff7471 100644
--- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
+++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
@@ -86,9 +86,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private static final int PRIORITY_WEIGHT[] = new int[] { 1, 1, 1, 1, 1, 2 };
/** should we flood all UDP peers with the configured rate? */
- private static final boolean SHOULD_FLOOD_PEERS = true;
+ private static final boolean SHOULD_FLOOD_PEERS = false;
- private static final int MAX_CONSECUTIVE_FAILED = 2;
+ private static final int MAX_CONSECUTIVE_FAILED = 5;
public UDPTransport(RouterContext ctx) {
super(ctx);
@@ -103,7 +103,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_relayPeers = new ArrayList(1);
_fastBid = new SharedBid(50);
- _slowBid = new SharedBid(100);
+ _slowBid = new SharedBid(1000);
_fragments = new OutboundMessageFragments(_context, this);
_inboundFragments = new InboundMessageFragments(_context, _fragments, this);
@@ -353,6 +353,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_log.debug("bidding on a message to an established peer: " + peer);
return _fastBid;
} else {
+ if (null == toAddress.getTargetAddress(STYLE))
+ return null;
+
if (_log.shouldLog(Log.DEBUG))
_log.debug("bidding on a message to an unestablished peer: " + to.toBase64());
return _slowBid;
@@ -478,12 +481,15 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
public void failed(OutboundMessageState msg) {
if (msg == null) return;
int consecutive = 0;
- if (msg.getPeer() != null)
+ if ( (msg.getPeer() != null) &&
+ ( (msg.getMaxSends() >= OutboundMessageFragments.MAX_VOLLEYS) ||
+ (msg.isExpired())) ) {
consecutive = msg.getPeer().incrementConsecutiveFailedSends();
- if (_log.shouldLog(Log.WARN))
- _log.warn("Consecutive failure #" + consecutive + " sending to " + msg.getPeer());
- if (consecutive > MAX_CONSECUTIVE_FAILED)
- dropPeer(msg.getPeer());
+ if (_log.shouldLog(Log.WARN))
+ _log.warn("Consecutive failure #" + consecutive + " sending to " + msg.getPeer());
+ if (consecutive > MAX_CONSECUTIVE_FAILED)
+ dropPeer(msg.getPeer());
+ }
failed(msg.getMessage());
}
@@ -614,5 +620,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
public SharedBid(int ms) { _ms = ms; }
public int getLatency() { return _ms; }
public Transport getTransport() { return UDPTransport.this; }
+ public String toString() { return "UDP bid @ " + _ms; }
}
}