* Streaming:

- Hook I2CP ports through to I2PSocket
    - Javadocs, init cleanups, final
This commit is contained in:
zzz
2011-07-14 18:53:10 +00:00
parent 0ca035dc44
commit 9f433b2e6b
21 changed files with 361 additions and 106 deletions

View File

@@ -70,6 +70,21 @@ public interface I2PSocket {
public boolean isClosed(); public boolean isClosed();
public void setSocketErrorListener(SocketErrorListener lsnr); public void setSocketErrorListener(SocketErrorListener lsnr);
/**
* The remote port.
* @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0)
* @since 0.8.9
*/
public int getPort();
/**
* The local port.
* @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0)
* @since 0.8.9
*/
public int getLocalPort();
/** /**
* Allow notification of underlying errors communicating across I2P without * Allow notification of underlying errors communicating across I2P without
* waiting for any sort of cleanup process. For example, if some data could * waiting for any sort of cleanup process. For example, if some data could

View File

@@ -7,6 +7,7 @@ import java.io.OutputStream;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.I2PException; import net.i2p.I2PException;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException; import net.i2p.client.I2PSessionException;
import net.i2p.data.Destination; import net.i2p.data.Destination;
import net.i2p.util.Clock; import net.i2p.util.Clock;
@@ -301,6 +302,24 @@ class I2PSocketImpl implements I2PSocket {
public long getCreatedOn() { return _createdOn; } public long getCreatedOn() { return _createdOn; }
public long getClosedOn() { return _closedOn; } public long getClosedOn() { return _closedOn; }
/**
* The remote port.
* @return 0 always
* @since 0.8.9
*/
public int getPort() {
return I2PSession.PORT_UNSPECIFIED;
}
/**
* The local port.
* @return 0 always
* @since 0.8.9
*/
public int getLocalPort() {
return I2PSession.PORT_UNSPECIFIED;
}
private String getPrefix() { return "[" + _socketId + "]: "; } private String getPrefix() { return "[" + _socketId + "]: "; }
@@ -671,7 +690,7 @@ class I2PSocketImpl implements I2PSocket {
return sent; return sent;
} }
} }
@Override @Override
public String toString() { return "" + hashCode(); } public String toString() { return "" + hashCode(); }
} }

View File

