diff --git a/router/java/src/net/i2p/router/OutNetMessage.java b/router/java/src/net/i2p/router/OutNetMessage.java
index 93306e174..4d76ed20e 100644
--- a/router/java/src/net/i2p/router/OutNetMessage.java
+++ b/router/java/src/net/i2p/router/OutNetMessage.java
@@ -85,10 +85,21 @@ public class OutNetMessage {
"OutNetMessage", new long[] { 5*60*1000, 30*60*1000, 60*60*1000 });
}
- public void timestamp(String eventName) {
+ /**
+ * Stamp the message's progress
+ *
+ * @param eventName what occurred
+ * @return how long this message has been 'in flight'
+ */
+ public long timestamp(String eventName) {
synchronized (_timestamps) {
- _timestamps.put(eventName, new Long(_context.clock().now()));
+ long now = _context.clock().now();
+ while (_timestamps.containsKey(eventName)) {
+ eventName = eventName + '.';
+ }
+ _timestamps.put(eventName, new Long(now));
_timestampOrder.add(eventName);
+ return now - _created;
}
}
public Map getTimestamps() {
diff --git a/router/java/src/net/i2p/router/Router.java b/router/java/src/net/i2p/router/Router.java
index 05d28709a..d7a1f7202 100644
--- a/router/java/src/net/i2p/router/Router.java
+++ b/router/java/src/net/i2p/router/Router.java
@@ -286,7 +286,7 @@ public class Router {
if ( (notSent > 0) && (notReceived > 0) ) {
double notSendKBps = notSent / (lifetime*1024.0);
double notReceivedKBps = notReceived / (lifetime*1024.0);
- buf.append("
Lifetime rate: ");
+ buf.append("Lifetime unused rate: ");
buf.append(fmt.format(notSendKBps)).append("KBps outbound unused ");
buf.append(fmt.format(notReceivedKBps)).append("KBps inbound unused");
buf.append("");
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/DataPublisherJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/DataPublisherJob.java
index 295e0eb4c..38bccd48b 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/DataPublisherJob.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/DataPublisherJob.java
@@ -23,7 +23,7 @@ import net.i2p.util.Log;
class DataPublisherJob extends JobImpl {
private Log _log;
private KademliaNetworkDatabaseFacade _facade;
- private final static long RERUN_DELAY_MS = 60*1000;
+ private final static long RERUN_DELAY_MS = 120*1000;
private final static int MAX_SEND_PER_RUN = 1; // publish no more than 2 at a time
private final static long STORE_TIMEOUT = 60*1000; // give 'er a minute to send the data
diff --git a/router/java/src/net/i2p/router/transport/TransportImpl.java b/router/java/src/net/i2p/router/transport/TransportImpl.java
index a6f192560..bcf42d42d 100644
--- a/router/java/src/net/i2p/router/transport/TransportImpl.java
+++ b/router/java/src/net/i2p/router/transport/TransportImpl.java
@@ -63,15 +63,28 @@ public abstract class TransportImpl implements Transport {
}
public void afterSend(OutNetMessage msg, boolean sendSuccessful) {
- afterSend(msg, sendSuccessful, true);
+ afterSend(msg, sendSuccessful, true, 0);
}
public void afterSend(OutNetMessage msg, boolean sendSuccessful, boolean allowRequeue) {
+ afterSend(msg, sendSuccessful, allowRequeue, 0);
+ }
+ public void afterSend(OutNetMessage msg, boolean sendSuccessful, long msToSend) {
+ afterSend(msg, sendSuccessful, true, msToSend);
+ }
+ public void afterSend(OutNetMessage msg, boolean sendSuccessful, boolean allowRequeue, long msToSend) {
boolean log = false;
msg.timestamp("afterSend(" + sendSuccessful + ")");
if (!sendSuccessful)
msg.transportFailed(getStyle());
+ if (msToSend > 1000) {
+ if (_log.shouldLog(Log.WARN))
+ _log.warn("afterSend: [success=" + sendSuccessful + "] " + msg.getMessageSize() + "byte "
+ + msg.getMessageType() + " " + msg.getMessageId() + " from "
+ + _context.routerHash().toBase64().substring(0,6) + " took " + msToSend);
+ }
+
long lifetime = msg.getLifetime();
if (lifetime > 5000) {
if (_log.shouldLog(Log.WARN))
@@ -104,23 +117,31 @@ public abstract class TransportImpl implements Transport {
_context.statManager().addRateData("transport.expiredOnQueueLifetime", lifetime, lifetime);
if (allowRequeue) {
- if ( (msg.getExpiration() <= 0) || (msg.getExpiration() > _context.clock().now()) ) {
- // this may not be the last transport available - keep going
- _context.outNetMessagePool().add(msg);
- // don't discard the data yet!
- } else {
- if (_log.shouldLog(Log.INFO))
- _log.info("No more time left (" + new Date(msg.getExpiration())
- + ", expiring without sending successfully the "
- + msg.getMessageType());
- if (msg.getOnFailedSendJob() != null)
- _context.jobQueue().addJob(msg.getOnFailedSendJob());
- MessageSelector selector = msg.getReplySelector();
- if (selector != null) {
- _context.messageRegistry().unregisterPending(msg);
- }
+ if (true) {
+ if (_log.shouldLog(Log.ERROR))
+ _log.error("wtf, requeueing message " + msg.getMessageId() + " of type " + msg.getMessageType(),
+ new Exception("requeued by"));
log = true;
msg.discardData();
+ } else {
+ if ( (msg.getExpiration() <= 0) || (msg.getExpiration() > _context.clock().now()) ) {
+ // this may not be the last transport available - keep going
+ _context.outNetMessagePool().add(msg);
+ // don't discard the data yet!
+ } else {
+ if (_log.shouldLog(Log.INFO))
+ _log.info("No more time left (" + new Date(msg.getExpiration())
+ + ", expiring without sending successfully the "
+ + msg.getMessageType());
+ if (msg.getOnFailedSendJob() != null)
+ _context.jobQueue().addJob(msg.getOnFailedSendJob());
+ MessageSelector selector = msg.getReplySelector();
+ if (selector != null) {
+ _context.messageRegistry().unregisterPending(msg);
+ }
+ log = true;
+ msg.discardData();
+ }
}
} else {
if (_log.shouldLog(Log.INFO))
@@ -211,9 +232,13 @@ public abstract class TransportImpl implements Transport {
protected abstract void outboundMessageReady();
public void messageReceived(I2NPMessage inMsg, RouterIdentity remoteIdent, Hash remoteIdentHash, long msToReceive, int bytesReceived) {
- if (_log.shouldLog(Log.INFO)) {
+ int level = Log.INFO;
+ if (msToReceive > 5000)
+ level = Log.ERROR;
+ if (_log.shouldLog(level)) {
StringBuffer buf = new StringBuffer(128);
buf.append("Message received: ").append(inMsg.getClass().getName());
+ buf.append(" / ").append(inMsg.getUniqueId());
buf.append(" in ").append(msToReceive).append("ms containing ");
buf.append(bytesReceived).append(" bytes ");
buf.append(" from ");
@@ -228,7 +253,7 @@ public abstract class TransportImpl implements Transport {
if (_listener != null)
buf.append(_listener);
- _log.info(buf.toString());
+ _log.log(level, buf.toString());
}
if (remoteIdent != null)
@@ -239,8 +264,9 @@ public abstract class TransportImpl implements Transport {
}
_context.statManager().addRateData("transport.receiveMessageTime", msToReceive, msToReceive);
- if (msToReceive > 1000)
+ if (msToReceive > 1000) {
_context.statManager().addRateData("transport.receiveMessageTimeSlow", msToReceive, msToReceive);
+ }
//// this functionality is built into the InNetMessagePool
//String type = inMsg.getClass().getName();
diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java
index 98aa1af1b..72c27cf37 100644
--- a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java
+++ b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java
@@ -309,7 +309,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
for (int i = 0; i < removed.size(); i++) {
OutNetMessage cur = (OutNetMessage)removed.get(i);
msg.timestamp("TCPConnection.addMessage expired but not our fault");
- _transport.afterSend(cur, false);
+ _transport.afterSend(cur, false, false);
}
}
@@ -331,7 +331,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
_context.profileManager().commErrorOccurred(_remoteIdentity.getHash());
msg.timestamp("TCPConnection.addMessage saw an expired queued message");
- _transport.afterSend(msg, false);
+ _transport.afterSend(msg, false, false);
// should we really be closing a connection if they're that slow?
// yeah, i think we should.
closeConnection();
@@ -490,7 +490,19 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
timedOut.add(cur);
_toBeSent.remove(i);
i--;
- }
+ } else {
+ long lifetime = cur.timestamp("TCPConnection.runner.locked_expireOldMessages still ok with "
+ + (i) + " ahead and " + (_toBeSent.size()-i-1)
+ + " behind on the queue");
+ if (lifetime > 5*1000) {
+ cur.timestamp("TCPConnection.runner.locked_expireOldMessages lifetime too long - " + lifetime);
+ if (timedOut == null)
+ timedOut = new ArrayList(2);
+ timedOut.add(cur);
+ _toBeSent.remove(i);
+ i--;
+ }
+ }
}
boolean reallySlowFound = false;
@@ -503,8 +515,8 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
+ " timed out while sitting on the TCP Connection's queue! was too slow by: "
+ (now-failed.getExpiration()) + "ms to "
+ _remoteIdentity.getHash().toBase64() + ": " + failed);
- failed.timestamp("TCPConnection.runner.locked_expireOldMessages expired");
- _transport.afterSend(failed, false);
+ failed.timestamp("TCPConnection.runner.locked_expireOldMessages expired with " + _toBeSent.size() + " left");
+ _transport.afterSend(failed, false, false);
if (failed.getLifetime() >= MIN_MESSAGE_LIFETIME_FOR_PENALTY)
reallySlowFound = true;
}
@@ -521,6 +533,15 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
msg.timestamp("TCPConnection.runner.doSend fetched");
long afterExpire = _context.clock().now();
+ long remaining = msg.getExpiration() - afterExpire;
+ if (remaining < 0) {
+ if (_log.shouldLog(Log.WARN))
+ _log.warn("Message " + msg.getMessageType() + "/" + msg.getMessageId()
+ + " expired before doSend (too slow by " + remaining + "ms)");
+ _transport.afterSend(msg, false, false);
+ return true;
+ }
+
byte data[] = msg.getMessageData();
if (data == null) {
if (_log.shouldLog(Log.WARN))
@@ -573,7 +594,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener {
+ "ms) - time left (" + timeLeft + ") to "
+ _remoteIdentity.getHash().toBase64() + "\n" + msg.toString());
}
- _transport.afterSend(msg, true);
+ _transport.afterSend(msg, true, (end-beforeWrite));
if (_log.shouldLog(Log.DEBUG))
_log.debug("doSend - message sent completely: "