increase next hop send timeout; log tweaks

This commit is contained in:
zzz
2011-12-06 21:49:23 +00:00
parent 0f384c86fe
commit bd82a0c435
4 changed files with 51 additions and 30 deletions

View File

@@ -148,7 +148,7 @@ public class HopConfig {
if (_receiveTunnelId != null) {
buf.append("recv on ");
buf.append(DataHelper.fromLong(_receiveTunnelId, 0, 4));
buf.append(" ");
buf.append(' ');
}
if (_sendTo != null) {
@@ -157,8 +157,9 @@ public class HopConfig {
buf.append(DataHelper.fromLong(_sendTunnelId, 0, 4));
}
buf.append(" expiring on ").append(TunnelCreatorConfig.format(_expiration));
buf.append(" having transferred ").append(_messagesProcessed).append("KB");
buf.append(" exp. ").append(TunnelCreatorConfig.format(_expiration));
if (_messagesProcessed > 0)
buf.append(" used ").append(_messagesProcessed).append("KB");
return buf.toString();
}
}

View File

@@ -208,36 +208,38 @@ public class TunnelCreatorConfig implements TunnelInfo {
// H0:1235-->H1:2345-->H2:2345
StringBuilder buf = new StringBuilder(128);
if (_isInbound)
buf.append("inbound");
buf.append("IB");
else
buf.append("outbound");
buf.append("OB");
if (_destination == null)
buf.append(" exploratory");
buf.append(": ");
buf.append(" expl");
else
buf.append(" client ").append(Base64.encode(_destination.getData(), 0, 3));
buf.append(": GW ");
for (int i = 0; i < _peers.length; i++) {
buf.append(_peers[i].toBase64().substring(0,4));
buf.append(':');
if (_config[i].getReceiveTunnel() != null)
buf.append(_config[i].getReceiveTunnel());
else
buf.append('x');
buf.append("me");
buf.append('.');
if (_config[i].getSendTunnel() != null)
buf.append(_config[i].getSendTunnel());
else
buf.append('x');
buf.append("me");
if (i + 1 < _peers.length)
buf.append("...");
buf.append("-->");
}
buf.append(" expiring on ").append(getExpirationString());
if (_destination != null)
buf.append(" for ").append(Base64.encode(_destination.getData(), 0, 3));
buf.append(" exp. ").append(getExpirationString());
if (_replyMessageId > 0)
buf.append(" replyMessageId ").append(_replyMessageId);
buf.append(" with ").append(_messagesProcessed).append("/").append(_verifiedBytesTransferred).append(" msgs/bytes");
buf.append(" replyMsgID ").append(_replyMessageId);
if (_messagesProcessed > 0)
buf.append(" with ").append(_messagesProcessed).append("/").append(_verifiedBytesTransferred).append(" msgs/bytes");
buf.append(" with ").append(_failures).append(" failures");
if (_failures > 0)
buf.append(" with ").append(_failures).append(" failures");
return buf.toString();
}

View File

