- Catch some cases where UDPPackets were not returned to the cache (ticket #675)
   - Fix inverted logging logic (ticket #663)
   - Remove check in UDPPacket.getLifetime() (ticket #664)
   - RemoteHostID cleanup, cache hashcode
   - Remove udp.fetchRemoteSlow stat
   - Remove some time stamping in UDPPacket
   - Other cleanups - see http://zzz.i2p/topics/1198
This commit is contained in:
zzz
2012-08-03 14:25:32 +00:00
parent 18e8d35910
commit 501651125f
10 changed files with 143 additions and 106 deletions

View File

@@ -751,7 +751,7 @@ class EstablishmentManager {
int port = reader.getRelayResponseReader().readCharliePort(); int port = reader.getRelayResponseReader().readCharliePort();
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Received relay intro for " + state.getRemoteIdentity().calculateHash().toBase64() + " - they are on " _log.info("Received relay intro for " + state.getRemoteIdentity().calculateHash().toBase64() + " - they are on "
+ addr.toString() + ":" + port + " (according to " + bob.toString(true) + ")"); + addr.toString() + ":" + port + " (according to " + bob + ")");
RemoteHostId oldId = state.getRemoteHostId(); RemoteHostId oldId = state.getRemoteHostId();
state.introduced(addr, ip, port); state.introduced(addr, ip, port);
_outboundStates.remove(oldId); _outboundStates.remove(oldId);

View File

@@ -51,9 +51,8 @@ class OutboundMessageFragments {
*/ */
private boolean _isWaiting; private boolean _isWaiting;
private boolean _alive; private volatile boolean _alive;
private final PacketBuilder _builder; private final PacketBuilder _builder;
private long _lastCycleTime = System.currentTimeMillis();
/** if we can handle more messages explicitly, set this to true */ /** if we can handle more messages explicitly, set this to true */
// private boolean _allowExcess; // LINT not used?? // private boolean _allowExcess; // LINT not used??
@@ -205,8 +204,6 @@ class OutboundMessageFragments {
if (added) { if (added) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64()); _log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64());
if (wasEmpty)
_lastCycleTime = System.currentTimeMillis();
} else { } else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64()); _log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64());

View File

