* NTCP: Reduce conLock contention

This commit is contained in:
zzz
2012-10-03 17:40:59 +00:00
parent a1873e74e5
commit d2c1641569

View File

@@ -17,6 +17,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
@@ -140,7 +141,7 @@ public class NTCPTransport extends TransportImpl {
_context.statManager().createRateStat("ntcp.writeError", "", "ntcp", RATES);
_establishing = new ConcurrentHashSet(16);
_conLock = new Object();
_conByIdent = new HashMap(64);
_conByIdent = new ConcurrentHashMap(64);
_finisher = new NTCPSendFinisher(ctx, this);
@@ -160,7 +161,7 @@ public class NTCPTransport extends TransportImpl {
_context.statManager().addRateData("ntcp.inboundEstablished", 1);
markReachable(con.getRemotePeer().calculateHash(), true);
//_context.shitlist().unshitlistRouter(con.getRemotePeer().calculateHash());
NTCPConnection old = null;
NTCPConnection old;
synchronized (_conLock) {
old = _conByIdent.put(con.getRemotePeer().calculateHash(), con);
}
@@ -263,6 +264,7 @@ public class NTCPTransport extends TransportImpl {
public void afterSend(OutNetMessage msg, boolean sendSuccessful, boolean allowRequeue, long msToSend) {
super.afterSend(msg, sendSuccessful, allowRequeue, msToSend);
}
public TransportBid bid(RouterInfo toAddress, long dataSize) {
if (!isAlive())
return null;
@@ -354,26 +356,23 @@ public class NTCPTransport extends TransportImpl {
@Override
public boolean isEstablished(Hash dest) {
synchronized (_conLock) {
NTCPConnection con = _conByIdent.get(dest);
return (con != null) && con.isEstablished() && !con.isClosed();
}
}
@Override
public boolean isBacklogged(Hash dest) {
synchronized (_conLock) {
NTCPConnection con = _conByIdent.get(dest);
return (con != null) && con.isEstablished() && con.tooBacklogged();
}
}
void removeCon(NTCPConnection con) {
NTCPConnection removed = null;
synchronized (_conLock) {
RouterIdentity ident = con.getRemotePeer();
if (ident != null)
RouterIdentity ident = con.getRemotePeer();
if (ident != null) {
synchronized (_conLock) {
removed = _conByIdent.remove(ident.calculateHash());
}
}
if ( (removed != null) && (removed != con) ) {// multiple cons, close 'em both
if (_log.shouldLog(Log.WARN))
@@ -388,19 +387,17 @@ public class NTCPTransport extends TransportImpl {
*
*/
@Override
public int countActivePeers() { synchronized (_conLock) { return _conByIdent.size(); } }
public int countActivePeers() { return _conByIdent.size(); }
/**
* How many peers are we actively sending messages to (this minute)
*/
@Override
public int countActiveSendPeers() {
int active = 0;
synchronized (_conLock) {
for (Iterator iter = _conByIdent.values().iterator(); iter.hasNext(); ) {
NTCPConnection con = (NTCPConnection)iter.next();
for (NTCPConnection con : _conByIdent.values()) {
if ( (con.getTimeSinceSend() <= 60*1000) || (con.getTimeSinceReceive() <= 60*1000) )
active++;
}
}
return active;
}
@@ -416,16 +413,9 @@ public class NTCPTransport extends TransportImpl {
*/
@Override
public Vector<Long> getClockSkews() {
Vector<NTCPConnection> peers = new Vector();
Vector<Long> skews = new Vector();
synchronized (_conLock) {
peers.addAll(_conByIdent.values());
}
for (Iterator<NTCPConnection> iter = peers.iterator(); iter.hasNext(); ) {
NTCPConnection con = iter.next();
for (NTCPConnection con : _conByIdent.values()) {
if (con.isEstablished())
skews.addElement(Long.valueOf(con.getClockSkew()));
}
@@ -603,6 +593,7 @@ public class NTCPTransport extends TransportImpl {
void establishing(NTCPConnection con) {
_establishing.add(con);
}
/**
* called in the EventPumper no more than once a second or so, closing
* any unconnected/unestablished connections
@@ -694,12 +685,10 @@ public class NTCPTransport extends TransportImpl {
@Override
public short getReachabilityStatus() {
if (isAlive() && _myAddress != null) {
synchronized (_conLock) {
for (NTCPConnection con : _conByIdent.values()) {
if (con.isInbound())
return CommSystemFacade.STATUS_OK;
}
}
}
return CommSystemFacade.STATUS_UNKNOWN;
}
@@ -727,17 +716,17 @@ public class NTCPTransport extends TransportImpl {
// will this work?
replaceAddress(null);
}
public static final String STYLE = "NTCP";
public void renderStatusHTML(java.io.Writer out, int sortFlags) throws IOException {}
@Override
public void renderStatusHTML(java.io.Writer out, String urlBase, int sortFlags) throws IOException {
TreeSet peers = new TreeSet(getComparator(sortFlags));
synchronized (_conLock) {
peers.addAll(_conByIdent.values());
}
long offsetTotal = 0;
peers.addAll(_conByIdent.values());
long offsetTotal = 0;
float bpsSend = 0;
float bpsRecv = 0;
long totalUptime = 0;
@@ -838,6 +827,7 @@ public class NTCPTransport extends TransportImpl {
}
private static final NumberFormat _rateFmt = new DecimalFormat("#,##0.00");
private static String formatRate(float rate) {
synchronized (_rateFmt) { return _rateFmt.format(rate); }
}
@@ -858,14 +848,12 @@ public class NTCPTransport extends TransportImpl {
public static final AlphaComparator instance() { return _instance; }
}
private static class PeerComparator implements Comparator {
public int compare(Object lhs, Object rhs) {
if ( (lhs == null) || (rhs == null) || !(lhs instanceof NTCPConnection) || !(rhs instanceof NTCPConnection))
throw new IllegalArgumentException("rhs = " + rhs + " lhs = " + lhs);
return compare((NTCPConnection)lhs, (NTCPConnection)rhs);
}
protected int compare(NTCPConnection l, NTCPConnection r) {
private static class PeerComparator implements Comparator<NTCPConnection> {
public int compare(NTCPConnection l, NTCPConnection r) {
if (l == null || r == null)
throw new IllegalArgumentException();
// base64 retains binary ordering
// UM, no it doesn't, but close enough
return l.getRemotePeer().calculateHash().toBase64().compareTo(r.getRemotePeer().calculateHash().toBase64());
}
}