From a7763a08dc845546cb24e5ef78b2462579306541 Mon Sep 17 00:00:00 2001 From: zzz Date: Sun, 14 Sep 2014 13:04:48 +0000 Subject: [PATCH] SSU OutboundMessageState - Fix SSU Output Queue errors due to races with PacketBuilder: - Remove all buffer caching as it can't be made thread-safe. Just allocate buffer in constructor and let GC handle it - Do fragmenting in constructor and make all fragment fields final - Don't track per-fragment retransmissions as it wasn't used - Move ack tracking from an array to a long - Sync all ack methods - Entire class now thread-safe (thx dg) --- .../transport/udp/OutboundMessageState.java | 362 ++++-------------- .../i2p/router/transport/udp/PeerState.java | 16 +- 2 files changed, 79 insertions(+), 299 deletions(-) diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java index 0a10bc4a8..61d415ccb 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java @@ -4,16 +4,16 @@ import java.util.Date; import net.i2p.I2PAppContext; import net.i2p.data.Base64; -import net.i2p.data.ByteArray; import net.i2p.data.i2np.I2NPMessage; import net.i2p.router.OutNetMessage; import net.i2p.router.util.CDPQEntry; -import net.i2p.util.ByteCache; import net.i2p.util.Log; /** * Maintain the outbound fragmentation for resending, for a single message. * + * All methods are thread-safe. + * */ class OutboundMessageState implements CDPQEntry { private final I2PAppContext _context; @@ -21,53 +21,32 @@ class OutboundMessageState implements CDPQEntry { /** may be null if we are part of the establishment */ private final OutNetMessage _message; private final I2NPMessage _i2npMessage; - private final long _messageId; /** will be null, unless we are part of the establishment */ private final PeerState _peer; private final long _expiration; - private ByteArray _messageBuf; + private final byte[] _messageBuf; /** fixed fragment size across the message */ - private int _fragmentSize; - /** size of the I2NP message */ - private int _totalSize; - /** sends[i] is how many times the fragment has been sent, or -1 if ACKed - * TODO this may not accurately track the number of retransmissions per-fragment, - * and we don't make any use of it anyway, so we should just make it a bitfield. - */ - private short _fragmentSends[]; + private final int _fragmentSize; + /** bitmask, 0 if acked, all 0 = complete */ + private long _fragmentAcks; + private final int _numFragments; private final long _startedOn; private long _nextSendTime; private int _pushCount; - private short _maxSends; - // private int _nextSendFragment; - /** for tracking use-after-free bugs */ - private boolean _released; - private Exception _releasedBy; + private int _maxSends; // we can't use the ones in _message since it is null for injections private long _enqueueTime; private long _seqNum; public static final int MAX_MSG_SIZE = 32 * 1024; - private static final int CACHE4_BYTES = MAX_MSG_SIZE; - private static final int CACHE3_BYTES = CACHE4_BYTES / 4; - private static final int CACHE2_BYTES = CACHE3_BYTES / 4; - private static final int CACHE1_BYTES = CACHE2_BYTES / 4; - - private static final int CACHE1_MAX = 256; - private static final int CACHE2_MAX = CACHE1_MAX / 4; - private static final int CACHE3_MAX = CACHE2_MAX / 4; - private static final int CACHE4_MAX = CACHE3_MAX / 4; - - private static final ByteCache _cache1 = ByteCache.getInstance(CACHE1_MAX, CACHE1_BYTES); - private static final ByteCache _cache2 = ByteCache.getInstance(CACHE2_MAX, CACHE2_BYTES); - private static final ByteCache _cache3 = ByteCache.getInstance(CACHE3_MAX, CACHE3_BYTES); - private static final ByteCache _cache4 = ByteCache.getInstance(CACHE4_MAX, CACHE4_BYTES); private static final long EXPIRATION = 10*1000; /** - * Called from UDPTransport + * "injected" message from the establisher. + * + * Called from UDPTransport. * @throws IAE if too big or if msg or peer is null */ public OutboundMessageState(I2PAppContext context, I2NPMessage msg, PeerState peer) { @@ -75,7 +54,9 @@ class OutboundMessageState implements CDPQEntry { } /** - * Called from OutboundMessageFragments + * Normal constructor. + * + * Called from OutboundMessageFragments. * @throws IAE if too big or if msg or peer is null */ public OutboundMessageState(I2PAppContext context, OutNetMessage m, PeerState peer) { @@ -95,161 +76,86 @@ class OutboundMessageState implements CDPQEntry { _message = m; _i2npMessage = msg; _peer = peer; - _messageId = msg.getUniqueId(); _startedOn = _context.clock().now(); _nextSendTime = _startedOn; _expiration = _startedOn + EXPIRATION; //_expiration = msg.getExpiration(); - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("Raw byte array for " + _messageId + ": " + Base64.encode(_messageBuf.getData(), 0, len)); + // now "fragment" it + int totalSize = _i2npMessage.getRawMessageSize(); + if (totalSize > MAX_MSG_SIZE) + throw new IllegalArgumentException("Size too large! " + totalSize); + _messageBuf = new byte[totalSize]; + _i2npMessage.toRawByteArray(_messageBuf); + _fragmentSize = _peer.fragmentSize(); + int numFragments = totalSize / _fragmentSize; + if (numFragments * _fragmentSize < totalSize) + numFragments++; + // This should never happen, as 534 bytes * 64 fragments > 32KB, and we won't bid on > 32KB + if (numFragments > InboundMessageState.MAX_FRAGMENTS) + throw new IllegalArgumentException("Fragmenting a " + totalSize + " message into " + numFragments + " fragments - too many!"); + _numFragments = numFragments; + // all 1's where we care + _fragmentAcks = _numFragments < 64 ? mask(_numFragments) - 1L : -1L; } /** - * lazily inits the message buffer unless already inited + * @param fragment 0-63 */ - private synchronized void initBuf() { - if (_messageBuf != null) - return; - final int size = _i2npMessage.getRawMessageSize(); - acquireBuf(size); - _totalSize = _i2npMessage.toRawByteArray(_messageBuf.getData()); - _messageBuf.setValid(_totalSize); - } - - /** - * @throws IAE if too big - * @since 0.9.3 - */ - private void acquireBuf(int size) { - if (_messageBuf != null) - releaseBuf(); - if (size <= CACHE1_BYTES) - _messageBuf = _cache1.acquire(); - else if (size <= CACHE2_BYTES) - _messageBuf = _cache2.acquire(); - else if (size <= CACHE3_BYTES) - _messageBuf = _cache3.acquire(); - else if (size <= CACHE4_BYTES) - _messageBuf = _cache4.acquire(); - else - throw new IllegalArgumentException("Size too large! " + size); - } - - /** - * @since 0.9.3 - */ - private void releaseBuf() { - if (_messageBuf == null) - return; - int size = _messageBuf.getData().length; - if (size == CACHE1_BYTES) - _cache1.release(_messageBuf); - else if (size == CACHE2_BYTES) - _cache2.release(_messageBuf); - else if (size == CACHE3_BYTES) - _cache3.release(_messageBuf); - else if (size == CACHE4_BYTES) - _cache4.release(_messageBuf); - _messageBuf = null; - _released = true; + private static long mask(int fragment) { + return 1L << fragment; } - /** - * This is synchronized with writeFragment(), - * so we do not release (probably due to an ack) while we are retransmitting. - * Also prevent double-free - */ - public synchronized void releaseResources() { - if (_messageBuf != null && !_released) { - releaseBuf(); - if (_log.shouldLog(Log.WARN)) - _releasedBy = new Exception ("Released on " + new Date() + " by:"); - } - //_messageBuf = null; - } - public OutNetMessage getMessage() { return _message; } - public long getMessageId() { return _messageId; } + + public long getMessageId() { return _i2npMessage.getUniqueId(); } + public PeerState getPeer() { return _peer; } public boolean isExpired() { return _expiration < _context.clock().now(); } - public boolean isComplete() { - short sends[] = _fragmentSends; - if (sends == null) return false; - for (int i = 0; i < sends.length; i++) - if (sends[i] >= 0) - return false; - // nothing else pending ack - return true; + public synchronized boolean isComplete() { + return _fragmentAcks == 0; } public synchronized int getUnackedSize() { - short fragmentSends[] = _fragmentSends; - ByteArray messageBuf = _messageBuf; int rv = 0; - if ( (messageBuf != null) && (fragmentSends != null) ) { - int lastSize = _totalSize % _fragmentSize; - if (lastSize == 0) - lastSize = _fragmentSize; - for (int i = 0; i < fragmentSends.length; i++) { - if (fragmentSends[i] >= (short)0) { - if (i + 1 == fragmentSends.length) - rv += lastSize; - else - rv += _fragmentSize; - } + if (isComplete()) + return rv; + int lastSize = _messageBuf.length % _fragmentSize; + if (lastSize == 0) + lastSize = _fragmentSize; + for (int i = 0; i < _numFragments; i++) { + if (needsSending(i)) { + if (i + 1 == _numFragments) + rv += lastSize; + else + rv += _fragmentSize; } } return rv; } - public boolean needsSending(int fragment) { - short sends[] = _fragmentSends; - if ( (sends == null) || (fragment >= sends.length) || (fragment < 0) ) - return false; - return (sends[fragment] >= (short)0); + public synchronized boolean needsSending(int fragment) { + return (_fragmentAcks & mask(fragment)) != 0; } public long getLifetime() { return _context.clock().now() - _startedOn; } /** - * Ack all the fragments in the ack list. As a side effect, if there are - * still unacked fragments, the 'next send' time will be updated under the - * assumption that that all of the packets within a volley would reach the - * peer within that ack frequency (2-400ms). + * Ack all the fragments in the ack list. * * @return true if the message was completely ACKed */ - public boolean acked(ACKBitfield bitfield) { + public synchronized boolean acked(ACKBitfield bitfield) { // stupid brute force, but the cardinality should be trivial - short sends[] = _fragmentSends; - if (sends != null) { - for (int i = 0; i < bitfield.fragmentCount() && i < sends.length; i++) { - if (bitfield.received(i)) - sends[i] = (short)-1; - } + for (int i = 0; i < bitfield.fragmentCount() && i < _numFragments; i++) { + if (bitfield.received(i)) + _fragmentAcks &= ~mask(i); } - - boolean rv = isComplete(); - /**** - if (!rv && false) { // don't do the fast retransmit... lets give it time to get ACKed - long nextTime = _context.clock().now() + Math.max(_peer.getRTT(), ACKSender.ACK_FREQUENCY); - //_nextSendTime = Math.max(now, _startedOn+PeerState.MIN_RTO); - if (_nextSendTime <= 0) - _nextSendTime = nextTime; - else - _nextSendTime = Math.min(_nextSendTime, nextTime); - - //if (now + 100 > _nextSendTime) - // _nextSendTime = now + 100; - //_nextSendTime = now; - } - ****/ - return rv; + return isComplete(); } public long getNextSendTime() { return _nextSendTime; } @@ -259,111 +165,45 @@ class OutboundMessageState implements CDPQEntry { * The max number of sends for any fragment, which is the * same as the push count, at least as it's coded now. */ - public int getMaxSends() { return _maxSends; } + public synchronized int getMaxSends() { return _maxSends; } /** * The number of times we've pushed some fragments, which is the * same as the max sends, at least as it's coded now. */ - public int getPushCount() { return _pushCount; } + public synchronized int getPushCount() { return _pushCount; } /** * Note that we have pushed the message fragments. * Increments push count (and max sends... why?) */ - public void push() { + public synchronized void push() { // these will never be different... _pushCount++; - if (_pushCount > _maxSends) - _maxSends = (short)_pushCount; - if (_fragmentSends != null) - for (int i = 0; i < _fragmentSends.length; i++) - if (_fragmentSends[i] >= (short)0) - _fragmentSends[i]++; - - } - - /** - * Whether fragment() has been called. - * NOT whether it has more than one fragment. - * - * Caller should synchronize - * - * @return true iff fragment() has been called previously - */ - public boolean isFragmented() { return _fragmentSends != null; } - - /** - * Prepare the message for fragmented delivery, using no more than - * fragmentSize bytes per fragment. - * - * Caller should synchronize - * - * @throws IllegalStateException if called more than once - */ - public void fragment(int fragmentSize) { - if (_fragmentSends != null) - throw new IllegalStateException(); - initBuf(); - int numFragments = _totalSize / fragmentSize; - if (numFragments * fragmentSize < _totalSize) - numFragments++; - // This should never happen, as 534 bytes * 64 fragments > 32KB, and we won't bid on > 32KB - if (numFragments > InboundMessageState.MAX_FRAGMENTS) - throw new IllegalArgumentException("Fragmenting a " + _totalSize + " message into " + numFragments + " fragments - too many!"); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Fragmenting a " + _totalSize + " message into " + numFragments + " fragments"); - - //_fragmentEnd = new int[numFragments]; - _fragmentSends = new short[numFragments]; - //Arrays.fill(_fragmentEnd, -1); - //Arrays.fill(_fragmentSends, (short)0); - - _fragmentSize = fragmentSize; + _maxSends = _pushCount; } /** * How many fragments in the message. - * Only valid after fragment() has been called. - * Returns -1 before then. - * - * Caller should synchronize */ public int getFragmentCount() { - if (_fragmentSends == null) - return -1; - else - return _fragmentSends.length; + return _numFragments; } /** * The size of the I2NP message. Does not include any SSU overhead. - * - * Caller should synchronize */ - public int getMessageSize() { return _totalSize; } + public int getMessageSize() { return _messageBuf.length; } /** - * Should we continue sending this fragment? - * Only valid after fragment() has been called. - * Throws NPE before then. + * The size in bytes of the fragment * - * Caller should synchronize - * - * @return true if fragment is not acked yet - */ - public boolean shouldSend(int fragmentNum) { return _fragmentSends[fragmentNum] >= (short)0; } - - /** - * This assumes fragment(int size) has been called * @param fragmentNum the number of the fragment - * * @return the size of the fragment specified by the number */ public int fragmentSize(int fragmentNum) { - if (_messageBuf == null) return -1; - if (fragmentNum + 1 == _fragmentSends.length) { - int valid = _totalSize; + if (fragmentNum + 1 == _numFragments) { + int valid = _messageBuf.length; if (valid <= _fragmentSize) return valid; // bugfix 0.8.12 @@ -376,63 +216,19 @@ class OutboundMessageState implements CDPQEntry { /** * Write a part of the the message onto the specified buffer. - * See releaseResources() above for synchronization information. - * This assumes fragment(int size) has been called. * * @param out target to write * @param outOffset into outOffset to begin writing * @param fragmentNum fragment to write (0 indexed) * @return bytesWritten */ - public synchronized int writeFragment(byte out[], int outOffset, int fragmentNum) { - if (_messageBuf == null) return -1; - if (_released) { - /****** - Solved by synchronization with releaseResources() and simply returning -1. - Previous output: - - 23:50:57.013 ERROR [acket pusher] sport.udp.OutboundMessageState: SSU OMS Use after free - java.lang.Exception: Released on Wed Dec 23 23:50:57 GMT 2009 by: - at net.i2p.router.transport.udp.OutboundMessageState.releaseResources(OutboundMessageState.java:133) - at net.i2p.router.transport.udp.PeerState.acked(PeerState.java:1391) - at net.i2p.router.transport.udp.OutboundMessageFragments.acked(OutboundMessageFragments.java:404) - at net.i2p.router.transport.udp.InboundMessageFragments.receiveACKs(InboundMessageFragments.java:191) - at net.i2p.router.transport.udp.InboundMessageFragments.receiveData(InboundMessageFragments.java:77) - at net.i2p.router.transport.udp.PacketHandler$Handler.handlePacket(PacketHandler.java:485) - at net.i2p.router.transport.udp.PacketHandler$Handler.receivePacket(PacketHandler.java:282) - at net.i2p.router.transport.udp.PacketHandler$Handler.handlePacket(PacketHandler.java:231) - at net.i2p.router.transport.udp.PacketHandler$Handler.run(PacketHandler.java:136) - at java.lang.Thread.run(Thread.java:619) - at net.i2p.util.I2PThread.run(I2PThread.java:71) - 23:50:57.014 ERROR [acket pusher] ter.transport.udp.PacketPusher: SSU Output Queue Error - java.lang.RuntimeException: SSU OMS Use after free: Message 2381821417 with 4 fragments of size 0 volleys: 2 lifetime: 1258 pending fragments: 0 1 2 3 - at net.i2p.router.transport.udp.OutboundMessageState.writeFragment(OutboundMessageState.java:298) - at net.i2p.router.transport.udp.PacketBuilder.buildPacket(PacketBuilder.java:170) - at net.i2p.router.transport.udp.OutboundMessageFragments.preparePackets(OutboundMessageFragments.java:332) - at net.i2p.router.transport.udp.OutboundMessageFragments.getNextVolley(OutboundMessageFragments.java:297) - at net.i2p.router.transport.udp.PacketPusher.run(PacketPusher.java:38) - at java.lang.Thread.run(Thread.java:619) - at net.i2p.util.I2PThread.run(I2PThread.java:71) - *******/ - if (_log.shouldLog(Log.WARN)) - _log.log(Log.WARN, "SSU OMS Use after free: " + toString(), _releasedBy); - return -1; - //throw new RuntimeException("SSU OMS Use after free: " + toString()); - } + public int writeFragment(byte out[], int outOffset, int fragmentNum) { int start = _fragmentSize * fragmentNum; - int end = start + fragmentSize(fragmentNum); - int toSend = end - start; - byte buf[] = _messageBuf.getData(); - if ( (buf != null) && (start + toSend <= buf.length) && (outOffset + toSend <= out.length) ) { - System.arraycopy(buf, start, out, outOffset, toSend); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Raw fragment[" + fragmentNum + "] for " + _messageId - + "[" + start + "-" + (start+toSend) + "/" + _totalSize + "/" + _fragmentSize + "]: " - + Base64.encode(out, outOffset, toSend)); + int toSend = fragmentSize(fragmentNum); + int end = start + toSend; + if (end <= _messageBuf.length && outOffset + toSend <= out.length) { + System.arraycopy(_messageBuf, start, out, outOffset, toSend); return toSend; - } else if (buf == null) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Error: null buf"); } else { if (_log.shouldLog(Log.WARN)) _log.warn("Error: " + start + '/' + end + '/' + outOffset + '/' + out.length); @@ -462,7 +258,6 @@ class OutboundMessageState implements CDPQEntry { */ public void drop() { _peer.getTransport().failed(this, false); - releaseResources(); } /** @@ -492,19 +287,18 @@ class OutboundMessageState implements CDPQEntry { @Override public String toString() { - short sends[] = _fragmentSends; StringBuilder buf = new StringBuilder(256); - buf.append("OB Message ").append(_messageId); - if (sends != null) - buf.append(" with ").append(sends.length).append(" fragments"); - buf.append(" of size ").append(_totalSize); + buf.append("OB Message ").append(_i2npMessage.getUniqueId()); + buf.append(" with ").append(_numFragments).append(" fragments"); + buf.append(" of size ").append(_messageBuf.length); buf.append(" volleys: ").append(_maxSends); buf.append(" lifetime: ").append(getLifetime()); - if (sends != null) { + if (!isComplete()) { buf.append(" pending fragments: "); - for (int i = 0; i < sends.length; i++) - if (sends[i] >= 0) + for (int i = 0; i < _numFragments; i++) { + if (needsSending(i)) buf.append(i).append(' '); + } } return buf.toString(); } diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 8a5d41d80..f456acc22 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -1541,7 +1541,6 @@ class PeerState { for (int i = 0; succeeded != null && i < succeeded.size(); i++) { OutboundMessageState state = succeeded.get(i); _transport.succeeded(state); - state.releaseResources(); OutNetMessage msg = state.getMessage(); if (msg != null) msg.timestamp("sending complete"); @@ -1559,7 +1558,6 @@ class PeerState { if (_log.shouldLog(Log.WARN)) _log.warn("Unable to send a direct message: " + state); } - state.releaseResources(); } return rv + _outboundQueue.size(); @@ -1708,7 +1706,7 @@ class PeerState { * how much payload data can we shove in there? * @return MTU - 87, i.e. 533 or 1397 (IPv4), MTU - 107 (IPv6) */ - private int fragmentSize() { + public int fragmentSize() { // 46 + 20 + 8 + 13 = 74 + 13 = 87 (IPv4) // 46 + 40 + 8 + 13 = 74 + 13 = 107 (IPv6) return _mtu - @@ -1727,16 +1725,6 @@ class PeerState { private ShouldSend locked_shouldSend(OutboundMessageState state) { long now = _context.clock().now(); if (state.getNextSendTime() <= now) { - if (!state.isFragmented()) { - state.fragment(fragmentSize()); - if (state.getMessage() != null) - state.getMessage().timestamp("fragment into " + state.getFragmentCount()); - - if (_log.shouldLog(Log.INFO)) - _log.info("Fragmenting " + state); - } - - OutboundMessageState retrans = _retransmitter; if ( (retrans != null) && ( (retrans.isExpired() || retrans.isComplete()) ) ) { _retransmitter = null; @@ -1858,7 +1846,6 @@ class PeerState { //if (getSendWindowBytesRemaining() > 0) // _throttle.unchoke(peer.getRemotePeer()); - state.releaseResources(); } else { // dupack, likely //if (_log.shouldLog(Log.DEBUG)) @@ -1935,7 +1922,6 @@ class PeerState { //if (state.getPeer().getSendWindowBytesRemaining() > 0) // _throttle.unchoke(state.getPeer().getRemotePeer()); - state.releaseResources(); } else { //if (state.getMessage() != null) // state.getMessage().timestamp("partial ack after " + numSends + ": " + bitfield.toString());