diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 4bf1be1a6..4770356a9 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -6,6 +6,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -32,9 +33,8 @@ class Connection { private long _receiveStreamId; private volatile long _lastSendTime; private final AtomicLong _lastSendId; - private boolean _resetReceived; - private boolean _resetSent; - private long _resetSentOn; + private final AtomicBoolean _resetReceived = new AtomicBoolean(); + private final AtomicLong _resetSentOn = new AtomicLong(); private volatile boolean _connected; private boolean _hardDisconnected; private final MessageInputStream _inputStream; @@ -43,8 +43,8 @@ class Connection { private volatile long _nextSendTime; private long _ackedPackets; private final long _createdOn; - private long _closeSentOn; - private long _closeReceivedOn; + private final AtomicLong _closeSentOn = new AtomicLong(); + private final AtomicLong _closeReceivedOn = new AtomicLong(); private int _unackedPacketsReceived; private long _congestionWindowEnd; private volatile long _highestAckedThrough; @@ -130,8 +130,6 @@ class Connection { _lastSendId = new AtomicLong(-1); _nextSendTime = -1; _createdOn = _context.clock().now(); - _closeSentOn = -1; - _closeReceivedOn = -1; _congestionWindowEnd = _options.getWindowSize()-1; _highestAckedThrough = -1; _lastCongestionSeenAt = MAX_WINDOW_SIZE*2; // lets allow it to grow @@ -142,7 +140,6 @@ class Connection { _activityTimer = new ActivityTimer(); _ackSinceCongestion = true; _connectLock = new Object(); - _resetSentOn = -1; _connectionEvent = new ConEvent(); _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage // all createRateStats in ConnectionManager @@ -155,9 +152,10 @@ class Connection { } void closeReceived() { - setCloseReceivedOn(_context.clock().now()); - _inputStream.closeReceived(); - synchronized (_connectLock) { _connectLock.notifyAll(); } + if (setCloseReceivedOn(_context.clock().now())) { + _inputStream.closeReceived(); + synchronized (_connectLock) { _connectLock.notifyAll(); } + } } /** @@ -284,11 +282,9 @@ class Connection { void sendReset() { scheduleDisconnectEvent(); long now = _context.clock().now(); - if (_resetSentOn + 10*1000 > now) return; // don't send resets too fast - if (_resetReceived) return; - _resetSent = true; - if (_resetSentOn <= 0) - _resetSentOn = now; + if (_resetSentOn.get() + 10*1000 > now) return; // don't send resets too fast + if (_resetReceived.get()) return; + _resetSentOn.compareAndSet(0, now); if ( (_remotePeer == null) || (_sendStreamId <= 0) ) return; PacketLocal reply = new PacketLocal(_context, _remotePeer); reply.setFlag(Packet.FLAG_RESET); @@ -534,15 +530,17 @@ class Connection { } void resetReceived() { + if (!_resetReceived.compareAndSet(false, true)) + return; scheduleDisconnectEvent(); - _resetReceived = true; IOException ioe = new IOException("Reset received"); _outputStream.streamErrorOccurred(ioe); _inputStream.streamErrorOccurred(ioe); _connectionError = "Connection reset"; synchronized (_connectLock) { _connectLock.notifyAll(); } } - public boolean getResetReceived() { return _resetReceived; } + + public boolean getResetReceived() { return _resetReceived.get(); } public void setInbound() { _isInbound = true; } public boolean isInbound() { return _isInbound; } @@ -556,8 +554,10 @@ class Connection { public boolean getIsConnected() { return _connected; } public boolean getHardDisconnected() { return _hardDisconnected; } - public boolean getResetSent() { return _resetSent; } - public long getResetSentOn() { return _resetSentOn; } + public boolean getResetSent() { return _resetSentOn.get() > 0; } + + /** @return 0 if not sent */ + public long getResetSentOn() { return _resetSentOn.get(); } /** @return 0 if not scheduled */ public long getDisconnectScheduledOn() { return _disconnectScheduledOn.get(); } @@ -750,10 +750,11 @@ class Connection { public void setConnectionError(String err) { _connectionError = err; } public long getLifetime() { - if (_closeSentOn <= 0) + long cso = _closeSentOn.get(); + if (cso <= 0) return _context.clock().now() - _createdOn; else - return _closeSentOn - _createdOn; + return cso - _createdOn; } public ConnectionPacketHandler getPacketHandler() { return _handler; } @@ -809,16 +810,27 @@ class Connection { */ public long getAckedPackets() { return _ackedPackets; } public long getCreatedOn() { return _createdOn; } - public long getCloseSentOn() { return _closeSentOn; } + + /** @return 0 if not sent */ + public long getCloseSentOn() { return _closeSentOn.get(); } + public void setCloseSentOn(long when) { - _closeSentOn = when; - scheduleDisconnectEvent(); + if (_closeSentOn.compareAndSet(0, when)) + scheduleDisconnectEvent(); + } + + /** @return 0 if not received */ + public long getCloseReceivedOn() { return _closeReceivedOn.get(); } + + /** + * @return true if the first close received, false otherwise + */ + public boolean setCloseReceivedOn(long when) { + return _closeReceivedOn.compareAndSet(0, when); } - public long getCloseReceivedOn() { return _closeReceivedOn; } - public void setCloseReceivedOn(long when) { _closeReceivedOn = when; } public void updateShareOpts() { - if (_closeSentOn > 0 && !_updatedShareOpts) { + if (_closeSentOn.get() > 0 && !_updatedShareOpts) { _connectionManager.updateShareOpts(this); _updatedShareOpts = true; } @@ -981,7 +993,7 @@ class Connection { _log.warn("Inactivity timer expired, not doing anything"); break; case ConnectionOptions.INACTIVITY_ACTION_SEND: - if (_closeSentOn <= 0 && _closeReceivedOn <= 0) { + if (_closeSentOn.get() <= 0 && _closeReceivedOn.get() <= 0) { if (_log.shouldLog(Log.WARN)) _log.warn("Sending some data due to inactivity"); _receiver.send(null, 0, 0, true); @@ -1156,8 +1168,8 @@ class Connection { if (_packet.getAckTime() > 0) return false; - if (_resetSent || _resetReceived || !_connected) { - if(_log.shouldLog(Log.WARN) && (!_resetSent) && (!_resetReceived)) + if (_resetSentOn.get() > 0 || _resetReceived.get() || !_connected) { + if(_log.shouldLog(Log.WARN) && (_resetSentOn.get() <= 0) && (!_resetReceived.get())) _log.warn("??? no resets but not connected: " + _packet); // don't think this is possible _packet.cancelled(); return false; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java index 8f2a40f46..ca2f65ff7 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java @@ -69,7 +69,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { boolean doSend = true; if ( (size <= 0) && (con.getLastSendId() >= 0) ) { if (con.getOutputStream().getClosed()) { - if (con.getCloseSentOn() < 0) { + if (con.getCloseSentOn() <= 0) { doSend = true; } else { // closed, no new data, and we've already sent a close packet