* OutNetMessage: Remove setters, make fields final

This commit is contained in:
zzz
2013-10-18 18:42:48 +00:00
parent c71b485083
commit bdd9900d0d
12 changed files with 65 additions and 142 deletions

View File

@@ -31,14 +31,14 @@ import net.i2p.util.Log;
public class OutNetMessage implements CDPQEntry {
private final Log _log;
private final RouterContext _context;
private RouterInfo _target;
private I2NPMessage _message;
private int _messageTypeId;
private final RouterInfo _target;
private final I2NPMessage _message;
private final int _messageTypeId;
/** cached message ID, for use after we discard the message */
private long _messageId;
private long _messageSize;
private int _priority;
private long _expiration;
private final long _messageId;
private final long _messageSize;
private final int _priority;
private final long _expiration;
private Job _onSend;
private Job _onFailedSend;
private ReplyJob _onReply;
@@ -81,11 +81,37 @@ public class OutNetMessage implements CDPQEntry {
public static final int PRIORITY_NETDB_HARVEST = 100;
public static final int PRIORITY_LOWEST = 100;
public OutNetMessage(RouterContext context) {
/**
* Null msg and target (used in OutboundMessageRegistry only)
* @since 0.9.9
*/
public OutNetMessage(RouterContext context, long expiration) {
this(context, null, expiration, -1, null);
}
/**
* Standard constructor
* @param msg generally non-null
* @param target generally non-null
* @since 0.9.9
*/
public OutNetMessage(RouterContext context, I2NPMessage msg, long expiration, int priority, RouterInfo target) {
_context = context;
_log = context.logManager().getLog(OutNetMessage.class);
_priority = -1;
_expiration = -1;
_message = msg;
if (msg != null) {
_messageTypeId = msg.getType();
_messageId = msg.getUniqueId();
_messageSize = _message.getMessageSize();
} else {
_messageTypeId = 0;
_messageId = 0;
_messageSize = 0;
}
_priority = priority;
_expiration = expiration;
_target = target;
//_createdBy = new Exception("Created by");
_created = context.clock().now();
if (_log.shouldLog(Log.INFO))
@@ -160,7 +186,6 @@ public class OutNetMessage implements CDPQEntry {
*
*/
public RouterInfo getTarget() { return _target; }
public void setTarget(RouterInfo target) { _target = target; }
/**
* Specifies the message to be sent
@@ -168,15 +193,6 @@ public class OutNetMessage implements CDPQEntry {
*/
public I2NPMessage getMessage() { return _message; }
public void setMessage(I2NPMessage msg) {
_message = msg;
if (msg != null) {
_messageTypeId = msg.getType();
_messageId = msg.getUniqueId();
_messageSize = _message.getMessageSize();
}
}
/**
* For debugging only.
* @return the simple class name
@@ -190,18 +206,19 @@ public class OutNetMessage implements CDPQEntry {
public long getMessageId() { return _messageId; }
public long getMessageSize() {
if (_messageSize <= 0) {
_messageSize = _message.getMessageSize();
}
return _messageSize;
}
/**
* Copies the message data to outbuffer.
* Used only by VM Comm System.
* @return the length, or -1 if message is null
*/
public int getMessageData(byte outBuffer[]) {
if (_message == null) {
return -1;
} else {
int len = _message.toByteArray(outBuffer);
_messageSize = len;
return len;
}
}
@@ -213,7 +230,7 @@ public class OutNetMessage implements CDPQEntry {
*
*/
public int getPriority() { return _priority; }
public void setPriority(int priority) { _priority = priority; }
/**
* Specify the # ms since the epoch after which if the message has not been
* sent the OnFailedSend job should be fired and the message should be
@@ -222,7 +239,7 @@ public class OutNetMessage implements CDPQEntry {
*
*/
public long getExpiration() { return _expiration; }
public void setExpiration(long expiration) { _expiration = expiration; }
/**
* After the message is successfully passed to the router specified, the
* given job is enqueued.
@@ -230,6 +247,7 @@ public class OutNetMessage implements CDPQEntry {
*/
public Job getOnSendJob() { return _onSend; }
public void setOnSendJob(Job job) { _onSend = job; }
/**
* If the router could not be reached or the expiration passed, this job
* is enqueued.
@@ -237,18 +255,21 @@ public class OutNetMessage implements CDPQEntry {
*/
public Job getOnFailedSendJob() { return _onFailedSend; }
public void setOnFailedSendJob(Job job) { _onFailedSend = job; }
/**
* If the MessageSelector detects a reply, this job is enqueued
*
*/
public ReplyJob getOnReplyJob() { return _onReply; }
public void setOnReplyJob(ReplyJob job) { _onReply = job; }
/**
* If the Message selector is specified but it doesn't find a reply before
* its expiration passes, this job is enqueued.
*/
public Job getOnFailedReplyJob() { return _onFailedReply; }
public void setOnFailedReplyJob(Job job) { _onFailedReply = job; }
/**
* Defines a MessageSelector to find a reply to this message.
*
@@ -256,13 +277,13 @@ public class OutNetMessage implements CDPQEntry {
public MessageSelector getReplySelector() { return _replySelector; }
public void setReplySelector(MessageSelector selector) { _replySelector = selector; }
public void transportFailed(String transportStyle) {
public synchronized void transportFailed(String transportStyle) {
if (_failedTransports == null)
_failedTransports = new HashSet(2);
_failedTransports.add(transportStyle);
}
/** not thread safe - dont fail transports and iterate over this at the same time */
public Set getFailedTransports() {
public synchronized Set getFailedTransports() {
return (_failedTransports == null ? Collections.EMPTY_SET : _failedTransports);
}
@@ -345,38 +366,8 @@ public class OutNetMessage implements CDPQEntry {
* we may keep the object around for a while to use its ID, jobs, etc.
*/
public void discardData() {
if ( (_message != null) && (_messageSize <= 0) )
_messageSize = _message.getMessageSize();
//if (_log.shouldLog(Log.DEBUG)) {
// long timeToDiscard = _context.clock().now() - _created;
// _log.debug("Discard " + _messageSize + "byte " + getMessageType() + " message after "
// + timeToDiscard);
//}
_message = null;
//_context.statManager().addRateData("outNetMessage.timeToDiscard", timeToDiscard, timeToDiscard);
//_context.messageStateMonitor().outboundMessageDiscarded();
}
/*
public void finalize() throws Throwable {
if (_message != null) {
if (_log.shouldLog(Log.WARN)) {
StringBuilder buf = new StringBuilder(1024);
buf.append("Undiscarded ").append(_messageSize).append("byte ");
buf.append(_messageType).append(" message created ");
buf.append((_context.clock().now() - _created)).append("ms ago: ");
buf.append(_messageId); // .append(" to ").append(_target.calculateHash().toBase64());
buf.append(", timing - \n");
renderTimestamps(buf);
_log.warn(buf.toString(), _createdBy);
}
_context.messageStateMonitor().outboundMessageDiscarded();
}
_context.messageStateMonitor().outboundMessageFinalized();
super.finalize();
}
*/
@Override
public String toString() {
StringBuilder buf = new StringBuilder(256);
@@ -445,22 +436,4 @@ public class OutNetMessage implements CDPQEntry {
return _fmt.format(d);
}
}
/****
@Override
public int hashCode() {
int rv = DataHelper.hashCode(_message);
rv ^= DataHelper.hashCode(_target);
// the others are pretty much inconsequential
return rv;
}
@Override
public boolean equals(Object obj) {
//if(obj == null) return false;
//if(!(obj instanceof OutNetMessage)) return false;
return obj == this; // two OutNetMessages are different even if they contain the same message
}
****/
}

View File

@@ -129,16 +129,12 @@ public class SendMessageDirectJob extends JobImpl {
Hash us = getContext().routerHash();
if (us.equals(to)) {
if (_selector != null) {
OutNetMessage outM = new OutNetMessage(getContext());
outM.setExpiration(_expiration);
outM.setMessage(_message);
OutNetMessage outM = new OutNetMessage(getContext(), _message, _expiration, _priority, _router);
outM.setOnFailedReplyJob(_onFail);
outM.setOnFailedSendJob(_onFail);
outM.setOnReplyJob(_onSuccess);
outM.setOnSendJob(_onSend);
outM.setPriority(_priority);
outM.setReplySelector(_selector);
outM.setTarget(_router);
getContext().messageRegistry().registerPending(outM);
}
@@ -152,16 +148,12 @@ public class SendMessageDirectJob extends JobImpl {
+ " to inbound message pool as it was destined for ourselves");
//_log.debug("debug", _createdBy);
} else {
OutNetMessage msg = new OutNetMessage(getContext());
msg.setExpiration(_expiration);
msg.setMessage(_message);
OutNetMessage msg = new OutNetMessage(getContext(), _message, _expiration, _priority, _router);
msg.setOnFailedReplyJob(_onFail);
msg.setOnFailedSendJob(_onFail);
msg.setOnReplyJob(_onSuccess);
msg.setOnSendJob(_onSend);
msg.setPriority(_priority);
msg.setReplySelector(_selector);
msg.setTarget(_router);
getContext().outNetMessagePool().add(msg);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Adding " + _message.getClass().getName()

View File

@@ -214,12 +214,7 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
msg.setReplyGateway(null);
msg.setReplyToken(0);
msg.setReplyTunnel(null);
OutNetMessage m = new OutNetMessage(_context);
m.setMessage(msg);
m.setOnFailedReplyJob(null);
m.setPriority(FLOOD_PRIORITY);
m.setTarget(target);
m.setExpiration(_context.clock().now()+FLOOD_TIMEOUT);
OutNetMessage m = new OutNetMessage(_context, msg, _context.clock().now()+FLOOD_TIMEOUT, FLOOD_PRIORITY, target);
// note send failure but don't give credit on success
// might need to change this
Job floodFail = new FloodFailedJob(_context, peer);

View File

@@ -315,15 +315,11 @@ class StoreJob extends JobImpl {
if (_log.shouldLog(Log.DEBUG))
_log.debug("sending store directly to " + peer.getIdentity().getHash());
OutNetMessage m = new OutNetMessage(getContext());
m.setExpiration(expiration);
m.setMessage(msg);
OutNetMessage m = new OutNetMessage(getContext(), msg, expiration, STORE_PRIORITY, peer);
m.setOnFailedReplyJob(onFail);
m.setOnFailedSendJob(onFail);
m.setOnReplyJob(onReply);
m.setPriority(STORE_PRIORITY);
m.setReplySelector(selector);
m.setTarget(peer);
getContext().messageRegistry().registerPending(m);
getContext().commSystem().processMessage(m);
}

View File

@@ -171,8 +171,7 @@ public class OutboundMessageRegistry {
* @return an ONM where getMessage() is null. Use it to call unregisterPending() later if desired.
*/
public OutNetMessage registerPending(MessageSelector replySelector, ReplyJob onReply, Job onTimeout, int timeoutMs) {
OutNetMessage msg = new OutNetMessage(_context);
msg.setExpiration(_context.clock().now() + timeoutMs);
OutNetMessage msg = new OutNetMessage(_context, _context.clock().now() + timeoutMs);
msg.setOnFailedReplyJob(onTimeout);
msg.setOnFailedSendJob(onTimeout);
msg.setOnReplyJob(onReply);

View File

@@ -489,15 +489,11 @@ class NTCPConnection {
}
public void enqueueInfoMessage() {
OutNetMessage infoMsg = new OutNetMessage(_context);
infoMsg.setExpiration(_context.clock().now()+10*1000);
DatabaseStoreMessage dsm = new DatabaseStoreMessage(_context);
dsm.setEntry(_context.router().getRouterInfo());
infoMsg.setMessage(dsm);
infoMsg.setPriority(PRIORITY);
RouterInfo target = _context.netDb().lookupRouterInfoLocally(_remotePeer.calculateHash());
if (target != null) {
infoMsg.setTarget(target);
DatabaseStoreMessage dsm = new DatabaseStoreMessage(_context);
dsm.setEntry(_context.router().getRouterInfo());
OutNetMessage infoMsg = new OutNetMessage(_context, dsm, _context.clock().now()+10*1000, PRIORITY, target);
infoMsg.beginSend();
_context.statManager().addRateData("ntcp.infoMessageEnqueued", 1);
send(infoMsg);

View File

@@ -57,11 +57,7 @@ class InboundGatewayReceiver implements TunnelGateway.Receiver {
msg.setData(encrypted);
msg.setTunnelId(_config.getSendTunnel());
OutNetMessage out = new OutNetMessage(_context);
out.setMessage(msg);
out.setTarget(_target);
out.setExpiration(msg.getMessageExpiration());
out.setPriority(PRIORITY);
OutNetMessage out = new OutNetMessage(_context, msg, msg.getMessageExpiration(), PRIORITY, _target);
_context.outNetMessagePool().add(out);
return msg.getUniqueId();
}

View File

@@ -66,11 +66,7 @@ class OutboundMessageDistributor {
_context.inNetMessagePool().add(m, null, null);
return;
} else {
OutNetMessage out = new OutNetMessage(_context);
out.setExpiration(_context.clock().now() + MAX_DISTRIBUTE_TIME);
out.setTarget(target);
out.setMessage(m);
out.setPriority(_priority);
OutNetMessage out = new OutNetMessage(_context, m, _context.clock().now() + MAX_DISTRIBUTE_TIME, _priority, target);
if (_log.shouldLog(Log.DEBUG))
_log.debug("queueing outbound message to " + target.getIdentity().calculateHash());

View File

@@ -71,11 +71,7 @@ class OutboundReceiver implements TunnelGateway.Receiver {
private void send(TunnelDataMessage msg, RouterInfo ri) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("forwarding encrypted data out " + _config + ": " + msg.getUniqueId());
OutNetMessage m = new OutNetMessage(_context);
m.setMessage(msg);
m.setExpiration(msg.getMessageExpiration());
m.setTarget(ri);
m.setPriority(_priority);
OutNetMessage m = new OutNetMessage(_context, msg, msg.getMessageExpiration(), _priority, ri);
_context.outNetMessagePool().add(m);
_config.incrementProcessedMessages();
}

View File

@@ -192,12 +192,8 @@ class TunnelParticipant {
_context.messageHistory().wrap("TunnelDataMessage", oldId, "TunnelDataMessage", newId);
msg.setUniqueId(newId);
msg.setMessageExpiration(_context.clock().now() + 10*1000);
OutNetMessage m = new OutNetMessage(_context);
msg.setTunnelId(config.getSendTunnel());
m.setMessage(msg);
m.setExpiration(msg.getMessageExpiration());
m.setTarget(ri);
m.setPriority(PRIORITY);
OutNetMessage m = new OutNetMessage(_context, msg, msg.getMessageExpiration(), PRIORITY, ri);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Forward on from " + _config + ": " + msg);
_context.outNetMessagePool().add(m);

View File

@@ -781,11 +781,7 @@ class BuildHandler implements Runnable {
if (!isOutEnd) {
state.msg.setUniqueId(req.readReplyMessageId());
state.msg.setMessageExpiration(_context.clock().now() + NEXT_HOP_SEND_TIMEOUT);
OutNetMessage msg = new OutNetMessage(_context);
msg.setMessage(state.msg);
msg.setExpiration(state.msg.getMessageExpiration());
msg.setPriority(PRIORITY);
msg.setTarget(nextPeerInfo);
OutNetMessage msg = new OutNetMessage(_context, state.msg, state.msg.getMessageExpiration(), PRIORITY, nextPeerInfo);
if (response == 0)
msg.setOnFailedSendJob(new TunnelBuildNextHopFailJob(_context, cfg));
_context.outNetMessagePool().add(msg);
@@ -814,11 +810,7 @@ class BuildHandler implements Runnable {
_context.tunnelDispatcher().dispatch(m);
} else {
// ok, the gateway is some other peer, shove 'er across
OutNetMessage outMsg = new OutNetMessage(_context);
outMsg.setExpiration(m.getMessageExpiration());
outMsg.setMessage(m);
outMsg.setPriority(PRIORITY);
outMsg.setTarget(nextPeerInfo);
OutNetMessage outMsg = new OutNetMessage(_context, m, m.getMessageExpiration(), PRIORITY, nextPeerInfo);
if (response == 0)
outMsg.setOnFailedSendJob(new TunnelBuildNextHopFailJob(_context, cfg));
_context.outNetMessagePool().add(outMsg);

View File

@@ -172,15 +172,11 @@ abstract class BuildRequestor {
+ " for " + cfg + " waiting for the reply of " + cfg.getReplyMessageId()
+ " with msgId=" + msg.getUniqueId());
// send it directly to the first hop
OutNetMessage outMsg = new OutNetMessage(ctx);
// Add some fuzz to the TBM expiration to make it harder to guess how many hops
// or placement in the tunnel
msg.setMessageExpiration(ctx.clock().now() + BUILD_MSG_TIMEOUT + ctx.random().nextLong(20*1000));
// We set the OutNetMessage expiration much shorter, so that the
// TunnelBuildFirstHopFailJob fires before the 13s build expiration.
outMsg.setExpiration(ctx.clock().now() + FIRST_HOP_TIMEOUT);
outMsg.setMessage(msg);
outMsg.setPriority(PRIORITY);
RouterInfo peer = ctx.netDb().lookupRouterInfoLocally(cfg.getPeer(1));
if (peer == null) {
if (log.shouldLog(Log.WARN))
@@ -188,7 +184,7 @@ abstract class BuildRequestor {
exec.buildComplete(cfg, pool);
return;
}
outMsg.setTarget(peer);
OutNetMessage outMsg = new OutNetMessage(ctx, msg, ctx.clock().now() + FIRST_HOP_TIMEOUT, PRIORITY, peer);
outMsg.setOnFailedSendJob(new TunnelBuildFirstHopFailJob(ctx, pool, cfg, exec));
ctx.outNetMessagePool().add(outMsg);
}