@@ -265,7 +265,7 @@ public class TunnelDispatcher implements Service {
*/
public void joinOutboundEndpoint(HopConfig cfg) {
if (_log.shouldLog(Log.INFO))
_log.info("Joining as outbound endpoint: " + cfg);
_log.info("Joining as OBEP: " + cfg);
TunnelId recvId = cfg.getReceiveTunnel();
OutboundTunnelEndpoint endpoint = new OutboundTunnelEndpoint(_context, cfg, new HopProcessor(_context, cfg, _validator));
_outboundEndpoints.put(recvId, endpoint);
@@ -284,7 +284,7 @@ public class TunnelDispatcher implements Service {
*/
public void joinInboundGateway(HopConfig cfg) {
if (_log.shouldLog(Log.INFO))
_log.info("Joining as inbound gateway: " + cfg);
_log.info("Joining as IBGW: " + cfg);
TunnelGateway.QueuePreprocessor preproc = createPreprocessor(cfg);
TunnelGateway.Sender sender = new InboundSender(_context, cfg);
TunnelGateway.Receiver receiver = new InboundGatewayReceiver(_context, cfg);
@@ -363,13 +363,15 @@ public class TunnelDispatcher implements Service {
*/
public void remove(HopConfig cfg) {
TunnelId recvId = cfg.getReceiveTunnel();
if (_log.shouldLog(Log.DEBUG))
_log.debug("removing " + cfg);
boolean removed = (null != _participatingConfig.remove(recvId));
if (!removed) {
if (_log.shouldLog(Log.INFO))
_log.info("Participating tunnel, but no longer listed in participatingConfig? " + cfg);
if (removed) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("removing " + cfg, new Exception());
} else {
// this is normal, this can get called twice
if (_log.shouldLog(Log.DEBUG))
_log.debug("Participating tunnel, but no longer listed in participatingConfig? " + cfg, new Exception());
}
removed = (null != _participants.remove(recvId));
@@ -837,6 +839,8 @@ public class TunnelDispatcher implements Service {
long exp = cur.getExpiration() + (2 * Router.CLOCK_FUDGE_FACTOR) + LEAVE_BATCH_TIME;
if (exp < now) {
_configs.poll();
if (_log.shouldLog(Log.INFO))
_log.info("Expiring " + cur);
remove(cur);
} else {
if (exp < nextTime)

View File

@@ -60,6 +60,17 @@ class BuildHandler implements Runnable {
private static final int MIN_QUEUE = 18;
private static final int MAX_QUEUE = 192;
private static final int NEXT_HOP_LOOKUP_TIMEOUT = 15*1000;
/**
* This must be high, as if we timeout the send we remove the tunnel from
* participating via OnFailedSendJob.
* If them msg actually got through then we will be dropping
* all the traffic in TunnelDispatcher.dispatch(TunnelDataMessage msg, Hash recvFrom).
* 10s was not enough.
*/
private static final int NEXT_HOP_SEND_TIMEOUT = 15*1000;
public BuildHandler(RouterContext ctx, TunnelPoolManager manager, BuildExecutor exec) {
_context = ctx;
_log = ctx.logManager().getLog(getClass());
@@ -110,8 +121,6 @@ class BuildHandler implements Runnable {
ctx.inNetMessagePool().registerHandlerJobBuilder(VariableTunnelBuildReplyMessage.MESSAGE_TYPE, tbrmhjb);
}
private static final int NEXT_HOP_LOOKUP_TIMEOUT = 15*1000;
/**
* Thread to handle inbound requests
* @since 0.8.11
@@ -650,7 +659,7 @@ class BuildHandler implements Runnable {
// now actually send the response
if (!isOutEnd) {
state.msg.setUniqueId(req.readReplyMessageId());
state.msg.setMessageExpiration(_context.clock().now() + 10*1000);
state.msg.setMessageExpiration(_context.clock().now() + NEXT_HOP_SEND_TIMEOUT);
OutNetMessage msg = new OutNetMessage(_context);
msg.setMessage(state.msg);
msg.setExpiration(state.msg.getMessageExpiration());
@@ -660,6 +669,7 @@ class BuildHandler implements Runnable {
msg.setOnFailedSendJob(new TunnelBuildNextHopFailJob(_context, cfg));
_context.outNetMessagePool().add(msg);
} else {
// We are the OBEP.
// send it to the reply tunnel on the reply peer within a new TunnelBuildReplyMessage
// (enough layers jrandom?)
TunnelBuildReplyMessage replyMsg;
@@ -670,7 +680,7 @@ class BuildHandler implements Runnable {
for (int i = 0; i < records; i++)
replyMsg.setRecord(i, state.msg.getRecord(i));
replyMsg.setUniqueId(req.readReplyMessageId());
replyMsg.setMessageExpiration(_context.clock().now() + 10*1000);
replyMsg.setMessageExpiration(_context.clock().now() + NEXT_HOP_SEND_TIMEOUT);
TunnelGatewayMessage m = new TunnelGatewayMessage(_context);
m.setMessage(replyMsg);
m.setMessageExpiration(replyMsg.getMessageExpiration());
@@ -863,17 +873,21 @@ class BuildHandler implements Runnable {
* but it affects capacity calculations
*/
private static class TunnelBuildNextHopFailJob extends JobImpl {
final HopConfig _cfg;
private final HopConfig _cfg;
private TunnelBuildNextHopFailJob(RouterContext ctx, HopConfig cfg) {
super(ctx);
_cfg = cfg;
}
public String getName() { return "Timeout contacting next peer for tunnel join"; }
public void runJob() {
getContext().tunnelDispatcher().remove(_cfg);
getContext().statManager().addRateData("tunnel.rejectTimeout2", 1, 0);
// static, no _log
//_log.error("Cant contact next hop for " + _cfg);
Log log = getContext().logManager().getLog(BuildHandler.class);
if (log.shouldLog(Log.WARN))
log.warn("Timeout contacting next hop for " + _cfg);
}
}