diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 589fe6625..fcd388369 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -336,14 +336,19 @@ public class Connection { _inputStream.close(); } else { doClose(); + boolean tagsCancelled = false; synchronized (_outboundPackets) { for (Iterator iter = _outboundPackets.values().iterator(); iter.hasNext(); ) { PacketLocal pl = (PacketLocal)iter.next(); + if ( (pl.getTagsSent() != null) && (pl.getTagsSent().size() > 0) ) + tagsCancelled = true; pl.cancelled(); } _outboundPackets.clear(); _outboundPackets.notifyAll(); } + if (tagsCancelled) + _context.sessionKeyManager().failTags(_remotePeer.getPublicKey()); } if (removeFromConMgr) { if (!_disconnectScheduled) { @@ -379,15 +384,21 @@ public class Connection { + toString()); _connectionManager.removeConnection(this); } - + + boolean tagsCancelled = false; synchronized (_outboundPackets) { for (Iterator iter = _outboundPackets.values().iterator(); iter.hasNext(); ) { PacketLocal pl = (PacketLocal)iter.next(); + if ( (pl.getTagsSent() != null) && (pl.getTagsSent().size() > 0) ) + tagsCancelled = true; pl.cancelled(); } _outboundPackets.clear(); _outboundPackets.notifyAll(); - } + } + if (tagsCancelled) + _context.sessionKeyManager().failTags(_remotePeer.getPublicKey()); + } private class DisconnectEvent implements SimpleTimer.TimedEvent { @@ -672,7 +683,13 @@ public class Connection { } public void timeReached() { - if (!_connected) return; + if (_packet.getAckTime() > 0) + return; + + if (!_connected) { + _packet.cancelled(); + return; + } //if (_log.shouldLog(Log.DEBUG)) // _log.debug("Resend period reached for " + _packet); @@ -732,12 +749,14 @@ public class Connection { if (numSends > _options.getMaxResends()) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Too many resends"); + _packet.cancelled(); disconnect(false); } else { //long timeout = _options.getResendDelay() << numSends; - long timeout = _options.getRTT() << (numSends-1); - if (timeout < MIN_RESEND_DELAY) - timeout = MIN_RESEND_DELAY; + long rtt = _options.getRTT(); + if (rtt < MIN_RESEND_DELAY) + rtt = MIN_RESEND_DELAY; + long timeout = rtt << (numSends-1); if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) ) timeout = MAX_RESEND_DELAY; if (_log.shouldLog(Log.DEBUG)) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 55b7684ca..2afa3be50 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -197,13 +197,14 @@ public class MessageOutputStream extends OutputStream { void flushAvailable(DataReceiver target, boolean blocking) throws IOException { WriteStatus ws = null; synchronized (_dataLock) { - if (_buf == null) throw new IOException("closed (buffer went away)"); + // _buf may be null, but the data receiver can handle that just fine, + // deciding whether or not to send a packet ws = target.writeData(_buf, 0, _valid); _written += _valid; _valid = 0; _dataLock.notifyAll(); } - if (blocking) { + if (blocking && ws != null) { ws.waitForAccept(_writeTimeout); if (ws.writeFailed()) throw new IOException("Flush available failed");