NTCP: Cleanup, remove commented-out code in prep for NTCP2
Increase max RI size
@@ -123,7 +123,7 @@ class EstablishState {
private boolean _failedBySkew;

private static final int MIN_RI_SIZE = 387;
private static final int MAX_RI_SIZE = 2048;
private static final int MAX_RI_SIZE = 3072;

private static final int AES_SIZE = 16;
private static final int XY_SIZE = 256;
@@ -256,14 +256,6 @@ class EstablishState {
while (_state == State.IB_INIT && src.hasRemaining()) {
byte c = src.get();
_X[_received++] = c;
//if (_log.shouldLog(Log.DEBUG)) _log.debug("recv x" + (int)c + " received=" + _received);
//if (_received >= _X.length) {
// if (isCheckInfo(_context, _context.routerHash(), _X)) {
// _context.statManager().addRateData("ntcp.inboundCheckConnection", 1);
// fail("Incoming connection was a check connection");
// return;
// }
//}
if (_received >= XY_SIZE)
changeState(State.IB_GOT_X);
}
@@ -272,7 +264,6 @@ class EstablishState {
_received++;
byte c = src.get();
_hX_xor_bobIdentHash[i] = c;
//if (_log.shouldLog(Log.DEBUG)) _log.debug("recv bih" + (int)c + " received=" + _received);
if (i >= HXY_SIZE - 1)
changeState(State.IB_GOT_HX);
}
@@ -287,12 +278,6 @@ class EstablishState {
byte[] realXor = SimpleByteCache.acquire(HXY_SIZE);
_context.sha().calculateHash(_X, 0, XY_SIZE, realXor, 0);
xor32(_context.routerHash().getData(), realXor);
//if (_log.shouldLog(Log.DEBUG)) {
//_log.debug(prefix()+"_X = " + Base64.encode(_X));
// _log.debug(prefix()+"hx = " + Base64.encode(hX.getData()));
// _log.debug(prefix()+"bih=" + Base64.encode(_context.routerHash().getData()));
// _log.debug(prefix()+"xor=" + Base64.encode(realXor));
//}
if (!DataHelper.eq(realXor, _hX_xor_bobIdentHash)) {
SimpleByteCache.release(realXor);
_context.statManager().addRateData("ntcp.invalidHXxorBIH", 1);
@@ -327,11 +312,8 @@ class EstablishState {
System.arraycopy(hxy, 0, toEncrypt, 0, HXY_SIZE);
byte tsB[] = DataHelper.toLong(4, _tsB);
System.arraycopy(tsB, 0, toEncrypt, HXY_SIZE, tsB.length);
//DataHelper.toLong(toEncrypt, hxy.getData().length, 4, _tsB);
_context.random().nextBytes(toEncrypt, HXY_SIZE + 4, 12);
if (_log.shouldLog(Log.DEBUG)) {
//_log.debug(prefix()+"Y="+Base64.encode(_Y));
//_log.debug(prefix()+"x+y="+Base64.encode(xy));
_log.debug(prefix()+"h(x+y)="+Base64.encode(hxy));
_log.debug(prefix() + "tsb = " + _tsB);
_log.debug(prefix()+"unencrypted H(X+Y)+tsB+padding: " + Base64.encode(toEncrypt));
@@ -364,8 +346,6 @@ class EstablishState {
_state == State.IB_GOT_RI_SIZE ||
_state == State.IB_GOT_RI) && src.hasRemaining()) {

//if (_log.shouldLog(Log.DEBUG))
// _log.debug(prefix()+"Encrypted bytes available (" + src.hasRemaining() + ")");
// Collect a 16-byte block
while (_curEncryptedOffset < AES_SIZE && src.hasRemaining()) {
_curEncrypted[_curEncryptedOffset++] = src.get();
@@ -375,8 +355,6 @@ class EstablishState {
if (_curEncryptedOffset >= AES_SIZE) {
_context.aes().decrypt(_curEncrypted, 0, _curDecrypted, 0, _dh.getSessionKey(),
_prevEncrypted, 0, AES_SIZE);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug(prefix() + "full block read and decrypted: ");

byte swap[] = _prevEncrypted;
_prevEncrypted = _curEncrypted;
@@ -406,8 +384,6 @@ class EstablishState {
} catch (IOException ioe) {
if (_log.shouldLog(Log.ERROR)) _log.error(prefix()+"Error writing to the baos?", ioe);
}
//if (_log.shouldLog(Log.DEBUG))
// _log.debug(prefix()+"subsequent block decrypted (" + _sz_aliceIdent_tsA_padding_aliceSig.size() + ")");

if (_state == State.IB_GOT_RI_SIZE &&
_sz_aliceIdent_tsA_padding_aliceSig.size() >= 2 + _aliceIdentSize) {
@@ -487,7 +463,6 @@ class EstablishState {
while (_state == State.OB_SENT_X && src.hasRemaining()) {
byte c = src.get();
_Y[_received++] = c;
//if (_log.shouldLog(Log.DEBUG)) _log.debug("recv x" + (int)c + " received=" + _received);
if (_received >= XY_SIZE) {
try {
_dh.setPeerPublicValue(_Y);
@@ -510,8 +485,6 @@ class EstablishState {
_received++;
byte c = src.get();
_e_hXY_tsB[i] = c;
//if (_log.shouldLog(Log.DEBUG))
// _log.debug(prefix() + "recv _e_hXY_tsB " + (int)c + " received=" + _received);
if (i+1 >= HXY_TSB_PAD_SIZE) {
if (_log.shouldLog(Log.DEBUG)) _log.debug(prefix() + "received _e_hXY_tsB fully");
byte hXY_tsB[] = new byte[HXY_TSB_PAD_SIZE];
@@ -521,8 +494,6 @@ class EstablishState {
System.arraycopy(_Y, 0, XY, XY_SIZE, XY_SIZE);
byte[] h = SimpleByteCache.acquire(HXY_SIZE);
_context.sha().calculateHash(XY, 0, XY_SIZE + XY_SIZE, h, 0);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug(prefix() + "h(XY)=" + h.toBase64());
if (!DataHelper.eq(h, 0, hXY_tsB, 0, HXY_SIZE)) {
SimpleByteCache.release(h);
_context.statManager().addRateData("ntcp.invalidHXY", 1);
@@ -576,16 +547,8 @@ class EstablishState {
DataHelper.toLong(preSign, XY_SIZE + XY_SIZE + HXY_SIZE, 4, _tsA);
DataHelper.toLong(preSign, XY_SIZE + XY_SIZE + HXY_SIZE + 4, 4, _tsB);
// hXY_tsB has 12 bytes of padding (size=48, tsB=4 + hXY=32)
//System.arraycopy(hXY_tsB, hXY_tsB.length-12, preSign, _X.length+_Y.length+Hash.HASH_LENGTH+4+4, 12);
//byte sigPad[] = new byte[padSig];
//_context.random().nextBytes(sigPad);
//System.arraycopy(sigPad, 0, preSign, _X.length+_Y.length+Hash.HASH_LENGTH+4+4, padSig);
Signature sig = _context.dsa().sign(preSign, _context.keyManager().getSigningPrivateKey());

//if (_log.shouldLog(Log.DEBUG)) {
// _log.debug(prefix()+"signing " + Base64.encode(preSign));
//}

byte ident[] = _context.router().getRouterInfo().getIdentity().toByteArray();
// handle variable signature size
int min = 2 + ident.length + 4 + sig.length();
@@ -605,11 +568,6 @@ class EstablishState {
_context.aes().encrypt(preEncrypt, 0, _prevEncrypted, 0, _dh.getSessionKey(),
_hX_xor_bobIdentHash, _hX_xor_bobIdentHash.length-AES_SIZE, preEncrypt.length);

//if (_log.shouldLog(Log.DEBUG)) {
//_log.debug(prefix() + "unencrypted response to Bob: " + Base64.encode(preEncrypt));
//_log.debug(prefix() + "encrypted response to Bob: " + Base64.encode(_prevEncrypted));
//}
// send 'er off (when the bw limiter says, etc)
changeState(State.OB_SENT_RI);
_transport.getPumper().wantsWrite(_con, _prevEncrypted);
}
@@ -641,14 +599,11 @@ class EstablishState {
src.hasRemaining() + " off=" + off + " recv=" + _received + ")");
}
while (_state == State.OB_SENT_RI && src.hasRemaining()) {
//if (_log.shouldLog(Log.DEBUG)) _log.debug(prefix()+"recv bobSig received=" + _received);
_e_bobSig[off++] = src.get();
_received++;

if (off >= _e_bobSig.length) {
changeState(State.OB_GOT_SIG);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug(prefix() + "received E(S(X+Y+Alice.identHash+tsA+tsB)+padding, sk, prev): " + Base64.encode(_e_bobSig));
byte bobSig[] = new byte[_e_bobSig.length];
_context.aes().decrypt(_e_bobSig, 0, bobSig, 0, _dh.getSessionKey(),
_e_hXY_tsB, HXY_TSB_PAD_SIZE - AES_SIZE, _e_bobSig.length);
@@ -759,8 +714,6 @@ class EstablishState {
*/
private void readAliceRouterIdentity() {
byte b[] = _sz_aliceIdent_tsA_padding_aliceSig.toByteArray();
//if (_log.shouldLog(Log.DEBUG))
// _log.debug(prefix()+"decrypted sz(etc) data: " + Base64.encode(b));

try {
int sz = _aliceIdentSize;
@@ -825,10 +778,6 @@ class EstablishState {
//baos.write(b, 2+sz+4, b.length-2-sz-4-Signature.SIGNATURE_BYTES);

byte toVerify[] = baos.toByteArray();
//if (_log.shouldLog(Log.DEBUG)) {
// _log.debug(prefix()+"checking " + Base64.encode(toVerify, 0, AES_SIZE));
// //_log.debug(prefix()+"check pad " + Base64.encode(b, 2+sz+4, 12));
//}

// handle variable signature size
SigType type = _aliceIdent.getSigningPublicKey().getType();
@@ -1052,63 +1001,6 @@ class EstablishState {
return buf.toString();
}

/**
* a check info connection will receive 256 bytes containing:
* - 32 bytes of uninterpreted, ignored data
* - 1 byte size
* - that many bytes making up the local router's IP address (as reached by the remote side)
* - 2 byte port number that the local router was reached on
* - 4 byte i2p network time as known by the remote side (seconds since the epoch)
* - uninterpreted padding data, up to byte 223
* - xor of the local router's identity hash and the SHA256 of bytes 32 through bytes 223
*
* @return should always be false since nobody ever sends a check info message
*
*/
/*****
private static boolean isCheckInfo(I2PAppContext ctx, Hash us, byte first256[]) {
Log log = ctx.logManager().getLog(EstablishState.class);
int off = 32; // ignore the first 32 bytes

byte[] xor = SimpleByteCache.acquire(Hash.HASH_LENGTH);
ctx.sha().calculateHash(first256, off, first256.length-32-off, xor, 0);
xor32(us.getData(), xor);
//if (log.shouldLog(Log.DEBUG))
// log.debug("check hash: " + h.toBase64() + " xor: " + Base64.encode(xor));
if (DataHelper.eq(xor, 0, first256, first256.length-32, 32)) {
SimpleByteCache.release(xor);
// ok, data is as expected
// parse our IP/port/etc out of the first256
int ipSize = (int)DataHelper.fromLong(first256, off, 1);
off++;
byte ip[] = new byte[ipSize];
System.arraycopy(first256, off, ip, 0, ipSize);
try {
InetAddress ourIP = InetAddress.getByAddress(ip);
off += ipSize;
int port = (int)DataHelper.fromLong(first256, off, 2);
off += 2;
long now = DataHelper.fromLong(first256, off, 4);
off += 4;
long skewSeconds = (ctx.clock().now()/1000)-now;
if (log.shouldLog(Log.INFO))
log.info("Check info received: our IP: " + ourIP + " our port: " + port
+ " skew: " + skewSeconds + " s");
} catch (UnknownHostException uhe) {
// ipSize is invalid
if (log.shouldLog(Log.WARN))
log.warn("Invalid IP received on check connection (size: " + ipSize + ")");
}
return true;
} else {
SimpleByteCache.release(xor);
if (log.shouldLog(Log.DEBUG))
log.debug("Not a checkInfo connection");
return false;
}
}
*****/

/**
* @since 0.9.8
*/
@@ -1145,47 +1037,6 @@ class EstablishState {
@Override public String toString() { return "FailedEstablishState: ";}
}

/** @deprecated unused */
/*********
public static void checkHost(String args[]) {
if (args.length != 3) {
System.err.println("Usage: EstablishState ipOrHostname portNum peerHashBase64");
return;
}
try {
I2PAppContext ctx = I2PAppContext.getGlobalContext();
String host = args[0];
int port = Integer.parseInt(args[1]);
byte peer[] = Base64.decode(args[2]);
Socket s = new Socket(host, port);
OutputStream out = s.getOutputStream();
byte toSend[] = new byte[256];
ctx.random().nextBytes(toSend);
int off = 32;
byte ip[] = s.getInetAddress().getAddress();
DataHelper.toLong(toSend, off, 1, ip.length);
off++;
System.arraycopy(ip, 0, toSend, off, ip.length);
off += ip.length;
DataHelper.toLong(toSend, off, 2, port);
off += 2;
long now = ctx.clock().now()/1000;
DataHelper.toLong(toSend, off, 4, now);
off += 4;
Hash h = ctx.sha().calculateHash(toSend, 32, toSend.length-32-32);
DataHelper.xor(peer, 0, h.getData(), 0, toSend, toSend.length-32, peer.length);
System.out.println("check hash: " + h.toBase64());

out.write(toSend);
out.flush();
try { Thread.sleep(1000); } catch (InterruptedException ie) {}
s.close();
} catch (Exception e) {
e.printStackTrace();
}
}
*******/

/*******
public static void main(String args[]) {
if (args.length == 3) {
@@ -181,12 +181,8 @@ class EventPumper implements Runnable {
runDelayedEvents();

try {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("before select...");
int count = _selector.select(SELECTOR_LOOP_DELAY);
if (count > 0) {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("select returned " + count);
Set<SelectionKey> selected = _selector.selectedKeys();
//_context.statManager().addRateData("ntcp.pumperKeysPerLoop", selected.size());
processKeys(selected);
@@ -397,11 +393,9 @@ class EventPumper implements Runnable {
processConnect(key);
}
if (read) {
//_context.statManager().addRateData("ntcp.read", 1, 0);
processRead(key);
}
if (write) {
//_context.statManager().addRateData("ntcp.write", 1, 0);
processWrite(key);
}
//if (!(accept || connect || read || write)) {
@@ -429,9 +423,6 @@ class EventPumper implements Runnable {
_context.statManager().addRateData("ntcp.wantsQueuedWrite", 1);
con.queuedWrite(buf, req);
} else {
// fully allocated
//if (_log.shouldLog(Log.INFO))
// _log.info("fully allocated write on " + con + " for " + data.length);
con.write(buf);
}
}
@@ -475,12 +466,6 @@ class EventPumper implements Runnable {
else
rv = ByteBuffer.allocate(BUF_SIZE);
_numBufs++;
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("creating a new read buffer " + System.identityHashCode(rv) + " with " + __liveBufs + " live: " + rv);
//_context.statManager().addRateData("ntcp.liveReadBufs", NUM_BUFS, 0);
} else {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("acquiring existing read buffer " + System.identityHashCode(rv) + " with " + __liveBufs + " live: " + rv);
}
return rv;
}
@@ -491,10 +476,6 @@ class EventPumper implements Runnable {
* High-frequency path in thread.
*/
public static void releaseBuf(ByteBuffer buf) {
//if (false) return;
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("releasing read buffer " + System.identityHashCode(buf) + " with " + __liveBufs + " live: " + buf);

// double check
if (buf.capacity() < BUF_SIZE) {
I2PAppContext.getGlobalContext().logManager().getLog(EventPumper.class).error("Bad size " + buf.capacity(), new Exception());
@@ -516,13 +497,9 @@ class EventPumper implements Runnable {
}
}
}
//if (cached && _log.shouldLog(Log.DEBUG))
// _log.debug("read buffer " + System.identityHashCode(buf) + " cached with " + __liveBufs + " live");
}

private void processAccept(SelectionKey key) {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("processing accept");
ServerSocketChannel servChan = (ServerSocketChannel)key.attachment();
try {
SocketChannel chan = servChan.accept();
@@ -542,8 +519,6 @@ class EventPumper implements Runnable {
if (_context.blocklist().isBlocklisted(ip)) {
if (_log.shouldLog(Log.WARN))
_log.warn("Receive session request from blocklisted IP: " + chan.socket().getInetAddress());
// need to add this stat first
// _context.statManager().addRateData("ntcp.connectBlocklisted", 1, 0);
try { chan.close(); } catch (IOException ioe) { }
return;
}
@@ -559,14 +534,11 @@ class EventPumper implements Runnable {
return;
}

// BUGFIX for firewalls. --Sponge
if (shouldSetKeepAlive(chan))
chan.socket().setKeepAlive(true);

SelectionKey ckey = chan.register(_selector, SelectionKey.OP_READ);
new NTCPConnection(_context, _transport, chan, ckey);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("new NTCP connection established: " +con);
} catch (IOException ioe) {
_log.error("Error accepting", ioe);
}
@@ -580,7 +552,6 @@ class EventPumper implements Runnable {
if (_log.shouldLog(Log.DEBUG))
_log.debug("processing connect for " + con + ": connected? " + connected);
if (connected) {
// BUGFIX for firewalls. --Sponge
if (shouldSetKeepAlive(chan))
chan.socket().setKeepAlive(true);
con.setKey(key);
@@ -595,7 +566,6 @@ class EventPumper implements Runnable {
if (_log.shouldLog(Log.INFO))
_log.info("Failed outbound " + con, ioe);
con.closeOnTimeout("connect failed", ioe);
//_context.banlist().banlistRouter(con.getRemotePeer().calculateHash(), "Error connecting", NTCPTransport.STYLE);
_transport.markUnreachable(con.getRemotePeer().calculateHash());
_context.statManager().addRateData("ntcp.connectFailedTimeoutIOE", 1);
} catch (NoConnectionPendingException ncpe) {
@@ -630,7 +600,6 @@ class EventPumper implements Runnable {
try {
int read = con.getChannel().read(buf);
if (read < 0) {
//_context.statManager().addRateData("ntcp.readEOF", 1);
if (con.isInbound() && con.getMessagesReceived() <= 0) {
InetAddress addr = con.getChannel().socket().getInetAddress();
int count;
@@ -653,8 +622,6 @@ class EventPumper implements Runnable {
con.close();
releaseBuf(buf);
} else if (read == 0) {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("nothing to read for " + con + ", but stay interested");
// stay interested
//key.interestOps(key.interestOps() | SelectionKey.OP_READ);
releaseBuf(buf);
@@ -679,14 +646,9 @@ class EventPumper implements Runnable {
if (req.getPendingRequested() > 0) {
// rare since we generally don't throttle inbound
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("bw throttled reading for " + con + ", so we don't want to read anymore");
_context.statManager().addRateData("ntcp.queuedRecv", read);
con.queuedRecv(buf, req);
} else {
// fully allocated
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("not bw throttled reading for " + con);
// stay interested
//key.interestOps(key.interestOps() | SelectionKey.OP_READ);
con.recv(buf);
@@ -747,51 +709,31 @@ class EventPumper implements Runnable {
* High-frequency path in thread.
*/
private void processWrite(SelectionKey key) {
//int totalWritten = 0;
//int buffers = 0;
//long before = System.currentTimeMillis();
NTCPConnection con = (NTCPConnection)key.attachment();
try {
while (true) {
ByteBuffer buf = con.getNextWriteBuf();
if (buf != null) {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("writing " + buf.remaining()+"...");
if (buf.remaining() <= 0) {
//long beforeRem = System.currentTimeMillis();
con.removeWriteBuf(buf);
//long afterRem = System.currentTimeMillis();
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("buffer was already fully written and removed after " + (afterRem-beforeRem) + "...");
//buffers++;
continue;
}
int written = con.getChannel().write(buf);
//totalWritten += written;
if (written == 0) {
if ( (buf.remaining() > 0) || (!con.isWriteBufEmpty()) ) {
//if (_log.shouldLog(Log.DEBUG)) _log.debug("done writing, but data remains...");
// stay interested
//key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
} else {
//if (_log.shouldLog(Log.DEBUG)) _log.debug("done writing, no data remains...");
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
}
break;
} else if (buf.remaining() > 0) {
//if (_log.shouldLog(Log.DEBUG)) _log.debug("buffer data remaining...");
// stay interested
//key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
break;
} else {
//long beforeRem = System.currentTimeMillis();
con.removeWriteBuf(buf);
//long afterRem = System.currentTimeMillis();
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("buffer "+ buffers+"/"+written+"/"+totalWritten+" fully written after " +
// (beforeRem-before) + ", then removed after " + (afterRem-beforeRem) + "...");
//releaseBuf(buf);
//buffers++;
//if (buffer time is too much, add OP_WRITe to the interest ops and break?)
// LOOP
}
@@ -811,10 +753,6 @@ class EventPumper implements Runnable {
_context.statManager().addRateData("ntcp.writeError", 1);
con.close();
}
//long after = System.currentTimeMillis();
//if (_log.shouldLog(Log.INFO))
// _log.info("Wrote " + totalWritten + " in " + buffers + " buffers on " + con
// + " after " + (after-before));
}

/**
@@ -900,7 +838,6 @@ class EventPumper implements Runnable {
boolean connected = con.getChannel().connect(saddr);
if (connected) {
// Never happens, we use nonblocking
//_context.statManager().addRateData("ntcp.connectImmediate", 1);
key.interestOps(SelectionKey.OP_READ);
processConnect(key);
}
@@ -909,24 +846,12 @@ class EventPumper implements Runnable {
_log.warn("error connecting to " + Addresses.toString(naddr.getIP(), naddr.getPort()), ioe);
_context.statManager().addRateData("ntcp.connectFailedIOE", 1);
_transport.markUnreachable(con.getRemotePeer().calculateHash());
//if (ntcpOnly(con)) {
// _context.banlist().banlistRouter(con.getRemotePeer().calculateHash(), "unable to connect: " + ioe.getMessage());
// con.close(false);
//} else {
// _context.banlist().banlistRouter(con.getRemotePeer().calculateHash(), "unable to connect: " + ioe.getMessage(), NTCPTransport.STYLE);
con.close(true);
//}
con.close(true);
} catch (UnresolvedAddressException uae) {
if (_log.shouldLog(Log.WARN)) _log.warn("unresolved address connecting", uae);
_context.statManager().addRateData("ntcp.connectFailedUnresolved", 1);
_transport.markUnreachable(con.getRemotePeer().calculateHash());
//if (ntcpOnly(con)) {
// _context.banlist().banlistRouter(con.getRemotePeer().calculateHash(), "unable to connect/resolve: " + uae.getMessage());
// con.close(false);
//} else {
// _context.banlist().banlistRouter(con.getRemotePeer().calculateHash(), "unable to connect/resolve: " + uae.getMessage(), NTCPTransport.STYLE);
con.close(true);
//}
con.close(true);
} catch (CancelledKeyException cke) {
con.close(false);
}
@@ -941,22 +866,7 @@ class EventPumper implements Runnable {
_lastExpired = now;
}
}

/**
* If the other peer only supports ntcp, we should banlist them when we can't reach 'em,
* but if they support other transports (eg ssu) we should allow those transports to be
* tried as well.
*/
/****
private boolean ntcpOnly(NTCPConnection con) {
RouterIdentity ident = con.getRemotePeer();
if (ident == null) return true;
RouterInfo info = _context.netDb().lookupRouterInfoLocally(ident.calculateHash());
if (info == null) return true;
return info.getAddresses().size() == 1;
}
****/

private long _lastExpired;

private void expireTimedOut() {
@@ -305,8 +305,6 @@ public class NTCPConnection implements Closeable {
_clockSkew = clockSkew;
_prevWriteEnd = prevWriteEnd;
System.arraycopy(prevReadEnd, prevReadEnd.length - BLOCK_SIZE, _prevReadBlock, 0, BLOCK_SIZE);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Inbound established, prevWriteEnd: " + Base64.encode(prevWriteEnd) + " prevReadEnd: " + Base64.encode(prevReadEnd));
_establishedOn = _context.clock().now();
NTCPConnection rv = _transport.inboundEstablished(this);
_nextMetaTime = _establishedOn + (META_FREQUENCY / 2) + _context.random().nextInt(META_FREQUENCY);
@@ -468,64 +466,20 @@ public class NTCPConnection implements Closeable {
* toss the message onto the connection's send queue
*/
public void send(OutNetMessage msg) {
/****
always enqueue, let the queue do the dropping

if (tooBacklogged()) {
boolean allowRequeue = false; // if we are too backlogged in tcp, don't try ssu
boolean successful = false;
_consecutiveBacklog++;
_transport.afterSend(msg, successful, allowRequeue, msg.getLifetime());
if (_consecutiveBacklog > 10) { // waaay too backlogged
boolean wantsWrite = false;
try { wantsWrite = ( (_conKey.interestOps() & SelectionKey.OP_WRITE) != 0); } catch (RuntimeException e) {}
if (_log.shouldLog(Log.WARN)) {
int blocks = _writeBufs.size();
_log.warn("Too backlogged for too long (" + _consecutiveBacklog + " messages for " + DataHelper.formatDuration(queueTime()) + ", sched? " + wantsWrite + ", blocks: " + blocks + ") sending to " + _remotePeer.calculateHash());
}
_context.statManager().addRateData("ntcp.closeOnBacklog", getUptime());
close();
}
_context.statManager().addRateData("ntcp.dontSendOnBacklog", _consecutiveBacklog);
return;
}
_consecutiveBacklog = 0;
****/
//if (FAST_LARGE)
_outbound.offer(msg);
//int enqueued = _outbound.size();
// although stat description says ahead of this one, not including this one...
//_context.statManager().addRateData("ntcp.sendQueueSize", enqueued);
boolean noOutbound = (getCurrentOutbound() == null);
//if (_log.shouldLog(Log.DEBUG)) _log.debug("messages enqueued on " + toString() + ": " + enqueued + " new one: " + msg.getMessageId() + " of " + msg.getMessageType());
if (isEstablished() && noOutbound)
_transport.getWriter().wantsWrite(this, "enqueued");
}

/****
private long queueTime() {
OutNetMessage msg = _currentOutbound;
if (msg == null) {
msg = _outbound.peek();
if (msg == null)
return 0;
}
return msg.getSendTime(); // does not include any of the pre-send(...) preparation
}
****/

public boolean isBacklogged() { return _outbound.isBacklogged(); }

public boolean tooBacklogged() {
//long queueTime = queueTime();
//if (queueTime <= 0) return false;

// perhaps we could take into account the size of the queued messages too, our
// current transmission rate, and how much time is left before the new message's expiration?
// ok, maybe later...
if (getUptime() < 10*1000) // allow some slack just after establishment
return false;
//if (queueTime > 5*1000) { // bloody arbitrary. well, its half the average message lifetime...
if (_outbound.isBacklogged()) { // bloody arbitrary. well, its half the average message lifetime...
int size = _outbound.size();
if (_log.shouldLog(Log.WARN)) {
@@ -538,12 +492,7 @@ public class NTCPConnection implements Closeable {
+ ", writeBufs: " + writeBufs + " on " + toString());
} catch (RuntimeException e) {} // java.nio.channels.CancelledKeyException
}
//_context.statManager().addRateData("ntcp.sendBacklogTime", queueTime);
return true;
//} else if (size > 32) { // another arbitrary limit.
// if (_log.shouldLog(Log.ERROR))
// _log.error("Too backlogged: queue size is " + size + " and the lifetime of the head is " + queueTime);
// return true;
} else {
return false;
}
@@ -554,22 +503,6 @@ public class NTCPConnection implements Closeable {
*/
public void enqueueInfoMessage() {
int priority = INFO_PRIORITY;
//if (!_isInbound) {
// Workaround for bug at Bob's end.
// This probably isn't helpful because Bob puts the store on the job queue.
// Prior to 0.9.12, Bob would only send his RI if he had our RI after
// the first received message, so make sure it is first in our queue.
// As of 0.9.12 this is fixed and Bob will always send his RI.
// RouterInfo target = _context.netDb().lookupRouterInfoLocally(_remotePeer.calculateHash());
// if (target != null) {
// String v = target.getOption("router.version");
// if (v == null || VersionComparator.comp(v, FIXED_RI_VERSION) < 0) {
// priority = OutNetMessage.PRIORITY_HIGHEST;
// }
// } else {
// priority = OutNetMessage.PRIORITY_HIGHEST;
// }
//}
if (_log.shouldLog(Log.INFO))
_log.info("SENDING INFO message pri. " + priority + ": " + toString());
DatabaseStoreMessage dsm = new DatabaseStoreMessage(_context);
@@ -577,52 +510,9 @@ public class NTCPConnection implements Closeable {
// We are injecting directly, so we can use a null target.
OutNetMessage infoMsg = new OutNetMessage(_context, dsm, _context.clock().now()+10*1000, priority, null);
infoMsg.beginSend();
//_context.statManager().addRateData("ntcp.infoMessageEnqueued", 1);
send(infoMsg);
}

//private static final int PEERS_TO_FLOOD = 3;

/**
* to prevent people from losing track of the floodfill peers completely, lets periodically
* send those we are connected to references to the floodfill peers that we know
*
* Do we really need this anymore??? Peers shouldn't lose track anymore, and if they do,
* FloodOnlyLookupJob should recover.
* The bandwidth isn't so much, but it is a lot of extra data at connection startup, which
* hurts latency of new connections.
*/
/**********
private void enqueueFloodfillMessage(RouterInfo target) {
FloodfillNetworkDatabaseFacade fac = (FloodfillNetworkDatabaseFacade)_context.netDb();
List peers = fac.getFloodfillPeers();
Collections.shuffle(peers);
for (int i = 0; i < peers.size() && i < PEERS_TO_FLOOD; i++) {
Hash peer = (Hash)peers.get(i);

// we already sent our own info, and no need to tell them about themselves
if (peer.equals(_context.routerHash()) || peer.equals(target.calculateHash()))
continue;

RouterInfo info = fac.lookupRouterInfoLocally(peer);
if (info == null)
continue;

OutNetMessage infoMsg = new OutNetMessage(_context);
infoMsg.setExpiration(_context.clock().now()+10*1000);
DatabaseStoreMessage dsm = new DatabaseStoreMessage(_context);
dsm.setKey(peer);
dsm.setRouterInfo(info);
infoMsg.setMessage(dsm);
infoMsg.setPriority(100);
infoMsg.setTarget(target);
infoMsg.beginSend();
_context.statManager().addRateData("ntcp.floodInfoMessageEnqueued", 1, 0);
send(infoMsg);
}
}
***********/

/**
* We are Alice.
*
@@ -644,7 +534,6 @@ public class NTCPConnection implements Closeable {
_establishedOn = _context.clock().now();
_establishState = EstablishState.VERIFIED;
_transport.markReachable(getRemotePeer().calculateHash(), false);
//_context.banlist().unbanlistRouter(getRemotePeer().calculateHash(), NTCPTransport.STYLE);
boolean msgs = !_outbound.isEmpty();
_nextMetaTime = _establishedOn + (META_FREQUENCY / 2) + _context.random().nextInt(META_FREQUENCY);
_nextInfoTime = _establishedOn + (INFO_FREQUENCY / 2) + _context.random().nextInt(INFO_FREQUENCY);
@@ -652,23 +541,6 @@ public class NTCPConnection implements Closeable {
_transport.getWriter().wantsWrite(this, "outbound established");
}

/**
// Time vs space tradeoff:
// on slow GCing jvms, the mallocs in the following preparation can cause the
// write to get congested, taking up a substantial portion of the Writer's
// time (and hence, slowing down the transmission to the peer). we could
// however do the preparation (up to but not including the aes.encrypt)
// as part of the .send(OutNetMessage) above, which runs on less congested
// threads (whatever calls OutNetMessagePool.add, which can be the jobqueue,
// tunnel builders, simpletimers, etc). that would increase the Writer's
// efficiency (speeding up the transmission to the peer) but would require
// more memory to hold the serialized preparations of all queued messages, not
// just the currently transmitting one.
//
// hmm.
*/
private static final boolean FAST_LARGE = true; // otherwise, SLOW_SMALL

/**
* prepare the next i2np message for transmission. this should be run from
* the Writer thread pool.
@@ -677,111 +549,9 @@ public class NTCPConnection implements Closeable {
*
*/
synchronized void prepareNextWrite(PrepBuffer prep) {
//if (FAST_LARGE)
prepareNextWriteFast(prep);
//else
// prepareNextWriteSmall();
}

/********** nobody's tried this one in years
private void prepareNextWriteSmall() {
if (_log.shouldLog(Log.DEBUG))
_log.debug("prepare next write w/ isInbound? " + _isInbound + " established? " + _established);
if (!_isInbound && !_established) {
if (_establishState == null) {
_establishState = new EstablishState(_context, _transport, this);
_establishState.prepareOutbound();
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("prepare next write, but we have already prepared the first outbound and we are not yet established..." + toString());
}
return;
}

if (_nextMetaTime <= System.currentTimeMillis()) {
sendMeta();
_nextMetaTime = System.currentTimeMillis() + _context.random().nextInt(META_FREQUENCY);
}

OutNetMessage msg = null;
synchronized (_outbound) {
if (_currentOutbound != null) {
if (_log.shouldLog(Log.WARN))
_log.warn("attempt for multiple outbound messages with " + System.identityHashCode(_currentOutbound) + " already waiting and " + _outbound.size() + " queued");
return;
}
//throw new RuntimeException("We should not be preparing a write while we still have one pending");
if (!_outbound.isEmpty()) {
msg = (OutNetMessage)_outbound.remove(0);
_currentOutbound = msg;
} else {
return;
}
}

msg.beginTransmission();
msg.beginPrepare();
long begin = System.currentTimeMillis();
// prepare the message as a binary array, then encrypt it w/ a checksum
// and add it to the _writeBufs
// E(sizeof(data)+data+pad+crc, sessionKey, prevEncrypted)
I2NPMessage m = msg.getMessage();
int sz = m.getMessageSize();
int min = 2 + sz + 4;
int rem = min % 16;
int padding = 0;
if (rem > 0)
padding = 16 - rem;

byte unencrypted[] = new byte[min+padding];
byte base[] = m.toByteArray();
DataHelper.toLong(unencrypted, 0, 2, sz);
System.arraycopy(base, 0, unencrypted, 2, base.length);
if (padding > 0) {
byte pad[] = new byte[padding];
_context.random().nextBytes(pad);
System.arraycopy(pad, 0, unencrypted, 2+sz, padding);
}

long serialized = System.currentTimeMillis();
Adler32 crc = new Adler32();
crc.reset();
crc.update(unencrypted, 0, unencrypted.length-4);
long val = crc.getValue();
DataHelper.toLong(unencrypted, unencrypted.length-4, 4, val);

if (_log.shouldLog(Log.DEBUG))
_log.debug("Outbound message " + _messagesWritten + " has crc " + val);

long crced = System.currentTimeMillis();
byte encrypted[] = new byte[unencrypted.length];
_context.aes().encrypt(unencrypted, 0, encrypted, 0, _sessionKey, _prevWriteEnd, 0, unencrypted.length);
System.arraycopy(encrypted, encrypted.length-16, _prevWriteEnd, 0, _prevWriteEnd.length);
long encryptedTime = System.currentTimeMillis();
msg.prepared();
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("prepared outbound " + System.identityHashCode(msg)
+ " serialize=" + (serialized-begin)
+ " crc=" + (crced-serialized)
+ " encrypted=" + (encryptedTime-crced)
+ " prepared=" + (encryptedTime-begin));
}
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Encrypting " + msg + " [" + System.identityHashCode(msg) + "] crc=" + crc.getValue() + "\nas: "
// + Base64.encode(encrypted, 0, 16) + "...\ndecrypted: "
// + Base64.encode(unencrypted, 0, 16) + "..." + "\nIV=" + Base64.encode(_prevWriteEnd, 0, 16));
_transport.getPumper().wantsWrite(this, encrypted);

// for every 6-12 hours that we are connected to a peer, send them
// our updated netDb info (they may not accept it and instead query
// the floodfill netDb servers, but they may...)
if (_nextInfoTime <= System.currentTimeMillis()) {
enqueueInfoMessage();
_nextInfoTime = System.currentTimeMillis() + (INFO_FREQUENCY / 2) + _context.random().nextInt(INFO_FREQUENCY);
}
}
**********/

/**
* prepare the next i2np message for transmission. this should be run from
* the Writer thread pool.
@@ -816,60 +586,16 @@ public class NTCPConnection implements Closeable {
_log.info("attempt for multiple outbound messages with " + System.identityHashCode(_currentOutbound) + " already waiting and " + _outbound.size() + " queued");
return;
}
/****
//throw new RuntimeException("We should not be preparing a write while we still have one pending");
if (queueTime() > 3*1000) { // don't stall low-priority messages
****/
msg = _outbound.poll();
if (msg == null)
return;
/****
} else {
// FIXME
// This is a linear search to implement a priority queue, O(n**2)
// Also race with unsynchronized removal in close() above
// Either implement a real (concurrent?) priority queue or just comment out all of this,
// as it isn't clear how effective the priorities on a per-connection basis are.
int slot = 0; // only for logging
Iterator<OutNetMessage> it = _outbound.iterator();
for (int i = 0; it.hasNext() && i < 75; i++) { //arbitrary bound
OutNetMessage mmsg = it.next();
if (msg == null || mmsg.getPriority() > msg.getPriority()) {
msg = mmsg;
slot = i;
}
}
if (msg == null)
return;
// if (_outbound.indexOf(msg) > 0)
// _log.debug("Priority message sent, pri = " + msg.getPriority() + " pos = " + _outbound.indexOf(msg) + "/" +_outbound.size());
if (_log.shouldLog(Log.INFO))
_log.info("Type " + msg.getMessage().getType() + " pri " + msg.getPriority() + " slot " + slot);
boolean removed = _outbound.remove(msg);
if ((!removed) && _log.shouldLog(Log.WARN))
_log.warn("Already removed??? " + msg.getMessage().getType());
}
****/
_currentOutbound = msg;
}

//long begin = System.currentTimeMillis();
bufferedPrepare(msg,buf);
_context.aes().encrypt(buf.unencrypted, 0, buf.encrypted, 0, _sessionKey, _prevWriteEnd, 0, buf.unencryptedLength);
System.arraycopy(buf.encrypted, buf.encrypted.length-16, _prevWriteEnd, 0, _prevWriteEnd.length);
//long encryptedTime = System.currentTimeMillis();
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Encrypting " + msg + " [" + System.identityHashCode(msg) + "] crc=" + crc.getValue() + "\nas: "
// + Base64.encode(encrypted, 0, 16) + "...\ndecrypted: "
// + Base64.encode(unencrypted, 0, 16) + "..." + "\nIV=" + Base64.encode(_prevWriteEnd, 0, 16));
_transport.getPumper().wantsWrite(this, buf.encrypted);
//long wantsTime = System.currentTimeMillis();
//long releaseTime = System.currentTimeMillis();
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("prepared outbound " + System.identityHashCode(msg)
// + " encrypted=" + (encryptedTime-begin)
// + " wantsWrite=" + (wantsTime-encryptedTime)
// + " releaseBuf=" + (releaseTime-wantsTime));

// for every 6-12 hours that we are connected to a peer, send them
// our updated netDb info (they may not accept it and instead query
@@ -889,15 +615,9 @@ public class NTCPConnection implements Closeable {
* @param buf PrepBuffer to use as scratch space
*/
private void bufferedPrepare(OutNetMessage msg, PrepBuffer buf) {
//if (!_isInbound && !_established)
// return;
//long begin = System.currentTimeMillis();
//long alloc = System.currentTimeMillis();

I2NPMessage m = msg.getMessage();
buf.baseLength = m.toByteArray(buf.base);
int sz = buf.baseLength;
//int sz = m.getMessageSize();
int min = 2 + sz + 4;
int rem = min % 16;
int padding = 0;
@@ -911,7 +631,6 @@ public class NTCPConnection implements Closeable {
_context.random().nextBytes(buf.unencrypted, 2+sz, padding);
}

//long serialized = System.currentTimeMillis();
buf.crc.update(buf.unencrypted, 0, buf.unencryptedLength-4);

long val = buf.crc.getValue();
@@ -926,11 +645,6 @@ public class NTCPConnection implements Closeable {
// 3) change EventPumper.wantsWrite() to take a ByteBuffer arg
// 4) in EventPumper.processWrite(), release the byte buffer
buf.encrypted = new byte[buf.unencryptedLength];

//long crced = System.currentTimeMillis();
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Buffered prepare took " + (crced-begin) + ", alloc=" + (alloc-begin)
// + " serialize=" + (serialized-alloc) + " crc=" + (crced-serialized));
}

public static class PrepBuffer {
@@ -984,7 +698,6 @@ public class NTCPConnection implements Closeable {
// longer interested in reading from the network), but we aren't
// throttled anymore, so we should resume being interested in reading
_transport.getPumper().wantsRead(NTCPConnection.this);
//_transport.getReader().wantsRead(this);
}
}

@@ -1046,7 +759,6 @@ public class NTCPConnection implements Closeable {
*/
public void recv(ByteBuffer buf) {
_bytesReceived += buf.remaining();
//buf.flip();
_readBufs.offer(buf);
_transport.getReader().wantsRead(this);
updateStats();
@@ -1057,9 +769,7 @@ public class NTCPConnection implements Closeable {
* been fully allocated for the bandwidth limiter.
*/
public void write(ByteBuffer buf) {
//if (_log.shouldLog(Log.DEBUG)) _log.debug("Before write(buf)");
_writeBufs.offer(buf);
//if (_log.shouldLog(Log.DEBUG)) _log.debug("After write(buf)");
_transport.getPumper().wantsWrite(this);
}
@@ -1122,12 +832,6 @@ public class NTCPConnection implements Closeable {
if (getOutboundQueueSize() > 0) // push through the bw limiter to reach _writeBufs
_transport.getWriter().wantsWrite(this, "write completed");

// this is not necessary, EventPumper.processWrite() handles this
// and it just causes unnecessary selector.wakeup() and looping
//boolean bufsRemain = !_writeBufs.isEmpty();
//if (bufsRemain) // send asap
// _transport.getPumper().wantsWrite(this);

updateStats();
}

@@ -1139,8 +843,6 @@ public class NTCPConnection implements Closeable {
private long _lastBytesSent;
private float _sendBps;
private float _recvBps;
//private float _sendBps15s;
//private float _recvBps15s;

public float getSendRate() { return _sendBps; }
public float getRecvRate() { return _recvBps; }
@@ -1165,18 +867,6 @@ public class NTCPConnection implements Closeable {

_sendBps = (0.9f)*_sendBps + (0.1f)*(sent*1000f)/time;
_recvBps = (0.9f)*_recvBps + (0.1f)*((float)recv*1000)/time;

// Maintain an approximate average with a 15-second halflife
// Weights (0.955 and 0.045) are tuned so that transition between two values (e.g. 0..10)
// would reach their midpoint (e.g. 5) in 15s
//_sendBps15s = (0.955f)*_sendBps15s + (0.045f)*((float)sent*1000f)/(float)time;
//_recvBps15s = (0.955f)*_recvBps15s + (0.045f)*((float)recv*1000)/(float)time;

//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Rates updated to "
// + _sendBps + '/' + _recvBps + "Bps in/out "
// //+ _sendBps15s + "/" + _recvBps15s + "Bps in/out 15s after "
// + sent + '/' + recv + " in " + DataHelper.formatDuration(time));
}
}

@@ -1194,8 +884,6 @@ public class NTCPConnection implements Closeable {
* as reader will call EventPumper.releaseBuf().
*/
synchronized void recvEncryptedI2NP(ByteBuffer buf) {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("receive encrypted i2np: " + buf.remaining());
// hasArray() is false for direct buffers, at least on my system...
if (_curReadBlockIndex == 0 && buf.hasArray()) {
// fast way
@@ -1212,16 +900,12 @@ public class NTCPConnection implements Closeable {
buf.get(_curReadBlock, _curReadBlockIndex, want);
_curReadBlockIndex += want;
}
//_curReadBlock[_curReadBlockIndex++] = buf.get();
if (_curReadBlockIndex >= BLOCK_SIZE) {
// cbc
_context.aes().decryptBlock(_curReadBlock, 0, _sessionKey, _decryptBlockBuf, 0);
//DataHelper.xor(_decryptBlockBuf, 0, _prevReadBlock, 0, _decryptBlockBuf, 0, BLOCK_SIZE);
for (int i = 0; i < BLOCK_SIZE; i++) {
_decryptBlockBuf[i] ^= _prevReadBlock[i];
}
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("parse decrypted i2np block (remaining: " + buf.remaining() + ")");
boolean ok = recvUnencryptedI2NP();
if (!ok) {
if (_log.shouldLog(Log.INFO))
@@ -1258,14 +942,11 @@ public class NTCPConnection implements Closeable {
for ( ; pos < end && !_closed.get(); pos += BLOCK_SIZE) {
_context.aes().decryptBlock(array, pos, _sessionKey, _decryptBlockBuf, 0);
if (first) {
// XOR with _prevReadBlock the first time...
//DataHelper.xor(_decryptBlockBuf, 0, _prevReadBlock, 0, _decryptBlockBuf, 0, BLOCK_SIZE);
for (int i = 0; i < BLOCK_SIZE; i++) {
_decryptBlockBuf[i] ^= _prevReadBlock[i];
}
first = false;
} else {
//DataHelper.xor(_decryptBlockBuf, 0, array, pos - BLOCK_SIZE, _decryptBlockBuf, 0, BLOCK_SIZE);
int start = pos - BLOCK_SIZE;
for (int i = 0; i < BLOCK_SIZE; i++) {
_decryptBlockBuf[i] ^= array[start + i];
@@ -1371,7 +1052,6 @@ public class NTCPConnection implements Closeable {
_log.debug("Sending NTCP metadata");
_sendingMeta = true;
_transport.getPumper().wantsWrite(this, encrypted);
// enqueueInfoMessage(); // this often?
}

private static final int MAX_HANDLERS = 4;
@@ -1392,9 +1072,6 @@ public class NTCPConnection implements Closeable {
_i2npHandlers.offer(handler);
}

//public long getReadTime() { return _curReadState.getReadTime(); }

private static ByteArray acquireReadBuf() {
return _dataReadBufs.acquire();
}
@@ -1469,17 +1146,6 @@ public class NTCPConnection implements Closeable {
}
}

/****
public long getReadTime() {
long now = System.currentTimeMillis();
long readTime = now - _stateBegin;
if (readTime >= now)
return -1;
else
return readTime;
}
****/

/** @param buf 16 bytes */
private void receiveInitial(byte buf[]) {
_size = (int)DataHelper.fromLong(buf, 0, 2);
@@ -1518,22 +1184,15 @@ public class NTCPConnection implements Closeable {
receiveLastBlock(buf);
} else {
_crc.update(buf);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("update read state with another block (remaining: " + remaining + ")");
}
}

/** @param buf 16 bytes */
private void receiveLastBlock(byte buf[]) {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("block remaining in the last block: " + (buf.length-blockUsed));

// on the last block
_expectedCrc = DataHelper.fromLong(buf, buf.length-4, 4);
_crc.update(buf, 0, buf.length-4);
long val = _crc.getValue();
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("CRC value computed: " + val + " expected: " + _expectedCrc + " size: " + _size);
if (val == _expectedCrc) {
try {
I2NPMessageHandler h = acquireHandler(_context);
@@ -1569,21 +1228,11 @@ public class NTCPConnection implements Closeable {
ime);
}
_context.statManager().addRateData("ntcp.corruptI2NPIME", 1);
// Don't close the con, possible attack vector, not necessarily the peer's fault,
// and should be recoverable
// handler and databuf are lost if we do this
//close();
//return;
}
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("CRC incorrect for message " + _messagesRead + " (calc=" + val + " expected=" + _expectedCrc + ") size=" + _size + " blocks " + _blocks);
_context.statManager().addRateData("ntcp.corruptI2NPCRC", 1);
// This probably can't be spoofed from somebody else, but do we really need to close it?
// This is rare.
//close();
// databuf is lost if we do this
//return;
}
// get it ready for the next I2NP message
init();