Move OutNetMessage buffer preparation to the Writer threads

(Ticket #1184)
 Up version to -1
This commit is contained in:
zab2
2014-01-25 00:46:30 +00:00
parent 9e87fd9b13
commit 3427464de6
5 changed files with 29 additions and 73 deletions

View File

@@ -1,3 +1,8 @@
2014-01-25 zab
* Move OutNetMessage buffer preparation to the Writer threads
(Ticket #1184)
* Up version to -1
* 2014-01-22 0.9.10 released * 2014-01-22 0.9.10 released
2014-01-20 kytv 2014-01-20 kytv

View File

@@ -57,7 +57,6 @@ public class OutNetMessage implements CDPQEntry {
* (some JVMs have less than 10ms resolution, so the Long above doesn't guarantee order) * (some JVMs have less than 10ms resolution, so the Long above doesn't guarantee order)
*/ */
private List<String> _timestampOrder; private List<String> _timestampOrder;
private Object _preparationBuf;
/** /**
* Priorities, higher is higher priority. * Priorities, higher is higher priority.
@@ -292,16 +291,6 @@ public class OutNetMessage implements CDPQEntry {
public void beginSend() { _sendBegin = _context.clock().now(); } public void beginSend() { _sendBegin = _context.clock().now(); }
public void prepared(Object buf) {
_preparationBuf = buf;
}
public Object releasePreparationBuffer() {
Object rv = _preparationBuf;
_preparationBuf = null;
return rv;
}
public long getCreated() { return _created; } public long getCreated() { return _created; }
/** time since the message was created */ /** time since the message was created */

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */ /** deprecated */
public final static String ID = "Monotone"; public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION; public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 0; public final static long BUILD = 1;
/** for example "-test" */ /** for example "-test" */
public final static String EXTRA = ""; public final static String EXTRA = "";

View File

@@ -392,20 +392,12 @@ class NTCPConnection {
List<OutNetMessage> pending = new ArrayList<OutNetMessage>(); List<OutNetMessage> pending = new ArrayList<OutNetMessage>();
//_outbound.drainAllTo(pending); //_outbound.drainAllTo(pending);
_outbound.drainTo(pending); _outbound.drainTo(pending);
for (OutNetMessage msg : pending) { for (OutNetMessage msg : pending)
Object buf = msg.releasePreparationBuffer();
if (buf != null)
releaseBuf((PrepBuffer)buf);
_transport.afterSend(msg, false, allowRequeue, msg.getLifetime()); _transport.afterSend(msg, false, allowRequeue, msg.getLifetime());
}
OutNetMessage msg = getCurrentOutbound(); OutNetMessage msg = getCurrentOutbound();
if (msg != null) { if (msg != null)
Object buf = msg.releasePreparationBuffer();
if (buf != null)
releaseBuf((PrepBuffer)buf);
_transport.afterSend(msg, false, allowRequeue, msg.getLifetime()); _transport.afterSend(msg, false, allowRequeue, msg.getLifetime());
}
return old; return old;
} }
@@ -438,7 +430,6 @@ class NTCPConnection {
_consecutiveBacklog = 0; _consecutiveBacklog = 0;
****/ ****/
//if (FAST_LARGE) //if (FAST_LARGE)
bufferedPrepare(msg);
_outbound.offer(msg); _outbound.offer(msg);
//int enqueued = _outbound.size(); //int enqueued = _outbound.size();
// although stat description says ahead of this one, not including this one... // although stat description says ahead of this one, not including this one...
@@ -604,11 +595,13 @@ class NTCPConnection {
/** /**
* prepare the next i2np message for transmission. this should be run from * prepare the next i2np message for transmission. this should be run from
* the Writer thread pool. * the Writer thread pool.
*
* @param prep an instance of PrepBuffer to use as scratch space
* *
*/ */
synchronized void prepareNextWrite() { synchronized void prepareNextWrite(PrepBuffer prep) {
//if (FAST_LARGE) //if (FAST_LARGE)
prepareNextWriteFast(); prepareNextWriteFast(prep);
//else //else
// prepareNextWriteSmall(); // prepareNextWriteSmall();
} }
@@ -717,9 +710,10 @@ class NTCPConnection {
* the Writer thread pool. * the Writer thread pool.
* *
* Caller must synchronize. * Caller must synchronize.
* @param buf a PrepBuffer to use as scratch space
* *
*/ */
private void prepareNextWriteFast() { private void prepareNextWriteFast(PrepBuffer buf) {
if (_closed.get()) if (_closed.get())
return; return;
//if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
@@ -780,14 +774,7 @@ class NTCPConnection {
} }
//long begin = System.currentTimeMillis(); //long begin = System.currentTimeMillis();
PrepBuffer buf = (PrepBuffer)msg.releasePreparationBuffer(); bufferedPrepare(msg,buf);
if (buf == null) {
// race, see ticket #392
//throw new RuntimeException("buf is null for " + msg);
if (_log.shouldLog(Log.WARN))
_log.warn("Null prep buf for " + msg);
return;
}
_context.aes().encrypt(buf.unencrypted, 0, buf.encrypted, 0, _sessionKey, _prevWriteEnd, 0, buf.unencryptedLength); _context.aes().encrypt(buf.unencrypted, 0, buf.encrypted, 0, _sessionKey, _prevWriteEnd, 0, buf.unencryptedLength);
System.arraycopy(buf.encrypted, buf.encrypted.length-16, _prevWriteEnd, 0, _prevWriteEnd.length); System.arraycopy(buf.encrypted, buf.encrypted.length-16, _prevWriteEnd, 0, _prevWriteEnd.length);
//long encryptedTime = System.currentTimeMillis(); //long encryptedTime = System.currentTimeMillis();
@@ -797,7 +784,6 @@ class NTCPConnection {
// + Base64.encode(unencrypted, 0, 16) + "..." + "\nIV=" + Base64.encode(_prevWriteEnd, 0, 16)); // + Base64.encode(unencrypted, 0, 16) + "..." + "\nIV=" + Base64.encode(_prevWriteEnd, 0, 16));
_transport.getPumper().wantsWrite(this, buf.encrypted); _transport.getPumper().wantsWrite(this, buf.encrypted);
//long wantsTime = System.currentTimeMillis(); //long wantsTime = System.currentTimeMillis();
releaseBuf(buf);
//long releaseTime = System.currentTimeMillis(); //long releaseTime = System.currentTimeMillis();
//if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
// _log.debug("prepared outbound " + System.identityHashCode(msg) // _log.debug("prepared outbound " + System.identityHashCode(msg)
@@ -817,16 +803,15 @@ class NTCPConnection {
/** /**
* Serialize the message/checksum/padding/etc for transmission, but leave off * Serialize the message/checksum/padding/etc for transmission, but leave off
* the encryption for the actual write process (when we will always have the * the encryption. This should be called from a Writer thread
* end of the previous encrypted transmission to serve as our IV). with care, *
* the encryption could be handled here too, as long as messages aren't expired * @param msg message to send
* in the queue and the establishment process takes that into account. * @param buf PrepBuffer to use as scratch space
*/ */
private void bufferedPrepare(OutNetMessage msg) { private void bufferedPrepare(OutNetMessage msg, PrepBuffer buf) {
//if (!_isInbound && !_established) //if (!_isInbound && !_established)
// return; // return;
//long begin = System.currentTimeMillis(); //long begin = System.currentTimeMillis();
PrepBuffer buf = acquireBuf();
//long alloc = System.currentTimeMillis(); //long alloc = System.currentTimeMillis();
I2NPMessage m = msg.getMessage(); I2NPMessage m = msg.getMessage();
@@ -863,39 +848,12 @@ class NTCPConnection {
buf.encrypted = new byte[buf.unencryptedLength]; buf.encrypted = new byte[buf.unencryptedLength];
//long crced = System.currentTimeMillis(); //long crced = System.currentTimeMillis();
msg.prepared(buf);
//if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
// _log.debug("Buffered prepare took " + (crced-begin) + ", alloc=" + (alloc-begin) // _log.debug("Buffered prepare took " + (crced-begin) + ", alloc=" + (alloc-begin)
// + " serialize=" + (serialized-alloc) + " crc=" + (crced-serialized)); // + " serialize=" + (serialized-alloc) + " crc=" + (crced-serialized));
} }
private static final int MIN_BUFS = 4;
private static final int MAX_BUFS = 16;
private static int NUM_PREP_BUFS;
static {
long maxMemory = SystemVersion.getMaxMemory();
NUM_PREP_BUFS = (int) Math.max(MIN_BUFS, Math.min(MAX_BUFS, 1 + (maxMemory / (16*1024*1024))));
}
private final static LinkedBlockingQueue<PrepBuffer> _bufs = new LinkedBlockingQueue<PrepBuffer>(NUM_PREP_BUFS); public static class PrepBuffer {
/**
* 32KB each
* @return initialized buffer
*/
private static PrepBuffer acquireBuf() {
PrepBuffer b = _bufs.poll();
if (b == null)
b = new PrepBuffer();
return b;
}
private static void releaseBuf(PrepBuffer buf) {
buf.init();
_bufs.offer(buf);
}
private static class PrepBuffer {
final byte unencrypted[]; final byte unencrypted[];
int unencryptedLength; int unencryptedLength;
final byte base[]; final byte base[];
@@ -903,13 +861,13 @@ class NTCPConnection {
final Adler32 crc; final Adler32 crc;
byte encrypted[]; byte encrypted[];
PrepBuffer() { public PrepBuffer() {
unencrypted = new byte[BUFFER_SIZE]; unencrypted = new byte[BUFFER_SIZE];
base = new byte[BUFFER_SIZE]; base = new byte[BUFFER_SIZE];
crc = new Adler32(); crc = new Adler32();
} }
private void init() { public void init() {
unencryptedLength = 0; unencryptedLength = 0;
baseLength = 0; baseLength = 0;
encrypted = null; encrypted = null;
@@ -1367,7 +1325,6 @@ class NTCPConnection {
*/ */
static void releaseResources() { static void releaseResources() {
_i2npHandlers.clear(); _i2npHandlers.clear();
_bufs.clear();
} }
/** /**

View File

@@ -24,12 +24,16 @@ class Writer {
private final Set<NTCPConnection> _writeAfterLive; private final Set<NTCPConnection> _writeAfterLive;
private final List<Runner> _runners; private final List<Runner> _runners;
/** a scratch space to serialize and encrypt messages */
private final NTCPConnection.PrepBuffer _prepBuffer;
public Writer(RouterContext ctx) { public Writer(RouterContext ctx) {
_log = ctx.logManager().getLog(getClass()); _log = ctx.logManager().getLog(getClass());
_pendingConnections = new LinkedHashSet<NTCPConnection>(16); _pendingConnections = new LinkedHashSet<NTCPConnection>(16);
_runners = new ArrayList<Runner>(5); _runners = new ArrayList<Runner>(5);
_liveWrites = new HashSet<NTCPConnection>(5); _liveWrites = new HashSet<NTCPConnection>(5);
_writeAfterLive = new HashSet<NTCPConnection>(5); _writeAfterLive = new HashSet<NTCPConnection>(5);
_prepBuffer = new NTCPConnection.PrepBuffer();
} }
public synchronized void startWriting(int numWriters) { public synchronized void startWriting(int numWriters) {
@@ -119,7 +123,8 @@ class Writer {
try { try {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Prepare next write on: " + con); _log.debug("Prepare next write on: " + con);
con.prepareNextWrite(); _prepBuffer.init();
con.prepareNextWrite(_prepBuffer);
} catch (RuntimeException re) { } catch (RuntimeException re) {
_log.log(Log.CRIT, "Error in the ntcp writer", re); _log.log(Log.CRIT, "Error in the ntcp writer", re);
} }