forked from I2P_Developers/i2p.i2p
* ExploreJob/SearchJob - fix brokenness:
- Give each search a minimum of time even at the end - Fix ExploreJob exclude peer list - Always add floodfills to exclude peer list - Don't queue keys for exploration or run ExploreJob if floodfill - Allow floodfills to return non-floodfills in a DSRM msg so exploration works
This commit is contained in:
@@ -28,15 +28,25 @@ class ExploreJob extends SearchJob {
|
||||
private Log _log;
|
||||
private PeerSelector _peerSelector;
|
||||
|
||||
/** how long each exploration should run for (currently a trivial 10 seconds) */
|
||||
private static final long MAX_EXPLORE_TIME = 10*1000;
|
||||
/** how long each exploration should run for
|
||||
* The exploration won't "succeed" so we make it long so we query several peers */
|
||||
private static final long MAX_EXPLORE_TIME = 15*1000;
|
||||
|
||||
/** how many of the peers closest to the key being explored do we want to explicitly say "dont send me this"? */
|
||||
private static final int NUM_CLOSEST_TO_IGNORE = 3;
|
||||
|
||||
/** how many peers to explore through concurrently */
|
||||
private static final int EXPLORE_BREDTH = 1;
|
||||
|
||||
/** only send the closest "dont tell me about" refs...
|
||||
* Override to make this bigger because we want to include both the
|
||||
* floodfills and the previously-queried peers */
|
||||
static final int MAX_CLOSEST = 20;
|
||||
|
||||
/** Override to make this shorter, since we don't sort out the
|
||||
* unresponsive ff peers like we do in FloodOnlySearchJob */
|
||||
static final int PER_FLOODFILL_PEER_TIMEOUT = 5*1000;
|
||||
|
||||
/**
|
||||
* Create a new search for the routingKey specified
|
||||
*
|
||||
@@ -60,19 +70,31 @@ class ExploreJob extends SearchJob {
|
||||
* massive (aka sending the entire routing table as 'dont tell me about these
|
||||
* guys'). but maybe we do. dunno. lots of implications.
|
||||
*
|
||||
* FloodfillPeerSelector would add only the floodfill peers,
|
||||
* and PeerSelector doesn't include the floodfill peers,
|
||||
* so we add the ff peers ourselves and then use the regular PeerSelector.
|
||||
*
|
||||
* @param replyTunnelId tunnel to receive replies through
|
||||
* @param replyGateway gateway for the reply tunnel
|
||||
* @param expiration when the search should stop
|
||||
*/
|
||||
protected DatabaseLookupMessage buildMessage(TunnelId replyTunnelId, RouterInfo replyGateway, long expiration) {
|
||||
protected DatabaseLookupMessage buildMessage(TunnelId replyTunnelId, Hash replyGateway, long expiration) {
|
||||
DatabaseLookupMessage msg = new DatabaseLookupMessage(getContext(), true);
|
||||
msg.setSearchKey(getState().getTarget());
|
||||
msg.setFrom(replyGateway.getIdentity().getHash());
|
||||
msg.setFrom(replyGateway);
|
||||
msg.setDontIncludePeers(getState().getClosestAttempted(MAX_CLOSEST));
|
||||
msg.setMessageExpiration(expiration);
|
||||
msg.setReplyTunnel(replyTunnelId);
|
||||
|
||||
int available = MAX_CLOSEST - msg.getDontIncludePeers().size();
|
||||
if (available > 0) {
|
||||
List peers = ((FloodfillNetworkDatabaseFacade)_facade).getFloodfillPeers();
|
||||
int len = peers.size();
|
||||
if (len > 0)
|
||||
msg.getDontIncludePeers().addAll(peers.subList(0, Math.min(available, len)));
|
||||
}
|
||||
|
||||
available = MAX_CLOSEST - msg.getDontIncludePeers().size();
|
||||
if (available > 0) {
|
||||
List peers = _peerSelector.selectNearestExplicit(getState().getTarget(), available, msg.getDontIncludePeers(), getFacade().getKBuckets());
|
||||
msg.getDontIncludePeers().addAll(peers);
|
||||
@@ -91,7 +113,7 @@ class ExploreJob extends SearchJob {
|
||||
*
|
||||
*/
|
||||
protected DatabaseLookupMessage buildMessage(long expiration) {
|
||||
return buildMessage(null, getContext().router().getRouterInfo(), expiration);
|
||||
return buildMessage(null, getContext().router().getRouterInfo().getIdentity().getHash(), expiration);
|
||||
}
|
||||
|
||||
/** max # of concurrent searches */
|
||||
|
@@ -37,6 +37,10 @@ class ExploreKeySelectorJob extends JobImpl {
|
||||
|
||||
public String getName() { return "Explore Key Selector Job"; }
|
||||
public void runJob() {
|
||||
if (((FloodfillNetworkDatabaseFacade)_facade).floodfillEnabled()) {
|
||||
requeue(30*RERUN_DELAY_MS);
|
||||
return;
|
||||
}
|
||||
Set toExplore = selectKeysToExplore();
|
||||
_log.info("Filling the explorer pool with: " + toExplore);
|
||||
if (toExplore != null)
|
||||
|
@@ -174,11 +174,11 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
|
||||
synchronized (_activeFloodQueries) {
|
||||
searchJob = (FloodSearchJob)_activeFloodQueries.get(key);
|
||||
if (searchJob == null) {
|
||||
if (SearchJob.onlyQueryFloodfillPeers(_context)) {
|
||||
//if (SearchJob.onlyQueryFloodfillPeers(_context)) {
|
||||
searchJob = new FloodOnlySearchJob(_context, this, key, onFindJob, onFailedLookupJob, (int)timeoutMs, isLease);
|
||||
} else {
|
||||
searchJob = new FloodSearchJob(_context, this, key, onFindJob, onFailedLookupJob, (int)timeoutMs, isLease);
|
||||
}
|
||||
//} else {
|
||||
// searchJob = new FloodSearchJob(_context, this, key, onFindJob, onFailedLookupJob, (int)timeoutMs, isLease);
|
||||
//}
|
||||
_activeFloodQueries.put(key, searchJob);
|
||||
isNew = true;
|
||||
}
|
||||
|
@@ -124,6 +124,14 @@ class SearchJob extends JobImpl {
|
||||
static boolean onlyQueryFloodfillPeers(RouterContext ctx) {
|
||||
if (isCongested(ctx))
|
||||
return true;
|
||||
// If we are floodfill, we want the FloodfillPeerSelector (in add()) to include
|
||||
// non-ff peers (if required) in DatabaseSearchReplyMessage responses
|
||||
// so that Exploration works.
|
||||
// ExploreJob is disabled if we are floodfill.
|
||||
// The other two places this was called (one below and one in FNDF)
|
||||
// have been commented out.
|
||||
if (FloodfillNetworkDatabaseFacade.floodfillEnabled(ctx))
|
||||
return false;
|
||||
return Boolean.valueOf(ctx.getProperty("netDb.floodfillOnly", DEFAULT_FLOODFILL_ONLY + "")).booleanValue();
|
||||
}
|
||||
|
||||
@@ -136,6 +144,7 @@ class SearchJob extends JobImpl {
|
||||
}
|
||||
|
||||
static final int PER_FLOODFILL_PEER_TIMEOUT = 10*1000;
|
||||
static final long MIN_TIMEOUT = 2500;
|
||||
|
||||
protected int getPerPeerTimeoutMs(Hash peer) {
|
||||
int timeout = 0;
|
||||
@@ -146,7 +155,7 @@ class SearchJob extends JobImpl {
|
||||
long now = getContext().clock().now();
|
||||
|
||||
if (now + timeout > _expiration)
|
||||
return (int)(_expiration - now);
|
||||
return (int) Math.max(_expiration - now, MIN_TIMEOUT);
|
||||
else
|
||||
return timeout;
|
||||
}
|
||||
@@ -247,7 +256,8 @@ class SearchJob extends JobImpl {
|
||||
int sent = 0;
|
||||
Set attempted = _state.getAttempted();
|
||||
while (sent <= 0) {
|
||||
boolean onlyFloodfill = onlyQueryFloodfillPeers(getContext());
|
||||
//boolean onlyFloodfill = onlyQueryFloodfillPeers(getContext());
|
||||
boolean onlyFloodfill = true;
|
||||
if (_floodfillPeersExhausted && onlyFloodfill && _state.getPending().size() <= 0) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn(getJobId() + ": no non-floodfill peers left, and no more pending. Searched: "
|
||||
@@ -421,7 +431,7 @@ class SearchJob extends JobImpl {
|
||||
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(getJobId() + ": Sending leaseSet search to " + router.getIdentity().getHash().toBase64()
|
||||
_log.debug(getJobId() + ": Sending search to " + router.getIdentity().getHash().toBase64()
|
||||
+ " for " + msg.getSearchKey().toBase64() + " w/ replies through ["
|
||||
+ msg.getFrom().toBase64() + "] via tunnel ["
|
||||
+ msg.getReplyTunnel() + "]");
|
||||
|
@@ -57,6 +57,8 @@ class SearchReplyJob extends JobImpl {
|
||||
public String getName() { return "Process Reply for Kademlia Search"; }
|
||||
public void runJob() {
|
||||
if (_curIndex >= _msg.getNumReplies()) {
|
||||
if (_log.shouldLog(Log.DEBUG) && _msg.getNumReplies() == 0)
|
||||
_log.debug(getJobId() + ": dbSearchReply received with no routers referenced");
|
||||
if (_repliesPendingVerification > 0) {
|
||||
// we received new references from the peer, but still
|
||||
// haven't verified all of them, so lets give it more time
|
||||
@@ -106,7 +108,8 @@ class SearchReplyJob extends JobImpl {
|
||||
_duplicatePeers++;
|
||||
}
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(getJobId() + ": dbSearchReply received on search referencing router " + peer);
|
||||
_log.debug(getJobId() + ": dbSearchReply received on search referencing router " + peer +
|
||||
" already known? " + (info != null));
|
||||
if (shouldAdd) {
|
||||
if (_searchJob.add(peer))
|
||||
_newPeers++;
|
||||
|
@@ -28,7 +28,7 @@ class StartExplorersJob extends JobImpl {
|
||||
|
||||
/** don't explore more than 1 bucket at a time */
|
||||
private static final int MAX_PER_RUN = 1;
|
||||
/** dont explore the network more often than once every minute */
|
||||
/** dont explore the network more often than this */
|
||||
private static final int MIN_RERUN_DELAY_MS = 5*60*1000;
|
||||
/** explore the network at least once every thirty minutes */
|
||||
private static final int MAX_RERUN_DELAY_MS = 30*60*1000;
|
||||
@@ -41,14 +41,15 @@ class StartExplorersJob extends JobImpl {
|
||||
|
||||
public String getName() { return "Start Explorers Job"; }
|
||||
public void runJob() {
|
||||
Set toExplore = selectKeysToExplore();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Keys to explore during this run: " + toExplore);
|
||||
_facade.removeFromExploreKeys(toExplore);
|
||||
for (Iterator iter = toExplore.iterator(); iter.hasNext(); ) {
|
||||
Hash key = (Hash)iter.next();
|
||||
//_log.info("Starting explorer for " + key, new Exception("Exploring!"));
|
||||
getContext().jobQueue().addJob(new ExploreJob(getContext(), _facade, key));
|
||||
if (! ((FloodfillNetworkDatabaseFacade)_facade).floodfillEnabled()) {
|
||||
Set toExplore = selectKeysToExplore();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Keys to explore during this run: " + toExplore);
|
||||
_facade.removeFromExploreKeys(toExplore);
|
||||
for (Iterator iter = toExplore.iterator(); iter.hasNext(); ) {
|
||||
Hash key = (Hash)iter.next();
|
||||
getContext().jobQueue().addJob(new ExploreJob(getContext(), _facade, key));
|
||||
}
|
||||
}
|
||||
long delay = getNextRunDelay();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
|
Reference in New Issue
Block a user