* Streaming:

- Cleanups
   - Remove setCloseReceivedOn(), unused outside Connection
   - OR the isFlagSet parameter instead of multiple calls
   - Remove acked packets from _outboundPackets inside synced iterator
   - Short-circuit _outboundPackets iterator if empty
   - Small optimization if not logging in ConnectionPacketHandler
   - Stub out processing of close ack (ticket #1042)
This commit is contained in:
zzz
2013-09-24 16:01:20 +00:00
parent fdf38a952d
commit 9318099845
3 changed files with 50 additions and 35 deletions

View File

@@ -151,13 +151,24 @@ class Connection {
return _lastSendId.incrementAndGet(); return _lastSendId.incrementAndGet();
} }
void closeReceived() { /**
if (setCloseReceivedOn(_context.clock().now())) { * Notify that a close was received
*/
public void closeReceived() {
if (_closeReceivedOn.compareAndSet(0, _context.clock().now())) {
_inputStream.closeReceived(); _inputStream.closeReceived();
synchronized (_connectLock) { _connectLock.notifyAll(); } synchronized (_connectLock) { _connectLock.notifyAll(); }
} }
} }
/**
* Notify that a close that we sent was acked
* @since 0.9.9
*/
public void ourCloseAcked() {
// todo
}
/** /**
* This doesn't "send a choke". Rather, it blocks if the outbound window is full, * This doesn't "send a choke". Rather, it blocks if the outbound window is full,
* thus choking the sender that calls this. * thus choking the sender that calls this.
@@ -276,15 +287,16 @@ class Connection {
} }
/** /**
* got a packet we shouldn't have, send 'em a reset * Got a packet we shouldn't have, send 'em a reset.
* * More than one reset may be sent.
*/ */
void sendReset() { public void sendReset() {
scheduleDisconnectEvent(); scheduleDisconnectEvent();
long now = _context.clock().now(); long now = _context.clock().now();
if (_resetSentOn.get() + 10*1000 > now) return; // don't send resets too fast if (_resetSentOn.get() + 10*1000 > now) return; // don't send resets too fast
if (_resetReceived.get()) return; if (_resetReceived.get()) return;
_resetSentOn.compareAndSet(0, now); // Unconditionally set
_resetSentOn.set(now);
if ( (_remotePeer == null) || (_sendStreamId <= 0) ) return; if ( (_remotePeer == null) || (_sendStreamId <= 0) ) return;
PacketLocal reply = new PacketLocal(_context, _remotePeer); PacketLocal reply = new PacketLocal(_context, _remotePeer);
reply.setFlag(Packet.FLAG_RESET); reply.setFlag(Packet.FLAG_RESET);
@@ -411,9 +423,9 @@ class Connection {
/** /**
* Process the acks and nacks received in a packet * Process the acks and nacks received in a packet
* @return List of packets acked or null * @return List of packets acked for the first time, or null if none
*/ */
List<PacketLocal> ackPackets(long ackThrough, long nacks[]) { public List<PacketLocal> ackPackets(long ackThrough, long nacks[]) {
// FIXME synch this part too? // FIXME synch this part too?
if (ackThrough < _highestAckedThrough) { if (ackThrough < _highestAckedThrough) {
// dupack which won't tell us anything // dupack which won't tell us anything
@@ -433,7 +445,9 @@ class Connection {
List<PacketLocal> acked = null; List<PacketLocal> acked = null;
synchronized (_outboundPackets) { synchronized (_outboundPackets) {
for (Map.Entry<Long, PacketLocal> e : _outboundPackets.entrySet()) { if (!_outboundPackets.isEmpty()) { // short circuit iterator
for (Iterator<Map.Entry<Long, PacketLocal>> iter = _outboundPackets.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry<Long, PacketLocal> e = iter.next();
long id = e.getKey().longValue(); long id = e.getKey().longValue();
if (id <= ackThrough) { if (id <= ackThrough) {
boolean nacked = false; boolean nacked = false;
@@ -451,10 +465,11 @@ class Connection {
} }
if (!nacked) { // aka ACKed if (!nacked) { // aka ACKed
if (acked == null) if (acked == null)
acked = new ArrayList(1); acked = new ArrayList(8);
PacketLocal ackedPacket = e.getValue(); PacketLocal ackedPacket = e.getValue();
ackedPacket.ackReceived(); ackedPacket.ackReceived();
acked.add(ackedPacket); acked.add(ackedPacket);
iter.remove();
} }
} else { } else {
// TODO // TODO
@@ -475,11 +490,12 @@ class Connection {
//nackedPacket.incrementNACKs(); //nackedPacket.incrementNACKs();
break; // _outboundPackets is ordered break; // _outboundPackets is ordered
} }
} } // for
} // !isEmpty()
if (acked != null) { if (acked != null) {
for (int i = 0; i < acked.size(); i++) { for (int i = 0; i < acked.size(); i++) {
PacketLocal p = acked.get(i); PacketLocal p = acked.get(i);
_outboundPackets.remove(Long.valueOf(p.getSequenceNum())); // removed from _outboundPackets above in iterator
_ackedPackets++; _ackedPackets++;
if (p.getNumSends() > 1) { if (p.getNumSends() > 1) {
_activeResends.decrementAndGet(); _activeResends.decrementAndGet();
@@ -529,7 +545,8 @@ class Connection {
_log.warn("Took " + elapsed + "ms to pump through " + sched + " on " + toString()); _log.warn("Took " + elapsed + "ms to pump through " + sched + " on " + toString());
} }
void resetReceived() { /** notify that a reset was received */
public void resetReceived() {
if (!_resetReceived.compareAndSet(false, true)) if (!_resetReceived.compareAndSet(false, true))
return; return;
scheduleDisconnectEvent(); scheduleDisconnectEvent();
@@ -815,6 +832,7 @@ class Connection {
/** @return 0 if not sent */ /** @return 0 if not sent */
public long getCloseSentOn() { return _closeSentOn.get(); } public long getCloseSentOn() { return _closeSentOn.get(); }
/** notify that a close was sent */
public void setCloseSentOn(long when) { public void setCloseSentOn(long when) {
if (_closeSentOn.compareAndSet(0, when)) if (_closeSentOn.compareAndSet(0, when))
scheduleDisconnectEvent(); scheduleDisconnectEvent();
@@ -823,13 +841,6 @@ class Connection {
/** @return 0 if not received */ /** @return 0 if not received */
public long getCloseReceivedOn() { return _closeReceivedOn.get(); } public long getCloseReceivedOn() { return _closeReceivedOn.get(); }
/**
* @return true if the first close received, false otherwise
*/
public boolean setCloseReceivedOn(long when) {
return _closeReceivedOn.compareAndSet(0, when);
}
public void updateShareOpts() { public void updateShareOpts() {
if (_closeSentOn.get() > 0 && !_updatedShareOpts) { if (_closeSentOn.get() > 0 && !_updatedShareOpts) {
_connectionManager.updateShareOpts(this); _connectionManager.updateShareOpts(this);

View File

@@ -54,7 +54,7 @@ class ConnectionPacketHandler {
if (con.getHardDisconnected()) { if (con.getHardDisconnected()) {
if ( (packet.getSequenceNum() > 0) || (packet.getPayloadSize() > 0) || if ( (packet.getSequenceNum() > 0) || (packet.getPayloadSize() > 0) ||
(packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) || (packet.isFlagSet(Packet.FLAG_CLOSE)) ) { (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE | Packet.FLAG_CLOSE)) ) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Received a data packet after hard disconnect: " + packet + " on " + con); _log.warn("Received a data packet after hard disconnect: " + packet + " on " + con);
con.sendReset(); con.sendReset();
@@ -305,6 +305,8 @@ class ConnectionPacketHandler {
_context.statManager().addRateData("stream.sendsBeforeAck", numSends, ackTime); _context.statManager().addRateData("stream.sendsBeforeAck", numSends, ackTime);
if (p.isFlagSet(Packet.FLAG_CLOSE))
con.ourCloseAcked();
// ACK the tags we delivered so we can use them // ACK the tags we delivered so we can use them
//if ( (p.getKeyUsed() != null) && (p.getTagsSent() != null) //if ( (p.getKeyUsed() != null) && (p.getTagsSent() != null)
@@ -317,15 +319,18 @@ class ConnectionPacketHandler {
_log.debug("Packet acked after " + ackTime + "ms: " + p); _log.debug("Packet acked after " + ackTime + "ms: " + p);
} }
if (highestRTT > 0) { if (highestRTT > 0) {
if (_log.shouldLog(Log.INFO)) {
int oldrtt = con.getOptions().getRTT(); int oldrtt = con.getOptions().getRTT();
int oldrto = con.getOptions().getRTO(); int oldrto = con.getOptions().getRTO();
int olddev = con.getOptions().getRTTDev(); int olddev = con.getOptions().getRTTDev();
con.getOptions().updateRTT(highestRTT); con.getOptions().updateRTT(highestRTT);
if (_log.shouldLog(Log.INFO))
_log.info("acked: " + acked.size() + " highestRTT: " + highestRTT + _log.info("acked: " + acked.size() + " highestRTT: " + highestRTT +
" RTT: " + oldrtt + " -> " + con.getOptions().getRTT() + " RTT: " + oldrtt + " -> " + con.getOptions().getRTT() +
" RTO: " + oldrto + " -> " + con.getOptions().getRTO() + " RTO: " + oldrto + " -> " + con.getOptions().getRTO() +
" Dev: " + olddev + " -> " + con.getOptions().getRTTDev()); " Dev: " + olddev + " -> " + con.getOptions().getRTTDev());
} else {
con.getOptions().updateRTT(highestRTT);
}
if (firstAck) { if (firstAck) {
if (con.isInbound()) if (con.isInbound())
_context.statManager().addRateData("stream.con.initialRTT.in", highestRTT, 0); _context.statManager().addRateData("stream.con.initialRTT.in", highestRTT, 0);
@@ -542,8 +547,7 @@ class ConnectionPacketHandler {
private void verifySignature(Packet packet, Connection con) throws I2PException { private void verifySignature(Packet packet, Connection con) throws I2PException {
// verify the signature if necessary // verify the signature if necessary
if (con.getOptions().getRequireFullySigned() || if (con.getOptions().getRequireFullySigned() ||
packet.isFlagSet(Packet.FLAG_SYNCHRONIZE) || packet.isFlagSet(Packet.FLAG_SYNCHRONIZE | Packet.FLAG_CLOSE)) {
packet.isFlagSet(Packet.FLAG_CLOSE) ) {
// we need a valid signature // we need a valid signature
Destination from = con.getRemotePeer(); Destination from = con.getRemotePeer();
if (from == null) if (from == null)

View File

@@ -89,10 +89,10 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
} }
public boolean shouldSign() { public boolean shouldSign() {
return isFlagSet(FLAG_SIGNATURE_INCLUDED) || return isFlagSet(FLAG_SIGNATURE_INCLUDED |
isFlagSet(FLAG_SYNCHRONIZE) || FLAG_SYNCHRONIZE |
isFlagSet(FLAG_CLOSE) || FLAG_CLOSE |
isFlagSet(FLAG_ECHO); FLAG_ECHO);
} }
/** last minute update of ack fields, just before write/sign */ /** last minute update of ack fields, just before write/sign */
@@ -209,9 +209,9 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
if (_numSends > 1) if (_numSends > 1)
buf.append(" sent ").append(_numSends).append(" times"); buf.append(" sent ").append(_numSends).append(" times");
if (isFlagSet(Packet.FLAG_SYNCHRONIZE) || if (isFlagSet(FLAG_SYNCHRONIZE |
isFlagSet(Packet.FLAG_CLOSE) || FLAG_CLOSE |
isFlagSet(Packet.FLAG_RESET)) { FLAG_RESET)) {
if (con != null) { if (con != null) {
buf.append(" from "); buf.append(" from ");