explicit merge of '7bae8d314209ec279a4da918dc3255e31bda7e45'

and '3b133e76d8124df27791cb78006e7c2b9a8b6430'
This commit is contained in:
mkvore-commit
2009-04-02 08:57:46 +00:00
539 changed files with 12038 additions and 55442 deletions

View File

@@ -25,7 +25,14 @@
<mkdir dir="./build" />
<mkdir dir="./build/obj" />
<javac
srcdir="./src:./test"
srcdir="./src"
debug="true" deprecation="on" source="1.5" target="1.5"
destdir="./build/obj"
classpath="../../../core/java/build/i2p.jar:../../ministreaming/java/build/mstreaming.jar" />
</target>
<target name="compileTest" depends="compile">
<javac
srcdir="./test"
debug="true" deprecation="on" source="1.5" target="1.5"
destdir="./build/obj"
classpath="../../../core/java/build/i2p.jar:../../ministreaming/java/build/mstreaming.jar" />
@@ -33,6 +40,9 @@
<target name="jar" depends="builddep, compile">
<jar destfile="./build/streaming.jar" basedir="./build/obj" includes="**/*.class" />
</target>
<target name="jarTest" depends="jar, compileTest">
<jar destfile="./build/streaming.jar" basedir="./build/obj" includes="**/*Test*.class" update="true" />
</target>
<target name="javadoc">
<mkdir dir="./build" />
<mkdir dir="./build/javadoc" />

View File

