forked from I2P_Developers/i2p.i2p
* NetDb:
- Update dbLookup profile stats in FloodOnlySearchJob and FloodfillVerifyStoreJob - Fix response time store in profile in SearchJob * profiles.jsp: Don't override locale number format, clean up the response time output for floodfills
This commit is contained in:
@ -2,6 +2,8 @@ package net.i2p.router.networkdb.kademlia;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import net.i2p.data.Hash;
|
||||
@ -52,6 +54,7 @@ class FloodOnlySearchJob extends FloodSearchJob {
|
||||
private volatile boolean _dead;
|
||||
private long _created;
|
||||
private boolean _shouldProcessDSRM;
|
||||
private HashSet _unheardFrom;
|
||||
|
||||
protected List _out;
|
||||
protected MessageSelector _replySelector;
|
||||
@ -73,6 +76,7 @@ class FloodOnlySearchJob extends FloodSearchJob {
|
||||
_lookupsRemaining = 0;
|
||||
_dead = false;
|
||||
_out = Collections.synchronizedList(new ArrayList(2));
|
||||
_unheardFrom = new HashSet(CONCURRENT_SEARCHES);
|
||||
_replySelector = new FloodOnlyLookupSelector(getContext(), this);
|
||||
_onReply = new FloodOnlyLookupMatchJob(getContext(), this);
|
||||
_onTimeout = new FloodOnlyLookupTimeoutJob(getContext(), this);
|
||||
@ -88,6 +92,7 @@ class FloodOnlySearchJob extends FloodSearchJob {
|
||||
}
|
||||
}
|
||||
public long getExpiration() { return _expiration; }
|
||||
public long getCreated() { return _created; }
|
||||
public boolean shouldProcessDSRM() { return _shouldProcessDSRM; }
|
||||
private static final int CONCURRENT_SEARCHES = 2;
|
||||
public void runJob() {
|
||||
@ -157,6 +162,9 @@ class FloodOnlySearchJob extends FloodSearchJob {
|
||||
failed();
|
||||
return;
|
||||
}
|
||||
synchronized(_unheardFrom) {
|
||||
_unheardFrom.add(peer);
|
||||
}
|
||||
dlm.setFrom(replyTunnel.getPeer(0));
|
||||
dlm.setMessageExpiration(getContext().clock().now()+10*1000);
|
||||
dlm.setReplyTunnel(replyTunnel.getReceiveTunnelId(0));
|
||||
@ -181,6 +189,13 @@ class FloodOnlySearchJob extends FloodSearchJob {
|
||||
Hash getKey() { return _key; }
|
||||
void decrementRemaining() { if (_lookupsRemaining > 0) _lookupsRemaining--; }
|
||||
int getLookupsRemaining() { return _lookupsRemaining; }
|
||||
/** Note that we heard from the peer */
|
||||
void decrementRemaining(Hash peer) {
|
||||
decrementRemaining();
|
||||
synchronized(_unheardFrom) {
|
||||
_unheardFrom.remove(peer);
|
||||
}
|
||||
}
|
||||
|
||||
void failed() {
|
||||
synchronized (this) {
|
||||
@ -196,6 +211,10 @@ class FloodOnlySearchJob extends FloodSearchJob {
|
||||
int timeRemaining = (int)(_origExpiration - getContext().clock().now());
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " failed with " + timeRemaining + " remaining after " + (System.currentTimeMillis()-_created));
|
||||
synchronized(_unheardFrom) {
|
||||
for (Iterator iter = _unheardFrom.iterator(); iter.hasNext(); )
|
||||
getContext().profileManager().dbLookupFailed((Hash) iter.next());
|
||||
}
|
||||
_facade.complete(_key);
|
||||
getContext().statManager().addRateData("netDb.failedTime", System.currentTimeMillis()-_created, System.currentTimeMillis()-_created);
|
||||
synchronized (_onFailed) {
|
||||
@ -212,6 +231,20 @@ class FloodOnlySearchJob extends FloodSearchJob {
|
||||
}
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " successful");
|
||||
// Sadly, we don't know which of the two replied, unless the first one sent a DSRM
|
||||
// before the second one sent the answer, which isn't that likely.
|
||||
// Would be really nice to fix this, but it isn't clear how unless CONCURRENT_SEARCHES == 1.
|
||||
// Maybe don't unregister the msg from the Registry for a while and see if we get a 2nd reply?
|
||||
// Or delay the 2nd search for a few seconds?
|
||||
// We'll have to rely primarily on other searches (ExploreJob which calls SearchJob,
|
||||
// and FloodfillVerifyStoreJob) to record successful searches for now.
|
||||
// StoreJob also calls dbStoreSent() which updates the lastHeardFrom timer - this also helps.
|
||||
synchronized(_unheardFrom) {
|
||||
if (_unheardFrom.size() == 1) {
|
||||
Hash peer = (Hash) _unheardFrom.iterator().next();
|
||||
getContext().profileManager().dbLookupSuccessful(peer, System.currentTimeMillis()-_created);
|
||||
}
|
||||
}
|
||||
_facade.complete(_key);
|
||||
getContext().statManager().addRateData("netDb.successTime", System.currentTimeMillis()-_created, System.currentTimeMillis()-_created);
|
||||
synchronized (_onFind) {
|
||||
@ -260,6 +293,7 @@ class FloodOnlyLookupMatchJob extends JobImpl implements ReplyJob {
|
||||
+ _search.getKey().toBase64() + ", with " + remaining + " outstanding searches");
|
||||
// netDb reply pointing us at other people
|
||||
// Only process if we don't know enough floodfills
|
||||
// This only works if both reply, otherwise we aren't called - should be fixed
|
||||
if (_search.shouldProcessDSRM() && _dsrm != null) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info(_search.getJobId() + ": Processing DatabaseSearchReply");
|
||||
@ -274,6 +308,8 @@ class FloodOnlyLookupMatchJob extends JobImpl implements ReplyJob {
|
||||
if (message instanceof DatabaseSearchReplyMessage) {
|
||||
// a dsrm is only passed in when there are no more lookups remaining
|
||||
// If more than one peer sent one, we only process the last one
|
||||
// And sadly if the first peer sends a DRSM and the second one times out,
|
||||
// this won't get called...
|
||||
_dsrm = (DatabaseSearchReplyMessage) message;
|
||||
_search.failed();
|
||||
return;
|
||||
@ -319,7 +355,10 @@ class FloodOnlyLookupSelector implements MessageSelector {
|
||||
} else if (message instanceof DatabaseSearchReplyMessage) {
|
||||
DatabaseSearchReplyMessage dsrm = (DatabaseSearchReplyMessage)message;
|
||||
if (_search.getKey().equals(dsrm.getSearchKey())) {
|
||||
_search.decrementRemaining();
|
||||
_search.decrementRemaining(dsrm.getFromHash());
|
||||
// assume 0 old, all new, 0 invalid, 0 dup
|
||||
_context.profileManager().dbLookupReply(dsrm.getFromHash(), 0, dsrm.getNumReplies(), 0, 0,
|
||||
System.currentTimeMillis()-_search.getCreated());
|
||||
if (_search.getLookupsRemaining() <= 0)
|
||||
return true; // ok, no more left, so time to fail
|
||||
else
|
||||
|
@ -25,6 +25,7 @@ import net.i2p.util.Log;
|
||||
public class FloodfillVerifyStoreJob extends JobImpl {
|
||||
private Log _log;
|
||||
private Hash _key;
|
||||
private Hash _target;
|
||||
private FloodfillNetworkDatabaseFacade _facade;
|
||||
private long _expiration;
|
||||
private long _sendTime;
|
||||
@ -44,8 +45,8 @@ public class FloodfillVerifyStoreJob extends JobImpl {
|
||||
}
|
||||
public String getName() { return "Verify netdb store"; }
|
||||
public void runJob() {
|
||||
Hash target = pickTarget();
|
||||
if (target == null) return;
|
||||
_target = pickTarget();
|
||||
if (_target == null) return;
|
||||
|
||||
DatabaseLookupMessage lookup = buildLookup();
|
||||
if (lookup == null) return;
|
||||
@ -60,7 +61,7 @@ public class FloodfillVerifyStoreJob extends JobImpl {
|
||||
_sendTime = getContext().clock().now();
|
||||
_expiration = _sendTime + VERIFY_TIMEOUT;
|
||||
getContext().messageRegistry().registerPending(new VerifyReplySelector(), new VerifyReplyJob(getContext()), new VerifyTimeoutJob(getContext()), VERIFY_TIMEOUT);
|
||||
getContext().tunnelDispatcher().dispatchOutbound(lookup, outTunnel.getSendTunnelId(0), target);
|
||||
getContext().tunnelDispatcher().dispatchOutbound(lookup, outTunnel.getSendTunnelId(0), _target);
|
||||
}
|
||||
|
||||
private Hash pickTarget() {
|
||||
@ -121,12 +122,20 @@ public class FloodfillVerifyStoreJob extends JobImpl {
|
||||
}
|
||||
public String getName() { return "Handle floodfill verification reply"; }
|
||||
public void runJob() {
|
||||
long delay = getContext().clock().now() - _sendTime;
|
||||
if (_message instanceof DatabaseStoreMessage) {
|
||||
// store ok, w00t!
|
||||
getContext().statManager().addRateData("netDb.floodfillVerifyOK", getContext().clock().now() - _sendTime, 0);
|
||||
// Hmm should we verify it's as recent as the one we sent???
|
||||
getContext().profileManager().dbLookupSuccessful(_target, delay);
|
||||
getContext().statManager().addRateData("netDb.floodfillVerifyOK", delay, 0);
|
||||
} else {
|
||||
// store failed, boo, hiss!
|
||||
getContext().statManager().addRateData("netDb.floodfillVerifyFail", getContext().clock().now() - _sendTime, 0);
|
||||
if (_message instanceof DatabaseSearchReplyMessage) {
|
||||
// assume 0 old, all new, 0 invalid, 0 dup
|
||||
getContext().profileManager().dbLookupReply(_target, 0,
|
||||
((DatabaseSearchReplyMessage)_message).getNumReplies(), 0, 0, delay);
|
||||
}
|
||||
getContext().statManager().addRateData("netDb.floodfillVerifyFail", delay, 0);
|
||||
resend();
|
||||
}
|
||||
}
|
||||
@ -149,6 +158,7 @@ public class FloodfillVerifyStoreJob extends JobImpl {
|
||||
}
|
||||
public String getName() { return "Floodfill verification timeout"; }
|
||||
public void runJob() {
|
||||
getContext().profileManager().dbLookupFailed(_target);
|
||||
getContext().statManager().addRateData("netDb.floodfillVerifyTimeout", getContext().clock().now() - _sendTime, 0);
|
||||
resend();
|
||||
}
|
||||
|
@ -809,6 +809,10 @@ class SearchReplyJob extends JobImpl {
|
||||
_newPeers = 0;
|
||||
_duplicatePeers = 0;
|
||||
_repliesPendingVerification = 0;
|
||||
if (duration > 0)
|
||||
_duration = duration;
|
||||
else
|
||||
_duration = 0;
|
||||
}
|
||||
public String getName() { return "Process Reply for Kademlia Search"; }
|
||||
public void runJob() {
|
||||
|
@ -307,19 +307,20 @@ class ProfileOrganizerRenderer {
|
||||
|
||||
}
|
||||
|
||||
private final static DecimalFormat _fmt = new DecimalFormat("###,##0.00", new DecimalFormatSymbols(Locale.UK));
|
||||
private final static DecimalFormat _fmt = new DecimalFormat("###,##0.00");
|
||||
private final static String num(double num) { synchronized (_fmt) { return _fmt.format(num); } }
|
||||
private final static String na = "n/a";
|
||||
|
||||
String avg (PeerProfile prof, long rate) {
|
||||
RateStat rs = prof.getDbResponseTime();
|
||||
if (rs == null)
|
||||
return "0ms";
|
||||
return na;
|
||||
Rate r = rs.getRate(rate);
|
||||
if (r == null)
|
||||
return "0ms";
|
||||
return na;
|
||||
long c = r.getCurrentEventCount() + r.getLastEventCount();
|
||||
if (c == 0)
|
||||
return "0ms";
|
||||
return na;
|
||||
double d = r.getCurrentTotalValue() + r.getLastTotalValue();
|
||||
return Math.round(d/c) + "ms";
|
||||
}
|
||||
@ -327,10 +328,10 @@ class ProfileOrganizerRenderer {
|
||||
String davg (DBHistory dbh, long rate) {
|
||||
RateStat rs = dbh.getFailedLookupRate();
|
||||
if (rs == null)
|
||||
return num(0d);
|
||||
return na;
|
||||
Rate r = rs.getRate(rate);
|
||||
if (r == null)
|
||||
return num(0d);
|
||||
return na;
|
||||
long c = r.getCurrentEventCount() + r.getLastEventCount();
|
||||
return "" + c;
|
||||
}
|
||||
|
Reference in New Issue
Block a user