diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 6dcfba412..d1dd9692f 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -35,7 +35,8 @@ class Connection { private final AtomicLong _lastSendId; private final AtomicBoolean _resetReceived = new AtomicBoolean(); private final AtomicLong _resetSentOn = new AtomicLong(); - private volatile boolean _connected; + private final AtomicBoolean _connected = new AtomicBoolean(true); + private final AtomicBoolean _finalDisconnect = new AtomicBoolean(); private boolean _hardDisconnected; private final MessageInputStream _inputStream; private final MessageOutputStream _outputStream; @@ -48,7 +49,7 @@ class Connection { private int _unackedPacketsReceived; private long _congestionWindowEnd; private volatile long _highestAckedThrough; - private boolean _isInbound; + private final boolean _isInbound; private boolean _updatedShareOpts; /** Packet ID (Long) to PacketLocal for sent but unacked packets */ private final Map _outboundPackets; @@ -87,7 +88,10 @@ class Connection { public static final long MAX_RESEND_DELAY = 45*1000; public static final long MIN_RESEND_DELAY = 2*1000; - /** wait up to 5 minutes after disconnection so we can ack/close packets */ + /** + * Wait up to 5 minutes after disconnection so we can ack/close packets. + * Roughly equal to the TIME-WAIT time in RFC 793, where the recommendation is 4 minutes (2 * MSL) + */ public static final int DISCONNECT_TIMEOUT = 5*60*1000; public static final int DEFAULT_CONNECT_TIMEOUT = 60*1000; @@ -107,12 +111,14 @@ class Connection { */ public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, SimpleTimer2 timer, - PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts) { + PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts, + boolean isInbound) { _context = ctx; _connectionManager = manager; _chooser = chooser; _outboundQueue = queue; _handler = handler; + _isInbound = isInbound; _log = _context.logManager().getLog(Connection.class); _receiver = new ConnectionDataReceiver(_context, this); _inputStream = new MessageInputStream(_context); @@ -135,7 +141,6 @@ class Connection { _lastCongestionSeenAt = MAX_WINDOW_SIZE*2; // lets allow it to grow _lastCongestionTime = -1; _lastCongestionHighestUnacked = -1; - _connected = true; _lastReceivedOn = -1; _activityTimer = new ActivityTimer(); _ackSinceCongestion = true; @@ -151,24 +156,6 @@ class Connection { return _lastSendId.incrementAndGet(); } - /** - * Notify that a close was received - */ - public void closeReceived() { - if (_closeReceivedOn.compareAndSet(0, _context.clock().now())) { - _inputStream.closeReceived(); - synchronized (_connectLock) { _connectLock.notifyAll(); } - } - } - - /** - * Notify that a close that we sent was acked - * @since 0.9.9 - */ - public void ourCloseAcked() { - // todo - } - /** * This doesn't "send a choke". Rather, it blocks if the outbound window is full, * thus choking the sender that calls this. @@ -196,7 +183,7 @@ class Connection { // no need to wait until the other side has ACKed us before sending the first few wsize // packets through // Incorrect assumption, the constructor defaults _connected to true --Sponge - if (!_connected) + if (!_connected.get()) return false; started = true; // Try to keep things moving even during NACKs and retransmissions... @@ -234,6 +221,7 @@ class Connection { } } } + void windowAdjusted() { synchronized (_outboundPackets) { _outboundPackets.notifyAll(); @@ -290,8 +278,7 @@ class Connection { * Got a packet we shouldn't have, send 'em a reset. * More than one reset may be sent. */ - public void sendReset() { - scheduleDisconnectEvent(); + private void sendReset() { long now = _context.clock().now(); if (_resetSentOn.get() + 10*1000 > now) return; // don't send resets too fast if (_resetReceived.get()) return; @@ -544,22 +531,74 @@ class Connection { if ( (elapsed > 250) && (_log.shouldLog(Log.WARN)) ) _log.warn("Took " + elapsed + "ms to pump through " + sched + " on " + toString()); } + + /** + * Notify that a close was sent. + * Called by CPH. + * May be called multiple times... but shouldn't be. + */ + public void notifyCloseSent() { + if (!_closeSentOn.compareAndSet(0, _context.clock().now())) { + // TODO ackImmediately() after sending CLOSE causes this. Bad? + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Sent more than one CLOSE: " + toString()); + } + // that's it, wait for notifyLastPacketAcked() or closeReceived() + } - /** notify that a reset was received */ + /** + * Notify that a close was received. + * Called by CPH. + * May be called multiple times. + */ + public void closeReceived() { + if (_closeReceivedOn.compareAndSet(0, _context.clock().now())) { + _inputStream.closeReceived(); + // TODO if outbound && no SYN received, treat like a reset? Could this happen? + if (_closeSentOn.get() > 0) { + // received after sent + disconnect(true); + } else { + synchronized (_connectLock) { _connectLock.notifyAll(); } + } + } + } + + /** + * Notify that a close that we sent, and all previous packets, were acked. + * Called by CPH. Only call this once. + * @since 0.9.9 + */ + public void notifyLastPacketAcked() { + long cso = _closeSentOn.get(); + if (cso <= 0) + throw new IllegalStateException(); + // we only create one CLOSE packet so we will only get called once, + // no need to check + long cro = _closeReceivedOn.get(); + if (cro > 0 && cro < cso) + // received before sent + disconnect(true); + } + + /** + * Notify that a reset was received. + * May be called multiple times. + */ public void resetReceived() { if (!_resetReceived.compareAndSet(false, true)) return; - scheduleDisconnectEvent(); IOException ioe = new IOException("Reset received"); _outputStream.streamErrorOccurred(ioe); _inputStream.streamErrorOccurred(ioe); _connectionError = "Connection reset"; synchronized (_connectLock) { _connectLock.notifyAll(); } + // RFC 793 end of setion 3.4: We are completely done. + disconnectComplete(); } public boolean getResetReceived() { return _resetReceived.get(); } - public void setInbound() { _isInbound = true; } public boolean isInbound() { return _isInbound; } /** @@ -567,10 +606,16 @@ class Connection { * outbound connection. Only set to false on disconnect. * For outbound, use getHighestAckedThrough() >= 0 also, * to determine if the connection is up. + * + * In general, this is true until either: + * - CLOSE received and CLOSE sent and our CLOSE is acked + * - RESET received or sent + * - closed on the socket side */ - public boolean getIsConnected() { return _connected; } + public boolean getIsConnected() { return _connected.get(); } public boolean getHardDisconnected() { return _hardDisconnected; } + public boolean getResetSent() { return _resetSentOn.get() > 0; } /** @return 0 if not sent */ @@ -579,38 +624,103 @@ class Connection { /** @return 0 if not scheduled */ public long getDisconnectScheduledOn() { return _disconnectScheduledOn.get(); } - void disconnect(boolean cleanDisconnect) { + /** + * Must be called when we are done with this connection. + * Enters TIME-WAIT if necessary, and removes from connection manager. + * May be called multiple times. + * This closes the socket side. + * In normal operation, this is called when a CLOSE has been received, + * AND a CLOSE has been sent, AND EITHER: + * received close before sent close AND our CLOSE has been acked + * OR + * received close after sent close. + * + * @param cleanDisconnect if true, normal close; if false, send a RESET + */ + public void disconnect(boolean cleanDisconnect) { disconnect(cleanDisconnect, true); } - void disconnect(boolean cleanDisconnect, boolean removeFromConMgr) { + /** + * Must be called when we are done with this connection. + * May be called multiple times. + * This closes the socket side. + * In normal operation, this is called when a CLOSE has been received, + * AND a CLOSE has been sent, AND EITHER: + * received close before sent close AND our CLOSE has been acked + * OR + * received close after sent close. + * + * @param cleanDisconnect if true, normal close; if false, send a RESET + * @param removeFromConMgr if true, enters TIME-WAIT if necessary. + * if false, MUST call disconnectComplete() later. + * Should always be true unless called from ConnectionManager. + */ + public void disconnect(boolean cleanDisconnect, boolean removeFromConMgr) { + if (!_connected.compareAndSet(true, false)) { + return; + } synchronized (_connectLock) { _connectLock.notifyAll(); } - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Disconnecting " + toString(), new Exception("discon")); - if (!cleanDisconnect) { - _hardDisconnected = true; - if (_log.shouldLog(Log.WARN)) - _log.warn("Hard disconnecting and sending a reset on " + toString(), new Exception("cause")); - sendReset(); + + if (_closeReceivedOn.get() <= 0) { + // should have already been called from closeReceived() above + _inputStream.closeReceived(); } - - if (cleanDisconnect && _connected) { - // send close packets and schedule stuff... + + if (cleanDisconnect) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Clean disconnecting, remove? " + removeFromConMgr + + ": " + toString(), new Exception("discon")); _outputStream.closeInternal(); - _inputStream.close(); } else { - if (_connected) - doClose(); - killOutstandingPackets(); + _hardDisconnected = true; + if (_inputStream.getHighestBlockId() >= 0 && !getResetReceived()) { + // only send a RESET if we ever got something (and he didn't RESET us), + // otherwise don't waste the crypto and tags + if (_log.shouldLog(Log.WARN)) + _log.warn("Hard disconnecting and sending reset, remove? " + removeFromConMgr + + " on " + toString(), new Exception("cause")); + sendReset(); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("Hard disconnecting, remove? " + removeFromConMgr + + " on " + toString(), new Exception("cause")); + } + _outputStream.streamErrorOccurred(new IOException("Hard disconnect")); } + if (removeFromConMgr) { - scheduleDisconnectEvent(); + if (!cleanDisconnect) { + disconnectComplete(); + } else { + long cro = _closeReceivedOn.get(); + long cso = _closeSentOn.get(); + if (cro > 0 && cro < cso && getUnackedPacketsSent() <= 0) { + if (_log.shouldLog(Log.INFO)) + _log.info("Rcv close -> send close -> last acked, skip TIME-WAIT for " + toString()); + // They sent the first CLOSE. + // We do not need to enter TIME-WAIT, we are done. + // clean disconnect, don't schedule TIME-WAIT + // remove conn + disconnectComplete(); + } else { + scheduleDisconnectEvent(); + } + } } - _connected = false; } - void disconnectComplete() { - _connected = false; + private static final IOException DISCON_IOE = new IOException("disconnected!"); + + /** + * Must be called when we are done with this connection. + * Final disconnect. Remove from conn manager. + * May be called multiple times. + */ + public void disconnectComplete() { + if (!_finalDisconnect.compareAndSet(false, true)) + return; + _connected.set(false); I2PSocketFull s = _socket; if (s != null) { s.destroy2(); @@ -619,38 +729,35 @@ class Connection { _outputStream.destroy(); _receiver.destroy(); _activityTimer.cancel(); - _inputStream.streamErrorOccurred(new IOException("disconnected!")); + _inputStream.streamErrorOccurred(DISCON_IOE); - if (_disconnectScheduledOn.compareAndSet(0, _context.clock().now())) { - if (_log.shouldLog(Log.INFO)) - _log.info("Connection disconnect complete from dead, drop the con " + if (_log.shouldLog(Log.INFO)) + _log.info("Connection disconnect complete: " + toString()); - _connectionManager.removeConnection(this); - } - + _connectionManager.removeConnection(this); killOutstandingPackets(); } - /** ignore tag issues */ + /** + * Cancel and remove all packets awaiting ack + */ private void killOutstandingPackets() { - //boolean tagsCancelled = false; synchronized (_outboundPackets) { - for (Iterator iter = _outboundPackets.values().iterator(); iter.hasNext(); ) { - PacketLocal pl = iter.next(); - //if ( (pl.getTagsSent() != null) && (pl.getTagsSent().size() > 0) ) - // tagsCancelled = true; + if (_outboundPackets.isEmpty()) + return; // short circuit iterator + for (PacketLocal pl : _outboundPackets.values()) { pl.cancelled(); } _outboundPackets.clear(); _outboundPackets.notifyAll(); } - //if (tagsCancelled) - // _context.sessionKeyManager().failTags(_remotePeer.getPublicKey()); } /** * Schedule the end of the TIME-WAIT state, * but only if not previously scheduled. + * Must call either this or disconnectComplete() + * * @return true if a new event was scheduled; false if already scheduled * @since 0.9.9 */ @@ -665,23 +772,13 @@ class Connection { public DisconnectEvent() { if (_log.shouldLog(Log.INFO)) _log.info("Connection disconnect timer initiated: 5 minutes to drop " - + Connection.this.toString()); + + Connection.this.toString(), new Exception()); } public void timeReached() { - killOutstandingPackets(); - if (_log.shouldLog(Log.INFO)) - _log.info("Connection disconnect timer complete, drop the con " - + Connection.this.toString()); - _connectionManager.removeConnection(Connection.this); + disconnectComplete(); } } - private void doClose() { - _outputStream.streamErrorOccurred(new IOException("Hard disconnect")); - _inputStream.closeReceived(); - synchronized (_connectLock) { _connectLock.notifyAll(); } - } - private boolean _remotePeerSet = false; /** who are we talking with * @return peer Destination @@ -832,12 +929,6 @@ class Connection { /** @return 0 if not sent */ public long getCloseSentOn() { return _closeSentOn.get(); } - /** notify that a close was sent */ - public void setCloseSentOn(long when) { - if (_closeSentOn.compareAndSet(0, when)) - scheduleDisconnectEvent(); - } - /** @return 0 if not received */ public long getCloseReceivedOn() { return _closeReceivedOn.get(); } @@ -849,6 +940,7 @@ class Connection { } public void incrementUnackedPacketsReceived() { _unackedPacketsReceived++; } public int getUnackedPacketsReceived() { return _unackedPacketsReceived; } + /** how many packets have we sent but not yet received an ACK for? * @return Count of packets in-flight. */ @@ -894,7 +986,7 @@ class Connection { void waitForConnect() { long expiration = _context.clock().now() + _options.getConnectTimeout(); while (true) { - if (_connected && (_receiveStreamId > 0) && (_sendStreamId > 0) ) { + if (_connected.get() && (_receiveStreamId > 0) && (_sendStreamId > 0) ) { // w00t if (_log.shouldLog(Log.DEBUG)) _log.debug("waitForConnect(): Connected and we have stream IDs"); @@ -905,7 +997,7 @@ class Connection { _log.debug("waitForConnect(): connection error found: " + _connectionError); return; } - if (!_connected) { + if (!_connected.get()) { _connectionError = "Connection failed"; if (_log.shouldLog(Log.DEBUG)) _log.debug("waitForConnect(): not connected"); @@ -964,7 +1056,7 @@ class Connection { if (_log.shouldLog(Log.DEBUG)) _log.debug("Fire inactivity timer on " + Connection.this.toString()); // uh, nothing more to do... - if (!_connected) { + if (!_connected.get()) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Inactivity timeout reached, but we are already closed"); return; } @@ -1101,7 +1193,7 @@ class Connection { if (getResetSent()) buf.append(" reset sent ").append(DataHelper.formatDuration(_context.clock().now() - getResetSentOn())).append(" ago"); if (getResetReceived()) - buf.append(" reset received ").append(DataHelper.formatDuration(_context.clock().now() - getDisconnectScheduledOn())).append(" ago"); + buf.append(" reset rcvd ").append(DataHelper.formatDuration(_context.clock().now() - getDisconnectScheduledOn())).append(" ago"); if (getCloseSentOn() > 0) { buf.append(" close sent "); long timeSinceClose = _context.clock().now() - getCloseSentOn(); @@ -1109,7 +1201,7 @@ class Connection { buf.append(" ago"); } if (getCloseReceivedOn() > 0) - buf.append(" close received ").append(DataHelper.formatDuration(_context.clock().now() - getCloseReceivedOn())).append(" ago"); + buf.append(" close rcvd ").append(DataHelper.formatDuration(_context.clock().now() - getCloseReceivedOn())).append(" ago"); buf.append(" sent: ").append(1 + _lastSendId.get()); buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing); buf.append(" ackThru ").append(_highestAckedThrough); @@ -1180,9 +1272,7 @@ class Connection { if (_packet.getAckTime() > 0) return false; - if (_resetSentOn.get() > 0 || _resetReceived.get() || !_connected) { - if(_log.shouldLog(Log.WARN) && (_resetSentOn.get() <= 0) && (!_resetReceived.get())) - _log.warn("??? no resets but not connected: " + _packet); // don't think this is possible + if (_resetSentOn.get() > 0 || _resetReceived.get() || _finalDisconnect.get()) { _packet.cancelled(); return false; } @@ -1277,9 +1367,22 @@ class Connection { if (numSends - 1 > _options.getMaxResends()) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Too many resends"); + _log.debug("Disconnecting, too many resends of " + _packet); _packet.cancelled(); disconnect(false); + } else if (numSends >= 3 && + _packet.isFlagSet(Packet.FLAG_CLOSE) && + _packet.getPayloadSize() <= 0 && + _outboundPackets.size() <= 1 && + getCloseReceivedOn() > 0) { + // Bug workaround to prevent 5 minutes of retransmission + // Routers before 0.9.9 have bugs, they won't ack anything after + // they sent a close. Only send 3 CLOSE packets total, then + // shut down normally. + if (_log.shouldLog(Log.INFO)) + _log.info("Too many CLOSE resends, disconnecting: " + Connection.this.toString()); + _packet.cancelled(); + disconnect(true); } else { //long timeout = _options.getResendDelay() << numSends; long rto = _options.getRTO(); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java index ca2f65ff7..71ccbfe93 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java @@ -57,6 +57,8 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { * may generate a packet with a plain ACK/NACK or CLOSE, or nothing whatsoever * if there's nothing new to send. * + * This is called from MessageOutputStream, i.e. data from the client. + * * @param buf data to be sent - may be null * @param off offset into the buffer to start writing from * @param size how many bytes of the buffer to write (may be 0) @@ -108,6 +110,8 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { * Send some data through the connection, attaching any appropriate flags * onto the packet. * + * Called externally from Connection with args (null, 0, 0) to send an ack + * * @param buf data to be sent - may be null * @param off offset into the buffer to start writing from * @param size how many bytes of the buffer to write (may be 0) @@ -118,6 +122,8 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { } /** + * Called externally from Connection with args (null, 0, 0, true) to send an empty data packet + * * @param buf data to be sent - may be null * @param off offset into the buffer to start writing from * @param size how many bytes of the buffer to write (may be 0) @@ -210,13 +216,17 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { // packets sent, otherwise the other side could receive the CLOSE prematurely, // since this ACK could arrive before the unacked payload message. // TODO if the only unacked packet is the CLOSE packet and it didn't have any data... + // + // FIXME Implement better half-close by sending CLOSE whenever. Needs 0.9.9 bug fixes + // throughout network? + // if (con.getOutputStream().getClosed() && ( (size > 0) || (con.getUnackedPacketsSent() <= 0) || (packet.getSequenceNum() > 0) ) ) { packet.setFlag(Packet.FLAG_CLOSE); - con.setCloseSentOn(_context.clock().now()); + con.notifyCloseSent(); } if (_log.shouldLog(Log.DEBUG)) - _log.debug("New outbound packet on " + _connection + ": " + packet); + _log.debug("New OB pkt (acks not yet filled in): " + packet + " on " + _connection); return packet; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index 6adca023e..61be04bca 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -192,9 +192,8 @@ class ConnectionManager { ConnectionOptions opts = new ConnectionOptions(_defaultOptions); opts.setPort(synPacket.getRemotePort()); opts.setLocalPort(synPacket.getLocalPort()); - Connection con = new Connection(_context, this, _schedulerChooser, _timer, _outboundQueue, _conPacketHandler, opts); + Connection con = new Connection(_context, this, _schedulerChooser, _timer, _outboundQueue, _conPacketHandler, opts, true); _tcbShare.updateOptsFromShare(con); - con.setInbound(); long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; boolean reject = false; int active = 0; @@ -326,7 +325,7 @@ class ConnectionManager { // try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {} try { Thread.sleep(remaining/4); } catch (InterruptedException ie) {} } else { - con = new Connection(_context, this, _schedulerChooser, _timer, _outboundQueue, _conPacketHandler, opts); + con = new Connection(_context, this, _schedulerChooser, _timer, _outboundQueue, _conPacketHandler, opts, false); con.setRemotePeer(peer); while (_connectionByInboundId.containsKey(Long.valueOf(receiveId))) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index b49ff104b..4427874b6 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -52,12 +52,13 @@ class ConnectionPacketHandler { return; } + final long seqNum = packet.getSequenceNum(); if (con.getHardDisconnected()) { - if ( (packet.getSequenceNum() > 0) || (packet.getPayloadSize() > 0) || + if ( (seqNum > 0) || (packet.getPayloadSize() > 0) || (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE | Packet.FLAG_CLOSE)) ) { if (_log.shouldLog(Log.WARN)) _log.warn("Received a data packet after hard disconnect: " + packet + " on " + con); - con.sendReset(); + // the following will send a RESET con.disconnect(false); } else { if (_log.shouldLog(Log.WARN)) @@ -68,14 +69,12 @@ class ConnectionPacketHandler { } if ( (con.getCloseSentOn() > 0) && (con.getUnackedPacketsSent() <= 0) && - (packet.getSequenceNum() > 0) && (packet.getPayloadSize() > 0)) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Received new data when we've sent them data and all of our data is acked: " + (seqNum > 0) && (packet.getPayloadSize() > 0)) { + if (_log.shouldLog(Log.INFO)) + _log.info("Received new data when we've sent them data and all of our data is acked: " + packet + " on " + con + ""); - con.sendReset(); - con.disconnect(false); - packet.releasePayload(); - return; + // this is fine, half-close + // Major bug before 0.9.9, packets were dropped here and a reset sent } if (packet.isFlagSet(Packet.FLAG_MAX_PACKET_SIZE_INCLUDED)) { @@ -111,7 +110,7 @@ class ConnectionPacketHandler { long ready = con.getInputStream().getHighestReadyBockId(); int available = con.getOptions().getInboundBufferSize() - con.getInputStream().getTotalReadySize(); int allowedBlocks = available/con.getOptions().getMaxMessageSize(); - if (packet.getSequenceNum() > ready + allowedBlocks) { + if (seqNum > ready + allowedBlocks) { if (_log.shouldLog(Log.WARN)) _log.warn("Inbound buffer exceeded on connection " + con + " (" + ready + "/"+ (ready+allowedBlocks) + "/" + available @@ -127,23 +126,28 @@ class ConnectionPacketHandler { _context.statManager().addRateData("stream.con.receiveMessageSize", packet.getPayloadSize(), 0); - boolean isNew = false; boolean allowAck = true; + final boolean isSYN = packet.isFlagSet(Packet.FLAG_SYNCHRONIZE); // We allow the SendStreamID to be 0 so that the originator can send // multiple packets before he gets the first ACK back. // If we want to limit the number of packets we receive without a // SendStreamID, do it in PacketHandler.receiveUnknownCon(). - if ( (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) && + if ( (!isSYN) && (packet.getReceiveStreamId() <= 0) ) allowAck = false; - if (allowAck) { - isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload()); - } else { - con.getInputStream().notifyActivity(); + // Receive the message. + // Note that this is called even for empty packets, including CLOSE packets, so the + // MessageInputStream will know the last sequence number. + // But not ack-only packets! + boolean isNew; + if (seqNum > 0 || isSYN) + isNew = con.getInputStream().messageReceived(seqNum, packet.getPayload()); + else + isNew = false; + if (!allowAck) isNew = false; - } //if ( (packet.getSequenceNum() == 0) && (packet.getPayloadSize() > 0) ) { // if (_log.shouldLog(Log.DEBUG)) @@ -151,13 +155,19 @@ class ConnectionPacketHandler { // + " packet: " + packet + " con: " + con); //} - if (_log.shouldLog(Log.DEBUG)) - _log.debug((isNew ? "New" : "Dup or ack-only") + " inbound packet on " + con + ": " + packet); + if (_log.shouldLog(Log.DEBUG)) { + String type; + if (!allowAck) + type = "Non-SYN before SYN"; + else if (isNew) + type = "New"; + else if (packet.getPayloadSize() <= 0) + type = "Ack-only"; + else + type = "Dup"; + _log.debug(type + " IB pkt: " + packet + " on " + con); + } - // close *after* receiving the data, as well as after verifying the signatures / etc - if (packet.isFlagSet(Packet.FLAG_CLOSE) && packet.isFlagSet(Packet.FLAG_SIGNATURE_INCLUDED)) - con.closeReceived(); - boolean fastAck = false; boolean ackOnly = false; @@ -180,8 +190,7 @@ class ConnectionPacketHandler { _log.debug("Scheduling ack in " + delay + "ms for received packet " + packet); } } else { - if ( (packet.getSequenceNum() > 0) || (packet.getPayloadSize() > 0) || - (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) ) { + if ( (seqNum > 0) || (packet.getPayloadSize() > 0) || isSYN) { _context.statManager().addRateData("stream.con.receiveDuplicateSize", packet.getPayloadSize(), 0); con.incrementDupMessagesReceived(1); @@ -209,7 +218,7 @@ class ConnectionPacketHandler { } } else { - if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) { + if (isSYN) { //con.incrementUnackedPacketsReceived(); con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); } else { @@ -220,7 +229,7 @@ class ConnectionPacketHandler { } } - if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE) && (packet.getSendStreamId() <= 0) ) { + if (isSYN && (packet.getSendStreamId() <= 0) ) { // don't honor the ACK 0 in SYN packets received when the other side // has obviously not seen our messages } else { @@ -249,10 +258,14 @@ class ConnectionPacketHandler { // non-ack message payloads are queued in the MessageInputStream packet.releasePayload(); } - + + // close *after* receiving the data, as well as after verifying the signatures / etc // update the TCB Cache now that we've processed the acks and updated our rtt etc. - if (isNew && packet.isFlagSet(Packet.FLAG_CLOSE) && packet.isFlagSet(Packet.FLAG_SIGNATURE_INCLUDED)) - con.updateShareOpts(); + if (packet.isFlagSet(Packet.FLAG_CLOSE) && packet.isFlagSet(Packet.FLAG_SIGNATURE_INCLUDED)) { + con.closeReceived(); + if (isNew) + con.updateShareOpts(); + } //if (choke) // con.fastRetransmit(); @@ -285,6 +298,7 @@ class ConnectionPacketHandler { else return false; + boolean lastPacketAcked = false; if ( (acked != null) && (!acked.isEmpty()) ) { if (_log.shouldLog(Log.DEBUG)) _log.debug(acked.size() + " of our packets acked with " + packet); @@ -305,9 +319,6 @@ class ConnectionPacketHandler { _context.statManager().addRateData("stream.sendsBeforeAck", numSends, ackTime); - if (p.isFlagSet(Packet.FLAG_CLOSE)) - con.ourCloseAcked(); - // ACK the tags we delivered so we can use them //if ( (p.getKeyUsed() != null) && (p.getTagsSent() != null) // && (p.getTagsSent().size() > 0) ) { @@ -339,9 +350,14 @@ class ConnectionPacketHandler { } } _context.statManager().addRateData("stream.con.packetsAckedPerMessageReceived", acked.size(), highestRTT); + if (con.getCloseSentOn() > 0 && con.getUnackedPacketsSent() <= 0) + lastPacketAcked = true; } - return adjustWindow(con, isNew, packet.getSequenceNum(), numResends, (acked != null ? acked.size() : 0), choke); + boolean rv = adjustWindow(con, isNew, packet.getSequenceNum(), numResends, (acked != null ? acked.size() : 0), choke); + if (lastPacketAcked) + con.notifyLastPacketAcked(); + return rv; } /** @return are we congested? */ diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java index 203182770..08c9aaf02 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java @@ -43,15 +43,14 @@ class I2PSocketFull implements I2PSocket { if (c == null) return; if (c.getIsConnected()) { OutputStream out = c.getOutputStream(); - if (out != null) { - try { - out.close(); - } catch (IOException ioe) { - // ignore any write error, as we want to keep on and kill the - // con (thanks Complication!) - } + try { + out.close(); + } catch (IOException ioe) { + // ignore any write error, as we want to keep on and kill the + // con (thanks Complication!) } - c.disconnect(true); + MessageInputStream in = c.getInputStream(); + in.close(); } else { //throw new IOException("Not connected"); } @@ -143,10 +142,7 @@ class I2PSocketFull implements I2PSocket { } void destroy() { - Connection c = _connection; destroy2(); - if (c != null) - c.disconnectComplete(); } /**