* I2CP: Prep for per-message reliability settings (ticket #788)

Router side:
     Store message nonce in ClientMessage, so we may send
     a MessageStatusMessage with a failure code to the client
     without sending an ACCEPTED MessageStatusMessage first.
     All MessageStatusMessages sent in response to outbound messages will now have a valid nonce.
This commit is contained in:
zzz
2014-05-14 13:49:42 +00:00
parent dbb7eb3d88
commit a93666cd36
7 changed files with 65 additions and 23 deletions

View File

@@ -70,9 +70,12 @@ public abstract class ClientManagerFacade implements Service {
public abstract boolean isLocal(Hash destHash); public abstract boolean isLocal(Hash destHash);
/** /**
* @param id the router's ID for this message
* @param messageNonce the client's ID for this message
* @param status see I2CP MessageStatusMessage for success/failure codes * @param status see I2CP MessageStatusMessage for success/failure codes
*/ */
public abstract void messageDeliveryStatusUpdate(Destination fromDest, MessageId id, int status); public abstract void messageDeliveryStatusUpdate(Destination fromDest, MessageId id,
long messageNonce, int status);
public abstract void messageReceived(ClientMessage msg); public abstract void messageReceived(ClientMessage msg);

View File

@@ -29,22 +29,26 @@ public class ClientMessage {
private final SessionConfig _senderConfig; private final SessionConfig _senderConfig;
private final Hash _destinationHash; private final Hash _destinationHash;
private final MessageId _messageId; private final MessageId _messageId;
private final long _messageNonce;
private final long _expiration; private final long _expiration;
/** only for outbound messages */ /** only for outbound messages */
private final int _flags; private final int _flags;
/** /**
* For outbound (locally originated) * For outbound (locally originated)
* @param msgID the router's ID for this message
* @param messageNonce the client's ID for this message
* @since 0.9.9 * @since 0.9.9
*/ */
public ClientMessage(Destination toDest, Payload payload, SessionConfig config, Destination fromDest, public ClientMessage(Destination toDest, Payload payload, SessionConfig config, Destination fromDest,
MessageId msgID, long expiration, int flags) { MessageId msgID, long messageNonce, long expiration, int flags) {
_destination = toDest; _destination = toDest;
_destinationHash = null; _destinationHash = null;
_payload = payload; _payload = payload;
_senderConfig = config; _senderConfig = config;
_fromDestination = fromDest; _fromDestination = fromDest;
_messageId = msgID; _messageId = msgID;
_messageNonce = messageNonce;
_expiration = expiration; _expiration = expiration;
_flags = flags; _flags = flags;
} }
@@ -60,6 +64,7 @@ public class ClientMessage {
_senderConfig = null; _senderConfig = null;
_fromDestination = null; _fromDestination = null;
_messageId = null; _messageId = null;
_messageNonce = 0;
_expiration = 0; _expiration = 0;
_flags = 0; _flags = 0;
} }
@@ -94,6 +99,12 @@ public class ClientMessage {
*/ */
public MessageId getMessageId() { return _messageId; } public MessageId getMessageId() { return _messageId; }
/**
* Valid for outbound; 0 for inbound.
* @since 0.9.14
*/
public long getMessageNonce() { return _messageNonce; }
/** /**
* Retrieve the information regarding how the router received this message. Only * Retrieve the information regarding how the router received this message. Only
* messages received from the network will have this information, not locally * messages received from the network will have this information, not locally

View File

@@ -66,14 +66,20 @@ class ClientConnectionRunner {
/** user's config */ /** user's config */
private SessionConfig _config; private SessionConfig _config;
private String _clientVersion; private String _clientVersion;
/** static mapping of MessageId to Payload, storing messages for retrieval */ /**
* Mapping of MessageId to Payload, storing messages for retrieval.
* Unused for i2cp.fastReceive = "true" (_dontSendMSMOnRecive = true)
*/
private final Map<MessageId, Payload> _messages; private final Map<MessageId, Payload> _messages;
/** lease set request state, or null if there is no request pending on at the moment */ /** lease set request state, or null if there is no request pending on at the moment */
private LeaseRequestState _leaseRequest; private LeaseRequestState _leaseRequest;
private int _consecutiveLeaseRequestFails; private int _consecutiveLeaseRequestFails;
/** currently allocated leaseSet, or null if none is allocated */ /** currently allocated leaseSet, or null if none is allocated */
private LeaseSet _currentLeaseSet; private LeaseSet _currentLeaseSet;
/** set of messageIds created but not yet ACCEPTED */ /**
* Set of messageIds created but not yet ACCEPTED.
* Unused for i2cp.messageReliability = "none" (_dontSendMSM = true)
*/
private final Set<MessageId> _acceptedPending; private final Set<MessageId> _acceptedPending;
/** thingy that does stuff */ /** thingy that does stuff */
protected I2CPMessageReader _reader; protected I2CPMessageReader _reader;
@@ -323,12 +329,14 @@ class ClientConnectionRunner {
* *
* Do not use for status = STATUS_SEND_ACCEPTED; use ackSendMessage() for that. * Do not use for status = STATUS_SEND_ACCEPTED; use ackSendMessage() for that.
* *
* @param id the router's ID for this message
* @param messageNonce the client's ID for this message
* @param status see I2CP MessageStatusMessage for success/failure codes * @param status see I2CP MessageStatusMessage for success/failure codes
*/ */
void updateMessageDeliveryStatus(MessageId id, int status) { void updateMessageDeliveryStatus(MessageId id, long messageNonce, int status) {
if (_dead || _dontSendMSM) if (_dead || messageNonce <= 0)
return; return;
_context.jobQueue().addJob(new MessageDeliveryStatusUpdate(id, status)); _context.jobQueue().addJob(new MessageDeliveryStatusUpdate(id, messageNonce, status));
} }
/** /**
@@ -414,7 +422,7 @@ class ClientConnectionRunner {
expiration = msg.getExpirationTime(); expiration = msg.getExpirationTime();
flags = msg.getFlags(); flags = msg.getFlags();
} }
if (message.getNonce() != 0 && !_dontSendMSM) if ((!_dontSendMSM) && message.getNonce() != 0)
_acceptedPending.add(id); _acceptedPending.add(id);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@@ -424,7 +432,8 @@ class ClientConnectionRunner {
// the following blocks as described above // the following blocks as described above
SessionConfig cfg = _config; SessionConfig cfg = _config;
if (cfg != null) if (cfg != null)
_manager.distributeMessage(cfg.getDestination(), dest, payload, id, expiration, flags); _manager.distributeMessage(cfg.getDestination(), dest, payload,
id, message.getNonce(), expiration, flags);
// else log error? // else log error?
//long timeToDistribute = _context.clock().now() - beforeDistribute; //long timeToDistribute = _context.clock().now() - beforeDistribute;
//if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
@@ -687,6 +696,7 @@ class ClientConnectionRunner {
private class MessageDeliveryStatusUpdate extends JobImpl { private class MessageDeliveryStatusUpdate extends JobImpl {
private final MessageId _messageId; private final MessageId _messageId;
private final long _messageNonce;
private final int _status; private final int _status;
private long _lastTried; private long _lastTried;
private int _requeueCount; private int _requeueCount;
@@ -694,11 +704,14 @@ class ClientConnectionRunner {
/** /**
* Do not use for status = STATUS_SEND_ACCEPTED; use ackSendMessage() for that. * Do not use for status = STATUS_SEND_ACCEPTED; use ackSendMessage() for that.
* *
* @param id the router's ID for this message
* @param messageNonce the client's ID for this message
* @param status see I2CP MessageStatusMessage for success/failure codes * @param status see I2CP MessageStatusMessage for success/failure codes
*/ */
public MessageDeliveryStatusUpdate(MessageId id, int status) { public MessageDeliveryStatusUpdate(MessageId id, long messageNonce, int status) {
super(ClientConnectionRunner.this._context); super(ClientConnectionRunner.this._context);
_messageId = id; _messageId = id;
_messageNonce = messageNonce;
_status = status; _status = status;
} }
@@ -714,7 +727,7 @@ class ClientConnectionRunner {
msg.setMessageId(_messageId.getMessageId()); msg.setMessageId(_messageId.getMessageId());
msg.setSessionId(_sessionId.getSessionId()); msg.setSessionId(_sessionId.getSessionId());
// has to be >= 0, it is initialized to -1 // has to be >= 0, it is initialized to -1
msg.setNonce(2); msg.setNonce(_messageNonce);
msg.setSize(0); msg.setSize(0);
msg.setStatus(_status); msg.setStatus(_status);

View File

@@ -265,9 +265,12 @@ class ClientManager {
/** /**
* Distribute message to a local or remote destination. * Distribute message to a local or remote destination.
* @param msgId the router's ID for this message
* @param messageNonce the client's ID for this message
* @param flags ignored for local * @param flags ignored for local
*/ */
void distributeMessage(Destination fromDest, Destination toDest, Payload payload, MessageId msgId, long expiration, int flags) { void distributeMessage(Destination fromDest, Destination toDest, Payload payload,
MessageId msgId, long messageNonce, long expiration, int flags) {
// check if there is a runner for it // check if there is a runner for it
ClientConnectionRunner runner = getRunner(toDest); ClientConnectionRunner runner = getRunner(toDest);
if (runner != null) { if (runner != null) {
@@ -279,7 +282,7 @@ class ClientManager {
return; return;
} }
// TODO can we just run this inline instead? // TODO can we just run this inline instead?
_ctx.jobQueue().addJob(new DistributeLocal(toDest, runner, sender, fromDest, payload, msgId)); _ctx.jobQueue().addJob(new DistributeLocal(toDest, runner, sender, fromDest, payload, msgId, messageNonce));
} else { } else {
// remote. w00t // remote. w00t
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@@ -291,7 +294,7 @@ class ClientManager {
} }
ClientMessage msg = new ClientMessage(toDest, payload, runner.getConfig(), ClientMessage msg = new ClientMessage(toDest, payload, runner.getConfig(),
runner.getConfig().getDestination(), msgId, runner.getConfig().getDestination(), msgId,
expiration, flags); messageNonce, expiration, flags);
_ctx.clientMessagePool().add(msg, true); _ctx.clientMessagePool().add(msg, true);
} }
} }
@@ -303,8 +306,14 @@ class ClientManager {
private final Destination _fromDest; private final Destination _fromDest;
private final Payload _payload; private final Payload _payload;
private final MessageId _msgId; private final MessageId _msgId;
private final long _messageNonce;
public DistributeLocal(Destination toDest, ClientConnectionRunner to, ClientConnectionRunner from, Destination fromDest, Payload payload, MessageId id) { /**
* @param msgId the router's ID for this message
* @param messageNonce the client's ID for this message
*/
public DistributeLocal(Destination toDest, ClientConnectionRunner to, ClientConnectionRunner from,
Destination fromDest, Payload payload, MessageId id, long messageNonce) {
super(_ctx); super(_ctx);
_toDest = toDest; _toDest = toDest;
_to = to; _to = to;
@@ -312,6 +321,7 @@ class ClientManager {
_fromDest = fromDest; _fromDest = fromDest;
_payload = payload; _payload = payload;
_msgId = id; _msgId = id;
_messageNonce = messageNonce;
} }
public String getName() { return "Distribute local message"; } public String getName() { return "Distribute local message"; }
@@ -319,7 +329,7 @@ class ClientManager {
public void runJob() { public void runJob() {
_to.receiveMessage(_toDest, _fromDest, _payload); _to.receiveMessage(_toDest, _fromDest, _payload);
if (_from != null) { if (_from != null) {
_from.updateMessageDeliveryStatus(_msgId, MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL); _from.updateMessageDeliveryStatus(_msgId, _messageNonce, MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL);
} }
} }
} }
@@ -437,15 +447,17 @@ class ClientManager {
} }
/** /**
* @param id the router's ID for this message
* @param messageNonce the client's ID for this message
* @param status see I2CP MessageStatusMessage for success/failure codes * @param status see I2CP MessageStatusMessage for success/failure codes
*/ */
public void messageDeliveryStatusUpdate(Destination fromDest, MessageId id, int status) { public void messageDeliveryStatusUpdate(Destination fromDest, MessageId id, long messageNonce, int status) {
ClientConnectionRunner runner = getRunner(fromDest); ClientConnectionRunner runner = getRunner(fromDest);
if (runner != null) { if (runner != null) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Delivering status " + status + " to " _log.debug("Delivering status " + status + " to "
+ fromDest.calculateHash() + " for message " + id); + fromDest.calculateHash() + " for message " + id);
runner.updateMessageDeliveryStatus(id, status); runner.updateMessageDeliveryStatus(id, messageNonce, status);
} else { } else {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Cannot deliver status " + status + " to " _log.warn("Cannot deliver status " + status + " to "

View File

@@ -179,11 +179,13 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade implements Inte
public boolean shouldPublishLeaseSet(Hash destinationHash) { return _manager.shouldPublishLeaseSet(destinationHash); } public boolean shouldPublishLeaseSet(Hash destinationHash) { return _manager.shouldPublishLeaseSet(destinationHash); }
/** /**
* @param id the router's ID for this message
* @param messageNonce the client's ID for this message
* @param status see I2CP MessageStatusMessage for success/failure codes * @param status see I2CP MessageStatusMessage for success/failure codes
*/ */
public void messageDeliveryStatusUpdate(Destination fromDest, MessageId id, int status) { public void messageDeliveryStatusUpdate(Destination fromDest, MessageId id, long messageNonce, int status) {
if (_manager != null) if (_manager != null)
_manager.messageDeliveryStatusUpdate(fromDest, id, status); _manager.messageDeliveryStatusUpdate(fromDest, id, messageNonce, status);
else else
_log.error("Null manager on messageDeliveryStatusUpdate!"); _log.error("Null manager on messageDeliveryStatusUpdate!");
} }

View File

@@ -43,7 +43,7 @@ public class DummyClientManagerFacade extends ClientManagerFacade {
public void shutdown(String msg) {} public void shutdown(String msg) {}
public void restart() {} public void restart() {}
public void messageDeliveryStatusUpdate(Destination fromDest, MessageId id, int status) {} public void messageDeliveryStatusUpdate(Destination fromDest, MessageId id, long msgNonce, int status) {}
public SessionConfig getClientSessionConfig(Destination _dest) { return null; } public SessionConfig getClientSessionConfig(Destination _dest) { return null; }
public SessionKeyManager getClientSessionKeyManager(Hash _dest) { return null; } public SessionKeyManager getClientSessionKeyManager(Hash _dest) { return null; }

View File

@@ -769,7 +769,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
clearCaches(); clearCaches();
getContext().messageHistory().sendPayloadMessage(_clientMessageId.getMessageId(), false, sendTime); getContext().messageHistory().sendPayloadMessage(_clientMessageId.getMessageId(), false, sendTime);
getContext().clientManager().messageDeliveryStatusUpdate(_from, _clientMessageId, status); getContext().clientManager().messageDeliveryStatusUpdate(_from, _clientMessageId,
_clientMessage.getMessageNonce(), status);
getContext().statManager().updateFrequency("client.sendMessageFailFrequency"); getContext().statManager().updateFrequency("client.sendMessageFailFrequency");
} }
@@ -917,7 +918,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
//long dataMsgId = _cloveId; // fake ID 99999 //long dataMsgId = _cloveId; // fake ID 99999
getContext().messageHistory().sendPayloadMessage(99999, true, sendTime); getContext().messageHistory().sendPayloadMessage(99999, true, sendTime);
getContext().clientManager().messageDeliveryStatusUpdate(_from, _clientMessageId, getContext().clientManager().messageDeliveryStatusUpdate(_from, _clientMessageId, _clientMessage.getMessageNonce(),
MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS); MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS);
// unused // unused
//_lease.setNumSuccess(_lease.getNumSuccess()+1); //_lease.setNumSuccess(_lease.getNumSuccess()+1);