2005-09-26 jrandom

* Reworded the SSU introductions config section (thanks duck!)
    * Force identity content encoding for I2PTunnel httpserver requests
      (thanks redzara!)
    * Further x-i2p-gzip bugfixes for the end of streams
    * Reduce the minimum bandwidth limits to 3KBps steady and burst (though
      I2P's performance at 3KBps is another issue)
    * Cleaned up some streaming lib structures
This commit is contained in:
jrandom
2005-09-26 23:45:52 +00:00
committed by zzz
parent 56ecdcce82
commit aef33548b3
20 changed files with 193 additions and 179 deletions

View File

@@ -24,8 +24,8 @@ public class Connection {
private Log _log;
private ConnectionManager _connectionManager;
private Destination _remotePeer;
private byte _sendStreamId[];
private byte _receiveStreamId[];
private long _sendStreamId;
private long _receiveStreamId;
private long _lastSendTime;
private long _lastSendId;
private boolean _resetReceived;
@@ -205,7 +205,7 @@ public class Connection {
_resetSent = true;
if (_resetSentOn <= 0)
_resetSentOn = _context.clock().now();
if ( (_remotePeer == null) || (_sendStreamId == null) ) return;
if ( (_remotePeer == null) || (_sendStreamId <= 0) ) return;
PacketLocal reply = new PacketLocal(_context, _remotePeer);
reply.setFlag(Packet.FLAG_RESET);
reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
@@ -521,12 +521,12 @@ public class Connection {
public void setRemotePeer(Destination peer) { _remotePeer = peer; }
/** what stream do we send data to the peer on? */
public byte[] getSendStreamId() { return _sendStreamId; }
public void setSendStreamId(byte[] id) { _sendStreamId = id; }
public long getSendStreamId() { return _sendStreamId; }
public void setSendStreamId(long id) { _sendStreamId = id; }
/** stream the peer sends data to us on. (may be null) */
public byte[] getReceiveStreamId() { return _receiveStreamId; }
public void setReceiveStreamId(byte[] id) {
public long getReceiveStreamId() { return _receiveStreamId; }
public void setReceiveStreamId(long id) {
_receiveStreamId = id;
synchronized (_connectLock) { _connectLock.notifyAll(); }
}
@@ -653,7 +653,7 @@ public class Connection {
void waitForConnect() {
long expiration = _context.clock().now() + _options.getConnectTimeout();
while (true) {
if (_connected && (_receiveStreamId != null) && (_sendStreamId != null) ) {
if (_connected && (_receiveStreamId > 0) && (_sendStreamId > 0) ) {
// w00t
if (_log.shouldLog(Log.DEBUG))
_log.debug("waitForConnect(): Connected and we have stream IDs");
@@ -793,13 +793,13 @@ public class Connection {
public String toString() {
StringBuffer buf = new StringBuffer(128);
buf.append("[Connection ");
if (_receiveStreamId != null)
buf.append(Base64.encode(_receiveStreamId));
if (_receiveStreamId > 0)
buf.append(Packet.toId(_receiveStreamId));
else
buf.append("unknown");
buf.append("<-->");
if (_sendStreamId != null)
buf.append(Base64.encode(_sendStreamId));
if (_sendStreamId > 0)
buf.append(Packet.toId(_sendStreamId));
else
buf.append("unknown");
buf.append(" wsize: ").append(_options.getWindowSize());

View File

@@ -127,7 +127,7 @@ class ConnectionHandler {
reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
reply.setAckThrough(packet.getSequenceNum());
reply.setSendStreamId(packet.getReceiveStreamId());
reply.setReceiveStreamId(null);
reply.setReceiveStreamId(0);
reply.setOptionalFrom(_manager.getSession().getMyDestination());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending RST: " + reply + " because of " + packet);

View File

@@ -31,9 +31,9 @@ public class ConnectionManager {
private PacketQueue _outboundQueue;
private SchedulerChooser _schedulerChooser;
private ConnectionPacketHandler _conPacketHandler;
/** Inbound stream ID (ByteArray) to Connection map */
/** Inbound stream ID (Long) to Connection map */
private Map _connectionByInboundId;
/** Ping ID (ByteArray) to PingRequest */
/** Ping ID (Long) to PingRequest */
private Map _pendingPings;
private boolean _allowIncoming;
private int _maxConcurrentStreams;
@@ -71,16 +71,16 @@ public class ConnectionManager {
_context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
}
Connection getConnectionByInboundId(byte[] id) {
Connection getConnectionByInboundId(long id) {
synchronized (_connectionLock) {
return (Connection)_connectionByInboundId.get(new ByteArray(id));
return (Connection)_connectionByInboundId.get(new Long(id));
}
}
/**
* not guaranteed to be unique, but in case we receive more than one packet
* on an inbound connection that we havent ack'ed yet...
*/
Connection getConnectionByOutboundId(byte[] id) {
Connection getConnectionByOutboundId(long id) {
synchronized (_connectionLock) {
for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
Connection con = (Connection)iter.next();
@@ -107,8 +107,7 @@ public class ConnectionManager {
*/
public Connection receiveConnection(Packet synPacket) {
Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions));
byte receiveId[] = new byte[4];
_context.random().nextBytes(receiveId);
long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
boolean reject = false;
int active = 0;
int total = 0;
@@ -122,16 +121,13 @@ public class ConnectionManager {
reject = true;
} else {
while (true) {
ByteArray ba = new ByteArray(receiveId);
Connection oldCon = (Connection)_connectionByInboundId.put(ba, con);
Connection oldCon = (Connection)_connectionByInboundId.put(new Long(receiveId), con);
if (oldCon == null) {
break;
} else {
_connectionByInboundId.put(ba, oldCon);
_connectionByInboundId.put(new Long(receiveId), oldCon);
// receiveId already taken, try another
// (need to realloc receiveId, as ba.getData() points to the old value)
receiveId = new byte[4];
_context.random().nextBytes(receiveId);
receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
}
}
}
@@ -148,7 +144,7 @@ public class ConnectionManager {
reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
reply.setAckThrough(synPacket.getSequenceNum());
reply.setSendStreamId(synPacket.getReceiveStreamId());
reply.setReceiveStreamId(null);
reply.setReceiveStreamId(0);
reply.setOptionalFrom(_session.getMyDestination());
// this just sends the packet - no retries or whatnot
_outboundQueue.enqueue(reply);
@@ -160,7 +156,7 @@ public class ConnectionManager {
con.getPacketHandler().receivePacket(synPacket, con);
} catch (I2PException ie) {
synchronized (_connectionLock) {
_connectionByInboundId.remove(new ByteArray(receiveId));
_connectionByInboundId.remove(new Long(receiveId));
}
return null;
}
@@ -179,8 +175,7 @@ public class ConnectionManager {
*/
public Connection connect(Destination peer, ConnectionOptions opts) {
Connection con = null;
byte receiveId[] = new byte[4];
_context.random().nextBytes(receiveId);
long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
long expiration = _context.clock().now() + opts.getConnectTimeout();
if (opts.getConnectTimeout() <= 0)
expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX;
@@ -213,11 +208,10 @@ public class ConnectionManager {
con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts);
con.setRemotePeer(peer);
ByteArray ba = new ByteArray(receiveId);
while (_connectionByInboundId.containsKey(ba)) {
_context.random().nextBytes(receiveId);
while (_connectionByInboundId.containsKey(new Long(receiveId))) {
receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
}
_connectionByInboundId.put(ba, con);
_connectionByInboundId.put(new Long(receiveId), con);
break; // stop looping as a psuedo-wait
}
}
@@ -284,7 +278,7 @@ public class ConnectionManager {
public void removeConnection(Connection con) {
boolean removed = false;
synchronized (_connectionLock) {
Object o = _connectionByInboundId.remove(new ByteArray(con.getReceiveStreamId()));
Object o = _connectionByInboundId.remove(new Long(con.getReceiveStreamId()));
removed = (o == con);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Connection removed? " + removed + " remaining: "
@@ -320,11 +314,9 @@ public class ConnectionManager {
return ping(peer, timeoutMs, blocking, null, null, null);
}
public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) {
byte id[] = new byte[4];
_context.random().nextBytes(id);
ByteArray ba = new ByteArray(id);
Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1);
PacketLocal packet = new PacketLocal(_context, peer);
packet.setSendStreamId(id);
packet.setSendStreamId(id.longValue());
packet.setFlag(Packet.FLAG_ECHO);
packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
packet.setOptionalFrom(_session.getMyDestination());
@@ -336,7 +328,7 @@ public class ConnectionManager {
PingRequest req = new PingRequest(peer, packet, notifier);
synchronized (_pendingPings) {
_pendingPings.put(ba, req);
_pendingPings.put(id, req);
}
_outboundQueue.enqueue(packet);
@@ -349,10 +341,10 @@ public class ConnectionManager {
}
synchronized (_pendingPings) {
_pendingPings.remove(ba);
_pendingPings.remove(id);
}
} else {
SimpleTimer.getInstance().addEvent(new PingFailed(ba, notifier), timeoutMs);
SimpleTimer.getInstance().addEvent(new PingFailed(id, notifier), timeoutMs);
}
boolean ok = req.pongReceived();
@@ -364,17 +356,17 @@ public class ConnectionManager {
}
private class PingFailed implements SimpleTimer.TimedEvent {
private ByteArray _ba;
private Long _id;
private PingNotifier _notifier;
public PingFailed(ByteArray ba, PingNotifier notifier) {
_ba = ba;
public PingFailed(Long id, PingNotifier notifier) {
_id = id;
_notifier = notifier;
}
public void timeReached() {
boolean removed = false;
synchronized (_pendingPings) {
Object o = _pendingPings.remove(_ba);
Object o = _pendingPings.remove(_id);
if (o != null)
removed = true;
}
@@ -411,11 +403,10 @@ public class ConnectionManager {
public boolean pongReceived() { return _ponged; }
}
void receivePong(byte pingId[]) {
ByteArray ba = new ByteArray(pingId);
void receivePong(long pingId) {
PingRequest req = null;
synchronized (_pendingPings) {
req = (PingRequest)_pendingPings.remove(ba);
req = (PingRequest)_pendingPings.remove(new Long(pingId));
}
if (req != null)
req.pong();

View File

@@ -96,10 +96,8 @@ public class ConnectionPacketHandler {
boolean allowAck = true;
if ( (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) &&
( (packet.getSendStreamId() == null) ||
(packet.getReceiveStreamId() == null) ||
(DataHelper.eq(packet.getSendStreamId(), Packet.STREAM_ID_UNKNOWN)) ||
(DataHelper.eq(packet.getReceiveStreamId(), Packet.STREAM_ID_UNKNOWN)) ) )
( (packet.getSendStreamId() <= 0) ||
(packet.getReceiveStreamId() <= 0) ) )
allowAck = false;
if (allowAck)
@@ -160,9 +158,7 @@ public class ConnectionPacketHandler {
}
}
if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE) &&
((packet.getSendStreamId() == null) ||
DataHelper.eq(packet.getSendStreamId(), Packet.STREAM_ID_UNKNOWN) ) ) {
if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE) && (packet.getSendStreamId() <= 0) ) {
// don't honor the ACK 0 in SYN packets received when the other side
// has obviously not seen our messages
} else {
@@ -197,8 +193,8 @@ public class ConnectionPacketHandler {
// could actually be acking data (this fixes the buggered up ack of packet 0 problem).
// this is called after packet verification, which places the stream IDs as necessary if
// the SYN verifies (so if we're acking w/out stream IDs, no SYN has been received yet)
if ( (packet != null) && (packet.getSendStreamId() != null) && (packet.getReceiveStreamId() != null) &&
(con != null) && (con.getSendStreamId() != null) && (con.getReceiveStreamId() != null) &&
if ( (packet != null) && (packet.getSendStreamId() > 0) && (packet.getReceiveStreamId() > 0) &&
(con != null) && (con.getSendStreamId() > 0) && (con.getReceiveStreamId() > 0) &&
(!DataHelper.eq(packet.getSendStreamId(), Packet.STREAM_ID_UNKNOWN)) &&
(!DataHelper.eq(packet.getReceiveStreamId(), Packet.STREAM_ID_UNKNOWN)) &&
(!DataHelper.eq(con.getSendStreamId(), Packet.STREAM_ID_UNKNOWN)) &&
@@ -335,7 +331,7 @@ public class ConnectionPacketHandler {
} else {
verifySignature(packet, con);
if (con.getSendStreamId() == null) {
if (con.getSendStreamId() <= 0) {
if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) {
con.setSendStreamId(packet.getReceiveStreamId());
con.setRemotePeer(packet.getOptionalFrom());

View File

@@ -51,8 +51,8 @@ import net.i2p.util.ByteCache;
*
*/
public class Packet {
private byte _sendStreamId[];
private byte _receiveStreamId[];
private long _sendStreamId;
private long _receiveStreamId;
private long _sequenceNum;
private long _ackThrough;
private long _nacks[];
@@ -72,7 +72,9 @@ public class Packet {
* synchronize packet)
*
*/
public static final byte STREAM_ID_UNKNOWN[] = new byte[] { 0x00, 0x00, 0x00, 0x00 };
public static final long STREAM_ID_UNKNOWN = 0l;
public static final long MAX_STREAM_ID = 0xffffffffl;
/**
* This packet is creating a new socket connection (if the receiveStreamId
@@ -149,17 +151,8 @@ public class Packet {
}
/** what stream is this packet a part of? */
public byte[] getSendStreamId() {
if ( (_sendStreamId == null) || (DataHelper.eq(_sendStreamId, STREAM_ID_UNKNOWN)) )
return null;
else
return _sendStreamId;
}
public void setSendStreamId(byte[] id) {
_sendStreamId = id;
if ( (id != null) && (DataHelper.eq(id, STREAM_ID_UNKNOWN)) )
_sendStreamId = null;
}
public long getSendStreamId() { return _sendStreamId; }
public void setSendStreamId(long id) { _sendStreamId = id; }
/**
* Stream that replies should be sent on. if the
@@ -167,17 +160,8 @@ public class Packet {
* null.
*
*/
public byte[] getReceiveStreamId() {
if ( (_receiveStreamId == null) || (DataHelper.eq(_receiveStreamId, STREAM_ID_UNKNOWN)) )
return null;
else
return _receiveStreamId;
}
public void setReceiveStreamId(byte[] id) {
_receiveStreamId = id;
if ( (id != null) && (DataHelper.eq(id, STREAM_ID_UNKNOWN)) )
_receiveStreamId = null;
}
public long getReceiveStreamId() { return _receiveStreamId; }
public void setReceiveStreamId(long id) { _receiveStreamId = id; }
/** 0-indexed sequence number for this Packet in the sendStream */
public long getSequenceNum() { return _sequenceNum; }
@@ -312,15 +296,9 @@ public class Packet {
*/
private int writePacket(byte buffer[], int offset, boolean includeSig) throws IllegalStateException {
int cur = offset;
if ( (_sendStreamId != null) && (_sendStreamId.length == 4) )
System.arraycopy(_sendStreamId, 0, buffer, cur, _sendStreamId.length);
else
System.arraycopy(STREAM_ID_UNKNOWN, 0, buffer, cur, STREAM_ID_UNKNOWN.length);
DataHelper.toLong(buffer, cur, 4, (_sendStreamId >= 0 ? _sendStreamId : STREAM_ID_UNKNOWN));
cur += 4;
if ( (_receiveStreamId != null) && (_receiveStreamId.length == 4) )
System.arraycopy(_receiveStreamId, 0, buffer, cur, _receiveStreamId.length);
else
System.arraycopy(STREAM_ID_UNKNOWN, 0, buffer, cur, STREAM_ID_UNKNOWN.length);
DataHelper.toLong(buffer, cur, 4, (_receiveStreamId >= 0 ? _receiveStreamId : STREAM_ID_UNKNOWN));
cur += 4;
DataHelper.toLong(buffer, cur, 4, _sequenceNum > 0 ? _sequenceNum : 0);
cur += 4;
@@ -398,7 +376,7 @@ public class Packet {
size += 4; // sequenceNum
size += 4; // ackThrough
if (_nacks != null) {
size++; // nacks length
size++; // nacks length
size += 4 * _nacks.length;
} else {
size++; // nacks length
@@ -440,11 +418,9 @@ public class Packet {
if (length < 22) // min header size
throw new IllegalArgumentException("Too small: len=" + buffer.length);
int cur = offset;
_sendStreamId = new byte[4];
System.arraycopy(buffer, cur, _sendStreamId, 0, 4);
_sendStreamId = DataHelper.fromLong(buffer, cur, 4);
cur += 4;
_receiveStreamId = new byte[4];
System.arraycopy(buffer, cur, _receiveStreamId, 0, 4);
_receiveStreamId = DataHelper.fromLong(buffer, cur, 4);
cur += 4;
_sequenceNum = DataHelper.fromLong(buffer, cur, 4);
cur += 4;
@@ -593,11 +569,8 @@ public class Packet {
return buf;
}
static final String toId(byte id[]) {
if (id == null)
return Base64.encode(STREAM_ID_UNKNOWN);
else
return Base64.encode(id);
static final String toId(long id) {
return Base64.encode(DataHelper.toLong(4, id));
}
private final String toFlagString() {

View File

@@ -97,11 +97,9 @@ public class PacketHandler {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("packet received: " + packet);
byte sendId[] = packet.getSendStreamId();
if (!isNonZero(sendId))
sendId = null;
long sendId = packet.getSendStreamId();
Connection con = (sendId != null ? _manager.getConnectionByInboundId(sendId) : null);
Connection con = (sendId > 0 ? _manager.getConnectionByInboundId(sendId) : null);
if (con != null) {
receiveKnownCon(con, packet);
displayPacket(packet, "RECV", "wsize " + con.getOptions().getWindowSize() + " rto " + con.getOptions().getRTO());
@@ -127,9 +125,9 @@ public class PacketHandler {
private void receiveKnownCon(Connection con, Packet packet) {
if (packet.isFlagSet(Packet.FLAG_ECHO)) {
if (packet.getSendStreamId() != null) {
if (packet.getSendStreamId() > 0) {
receivePing(packet);
} else if (packet.getReceiveStreamId() != null) {
} else if (packet.getReceiveStreamId() > 0) {
receivePong(packet);
} else {
if (_log.shouldLog(Log.WARN))
@@ -162,9 +160,9 @@ public class PacketHandler {
_log.warn("Received forged reset for " + con, ie);
}
} else {
if ( (con.getSendStreamId() == null) ||
if ( (con.getSendStreamId() <= 0) ||
(DataHelper.eq(con.getSendStreamId(), packet.getReceiveStreamId())) ) {
byte oldId[] =con.getSendStreamId();
long oldId =con.getSendStreamId();
if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) // con fully established, w00t
con.setSendStreamId(packet.getReceiveStreamId());
@@ -214,11 +212,11 @@ public class PacketHandler {
_manager.getPacketQueue().enqueue(reply);
}
private void receiveUnknownCon(Packet packet, byte sendId[]) {
private void receiveUnknownCon(Packet packet, long sendId) {
if (packet.isFlagSet(Packet.FLAG_ECHO)) {
if (packet.getSendStreamId() != null) {
if (packet.getSendStreamId() > 0) {
receivePing(packet);
} else if (packet.getReceiveStreamId() != null) {
} else if (packet.getReceiveStreamId() > 0) {
receivePong(packet);
} else {
if (_log.shouldLog(Log.WARN))
@@ -228,7 +226,7 @@ public class PacketHandler {
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received on an unknown stream (and not an ECHO): " + packet);
if (sendId == null) {
if (sendId <= 0) {
Connection con = _manager.getConnectionByOutboundId(packet.getReceiveStreamId());
if (con != null) {
if (con.getAckedPackets() <= 0) {
@@ -257,7 +255,7 @@ public class PacketHandler {
}
_log.warn("Packet belongs to no other cons: " + packet + " connections: "
+ buf.toString() + " sendId: "
+ (sendId != null ? Base64.encode(sendId) : " unknown"));
+ (sendId > 0 ? Packet.toId(sendId) : " unknown"));
}
packet.releasePayload();
}
@@ -289,25 +287,7 @@ public class PacketHandler {
_manager.receivePong(packet.getReceiveStreamId());
}
private static final boolean isValidMatch(byte conStreamId[], byte packetStreamId[]) {
if ( (conStreamId == null) || (packetStreamId == null) ||
(conStreamId.length != packetStreamId.length) )
return false;
boolean nonZeroFound = false;
for (int i = 0; i < conStreamId.length; i++) {
if (conStreamId[i] != packetStreamId[i]) return false;
if (conStreamId[i] != 0x0) nonZeroFound = true;
}
return nonZeroFound;
}
private static final boolean isNonZero(byte[] b) {
boolean nonZeroFound = false;
for (int i = 0; b != null && i < b.length; i++) {
if (b[i] != 0x0)
nonZeroFound = true;
}
return nonZeroFound;
private static final boolean isValidMatch(long conStreamId, long packetStreamId) {
return ( (conStreamId == packetStreamId) && (conStreamId != 0) );
}
}

View File

@@ -41,7 +41,7 @@ class SchedulerClosed extends SchedulerImpl {
(!con.getResetReceived()) &&
(timeSinceClose < Connection.DISCONNECT_TIMEOUT);
boolean conTimeout = (con.getOptions().getConnectTimeout() < con.getLifetime()) &&
con.getSendStreamId() == null &&
con.getSendStreamId() <= 0 &&
con.getLifetime() < Connection.DISCONNECT_TIMEOUT;
return (ok || conTimeout);
}

View File

@@ -36,7 +36,7 @@ class SchedulerDead extends SchedulerImpl {
boolean nothingLeftToDo = (con.getDisconnectScheduledOn() > 0) &&
(timeSinceClose >= Connection.DISCONNECT_TIMEOUT);
boolean timedOut = (con.getOptions().getConnectTimeout() < con.getLifetime()) &&
con.getSendStreamId() == null &&
con.getSendStreamId() <= 0 &&
con.getLifetime() >= Connection.DISCONNECT_TIMEOUT;
return nothingLeftToDo || timedOut;
}

View File

@@ -31,7 +31,7 @@ class SchedulerPreconnect extends SchedulerImpl {
public boolean accept(Connection con) {
return (con != null) &&
(con.getSendStreamId() == null) &&
(con.getSendStreamId() <= 0) &&
(con.getLastSendId() < 0);
}

View File

@@ -19,7 +19,7 @@ class SchedulerReceived extends SchedulerImpl {
public boolean accept(Connection con) {
return (con != null) &&
(con.getLastSendId() < 0) &&
(con.getSendStreamId() != null);
(con.getSendStreamId() > 0);
}
public void eventOccurred(Connection con) {