SSU InboundMessageState.PartialBitfield, PacketBuilder:

- Add ACKBitfield.highestReceived() for efficiency
- Only write as many partial bitfield bytes as required,
  rather than 10 (for 64 bits) every time.
- Don't allow more than 10 bytes when reading in bitfield
- Don't send an extra byte if (fragments % 7) == 0
- Don't send a corrupt ack packet if the partial ack got completed (race)
- Log tweaks
This commit is contained in:
zzz
2014-09-14 17:51:29 +00:00
parent 0a41052f3f
commit 67fb4e7007
5 changed files with 85 additions and 23 deletions

View File

@@ -24,4 +24,11 @@ interface ACKBitfield {
* @since 0.9.16
*/
public int ackCount();
/**
* Highest fragment number acked in this bitfield.
* @return highest fragment number acked, or -1 if none
* @since 0.9.16
*/
public int highestReceived();
}

View File

@@ -39,6 +39,9 @@ class InboundMessageState implements CDQEntry {
private static final long MAX_RECEIVE_TIME = 10*1000;
public static final int MAX_FRAGMENTS = 64;
/** 10 */
public static final int MAX_PARTIAL_BITFIELD_BYTES = (MAX_FRAGMENTS / 7) + 1;
private static final int MAX_FRAGMENT_SIZE = UDPPacket.MAX_PACKET_SIZE;
private static final ByteCache _fragmentCache = ByteCache.getInstance(64, MAX_FRAGMENT_SIZE);
@@ -232,6 +235,7 @@ class InboundMessageState implements CDQEntry {
private final long _bitfieldMessageId;
private final int _fragmentCount;
private final int _ackCount;
private final int _highestReceived;
// bitfield, 1 for acked
private final long _fragmentAcks;
@@ -244,16 +248,19 @@ class InboundMessageState implements CDQEntry {
throw new IllegalArgumentException();
_bitfieldMessageId = messageId;
int ackCount = 0;
int highestReceived = -1;
long acks = 0;
for (int i = 0; i < size; i++) {
if (data[i] != null) {
acks |= mask(i);
ackCount++;
highestReceived = i;
}
}
_fragmentAcks = acks;
_fragmentCount = size;
_ackCount = ackCount;
_highestReceived = highestReceived;
}
/**
@@ -267,6 +274,8 @@ class InboundMessageState implements CDQEntry {
public int ackCount() { return _ackCount; }
public int highestReceived() { return _highestReceived; }
public long getMessageId() { return _bitfieldMessageId; }
public boolean received(int fragmentNum) {
@@ -280,14 +289,15 @@ class InboundMessageState implements CDQEntry {
@Override
public String toString() {
StringBuilder buf = new StringBuilder(64);
buf.append("Partial ACK of ");
buf.append("OB Partial ACK of ");
buf.append(_bitfieldMessageId);
buf.append(" with ACKs for: ");
buf.append(" highest: ").append(_highestReceived);
buf.append(" with ").append(_ackCount).append(" ACKs for: [");
for (int i = 0; i < _fragmentCount; i++) {
if (received(i))
buf.append(i).append(" ");
buf.append(i).append(' ');
}
buf.append(" / ").append(_fragmentCount);
buf.append("] / ").append(_fragmentCount);
return buf.toString();
}
}

View File

@@ -349,7 +349,15 @@ class PacketBuilder {
break; // ack count
if (bf.receivedComplete())
continue;
int acksz = 4 + (bf.fragmentCount() / 7) + 1;
// only send what we have to
//int acksz = 4 + (bf.fragmentCount() / 7) + 1;
int bits = bf.highestReceived() + 1;
if (bits <= 0)
continue;
int acksz = bits / 7;
if (bits % 7 > 0)
acksz++;
acksz += 4;
if (partialAcksToSend == 0)
acksz++; // ack count
if (availableForExplicitAcks >= acksz) {
@@ -414,10 +422,16 @@ class PacketBuilder {
for (int i = 0; i < partialAcksToSend && iter.hasNext(); i++) {
ACKBitfield bitfield = iter.next();
if (bitfield.receivedComplete()) continue;
// only send what we have to
//int bits = bitfield.fragmentCount();
int bits = bitfield.highestReceived() + 1;
if (bits <= 0)
continue;
int size = bits / 7;
if (bits % 7 > 0)
size++;
DataHelper.toLong(data, off, 4, bitfield.getMessageId());
off += 4;
int bits = bitfield.fragmentCount();
int size = (bits / 7) + 1;
for (int curByte = 0; curByte < size; curByte++) {
if (curByte + 1 < size)
data[off] |= (byte)(1 << 7);
@@ -430,7 +444,7 @@ class PacketBuilder {
}
iter.remove();
if (msg != null) // logging it
msg.append(' ').append(bitfield);
msg.append(' ').append(bitfield).append(" with ack bytes: ").append(size);
}
//acksIncluded = true;
// now jump back and fill in the number of bitfields *actually* included
@@ -607,11 +621,16 @@ class PacketBuilder {
off++;
for (int i = 0; i < ackBitfields.size(); i++) {
ACKBitfield bitfield = ackBitfields.get(i);
if (bitfield.receivedComplete()) continue;
// no, this will corrupt the packet
//if (bitfield.receivedComplete()) continue;
DataHelper.toLong(data, off, 4, bitfield.getMessageId());
off += 4;
int bits = bitfield.fragmentCount();
int size = (bits / 7) + 1;
// only send what we have to
//int bits = bitfield.fragmentCount();
int bits = bitfield.highestReceived() + 1;
int size = bits / 7;
if (bits == 0 || bits % 7 > 0)
size++;
for (int curByte = 0; curByte < size; curByte++) {
if (curByte + 1 < size)
data[off] |= (byte)(1 << 7);
@@ -624,7 +643,7 @@ class PacketBuilder {
}
if (msg != null) // logging it
msg.append(" partial ack: ").append(bitfield);
msg.append(" partial ack: ").append(bitfield).append(" with ack bytes: ").append(size);
}
}

View File

@@ -1075,8 +1075,9 @@ class PeerState {
public FullACKBitfield(long id) { _msgId = id; }
public int fragmentCount() { return 0; }
public int ackCount() { return 0; }
public int fragmentCount() { return 1; }
public int ackCount() { return 1; }
public int highestReceived() { return 0; }
public long getMessageId() { return _msgId; }
public boolean received(int fragmentNum) { return true; }
public boolean receivedComplete() { return true; }
@@ -1709,7 +1710,7 @@ class PeerState {
*/
public int fragmentSize() {
// 46 + 20 + 8 + 13 = 74 + 13 = 87 (IPv4)
// 46 + 40 + 8 + 13 = 74 + 13 = 107 (IPv6)
// 46 + 40 + 8 + 13 = 94 + 13 = 107 (IPv6)
return _mtu -
(_remoteIP.length == 4 ? PacketBuilder.MIN_DATA_PACKET_OVERHEAD : PacketBuilder.MIN_IPV6_DATA_PACKET_OVERHEAD) -
MIN_ACK_SIZE;

View File

@@ -519,8 +519,10 @@ class UDPPacketReader {
int bfsz = 1;
// bitfield is an array of bytes where the high bit is 1 if
// further bytes in the bitfield follow
while ((_message[_bitfieldStart + bfsz - 1] & UDPPacket.BITFIELD_CONTINUATION) != 0x0)
bfsz++;
while ((_message[_bitfieldStart + bfsz - 1] & UDPPacket.BITFIELD_CONTINUATION) != 0x0) {
if (++bfsz > InboundMessageState.MAX_PARTIAL_BITFIELD_BYTES)
throw new IllegalArgumentException();
}
_bitfieldSize = bfsz;
}
@@ -549,6 +551,28 @@ class UDPPacketReader {
return rv;
}
/**
* Highest fragment number acked in this bitfield.
* @return highest fragment number acked, or -1 if none
* @since 0.9.16
*/
public int highestReceived() {
int count = fragmentCount();
int rv = -1;
for (int i = 0; i < _bitfieldSize; i++) {
byte b = _message[_bitfieldStart + i];
b &= 0x7f;
int j = 0;
while (b != 0 && j++ < 7) {
if ((b & 0x01) != 0)
rv = (7 * i) + j;
b >>= 1;
b &= 0x7f;
}
}
return rv;
}
public boolean received(int fragmentNum) {
if ( (fragmentNum < 0) || (fragmentNum >= _bitfieldSize*7) )
return false;
@@ -561,16 +585,17 @@ class UDPPacketReader {
@Override
public String toString() {
StringBuilder buf = new StringBuilder(64);
buf.append("Read partial ACK of ");
buf.append("IB Partial ACK of ");
buf.append(getMessageId());
buf.append(" with ACKs for: ");
buf.append(" highest: ").append(highestReceived());
buf.append(" with ACKs for: [");
int numFrags = fragmentCount();
for (int i = 0; i < numFrags; i++) {
if (received(i))
buf.append(i).append(" ");
else
buf.append('!').append(i).append(" ");
if (!received(i))
buf.append('!');
buf.append(i).append(' ');
}
buf.append("] / ").append(numFrags);
return buf.toString();
}
}