* ClientManager:

- Concurrent client map for faster lookup
    - Add by-hash client map for faster lookup by hash
    - More cleanups
This commit is contained in:
zzz
2012-09-08 21:56:05 +00:00
parent d9e6c06b22
commit 9bc54f27cf

View File

@@ -13,8 +13,10 @@ import java.io.Writer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.client.I2PSessionException;
import net.i2p.crypto.SessionKeyManager;
@@ -42,8 +44,14 @@ import net.i2p.util.Log;
class ClientManager {
private final Log _log;
private ClientListenerRunner _listener;
private final HashMap<Destination, ClientConnectionRunner> _runners; // Destination --> ClientConnectionRunner
private final Set<ClientConnectionRunner> _pendingRunners; // ClientConnectionRunner for clients w/out a Dest yet
// Destination --> ClientConnectionRunner
// Locked for adds/removes but not lookups
private final Map<Destination, ClientConnectionRunner> _runners;
// Same as what's in _runners, but for fast lookup by Hash
// Locked for adds/removes but not lookups
private final Map<Hash, ClientConnectionRunner> _runnersByHash;
// ClientConnectionRunner for clients w/out a Dest yet
private final Set<ClientConnectionRunner> _pendingRunners;
private final RouterContext _ctx;
private volatile boolean _isStarted;
@@ -61,7 +69,8 @@ class ClientManager {
// "How large are messages received by the client?",
// "ClientMessages",
// new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
_runners = new HashMap();
_runners = new ConcurrentHashMap();
_runnersByHash = new ConcurrentHashMap();
_pendingRunners = new HashSet();
startListeners(port);
}
@@ -116,6 +125,7 @@ class ClientManager {
ClientConnectionRunner runner = iter.next();
runner.disconnectClient(msg, Log.WARN);
}
_runnersByHash.clear();
}
/**
@@ -159,8 +169,10 @@ class ClientManager {
}
if ( (runner.getConfig() != null) && (runner.getConfig().getDestination() != null) ) {
// after connection establishment
Destination dest = runner.getConfig().getDestination();
synchronized (_runners) {
_runners.remove(runner.getConfig().getDestination());
_runners.remove(dest);
_runnersByHash.remove(dest.calculateHash());
}
}
}
@@ -178,9 +190,11 @@ class ClientManager {
}
boolean fail = false;
synchronized (_runners) {
fail = _runners.containsKey(dest);
if (!fail)
fail = _runnersByHash.containsKey(dest.calculateHash());
if (!fail) {
_runners.put(dest, runner);
_runnersByHash.put(dest.calculateHash(), runner);
}
}
if (fail) {
_log.log(Log.CRIT, "Client attempted to register duplicate destination " + dest.calculateHash().toBase64());
@@ -290,32 +304,19 @@ class ClientManager {
}
}
/**
* Unsynchronized
*/
public boolean isLocal(Destination dest) {
boolean rv = false;
long beforeLock = _ctx.clock().now();
long inLock = 0;
synchronized (_runners) {
inLock = _ctx.clock().now();
rv = _runners.containsKey(dest);
}
long afterLock = _ctx.clock().now();
if (afterLock - beforeLock > 50) {
_log.warn("isLocal(Destination).locking took too long: " + (afterLock-beforeLock)
+ " overall, synchronized took " + (inLock - beforeLock));
}
return rv;
return _runners.containsKey(dest);
}
/**
* Unsynchronized
*/
public boolean isLocal(Hash destHash) {
if (destHash == null) return false;
synchronized (_runners) {
for (Iterator iter = _runners.values().iterator(); iter.hasNext(); ) {
ClientConnectionRunner cur = (ClientConnectionRunner)iter.next();
if (destHash.equals(cur.getDestHash())) return true;
}
}
return false;
return _runnersByHash.containsKey(destHash);
}
/**
@@ -328,29 +329,21 @@ class ClientManager {
return !Boolean.valueOf(runner.getConfig().getOptions().getProperty(ClientManagerFacade.PROP_CLIENT_ONLY)).booleanValue();
}
/**
* Unsynchronized
*/
public Set<Destination> listClients() {
Set<Destination> rv = new HashSet();
synchronized (_runners) {
rv.addAll(_runners.keySet());
}
rv.addAll(_runners.keySet());
return rv;
}
/**
* Unsynchronized
*/
ClientConnectionRunner getRunner(Destination dest) {
ClientConnectionRunner rv = null;
long beforeLock = _ctx.clock().now();
long inLock = 0;
synchronized (_runners) {
inLock = _ctx.clock().now();
rv = _runners.get(dest);
}
long afterLock = _ctx.clock().now();
if (afterLock - beforeLock > 50) {
_log.warn("getRunner(Dest).locking took too long: " + (afterLock-beforeLock)
+ " overall, synchronized took " + (inLock - beforeLock));
}
return rv;
return _runners.get(dest);
}
/**
@@ -378,17 +371,13 @@ class ClientManager {
return null;
}
/**
* Unsynchronized
*/
private ClientConnectionRunner getRunner(Hash destHash) {
if (destHash == null)
return null;
synchronized (_runners) {
for (Iterator<ClientConnectionRunner> iter = _runners.values().iterator(); iter.hasNext(); ) {
ClientConnectionRunner cur = iter.next();
if (cur.getDestHash().equals(destHash))
return cur;
}
}
return null;
return _runnersByHash.get(destHash);
}
public void messageDeliveryStatusUpdate(Destination fromDest, MessageId id, boolean delivered) {
@@ -407,18 +396,7 @@ class ClientManager {
Set<Destination> getRunnerDestinations() {
Set<Destination> dests = new HashSet();
long beforeLock = _ctx.clock().now();
long inLock = 0;
synchronized (_runners) {
inLock = _ctx.clock().now();
dests.addAll(_runners.keySet());
}
long afterLock = _ctx.clock().now();
if (afterLock - beforeLock > 50) {
_log.warn("getRunnerDestinations().locking took too long: " + (afterLock-beforeLock)
+ " overall, synchronized took " + (inLock - beforeLock));
}
dests.addAll(_runners.keySet());
return dests;
}