SSU2: Fixes part 8

Fix packet length for retransmitted handshake messages
Expire pending acks
This commit is contained in:
zzz
2022-03-12 11:08:20 -05:00
parent 15bb157126
commit aa620f5ed3
4 changed files with 39 additions and 2 deletions

View File

@ -615,6 +615,7 @@ class InboundEstablishState2 extends InboundEstablishState implements SSU2Payloa
byte data[] = pkt.getData();
int off = pkt.getOffset();
System.arraycopy(_sessCrForReTX, 0, data, off, _sessCrForReTX.length);
pkt.setLength(_sessCrForReTX.length);
pkt.setSocketAddress(_aliceSocketAddress);
packet.setMessageType(PacketBuilder2.TYPE_CONF);
packet.setPriority(PacketBuilder2.PRIORITY_HIGH);

View File

@ -514,6 +514,7 @@ class OutboundEstablishState2 extends OutboundEstablishState implements SSU2Payl
byte data[] = pkt.getData();
int off = pkt.getOffset();
System.arraycopy(_sessReqForReTX, 0, data, off, _sessReqForReTX.length);
pkt.setLength(_sessReqForReTX.length);
pkt.setSocketAddress(_bobSocketAddress);
packet.setMessageType(PacketBuilder2.TYPE_SREQ);
packet.setPriority(PacketBuilder2.PRIORITY_HIGH);

View File

@ -70,7 +70,7 @@ public class PeerState {
private SessionKey _nextCipherKey;
/** when were the current cipher and MAC keys established/rekeyed? */
private final long _keyEstablishedTime;
protected final long _keyEstablishedTime;
/**
* How far off is the remote peer from our clock, in milliseconds?

View File

@ -5,6 +5,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.security.GeneralSecurityException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ConcurrentHashMap;
@ -55,6 +56,7 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
*/
private final SSU2Bitfield _ackedMessages;
private final ConcurrentHashMap<Long, List<PacketBuilder.Fragment>> _sentMessages;
private long _sentMessagesLastExpired;
// Session Confirmed retransmit
private byte[] _sessConfForReTX;
@ -76,7 +78,7 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
private static final int BITFIELD_SIZE = 512;
private static final int MAX_SESS_CONF_RETX = 6;
private static final int SESS_CONF_RETX_TIME = 1000;
private static final long SENT_MESSAGES_CLEAN_TIME = 60*1000;
/**
@ -98,6 +100,7 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
_receivedMessages = new SSU2Bitfield(BITFIELD_SIZE, 0);
_ackedMessages = new SSU2Bitfield(BITFIELD_SIZE, 0);
_sentMessages = new ConcurrentHashMap<Long, List<PacketBuilder.Fragment>>(32);
_sentMessagesLastExpired = _keyEstablishedTime;
if (isInbound) {
// Send immediate ack of Session Confirmed
_receivedMessages.set(0);
@ -168,6 +171,37 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
}
}
/**
* Overridden to expire unacked packets in _sentMessages.
* These will remain unacked if lost; fragments will be retransmitted
* in a new packet.
*
* @return number of active outbound messages remaining
*/
@Override
int finishMessages(long now) {
if (now >= _sentMessagesLastExpired + SENT_MESSAGES_CLEAN_TIME) {
_sentMessagesLastExpired = now;
if (!_sentMessages.isEmpty()) {
if (_log.shouldDebug())
_log.debug("finishMessages() over " + _sentMessages.size() + " pending acks");
loop:
for (Iterator<List<PacketBuilder.Fragment>> iter = _sentMessages.values().iterator(); iter.hasNext(); ) {
List<PacketBuilder.Fragment> frags = iter.next();
for (PacketBuilder.Fragment f : frags) {
OutboundMessageState state = f.state;
if (!state.isComplete() && !state.isExpired(now))
continue loop;
}
iter.remove();
if (_log.shouldWarn())
_log.warn("Cleaned from sentMessages: " + frags);
}
}
}
return super.finishMessages(now);
}
/**
* Overridden to retransmit SessionConfirmed also
*/
@ -570,6 +604,7 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
byte data[] = pkt.getData();
int off = pkt.getOffset();
System.arraycopy(_sessConfForReTX, 0, data, off, _sessConfForReTX.length);
pkt.setLength(_sessConfForReTX.length);
pkt.setAddress(_remoteIPAddress);
pkt.setPort(_remotePort);
packet.setMessageType(PacketBuilder2.TYPE_CONF);