forked from I2P_Developers/i2p.i2p
* NetDB:
- Replace the old parallel lookup method with a true Kademlia lookup that iteratively queries additional floodfill peers returned in DatabaseSearchReplyMessages. This is a more efficient and reliable lookup that will work much better when not all floodfill peers are known, and it removes a serious limitation to network growth.
- Limit max number of DSRM entries to look up
- Cleanups, javadocs, log tweaks
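In short, the new lookup keeps a closest-first queue of floodfill peers and refills it from each DatabaseSearchReplyMessage until the data is found or a query budget runs out. A minimal, self-contained Java sketch of that pattern (hypothetical names, not the router's actual API; the real implementation is IterativeSearchJob in the diff below):

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

/** Simplified illustration only; see IterativeSearchJob below for the real logic. */
class IterativeLookupSketch {
    interface Peer {
        /** Returns the stored value, or null and adds closer peers to try next. */
        String query(String key, Collection<Peer> closerPeersOut);
    }

    static String lookup(String key, Comparator<Peer> closestFirst,
                         Collection<Peer> seeds, int maxQueries) {
        SortedSet<Peer> toTry = new TreeSet<>(closestFirst); // closest-to-the-key first
        toTry.addAll(seeds);
        Set<Peer> queried = new HashSet<>();
        while (!toTry.isEmpty() && queried.size() < maxQueries) {
            Peer p = toTry.first();
            toTry.remove(p);
            queried.add(p);
            List<Peer> closer = new ArrayList<>();
            String value = p.query(key, closer);
            if (value != null)
                return value;              // got the data (DSM equivalent)
            for (Peer c : closer)          // DSRM equivalent: chase the returned peers
                if (!queried.contains(c))
                    toTry.add(c);
        }
        return null;                       // budget or peers exhausted
    }
}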
@@ -13,9 +13,9 @@ import net.i2p.util.Log;
class FloodOnlyLookupMatchJob extends JobImpl implements ReplyJob {
private final Log _log;
private final FloodOnlySearchJob _search;
private final FloodSearchJob _search;
public FloodOnlyLookupMatchJob(RouterContext ctx, FloodOnlySearchJob job) {
public FloodOnlyLookupMatchJob(RouterContext ctx, FloodSearchJob job) {
super(ctx);
_log = ctx.logManager().getLog(getClass());
_search = job;

@@ -33,7 +33,7 @@ class FloodOnlyLookupMatchJob extends JobImpl implements ReplyJob {
}
}
public String getName() { return "NetDb flood search (phase 1) match"; }
public String getName() { return "NetDb flood search match"; }
public void setMessage(I2NPMessage message) {
if (message instanceof DatabaseSearchReplyMessage) {
@@ -7,6 +7,9 @@ import net.i2p.router.MessageSelector;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
/**
* Mostly replaced by IterativeLookupSelector
*/
class FloodOnlyLookupSelector implements MessageSelector {
private final RouterContext _context;
private final FloodOnlySearchJob _search;
@@ -4,10 +4,13 @@ import net.i2p.router.JobImpl;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
/**
* This is the timeout for the whole search.
*/
class FloodOnlyLookupTimeoutJob extends JobImpl {
private final FloodSearchJob _search;
public FloodOnlyLookupTimeoutJob(RouterContext ctx, FloodOnlySearchJob job) {
public FloodOnlyLookupTimeoutJob(RouterContext ctx, FloodSearchJob job) {
super(ctx);
_search = job;
}

@@ -19,5 +22,5 @@ class FloodOnlyLookupTimeoutJob extends JobImpl {
_search.failed();
}
public String getName() { return "NetDb flood search (phase 1) timeout"; }
public String getName() { return "NetDb flood search timeout"; }
}
@@ -14,10 +14,12 @@ import net.i2p.router.OutNetMessage;
import net.i2p.router.ReplyJob;
import net.i2p.router.RouterContext;
import net.i2p.router.TunnelInfo;
import net.i2p.router.peermanager.PeerProfile;
import net.i2p.util.Log;
/**
* Unused directly, replaced by IterativeSearchJob, but still extended by
* SingleSearchJob.
*
* Try sending a search to some floodfill peers, failing completely if we don't get
* a match from one of those peers, with no fallback to the kademlia search
*

@@ -34,11 +36,8 @@ import net.i2p.util.Log;
* These enhancements allow the router to bootstrap back into the network
* after it loses (or never had) floodfill references, as long as it
* knows one peer that is up.
*
*/
class FloodOnlySearchJob extends FloodSearchJob {
private volatile boolean _dead;
protected final long _created;
private boolean _shouldProcessDSRM;
private final HashSet<Hash> _unheardFrom;

@@ -49,23 +48,19 @@ class FloodOnlySearchJob extends FloodSearchJob {
protected final Job _onTimeout;
private static final int MIN_FOR_NO_DSRM = 4;
private static final long SINGLE_SEARCH_MSG_TIME = 10*1000;
public FloodOnlySearchJob(RouterContext ctx, FloodfillNetworkDatabaseFacade facade, Hash key, Job onFind, Job onFailed, int timeoutMs, boolean isLease) {
super(ctx, facade, key, onFind, onFailed, timeoutMs, isLease);
// these override the settings in super
_timeoutMs = Math.min(timeoutMs, SearchJob.PER_FLOODFILL_PEER_TIMEOUT);
_expiration = _timeoutMs + ctx.clock().now();
_origExpiration = _timeoutMs + ctx.clock().now();
_unheardFrom = new HashSet(CONCURRENT_SEARCHES);
_replySelector = new FloodOnlyLookupSelector(getContext(), this);
_onReply = new FloodOnlyLookupMatchJob(getContext(), this);
_onTimeout = new FloodOnlyLookupTimeoutJob(getContext(), this);
_created = System.currentTimeMillis();
}
/** System time, NOT context time */
public long getCreated() { return _created; }
public boolean shouldProcessDSRM() { return _shouldProcessDSRM; }
@Override

@@ -174,12 +169,12 @@ class FloodOnlySearchJob extends FloodSearchJob {
_unheardFrom.add(peer);
}
dlm.setFrom(replyTunnel.getPeer(0));
dlm.setMessageExpiration(getContext().clock().now()+10*1000);
dlm.setMessageExpiration(getContext().clock().now() + SINGLE_SEARCH_MSG_TIME);
dlm.setReplyTunnel(replyTunnel.getReceiveTunnelId(0));
dlm.setSearchKey(_key);
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " to " + peer.toBase64());
_log.info(getJobId() + ": Floodfill search for " + _key + " to " + peer);
getContext().tunnelDispatcher().dispatchOutbound(dlm, outTunnel.getSendTunnelId(0), peer);
count++;
_lookupsRemaining++;

@@ -187,14 +182,14 @@ class FloodOnlySearchJob extends FloodSearchJob {
if (count <= 0) {
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " had no peers to send to");
_log.info(getJobId() + ": Floodfill search for " + _key + " had no peers to send to");
// no floodfill peers, fail
failed();
}
}
@Override
public String getName() { return "NetDb flood search (phase 1)"; }
public String getName() { return "NetDb flood search"; }
/**
* Note that we heard from the peer

@@ -215,15 +210,17 @@ class FloodOnlySearchJob extends FloodSearchJob {
_dead = true;
}
getContext().messageRegistry().unregisterPending(_out);
int timeRemaining = (int)(_origExpiration - getContext().clock().now());
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " failed with " + timeRemaining + " remaining after " + (System.currentTimeMillis()-_created));
long time = System.currentTimeMillis() - _created;
if (_log.shouldLog(Log.INFO)) {
int timeRemaining = (int)(_expiration - getContext().clock().now());
_log.info(getJobId() + ": Floodfill search for " + _key + " failed with " + timeRemaining + " remaining after " + time);
}
synchronized(_unheardFrom) {
for (Iterator<Hash> iter = _unheardFrom.iterator(); iter.hasNext(); )
getContext().profileManager().dbLookupFailed(iter.next());
}
_facade.complete(_key);
getContext().statManager().addRateData("netDb.failedTime", System.currentTimeMillis()-_created, System.currentTimeMillis()-_created);
getContext().statManager().addRateData("netDb.failedTime", time, 0);
synchronized (_onFailed) {
for (int i = 0; i < _onFailed.size(); i++) {
Job j = _onFailed.remove(0);

@@ -239,7 +236,7 @@ class FloodOnlySearchJob extends FloodSearchJob {
_dead = true;
}
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " successful");
_log.info(getJobId() + ": Floodfill search for " + _key + " successful");
// Sadly, we don't know which of the two replied, unless the first one sent a DSRM
// before the second one sent the answer, which isn't that likely.
// Would be really nice to fix this, but it isn't clear how unless CONCURRENT_SEARCHES == 1.

@@ -248,14 +245,15 @@ class FloodOnlySearchJob extends FloodSearchJob {
// We'll have to rely primarily on other searches (ExploreJob which calls SearchJob,
// and FloodfillVerifyStoreJob) to record successful searches for now.
// StoreJob also calls dbStoreSent() which updates the lastHeardFrom timer - this also helps.
long time = System.currentTimeMillis() - _created;
synchronized(_unheardFrom) {
if (_unheardFrom.size() == 1) {
Hash peer = _unheardFrom.iterator().next();
getContext().profileManager().dbLookupSuccessful(peer, System.currentTimeMillis()-_created);
getContext().profileManager().dbLookupSuccessful(peer, time);
}
}
_facade.complete(_key);
getContext().statManager().addRateData("netDb.successTime", System.currentTimeMillis()-_created, System.currentTimeMillis()-_created);
getContext().statManager().addRateData("netDb.successTime", time, 0);
synchronized (_onFind) {
while (!_onFind.isEmpty())
getContext().jobQueue().addJob(_onFind.remove(0));
@@ -36,10 +36,10 @@ public class FloodSearchJob extends JobImpl {
protected final List<Job> _onFailed;
protected long _expiration;
protected int _timeoutMs;
protected long _origExpiration;
protected final boolean _isLease;
protected volatile int _lookupsRemaining;
protected volatile boolean _dead;
protected final long _created;
public FloodSearchJob(RouterContext ctx, FloodfillNetworkDatabaseFacade facade, Hash key, Job onFind, Job onFailed, int timeoutMs, boolean isLease) {
super(ctx);

@@ -55,10 +55,16 @@ public class FloodSearchJob extends JobImpl {
timeout = timeoutMs;
_timeoutMs = timeout;
_expiration = timeout + ctx.clock().now();
_origExpiration = timeoutMs + ctx.clock().now();
_isLease = isLease;
_created = System.currentTimeMillis();
}
/** System time, NOT context time */
public long getCreated() { return _created; }
/**
* Add jobs to an existing search
*/
void addDeferred(Job onFind, Job onFailed, long timeoutMs, boolean isLease) {
if (_dead) {
getContext().jobQueue().addJob(onFailed);

@@ -75,7 +81,12 @@ public class FloodSearchJob extends JobImpl {
private static final int FLOOD_SEARCH_TIME_FACTOR = 2;
private static final int FLOOD_SEARCH_TIME_MIN = 30*1000;
/**
* Deprecated, unused, see FOSJ override
*/
public void runJob() {
throw new UnsupportedOperationException("use override");
/****
// pick some floodfill peers and send out the searches
List floodfillPeers = _facade.getFloodfillPeers();
FloodLookupSelector replySelector = new FloodLookupSelector(getContext(), this);

@@ -121,8 +132,12 @@ public class FloodSearchJob extends JobImpl {
getContext().messageRegistry().unregisterPending(out);
_facade.searchFull(_key, _onFind, _onFailed, _timeoutMs*FLOOD_SEARCH_TIME_FACTOR, _isLease);
}
****/
}
/**
* Deprecated, unused, see FOSJ override
*/
public String getName() { return "NetDb search (phase 1)"; }
protected Hash getKey() { return _key; }

@@ -139,10 +154,15 @@ public class FloodSearchJob extends JobImpl {
protected int getLookupsRemaining() { return _lookupsRemaining; }
/**
* Deprecated, unused, see FOSJ override
*/
void failed() {
throw new UnsupportedOperationException("use override");
/****
if (_dead) return;
_dead = true;
int timeRemaining = (int)(_origExpiration - getContext().clock().now());
int timeRemaining = (int)(_expiration - getContext().clock().now());
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " failed with " + timeRemaining);
if (timeRemaining > 0) {

@@ -156,9 +176,15 @@ public class FloodSearchJob extends JobImpl {
while (!removed.isEmpty())
getContext().jobQueue().addJob(removed.remove(0));
}
****/
}
/**
* Deprecated, unused, see FOSJ override
*/
void success() {
throw new UnsupportedOperationException("use override");
/****
if (_dead) return;
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " successful");

@@ -171,8 +197,13 @@ public class FloodSearchJob extends JobImpl {
}
while (!removed.isEmpty())
getContext().jobQueue().addJob(removed.remove(0));
****/
}
/**
* Deprecated, unused, see FOSJ override
*/
/****
private static class FloodLookupTimeoutJob extends JobImpl {
private FloodSearchJob _search;
public FloodLookupTimeoutJob(RouterContext ctx, FloodSearchJob job) {

@@ -186,7 +217,12 @@ public class FloodSearchJob extends JobImpl {
}
public String getName() { return "NetDb search (phase 1) timeout"; }
}
****/
/**
* Deprecated, unused, see FOSJ override
*/
/****
private static class FloodLookupMatchJob extends JobImpl implements ReplyJob {
private Log _log;
private FloodSearchJob _search;

@@ -211,7 +247,12 @@ public class FloodSearchJob extends JobImpl {
public String getName() { return "NetDb search (phase 1) match"; }
public void setMessage(I2NPMessage message) {}
}
****/
/**
* Deprecated, unused, see FOSJ override
*/
/****
private static class FloodLookupSelector implements MessageSelector {
private RouterContext _context;
private FloodSearchJob _search;

@@ -240,4 +281,5 @@ public class FloodSearchJob extends JobImpl {
return false;
}
}
****/
}
@@ -49,7 +49,8 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
_context.statManager().createRequiredRateStat("netDb.successTime", "Time for successful lookup (ms)", "NetworkDatabase", new long[] { 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("netDb.failedTime", "How long a failed search takes", "NetworkDatabase", new long[] { 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("netDb.failedAttemptedPeers", "How many peers we sent a search to when the search fails", "NetworkDatabase", new long[] { 60*1000l, 10*60*1000l });
_context.statManager().createRateStat("netDb.retries", "How many additional queries for an iterative search", "NetworkDatabase", new long[] { 60*60*1000l });
_context.statManager().createRateStat("netDb.failedAttemptedPeers", "How many peers we sent a search to when the search fails", "NetworkDatabase", new long[] { 10*60*1000l });
_context.statManager().createRateStat("netDb.successPeers", "How many peers are contacted in a successful search", "NetworkDatabase", new long[] { 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("netDb.failedPeers", "How many peers fail to respond to a lookup?", "NetworkDatabase", new long[] { 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("netDb.searchCount", "Overall number of searches sent", "NetworkDatabase", new long[] { 5*60*1000l, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });

@@ -211,6 +212,9 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
return ((FloodfillNetworkDatabaseFacade)ctx.netDb()).floodfillEnabled();
}
/**
* @param may be null, returns false if null
*/
public static boolean isFloodfill(RouterInfo peer) {
if (peer == null) return false;
// For testing or local networks... we will

@@ -247,6 +251,7 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
* will fire the appropriate jobs on success or timeout (or if the kademlia search completes
* without any match)
*
* @return null always
*/
@Override
SearchJob search(Hash key, Job onFindJob, Job onFailedLookupJob, long timeoutMs, boolean isLease) {

@@ -258,7 +263,8 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
searchJob = _activeFloodQueries.get(key);
if (searchJob == null) {
//if (SearchJob.onlyQueryFloodfillPeers(_context)) {
searchJob = new FloodOnlySearchJob(_context, this, key, onFindJob, onFailedLookupJob, (int)timeoutMs, isLease);
//searchJob = new FloodOnlySearchJob(_context, this, key, onFindJob, onFailedLookupJob, (int)timeoutMs, isLease);
searchJob = new IterativeSearchJob(_context, this, key, onFindJob, onFailedLookupJob, (int)timeoutMs, isLease);
//} else {
// searchJob = new FloodSearchJob(_context, this, key, onFindJob, onFailedLookupJob, (int)timeoutMs, isLease);
//}

@@ -286,6 +292,7 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
*
* Unused - called only by FloodSearchJob which is overridden - don't use this.
*/
/*****
void searchFull(Hash key, List<Job> onFind, List<Job> onFailed, long timeoutMs, boolean isLease) {
synchronized (_activeFloodQueries) { _activeFloodQueries.remove(key); }

@@ -330,7 +337,11 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
}
}
}
*****/
/**
* Must be called by the search job queued by search() on success or failure
*/
void complete(Hash key) {
synchronized (_activeFloodQueries) { _activeFloodQueries.remove(key); }
}
@@ -47,8 +47,8 @@ public class HandleFloodfillDatabaseStoreMessageJob extends JobImpl {
}
public void runJob() {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handling database store message");
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Handling database store message");
long recvBegin = System.currentTimeMillis();
@@ -0,0 +1,31 @@
package net.i2p.router.networkdb.kademlia;
import net.i2p.data.Hash;
import net.i2p.router.RouterContext;

/**
* Ask a single peer for a single key.
* This isn't really a flood-only search job at all, but we extend
* FloodOnlySearchJob so we can use the same selectors, etc.
*
* Different from SingleSearchJob in that we tell the search to add it on success.
*
* @since 0.8.9
*/
class IterativeFollowupJob extends SingleSearchJob {
private final IterativeSearchJob _search;

public IterativeFollowupJob(RouterContext ctx, Hash key, Hash to, IterativeSearchJob search) {
super(ctx, key, to);
_search = search;
}

@Override
public String getName() { return "Iterative search key from DSRM"; }

@Override
void success() {
_search.newPeerToTry(_key);
super.success();
}
}
@@ -0,0 +1,114 @@
package net.i2p.router.networkdb.kademlia;
import net.i2p.data.Hash;
import net.i2p.data.RouterInfo;
import net.i2p.data.i2np.DatabaseSearchReplyMessage;
import net.i2p.data.i2np.DatabaseStoreMessage;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.MessageSelector;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;

/**
* Slightly modified version of FloodOnlyLookupSelector,
* plus it incorporates the functions of SingleLookupJob inline.
* Always follows the DSRM entries.
*
* @since 0.8.9
*/
class IterativeLookupSelector implements MessageSelector {
private final RouterContext _context;
private final IterativeSearchJob _search;
private boolean _matchFound;
private final Log _log;

public IterativeLookupSelector(RouterContext ctx, IterativeSearchJob search) {
_context = ctx;
_search = search;
_log = ctx.logManager().getLog(getClass());
}

public boolean continueMatching() {
// don't use remaining searches count
return (!_matchFound) && _context.clock().now() < getExpiration();
}

public long getExpiration() { return (_matchFound ? -1 : _search.getExpiration()); }

/**
* This only returns true for DSMs, not for DSRMs.
*/
public boolean isMatch(I2NPMessage message) {
if (message == null) return false;
if (message instanceof DatabaseStoreMessage) {
DatabaseStoreMessage dsm = (DatabaseStoreMessage)message;
// is it worth making sure the reply came in on the right tunnel?
if (_search.getKey().equals(dsm.getKey())) {
_matchFound = true;
return true;
}
} else if (message instanceof DatabaseSearchReplyMessage) {
DatabaseSearchReplyMessage dsrm = (DatabaseSearchReplyMessage)message;
if (_search.getKey().equals(dsrm.getSearchKey())) {
// TODO - dsrm.getFromHash() can't be trusted - check against the list of
// those we sent the search to in _search ?
Hash from = dsrm.getFromHash();
// Moved from FloodOnlyLookupMatchJob so it is called for all replies
// rather than just the last one
// Got a netDb reply pointing us at other floodfills...
if (_log.shouldLog(Log.INFO))
_log.info(_search.getJobId() + ": Processing DSRM via IterativeLookupJobs, apparently from " + from);
// Chase the hashes from the reply
// 255 max, see comments in SingleLookupJob
int limit = Math.min(dsrm.getNumReplies(), SingleLookupJob.MAX_TO_FOLLOW);
int newPeers = 0;
int oldPeers = 0;
int invalidPeers = 0;
for (int i = 0; i < limit; i++) {
Hash peer = dsrm.getReply(i);
if (peer.equals(_context.routerHash())) {
// us
oldPeers++;
continue;
}
if (peer.equals(from)) {
// wtf
invalidPeers++;
continue;
}
RouterInfo ri = _context.netDb().lookupRouterInfoLocally(peer);
if (ri == null) {
// get the RI from the peer that told us about it
_context.jobQueue().addJob(new IterativeFollowupJob(_context, peer, from, _search));
newPeers++;
} else if (ri.getPublished() < _context.clock().now() - 60*60*1000 ||
!FloodfillNetworkDatabaseFacade.isFloodfill(ri)) {
// get an updated RI from the (now ff?) peer
_context.jobQueue().addJob(new IterativeFollowupJob(_context, peer, peer, _search));
oldPeers++;
} else {
// add it to the sorted queue
// this will check if we have already tried it
// should we really add? if we know about it but skipped it,
// it was for some reason?
_search.newPeerToTry(peer);
oldPeers++;
}
}
long timeSent = _search.timeSent(from);
// assume 0 dup
if (timeSent > 0) {
_context.profileManager().dbLookupReply(from, newPeers, oldPeers, invalidPeers, 0,
_context.clock().now() - timeSent);
}
_search.failed(dsrm.getFromHash(), false);
// fall through, always return false, we do not wish the match job to be called
}
}
return false;
}
}
@@ -0,0 +1,364 @@
package net.i2p.router.networkdb.kademlia;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.RouterInfo;
import net.i2p.data.i2np.DatabaseLookupMessage;
import net.i2p.router.Job;
import net.i2p.router.MessageSelector;
import net.i2p.router.OutNetMessage;
import net.i2p.router.ReplyJob;
import net.i2p.router.RouterContext;
import net.i2p.router.TunnelInfo;
import net.i2p.router.util.RandomIterator;
import net.i2p.util.Log;

/**
* A traditional Kademlia search that continues to search
* when the initial lookup fails, by iteratively searching the
* closer-to-the-key peers returned by the query in a DSRM.
*
* Unlike traditional Kad, it doesn't stop when there are no
* closer keys, it keeps going until the timeout or max number
* of searches is reached.
*
* Differences from FloodOnlySearchJob:
* Chases peers in DSRM's immediately.
* FOSJ searches the two closest in parallel and then stops.
* There is no per-search timeout, only a total timeout.
* Here, we search one at a time, and must have a separate per-search timeout.
*
* Advantages: Much more robust than FOSJ, especially in a large network
* where not all floodfills are known. Longer total timeout.
* Halves search traffic for successful searches, as this doesn't do
* two searches in parallel like FOSJ does.
*
* @since 0.8.9
*/
class IterativeSearchJob extends FloodSearchJob {
/** peers not sent to yet, sorted closest-to-the-routing-key */
private final SortedSet<Hash> _toTry;
/** query sent, no reply yet */
private final Set<Hash> _unheardFrom;
/** query sent, failed, timed out, or got DSRM */
private final Set<Hash> _failedPeers;
/** the time the query was sent to a peer, which we need to update profiles correctly */
private final Map<Hash, Long> _sentTime;
/** the routing key */
private final Hash _rkey;
/** this is a marker to register with the MessageRegistry, it is never sent */
private OutNetMessage _out;

private static final int MAX_NON_FF = 3;
/** Max number of peers to query */
private static final int TOTAL_SEARCH_LIMIT = 8;
/** TOTAL_SEARCH_LIMIT * SINGLE_SEARCH_TIME, plus some extra */
private static final int MAX_SEARCH_TIME = 30*1000;
/**
* The time before we give up and start a new search - much shorter than the message's expire time
* Longer than the typ. response time of 1.0 - 1.5 sec, but short enough that we move
* on to another peer quickly.
*/
private static final long SINGLE_SEARCH_TIME = 3*1000;
/** the actual expire time for a search message */
private static final long SINGLE_SEARCH_MSG_TIME = 10*1000;
/**
* Use instead of CONCURRENT_SEARCHES in super() which is final.
* For now, we don't do concurrent, but we keep SINGLE_SEARCH_TIME very short,
* so we have effective concurrency in that we fail a search quickly.
*/
private static final int MAX_CONCURRENT = 1;

public IterativeSearchJob(RouterContext ctx, FloodfillNetworkDatabaseFacade facade, Hash key, Job onFind, Job onFailed, int timeoutMs, boolean isLease) {
super(ctx, facade, key, onFind, onFailed, timeoutMs, isLease);
// these override the settings in super
_timeoutMs = Math.min(timeoutMs, MAX_SEARCH_TIME);
_expiration = _timeoutMs + ctx.clock().now();
_rkey = ctx.routingKeyGenerator().getRoutingKey(key);
_toTry = new TreeSet(new XORComparator(_rkey));
_unheardFrom = new HashSet(CONCURRENT_SEARCHES);
_failedPeers = new HashSet(TOTAL_SEARCH_LIMIT);
_sentTime = new ConcurrentHashMap(TOTAL_SEARCH_LIMIT);
}

@Override
public void runJob() {
// pick some floodfill peers and send out the searches
List<Hash> floodfillPeers;
KBucketSet ks = _facade.getKBuckets();
if (ks != null) {
// Ideally we would add the key to an exclude list, so we don't try to query a ff peer for itself,
// but we're passing the rkey not the key, so we do it below instead in certain cases.
floodfillPeers = ((FloodfillPeerSelector)_facade.getPeerSelector()).selectFloodfillParticipants(_rkey, TOTAL_SEARCH_LIMIT, ks);
} else {
floodfillPeers = new ArrayList(TOTAL_SEARCH_LIMIT);
}
if (floodfillPeers.isEmpty()) {
// ask anybody, they may not return the answer but they will return a few ff peers we can go look up,
// so this situation should be temporary
if (_log.shouldLog(Log.WARN))
_log.warn("Running netDb searches against the floodfill peers, but we don't know any");
List<Hash> all = new ArrayList(_facade.getAllRouters());
if (all.isEmpty()) {
if (_log.shouldLog(Log.ERROR))
_log.error("We don't know any peers at all");
failed();
return;
}
Iterator<Hash> iter = new RandomIterator(all);
// Limit non-FF to 3, because we don't sort the FFs ahead of the non-FFS,
// so once we get some FFs we want to be sure to query them
for (int i = 0; iter.hasNext() && i < MAX_NON_FF; i++) {
floodfillPeers.add(iter.next());
}
}
_toTry.addAll(floodfillPeers);
// don't ask ourselves or the target
_toTry.remove(getContext().routerHash());
_toTry.remove(_key);
if (_toTry.isEmpty()) {
if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": Iterative search for " + _key + " had no peers to send to");
// no floodfill peers, fail
failed();
return;
}
// This OutNetMessage is never used or sent (setMessage() is never called), it's only
// so we can register a reply selector.
MessageSelector replySelector = new IterativeLookupSelector(getContext(), this);
ReplyJob onReply = new FloodOnlyLookupMatchJob(getContext(), this);
Job onTimeout = new FloodOnlyLookupTimeoutJob(getContext(), this);
_out = getContext().messageRegistry().registerPending(replySelector, onReply, onTimeout, _timeoutMs);
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Iterative search for " + _key + " (rkey " + _rkey + ") timeout " +
DataHelper.formatDuration(_timeoutMs) + " toTry: " + DataHelper.toString(_toTry));
retry();
}

/**
* Send lookups to one or more peers, up to the configured concurrent and total limits
*/
private void retry() {
long now = getContext().clock().now();
if (_expiration < now) {
failed();
return;
}
if (_expiration - 500 < now) {
// not enough time left to bother
return;
}
while (true) {
Hash peer;
synchronized (this) {
if (_dead) return;
int pend = _unheardFrom.size();
if (pend >= MAX_CONCURRENT)
return;
int done = _failedPeers.size();
if (done >= TOTAL_SEARCH_LIMIT) {
failed();
return;
}
// even if pend and todo are empty, we don't fail, as there may be more peers
// coming via newPeerToTry()
if (done + pend >= TOTAL_SEARCH_LIMIT)
return;
if (_toTry.isEmpty())
return;
Iterator<Hash> iter = _toTry.iterator();
peer = iter.next();
iter.remove();
_unheardFrom.add(peer);
}
sendQuery(peer);
}
}
/**
* Send a DLM to the peer
*/
private void sendQuery(Hash peer) {
DatabaseLookupMessage dlm = new DatabaseLookupMessage(getContext(), true);
TunnelInfo replyTunnel = getContext().tunnelManager().selectInboundTunnel();
TunnelInfo outTunnel = getContext().tunnelManager().selectOutboundTunnel();
if ( (replyTunnel == null) || (outTunnel == null) ) {
failed();
return;
}
// As explained above, it's hard to keep the key itself out of the ff list,
// so let's just skip it for now if the outbound tunnel is zero-hop.
// Yes, that means we aren't doing double-lookup for a floodfill
// if it happens to be closest to itself and we are using zero-hop exploratory tunnels.
// If we don't, the OutboundMessageDistributor ends up logging errors for
// not being able to send to the floodfill, if we don't have an older netdb entry.
if (outTunnel.getLength() <= 1 && peer.equals(_key)) {
failed(peer, false);
return;
}
dlm.setFrom(replyTunnel.getPeer(0));
dlm.setMessageExpiration(getContext().clock().now() + SINGLE_SEARCH_MSG_TIME);
dlm.setReplyTunnel(replyTunnel.getReceiveTunnelId(0));
dlm.setSearchKey(_key);
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Iterative search for " + _key + " to " + peer);
long now = getContext().clock().now();
_sentTime.put(peer, Long.valueOf(now));
getContext().tunnelDispatcher().dispatchOutbound(dlm, outTunnel.getSendTunnelId(0), peer);
// The timeout job is always run (never cancelled)
// Note that the timeout is much shorter than the message expiration (see above)
Job j = new IterativeTimeoutJob(getContext(), peer, this);
long expire = Math.min(_expiration, now + SINGLE_SEARCH_TIME);
j.getTiming().setStartAfter(expire);
getContext().jobQueue().addJob(j);
}

@Override
public String getName() { return "Iterative search"; }

/**
* Note that the peer did not respond with a DSM
* (either a DSRM, timeout, or failure).
* This is not necessarily a total failure of the search.
* @param timedOut if true, will blame the peer's profile
*/
void failed(Hash peer, boolean timedOut) {
boolean isNewFail;
synchronized (this) {
if (_dead) return;
_unheardFrom.remove(peer);
isNewFail = _failedPeers.add(peer);
}
if (isNewFail) {
if (timedOut) {
getContext().profileManager().dbLookupFailed(peer);
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": search timed out to " + peer);
} else {
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": search failed to " + peer);
}
}
retry();
}

/**
* A new (floodfill) peer was discovered that may have the answer.
* @param peer may not actually be new
*/
void newPeerToTry(Hash peer) {
// don't ask ourselves or the target
if (peer.equals(getContext().routerHash()) ||
peer.equals(_key))
return;
RouterInfo ri = getContext().netDb().lookupRouterInfoLocally(peer);
if (!FloodfillNetworkDatabaseFacade.isFloodfill(ri))
return;
if (getContext().shitlist().isShitlistedForever(peer))
return;
synchronized (this) {
if (_failedPeers.contains(peer) ||
_unheardFrom.contains(peer))
return; // already tried
if (!_toTry.add(peer))
return; // already in the list
}
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": new peer from DSRM " + peer);
retry();
}

/**
* When did we send the query to the peer?
* @return context time, or -1 if never sent
*/
long timeSent(Hash peer) {
Long rv = _sentTime.get(peer);
return rv == null ? -1 : rv.longValue();
}

/**
* Total failure
*/
@Override
void failed() {
synchronized (this) {
if (_dead) return;
_dead = true;
}
_facade.complete(_key);
getContext().messageRegistry().unregisterPending(_out);
int tries;
synchronized(this) {
tries = _unheardFrom.size() + _failedPeers.size();
// blame the unheard-from (others already blamed in failed() above)
for (Iterator<Hash> iter = _unheardFrom.iterator(); iter.hasNext(); )
getContext().profileManager().dbLookupFailed(iter.next());
}
long time = System.currentTimeMillis() - _created;
if (_log.shouldLog(Log.INFO)) {
long timeRemaining = _expiration - getContext().clock().now();
_log.info(getJobId() + ": Iterative search for " + _key + " failed with " + timeRemaining + " remaining after " + time +
", peers queried: " + tries);
}
getContext().statManager().addRateData("netDb.failedTime", time, 0);
getContext().statManager().addRateData("netDb.retries", Math.max(0, tries - 1), 0);
synchronized (_onFailed) {
for (int i = 0; i < _onFailed.size(); i++) {
Job j = _onFailed.remove(0);
getContext().jobQueue().addJob(j);
}
}
}

@Override
void success() {
// Sadly, we don't know for sure which one replied.
// If the reply is after expiration (which moves the hash from _unheardFrom to _failedPeers),
// we will credit the wrong one.
int tries;
Hash peer = null;
synchronized(this) {
if (_dead) return;
_dead = true;
tries = _unheardFrom.size() + _failedPeers.size();
if (_unheardFrom.size() == 1) {
peer = _unheardFrom.iterator().next();
_unheardFrom.clear();
}
}
_facade.complete(_key);
if (peer != null) {
Long timeSent = _sentTime.get(peer);
if (timeSent != null)
getContext().profileManager().dbLookupSuccessful(peer, getContext().clock().now() - timeSent.longValue());
}
long time = System.currentTimeMillis() - _created;
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Iterative search for " + _key + " successful after " + time +
", peers queried: " + tries);
getContext().statManager().addRateData("netDb.successTime", time, 0);
getContext().statManager().addRateData("netDb.retries", tries - 1, 0);
synchronized (_onFind) {
while (!_onFind.isEmpty())
getContext().jobQueue().addJob(_onFind.remove(0));
}
}
}
@@ -0,0 +1,30 @@
package net.i2p.router.networkdb.kademlia;
import net.i2p.data.Hash;
import net.i2p.router.JobImpl;
import net.i2p.router.RouterContext;

/**
* This is the timeout for a single lookup, not for the whole search.
* It is called every time, it is not cancelled after the search succeeds
* or the peer replies with a DSRM. We rely on ISJ.failed(peer) to
* decide whether or not it actually timed out.
*
* @since 0.8.9
*/
class IterativeTimeoutJob extends JobImpl {
private final IterativeSearchJob _search;
private final Hash _peer;

public IterativeTimeoutJob(RouterContext ctx, Hash peer, IterativeSearchJob job) {
super(ctx);
_peer = peer;
_search = job;
}

public void runJob() {
_search.failed(_peer, true);
}

public String getName() { return "Iterative search timeout"; }
}
@@ -643,13 +643,14 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
+ " expires on " + new Date(leaseSet.getEarliestLeaseDate()), new Exception("Rejecting store"));
return "Expired leaseSet for " + leaseSet.getDestination().calculateHash().toBase64()
+ " expired " + DataHelper.formatDuration(age) + " ago";
} else if (leaseSet.getEarliestLeaseDate() > _context.clock().now() + Router.CLOCK_FUDGE_FACTOR + MAX_LEASE_FUTURE) {
} else if (leaseSet.getEarliestLeaseDate() > _context.clock().now() + (Router.CLOCK_FUDGE_FACTOR + MAX_LEASE_FUTURE)) {
long age = leaseSet.getEarliestLeaseDate() - _context.clock().now();
if (_log.shouldLog(Log.ERROR))
_log.error("LeaseSet to expire too far in the future: "
// let's not make this an error, it happens when peers have bad clocks
if (_log.shouldLog(Log.WARN))
_log.warn("LeaseSet expires too far in the future: "
+ leaseSet.getDestination().calculateHash().toBase64()
+ " expires on " + new Date(leaseSet.getEarliestLeaseDate()), new Exception("Rejecting store"));
return "Future expiring leaseSet for " + leaseSet.getDestination().calculateHash().toBase64()
+ " expires " + DataHelper.formatDuration(age) + " from now");
return "Future expiring leaseSet for " + leaseSet.getDestination().calculateHash()
+ " expiring in " + DataHelper.formatDuration(age);
}
return null;
@@ -8,9 +8,9 @@ import net.i2p.router.RouterContext;
//import net.i2p.util.Log;
/**
* Ask the peer who sent us the DSRM for the RouterInfos.
* Ask the peer who sent us the DSRM for the RouterInfos...
*
* If we have the routerInfo already, try to refetch it from that router itself,
* ... but If we have the routerInfo already, try to refetch it from that router itself,
* (if the info is old or we don't think it is floodfill)
* which will help us establish that router as a good floodfill and speed our
* integration into the network.

@@ -23,6 +23,12 @@ class SingleLookupJob extends JobImpl {
//private final Log _log;
private final DatabaseSearchReplyMessage _dsrm;
/**
* I2NP spec allows 255, max actually sent (in ../HDLMJ) is 3,
* so just to prevent trouble, we don't want to queue 255 jobs at once
*/
public static final int MAX_TO_FOLLOW = 8;
public SingleLookupJob(RouterContext ctx, DatabaseSearchReplyMessage dsrm) {
super(ctx);
//_log = ctx.logManager().getLog(getClass());

@@ -31,7 +37,8 @@ class SingleLookupJob extends JobImpl {
public void runJob() {
Hash from = _dsrm.getFromHash();
for (int i = 0; i < _dsrm.getNumReplies(); i++) {
int limit = Math.min(_dsrm.getNumReplies(), MAX_TO_FOLLOW);
for (int i = 0; i < limit; i++) {
Hash peer = _dsrm.getReply(i);
if (peer.equals(getContext().routerHash())) // us
continue;
@@ -48,7 +48,7 @@ class SingleSearchJob extends FloodOnlySearchJob {
dlm.setSearchKey(_key);
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Single search for " + _key.toBase64() + " to " + _to.toBase64());
_log.info(getJobId() + ": Single search for " + _key + " to " + _to);
getContext().tunnelDispatcher().dispatchOutbound(dlm, outTunnel.getSendTunnelId(0), _to);
_lookupsRemaining = 1;
}
@@ -10,13 +10,15 @@ import net.i2p.data.Hash;
*
*/
class XORComparator implements Comparator<Hash> {
private Hash _base;
private final Hash _base;
/**
* @param target key to compare distances with
*/
public XORComparator(Hash target) {
_base = target;
}
public int compare(Hash lhs, Hash rhs) {
if (lhs == null) throw new NullPointerException("LHS is null");
if (rhs == null) throw new NullPointerException("RHS is null");
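The hunk above is cut off before the rest of compare(). For orientation only, a minimal sketch of how an XOR-distance comparator of this kind is typically completed (using raw byte arrays instead of the router's Hash type; the class and accessor names here are assumptions, not the actual source):

import java.util.Comparator;

/** Sketch only: byte-wise XOR distance to a base key, compared as unsigned values. */
class XorDistanceComparatorSketch implements Comparator<byte[]> {
    private final byte[] base;

    XorDistanceComparatorSketch(byte[] target) { base = target; }

    public int compare(byte[] lhs, byte[] rhs) {
        for (int i = 0; i < base.length; i++) {
            int l = (lhs[i] ^ base[i]) & 0xff; // unsigned XOR distance, most significant byte first
            int r = (rhs[i] ^ base[i]) & 0xff;
            if (l != r)
                return l - r;                  // smaller distance sorts first
        }
        return 0;                              // equal distance to the base key
    }
}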