From a93666cd3621b577deb7ab188a9b6c6b1bf33959 Mon Sep 17 00:00:00 2001 From: zzz Date: Wed, 14 May 2014 13:49:42 +0000 Subject: [PATCH] * I2CP: Prep for per-message reliability settings (ticket #788) Router side: Store message nonce in ClientMessage, so we may send a MessageStatusMessage with a failure code to the client without sending an ACCEPTED MessageStatusMessage first. All MessageStatusMessages sent in response to outbound messages will now have a valid nonce. --- .../net/i2p/router/ClientManagerFacade.java | 5 ++- .../src/net/i2p/router/ClientMessage.java | 13 +++++++- .../router/client/ClientConnectionRunner.java | 31 +++++++++++++------ .../net/i2p/router/client/ClientManager.java | 26 +++++++++++----- .../client/ClientManagerFacadeImpl.java | 6 ++-- .../dummy/DummyClientManagerFacade.java | 2 +- .../OutboundClientMessageOneShotJob.java | 5 +-- 7 files changed, 65 insertions(+), 23 deletions(-) diff --git a/router/java/src/net/i2p/router/ClientManagerFacade.java b/router/java/src/net/i2p/router/ClientManagerFacade.java index a5e403973..9f6bafbad 100644 --- a/router/java/src/net/i2p/router/ClientManagerFacade.java +++ b/router/java/src/net/i2p/router/ClientManagerFacade.java @@ -70,9 +70,12 @@ public abstract class ClientManagerFacade implements Service { public abstract boolean isLocal(Hash destHash); /** + * @param id the router's ID for this message + * @param messageNonce the client's ID for this message * @param status see I2CP MessageStatusMessage for success/failure codes */ - public abstract void messageDeliveryStatusUpdate(Destination fromDest, MessageId id, int status); + public abstract void messageDeliveryStatusUpdate(Destination fromDest, MessageId id, + long messageNonce, int status); public abstract void messageReceived(ClientMessage msg); diff --git a/router/java/src/net/i2p/router/ClientMessage.java b/router/java/src/net/i2p/router/ClientMessage.java index 42dd8fb77..da23323c8 100644 --- a/router/java/src/net/i2p/router/ClientMessage.java +++ b/router/java/src/net/i2p/router/ClientMessage.java @@ -29,22 +29,26 @@ public class ClientMessage { private final SessionConfig _senderConfig; private final Hash _destinationHash; private final MessageId _messageId; + private final long _messageNonce; private final long _expiration; /** only for outbound messages */ private final int _flags; /** * For outbound (locally originated) + * @param msgID the router's ID for this message + * @param messageNonce the client's ID for this message * @since 0.9.9 */ public ClientMessage(Destination toDest, Payload payload, SessionConfig config, Destination fromDest, - MessageId msgID, long expiration, int flags) { + MessageId msgID, long messageNonce, long expiration, int flags) { _destination = toDest; _destinationHash = null; _payload = payload; _senderConfig = config; _fromDestination = fromDest; _messageId = msgID; + _messageNonce = messageNonce; _expiration = expiration; _flags = flags; } @@ -60,6 +64,7 @@ public class ClientMessage { _senderConfig = null; _fromDestination = null; _messageId = null; + _messageNonce = 0; _expiration = 0; _flags = 0; } @@ -94,6 +99,12 @@ public class ClientMessage { */ public MessageId getMessageId() { return _messageId; } + /** + * Valid for outbound; 0 for inbound. + * @since 0.9.14 + */ + public long getMessageNonce() { return _messageNonce; } + /** * Retrieve the information regarding how the router received this message. Only * messages received from the network will have this information, not locally diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index 8f3a216e5..0fcceb23e 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -66,14 +66,20 @@ class ClientConnectionRunner { /** user's config */ private SessionConfig _config; private String _clientVersion; - /** static mapping of MessageId to Payload, storing messages for retrieval */ + /** + * Mapping of MessageId to Payload, storing messages for retrieval. + * Unused for i2cp.fastReceive = "true" (_dontSendMSMOnRecive = true) + */ private final Map _messages; /** lease set request state, or null if there is no request pending on at the moment */ private LeaseRequestState _leaseRequest; private int _consecutiveLeaseRequestFails; /** currently allocated leaseSet, or null if none is allocated */ private LeaseSet _currentLeaseSet; - /** set of messageIds created but not yet ACCEPTED */ + /** + * Set of messageIds created but not yet ACCEPTED. + * Unused for i2cp.messageReliability = "none" (_dontSendMSM = true) + */ private final Set _acceptedPending; /** thingy that does stuff */ protected I2CPMessageReader _reader; @@ -323,12 +329,14 @@ class ClientConnectionRunner { * * Do not use for status = STATUS_SEND_ACCEPTED; use ackSendMessage() for that. * + * @param id the router's ID for this message + * @param messageNonce the client's ID for this message * @param status see I2CP MessageStatusMessage for success/failure codes */ - void updateMessageDeliveryStatus(MessageId id, int status) { - if (_dead || _dontSendMSM) + void updateMessageDeliveryStatus(MessageId id, long messageNonce, int status) { + if (_dead || messageNonce <= 0) return; - _context.jobQueue().addJob(new MessageDeliveryStatusUpdate(id, status)); + _context.jobQueue().addJob(new MessageDeliveryStatusUpdate(id, messageNonce, status)); } /** @@ -414,7 +422,7 @@ class ClientConnectionRunner { expiration = msg.getExpirationTime(); flags = msg.getFlags(); } - if (message.getNonce() != 0 && !_dontSendMSM) + if ((!_dontSendMSM) && message.getNonce() != 0) _acceptedPending.add(id); if (_log.shouldLog(Log.DEBUG)) @@ -424,7 +432,8 @@ class ClientConnectionRunner { // the following blocks as described above SessionConfig cfg = _config; if (cfg != null) - _manager.distributeMessage(cfg.getDestination(), dest, payload, id, expiration, flags); + _manager.distributeMessage(cfg.getDestination(), dest, payload, + id, message.getNonce(), expiration, flags); // else log error? //long timeToDistribute = _context.clock().now() - beforeDistribute; //if (_log.shouldLog(Log.DEBUG)) @@ -687,6 +696,7 @@ class ClientConnectionRunner { private class MessageDeliveryStatusUpdate extends JobImpl { private final MessageId _messageId; + private final long _messageNonce; private final int _status; private long _lastTried; private int _requeueCount; @@ -694,11 +704,14 @@ class ClientConnectionRunner { /** * Do not use for status = STATUS_SEND_ACCEPTED; use ackSendMessage() for that. * + * @param id the router's ID for this message + * @param messageNonce the client's ID for this message * @param status see I2CP MessageStatusMessage for success/failure codes */ - public MessageDeliveryStatusUpdate(MessageId id, int status) { + public MessageDeliveryStatusUpdate(MessageId id, long messageNonce, int status) { super(ClientConnectionRunner.this._context); _messageId = id; + _messageNonce = messageNonce; _status = status; } @@ -714,7 +727,7 @@ class ClientConnectionRunner { msg.setMessageId(_messageId.getMessageId()); msg.setSessionId(_sessionId.getSessionId()); // has to be >= 0, it is initialized to -1 - msg.setNonce(2); + msg.setNonce(_messageNonce); msg.setSize(0); msg.setStatus(_status); diff --git a/router/java/src/net/i2p/router/client/ClientManager.java b/router/java/src/net/i2p/router/client/ClientManager.java index 9ece4d6c5..96e6ac20b 100644 --- a/router/java/src/net/i2p/router/client/ClientManager.java +++ b/router/java/src/net/i2p/router/client/ClientManager.java @@ -265,9 +265,12 @@ class ClientManager { /** * Distribute message to a local or remote destination. + * @param msgId the router's ID for this message + * @param messageNonce the client's ID for this message * @param flags ignored for local */ - void distributeMessage(Destination fromDest, Destination toDest, Payload payload, MessageId msgId, long expiration, int flags) { + void distributeMessage(Destination fromDest, Destination toDest, Payload payload, + MessageId msgId, long messageNonce, long expiration, int flags) { // check if there is a runner for it ClientConnectionRunner runner = getRunner(toDest); if (runner != null) { @@ -279,7 +282,7 @@ class ClientManager { return; } // TODO can we just run this inline instead? - _ctx.jobQueue().addJob(new DistributeLocal(toDest, runner, sender, fromDest, payload, msgId)); + _ctx.jobQueue().addJob(new DistributeLocal(toDest, runner, sender, fromDest, payload, msgId, messageNonce)); } else { // remote. w00t if (_log.shouldLog(Log.DEBUG)) @@ -291,7 +294,7 @@ class ClientManager { } ClientMessage msg = new ClientMessage(toDest, payload, runner.getConfig(), runner.getConfig().getDestination(), msgId, - expiration, flags); + messageNonce, expiration, flags); _ctx.clientMessagePool().add(msg, true); } } @@ -303,8 +306,14 @@ class ClientManager { private final Destination _fromDest; private final Payload _payload; private final MessageId _msgId; + private final long _messageNonce; - public DistributeLocal(Destination toDest, ClientConnectionRunner to, ClientConnectionRunner from, Destination fromDest, Payload payload, MessageId id) { + /** + * @param msgId the router's ID for this message + * @param messageNonce the client's ID for this message + */ + public DistributeLocal(Destination toDest, ClientConnectionRunner to, ClientConnectionRunner from, + Destination fromDest, Payload payload, MessageId id, long messageNonce) { super(_ctx); _toDest = toDest; _to = to; @@ -312,6 +321,7 @@ class ClientManager { _fromDest = fromDest; _payload = payload; _msgId = id; + _messageNonce = messageNonce; } public String getName() { return "Distribute local message"; } @@ -319,7 +329,7 @@ class ClientManager { public void runJob() { _to.receiveMessage(_toDest, _fromDest, _payload); if (_from != null) { - _from.updateMessageDeliveryStatus(_msgId, MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL); + _from.updateMessageDeliveryStatus(_msgId, _messageNonce, MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL); } } } @@ -437,15 +447,17 @@ class ClientManager { } /** + * @param id the router's ID for this message + * @param messageNonce the client's ID for this message * @param status see I2CP MessageStatusMessage for success/failure codes */ - public void messageDeliveryStatusUpdate(Destination fromDest, MessageId id, int status) { + public void messageDeliveryStatusUpdate(Destination fromDest, MessageId id, long messageNonce, int status) { ClientConnectionRunner runner = getRunner(fromDest); if (runner != null) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Delivering status " + status + " to " + fromDest.calculateHash() + " for message " + id); - runner.updateMessageDeliveryStatus(id, status); + runner.updateMessageDeliveryStatus(id, messageNonce, status); } else { if (_log.shouldLog(Log.WARN)) _log.warn("Cannot deliver status " + status + " to " diff --git a/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java b/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java index 94dbe6d36..6168ab794 100644 --- a/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java +++ b/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java @@ -179,11 +179,13 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade implements Inte public boolean shouldPublishLeaseSet(Hash destinationHash) { return _manager.shouldPublishLeaseSet(destinationHash); } /** + * @param id the router's ID for this message + * @param messageNonce the client's ID for this message * @param status see I2CP MessageStatusMessage for success/failure codes */ - public void messageDeliveryStatusUpdate(Destination fromDest, MessageId id, int status) { + public void messageDeliveryStatusUpdate(Destination fromDest, MessageId id, long messageNonce, int status) { if (_manager != null) - _manager.messageDeliveryStatusUpdate(fromDest, id, status); + _manager.messageDeliveryStatusUpdate(fromDest, id, messageNonce, status); else _log.error("Null manager on messageDeliveryStatusUpdate!"); } diff --git a/router/java/src/net/i2p/router/dummy/DummyClientManagerFacade.java b/router/java/src/net/i2p/router/dummy/DummyClientManagerFacade.java index c0b476333..ba40badd0 100644 --- a/router/java/src/net/i2p/router/dummy/DummyClientManagerFacade.java +++ b/router/java/src/net/i2p/router/dummy/DummyClientManagerFacade.java @@ -43,7 +43,7 @@ public class DummyClientManagerFacade extends ClientManagerFacade { public void shutdown(String msg) {} public void restart() {} - public void messageDeliveryStatusUpdate(Destination fromDest, MessageId id, int status) {} + public void messageDeliveryStatusUpdate(Destination fromDest, MessageId id, long msgNonce, int status) {} public SessionConfig getClientSessionConfig(Destination _dest) { return null; } public SessionKeyManager getClientSessionKeyManager(Hash _dest) { return null; } diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java index f41e808e7..9bb7432a6 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java @@ -769,7 +769,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl { clearCaches(); getContext().messageHistory().sendPayloadMessage(_clientMessageId.getMessageId(), false, sendTime); - getContext().clientManager().messageDeliveryStatusUpdate(_from, _clientMessageId, status); + getContext().clientManager().messageDeliveryStatusUpdate(_from, _clientMessageId, + _clientMessage.getMessageNonce(), status); getContext().statManager().updateFrequency("client.sendMessageFailFrequency"); } @@ -917,7 +918,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { //long dataMsgId = _cloveId; // fake ID 99999 getContext().messageHistory().sendPayloadMessage(99999, true, sendTime); - getContext().clientManager().messageDeliveryStatusUpdate(_from, _clientMessageId, + getContext().clientManager().messageDeliveryStatusUpdate(_from, _clientMessageId, _clientMessage.getMessageNonce(), MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS); // unused //_lease.setNumSuccess(_lease.getNumSuccess()+1);