forked from I2P_Developers/i2p.i2p
* Streaming: Consolidate scheduling of DisconnectEvent, and ensure
we only do it once. (Ticket #1041)
This commit is contained in:
@@ -59,7 +59,7 @@ class Connection {
|
|||||||
private I2PSocketFull _socket;
|
private I2PSocketFull _socket;
|
||||||
/** set to an error cause if the connection could not be established */
|
/** set to an error cause if the connection could not be established */
|
||||||
private String _connectionError;
|
private String _connectionError;
|
||||||
private long _disconnectScheduledOn;
|
private final AtomicLong _disconnectScheduledOn = new AtomicLong();
|
||||||
private long _lastReceivedOn;
|
private long _lastReceivedOn;
|
||||||
private final ActivityTimer _activityTimer;
|
private final ActivityTimer _activityTimer;
|
||||||
/** window size when we last saw congestion */
|
/** window size when we last saw congestion */
|
||||||
@@ -138,7 +138,6 @@ class Connection {
|
|||||||
_lastCongestionTime = -1;
|
_lastCongestionTime = -1;
|
||||||
_lastCongestionHighestUnacked = -1;
|
_lastCongestionHighestUnacked = -1;
|
||||||
_connected = true;
|
_connected = true;
|
||||||
_disconnectScheduledOn = -1;
|
|
||||||
_lastReceivedOn = -1;
|
_lastReceivedOn = -1;
|
||||||
_activityTimer = new ActivityTimer();
|
_activityTimer = new ActivityTimer();
|
||||||
_ackSinceCongestion = true;
|
_ackSinceCongestion = true;
|
||||||
@@ -283,10 +282,7 @@ class Connection {
|
|||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
void sendReset() {
|
void sendReset() {
|
||||||
if (_disconnectScheduledOn < 0) {
|
scheduleDisconnectEvent();
|
||||||
_disconnectScheduledOn = _context.clock().now();
|
|
||||||
_context.simpleScheduler().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT);
|
|
||||||
}
|
|
||||||
long now = _context.clock().now();
|
long now = _context.clock().now();
|
||||||
if (_resetSentOn + 10*1000 > now) return; // don't send resets too fast
|
if (_resetSentOn + 10*1000 > now) return; // don't send resets too fast
|
||||||
if (_resetReceived) return;
|
if (_resetReceived) return;
|
||||||
@@ -538,10 +534,7 @@ class Connection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void resetReceived() {
|
void resetReceived() {
|
||||||
if (_disconnectScheduledOn < 0) {
|
scheduleDisconnectEvent();
|
||||||
_disconnectScheduledOn = _context.clock().now();
|
|
||||||
_context.simpleScheduler().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT);
|
|
||||||
}
|
|
||||||
_resetReceived = true;
|
_resetReceived = true;
|
||||||
IOException ioe = new IOException("Reset received");
|
IOException ioe = new IOException("Reset received");
|
||||||
_outputStream.streamErrorOccurred(ioe);
|
_outputStream.streamErrorOccurred(ioe);
|
||||||
@@ -565,7 +558,9 @@ class Connection {
|
|||||||
public boolean getHardDisconnected() { return _hardDisconnected; }
|
public boolean getHardDisconnected() { return _hardDisconnected; }
|
||||||
public boolean getResetSent() { return _resetSent; }
|
public boolean getResetSent() { return _resetSent; }
|
||||||
public long getResetSentOn() { return _resetSentOn; }
|
public long getResetSentOn() { return _resetSentOn; }
|
||||||
public long getDisconnectScheduledOn() { return _disconnectScheduledOn; }
|
|
||||||
|
/** @return 0 if not scheduled */
|
||||||
|
public long getDisconnectScheduledOn() { return _disconnectScheduledOn.get(); }
|
||||||
|
|
||||||
void disconnect(boolean cleanDisconnect) {
|
void disconnect(boolean cleanDisconnect) {
|
||||||
disconnect(cleanDisconnect, true);
|
disconnect(cleanDisconnect, true);
|
||||||
@@ -591,10 +586,7 @@ class Connection {
|
|||||||
killOutstandingPackets();
|
killOutstandingPackets();
|
||||||
}
|
}
|
||||||
if (removeFromConMgr) {
|
if (removeFromConMgr) {
|
||||||
if (_disconnectScheduledOn < 0) {
|
scheduleDisconnectEvent();
|
||||||
_disconnectScheduledOn = _context.clock().now();
|
|
||||||
_context.simpleScheduler().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
_connected = false;
|
_connected = false;
|
||||||
}
|
}
|
||||||
@@ -611,9 +603,7 @@ class Connection {
|
|||||||
_activityTimer.cancel();
|
_activityTimer.cancel();
|
||||||
_inputStream.streamErrorOccurred(new IOException("disconnected!"));
|
_inputStream.streamErrorOccurred(new IOException("disconnected!"));
|
||||||
|
|
||||||
if (_disconnectScheduledOn < 0) {
|
if (_disconnectScheduledOn.compareAndSet(0, _context.clock().now())) {
|
||||||
_disconnectScheduledOn = _context.clock().now();
|
|
||||||
|
|
||||||
if (_log.shouldLog(Log.INFO))
|
if (_log.shouldLog(Log.INFO))
|
||||||
_log.info("Connection disconnect complete from dead, drop the con "
|
_log.info("Connection disconnect complete from dead, drop the con "
|
||||||
+ toString());
|
+ toString());
|
||||||
@@ -640,6 +630,19 @@ class Connection {
|
|||||||
// _context.sessionKeyManager().failTags(_remotePeer.getPublicKey());
|
// _context.sessionKeyManager().failTags(_remotePeer.getPublicKey());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Schedule the end of the TIME-WAIT state,
|
||||||
|
* but only if not previously scheduled.
|
||||||
|
* @return true if a new event was scheduled; false if already scheduled
|
||||||
|
* @since 0.9.9
|
||||||
|
*/
|
||||||
|
private boolean scheduleDisconnectEvent() {
|
||||||
|
if (!_disconnectScheduledOn.compareAndSet(0, _context.clock().now()))
|
||||||
|
return false;
|
||||||
|
_context.simpleScheduler().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
private class DisconnectEvent implements SimpleTimer.TimedEvent {
|
private class DisconnectEvent implements SimpleTimer.TimedEvent {
|
||||||
public DisconnectEvent() {
|
public DisconnectEvent() {
|
||||||
if (_log.shouldLog(Log.INFO))
|
if (_log.shouldLog(Log.INFO))
|
||||||
@@ -809,10 +812,7 @@ class Connection {
|
|||||||
public long getCloseSentOn() { return _closeSentOn; }
|
public long getCloseSentOn() { return _closeSentOn; }
|
||||||
public void setCloseSentOn(long when) {
|
public void setCloseSentOn(long when) {
|
||||||
_closeSentOn = when;
|
_closeSentOn = when;
|
||||||
if (_disconnectScheduledOn < 0) {
|
scheduleDisconnectEvent();
|
||||||
_disconnectScheduledOn = _context.clock().now();
|
|
||||||
_context.simpleScheduler().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
public long getCloseReceivedOn() { return _closeReceivedOn; }
|
public long getCloseReceivedOn() { return _closeReceivedOn; }
|
||||||
public void setCloseReceivedOn(long when) { _closeReceivedOn = when; }
|
public void setCloseReceivedOn(long when) { _closeReceivedOn = when; }
|
||||||
@@ -1005,7 +1005,7 @@ class Connection {
|
|||||||
_outputStream.streamErrorOccurred(ioe);
|
_outputStream.streamErrorOccurred(ioe);
|
||||||
// Clean disconnect if we have already scheduled one
|
// Clean disconnect if we have already scheduled one
|
||||||
// (generally because we already sent a close)
|
// (generally because we already sent a close)
|
||||||
disconnect(_disconnectScheduledOn >= 0);
|
disconnect(_disconnectScheduledOn.get() > 0);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user