forked from I2P_Developers/i2p.i2p
Transport: Async NTCP writes (MR !43)
- Write directly from writer threads, except for during establishment and when write doesn't complete; throw those to the pumper as usual - New NTCPCon writelock, readlock, and statlock (formerly all on NTCPCon.this) to prevent deadlocks - NTCPCon chan and key now volatile, remove synch to prevent deadlocks - All interestOps changes now lock on the key via setInterest() and clearInterest() since changes may now happen in multiple threads - Set _paddingConfig at initialization to avoid NPE Greatly reduces pumper loops and CPU As proposed by jogger Reviewed by zlatinb Ref: http://zzz.i2p/topics/3192
This commit is contained in:
@ -1,3 +1,6 @@
|
||||
2021-12-21 zzz
|
||||
* Transport: Async NTCP writes (MR !43)
|
||||
|
||||
2021-12-18 zzz
|
||||
* Tunnels:
|
||||
- Restore support for IP restriction in client tunnels (MR !45)
|
||||
|
@ -18,7 +18,7 @@ public class RouterVersion {
|
||||
/** deprecated */
|
||||
public final static String ID = "Git";
|
||||
public final static String VERSION = CoreVersion.VERSION;
|
||||
public final static long BUILD = 3;
|
||||
public final static long BUILD = 4;
|
||||
|
||||
/** for example "-test" */
|
||||
public final static String EXTRA = "";
|
||||
|
@ -280,15 +280,17 @@ class EventPumper implements Runnable {
|
||||
continue;
|
||||
}
|
||||
|
||||
if ( (!con.isWriteBufEmpty()) &&
|
||||
((key.interestOps() & SelectionKey.OP_WRITE) == 0) ) {
|
||||
// the data queued to be sent has already passed through
|
||||
// the bw limiter and really just wants to get shoved
|
||||
// out the door asap.
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Failsafe write for " + con);
|
||||
key.interestOps(SelectionKey.OP_WRITE | key.interestOps());
|
||||
failsafeWrites++;
|
||||
synchronized(con.getWriteLock()) {
|
||||
if ( (!con.isWriteBufEmpty()) &&
|
||||
((key.interestOps() & SelectionKey.OP_WRITE) == 0) ) {
|
||||
// the data queued to be sent has already passed through
|
||||
// the bw limiter and really just wants to get shoved
|
||||
// out the door asap.
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Failsafe write for " + con);
|
||||
setInterest(key, SelectionKey.OP_WRITE);
|
||||
failsafeWrites++;
|
||||
}
|
||||
}
|
||||
|
||||
final long expire;
|
||||
@ -406,7 +408,7 @@ class EventPumper implements Runnable {
|
||||
processAccept(key);
|
||||
}
|
||||
if (connect) {
|
||||
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
|
||||
clearInterest(key, SelectionKey.OP_CONNECT);
|
||||
processConnect(key);
|
||||
}
|
||||
if (read) {
|
||||
@ -637,7 +639,7 @@ class EventPumper implements Runnable {
|
||||
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(read, "NTCP read"); //con, buf);
|
||||
if (req.getPendingRequested() > 0) {
|
||||
// rare since we generally don't throttle inbound
|
||||
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
|
||||
clearInterest(key, SelectionKey.OP_READ);
|
||||
_context.statManager().addRateData("ntcp.queuedRecv", read);
|
||||
con.queuedRecv(buf, req);
|
||||
break;
|
||||
@ -699,7 +701,7 @@ class EventPumper implements Runnable {
|
||||
if (buf != null)
|
||||
releaseBuf(buf);
|
||||
// ???
|
||||
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
|
||||
clearInterest(key, SelectionKey.OP_READ);
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("error reading on " + con, nyce);
|
||||
}
|
||||
@ -712,50 +714,75 @@ class EventPumper implements Runnable {
|
||||
*/
|
||||
private void processWrite(SelectionKey key) {
|
||||
final NTCPConnection con = (NTCPConnection)key.attachment();
|
||||
processWrite(con, key);
|
||||
}
|
||||
|
||||
/**
|
||||
* Asynchronous write all buffers to the channel.
|
||||
* This method will disable the interest if no more writes remain.
|
||||
* If this returns false, caller MUST call wantsWrite(con)
|
||||
*
|
||||
* @param key non-null
|
||||
* @return true if all buffers were completely written, false if buffers remain
|
||||
* @since 0.9.53
|
||||
*/
|
||||
public boolean processWrite(final NTCPConnection con, final SelectionKey key) {
|
||||
boolean rv = false;
|
||||
final SocketChannel chan = con.getChannel();
|
||||
try {
|
||||
while (true) {
|
||||
ByteBuffer buf = con.getNextWriteBuf();
|
||||
if (buf != null) {
|
||||
if (buf.remaining() <= 0) {
|
||||
con.removeWriteBuf(buf);
|
||||
continue;
|
||||
}
|
||||
int written = chan.write(buf);
|
||||
//totalWritten += written;
|
||||
if (written == 0) {
|
||||
if ( (buf.remaining() > 0) || (!con.isWriteBufEmpty()) ) {
|
||||
synchronized(con.getWriteLock()) {
|
||||
while (true) {
|
||||
ByteBuffer buf = con.getNextWriteBuf();
|
||||
if (buf != null) {
|
||||
if (buf.remaining() <= 0) {
|
||||
con.removeWriteBuf(buf);
|
||||
continue;
|
||||
}
|
||||
int written = chan.write(buf);
|
||||
//totalWritten += written;
|
||||
if (written == 0) {
|
||||
if ( (buf.remaining() > 0) || (!con.isWriteBufEmpty()) ) {
|
||||
// stay interested
|
||||
//key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
|
||||
} else {
|
||||
rv = true;
|
||||
}
|
||||
break;
|
||||
} else if (buf.remaining() > 0) {
|
||||
// stay interested
|
||||
//key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
|
||||
break;
|
||||
} else {
|
||||
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
|
||||
con.removeWriteBuf(buf);
|
||||
//if (buffer time is too much, add OP_WRITE to the interest ops and break?)
|
||||
// LOOP
|
||||
}
|
||||
} else {
|
||||
// Nothing more to write
|
||||
if (key.isValid()) {
|
||||
rv = true;
|
||||
}
|
||||
break;
|
||||
} else if (buf.remaining() > 0) {
|
||||
// stay interested
|
||||
//key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
|
||||
continue;
|
||||
} else {
|
||||
con.removeWriteBuf(buf);
|
||||
//if (buffer time is too much, add OP_WRITe to the interest ops and break?)
|
||||
// LOOP
|
||||
}
|
||||
} else {
|
||||
// Nothing more to write
|
||||
if (key.isValid())
|
||||
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
|
||||
break;
|
||||
}
|
||||
if (rv)
|
||||
clearInterest(key, SelectionKey.OP_WRITE);
|
||||
else
|
||||
setInterest(key, SelectionKey.OP_WRITE);
|
||||
}
|
||||
// catch and close outside the write lock to avoid deadlocks in NTCPCon.locked_close()
|
||||
} catch (CancelledKeyException cke) {
|
||||
if (_log.shouldLog(Log.WARN)) _log.warn("error writing on " + con, cke);
|
||||
_context.statManager().addRateData("ntcp.writeError", 1);
|
||||
con.close();
|
||||
rv = true;
|
||||
} catch (IOException ioe) {
|
||||
if (_log.shouldLog(Log.WARN)) _log.warn("error writing on " + con, ioe);
|
||||
_context.statManager().addRateData("ntcp.writeError", 1);
|
||||
con.close();
|
||||
rv = true;
|
||||
}
|
||||
return rv;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -768,7 +795,7 @@ class EventPumper implements Runnable {
|
||||
while ((con = _wantsRead.poll()) != null) {
|
||||
SelectionKey key = con.getKey();
|
||||
try {
|
||||
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
|
||||
setInterest(key, SelectionKey.OP_READ);
|
||||
} catch (CancelledKeyException cke) {
|
||||
// ignore, we remove/etc elsewhere
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
@ -803,7 +830,7 @@ class EventPumper implements Runnable {
|
||||
if (key == null)
|
||||
continue;
|
||||
try {
|
||||
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
|
||||
setInterest(key, SelectionKey.OP_WRITE);
|
||||
} catch (CancelledKeyException cke) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("RDE CKE 2", cke);
|
||||
@ -844,7 +871,7 @@ class EventPumper implements Runnable {
|
||||
boolean connected = schan.connect(saddr);
|
||||
if (connected) {
|
||||
// Never happens, we use nonblocking
|
||||
key.interestOps(SelectionKey.OP_READ);
|
||||
setInterest(key, SelectionKey.OP_READ);
|
||||
processConnect(key);
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
@ -880,4 +907,32 @@ class EventPumper implements Runnable {
|
||||
}
|
||||
|
||||
public long getIdleTimeout() { return _expireIdleWriteTime; }
|
||||
|
||||
/**
|
||||
* Warning - caller should catch unchecked CancelledKeyException
|
||||
*
|
||||
* @throws CancelledKeyException which is unchecked
|
||||
* @since 0.9.53
|
||||
*/
|
||||
public static void setInterest(SelectionKey key, int op) throws CancelledKeyException {
|
||||
synchronized(key) {
|
||||
int old = key.interestOps();
|
||||
if ((old & op) == 0)
|
||||
key.interestOps(old | op);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Warning - caller should catch unchecked CancelledKeyException
|
||||
*
|
||||
* @throws CancelledKeyException which is unchecked
|
||||
* @since 0.9.53
|
||||
*/
|
||||
public static void clearInterest(SelectionKey key, int op) throws CancelledKeyException {
|
||||
synchronized(key) {
|
||||
int old = key.interestOps();
|
||||
if ((old & op) != 0)
|
||||
key.interestOps(old & ~op);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.Inet6Address;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.CancelledKeyException;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.security.GeneralSecurityException;
|
||||
@ -63,8 +64,8 @@ import net.i2p.util.VersionComparator;
|
||||
public class NTCPConnection implements Closeable {
|
||||
private final RouterContext _context;
|
||||
private final Log _log;
|
||||
private SocketChannel _chan;
|
||||
private SelectionKey _conKey;
|
||||
private volatile SocketChannel _chan;
|
||||
private volatile SelectionKey _conKey;
|
||||
private final FIFOBandwidthLimiter.CompleteListener _inboundListener;
|
||||
private final FIFOBandwidthLimiter.CompleteListener _outboundListener;
|
||||
/**
|
||||
@ -95,7 +96,7 @@ public class NTCPConnection implements Closeable {
|
||||
//private final CoDelPriorityBlockingQueue<OutNetMessage> _outbound;
|
||||
private final PriBlockingQueue<OutNetMessage> _outbound;
|
||||
/**
|
||||
* current prepared OutNetMessages, or empty - synchronize to modify or read
|
||||
* current prepared OutNetMessages, or empty - synchronize on _writeLock
|
||||
*/
|
||||
private final List<OutNetMessage> _currentOutbound;
|
||||
private SessionKey _sessionKey;
|
||||
@ -111,6 +112,15 @@ public class NTCPConnection implements Closeable {
|
||||
// prevent sending meta before established
|
||||
private long _nextMetaTime = Long.MAX_VALUE;
|
||||
private final AtomicInteger _consecutiveZeroReads = new AtomicInteger();
|
||||
// This lock covers:
|
||||
// _curReadState
|
||||
private final Object _readLock = new Object();
|
||||
// This lock covers:
|
||||
// _writeBufs, _currentOutbound, _outbound, _sendSipk1, _sendSipk2, _sendSipIV, _sender
|
||||
private final Object _writeLock = new Object();
|
||||
// This lock covers:
|
||||
// _bytesReceived, _bytesSent, _lastBytesReceived, _lastBytesSent, _sendBps, _recvBps
|
||||
private final Object _statLock = new Object();
|
||||
|
||||
private static final int BLOCK_SIZE = 16;
|
||||
private static final int META_SIZE = BLOCK_SIZE;
|
||||
@ -195,7 +205,7 @@ public class NTCPConnection implements Closeable {
|
||||
DELAY_DEFAULT, DELAY_DEFAULT);
|
||||
private static final int MIN_PADDING_RANGE = 16;
|
||||
private static final int MAX_PADDING_RANGE = 128;
|
||||
private NTCP2Options _paddingConfig;
|
||||
private NTCP2Options _paddingConfig = OUR_PADDING;
|
||||
private int _version;
|
||||
private CipherState _sender;
|
||||
private long _sendSipk1, _sendSipk2;
|
||||
@ -267,14 +277,14 @@ public class NTCPConnection implements Closeable {
|
||||
/**
|
||||
* Valid for inbound; valid for outbound shortly after creation
|
||||
*/
|
||||
public synchronized SocketChannel getChannel() { return _chan; }
|
||||
public SocketChannel getChannel() { return _chan; }
|
||||
|
||||
/**
|
||||
* Valid for inbound; valid for outbound shortly after creation
|
||||
*/
|
||||
public synchronized SelectionKey getKey() { return _conKey; }
|
||||
public synchronized void setChannel(SocketChannel chan) { _chan = chan; }
|
||||
public synchronized void setKey(SelectionKey key) { _conKey = key; }
|
||||
public SelectionKey getKey() { return _conKey; }
|
||||
public void setChannel(SocketChannel chan) { _chan = chan; }
|
||||
public void setKey(SelectionKey key) { _conKey = key; }
|
||||
|
||||
public boolean isInbound() { return _isInbound; }
|
||||
public boolean isEstablished() { return _establishState.isComplete(); }
|
||||
@ -341,7 +351,7 @@ public class NTCPConnection implements Closeable {
|
||||
|
||||
public int getOutboundQueueSize() {
|
||||
int queued = _outbound.size();
|
||||
synchronized(_currentOutbound) {
|
||||
synchronized(_writeLock) {
|
||||
queued += _currentOutbound.size();
|
||||
}
|
||||
return queued;
|
||||
@ -349,7 +359,7 @@ public class NTCPConnection implements Closeable {
|
||||
|
||||
/** @since 0.9.36 */
|
||||
private boolean hasCurrentOutbound() {
|
||||
synchronized(_currentOutbound) {
|
||||
synchronized(_writeLock) {
|
||||
return ! _currentOutbound.isEmpty();
|
||||
}
|
||||
}
|
||||
@ -361,7 +371,7 @@ public class NTCPConnection implements Closeable {
|
||||
*/
|
||||
private int drainOutboundTo(Queue<OutNetMessage> to) {
|
||||
int rv = 0;
|
||||
synchronized (_currentOutbound) {
|
||||
synchronized (_writeLock) {
|
||||
rv = _currentOutbound.size();
|
||||
if (rv > 0) {
|
||||
to.addAll(_currentOutbound);
|
||||
@ -507,37 +517,37 @@ public class NTCPConnection implements Closeable {
|
||||
}
|
||||
_bwOutRequests.clear();
|
||||
|
||||
_writeBufs.clear();
|
||||
ByteBuffer bb;
|
||||
while ((bb = _readBufs.poll()) != null) {
|
||||
EventPumper.releaseBuf(bb);
|
||||
}
|
||||
|
||||
List<OutNetMessage> pending = new ArrayList<OutNetMessage>();
|
||||
//_outbound.drainAllTo(pending);
|
||||
_outbound.drainTo(pending);
|
||||
synchronized(_currentOutbound) {
|
||||
synchronized(_writeLock) {
|
||||
_writeBufs.clear();
|
||||
_outbound.drainTo(pending);
|
||||
if (!_currentOutbound.isEmpty())
|
||||
pending.addAll(_currentOutbound);
|
||||
_currentOutbound.clear();
|
||||
if (_sender != null) {
|
||||
_sender.destroy();
|
||||
_sender = null;
|
||||
}
|
||||
// zero out everything we can
|
||||
_sendSipk1 = 0;
|
||||
_sendSipk2 = 0;
|
||||
if (_sendSipIV != null) {
|
||||
Arrays.fill(_sendSipIV, (byte) 0);
|
||||
_sendSipIV = null;
|
||||
}
|
||||
}
|
||||
for (OutNetMessage msg : pending) {
|
||||
_transport.afterSend(msg, false, allowRequeue, msg.getLifetime());
|
||||
}
|
||||
// zero out everything we can
|
||||
if (_curReadState != null) {
|
||||
_curReadState.destroy();
|
||||
_curReadState = null;
|
||||
}
|
||||
if (_sender != null) {
|
||||
_sender.destroy();
|
||||
_sender = null;
|
||||
}
|
||||
_sendSipk1 = 0;
|
||||
_sendSipk2 = 0;
|
||||
if (_sendSipIV != null) {
|
||||
Arrays.fill(_sendSipIV, (byte) 0);
|
||||
_sendSipIV = null;
|
||||
synchronized(_readLock) {
|
||||
ByteBuffer bb;
|
||||
while ((bb = _readBufs.poll()) != null) {
|
||||
EventPumper.releaseBuf(bb);
|
||||
}
|
||||
if (_curReadState != null) {
|
||||
_curReadState.destroy();
|
||||
_curReadState = null;
|
||||
}
|
||||
}
|
||||
return old;
|
||||
}
|
||||
@ -570,7 +580,7 @@ public class NTCPConnection implements Closeable {
|
||||
int writeBufs = _writeBufs.size();
|
||||
boolean currentOutboundSet;
|
||||
long seq;
|
||||
synchronized(_currentOutbound) {
|
||||
synchronized(_writeLock) {
|
||||
currentOutboundSet = !_currentOutbound.isEmpty();
|
||||
seq = currentOutboundSet ? _currentOutbound.get(0).getSeqNum() : -1;
|
||||
}
|
||||
@ -615,7 +625,7 @@ public class NTCPConnection implements Closeable {
|
||||
* @param prep an instance of PrepBuffer to use as scratch space
|
||||
*
|
||||
*/
|
||||
synchronized void prepareNextWrite(PrepBuffer prep) {
|
||||
void prepareNextWrite(PrepBuffer prep) {
|
||||
if (_closed.get())
|
||||
return;
|
||||
// Must be established or else session key is null and we can't encrypt
|
||||
@ -625,7 +635,10 @@ public class NTCPConnection implements Closeable {
|
||||
if (!isEstablished()) {
|
||||
return;
|
||||
}
|
||||
prepareNextWriteNTCP2(prep);
|
||||
|
||||
synchronized(_writeLock) {
|
||||
prepareNextWriteNTCP2(prep);
|
||||
}
|
||||
}
|
||||
|
||||
static class PrepBuffer {
|
||||
@ -647,7 +660,7 @@ public class NTCPConnection implements Closeable {
|
||||
* Prepare the next I2NP message for transmission. This should be run from
|
||||
* the Writer thread pool.
|
||||
*
|
||||
* Caller must synchronize.
|
||||
* Caller must synchronize on _writeLock.
|
||||
*
|
||||
* @param buf we use buf.enencrypted only
|
||||
* @since 0.9.36
|
||||
@ -656,7 +669,7 @@ public class NTCPConnection implements Closeable {
|
||||
int size = OutboundNTCP2State.MAC_SIZE;
|
||||
List<Block> blocks = new ArrayList<Block>(4);
|
||||
long now = _context.clock().now();
|
||||
synchronized (_currentOutbound) {
|
||||
/* synchronized (_currentOutbound) */ {
|
||||
if (!_currentOutbound.isEmpty()) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("attempt for multiple outbound messages with " + _currentOutbound.size() + " already waiting and " + _outbound.size() + " queued");
|
||||
@ -843,9 +856,10 @@ public class NTCPConnection implements Closeable {
|
||||
*/
|
||||
void sendTerminationAndClose() {
|
||||
ReadState rs = null;
|
||||
synchronized (this) {
|
||||
if (_version == 2 && isEstablished())
|
||||
if (_version == 2 && isEstablished()) {
|
||||
synchronized (_readLock) {
|
||||
rs = _curReadState;
|
||||
}
|
||||
}
|
||||
if (rs != null)
|
||||
sendTermination(REASON_TIMEOUT, rs.getFramesReceived());
|
||||
@ -878,7 +892,7 @@ public class NTCPConnection implements Closeable {
|
||||
}
|
||||
// use a "read buf" for the temp array
|
||||
ByteArray dataBuf = acquireReadBuf();
|
||||
synchronized(this) {
|
||||
synchronized(_writeLock) {
|
||||
if (_sender != null) {
|
||||
sendNTCP2(dataBuf.getData(), blocks);
|
||||
_sender.destroy();
|
||||
@ -900,28 +914,33 @@ public class NTCPConnection implements Closeable {
|
||||
* must have room for block output. May be released immediately on return.
|
||||
* @since 0.9.36
|
||||
*/
|
||||
private synchronized void sendNTCP2(byte[] tmp, List<Block> blocks) {
|
||||
if (_sender == null) {
|
||||
if (_log.shouldInfo())
|
||||
_log.info("sender gone", new Exception());
|
||||
return;
|
||||
}
|
||||
private void sendNTCP2(byte[] tmp, List<Block> blocks) {
|
||||
int payloadlen = NTCP2Payload.writePayload(tmp, 0, blocks);
|
||||
int framelen = payloadlen + OutboundNTCP2State.MAC_SIZE;
|
||||
// TODO use a buffer
|
||||
byte[] enc = new byte[2 + framelen];
|
||||
try {
|
||||
_sender.encryptWithAd(null, tmp, 0, enc, 2, payloadlen);
|
||||
} catch (GeneralSecurityException gse) {
|
||||
// TODO anything else?
|
||||
_log.error("data enc", gse);
|
||||
return;
|
||||
|
||||
synchronized(_writeLock) {
|
||||
if (_sender == null) {
|
||||
if (_log.shouldInfo())
|
||||
_log.info("sender gone", new Exception());
|
||||
return;
|
||||
}
|
||||
try {
|
||||
_sender.encryptWithAd(null, tmp, 0, enc, 2, payloadlen);
|
||||
} catch (GeneralSecurityException gse) {
|
||||
// TODO anything else?
|
||||
_log.error("data enc", gse);
|
||||
return;
|
||||
}
|
||||
// siphash ^ len
|
||||
long sipIV = SipHashInline.hash24(_sendSipk1, _sendSipk2, _sendSipIV);
|
||||
toLong8LE(_sendSipIV, 0, sipIV);
|
||||
enc[0] = (byte) ((framelen >> 8) ^ (sipIV >> 8));
|
||||
enc[1] = (byte) (framelen ^ sipIV);
|
||||
wantsWrite(enc);
|
||||
}
|
||||
|
||||
// siphash ^ len
|
||||
long sipIV = SipHashInline.hash24(_sendSipk1, _sendSipk2, _sendSipIV);
|
||||
enc[0] = (byte) ((framelen >> 8) ^ (sipIV >> 8));
|
||||
enc[1] = (byte) (framelen ^ sipIV);
|
||||
if (_log.shouldDebug()) {
|
||||
StringBuilder buf = new StringBuilder(256);
|
||||
buf.append("Sending ").append(blocks.size())
|
||||
@ -932,8 +951,6 @@ public class NTCPConnection implements Closeable {
|
||||
}
|
||||
_log.debug(buf.toString());
|
||||
}
|
||||
wantsWrite(enc);
|
||||
toLong8LE(_sendSipIV, 0, sipIV);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -946,7 +963,12 @@ public class NTCPConnection implements Closeable {
|
||||
try {_chan.close(); } catch (IOException ignored) {}
|
||||
return;
|
||||
}
|
||||
_conKey.interestOps(_conKey.interestOps() | SelectionKey.OP_READ);
|
||||
try {
|
||||
EventPumper.setInterest(_conKey, SelectionKey.OP_READ);
|
||||
} catch (CancelledKeyException cke) {
|
||||
try {_chan.close(); } catch (IOException ignored) {}
|
||||
return;
|
||||
}
|
||||
// schedule up the beginning of our handshaking by calling prepareNextWrite on the
|
||||
// writer thread pool
|
||||
_transport.getWriter().wantsWrite(this, "outbound connected");
|
||||
@ -1066,7 +1088,7 @@ public class NTCPConnection implements Closeable {
|
||||
_log.warn("recv() on closed con");
|
||||
return;
|
||||
}
|
||||
synchronized(this) {
|
||||
synchronized(_statLock) {
|
||||
_bytesReceived += buf.remaining();
|
||||
updateStats();
|
||||
}
|
||||
@ -1074,13 +1096,34 @@ public class NTCPConnection implements Closeable {
|
||||
_transport.getReader().wantsRead(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Write lock for pumper delayed writes
|
||||
*
|
||||
* @since 0.9.53
|
||||
*/
|
||||
Object getWriteLock() {
|
||||
return _writeLock;
|
||||
}
|
||||
|
||||
/**
|
||||
* The contents of the buffer have been encrypted / padded / etc and have
|
||||
* been fully allocated for the bandwidth limiter.
|
||||
*/
|
||||
private void write(ByteBuffer buf) {
|
||||
_writeBufs.offer(buf);
|
||||
_transport.getPumper().wantsWrite(this);
|
||||
EventPumper pumper = _transport.getPumper();
|
||||
if (_isInbound || isEstablished()) {
|
||||
// Attempt to write directly
|
||||
if (!pumper.processWrite(this, getKey())) {
|
||||
if (_log.shouldDebug())
|
||||
_log.debug("Async write not completed, pending bufs: " + _writeBufs.size() + " on " + this);
|
||||
// queue it up
|
||||
pumper.wantsWrite(this);
|
||||
}
|
||||
} else {
|
||||
// outbound not connected yet
|
||||
pumper.wantsWrite(this);
|
||||
}
|
||||
}
|
||||
|
||||
/** @return null if none available */
|
||||
@ -1089,25 +1132,34 @@ public class NTCPConnection implements Closeable {
|
||||
}
|
||||
|
||||
/**
|
||||
* Replaces getWriteBufCount()
|
||||
* Replaces getWriteBufCount().
|
||||
* Caller should sync on getWriteLock()
|
||||
*
|
||||
* @since 0.8.12
|
||||
*/
|
||||
boolean isWriteBufEmpty() {
|
||||
return _writeBufs.isEmpty();
|
||||
}
|
||||
|
||||
/** @return null if none available */
|
||||
/**
|
||||
* Returns but does not remove the buffer.
|
||||
* Call removeWriteBuf() after write complete.
|
||||
* Caller should sync on getWriteLock()
|
||||
*
|
||||
* @return null if none available
|
||||
*/
|
||||
ByteBuffer getNextWriteBuf() {
|
||||
return _writeBufs.peek(); // not remove! we removeWriteBuf afterwards
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the buffer, which _should_ be the one at the head of _writeBufs
|
||||
* Remove the buffer, which _should_ be the one at the head of _writeBufs.
|
||||
* Caller must sync on _writeLock
|
||||
*/
|
||||
void removeWriteBuf(ByteBuffer buf) {
|
||||
// never clear OutNetMessages during establish phase
|
||||
boolean clearMessage = isEstablished();
|
||||
synchronized(this) {
|
||||
synchronized(_statLock) {
|
||||
_bytesSent += buf.capacity();
|
||||
if (_sendingMeta && (buf.capacity() == META_SIZE)) {
|
||||
_sendingMeta = false;
|
||||
@ -1118,12 +1170,9 @@ public class NTCPConnection implements Closeable {
|
||||
_writeBufs.remove(buf);
|
||||
if (clearMessage) {
|
||||
List<OutNetMessage> msgs = null;
|
||||
// see synchronization comments in prepareNextWriteFast()
|
||||
synchronized (_currentOutbound) {
|
||||
if (!_currentOutbound.isEmpty()) {
|
||||
msgs = new ArrayList<OutNetMessage>(_currentOutbound);
|
||||
_currentOutbound.clear();
|
||||
}
|
||||
if (!_currentOutbound.isEmpty()) {
|
||||
msgs = new ArrayList<OutNetMessage>(_currentOutbound);
|
||||
_currentOutbound.clear();
|
||||
}
|
||||
// push through the bw limiter to reach _writeBufs
|
||||
if (!_outbound.isEmpty())
|
||||
@ -1154,6 +1203,7 @@ public class NTCPConnection implements Closeable {
|
||||
}
|
||||
}
|
||||
|
||||
// following fields covered by _statLock
|
||||
private long _bytesReceived;
|
||||
private long _bytesSent;
|
||||
/** _bytesReceived when we last updated the rate */
|
||||
@ -1163,29 +1213,31 @@ public class NTCPConnection implements Closeable {
|
||||
private float _sendBps;
|
||||
private float _recvBps;
|
||||
|
||||
public synchronized float getSendRate() { return _sendBps; }
|
||||
public synchronized float getRecvRate() { return _recvBps; }
|
||||
public float getSendRate() { synchronized(_statLock) { return _sendBps; } }
|
||||
public float getRecvRate() { synchronized(_statLock) { return _recvBps; } }
|
||||
|
||||
/**
|
||||
* Stats only for console
|
||||
*/
|
||||
private synchronized void updateStats() {
|
||||
long now = _context.clock().now();
|
||||
long time = now - _lastRateUpdated;
|
||||
// If enough time has passed...
|
||||
// Perhaps should synchronize, but if so do the time check before synching...
|
||||
// only for console so don't bother....
|
||||
if (time >= STAT_UPDATE_TIME_MS) {
|
||||
long totS = _bytesSent;
|
||||
long totR = _bytesReceived;
|
||||
long sent = totS - _lastBytesSent; // How much we sent meanwhile
|
||||
long recv = totR - _lastBytesReceived; // How much we received meanwhile
|
||||
_lastBytesSent = totS;
|
||||
_lastBytesReceived = totR;
|
||||
_lastRateUpdated = now;
|
||||
private void updateStats() {
|
||||
synchronized(_statLock) {
|
||||
long now = _context.clock().now();
|
||||
long time = now - _lastRateUpdated;
|
||||
// If enough time has passed...
|
||||
// Perhaps should synchronize, but if so do the time check before synching...
|
||||
// only for console so don't bother....
|
||||
if (time >= STAT_UPDATE_TIME_MS) {
|
||||
long totS = _bytesSent;
|
||||
long totR = _bytesReceived;
|
||||
long sent = totS - _lastBytesSent; // How much we sent meanwhile
|
||||
long recv = totR - _lastBytesReceived; // How much we received meanwhile
|
||||
_lastBytesSent = totS;
|
||||
_lastBytesReceived = totR;
|
||||
_lastRateUpdated = now;
|
||||
|
||||
_sendBps = (0.9f)*_sendBps + (0.1f)*(sent*1000f)/time;
|
||||
_recvBps = (0.9f)*_recvBps + (0.1f)*((float)recv*1000)/time;
|
||||
_sendBps = (0.9f)*_sendBps + (0.1f)*(sent*1000f)/time;
|
||||
_recvBps = (0.9f)*_recvBps + (0.1f)*((float)recv*1000)/time;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1204,10 +1256,12 @@ public class NTCPConnection implements Closeable {
|
||||
*
|
||||
* This is the entry point as called from Reader.processRead()
|
||||
*/
|
||||
synchronized void recvEncryptedI2NP(ByteBuffer buf) {
|
||||
if (_curReadState == null)
|
||||
throw new IllegalStateException("not established");
|
||||
_curReadState.receive(buf);
|
||||
void recvEncryptedI2NP(ByteBuffer buf) {
|
||||
synchronized(_readLock) {
|
||||
if (_curReadState == null)
|
||||
throw new IllegalStateException("not established");
|
||||
_curReadState.receive(buf);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1292,7 +1346,6 @@ public class NTCPConnection implements Closeable {
|
||||
synchronized void finishOutboundEstablishment(CipherState sender, CipherState receiver,
|
||||
byte[] sip_ab, byte[] sip_ba, long clockSkew) {
|
||||
finishEstablishment(sender, receiver, sip_ab, sip_ba, clockSkew);
|
||||
_paddingConfig = OUR_PADDING;
|
||||
_transport.markReachable(getRemotePeer().calculateHash(), false);
|
||||
if (!_outbound.isEmpty())
|
||||
_transport.getWriter().wantsWrite(this, "outbound established");
|
||||
@ -1324,8 +1377,6 @@ public class NTCPConnection implements Closeable {
|
||||
"\nhis padding options: " + hisPadding +
|
||||
"\nour padding options: " + OUR_PADDING +
|
||||
"\nmerged config is: " + _paddingConfig);
|
||||
} else {
|
||||
_paddingConfig = OUR_PADDING;
|
||||
}
|
||||
NTCPConnection toClose = _transport.inboundEstablished(this);
|
||||
if (toClose != null && toClose != this) {
|
||||
@ -1679,8 +1730,10 @@ public class NTCPConnection implements Closeable {
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("delayed close, AEAD failure after " + validFramesRcvd +
|
||||
" good frames, to read: " + toRead + " on " + this, new Exception("I did it"));
|
||||
_curReadState = new NTCP2FailState(toRead, validFramesRcvd);
|
||||
_curReadState.receive(buf);
|
||||
synchronized(_readLock) {
|
||||
_curReadState = new NTCP2FailState(toRead, validFramesRcvd);
|
||||
_curReadState.receive(buf);
|
||||
}
|
||||
} else {
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("immediate close, AEAD failure after " + validFramesRcvd +
|
||||
|
Reference in New Issue
Block a user