* synchronize around the buffer used in the packet queue (duh)

* dont increment # unacked packets artificially
* dont try to push data after closing
* cleanup the packet serialization
* logging
This commit is contained in:
jrandom
2004-11-08 21:40:25 +00:00
committed by zzz
parent 53f3802a81
commit 16715aa309
7 changed files with 68 additions and 53 deletions

View File

@ -54,11 +54,11 @@ public class ConnectionPacketHandler {
//con.getOptions().setWindowSize(con.getOptions().getWindowSize()/2);
if (_log.shouldLog(Log.WARN))
_log.warn("congestion.. dup " + packet);
con.incrementUnackedPacketsReceived();
//con.incrementUnackedPacketsReceived();
con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
} else {
if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) {
con.incrementUnackedPacketsReceived();
//con.incrementUnackedPacketsReceived();
con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
} else {
if (_log.shouldLog(Log.DEBUG))

View File

@ -273,12 +273,12 @@ public class Packet {
*/
private int writePacket(byte buffer[], int offset, boolean includeSig) throws IllegalStateException {
int cur = offset;
if (_sendStreamId != null)
if ( (_sendStreamId != null) && (_sendStreamId.length == 4) )
System.arraycopy(_sendStreamId, 0, buffer, cur, _sendStreamId.length);
else
System.arraycopy(STREAM_ID_UNKNOWN, 0, buffer, cur, STREAM_ID_UNKNOWN.length);
cur += 4;
if (_receiveStreamId != null)
if ( (_receiveStreamId != null) && (_receiveStreamId.length == 4) )
System.arraycopy(_receiveStreamId, 0, buffer, cur, _receiveStreamId.length);
else
System.arraycopy(STREAM_ID_UNKNOWN, 0, buffer, cur, STREAM_ID_UNKNOWN.length);
@ -354,8 +354,8 @@ public class Packet {
*/
public int writtenSize() throws IllegalStateException {
int size = 0;
size += _sendStreamId.length;
size += _receiveStreamId.length;
size += 4; // _sendStreamId.length;
size += 4; // _receiveStreamId.length;
size += 4; // sequenceNum
size += 4; // ackThrough
if (_nacks != null) {
@ -426,12 +426,13 @@ public class Packet {
int optionSize = (int)DataHelper.fromLong(buffer, cur, 2);
cur += 2;
int payloadBegin = cur + optionSize;
int payloadSize = length - payloadBegin;
if ( (payloadSize < 0) || (payloadSize > MAX_PAYLOAD_SIZE) )
throw new IllegalArgumentException("length: " + length + " offset: " + offset + " begin: " + payloadBegin);
// skip ahead to the payload
_payload = new byte[offset + length - payloadBegin];
if (_payload.length > MAX_PAYLOAD_SIZE)
throw new IllegalArgumentException("length: " + length + " offset: " + offset + " begin: " + payloadBegin);
System.arraycopy(buffer, payloadBegin, _payload, 0, _payload.length);
_payload = new byte[payloadSize];
System.arraycopy(buffer, payloadBegin, _payload, 0, payloadSize);
// ok now lets go back and deal with the options
if (isFlagSet(FLAG_DELAY_REQUESTED)) {
@ -480,7 +481,7 @@ public class Packet {
}
boolean ok = ctx.dsa().verifySignature(_optionSignature, buffer, 0, size, from.getSigningPublicKey());
if (!ok) {
ctx.logManager().getLog(Packet.class).error("Signature failed with sig " + Base64.encode(_optionSignature.getData()), new Exception("moo"));
ctx.logManager().getLog(Packet.class).error("Signature failed on " + toString(), new Exception("moo"));
}
return ok;
}

View File

@ -173,7 +173,7 @@ public class PacketHandler {
}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received on an unknown stream (and not a SYN): " + packet);
_log.debug("Packet received on an unknown stream (and not an ECHO): " + packet);
if (sendId == null) {
for (Iterator iter = _manager.listConnections().iterator(); iter.hasNext(); ) {
Connection con = (Connection)iter.next();

View File

@ -56,7 +56,8 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
public boolean shouldSign() {
return isFlagSet(FLAG_SIGNATURE_INCLUDED) ||
isFlagSet(FLAG_SYNCHRONIZE) ||
isFlagSet(FLAG_CLOSE);
isFlagSet(FLAG_CLOSE) ||
isFlagSet(FLAG_ECHO);
}
/** last minute update of ack fields, just before write/sign */

View File

@ -1,5 +1,6 @@
package net.i2p.client.streaming;
import java.util.Arrays;
import java.util.Set;
import java.util.HashSet;
@ -30,11 +31,6 @@ class PacketQueue {
*/
public void enqueue(PacketLocal packet) {
packet.prepare();
int size = 0;
if (packet.shouldSign())
size = packet.writeSignedPacket(_buf, 0, _context, _session.getPrivateKey());
else
size = packet.writePacket(_buf, 0);
SessionKey keyUsed = packet.getKeyUsed();
if (keyUsed == null)
@ -42,42 +38,57 @@ class PacketQueue {
Set tagsSent = packet.getTagsSent();
if (tagsSent == null)
tagsSent = new HashSet();
// cache this from before sendMessage
String conStr = (packet.getConnection() != null ? packet.getConnection().toString() : "");
if (packet.getAckTime() > 0) {
_log.debug("Not resending " + packet);
return;
} else {
_log.debug("Sending... " + packet);
}
long begin = 0;
long end = 0;
boolean sent = false;
try {
// cache this from before sendMessage
String conStr = (packet.getConnection() != null ? packet.getConnection().toString() : "");
if (packet.getAckTime() > 0) {
_log.debug("Not resending " + packet);
return;
} else {
_log.debug("Sending... " + packet);
}
// this should not block!
long begin = _context.clock().now();
boolean sent = _session.sendMessage(packet.getTo(), _buf, 0, size, keyUsed, tagsSent);
long end = _context.clock().now();
if (!sent) {
if (_log.shouldLog(Log.WARN))
_log.warn("Send failed for " + packet);
packet.getConnection().disconnect(false);
} else {
packet.setKeyUsed(keyUsed);
packet.setTagsSent(tagsSent);
packet.incrementSends();
if (_log.shouldLog(Log.DEBUG)) {
String msg = "SEND " + packet + (tagsSent.size() > 0
? " with " + tagsSent.size() + " tags"
: "")
+ " send # " + packet.getNumSends()
+ " sendTime: " + (end-begin)
+ " con: " + conStr;
_log.debug(msg);
}
PacketHandler.displayPacket(packet, "SEND");
synchronized (this) {
Arrays.fill(_buf, (byte)0x0);
int size = 0;
if (packet.shouldSign())
size = packet.writeSignedPacket(_buf, 0, _context, _session.getPrivateKey());
else
size = packet.writePacket(_buf, 0);
// this should not block!
begin = _context.clock().now();
sent = _session.sendMessage(packet.getTo(), _buf, 0, size, keyUsed, tagsSent);
end = _context.clock().now();
}
} catch (I2PSessionException ise) {
if (_log.shouldLog(Log.WARN))
_log.warn("Unable to send the packet " + packet, ise);
}
if (!sent) {
if (_log.shouldLog(Log.WARN))
_log.warn("Send failed for " + packet);
packet.getConnection().disconnect(false);
} else {
packet.setKeyUsed(keyUsed);
packet.setTagsSent(tagsSent);
packet.incrementSends();
if (_log.shouldLog(Log.DEBUG)) {
String msg = "SEND " + packet + (tagsSent.size() > 0
? " with " + tagsSent.size() + " tags"
: "")
+ " send # " + packet.getNumSends()
+ " sendTime: " + (end-begin)
+ " con: " + conStr;
_log.debug(msg);
}
PacketHandler.displayPacket(packet, "SEND");
}
}
}

View File

@ -46,8 +46,10 @@ class SchedulerClosing extends SchedulerImpl {
con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
long remaining = con.getNextSendTime() - _context.clock().now();
if (remaining <= 0) {
con.sendAvailable();
con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
if (con.getCloseSentOn() <= 0) {
con.sendAvailable();
con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
}
} else {
//if (remaining < 5*1000)
// remaining = 5*1000;

View File

@ -43,9 +43,9 @@ class SchedulerConnectedBulk extends SchedulerImpl {
(!con.getResetReceived()) &&
( (con.getCloseSentOn() <= 0) || (con.getCloseReceivedOn() <= 0) );
if (!ok) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("con: " + con + " closeSentOn: " + con.getCloseSentOn()
+ " closeReceivedOn: " + con.getCloseReceivedOn());
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("con: " + con + " closeSentOn: " + con.getCloseSentOn()
// + " closeReceivedOn: " + con.getCloseReceivedOn());
}
return ok;
}