merge of '22cebc21c21e3a101e03165f26a5e9fddc3648da'

and 'e210f94f3d17359b39a6b24f2a5e1221a86abfd0'
This commit is contained in:
z3r0fox
2015-12-20 21:07:16 +00:00
20 changed files with 340 additions and 83 deletions

View File

@@ -155,7 +155,7 @@ public class ConfigServiceHandler extends FormHandler {
/**
* Register a handler for signals,
* so we can handle HUP from the wrapper (non-Windows only, wrapper 3.2.0 or higher)
* so we can handle HUP from the wrapper (wrapper 3.2.0 or higher)
*
* @since 0.8.13
*/

View File

@@ -64,8 +64,11 @@ class SybilRenderer {
private static final double MIN_CLOSE = 242.0;
private static final double PAIR_DISTANCE_FACTOR = 2.0;
private static final double OUR_KEY_FACTOR = 4.0;
private static final double MIN_DISPLAY_POINTS = 3.0;
private static final double MIN_DISPLAY_POINTS = 5.0;
private static final double VERSION_FACTOR = 1.0;
private static final double POINTS_BAD_VERSION = 50.0;
private static final double POINTS_UNREACHABLE = 4.0;
private static final double POINTS_NEW = 4.0;
public SybilRenderer(RouterContext ctx) {
_context = ctx;
@@ -616,8 +619,8 @@ class SybilRenderer {
if (heard > 0) {
long age = Math.max(now - heard, 1);
if (age < 2 * DAY) {
// .125 point for every hour under 48, max 6 points
double point = Math.min(6.0d, (2 * DAY - age) / (2 * DAY / 6.0d));
// (POINTS_NEW / 48) for every hour under 48, max POINTS_NEW
double point = Math.min(POINTS_NEW, (2 * DAY - age) / (2 * DAY / POINTS_NEW));
addPoints(points, h, point,
"First heard about: " + _t("{0} ago", DataHelper.formatDuration2(age)));
}
@@ -656,9 +659,12 @@ class SybilRenderer {
} catch (NumberFormatException nfe) { return; }
for (RouterInfo info : ris) {
Hash h = info.getHash();
String caps = info.getCapabilities();
if (!caps.contains("R"))
addPoints(points, h, POINTS_UNREACHABLE, "Unreachable: " + DataHelper.escapeHTML(caps));
String hisFullVer = info.getVersion();
if (!hisFullVer.startsWith("0.9.")) {
addPoints(points, h, 50.0, "Strange version " + DataHelper.escapeHTML(hisFullVer));
addPoints(points, h, POINTS_BAD_VERSION, "Strange version " + DataHelper.escapeHTML(hisFullVer));
continue;
}
String hisVer = hisFullVer.substring(4);

View File

@@ -8,6 +8,8 @@ import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.security.GeneralSecurityException;
import java.security.InvalidKeyException;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
@@ -36,31 +38,16 @@ public class CertUtil {
private static final int LINE_LENGTH = 64;
/**
* Modified from:
* http://www.exampledepot.com/egs/java.security.cert/ExportCert.html
*
* This method writes a certificate to a file in base64 format.
* Write a certificate to a file in base64 format.
*
* @return success
* @since 0.8.2, moved from SSLEepGet in 0.9.9
*/
public static boolean saveCert(Certificate cert, File file) {
OutputStream os = null;
PrintWriter wr = null;
try {
// Get the encoded form which is suitable for exporting
byte[] buf = cert.getEncoded();
os = new SecureFileOutputStream(file);
wr = new PrintWriter(new OutputStreamWriter(os, "UTF-8"));
wr.println("-----BEGIN CERTIFICATE-----");
String b64 = Base64.encode(buf, true); // true = use standard alphabet
for (int i = 0; i < b64.length(); i += LINE_LENGTH) {
wr.println(b64.substring(i, Math.min(i + LINE_LENGTH, b64.length())));
}
wr.println("-----END CERTIFICATE-----");
wr.flush();
if (wr.checkError())
throw new IOException("Failed write to " + file);
exportCert(cert, os);
return true;
} catch (CertificateEncodingException cee) {
error("Error writing X509 Certificate " + file.getAbsolutePath(), cee);
@@ -73,6 +60,79 @@ public class CertUtil {
}
}
/**
* Writes the private key and all certs in base64 format.
* Does NOT close the stream. Throws on all errors.
*
* @param pk non-null
* @param certs certificate chain, null or empty to export pk only
* @throws InvalidKeyException if the key does not support encoding
* @throws CertificateEncodingException if a cert does not support encoding
* @since 0.9.24
*/
public static void exportPrivateKey(PrivateKey pk, Certificate[] certs, OutputStream out)
throws IOException, GeneralSecurityException {
exportPrivateKey(pk, out);
if (certs == null)
return;
for (int i = 0; i < certs.length; i++) {
exportCert(certs[i], out);
}
}
/**
* Modified from:
* http://www.exampledepot.com/egs/java.security.cert/ExportCert.html
*
* Writes a certificate in base64 format.
* Does NOT close the stream. Throws on all errors.
*
* @since 0.9.24, pulled out of saveCert()
*/
private static void exportCert(Certificate cert, OutputStream out)
throws IOException, CertificateEncodingException {
// Get the encoded form which is suitable for exporting
byte[] buf = cert.getEncoded();
PrintWriter wr = new PrintWriter(new OutputStreamWriter(out, "UTF-8"));
wr.println("-----BEGIN CERTIFICATE-----");
String b64 = Base64.encode(buf, true); // true = use standard alphabet
for (int i = 0; i < b64.length(); i += LINE_LENGTH) {
wr.println(b64.substring(i, Math.min(i + LINE_LENGTH, b64.length())));
}
wr.println("-----END CERTIFICATE-----");
wr.flush();
if (wr.checkError())
throw new IOException("Failed write to " + out);
}
/**
* Modified from:
* http://www.exampledepot.com/egs/java.security.cert/ExportCert.html
*
* Writes a private key in base64 format.
* Does NOT close the stream. Throws on all errors.
*
* @throws InvalidKeyException if the key does not support encoding
* @since 0.9.24
*/
private static void exportPrivateKey(PrivateKey pk, OutputStream out)
throws IOException, InvalidKeyException {
// Get the encoded form which is suitable for exporting
byte[] buf = pk.getEncoded();
if (buf == null)
throw new InvalidKeyException("encoding unsupported for this key");
PrintWriter wr = new PrintWriter(new OutputStreamWriter(out, "UTF-8"));
wr.println("-----BEGIN PRIVATE KEY-----");
String b64 = Base64.encode(buf, true); // true = use standard alphabet
for (int i = 0; i < b64.length(); i += LINE_LENGTH) {
wr.println(b64.substring(i, Math.min(i + LINE_LENGTH, b64.length())));
}
wr.println("-----END PRIVATE KEY-----");
wr.flush();
if (wr.checkError())
throw new IOException("Failed write to " + out);
}
/**
* Get a value out of the subject distinguished name.
*

View File

@@ -1,3 +1,13 @@
2015-12-20 zzz
* BuildHandler: Additional fixes (ticket #1738)
* CertUtil: Add methods to export private keys
* Console: Sybil tool enhancementsrivate keys
* Transports:
- Disconnect faster when first message is a
tunnel build request which we reject
- Display SSU sent/received messages, not packets,
on /peers to be consistent with NTCP
2015-12-18 zzz
* BuildHandler: Fix NPE (ticket #1738)

View File

@@ -70,6 +70,7 @@ public abstract class CommSystemFacade implements Service {
*
* @deprecated use getStatus()
*/
@Deprecated
public short getReachabilityStatus() { return (short) getStatus().getCode(); }
/**
@@ -81,13 +82,22 @@ public abstract class CommSystemFacade implements Service {
/**
* @deprecated unused
*/
@Deprecated
public void recheckReachability() {}
public boolean isBacklogged(Hash dest) { return false; }
public boolean wasUnreachable(Hash dest) { return false; }
public boolean isEstablished(Hash dest) { return false; }
public boolean isBacklogged(Hash peer) { return false; }
public boolean wasUnreachable(Hash peer) { return false; }
public boolean isEstablished(Hash peer) { return false; }
public byte[] getIP(Hash dest) { return null; }
public void queueLookup(byte[] ip) {}
/**
* Tell the comm system that we may disconnect from this peer.
* This is advisory only.
*
* @since 0.9.24
*/
public void mayDisconnect(Hash peer) {}
/** @since 0.8.11 */
public String getOurCountry() { return null; }

View File

@@ -94,7 +94,7 @@ public class InNetMessagePool implements Service {
* @return previous builder for this message type, or null
* @throws AIOOBE if i2npMessageType is greater than MAX_I2NP_MESSAGE_TYPE
*/
public HandlerJobBuilder registerHandlerJobBuilder(int i2npMessageType, HandlerJobBuilder builder) {
public synchronized HandlerJobBuilder registerHandlerJobBuilder(int i2npMessageType, HandlerJobBuilder builder) {
HandlerJobBuilder old = _handlerJobBuilders[i2npMessageType];
_handlerJobBuilders[i2npMessageType] = builder;
return old;
@@ -103,8 +103,10 @@ public class InNetMessagePool implements Service {
/**
* @return previous builder for this message type, or null
* @throws AIOOBE if i2npMessageType is greater than MAX_I2NP_MESSAGE_TYPE
* @deprecated unused
*/
public HandlerJobBuilder unregisterHandlerJobBuilder(int i2npMessageType) {
@Deprecated
public synchronized HandlerJobBuilder unregisterHandlerJobBuilder(int i2npMessageType) {
HandlerJobBuilder old = _handlerJobBuilders[i2npMessageType];
_handlerJobBuilders[i2npMessageType] = null;
return old;

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 15;
public final static long BUILD = 16;
/** for example "-test" */
public final static String EXTRA = "";

View File

@@ -195,11 +195,17 @@ public class StatisticsManager {
if (family.equals(oldRI.getOption(FamilyKeyCrypto.OPT_NAME))) {
// copy over the pubkey and signature
key = oldRI.getOption(FamilyKeyCrypto.OPT_KEY);
if (key != null)
stats.setProperty(FamilyKeyCrypto.OPT_KEY, key);
sig = oldRI.getOption(FamilyKeyCrypto.OPT_SIG);
if (sig != null)
stats.setProperty(FamilyKeyCrypto.OPT_SIG, sig);
if (key != null) {
if (key.contains(";")) {
// we changed the separator from ';' to ':'
key = null;
} else {
stats.setProperty(FamilyKeyCrypto.OPT_KEY, key);
sig = oldRI.getOption(FamilyKeyCrypto.OPT_SIG);
if (sig != null)
stats.setProperty(FamilyKeyCrypto.OPT_SIG, sig);
}
}
}
}
if (sig == null || key == null) {

View File

@@ -134,7 +134,7 @@ public class FamilyKeyCrypto {
throw new GeneralSecurityException("sig failed");
Map<String, String> rv = new HashMap<String, String>(3);
rv.put(OPT_NAME, family);
rv.put(OPT_KEY, _pubkey.getType().getCode() + ";" + _pubkey.toBase64());
rv.put(OPT_KEY, _pubkey.getType().getCode() + ":" + _pubkey.toBase64());
rv.put(OPT_SIG, sig.toBase64());
return rv;
}
@@ -174,13 +174,16 @@ public class FamilyKeyCrypto {
// look for a b64 key in the RI
String skey = ri.getOption(OPT_KEY);
if (skey != null) {
int semi = skey.indexOf(";");
if (semi > 0) {
int colon = skey.indexOf(':');
// switched from ';' to ':' during dev, remove this later
if (colon < 0)
colon = skey.indexOf(';');
if (colon > 0) {
try {
int code = Integer.parseInt(skey.substring(0, semi));
int code = Integer.parseInt(skey.substring(0, colon));
SigType type = SigType.getByCode(code);
if (type != null) {
byte[] bkey = Base64.decode(skey.substring(semi + 1));
byte[] bkey = Base64.decode(skey.substring(colon + 1));
if (bkey != null) {
spk = new SigningPublicKey(type, bkey);
}

View File

@@ -155,23 +155,34 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
}
@Override
public boolean isBacklogged(Hash dest) {
return _manager.isBacklogged(dest);
public boolean isBacklogged(Hash peer) {
return _manager.isBacklogged(peer);
}
@Override
public boolean isEstablished(Hash dest) {
return _manager.isEstablished(dest);
public boolean isEstablished(Hash peer) {
return _manager.isEstablished(peer);
}
@Override
public boolean wasUnreachable(Hash dest) {
return _manager.wasUnreachable(dest);
public boolean wasUnreachable(Hash peer) {
return _manager.wasUnreachable(peer);
}
@Override
public byte[] getIP(Hash dest) {
return _manager.getIP(dest);
public byte[] getIP(Hash peer) {
return _manager.getIP(peer);
}
/**
* Tell the comm system that we may disconnect from this peer.
* This is advisory only.
*
* @since 0.9.24
*/
@Override
public void mayDisconnect(Hash peer) {
_manager.mayDisconnect(peer);
}
@Override
@@ -196,6 +207,7 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
* @deprecated unused
*/
@Override
@Deprecated
public void recheckReachability() { _manager.recheckReachability(); }
@Override

View File

@@ -171,4 +171,12 @@ public interface Transport {
public boolean isUnreachable(Hash peer);
public boolean isEstablished(Hash peer);
/**
* Tell the transport that we may disconnect from this peer.
* This is advisory only.
*
* @since 0.9.24
*/
public void mayDisconnect(Hash peer);
}

View File

@@ -809,6 +809,7 @@ public abstract class TransportImpl implements Transport {
/**
* @deprecated unused
*/
@Deprecated
public void recheckReachability() {}
/**
@@ -818,8 +819,16 @@ public abstract class TransportImpl implements Transport {
return TransportUtil.isIPv4Firewalled(_context, getStyle());
}
public boolean isBacklogged(Hash dest) { return false; }
public boolean isEstablished(Hash dest) { return false; }
public boolean isBacklogged(Hash peer) { return false; }
public boolean isEstablished(Hash peer) { return false; }
/**
* Tell the transport that we may disconnect from this peer.
* This is advisory only.
*
* @since 0.9.24
*/
public void mayDisconnect(Hash peer) {}
public boolean isUnreachable(Hash peer) {
long now = _context.clock().now();

View File

@@ -407,35 +407,48 @@ public class TransportManager implements TransportEventListener {
/**
* @deprecated unused
*/
@Deprecated
public void recheckReachability() {
for (Transport t : _transports.values())
t.recheckReachability();
}
public boolean isBacklogged(Hash dest) {
public boolean isBacklogged(Hash peer) {
for (Transport t : _transports.values()) {
if (t.isBacklogged(dest))
if (t.isBacklogged(peer))
return true;
}
return false;
}
public boolean isEstablished(Hash dest) {
public boolean isEstablished(Hash peer) {
for (Transport t : _transports.values()) {
if (t.isEstablished(dest))
if (t.isEstablished(peer))
return true;
}
return false;
}
/**
* Tell the transports that we may disconnect from this peer.
* This is advisory only.
*
* @since 0.9.24
*/
public void mayDisconnect(Hash peer) {
for (Transport t : _transports.values()) {
t.mayDisconnect(peer);
}
}
/**
* Was the peer UNreachable (outbound only) on any transport,
* based on the last time we tried it for each transport?
* This is NOT reset if the peer contacts us.
*/
public boolean wasUnreachable(Hash dest) {
public boolean wasUnreachable(Hash peer) {
for (Transport t : _transports.values()) {
if (!t.wasUnreachable(dest))
if (!t.wasUnreachable(peer))
return false;
}
return true;
@@ -452,8 +465,8 @@ public class TransportManager implements TransportEventListener {
*
* @return IPv4 or IPv6 or null
*/
public byte[] getIP(Hash dest) {
return TransportImpl.getIP(dest);
public byte[] getIP(Hash peer) {
return TransportImpl.getIP(peer);
}
/**
@@ -745,8 +758,8 @@ public class TransportManager implements TransportEventListener {
//"<b id=\"def.dev\">").append(_t("Dev")).append("</b>: ").append(_t("The standard deviation of the round trip time in milliseconds")).append("<br>\n" +
"<b id=\"def.rto\">RTO</b>: ").append(_t("The retransmit timeout in milliseconds")).append("<br>\n" +
"<b id=\"def.mtu\">MTU</b>: ").append(_t("Current maximum send packet size / estimated maximum receive packet size (bytes)")).append("<br>\n" +
"<b id=\"def.send\">").append(_t("TX")).append("</b>: ").append(_t("The total number of packets sent to the peer")).append("<br>\n" +
"<b id=\"def.recv\">").append(_t("RX")).append("</b>: ").append(_t("The total number of packets received from the peer")).append("<br>\n" +
"<b id=\"def.send\">").append(_t("TX")).append("</b>: ").append(_t("The total number of messages sent to the peer")).append("<br>\n" +
"<b id=\"def.recv\">").append(_t("RX")).append("</b>: ").append(_t("The total number of messages received from the peer")).append("<br>\n" +
"<b id=\"def.resent\">").append(_t("Dup TX")).append("</b>: ").append(_t("The total number of packets retransmitted to the peer")).append("<br>\n" +
"<b id=\"def.dupRecv\">").append(_t("Dup RX")).append("</b>: ").append(_t("The total number of duplicate packets received from the peer")).append("</p>" +
"</div>\n");

View File

@@ -85,6 +85,7 @@ class EventPumper implements Runnable {
/** tunnel test now disabled, but this should be long enough to allow an active tunnel to get started */
private static final long MIN_EXPIRE_IDLE_TIME = 120*1000l;
private static final long MAX_EXPIRE_IDLE_TIME = 11*60*1000l;
private static final long MAY_DISCON_TIMEOUT = 10*1000;
/**
* Do we use direct buffers for reading? Default false.
@@ -221,7 +222,8 @@ class EventPumper implements Runnable {
int failsafeInvalid = 0;
// Increase allowed idle time if we are well under allowed connections, otherwise decrease
if (_transport.haveCapacity(33))
boolean haveCap = _transport.haveCapacity(33);
if (haveCap)
_expireIdleWriteTime = Math.min(_expireIdleWriteTime + 1000, MAX_EXPIRE_IDLE_TIME);
else
_expireIdleWriteTime = Math.max(_expireIdleWriteTime - 3000, MIN_EXPIRE_IDLE_TIME);
@@ -270,8 +272,16 @@ class EventPumper implements Runnable {
failsafeWrites++;
}
if ( con.getTimeSinceSend() > _expireIdleWriteTime &&
con.getTimeSinceReceive() > _expireIdleWriteTime) {
final long expire;
if (!haveCap && con.getMayDisconnect() &&
con.getMessagesReceived() <= 2 && con.getMessagesSent() <= 1) {
expire = MAY_DISCON_TIMEOUT;
} else {
expire = _expireIdleWriteTime;
}
if ( con.getTimeSinceSend() > expire &&
con.getTimeSinceReceive() > expire) {
// we haven't sent or received anything in a really long time, so lets just close 'er up
con.close();
failsafeCloses++;

View File

@@ -13,6 +13,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Adler32;
@@ -115,8 +116,8 @@ class NTCPConnection implements Closeable {
private byte _prevWriteEnd[];
/** current partially read I2NP message */
private final ReadState _curReadState;
private final AtomicLong _messagesRead = new AtomicLong();
private final AtomicLong _messagesWritten = new AtomicLong();
private final AtomicInteger _messagesRead = new AtomicInteger();
private final AtomicInteger _messagesWritten = new AtomicInteger();
private long _lastSendTime;
private long _lastReceiveTime;
private long _lastRateUpdated;
@@ -134,6 +135,7 @@ class NTCPConnection implements Closeable {
/** how many consecutive sends were failed due to (estimated) send queue time */
//private int _consecutiveBacklog;
private long _nextInfoTime;
private boolean _mayDisconnect;
/*
* Update frequency for send/recv rates in console peers page
@@ -325,11 +327,11 @@ class NTCPConnection implements Closeable {
return _context.clock().now() -_establishedOn;
}
public long getMessagesSent() { return _messagesWritten.get(); }
public int getMessagesSent() { return _messagesWritten.get(); }
public long getMessagesReceived() { return _messagesRead.get(); }
public int getMessagesReceived() { return _messagesRead.get(); }
public long getOutboundQueueSize() {
public int getOutboundQueueSize() {
int queued;
synchronized(_outbound) {
queued = _outbound.size();
@@ -360,6 +362,17 @@ class NTCPConnection implements Closeable {
*/
public long getCreated() { return _created; }
/**
* Sets to true.
* @since 0.9.24
*/
public void setMayDisconnect() { _mayDisconnect = true; }
/**
* @since 0.9.24
*/
public boolean getMayDisconnect() { return _mayDisconnect; }
/**
* workaround for EventPumper
* @since 0.8.12

View File

@@ -486,6 +486,21 @@ public class NTCPTransport extends TransportImpl {
return (con != null) && con.isEstablished() && con.tooBacklogged();
}
/**
* Tell the transport that we may disconnect from this peer.
* This is advisory only.
*
* @since 0.9.24
*/
@Override
public void mayDisconnect(final Hash peer) {
final NTCPConnection con = _conByIdent.get(peer);
if (con != null && con.isEstablished() && con.isInbound() &&
con.getMessagesReceived() <= 2 && con.getMessagesSent() <= 1) {
con.setMayDisconnect();
}
}
/**
* @return usually the con passed in, but possibly a second connection with the same peer...
*/

View File

@@ -177,11 +177,14 @@ class OutboundMessageState implements CDPQEntry {
/**
* Note that we have pushed the message fragments.
* Increments push count (and max sends... why?)
* @return true if this is the first push
*/
public synchronized void push() {
public synchronized boolean push() {
boolean rv = _pushCount == 0;
// these will never be different...
_pushCount++;
_maxSends = _pushCount;
return rv;
}
/**

View File

@@ -198,6 +198,7 @@ class PeerState {
/** how many dup packets were received within the last RETRANSMISSION_PERIOD_WIDTH packets */
private int _packetsReceivedDuplicate;
private int _packetsReceived;
private boolean _mayDisconnect;
/** list of InboundMessageState for active message */
private final Map<Long, InboundMessageState> _inboundMessages;
@@ -447,6 +448,7 @@ class PeerState {
* @return false always
* @deprecated unused, ECNs are never sent, always returns false
*/
@Deprecated
public boolean getCurrentSecondECNReceived() { return _currentSecondECNReceived; }
/**
@@ -542,6 +544,7 @@ class PeerState {
* connection, or null if we are not in the process of rekeying.
* @deprecated unused
*/
@Deprecated
public void setNextMACKey(SessionKey key) { _nextMACKey = key; }
/**
@@ -550,6 +553,7 @@ class PeerState {
* of rekeying.
* @deprecated unused
*/
@Deprecated
public void setNextCipherKey(SessionKey key) { _nextCipherKey = key; }
/**
@@ -569,6 +573,7 @@ class PeerState {
* when were the current cipher and MAC keys established/rekeyed?
* @deprecated unused
*/
@Deprecated
public void setKeyEstablishedTime(long when) { _keyEstablishedTime = when; }
/**
@@ -771,14 +776,23 @@ class PeerState {
public long getIntroducerTime() { return _lastIntroducerTime; }
public void setIntroducerTime() { _lastIntroducerTime = _context.clock().now(); }
/** we received the message specified completely */
/**
* We received the message specified completely.
* @param bytes if less than or equal to zero, message is a duplicate.
*/
public void messageFullyReceived(Long messageId, int bytes) { messageFullyReceived(messageId, bytes, false); }
public synchronized void messageFullyReceived(Long messageId, int bytes, boolean isForACK) {
/**
* We received the message specified completely.
* @param isForACK unused
* @param bytes if less than or equal to zero, message is a duplicate.
*/
private synchronized void messageFullyReceived(Long messageId, int bytes, boolean isForACK) {
if (bytes > 0) {
_receiveBytes += bytes;
//if (isForACK)
// _receiveACKBytes += bytes;
_messagesReceived++;
} else {
//if (true || _retransmissionPeriodStart + 1000 < _context.clock().now()) {
_packetsReceivedDuplicate++;
@@ -803,7 +817,6 @@ class PeerState {
if (_wantACKSendSince <= 0)
_wantACKSendSince = now;
_currentACKs.add(messageId);
_messagesReceived++;
}
public void messagePartiallyReceived() {
@@ -958,6 +971,7 @@ class PeerState {
* @return non-null, possibly empty
* @deprecated unused
*/
@Deprecated
public List<ACKBitfield> retrieveACKBitfields() { return retrieveACKBitfields(true); }
/**
@@ -1027,10 +1041,6 @@ class PeerState {
}
}
int partialIncluded = 0;
if (bytesRemaining > 4) {
// ok, there's room to *try* to fit in some partial ACKs, so
@@ -1274,8 +1284,23 @@ class PeerState {
/** how skewed are the measured RTTs? */
public synchronized int getRTTDeviation() { return _rttDeviation; }
public synchronized int getMessagesSent() { return _messagesSent; }
/**
* I2NP messages sent.
* Does not include duplicates.
* As of 0.9.24, incremented when bandwidth is allocated just before sending, not when acked.
*/
public int getMessagesSent() {
synchronized (_outboundMessages) {
return _messagesSent;
}
}
/**
* I2NP messages received.
* As of 0.9.24, does not include duplicates.
*/
public synchronized int getMessagesReceived() { return _messagesReceived; }
public synchronized int getPacketsTransmitted() { return _packetsTransmitted; }
public synchronized int getPacketsRetransmitted() { return _packetsRetransmitted; }
//public long getPacketsPeriodTransmitted() { return _packetsPeriodTransmitted; }
@@ -1339,6 +1364,7 @@ class PeerState {
public long getLastACKSend() { return _lastACKSend; }
/** @deprecated unused */
@Deprecated
public void setLastACKSend(long when) { _lastACKSend = when; }
public long getWantedACKSendSince() { return _wantACKSendSince; }
@@ -1498,6 +1524,18 @@ class PeerState {
if (_dead) return 0;
return _outboundMessages.size() + _outboundQueue.size();
}
/**
* Sets to true.
* @since 0.9.24
*/
public void setMayDisconnect() { _mayDisconnect = true; }
/**
* @since 0.9.24
*/
public boolean getMayDisconnect() { return _mayDisconnect; }
/**
* Expire / complete any outbound messages
@@ -1771,7 +1809,8 @@ class PeerState {
if (state.getPushCount() > 0)
_retransmitter = state;
state.push();
if (state.push())
_messagesSent++;
int rto = getRTO();
state.setNextSendTime(now + rto);
@@ -2062,8 +2101,10 @@ class PeerState {
buf.append(" cwin: ").append(_sendWindowBytes);
buf.append(" acwin: ").append(_sendWindowBytesRemaining);
buf.append(" consecFail: ").append(_consecutiveFailedSends);
buf.append(" recv OK/Dup: ").append(_packetsReceived).append('/').append(_packetsReceivedDuplicate);
buf.append(" send OK/Dup: ").append(_packetsTransmitted).append('/').append(_packetsRetransmitted);
buf.append(" msgs rcvd: ").append(_messagesReceived);
buf.append(" msgs sent: ").append(_messagesSent);
buf.append(" pkts rcvd OK/Dup: ").append(_packetsReceived).append('/').append(_packetsReceivedDuplicate);
buf.append(" pkts sent OK/Dup: ").append(_packetsTransmitted).append('/').append(_packetsRetransmitted);
buf.append(" IBM: ").append(_inboundMessages.size());
buf.append(" OBQ: ").append(_outboundQueue.size());
buf.append(" OBL: ").append(_outboundMessages.size());

View File

@@ -2432,6 +2432,21 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
return peer != null && peer.isBacklogged();
}
/**
* Tell the transport that we may disconnect from this peer.
* This is advisory only.
*
* @since 0.9.24
*/
@Override
public void mayDisconnect(final Hash peer) {
final PeerState ps = _peersByIdent.get(peer);
if (ps != null && ps.isInbound() &&
ps.getMessagesReceived() <= 2 && ps.getMessagesSent() <= 1) {
ps.setMayDisconnect();
}
}
public boolean allowConnection() {
return _peersByIdent.size() < getMaxConnections();
}
@@ -2678,8 +2693,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
//buf.append(peer.getMTUDecreases());
buf.append("</td>");
long sent = peer.getPacketsTransmitted();
long recv = peer.getPacketsReceived();
long sent = peer.getMessagesSent();
long recv = peer.getMessagesReceived();
buf.append("<td class=\"cells\" align=\"right\">");
buf.append(sent);
@@ -2820,6 +2835,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private static final long LONG_LOOP_TIME = 25*1000;
private static final long EXPIRE_INCREMENT = 15*1000;
private static final long EXPIRE_DECREMENT = 45*1000;
private static final long MAY_DISCON_TIMEOUT = 10*1000;
public ExpirePeerEvent() {
super(_context.simpleTimer2());
@@ -2829,7 +2845,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
public void timeReached() {
// Increase allowed idle time if we are well under allowed connections, otherwise decrease
if (haveCapacity(33)) {
boolean haveCap = haveCapacity(33);
if (haveCap) {
long inc;
// don't adjust too quickly if we are looping fast
if (_lastLoopShort)
@@ -2848,6 +2865,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
long now = _context.clock().now();
long shortInactivityCutoff = now - _expireTimeout;
long longInactivityCutoff = now - EXPIRE_TIMEOUT;
final long mayDisconCutoff = now - MAY_DISCON_TIMEOUT;
long pingCutoff = now - (2 * 60*60*1000);
long pingFirewallCutoff = now - PING_FIREWALL_CUTOFF;
boolean shouldPingFirewall = _reachabilityStatus != Status.OK;
@@ -2862,10 +2880,14 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
PeerState peer = iter.next();
long inactivityCutoff;
// if we offered to introduce them, or we used them as introducer in last 2 hours
if (peer.getWeRelayToThemAs() > 0 || peer.getIntroducerTime() > pingCutoff)
if (peer.getWeRelayToThemAs() > 0 || peer.getIntroducerTime() > pingCutoff) {
inactivityCutoff = longInactivityCutoff;
else
} else if (!haveCap && peer.getMayDisconnect() &&
peer.getMessagesReceived() <= 2 && peer.getMessagesSent() <= 1) {
inactivityCutoff = mayDisconCutoff;
} else {
inactivityCutoff = shortInactivityCutoff;
}
if ( (peer.getLastReceiveTime() < inactivityCutoff) && (peer.getLastSendTime() < inactivityCutoff) ) {
_expireBuffer.add(peer);
iter.remove();

View File

@@ -638,6 +638,8 @@ class BuildHandler implements Runnable {
if (isInGW && isOutEnd) {
_context.statManager().addRateData("tunnel.rejectHostile", 1);
_log.error("Dropping build request, IBGW+OBEP");
if (from != null)
_context.commSystem().mayDisconnect(from);
return;
}
@@ -649,6 +651,8 @@ class BuildHandler implements Runnable {
// old i2pd
if (_log.shouldWarn())
_log.warn("Dropping build request, we are the next hop");
if (from != null)
_context.commSystem().mayDisconnect(from);
return;
}
if (!isInGW) {
@@ -669,6 +673,7 @@ class BuildHandler implements Runnable {
_context.statManager().addRateData("tunnel.rejectHostile", 1);
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping build request with the same previous and next hop");
_context.commSystem().mayDisconnect(from);
return;
}
}
@@ -683,12 +688,16 @@ class BuildHandler implements Runnable {
_context.statManager().addRateData("tunnel.rejectTooOld", 1);
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping build request too old... replay attack? " + DataHelper.formatDuration(timeDiff));
if (from != null)
_context.commSystem().mayDisconnect(from);
return;
}
if (timeDiff < 0 - MAX_REQUEST_FUTURE) {
_context.statManager().addRateData("tunnel.rejectFuture", 1);
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping build request too far in future " + DataHelper.formatDuration(0 - timeDiff));
if (from != null)
_context.commSystem().mayDisconnect(from);
return;
}
@@ -844,6 +853,8 @@ class BuildHandler implements Runnable {
state.msg.getUniqueId() + "/" + ourId + "/" + req.readNextTunnelId() + " delay " +
recvDelay + " as " +
(isOutEnd ? "outbound endpoint" : isInGW ? "inbound gw" : "participant"));
if (from != null)
_context.commSystem().mayDisconnect(from);
// Connection congestion control:
// If we rejected the request, are near our conn limits, and aren't connected to the next hop,
// just drop it.
@@ -856,6 +867,9 @@ class BuildHandler implements Runnable {
_log.warn("Not sending rejection due to conn limits");
return;
}
} else if (isInGW && from != null) {
// we're the start of the tunnel, no use staying connected
_context.commSystem().mayDisconnect(from);
}
EncryptedBuildRecord reply = BuildResponseRecord.create(_context, response, req.readReplyKey(), req.readReplyIV(), state.msg.getUniqueId());