propagate from branch 'i2p.i2p' (head 66743cfb9b4e1c257e4f0a20a318ee7eb1fb607c)

to branch 'i2p.i2p.zzz.multisess' (head 4533ba250cb8e49044f5144b34014e9bc618cdc7)
This commit is contained in:
zzz
2015-04-18 14:08:22 +00:00
346 changed files with 37868 additions and 27424 deletions

View File

@@ -5,8 +5,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import net.i2p.data.Hash;
import net.i2p.util.ObjectCounter;
import net.i2p.util.RandomSource;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
/**
* Count how often we have received an incoming connection
@@ -33,7 +33,7 @@ class ConnThrottler {
// shorten the initial period by a random amount
// to prevent correlation across destinations
// and identification of router startup time
SimpleScheduler.getInstance().addPeriodicEvent(new Cleaner(),
SimpleTimer2.getInstance().addPeriodicEvent(new Cleaner(),
(period / 2) + RandomSource.getInstance().nextLong(period / 2),
period);
}

View File

@@ -12,6 +12,7 @@ import java.util.concurrent.atomic.AtomicLong;
import net.i2p.I2PAppContext;
import net.i2p.client.I2PSession;
import net.i2p.client.streaming.I2PSocketException;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
import net.i2p.util.Log;
@@ -28,8 +29,8 @@ class Connection {
private final Log _log;
private final ConnectionManager _connectionManager;
private Destination _remotePeer;
private long _sendStreamId;
private long _receiveStreamId;
private final AtomicLong _sendStreamId = new AtomicLong();
private final AtomicLong _receiveStreamId = new AtomicLong();
private volatile long _lastSendTime;
private final AtomicLong _lastSendId;
private final AtomicBoolean _resetReceived = new AtomicBoolean();
@@ -40,12 +41,13 @@ class Connection {
private final MessageInputStream _inputStream;
private final MessageOutputStream _outputStream;
private final SchedulerChooser _chooser;
private volatile long _nextSendTime;
private long _ackedPackets;
/** Locking: _nextSendLock */
private long _nextSendTime;
private final AtomicLong _ackedPackets = new AtomicLong();
private final long _createdOn;
private final AtomicLong _closeSentOn = new AtomicLong();
private final AtomicLong _closeReceivedOn = new AtomicLong();
private int _unackedPacketsReceived;
private final AtomicInteger _unackedPacketsReceived = new AtomicInteger();
private long _congestionWindowEnd;
private volatile long _highestAckedThrough;
private final boolean _isInbound;
@@ -69,23 +71,25 @@ class Connection {
private final AtomicBoolean _ackSinceCongestion;
/** Notify this on connection (or connection failure) */
private final Object _connectLock;
/** Locking for _nextSendTime */
private final Object _nextSendLock;
/** how many messages have been resent and not yet ACKed? */
private final AtomicInteger _activeResends = new AtomicInteger();
private final ConEvent _connectionEvent;
private final int _randomWait;
private int _localPort;
private int _remotePort;
private final int _localPort;
private final int _remotePort;
private final SimpleTimer2 _timer;
private long _lifetimeBytesSent;
private final AtomicLong _lifetimeBytesSent = new AtomicLong();
/** TBD for tcpdump-compatible ack output */
private long _lowestBytesAckedThrough;
private long _lifetimeBytesReceived;
private long _lifetimeDupMessageSent;
private long _lifetimeDupMessageReceived;
private final AtomicLong _lifetimeBytesReceived = new AtomicLong();
private final AtomicLong _lifetimeDupMessageSent = new AtomicLong();
private final AtomicLong _lifetimeDupMessageReceived = new AtomicLong();
public static final long MAX_RESEND_DELAY = 45*1000;
public static final long MIN_RESEND_DELAY = 1000;
public static final long MIN_RESEND_DELAY = 750;
/**
* Wait up to 5 minutes after disconnection so we can ack/close packets.
@@ -128,6 +132,9 @@ class Connection {
if (opts != null) {
_localPort = opts.getLocalPort();
_remotePort = opts.getPort();
} else {
_localPort = 0;
_remotePort = 0;
}
_options = (opts != null ? opts : new ConnectionOptions());
_outputStream.setWriteTimeout((int)_options.getWriteTimeout());
@@ -144,6 +151,7 @@ class Connection {
_activityTimer = new ActivityTimer();
_ackSinceCongestion = new AtomicBoolean(true);
_connectLock = new Object();
_nextSendLock = new Object();
_connectionEvent = new ConEvent();
_randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage
// all createRateStats in ConnectionManager
@@ -300,18 +308,19 @@ class Connection {
if (_resetReceived.get()) return;
// Unconditionally set
_resetSentOn.set(now);
if ( (_remotePeer == null) || (_sendStreamId <= 0) ) return;
if ( (_remotePeer == null) || (_sendStreamId.get() <= 0) ) return;
PacketLocal reply = new PacketLocal(_context, _remotePeer);
reply.setFlag(Packet.FLAG_RESET);
reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
reply.setSendStreamId(_sendStreamId);
reply.setReceiveStreamId(_receiveStreamId);
reply.setSendStreamId(_sendStreamId.get());
reply.setReceiveStreamId(_receiveStreamId.get());
// TODO remove this someday, as of 0.9.20 we do not require it
reply.setOptionalFrom(_connectionManager.getSession().getMyDestination());
reply.setLocalPort(_localPort);
reply.setRemotePort(_remotePort);
// this just sends the packet - no retries or whatnot
if (_outboundQueue.enqueue(reply)) {
_unackedPacketsReceived = 0;
_unackedPacketsReceived.set(0);
_lastSendTime = _context.clock().now();
resetActivityTimer();
}
@@ -391,7 +400,7 @@ class Connection {
//_context.statManager().getStatLog().addData(Packet.toId(_sendStreamId), "stream.rtt", _options.getRTT(), _options.getWindowSize());
if (_outboundQueue.enqueue(packet)) {
_unackedPacketsReceived = 0;
_unackedPacketsReceived.set(0);
_lastSendTime = _context.clock().now();
resetActivityTimer();
}
@@ -498,10 +507,10 @@ class Connection {
} // for
} // !isEmpty()
if (acked != null) {
_ackedPackets.addAndGet(acked.size());
for (int i = 0; i < acked.size(); i++) {
PacketLocal p = acked.get(i);
// removed from _outboundPackets above in iterator
_ackedPackets++;
if (p.getNumSends() > 1) {
_activeResends.decrementAndGet();
if (_log.shouldLog(Log.DEBUG))
@@ -606,7 +615,7 @@ class Connection {
public void resetReceived() {
if (!_resetReceived.compareAndSet(false, true))
return;
IOException ioe = new IOException("Reset received");
IOException ioe = new I2PSocketException(I2PSocketException.STATUS_CONNECTION_RESET);
_outputStream.streamErrorOccurred(ioe);
_inputStream.streamErrorOccurred(ioe);
_connectionError = "Connection reset";
@@ -780,7 +789,7 @@ class Connection {
private boolean scheduleDisconnectEvent() {
if (!_disconnectScheduledOn.compareAndSet(0, _context.clock().now()))
return false;
_context.simpleScheduler().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT);
_context.simpleTimer2().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT);
return true;
}
@@ -808,26 +817,34 @@ class Connection {
_connectionManager.updateOptsFromShare(this);
}
private boolean _sendStreamIdSet = false;
/** what stream do we send data to the peer on?
* @return non-global stream sending ID
/**
* What stream do we send data to the peer on?
* @return non-global stream sending ID, or 0 if unknown
*/
public long getSendStreamId() { return _sendStreamId.get(); }
/**
* @param id 0 to 0xffffffff
* @throws RuntimeException if already set to nonzero
*/
public long getSendStreamId() { return _sendStreamId; }
public void setSendStreamId(long id) {
if (_sendStreamIdSet) throw new RuntimeException("Send stream ID already set [" + _sendStreamId + ", " + id + "]");
_sendStreamIdSet = true;
_sendStreamId = id;
if (!_sendStreamId.compareAndSet(0, id))
throw new RuntimeException("Send stream ID already set [" + _sendStreamId + ", " + id + "]");
}
private boolean _receiveStreamIdSet = false;
/** The stream ID of a peer connection that sends data to us. (may be null)
* @return receive stream ID, or null if there isn't one
/**
* The stream ID of a peer connection that sends data to us, or zero if unknown.
* @return receive stream ID, or 0 if unknown
*/
public long getReceiveStreamId() { return _receiveStreamId.get(); }
/**
* @param id 0 to 0xffffffff
* @throws RuntimeException if already set to nonzero
*/
public long getReceiveStreamId() { return _receiveStreamId; }
public void setReceiveStreamId(long id) {
if (_receiveStreamIdSet) throw new RuntimeException("Receive stream ID already set [" + _receiveStreamId + ", " + id + "]");
_receiveStreamIdSet = true;
_receiveStreamId = id;
if (!_receiveStreamId.compareAndSet(0, id))
throw new RuntimeException("Receive stream ID already set [" + _receiveStreamId + ", " + id + "]");
synchronized (_connectLock) { _connectLock.notifyAll(); }
}
@@ -890,14 +907,14 @@ class Connection {
public ConnectionPacketHandler getPacketHandler() { return _handler; }
public long getLifetimeBytesSent() { return _lifetimeBytesSent; }
public long getLifetimeBytesReceived() { return _lifetimeBytesReceived; }
public long getLifetimeDupMessagesSent() { return _lifetimeDupMessageSent; }
public long getLifetimeDupMessagesReceived() { return _lifetimeDupMessageReceived; }
public void incrementBytesSent(int bytes) { _lifetimeBytesSent += bytes; }
public void incrementDupMessagesSent(int msgs) { _lifetimeDupMessageSent += msgs; }
public void incrementBytesReceived(int bytes) { _lifetimeBytesReceived += bytes; }
public void incrementDupMessagesReceived(int msgs) { _lifetimeDupMessageReceived += msgs; }
public long getLifetimeBytesSent() { return _lifetimeBytesSent.get(); }
public long getLifetimeBytesReceived() { return _lifetimeBytesReceived.get(); }
public long getLifetimeDupMessagesSent() { return _lifetimeDupMessageSent.get(); }
public long getLifetimeDupMessagesReceived() { return _lifetimeDupMessageReceived.get(); }
public void incrementBytesSent(int bytes) { _lifetimeBytesSent.addAndGet(bytes); }
public void incrementDupMessagesSent(int msgs) { _lifetimeDupMessageSent.addAndGet(msgs); }
public void incrementBytesReceived(int bytes) { _lifetimeBytesReceived.addAndGet(bytes); }
public void incrementDupMessagesReceived(int msgs) { _lifetimeDupMessageReceived.addAndGet(msgs); }
/**
* Time when the scheduler next want to send a packet, or -1 if
@@ -905,7 +922,11 @@ class Connection {
* instance, or want to delay an ACK.
* @return the next time the scheduler will want to send a packet, or -1 if never.
*/
public long getNextSendTime() { return _nextSendTime; }
public long getNextSendTime() {
synchronized(_nextSendLock) {
return _nextSendTime;
}
}
/**
* If the next send time is currently >= 0 (i.e. not "never"),
@@ -915,31 +936,26 @@ class Connection {
* options.getSendAckDelay() from now (1000 ms)
*/
public void setNextSendTime(long when) {
if (_nextSendTime >= 0) {
if (when < _nextSendTime)
_nextSendTime = when;
} else {
_nextSendTime = when;
}
synchronized(_nextSendLock) {
if (_nextSendTime >= 0) {
if (when < _nextSendTime)
_nextSendTime = when;
} else {
_nextSendTime = when;
}
if (_nextSendTime >= 0) {
long max = _context.clock().now() + _options.getSendAckDelay();
if (max < _nextSendTime)
_nextSendTime = max;
if (_nextSendTime >= 0) {
long max = _context.clock().now() + _options.getSendAckDelay();
if (max < _nextSendTime)
_nextSendTime = max;
}
}
//if (_log.shouldLog(Log.DEBUG) && false) {
// if (_nextSendTime <= 0)
// _log.debug("set next send time to an unknown time", new Exception(toString()));
// else
// _log.debug("set next send time to " + (_nextSendTime-_context.clock().now()) + "ms from now", new Exception(toString()));
//}
}
/** how many packets have we sent and the other side has ACKed?
* @return Count of how many packets ACKed.
*/
public long getAckedPackets() { return _ackedPackets; }
public long getAckedPackets() { return _ackedPackets.get(); }
public long getCreatedOn() { return _createdOn; }
/** @return 0 if not sent */
@@ -954,8 +970,9 @@ class Connection {
_updatedShareOpts = true;
}
}
public void incrementUnackedPacketsReceived() { _unackedPacketsReceived++; }
public int getUnackedPacketsReceived() { return _unackedPacketsReceived; }
public void incrementUnackedPacketsReceived() { _unackedPacketsReceived.incrementAndGet(); }
public int getUnackedPacketsReceived() { return _unackedPacketsReceived.get(); }
/** how many packets have we sent but not yet received an ACK for?
* @return Count of packets in-flight.
@@ -1001,7 +1018,7 @@ class Connection {
void waitForConnect() {
long expiration = _context.clock().now() + _options.getConnectTimeout();
while (true) {
if (_connected.get() && (_receiveStreamId > 0) && (_sendStreamId > 0) ) {
if (_connected.get() && (_receiveStreamId.get() > 0) && (_sendStreamId.get() > 0) ) {
// w00t
if (_log.shouldLog(Log.DEBUG))
_log.debug("waitForConnect(): Connected and we have stream IDs");
@@ -1163,13 +1180,15 @@ class Connection {
public String toString() {
StringBuilder buf = new StringBuilder(256);
buf.append("[Connection ");
if (_receiveStreamId > 0)
buf.append(Packet.toId(_receiveStreamId));
long id = _receiveStreamId.get();
if (id > 0)
buf.append(Packet.toId(id));
else
buf.append("unknown");
buf.append('/');
if (_sendStreamId > 0)
buf.append(Packet.toId(_sendStreamId));
id = _sendStreamId.get();
if (id > 0)
buf.append(Packet.toId(id));
else
buf.append("unknown");
if (_isInbound)
@@ -1258,17 +1277,17 @@ class Connection {
*/
class ResendPacketEvent extends SimpleTimer2.TimedEvent {
private final PacketLocal _packet;
private long _nextSendTime;
private long _nextSend;
public ResendPacketEvent(PacketLocal packet, long delay) {
super(_timer);
_packet = packet;
_nextSendTime = delay + _context.clock().now();
_nextSend = delay + _context.clock().now();
packet.setResendPacketEvent(ResendPacketEvent.this);
schedule(delay);
}
public long getNextSendTime() { return _nextSendTime; }
public long getNextSendTime() { return _nextSend; }
public void timeReached() { retransmit(); }
/**
* Retransmit the packet if we need to.
@@ -1318,7 +1337,7 @@ class Connection {
+ _activeResends + " active resend, "
+ _outboundPackets.size() + " unacked, window size = " + _options.getWindowSize());
forceReschedule(1333);
_nextSendTime = 1333 + _context.clock().now();
_nextSend = 1333 + _context.clock().now();
return false;
}
@@ -1339,9 +1358,9 @@ class Connection {
// bugfix release 0.7.8, we weren't dividing by 1000
_packet.setResendDelay(getOptions().getResendDelay() / 1000);
if (_packet.getReceiveStreamId() <= 0)
_packet.setReceiveStreamId(_receiveStreamId);
_packet.setReceiveStreamId(_receiveStreamId.get());
if (_packet.getSendStreamId() <= 0)
_packet.setSendStreamId(_sendStreamId);
_packet.setSendStreamId(_sendStreamId.get());
int newWindowSize = getOptions().getWindowSize();
@@ -1358,8 +1377,8 @@ class Connection {
//getOptions().setRTT(getOptions().getRTT() + 10*1000);
getOptions().setWindowSize(newWindowSize);
if (_log.shouldLog(Log.WARN))
_log.warn("Congestion, resending packet " + _packet.getSequenceNum() + " (new windowSize " + newWindowSize
if (_log.shouldLog(Log.INFO))
_log.info("Congestion, resending packet " + _packet.getSequenceNum() + " (new windowSize " + newWindowSize
+ "/" + getOptions().getWindowSize() + ") for " + Connection.this.toString());
windowAdjusted();
@@ -1405,7 +1424,7 @@ class Connection {
if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) )
timeout = MAX_RESEND_DELAY;
// set this before enqueue() as it passes it on to the router
_nextSendTime = timeout + _context.clock().now();
_nextSend = timeout + _context.clock().now();
if (_outboundQueue.enqueue(_packet)) {
// first resend for this packet ?
@@ -1420,7 +1439,7 @@ class Connection {
" (wsize "
+ newWindowSize + " lifetime "
+ (_context.clock().now() - _packet.getCreatedOn()) + "ms)");
_unackedPacketsReceived = 0;
_unackedPacketsReceived.set(0);
_lastSendTime = _context.clock().now();
// timer reset added 0.9.1
resetActivityTimer();

View File

@@ -47,9 +47,9 @@ class ConnectionHandler {
public synchronized void setActive(boolean active) {
// FIXME active=false this only kills for one thread in accept()
// if they are more, they won't ket a poison packet.
if (_log.shouldLog(Log.DEBUG))
_log.debug("setActive(" + active + ") called");
// if there are more, they won't get a poison packet.
if (_log.shouldLog(Log.WARN))
_log.warn("setActive(" + active + ") called, previously " + _active, new Exception("I did it"));
// if starting, clear any old poison
// if stopping, the accept() loop will clear any pending sockets
if (active && !_active)
@@ -96,7 +96,7 @@ class ConnectionHandler {
// also check if expiration of the head is long past for overload detection with peek() ?
boolean success = _synQueue.offer(packet); // fail immediately if full
if (success) {
_context.simpleScheduler().addEvent(new TimeoutSyn(packet), _acceptTimeout);
_context.simpleTimer2().addEvent(new TimeoutSyn(packet), _acceptTimeout);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping new SYN request, as the queue is full");
@@ -249,6 +249,7 @@ class ConnectionHandler {
reply.setAckThrough(packet.getSequenceNum());
reply.setSendStreamId(packet.getReceiveStreamId());
reply.setReceiveStreamId(0);
// TODO remove this someday, as of 0.9.20 we do not require it
reply.setOptionalFrom(_manager.getSession().getMyDestination());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending RST: " + reply + " because of " + packet);

View File

@@ -262,9 +262,12 @@ class ConnectionManager {
// Ditto for blacklist / whitelist
// This is a tradeoff, because it will keep retransmitting the SYN for a while,
// thus more inbound, but let's not spend several KB on the outbound.
if (_log.shouldLog(Log.INFO))
_log.info("Dropping RST to " + h);
return null;
if (!Boolean.valueOf(_context.getProperty("i2p.streaming.sendResetOnBlock"))) {
// this is the default. Set property to send reset for debugging.
if (_log.shouldLog(Log.INFO))
_log.info("Dropping RST to " + h);
return null;
}
}
}
PacketLocal reply = new PacketLocal(_context, from);
@@ -293,7 +296,7 @@ class ConnectionManager {
return null;
}
_context.statManager().addRateData("stream.connectionReceived", 1, 0);
_context.statManager().addRateData("stream.connectionReceived", 1);
return con;
}
@@ -451,7 +454,7 @@ class ConnectionManager {
break;
}
_context.statManager().addRateData("stream.connectionCreated", 1, 0);
_context.statManager().addRateData("stream.connectionCreated", 1);
return con;
}
@@ -539,7 +542,7 @@ class ConnectionManager {
if (_dayThrottler != null && _dayThrottler.shouldThrottle(h)) {
_context.statManager().addRateData("stream.con.throttledDay", 1, 0);
_context.statManager().addRateData("stream.con.throttledDay", 1);
if (_defaultOptions.getMaxConnsPerDay() <= 0)
return "throttled by" +
" total limit of " + _defaultOptions.getMaxTotalConnsPerDay() +
@@ -553,7 +556,7 @@ class ConnectionManager {
" per day";
}
if (_hourThrottler != null && _hourThrottler.shouldThrottle(h)) {
_context.statManager().addRateData("stream.con.throttledHour", 1, 0);
_context.statManager().addRateData("stream.con.throttledHour", 1);
if (_defaultOptions.getMaxConnsPerHour() <= 0)
return "throttled by" +
" total limit of " + _defaultOptions.getMaxTotalConnsPerHour() +
@@ -567,7 +570,7 @@ class ConnectionManager {
" per hour";
}
if (_minuteThrottler != null && _minuteThrottler.shouldThrottle(h)) {
_context.statManager().addRateData("stream.con.throttledMinute", 1, 0);
_context.statManager().addRateData("stream.con.throttledMinute", 1);
if (_defaultOptions.getMaxConnsPerMinute() <= 0)
return "throttled by" +
" total limit of " + _defaultOptions.getMaxTotalConnsPerMinute() +

View File

@@ -264,6 +264,9 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
/**
* Sets max buffer size, connect timeout, read timeout, and write timeout
* from properties. Does not set local port or remote port.
*
* As of 0.9.19, defaults in opts are honored.
*
* @param opts may be null
*/
public ConnectionOptions(Properties opts) {
@@ -388,66 +391,68 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
/**
* Note: NOT part of the interface
*
* As of 0.9.19, defaults in opts are honored.
*/
@Override
public void setProperties(Properties opts) {
super.setProperties(opts);
if (opts == null) return;
if (opts.containsKey(PROP_MAX_WINDOW_SIZE))
if (opts.getProperty(PROP_MAX_WINDOW_SIZE) != null)
setMaxWindowSize(getInt(opts, PROP_MAX_WINDOW_SIZE, Connection.MAX_WINDOW_SIZE));
if (opts.containsKey(PROP_CONNECT_DELAY))
if (opts.getProperty(PROP_CONNECT_DELAY) != null)
setConnectDelay(getInt(opts, PROP_CONNECT_DELAY, -1));
if (opts.containsKey(PROP_PROFILE))
if (opts.getProperty(PROP_PROFILE) != null)
setProfile(getInt(opts, PROP_PROFILE, PROFILE_BULK));
if (opts.containsKey(PROP_MAX_MESSAGE_SIZE))
if (opts.getProperty(PROP_MAX_MESSAGE_SIZE) != null)
setMaxMessageSize(getInt(opts, PROP_MAX_MESSAGE_SIZE, Packet.MAX_PAYLOAD_SIZE));
if (opts.containsKey(PROP_INITIAL_RECEIVE_WINDOW))
if (opts.getProperty(PROP_INITIAL_RECEIVE_WINDOW) != null)
setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1));
if (opts.containsKey(PROP_INITIAL_RESEND_DELAY))
if (opts.getProperty(PROP_INITIAL_RESEND_DELAY) != null)
setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 1000));
if (opts.containsKey(PROP_INITIAL_ACK_DELAY))
if (opts.getProperty(PROP_INITIAL_ACK_DELAY) != null)
setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, DEFAULT_INITIAL_ACK_DELAY));
if (opts.containsKey(PROP_INITIAL_WINDOW_SIZE))
if (opts.getProperty(PROP_INITIAL_WINDOW_SIZE) != null)
setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE));
if (opts.containsKey(PROP_MAX_RESENDS))
if (opts.getProperty(PROP_MAX_RESENDS) != null)
setMaxResends(getInt(opts, PROP_MAX_RESENDS, DEFAULT_MAX_SENDS));
// handled in super()
//if (opts.containsKey(PROP_WRITE_TIMEOUT))
//if (opts.getProperty(PROP_WRITE_TIMEOUT))
// setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1));
if (opts.containsKey(PROP_INACTIVITY_TIMEOUT))
if (opts.getProperty(PROP_INACTIVITY_TIMEOUT) != null)
setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, DEFAULT_INACTIVITY_TIMEOUT));
if (opts.containsKey(PROP_INACTIVITY_ACTION))
if (opts.getProperty(PROP_INACTIVITY_ACTION) != null)
setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, DEFAULT_INACTIVITY_ACTION));
setInboundBufferSize(getMaxMessageSize() * (Connection.MAX_WINDOW_SIZE + 2));
if (opts.contains(PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR))
if (opts.getProperty(PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR) != null)
setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR,
DEFAULT_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR));
if (opts.contains(PROP_SLOW_START_GROWTH_RATE_FACTOR))
if (opts.getProperty(PROP_SLOW_START_GROWTH_RATE_FACTOR) != null)
setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR,
DEFAULT_SLOW_START_GROWTH_RATE_FACTOR));
if (opts.containsKey(PROP_CONNECT_TIMEOUT))
if (opts.getProperty(PROP_CONNECT_TIMEOUT) != null)
// overrides default in super()
setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DEFAULT_CONNECT_TIMEOUT));
if (opts.containsKey(PROP_ANSWER_PINGS))
if (opts.getProperty(PROP_ANSWER_PINGS) != null)
setAnswerPings(getBool(opts, PROP_ANSWER_PINGS, DEFAULT_ANSWER_PINGS));
if (opts.containsKey(PROP_ENFORCE_PROTO))
if (opts.getProperty(PROP_ENFORCE_PROTO) != null)
setEnforceProtocol(getBool(opts, PROP_ENFORCE_PROTO, DEFAULT_ENFORCE_PROTO));
if (opts.containsKey(PROP_DISABLE_REJ_LOG))
if (opts.getProperty(PROP_DISABLE_REJ_LOG) != null)
setDisableRejectLogging(getBool(opts, PROP_DISABLE_REJ_LOG, false));
initLists(opts);
if (opts.containsKey(PROP_MAX_CONNS_MIN))
if (opts.getProperty(PROP_MAX_CONNS_MIN) != null)
_maxConnsPerMinute = getInt(opts, PROP_MAX_CONNS_MIN, 0);
if (opts.containsKey(PROP_MAX_CONNS_HOUR))
if (opts.getProperty(PROP_MAX_CONNS_HOUR) != null)
_maxConnsPerHour = getInt(opts, PROP_MAX_CONNS_HOUR, 0);
if (opts.containsKey(PROP_MAX_CONNS_DAY))
if (opts.getProperty(PROP_MAX_CONNS_DAY) != null)
_maxConnsPerDay = getInt(opts, PROP_MAX_CONNS_DAY, 0);
if (opts.containsKey(PROP_MAX_TOTAL_CONNS_MIN))
if (opts.getProperty(PROP_MAX_TOTAL_CONNS_MIN) != null)
_maxTotalConnsPerMinute = getInt(opts, PROP_MAX_TOTAL_CONNS_MIN, 0);
if (opts.containsKey(PROP_MAX_TOTAL_CONNS_HOUR))
if (opts.getProperty(PROP_MAX_TOTAL_CONNS_HOUR) != null)
_maxTotalConnsPerHour = getInt(opts, PROP_MAX_TOTAL_CONNS_HOUR, 0);
if (opts.containsKey(PROP_MAX_TOTAL_CONNS_DAY))
if (opts.getProperty(PROP_MAX_TOTAL_CONNS_DAY) != null)
_maxTotalConnsPerDay = getInt(opts, PROP_MAX_TOTAL_CONNS_DAY, 0);
if (opts.containsKey(PROP_MAX_STREAMS))
if (opts.getProperty(PROP_MAX_STREAMS) != null)
_maxConns = getInt(opts, PROP_MAX_STREAMS, 0);
_rto = getInt(opts, PROP_INITIAL_RTO, INITIAL_RTO);

