atomics and finals

This commit is contained in:
zzz
2015-04-17 17:15:22 +00:00
parent 22993e1ea6
commit 7b82393336

View File

@@ -29,8 +29,8 @@ class Connection {
private final Log _log;
private final ConnectionManager _connectionManager;
private Destination _remotePeer;
private long _sendStreamId;
private long _receiveStreamId;
private final AtomicLong _sendStreamId = new AtomicLong();
private final AtomicLong _receiveStreamId = new AtomicLong();
private volatile long _lastSendTime;
private final AtomicLong _lastSendId;
private final AtomicBoolean _resetReceived = new AtomicBoolean();
@@ -43,11 +43,11 @@ class Connection {
private final SchedulerChooser _chooser;
/** Locking: _nextSendLock */
private long _nextSendTime;
private long _ackedPackets;
private final AtomicLong _ackedPackets = new AtomicLong();
private final long _createdOn;
private final AtomicLong _closeSentOn = new AtomicLong();
private final AtomicLong _closeReceivedOn = new AtomicLong();
private int _unackedPacketsReceived;
private final AtomicInteger _unackedPacketsReceived = new AtomicInteger();
private long _congestionWindowEnd;
private volatile long _highestAckedThrough;
private final boolean _isInbound;
@@ -77,16 +77,16 @@ class Connection {
private final AtomicInteger _activeResends = new AtomicInteger();
private final ConEvent _connectionEvent;
private final int _randomWait;
private int _localPort;
private int _remotePort;
private final int _localPort;
private final int _remotePort;
private final SimpleTimer2 _timer;
private long _lifetimeBytesSent;
private final AtomicLong _lifetimeBytesSent = new AtomicLong();
/** TBD for tcpdump-compatible ack output */
private long _lowestBytesAckedThrough;
private long _lifetimeBytesReceived;
private long _lifetimeDupMessageSent;
private long _lifetimeDupMessageReceived;
private final AtomicLong _lifetimeBytesReceived = new AtomicLong();
private final AtomicLong _lifetimeDupMessageSent = new AtomicLong();
private final AtomicLong _lifetimeDupMessageReceived = new AtomicLong();
public static final long MAX_RESEND_DELAY = 45*1000;
public static final long MIN_RESEND_DELAY = 750;
@@ -132,6 +132,9 @@ class Connection {
if (opts != null) {
_localPort = opts.getLocalPort();
_remotePort = opts.getPort();
} else {
_localPort = 0;
_remotePort = 0;
}
_options = (opts != null ? opts : new ConnectionOptions());
_outputStream.setWriteTimeout((int)_options.getWriteTimeout());
@@ -305,19 +308,19 @@ class Connection {
if (_resetReceived.get()) return;
// Unconditionally set
_resetSentOn.set(now);
if ( (_remotePeer == null) || (_sendStreamId <= 0) ) return;
if ( (_remotePeer == null) || (_sendStreamId.get() <= 0) ) return;
PacketLocal reply = new PacketLocal(_context, _remotePeer);
reply.setFlag(Packet.FLAG_RESET);
reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
reply.setSendStreamId(_sendStreamId);
reply.setReceiveStreamId(_receiveStreamId);
reply.setSendStreamId(_sendStreamId.get());
reply.setReceiveStreamId(_receiveStreamId.get());
// TODO remove this someday, as of 0.9.20 we do not require it
reply.setOptionalFrom(_connectionManager.getSession().getMyDestination());
reply.setLocalPort(_localPort);
reply.setRemotePort(_remotePort);
// this just sends the packet - no retries or whatnot
if (_outboundQueue.enqueue(reply)) {
_unackedPacketsReceived = 0;
_unackedPacketsReceived.set(0);
_lastSendTime = _context.clock().now();
resetActivityTimer();
}
@@ -397,7 +400,7 @@ class Connection {
//_context.statManager().getStatLog().addData(Packet.toId(_sendStreamId), "stream.rtt", _options.getRTT(), _options.getWindowSize());
if (_outboundQueue.enqueue(packet)) {
_unackedPacketsReceived = 0;
_unackedPacketsReceived.set(0);
_lastSendTime = _context.clock().now();
resetActivityTimer();
}
@@ -504,10 +507,10 @@ class Connection {
} // for
} // !isEmpty()
if (acked != null) {
_ackedPackets.addAndGet(acked.size());
for (int i = 0; i < acked.size(); i++) {
PacketLocal p = acked.get(i);
// removed from _outboundPackets above in iterator
_ackedPackets++;
if (p.getNumSends() > 1) {
_activeResends.decrementAndGet();
if (_log.shouldLog(Log.DEBUG))
@@ -814,26 +817,34 @@ class Connection {
_connectionManager.updateOptsFromShare(this);
}
private boolean _sendStreamIdSet = false;
/** what stream do we send data to the peer on?
* @return non-global stream sending ID
/**
* What stream do we send data to the peer on?
* @return non-global stream sending ID, or 0 if unknown
*/
public long getSendStreamId() { return _sendStreamId.get(); }
/**
* @param id 0 to 0xffffffff
* @throws RuntimeException if already set to nonzero
*/
public long getSendStreamId() { return _sendStreamId; }
public void setSendStreamId(long id) {
if (_sendStreamIdSet) throw new RuntimeException("Send stream ID already set [" + _sendStreamId + ", " + id + "]");
_sendStreamIdSet = true;
_sendStreamId = id;
if (!_sendStreamId.compareAndSet(0, id))
throw new RuntimeException("Send stream ID already set [" + _sendStreamId + ", " + id + "]");
}
private boolean _receiveStreamIdSet = false;
/** The stream ID of a peer connection that sends data to us. (may be null)
* @return receive stream ID, or null if there isn't one
/**
* The stream ID of a peer connection that sends data to us, or zero if unknown.
* @return receive stream ID, or 0 if unknown
*/
public long getReceiveStreamId() { return _receiveStreamId.get(); }
/**
* @param id 0 to 0xffffffff
* @throws RuntimeException if already set to nonzero
*/
public long getReceiveStreamId() { return _receiveStreamId; }
public void setReceiveStreamId(long id) {
if (_receiveStreamIdSet) throw new RuntimeException("Receive stream ID already set [" + _receiveStreamId + ", " + id + "]");
_receiveStreamIdSet = true;
_receiveStreamId = id;
if (!_receiveStreamId.compareAndSet(0, id))
throw new RuntimeException("Receive stream ID already set [" + _receiveStreamId + ", " + id + "]");
synchronized (_connectLock) { _connectLock.notifyAll(); }
}
@@ -896,14 +907,14 @@ class Connection {
public ConnectionPacketHandler getPacketHandler() { return _handler; }
public long getLifetimeBytesSent() { return _lifetimeBytesSent; }
public long getLifetimeBytesReceived() { return _lifetimeBytesReceived; }
public long getLifetimeDupMessagesSent() { return _lifetimeDupMessageSent; }
public long getLifetimeDupMessagesReceived() { return _lifetimeDupMessageReceived; }
public void incrementBytesSent(int bytes) { _lifetimeBytesSent += bytes; }
public void incrementDupMessagesSent(int msgs) { _lifetimeDupMessageSent += msgs; }
public void incrementBytesReceived(int bytes) { _lifetimeBytesReceived += bytes; }
public void incrementDupMessagesReceived(int msgs) { _lifetimeDupMessageReceived += msgs; }
public long getLifetimeBytesSent() { return _lifetimeBytesSent.get(); }
public long getLifetimeBytesReceived() { return _lifetimeBytesReceived.get(); }
public long getLifetimeDupMessagesSent() { return _lifetimeDupMessageSent.get(); }
public long getLifetimeDupMessagesReceived() { return _lifetimeDupMessageReceived.get(); }
public void incrementBytesSent(int bytes) { _lifetimeBytesSent.addAndGet(bytes); }
public void incrementDupMessagesSent(int msgs) { _lifetimeDupMessageSent.addAndGet(msgs); }
public void incrementBytesReceived(int bytes) { _lifetimeBytesReceived.addAndGet(bytes); }
public void incrementDupMessagesReceived(int msgs) { _lifetimeDupMessageReceived.addAndGet(msgs); }
/**
* Time when the scheduler next want to send a packet, or -1 if
@@ -944,7 +955,7 @@ class Connection {
/** how many packets have we sent and the other side has ACKed?
* @return Count of how many packets ACKed.
*/
public long getAckedPackets() { return _ackedPackets; }
public long getAckedPackets() { return _ackedPackets.get(); }
public long getCreatedOn() { return _createdOn; }
/** @return 0 if not sent */
@@ -959,8 +970,9 @@ class Connection {
_updatedShareOpts = true;
}
}
public void incrementUnackedPacketsReceived() { _unackedPacketsReceived++; }
public int getUnackedPacketsReceived() { return _unackedPacketsReceived; }
public void incrementUnackedPacketsReceived() { _unackedPacketsReceived.incrementAndGet(); }
public int getUnackedPacketsReceived() { return _unackedPacketsReceived.get(); }
/** how many packets have we sent but not yet received an ACK for?
* @return Count of packets in-flight.
@@ -1006,7 +1018,7 @@ class Connection {
void waitForConnect() {
long expiration = _context.clock().now() + _options.getConnectTimeout();
while (true) {
if (_connected.get() && (_receiveStreamId > 0) && (_sendStreamId > 0) ) {
if (_connected.get() && (_receiveStreamId.get() > 0) && (_sendStreamId.get() > 0) ) {
// w00t
if (_log.shouldLog(Log.DEBUG))
_log.debug("waitForConnect(): Connected and we have stream IDs");
@@ -1168,13 +1180,15 @@ class Connection {
public String toString() {
StringBuilder buf = new StringBuilder(256);
buf.append("[Connection ");
if (_receiveStreamId > 0)
buf.append(Packet.toId(_receiveStreamId));
long id = _receiveStreamId.get();
if (id > 0)
buf.append(Packet.toId(id));
else
buf.append("unknown");
buf.append('/');
if (_sendStreamId > 0)
buf.append(Packet.toId(_sendStreamId));
id = _sendStreamId.get();
if (id > 0)
buf.append(Packet.toId(id));
else
buf.append("unknown");
if (_isInbound)
@@ -1344,9 +1358,9 @@ class Connection {
// bugfix release 0.7.8, we weren't dividing by 1000
_packet.setResendDelay(getOptions().getResendDelay() / 1000);
if (_packet.getReceiveStreamId() <= 0)
_packet.setReceiveStreamId(_receiveStreamId);
_packet.setReceiveStreamId(_receiveStreamId.get());
if (_packet.getSendStreamId() <= 0)
_packet.setSendStreamId(_sendStreamId);
_packet.setSendStreamId(_sendStreamId.get());
int newWindowSize = getOptions().getWindowSize();
@@ -1425,7 +1439,7 @@ class Connection {
" (wsize "
+ newWindowSize + " lifetime "
+ (_context.clock().now() - _packet.getCreatedOn()) + "ms)");
_unackedPacketsReceived = 0;
_unackedPacketsReceived.set(0);
_lastSendTime = _context.clock().now();
// timer reset added 0.9.1
resetActivityTimer();