diff --git a/history.txt b/history.txt index c2e743f3f..fad5db94b 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,8 @@ +2014-01-25 zab + * Move OutNetMessage buffer preparation to the Writer threads + (Ticket #1184) + * Up version to -1 + * 2014-01-22 0.9.10 released 2014-01-20 kytv diff --git a/router/java/src/net/i2p/router/OutNetMessage.java b/router/java/src/net/i2p/router/OutNetMessage.java index 25149b1f3..f1aa5519e 100644 --- a/router/java/src/net/i2p/router/OutNetMessage.java +++ b/router/java/src/net/i2p/router/OutNetMessage.java @@ -57,7 +57,6 @@ public class OutNetMessage implements CDPQEntry { * (some JVMs have less than 10ms resolution, so the Long above doesn't guarantee order) */ private List _timestampOrder; - private Object _preparationBuf; /** * Priorities, higher is higher priority. @@ -292,16 +291,6 @@ public class OutNetMessage implements CDPQEntry { public void beginSend() { _sendBegin = _context.clock().now(); } - public void prepared(Object buf) { - _preparationBuf = buf; - } - - public Object releasePreparationBuffer() { - Object rv = _preparationBuf; - _preparationBuf = null; - return rv; - } - public long getCreated() { return _created; } /** time since the message was created */ diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index a411b0099..87e5bffca 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 0; + public final static long BUILD = 1; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java index 217ae341e..1aab41d4a 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java @@ -392,20 +392,12 @@ class NTCPConnection { List pending = new ArrayList(); //_outbound.drainAllTo(pending); _outbound.drainTo(pending); - for (OutNetMessage msg : pending) { - Object buf = msg.releasePreparationBuffer(); - if (buf != null) - releaseBuf((PrepBuffer)buf); + for (OutNetMessage msg : pending) _transport.afterSend(msg, false, allowRequeue, msg.getLifetime()); - } OutNetMessage msg = getCurrentOutbound(); - if (msg != null) { - Object buf = msg.releasePreparationBuffer(); - if (buf != null) - releaseBuf((PrepBuffer)buf); + if (msg != null) _transport.afterSend(msg, false, allowRequeue, msg.getLifetime()); - } return old; } @@ -438,7 +430,6 @@ class NTCPConnection { _consecutiveBacklog = 0; ****/ //if (FAST_LARGE) - bufferedPrepare(msg); _outbound.offer(msg); //int enqueued = _outbound.size(); // although stat description says ahead of this one, not including this one... @@ -604,11 +595,13 @@ class NTCPConnection { /** * prepare the next i2np message for transmission. this should be run from * the Writer thread pool. + * + * @param prep an instance of PrepBuffer to use as scratch space * */ - synchronized void prepareNextWrite() { + synchronized void prepareNextWrite(PrepBuffer prep) { //if (FAST_LARGE) - prepareNextWriteFast(); + prepareNextWriteFast(prep); //else // prepareNextWriteSmall(); } @@ -717,9 +710,10 @@ class NTCPConnection { * the Writer thread pool. * * Caller must synchronize. + * @param buf a PrepBuffer to use as scratch space * */ - private void prepareNextWriteFast() { + private void prepareNextWriteFast(PrepBuffer buf) { if (_closed.get()) return; //if (_log.shouldLog(Log.DEBUG)) @@ -780,14 +774,7 @@ class NTCPConnection { } //long begin = System.currentTimeMillis(); - PrepBuffer buf = (PrepBuffer)msg.releasePreparationBuffer(); - if (buf == null) { - // race, see ticket #392 - //throw new RuntimeException("buf is null for " + msg); - if (_log.shouldLog(Log.WARN)) - _log.warn("Null prep buf for " + msg); - return; - } + bufferedPrepare(msg,buf); _context.aes().encrypt(buf.unencrypted, 0, buf.encrypted, 0, _sessionKey, _prevWriteEnd, 0, buf.unencryptedLength); System.arraycopy(buf.encrypted, buf.encrypted.length-16, _prevWriteEnd, 0, _prevWriteEnd.length); //long encryptedTime = System.currentTimeMillis(); @@ -797,7 +784,6 @@ class NTCPConnection { // + Base64.encode(unencrypted, 0, 16) + "..." + "\nIV=" + Base64.encode(_prevWriteEnd, 0, 16)); _transport.getPumper().wantsWrite(this, buf.encrypted); //long wantsTime = System.currentTimeMillis(); - releaseBuf(buf); //long releaseTime = System.currentTimeMillis(); //if (_log.shouldLog(Log.DEBUG)) // _log.debug("prepared outbound " + System.identityHashCode(msg) @@ -817,16 +803,15 @@ class NTCPConnection { /** * Serialize the message/checksum/padding/etc for transmission, but leave off - * the encryption for the actual write process (when we will always have the - * end of the previous encrypted transmission to serve as our IV). with care, - * the encryption could be handled here too, as long as messages aren't expired - * in the queue and the establishment process takes that into account. + * the encryption. This should be called from a Writer thread + * + * @param msg message to send + * @param buf PrepBuffer to use as scratch space */ - private void bufferedPrepare(OutNetMessage msg) { + private void bufferedPrepare(OutNetMessage msg, PrepBuffer buf) { //if (!_isInbound && !_established) // return; //long begin = System.currentTimeMillis(); - PrepBuffer buf = acquireBuf(); //long alloc = System.currentTimeMillis(); I2NPMessage m = msg.getMessage(); @@ -863,39 +848,12 @@ class NTCPConnection { buf.encrypted = new byte[buf.unencryptedLength]; //long crced = System.currentTimeMillis(); - msg.prepared(buf); //if (_log.shouldLog(Log.DEBUG)) // _log.debug("Buffered prepare took " + (crced-begin) + ", alloc=" + (alloc-begin) // + " serialize=" + (serialized-alloc) + " crc=" + (crced-serialized)); } - - private static final int MIN_BUFS = 4; - private static final int MAX_BUFS = 16; - private static int NUM_PREP_BUFS; - static { - long maxMemory = SystemVersion.getMaxMemory(); - NUM_PREP_BUFS = (int) Math.max(MIN_BUFS, Math.min(MAX_BUFS, 1 + (maxMemory / (16*1024*1024)))); - } - private final static LinkedBlockingQueue _bufs = new LinkedBlockingQueue(NUM_PREP_BUFS); - - /** - * 32KB each - * @return initialized buffer - */ - private static PrepBuffer acquireBuf() { - PrepBuffer b = _bufs.poll(); - if (b == null) - b = new PrepBuffer(); - return b; - } - - private static void releaseBuf(PrepBuffer buf) { - buf.init(); - _bufs.offer(buf); - } - - private static class PrepBuffer { + public static class PrepBuffer { final byte unencrypted[]; int unencryptedLength; final byte base[]; @@ -903,13 +861,13 @@ class NTCPConnection { final Adler32 crc; byte encrypted[]; - PrepBuffer() { + public PrepBuffer() { unencrypted = new byte[BUFFER_SIZE]; base = new byte[BUFFER_SIZE]; crc = new Adler32(); } - private void init() { + public void init() { unencryptedLength = 0; baseLength = 0; encrypted = null; @@ -1367,7 +1325,6 @@ class NTCPConnection { */ static void releaseResources() { _i2npHandlers.clear(); - _bufs.clear(); } /** diff --git a/router/java/src/net/i2p/router/transport/ntcp/Writer.java b/router/java/src/net/i2p/router/transport/ntcp/Writer.java index 3a5eb1884..d9c09735d 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/Writer.java +++ b/router/java/src/net/i2p/router/transport/ntcp/Writer.java @@ -24,12 +24,16 @@ class Writer { private final Set _writeAfterLive; private final List _runners; + /** a scratch space to serialize and encrypt messages */ + private final NTCPConnection.PrepBuffer _prepBuffer; + public Writer(RouterContext ctx) { _log = ctx.logManager().getLog(getClass()); _pendingConnections = new LinkedHashSet(16); _runners = new ArrayList(5); _liveWrites = new HashSet(5); _writeAfterLive = new HashSet(5); + _prepBuffer = new NTCPConnection.PrepBuffer(); } public synchronized void startWriting(int numWriters) { @@ -119,7 +123,8 @@ class Writer { try { if (_log.shouldLog(Log.DEBUG)) _log.debug("Prepare next write on: " + con); - con.prepareNextWrite(); + _prepBuffer.init(); + con.prepareNextWrite(_prepBuffer); } catch (RuntimeException re) { _log.log(Log.CRIT, "Error in the ntcp writer", re); }