- Move stat initialization, reduce number of rates
- Add basic DOS prevention by not flooding if stores are too frequent
zzz
2010-01-24 02:36:42 +00:00
parent fdfbab850a
commit 087fd5a909
10 changed files with 101 additions and 30 deletions

net/i2p/util/ObjectCounter.java    View File

@@ -10,19 +10,28 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public class ObjectCounter<K> {
     private ConcurrentHashMap<K, Integer> _map;
+    private static final Integer ONE = Integer.valueOf(1);

     public ObjectCounter() {
         _map = new ConcurrentHashMap();
     }

     /**
      * Add one.
+     * Not perfectly concurrent, new AtomicInteger(1) would be better,
+     * at the cost of some object churn.
+     * @return count after increment
      */
-    public void increment(K h) {
-        Integer i = _map.putIfAbsent(h, Integer.valueOf(1));
-        if (i != null)
-            _map.put(h, Integer.valueOf(i.intValue() + 1));
+    public int increment(K h) {
+        Integer i = _map.putIfAbsent(h, ONE);
+        if (i != null) {
+            int rv = i.intValue() + 1;
+            _map.put(h, Integer.valueOf(rv));
+            return rv;
+        }
+        return 1;
     }

     /**
      * @return current count
      */
@@ -32,11 +41,20 @@ public class ObjectCounter<K> {
             return i.intValue();
         return 0;
     }

     /**
      * @return set of objects with counts > 0
      */
     public Set<K> objects() {
         return _map.keySet();
     }

+    /**
+     * start over
+     * @since 0.7.11
+     */
+    public void clear() {
+        _map.clear();
+    }
 }
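A note on the new return value: increment() now reports the post-increment count, so a caller can bump and test a key's counter in a single call instead of calling increment() and then count(). A minimal standalone sketch (not part of the commit; the class and key names below are invented for illustration):

import net.i2p.util.ObjectCounter;

public class CounterDemo {
    public static void main(String[] args) {
        ObjectCounter<String> counter = new ObjectCounter<String>();
        for (int i = 0; i < 5; i++) {
            // bump and read in one call
            int c = counter.increment("some-key");
            if (c > 3)
                System.out.println("over threshold at " + c);
        }
        counter.clear(); // start over, as FloodThrottler's Cleaner does each minute
    }
}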

net/i2p/router/networkdb/HandleDatabaseLookupMessageJob.java    View File

@@ -55,13 +55,6 @@ public class HandleDatabaseLookupMessageJob extends JobImpl {
     public HandleDatabaseLookupMessageJob(RouterContext ctx, DatabaseLookupMessage receivedMessage, RouterIdentity from, Hash fromHash) {
         super(ctx);
         _log = getContext().logManager().getLog(HandleDatabaseLookupMessageJob.class);
-        getContext().statManager().createRateStat("netDb.lookupsHandled", "How many netDb lookups have we handled?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
-        getContext().statManager().createRateStat("netDb.lookupsMatched", "How many netDb lookups did we have the data for?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
-        getContext().statManager().createRateStat("netDb.lookupsMatchedLeaseSet", "How many netDb leaseSet lookups did we have the data for?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
-        getContext().statManager().createRateStat("netDb.lookupsMatchedReceivedPublished", "How many netDb lookups did we have the data for that were published to us?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
-        getContext().statManager().createRateStat("netDb.lookupsMatchedLocalClosest", "How many netDb lookups for local data were received where we are the closest peers?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
-        getContext().statManager().createRateStat("netDb.lookupsMatchedLocalNotClosest", "How many netDb lookups for local data were received where we are NOT the closest peers?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
-        getContext().statManager().createRateStat("netDb.lookupsMatchedRemoteNotClosest", "How many netDb lookups for remote data were received where we are NOT the closest peers?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
         _message = receivedMessage;
         _from = from;
         _fromHash = fromHash;

net/i2p/router/networkdb/kademlia/FloodThrottler.java    View File

@@ -0,0 +1,31 @@
+package net.i2p.router.networkdb.kademlia;
+
+import net.i2p.data.Hash;
+import net.i2p.util.ObjectCounter;
+import net.i2p.util.SimpleScheduler;
+import net.i2p.util.SimpleTimer;
+
+/**
+ * Count how often we have recently flooded a key
+ */
+class FloodThrottler {
+    private ObjectCounter<Hash> counter;
+    private static final int MAX_FLOODS = 3;
+    private static final long CLEAN_TIME = 60*1000;
+
+    FloodThrottler() {
+        this.counter = new ObjectCounter();
+        SimpleScheduler.getInstance().addPeriodicEvent(new Cleaner(), CLEAN_TIME);
+    }
+
+    /** increments before checking */
+    boolean shouldThrottle(Hash h) {
+        return this.counter.increment(h) > MAX_FLOODS;
+    }
+
+    private class Cleaner implements SimpleTimer.TimedEvent {
+        public void timeReached() {
+            FloodThrottler.this.counter.clear();
+        }
+    }
+}
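Because the Cleaner wipes the whole counter at once, the limit is per 60-second window rather than a sliding rate: a key can be flooded at most MAX_FLOODS (3) times between clears. A hedged sketch of the gate this enables (the real call site is the flood() change in FloodfillNetworkDatabaseFacade below; the class here is invented for illustration):

package net.i2p.router.networkdb.kademlia;

import net.i2p.data.Hash;

// Hypothetical illustration class, not part of the commit
class FloodGateExample {
    private final FloodThrottler _throttler = new FloodThrottler();

    /** @return true if we flooded, false if throttled */
    boolean maybeFlood(Hash key) {
        // shouldThrottle() increments the per-key count, then compares it
        // against MAX_FLOODS; the count resets every CLEAN_TIME
        if (_throttler.shouldThrottle(key)) {
            // 4th or later store of this key inside the window:
            // decline to flood rather than amplify a store-flood DOS
            return false;
        }
        // ... send DatabaseStoreMessage to the closest floodfill peers ...
        return true;
    }
}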

net/i2p/router/networkdb/kademlia/FloodfillDatabaseLookupMessageHandler.java    View File

@@ -29,6 +29,14 @@ public class FloodfillDatabaseLookupMessageHandler implements HandlerJobBuilder
         _log = context.logManager().getLog(FloodfillDatabaseLookupMessageHandler.class);
         _context.statManager().createRateStat("netDb.lookupsReceived", "How many netDb lookups have we received?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
         _context.statManager().createRateStat("netDb.lookupsDropped", "How many netDb lookups did we drop due to throttling?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
+        // following are for ../HDLMJ
+        _context.statManager().createRateStat("netDb.lookupsHandled", "How many netDb lookups have we handled?", "NetworkDatabase", new long[] { 60*60*1000l });
+        _context.statManager().createRateStat("netDb.lookupsMatched", "How many netDb lookups did we have the data for?", "NetworkDatabase", new long[] { 60*60*1000l });
+        _context.statManager().createRateStat("netDb.lookupsMatchedLeaseSet", "How many netDb leaseSet lookups did we have the data for?", "NetworkDatabase", new long[] { 60*60*1000l });
+        _context.statManager().createRateStat("netDb.lookupsMatchedReceivedPublished", "How many netDb lookups did we have the data for that were published to us?", "NetworkDatabase", new long[] { 60*60*1000l });
+        _context.statManager().createRateStat("netDb.lookupsMatchedLocalClosest", "How many netDb lookups for local data were received where we are the closest peers?", "NetworkDatabase", new long[] { 60*60*1000l });
+        _context.statManager().createRateStat("netDb.lookupsMatchedLocalNotClosest", "How many netDb lookups for local data were received where we are NOT the closest peers?", "NetworkDatabase", new long[] { 60*60*1000l });
+        _context.statManager().createRateStat("netDb.lookupsMatchedRemoteNotClosest", "How many netDb lookups for remote data were received where we are NOT the closest peers?", "NetworkDatabase", new long[] { 60*60*1000l });
     }

     public Job createJob(I2NPMessage receivedMessage, RouterIdentity from, Hash fromHash) {

net/i2p/router/networkdb/kademlia/FloodfillDatabaseStoreMessageHandler.java    View File

@@ -27,7 +27,13 @@ public class FloodfillDatabaseStoreMessageHandler implements HandlerJobBuilder {
     public FloodfillDatabaseStoreMessageHandler(RouterContext context, FloodfillNetworkDatabaseFacade facade) {
         _context = context;
         _facade = facade;
+        // following are for HFDSMJ
+        context.statManager().createRateStat("netDb.storeHandled", "How many netDb store messages have we handled?", "NetworkDatabase", new long[] { 60*60*1000l });
+        context.statManager().createRateStat("netDb.storeLeaseSetHandled", "How many leaseSet store messages have we handled?", "NetworkDatabase", new long[] { 60*60*1000l });
+        context.statManager().createRateStat("netDb.storeRouterInfoHandled", "How many routerInfo store messages have we handled?", "NetworkDatabase", new long[] { 60*60*1000l });
+        context.statManager().createRateStat("netDb.storeRecvTime", "How long it takes to handle the local store part of a dbStore?", "NetworkDatabase", new long[] { 60*60*1000l });
     }

     public Job createJob(I2NPMessage receivedMessage, RouterIdentity from, Hash fromHash) {
         Job j = new HandleFloodfillDatabaseStoreMessageJob(_context, (DatabaseStoreMessage)receivedMessage, from, fromHash, _facade);
         if (false) {

net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java    View File

@@ -37,6 +37,7 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
     /** for testing, see isFloodfill() below */
     private static String _alwaysQuery;
     private final Set<Hash> _verifiesInProgress;
+    private FloodThrottler _floodThrottler;

     public FloodfillNetworkDatabaseFacade(RouterContext context) {
         super(context);
@@ -115,6 +116,13 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
             key = ((LeaseSet)ds).getDestination().calculateHash();
         else
             key = ((RouterInfo)ds).getIdentity().calculateHash();
+        // DOS prevention
+        if (_floodThrottler != null && _floodThrottler.shouldThrottle(key)) {
+            if (_log.shouldLog(Log.WARN))
+                _log.warn("Too many recent stores, not flooding key: " + key);
+            _context.statManager().addRateData("netDb.floodThrottled", 1, 0);
+            return;
+        }
         Hash rkey = _context.routingKeyGenerator().getRoutingKey(key);
         FloodfillPeerSelector sel = (FloodfillPeerSelector)getPeerSelector();
         List peers = sel.selectFloodfillParticipants(rkey, MAX_TO_FLOOD, getKBuckets());
@@ -124,6 +132,10 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
             RouterInfo target = lookupRouterInfoLocally(peer);
             if ( (target == null) || (_context.shitlist().isShitlisted(peer)) )
                 continue;
+            // Don't flood a RI back to itself
+            // Not necessary, a ff will do its own flooding (reply token == 0)
+            //if (peer.equals(target.getIdentity().getHash()))
+            //    continue;
             if (peer.equals(_context.routerHash()))
                 continue;
             DatabaseStoreMessage msg = new DatabaseStoreMessage(_context);
@@ -176,7 +188,17 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
     @Override
     protected PeerSelector createPeerSelector() { return new FloodfillPeerSelector(_context); }

-    public void setFloodfillEnabled(boolean yes) { _floodfillEnabled = yes; }
+    synchronized void setFloodfillEnabled(boolean yes) {
+        _floodfillEnabled = yes;
+        if (yes && _floodThrottler == null) {
+            _floodThrottler = new FloodThrottler();
+            _context.statManager().createRateStat("netDb.floodThrottled", "How often do we decline to flood?", "NetworkDatabase", new long[] { 60*60*1000l });
+            // following are for HFDSMJ
+            _context.statManager().createRateStat("netDb.storeFloodNew", "How long it takes to flood out a newly received entry?", "NetworkDatabase", new long[] { 60*60*1000l });
+            _context.statManager().createRateStat("netDb.storeFloodOld", "How often we receive an old entry?", "NetworkDatabase", new long[] { 60*60*1000l });
+        }
+    }

     public boolean floodfillEnabled() { return _floodfillEnabled; }
     public static boolean floodfillEnabled(RouterContext ctx) {
         return ((FloodfillNetworkDatabaseFacade)ctx.netDb()).floodfillEnabled();

net/i2p/router/networkdb/kademlia/HandleFloodfillDatabaseStoreMessageJob.java    View File

@@ -39,14 +39,6 @@ public class HandleFloodfillDatabaseStoreMessageJob extends JobImpl {
     public HandleFloodfillDatabaseStoreMessageJob(RouterContext ctx, DatabaseStoreMessage receivedMessage, RouterIdentity from, Hash fromHash, FloodfillNetworkDatabaseFacade facade) {
         super(ctx);
         _log = ctx.logManager().getLog(getClass());
-        ctx.statManager().createRateStat("netDb.storeHandled", "How many netDb store messages have we handled?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
-        ctx.statManager().createRateStat("netDb.storeLeaseSetHandled", "How many leaseSet store messages have we handled?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
-        //ctx.statManager().createRateStat("netDb.storeLocalLeaseSetAttempt", "Peer tries to store our leaseset (multihome?)", "NetworkDatabase", new long[] { 60*60*1000l });
-        //ctx.statManager().createRateStat("netDb.storeLocalRouterInfoAttempt", "Peer tries to store our router info", "NetworkDatabase", new long[] { 60*60*1000l });
-        ctx.statManager().createRateStat("netDb.storeRouterInfoHandled", "How many routerInfo store messages have we handled?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
-        ctx.statManager().createRateStat("netDb.storeRecvTime", "How long it takes to handle the local store part of a dbStore?", "NetworkDatabase", new long[] { 60*1000l, 10*60*1000l });
-        ctx.statManager().createRateStat("netDb.storeFloodNew", "How long it takes to flood out a newly received entry?", "NetworkDatabase", new long[] { 60*1000l, 10*60*1000l });
-        ctx.statManager().createRateStat("netDb.storeFloodOld", "How often we receive an old entry?", "NetworkDatabase", new long[] { 60*1000l, 10*60*1000l });
         _message = receivedMessage;
         _from = from;
         _fromHash = fromHash;

net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java    View File

@@ -44,7 +44,6 @@ import net.i2p.router.networkdb.DatabaseStoreMessageHandler;
 import net.i2p.router.networkdb.PublishLocalRouterInfoJob;
 import net.i2p.router.peermanager.PeerProfile;
 import net.i2p.util.Log;
-import net.i2p.util.ObjectCounter;

 /**
  * Kademlia based version of the network database
@@ -140,8 +139,17 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
         _lastExploreNew = 0;
         _activeRequests = new HashMap(8);
         _enforceNetId = DEFAULT_ENFORCE_NETID;
-        context.statManager().createRateStat("netDb.lookupLeaseSetDeferred", "how many lookups are deferred for a single leaseSet lookup?", "NetworkDatabase", new long[] { 60*1000, 5*60*1000 });
-        context.statManager().createRateStat("netDb.exploreKeySet", "how many keys are queued for exploration?", "NetworkDatabase", new long[] { 10*60*1000 });
+        context.statManager().createRateStat("netDb.lookupLeaseSetDeferred", "how many lookups are deferred for a single leaseSet lookup?", "NetworkDatabase", new long[] { 60*60*1000 });
+        context.statManager().createRateStat("netDb.exploreKeySet", "how many keys are queued for exploration?", "NetworkDatabase", new long[] { 60*60*1000 });
+        // following are for StoreJob
+        context.statManager().createRateStat("netDb.storeRouterInfoSent", "How many routerInfo store messages have we sent?", "NetworkDatabase", new long[] { 60*60*1000l });
+        context.statManager().createRateStat("netDb.storeLeaseSetSent", "How many leaseSet store messages have we sent?", "NetworkDatabase", new long[] { 60*60*1000l });
+        context.statManager().createRateStat("netDb.storePeers", "How many peers each netDb must be sent to before success?", "NetworkDatabase", new long[] { 60*60*1000l });
+        context.statManager().createRateStat("netDb.storeFailedPeers", "How many peers each netDb must be sent to before failing completely?", "NetworkDatabase", new long[] { 60*60*1000l });
+        context.statManager().createRateStat("netDb.ackTime", "How long does it take for a peer to ack a netDb store?", "NetworkDatabase", new long[] { 60*60*1000l });
+        context.statManager().createRateStat("netDb.replyTimeout", "How long after a netDb send does the timeout expire (when the peer doesn't reply in time)?", "NetworkDatabase", new long[] { 60*60*1000l });
+        // following is for RepublishLeaseSetJob
+        context.statManager().createRateStat("netDb.republishLeaseSetCount", "How often we republish a leaseSet?", "NetworkDatabase", new long[] { 60*60*1000l });
     }

     @Override

net/i2p/router/networkdb/kademlia/RepublishLeaseSetJob.java    View File

@@ -36,7 +36,6 @@ public class RepublishLeaseSetJob extends JobImpl {
         _dest = destHash;
         _lastPublished = 0;
         //getTiming().setStartAfter(ctx.clock().now()+REPUBLISH_LEASESET_DELAY);
-        getContext().statManager().createRateStat("netDb.republishLeaseSetCount", "How often we republish a leaseSet?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
     }

     public String getName() { return "Republish a local leaseSet"; }
     public void runJob() {

net/i2p/router/networkdb/kademlia/StoreJob.java    View File

@@ -73,12 +73,6 @@ class StoreJob extends JobImpl {
              DataStructure data, Job onSuccess, Job onFailure, long timeoutMs, Set<Hash> toSkip) {
         super(context);
         _log = context.logManager().getLog(StoreJob.class);
-        getContext().statManager().createRateStat("netDb.storeRouterInfoSent", "How many routerInfo store messages have we sent?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
-        getContext().statManager().createRateStat("netDb.storeLeaseSetSent", "How many leaseSet store messages have we sent?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
-        getContext().statManager().createRateStat("netDb.storePeers", "How many peers each netDb must be sent to before success?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
-        getContext().statManager().createRateStat("netDb.storeFailedPeers", "How many peers each netDb must be sent to before failing completely?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
-        getContext().statManager().createRateStat("netDb.ackTime", "How long does it take for a peer to ack a netDb store?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
-        getContext().statManager().createRateStat("netDb.replyTimeout", "How long after a netDb send does the timeout expire (when the peer doesn't reply in time)?", "NetworkDatabase", new long[] { 60*1000, 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
         _facade = facade;
         _state = new StoreState(getContext(), key, data, toSkip);
         _onSuccess = onSuccess;