@@ -12,7 +12,9 @@ import net.i2p.client.I2PSession;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
/**
* Maintain the state controlling a streaming connection between two
@@ -45,6 +47,7 @@ public class Connection {
private long _congestionWindowEnd;
private long _highestAckedThrough;
private boolean _isInbound;
private boolean _updatedShareOpts;
/** Packet ID (Long) to PacketLocal for sent but unacked packets */
private Map _outboundPackets;
private PacketQueue _outboundQueue;
@@ -67,6 +70,7 @@ public class Connection {
/** how many messages have been resent and not yet ACKed? */
private int _activeResends;
private ConEvent _connectionEvent;
private int _randomWait;
private long _lifetimeBytesSent;
private long _lifetimeBytesReceived;
@@ -120,8 +124,10 @@ public class Connection {
_activeResends = 0;
_resetSentOn = -1;
_isInbound = false;
_updatedShareOpts = false;
_connectionEvent = new ConEvent();
_hardDisconnected = false;
_randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage
_context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.chokeSizeEnd", "How many messages were outstanding when we stopped being choked?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
@@ -245,7 +251,7 @@ public class Connection {
void sendReset() {
if (_disconnectScheduledOn < 0) {
_disconnectScheduledOn = _context.clock().now();
SimpleTimer.getInstance().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT);
SimpleScheduler.getInstance().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT);
}
long now = _context.clock().now();
if (_resetSentOn + 10*1000 > now) return; // don't send resets too fast
@@ -323,7 +329,8 @@ public class Connection {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Resend in " + timeout + " for " + packet, new Exception("Sent by"));
RetransmissionTimer.getInstance().addEvent(new ResendPacketEvent(packet, timeout + _context.clock().now()), timeout);
// schedules itself
ResendPacketEvent rpe = new ResendPacketEvent(packet, timeout);
}
_context.statManager().getStatLog().addData(Packet.toId(_sendStreamId), "stream.rtt", _options.getRTT(), _options.getWindowSize());
@@ -459,7 +466,7 @@ public class Connection {
void resetReceived() {
if (_disconnectScheduledOn < 0) {
_disconnectScheduledOn = _context.clock().now();
SimpleTimer.getInstance().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT);
SimpleScheduler.getInstance().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT);
}
_resetReceived = true;
MessageOutputStream mos = _outputStream;
@@ -507,7 +514,7 @@ public class Connection {
if (removeFromConMgr) {
if (_disconnectScheduledOn < 0) {
_disconnectScheduledOn = _context.clock().now();
SimpleTimer.getInstance().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT);
SimpleScheduler.getInstance().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT);
}
}
_connected = false;
@@ -523,7 +530,7 @@ public class Connection {
if (_receiver != null)
_receiver.destroy();
if (_activityTimer != null)
SimpleTimer.getInstance().removeEvent(_activityTimer);
_activityTimer.cancel();
//_activityTimer = null;
if (_inputStream != null)
_inputStream.streamErrorOccurred(new IOException("disconnected!"));
@@ -586,6 +593,8 @@ public class Connection {
if (_remotePeerSet) throw new RuntimeException("Remote peer already set [" + _remotePeer + ", " + peer + "]");
_remotePeerSet = true;
_remotePeer = peer;
// now that we know who the other end is, get the rtt etc. from the cache
_connectionManager.updateOptsFromShare(this);
}
private boolean _sendStreamIdSet = false;
@@ -704,12 +713,18 @@ public class Connection {
_closeSentOn = when;
if (_disconnectScheduledOn < 0) {
_disconnectScheduledOn = _context.clock().now();
SimpleTimer.getInstance().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT);
SimpleScheduler.getInstance().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT);
}
}
public long getCloseReceivedOn() { return _closeReceivedOn; }
public void setCloseReceivedOn(long when) { _closeReceivedOn = when; }
public void updateShareOpts() {
if (_closeSentOn > 0 && !_updatedShareOpts) {
_connectionManager.updateShareOpts(this);
_updatedShareOpts = true;
}
}
public void incrementUnackedPacketsReceived() { _unackedPacketsReceived++; }
public int getUnackedPacketsReceived() { return _unackedPacketsReceived; }
/** how many packets have we sent but not yet received an ACK for?
@@ -811,16 +826,21 @@ public class Connection {
return;
}
long howLong = _options.getInactivityTimeout();
howLong += _context.random().nextInt(30*1000); // randomize it a bit, so both sides don't do it at once
howLong += _randomWait; // randomize it a bit, so both sides don't do it at once
if (_log.shouldLog(Log.DEBUG))
_log.debug("Resetting the inactivity timer to " + howLong, new Exception(toString()));
// this will get rescheduled, and rescheduled, and rescheduled...
RetransmissionTimer.getInstance().removeEvent(_activityTimer);
RetransmissionTimer.getInstance().addEvent(_activityTimer, howLong);
_activityTimer.reschedule(howLong, false); // use the later of current and previous timeout
}
private class ActivityTimer implements SimpleTimer.TimedEvent {
private class ActivityTimer extends SimpleTimer2.TimedEvent {
public ActivityTimer() {
super(RetransmissionTimer.getInstance());
setFuzz(5*1000); // sloppy timer, don't reschedule unless at least 5s later
}
public void timeReached() {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Fire inactivity timer on " + Connection.this.toString());
// uh, nothing more to do...
if (!_connected) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Inactivity timeout reached, but we are already closed");
@@ -830,7 +850,7 @@ public class Connection {
long left = getTimeLeft();
if (left > 0) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Inactivity timeout reached, but there is time left (" + left + ")");
RetransmissionTimer.getInstance().addEvent(ActivityTimer.this, left);
schedule(left);
return;
}
// these are either going to time out or cause further rescheduling
@@ -844,24 +864,31 @@ public class Connection {
return;
}
// if one of us can't talk...
if ( (_closeSentOn > 0) || (_closeReceivedOn > 0) ) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Inactivity timeout reached, but we are closing");
return;
}
// No - not true - data and acks are still going back and forth.
// Prevent zombie connections by keeping the inactivity timer.
// Not sure why... receiving a close but never sending one?
// If so we can probably re-enable this for _closeSentOn.
// For further investigation...
//if ( (_closeSentOn > 0) || (_closeReceivedOn > 0) ) {
// if (_log.shouldLog(Log.DEBUG)) _log.debug("Inactivity timeout reached, but we are closing");
// return;
//}
if (_log.shouldLog(Log.DEBUG)) _log.debug("Inactivity timeout reached, with action=" + _options.getInactivityAction());
// bugger it, might as well do the hard work now
switch (_options.getInactivityAction()) {
case ConnectionOptions.INACTIVITY_ACTION_SEND:
if (_log.shouldLog(Log.WARN))
_log.warn("Sending some data due to inactivity");
_receiver.send(null, 0, 0, true);
break;
case ConnectionOptions.INACTIVITY_ACTION_NOOP:
if (_log.shouldLog(Log.WARN))
_log.warn("Inactivity timer expired, but we aint doin' shit");
break;
case ConnectionOptions.INACTIVITY_ACTION_SEND:
if (_closeSentOn <= 0 && _closeReceivedOn <= 0) {
if (_log.shouldLog(Log.WARN))
_log.warn("Sending some data due to inactivity");
_receiver.send(null, 0, 0, true);
break;
} // else fall through
case ConnectionOptions.INACTIVITY_ACTION_DISCONNECT:
// fall through
default:
@@ -877,7 +904,9 @@ public class Connection {
_inputStream.streamErrorOccurred(new IOException("Inactivity timeout"));
_outputStream.streamErrorOccurred(new IOException("Inactivity timeout"));
disconnect(false);
// Clean disconnect if we have already scheduled one
// (generally because we already sent a close)
disconnect(_disconnectScheduledOn >= 0);
break;
}
}
@@ -948,9 +977,9 @@ public class Connection {
}
if (getResetSent())
buf.append(" reset sent");
buf.append(" reset sent ").append(DataHelper.formatDuration(_context.clock().now() - getResetSentOn())).append(" ago");
if (getResetReceived())
buf.append(" reset received");
buf.append(" reset received ").append(DataHelper.formatDuration(_context.clock().now() - getDisconnectScheduledOn())).append(" ago");
if (getCloseSentOn() > 0) {
buf.append(" close sent ");
long timeSinceClose = _context.clock().now() - getCloseSentOn();
@@ -958,7 +987,7 @@ public class Connection {
buf.append(" ago");
}
if (getCloseReceivedOn() > 0)
buf.append(" close received");
buf.append(" close received ").append(DataHelper.formatDuration(_context.clock().now() - getCloseReceivedOn())).append(" ago");
buf.append(" sent: ").append(1 + _lastSendId);
if (_inputStream != null)
buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing);
@@ -990,21 +1019,23 @@ public class Connection {
/**
* If we have been explicitly NACKed three times, retransmit the packet even if
* there are other packets in flight.
* there are other packets in flight. 3 takes forever, let's try 2.
*
*/
static final int FAST_RETRANSMIT_THRESHOLD = 3;
static final int FAST_RETRANSMIT_THRESHOLD = 2;
/**
* Coordinate the resends of a given packet
*/
private class ResendPacketEvent implements SimpleTimer.TimedEvent {
public class ResendPacketEvent extends SimpleTimer2.TimedEvent {
private PacketLocal _packet;
private long _nextSendTime;
public ResendPacketEvent(PacketLocal packet, long sendTime) {
public ResendPacketEvent(PacketLocal packet, long delay) {
super(RetransmissionTimer.getInstance());
_packet = packet;
_nextSendTime = sendTime;
_nextSendTime = delay + _context.clock().now();
packet.setResendPacketEvent(ResendPacketEvent.this);
schedule(delay);
}
public long getNextSendTime() { return _nextSendTime; }
@@ -1012,6 +1043,10 @@ public class Connection {
/**
* Retransmit the packet if we need to.
*
* ackImmediately() above calls directly in here, so
* we have to use forceReschedule() instead of schedule() below,
* to prevent duplicates in the timer queue.
*
* @param penalize true if this retransmission is caused by a timeout, false if we
* are just sending this packet instead of an ACK
* @return true if the packet was sent, false if it was not
@@ -1020,7 +1055,9 @@ public class Connection {
if (_packet.getAckTime() > 0)
return false;
if (_resetSent || _resetReceived) {
if (_resetSent || _resetReceived || !_connected) {
if(_log.shouldLog(Log.WARN) && (!_resetSent) && (!_resetReceived))
_log.warn("??? no resets but not connected: " + _packet); // don't think this is possible
_packet.cancelled();
return false;
}
@@ -1044,7 +1081,7 @@ public class Connection {
if (_log.shouldLog(Log.INFO))
_log.info("Delaying resend of " + _packet + " as there are "
+ _activeResends + " active resends already in play");
RetransmissionTimer.getInstance().addEvent(ResendPacketEvent.this, 1000);
forceReschedule(1000);
_nextSendTime = 1000 + _context.clock().now();
return false;
}
@@ -1104,26 +1141,6 @@ public class Connection {
_context.sessionKeyManager().failTags(_remotePeer.getPublicKey());
}
if (numSends - 1 <= _options.getMaxResends()) {
if (_log.shouldLog(Log.INFO))
_log.info("Resend packet " + _packet + " time " + numSends +
" activeResends: " + _activeResends +
" (wsize "
+ newWindowSize + " lifetime "
+ (_context.clock().now() - _packet.getCreatedOn()) + "ms)");
_outboundQueue.enqueue(_packet);
_lastSendTime = _context.clock().now();
}
// acked during resending (... or somethin')
if ( (_packet.getAckTime() > 0) && (_packet.getNumSends() > 1) ) {
_activeResends--;
synchronized (_outboundPackets) {
_outboundPackets.notifyAll();
}
return true;
}
if (numSends - 1 > _options.getMaxResends()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Too many resends");
@@ -1137,11 +1154,32 @@ public class Connection {
long timeout = rto << (numSends-1);
if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) )
timeout = MAX_RESEND_DELAY;
// set this before enqueue() as it passes it on to the router
_nextSendTime = timeout + _context.clock().now();
if (_log.shouldLog(Log.INFO))
_log.info("Resend packet " + _packet + " time " + numSends +
" activeResends: " + _activeResends +
" (wsize "
+ newWindowSize + " lifetime "
+ (_context.clock().now() - _packet.getCreatedOn()) + "ms)");
_outboundQueue.enqueue(_packet);
_lastSendTime = _context.clock().now();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Scheduling resend in " + timeout + "ms for " + _packet);
RetransmissionTimer.getInstance().addEvent(ResendPacketEvent.this, timeout);
_nextSendTime = timeout + _context.clock().now();
forceReschedule(timeout);
}
// acked during resending (... or somethin')
if ( (_packet.getAckTime() > 0) && (_packet.getNumSends() > 1) ) {
_activeResends--;
synchronized (_outboundPackets) {
_outboundPackets.notifyAll();
}
return true;
}
return true;
} else {
//if (_log.shouldLog(Log.DEBUG))

View File

@@ -1,34 +1,47 @@
package net.i2p.client.streaming;
import java.net.ConnectException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
import java.util.List;
import net.i2p.I2PAppContext;
import net.i2p.util.Clock;
import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
/**
* Receive new connection attempts
*
* Use a bounded queue to limit the damage from SYN floods,
* router overload, or a slow client
*
* @author zzz modded to use concurrent and bound queue size
*/
class ConnectionHandler {
private I2PAppContext _context;
private Log _log;
private ConnectionManager _manager;
private List<Packet> _synQueue;
private LinkedBlockingQueue<Packet> _synQueue;
private boolean _active;
private int _acceptTimeout;
/** max time after receiveNewSyn() and before the matched accept() */
private static final int DEFAULT_ACCEPT_TIMEOUT = 3*1000;
/**
* This is both SYNs and subsequent packets, and with an initial window size of 12,
* this is a backlog of 5 to 64 Syns, which seems like plenty for now
* Don't make this too big because the removal by all the TimeoutSyns is O(n**2) - sortof.
*/
private static final int MAX_QUEUE_SIZE = 64;
/** Creates a new instance of ConnectionHandler */
public ConnectionHandler(I2PAppContext context, ConnectionManager mgr) {
_context = context;
_log = context.logManager().getLog(ConnectionHandler.class);
_manager = mgr;
_synQueue = new ArrayList(5);
_synQueue = new LinkedBlockingQueue(MAX_QUEUE_SIZE);
_active = false;
_acceptTimeout = DEFAULT_ACCEPT_TIMEOUT;
}
@@ -36,9 +49,11 @@ class ConnectionHandler {
public void setActive(boolean active) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("setActive(" + active + ") called");
synchronized (_synQueue) {
_active = active;
_synQueue.notifyAll(); // so we break from the accept()
_active = active;
if (!active) {
try {
_synQueue.put(new PoisonPacket()); // so we break from the accept() - waits until space is available
} catch (InterruptedException ie) {}
}
}
public boolean getActive() { return _active; }
@@ -46,20 +61,31 @@ class ConnectionHandler {
/**
* Non-SYN packets with a zero SendStreamID may also be queued here so
* that they don't get thrown away while the SYN packet before it is queued.
*
* Additional overload protection may be required here...
* We don't have a 3-way handshake, so the SYN fully opens a connection.
* Does that make us more or less vulnerable to SYN flooding?
*
*/
public void receiveNewSyn(Packet packet) {
if (!_active) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping new SYN request, as we're not listening");
sendReset(packet);
if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE))
sendReset(packet);
return;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive new SYN: " + packet + ": timeout in " + _acceptTimeout);
RetransmissionTimer.getInstance().addEvent(new TimeoutSyn(packet), _acceptTimeout);
synchronized (_synQueue) {
_synQueue.add(packet);
_synQueue.notifyAll();
// also check if expiration of the head is long past for overload detection with peek() ?
boolean success = _synQueue.offer(packet); // fail immediately if full
if (success) {
SimpleScheduler.getInstance().addEvent(new TimeoutSyn(packet), _acceptTimeout);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping new SYN request, as the queue is full");
if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE))
sendReset(packet);
}
}
@@ -85,41 +111,44 @@ class ConnectionHandler {
while (true) {
if (!_active) {
// fail all the ones we had queued up
synchronized (_synQueue) {
for (int i = 0; i < _synQueue.size(); i++) {
Packet packet = (Packet)_synQueue.get(i);
sendReset(packet);
}
_synQueue.clear();
while(true) {
Packet packet = _synQueue.poll(); // fails immediately if empty
if (packet == null || packet.getOptionalDelay() == PoisonPacket.MAX_DELAY_REQUEST)
break;
sendReset(packet);
}
return null;
}
Packet syn = null;
synchronized (_synQueue) {
while ( _active && (_synQueue.size() <= 0) ) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Accept("+ timeoutMs+"): active=" + _active + " queue: "
+ _synQueue.size());
if (timeoutMs < 0) {
try { _synQueue.wait(); } catch (InterruptedException ie) {}
} else {
long remaining = expiration - _context.clock().now();
// BUGFIX
// The specified amount of real time has elapsed, more or less.
// If timeout is zero, however, then real time is not taken into consideration
// and the thread simply waits until notified.
if (remaining < 1)
break;
try { _synQueue.wait(remaining); } catch (InterruptedException ie) {}
}
}
if (_active && _synQueue.size() > 0) {
syn = (Packet)_synQueue.remove(0);
while ( _active && syn == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Accept("+ timeoutMs+"): active=" + _active + " queue: "
+ _synQueue.size());
if (timeoutMs <= 0) {
try {
syn = _synQueue.take(); // waits forever
} catch (InterruptedException ie) {}
} else {
long remaining = expiration - _context.clock().now();
// (dont think this applies anymore for LinkedBlockingQueue)
// BUGFIX
// The specified amount of real time has elapsed, more or less.
// If timeout is zero, however, then real time is not taken into consideration
// and the thread simply waits until notified.
if (remaining < 1)
break;
try {
syn = _synQueue.poll(remaining, TimeUnit.MILLISECONDS); // waits the specified time max
} catch (InterruptedException ie) {}
break;
}
}
if (syn != null) {
if (syn.getOptionalDelay() == PoisonPacket.MAX_DELAY_REQUEST)
return null;
// deal with forged / invalid syn packets
// Handle both SYN and non-SYN packets in the queue
@@ -184,10 +213,7 @@ class ConnectionHandler {
}
public void timeReached() {
boolean removed = false;
synchronized (_synQueue) {
removed = _synQueue.remove(_synPacket);
}
boolean removed = _synQueue.remove(_synPacket);
if (removed) {
if (_synPacket.isFlagSet(Packet.FLAG_SYNCHRONIZE))
@@ -201,4 +227,17 @@ class ConnectionHandler {
}
}
}
/**
* Simple end-of-queue marker.
* The standard class limits the delay to MAX_DELAY_REQUEST so
* an evil user can't use this to shut us down
*/
private static class PoisonPacket extends Packet {
public static final int MAX_DELAY_REQUEST = Packet.MAX_DELAY_REQUEST + 1;
public PoisonPacket() {
setOptionalDelay(MAX_DELAY_REQUEST);
}
}
}

View File

@@ -30,6 +30,7 @@ public class ConnectionManager {
private PacketQueue _outboundQueue;
private SchedulerChooser _schedulerChooser;
private ConnectionPacketHandler _conPacketHandler;
private TCBShare _tcbShare;
/** Inbound stream ID (Long) to Connection map */
private Map _connectionByInboundId;
/** Ping ID (Long) to PingRequest */
@@ -52,6 +53,7 @@ public class ConnectionManager {
_connectionHandler = new ConnectionHandler(context, this);
_schedulerChooser = new SchedulerChooser(context);
_conPacketHandler = new ConnectionPacketHandler(context);
_tcbShare = new TCBShare(context);
_session = session;
session.setSessionListener(_messageHandler);
_outboundQueue = new PacketQueue(context, session, this);
@@ -127,6 +129,7 @@ public class ConnectionManager {
*/
public Connection receiveConnection(Packet synPacket) {
Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions));
_tcbShare.updateOptsFromShare(con);
con.setInbound();
long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
boolean reject = false;
@@ -277,6 +280,8 @@ public class ConnectionManager {
public ConnectionHandler getConnectionHandler() { return _connectionHandler; }
public I2PSession getSession() { return _session; }
public PacketQueue getPacketQueue() { return _outboundQueue; }
public void updateOptsFromShare(Connection con) { _tcbShare.updateOptsFromShare(con); }
public void updateShareOpts(Connection con) { _tcbShare.updateShareOpts(con); }
/**
* Something b0rked hard, so kill all of our connections without mercy.
@@ -292,6 +297,7 @@ public class ConnectionManager {
_connectionByInboundId.clear();
_connectionLock.notifyAll();
}
_tcbShare.stop();
}
/**

View File

@@ -54,9 +54,9 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
public static final String PROP_SLOW_START_GROWTH_RATE_FACTOR = "i2p.streaming.slowStartGrowthRateFactor";
private static final int TREND_COUNT = 3;
static final int INITIAL_WINDOW_SIZE = 12;
static final int INITIAL_WINDOW_SIZE = 6;
static final int DEFAULT_MAX_SENDS = 8;
public static final int DEFAULT_INITIAL_RTT = 10*1000;
public static final int DEFAULT_INITIAL_RTT = 8*1000;
static final int MIN_WINDOW_SIZE = 1;
/**

View File

@@ -7,6 +7,7 @@ import net.i2p.I2PException;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
/**
@@ -168,7 +169,7 @@ public class ConnectionPacketHandler {
// take note of congestion
if (_log.shouldLog(Log.WARN))
_log.warn("congestion.. dup " + packet);
RetransmissionTimer.getInstance().addEvent(new AckDup(con), con.getOptions().getSendAckDelay());
SimpleScheduler.getInstance().addEvent(new AckDup(con), con.getOptions().getSendAckDelay());
//con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
//fastAck = true;
} else {
@@ -213,6 +214,10 @@ public class ConnectionPacketHandler {
packet.releasePayload();
}
// update the TCB Cache now that we've processed the acks and updated our rtt etc.
if (isNew && packet.isFlagSet(Packet.FLAG_CLOSE) && packet.isFlagSet(Packet.FLAG_SIGNATURE_INCLUDED))
con.updateShareOpts();
//if (choke)
// con.fastRetransmit();
}

View File

@@ -8,7 +8,7 @@ import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray;
import net.i2p.util.ByteCache;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
/**
* A stream that we can shove data into that fires off those bytes
@@ -201,13 +201,20 @@ public class MessageOutputStream extends OutputStream {
* Flush data that has been enqued but not flushed after a certain
* period of inactivity
*/
private class Flusher implements SimpleTimer.TimedEvent {
private class Flusher extends SimpleTimer2.TimedEvent {
private boolean _enqueued;
public Flusher() {
super(RetransmissionTimer.getInstance());
}
public void enqueue() {
// no need to be overly worried about duplicates - it would just
// push it further out
if (!_enqueued) {
RetransmissionTimer.getInstance().addEvent(_flusher, _passiveFlushDelay);
// Maybe we could just use schedule() here - or even SimpleScheduler - not sure...
// To be safe, use forceReschedule() so we don't get lots of duplicates
// We've seen the queue blow up before, maybe it was this before the rewrite...
// So perhaps it IS wise to be "overly worried" ...
forceReschedule(_passiveFlushDelay);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Enqueueing the flusher for " + _passiveFlushDelay + "ms out");
} else {

View File

@@ -41,6 +41,8 @@ import net.i2p.util.Log;
* <li>{@link #FLAG_DELAY_REQUESTED}: 2 byte integer</li>
* <li>{@link #FLAG_MAX_PACKET_SIZE_INCLUDED}: 2 byte integer</li>
* <li>{@link #FLAG_PROFILE_INTERACTIVE}: no option data</li>
* <li>{@link #FLAG_ECHO}: no option data</li>
* <li>{@link #FLAG_NO_ACK}: no option data</li>
* </ol>
*
* <p>If the signature is included, it uses the Destination's DSA key
@@ -144,7 +146,7 @@ public class Packet {
public static final int FLAG_NO_ACK = (1 << 10);
public static final int DEFAULT_MAX_SIZE = 32*1024;
private static final int MAX_DELAY_REQUEST = 65535;
protected static final int MAX_DELAY_REQUEST = 65535;
public Packet() { }
@@ -530,6 +532,8 @@ public class Packet {
public boolean verifySignature(I2PAppContext ctx, Destination from, byte buffer[]) {
if (!isFlagSet(FLAG_SIGNATURE_INCLUDED)) return false;
if (_optionSignature == null) return false;
// prevent receiveNewSyn() ... !active ... sendReset() ... verifySignature ... NPE
if (from == null) return false;
int size = writtenSize();

View File

@@ -6,7 +6,7 @@ import net.i2p.I2PAppContext;
import net.i2p.data.Destination;
import net.i2p.data.SessionKey;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
/**
* coordinate local attributes about a packet - send time, ack time, number of
@@ -27,7 +27,7 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
private long _cancelledOn;
private volatile int _nackCount;
private volatile boolean _retransmitted;
private SimpleTimer.TimedEvent _resendEvent;
private SimpleTimer2.TimedEvent _resendEvent;
public PacketLocal(I2PAppContext ctx, Destination to) {
this(ctx, to, null);
@@ -93,7 +93,7 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
releasePayload();
notifyAll();
}
SimpleTimer.getInstance().removeEvent(_resendEvent);
_resendEvent.cancel();
}
public void cancelled() {
synchronized (this) {
@@ -101,11 +101,11 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
releasePayload();
notifyAll();
}
SimpleTimer.getInstance().removeEvent(_resendEvent);
_resendEvent.cancel();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Cancelled! " + toString(), new Exception("cancelled"));
}
public SimpleTimer.TimedEvent getResendEvent() { return _resendEvent; }
public SimpleTimer2.TimedEvent getResendEvent() { return _resendEvent; }
/** how long after packet creation was it acked?
* @return how long after packet creation the packet was ACKed in ms
@@ -122,15 +122,15 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
public void incrementNACKs() {
int cnt = ++_nackCount;
SimpleTimer.TimedEvent evt = _resendEvent;
SimpleTimer2.TimedEvent evt = _resendEvent;
if ( (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD) && (evt != null) && (!_retransmitted)) {
_retransmitted = true;
RetransmissionTimer.getInstance().addEvent(evt, 0);
evt.reschedule(0);
}
}
public int getNACKs() { return _nackCount; }
public void setResendPacketEvent(SimpleTimer.TimedEvent evt) { _resendEvent = evt; }
public void setResendPacketEvent(SimpleTimer2.TimedEvent evt) { _resendEvent = evt; }
@Override
public StringBuffer formatAsString() {

View File

@@ -82,7 +82,24 @@ class PacketQueue {
// this should not block!
begin = _context.clock().now();
sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent);
long expires = 0;
Connection.ResendPacketEvent rpe = (Connection.ResendPacketEvent) packet.getResendEvent();
if (rpe != null)
// we want the router to expire it a little before we do,
// so if we retransmit it will use a new tunnel/lease combo
expires = rpe.getNextSendTime() - 500;
if (expires > 0)
// I2PSessionImpl2
//sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent, expires);
// I2PSessionMuxedImpl
sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent, expires,
I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
else
// I2PSessionImpl2
//sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent, 0);
// I2PSessionMuxedImpl
sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent,
I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
end = _context.clock().now();
if ( (end-begin > 1000) && (_log.shouldLog(Log.WARN)) )

View File

@@ -1,12 +1,12 @@
package net.i2p.client.streaming;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
/**
*
*/
public class RetransmissionTimer extends SimpleTimer {
public class RetransmissionTimer extends SimpleTimer2 {
private static final RetransmissionTimer _instance = new RetransmissionTimer();
public static final SimpleTimer getInstance() { return _instance; }
public static final RetransmissionTimer getInstance() { return _instance; }
protected RetransmissionTimer() { super("StreamingTimer"); }
}

View File

@@ -2,7 +2,7 @@ package net.i2p.client.streaming;
import net.i2p.I2PAppContext;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleScheduler;
/**
* Base scheduler
@@ -17,6 +17,6 @@ abstract class SchedulerImpl implements TaskScheduler {
}
protected void reschedule(long msToWait, Connection con) {
SimpleTimer.getInstance().addEvent(con.getConnectionEvent(), msToWait);
SimpleScheduler.getInstance().addEvent(con.getConnectionEvent(), msToWait);
}
}

View File

@@ -0,0 +1,139 @@
package net.i2p.client.streaming;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.I2PAppContext;
import net.i2p.data.Destination;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer2;
/**
* Share important TCP Control Block parameters across Connections
* to the same remote peer.
* This is intended for "temporal" sharing at connection open/close time,
* not "ensemble" sharing during a connection. Ref. RFC 2140.
*
* There is a TCB share per ConnectionManager (i.e. per local Destination)
* so that there is no information leakage to other Destinations on the
* same router.
*
*/
public class TCBShare {
private I2PAppContext _context;
private Log _log;
private Map<Destination, Entry> _cache;
private CleanEvent _cleaner;
private static final long EXPIRE_TIME = 30*60*1000;
private static final long CLEAN_TIME = 10*60*1000;
private static final double RTT_DAMPENING = 0.75;
private static final double WDW_DAMPENING = 0.75;
private static final int MAX_RTT = ((int) Connection.MAX_RESEND_DELAY) / 2;
private static final int MAX_WINDOW_SIZE = Connection.MAX_WINDOW_SIZE / 4;
public TCBShare(I2PAppContext ctx) {
_context = ctx;
_log = ctx.logManager().getLog(TCBShare.class);
_cache = new ConcurrentHashMap(4);
_cleaner = new CleanEvent();
_cleaner.schedule(CLEAN_TIME);
}
public void stop() {
_cleaner.cancel();
}
public void updateOptsFromShare(Connection con) {
Destination dest = con.getRemotePeer();
if (dest == null)
return;
ConnectionOptions opts = con.getOptions();
if (opts == null)
return;
Entry e = _cache.get(dest);
if (e == null || e.isExpired())
return;
if (_log.shouldLog(Log.DEBUG))
_log.debug("From cache: " +
con.getSession().getMyDestination().calculateHash().toBase64().substring(0, 4) +
'-' +
dest.calculateHash().toBase64().substring(0, 4) +
" RTT: " + e.getRTT() + " wdw: " + e.getWindowSize());
opts.setRTT(e.getRTT());
opts.setWindowSize(e.getWindowSize());
}
public void updateShareOpts(Connection con) {
Destination dest = con.getRemotePeer();
if (dest == null)
return;
if (con.getAckedPackets() <= 0)
return;
ConnectionOptions opts = con.getOptions();
if (opts == null)
return;
int old = -1;
int oldw = -1;
Entry e = _cache.get(dest);
if (e == null || e.isExpired()) {
e = new Entry(opts.getRTT(), opts.getWindowSize());
_cache.put(dest, e);
} else {
old = e.getRTT();
oldw = e.getWindowSize();
e.setRTT(opts.getRTT());
e.setWindowSize(opts.getWindowSize());
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("To cache: " +
con.getSession().getMyDestination().calculateHash().toBase64().substring(0, 4) +
'-' +
dest.calculateHash().toBase64().substring(0, 4) +
" old: " + old + " con: " + opts.getRTT() + " new: " + e.getRTT() +
" oldw: " + oldw + " conw: " + opts.getWindowSize() + " neww: " + e.getWindowSize());
}
private class Entry {
int _rtt;
int _wdw;
long _updated;
public Entry(int ms, int wdw) {
_rtt = ms;
_wdw = wdw;
_updated = _context.clock().now();
}
public int getRTT() { return _rtt; }
public void setRTT(int ms) {
_rtt = (int)(RTT_DAMPENING*_rtt + (1-RTT_DAMPENING)*ms);
if (_rtt > MAX_RTT)
_rtt = MAX_RTT;
_updated = _context.clock().now();
}
public int getWindowSize() { return _wdw; }
public void setWindowSize(int wdw) {
_wdw = (int)(0.5 + WDW_DAMPENING*_wdw + (1-WDW_DAMPENING)*wdw);
if (_wdw > MAX_WINDOW_SIZE)
_wdw = MAX_WINDOW_SIZE;
_updated = _context.clock().now();
}
public boolean isExpired() {
return _updated < _context.clock().now() - EXPIRE_TIME;
}
}
private class CleanEvent extends SimpleTimer2.TimedEvent {
public CleanEvent() {
super(RetransmissionTimer.getInstance());
}
public void timeReached() {
for (Iterator iter = _cache.keySet().iterator(); iter.hasNext(); ) {
if (_cache.get(iter.next()).isExpired())
iter.remove();
}
schedule(CLEAN_TIME);
}
}
}