Streaming: Minor cleanups

- Remove unused stream.trend stat
- Optimization of getting first value from TreeMap
- Only notify reader of input stream when new data available
This commit is contained in:
zzz
2021-06-27 11:50:20 -04:00
parent b7322e1647
commit e88eed760d
4 changed files with 21 additions and 13 deletions

View File

@ -5,7 +5,6 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -59,7 +58,7 @@ class Connection {
private final boolean _isInbound;
private boolean _updatedShareOpts;
/** Packet ID (Long) to PacketLocal for sent but unacked packets */
private final SortedMap<Long, PacketLocal> _outboundPackets;
private final TreeMap<Long, PacketLocal> _outboundPackets;
private final PacketQueue _outboundQueue;
private final ConnectionPacketHandler _handler;
private ConnectionOptions _options;
@ -1481,19 +1480,19 @@ class Connection {
congestionOccurred();
// 1. Double RTO and backoff (RFC 6298 section 5.5 & 5.6)
final long now = _context.clock().now();
pushBackRTO(_options.doubleRTO());
// 2. cut ssthresh to bandwidth estimate, window to 1
List<PacketLocal> toResend = null;
synchronized(_outboundPackets) {
if (_outboundPackets.isEmpty()) {
Map.Entry<Long, PacketLocal> e = _outboundPackets.firstEntry();
if (e == null) {
if (_log.shouldLog(Log.WARN))
_log.warn(Connection.this + " Retransmission timer hit but nothing transmitted??");
return;
}
PacketLocal oldest = _outboundPackets.get(_outboundPackets.firstKey());
PacketLocal oldest = e.getValue();
if (oldest.getNumSends() == 1) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(Connection.this + " cutting ssthresh and window");
@ -1574,7 +1573,7 @@ class Connection {
}
if (sentAny) {
_lastSendTime = now;
_lastSendTime = _context.clock().now();
resetActivityTimer();
windowAdjusted();
}

View File

@ -135,7 +135,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
public static final String PROP_TAG_THRESHOLD = "crypto.lowTagThreshold";
private static final int TREND_COUNT = 3;
//private static final int TREND_COUNT = 3;
/** RFC 5681 sec. 3.1 */
static final int INITIAL_WINDOW_SIZE = 3;
static final int DEFAULT_MAX_SENDS = 8;
@ -166,7 +166,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
*/
private static final boolean DEFAULT_ENFORCE_PROTO = true;
private final int _trend[] = new int[TREND_COUNT];
//private final int _trend[] = new int[TREND_COUNT];
/**
* OK, here is the calculation on the message size to fit in a single
@ -632,6 +632,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
* not public, use updateRTT()
*/
private void setRTT(int ms) {
/*
synchronized (_trend) {
_trend[0] = _trend[1];
_trend[1] = _trend[2];
@ -642,6 +643,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
else
_trend[2] = 0;
}
*/
synchronized(this) {
_rtt = ms;
@ -711,9 +713,12 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
* If we have 3 consecutive rtt increases, we are trending upwards (1), or if we have
* 3 consecutive rtt decreases, we are trending downwards (-1), else we're stable.
*
* @deprecated unused as of 0.9.51
* @return positive/flat/negative trend in round trip time
*/
@Deprecated
public int getRTTTrend() {
/*
synchronized (_trend) {
for (int i = 0; i < TREND_COUNT - 1; i++) {
if (_trend[i] != _trend[i+1])
@ -721,6 +726,8 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
}
return _trend[0];
}
*/
return 0;
}
/**

View File

@ -41,7 +41,7 @@ class ConnectionPacketHandler {
_context.statManager().createRateStat("stream.con.packetsAckedPerMessageReceived", "Avg number of acks in a message", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.sendsBeforeAck", "How many times a message was sent before it was ACKed?", "Stream", new long[] { 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.resetReceived", "How many messages had we sent successfully before receiving a RESET?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("stream.trend", "What direction the RTT is trending in (with period = windowsize)", "Stream", new long[] { 60*1000, 60*60*1000 });
//_context.statManager().createRateStat("stream.trend", "What direction the RTT is trending in (with period = windowsize)", "Stream", new long[] { 60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.con.initialRTT.in", "What is the actual RTT for the first packet of an inbound conn?", "Stream", new long[] { 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.con.initialRTT.out", "What is the actual RTT for the first packet of an outbound conn?", "Stream", new long[] { 10*60*1000, 60*60*1000 });
_context.statManager().createFrequencyStat("stream.ack.dup.immediate","How often duplicate packets get acked immediately","Stream",new long[] { 10*60*1000, 60*60*1000 });
@ -453,9 +453,9 @@ class ConnectionPacketHandler {
int oldWindow = con.getOptions().getWindowSize();
int newWindowSize = oldWindow;
int trend = con.getOptions().getRTTTrend();
//int trend = con.getOptions().getRTTTrend();
_context.statManager().addRateData("stream.trend", trend, newWindowSize);
//_context.statManager().addRateData("stream.trend", trend, newWindowSize);
if ( (!congested) && (acked > 0) ) {
int ssthresh = con.getSSThresh();

View File

@ -153,6 +153,7 @@ class MessageInputStream extends InputStream {
if (_log.shouldWarn())
_log.warn("Dropping message " + messageId + ", inbound buffer exceeded: available = " +
available);
_dataLock.notifyAll();
return false;
}
// following code screws up if available < 0
@ -161,12 +162,14 @@ class MessageInputStream extends InputStream {
if (_log.shouldWarn())
_log.warn("Dropping message " + messageId + ", inbound buffer exceeded: " +
_highestReadyBlockId + '/' + (_highestReadyBlockId + allowedBlocks) + '/' + available);
_dataLock.notifyAll();
return false;
}
// This prevents us from getting DoSed by accepting unlimited in-order small messages
if (_readyDataBlocks.size() >= 4 * _maxWindowSize) {
if (_log.shouldWarn())
_log.warn("Dropping message " + messageId + ", too many ready blocks");
_dataLock.notifyAll();
return false;
}
}
@ -327,7 +330,6 @@ class MessageInputStream extends InputStream {
if (messageId <= _highestReadyBlockId) {
if (_log.shouldLog(Log.INFO))
_log.info("ignoring dup message " + messageId);
_dataLock.notifyAll();
return false; // already received
}
if (messageId > _highestBlockId)
@ -353,6 +355,7 @@ class MessageInputStream extends InputStream {
cur++;
_highestReadyBlockId++;
}
_dataLock.notifyAll();
} else {
// _notYetReadyBlocks size is limited in canAccept()
if (_locallyClosed) {
@ -366,7 +369,6 @@ class MessageInputStream extends InputStream {
_notYetReadyBlocks.put(Long.valueOf(messageId), payload);
}
}
_dataLock.notifyAll();
}
return true;
}