Transports: Add mayDisconnect() advisory which says we

don't expect more messages on this connection; use for BuildHandler
Rename some dest arguments to peer for clarity
UDP: Display messages, not packets, sent/rcvd on /peers
Don't count duplicates in received message count
Count sent messages when sent, not acked
Move some PeerState counters from longs to ints to save space
This commit is contained in:
zzz
2015-12-20 14:15:48 +00:00
parent 0b94d866f0
commit d5990cc0f2
12 changed files with 217 additions and 48 deletions

View File

@@ -70,6 +70,7 @@ public abstract class CommSystemFacade implements Service {
*
* @deprecated use getStatus()
*/
@Deprecated
public short getReachabilityStatus() { return (short) getStatus().getCode(); }
/**
@@ -81,13 +82,22 @@ public abstract class CommSystemFacade implements Service {
/**
* @deprecated unused
*/
@Deprecated
public void recheckReachability() {}
public boolean isBacklogged(Hash dest) { return false; }
public boolean wasUnreachable(Hash dest) { return false; }
public boolean isEstablished(Hash dest) { return false; }
public boolean isBacklogged(Hash peer) { return false; }
public boolean wasUnreachable(Hash peer) { return false; }
public boolean isEstablished(Hash peer) { return false; }
public byte[] getIP(Hash dest) { return null; }
public void queueLookup(byte[] ip) {}
/**
* Tell the comm system that we may disconnect from this peer.
* This is advisory only.
*
* @since 0.9.24
*/
public void mayDisconnect(Hash peer) {}
/** @since 0.8.11 */
public String getOurCountry() { return null; }

View File

@@ -155,23 +155,34 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
}
@Override
public boolean isBacklogged(Hash dest) {
return _manager.isBacklogged(dest);
public boolean isBacklogged(Hash peer) {
return _manager.isBacklogged(peer);
}
@Override
public boolean isEstablished(Hash dest) {
return _manager.isEstablished(dest);
public boolean isEstablished(Hash peer) {
return _manager.isEstablished(peer);
}
@Override
public boolean wasUnreachable(Hash dest) {
return _manager.wasUnreachable(dest);
public boolean wasUnreachable(Hash peer) {
return _manager.wasUnreachable(peer);
}
@Override
public byte[] getIP(Hash dest) {
return _manager.getIP(dest);
public byte[] getIP(Hash peer) {
return _manager.getIP(peer);
}
/**
* Tell the comm system that we may disconnect from this peer.
* This is advisory only.
*
* @since 0.9.24
*/
@Override
public void mayDisconnect(Hash peer) {
_manager.mayDisconnect(peer);
}
@Override
@@ -196,6 +207,7 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
* @deprecated unused
*/
@Override
@Deprecated
public void recheckReachability() { _manager.recheckReachability(); }
@Override

View File

@@ -171,4 +171,12 @@ public interface Transport {
public boolean isUnreachable(Hash peer);
public boolean isEstablished(Hash peer);
/**
* Tell the transport that we may disconnect from this peer.
* This is advisory only.
*
* @since 0.9.24
*/
public void mayDisconnect(Hash peer);
}

View File

@@ -809,6 +809,7 @@ public abstract class TransportImpl implements Transport {
/**
* @deprecated unused
*/
@Deprecated
public void recheckReachability() {}
/**
@@ -818,8 +819,16 @@ public abstract class TransportImpl implements Transport {
return TransportUtil.isIPv4Firewalled(_context, getStyle());
}
public boolean isBacklogged(Hash dest) { return false; }
public boolean isEstablished(Hash dest) { return false; }
public boolean isBacklogged(Hash peer) { return false; }
public boolean isEstablished(Hash peer) { return false; }
/**
* Tell the transport that we may disconnect from this peer.
* This is advisory only.
*
* @since 0.9.24
*/
public void mayDisconnect(Hash peer) {}
public boolean isUnreachable(Hash peer) {
long now = _context.clock().now();

View File

@@ -407,35 +407,48 @@ public class TransportManager implements TransportEventListener {
/**
* @deprecated unused
*/
@Deprecated
public void recheckReachability() {
for (Transport t : _transports.values())
t.recheckReachability();
}
public boolean isBacklogged(Hash dest) {
public boolean isBacklogged(Hash peer) {
for (Transport t : _transports.values()) {
if (t.isBacklogged(dest))
if (t.isBacklogged(peer))
return true;
}
return false;
}
public boolean isEstablished(Hash dest) {
public boolean isEstablished(Hash peer) {
for (Transport t : _transports.values()) {
if (t.isEstablished(dest))
if (t.isEstablished(peer))
return true;
}
return false;
}
/**
* Tell the transports that we may disconnect from this peer.
* This is advisory only.
*
* @since 0.9.24
*/
public void mayDisconnect(Hash peer) {
for (Transport t : _transports.values()) {
t.mayDisconnect(peer);
}
}
/**
* Was the peer UNreachable (outbound only) on any transport,
* based on the last time we tried it for each transport?
* This is NOT reset if the peer contacts us.
*/
public boolean wasUnreachable(Hash dest) {
public boolean wasUnreachable(Hash peer) {
for (Transport t : _transports.values()) {
if (!t.wasUnreachable(dest))
if (!t.wasUnreachable(peer))
return false;
}
return true;
@@ -452,8 +465,8 @@ public class TransportManager implements TransportEventListener {
*
* @return IPv4 or IPv6 or null
*/
public byte[] getIP(Hash dest) {
return TransportImpl.getIP(dest);
public byte[] getIP(Hash peer) {
return TransportImpl.getIP(peer);
}
/**
@@ -745,8 +758,8 @@ public class TransportManager implements TransportEventListener {
//"<b id=\"def.dev\">").append(_t("Dev")).append("</b>: ").append(_t("The standard deviation of the round trip time in milliseconds")).append("<br>\n" +
"<b id=\"def.rto\">RTO</b>: ").append(_t("The retransmit timeout in milliseconds")).append("<br>\n" +
"<b id=\"def.mtu\">MTU</b>: ").append(_t("Current maximum send packet size / estimated maximum receive packet size (bytes)")).append("<br>\n" +
"<b id=\"def.send\">").append(_t("TX")).append("</b>: ").append(_t("The total number of packets sent to the peer")).append("<br>\n" +
"<b id=\"def.recv\">").append(_t("RX")).append("</b>: ").append(_t("The total number of packets received from the peer")).append("<br>\n" +
"<b id=\"def.send\">").append(_t("TX")).append("</b>: ").append(_t("The total number of messages sent to the peer")).append("<br>\n" +
"<b id=\"def.recv\">").append(_t("RX")).append("</b>: ").append(_t("The total number of messages received from the peer")).append("<br>\n" +
"<b id=\"def.resent\">").append(_t("Dup TX")).append("</b>: ").append(_t("The total number of packets retransmitted to the peer")).append("<br>\n" +
"<b id=\"def.dupRecv\">").append(_t("Dup RX")).append("</b>: ").append(_t("The total number of duplicate packets received from the peer")).append("</p>" +
"</div>\n");

View File

@@ -85,6 +85,7 @@ class EventPumper implements Runnable {
/** tunnel test now disabled, but this should be long enough to allow an active tunnel to get started */
private static final long MIN_EXPIRE_IDLE_TIME = 120*1000l;
private static final long MAX_EXPIRE_IDLE_TIME = 11*60*1000l;
private static final long MAY_DISCON_TIMEOUT = 10*1000;
/**
* Do we use direct buffers for reading? Default false.
@@ -221,7 +222,8 @@ class EventPumper implements Runnable {
int failsafeInvalid = 0;
// Increase allowed idle time if we are well under allowed connections, otherwise decrease
if (_transport.haveCapacity(33))
boolean haveCap = _transport.haveCapacity(33);
if (haveCap)
_expireIdleWriteTime = Math.min(_expireIdleWriteTime + 1000, MAX_EXPIRE_IDLE_TIME);
else
_expireIdleWriteTime = Math.max(_expireIdleWriteTime - 3000, MIN_EXPIRE_IDLE_TIME);
@@ -270,8 +272,16 @@ class EventPumper implements Runnable {
failsafeWrites++;
}
if ( con.getTimeSinceSend() > _expireIdleWriteTime &&
con.getTimeSinceReceive() > _expireIdleWriteTime) {
final long expire;
if (!haveCap && con.getMayDisconnect() &&
con.getMessagesReceived() <= 2 && con.getMessagesSent() <= 1) {
expire = MAY_DISCON_TIMEOUT;
} else {
expire = _expireIdleWriteTime;
}
if ( con.getTimeSinceSend() > expire &&
con.getTimeSinceReceive() > expire) {
// we haven't sent or received anything in a really long time, so lets just close 'er up
con.close();
failsafeCloses++;

View File

@@ -13,6 +13,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Adler32;
@@ -115,8 +116,8 @@ class NTCPConnection implements Closeable {
private byte _prevWriteEnd[];
/** current partially read I2NP message */
private final ReadState _curReadState;
private final AtomicLong _messagesRead = new AtomicLong();
private final AtomicLong _messagesWritten = new AtomicLong();
private final AtomicInteger _messagesRead = new AtomicInteger();
private final AtomicInteger _messagesWritten = new AtomicInteger();
private long _lastSendTime;
private long _lastReceiveTime;
private long _lastRateUpdated;
@@ -134,6 +135,7 @@ class NTCPConnection implements Closeable {
/** how many consecutive sends were failed due to (estimated) send queue time */
//private int _consecutiveBacklog;
private long _nextInfoTime;
private boolean _mayDisconnect;
/*
* Update frequency for send/recv rates in console peers page
@@ -325,11 +327,11 @@ class NTCPConnection implements Closeable {
return _context.clock().now() -_establishedOn;
}
public long getMessagesSent() { return _messagesWritten.get(); }
public int getMessagesSent() { return _messagesWritten.get(); }
public long getMessagesReceived() { return _messagesRead.get(); }
public int getMessagesReceived() { return _messagesRead.get(); }
public long getOutboundQueueSize() {
public int getOutboundQueueSize() {
int queued;
synchronized(_outbound) {
queued = _outbound.size();
@@ -360,6 +362,17 @@ class NTCPConnection implements Closeable {
*/
public long getCreated() { return _created; }
/**
* Sets to true.
* @since 0.9.24
*/
public void setMayDisconnect() { _mayDisconnect = true; }
/**
* @since 0.9.24
*/
public boolean getMayDisconnect() { return _mayDisconnect; }
/**
* workaround for EventPumper
* @since 0.8.12

View File

@@ -486,6 +486,21 @@ public class NTCPTransport extends TransportImpl {
return (con != null) && con.isEstablished() && con.tooBacklogged();
}
/**
* Tell the transport that we may disconnect from this peer.
* This is advisory only.
*
* @since 0.9.24
*/
@Override
public void mayDisconnect(final Hash peer) {
final NTCPConnection con = _conByIdent.get(peer);
if (con != null && con.isEstablished() && con.isInbound() &&
con.getMessagesReceived() <= 2 && con.getMessagesSent() <= 1) {
con.setMayDisconnect();
}
}
/**
* @return usually the con passed in, but possibly a second connection with the same peer...
*/

View File

@@ -177,11 +177,14 @@ class OutboundMessageState implements CDPQEntry {
/**
* Note that we have pushed the message fragments.
* Increments push count (and max sends... why?)
* @return true if this is the first push
*/
public synchronized void push() {
public synchronized boolean push() {
boolean rv = _pushCount == 0;
// these will never be different...
_pushCount++;
_maxSends = _pushCount;
return rv;
}
/**

View File

@@ -198,6 +198,7 @@ class PeerState {
/** how many dup packets were received within the last RETRANSMISSION_PERIOD_WIDTH packets */
private int _packetsReceivedDuplicate;
private int _packetsReceived;
private boolean _mayDisconnect;
/** list of InboundMessageState for active message */
private final Map<Long, InboundMessageState> _inboundMessages;
@@ -447,6 +448,7 @@ class PeerState {
* @return false always
* @deprecated unused, ECNs are never sent, always returns false
*/
@Deprecated
public boolean getCurrentSecondECNReceived() { return _currentSecondECNReceived; }
/**
@@ -542,6 +544,7 @@ class PeerState {
* connection, or null if we are not in the process of rekeying.
* @deprecated unused
*/
@Deprecated
public void setNextMACKey(SessionKey key) { _nextMACKey = key; }
/**
@@ -550,6 +553,7 @@ class PeerState {
* of rekeying.
* @deprecated unused
*/
@Deprecated
public void setNextCipherKey(SessionKey key) { _nextCipherKey = key; }
/**
@@ -569,6 +573,7 @@ class PeerState {
* when were the current cipher and MAC keys established/rekeyed?
* @deprecated unused
*/
@Deprecated
public void setKeyEstablishedTime(long when) { _keyEstablishedTime = when; }
/**
@@ -771,10 +776,18 @@ class PeerState {
public long getIntroducerTime() { return _lastIntroducerTime; }
public void setIntroducerTime() { _lastIntroducerTime = _context.clock().now(); }
/** we received the message specified completely */
/**
* We received the message specified completely.
* @param bytes if less than or equal to zero, message is a duplicate.
*/
public void messageFullyReceived(Long messageId, int bytes) { messageFullyReceived(messageId, bytes, false); }
public synchronized void messageFullyReceived(Long messageId, int bytes, boolean isForACK) {
/**
* We received the message specified completely.
* @param isForACK unused
* @param bytes if less than or equal to zero, message is a duplicate.
*/
private synchronized void messageFullyReceived(Long messageId, int bytes, boolean isForACK) {
if (bytes > 0) {
_receiveBytes += bytes;
//if (isForACK)
@@ -803,7 +816,6 @@ class PeerState {
if (_wantACKSendSince <= 0)
_wantACKSendSince = now;
_currentACKs.add(messageId);
_messagesReceived++;
}
public void messagePartiallyReceived() {
@@ -958,6 +970,7 @@ class PeerState {
* @return non-null, possibly empty
* @deprecated unused
*/
@Deprecated
public List<ACKBitfield> retrieveACKBitfields() { return retrieveACKBitfields(true); }
/**
@@ -1027,10 +1040,6 @@ class PeerState {
}
}
int partialIncluded = 0;
if (bytesRemaining > 4) {
// ok, there's room to *try* to fit in some partial ACKs, so
@@ -1274,8 +1283,23 @@ class PeerState {
/** how skewed are the measured RTTs? */
public synchronized int getRTTDeviation() { return _rttDeviation; }
public synchronized int getMessagesSent() { return _messagesSent; }
/**
* I2NP messages sent.
* Does not include duplicates.
* As of 0.9.24, incremented when bandwidth is allocated just before sending, not when acked.
*/
public int getMessagesSent() {
synchronized (_outboundMessages) {
return _messagesSent;
}
}
/**
* I2NP messages received.
* As of 0.9.24, does not include duplicates.
*/
public synchronized int getMessagesReceived() { return _messagesReceived; }
public synchronized int getPacketsTransmitted() { return _packetsTransmitted; }
public synchronized int getPacketsRetransmitted() { return _packetsRetransmitted; }
//public long getPacketsPeriodTransmitted() { return _packetsPeriodTransmitted; }
@@ -1339,6 +1363,7 @@ class PeerState {
public long getLastACKSend() { return _lastACKSend; }
/** @deprecated unused */
@Deprecated
public void setLastACKSend(long when) { _lastACKSend = when; }
public long getWantedACKSendSince() { return _wantACKSendSince; }
@@ -1498,6 +1523,18 @@ class PeerState {
if (_dead) return 0;
return _outboundMessages.size() + _outboundQueue.size();
}
/**
* Sets to true.
* @since 0.9.24
*/
public void setMayDisconnect() { _mayDisconnect = true; }
/**
* @since 0.9.24
*/
public boolean getMayDisconnect() { return _mayDisconnect; }
/**
* Expire / complete any outbound messages
@@ -1771,7 +1808,8 @@ class PeerState {
if (state.getPushCount() > 0)
_retransmitter = state;
state.push();
if (state.push())
_messagesSent++;
int rto = getRTO();
state.setNextSendTime(now + rto);
@@ -2062,8 +2100,10 @@ class PeerState {
buf.append(" cwin: ").append(_sendWindowBytes);
buf.append(" acwin: ").append(_sendWindowBytesRemaining);
buf.append(" consecFail: ").append(_consecutiveFailedSends);
buf.append(" recv OK/Dup: ").append(_packetsReceived).append('/').append(_packetsReceivedDuplicate);
buf.append(" send OK/Dup: ").append(_packetsTransmitted).append('/').append(_packetsRetransmitted);
buf.append(" msgs rcvd: ").append(_messagesReceived);
buf.append(" msgs sent: ").append(_messagesSent);
buf.append(" pkts rcvd OK/Dup: ").append(_packetsReceived).append('/').append(_packetsReceivedDuplicate);
buf.append(" pkts sent OK/Dup: ").append(_packetsTransmitted).append('/').append(_packetsRetransmitted);
buf.append(" IBM: ").append(_inboundMessages.size());
buf.append(" OBQ: ").append(_outboundQueue.size());
buf.append(" OBL: ").append(_outboundMessages.size());

View File

@@ -2432,6 +2432,21 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
return peer != null && peer.isBacklogged();
}
/**
* Tell the transport that we may disconnect from this peer.
* This is advisory only.
*
* @since 0.9.24
*/
@Override
public void mayDisconnect(final Hash peer) {
final PeerState ps = _peersByIdent.get(peer);
if (ps != null && ps.isInbound() &&
ps.getMessagesReceived() <= 2 && ps.getMessagesSent() <= 1) {
ps.setMayDisconnect();
}
}
public boolean allowConnection() {
return _peersByIdent.size() < getMaxConnections();
}
@@ -2678,8 +2693,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
//buf.append(peer.getMTUDecreases());
buf.append("</td>");
long sent = peer.getPacketsTransmitted();
long recv = peer.getPacketsReceived();
long sent = peer.getMessagesSent();
long recv = peer.getMessagesReceived();
buf.append("<td class=\"cells\" align=\"right\">");
buf.append(sent);
@@ -2820,6 +2835,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private static final long LONG_LOOP_TIME = 25*1000;
private static final long EXPIRE_INCREMENT = 15*1000;
private static final long EXPIRE_DECREMENT = 45*1000;
private static final long MAY_DISCON_TIMEOUT = 10*1000;
public ExpirePeerEvent() {
super(_context.simpleTimer2());
@@ -2829,7 +2845,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
public void timeReached() {
// Increase allowed idle time if we are well under allowed connections, otherwise decrease
if (haveCapacity(33)) {
boolean haveCap = haveCapacity(33);
if (haveCap) {
long inc;
// don't adjust too quickly if we are looping fast
if (_lastLoopShort)
@@ -2848,6 +2865,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
long now = _context.clock().now();
long shortInactivityCutoff = now - _expireTimeout;
long longInactivityCutoff = now - EXPIRE_TIMEOUT;
final long mayDisconCutoff = now - MAY_DISCON_TIMEOUT;
long pingCutoff = now - (2 * 60*60*1000);
long pingFirewallCutoff = now - PING_FIREWALL_CUTOFF;
boolean shouldPingFirewall = _reachabilityStatus != Status.OK;
@@ -2862,10 +2880,14 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
PeerState peer = iter.next();
long inactivityCutoff;
// if we offered to introduce them, or we used them as introducer in last 2 hours
if (peer.getWeRelayToThemAs() > 0 || peer.getIntroducerTime() > pingCutoff)
if (peer.getWeRelayToThemAs() > 0 || peer.getIntroducerTime() > pingCutoff) {
inactivityCutoff = longInactivityCutoff;
else
} else if (!haveCap && peer.getMayDisconnect() &&
peer.getMessagesReceived() <= 2 && peer.getMessagesSent() <= 1) {
inactivityCutoff = mayDisconCutoff;
} else {
inactivityCutoff = shortInactivityCutoff;
}
if ( (peer.getLastReceiveTime() < inactivityCutoff) && (peer.getLastSendTime() < inactivityCutoff) ) {
_expireBuffer.add(peer);
iter.remove();

View File

@@ -638,6 +638,8 @@ class BuildHandler implements Runnable {
if (isInGW && isOutEnd) {
_context.statManager().addRateData("tunnel.rejectHostile", 1);
_log.error("Dropping build request, IBGW+OBEP");
if (from != null)
_context.commSystem().mayDisconnect(from);
return;
}
@@ -649,6 +651,8 @@ class BuildHandler implements Runnable {
// old i2pd
if (_log.shouldWarn())
_log.warn("Dropping build request, we are the next hop");
if (from != null)
_context.commSystem().mayDisconnect(from);
return;
}
if (!isInGW) {
@@ -669,6 +673,7 @@ class BuildHandler implements Runnable {
_context.statManager().addRateData("tunnel.rejectHostile", 1);
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping build request with the same previous and next hop");
_context.commSystem().mayDisconnect(from);
return;
}
}
@@ -683,12 +688,16 @@ class BuildHandler implements Runnable {
_context.statManager().addRateData("tunnel.rejectTooOld", 1);
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping build request too old... replay attack? " + DataHelper.formatDuration(timeDiff));
if (from != null)
_context.commSystem().mayDisconnect(from);
return;
}
if (timeDiff < 0 - MAX_REQUEST_FUTURE) {
_context.statManager().addRateData("tunnel.rejectFuture", 1);
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping build request too far in future " + DataHelper.formatDuration(0 - timeDiff));
if (from != null)
_context.commSystem().mayDisconnect(from);
return;
}
@@ -844,6 +853,8 @@ class BuildHandler implements Runnable {
state.msg.getUniqueId() + "/" + ourId + "/" + req.readNextTunnelId() + " delay " +
recvDelay + " as " +
(isOutEnd ? "outbound endpoint" : isInGW ? "inbound gw" : "participant"));
if (from != null)
_context.commSystem().mayDisconnect(from);
// Connection congestion control:
// If we rejected the request, are near our conn limits, and aren't connected to the next hop,
// just drop it.
@@ -856,6 +867,9 @@ class BuildHandler implements Runnable {
_log.warn("Not sending rejection due to conn limits");
return;
}
} else if (isInGW && from != null) {
// we're the start of the tunnel, no use staying connected
_context.commSystem().mayDisconnect(from);
}
EncryptedBuildRecord reply = BuildResponseRecord.create(_context, response, req.readReplyKey(), req.readReplyIV(), state.msg.getUniqueId());