- Implement destroy message sending
      (receiving was implemented in 0.8.1)
    - More cleanups at shutdown
    - Log tweaks
This commit is contained in:
zzz
2011-07-24 18:57:51 +00:00
parent 89d32e3bef
commit 734444e183
7 changed files with 69 additions and 14 deletions

View File

@@ -331,6 +331,7 @@ class EstablishmentManager {
/**
* Got a SessionDestroy on an established conn
* @since 0.8.1
*/
void receiveSessionDestroy(RemoteHostId from, PeerState state) {
if (_log.shouldLog(Log.DEBUG))
@@ -340,6 +341,7 @@ class EstablishmentManager {
/**
* Got a SessionDestroy during outbound establish
* @since 0.8.1
*/
void receiveSessionDestroy(RemoteHostId from, OutboundEstablishState state) {
if (_log.shouldLog(Log.DEBUG))
@@ -351,6 +353,7 @@ class EstablishmentManager {
/**
* Got a SessionDestroy - maybe after an inbound establish
* @since 0.8.1
*/
void receiveSessionDestroy(RemoteHostId from) {
if (_log.shouldLog(Log.DEBUG))

View File

@@ -656,6 +656,8 @@ class PacketBuilder {
/**
* Build a destroy packet, which contains a header but no body.
* Session must be established or this will NPE in authenticate().
* Unused until 0.8.9.
*
* @since 0.8.1
*/

View File

@@ -372,8 +372,8 @@ class PacketHandler {
if (state.getMACKey() != null) {
isValid = packet.validate(state.getMACKey());
if (isValid) {
if (_log.shouldLog(Log.WARN))
_log.warn("Valid introduction packet received for inbound con: " + packet);
if (_log.shouldLog(Log.INFO))
_log.info("Valid introduction packet received for inbound con: " + packet);
_state = 32;
packet.decrypt(state.getCipherKey());
@@ -418,8 +418,8 @@ class PacketHandler {
_state = 36;
isValid = packet.validate(state.getMACKey());
if (isValid) {
if (_log.shouldLog(Log.WARN))
_log.warn("Valid introduction packet received for outbound established con: " + packet);
if (_log.shouldLog(Log.INFO))
_log.info("Valid introduction packet received for outbound established con: " + packet);
_state = 37;
packet.decrypt(state.getCipherKey());
@@ -432,8 +432,8 @@ class PacketHandler {
// keys not yet exchanged, lets try it with the peer's intro key
isValid = packet.validate(state.getIntroKey());
if (isValid) {
if (_log.shouldLog(Log.WARN))
_log.warn("Valid introduction packet received for outbound established con with old intro key: " + packet);
if (_log.shouldLog(Log.INFO))
_log.info("Valid introduction packet received for outbound established con with old intro key: " + packet);
_state = 39;
packet.decrypt(state.getIntroKey());
handlePacket(reader, packet, null, state, null);

View File

@@ -1380,14 +1380,14 @@ class PeerState {
_context.statManager().addRateData("udp.sendRejected", state.getPushCount(), state.getLifetime());
//if (state.getMessage() != null)
// state.getMessage().timestamp("send rejected, available=" + getSendWindowBytesRemaining());
if (_log.shouldLog(Log.WARN))
_log.warn("Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes()
if (_log.shouldLog(Log.INFO))
_log.info("Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes()
+ " available=" + getSendWindowBytesRemaining()
+ " for message " + state.getMessageId() + ": " + state);
state.setNextSendTime(now + (ACKSender.ACK_FREQUENCY / 2) +
_context.random().nextInt(ACKSender.ACK_FREQUENCY)); //(now + 1024) & ~SECOND_MASK);
if (_log.shouldLog(Log.WARN))
_log.warn("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms");
if (_log.shouldLog(Log.INFO))
_log.info("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms");
//_throttle.choke(peer.getRemotePeer());
//if (state.getMessage() != null)

View File

@@ -137,7 +137,7 @@ class UDPEndpoint {
* Add the packet to the outobund queue to be sent ASAP (as allowed by
* the bandwidth limiter)
*
* @return number of packets in the send queue
* @return ZERO (used to be number of packets in the queue)
*/
public int send(UDPPacket packet) {
if (_sender == null)

View File

@@ -91,7 +91,7 @@ class UDPSender {
* available, if requested, otherwise it returns immediately
*
* @param blockTime how long to block IGNORED
* @return number of packets queued
* @return ZERO (used to be number of packets in the queue)
* @deprecated use add(packet)
*/
public int add(UDPPacket packet, int blockTime) {

View File

@@ -64,6 +64,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private final IntroductionManager _introManager;
private final ExpirePeerEvent _expireEvent;
private final PeerTestEvent _testEvent;
private final PacketBuilder _destroyBuilder;
private short _reachabilityStatus;
private long _reachabilityStatusLastUpdated;
private long _introducersSelectedOn;
@@ -200,6 +201,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_cachedBid[i] = new SharedBid(BID_VALUES[i]);
}
_destroyBuilder = new PacketBuilder(_context, this);
_fragments = new OutboundMessageFragments(_context, this, _activeThrottle);
_inboundFragments = new InboundMessageFragments(_context, _fragments, this);
if (SHOULD_FLOOD_PEERS)
@@ -337,6 +339,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
public void shutdown() {
destroyAll();
if (_endpoint != null)
_endpoint.shutdown();
if (_flooder != null)
@@ -353,6 +356,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_inboundFragments.shutdown();
_expireEvent.setIsAlive(false);
_testEvent.setIsAlive(false);
_peersByRemoteHost.clear();
_peersByIdent.clear();
_dropList.clear();
_introManager.reset();
}
/**
@@ -1011,12 +1018,53 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
*/
}
/**
* This sends it directly out, bypassing OutboundMessageFragments
* and the PacketPusher. The only queueing is for the bandwidth limiter.
*
* @return ZERO (used to be number of packets in the queue)
*/
int send(UDPPacket packet) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending packet " + packet);
return _endpoint.send(packet);
}
/**
* Send a session destroy message, bypassing OMF and PacketPusher.
*
* @since 0.8.9
*/
private void sendDestroy(PeerState peer) {
// peer must be fully established
if (peer.getCurrentCipherKey() == null)
return;
UDPPacket pkt = _destroyBuilder.buildSessionDestroyPacket(peer);
if (_log.shouldLog(Log.WARN))
_log.warn("Sending destroy to : " + peer);
send(pkt);
}
/**
* Send a session destroy message to everybody
*
* @since 0.8.9
*/
private void destroyAll() {
int howMany = _peersByIdent.size();
if (_log.shouldLog(Log.WARN))
_log.warn("Sending destroy to : " + howMany + " peers");
for (PeerState peer : _peersByIdent.values()) {
sendDestroy(peer);
}
int toSleep = Math.min(howMany / 3, 750);
if (toSleep > 0) {
try {
Thread.sleep(toSleep);
} catch (InterruptedException ie) {}
}
}
/** minimum active peers to maintain IP detection, etc. */
private static final int MIN_PEERS = 3;
/** minimum peers volunteering to be introducers if we need that */
@@ -2236,8 +2284,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
}
for (int i = 0; i < _expireBuffer.size(); i++)
dropPeer(_expireBuffer.get(i), false, "idle too long");
for (PeerState peer : _expireBuffer) {
sendDestroy(peer);
dropPeer(peer, false, "idle too long");
}
_expireBuffer.clear();
if (_alive)