Store the session in Packet, so we may more easily and efficiently

handle multisession, especially on the incoming side.
More refactoring to follow
This commit is contained in:
zzz
2015-06-08 22:18:14 +00:00
parent cbc2f899a6
commit fef65c996f
9 changed files with 58 additions and 30 deletions

View File

@@ -310,13 +310,13 @@ class Connection {
// Unconditionally set // Unconditionally set
_resetSentOn.set(now); _resetSentOn.set(now);
if ( (_remotePeer == null) || (_sendStreamId.get() <= 0) ) return; if ( (_remotePeer == null) || (_sendStreamId.get() <= 0) ) return;
PacketLocal reply = new PacketLocal(_context, _remotePeer); PacketLocal reply = new PacketLocal(_context, _remotePeer, this);
reply.setFlag(Packet.FLAG_RESET); reply.setFlag(Packet.FLAG_RESET);
reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
reply.setSendStreamId(_sendStreamId.get()); reply.setSendStreamId(_sendStreamId.get());
reply.setReceiveStreamId(_receiveStreamId.get()); reply.setReceiveStreamId(_receiveStreamId.get());
// TODO remove this someday, as of 0.9.20 we do not require it // TODO remove this someday, as of 0.9.20 we do not require it
reply.setOptionalFrom(_connectionManager.getSession().getMyDestination()); reply.setOptionalFrom();
reply.setLocalPort(_localPort); reply.setLocalPort(_localPort);
reply.setRemotePort(_remotePort); reply.setRemotePort(_remotePort);
// this just sends the packet - no retries or whatnot // this just sends the packet - no retries or whatnot

View File

@@ -204,7 +204,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
//if ( (!ackOnly) && (packet.getSequenceNum() <= 0) ) { //if ( (!ackOnly) && (packet.getSequenceNum() <= 0) ) {
if (isFirst) { if (isFirst) {
packet.setFlag(Packet.FLAG_SYNCHRONIZE); packet.setFlag(Packet.FLAG_SYNCHRONIZE);
packet.setOptionalFrom(con.getSession().getMyDestination()); packet.setOptionalFrom();
packet.setOptionalMaxSize(con.getOptions().getMaxMessageSize()); packet.setOptionalMaxSize(con.getOptions().getMaxMessageSize());
} }
packet.setLocalPort(con.getLocalPort()); packet.setLocalPort(con.getLocalPort());

View File

@@ -243,14 +243,14 @@ class ConnectionHandler {
_log.warn("Received a spoofed SYN packet: they said they were " + packet.getOptionalFrom()); _log.warn("Received a spoofed SYN packet: they said they were " + packet.getOptionalFrom());
return; return;
} }
PacketLocal reply = new PacketLocal(_context, packet.getOptionalFrom()); PacketLocal reply = new PacketLocal(_context, packet.getOptionalFrom(), packet.getSession());
reply.setFlag(Packet.FLAG_RESET); reply.setFlag(Packet.FLAG_RESET);
reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
reply.setAckThrough(packet.getSequenceNum()); reply.setAckThrough(packet.getSequenceNum());
reply.setSendStreamId(packet.getReceiveStreamId()); reply.setSendStreamId(packet.getReceiveStreamId());
reply.setReceiveStreamId(0); reply.setReceiveStreamId(0);
// TODO remove this someday, as of 0.9.20 we do not require it // TODO remove this someday, as of 0.9.20 we do not require it
reply.setOptionalFrom(_manager.getSession().getMyDestination()); reply.setOptionalFrom();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending RST: " + reply + " because of " + packet); _log.debug("Sending RST: " + reply + " because of " + packet);
// this just sends the packet - no retries or whatnot // this just sends the packet - no retries or whatnot
@@ -292,6 +292,7 @@ class ConnectionHandler {
public static final int POISON_MAX_DELAY_REQUEST = Packet.MAX_DELAY_REQUEST + 1; public static final int POISON_MAX_DELAY_REQUEST = Packet.MAX_DELAY_REQUEST + 1;
public PoisonPacket() { public PoisonPacket() {
super(null);
setOptionalDelay(POISON_MAX_DELAY_REQUEST); setOptionalDelay(POISON_MAX_DELAY_REQUEST);
} }
} }

View File

