- Use remote MTU when published (ticket #687)
   - Queue outbound msgs during inbound establish
   - IntroManager cleanups
   - More synchronization
   - More log tweaks
This commit is contained in:
zzz
2012-08-17 14:15:01 +00:00
parent 3cac01ff27
commit 3a546612d9
9 changed files with 256 additions and 64 deletions

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 11;
public final static long BUILD = 12;
/** for example "-test" */
public final static String EXTRA = "";

View File

@@ -12,6 +12,7 @@ import net.i2p.data.Base64;
import net.i2p.data.Hash;
import net.i2p.data.RouterAddress;
import net.i2p.data.RouterIdentity;
import net.i2p.data.RouterInfo;
import net.i2p.data.SessionKey;
import net.i2p.data.i2np.DatabaseStoreMessage;
import net.i2p.data.i2np.DeliveryStatusMessage;
@@ -191,9 +192,39 @@ class EstablishmentManager {
//_context.shitlist().shitlistRouter(msg.getTarget().getIdentity().calculateHash(), "Invalid SSU address", UDPTransport.STYLE);
return;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Add outbound establish state to: " + to);
InboundEstablishState inState = _inboundStates.get(to);
if (inState != null) {
// we have an inbound establishment in progress, queue it there instead
synchronized (inState) {
switch (inState.getState()) {
case IB_STATE_UNKNOWN:
case IB_STATE_REQUEST_RECEIVED:
case IB_STATE_CREATED_SENT:
case IB_STATE_CONFIRMED_PARTIALLY:
case IB_STATE_CONFIRMED_COMPLETELY:
// queue it
inState.addMessage(msg);
if (_log.shouldLog(Log.WARN))
_log.debug("OB msg queued to IES");
break;
case IB_STATE_COMPLETE:
// race, send it out (but don't call _transport.send() again and risk a loop)
_transport.sendIfEstablished(msg);
break;
case IB_STATE_FAILED:
// race, failed
_transport.failed(msg, "OB msg failed during IB establish");
break;
}
}
return;
}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Add indirect outbound establish state to: " + addr);
@@ -251,9 +282,13 @@ class EstablishmentManager {
sessionKey, addr, _transport.getDHBuilder());
OutboundEstablishState oldState = _outboundStates.putIfAbsent(to, state);
boolean isNew = oldState == null;
if (!isNew)
if (isNew) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Adding new " + state);
} else {
// whoops, somebody beat us to it, throw out the state we just created
state = oldState;
}
}
}
if (state != null) {
@@ -548,7 +583,7 @@ class EstablishmentManager {
*
*/
private void handleCompletelyEstablished(InboundEstablishState state) {
if (state.complete()) return;
if (state.isComplete()) return;
RouterIdentity remote = state.getConfirmedIdentity();
PeerState peer = new PeerState(_context, _transport,
@@ -556,6 +591,22 @@ class EstablishmentManager {
peer.setCurrentCipherKey(state.getCipherKey());
peer.setCurrentMACKey(state.getMACKey());
peer.setWeRelayToThemAs(state.getSentRelayTag());
// Lookup the peer's MTU from the netdb, since it isn't included in the protocol setup (yet)
// TODO if we don't have RI then we will get it shortly, but too late.
// Perhaps netdb should notify transport when it gets a new RI...
RouterInfo info = _context.netDb().lookupRouterInfoLocally(remote.calculateHash());
if (info != null) {
RouterAddress addr = info.getTargetAddress(UDPTransport.STYLE);
if (addr != null) {
String smtu = addr.getOption(UDPAddress.PROP_MTU);
if (smtu != null) {
try {
int mtu = MTU.rectify(Integer.parseInt(smtu));
peer.setHisMTU(mtu);
} catch (NumberFormatException nfe) {}
}
}
}
// 0 is the default
//peer.setTheyRelayToUsAs(0);
@@ -573,6 +624,17 @@ class EstablishmentManager {
_context.statManager().addRateData("udp.inboundEstablishTime", state.getLifetime(), 0);
sendInboundComplete(peer);
OutNetMessage msg;
while ((msg = state.getNextQueuedMessage()) != null) {
if (_context.clock().now() - Router.CLOCK_FUDGE_FACTOR > msg.getExpiration()) {
msg.timestamp("took too long but established...");
_transport.failed(msg, "Took too long to establish, but it was established");
} else {
msg.timestamp("session fully established and sent");
_transport.send(msg);
}
}
state.complete();
}
/**
@@ -634,6 +696,9 @@ class EstablishmentManager {
peer.setCurrentCipherKey(state.getCipherKey());
peer.setCurrentMACKey(state.getMACKey());
peer.setTheyRelayToUsAs(state.getReceivedRelayTag());
int mtu = state.getRemoteAddress().getMTU();
if (mtu > 0)
peer.setHisMTU(mtu);
// 0 is the default
//peer.setWeRelayToThemAs(0);
@@ -780,15 +845,22 @@ class EstablishmentManager {
if (_log.shouldLog(Log.INFO))
_log.info("Received RelayResponse for " + state.getRemoteIdentity().calculateHash() + " - they are on "
+ addr.toString() + ":" + port + " (according to " + bob + ")");
RemoteHostId oldId = state.getRemoteHostId();
state.introduced(addr, ip, port);
RemoteHostId newId = state.getRemoteHostId();
// Swap out the RemoteHostId the state is indexed under
// TODO only if !oldId.equals(newId) ? synch?
OutboundEstablishState oldState = _outboundStates.remove(oldId);
_outboundStates.put(newId, state);
if (_log.shouldLog(Log.DEBUG))
_log.debug("RR replaced " + oldId + " -> " + oldState + " with " + newId + " -> " + state);
synchronized (state) {
RemoteHostId oldId = state.getRemoteHostId();
state.introduced(addr, ip, port);
RemoteHostId newId = state.getRemoteHostId();
// Swap out the RemoteHostId the state is indexed under
// TODO only if !oldId.equals(newId) ? synch?
// FIXME if the RemoteHostIDs aren't the same we have problems
// FIXME if the RemoteHostIDs aren't the same the SessionCreated signature is probably going to fail
// Common occurrence - port changes
if (!oldId.equals(newId)) {
_outboundStates.remove(oldId);
_outboundStates.put(newId, state);
if (_log.shouldLog(Log.WARN))
_log.warn("RR replaced " + oldId + " with " + newId + " -> " + state);
}
}
notifyActivity();
}
@@ -923,7 +995,9 @@ class EstablishmentManager {
synchronized (inboundState) {
switch (inboundState.getState()) {
case IB_STATE_REQUEST_RECEIVED:
if (!expired)
if (expired)
processExpired(inboundState);
else
sendCreated(inboundState);
break;
@@ -931,6 +1005,7 @@ class EstablishmentManager {
case IB_STATE_CONFIRMED_PARTIALLY:
if (expired) {
sendDestroy(inboundState);
processExpired(inboundState);
} else if (inboundState.getNextSendTime() <= now) {
sendCreated(inboundState);
}
@@ -945,6 +1020,7 @@ class EstablishmentManager {
// So next time we will not accept the con, rather than doing the whole handshake
_context.blocklist().add(inboundState.getSentIP());
inboundState.fail();
processExpired(inboundState);
} else {
handleCompletelyEstablished(inboundState);
}
@@ -952,9 +1028,11 @@ class EstablishmentManager {
if (_log.shouldLog(Log.WARN))
_log.warn("confirmed with invalid? " + inboundState);
inboundState.fail();
processExpired(inboundState);
}
break;
case IB_STATE_COMPLETE: // fall through
case IB_STATE_FAILED:
break; // already removed;
@@ -1118,10 +1196,8 @@ class EstablishmentManager {
if (outboundState.getState() != OB_STATE_CONFIRMED_COMPLETELY) {
if (_log.shouldLog(Log.INFO))
_log.info("Lifetime of expired outbound establish: " + outboundState.getLifetime());
while (true) {
OutNetMessage msg = outboundState.getNextQueuedMessage();
if (msg == null)
break;
OutNetMessage msg;
while ((msg = outboundState.getNextQueuedMessage()) != null) {
_transport.failed(msg, "Expired during failed establish");
}
String err = "Took too long to establish OB connection, state = " + outboundState.getState();
@@ -1131,15 +1207,25 @@ class EstablishmentManager {
_transport.dropPeer(peer, false, err);
//_context.profileManager().commErrorOccurred(peer);
} else {
while (true) {
OutNetMessage msg = outboundState.getNextQueuedMessage();
if (msg == null)
break;
OutNetMessage msg;
while ((msg = outboundState.getNextQueuedMessage()) != null) {
_transport.send(msg);
}
}
}
/**
* Caller should probably synch on inboundState
* @since 0.9.2
*/
private void processExpired(InboundEstablishState inboundState) {
OutNetMessage msg;
while ((msg = inboundState.getNextQueuedMessage()) != null) {
_transport.failed(msg, "Expired during failed establish");
}
}
/**
* Driving thread, processing up to one step for an inbound peer and up to
* one step for an outbound peer. This is prodded whenever any peer's state

View File

@@ -2,6 +2,8 @@ package net.i2p.router.transport.udp;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.data.Base64;
import net.i2p.data.ByteArray;
@@ -10,6 +12,7 @@ import net.i2p.data.DataHelper;
import net.i2p.data.RouterIdentity;
import net.i2p.data.SessionKey;
import net.i2p.data.Signature;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.crypto.DHSessionKeyBuilder;
import net.i2p.util.Addresses;
@@ -52,7 +55,7 @@ class InboundEstablishState {
private long _nextSend;
private final RemoteHostId _remoteHostId;
private InboundState _currentState;
private boolean _complete;
private final Queue<OutNetMessage> _queuedMessages;
// count for backoff
private int _createdSentCount;
@@ -69,7 +72,9 @@ class InboundEstablishState {
/** we have all the confirmation packets */
IB_STATE_CONFIRMED_COMPLETELY,
/** we are explicitly failing it */
IB_STATE_FAILED
IB_STATE_FAILED,
/** Successful completion, PeerState created and added to transport */
IB_STATE_COMPLETE
}
/** basic delay before backoff */
@@ -89,17 +94,44 @@ class InboundEstablishState {
_currentState = InboundState.IB_STATE_UNKNOWN;
_establishBegin = ctx.clock().now();
_keyBuilder = dh;
_queuedMessages = new LinkedBlockingQueue();
}
public synchronized InboundState getState() { return _currentState; }
/** @return if previously complete */
public synchronized boolean complete() {
boolean already = _complete;
_complete = true;
return already;
public synchronized boolean isComplete() {
return _currentState == InboundState.IB_STATE_COMPLETE ||
_currentState == InboundState.IB_STATE_FAILED;
}
/** Notify successful completion */
public synchronized void complete() {
_currentState = InboundState.IB_STATE_COMPLETE;
}
/**
* Queue a message to be sent after the session is established.
* This will only happen if we decide to send something during establishment
* @since 0.9.2
*/
public void addMessage(OutNetMessage msg) {
// chance of a duplicate here in a race, that's ok
if (!_queuedMessages.contains(msg))
_queuedMessages.offer(msg);
else if (_log.shouldLog(Log.WARN))
_log.warn("attempt to add duplicate msg to queue: " + msg);
}
/**
* Pull from the message queue
* @return null if none
* @since 0.9.2
*/
public OutNetMessage getNextQueuedMessage() {
return _queuedMessages.poll();
}
public synchronized void receiveSessionRequest(UDPPacketReader.SessionRequestReader req) {
if (_receivedX == null)
_receivedX = new byte[UDPPacketReader.SessionRequestReader.X_LENGTH];
@@ -200,8 +232,8 @@ class InboundEstablishState {
if (_log.shouldLog(Log.DEBUG)) {
StringBuilder buf = new StringBuilder(128);
buf.append("Signing sessionCreated:");
buf.append(" ReceivedX: ").append(Base64.encode(_receivedX));
buf.append(" SentY: ").append(Base64.encode(_sentY));
//buf.append(" ReceivedX: ").append(Base64.encode(_receivedX));
//buf.append(" SentY: ").append(Base64.encode(_sentY));
buf.append(" Alice: ").append(Addresses.toString(_aliceIP, _alicePort));
buf.append(" Bob: ").append(Addresses.toString(_bobIP, _bobPort));
buf.append(" RelayTag: ").append(_sentRelayTag);
@@ -370,15 +402,15 @@ class InboundEstablishState {
@Override
public String toString() {
StringBuilder buf = new StringBuilder(128);
buf.append("IES ").append(super.toString());
buf.append("IES ");
buf.append(Addresses.toString(_aliceIP, _alicePort));
if (_receivedX != null)
buf.append(" ReceivedX: ").append(Base64.encode(_receivedX, 0, 4));
if (_sentY != null)
buf.append(" SentY: ").append(Base64.encode(_sentY, 0, 4));
buf.append(" Alice: ").append(Addresses.toString(_aliceIP, _alicePort));
buf.append(" Bob: ").append(Addresses.toString(_bobIP, _bobPort));
//buf.append(" Bob: ").append(Addresses.toString(_bobIP, _bobPort));
buf.append(" RelayTag: ").append(_sentRelayTag);
buf.append(" SignedOn: ").append(_sentSignedOnTime);
//buf.append(" SignedOn: ").append(_sentSignedOnTime);
buf.append(' ').append(_currentState);
return buf.toString();
}

View File

@@ -81,7 +81,7 @@ class IntroductionManager {
}
}
public PeerState get(long id) {
private PeerState get(long id) {
return _outbound.get(Long.valueOf(id));
}
@@ -187,28 +187,54 @@ class IntroductionManager {
return _inbound.size();
}
/**
* We are Charlie and we got this from Bob.
* Send a HolePunch to Alice, who will soon be sending us a RelayRequest.
* We should already have a session with Bob, but probably not with Alice.
*
* We do some throttling here.
*/
void receiveRelayIntro(RemoteHostId bob, UDPPacketReader reader) {
if (_context.router().isHidden())
return;
if (_log.shouldLog(Log.INFO))
_log.info("Receive relay intro from " + bob);
_context.statManager().addRateData("udp.receiveRelayIntro", 1, 0);
if (!_transport.allowConnection())
return;
// TODO throttle
// TODO IB req limits
// TODO check if already have a session or in progress state.
_transport.send(_builder.buildHolePunch(reader));
}
/**
* We are Bob and we got this from Alice.
* Send a RelayIntro to Charlie and a RelayResponse to Alice.
* We should already have a session with Charlie, but not necessarily with Alice.
*/
void receiveRelayRequest(RemoteHostId alice, UDPPacketReader reader) {
if (_context.router().isHidden())
return;
long tag = reader.getRelayRequestReader().readTag();
PeerState charlie = _transport.getPeerState(tag);
PeerState charlie = get(tag);
if (charlie == null) {
if (_log.shouldLog(Log.INFO))
_log.info("Receive relay request from " + alice
+ " with unknown tag");
_context.statManager().addRateData("udp.receiveRelayRequestBadTag", 1, 0);
return;
}
if (_log.shouldLog(Log.INFO))
_log.info("Receive relay request from " + alice
+ " for tag " + tag
+ " and relaying with " + charlie);
if (charlie == null) {
_context.statManager().addRateData("udp.receiveRelayRequestBadTag", 1, 0);
return;
}
// TODO throttle based on alice identity and/or intro tag?
_context.statManager().addRateData("udp.receiveRelayRequest", 1, 0);
byte key[] = new byte[SessionKey.KEYSIZE_BYTES];
reader.getRelayRequestReader().readAliceIntroKey(key, 0);

View File

@@ -90,6 +90,9 @@ class OutboundEstablishState {
/** max delay including backoff */
private static final long MAX_DELAY = 15*1000;
/**
* @param addr non-null
*/
public OutboundEstablishState(RouterContext ctx, InetAddress remoteHost, int remotePort,
RouterIdentity remotePeer, SessionKey introKey, UDPAddress addr,
DHSessionKeyBuilder dh) {
@@ -113,7 +116,7 @@ class OutboundEstablishState {
_keyBuilder = dh;
_sentX = new byte[UDPPacketReader.SessionRequestReader.X_LENGTH];
prepareSessionRequest();
if ( (addr != null) && (addr.getIntroducerCount() > 0) ) {
if (addr.getIntroducerCount() > 0) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("new outbound establish to " + remotePeer.calculateHash() + ", with address: " + addr);
_currentState = OutboundState.OB_STATE_PENDING_INTRO;
@@ -131,12 +134,17 @@ class OutboundEstablishState {
return already;
}
/** @return non-null */
public UDPAddress getRemoteAddress() { return _remoteAddress; }
public void setIntroNonce(long nonce) { _introductionNonce = nonce; }
/** @return -1 if unset */
public long getIntroNonce() { return _introductionNonce; }
/**
* Queue a message to be sent after the session is established.
*/
public void addMessage(OutNetMessage msg) {
// chance of a duplicate here in a race, that's ok
if (!_queuedMessages.contains(msg))
@@ -190,12 +198,12 @@ class OutboundEstablishState {
reader.readIV(_receivedIV, 0);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive session created:\neSig: " + Base64.encode(_receivedEncryptedSignature)
+ "\nreceivedIV: " + Base64.encode(_receivedIV)
+ "\nAliceIP: " + Addresses.toString(_aliceIP)
_log.debug("Receive session created:Sig: " + Base64.encode(_receivedEncryptedSignature)
+ "receivedIV: " + Base64.encode(_receivedIV)
+ "AliceIP: " + Addresses.toString(_aliceIP)
+ " RelayTag: " + _receivedRelayTag
+ " SignedOn: " + _receivedSignedOnTime
+ "\nthis: " + this.toString());
+ ' ' + this.toString());
if (_currentState == OutboundState.OB_STATE_UNKNOWN ||
_currentState == OutboundState.OB_STATE_REQUEST_SENT ||
@@ -212,6 +220,8 @@ class OutboundEstablishState {
* receive another one
*
* Generates session key and mac key.
*
* @return true if valid
*/
public synchronized boolean validateSessionCreated() {
if (_receivedSignature != null) {
@@ -253,7 +263,6 @@ class OutboundEstablishState {
_receivedSignature = null;
if ( (_currentState == OutboundState.OB_STATE_UNKNOWN) ||
(_currentState == OutboundState.OB_STATE_REQUEST_SENT) ||
(_currentState == OutboundState.OB_STATE_CREATED_RECEIVED) )
_currentState = OutboundState.OB_STATE_REQUEST_SENT;
@@ -292,7 +301,7 @@ class OutboundEstablishState {
System.arraycopy(_receivedEncryptedSignature, 0, signatureBytes, 0, Signature.SIGNATURE_BYTES);
_receivedSignature = new Signature(signatureBytes);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Decrypted received signature: \n" + Base64.encode(signatureBytes));
_log.debug("Decrypted received signature: " + Base64.encode(signatureBytes));
}
/**
@@ -475,7 +484,8 @@ class OutboundEstablishState {
}
/**
* This changes the remoteHostId from a hash-based one to a IP/Port one
* This changes the remoteHostId from a hash-based one to a IP/Port one,
* OR the IP or port could change.
*/
public synchronized void introduced(InetAddress bob, byte bobIP[], int bobPort) {
if (_currentState != OutboundState.OB_STATE_PENDING_INTRO)

View File

@@ -156,7 +156,7 @@ class PeerState {
private int _mtu;
private int _mtuReceive;
/** what is the largest packet we will ever send to the peer? */
private final int _largeMTU;
private int _largeMTU;
/* how many consecutive packets at or under the min MTU have been received */
private long _consecutiveSmall;
/** when did we last check the MTU? */
@@ -987,8 +987,10 @@ class PeerState {
_messagesSent++;
if (numSends < 2) {
recalculateTimeouts(lifetime);
adjustMTU();
synchronized (this) {
recalculateTimeouts(lifetime);
adjustMTU();
}
}
else if (_log.shouldLog(Log.INFO))
_log.info("acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed);
@@ -996,7 +998,10 @@ class PeerState {
_context.statManager().addRateData("udp.sendBps", _sendBps, lifetime);
}
/** adjust the tcp-esque timeouts */
/**
* Adjust the tcp-esque timeouts.
* Caller should synch on this
*/
private void recalculateTimeouts(long lifetime) {
_rttDeviation = _rttDeviation + (int)(0.25d*(Math.abs(lifetime-_rtt)-_rttDeviation));
@@ -1017,6 +1022,9 @@ class PeerState {
_rto = MAX_RTO;
}
/**
* Caller should synch on this
*/
private void adjustMTU() {
double retransPct = 0;
if (_packetsTransmitted > 10) {
@@ -1037,6 +1045,17 @@ class PeerState {
_mtu = DEFAULT_MTU;
}
}
/**
* @since 0.9.2
*/
public synchronized void setHisMTU(int mtu) {
if (mtu <= MIN_MTU || mtu >= _largeMTU)
return;
_largeMTU = mtu;
if (mtu < _mtu)
_mtu = mtu;
}
/** we are resending a packet, so lets jack up the rto */
public void messageRetransmitted(int packets) {
@@ -1054,7 +1073,9 @@ class PeerState {
*****/
congestionOccurred();
_context.statManager().addRateData("udp.congestedRTO", _rto, _rttDeviation);
adjustMTU();
synchronized (this) {
adjustMTU();
}
//_rto *= 2;
}

View File

@@ -62,7 +62,7 @@ public class UDPAddress {
}
return rv.toString();
}
private void parse(RouterAddress addr) {
if (addr == null) return;
_host = addr.getOption(PROP_HOST);

View File

@@ -697,14 +697,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
return _peersByIdent.get(remotePeer);
}
/**
* get the state for the peer being introduced, or null if we aren't
* offering to introduce anyone with that tag.
*/
PeerState getPeerState(long relayTag) {
return _introManager.get(relayTag);
}
/**
* Intercept RouterInfo entries received directly from a peer to inject them into
* the PeersByCapacity listing.
@@ -1278,6 +1270,15 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
}
/**
* Send only if established, otherwise fail immediately.
* Never queue with the establisher.
* @since 0.9.2
*/
void sendIfEstablished(OutNetMessage msg) {
_fragments.add(msg);
}
void send(I2NPMessage msg, PeerState peer) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Injecting a data message to a new peer: " + peer);