@@ -28,7 +28,7 @@ class PacketHandler {
private final InboundMessageFragments _inbound; private final InboundMessageFragments _inbound;
private final PeerTestManager _testManager; private final PeerTestManager _testManager;
private final IntroductionManager _introManager; private final IntroductionManager _introManager;
private boolean _keepReading; private volatile boolean _keepReading;
private final Handler[] _handlers; private final Handler[] _handlers;
private static final int MIN_NUM_HANDLERS = 2; // unless < 32MB private static final int MIN_NUM_HANDLERS = 2; // unless < 32MB
@@ -73,11 +73,11 @@ class PacketHandler {
_context.statManager().createRateStat("udp.droppedInvalidEstablish.new", "How old the packet we dropped due to invalidity (even though we do not have any active establishment with the peer) was", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.droppedInvalidEstablish.new", "How old the packet we dropped due to invalidity (even though we do not have any active establishment with the peer) was", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.droppedInvalidInboundEstablish", "How old the packet we dropped due to invalidity (inbound establishment, bad key) was", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.droppedInvalidInboundEstablish", "How old the packet we dropped due to invalidity (inbound establishment, bad key) was", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.droppedInvalidSkew", "How skewed the packet we dropped due to invalidity (valid except bad skew) was", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.droppedInvalidSkew", "How skewed the packet we dropped due to invalidity (valid except bad skew) was", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.packetDequeueTime", "How long it takes the UDPReader to pull a packet off the inbound packet queue (when its slow)", "udp", UDPTransport.RATES); //_context.statManager().createRateStat("udp.packetDequeueTime", "How long it takes the UDPReader to pull a packet off the inbound packet queue (when its slow)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.packetVerifyTime", "How long it takes the PacketHandler to verify a data packet after dequeueing (period is dequeue time)", "udp", UDPTransport.RATES); //_context.statManager().createRateStat("udp.packetVerifyTime", "How long it takes the PacketHandler to verify a data packet after dequeueing (period is dequeue time)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.packetVerifyTimeSlow", "How long it takes the PacketHandler to verify a data packet after dequeueing when its slow (period is dequeue time)", "udp", UDPTransport.RATES); //_context.statManager().createRateStat("udp.packetVerifyTimeSlow", "How long it takes the PacketHandler to verify a data packet after dequeueing when its slow (period is dequeue time)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.packetValidateMultipleCount", "How many times we validate a packet, if done more than once (period = afterValidate-enqueue)", "udp", UDPTransport.RATES); //_context.statManager().createRateStat("udp.packetValidateMultipleCount", "How many times we validate a packet, if done more than once (period = afterValidate-enqueue)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.packetNoValidationLifetime", "How long packets that are never validated are around for", "udp", UDPTransport.RATES); //_context.statManager().createRateStat("udp.packetNoValidationLifetime", "How long packets that are never validated are around for", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receivePacketSize.sessionRequest", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.receivePacketSize.sessionRequest", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receivePacketSize.sessionConfirmed", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.receivePacketSize.sessionConfirmed", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receivePacketSize.sessionCreated", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.receivePacketSize.sessionCreated", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
@@ -158,7 +158,7 @@ class PacketHandler {
_log.error("Crazy error handling a packet: " + packet, e); _log.error("Crazy error handling a packet: " + packet, e);
} }
long handleTime = _context.clock().now() - handleStart; long handleTime = _context.clock().now() - handleStart;
packet.afterHandling(); //packet.afterHandling();
_context.statManager().addRateData("udp.handleTime", handleTime, packet.getLifetime()); _context.statManager().addRateData("udp.handleTime", handleTime, packet.getLifetime());
_context.statManager().addRateData("udp.queueTime", queueTime, packet.getLifetime()); _context.statManager().addRateData("udp.queueTime", queueTime, packet.getLifetime());
_state = 8; _state = 8;
@@ -166,6 +166,7 @@ class PacketHandler {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Done receiving the packet " + packet); _log.info("Done receiving the packet " + packet);
/********
if (handleTime > 1000) { if (handleTime > 1000) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Took " + handleTime + " to process the packet " _log.warn("Took " + handleTime + " to process the packet "
@@ -198,6 +199,7 @@ class PacketHandler {
_context.statManager().addRateData("udp.packetValidateMultipleCount", validateCount, timeToValidate); _context.statManager().addRateData("udp.packetValidateMultipleCount", validateCount, timeToValidate);
else if (validateCount <= 0) else if (validateCount <= 0)
_context.statManager().addRateData("udp.packetNoValidationLifetime", packet.getLifetime(), 0); _context.statManager().addRateData("udp.packetNoValidationLifetime", packet.getLifetime(), 0);
********/
// back to the cache with thee! // back to the cache with thee!
packet.release(); packet.release();
@@ -211,7 +213,6 @@ class PacketHandler {
* Find the state and call the correct receivePacket() variant * Find the state and call the correct receivePacket() variant
*/ */
private void handlePacket(UDPPacketReader reader, UDPPacket packet) { private void handlePacket(UDPPacketReader reader, UDPPacket packet) {
if (packet == null) return;
_state = 10; _state = 10;
@@ -538,7 +539,7 @@ class PacketHandler {
msg.append(": ").append(dr.toString()); msg.append(": ").append(dr.toString());
_log.info(msg.toString()); _log.info(msg.toString());
} }
packet.beforeReceiveFragments(); //packet.beforeReceiveFragments();
_inbound.receiveData(state, dr); _inbound.receiveData(state, dr);
_context.statManager().addRateData("udp.receivePacketSize.dataKnown", packet.getPacket().getLength(), packet.getLifetime()); _context.statManager().addRateData("udp.receivePacketSize.dataKnown", packet.getPacket().getLength(), packet.getLifetime());
if (dr.readFragmentCount() <= 0) if (dr.readFragmentCount() <= 0)

View File

@@ -13,6 +13,7 @@ import net.i2p.data.RouterInfo;
import net.i2p.data.SessionKey; import net.i2p.data.SessionKey;
import net.i2p.router.CommSystemFacade; import net.i2p.router.CommSystemFacade;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.util.Addresses;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler; import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer; import net.i2p.util.SimpleTimer;
@@ -446,7 +447,7 @@ class PeerTestManager {
// initiated test // initiated test
} else { } else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("We are charlie, as the testIP/port is " + RemoteHostId.toString(testIP) + ":" + testPort + " and the state is unknown for " + nonce); _log.debug("We are charlie, as the testIP/port is " + Addresses.toString(testIP, testPort) + " and the state is unknown for " + nonce);
// we are charlie, since alice never sends us her IP and port, only bob does (and, // we are charlie, since alice never sends us her IP and port, only bob does (and,
// erm, we're not alice, since it isn't our nonce) // erm, we're not alice, since it isn't our nonce)
receiveFromBobAsCharlie(from, testInfo, nonce, null); receiveFromBobAsCharlie(from, testInfo, nonce, null);

View File

@@ -2,33 +2,49 @@ package net.i2p.router.transport.udp;
import net.i2p.data.Base64; import net.i2p.data.Base64;
import net.i2p.data.DataHelper; import net.i2p.data.DataHelper;
import net.i2p.util.Addresses;
/** /**
* Unique ID for a peer - its IP + port, all bundled into a tidy obj. * Unique ID for a peer - its IP + port, all bundled into a tidy obj.
* If the remote peer is not reachabe through an IP+port, this contains * If the remote peer is not reachable through an IP+port, this contains
* the hash of their identity. * the hash of their identity.
* *
*/ */
final class RemoteHostId { final class RemoteHostId {
private byte _ip[]; private final byte _ip[];
private int _port; private final int _port;
private byte _peerHash[]; private final byte _peerHash[];
private final int _hashCode;
/** direct */
public RemoteHostId(byte ip[], int port) { public RemoteHostId(byte ip[], int port) {
this(ip, port, null);
}
/** indirect */
public RemoteHostId(byte peerHash[]) {
this(null, 0, peerHash);
}
private RemoteHostId(byte ip[], int port, byte peerHash[]) {
_ip = ip; _ip = ip;
_port = port; _port = port;
}
public RemoteHostId(byte peerHash[]) {
_peerHash = peerHash; _peerHash = peerHash;
_hashCode = DataHelper.hashCode(_ip) ^ DataHelper.hashCode(_peerHash) ^ _port;
} }
/** @return null if indirect */
public byte[] getIP() { return _ip; } public byte[] getIP() { return _ip; }
/** @return 0 if indirect */
public int getPort() { return _port; } public int getPort() { return _port; }
/** @return null if direct */
public byte[] getPeerHash() { return _peerHash; } public byte[] getPeerHash() { return _peerHash; }
@Override @Override
public int hashCode() { public int hashCode() {
return DataHelper.hashCode(_ip) ^ DataHelper.hashCode(_peerHash) ^ _port; return _hashCode;
} }
@Override @Override
@@ -38,29 +54,22 @@ final class RemoteHostId {
if (!(obj instanceof RemoteHostId)) if (!(obj instanceof RemoteHostId))
return false; return false;
RemoteHostId id = (RemoteHostId)obj; RemoteHostId id = (RemoteHostId)obj;
return (_port == id.getPort()) && DataHelper.eq(_ip, id.getIP()) && DataHelper.eq(_peerHash, id.getPeerHash()); return (_port == id._port) && DataHelper.eq(_ip, id._ip) && DataHelper.eq(_peerHash, id._peerHash);
} }
@Override @Override
public String toString() { return toString(true); } public String toString() { return toString(true); }
public String toString(boolean includePort) {
private String toString(boolean includePort) {
if (_ip != null) { if (_ip != null) {
if (includePort) if (includePort)
return toString(_ip) + ':' + _port; return Addresses.toString(_ip, _port);
else else
return toString(_ip); return Addresses.toString(_ip);
} else { } else {
return Base64.encode(_peerHash); return Base64.encode(_peerHash);
} }
} }
public static String toString(byte ip[]) {
StringBuilder buf = new StringBuilder(ip.length+5);
for (int i = 0; i < ip.length; i++) {
buf.append(ip[i]&0xFF);
if (i + 1 < ip.length)
buf.append('.');
}
return buf.toString();
}
public String toHostString() { return toString(false); } public String toHostString() { return toString(false); }
} }

View File

@@ -12,14 +12,14 @@ import net.i2p.util.Log;
* UDPReceiver * UDPReceiver
*/ */
class UDPEndpoint { class UDPEndpoint {
private RouterContext _context; private final RouterContext _context;
private Log _log; private final Log _log;
private int _listenPort; private int _listenPort;
private UDPTransport _transport; private final UDPTransport _transport;
private UDPSender _sender; private UDPSender _sender;
private UDPReceiver _receiver; private UDPReceiver _receiver;
private DatagramSocket _socket; private DatagramSocket _socket;
private InetAddress _bindAddress; private final InetAddress _bindAddress;
/** /**
* @param listenPort -1 or the requested port, may not be honored * @param listenPort -1 or the requested port, may not be honored
@@ -34,7 +34,7 @@ class UDPEndpoint {
} }
/** caller should call getListenPort() after this to get the actual bound port and determine success */ /** caller should call getListenPort() after this to get the actual bound port and determine success */
public void startup() { public synchronized void startup() {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Starting up the UDP endpoint"); _log.debug("Starting up the UDP endpoint");
shutdown(); shutdown();
@@ -49,7 +49,7 @@ class UDPEndpoint {
_receiver.startup(); _receiver.startup();
} }
public void shutdown() { public synchronized void shutdown() {
if (_sender != null) { if (_sender != null) {
_sender.shutdown(); _sender.shutdown();
_receiver.shutdown(); _receiver.shutdown();

View File

@@ -32,13 +32,14 @@ class UDPPacket {
private volatile Exception _acquiredBy; private volatile Exception _acquiredBy;
private long _enqueueTime; private long _enqueueTime;
private long _receivedTime; private long _receivedTime;
private long _beforeValidate; //private long _beforeValidate;
private long _afterValidate; //private long _afterValidate;
private long _beforeReceiveFragments; //private long _beforeReceiveFragments;
private long _afterHandlingTime; //private long _afterHandlingTime;
private int _validateCount; private int _validateCount;
// private boolean _isInbound; // private boolean _isInbound;
// Warning - this mixes contexts in a multi-router JVM
private static final Queue<UDPPacket> _packetCache; private static final Queue<UDPPacket> _packetCache;
private static final boolean CACHE = true; private static final boolean CACHE = true;
private static final int CACHE_SIZE = 64; private static final int CACHE_SIZE = 64;
@@ -92,7 +93,7 @@ class UDPPacket {
private static final int MAX_VALIDATE_SIZE = MAX_PACKET_SIZE; private static final int MAX_VALIDATE_SIZE = MAX_PACKET_SIZE;
private UDPPacket(I2PAppContext ctx) { private UDPPacket(I2PAppContext ctx) {
ctx.statManager().createRateStat("udp.fetchRemoteSlow", "How long it takes to grab the remote ip info", "udp", UDPTransport.RATES); //ctx.statManager().createRateStat("udp.fetchRemoteSlow", "How long it takes to grab the remote ip info", "udp", UDPTransport.RATES);
// the data buffer is clobbered on init(..), but we need it to bootstrap // the data buffer is clobbered on init(..), but we need it to bootstrap
_data = new byte[MAX_PACKET_SIZE]; _data = new byte[MAX_PACKET_SIZE];
_packet = new DatagramPacket(_data, MAX_PACKET_SIZE); _packet = new DatagramPacket(_data, MAX_PACKET_SIZE);
@@ -134,7 +135,7 @@ class UDPPacket {
public short getPriority() { verifyNotReleased(); return _priority; } public short getPriority() { verifyNotReleased(); return _priority; }
public long getExpiration() { verifyNotReleased(); return _expiration; } public long getExpiration() { verifyNotReleased(); return _expiration; }
public long getBegin() { verifyNotReleased(); return _initializeTime; } public long getBegin() { verifyNotReleased(); return _initializeTime; }
public long getLifetime() { verifyNotReleased(); return _context.clock().now() - _initializeTime; } public long getLifetime() { /** verifyNotReleased(); */ return _context.clock().now() - _initializeTime; }
public void resetBegin() { _initializeTime = _context.clock().now(); } public void resetBegin() { _initializeTime = _context.clock().now(); }
/** flag this packet as a particular type for accounting purposes */ /** flag this packet as a particular type for accounting purposes */
public void markType(int type) { verifyNotReleased(); _markedType = type; } public void markType(int type) { verifyNotReleased(); _markedType = type; }
@@ -156,14 +157,14 @@ class UDPPacket {
RemoteHostId getRemoteHost() { RemoteHostId getRemoteHost() {
if (_remoteHost == null) { if (_remoteHost == null) {
long before = System.currentTimeMillis(); //long before = System.currentTimeMillis();
InetAddress addr = _packet.getAddress(); InetAddress addr = _packet.getAddress();
byte ip[] = addr.getAddress(); byte ip[] = addr.getAddress();
int port = _packet.getPort(); int port = _packet.getPort();
_remoteHost = new RemoteHostId(ip, port); _remoteHost = new RemoteHostId(ip, port);
long timeToFetch = System.currentTimeMillis() - before; //long timeToFetch = System.currentTimeMillis() - before;
if (timeToFetch > 50) //if (timeToFetch > 50)
_context.statManager().addRateData("udp.fetchRemoteSlow", timeToFetch, getLifetime()); // _context.statManager().addRateData("udp.fetchRemoteSlow", timeToFetch, getLifetime());
} }
return _remoteHost; return _remoteHost;
} }
@@ -175,7 +176,7 @@ class UDPPacket {
*/ */
public boolean validate(SessionKey macKey) { public boolean validate(SessionKey macKey) {
verifyNotReleased(); verifyNotReleased();
_beforeValidate = _context.clock().now(); //_beforeValidate = _context.clock().now();
boolean eq = false; boolean eq = false;
Arrays.fill(_validateBuf, (byte)0); Arrays.fill(_validateBuf, (byte)0);
@@ -216,7 +217,7 @@ class UDPPacket {
// _log.warn("Payload length is " + payloadLength); // _log.warn("Payload length is " + payloadLength);
} }
_afterValidate = _context.clock().now(); //_afterValidate = _context.clock().now();
_validateCount++; _validateCount++;
return eq; return eq;
} }
@@ -238,30 +239,35 @@ class UDPPacket {
void enqueue() { _enqueueTime = _context.clock().now(); } void enqueue() { _enqueueTime = _context.clock().now(); }
/** a packet handler has pulled it off the inbound queue */ /** a packet handler has pulled it off the inbound queue */
void received() { _receivedTime = _context.clock().now(); } void received() { _receivedTime = _context.clock().now(); }
/** a packet handler has decrypted and verified the packet and is about to parse out the good bits */ /** a packet handler has decrypted and verified the packet and is about to parse out the good bits */
void beforeReceiveFragments() { _beforeReceiveFragments = _context.clock().now(); } //void beforeReceiveFragments() { _beforeReceiveFragments = _context.clock().now(); }
/** a packet handler has finished parsing out the good bits */ /** a packet handler has finished parsing out the good bits */
void afterHandling() { _afterHandlingTime = _context.clock().now(); } //void afterHandling() { _afterHandlingTime = _context.clock().now(); }
/** the UDPReceiver has tossed it onto the inbound queue */ /** the UDPReceiver has tossed it onto the inbound queue */
long getTimeSinceEnqueue() { return (_enqueueTime > 0 ? _context.clock().now() - _enqueueTime : 0); } //long getTimeSinceEnqueue() { return (_enqueueTime > 0 ? _context.clock().now() - _enqueueTime : 0); }
/** a packet handler has pulled it off the inbound queue */ /** a packet handler has pulled it off the inbound queue */
long getTimeSinceReceived() { return (_receivedTime > 0 ? _context.clock().now() - _receivedTime : 0); } long getTimeSinceReceived() { return (_receivedTime > 0 ? _context.clock().now() - _receivedTime : 0); }
/** a packet handler has decrypted and verified the packet and is about to parse out the good bits */ /** a packet handler has decrypted and verified the packet and is about to parse out the good bits */
long getTimeSinceReceiveFragments() { return (_beforeReceiveFragments > 0 ? _context.clock().now() - _beforeReceiveFragments : 0); } //long getTimeSinceReceiveFragments() { return (_beforeReceiveFragments > 0 ? _context.clock().now() - _beforeReceiveFragments : 0); }
/** a packet handler has finished parsing out the good bits */ /** a packet handler has finished parsing out the good bits */
long getTimeSinceHandling() { return (_afterHandlingTime > 0 ? _context.clock().now() - _afterHandlingTime : 0); } //long getTimeSinceHandling() { return (_afterHandlingTime > 0 ? _context.clock().now() - _afterHandlingTime : 0); }
// Following 5: All used only for stats in PacketHandler, commented out
/** when it was added to the endpoint's receive queue */ /** when it was added to the endpoint's receive queue */
long getEnqueueTime() { return _enqueueTime; } //long getEnqueueTime() { return _enqueueTime; }
/** when it was pulled off the endpoint receive queue */ /** when it was pulled off the endpoint receive queue */
long getReceivedTime() { return _receivedTime; } //long getReceivedTime() { return _receivedTime; }
/** when we began validate() */ /** when we began validate() */
long getBeforeValidate() { return _beforeValidate; } //long getBeforeValidate() { return _beforeValidate; }
/** when we finished validate() */ /** when we finished validate() */
long getAfterValidate() { return _afterValidate; } //long getAfterValidate() { return _afterValidate; }
/** how many times we tried to validate the packet */ /** how many times we tried to validate the packet */
int getValidateCount() { return _validateCount; } //int getValidateCount() { return _validateCount; }
@Override @Override
public String toString() { public String toString() {
@@ -278,8 +284,8 @@ class UDPPacket {
buf.append(" sinceEnqueued=").append((_enqueueTime > 0 ? _context.clock().now()-_enqueueTime : -1)); buf.append(" sinceEnqueued=").append((_enqueueTime > 0 ? _context.clock().now()-_enqueueTime : -1));
buf.append(" sinceReceived=").append((_receivedTime > 0 ? _context.clock().now()-_receivedTime : -1)); buf.append(" sinceReceived=").append((_receivedTime > 0 ? _context.clock().now()-_receivedTime : -1));
buf.append(" beforeReceiveFragments=").append((_beforeReceiveFragments > 0 ? _context.clock().now()-_beforeReceiveFragments : -1)); //buf.append(" beforeReceiveFragments=").append((_beforeReceiveFragments > 0 ? _context.clock().now()-_beforeReceiveFragments : -1));
buf.append(" sinceHandled=").append((_afterHandlingTime > 0 ? _context.clock().now()-_afterHandlingTime : -1)); //buf.append(" sinceHandled=").append((_afterHandlingTime > 0 ? _context.clock().now()-_afterHandlingTime : -1));
//buf.append("\ndata=").append(Base64.encode(_packet.getData(), _packet.getOffset(), _packet.getLength())); //buf.append("\ndata=").append(Base64.encode(_packet.getData(), _packet.getOffset(), _packet.getLength()));
return buf.toString(); return buf.toString();
} }
@@ -316,13 +322,22 @@ class UDPPacket {
_packetCache.offer(this); _packetCache.offer(this);
} }
/**
* Call at shutdown/startup to not hold ctx refs
* @since 0.9.2
*/
public static void clearCache() {
if (CACHE)
_packetCache.clear();
}
private void verifyNotReleased() { private void verifyNotReleased() {
if (CACHE) return; if (!CACHE) return;
if (_released) { if (_released) {
Log log = I2PAppContext.getGlobalContext().logManager().getLog(UDPPacket.class); Log log = _context.logManager().getLog(UDPPacket.class);
log.log(Log.CRIT, "Already released. current stack trace is:", new Exception()); log.error("Already released", new Exception());
log.log(Log.CRIT, "Released by: ", _releasedBy); //log.log(Log.CRIT, "Released by: ", _releasedBy);
log.log(Log.CRIT, "Acquired by: ", _acquiredBy); //log.log(Log.CRIT, "Acquired by: ", _acquiredBy);
} }
} }
} }

View File

@@ -22,10 +22,10 @@ import net.i2p.util.SimpleTimer;
class UDPReceiver { class UDPReceiver {
private final RouterContext _context; private final RouterContext _context;
private final Log _log; private final Log _log;
private DatagramSocket _socket; private final DatagramSocket _socket;
private String _name; private String _name;
private final BlockingQueue<UDPPacket> _inboundQueue; private final BlockingQueue<UDPPacket> _inboundQueue;
private boolean _keepRunning; private volatile boolean _keepRunning;
private final Runner _runner; private final Runner _runner;
private final UDPTransport _transport; private final UDPTransport _transport;
private static int __id; private static int __id;
@@ -90,9 +90,11 @@ class UDPReceiver {
* NOTE: this closes the old socket so that blocking calls unblock! * NOTE: this closes the old socket so that blocking calls unblock!
* *
*/ */
/*********
public DatagramSocket updateListeningPort(DatagramSocket socket, int newPort) { public DatagramSocket updateListeningPort(DatagramSocket socket, int newPort) {
return _runner.updateListeningPort(socket, newPort); return _runner.updateListeningPort(socket, newPort);
} }
**********/
/** if a packet been sitting in the queue for a full second (meaning the handlers are overwhelmed), drop subsequent packets */ /** if a packet been sitting in the queue for a full second (meaning the handlers are overwhelmed), drop subsequent packets */
private static final long MAX_QUEUE_PERIOD = 2*1000; private static final long MAX_QUEUE_PERIOD = 2*1000;
@@ -215,26 +217,27 @@ class UDPReceiver {
} }
private class Runner implements Runnable { private class Runner implements Runnable {
private boolean _socketChanged; //private volatile boolean _socketChanged;
public void run() { public void run() {
_socketChanged = false; //_socketChanged = false;
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().createRequest(); FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().createRequest();
while (_keepRunning) { while (_keepRunning) {
if (_socketChanged) { //if (_socketChanged) {
Thread.currentThread().setName(_name + "." + _id); // Thread.currentThread().setName(_name + "." + _id);
_socketChanged = false; // _socketChanged = false;
} //}
UDPPacket packet = UDPPacket.acquire(_context, true); UDPPacket packet = UDPPacket.acquire(_context, true);
// block before we read... // block before we read...
if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
_log.debug("Before throttling receive"); // _log.debug("Before throttling receive");
while (!_context.throttle().acceptNetworkMessage()) while (!_context.throttle().acceptNetworkMessage())
try { Thread.sleep(10); } catch (InterruptedException ie) {} try { Thread.sleep(10); } catch (InterruptedException ie) {}
try { try {
if (_log.shouldLog(Log.INFO)) //if (_log.shouldLog(Log.INFO))
_log.info("Before blocking socket.receive on " + System.identityHashCode(packet)); // _log.info("Before blocking socket.receive on " + System.identityHashCode(packet));
synchronized (Runner.this) { synchronized (Runner.this) {
_socket.receive(packet.getPacket()); _socket.receive(packet.getPacket());
} }
@@ -263,15 +266,16 @@ class UDPReceiver {
// nat hole punch packets are 0 bytes // nat hole punch packets are 0 bytes
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Received a 0 byte udp packet from " + packet.getPacket().getAddress() + ":" + packet.getPacket().getPort()); _log.info("Received a 0 byte udp packet from " + packet.getPacket().getAddress() + ":" + packet.getPacket().getPort());
packet.release();
} }
} catch (IOException ioe) { } catch (IOException ioe) {
if (_socketChanged) { //if (_socketChanged) {
if (_log.shouldLog(Log.INFO)) // if (_log.shouldLog(Log.INFO))
_log.info("Changing ports..."); // _log.info("Changing ports...");
} else { //} else {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Error receiving", ioe); _log.warn("Error receiving", ioe);
} //}
packet.release(); packet.release();
} }
} }
@@ -279,6 +283,7 @@ class UDPReceiver {
_log.debug("Stop receiving..."); _log.debug("Stop receiving...");
} }
/******
public DatagramSocket updateListeningPort(DatagramSocket socket, int newPort) { public DatagramSocket updateListeningPort(DatagramSocket socket, int newPort) {
_name = "UDPReceive on " + newPort; _name = "UDPReceive on " + newPort;
DatagramSocket old = null; DatagramSocket old = null;
@@ -291,6 +296,6 @@ class UDPReceiver {
old.close(); old.close();
return old; return old;
} }
*****/
} }
} }

View File

@@ -18,10 +18,10 @@ import net.i2p.util.Log;
class UDPSender { class UDPSender {
private final RouterContext _context; private final RouterContext _context;
private final Log _log; private final Log _log;
private DatagramSocket _socket; private final DatagramSocket _socket;
private String _name; private String _name;
private final BlockingQueue<UDPPacket> _outboundQueue; private final BlockingQueue<UDPPacket> _outboundQueue;
private boolean _keepRunning; private volatile boolean _keepRunning;
private final Runner _runner; private final Runner _runner;
private static final int TYPE_POISON = 99999; private static final int TYPE_POISON = 99999;
@@ -81,9 +81,11 @@ class UDPSender {
_outboundQueue.clear(); _outboundQueue.clear();
} }
/*********
public DatagramSocket updateListeningPort(DatagramSocket socket, int newPort) { public DatagramSocket updateListeningPort(DatagramSocket socket, int newPort) {
return _runner.updateListeningPort(socket, newPort); return _runner.updateListeningPort(socket, newPort);
} }
**********/
/** /**
@@ -172,17 +174,18 @@ class UDPSender {
} }
private class Runner implements Runnable { private class Runner implements Runnable {
private boolean _socketChanged; //private volatile boolean _socketChanged;
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().createRequest(); FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().createRequest();
public void run() { public void run() {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Running the UDP sender"); _log.debug("Running the UDP sender");
_socketChanged = false; //_socketChanged = false;
while (_keepRunning) { while (_keepRunning) {
if (_socketChanged) { //if (_socketChanged) {
Thread.currentThread().setName(_name); // Thread.currentThread().setName(_name);
_socketChanged = false; // _socketChanged = false;
} //}
UDPPacket packet = getNextPacket(); UDPPacket packet = getNextPacket();
if (packet != null) { if (packet != null) {
@@ -200,11 +203,11 @@ class UDPSender {
long afterBW = _context.clock().now(); long afterBW = _context.clock().now();
if (_log.shouldLog(Log.DEBUG)) { //if (_log.shouldLog(Log.DEBUG)) {
//if (len > 128) //if (len > 128)
// len = 128; // len = 128;
//_log.debug("Sending packet: (size="+size + "/"+size2 +")\nraw: " + Base64.encode(packet.getPacket().getData(), 0, size)); //_log.debug("Sending packet: (size="+size + "/"+size2 +")\nraw: " + Base64.encode(packet.getPacket().getData(), 0, size));
} //}
if (packet.getMessageType() >= PacketBuilder.TYPE_FIRST) if (packet.getMessageType() >= PacketBuilder.TYPE_FIRST)
_context.statManager().addRateData("udp.sendPacketSize." + packet.getMessageType(), size, packet.getFragmentCount()); _context.statManager().addRateData("udp.sendPacketSize." + packet.getMessageType(), size, packet.getFragmentCount());
@@ -216,11 +219,11 @@ class UDPSender {
// synchronization lets us update safely // synchronization lets us update safely
//_log.debug("Break out datagram for " + packet); //_log.debug("Break out datagram for " + packet);
DatagramPacket dp = packet.getPacket(); DatagramPacket dp = packet.getPacket();
if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
_log.debug("Just before socket.send of " + packet); // _log.debug("Just before socket.send of " + packet);
_socket.send(dp); _socket.send(dp);
if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
_log.debug("Just after socket.send of " + packet); // _log.debug("Just after socket.send of " + packet);
} }
long sendTime = _context.clock().now() - before; long sendTime = _context.clock().now() - before;
_context.statManager().addRateData("udp.socketSendTime", sendTime, packet.getLifetime()); _context.statManager().addRateData("udp.socketSendTime", sendTime, packet.getLifetime());
@@ -251,8 +254,10 @@ class UDPSender {
private UDPPacket getNextPacket() { private UDPPacket getNextPacket() {
UDPPacket packet = null; UDPPacket packet = null;
while ( (_keepRunning) && (packet == null || packet.getLifetime() > MAX_HEAD_LIFETIME) ) { while ( (_keepRunning) && (packet == null || packet.getLifetime() > MAX_HEAD_LIFETIME) ) {
if (packet != null) if (packet != null) {
_context.statManager().addRateData("udp.sendQueueTrimmed", 1, 0); _context.statManager().addRateData("udp.sendQueueTrimmed", 1, 0);
packet.release();
}
try { try {
packet = _outboundQueue.take(); packet = _outboundQueue.take();
} catch (InterruptedException ie) {} } catch (InterruptedException ie) {}
@@ -261,6 +266,8 @@ class UDPSender {
} }
return packet; return packet;
} }
/******
public DatagramSocket updateListeningPort(DatagramSocket socket, int newPort) { public DatagramSocket updateListeningPort(DatagramSocket socket, int newPort) {
_name = "UDPSend on " + newPort; _name = "UDPSend on " + newPort;
DatagramSocket old = null; DatagramSocket old = null;
@@ -271,5 +278,6 @@ class UDPSender {
_socketChanged = true; _socketChanged = true;
return old; return old;
} }
*****/
} }
} }

View File

@@ -247,6 +247,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (_flooder != null) if (_flooder != null)
_flooder.shutdown(); _flooder.shutdown();
_introManager.reset(); _introManager.reset();
UDPPacket.clearCache();
_introKey = new SessionKey(new byte[SessionKey.KEYSIZE_BYTES]); _introKey = new SessionKey(new byte[SessionKey.KEYSIZE_BYTES]);
System.arraycopy(_context.routerHash().getData(), 0, _introKey.getData(), 0, SessionKey.KEYSIZE_BYTES); System.arraycopy(_context.routerHash().getData(), 0, _introKey.getData(), 0, SessionKey.KEYSIZE_BYTES);
@@ -379,6 +380,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_peersByIdent.clear(); _peersByIdent.clear();
_dropList.clear(); _dropList.clear();
_introManager.reset(); _introManager.reset();
UDPPacket.clearCache();
} }
/** /**
@@ -421,9 +423,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
*/ */
@Override @Override
public void externalAddressReceived(String source, byte[] ip, int port) { public void externalAddressReceived(String source, byte[] ip, int port) {
String s = RemoteHostId.toString(ip);
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Received address: " + s + " port: " + port + " from: " + source); _log.warn("Received address: " + Addresses.toString(ip, port) + " from: " + source);
if (explicitAddressSpecified()) if (explicitAddressSpecified())
return; return;
String sources = _context.getProperty(PROP_SOURCES, DEFAULT_SOURCES); String sources = _context.getProperty(PROP_SOURCES, DEFAULT_SOURCES);