forked from I2P_Developers/i2p.i2p
SSU: Hand all messages pending after establishment to the
outbound queue at once, for efficiency. This allows more aggressive combining of fragments in packets, and ensures the priority queue works as designed. Don't sort outbound messages by size, keep priority order instead. Log tweaks
This commit is contained in:
@@ -1,3 +1,8 @@
|
||||
2015-12-21 zzz
|
||||
* SSU: Hand all messages pending after establishment to the
|
||||
outbound queue at once, for efficiency.
|
||||
Don't sort outbound messages by size, keep priority order instead.
|
||||
|
||||
2015-12-20 zzz
|
||||
* BuildHandler: Additional fixes (ticket #1738)
|
||||
* CertUtil: Add methods to export private keys
|
||||
|
@@ -18,7 +18,7 @@ public class RouterVersion {
|
||||
/** deprecated */
|
||||
public final static String ID = "Monotone";
|
||||
public final static String VERSION = CoreVersion.VERSION;
|
||||
public final static long BUILD = 17;
|
||||
public final static long BUILD = 18;
|
||||
|
||||
/** for example "-test" */
|
||||
public final static String EXTRA = "";
|
||||
|
@@ -770,7 +770,7 @@ class EstablishmentManager {
|
||||
// so it needs to be caught in InNetMessagePool.
|
||||
dsm.setMessageExpiration(_context.clock().now() + DATA_MESSAGE_TIMEOUT);
|
||||
dsm.setMessageId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE));
|
||||
_transport.send(dsm, peer);
|
||||
// sent below
|
||||
|
||||
// just do this inline
|
||||
//_context.simpleTimer2().addEvent(new PublishToNewInbound(peer), 0);
|
||||
@@ -780,8 +780,14 @@ class EstablishmentManager {
|
||||
// ok, we are fine with them, send them our latest info
|
||||
//if (_log.shouldLog(Log.INFO))
|
||||
// _log.info("Publishing to the peer after confirm plus delay (without banlist): " + peer);
|
||||
sendOurInfo(peer, true);
|
||||
// bundle the two messages together for efficiency
|
||||
DatabaseStoreMessage dbsm = getOurInfo();
|
||||
List<I2NPMessage> msgs = new ArrayList<I2NPMessage>(2);
|
||||
msgs.add(dsm);
|
||||
msgs.add(dbsm);
|
||||
_transport.send(msgs, peer);
|
||||
} else {
|
||||
_transport.send(dsm, peer);
|
||||
// nuh uh.
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("NOT publishing to the peer after confirm plus delay (WITH banlist): " + (hash != null ? hash.toString() : "unknown"));
|
||||
@@ -828,12 +834,14 @@ class EstablishmentManager {
|
||||
_transport.setIP(remote.calculateHash(), state.getSentIP());
|
||||
|
||||
_context.statManager().addRateData("udp.outboundEstablishTime", state.getLifetime(), 0);
|
||||
DatabaseStoreMessage dbsm = null;
|
||||
if (!state.isFirstMessageOurDSM()) {
|
||||
sendOurInfo(peer, false);
|
||||
dbsm = getOurInfo();
|
||||
} else if (_log.shouldLog(Log.INFO)) {
|
||||
_log.info("Skipping publish: " + state);
|
||||
}
|
||||
|
||||
List<OutNetMessage> msgs = new ArrayList<OutNetMessage>(8);
|
||||
OutNetMessage msg;
|
||||
while ((msg = state.getNextQueuedMessage()) != null) {
|
||||
if (now - Router.CLOCK_FUDGE_FACTOR > msg.getExpiration()) {
|
||||
@@ -841,21 +849,33 @@ class EstablishmentManager {
|
||||
_transport.failed(msg, "Took too long to establish, but it was established");
|
||||
} else {
|
||||
msg.timestamp("session fully established and sent");
|
||||
_transport.send(msg);
|
||||
msgs.add(msg);
|
||||
}
|
||||
}
|
||||
_transport.send(dbsm, msgs, peer);
|
||||
return peer;
|
||||
}
|
||||
|
||||
/****
|
||||
private void sendOurInfo(PeerState peer, boolean isInbound) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Publishing to the peer after confirm: " +
|
||||
(isInbound ? " inbound con from " + peer : "outbound con to " + peer));
|
||||
|
||||
DatabaseStoreMessage m = getOurInfo();
|
||||
_transport.send(m, peer);
|
||||
}
|
||||
****/
|
||||
|
||||
/**
|
||||
* A database store message with our router info
|
||||
* @return non-null
|
||||
* @since 0.9.24 split from sendOurInfo()
|
||||
*/
|
||||
private DatabaseStoreMessage getOurInfo() {
|
||||
DatabaseStoreMessage m = new DatabaseStoreMessage(_context);
|
||||
m.setEntry(_context.router().getRouterInfo());
|
||||
m.setMessageExpiration(_context.clock().now() + DATA_MESSAGE_TIMEOUT);
|
||||
_transport.send(m, peer);
|
||||
return m;
|
||||
}
|
||||
|
||||
/** the relay tag is a 4-byte field in the protocol */
|
||||
|
@@ -172,11 +172,12 @@ class OutboundMessageFragments {
|
||||
}
|
||||
|
||||
/**
|
||||
* short circuit the OutNetMessage, letting us send the establish
|
||||
* complete message reliably
|
||||
* Short circuit the OutNetMessage, letting us send the establish
|
||||
* complete message reliably.
|
||||
* If you have multiple messages, use the list variant,
|
||||
* so the messages may be bundled efficiently.
|
||||
*/
|
||||
public void add(OutboundMessageState state) {
|
||||
PeerState peer = state.getPeer();
|
||||
public void add(OutboundMessageState state, PeerState peer) {
|
||||
if (peer == null)
|
||||
throw new RuntimeException("null peer for " + state);
|
||||
peer.add(state);
|
||||
@@ -184,6 +185,22 @@ class OutboundMessageFragments {
|
||||
//_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Short circuit the OutNetMessage, letting us send multiple messages
|
||||
* reliably and efficiently.
|
||||
* @since 0.9.24
|
||||
*/
|
||||
public void add(List<OutboundMessageState> states, PeerState peer) {
|
||||
if (peer == null)
|
||||
throw new RuntimeException("null peer");
|
||||
int sz = states.size();
|
||||
for (int i = 0; i < sz; i++) {
|
||||
peer.add(states.get(i));
|
||||
}
|
||||
add(peer);
|
||||
//_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the peer to the list of peers wanting to transmit something.
|
||||
* This wakes up the packet pusher if it is sleeping.
|
||||
@@ -400,8 +417,10 @@ class OutboundMessageFragments {
|
||||
int fragmentsToSend = toSend.size();
|
||||
// sort by size, biggest first
|
||||
// don't bother unless more than one state (fragments are already sorted within a state)
|
||||
if (fragmentsToSend > 1 && states.size() > 1)
|
||||
Collections.sort(toSend, new FragmentComparator());
|
||||
// This puts the DeliveryStatusMessage after the DatabaseStoreMessage, don't do it for now.
|
||||
// It also undoes the ordering of the priority queue in PeerState.
|
||||
//if (fragmentsToSend > 1 && states.size() > 1)
|
||||
// Collections.sort(toSend, new FragmentComparator());
|
||||
|
||||
List<Fragment> sendNext = new ArrayList<Fragment>(Math.min(toSend.size(), 4));
|
||||
List<UDPPacket> rv = new ArrayList<UDPPacket>(toSend.size());
|
||||
@@ -490,6 +509,7 @@ class OutboundMessageFragments {
|
||||
* Biggest first
|
||||
* @since 0.9.16
|
||||
*/
|
||||
/****
|
||||
private static class FragmentComparator implements Comparator<Fragment>, Serializable {
|
||||
|
||||
public int compare(Fragment l, Fragment r) {
|
||||
@@ -497,7 +517,9 @@ class OutboundMessageFragments {
|
||||
return r.state.fragmentSize(r.num) - l.state.fragmentSize(l.num);
|
||||
}
|
||||
}
|
||||
****/
|
||||
|
||||
/** throttle */
|
||||
public interface ActiveThrottle {
|
||||
public void choke(Hash peer);
|
||||
public void unchoke(Hash peer);
|
||||
|
@@ -305,7 +305,7 @@ class OutboundMessageState implements CDPQEntry {
|
||||
buf.append(i).append(' ');
|
||||
}
|
||||
}
|
||||
buf.append(" to: ").append(_peer.toString());
|
||||
//buf.append(" to: ").append(_peer.toString());
|
||||
return buf.toString();
|
||||
}
|
||||
}
|
||||
|
@@ -1828,14 +1828,74 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
|
||||
}
|
||||
|
||||
/**
|
||||
* "injected" message from the EstablishmentManager
|
||||
* "injected" message from the EstablishmentManager.
|
||||
* If you have multiple messages, use the list variant,
|
||||
* so the messages may be bundled efficiently.
|
||||
*
|
||||
* @param peer all messages MUST be going to this peer
|
||||
*/
|
||||
void send(I2NPMessage msg, PeerState peer) {
|
||||
try {
|
||||
OutboundMessageState state = new OutboundMessageState(_context, msg, peer);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Injecting a data message to a new peer: " + peer);
|
||||
_fragments.add(state);
|
||||
_fragments.add(state, peer);
|
||||
} catch (IllegalArgumentException iae) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Shouldnt happen", new Exception("I did it"));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* "injected" message from the EstablishmentManager,
|
||||
* plus pending messages to send,
|
||||
* so the messages may be bundled efficiently.
|
||||
* Called at end of outbound establishment.
|
||||
*
|
||||
* @param msg may be null if nothing to inject
|
||||
* @param msgs non-null, may be empty
|
||||
* @param peer all messages MUST be going to this peer
|
||||
* @since 0.9.24
|
||||
*/
|
||||
void send(I2NPMessage msg, List<OutNetMessage> msgs, PeerState peer) {
|
||||
try {
|
||||
int sz = msgs.size();
|
||||
List<OutboundMessageState> states = new ArrayList<OutboundMessageState>(sz + 1);
|
||||
if (msg != null) {
|
||||
OutboundMessageState state = new OutboundMessageState(_context, msg, peer);
|
||||
states.add(state);
|
||||
}
|
||||
for (int i = 0; i < sz; i++) {
|
||||
OutboundMessageState state = new OutboundMessageState(_context, msgs.get(i), peer);
|
||||
states.add(state);
|
||||
}
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Injecting " + states.size() + " data messages to a new peer: " + peer);
|
||||
_fragments.add(states, peer);
|
||||
} catch (IllegalArgumentException iae) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Shouldnt happen", new Exception("I did it"));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* "injected" messages from the EstablishmentManager.
|
||||
* Called at end of inbound establishment.
|
||||
*
|
||||
* @param peer all messages MUST be going to this peer
|
||||
* @since 0.9.24
|
||||
*/
|
||||
void send(List<I2NPMessage> msgs, PeerState peer) {
|
||||
try {
|
||||
int sz = msgs.size();
|
||||
List<OutboundMessageState> states = new ArrayList<OutboundMessageState>(sz);
|
||||
for (int i = 0; i < sz; i++) {
|
||||
OutboundMessageState state = new OutboundMessageState(_context, msgs.get(i), peer);
|
||||
states.add(state);
|
||||
}
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Injecting " + sz + " data messages to a new peer: " + peer);
|
||||
_fragments.add(states, peer);
|
||||
} catch (IllegalArgumentException iae) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Shouldnt happen", new Exception("I did it"));
|
||||
|
Reference in New Issue
Block a user