diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java index 451bb5f37..e25e4707c 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java @@ -29,8 +29,8 @@ class Connection { private final Log _log; private final ConnectionManager _connectionManager; private Destination _remotePeer; - private long _sendStreamId; - private long _receiveStreamId; + private final AtomicLong _sendStreamId = new AtomicLong(); + private final AtomicLong _receiveStreamId = new AtomicLong(); private volatile long _lastSendTime; private final AtomicLong _lastSendId; private final AtomicBoolean _resetReceived = new AtomicBoolean(); @@ -43,11 +43,11 @@ class Connection { private final SchedulerChooser _chooser; /** Locking: _nextSendLock */ private long _nextSendTime; - private long _ackedPackets; + private final AtomicLong _ackedPackets = new AtomicLong(); private final long _createdOn; private final AtomicLong _closeSentOn = new AtomicLong(); private final AtomicLong _closeReceivedOn = new AtomicLong(); - private int _unackedPacketsReceived; + private final AtomicInteger _unackedPacketsReceived = new AtomicInteger(); private long _congestionWindowEnd; private volatile long _highestAckedThrough; private final boolean _isInbound; @@ -77,16 +77,16 @@ class Connection { private final AtomicInteger _activeResends = new AtomicInteger(); private final ConEvent _connectionEvent; private final int _randomWait; - private int _localPort; - private int _remotePort; + private final int _localPort; + private final int _remotePort; private final SimpleTimer2 _timer; - private long _lifetimeBytesSent; + private final AtomicLong _lifetimeBytesSent = new AtomicLong(); /** TBD for tcpdump-compatible ack output */ private long _lowestBytesAckedThrough; - private long _lifetimeBytesReceived; - private long _lifetimeDupMessageSent; - private long _lifetimeDupMessageReceived; + private final AtomicLong _lifetimeBytesReceived = new AtomicLong(); + private final AtomicLong _lifetimeDupMessageSent = new AtomicLong(); + private final AtomicLong _lifetimeDupMessageReceived = new AtomicLong(); public static final long MAX_RESEND_DELAY = 45*1000; public static final long MIN_RESEND_DELAY = 750; @@ -132,6 +132,9 @@ class Connection { if (opts != null) { _localPort = opts.getLocalPort(); _remotePort = opts.getPort(); + } else { + _localPort = 0; + _remotePort = 0; } _options = (opts != null ? opts : new ConnectionOptions()); _outputStream.setWriteTimeout((int)_options.getWriteTimeout()); @@ -305,19 +308,19 @@ class Connection { if (_resetReceived.get()) return; // Unconditionally set _resetSentOn.set(now); - if ( (_remotePeer == null) || (_sendStreamId <= 0) ) return; + if ( (_remotePeer == null) || (_sendStreamId.get() <= 0) ) return; PacketLocal reply = new PacketLocal(_context, _remotePeer); reply.setFlag(Packet.FLAG_RESET); reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); - reply.setSendStreamId(_sendStreamId); - reply.setReceiveStreamId(_receiveStreamId); + reply.setSendStreamId(_sendStreamId.get()); + reply.setReceiveStreamId(_receiveStreamId.get()); // TODO remove this someday, as of 0.9.20 we do not require it reply.setOptionalFrom(_connectionManager.getSession().getMyDestination()); reply.setLocalPort(_localPort); reply.setRemotePort(_remotePort); // this just sends the packet - no retries or whatnot if (_outboundQueue.enqueue(reply)) { - _unackedPacketsReceived = 0; + _unackedPacketsReceived.set(0); _lastSendTime = _context.clock().now(); resetActivityTimer(); } @@ -397,7 +400,7 @@ class Connection { //_context.statManager().getStatLog().addData(Packet.toId(_sendStreamId), "stream.rtt", _options.getRTT(), _options.getWindowSize()); if (_outboundQueue.enqueue(packet)) { - _unackedPacketsReceived = 0; + _unackedPacketsReceived.set(0); _lastSendTime = _context.clock().now(); resetActivityTimer(); } @@ -504,10 +507,10 @@ class Connection { } // for } // !isEmpty() if (acked != null) { + _ackedPackets.addAndGet(acked.size()); for (int i = 0; i < acked.size(); i++) { PacketLocal p = acked.get(i); // removed from _outboundPackets above in iterator - _ackedPackets++; if (p.getNumSends() > 1) { _activeResends.decrementAndGet(); if (_log.shouldLog(Log.DEBUG)) @@ -814,26 +817,34 @@ class Connection { _connectionManager.updateOptsFromShare(this); } - private boolean _sendStreamIdSet = false; - /** what stream do we send data to the peer on? - * @return non-global stream sending ID + /** + * What stream do we send data to the peer on? + * @return non-global stream sending ID, or 0 if unknown + */ + public long getSendStreamId() { return _sendStreamId.get(); } + + /** + * @param id 0 to 0xffffffff + * @throws RuntimeException if already set to nonzero */ - public long getSendStreamId() { return _sendStreamId; } public void setSendStreamId(long id) { - if (_sendStreamIdSet) throw new RuntimeException("Send stream ID already set [" + _sendStreamId + ", " + id + "]"); - _sendStreamIdSet = true; - _sendStreamId = id; + if (!_sendStreamId.compareAndSet(0, id)) + throw new RuntimeException("Send stream ID already set [" + _sendStreamId + ", " + id + "]"); } - private boolean _receiveStreamIdSet = false; - /** The stream ID of a peer connection that sends data to us. (may be null) - * @return receive stream ID, or null if there isn't one + /** + * The stream ID of a peer connection that sends data to us, or zero if unknown. + * @return receive stream ID, or 0 if unknown + */ + public long getReceiveStreamId() { return _receiveStreamId.get(); } + + /** + * @param id 0 to 0xffffffff + * @throws RuntimeException if already set to nonzero */ - public long getReceiveStreamId() { return _receiveStreamId; } public void setReceiveStreamId(long id) { - if (_receiveStreamIdSet) throw new RuntimeException("Receive stream ID already set [" + _receiveStreamId + ", " + id + "]"); - _receiveStreamIdSet = true; - _receiveStreamId = id; + if (!_receiveStreamId.compareAndSet(0, id)) + throw new RuntimeException("Receive stream ID already set [" + _receiveStreamId + ", " + id + "]"); synchronized (_connectLock) { _connectLock.notifyAll(); } } @@ -896,14 +907,14 @@ class Connection { public ConnectionPacketHandler getPacketHandler() { return _handler; } - public long getLifetimeBytesSent() { return _lifetimeBytesSent; } - public long getLifetimeBytesReceived() { return _lifetimeBytesReceived; } - public long getLifetimeDupMessagesSent() { return _lifetimeDupMessageSent; } - public long getLifetimeDupMessagesReceived() { return _lifetimeDupMessageReceived; } - public void incrementBytesSent(int bytes) { _lifetimeBytesSent += bytes; } - public void incrementDupMessagesSent(int msgs) { _lifetimeDupMessageSent += msgs; } - public void incrementBytesReceived(int bytes) { _lifetimeBytesReceived += bytes; } - public void incrementDupMessagesReceived(int msgs) { _lifetimeDupMessageReceived += msgs; } + public long getLifetimeBytesSent() { return _lifetimeBytesSent.get(); } + public long getLifetimeBytesReceived() { return _lifetimeBytesReceived.get(); } + public long getLifetimeDupMessagesSent() { return _lifetimeDupMessageSent.get(); } + public long getLifetimeDupMessagesReceived() { return _lifetimeDupMessageReceived.get(); } + public void incrementBytesSent(int bytes) { _lifetimeBytesSent.addAndGet(bytes); } + public void incrementDupMessagesSent(int msgs) { _lifetimeDupMessageSent.addAndGet(msgs); } + public void incrementBytesReceived(int bytes) { _lifetimeBytesReceived.addAndGet(bytes); } + public void incrementDupMessagesReceived(int msgs) { _lifetimeDupMessageReceived.addAndGet(msgs); } /** * Time when the scheduler next want to send a packet, or -1 if @@ -944,7 +955,7 @@ class Connection { /** how many packets have we sent and the other side has ACKed? * @return Count of how many packets ACKed. */ - public long getAckedPackets() { return _ackedPackets; } + public long getAckedPackets() { return _ackedPackets.get(); } public long getCreatedOn() { return _createdOn; } /** @return 0 if not sent */ @@ -959,8 +970,9 @@ class Connection { _updatedShareOpts = true; } } - public void incrementUnackedPacketsReceived() { _unackedPacketsReceived++; } - public int getUnackedPacketsReceived() { return _unackedPacketsReceived; } + + public void incrementUnackedPacketsReceived() { _unackedPacketsReceived.incrementAndGet(); } + public int getUnackedPacketsReceived() { return _unackedPacketsReceived.get(); } /** how many packets have we sent but not yet received an ACK for? * @return Count of packets in-flight. @@ -1006,7 +1018,7 @@ class Connection { void waitForConnect() { long expiration = _context.clock().now() + _options.getConnectTimeout(); while (true) { - if (_connected.get() && (_receiveStreamId > 0) && (_sendStreamId > 0) ) { + if (_connected.get() && (_receiveStreamId.get() > 0) && (_sendStreamId.get() > 0) ) { // w00t if (_log.shouldLog(Log.DEBUG)) _log.debug("waitForConnect(): Connected and we have stream IDs"); @@ -1168,13 +1180,15 @@ class Connection { public String toString() { StringBuilder buf = new StringBuilder(256); buf.append("[Connection "); - if (_receiveStreamId > 0) - buf.append(Packet.toId(_receiveStreamId)); + long id = _receiveStreamId.get(); + if (id > 0) + buf.append(Packet.toId(id)); else buf.append("unknown"); buf.append('/'); - if (_sendStreamId > 0) - buf.append(Packet.toId(_sendStreamId)); + id = _sendStreamId.get(); + if (id > 0) + buf.append(Packet.toId(id)); else buf.append("unknown"); if (_isInbound) @@ -1344,9 +1358,9 @@ class Connection { // bugfix release 0.7.8, we weren't dividing by 1000 _packet.setResendDelay(getOptions().getResendDelay() / 1000); if (_packet.getReceiveStreamId() <= 0) - _packet.setReceiveStreamId(_receiveStreamId); + _packet.setReceiveStreamId(_receiveStreamId.get()); if (_packet.getSendStreamId() <= 0) - _packet.setSendStreamId(_sendStreamId); + _packet.setSendStreamId(_sendStreamId.get()); int newWindowSize = getOptions().getWindowSize(); @@ -1425,7 +1439,7 @@ class Connection { " (wsize " + newWindowSize + " lifetime " + (_context.clock().now() - _packet.getCreatedOn()) + "ms)"); - _unackedPacketsReceived = 0; + _unackedPacketsReceived.set(0); _lastSendTime = _context.clock().now(); // timer reset added 0.9.1 resetActivityTimer();