* Streaming: Atomics for close/reset send/receive

so we only do things once. (Ticket #1041)
This commit is contained in:
zzz
2013-09-24 14:01:48 +00:00
parent 3499ed7bb0
commit fb40ab1f00
2 changed files with 44 additions and 32 deletions

View File

@@ -6,6 +6,7 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@@ -32,9 +33,8 @@ class Connection {
private long _receiveStreamId; private long _receiveStreamId;
private volatile long _lastSendTime; private volatile long _lastSendTime;
private final AtomicLong _lastSendId; private final AtomicLong _lastSendId;
private boolean _resetReceived; private final AtomicBoolean _resetReceived = new AtomicBoolean();
private boolean _resetSent; private final AtomicLong _resetSentOn = new AtomicLong();
private long _resetSentOn;
private volatile boolean _connected; private volatile boolean _connected;
private boolean _hardDisconnected; private boolean _hardDisconnected;
private final MessageInputStream _inputStream; private final MessageInputStream _inputStream;
@@ -43,8 +43,8 @@ class Connection {
private volatile long _nextSendTime; private volatile long _nextSendTime;
private long _ackedPackets; private long _ackedPackets;
private final long _createdOn; private final long _createdOn;
private long _closeSentOn; private final AtomicLong _closeSentOn = new AtomicLong();
private long _closeReceivedOn; private final AtomicLong _closeReceivedOn = new AtomicLong();
private int _unackedPacketsReceived; private int _unackedPacketsReceived;
private long _congestionWindowEnd; private long _congestionWindowEnd;
private volatile long _highestAckedThrough; private volatile long _highestAckedThrough;
@@ -130,8 +130,6 @@ class Connection {
_lastSendId = new AtomicLong(-1); _lastSendId = new AtomicLong(-1);
_nextSendTime = -1; _nextSendTime = -1;
_createdOn = _context.clock().now(); _createdOn = _context.clock().now();
_closeSentOn = -1;
_closeReceivedOn = -1;
_congestionWindowEnd = _options.getWindowSize()-1; _congestionWindowEnd = _options.getWindowSize()-1;
_highestAckedThrough = -1; _highestAckedThrough = -1;
_lastCongestionSeenAt = MAX_WINDOW_SIZE*2; // lets allow it to grow _lastCongestionSeenAt = MAX_WINDOW_SIZE*2; // lets allow it to grow
@@ -142,7 +140,6 @@ class Connection {
_activityTimer = new ActivityTimer(); _activityTimer = new ActivityTimer();
_ackSinceCongestion = true; _ackSinceCongestion = true;
_connectLock = new Object(); _connectLock = new Object();
_resetSentOn = -1;
_connectionEvent = new ConEvent(); _connectionEvent = new ConEvent();
_randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage
// all createRateStats in ConnectionManager // all createRateStats in ConnectionManager
@@ -155,10 +152,11 @@ class Connection {
} }
void closeReceived() { void closeReceived() {
setCloseReceivedOn(_context.clock().now()); if (setCloseReceivedOn(_context.clock().now())) {
_inputStream.closeReceived(); _inputStream.closeReceived();
synchronized (_connectLock) { _connectLock.notifyAll(); } synchronized (_connectLock) { _connectLock.notifyAll(); }
} }
}
/** /**
* This doesn't "send a choke". Rather, it blocks if the outbound window is full, * This doesn't "send a choke". Rather, it blocks if the outbound window is full,
@@ -284,11 +282,9 @@ class Connection {
void sendReset() { void sendReset() {
scheduleDisconnectEvent(); scheduleDisconnectEvent();
long now = _context.clock().now(); long now = _context.clock().now();
if (_resetSentOn + 10*1000 > now) return; // don't send resets too fast if (_resetSentOn.get() + 10*1000 > now) return; // don't send resets too fast
if (_resetReceived) return; if (_resetReceived.get()) return;
_resetSent = true; _resetSentOn.compareAndSet(0, now);
if (_resetSentOn <= 0)
_resetSentOn = now;
if ( (_remotePeer == null) || (_sendStreamId <= 0) ) return; if ( (_remotePeer == null) || (_sendStreamId <= 0) ) return;
PacketLocal reply = new PacketLocal(_context, _remotePeer); PacketLocal reply = new PacketLocal(_context, _remotePeer);
reply.setFlag(Packet.FLAG_RESET); reply.setFlag(Packet.FLAG_RESET);
@@ -534,15 +530,17 @@ class Connection {
} }
void resetReceived() { void resetReceived() {
if (!_resetReceived.compareAndSet(false, true))
return;
scheduleDisconnectEvent(); scheduleDisconnectEvent();
_resetReceived = true;
IOException ioe = new IOException("Reset received"); IOException ioe = new IOException("Reset received");
_outputStream.streamErrorOccurred(ioe); _outputStream.streamErrorOccurred(ioe);
_inputStream.streamErrorOccurred(ioe); _inputStream.streamErrorOccurred(ioe);
_connectionError = "Connection reset"; _connectionError = "Connection reset";
synchronized (_connectLock) { _connectLock.notifyAll(); } synchronized (_connectLock) { _connectLock.notifyAll(); }
} }
public boolean getResetReceived() { return _resetReceived; }
public boolean getResetReceived() { return _resetReceived.get(); }
public void setInbound() { _isInbound = true; } public void setInbound() { _isInbound = true; }
public boolean isInbound() { return _isInbound; } public boolean isInbound() { return _isInbound; }
@@ -556,8 +554,10 @@ class Connection {
public boolean getIsConnected() { return _connected; } public boolean getIsConnected() { return _connected; }
public boolean getHardDisconnected() { return _hardDisconnected; } public boolean getHardDisconnected() { return _hardDisconnected; }
public boolean getResetSent() { return _resetSent; } public boolean getResetSent() { return _resetSentOn.get() > 0; }
public long getResetSentOn() { return _resetSentOn; }
/** @return 0 if not sent */
public long getResetSentOn() { return _resetSentOn.get(); }
/** @return 0 if not scheduled */ /** @return 0 if not scheduled */
public long getDisconnectScheduledOn() { return _disconnectScheduledOn.get(); } public long getDisconnectScheduledOn() { return _disconnectScheduledOn.get(); }
@@ -750,10 +750,11 @@ class Connection {
public void setConnectionError(String err) { _connectionError = err; } public void setConnectionError(String err) { _connectionError = err; }
public long getLifetime() { public long getLifetime() {
if (_closeSentOn <= 0) long cso = _closeSentOn.get();
if (cso <= 0)
return _context.clock().now() - _createdOn; return _context.clock().now() - _createdOn;
else else
return _closeSentOn - _createdOn; return cso - _createdOn;
} }
public ConnectionPacketHandler getPacketHandler() { return _handler; } public ConnectionPacketHandler getPacketHandler() { return _handler; }
@@ -809,16 +810,27 @@ class Connection {
*/ */
public long getAckedPackets() { return _ackedPackets; } public long getAckedPackets() { return _ackedPackets; }
public long getCreatedOn() { return _createdOn; } public long getCreatedOn() { return _createdOn; }
public long getCloseSentOn() { return _closeSentOn; }
/** @return 0 if not sent */
public long getCloseSentOn() { return _closeSentOn.get(); }
public void setCloseSentOn(long when) { public void setCloseSentOn(long when) {
_closeSentOn = when; if (_closeSentOn.compareAndSet(0, when))
scheduleDisconnectEvent(); scheduleDisconnectEvent();
} }
public long getCloseReceivedOn() { return _closeReceivedOn; }
public void setCloseReceivedOn(long when) { _closeReceivedOn = when; } /** @return 0 if not received */
public long getCloseReceivedOn() { return _closeReceivedOn.get(); }
/**
* @return true if the first close received, false otherwise
*/
public boolean setCloseReceivedOn(long when) {
return _closeReceivedOn.compareAndSet(0, when);
}
public void updateShareOpts() { public void updateShareOpts() {
if (_closeSentOn > 0 && !_updatedShareOpts) { if (_closeSentOn.get() > 0 && !_updatedShareOpts) {
_connectionManager.updateShareOpts(this); _connectionManager.updateShareOpts(this);
_updatedShareOpts = true; _updatedShareOpts = true;
} }
@@ -981,7 +993,7 @@ class Connection {
_log.warn("Inactivity timer expired, not doing anything"); _log.warn("Inactivity timer expired, not doing anything");
break; break;
case ConnectionOptions.INACTIVITY_ACTION_SEND: case ConnectionOptions.INACTIVITY_ACTION_SEND:
if (_closeSentOn <= 0 && _closeReceivedOn <= 0) { if (_closeSentOn.get() <= 0 && _closeReceivedOn.get() <= 0) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Sending some data due to inactivity"); _log.warn("Sending some data due to inactivity");
_receiver.send(null, 0, 0, true); _receiver.send(null, 0, 0, true);
@@ -1156,8 +1168,8 @@ class Connection {
if (_packet.getAckTime() > 0) if (_packet.getAckTime() > 0)
return false; return false;
if (_resetSent || _resetReceived || !_connected) { if (_resetSentOn.get() > 0 || _resetReceived.get() || !_connected) {
if(_log.shouldLog(Log.WARN) && (!_resetSent) && (!_resetReceived)) if(_log.shouldLog(Log.WARN) && (_resetSentOn.get() <= 0) && (!_resetReceived.get()))
_log.warn("??? no resets but not connected: " + _packet); // don't think this is possible _log.warn("??? no resets but not connected: " + _packet); // don't think this is possible
_packet.cancelled(); _packet.cancelled();
return false; return false;

View File

@@ -69,7 +69,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
boolean doSend = true; boolean doSend = true;
if ( (size <= 0) && (con.getLastSendId() >= 0) ) { if ( (size <= 0) && (con.getLastSendId() >= 0) ) {
if (con.getOutputStream().getClosed()) { if (con.getOutputStream().getClosed()) {
if (con.getCloseSentOn() < 0) { if (con.getCloseSentOn() <= 0) {
doSend = true; doSend = true;
} else { } else {
// closed, no new data, and we've already sent a close packet // closed, no new data, and we've already sent a close packet