Streaming: Handle reset packets without a FROM field, validate

signature using connection's destination
Log tweaks
Remove 0 arg from addRateData() calls
This commit is contained in:
zzz
2015-04-14 14:11:48 +00:00
parent cd6d9cdd94
commit de6608f6b8
7 changed files with 32 additions and 21 deletions

View File

@@ -307,6 +307,7 @@ class Connection {
reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
reply.setSendStreamId(_sendStreamId);
reply.setReceiveStreamId(_receiveStreamId);
// TODO remove this someday, as of 0.9.20 we do not require it
reply.setOptionalFrom(_connectionManager.getSession().getMyDestination());
reply.setLocalPort(_localPort);
reply.setRemotePort(_remotePort);
@@ -1359,8 +1360,8 @@ class Connection {
//getOptions().setRTT(getOptions().getRTT() + 10*1000);
getOptions().setWindowSize(newWindowSize);
if (_log.shouldLog(Log.WARN))
_log.warn("Congestion, resending packet " + _packet.getSequenceNum() + " (new windowSize " + newWindowSize
if (_log.shouldLog(Log.INFO))
_log.info("Congestion, resending packet " + _packet.getSequenceNum() + " (new windowSize " + newWindowSize
+ "/" + getOptions().getWindowSize() + ") for " + Connection.this.toString());
windowAdjusted();

View File

@@ -249,6 +249,7 @@ class ConnectionHandler {
reply.setAckThrough(packet.getSequenceNum());
reply.setSendStreamId(packet.getReceiveStreamId());
reply.setReceiveStreamId(0);
// TODO remove this someday, as of 0.9.20 we do not require it
reply.setOptionalFrom(_manager.getSession().getMyDestination());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending RST: " + reply + " because of " + packet);

View File

@@ -296,7 +296,7 @@ class ConnectionManager {
return null;
}
_context.statManager().addRateData("stream.connectionReceived", 1, 0);
_context.statManager().addRateData("stream.connectionReceived", 1);
return con;
}
@@ -454,7 +454,7 @@ class ConnectionManager {
break;
}
_context.statManager().addRateData("stream.connectionCreated", 1, 0);
_context.statManager().addRateData("stream.connectionCreated", 1);
return con;
}
@@ -542,7 +542,7 @@ class ConnectionManager {
if (_dayThrottler != null && _dayThrottler.shouldThrottle(h)) {
_context.statManager().addRateData("stream.con.throttledDay", 1, 0);
_context.statManager().addRateData("stream.con.throttledDay", 1);
if (_defaultOptions.getMaxConnsPerDay() <= 0)
return "throttled by" +
" total limit of " + _defaultOptions.getMaxTotalConnsPerDay() +
@@ -556,7 +556,7 @@ class ConnectionManager {
" per day";
}
if (_hourThrottler != null && _hourThrottler.shouldThrottle(h)) {
_context.statManager().addRateData("stream.con.throttledHour", 1, 0);
_context.statManager().addRateData("stream.con.throttledHour", 1);
if (_defaultOptions.getMaxConnsPerHour() <= 0)
return "throttled by" +
" total limit of " + _defaultOptions.getMaxTotalConnsPerHour() +
@@ -570,7 +570,7 @@ class ConnectionManager {
" per hour";
}
if (_minuteThrottler != null && _minuteThrottler.shouldThrottle(h)) {
_context.statManager().addRateData("stream.con.throttledMinute", 1, 0);
_context.statManager().addRateData("stream.con.throttledMinute", 1);
if (_defaultOptions.getMaxConnsPerMinute() <= 0)
return "throttled by" +
" total limit of " + _defaultOptions.getMaxTotalConnsPerMinute() +

View File

@@ -123,7 +123,7 @@ class ConnectionPacketHandler {
}
con.getOptions().setChoke(0);
_context.statManager().addRateData("stream.con.receiveMessageSize", packet.getPayloadSize(), 0);
_context.statManager().addRateData("stream.con.receiveMessageSize", packet.getPayloadSize());
boolean allowAck = true;
final boolean isSYN = packet.isFlagSet(Packet.FLAG_SYNCHRONIZE);
@@ -190,7 +190,7 @@ class ConnectionPacketHandler {
}
} else {
if ( (seqNum > 0) || (packet.getPayloadSize() > 0) || isSYN) {
_context.statManager().addRateData("stream.con.receiveDuplicateSize", packet.getPayloadSize(), 0);
_context.statManager().addRateData("stream.con.receiveDuplicateSize", packet.getPayloadSize());
con.incrementDupMessagesReceived(1);
// take note of congestion
@@ -199,8 +199,8 @@ class ConnectionPacketHandler {
final int ackDelay = con.getOptions().getSendAckDelay();
final long lastSendTime = con.getLastSendTime();
if (_log.shouldLog(Log.WARN))
_log.warn(String.format("%s congestion.. dup packet %s ackDelay %d lastSend %s ago",
if (_log.shouldLog(Log.INFO))
_log.info(String.format("%s congestion.. dup packet %s ackDelay %d lastSend %s ago",
con, packet, ackDelay, DataHelper.formatDuration(now - lastSendTime)));
final long nextSendTime = lastSendTime + ackDelay;
@@ -344,9 +344,9 @@ class ConnectionPacketHandler {
}
if (firstAck) {
if (con.isInbound())
_context.statManager().addRateData("stream.con.initialRTT.in", highestRTT, 0);
_context.statManager().addRateData("stream.con.initialRTT.in", highestRTT);
else
_context.statManager().addRateData("stream.con.initialRTT.out", highestRTT, 0);
_context.statManager().addRateData("stream.con.initialRTT.out", highestRTT);
}
}
_context.statManager().addRateData("stream.con.packetsAckedPerMessageReceived", acked.size(), highestRTT);
@@ -513,10 +513,17 @@ class ConnectionPacketHandler {
/**
* Make sure this RST packet is valid, and if it is, act on it.
*
* Prior to 0.9.20, the reset packet must contain a FROM field,
* and we used that for verification.
* As of 0.9.20, we correctly use the connection's remote peer.
*/
private void verifyReset(Packet packet, Connection con) {
if (con.getReceiveStreamId() == packet.getSendStreamId()) {
boolean ok = packet.verifySignature(_context, packet.getOptionalFrom(), null);
Destination from = con.getRemotePeer();
if (from == null)
from = packet.getOptionalFrom();
boolean ok = packet.verifySignature(_context, from, null);
if (!ok) {
if (_log.shouldLog(Log.ERROR))
_log.error("Received unsigned / forged RST on " + con);

View File

@@ -54,7 +54,7 @@ class MessageHandler implements I2PSessionMuxedListener {
try {
data = session.receiveMessage(msgId);
} catch (I2PSessionException ise) {
_context.statManager().addRateData("stream.packetReceiveFailure", 1, 0);
_context.statManager().addRateData("stream.packetReceiveFailure", 1);
if (_log.shouldLog(Log.WARN))
_log.warn("Error receiving the message", ise);
return;
@@ -67,7 +67,7 @@ class MessageHandler implements I2PSessionMuxedListener {
packet.setLocalPort(toPort);
_manager.getPacketHandler().receivePacket(packet);
} catch (IllegalArgumentException iae) {
_context.statManager().addRateData("stream.packetReceiveFailure", 1, 0);
_context.statManager().addRateData("stream.packetReceiveFailure", 1);
if (_log.shouldLog(Log.WARN))
_log.warn("Received an invalid packet", iae);
}

View File

@@ -246,6 +246,7 @@ class PacketHandler {
reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
reply.setSendStreamId(packet.getReceiveStreamId());
reply.setReceiveStreamId(packet.getSendStreamId());
// TODO remove this someday, as of 0.9.20 we do not require it
reply.setOptionalFrom(_manager.getSession().getMyDestination());
reply.setLocalPort(packet.getLocalPort());
reply.setRemotePort(packet.getRemotePort());
@@ -268,14 +269,15 @@ class PacketHandler {
}
packet.releasePayload();
} else {
if (_log.shouldLog(Log.WARN) && !packet.isFlagSet(Packet.FLAG_SYNCHRONIZE))
_log.warn("Packet received on an unknown stream (and not an ECHO or SYN): " + packet);
// this happens a lot
if (_log.shouldLog(Log.INFO) && !packet.isFlagSet(Packet.FLAG_SYNCHRONIZE))
_log.info("Packet received on an unknown stream (and not an ECHO or SYN): " + packet);
if (sendId <= 0) {
Connection con = _manager.getConnectionByOutboundId(packet.getReceiveStreamId());
if (con != null) {
if ( (con.getHighestAckedThrough() <= 5) && (packet.getSequenceNum() <= 5) ) {
if (_log.shouldLog(Log.WARN))
_log.warn("Received additional packet w/o SendStreamID after the syn on " + con + ": " + packet);
if (_log.shouldLog(Log.INFO))
_log.info("Received additional packet w/o SendStreamID after the syn on " + con + ": " + packet);
receiveKnownCon(con, packet);
return;
} else {