- Round expiration times when converting to seconds
    - Zero-copy of single-fragment messages in MessageReceiver
    - Optimizations, log tweaks, comments
This commit is contained in:
zzz
2011-12-09 16:43:54 +00:00
parent 6d4a9abd35
commit 937ae8ad60
5 changed files with 90 additions and 66 deletions

View File

@@ -138,8 +138,8 @@ class ACKSender implements Runnable {
// bulk operations may throw an exception
_peersToACK.addAll(notYet);
} catch (Exception e) {}
if (_log.shouldLog(Log.INFO))
_log.info("sleeping, pending size = " + notYet.size());
if (_log.shouldLog(Log.DEBUG))
_log.debug("sleeping, pending size = " + notYet.size());
notYet.clear();
try {
// sleep a little longer than the divided frequency,

View File

@@ -69,6 +69,17 @@ class InboundMessageState {
try {
data.readMessageFragment(dataFragment, message.getData(), 0);
int size = data.readMessageFragmentSize(dataFragment);
if (size <= 0) {
// Bug in routers prior to 0.8.12
// If the msg size was an exact multiple of the fragment size,
// it would send a zero-length last fragment.
// This message is almost certainly doomed.
// We might as well ack it, keep going, and pass it along to I2NP where it
// will get dropped as corrupted.
// If we don't ack the fragment he will just send a zero-length fragment again.
if (_log.shouldLog(Log.WARN))
_log.warn("Zero-length fragment " + fragmentNum + " for message " + _messageId + " from " + _from);
}
message.setValid(size);
_fragments[fragmentNum] = message;
boolean isLast = data.readMessageIsLast(dataFragment);
@@ -91,7 +102,7 @@ class InboundMessageState {
_log.debug("New fragment " + fragmentNum + " for message " + _messageId
+ ", size=" + size
+ ", isLast=" + isLast
+ ", data=" + Base64.encode(message.getData(), 0, size));
/* + ", data=" + Base64.encode(message.getData(), 0, size) */ );
} catch (ArrayIndexOutOfBoundsException aioobe) {
_log.warn("Corrupt SSU fragment " + fragmentNum, aioobe);
return false;
@@ -106,11 +117,15 @@ class InboundMessageState {
}
/**
* May not be valid after released
* May not be valid after released.
* Probably doesn't need to be synced by caller, given the order of
* events in receiveFragment() above, but you might want to anyway
* to be safe.
*/
public boolean isComplete() {
if (_lastFragment < 0) return false;
for (int i = 0; i <= _lastFragment; i++)
int last = _lastFragment;
if (last < 0) return false;
for (int i = 0; i <= last; i++)
if (_fragments[i] == null)
return false;
return true;

View File

@@ -11,6 +11,7 @@ import net.i2p.data.i2np.I2NPMessageHandler;
import net.i2p.data.i2np.I2NPMessageImpl;
import net.i2p.router.RouterContext;
//import net.i2p.util.ByteCache;
import net.i2p.util.HexDump;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
@@ -53,8 +54,8 @@ class MessageReceiver {
_context.statManager().createRateStat("udp.inboundExpired", "How many messages were expired before reception?", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.inboundRemaining", "How many messages were remaining when a message is pulled off the complete queue?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.inboundReady", "How many messages were ready when a message is added to the complete queue?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.inboundReadTime", "How long it takes to parse in the completed fragments into a message?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.inboundReceiveProcessTime", "How long it takes to add the message to the transport?", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.inboundReadTime", "How long it takes to parse in the completed fragments into a message?", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.inboundReceiveProcessTime", "How long it takes to add the message to the transport?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.inboundLag", "How long the olded ready message has been sitting on the queue (period is the queue size)?", "udp", UDPTransport.RATES);
_alive = true;
@@ -136,16 +137,16 @@ class MessageReceiver {
_context.statManager().addRateData("udp.inboundExpired", expired, expiredLifetime);
if (message != null) {
long before = System.currentTimeMillis();
//long before = System.currentTimeMillis();
//if (remaining > 0)
// _context.statManager().addRateData("udp.inboundRemaining", remaining, 0);
int size = message.getCompleteSize();
if (_log.shouldLog(Log.INFO))
_log.info("Full message received (" + message.getMessageId() + ") after " + message.getLifetime());
long afterRead = -1;
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Full message received (" + message.getMessageId() + ") after " + message.getLifetime());
//long afterRead = -1;
try {
I2NPMessage msg = readMessage(buf, message, handler);
afterRead = System.currentTimeMillis();
//afterRead = System.currentTimeMillis();
if (msg != null)
_transport.messageReceived(msg, null, message.getFrom(), message.getLifetime(), size);
} catch (RuntimeException re) {
@@ -153,11 +154,11 @@ class MessageReceiver {
continue;
}
message = null;
long after = System.currentTimeMillis();
if (afterRead - before > 100)
_context.statManager().addRateData("udp.inboundReadTime", afterRead - before, remaining);
if (after - afterRead > 100)
_context.statManager().addRateData("udp.inboundReceiveProcessTime", after - afterRead, remaining);
//long after = System.currentTimeMillis();
//if (afterRead - before > 100)
// _context.statManager().addRateData("udp.inboundReadTime", afterRead - before, remaining);
//if (after - afterRead > 100)
// _context.statManager().addRateData("udp.inboundReceiveProcessTime", after - afterRead, remaining);
}
}
@@ -168,36 +169,50 @@ class MessageReceiver {
/**
* Assemble all the fragments into an I2NP message.
* This calls state.releaseResources(), do not access state after calling this.
*
* @param buf temp buffer for convenience
* @return null on error
*/
private I2NPMessage readMessage(ByteArray buf, InboundMessageState state, I2NPMessageHandler handler) {
try {
//byte buf[] = new byte[state.getCompleteSize()];
ByteArray fragments[] = state.getFragments();
I2NPMessage m;
int numFragments = state.getFragmentCount();
int off = 0;
for (int i = 0; i < numFragments; i++) {
System.arraycopy(fragments[i].getData(), 0, buf.getData(), off, fragments[i].getValid());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Raw fragment[" + i + "] for " + state.getMessageId() + ": "
+ Base64.encode(fragments[i].getData(), 0, fragments[i].getValid())
+ " (valid: " + fragments[i].getValid()
+ " raw: " + Base64.encode(fragments[i].getData()) + ")");
off += fragments[i].getValid();
if (numFragments > 1) {
ByteArray fragments[] = state.getFragments();
int off = 0;
for (int i = 0; i < numFragments; i++) {
System.arraycopy(fragments[i].getData(), 0, buf.getData(), off, fragments[i].getValid());
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Raw fragment[" + i + "] for " + state.getMessageId() + ": "
// + Base64.encode(fragments[i].getData(), 0, fragments[i].getValid())
// + " (valid: " + fragments[i].getValid()
// + " raw: " + Base64.encode(fragments[i].getData()) + ")");
off += fragments[i].getValid();
}
if (off != state.getCompleteSize()) {
if (_log.shouldLog(Log.WARN))
_log.warn("Hmm, offset of the fragments = " + off + " while the state says " + state.getCompleteSize());
return null;
}
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Raw byte array for " + state.getMessageId() + ": " + HexDump.dump(buf.getData(), 0, state.getCompleteSize()));
m = I2NPMessageImpl.fromRawByteArray(_context, buf.getData(), 0, state.getCompleteSize(), handler);
} else {
// zero copy for single fragment
m = I2NPMessageImpl.fromRawByteArray(_context, state.getFragments()[0].getData(), 0, state.getCompleteSize(), handler);
}
if (off != state.getCompleteSize()) {
if (_log.shouldLog(Log.WARN))
_log.warn("Hmm, offset of the fragments = " + off + " while the state says " + state.getCompleteSize());
return null;
if (state.getCompleteSize() == 534 && _log.shouldLog(Log.INFO)) {
_log.info(HexDump.dump(buf.getData(), 0, state.getCompleteSize()));
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Raw byte array for " + state.getMessageId() + ": " + Base64.encode(buf.getData(), 0, state.getCompleteSize()));
I2NPMessage m = I2NPMessageImpl.fromRawByteArray(_context, buf.getData(), 0, state.getCompleteSize(), handler);
m.setUniqueId(state.getMessageId());
return m;
} catch (I2NPMessageException ime) {
if (_log.shouldLog(Log.WARN))
if (_log.shouldLog(Log.WARN)) {
_log.warn("Message invalid: " + state, ime);
_log.warn(HexDump.dump(buf.getData(), 0, state.getCompleteSize()));
_log.warn("RAW: " + Base64.encode(buf.getData(), 0, state.getCompleteSize()));
}
_context.messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "error: " + ime.toString() + ": " + state.toString());
return null;
} catch (Exception e) {

View File

@@ -238,6 +238,10 @@ class PacketBuilder {
packet.release();
return null;
}
if (dataSize == 0) {
// OK according to the protocol but if we send it, it's a bug
_log.error("Sending zero-size fragment " + fragment + " of " + state + " for " + peer);
}
int currentMTU = peer.getMTU();
int availableForAcks = currentMTU - MIN_DATA_PACKET_OVERHEAD - dataSize;
int availableForExplicitAcks = availableForAcks;

View File

@@ -1169,11 +1169,10 @@ class PeerState {
state.setPeer(this);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Adding to " + _remotePeer.toBase64() + ": " + state.getMessageId());
List<OutboundMessageState> msgs = _outboundMessages;
int rv = 0;
boolean fail = false;
synchronized (msgs) {
rv = msgs.size() + 1;
synchronized (_outboundMessages) {
rv = _outboundMessages.size() + 1;
if (rv > 32) {
// 32 queued messages? to *one* peer? nuh uh.
fail = true;
@@ -1218,7 +1217,7 @@ class PeerState {
*******/
} else {
msgs.add(state);
_outboundMessages.add(state);
}
}
if (fail)
@@ -1230,17 +1229,16 @@ class PeerState {
public void dropOutbound() {
//if (_dead) return;
_dead = true;
List<OutboundMessageState> msgs = _outboundMessages;
//_outboundMessages = null;
_retransmitter = null;
int sz = 0;
List<OutboundMessageState> tempList = null;
synchronized (msgs) {
sz = msgs.size();
synchronized (_outboundMessages) {
sz = _outboundMessages.size();
if (sz > 0) {
tempList = new ArrayList(msgs);
msgs.clear();
tempList = new ArrayList(_outboundMessages);
_outboundMessages.clear();
}
}
for (int i = 0; i < sz; i++)
@@ -1263,9 +1261,8 @@ class PeerState {
* @return number of active outbound messages remaining
*/
public int finishMessages() {
List<OutboundMessageState> msgs = _outboundMessages;
// short circuit, unsynchronized
if (msgs.isEmpty())
if (_outboundMessages.isEmpty())
return 0;
if (_dead) {
@@ -1276,8 +1273,8 @@ class PeerState {
int rv = 0;
List<OutboundMessageState> succeeded = null;
List<OutboundMessageState> failed = null;
synchronized (msgs) {
for (Iterator<OutboundMessageState> iter = msgs.iterator(); iter.hasNext(); ) {
synchronized (_outboundMessages) {
for (Iterator<OutboundMessageState> iter = _outboundMessages.iterator(); iter.hasNext(); ) {
OutboundMessageState state = iter.next();
if (state.isComplete()) {
iter.remove();
@@ -1301,7 +1298,7 @@ class PeerState {
failed.add(state);
} // end (pushCount > maxVolleys)
} // end iterating over outbound messages
rv = msgs.size();
rv = _outboundMessages.size();
}
for (int i = 0; succeeded != null && i < succeeded.size(); i++) {
@@ -1337,11 +1334,9 @@ class PeerState {
*
*/
public OutboundMessageState allocateSend() {
int total = 0;
List<OutboundMessageState> msgs = _outboundMessages;
if (_dead) return null;
synchronized (msgs) {
for (OutboundMessageState state : msgs) {
synchronized (_outboundMessages) {
for (OutboundMessageState state : _outboundMessages) {
if (locked_shouldSend(state)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocate sending to " + _remotePeer.toBase64() + ": " + state.getMessageId());
@@ -1360,10 +1355,9 @@ class PeerState {
msg.timestamp("passed over for allocation with " + msgs.size() + " peers");
} */
}
total = msgs.size();
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Nothing to send to " + _remotePeer.toBase64() + ", with " + total + " remaining");
_log.debug("Nothing to send to " + _remotePeer.toBase64() + ", with " + _outboundMessages.size() + " remaining");
return null;
}
@@ -1375,13 +1369,12 @@ class PeerState {
int rv = Integer.MAX_VALUE;
if (_dead) return rv;
long now = _context.clock().now();
List<OutboundMessageState> msgs = _outboundMessages;
synchronized (msgs) {
synchronized (_outboundMessages) {
if (_retransmitter != null) {
rv = (int)(_retransmitter.getNextSendTime() - now);
return rv;
}
for (OutboundMessageState state : msgs) {
for (OutboundMessageState state : _outboundMessages) {
int delay = (int)(state.getNextSendTime() - now);
if (delay < rv)
rv = delay;
@@ -1512,9 +1505,8 @@ class PeerState {
public boolean acked(long messageId) {
if (_dead) return false;
OutboundMessageState state = null;
List<OutboundMessageState> msgs = _outboundMessages;
synchronized (msgs) {
for (Iterator<OutboundMessageState> iter = msgs.iterator(); iter.hasNext(); ) {
synchronized (_outboundMessages) {
for (Iterator<OutboundMessageState> iter = _outboundMessages.iterator(); iter.hasNext(); ) {
state = iter.next();
if (state.getMessageId() == messageId) {
iter.remove();
@@ -1574,12 +1566,10 @@ class PeerState {
return acked(bitfield.getMessageId());
}
List<OutboundMessageState> msgs = _outboundMessages;
OutboundMessageState state = null;
boolean isComplete = false;
synchronized (msgs) {
for (Iterator<OutboundMessageState> iter = msgs.iterator(); iter.hasNext(); ) {
synchronized (_outboundMessages) {
for (Iterator<OutboundMessageState> iter = _outboundMessages.iterator(); iter.hasNext(); ) {
state = iter.next();
if (state.getMessageId() == bitfield.getMessageId()) {
boolean complete = state.acked(bitfield);