View File

@@ -123,7 +123,7 @@ class ConnectionPacketHandler {
}
con.getOptions().setChoke(0);
_context.statManager().addRateData("stream.con.receiveMessageSize", packet.getPayloadSize(), 0);
_context.statManager().addRateData("stream.con.receiveMessageSize", packet.getPayloadSize());
boolean allowAck = true;
final boolean isSYN = packet.isFlagSet(Packet.FLAG_SYNCHRONIZE);
@@ -190,7 +190,7 @@ class ConnectionPacketHandler {
}
} else {
if ( (seqNum > 0) || (packet.getPayloadSize() > 0) || isSYN) {
_context.statManager().addRateData("stream.con.receiveDuplicateSize", packet.getPayloadSize(), 0);
_context.statManager().addRateData("stream.con.receiveDuplicateSize", packet.getPayloadSize());
con.incrementDupMessagesReceived(1);
// take note of congestion
@@ -199,8 +199,8 @@ class ConnectionPacketHandler {
final int ackDelay = con.getOptions().getSendAckDelay();
final long lastSendTime = con.getLastSendTime();
if (_log.shouldLog(Log.WARN))
_log.warn(String.format("%s congestion.. dup packet %s ackDelay %d lastSend %s ago",
if (_log.shouldLog(Log.INFO))
_log.info(String.format("%s congestion.. dup packet %s ackDelay %d lastSend %s ago",
con, packet, ackDelay, DataHelper.formatDuration(now - lastSendTime)));
final long nextSendTime = lastSendTime + ackDelay;
@@ -213,7 +213,7 @@ class ConnectionPacketHandler {
final long delay = nextSendTime - now;
if (_log.shouldLog(Log.DEBUG))
_log.debug("scheduling ack in "+delay);
_context.simpleScheduler().addEvent(new AckDup(con), delay);
_context.simpleTimer2().addEvent(new AckDup(con), delay);
}
} else {
@@ -344,9 +344,9 @@ class ConnectionPacketHandler {
}
if (firstAck) {
if (con.isInbound())
_context.statManager().addRateData("stream.con.initialRTT.in", highestRTT, 0);
_context.statManager().addRateData("stream.con.initialRTT.in", highestRTT);
else
_context.statManager().addRateData("stream.con.initialRTT.out", highestRTT, 0);
_context.statManager().addRateData("stream.con.initialRTT.out", highestRTT);
}
}
_context.statManager().addRateData("stream.con.packetsAckedPerMessageReceived", acked.size(), highestRTT);
@@ -513,10 +513,17 @@ class ConnectionPacketHandler {
/**
* Make sure this RST packet is valid, and if it is, act on it.
*
* Prior to 0.9.20, the reset packet must contain a FROM field,
* and we used that for verification.
* As of 0.9.20, we correctly use the connection's remote peer.
*/
private void verifyReset(Packet packet, Connection con) {
if (con.getReceiveStreamId() == packet.getSendStreamId()) {
boolean ok = packet.verifySignature(_context, packet.getOptionalFrom(), null);
Destination from = con.getRemotePeer();
if (from == null)
from = packet.getOptionalFrom();
boolean ok = packet.verifySignature(_context, from, null);
if (!ok) {
if (_log.shouldLog(Log.ERROR))
_log.error("Received unsigned / forged RST on " + con);

View File

@@ -51,6 +51,8 @@ class I2PSocketFull implements I2PSocket {
}
Connection c = _connection;
if (c == null) return;
if (log.shouldLog(Log.INFO))
log.info("close() called, connected? " + c.getIsConnected() + " : " + c);
if (c.getIsConnected()) {
MessageInputStream in = c.getInputStream();
in.close();
@@ -136,6 +138,8 @@ class I2PSocketFull implements I2PSocket {
Connection c = _connection;
if (c == null) return;
if (ms > Integer.MAX_VALUE)
ms = Integer.MAX_VALUE;
c.getInputStream().setReadTimeout((int)ms);
c.getOptions().setReadTimeout(ms);
}

View File

@@ -107,6 +107,9 @@ public class I2PSocketManagerFull implements I2PSocketManager {
/**
* Create a modified copy of the current options, to be used in a setDefaultOptions() call.
*
* As of 0.9.19, defaults in opts are honored.
*
* @param opts The new options, may be null
*/
public I2PSocketOptions buildOptions(Properties opts) {
@@ -254,6 +257,7 @@ public class I2PSocketManagerFull implements I2PSocketManager {
* Parameters in the I2PSocketOptions interface may be changed directly
* with the setters; no need to use this method for those.
* This does NOT update the underlying I2CP or tunnel options; use getSession().updateOptions() for that.
*
* @param options as created from a call to buildOptions(properties), non-null
*/
public void setDefaultOptions(I2PSocketOptions options) {

View File

@@ -47,6 +47,9 @@ class I2PSocketOptionsImpl implements I2PSocketOptions {
/**
* Sets max buffer size, connect timeout, read timeout, and write timeout
* from properties. Does not set local port or remote port.
*
* As of 0.9.19, defaults in opts are honored.
*
* @param opts may be null
*/
public I2PSocketOptionsImpl(Properties opts) {
@@ -56,17 +59,20 @@ class I2PSocketOptionsImpl implements I2PSocketOptions {
/**
* Sets max buffer size, connect timeout, read timeout, and write timeout
* from properties. Does not set local port or remote port.
*
* As of 0.9.19, defaults in opts are honored.
*
* @param opts may be null
*/
public void setProperties(Properties opts) {
if (opts == null) return;
if (opts.containsKey(PROP_BUFFER_SIZE))
if (opts.getProperty(PROP_BUFFER_SIZE) != null)
_maxBufferSize = getInt(opts, PROP_BUFFER_SIZE, DEFAULT_BUFFER_SIZE);
if (opts.containsKey(PROP_CONNECT_TIMEOUT))
if (opts.getProperty(PROP_CONNECT_TIMEOUT) != null)
_connectTimeout = getInt(opts, PROP_CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT);
if (opts.containsKey(PROP_READ_TIMEOUT))
if (opts.getProperty(PROP_READ_TIMEOUT) != null)
_readTimeout = getInt(opts, PROP_READ_TIMEOUT, -1);
if (opts.containsKey(PROP_WRITE_TIMEOUT))
if (opts.getProperty(PROP_WRITE_TIMEOUT) != null)
_writeTimeout = getInt(opts, PROP_WRITE_TIMEOUT, DEFAULT_WRITE_TIMEOUT);
}
@@ -95,6 +101,9 @@ class I2PSocketOptionsImpl implements I2PSocketOptions {
}
}
/**
* Not part of the API, not for external use.
*/
public static double getDouble(Properties opts, String name, double defaultVal) {
if (opts == null) return defaultVal;
String val = opts.getProperty(name);
@@ -135,9 +144,11 @@ class I2PSocketOptionsImpl implements I2PSocketOptions {
/**
* What is the longest we'll block on the input stream while waiting
* for more data. If this value is exceeded, the read() throws
* InterruptedIOException
* InterruptedIOException - FIXME doesn't really, returns -1 or 0 instead.
*
* WARNING: Default -1 (unlimited), which is probably not what you want.
*
* @return timeout in ms, 0 for nonblocking, -1 for forever
*/
public long getReadTimeout() {
return _readTimeout;
@@ -146,9 +157,11 @@ class I2PSocketOptionsImpl implements I2PSocketOptions {
/**
* What is the longest we'll block on the input stream while waiting
* for more data. If this value is exceeded, the read() throws
* InterruptedIOException
* InterruptedIOException - FIXME doesn't really, returns -1 or 0 instead.
*
* WARNING: Default -1 (unlimited), which is probably not what you want.
*
* @param ms timeout in ms, 0 for nonblocking, -1 for forever
*/
public void setReadTimeout(long ms) {
_readTimeout = ms;

View File

@@ -54,7 +54,7 @@ class MessageHandler implements I2PSessionMuxedListener {
try {
data = session.receiveMessage(msgId);
} catch (I2PSessionException ise) {
_context.statManager().addRateData("stream.packetReceiveFailure", 1, 0);
_context.statManager().addRateData("stream.packetReceiveFailure", 1);
if (_log.shouldLog(Log.WARN))
_log.warn("Error receiving the message", ise);
return;
@@ -67,7 +67,7 @@ class MessageHandler implements I2PSessionMuxedListener {
packet.setLocalPort(toPort);
_manager.getPacketHandler().receivePacket(packet);
} catch (IllegalArgumentException iae) {
_context.statManager().addRateData("stream.packetReceiveFailure", 1, 0);
_context.statManager().addRateData("stream.packetReceiveFailure", 1);
if (_log.shouldLog(Log.WARN))
_log.warn("Received an invalid packet", iae);
}

View File

@@ -170,10 +170,15 @@ class MessageInputStream extends InputStream {
/**
* how long a read() call should block (if less than 0, block indefinitely,
* but if it is 0, do not block at all)
* @return how long read calls should block, 0 or less indefinitely block
* @return how long read calls should block, 0 for nonblocking, negative to indefinitely block
*/
public int getReadTimeout() { return _readTimeout; }
/**
* how long a read() call should block (if less than 0, block indefinitely,
* but if it is 0, do not block at all)
* @param timeout how long read calls should block, 0 for nonblocking, negative to indefinitely block
*/
public void setReadTimeout(int timeout) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Changing read timeout from " + _readTimeout + " to " + timeout);
@@ -230,7 +235,8 @@ class MessageInputStream extends InputStream {
*/
public boolean messageReceived(long messageId, ByteArray payload) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("received " + messageId + " with " + (payload != null ? payload.getValid()+"" : "no payload"));
_log.debug("received msg ID " + messageId + " with " +
(payload != null ? payload.getValid() + " bytes" : "no payload"));
synchronized (_dataLock) {
if (messageId <= _highestReadyBlockId) {
if (_log.shouldLog(Log.INFO))
@@ -261,6 +267,10 @@ class MessageInputStream extends InputStream {
cur++;
_highestReadyBlockId++;
}
// FIXME Javadocs for setReadTimeout() say we will throw
// an InterruptedIOException.
// Java throws a SocketTimeoutException.
// We do neither.
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Message is out of order: " + messageId);
@@ -275,23 +285,41 @@ class MessageInputStream extends InputStream {
return true;
}
/**
* On a read timeout, this returns -1
* (doesn't throw SocketTimeoutException like Socket)
* (doesn't throw InterruptedIOException like our javadocs say)
*/
public int read() throws IOException {
int read = read(_oneByte, 0, 1);
if (read < 0)
if (read <= 0)
return -1;
return _oneByte[0] & 0xff;
}
/**
* On a read timeout, this returns 0
* (doesn't throw SocketTimeoutException like Socket)
* (doesn't throw InterruptedIOException like our javadocs say)
*/
@Override
public int read(byte target[]) throws IOException {
return read(target, 0, target.length);
}
/**
* On a read timeout, this returns 0
* (doesn't throw SocketTimeoutException like Socket)
* (doesn't throw InterruptedIOException like our javadocs say)
*/
@Override
public int read(byte target[], int offset, int length) throws IOException {
long expiration = -1;
if (_readTimeout > 0)
expiration = _readTimeout + System.currentTimeMillis();
int readTimeout = _readTimeout;
long expiration;
if (readTimeout > 0)
expiration = readTimeout + System.currentTimeMillis();
else
expiration = -1;
synchronized (_dataLock) {
if (_locallyClosed) throw new IOException("Already locally closed");
throwAnyError();
@@ -310,10 +338,10 @@ class MessageInputStream extends InputStream {
+ "] got EOF after " + _readTotal + " " + toString());
return -1;
} else {
if (_readTimeout < 0) {
if (readTimeout < 0) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("read(...," + offset+", " + length+ ")[" + i
+ ") with no timeout: " + toString());
+ "] with no timeout: " + toString());
try {
_dataLock.wait();
} catch (InterruptedException ie) {
@@ -323,14 +351,14 @@ class MessageInputStream extends InputStream {
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("read(...," + offset+", " + length+ ")[" + i
+ ") with no timeout complete: " + toString());
+ "] with no timeout complete: " + toString());
throwAnyError();
} else if (_readTimeout > 0) {
} else if (readTimeout > 0) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("read(...," + offset+", " + length+ ")[" + i
+ ") with timeout: " + _readTimeout + ": " + toString());
+ "] with timeout: " + readTimeout + ": " + toString());
try {
_dataLock.wait(_readTimeout);
_dataLock.wait(readTimeout);
} catch (InterruptedException ie) {
IOException ioe2 = new InterruptedIOException("Interrupted read");
ioe2.initCause(ie);
@@ -338,21 +366,30 @@ class MessageInputStream extends InputStream {
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("read(...," + offset+", " + length+ ")[" + i
+ ") with timeout complete: " + _readTimeout + ": " + toString());
+ "] with timeout complete: " + readTimeout + ": " + toString());
throwAnyError();
} else { // readTimeout == 0
// noop, don't block
if (_log.shouldLog(Log.DEBUG))
_log.debug("read(...," + offset+", " + length+ ")[" + i
+ ") with nonblocking setup: " + toString());
+ "] with nonblocking setup: " + toString());
return i;
}
if (_readyDataBlocks.isEmpty()) {
if ( (_readTimeout > 0) && (expiration < System.currentTimeMillis()) ) {
if (_log.shouldLog(Log.INFO))
_log.info("read(...," + offset+", " + length+ ")[" + i
+ ") expired: " + toString());
return i;
if (readTimeout > 0) {
long remaining = expiration - System.currentTimeMillis();
if (remaining <= 0) {
// FIXME Javadocs for setReadTimeout() say we will throw
// an InterruptedIOException.
// Java throws a SocketTimeoutException.
// We do neither.
if (_log.shouldLog(Log.INFO))
_log.info("read(...," + offset+", " + length+ ")[" + i
+ "] expired: " + toString());
return i;
} else {
readTimeout = (int) remaining;
}
}
}
}

View File

@@ -233,7 +233,7 @@ class MessageOutputStream extends OutputStream {
// no need to be overly worried about duplicates - it would just
// push it further out
if (!_enqueued) {
// Maybe we could just use schedule() here - or even SimpleScheduler - not sure...
// Maybe we could just use schedule() here - or even SimpleTimer2 - not sure...
// To be safe, use forceReschedule() so we don't get lots of duplicates
// We've seen the queue blow up before, maybe it was this before the rewrite...
// So perhaps it IS wise to be "overly worried" ...

View File

@@ -246,6 +246,7 @@ class PacketHandler {
reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
reply.setSendStreamId(packet.getReceiveStreamId());
reply.setReceiveStreamId(packet.getSendStreamId());
// TODO remove this someday, as of 0.9.20 we do not require it
reply.setOptionalFrom(_manager.getSession().getMyDestination());
reply.setLocalPort(packet.getLocalPort());
reply.setRemotePort(packet.getRemotePort());
@@ -268,14 +269,15 @@ class PacketHandler {
}
packet.releasePayload();
} else {
if (_log.shouldLog(Log.WARN) && !packet.isFlagSet(Packet.FLAG_SYNCHRONIZE))
_log.warn("Packet received on an unknown stream (and not an ECHO or SYN): " + packet);
// this happens a lot
if (_log.shouldLog(Log.INFO) && !packet.isFlagSet(Packet.FLAG_SYNCHRONIZE))
_log.info("Packet received on an unknown stream (and not an ECHO or SYN): " + packet);
if (sendId <= 0) {
Connection con = _manager.getConnectionByOutboundId(packet.getReceiveStreamId());
if (con != null) {
if ( (con.getHighestAckedThrough() <= 5) && (packet.getSequenceNum() <= 5) ) {
if (_log.shouldLog(Log.WARN))
_log.warn("Received additional packet w/o SendStreamID after the syn on " + con + ": " + packet);
if (_log.shouldLog(Log.INFO))
_log.info("Received additional packet w/o SendStreamID after the syn on " + con + ": " + packet);
receiveKnownCon(con, packet);
return;
} else {

View File

@@ -16,7 +16,7 @@ abstract class SchedulerImpl implements TaskScheduler {
}
protected void reschedule(long msToWait, Connection con) {
_context.simpleScheduler().addEvent(con.getConnectionEvent(), msToWait);
_context.simpleTimer2().addEvent(con.getConnectionEvent(), msToWait);
}
@Override

View File

@@ -193,7 +193,15 @@ class StandardSocket extends Socket {
I2PSocketOptions opts = _socket.getOptions();
if (opts == null)
return 0;
return (int) opts.getReadTimeout();
long rv = opts.getReadTimeout();
// Java Socket: 0 is forever, and we don't exactly have nonblocking
if (rv > Integer.MAX_VALUE)
rv = Integer.MAX_VALUE;
else if (rv < 0)
rv = 0;
else if (rv == 0)
rv = 1;
return (int) rv;
}
/**
@@ -309,6 +317,9 @@ class StandardSocket extends Socket {
I2PSocketOptions opts = _socket.getOptions();
if (opts == null)
throw new SocketException("No options");
// Java Socket: 0 is forever
if (timeout == 0)
timeout = -1;
opts.setReadTimeout(timeout);
}