diff --git a/router/java/src/net/i2p/data/i2np/DatabaseLookupMessage.java b/router/java/src/net/i2p/data/i2np/DatabaseLookupMessage.java index c1ffa6c74..d2e444cee 100644 --- a/router/java/src/net/i2p/data/i2np/DatabaseLookupMessage.java +++ b/router/java/src/net/i2p/data/i2np/DatabaseLookupMessage.java @@ -33,7 +33,7 @@ public class DatabaseLookupMessage extends I2NPMessageImpl { private final static Log _log = new Log(DatabaseLookupMessage.class); public final static int MESSAGE_TYPE = 2; private Hash _key; - private RouterInfo _from; + private Hash _fromHash; private TunnelId _replyTunnel; private Set _dontIncludePeers; @@ -51,11 +51,11 @@ public class DatabaseLookupMessage extends I2NPMessageImpl { public void setSearchKey(Hash key) { _key = key; } /** - * Contains the current router info of the router who requested this lookup + * Contains the router who requested this lookup * */ - public RouterInfo getFrom() { return _from; } - public void setFrom(RouterInfo from) { _from = from; } + public Hash getFrom() { return _fromHash; } + public void setFrom(Hash from) { _fromHash = from; } /** * Contains the tunnel ID a reply should be sent to @@ -82,8 +82,8 @@ public class DatabaseLookupMessage extends I2NPMessageImpl { try { _key = new Hash(); _key.readBytes(in); - _from = new RouterInfo(); - _from.readBytes(in); + _fromHash = new Hash(); + _fromHash.readBytes(in); Boolean val = DataHelper.readBoolean(in); if (val == null) throw new I2NPMessageException("Tunnel must be explicitly specified (or not)"); @@ -109,12 +109,12 @@ public class DatabaseLookupMessage extends I2NPMessageImpl { protected byte[] writeMessage() throws I2NPMessageException, IOException { if (_key == null) throw new I2NPMessageException("Key being searched for not specified"); - if (_from == null) throw new I2NPMessageException("From address not specified"); + if (_fromHash == null) throw new I2NPMessageException("From address not specified"); ByteArrayOutputStream os = new ByteArrayOutputStream(32); try { _key.writeBytes(os); - _from.writeBytes(os); + _fromHash.writeBytes(os); if (_replyTunnel != null) { DataHelper.writeBoolean(os, Boolean.TRUE); _replyTunnel.writeBytes(os); diff --git a/router/java/src/net/i2p/data/i2np/DatabaseSearchReplyMessage.java b/router/java/src/net/i2p/data/i2np/DatabaseSearchReplyMessage.java index f7c9973e5..b74820eb8 100644 --- a/router/java/src/net/i2p/data/i2np/DatabaseSearchReplyMessage.java +++ b/router/java/src/net/i2p/data/i2np/DatabaseSearchReplyMessage.java @@ -34,7 +34,7 @@ public class DatabaseSearchReplyMessage extends I2NPMessageImpl { private final static Log _log = new Log(DatabaseSearchReplyMessage.class); public final static int MESSAGE_TYPE = 3; private Hash _key; - private List _routerInfoStructures; + private List _peerHashes; private Hash _from; public DatabaseSearchReplyMessage(I2PAppContext context) { @@ -42,7 +42,7 @@ public class DatabaseSearchReplyMessage extends I2NPMessageImpl { _context.statManager().createRateStat("netDb.searchReplyMessageSend", "How many search reply messages we send", "Network Database", new long[] { 60*1000, 5*60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("netDb.searchReplyMessageReceive", "How many search reply messages we receive", "Network Database", new long[] { 60*1000, 5*60*1000, 10*60*1000, 60*60*1000 }); setSearchKey(null); - _routerInfoStructures = new ArrayList(); + _peerHashes = new ArrayList(3); setFromHash(null); } @@ -52,10 +52,10 @@ public class DatabaseSearchReplyMessage extends I2NPMessageImpl { public Hash getSearchKey() { return _key; } public void setSearchKey(Hash key) { _key = key; } - public int getNumReplies() { return _routerInfoStructures.size(); } - public RouterInfo getReply(int index) { return (RouterInfo)_routerInfoStructures.get(index); } - public void addReply(RouterInfo info) { _routerInfoStructures.add(info); } - public void addReplies(Collection replies) { _routerInfoStructures.addAll(replies); } + public int getNumReplies() { return _peerHashes.size(); } + public Hash getReply(int index) { return (Hash)_peerHashes.get(index); } + public void addReply(Hash peer) { _peerHashes.add(peer); } + //public void addReplies(Collection replies) { _peerHashes.addAll(replies); } public Hash getFromHash() { return _from; } public void setFromHash(Hash from) { _from = from; } @@ -66,27 +66,18 @@ public class DatabaseSearchReplyMessage extends I2NPMessageImpl { _key = new Hash(); _key.readBytes(in); - int compressedLength = (int)DataHelper.readLong(in, 2); - byte compressedData[] = new byte[compressedLength]; - int read = DataHelper.read(in, compressedData); - if (read != compressedLength) - throw new IOException("Not enough data to decompress"); - byte decompressedData[] = DataHelper.decompress(compressedData); - if (decompressedData == null) - throw new I2NPMessageException("Could not decompress the " + compressedLength + "bytes of data"); - ByteArrayInputStream bais = new ByteArrayInputStream(decompressedData); - int num = (int)DataHelper.readLong(bais, 1); - _routerInfoStructures.clear(); + int num = (int)DataHelper.readLong(in, 1); + _peerHashes.clear(); for (int i = 0; i < num; i++) { - RouterInfo info = new RouterInfo(); - info.readBytes(bais); - addReply(info); + Hash peer = new Hash(); + peer.readBytes(in); + addReply(peer); } _from = new Hash(); _from.readBytes(in); - _context.statManager().addRateData("netDb.searchReplyMessageReceive", compressedLength + 64, 1); + _context.statManager().addRateData("netDb.searchReplyMessageReceive", num*32 + 64, 1); } catch (DataFormatException dfe) { throw new I2NPMessageException("Unable to load the message data", dfe); } @@ -95,10 +86,8 @@ public class DatabaseSearchReplyMessage extends I2NPMessageImpl { protected byte[] writeMessage() throws I2NPMessageException, IOException { if (_key == null) throw new I2NPMessageException("Key in reply to not specified"); - if (_routerInfoStructures == null) - throw new I2NPMessageException("RouterInfo replies are null"); - if (_routerInfoStructures.size() <= 0) - throw new I2NPMessageException("No replies specified in SearchReply! Always include oneself!"); + if (_peerHashes == null) + throw new I2NPMessageException("Peer replies are null"); if (_from == null) throw new I2NPMessageException("No 'from' address specified!"); @@ -107,16 +96,12 @@ public class DatabaseSearchReplyMessage extends I2NPMessageImpl { try { _key.writeBytes(os); - ByteArrayOutputStream baos = new ByteArrayOutputStream(512); - DataHelper.writeLong(baos, 1, _routerInfoStructures.size()); + DataHelper.writeLong(os, 1, _peerHashes.size()); for (int i = 0; i < getNumReplies(); i++) { - RouterInfo info = getReply(i); - info.writeBytes(baos); + Hash peer = getReply(i); + peer.writeBytes(os); } - byte compressed[] = DataHelper.compress(baos.toByteArray()); - DataHelper.writeLong(os, 2, compressed.length); - os.write(compressed); _from.writeBytes(os); rv = os.toByteArray(); @@ -134,7 +119,7 @@ public class DatabaseSearchReplyMessage extends I2NPMessageImpl { DatabaseSearchReplyMessage msg = (DatabaseSearchReplyMessage)object; return DataHelper.eq(getSearchKey(),msg.getSearchKey()) && DataHelper.eq(getFromHash(),msg.getFromHash()) && - DataHelper.eq(_routerInfoStructures,msg._routerInfoStructures); + DataHelper.eq(_peerHashes,msg._peerHashes); } else { return false; } @@ -143,7 +128,7 @@ public class DatabaseSearchReplyMessage extends I2NPMessageImpl { public int hashCode() { return DataHelper.hashCode(getSearchKey()) + DataHelper.hashCode(getFromHash()) + - DataHelper.hashCode(_routerInfoStructures); + DataHelper.hashCode(_peerHashes); } public String toString() { diff --git a/router/java/src/net/i2p/data/i2np/GarlicMessage.java b/router/java/src/net/i2p/data/i2np/GarlicMessage.java index 4e0842a90..d5dbf5abf 100644 --- a/router/java/src/net/i2p/data/i2np/GarlicMessage.java +++ b/router/java/src/net/i2p/data/i2np/GarlicMessage.java @@ -42,7 +42,7 @@ public class GarlicMessage extends I2NPMessageImpl { _data = new byte[(int)len]; int read = read(in, _data); if (read != len) - throw new I2NPMessageException("Incorrect size read"); + throw new I2NPMessageException("Incorrect size read [" + read + " read, expected " + len + "]"); } catch (DataFormatException dfe) { throw new I2NPMessageException("Unable to load the message data", dfe); } diff --git a/router/java/src/net/i2p/router/MessageHistory.java b/router/java/src/net/i2p/router/MessageHistory.java index bcf3e7f9e..a38d4b61e 100644 --- a/router/java/src/net/i2p/router/MessageHistory.java +++ b/router/java/src/net/i2p/router/MessageHistory.java @@ -277,11 +277,13 @@ public class MessageHistory { * @param id tunnel ID we received a message for * @param from peer that sent us this message (if known) */ - public void droppedTunnelMessage(TunnelId id, Hash from) { + public void droppedTunnelMessage(TunnelId id, long msgId, Date expiration, Hash from) { if (!_doLog) return; StringBuffer buf = new StringBuffer(128); buf.append(getPrefix()); - buf.append("dropped message for unknown tunnel [").append(id.getTunnelId()).append("] from [").append(getName(from)).append("]"); + buf.append("dropped message ").append(msgId).append(" for unknown tunnel [").append(id.getTunnelId()); + buf.append("] from [").append(getName(from)).append("]").append(" expiring on "); + buf.append(getTime(expiration)); addEntry(buf.toString()); } diff --git a/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java b/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java index e71bdf7a7..33fce5cd4 100644 --- a/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java +++ b/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java @@ -54,7 +54,8 @@ public class GarlicMessageBuilder { noteWrap(ctx, msg, config); - log.info("Encrypted with public key " + key + " to expire on " + new Date(config.getExpiration())); + if (log.shouldLog(Log.INFO)) + log.info("Encrypted with public key " + key + " to expire on " + new Date(config.getExpiration())); byte cloveSet[] = buildCloveSet(ctx, config); @@ -64,26 +65,34 @@ public class GarlicMessageBuilder { wrappedKey.setData(curKey.getData()); int availTags = ctx.sessionKeyManager().getAvailableTags(key, curKey); - log.debug("Available tags for encryption to " + key + ": " + availTags); + if (log.shouldLog(Log.DEBUG)) + log.debug("Available tags for encryption to " + key + ": " + availTags); if (availTags < 10) { // arbitrary threshold for (int i = 0; i < 20; i++) wrappedTags.add(new SessionTag(true)); - log.info("Less than 10 tags are available (" + availTags + "), so we're including 20 more"); + if (log.shouldLog(Log.INFO)) + log.info("Less than 10 tags are available (" + availTags + "), so we're including 20 more"); } else if (ctx.sessionKeyManager().getAvailableTimeLeft(key, curKey) < 30*1000) { // if we have > 10 tags, but they expire in under 30 seconds, we want more for (int i = 0; i < 20; i++) wrappedTags.add(new SessionTag(true)); - log.info("Tags are almost expired, adding 20 new ones"); + if (log.shouldLog(Log.INFO)) + log.info("Tags are almost expired, adding 20 new ones"); } else { // always tack on at least one more - not necessary. //wrappedTags.add(new SessionTag(true)); } SessionTag curTag = ctx.sessionKeyManager().consumeNextAvailableTag(key, curKey); - byte encData[] = ctx.elGamalAESEngine().encrypt(cloveSet, key, curKey, wrappedTags, curTag, 1024); + byte encData[] = ctx.elGamalAESEngine().encrypt(cloveSet, key, curKey, wrappedTags, curTag, 256); msg.setData(encData); Date exp = new Date(config.getExpiration()); msg.setMessageExpiration(exp); + + if (log.shouldLog(Log.WARN)) + log.warn("CloveSet size for message " + msg.getUniqueId() + " is " + cloveSet.length + + " and encrypted message data is " + encData.length); + return msg; } diff --git a/router/java/src/net/i2p/router/message/HandleTunnelMessageJob.java b/router/java/src/net/i2p/router/message/HandleTunnelMessageJob.java index 9436db247..30751ef36 100644 --- a/router/java/src/net/i2p/router/message/HandleTunnelMessageJob.java +++ b/router/java/src/net/i2p/router/message/HandleTunnelMessageJob.java @@ -29,8 +29,11 @@ import net.i2p.data.i2np.TunnelMessage; import net.i2p.data.i2np.TunnelVerificationStructure; import net.i2p.router.ClientMessage; import net.i2p.router.InNetMessage; +import net.i2p.router.Job; import net.i2p.router.JobImpl; import net.i2p.router.MessageReceptionInfo; +import net.i2p.router.MessageSelector; +import net.i2p.router.ReplyJob; import net.i2p.router.Router; import net.i2p.router.RouterContext; import net.i2p.router.TunnelInfo; @@ -43,7 +46,7 @@ public class HandleTunnelMessageJob extends JobImpl { private Hash _fromHash; private I2NPMessageHandler _handler; - private final static long FORWARD_TIMEOUT = 60*1000; + private final static int FORWARD_TIMEOUT = 60*1000; private final static int FORWARD_PRIORITY = 400; public HandleTunnelMessageJob(RouterContext ctx, TunnelMessage msg, RouterIdentity from, Hash fromHash) { @@ -60,10 +63,7 @@ public class HandleTunnelMessageJob extends JobImpl { _fromHash = fromHash; } - public String getName() { return "Handle Inbound Tunnel Message"; } - public void runJob() { - TunnelId id = _message.getTunnelId(); - + private TunnelInfo validate(TunnelId id) { long excessLag = getContext().clock().now() - _message.getMessageExpiration().getTime(); if (excessLag > Router.CLOCK_FUDGE_FACTOR) { // expired while on the queue @@ -76,7 +76,7 @@ public class HandleTunnelMessageJob extends JobImpl { getContext().messageHistory().messageProcessingError(_message.getUniqueId(), TunnelMessage.class.getName(), "tunnel message expired on the queue"); - return; + return null; } else if (excessLag > 0) { // almost expired while on the queue if (_log.shouldLog(Log.WARN)) @@ -84,6 +84,8 @@ public class HandleTunnelMessageJob extends JobImpl { + id.getTunnelId() + " expiring " + excessLag + "ms ago"); + } else { + // not expired } TunnelInfo info = getContext().tunnelManager().getTunnelInfo(id); @@ -92,16 +94,19 @@ public class HandleTunnelMessageJob extends JobImpl { Hash from = _fromHash; if (_from != null) from = _from.getHash(); - getContext().messageHistory().droppedTunnelMessage(id, from); + getContext().messageHistory().droppedTunnelMessage(id, _message.getUniqueId(), + _message.getMessageExpiration(), + from); if (_log.shouldLog(Log.ERROR)) _log.error("Received a message for an unknown tunnel [" + id.getTunnelId() + "], dropping it: " + _message, getAddedBy()); long timeRemaining = _message.getMessageExpiration().getTime() - getContext().clock().now(); getContext().statManager().addRateData("tunnel.unknownTunnelTimeLeft", timeRemaining, 0); - return; + long lag = getTiming().getActualStart() - getTiming().getStartAfter(); + if (_log.shouldLog(Log.ERROR)) + _log.error("Lag processing a dropped tunnel message: " + lag); + return null; } - - info.messageProcessed(); info = getUs(info); if (info == null) { @@ -109,94 +114,123 @@ public class HandleTunnelMessageJob extends JobImpl { _log.error("We are not part of a known tunnel?? wtf! drop.", getAddedBy()); long timeRemaining = _message.getMessageExpiration().getTime() - getContext().clock().now(); getContext().statManager().addRateData("tunnel.unknownTunnelTimeLeft", timeRemaining, 0); - return; - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Tunnel message received for tunnel: \n" + info); + return null; } - //if ( (_message.getVerificationStructure() == null) && (info.getSigningKey() != null) ) { - if (_message.getVerificationStructure() == null) { - if (info.getSigningKey() != null) { - if (info.getNextHop() != null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("We are the gateway to tunnel " + id.getTunnelId()); - byte data[] = _message.getData(); - I2NPMessage msg = getBody(data); - getContext().jobQueue().addJob(new HandleGatewayMessageJob(msg, info, data.length)); - return; - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("We are the gateway and the endpoint for tunnel " + id.getTunnelId()); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Process locally"); - if (info.getDestination() != null) { - if (!getContext().clientManager().isLocal(info.getDestination())) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Received a message on a tunnel allocated to a client that has disconnected - dropping it!"); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Dropping message for disconnected client: " + _message); + return info; + } - getContext().messageHistory().droppedOtherMessage(_message); - getContext().messageHistory().messageProcessingError(_message.getUniqueId(), - _message.getClass().getName(), - "Disconnected client"); - return; - } - } - - I2NPMessage body = getBody(_message.getData()); - if (body != null) { - getContext().jobQueue().addJob(new HandleLocallyJob(body, info)); - return; - } else { - if (_log.shouldLog(Log.ERROR)) - _log.error("Body is null! content of message.getData() = [" + - DataHelper.toString(_message.getData()) + "]", getAddedBy()); + /** + * The current router may be the gateway to the tunnel since there is no + * verification data, or it could be a b0rked message. + * + */ + private void receiveUnverified(TunnelInfo info) { + if (info.getSigningKey() != null) { + if (info.getNextHop() != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("We are the gateway to tunnel " + info.getTunnelId().getTunnelId()); + byte data[] = _message.getData(); + I2NPMessage msg = getBody(data); + getContext().jobQueue().addJob(new HandleGatewayMessageJob(msg, info, data.length)); + return; + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("We are the gateway and the endpoint for tunnel " + info.getTunnelId().getTunnelId()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Process locally"); + if (info.getDestination() != null) { + if (!getContext().clientManager().isLocal(info.getDestination())) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Received a message on a tunnel allocated to a client that has disconnected - dropping it!"); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Message that failed: " + _message, getAddedBy()); + _log.debug("Dropping message for disconnected client: " + _message); + + getContext().messageHistory().droppedOtherMessage(_message); + getContext().messageHistory().messageProcessingError(_message.getUniqueId(), + _message.getClass().getName(), + "Disconnected client"); return; } } - } else { - if (_log.shouldLog(Log.ERROR)) - _log.error("Received a message that we are not the gateway for on tunnel " - + id.getTunnelId() + " without a verification structure: " + _message, getAddedBy()); - return; + + I2NPMessage body = getBody(_message.getData()); + if (body != null) { + getContext().jobQueue().addJob(new HandleLocallyJob(body, info)); + return; + } else { + if (_log.shouldLog(Log.ERROR)) + _log.error("Body is null! content of message.getData() = [" + + DataHelper.toString(_message.getData()) + "]", getAddedBy()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Message that failed: " + _message, getAddedBy()); + return; + } } } else { - // participant - TunnelVerificationStructure struct = _message.getVerificationStructure(); - boolean ok = struct.verifySignature(getContext(), info.getVerificationKey().getKey()); - if (!ok) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Failed tunnel verification! Spoofing / tagging attack? " + _message, getAddedBy()); - return; - } else { - if (info.getNextHop() != null) { - if (_log.shouldLog(Log.INFO)) - _log.info("Message for tunnel " + id.getTunnelId() - + " received where we're not the gateway and there are remaining hops, so forward it on to " - + info.getNextHop().toBase64() + " via SendTunnelMessageJob"); + if (_log.shouldLog(Log.ERROR)) + _log.error("Received a message that we are not the gateway for on tunnel " + + info.getTunnelId().getTunnelId() + + " without a verification structure: " + _message, getAddedBy()); + return; + } + } - getContext().statManager().addRateData("tunnel.relayMessageSize", + /** + * We may be a participant in the tunnel, as there is a verification structure. + * + */ + private void receiveParticipant(TunnelInfo info) { + // participant + TunnelVerificationStructure struct = _message.getVerificationStructure(); + boolean ok = struct.verifySignature(getContext(), info.getVerificationKey().getKey()); + if (!ok) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Failed tunnel verification! Spoofing / tagging attack? " + _message, getAddedBy()); + return; + } else { + if (info.getNextHop() != null) { + if (_log.shouldLog(Log.INFO)) + _log.info("Message for tunnel " + info.getTunnelId().getTunnelId() + + " received where we're not the gateway and there are remaining hops, so forward it on to " + + info.getNextHop().toBase64() + " via SendTunnelMessageJob"); + + getContext().statManager().addRateData("tunnel.relayMessageSize", _message.getData().length, 0); - getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), _message, - info.getNextHop(), - getContext().clock().now() + FORWARD_TIMEOUT, - FORWARD_PRIORITY)); - return; - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("No more hops, unwrap and follow the instructions"); - getContext().jobQueue().addJob(new HandleEndpointJob(info)); - return; - } + SendMessageDirectJob j = new SendMessageDirectJob(getContext(), _message, + info.getNextHop(), + (int)(_message.getMessageExpiration().getTime() - getContext().clock().now()), + FORWARD_PRIORITY); + getContext().jobQueue().addJob(j); + return; + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("No more hops, unwrap and follow the instructions"); + getContext().jobQueue().addJob(new HandleEndpointJob(info)); + return; } } } + public String getName() { return "Handle Inbound Tunnel Message"; } + public void runJob() { + TunnelId id = _message.getTunnelId(); + + TunnelInfo info = validate(id); + if (info == null) + return; + + info.messageProcessed(); + + //if ( (_message.getVerificationStructure() == null) && (info.getSigningKey() != null) ) { + if (_message.getVerificationStructure() == null) { + receiveUnverified(info); + } else { + receiveParticipant(info); + } + } + private void processLocally(TunnelInfo ourPlace) { if (ourPlace.getEncryptionKey() == null) { if (_log.shouldLog(Log.ERROR)) @@ -288,8 +322,7 @@ public class HandleTunnelMessageJob extends JobImpl { ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); body.writeBytes(baos); msg.setData(baos.toByteArray()); - long exp = getContext().clock().now() + FORWARD_TIMEOUT; - getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, router, exp, FORWARD_PRIORITY)); + getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, router, FORWARD_TIMEOUT, FORWARD_PRIORITY)); String bodyType = body.getClass().getName(); getContext().messageHistory().wrap(bodyType, body.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId()); @@ -306,8 +339,7 @@ public class HandleTunnelMessageJob extends JobImpl { // TODO: we may want to send it via a tunnel later on, but for now, direct will do. if (_log.shouldLog(Log.DEBUG)) _log.debug("Sending on to requested router " + router.toBase64()); - long exp = getContext().clock().now() + FORWARD_TIMEOUT; - getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), body, router, exp, FORWARD_PRIORITY)); + getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), body, router, FORWARD_TIMEOUT, FORWARD_PRIORITY)); } private void sendToLocal(I2NPMessage body) { @@ -467,11 +499,24 @@ public class HandleTunnelMessageJob extends JobImpl { public void runJob() { RouterContext ctx = HandleTunnelMessageJob.this.getContext(); if (_body != null) { + long expiration = _body.getMessageExpiration().getTime(); + long timeout = expiration - ctx.clock().now(); ctx.statManager().addRateData("tunnel.gatewayMessageSize", _length, 0); if (_log.shouldLog(Log.INFO)) - _log.info("Message for tunnel " + _info.getTunnelId() + " received at the gateway (us), and since its > 0 length, forward the " - + _body.getClass().getName() + " message on to " + _info.getNextHop().toBase64() + " via SendTunnelMessageJob"); - ctx.jobQueue().addJob(new SendTunnelMessageJob(ctx, _body, _info.getTunnelId(), null, null, null, null, FORWARD_TIMEOUT, FORWARD_PRIORITY)); + _log.info("Message for tunnel " + _info.getTunnelId() + + " received at the gateway (us), and since its > 0 length, forward the " + + _body.getClass().getName() + " message on to " + + _info.getNextHop().toBase64() + " via SendTunnelMessageJob expiring in " + + timeout + "ms"); + + MessageSelector selector = null; + Job onFailure = null; + Job onSuccess = null; + ReplyJob onReply = null; + Hash targetRouter = null; + TunnelId targetTunnelId = null; + SendTunnelMessageJob j = new SendTunnelMessageJob(ctx, _body, _info.getTunnelId(), targetRouter, targetTunnelId, onSuccess, onReply, onFailure, selector, timeout, FORWARD_PRIORITY); + ctx.jobQueue().addJob(j); } else { if (_log.shouldLog(Log.WARN)) _log.warn("Body of the message for the tunnel could not be parsed"); diff --git a/router/java/src/net/i2p/router/message/MessageHandler.java b/router/java/src/net/i2p/router/message/MessageHandler.java index 22f059680..605e38b86 100644 --- a/router/java/src/net/i2p/router/message/MessageHandler.java +++ b/router/java/src/net/i2p/router/message/MessageHandler.java @@ -123,7 +123,8 @@ class MessageHandler { if (_log.shouldLog(Log.INFO)) _log.info("Handle " + message.getClass().getName() + " to a remote router " + instructions.getRouter().toBase64() + " - fire a SendMessageDirectJob"); - SendMessageDirectJob j = new SendMessageDirectJob(_context, message, instructions.getRouter(), expiration, priority); + int timeoutMs = (int)(expiration-_context.clock().now()); + SendMessageDirectJob j = new SendMessageDirectJob(_context, message, instructions.getRouter(), timeoutMs, priority); _context.jobQueue().addJob(j); } @@ -160,7 +161,7 @@ class MessageHandler { _log.debug("Placing message of type " + message.getClass().getName() + " into the new tunnel message bound for " + tunnelId.getTunnelId() + " on " + to.toBase64()); - _context.jobQueue().addJob(new SendMessageDirectJob(_context, msg, to, expiration, priority)); + _context.jobQueue().addJob(new SendMessageDirectJob(_context, msg, to, (int)timeoutMs, priority)); String bodyType = message.getClass().getName(); _context.messageHistory().wrap(bodyType, message.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId()); diff --git a/router/java/src/net/i2p/router/message/SendMessageDirectJob.java b/router/java/src/net/i2p/router/message/SendMessageDirectJob.java index 03e53615f..d0a766574 100644 --- a/router/java/src/net/i2p/router/message/SendMessageDirectJob.java +++ b/router/java/src/net/i2p/router/message/SendMessageDirectJob.java @@ -37,24 +37,20 @@ public class SendMessageDirectJob extends JobImpl { private boolean _sent; private long _searchOn; - private final static long DEFAULT_TIMEOUT = 60*1000; - - public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, long expiration, int priority) { - this(ctx, message, toPeer, null, null, null, null, expiration, priority); + public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, int timeoutMs, int priority) { + this(ctx, message, toPeer, null, null, null, null, timeoutMs, priority); } - public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, int priority) { - this(ctx, message, toPeer, DEFAULT_TIMEOUT+ctx.clock().now(), priority); + public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, ReplyJob onSuccess, Job onFail, MessageSelector selector, int timeoutMs, int priority) { + this(ctx, message, toPeer, null, onSuccess, onFail, selector, timeoutMs, priority); } - public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, ReplyJob onSuccess, Job onFail, MessageSelector selector, long expiration, int priority) { - this(ctx, message, toPeer, null, onSuccess, onFail, selector, expiration, priority); - } - public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, Job onSend, ReplyJob onSuccess, Job onFail, MessageSelector selector, long expiration, int priority) { + public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, Job onSend, ReplyJob onSuccess, Job onFail, MessageSelector selector, int timeoutMs, int priority) { super(ctx); + if (timeoutMs <= 0) throw new IllegalArgumentException("specify a timeout (" + timeoutMs + ")"); _log = getContext().logManager().getLog(SendMessageDirectJob.class); _message = message; _targetHash = toPeer; _router = null; - _expiration = expiration; + _expiration = timeoutMs + ctx.clock().now(); _priority = priority; _searchOn = 0; _alreadySearched = false; @@ -67,27 +63,22 @@ public class SendMessageDirectJob extends JobImpl { if (_targetHash == null) throw new IllegalArgumentException("Attempt to send a message to a null peer"); _sent = false; - long remaining = expiration - getContext().clock().now(); - if (remaining < 50*1000) { - _log.info("Sending message to expire in " + remaining + "ms containing " + message.getUniqueId() + " (a " + message.getClass().getName() + ")", new Exception("SendDirect from")); - } } public String getName() { return "Send Message Direct"; } public void runJob() { long now = getContext().clock().now(); - if (_expiration == 0) - _expiration = now + DEFAULT_TIMEOUT; if (_expiration - 30*1000 < now) { - _log.info("Soon to expire sendDirect of " + _message.getClass().getName() - + " [expiring in " + (_expiration-now) + "]", getAddedBy()); + if (_log.shouldLog(Log.INFO)) + _log.info("Soon to expire sendDirect of " + _message.getClass().getName() + + " [expiring in " + (_expiration-now) + "]", getAddedBy()); } if (_expiration < now) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Timed out sending message " + _message + " directly (expiration = " - + new Date(_expiration) + ") to " + _targetHash.toBase64(), getAddedBy()); + if (_log.shouldLog(Log.ERROR)) + _log.error("Timed out sending message " + _message + " directly (expiration = " + + new Date(_expiration) + ") to " + _targetHash.toBase64(), getAddedBy()); return; } if (_router != null) { @@ -109,8 +100,8 @@ public class SendMessageDirectJob extends JobImpl { _searchOn = getContext().clock().now(); _alreadySearched = true; } else { - if (_log.shouldLog(Log.WARN)) - _log.warn("Unable to find the router to send to: " + _targetHash + if (_log.shouldLog(Log.ERROR)) + _log.error("Unable to find the router to send to: " + _targetHash + " after searching for " + (getContext().clock().now()-_searchOn) + "ms, message: " + _message, getAddedBy()); } diff --git a/router/java/src/net/i2p/router/message/SendReplyMessageJob.java b/router/java/src/net/i2p/router/message/SendReplyMessageJob.java index 424fba4bb..77d05e769 100644 --- a/router/java/src/net/i2p/router/message/SendReplyMessageJob.java +++ b/router/java/src/net/i2p/router/message/SendReplyMessageJob.java @@ -56,7 +56,8 @@ public class SendReplyMessageJob extends JobImpl { */ protected void send(I2NPMessage msg) { _log.info("Sending reply with " + _message.getClass().getName() + " in a sourceRouteeplyMessage to " + _block.getRouter().toBase64()); - SendMessageDirectJob j = new SendMessageDirectJob(getContext(), msg, _block.getRouter(), _priority); + int timeout = (int)(msg.getMessageExpiration().getTime()-getContext().clock().now()); + SendMessageDirectJob j = new SendMessageDirectJob(getContext(), msg, _block.getRouter(), timeout, _priority); getContext().jobQueue().addJob(j); } diff --git a/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java b/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java index e1aae2d67..30a6f39fd 100644 --- a/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java +++ b/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java @@ -136,7 +136,8 @@ public class SendTunnelMessageJob extends JobImpl { getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, _destRouter, _onSend, _onReply, _onFailure, - _selector, _expiration, + _selector, + (int)(_expiration-getContext().clock().now()), _priority)); String bodyType = _message.getClass().getName(); @@ -186,10 +187,11 @@ public class SendTunnelMessageJob extends JobImpl { } msg.setMessageExpiration(new Date(_expiration)); getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, - info.getNextHop(), _onSend, - _onReply, _onFailure, - _selector, _expiration, - _priority)); + info.getNextHop(), _onSend, + _onReply, _onFailure, + _selector, + (int)(_expiration - getContext().clock().now()), + _priority)); } } @@ -234,7 +236,11 @@ public class SendTunnelMessageJob extends JobImpl { if (_log.shouldLog(Log.INFO)) _log.info("Message for tunnel " + info.getTunnelId().getTunnelId() + " received where we're not the gateway and there are remaining hops, so forward it on to " + info.getNextHop().toBase64() + " via SendMessageDirectJob"); - getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, info.getNextHop(), _onSend, null, _onFailure, null, _message.getMessageExpiration().getTime(), _priority)); + SendMessageDirectJob j = new SendMessageDirectJob(getContext(), msg, info.getNextHop(), _onSend, + null, _onFailure, null, + (int)(_message.getMessageExpiration().getTime() - getContext().clock().now()), + _priority); + getContext().jobQueue().addJob(j); return; } else { if (_log.shouldLog(Log.ERROR)) @@ -273,6 +279,9 @@ public class SendTunnelMessageJob extends JobImpl { return (us.getSigningKey() != null); // only the gateway can sign } + private static final int INSTRUCTIONS_PADDING = 32; + private static final int PAYLOAD_PADDING = 32; + /** * Build the tunnel message with appropriate instructions for the * tunnel endpoint, then encrypt and sign it. @@ -326,8 +335,8 @@ public class SendTunnelMessageJob extends JobImpl { return null; } - byte encryptedInstructions[] = encrypt(instructions, info.getEncryptionKey().getKey(), 512); - byte encryptedMessage[] = encrypt(_message, key, 1024); + byte encryptedInstructions[] = encrypt(instructions, info.getEncryptionKey().getKey(), INSTRUCTIONS_PADDING); + byte encryptedMessage[] = encrypt(_message, key, PAYLOAD_PADDING); TunnelVerificationStructure verification = createVerificationStructure(encryptedMessage, info); String bodyType = _message.getClass().getName(); @@ -457,9 +466,11 @@ public class SendTunnelMessageJob extends JobImpl { TunnelMessage.class.getName(), msg.getUniqueId()); // don't specify a selector, since createFakeOutNetMessage already does that - getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, _destRouter, - _onSend, _onReply, _onFailure, - null, _expiration, _priority)); + SendMessageDirectJob j = new SendMessageDirectJob(getContext(), msg, _destRouter, + _onSend, _onReply, _onFailure, + null, (int)(_expiration-getContext().clock().now()), + _priority); + getContext().jobQueue().addJob(j); } /** diff --git a/router/java/src/net/i2p/router/networkdb/DatabaseSearchReplyMessageHandler.java b/router/java/src/net/i2p/router/networkdb/DatabaseSearchReplyMessageHandler.java deleted file mode 100644 index bf903f003..000000000 --- a/router/java/src/net/i2p/router/networkdb/DatabaseSearchReplyMessageHandler.java +++ /dev/null @@ -1,33 +0,0 @@ -package net.i2p.router.networkdb; -/* - * free (adj.): unencumbered; not under the control of others - * Written by jrandom in 2003 and released into the public domain - * with no warranty of any kind, either expressed or implied. - * It probably won't make your computer catch on fire, or eat - * your children, but it might. Use at your own risk. - * - */ - -import net.i2p.data.Hash; -import net.i2p.data.RouterIdentity; -import net.i2p.data.i2np.DatabaseSearchReplyMessage; -import net.i2p.data.i2np.I2NPMessage; -import net.i2p.data.i2np.SourceRouteBlock; -import net.i2p.router.HandlerJobBuilder; -import net.i2p.router.Job; -import net.i2p.router.RouterContext; - -/** - * Build a HandleDatabaseSearchReplyMessageJob whenever a DatabaseSearchReplyMessage arrives - * - */ -public class DatabaseSearchReplyMessageHandler implements HandlerJobBuilder { - private RouterContext _context; - public DatabaseSearchReplyMessageHandler(RouterContext context) { - _context = context; - } - public Job createJob(I2NPMessage receivedMessage, RouterIdentity from, Hash fromHash, SourceRouteBlock replyBlock) { - // ignore the reply block for now - return new HandleDatabaseSearchReplyMessageJob(_context, (DatabaseSearchReplyMessage)receivedMessage, from, fromHash); - } -} diff --git a/router/java/src/net/i2p/router/networkdb/HandleDatabaseLookupMessageJob.java b/router/java/src/net/i2p/router/networkdb/HandleDatabaseLookupMessageJob.java index 3614e8074..683cf62d2 100644 --- a/router/java/src/net/i2p/router/networkdb/HandleDatabaseLookupMessageJob.java +++ b/router/java/src/net/i2p/router/networkdb/HandleDatabaseLookupMessageJob.java @@ -11,6 +11,7 @@ package net.i2p.router.networkdb; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Date; +import java.util.Iterator; import java.util.Set; import net.i2p.data.DataFormatException; @@ -61,7 +62,7 @@ public class HandleDatabaseLookupMessageJob extends JobImpl { if (_log.shouldLog(Log.DEBUG)) _log.debug("Handling database lookup message for " + _message.getSearchKey()); - Hash fromKey = _message.getFrom().getIdentity().getHash(); + Hash fromKey = _message.getFrom(); if (_log.shouldLog(Log.DEBUG)) { if (_message.getReplyTunnel() != null) @@ -69,8 +70,13 @@ public class HandleDatabaseLookupMessageJob extends JobImpl { + " (tunnel " + _message.getReplyTunnel() + ")"); } - // might as well grab what they sent us - getContext().netDb().store(fromKey, _message.getFrom()); + if (getContext().netDb().lookupRouterInfoLocally(_message.getFrom()) == null) { + // hmm, perhaps don't always send a lookup for this... + // but for now, wtf, why not. we may even want to adjust it so that + // we penalize or benefit peers who send us that which we can or + // cannot lookup + getContext().netDb().lookupRouterInfo(_message.getFrom(), null, null, REPLY_TIMEOUT); + } // whatdotheywant? handleRequest(fromKey); @@ -130,11 +136,10 @@ public class HandleDatabaseLookupMessageJob extends JobImpl { DatabaseSearchReplyMessage msg = new DatabaseSearchReplyMessage(getContext()); msg.setFromHash(getContext().router().getRouterInfo().getIdentity().getHash()); msg.setSearchKey(key); - if (routerInfoSet.size() <= 0) { - // always include something, so lets toss ourselves in there - routerInfoSet.add(getContext().router().getRouterInfo()); + for (Iterator iter = routerInfoSet.iterator(); iter.hasNext(); ) { + RouterInfo peer = (RouterInfo)iter.next(); + msg.addReply(peer.getIdentity().getHash()); } - msg.addReplies(routerInfoSet); getContext().statManager().addRateData("netDb.lookupsHandled", 1, 0); sendMessage(msg, toPeer, replyTunnel); // should this go via garlic messages instead? } @@ -146,7 +151,7 @@ public class HandleDatabaseLookupMessageJob extends JobImpl { } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Sending reply directly to " + toPeer); - send = new SendMessageDirectJob(getContext(), message, toPeer, REPLY_TIMEOUT+getContext().clock().now(), MESSAGE_PRIORITY); + send = new SendMessageDirectJob(getContext(), message, toPeer, REPLY_TIMEOUT, MESSAGE_PRIORITY); } getContext().netDb().lookupRouterInfo(toPeer, send, null, REPLY_TIMEOUT); @@ -186,7 +191,7 @@ public class HandleDatabaseLookupMessageJob extends JobImpl { msg.setData(baos.toByteArray()); msg.setTunnelId(replyTunnel); msg.setMessageExpiration(new Date(expiration)); - getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, toPeer, null, null, null, null, expiration, MESSAGE_PRIORITY)); + getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, toPeer, null, null, null, null, REPLY_TIMEOUT, MESSAGE_PRIORITY)); String bodyType = message.getClass().getName(); getContext().messageHistory().wrap(bodyType, message.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId()); diff --git a/router/java/src/net/i2p/router/networkdb/HandleDatabaseSearchReplyMessageJob.java b/router/java/src/net/i2p/router/networkdb/HandleDatabaseSearchReplyMessageJob.java deleted file mode 100644 index e3c90dda0..000000000 --- a/router/java/src/net/i2p/router/networkdb/HandleDatabaseSearchReplyMessageJob.java +++ /dev/null @@ -1,74 +0,0 @@ -package net.i2p.router.networkdb; -/* - * free (adj.): unencumbered; not under the control of others - * Written by jrandom in 2003 and released into the public domain - * with no warranty of any kind, either expressed or implied. - * It probably won't make your computer catch on fire, or eat - * your children, but it might. Use at your own risk. - * - */ - -import net.i2p.data.Hash; -import net.i2p.data.RouterIdentity; -import net.i2p.data.RouterInfo; -import net.i2p.data.i2np.DatabaseSearchReplyMessage; -import net.i2p.router.JobImpl; -import net.i2p.router.RouterContext; -import net.i2p.util.Log; - -/** - * Receive DatabaseSearchReplyMessage data and store it in the local net db - * - */ -public class HandleDatabaseSearchReplyMessageJob extends JobImpl { - private Log _log; - private DatabaseSearchReplyMessage _message; - private RouterIdentity _from; - private Hash _fromHash; - - public HandleDatabaseSearchReplyMessageJob(RouterContext context, DatabaseSearchReplyMessage receivedMessage, RouterIdentity from, Hash fromHash) { - super(context); - _log = context.logManager().getLog(HandleDatabaseSearchReplyMessageJob.class); - _message = receivedMessage; - _from = from; - _fromHash = fromHash; - } - - public void runJob() { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Handling database search reply message for key " + _message.getSearchKey().toBase64() + " with " + _message.getNumReplies() + " replies"); - if (_message.getNumReplies() > 0) - getContext().jobQueue().addJob(new HandlePeerJob(0)); - } - - /** - * Partial job - take each reply entry, store it, then requeue again until all - * of the entries are stored. This prevents a single reply from swamping the jobqueue - * - */ - private final class HandlePeerJob extends JobImpl { - private int _curReply; - public HandlePeerJob(int reply) { - super(HandleDatabaseSearchReplyMessageJob.this.getContext()); - _curReply = reply; - } - public void runJob() { - boolean remaining = handle(); - if (remaining) - requeue(0); - } - - private boolean handle() { - RouterInfo info = _message.getReply(_curReply); - if (_log.shouldLog(Log.INFO)) - _log.info("On search for " + _message.getSearchKey().toBase64() + ", received " + info.getIdentity().getHash().toBase64()); - - HandlePeerJob.this.getContext().netDb().store(info.getIdentity().getHash(), info); - _curReply++; - return _message.getNumReplies() > _curReply; - } - public String getName() { return "Handle search reply value"; } - } - - public String getName() { return "Handle Database Search Reply Message"; } -} diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/ExploreJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/ExploreJob.java index a3420a259..50087b10d 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/ExploreJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/ExploreJob.java @@ -70,7 +70,7 @@ class ExploreJob extends SearchJob { protected DatabaseLookupMessage buildMessage(TunnelId replyTunnelId, RouterInfo replyGateway, long expiration) { DatabaseLookupMessage msg = new DatabaseLookupMessage(getContext()); msg.setSearchKey(getState().getTarget()); - msg.setFrom(replyGateway); + msg.setFrom(replyGateway.getIdentity().getHash()); msg.setDontIncludePeers(getState().getAttempted()); msg.setMessageExpiration(new Date(expiration)); msg.setReplyTunnel(replyTunnelId); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java index ca2b02c58..1f90e7db2 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java @@ -202,7 +202,6 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { _context.inNetMessagePool().registerHandlerJobBuilder(DatabaseLookupMessage.MESSAGE_TYPE, new DatabaseLookupMessageHandler(_context)); _context.inNetMessagePool().registerHandlerJobBuilder(DatabaseStoreMessage.MESSAGE_TYPE, new DatabaseStoreMessageHandler(_context)); - _context.inNetMessagePool().registerHandlerJobBuilder(DatabaseSearchReplyMessage.MESSAGE_TYPE, new DatabaseSearchReplyMessageHandler(_context)); _initialized = true; _started = System.currentTimeMillis(); @@ -249,9 +248,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { if (_log.shouldLog(Log.INFO)) _log.info("Selected hash " + rhash.toBase64() + " is not stored locally"); } else if ( !(ds instanceof RouterInfo) ) { - // could be a LeaseSet - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Selected router hash " + rhash.toBase64() + " is NOT a routerInfo!"); + // leaseSet } else { rv.add(ds); } @@ -274,8 +271,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { if (_log.shouldLog(Log.INFO)) _log.info("Selected hash " + key.toBase64() + " is not stored locally"); } else if ( !(ds instanceof RouterInfo) ) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Selected router hash [" + key.toBase64() + "] is NOT a routerInfo!"); + // leaseSet } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("getAllRouters(): key is router: " + key.toBase64()); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java index 6f0cdff20..a5209bbab 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java @@ -53,7 +53,7 @@ class SearchJob extends JobImpl { * How long will we give each peer to reply to our search? * */ - private static final long PER_PEER_TIMEOUT = 10*1000; + private static final int PER_PEER_TIMEOUT = 10*1000; /** * give ourselves 30 seconds to send out the value found to the closest @@ -283,15 +283,14 @@ class SearchJob extends JobImpl { if (_log.shouldLog(Log.DEBUG)) _log.debug(getJobId() + ": Sending leaseSet search to " + router.getIdentity().getHash().toBase64() + " for " + msg.getSearchKey().toBase64() + " w/ replies through [" - + msg.getFrom().getIdentity().getHash().toBase64() + "] via tunnel [" + + msg.getFrom().toBase64() + "] via tunnel [" + msg.getReplyTunnel() + "]"); SearchMessageSelector sel = new SearchMessageSelector(getContext(), router, _expiration, _state); - long timeoutMs = PER_PEER_TIMEOUT; // getTimeoutMs(); SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(getContext(), router, _state, _facade, this); SendTunnelMessageJob j = new SendTunnelMessageJob(getContext(), msg, outTunnelId, router.getIdentity().getHash(), null, null, reply, new FailedJob(router), sel, - timeoutMs, SEARCH_PRIORITY); + PER_PEER_TIMEOUT, SEARCH_PRIORITY); getContext().jobQueue().addJob(j); } @@ -304,12 +303,11 @@ class SearchJob extends JobImpl { if (_log.shouldLog(Log.INFO)) _log.info(getJobId() + ": Sending router search to " + router.getIdentity().getHash().toBase64() + " for " + msg.getSearchKey().toBase64() + " w/ replies to us [" - + msg.getFrom().getIdentity().getHash().toBase64() + "]"); + + msg.getFrom().toBase64() + "]"); SearchMessageSelector sel = new SearchMessageSelector(getContext(), router, _expiration, _state); - long timeoutMs = PER_PEER_TIMEOUT; SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(getContext(), router, _state, _facade, this); SendMessageDirectJob j = new SendMessageDirectJob(getContext(), msg, router.getIdentity().getHash(), - reply, new FailedJob(router), sel, expiration, SEARCH_PRIORITY); + reply, new FailedJob(router), sel, PER_PEER_TIMEOUT, SEARCH_PRIORITY); getContext().jobQueue().addJob(j); } @@ -356,7 +354,7 @@ class SearchJob extends JobImpl { protected DatabaseLookupMessage buildMessage(TunnelId replyTunnelId, RouterInfo replyGateway, long expiration) { DatabaseLookupMessage msg = new DatabaseLookupMessage(getContext()); msg.setSearchKey(_state.getTarget()); - msg.setFrom(replyGateway); + msg.setFrom(replyGateway.getIdentity().getHash()); msg.setDontIncludePeers(_state.getAttempted()); msg.setMessageExpiration(new Date(expiration)); msg.setReplyTunnel(replyTunnelId); @@ -371,7 +369,7 @@ class SearchJob extends JobImpl { protected DatabaseLookupMessage buildMessage(long expiration) { DatabaseLookupMessage msg = new DatabaseLookupMessage(getContext()); msg.setSearchKey(_state.getTarget()); - msg.setFrom(getContext().router().getRouterInfo()); + msg.setFrom(getContext().routerHash()); msg.setDontIncludePeers(_state.getAttempted()); msg.setMessageExpiration(new Date(expiration)); msg.setReplyTunnel(null); @@ -420,25 +418,28 @@ class SearchJob extends JobImpl { if (_newPeers > 0) newPeersFound(_newPeers); } else { - RouterInfo ri = _msg.getReply(_curIndex); - if (ri.isValid()) { - if (_state.wasAttempted(ri.getIdentity().getHash())) { - _duplicatePeers++; - } - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": dbSearchReply received on search containing router " - + ri.getIdentity().getHash() + " with publishDate of " - + new Date(ri.getPublished())); - _facade.store(ri.getIdentity().getHash(), ri); - if (_facade.getKBuckets().add(ri.getIdentity().getHash())) - _newPeers++; - else - _seenPeers++; - } else { - if (_log.shouldLog(Log.ERROR)) - _log.error(getJobId() + ": Received an invalid peer from " + _peer + ": " + ri); - _invalidPeers++; + Hash peer = _msg.getReply(_curIndex); + + RouterInfo info = getContext().netDb().lookupRouterInfoLocally(peer); + if (info == null) { + // hmm, perhaps don't always send a lookup for this... + // but for now, wtf, why not. we may even want to adjust it so that + // we penalize or benefit peers who send us that which we can or + // cannot lookup + getContext().netDb().lookupRouterInfo(peer, null, null, _timeoutMs); } + + if (_state.wasAttempted(peer)) { + _duplicatePeers++; + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getJobId() + ": dbSearchReply received on search referencing router " + + peer); + if (_facade.getKBuckets().add(peer)) + _newPeers++; + else + _seenPeers++; + _curIndex++; requeue(0); }