@@ -270,13 +270,13 @@ class ConnectionManager {
} }
} }
} }
PacketLocal reply = new PacketLocal(_context, from); PacketLocal reply = new PacketLocal(_context, from, synPacket.getSession());
reply.setFlag(Packet.FLAG_RESET); reply.setFlag(Packet.FLAG_RESET);
reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
reply.setAckThrough(synPacket.getSequenceNum()); reply.setAckThrough(synPacket.getSequenceNum());
reply.setSendStreamId(synPacket.getReceiveStreamId()); reply.setSendStreamId(synPacket.getReceiveStreamId());
reply.setReceiveStreamId(0); reply.setReceiveStreamId(0);
reply.setOptionalFrom(_session.getMyDestination()); reply.setOptionalFrom();
reply.setLocalPort(synPacket.getLocalPort()); reply.setLocalPort(synPacket.getLocalPort());
reply.setRemotePort(synPacket.getRemotePort()); reply.setRemotePort(synPacket.getRemotePort());
// this just sends the packet - no retries or whatnot // this just sends the packet - no retries or whatnot
@@ -329,7 +329,7 @@ class ConnectionManager {
return false; return false;
} }
} }
PacketLocal pong = new PacketLocal(_context, dest); PacketLocal pong = new PacketLocal(_context, dest, ping.getSession());
pong.setFlag(Packet.FLAG_ECHO | Packet.FLAG_NO_ACK); pong.setFlag(Packet.FLAG_ECHO | Packet.FLAG_NO_ACK);
pong.setReceiveStreamId(ping.getSendStreamId()); pong.setReceiveStreamId(ping.getSendStreamId());
pong.setLocalPort(ping.getLocalPort()); pong.setLocalPort(ping.getLocalPort());
@@ -734,12 +734,12 @@ class ConnectionManager {
boolean blocking, PingNotifier notifier) { boolean blocking, PingNotifier notifier) {
PingRequest req = new PingRequest(notifier); PingRequest req = new PingRequest(notifier);
long id = assignPingId(req); long id = assignPingId(req);
PacketLocal packet = new PacketLocal(_context, peer); PacketLocal packet = new PacketLocal(_context, peer, _session);
packet.setSendStreamId(id); packet.setSendStreamId(id);
packet.setFlag(Packet.FLAG_ECHO | packet.setFlag(Packet.FLAG_ECHO |
Packet.FLAG_NO_ACK | Packet.FLAG_NO_ACK |
Packet.FLAG_SIGNATURE_INCLUDED); Packet.FLAG_SIGNATURE_INCLUDED);
packet.setOptionalFrom(_session.getMyDestination()); packet.setOptionalFrom();
packet.setLocalPort(fromPort); packet.setLocalPort(fromPort);
packet.setRemotePort(toPort); packet.setRemotePort(toPort);
if (timeoutMs > MAX_PING_TIMEOUT) if (timeoutMs > MAX_PING_TIMEOUT)
@@ -780,12 +780,12 @@ class ConnectionManager {
byte[] payload) { byte[] payload) {
PingRequest req = new PingRequest(null); PingRequest req = new PingRequest(null);
long id = assignPingId(req); long id = assignPingId(req);
PacketLocal packet = new PacketLocal(_context, peer); PacketLocal packet = new PacketLocal(_context, peer, _session);
packet.setSendStreamId(id); packet.setSendStreamId(id);
packet.setFlag(Packet.FLAG_ECHO | packet.setFlag(Packet.FLAG_ECHO |
Packet.FLAG_NO_ACK | Packet.FLAG_NO_ACK |
Packet.FLAG_SIGNATURE_INCLUDED); Packet.FLAG_SIGNATURE_INCLUDED);
packet.setOptionalFrom(_session.getMyDestination()); packet.setOptionalFrom();
packet.setLocalPort(fromPort); packet.setLocalPort(fromPort);
packet.setRemotePort(toPort); packet.setRemotePort(toPort);
packet.setPayload(new ByteArray(payload)); packet.setPayload(new ByteArray(payload));

View File

@@ -70,7 +70,7 @@ class MessageHandler implements I2PSessionMuxedListener {
" (" + _manager + ')' + " (" + _manager + ')' +
" proto: " + proto + " proto: " + proto +
" fromPort: " + fromPort + " toPort: " + toPort); " fromPort: " + fromPort + " toPort: " + toPort);
Packet packet = new Packet(); Packet packet = new Packet(session);
try { try {
packet.readPacket(data, 0, data.length); packet.readPacket(data, 0, data.length);
packet.setRemotePort(fromPort); packet.setRemotePort(fromPort);

View File

@@ -5,6 +5,7 @@ import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.client.I2PSession;
import net.i2p.crypto.SigType; import net.i2p.crypto.SigType;
import net.i2p.data.Base64; import net.i2p.data.Base64;
import net.i2p.data.ByteArray; import net.i2p.data.ByteArray;
@@ -67,6 +68,7 @@ import net.i2p.util.Log;
* *
*/ */
class Packet { class Packet {
protected final I2PSession _session;
private long _sendStreamId; private long _sendStreamId;
private long _receiveStreamId; private long _receiveStreamId;
private long _sequenceNum; private long _sequenceNum;
@@ -167,8 +169,15 @@ class Packet {
* Does no initialization. * Does no initialization.
* See readPacket() for inbound packets, and the setters for outbound packets. * See readPacket() for inbound packets, and the setters for outbound packets.
*/ */
public Packet() { } public Packet(I2PSession session) {
_session = session;
}
/** @since 0.9.21 */
public I2PSession getSession() {
return _session;
}
private boolean _sendStreamIdSet = false; private boolean _sendStreamIdSet = false;
/** what stream do we send data to the peer on? /** what stream do we send data to the peer on?
@@ -334,10 +343,13 @@ class Packet {
*/ */
public Destination getOptionalFrom() { return _optionFrom; } public Destination getOptionalFrom() { return _optionFrom; }
public void setOptionalFrom(Destination from) { /**
setFlag(FLAG_FROM_INCLUDED, from != null); * This sets the from field in the packet to the Destination for the session
if (from == null) throw new RuntimeException("from is null!?"); * provided in the constructor.
_optionFrom = from; */
public void setOptionalFrom() {
setFlag(FLAG_FROM_INCLUDED, true);
_optionFrom = _session.getMyDestination();
} }
/** /**
@@ -508,10 +520,10 @@ class Packet {
return cur - offset; return cur - offset;
} }
/** /**
* how large would this packet be if we wrote it * how large would this packet be if we wrote it
* @return How large the current packet would be * @return How large the current packet would be
*
* @throws IllegalStateException * @throws IllegalStateException
*/ */
private int writtenSize() { private int writtenSize() {
@@ -546,6 +558,8 @@ class Packet {
return size; return size;
} }
/** /**
* Read the packet from the buffer (starting at the offset) and return * Read the packet from the buffer (starting at the offset) and return
* the number of bytes read. * the number of bytes read.
@@ -619,7 +633,7 @@ class Packet {
try { try {
Destination optionFrom = Destination.create(bais); Destination optionFrom = Destination.create(bais);
cur += optionFrom.size(); cur += optionFrom.size();
setOptionalFrom(optionFrom); _optionFrom = optionFrom;
} catch (IOException ioe) { } catch (IOException ioe) {
throw new IllegalArgumentException("Bad from field", ioe); throw new IllegalArgumentException("Bad from field", ioe);
} catch (DataFormatException dfe) { } catch (DataFormatException dfe) {

View File

@@ -231,6 +231,8 @@ class PacketHandler {
* This sends a reset back to the place this packet came from. * This sends a reset back to the place this packet came from.
* If the packet has no 'optional from' or valid signature, this does nothing. * If the packet has no 'optional from' or valid signature, this does nothing.
* This is not associated with a connection, so no con stats are updated. * This is not associated with a connection, so no con stats are updated.
*
* @param packet incoming packet to be replied to
*/ */
private void sendReset(Packet packet) { private void sendReset(Packet packet) {
Destination from = packet.getOptionalFrom(); Destination from = packet.getOptionalFrom();
@@ -242,13 +244,13 @@ class PacketHandler {
_log.warn("Can't send reset after recv spoofed packet: " + packet); _log.warn("Can't send reset after recv spoofed packet: " + packet);
return; return;
} }
PacketLocal reply = new PacketLocal(_context, from); PacketLocal reply = new PacketLocal(_context, from, packet.getSession());
reply.setFlag(Packet.FLAG_RESET); reply.setFlag(Packet.FLAG_RESET);
reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
reply.setSendStreamId(packet.getReceiveStreamId()); reply.setSendStreamId(packet.getReceiveStreamId());
reply.setReceiveStreamId(packet.getSendStreamId()); reply.setReceiveStreamId(packet.getSendStreamId());
// TODO remove this someday, as of 0.9.20 we do not require it // TODO remove this someday, as of 0.9.20 we do not require it
reply.setOptionalFrom(_manager.getSession().getMyDestination()); reply.setOptionalFrom();
reply.setLocalPort(packet.getLocalPort()); reply.setLocalPort(packet.getLocalPort());
reply.setRemotePort(packet.getRemotePort()); reply.setRemotePort(packet.getRemotePort());
// this just sends the packet - no retries or whatnot // this just sends the packet - no retries or whatnot

View File

@@ -6,6 +6,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.client.I2PSession;
import net.i2p.data.Destination; import net.i2p.data.Destination;
import net.i2p.data.SessionKey; import net.i2p.data.SessionKey;
import net.i2p.data.SessionTag; import net.i2p.data.SessionTag;
@@ -37,11 +38,20 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
private volatile SimpleTimer2.TimedEvent _resendEvent; private volatile SimpleTimer2.TimedEvent _resendEvent;
/** not bound to a connection */ /** not bound to a connection */
public PacketLocal(I2PAppContext ctx, Destination to) { public PacketLocal(I2PAppContext ctx, Destination to, I2PSession session) {
this(ctx, to, null); super(session);
_context = ctx;
_createdOn = ctx.clock().now();
_log = ctx.logManager().getLog(PacketLocal.class);
_to = to;
_connection = null;
_lastSend = -1;
_cancelledOn = -1;
} }
/** bound to a connection */
public PacketLocal(I2PAppContext ctx, Destination to, Connection con) { public PacketLocal(I2PAppContext ctx, Destination to, Connection con) {
super(con.getSession());
_context = ctx; _context = ctx;
_createdOn = ctx.clock().now(); _createdOn = ctx.clock().now();
_log = ctx.logManager().getLog(PacketLocal.class); _log = ctx.logManager().getLog(PacketLocal.class);
@@ -203,10 +213,11 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
* @throws IllegalStateException if there is data missing or otherwise b0rked * @throws IllegalStateException if there is data missing or otherwise b0rked
* @since 0.9.20 moved from Packet * @since 0.9.20 moved from Packet
*/ */
public int writeSignedPacket(byte buffer[], int offset, I2PAppContext ctx, SigningPrivateKey key) throws IllegalStateException { public int writeSignedPacket(byte buffer[], int offset) throws IllegalStateException {
setFlag(FLAG_SIGNATURE_INCLUDED); setFlag(FLAG_SIGNATURE_INCLUDED);
SigningPrivateKey key = _session.getPrivateKey();
int size = writePacket(buffer, offset, key.getType().getSigLen()); int size = writePacket(buffer, offset, key.getType().getSigLen());
_optionSignature = ctx.dsa().sign(buffer, offset, size, key); _optionSignature = _context.dsa().sign(buffer, offset, size, key);
//if (false) { //if (false) {
// Log l = ctx.logManager().getLog(Packet.class); // Log l = ctx.logManager().getLog(Packet.class);
// l.error("Signing: " + toString()); // l.error("Signing: " + toString());
@@ -258,7 +269,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
Connection con = _connection; Connection con = _connection;
if (con != null) { if (con != null) {
buf.append(" from "); buf.append(" from ");
Destination local = con.getSession().getMyDestination(); Destination local = _session.getMyDestination();
if (local != null) if (local != null)
buf.append(local.calculateHash().toBase64().substring(0,4)); buf.append(local.calculateHash().toBase64().substring(0,4));
else else
@@ -275,7 +286,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
} }
return buf; return buf;
} }
////// begin WriteStatus methods ////// begin WriteStatus methods
/** /**

View File

@@ -99,7 +99,7 @@ class PacketQueue implements SendMessageStatusListener {
int size = 0; int size = 0;
long beforeWrite = System.currentTimeMillis(); long beforeWrite = System.currentTimeMillis();
if (packet.shouldSign()) if (packet.shouldSign())
size = packet.writeSignedPacket(buf, 0, _context, _session.getPrivateKey()); size = packet.writeSignedPacket(buf, 0);
else else
size = packet.writePacket(buf, 0); size = packet.writePacket(buf, 0);
long writeTime = System.currentTimeMillis() - beforeWrite; long writeTime = System.currentTimeMillis() - beforeWrite;
@@ -197,7 +197,7 @@ class PacketQueue implements SendMessageStatusListener {
//packet.setTagsSent(tagsSent); //packet.setTagsSent(tagsSent);
packet.incrementSends(); packet.incrementSends();
Connection c = packet.getConnection(); Connection c = packet.getConnection();
if (c != null) { if (c != null && _log.shouldDebug()) {
String suffix = "wsize " + c.getOptions().getWindowSize() + " rto " + c.getOptions().getRTO(); String suffix = "wsize " + c.getOptions().getWindowSize() + " rto " + c.getOptions().getRTO();
c.getConnectionManager().getPacketHandler().displayPacket(packet, "SEND", suffix); c.getConnectionManager().getPacketHandler().displayPacket(packet, "SEND", suffix);
} }