forked from I2P_Developers/i2p.i2p
cleanups and javadoc
This commit is contained in:
@@ -14,6 +14,7 @@ import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
@@ -27,14 +28,17 @@ import net.i2p.router.RouterContext;
|
||||
import net.i2p.util.Log;
|
||||
import net.i2p.util.SimpleTimer;
|
||||
|
||||
/**
|
||||
* Tracks outbound messages.
|
||||
*/
|
||||
public class OutboundMessageRegistry {
|
||||
private final Log _log;
|
||||
/** list of currently active MessageSelector instances */
|
||||
private final List _selectors;
|
||||
private final List<MessageSelector> _selectors;
|
||||
/** map of active MessageSelector to either an OutNetMessage or a List of OutNetMessages causing it (for quick removal) */
|
||||
private final Map _selectorToMessage;
|
||||
private final Map<MessageSelector, Object> _selectorToMessage;
|
||||
/** set of active OutNetMessage (for quick removal and selector fetching) */
|
||||
private final Set _activeMessages;
|
||||
private final Set<OutNetMessage> _activeMessages;
|
||||
private final CleanupTask _cleanupTask;
|
||||
private final RouterContext _context;
|
||||
|
||||
@@ -78,16 +82,19 @@ public class OutboundMessageRegistry {
|
||||
* message remains in the registry, but if it shouldn't continue, the matched
|
||||
* message is removed from the registry.
|
||||
*
|
||||
* This is called only by InNetMessagePool.
|
||||
*
|
||||
* @param message Payload received that may be a reply to something we sent
|
||||
* @return List of OutNetMessage describing messages that were waiting for
|
||||
* @return non-null List of OutNetMessage describing messages that were waiting for
|
||||
* the payload
|
||||
*/
|
||||
public List getOriginalMessages(I2NPMessage message) {
|
||||
ArrayList matchedSelectors = null;
|
||||
ArrayList removedSelectors = null;
|
||||
public List<OutNetMessage> getOriginalMessages(I2NPMessage message) {
|
||||
List<MessageSelector> matchedSelectors = null;
|
||||
List<MessageSelector> removedSelectors = null;
|
||||
|
||||
synchronized (_selectors) {
|
||||
for (int i = 0; i < _selectors.size(); i++) {
|
||||
MessageSelector sel = (MessageSelector)_selectors.get(i);
|
||||
for (Iterator<MessageSelector> iter = _selectors.iterator(); iter.hasNext(); ) {
|
||||
MessageSelector sel = iter.next();
|
||||
if (sel == null)
|
||||
continue;
|
||||
boolean isMatch = sel.isMatch(message);
|
||||
@@ -97,18 +104,16 @@ public class OutboundMessageRegistry {
|
||||
if (!sel.continueMatching()) {
|
||||
if (removedSelectors == null) removedSelectors = new ArrayList(1);
|
||||
removedSelectors.add(sel);
|
||||
_selectors.remove(i);
|
||||
i--;
|
||||
iter.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
List rv = null;
|
||||
List<OutNetMessage> rv = null;
|
||||
if (matchedSelectors != null) {
|
||||
rv = new ArrayList(matchedSelectors.size());
|
||||
for (int i = 0; i < matchedSelectors.size(); i++) {
|
||||
MessageSelector sel = (MessageSelector)matchedSelectors.get(i);
|
||||
for (MessageSelector sel : matchedSelectors) {
|
||||
boolean removed = false;
|
||||
OutNetMessage msg = null;
|
||||
List msgs = null;
|
||||
@@ -126,7 +131,7 @@ public class OutboundMessageRegistry {
|
||||
if (msg != null)
|
||||
rv.add(msg);
|
||||
} else if (o instanceof List) {
|
||||
msgs = (List)o;
|
||||
msgs = (List<OutNetMessage>)o;
|
||||
if (msgs != null)
|
||||
rv.addAll(msgs);
|
||||
}
|
||||
@@ -150,6 +155,14 @@ public class OutboundMessageRegistry {
|
||||
return rv;
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a new, empty OutNetMessage, with the reply and timeout jobs specified.
|
||||
*
|
||||
* @param replySelector non-null; The same selector may be used for more than one message.
|
||||
* @param onReply may be null
|
||||
* @param onTimeout Also called on failed send; may be null
|
||||
* @return an ONM where getMessage() is null. Use it to call unregisterPending() later if desired.
|
||||
*/
|
||||
public OutNetMessage registerPending(MessageSelector replySelector, ReplyJob onReply, Job onTimeout, int timeoutMs) {
|
||||
OutNetMessage msg = new OutNetMessage(_context);
|
||||
msg.setExpiration(_context.clock().now() + timeoutMs);
|
||||
@@ -161,8 +174,19 @@ public class OutboundMessageRegistry {
|
||||
return msg;
|
||||
}
|
||||
|
||||
/**
|
||||
* Register the message. Each message must have a non-null
|
||||
* selector at msg.getReplySelector().
|
||||
* The same selector may be used for more than one message.
|
||||
*
|
||||
* @param msg msg.getMessage() and msg.getReplySelector() must be non-null
|
||||
*/
|
||||
public void registerPending(OutNetMessage msg) { registerPending(msg, false); }
|
||||
public void registerPending(OutNetMessage msg, boolean allowEmpty) {
|
||||
|
||||
/**
|
||||
* @param allowEmpty is msg.getMessage() allowed to be null?
|
||||
*/
|
||||
private void registerPending(OutNetMessage msg, boolean allowEmpty) {
|
||||
if ( (!allowEmpty) && (msg.getMessage() == null) )
|
||||
throw new IllegalArgumentException("OutNetMessage doesn't contain an I2NPMessage? wtf");
|
||||
MessageSelector sel = msg.getReplySelector();
|
||||
@@ -184,7 +208,7 @@ public class OutboundMessageRegistry {
|
||||
multi.add(msg);
|
||||
_selectorToMessage.put(sel, multi);
|
||||
} else if (oldMsg instanceof List) {
|
||||
multi = (List)oldMsg;
|
||||
multi = (List<OutNetMessage>)oldMsg;
|
||||
multi.add(msg);
|
||||
_selectorToMessage.put(sel, multi);
|
||||
}
|
||||
@@ -197,6 +221,9 @@ public class OutboundMessageRegistry {
|
||||
_cleanupTask.scheduleExpiration(sel);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param msg may be be null
|
||||
*/
|
||||
public void unregisterPending(OutNetMessage msg) {
|
||||
if (msg == null) return;
|
||||
MessageSelector sel = msg.getReplySelector();
|
||||
@@ -205,7 +232,7 @@ public class OutboundMessageRegistry {
|
||||
Object old = _selectorToMessage.remove(sel);
|
||||
if (old != null) {
|
||||
if (old instanceof List) {
|
||||
List l = (List)old;
|
||||
List<OutNetMessage> l = (List<OutNetMessage>)old;
|
||||
l.remove(msg);
|
||||
if (!l.isEmpty()) {
|
||||
_selectorToMessage.put(sel, l);
|
||||
@@ -219,42 +246,42 @@ public class OutboundMessageRegistry {
|
||||
synchronized (_activeMessages) { _activeMessages.remove(msg); }
|
||||
}
|
||||
|
||||
/** @deprecated unused */
|
||||
public void renderStatusHTML(Writer out) throws IOException {}
|
||||
|
||||
private class CleanupTask implements SimpleTimer.TimedEvent {
|
||||
private long _nextExpire;
|
||||
|
||||
public CleanupTask() {
|
||||
_nextExpire = -1;
|
||||
}
|
||||
|
||||
public void timeReached() {
|
||||
long now = _context.clock().now();
|
||||
List removing = new ArrayList(1);
|
||||
List<MessageSelector> removing = new ArrayList(1);
|
||||
synchronized (_selectors) {
|
||||
for (int i = 0; i < _selectors.size(); i++) {
|
||||
MessageSelector sel = (MessageSelector)_selectors.get(i);
|
||||
if (sel == null) continue;
|
||||
for (Iterator<MessageSelector> iter = _selectors.iterator(); iter.hasNext(); ) {
|
||||
MessageSelector sel = iter.next();
|
||||
long expiration = sel.getExpiration();
|
||||
if (expiration <= now) {
|
||||
removing.add(sel);
|
||||
_selectors.remove(i);
|
||||
i--;
|
||||
iter.remove();
|
||||
} else if (expiration < _nextExpire || _nextExpire < now) {
|
||||
_nextExpire = expiration;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!removing.isEmpty()) {
|
||||
for (int i = 0; i < removing.size(); i++) {
|
||||
MessageSelector sel = (MessageSelector)removing.get(i);
|
||||
for (MessageSelector sel : removing) {
|
||||
OutNetMessage msg = null;
|
||||
List msgs = null;
|
||||
List<OutNetMessage> msgs = null;
|
||||
synchronized (_selectorToMessage) {
|
||||
Object o = _selectorToMessage.remove(sel);
|
||||
if (o instanceof OutNetMessage) {
|
||||
msg = (OutNetMessage)o;
|
||||
} else if (o instanceof List) {
|
||||
//msgs = new ArrayList((List)o);
|
||||
msgs = (List)o;
|
||||
msgs = (List<OutNetMessage>)o;
|
||||
}
|
||||
}
|
||||
if (msg != null) {
|
||||
@@ -268,9 +295,8 @@ public class OutboundMessageRegistry {
|
||||
synchronized (_activeMessages) {
|
||||
_activeMessages.removeAll(msgs);
|
||||
}
|
||||
for (int j = 0; j < msgs.size(); j++) {
|
||||
msg = (OutNetMessage)msgs.get(j);
|
||||
Job fail = msg.getOnFailedReplyJob();
|
||||
for (OutNetMessage m : msgs) {
|
||||
Job fail = m.getOnFailedReplyJob();
|
||||
if (fail != null)
|
||||
_context.jobQueue().addJob(fail);
|
||||
}
|
||||
@@ -282,6 +308,7 @@ public class OutboundMessageRegistry {
|
||||
_nextExpire = now + 10*1000;
|
||||
SimpleTimer.getInstance().addEvent(CleanupTask.this, _nextExpire - now);
|
||||
}
|
||||
|
||||
public void scheduleExpiration(MessageSelector sel) {
|
||||
long now = _context.clock().now();
|
||||
if ( (_nextExpire <= now) || (sel.getExpiration() < _nextExpire) ) {
|
||||
|
Reference in New Issue
Block a user