@@ -90,7 +90,7 @@ public class I2PSocketManagerFactory {
* Create a socket manager using the destination loaded from the given private key * Create a socket manager using the destination loaded from the given private key
* stream and connected to the default I2CP host and port. * stream and connected to the default I2CP host and port.
* *
* @param myPrivateKeyStream private key stream * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
* @return the newly created socket manager, or null if there were errors * @return the newly created socket manager, or null if there were errors
*/ */
public static I2PSocketManager createManager(InputStream myPrivateKeyStream) { public static I2PSocketManager createManager(InputStream myPrivateKeyStream) {
@@ -101,7 +101,7 @@ public class I2PSocketManagerFactory {
* Create a socket manager using the destination loaded from the given private key * Create a socket manager using the destination loaded from the given private key
* stream and connected to the default I2CP host and port. * stream and connected to the default I2CP host and port.
* *
* @param myPrivateKeyStream private key stream * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
* @param opts I2CP options * @param opts I2CP options
* @return the newly created socket manager, or null if there were errors * @return the newly created socket manager, or null if there were errors
*/ */
@@ -114,7 +114,7 @@ public class I2PSocketManagerFactory {
* stream and connected to the I2CP router on the specified machine on the given * stream and connected to the I2CP router on the specified machine on the given
* port * port
* *
* @param myPrivateKeyStream private key stream * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
* @param i2cpHost I2CP host * @param i2cpHost I2CP host
* @param i2cpPort I2CP port * @param i2cpPort I2CP port
* @param opts I2CP options * @param opts I2CP options

View File

@@ -2,7 +2,7 @@ package net.i2p.client.streaming;
/** /**
* Define the configuration for streaming and verifying data on the socket. * Define the configuration for streaming and verifying data on the socket.
* * Use I2PSocketManager.buildOptions() to get one of these.
*/ */
public interface I2PSocketOptions { public interface I2PSocketOptions {
/** How much data will we accept that hasn't been written out yet. */ /** How much data will we accept that hasn't been written out yet. */
@@ -81,4 +81,32 @@ public interface I2PSocketOptions {
* @param ms wait time to block on the output stream while waiting for the data to flush. * @param ms wait time to block on the output stream while waiting for the data to flush.
*/ */
public void setWriteTimeout(long ms); public void setWriteTimeout(long ms);
/**
* The remote port.
* @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0)
* @since 0.8.9
*/
public int getPort();
/**
* The remote port.
* @param port 0 - 65535
* @since 0.8.9
*/
public void setPort(int port);
/**
* The local port.
* @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0)
* @since 0.8.9
*/
public int getLocalPort();
/**
* The local port.
* @param port 0 - 65535
* @since 0.8.9
*/
public void setLocalPort(int port);
} }

View File

@@ -4,22 +4,32 @@ import java.util.Properties;
/** /**
* Define the configuration for streaming and verifying data on the socket. * Define the configuration for streaming and verifying data on the socket.
* * Use I2PSocketManager.buildOptions() to get one of these.
*/ */
class I2PSocketOptionsImpl implements I2PSocketOptions { class I2PSocketOptionsImpl implements I2PSocketOptions {
private long _connectTimeout; private long _connectTimeout;
private long _readTimeout; private long _readTimeout;
private long _writeTimeout; private long _writeTimeout;
private int _maxBufferSize; private int _maxBufferSize;
private int _localPort;
private int _remotePort;
public static final int DEFAULT_BUFFER_SIZE = 1024*64; public static final int DEFAULT_BUFFER_SIZE = 1024*64;
public static final int DEFAULT_WRITE_TIMEOUT = -1; public static final int DEFAULT_WRITE_TIMEOUT = -1;
public static final int DEFAULT_CONNECT_TIMEOUT = 60*1000; public static final int DEFAULT_CONNECT_TIMEOUT = 60*1000;
/**
* Sets max buffer size, connect timeout, read timeout, and write timeout
* from System properties. Does not set local port or remote port.
*/
public I2PSocketOptionsImpl() { public I2PSocketOptionsImpl() {
this(System.getProperties()); this(System.getProperties());
} }
/**
* Initializes from System properties then copies over all options.
* @param opts may be null
*/
public I2PSocketOptionsImpl(I2PSocketOptions opts) { public I2PSocketOptionsImpl(I2PSocketOptions opts) {
this(System.getProperties()); this(System.getProperties());
if (opts != null) { if (opts != null) {
@@ -27,13 +37,25 @@ class I2PSocketOptionsImpl implements I2PSocketOptions {
_readTimeout = opts.getReadTimeout(); _readTimeout = opts.getReadTimeout();
_writeTimeout = opts.getWriteTimeout(); _writeTimeout = opts.getWriteTimeout();
_maxBufferSize = opts.getMaxBufferSize(); _maxBufferSize = opts.getMaxBufferSize();
_localPort = opts.getLocalPort();
_remotePort = opts.getPort();
} }
} }
/**
* Sets max buffer size, connect timeout, read timeout, and write timeout
* from properties. Does not set local port or remote port.
* @param opts may be null
*/
public I2PSocketOptionsImpl(Properties opts) { public I2PSocketOptionsImpl(Properties opts) {
init(opts); init(opts);
} }
/**
* Sets max buffer size, connect timeout, read timeout, and write timeout
* from properties. Does not set local port or remote port.
* @param opts may be null
*/
public void setProperties(Properties opts) { public void setProperties(Properties opts) {
if (opts == null) return; if (opts == null) return;
if (opts.containsKey(PROP_BUFFER_SIZE)) if (opts.containsKey(PROP_BUFFER_SIZE))
@@ -46,6 +68,10 @@ class I2PSocketOptionsImpl implements I2PSocketOptions {
_writeTimeout = getInt(opts, PROP_WRITE_TIMEOUT, DEFAULT_WRITE_TIMEOUT); _writeTimeout = getInt(opts, PROP_WRITE_TIMEOUT, DEFAULT_WRITE_TIMEOUT);
} }
/**
* Sets max buffer size, connect timeout, read timeout, and write timeout
* from properties. Does not set local port or remote port.
*/
protected void init(Properties opts) { protected void init(Properties opts) {
_maxBufferSize = getInt(opts, PROP_BUFFER_SIZE, DEFAULT_BUFFER_SIZE); _maxBufferSize = getInt(opts, PROP_BUFFER_SIZE, DEFAULT_BUFFER_SIZE);
_connectTimeout = getInt(opts, PROP_CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT); _connectTimeout = getInt(opts, PROP_CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT);
@@ -144,4 +170,40 @@ class I2PSocketOptionsImpl implements I2PSocketOptions {
public void setWriteTimeout(long ms) { public void setWriteTimeout(long ms) {
_writeTimeout = ms; _writeTimeout = ms;
} }
/**
* The remote port.
* @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0)
* @since 0.8.9
*/
public int getPort() {
return _remotePort;
}
/**
* The remote port.
* @param port 0 - 65535
* @since 0.8.9
*/
public void setPort(int port) {
_remotePort = port;
}
/**
* The local port.
* @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0)
* @since 0.8.9
*/
public int getLocalPort() {
return _localPort;
}
/**
* The local port.
* @param port 0 - 65535
* @since 0.8.9
*/
public void setLocalPort(int port) {
_localPort = port;
}
} }

View File

@@ -23,25 +23,25 @@ import net.i2p.util.SimpleTimer2;
* *
*/ */
class Connection { class Connection {
private I2PAppContext _context; private final I2PAppContext _context;
private Log _log; private final Log _log;
private ConnectionManager _connectionManager; private final ConnectionManager _connectionManager;
private Destination _remotePeer; private Destination _remotePeer;
private long _sendStreamId; private long _sendStreamId;
private long _receiveStreamId; private long _receiveStreamId;
private long _lastSendTime; private long _lastSendTime;
private AtomicLong _lastSendId; private final AtomicLong _lastSendId;
private boolean _resetReceived; private boolean _resetReceived;
private boolean _resetSent; private boolean _resetSent;
private long _resetSentOn; private long _resetSentOn;
private boolean _connected; private boolean _connected;
private boolean _hardDisconnected; private boolean _hardDisconnected;
private MessageInputStream _inputStream; private final MessageInputStream _inputStream;
private MessageOutputStream _outputStream; private final MessageOutputStream _outputStream;
private SchedulerChooser _chooser; private final SchedulerChooser _chooser;
private long _nextSendTime; private long _nextSendTime;
private long _ackedPackets; private long _ackedPackets;
private long _createdOn; private final long _createdOn;
private long _closeSentOn; private long _closeSentOn;
private long _closeReceivedOn; private long _closeReceivedOn;
private int _unackedPacketsReceived; private int _unackedPacketsReceived;
@@ -51,10 +51,10 @@ class Connection {
private boolean _updatedShareOpts; private boolean _updatedShareOpts;
/** Packet ID (Long) to PacketLocal for sent but unacked packets */ /** Packet ID (Long) to PacketLocal for sent but unacked packets */
private final Map<Long, PacketLocal> _outboundPackets; private final Map<Long, PacketLocal> _outboundPackets;
private PacketQueue _outboundQueue; private final PacketQueue _outboundQueue;
private ConnectionPacketHandler _handler; private final ConnectionPacketHandler _handler;
private ConnectionOptions _options; private ConnectionOptions _options;
private ConnectionDataReceiver _receiver; private final ConnectionDataReceiver _receiver;
private I2PSocketFull _socket; private I2PSocketFull _socket;
/** set to an error cause if the connection could not be established */ /** set to an error cause if the connection could not be established */
private String _connectionError; private String _connectionError;
@@ -70,8 +70,10 @@ class Connection {
private final Object _connectLock; private final Object _connectLock;
/** how many messages have been resent and not yet ACKed? */ /** how many messages have been resent and not yet ACKed? */
private int _activeResends; private int _activeResends;
private ConEvent _connectionEvent; private final ConEvent _connectionEvent;
private int _randomWait; private final int _randomWait;
private int _localPort;
private int _remotePort;
private long _lifetimeBytesSent; private long _lifetimeBytesSent;
private long _lifetimeBytesReceived; private long _lifetimeBytesReceived;
@@ -86,10 +88,13 @@ class Connection {
public static final int MAX_WINDOW_SIZE = 128; public static final int MAX_WINDOW_SIZE = 128;
public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler) { public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser,
PacketQueue queue, ConnectionPacketHandler handler) {
this(ctx, manager, chooser, queue, handler, null); this(ctx, manager, chooser, queue, handler, null);
} }
public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts) {
public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser,
PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts) {
_context = ctx; _context = ctx;
_connectionManager = manager; _connectionManager = manager;
_chooser = chooser; _chooser = chooser;
@@ -101,34 +106,31 @@ class Connection {
// FIXME pass through a passive flush delay setting as the 4th arg // FIXME pass through a passive flush delay setting as the 4th arg
_outputStream = new MessageOutputStream(_context, _receiver, (opts == null ? Packet.MAX_PAYLOAD_SIZE : opts.getMaxMessageSize())); _outputStream = new MessageOutputStream(_context, _receiver, (opts == null ? Packet.MAX_PAYLOAD_SIZE : opts.getMaxMessageSize()));
_outboundPackets = new TreeMap(); _outboundPackets = new TreeMap();
if (opts != null) {
_localPort = opts.getLocalPort();
_remotePort = opts.getPort();
}
_options = (opts != null ? opts : new ConnectionOptions()); _options = (opts != null ? opts : new ConnectionOptions());
_outputStream.setWriteTimeout((int)_options.getWriteTimeout()); _outputStream.setWriteTimeout((int)_options.getWriteTimeout());
_inputStream.setReadTimeout((int)_options.getReadTimeout()); _inputStream.setReadTimeout((int)_options.getReadTimeout());
_lastSendId = new AtomicLong(-1); _lastSendId = new AtomicLong(-1);
_nextSendTime = -1; _nextSendTime = -1;
_ackedPackets = 0;
_createdOn = _context.clock().now(); _createdOn = _context.clock().now();
_closeSentOn = -1; _closeSentOn = -1;
_closeReceivedOn = -1; _closeReceivedOn = -1;
_unackedPacketsReceived = 0;
_congestionWindowEnd = _options.getWindowSize()-1; _congestionWindowEnd = _options.getWindowSize()-1;
_highestAckedThrough = -1; _highestAckedThrough = -1;
_lastCongestionSeenAt = MAX_WINDOW_SIZE*2; // lets allow it to grow _lastCongestionSeenAt = MAX_WINDOW_SIZE*2; // lets allow it to grow
_lastCongestionTime = -1; _lastCongestionTime = -1;
_lastCongestionHighestUnacked = -1; _lastCongestionHighestUnacked = -1;
_resetReceived = false;
_connected = true; _connected = true;
_disconnectScheduledOn = -1; _disconnectScheduledOn = -1;
_lastReceivedOn = -1; _lastReceivedOn = -1;
_activityTimer = new ActivityTimer(); _activityTimer = new ActivityTimer();
_ackSinceCongestion = true; _ackSinceCongestion = true;
_connectLock = new Object(); _connectLock = new Object();
_activeResends = 0;
_resetSentOn = -1; _resetSentOn = -1;
_isInbound = false;
_updatedShareOpts = false;
_connectionEvent = new ConEvent(); _connectionEvent = new ConEvent();
_hardDisconnected = false;
_randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage
_context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
@@ -678,6 +680,23 @@ class Connection {
public I2PSocketFull getSocket() { return _socket; } public I2PSocketFull getSocket() { return _socket; }
public void setSocket(I2PSocketFull socket) { _socket = socket; } public void setSocket(I2PSocketFull socket) { _socket = socket; }
/**
* The remote port.
* @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0)
* @since 0.8.9
*/
public int getPort() {
return _remotePort;
}
/**
* @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0)
* @since 0.8.9
*/
public int getLocalPort() {
return _localPort;
}
public String getConnectionError() { return _connectionError; } public String getConnectionError() { return _connectionError; }
public void setConnectionError(String err) { _connectionError = err; } public void setConnectionError(String err) { _connectionError = err; }
@@ -781,7 +800,7 @@ class Connection {
} }
public int getLastCongestionSeenAt() { return _lastCongestionSeenAt; } public int getLastCongestionSeenAt() { return _lastCongestionSeenAt; }
void congestionOccurred() { void congestionOccurred() {
// if we hit congestion and e.g. 5 packets are resent, // if we hit congestion and e.g. 5 packets are resent,
// dont set the size to (winSize >> 4). only set the // dont set the size to (winSize >> 4). only set the
@@ -962,12 +981,13 @@ class Connection {
* @return the inbound message stream * @return the inbound message stream
*/ */
public MessageInputStream getInputStream() { return _inputStream; } public MessageInputStream getInputStream() { return _inputStream; }
/** stream that the local peer sends data to the remote peer on /** stream that the local peer sends data to the remote peer on
* @return the outbound message stream * @return the outbound message stream
*/ */
public MessageOutputStream getOutputStream() { return _outputStream; } public MessageOutputStream getOutputStream() { return _outputStream; }
@Override @Override
public String toString() { public String toString() {
StringBuilder buf = new StringBuilder(128); StringBuilder buf = new StringBuilder(128);
buf.append("[Connection "); buf.append("[Connection ");

View File

@@ -13,11 +13,14 @@ import net.i2p.util.Log;
* *
*/ */
class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
private I2PAppContext _context; private final I2PAppContext _context;
private Log _log; private final Log _log;
private Connection _connection; private final Connection _connection;
private static final MessageOutputStream.WriteStatus _dummyStatus = new DummyStatus(); private static final MessageOutputStream.WriteStatus _dummyStatus = new DummyStatus();
/**
* @param con non-null
*/
public ConnectionDataReceiver(I2PAppContext ctx, Connection con) { public ConnectionDataReceiver(I2PAppContext ctx, Connection con) {
_context = ctx; _context = ctx;
_log = ctx.logManager().getLog(ConnectionDataReceiver.class); _log = ctx.logManager().getLog(ConnectionDataReceiver.class);
@@ -41,10 +44,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
* @return !flush * @return !flush
*/ */
public boolean writeInProcess() { public boolean writeInProcess() {
Connection con = _connection; return _connection.getUnackedPacketsSent() >= _connection.getOptions().getWindowSize();
if (con != null)
return con.getUnackedPacketsSent() >= con.getOptions().getWindowSize();
return false;
} }
/** /**
@@ -60,7 +60,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
*/ */
public MessageOutputStream.WriteStatus writeData(byte[] buf, int off, int size) { public MessageOutputStream.WriteStatus writeData(byte[] buf, int off, int size) {
Connection con = _connection; Connection con = _connection;
if (con == null) return _dummyStatus; //if (con == null) return _dummyStatus;
boolean doSend = true; boolean doSend = true;
if ( (size <= 0) && (con.getLastSendId() >= 0) ) { if ( (size <= 0) && (con.getLastSendId() >= 0) ) {
if (con.getOutputStream().getClosed()) { if (con.getOutputStream().getClosed()) {
@@ -121,7 +121,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
*/ */
public PacketLocal send(byte buf[], int off, int size, boolean forceIncrement) { public PacketLocal send(byte buf[], int off, int size, boolean forceIncrement) {
Connection con = _connection; Connection con = _connection;
if (con == null) return null; //if (con == null) return null;
long before = System.currentTimeMillis(); long before = System.currentTimeMillis();
PacketLocal packet = buildPacket(con, buf, off, size, forceIncrement); PacketLocal packet = buildPacket(con, buf, off, size, forceIncrement);
long built = System.currentTimeMillis(); long built = System.currentTimeMillis();
@@ -185,6 +185,8 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
packet.setFlag(Packet.FLAG_SYNCHRONIZE); packet.setFlag(Packet.FLAG_SYNCHRONIZE);
packet.setOptionalFrom(con.getSession().getMyDestination()); packet.setOptionalFrom(con.getSession().getMyDestination());
packet.setOptionalMaxSize(con.getOptions().getMaxMessageSize()); packet.setOptionalMaxSize(con.getOptions().getMaxMessageSize());
packet.setLocalPort(con.getLocalPort());
packet.setRemotePort(con.getPort());
} }
if (con.getSendStreamId() == Packet.STREAM_ID_UNKNOWN) { if (con.getSendStreamId() == Packet.STREAM_ID_UNKNOWN) {
packet.setFlag(Packet.FLAG_NO_ACK); packet.setFlag(Packet.FLAG_NO_ACK);

View File

@@ -18,10 +18,10 @@ import net.i2p.util.SimpleTimer;
* @author zzz modded to use concurrent and bound queue size * @author zzz modded to use concurrent and bound queue size
*/ */
class ConnectionHandler { class ConnectionHandler {
private I2PAppContext _context; private final I2PAppContext _context;
private Log _log; private final Log _log;
private ConnectionManager _manager; private final ConnectionManager _manager;
private LinkedBlockingQueue<Packet> _synQueue; private final LinkedBlockingQueue<Packet> _synQueue;
private boolean _active; private boolean _active;
private int _acceptTimeout; private int _acceptTimeout;
@@ -41,7 +41,6 @@ class ConnectionHandler {
_log = context.logManager().getLog(ConnectionHandler.class); _log = context.logManager().getLog(ConnectionHandler.class);
_manager = mgr; _manager = mgr;
_synQueue = new LinkedBlockingQueue<Packet>(MAX_QUEUE_SIZE); _synQueue = new LinkedBlockingQueue<Packet>(MAX_QUEUE_SIZE);
_active = false;
_acceptTimeout = DEFAULT_ACCEPT_TIMEOUT; _acceptTimeout = DEFAULT_ACCEPT_TIMEOUT;
} }

View File

@@ -21,24 +21,24 @@ import net.i2p.util.SimpleTimer;
* *
*/ */
class ConnectionManager { class ConnectionManager {
private I2PAppContext _context; private final I2PAppContext _context;
private Log _log; private final Log _log;
private I2PSession _session; private final I2PSession _session;
private MessageHandler _messageHandler; private final MessageHandler _messageHandler;
private PacketHandler _packetHandler; private final PacketHandler _packetHandler;
private ConnectionHandler _connectionHandler; private final ConnectionHandler _connectionHandler;
private PacketQueue _outboundQueue; private final PacketQueue _outboundQueue;
private SchedulerChooser _schedulerChooser; private final SchedulerChooser _schedulerChooser;
private ConnectionPacketHandler _conPacketHandler; private final ConnectionPacketHandler _conPacketHandler;
private TCBShare _tcbShare; private final TCBShare _tcbShare;
/** Inbound stream ID (Long) to Connection map */ /** Inbound stream ID (Long) to Connection map */
private ConcurrentHashMap<Long, Connection> _connectionByInboundId; private final ConcurrentHashMap<Long, Connection> _connectionByInboundId;
/** Ping ID (Long) to PingRequest */ /** Ping ID (Long) to PingRequest */
private final Map<Long, PingRequest> _pendingPings; private final Map<Long, PingRequest> _pendingPings;
private boolean _allowIncoming; private boolean _allowIncoming;
private boolean _throttlersInitialized; private boolean _throttlersInitialized;
private int _maxConcurrentStreams; private int _maxConcurrentStreams;
private ConnectionOptions _defaultOptions; private final ConnectionOptions _defaultOptions;
private volatile int _numWaiting; private volatile int _numWaiting;
private long _soTimeout; private long _soTimeout;
private ConnThrottler _minuteThrottler; private ConnThrottler _minuteThrottler;
@@ -59,10 +59,12 @@ class ConnectionManager {
_schedulerChooser = new SchedulerChooser(_context); _schedulerChooser = new SchedulerChooser(_context);
_conPacketHandler = new ConnectionPacketHandler(_context); _conPacketHandler = new ConnectionPacketHandler(_context);
_tcbShare = new TCBShare(_context); _tcbShare = new TCBShare(_context);
_session.setSessionListener(_messageHandler); // PROTO_ANY is for backward compatibility (pre-0.7.1)
// TODO change proto to PROTO_STREAMING someday.
// Right now we get everything, and rely on Datagram to specify PROTO_UDP.
// PacketQueue has sent PROTO_STREAMING since the beginning of mux support (0.7.1)
_session.addMuxedSessionListener(_messageHandler, I2PSession.PROTO_ANY, I2PSession.PORT_ANY);
_outboundQueue = new PacketQueue(_context, _session, this); _outboundQueue = new PacketQueue(_context, _session, this);
_allowIncoming = false;
_numWaiting = 0;
/** Socket timeout for accept() */ /** Socket timeout for accept() */
_soTimeout = -1; _soTimeout = -1;
@@ -141,7 +143,10 @@ class ConnectionManager {
* it, or null if the syn's streamId was already taken * it, or null if the syn's streamId was already taken
*/ */
public Connection receiveConnection(Packet synPacket) { public Connection receiveConnection(Packet synPacket) {
Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions)); ConnectionOptions opts = new ConnectionOptions(_defaultOptions);
opts.setPort(synPacket.getRemotePort());
opts.setLocalPort(synPacket.getLocalPort());
Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts);
_tcbShare.updateOptsFromShare(con); _tcbShare.updateOptsFromShare(con);
con.setInbound(); con.setInbound();
long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;

View File

@@ -106,6 +106,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
* This is based on documentation, the code, and logging, however there are still * This is based on documentation, the code, and logging, however there are still
* some parts that could use more research. * some parts that could use more research.
* *
*<pre>
* 1024 Tunnel Message * 1024 Tunnel Message
* - 21 Header (see router/tunnel/BatchedPreprocessor.java) * - 21 Header (see router/tunnel/BatchedPreprocessor.java)
* ----- * -----
@@ -169,7 +170,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
* Similarly: * Similarly:
* 3 msgs: 2722 * 3 msgs: 2722
* 4 msgs: 3714 * 4 msgs: 3714
* *</pre>
* *
* Before release 0.6.1.14 this was 4096. * Before release 0.6.1.14 this was 4096.
* From release 0.6.1.14 through release 0.6.4, this was 960. * From release 0.6.1.14 through release 0.6.4, this was 960.
@@ -205,18 +206,35 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
public static final int DEFAULT_MAX_MESSAGE_SIZE = 1730; public static final int DEFAULT_MAX_MESSAGE_SIZE = 1730;
public static final int MIN_MESSAGE_SIZE = 512; public static final int MIN_MESSAGE_SIZE = 512;
/**
* Sets max buffer size, connect timeout, read timeout, and write timeout
* from System properties. Does not set local port or remote port.
*/
public ConnectionOptions() { public ConnectionOptions() {
super(); super();
} }
/**
* Sets max buffer size, connect timeout, read timeout, and write timeout
* from properties. Does not set local port or remote port.
* @param opts may be null
*/
public ConnectionOptions(Properties opts) { public ConnectionOptions(Properties opts) {
super(opts); super(opts);
} }
/**
* Initializes from System properties then copies over all options.
* @param opts may be null
*/
public ConnectionOptions(I2PSocketOptions opts) { public ConnectionOptions(I2PSocketOptions opts) {
super(opts); super(opts);
} }
/**
* Initializes from System properties then copies over all options.
* @param opts may be null
*/
public ConnectionOptions(ConnectionOptions opts) { public ConnectionOptions(ConnectionOptions opts) {
super(opts); super(opts);
if (opts != null) { if (opts != null) {
@@ -235,8 +253,10 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
setInboundBufferSize(opts.getInboundBufferSize()); setInboundBufferSize(opts.getInboundBufferSize());
setCongestionAvoidanceGrowthRateFactor(opts.getCongestionAvoidanceGrowthRateFactor()); setCongestionAvoidanceGrowthRateFactor(opts.getCongestionAvoidanceGrowthRateFactor());
setSlowStartGrowthRateFactor(opts.getSlowStartGrowthRateFactor()); setSlowStartGrowthRateFactor(opts.getSlowStartGrowthRateFactor());
setWriteTimeout(opts.getWriteTimeout()); // handled in super()
setReadTimeout(opts.getReadTimeout()); // not clear why added by jr 12/22/2005
//setWriteTimeout(opts.getWriteTimeout());
//setReadTimeout(opts.getReadTimeout());
setAnswerPings(opts.getAnswerPings()); setAnswerPings(opts.getAnswerPings());
initLists(opts); initLists(opts);
_maxConnsPerMinute = opts.getMaxConnsPerMinute(); _maxConnsPerMinute = opts.getMaxConnsPerMinute();
@@ -248,7 +268,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
} }
} }
@Override @Override
protected void init(Properties opts) { protected void init(Properties opts) {
super.init(opts); super.init(opts);
_trend = new int[TREND_COUNT]; _trend = new int[TREND_COUNT];
@@ -262,12 +282,14 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, DEFAULT_INITIAL_ACK_DELAY)); setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, DEFAULT_INITIAL_ACK_DELAY));
setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE)); setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE));
setMaxResends(getInt(opts, PROP_MAX_RESENDS, DEFAULT_MAX_SENDS)); setMaxResends(getInt(opts, PROP_MAX_RESENDS, DEFAULT_MAX_SENDS));
setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); // handled in super()
//setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1));
setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 90*1000)); setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 90*1000));
setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_SEND)); setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_SEND));
setInboundBufferSize(getMaxMessageSize() * (Connection.MAX_WINDOW_SIZE + 2)); setInboundBufferSize(getMaxMessageSize() * (Connection.MAX_WINDOW_SIZE + 2));
setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR, 1)); setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR, 1));
setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 1)); setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 1));
// overrides default in super()
setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT)); setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT));
setAnswerPings(getBool(opts, PROP_ANSWER_PINGS, DEFAULT_ANSWER_PINGS)); setAnswerPings(getBool(opts, PROP_ANSWER_PINGS, DEFAULT_ANSWER_PINGS));
initLists(opts); initLists(opts);
@@ -279,7 +301,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
_maxTotalConnsPerDay = getInt(opts, PROP_MAX_TOTAL_CONNS_DAY, 0); _maxTotalConnsPerDay = getInt(opts, PROP_MAX_TOTAL_CONNS_DAY, 0);
} }
@Override @Override
public void setProperties(Properties opts) { public void setProperties(Properties opts) {
super.setProperties(opts); super.setProperties(opts);
if (opts == null) return; if (opts == null) return;
@@ -303,8 +325,9 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE)); setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE));
if (opts.containsKey(PROP_MAX_RESENDS)) if (opts.containsKey(PROP_MAX_RESENDS))
setMaxResends(getInt(opts, PROP_MAX_RESENDS, DEFAULT_MAX_SENDS)); setMaxResends(getInt(opts, PROP_MAX_RESENDS, DEFAULT_MAX_SENDS));
if (opts.containsKey(PROP_WRITE_TIMEOUT)) // handled in super()
setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); //if (opts.containsKey(PROP_WRITE_TIMEOUT))
// setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1));
if (opts.containsKey(PROP_INACTIVITY_TIMEOUT)) if (opts.containsKey(PROP_INACTIVITY_TIMEOUT))
setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 90*1000)); setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 90*1000));
if (opts.containsKey(PROP_INACTIVITY_ACTION)) if (opts.containsKey(PROP_INACTIVITY_ACTION))
@@ -316,6 +339,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 2)); setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 2));
if (opts.containsKey(PROP_CONNECT_TIMEOUT)) if (opts.containsKey(PROP_CONNECT_TIMEOUT))
// wow 5 minutes!!! FIXME!! // wow 5 minutes!!! FIXME!!
// overrides default in super()
setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT)); setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT));
if (opts.containsKey(PROP_ANSWER_PINGS)) if (opts.containsKey(PROP_ANSWER_PINGS))
setAnswerPings(getBool(opts, PROP_ANSWER_PINGS, DEFAULT_ANSWER_PINGS)); setAnswerPings(getBool(opts, PROP_ANSWER_PINGS, DEFAULT_ANSWER_PINGS));

