- Add check for replayed session requests (ticket #1212)
   - Disable inbound check connection
   - Reduce object churn in EstablishmentManager
   - Don't pollute Hash cache in EstablishmentManager
   - addRateData() cleanup
This commit is contained in:
zzz
2014-02-24 13:54:52 +00:00
parent 5d6a1c5e35
commit 9d7a9c9895
2 changed files with 108 additions and 50 deletions

View File

@@ -17,6 +17,7 @@ import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.crypto.DHSessionKeyBuilder;
import net.i2p.util.Log;
import net.i2p.util.SimpleByteCache;
/**
* Handle the 4-phase establishment, which is as follows:
@@ -135,8 +136,8 @@ class EstablishState {
} else {
_X = _dh.getMyPublicValueBytes();
_Y = new byte[256];
byte hx[] = ctx.sha().calculateHash(_X).getData();
DataHelper.xor(hx, 0, con.getRemotePeer().calculateHash().getData(), 0, _hX_xor_bobIdentHash, 0, hx.length);
ctx.sha().calculateHash(_X, 0, _X.length, _hX_xor_bobIdentHash, 0);
xor32(con.getRemotePeer().calculateHash().getData(), _hX_xor_bobIdentHash);
}
_prevEncrypted = new byte[16];
@@ -189,13 +190,13 @@ class EstablishState {
byte c = src.get();
_X[_received++] = c;
//if (_log.shouldLog(Log.DEBUG)) _log.debug("recv x" + (int)c + " received=" + _received);
if (_received >= _X.length) {
if (isCheckInfo(_context, _context.routerHash(), _X)) {
_context.statManager().addRateData("ntcp.inboundCheckConnection", 1, 0);
fail("Incoming connection was a check connection");
return;
}
}
//if (_received >= _X.length) {
// if (isCheckInfo(_context, _context.routerHash(), _X)) {
// _context.statManager().addRateData("ntcp.inboundCheckConnection", 1);
// fail("Incoming connection was a check connection");
// return;
// }
//}
}
while (_received < _X.length + _hX_xor_bobIdentHash.length && src.hasRemaining()) {
int i = _received-_X.length;
@@ -212,19 +213,29 @@ class EstablishState {
// first verify that Alice knows who she is trying to talk with and that the X
// isn't corrupt
Hash hX = _context.sha().calculateHash(_X);
byte realXor[] = DataHelper.xor(hX.getData(), _context.routerHash().getData());
if (_log.shouldLog(Log.DEBUG)) {
byte[] realXor = SimpleByteCache.acquire(Hash.HASH_LENGTH);
_context.sha().calculateHash(_X, 0, _X.length, realXor, 0);
xor32(_context.routerHash().getData(), realXor);
//if (_log.shouldLog(Log.DEBUG)) {
//_log.debug(prefix()+"_X = " + Base64.encode(_X));
_log.debug(prefix()+"hx = " + Base64.encode(hX.getData()));
_log.debug(prefix()+"bih=" + Base64.encode(_context.routerHash().getData()));
_log.debug(prefix()+"xor=" + Base64.encode(realXor));
}
// _log.debug(prefix()+"hx = " + Base64.encode(hX.getData()));
// _log.debug(prefix()+"bih=" + Base64.encode(_context.routerHash().getData()));
// _log.debug(prefix()+"xor=" + Base64.encode(realXor));
//}
if (!DataHelper.eq(realXor, _hX_xor_bobIdentHash)) {
_context.statManager().addRateData("ntcp.invalidHXxorBIH", 1, 0);
SimpleByteCache.release(realXor);
_context.statManager().addRateData("ntcp.invalidHXxorBIH", 1);
fail("Invalid hX_xor");
return;
}
SimpleByteCache.release(realXor);
if (!_transport.isHXHIValid(_hX_xor_bobIdentHash)) {
// blocklist source? but spoofed IPs could DoS us
_context.statManager().addRateData("ntcp.replayHXxorBIH", 1);
fail("Replay hX_xor");
return;
}
try {
// ok, they're actually trying to talk to us, and we got their (unauthenticated) X
_dh.setPeerPublicValue(_X);
@@ -238,23 +249,25 @@ class EstablishState {
byte xy[] = new byte[_X.length+_Y.length];
System.arraycopy(_X, 0, xy, 0, _X.length);
System.arraycopy(_Y, 0, xy, _X.length, _Y.length);
Hash hxy = _context.sha().calculateHash(xy);
byte[] hxy = SimpleByteCache.acquire(Hash.HASH_LENGTH);
_context.sha().calculateHash(xy, 0, xy.length, hxy, 0);
_tsB = (_context.clock().now() + 500) / 1000l; // our (Bob's) timestamp in seconds
byte toEncrypt[] = new byte[hxy.getData().length + (4 + 12)];
System.arraycopy(hxy.getData(), 0, toEncrypt, 0, hxy.getData().length);
byte toEncrypt[] = new byte[hxy.length + (4 + 12)]; // 48
System.arraycopy(hxy, 0, toEncrypt, 0, hxy.length);
byte tsB[] = DataHelper.toLong(4, _tsB);
System.arraycopy(tsB, 0, toEncrypt, hxy.getData().length, tsB.length);
System.arraycopy(tsB, 0, toEncrypt, hxy.length, tsB.length);
//DataHelper.toLong(toEncrypt, hxy.getData().length, 4, _tsB);
_context.random().nextBytes(toEncrypt, hxy.getData().length + 4, 12);
_context.random().nextBytes(toEncrypt, hxy.length + 4, 12);
if (_log.shouldLog(Log.DEBUG)) {
//_log.debug(prefix()+"Y="+Base64.encode(_Y));
//_log.debug(prefix()+"x+y="+Base64.encode(xy));
_log.debug(prefix()+"h(x+y)="+Base64.encode(hxy.getData()));
_log.debug(prefix()+"h(x+y)="+Base64.encode(hxy));
_log.debug(prefix()+"tsb="+Base64.encode(tsB));
_log.debug(prefix()+"unencrypted H(X+Y)+tsB+padding: " + Base64.encode(toEncrypt));
_log.debug(prefix()+"encryption iv= " + Base64.encode(_Y, _Y.length-16, 16));
_log.debug(prefix()+"encryption key= " + _dh.getSessionKey().toBase64());
}
SimpleByteCache.release(hxy);
_e_hXY_tsB = new byte[toEncrypt.length];
_context.aes().encrypt(toEncrypt, 0, _e_hXY_tsB, 0, _dh.getSessionKey(), _Y, _Y.length-16, toEncrypt.length);
if (_log.shouldLog(Log.DEBUG))
@@ -267,7 +280,7 @@ class EstablishState {
_transport.getPumper().wantsWrite(_con, write);
if (!src.hasRemaining()) return;
} catch (DHSessionKeyBuilder.InvalidPublicParameterException e) {
_context.statManager().addRateData("ntcp.invalidDH", 1, 0);
_context.statManager().addRateData("ntcp.invalidDH", 1);
fail("Invalid X", e);
return;
}
@@ -363,7 +376,7 @@ class EstablishState {
_log.debug(prefix()+"DH session key calculated (" + _dh.getSessionKey().toBase64() + ")");
_e_hXY_tsB = new byte[Hash.HASH_LENGTH+4+12];
} catch (DHSessionKeyBuilder.InvalidPublicParameterException e) {
_context.statManager().addRateData("ntcp.invalidDH", 1, 0);
_context.statManager().addRateData("ntcp.invalidDH", 1);
fail("Invalid X", e);
return;
}
@@ -385,13 +398,17 @@ class EstablishState {
byte XY[] = new byte[_X.length + _Y.length];
System.arraycopy(_X, 0, XY, 0, _X.length);
System.arraycopy(_Y, 0, XY, _X.length, _Y.length);
Hash h = _context.sha().calculateHash(XY);
if (_log.shouldLog(Log.DEBUG)) _log.debug(prefix() + "h(XY)=" + h.toBase64());
if (!DataHelper.eq(h.getData(), 0, hXY_tsB, 0, Hash.HASH_LENGTH)) {
_context.statManager().addRateData("ntcp.invalidHXY", 1, 0);
byte[] h = SimpleByteCache.acquire(Hash.HASH_LENGTH);
_context.sha().calculateHash(XY, 0, XY.length, h, 0);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug(prefix() + "h(XY)=" + h.toBase64());
if (!DataHelper.eq(h, 0, hXY_tsB, 0, Hash.HASH_LENGTH)) {
SimpleByteCache.release(h);
_context.statManager().addRateData("ntcp.invalidHXY", 1);
fail("Invalid H(X+Y) - mitm attack attempted?");
return;
}
SimpleByteCache.release(h);
_tsB = DataHelper.fromLong(hXY_tsB, Hash.HASH_LENGTH, 4); // their (Bob's) timestamp in seconds
_tsA = (_context.clock().now() + 500) / 1000; // our (Alice's) timestamp in seconds
if (_log.shouldLog(Log.DEBUG))
@@ -407,7 +424,7 @@ class EstablishState {
if (diff != 0)
_log.logAlways(Log.WARN, "NTP failure, NTCP adjusting clock by " + DataHelper.formatDuration(diff));
} else if (diff >= Router.CLOCK_FUDGE_FACTOR) {
_context.statManager().addRateData("ntcp.invalidOutboundSkew", diff, 0);
_context.statManager().addRateData("ntcp.invalidOutboundSkew", diff);
_transport.markReachable(_con.getRemotePeer().calculateHash(), false);
// Only banlist if we know what time it is
_context.banlist().banlistRouter(DataHelper.formatDuration(diff),
@@ -457,10 +474,10 @@ class EstablishState {
_prevEncrypted = new byte[preEncrypt.length];
_context.aes().encrypt(preEncrypt, 0, _prevEncrypted, 0, _dh.getSessionKey(), _hX_xor_bobIdentHash, _hX_xor_bobIdentHash.length-16, preEncrypt.length);
if (_log.shouldLog(Log.DEBUG)) {
//if (_log.shouldLog(Log.DEBUG)) {
//_log.debug(prefix() + "unencrypted response to Bob: " + Base64.encode(preEncrypt));
//_log.debug(prefix() + "encrypted response to Bob: " + Base64.encode(_prevEncrypted));
}
//}
// send 'er off (when the bw limiter says, etc)
_transport.getPumper().wantsWrite(_con, _prevEncrypted);
}
@@ -504,7 +521,7 @@ class EstablishState {
_verified = _context.dsa().verifySignature(sig, toVerify, _con.getRemotePeer().getSigningPublicKey());
if (!_verified) {
_context.statManager().addRateData("ntcp.invalidSignature", 1, 0);
_context.statManager().addRateData("ntcp.invalidSignature", 1);
fail("Signature was invalid - attempt to spoof " + _con.getRemotePeer().calculateHash().toBase64() + "?");
} else {
if (_log.shouldLog(Log.DEBUG))
@@ -569,7 +586,7 @@ class EstablishState {
RouterIdentity alice = new RouterIdentity();
int sz = (int)DataHelper.fromLong(b, 0, 2); // TO-DO: Hey zzz... Throws an NPE for me... see below, for my "quick fix", need to find out the real reason
if ( (sz <= 0) || (sz > b.length-2-4-Signature.SIGNATURE_BYTES) ) {
_context.statManager().addRateData("ntcp.invalidInboundSize", sz, 0);
_context.statManager().addRateData("ntcp.invalidInboundSize", sz);
fail("size is invalid", new Exception("size is " + sz));
return;
}
@@ -597,21 +614,21 @@ class EstablishState {
Signature sig = new Signature(s);
_verified = _context.dsa().verifySignature(sig, toVerify, alice.getSigningPublicKey());
if (_verified) {
// get inet-addr
InetAddress addr = this._con.getChannel().socket().getInetAddress();
// get inet-addr
InetAddress addr = this._con.getChannel().socket().getInetAddress();
byte[] ip = (addr == null) ? null : addr.getAddress();
if (_context.banlist().isBanlistedForever(alice.calculateHash())) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping inbound connection from permanently banlisted peer: " + alice.calculateHash().toBase64());
// So next time we will not accept the con from this IP,
// rather than doing the whole handshake
if(ip != null)
_context.blocklist().add(ip);
if(ip != null)
_context.blocklist().add(ip);
fail("Peer is banlisted forever: " + alice.calculateHash().toBase64());
return;
}
if(ip != null)
_transport.setIP(alice.calculateHash(), ip);
if(ip != null)
_transport.setIP(alice.calculateHash(), ip);
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix() + "verification successful for " + _con);
@@ -624,7 +641,7 @@ class EstablishState {
if (diff != 0)
_log.logAlways(Log.WARN, "NTP failure, NTCP adjusting clock by " + DataHelper.formatDuration(diff));
} else if (diff >= Router.CLOCK_FUDGE_FACTOR) {
_context.statManager().addRateData("ntcp.invalidInboundSkew", diff, 0);
_context.statManager().addRateData("ntcp.invalidInboundSkew", diff);
_transport.markReachable(alice.calculateHash(), true);
// Only banlist if we know what time it is
_context.banlist().banlistRouter(DataHelper.formatDuration(diff),
@@ -647,14 +664,14 @@ class EstablishState {
if (_log.shouldLog(Log.INFO))
_log.info(prefix()+"Verified remote peer as " + alice.calculateHash().toBase64());
} else {
_context.statManager().addRateData("ntcp.invalidInboundSignature", 1, 0);
_context.statManager().addRateData("ntcp.invalidInboundSignature", 1);
fail("Peer verification failed - spoof of " + alice.calculateHash().toBase64() + "?");
}
} catch (IOException ioe) {
_context.statManager().addRateData("ntcp.invalidInboundIOE", 1, 0);
_context.statManager().addRateData("ntcp.invalidInboundIOE", 1);
fail("Error verifying peer", ioe);
} catch (DataFormatException dfe) {
_context.statManager().addRateData("ntcp.invalidInboundDFE", 1, 0);
_context.statManager().addRateData("ntcp.invalidInboundDFE", 1);
fail("Error verifying peer", dfe);
} catch(NullPointerException npe) {
fail("Error verifying peer", npe); // TO-DO: zzz This is that quick-fix. -- Sponge
@@ -722,8 +739,21 @@ class EstablishState {
public String getError() { return _err; }
public Exception getException() { return _e; }
/**
* XOR a into b. Modifies b. a is unmodified.
* @param a 32 bytes
* @param b 32 bytes
* @since 0.9.12
*/
private static void xor32(byte[] a, byte[] b) {
for (int i = 0; i < 32; i++) {
b[i] ^= a[i];
}
}
private String prefix() { return toString(); }
@Override
public String toString() {
StringBuilder buf = new StringBuilder(64);
@@ -750,14 +780,18 @@ class EstablishState {
* @return should always be false since nobody ever sends a check info message
*
*/
/*****
private static boolean isCheckInfo(I2PAppContext ctx, Hash us, byte first256[]) {
Log log = ctx.logManager().getLog(EstablishState.class);
int off = 32; // ignore the first 32 bytes
Hash h = ctx.sha().calculateHash(first256, off, first256.length-32-off);
byte xor[] = DataHelper.xor(h.getData(), us.getData());
if (log.shouldLog(Log.DEBUG))
log.debug("check hash: " + h.toBase64() + " xor: " + Base64.encode(xor));
byte[] xor = SimpleByteCache.acquire(Hash.HASH_LENGTH);
ctx.sha().calculateHash(first256, off, first256.length-32-off, xor, 0);
xor32(us.getData(), xor);
//if (log.shouldLog(Log.DEBUG))
// log.debug("check hash: " + h.toBase64() + " xor: " + Base64.encode(xor));
if (DataHelper.eq(xor, 0, first256, first256.length-32, 32)) {
SimpleByteCache.release(xor);
// ok, data is as expected
// parse our IP/port/etc out of the first256
int ipSize = (int)DataHelper.fromLong(first256, off, 1);
@@ -782,12 +816,17 @@ class EstablishState {
}
return true;
} else {
SimpleByteCache.release(xor);
if (log.shouldLog(Log.DEBUG))
log.debug("Not a checkInfo connection");
return false;
}
}
*****/
/**
* @since 0.9.8
*/
private static class VerifiedEstablishState extends EstablishState {
@Override public boolean isComplete() { return true; }
@Override public void prepareOutbound() {

View File

@@ -37,6 +37,8 @@ import net.i2p.router.transport.TransportImpl;
import net.i2p.router.transport.TransportUtil;
import static net.i2p.router.transport.TransportUtil.IPv6Config.*;
import net.i2p.router.transport.crypto.DHSessionKeyBuilder;
import net.i2p.router.util.DecayingHashSet;
import net.i2p.router.util.DecayingBloomFilter;
import net.i2p.util.Addresses;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.Log;
@@ -69,6 +71,8 @@ public class NTCPTransport extends TransportImpl {
* want to remove on establishment or close on timeout
*/
private final Set<NTCPConnection> _establishing;
/** "bloom filter" */
private final DecayingBloomFilter _replayFilter;
/**
* Do we have a public IPv6 address?
@@ -135,7 +139,7 @@ public class NTCPTransport extends TransportImpl {
_context.statManager().createRateStat("ntcp.corruptSkew", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.corruptTooLargeI2NP", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.dontSendOnBacklog", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.inboundCheckConnection", "", "ntcp", RATES);
//_context.statManager().createRateStat("ntcp.inboundCheckConnection", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.inboundEstablished", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.inboundEstablishedDuplicate", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.infoMessageEnqueued", "", "ntcp", RATES);
@@ -162,6 +166,7 @@ public class NTCPTransport extends TransportImpl {
_context.statManager().createRateStat("ntcp.receiveCorruptEstablishment", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.receiveMeta", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.registerConnect", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.replayHXxorBIH", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.throttledReadComplete", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.throttledWriteComplete", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.wantsQueuedWrite", "", "ntcp", RATES);
@@ -171,6 +176,7 @@ public class NTCPTransport extends TransportImpl {
_establishing = new ConcurrentHashSet<NTCPConnection>(16);
_conLock = new Object();
_conByIdent = new ConcurrentHashMap<Hash, NTCPConnection>(64);
_replayFilter = new DecayingHashSet(ctx, 10*60*1000, 32, "NTCP-Hx^HI");
_finisher = new NTCPSendFinisher(ctx, this);
@@ -486,6 +492,19 @@ public class NTCPTransport extends TransportImpl {
return skews;
}
/**
* Incoming connection replay detection.
* As there is no timestamp in the first message, we can't detect
* something long-delayed. To be fixed in next version of NTCP.
*
* @param hxhi 32 bytes
* @return valid
* @since 0.9.12
*/
boolean isHXHIValid(byte[] hxhi) {
return !_replayFilter.add(hxhi);
}
private static final int MIN_CONCURRENT_READERS = 2; // unless < 32MB
private static final int MIN_CONCURRENT_WRITERS = 2; // unless < 32MB
private static final int MAX_CONCURRENT_READERS = 4;