View File

@@ -15,8 +15,8 @@ import net.i2p.util.SimpleTimer;
* *
*/ */
class ConnectionPacketHandler { class ConnectionPacketHandler {
private I2PAppContext _context; private final I2PAppContext _context;
private Log _log; private final Log _log;
public ConnectionPacketHandler(I2PAppContext context) { public ConnectionPacketHandler(I2PAppContext context) {
_context = context; _context = context;

View File

@@ -8,7 +8,7 @@ import net.i2p.I2PException;
* *
*/ */
class I2PServerSocketFull implements I2PServerSocket { class I2PServerSocketFull implements I2PServerSocket {
private I2PSocketManagerFull _socketManager; private final I2PSocketManagerFull _socketManager;
public I2PServerSocketFull(I2PSocketManagerFull mgr) { public I2PServerSocketFull(I2PSocketManagerFull mgr) {
_socketManager = mgr; _socketManager = mgr;

View File

@@ -4,6 +4,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import net.i2p.client.I2PSession;
import net.i2p.data.Destination; import net.i2p.data.Destination;
/** /**
@@ -127,7 +128,28 @@ class I2PSocketFull implements I2PSocket {
if (c != null) if (c != null)
c.disconnectComplete(); c.disconnectComplete();
} }
@Override
/**
* The remote port.
* @return the port or 0 if unknown
* @since 0.8.9
*/
public int getPort() {
Connection c = _connection;
return c == null ? I2PSession.PORT_UNSPECIFIED : c.getPort();
}
/**
* The local port.
* @return the port or 0 if unknown
* @since 0.8.9
*/
public int getLocalPort() {
Connection c = _connection;
return c == null ? I2PSession.PORT_UNSPECIFIED : c.getLocalPort();
}
@Override
public String toString() { public String toString() {
Connection c = _connection; Connection c = _connection;
if (c == null) if (c == null)

View File

@@ -7,7 +7,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.client.I2PSession; import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException; import net.i2p.client.I2PSessionException;
import net.i2p.client.I2PSessionListener; import net.i2p.client.I2PSessionMuxedListener;
import net.i2p.util.Log; import net.i2p.util.Log;
/** /**
@@ -15,10 +15,10 @@ import net.i2p.util.Log;
* Packets, if we can. * Packets, if we can.
* *
*/ */
class MessageHandler implements I2PSessionListener { class MessageHandler implements I2PSessionMuxedListener {
private ConnectionManager _manager; private final ConnectionManager _manager;
private I2PAppContext _context; private final I2PAppContext _context;
private Log _log; private final Log _log;
private final Set<I2PSocketManager.DisconnectListener> _listeners; private final Set<I2PSocketManager.DisconnectListener> _listeners;
public MessageHandler(I2PAppContext ctx, ConnectionManager mgr) { public MessageHandler(I2PAppContext ctx, ConnectionManager mgr) {
@@ -31,11 +31,23 @@ class MessageHandler implements I2PSessionListener {
/** Instruct the client that the given session has received a message with /** Instruct the client that the given session has received a message with
* size # of bytes. * size # of bytes.
* This shouldn't be called anymore since we are registering as a muxed listener.
* @param session session to notify * @param session session to notify
* @param msgId message number available * @param msgId message number available
* @param size size of the message * @param size size of the message
*/ */
public void messageAvailable(I2PSession session, int msgId, long size) { public void messageAvailable(I2PSession session, int msgId, long size) {
messageAvailable(session, msgId, size, I2PSession.PROTO_UNSPECIFIED,
I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
}
/** Instruct the client that the given session has received a message with
* size # of bytes.
* @param session session to notify
* @param msgId message number available
* @param size size of the message
*/
public void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromPort, int toPort) {
byte data[] = null; byte data[] = null;
try { try {
data = session.receiveMessage(msgId); data = session.receiveMessage(msgId);
@@ -49,6 +61,8 @@ class MessageHandler implements I2PSessionListener {
Packet packet = new Packet(); Packet packet = new Packet();
try { try {
packet.readPacket(data, 0, data.length); packet.readPacket(data, 0, data.length);
packet.setRemotePort(fromPort);
packet.setLocalPort(toPort);
_manager.getPacketHandler().receivePacket(packet); _manager.getPacketHandler().receivePacket(packet);
} catch (IllegalArgumentException iae) { } catch (IllegalArgumentException iae) {
_context.statManager().addRateData("stream.packetReceiveFailure", 1, 0); _context.statManager().addRateData("stream.packetReceiveFailure", 1, 0);

View File

@@ -63,14 +63,10 @@ class MessageOutputStream extends OutputStream {
_buf = _dataCache.acquire().getData(); // new byte[bufSize]; _buf = _dataCache.acquire().getData(); // new byte[bufSize];
_dataReceiver = receiver; _dataReceiver = receiver;
_dataLock = new Object(); _dataLock = new Object();
_written = 0;
_closed = false;
_writeTimeout = -1; _writeTimeout = -1;
_passiveFlushDelay = passiveFlushDelay; _passiveFlushDelay = passiveFlushDelay;
_nextBufferSize = -1; _nextBufferSize = -1;
_sendPeriodBeginTime = ctx.clock().now(); _sendPeriodBeginTime = ctx.clock().now();
_sendPeriodBytes = 0;
_sendBps = 0;
_context.statManager().createRateStat("stream.sendBps", "How fast we pump data through the stream", "Stream", new long[] { 60*1000, 5*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.sendBps", "How fast we pump data through the stream", "Stream", new long[] { 60*1000, 5*60*1000, 60*60*1000 });
_flusher = new Flusher(); _flusher = new Flusher();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))

View File

@@ -13,6 +13,13 @@ import net.i2p.data.SigningPrivateKey;
import net.i2p.util.Log; import net.i2p.util.Log;
/** /**
* This contains solely the data that goes out on the wire,
* including the local and remote port which is embedded in
* the I2CP overhead, not in the packet itself.
* For local state saved for outbound packets, see PacketLocal.
*
* <p>
*
* Contain a single packet transferred as part of a streaming connection. * Contain a single packet transferred as part of a streaming connection.
* The data format is as follows:<ul> * The data format is as follows:<ul>
* <li>{@link #getSendStreamId sendStreamId} [4 byte value]</li> * <li>{@link #getSendStreamId sendStreamId} [4 byte value]</li>
@@ -67,6 +74,8 @@ class Packet {
private Destination _optionFrom; private Destination _optionFrom;
private int _optionDelay; private int _optionDelay;
private int _optionMaxSize; private int _optionMaxSize;
private int _localPort;
private int _remotePort;
/** /**
* The receiveStreamId will be set to this when the packet doesn't know * The receiveStreamId will be set to this when the packet doesn't know
@@ -148,6 +157,10 @@ class Packet {
public static final int DEFAULT_MAX_SIZE = 32*1024; public static final int DEFAULT_MAX_SIZE = 32*1024;
protected static final int MAX_DELAY_REQUEST = 65535; protected static final int MAX_DELAY_REQUEST = 65535;
/**
* Does no initialization.
* See readPacket() for inbound packets, and the setters for outbound packets.
*/
public Packet() { } public Packet() { }
private boolean _sendStreamIdSet = false; private boolean _sendStreamIdSet = false;
@@ -316,6 +329,40 @@ class Packet {
_optionMaxSize = numBytes; _optionMaxSize = numBytes;
} }
/**
* @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0)
* @since 0.8.9
*/
public int getLocalPort() {
return _localPort;
}
/**
* Must be called to change the port, not set by readPacket()
* as the port is out-of-band in the I2CP header.
* @since 0.8.9
*/
public void setLocalPort(int port) {
_localPort = port;
}
/**
* @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0)
* @since 0.8.9
*/
public int getRemotePort() {
return _remotePort;
}
/**
* Must be called to change the port, not set by readPacket()
* as the port is out-of-band in the I2CP header.
* @since 0.8.9
*/
public void setRemotePort(int port) {
_remotePort = port;
}
/** /**
* Write the packet to the buffer (starting at the offset) and return * Write the packet to the buffer (starting at the offset) and return
* the number of bytes written. * the number of bytes written.

View File

@@ -16,9 +16,9 @@ import net.i2p.util.Log;
* *
*/ */
class PacketHandler { class PacketHandler {
private ConnectionManager _manager; private final ConnectionManager _manager;
private I2PAppContext _context; private final I2PAppContext _context;
private Log _log; private final Log _log;
//private int _lastDelay; //private int _lastDelay;
//private int _dropped; //private int _dropped;

View File

@@ -13,13 +13,13 @@ import net.i2p.util.SimpleTimer2;
* retries, etc. * retries, etc.
*/ */
class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
private I2PAppContext _context; private final I2PAppContext _context;
private Log _log; private final Log _log;
private Connection _connection; private final Connection _connection;
private Destination _to; private Destination _to;
private SessionKey _keyUsed; private SessionKey _keyUsed;
private Set _tagsSent; private Set _tagsSent;
private long _createdOn; private final long _createdOn;
private int _numSends; private int _numSends;
private long _lastSend; private long _lastSend;
private long _acceptedOn; private long _acceptedOn;
@@ -29,9 +29,11 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
private volatile boolean _retransmitted; private volatile boolean _retransmitted;
private SimpleTimer2.TimedEvent _resendEvent; private SimpleTimer2.TimedEvent _resendEvent;
/** not bound to a connection */
public PacketLocal(I2PAppContext ctx, Destination to) { public PacketLocal(I2PAppContext ctx, Destination to) {
this(ctx, to, null); this(ctx, to, null);
} }
public PacketLocal(I2PAppContext ctx, Destination to, Connection con) { public PacketLocal(I2PAppContext ctx, Destination to, Connection con) {
_context = ctx; _context = ctx;
_createdOn = ctx.clock().now(); _createdOn = ctx.clock().now();
@@ -40,8 +42,6 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
_connection = con; _connection = con;
_lastSend = -1; _lastSend = -1;
_cancelledOn = -1; _cancelledOn = -1;
_nackCount = 0;
_retransmitted = false;
} }
public Destination getTo() { return _to; } public Destination getTo() { return _to; }
@@ -138,6 +138,8 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
} }
public int getNumSends() { return _numSends; } public int getNumSends() { return _numSends; }
public long getLastSend() { return _lastSend; } public long getLastSend() { return _lastSend; }
/** @return null if not bound */
public Connection getConnection() { return _connection; } public Connection getConnection() { return _connection; }
public void incrementNACKs() { public void incrementNACKs() {

View File

@@ -19,11 +19,11 @@ import net.i2p.util.Log;
* *
*/ */
class PacketQueue { class PacketQueue {
private I2PAppContext _context; private final I2PAppContext _context;
private Log _log; private final Log _log;
private I2PSession _session; private final I2PSession _session;
private ConnectionManager _connectionManager; private final ConnectionManager _connectionManager;
private ByteCache _cache = ByteCache.getInstance(64, 36*1024); private final ByteCache _cache = ByteCache.getInstance(64, 36*1024);
public PacketQueue(I2PAppContext context, I2PSession session, ConnectionManager mgr) { public PacketQueue(I2PAppContext context, I2PSession session, ConnectionManager mgr) {
_context = context; _context = context;
@@ -98,7 +98,7 @@ class PacketQueue {
// I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); // I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
// I2PSessionMuxedImpl no tags // I2PSessionMuxedImpl no tags
sent = _session.sendMessage(packet.getTo(), buf, 0, size, null, null, expires, sent = _session.sendMessage(packet.getTo(), buf, 0, size, null, null, expires,
I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); I2PSession.PROTO_STREAMING, packet.getLocalPort(), packet.getRemotePort());
else else
// I2PSessionImpl2 // I2PSessionImpl2
//sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent, 0); //sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent, 0);
@@ -107,7 +107,7 @@ class PacketQueue {
// I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); // I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
// I2PSessionMuxedImpl no tags // I2PSessionMuxedImpl no tags
sent = _session.sendMessage(packet.getTo(), buf, 0, size, null, null, sent = _session.sendMessage(packet.getTo(), buf, 0, size, null, null,
I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); I2PSession.PROTO_STREAMING, packet.getLocalPort(), packet.getRemotePort());
end = _context.clock().now(); end = _context.clock().now();
if ( (end-begin > 1000) && (_log.shouldLog(Log.WARN)) ) if ( (end-begin > 1000) && (_log.shouldLog(Log.WARN)) )

View File

@@ -107,11 +107,11 @@ class StandardSocket extends Socket {
} }
/** /**
* @return -1 always * @return the port or 0 if unknown
*/ */
@Override @Override
public int getLocalPort() { public int getLocalPort() {
return -1; return _socket.getLocalPort();
} }
/** /**
@@ -139,11 +139,11 @@ class StandardSocket extends Socket {
} }
/** /**
* @return 0 always * @return the port or 0 if unknown
*/ */
@Override @Override
public int getPort() { public int getPort() {
return 0; return _socket.getPort();
} }
@Override @Override

View File

@@ -21,10 +21,10 @@ import net.i2p.util.SimpleTimer2;
* *
*/ */
class TCBShare { class TCBShare {
private I2PAppContext _context; private final I2PAppContext _context;
private Log _log; private final Log _log;
private Map<Destination, Entry> _cache; private final Map<Destination, Entry> _cache;
private CleanEvent _cleaner; private final CleanEvent _cleaner;
private static final long EXPIRE_TIME = 30*60*1000; private static final long EXPIRE_TIME = 30*60*1000;
private static final long CLEAN_TIME = 10*60*1000; private static final long CLEAN_TIME = 10*60*1000;