From f679ef250bcde9616793f49b8969b5e15a03fa5d Mon Sep 17 00:00:00 2001 From: zzz Date: Fri, 14 Jan 2011 17:12:44 +0000 Subject: [PATCH] stub out flags support in I2CP SMES --- .../net/i2p/client/I2CPMessageProducer.java | 20 ++- core/java/src/net/i2p/client/I2PSession.java | 66 +++++++-- .../src/net/i2p/client/I2PSessionImpl.java | 15 -- .../src/net/i2p/client/I2PSessionImpl2.java | 21 ++- .../net/i2p/client/I2PSessionMuxedImpl.java | 24 ++- core/java/src/net/i2p/data/DateAndFlags.java | 139 ++++++++++++++++++ .../data/i2cp/SendMessageExpiresMessage.java | 54 +++++-- .../src/net/i2p/router/ClientMessage.java | 23 ++- .../router/client/ClientConnectionRunner.java | 23 +-- .../net/i2p/router/client/ClientManager.java | 21 ++- .../OutboundClientMessageOneShotJob.java | 14 +- 11 files changed, 345 insertions(+), 75 deletions(-) create mode 100644 core/java/src/net/i2p/data/DateAndFlags.java diff --git a/core/java/src/net/i2p/client/I2CPMessageProducer.java b/core/java/src/net/i2p/client/I2CPMessageProducer.java index f9840c5c3..336c858ca 100644 --- a/core/java/src/net/i2p/client/I2CPMessageProducer.java +++ b/core/java/src/net/i2p/client/I2CPMessageProducer.java @@ -130,19 +130,31 @@ class I2CPMessageProducer { */ public void sendMessage(I2PSessionImpl session, Destination dest, long nonce, byte[] payload, SessionTag tag, SessionKey key, Set tags, SessionKey newKey, long expires) throws I2PSessionException { + sendMessage(session, dest, nonce, payload, expires, 0); + } + + /** + * Package up and send the payload to the router for delivery + * @since 0.8.4 + */ + public void sendMessage(I2PSessionImpl session, Destination dest, long nonce, byte[] payload, + long expires, int flags) throws I2PSessionException { + if (!updateBps(payload.length, expires)) // drop the message... send fail notification? return; SendMessageMessage msg; - if (expires > 0) { - msg = new SendMessageExpiresMessage(); - ((SendMessageExpiresMessage)msg).setExpiration(new Date(expires)); + if (expires > 0 || flags > 0) { + SendMessageExpiresMessage smsg = new SendMessageExpiresMessage(); + smsg.setExpiration(expires); + smsg.setFlags(flags); + msg = smsg; } else msg = new SendMessageMessage(); msg.setDestination(dest); msg.setSessionId(session.getSessionId()); msg.setNonce(nonce); - Payload data = createPayload(dest, payload, tag, key, tags, newKey); + Payload data = createPayload(dest, payload, null, null, null, null); msg.setPayload(data); session.sendMessage(msg); } diff --git a/core/java/src/net/i2p/client/I2PSession.java b/core/java/src/net/i2p/client/I2PSession.java index 27138b884..567c9e521 100644 --- a/core/java/src/net/i2p/client/I2PSession.java +++ b/core/java/src/net/i2p/client/I2PSession.java @@ -21,17 +21,20 @@ import net.i2p.data.SigningPrivateKey; /** *

Define the standard means of sending and receiving messages on the * I2P network by using the I2CP (the client protocol). This is done over a - * bidirectional TCP socket and never sends any private keys - all end to end - * encryption is done transparently within the client's I2PSession - * itself. Periodically the router will ask the client to authorize a new set of + * bidirectional TCP socket and never sends any private keys. + * + * End to end encryption in I2PSession was disabled in release 0.6. + * + * Periodically the router will ask the client to authorize a new set of * tunnels to be allocated to the client, which the client can accept by sending a * {@link net.i2p.data.LeaseSet} signed by the {@link net.i2p.data.Destination}. - * In addition, the router may on occation provide the client with an updated + * In addition, the router may on occasion provide the client with an updated * clock offset so that the client can stay in sync with the network (even if * the host computer's clock is off).

* */ public interface I2PSession { + /** Send a new message to the given destination, containing the specified * payload, returning true if the router feels confident that the message * was delivered. @@ -40,11 +43,18 @@ public interface I2PSession { * @return whether it was accepted by the router for delivery or not */ public boolean sendMessage(Destination dest, byte[] payload) throws I2PSessionException; + public boolean sendMessage(Destination dest, byte[] payload, int offset, int size) throws I2PSessionException; - /** See I2PSessionMuxedImpl for details */ + + /** + * See I2PSessionMuxedImpl for proto/port details. + * @since 0.7.1 + */ public boolean sendMessage(Destination dest, byte[] payload, int proto, int fromport, int toport) throws I2PSessionException; /** + * End-to-End Crypto is disabled, tags and keys are ignored! + * * Like sendMessage above, except the key used and the tags sent are exposed to the * application.

* @@ -62,25 +72,62 @@ public interface I2PSession { * * @param dest location to send the message * @param payload body of the message to be sent (unencrypted) - * @param keyUsed session key delivered to the destination for association with the tags sent. This is essentially + * @param keyUsed UNUSED, IGNORED. Session key delivered to the destination for association with the tags sent. This is essentially * an output parameter - keyUsed.getData() is ignored during this call, but after the call completes, * it will be filled with the bytes of the session key delivered. Typically the key delivered is the * same one as the key encrypted with, but not always. If this is null then the key data will not be * exposed. - * @param tagsSent set of tags delivered to the peer and associated with the keyUsed. This is also an output parameter - + * @param tagsSent UNUSED, IGNORED. Set of tags delivered to the peer and associated with the keyUsed. This is also an output parameter - * the contents of the set is ignored during the call, but afterwards it contains a set of SessionTag * objects that were sent along side the given keyUsed. */ public boolean sendMessage(Destination dest, byte[] payload, SessionKey keyUsed, Set tagsSent) throws I2PSessionException; + + /** + * End-to-End Crypto is disabled, tags and keys are ignored. + * @param keyUsed UNUSED, IGNORED. + * @param tagsSent UNUSED, IGNORED. + */ public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent) throws I2PSessionException; + + /** + * End-to-End Crypto is disabled, tags and keys are ignored. + * @param keyUsed UNUSED, IGNORED. + * @param tagsSent UNUSED, IGNORED. + * @since 0.7.1 + */ public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire) throws I2PSessionException; - /** See I2PSessionMuxedImpl for details */ + + /** + * See I2PSessionMuxedImpl for proto/port details. + * End-to-End Crypto is disabled, tags and keys are ignored. + * @param keyUsed UNUSED, IGNORED. + * @param tagsSent UNUSED, IGNORED. + * @since 0.7.1 + */ public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, int proto, int fromport, int toport) throws I2PSessionException; - /** See I2PSessionMuxedImpl for details */ + + /** + * See I2PSessionMuxedImpl for proto/port details. + * End-to-End Crypto is disabled, tags and keys are ignored. + * @param keyUsed UNUSED, IGNORED. + * @param tagsSent UNUSED, IGNORED. + * @since 0.7.1 + */ public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire, int proto, int fromport, int toport) throws I2PSessionException; + /** + * See I2PSessionMuxedImpl for proto/port details. + * End-to-End Crypto is disabled, tags and keys are ignored. + * @param keyUsed UNUSED, IGNORED. + * @param tagsSent UNUSED, IGNORED. + * @since 0.8.4 + */ + public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire, + int proto, int fromport, int toport, int flags) throws I2PSessionException; + /** Receive a message that the router has notified the client about, returning * the payload. * @param msgId message to fetch @@ -161,6 +208,7 @@ public interface I2PSession { /** * Get the current bandwidth limits. Blocking. + * @since 0.8.3 */ public int[] bandwidthLimits() throws I2PSessionException; diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index b904b121d..dc7e875d9 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -438,21 +438,6 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa _producer.reportAbuse(this, msgId, severity); } - /** - * Send the data to the destination. - * TODO: this currently always returns true, regardless of whether the message was - * delivered successfully. make this wait for at least ACCEPTED - * - */ - public abstract boolean sendMessage(Destination dest, byte[] payload) throws I2PSessionException; - - /** - * @param keyUsed unused - no end-to-end crypto - * @param tagsSent unused - no end-to-end crypto - */ - public abstract boolean sendMessage(Destination dest, byte[] payload, SessionKey keyUsed, - Set tagsSent) throws I2PSessionException; - public abstract void receiveStatus(int msgId, long nonce, int status); /****** no end-to-end crypto diff --git a/core/java/src/net/i2p/client/I2PSessionImpl2.java b/core/java/src/net/i2p/client/I2PSessionImpl2.java index f0000b68c..3af551eaa 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl2.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl2.java @@ -130,6 +130,10 @@ class I2PSessionImpl2 extends I2PSessionImpl { int proto, int fromport, int toport) throws I2PSessionException { throw new IllegalArgumentException("Use MuxedImpl"); } + public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire, + int proto, int fromport, int toport, int flags) throws I2PSessionException { + throw new IllegalArgumentException("Use MuxedImpl"); + } @Override public boolean sendMessage(Destination dest, byte[] payload) throws I2PSessionException { @@ -222,14 +226,23 @@ class I2PSessionImpl2 extends I2PSessionImpl { private static final int NUM_TAGS = 50; /** - * TODO - Don't need to save MessageState since actuallyWait is false... - * But for now just use sendNoEffort() instead. - * * @param keyUsed unused - no end-to-end crypto * @param tagsSent unused - no end-to-end crypto */ protected boolean sendBestEffort(Destination dest, byte payload[], SessionKey keyUsed, Set tagsSent, long expires) throws I2PSessionException { + return sendBestEffort(dest, payload, expires, 0); + } + + /** + * TODO - Don't need to save MessageState since actuallyWait is false... + * But for now just use sendNoEffort() instead. + * + * @param flags to be passed to the router + * @since 0.8.4 + */ + protected boolean sendBestEffort(Destination dest, byte payload[], long expires, int flags) + throws I2PSessionException { //SessionKey key = null; //SessionKey newKey = null; //SessionTag tag = null; @@ -324,7 +337,7 @@ class I2PSessionImpl2 extends I2PSessionImpl { + " sync took " + (inSendingSync-beforeSendingSync) + " add took " + (afterSendingSync-inSendingSync)); //_producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey, expires); - _producer.sendMessage(this, dest, nonce, payload, null, null, null, null, expires); + _producer.sendMessage(this, dest, nonce, payload, expires, flags); // since this is 'best effort', all we're waiting for is a status update // saying that the router received it - in theory, that should come back diff --git a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java index c8ddaf77b..fcf11d0da 100644 --- a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java @@ -162,12 +162,34 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession { * 255 disallowed * @param fromPort 1-65535 or 0 for unset * @param toPort 1-65535 or 0 for unset + * @since 0.7.1 */ @Override public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expires, int proto, int fromPort, int toPort) throws I2PSessionException { + return sendMessage(dest, payload, offset, size, keyUsed, tagsSent, 0, proto, fromPort, toPort, 0); + } + + /** + * @param keyUsed unused - no end-to-end crypto + * @param tagsSent unused - no end-to-end crypto + * @param proto 1-254 or 0 for unset; recommended: + * I2PSession.PROTO_UNSPECIFIED + * I2PSession.PROTO_STREAMING + * I2PSession.PROTO_DATAGRAM + * 255 disallowed + * @param fromPort 1-65535 or 0 for unset + * @param toPort 1-65535 or 0 for unset + * @param flags to be passed to the router + * @since 0.8.4 + */ + @Override + public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, + SessionKey keyUsed, Set tagsSent, long expires, + int proto, int fromPort, int toPort, int flags) + throws I2PSessionException { if (isClosed()) throw new I2PSessionException("Already closed"); updateActivity(); @@ -183,7 +205,7 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession { _context.statManager().addRateData("i2cp.tx.msgCompressed", payload.length, 0); _context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0); - return sendBestEffort(dest, payload, keyUsed, tagsSent, expires); + return sendBestEffort(dest, payload, expires, flags); } /** diff --git a/core/java/src/net/i2p/data/DateAndFlags.java b/core/java/src/net/i2p/data/DateAndFlags.java new file mode 100644 index 000000000..70f88ce65 --- /dev/null +++ b/core/java/src/net/i2p/data/DateAndFlags.java @@ -0,0 +1,139 @@ +package net.i2p.data; + +/* + * free (adj.): unencumbered; not under the control of others + * Released into the public domain + * with no warranty of any kind, either expressed or implied. + * + */ + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Date; + +/** + * A six-byte Date and 2 bytes of flags, since a Date won't encroach + * on the top two bytes until the year 10889. + * + * The flag format is not specified here. The bits may be used in + * an application-specific manner. The application should + * be designed so that a flags value of 0 is the default, for + * compatibility with an 8-byte Date. + * + * If we really need some more bits we could use the first few bits + * of the third byte. + * + * @author zzz + * @since 0.8.4 + */ +public class DateAndFlags extends DataStructureImpl { + private int _flags; + private long _date; + + public DateAndFlags() {} + + /** + * @param flags 0 - 65535 + */ + public DateAndFlags(int flags, long date) { + _flags = flags; + _date = date; + } + + /** + * @param flags 0 - 65535 + */ + public DateAndFlags(int flags, Date date) { + _flags = flags; + _date = date.getTime(); + } + + public int getFlags() { + return _flags; + } + + /** + * @param flags 0 - 65535 + */ + public void setFlags(int flags) { + _flags = flags; + } + + /** + * The Date object is created here, it is not cached. + * Use getTime() if you only need the long value. + */ + public Date getDate() { + return new Date(_date); + } + + public long getTime() { + return (_date); + } + + public void setDate(long date) { + _date = date; + } + + public void setDate(Date date) { + _date = date.getTime(); + } + + public void readBytes(InputStream in) throws DataFormatException, IOException { + _flags = (int) DataHelper.readLong(in, 2); + _date = DataHelper.readLong(in, 6); + } + + public void writeBytes(OutputStream out) throws DataFormatException, IOException { + DataHelper.writeLong(out, 2, _flags); + DataHelper.writeLong(out, 6, _date); + } + + /** + * Overridden for efficiency. + */ + @Override + public byte[] toByteArray() { + byte[] rv = DataHelper.toLong(8, _date); + rv[0] = (byte) ((_flags >> 8) & 0xff); + rv[1] = (byte) (_flags & 0xff); + return rv; + } + + /** + * Overridden for efficiency. + * @param data non-null + * @throws DataFormatException if null or wrong length + */ + @Override + public void fromByteArray(byte data[]) throws DataFormatException { + if (data == null) throw new DataFormatException("Null data passed in"); + if (data.length != 8) throw new DataFormatException("Bad data length"); + _flags = (int) DataHelper.fromLong(data, 0, 2); + _date = DataHelper.fromLong(data, 2, 6); + } + + @Override + public boolean equals(Object object) { + if ((object == null) || !(object instanceof DateAndFlags)) return false; + DateAndFlags daf = (DateAndFlags) object; + return _date == daf._date && _flags == daf._flags; + + } + + @Override + public int hashCode() { + return _flags + (int) _date; + } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(64); + buf.append("[DateAndFlags: "); + buf.append("\n\tDate: ").append((new Date(_date)).toString()); + buf.append("\n\tFlags: 0x").append(Integer.toHexString(_flags)); + buf.append("]"); + return buf.toString(); + } +} diff --git a/core/java/src/net/i2p/data/i2cp/SendMessageExpiresMessage.java b/core/java/src/net/i2p/data/i2cp/SendMessageExpiresMessage.java index 9bcabe2fb..08b31a6e7 100644 --- a/core/java/src/net/i2p/data/i2cp/SendMessageExpiresMessage.java +++ b/core/java/src/net/i2p/data/i2cp/SendMessageExpiresMessage.java @@ -16,32 +16,66 @@ import java.util.Date; import net.i2p.data.DataFormatException; import net.i2p.data.DataHelper; +import net.i2p.data.DateAndFlags; import net.i2p.data.Destination; import net.i2p.data.Payload; /** * Same as SendMessageMessage, but with an expiration to be passed to the router * + * As of 0.8.4, retrofitted to use DateAndFlags. Backwards compatible. + * * @author zzz */ public class SendMessageExpiresMessage extends SendMessageMessage { /* FIXME hides another field FIXME */ public final static int MESSAGE_TYPE = 36; - private SessionId _sessionId; - private Destination _destination; - private Payload _payload; - private Date _expiration; + private final DateAndFlags _daf; public SendMessageExpiresMessage() { super(); + _daf = new DateAndFlags(); } + /** + * The Date object is created here, it is not cached. + * Use getExpirationTime() if you only need the long value. + */ public Date getExpiration() { - return _expiration; + return _daf.getDate(); + } + + /** + * Use this instead of getExpiration().getTime() + * @since 0.8.4 + */ + public long getExpirationTime() { + return _daf.getTime(); } public void setExpiration(Date d) { - _expiration = d; + _daf.setDate(d); + } + + /** + * @since 0.8.4 + */ + public void setExpiration(long d) { + _daf.setDate(d); + } + + /** + * @since 0.8.4 + */ + public int getFlags() { + return _daf.getFlags(); + } + + /** + * @since 0.8.4 + */ + public void setFlags(int f) { + _daf.setFlags(f); } /** @@ -54,7 +88,7 @@ public class SendMessageExpiresMessage extends SendMessageMessage { super.readMessage(in, length, type); try { - _expiration = DataHelper.readDate(in); + _daf.readBytes(in); } catch (DataFormatException dfe) { throw new I2CPMessageException("Unable to load the message data", dfe); } @@ -68,7 +102,7 @@ public class SendMessageExpiresMessage extends SendMessageMessage { */ @Override public void writeMessage(OutputStream out) throws I2CPMessageException, IOException { - if ((getSessionId() == null) || (getDestination() == null) || (getPayload() == null) || (getNonce() <= 0) || (_expiration == null)) + if ((getSessionId() == null) || (getDestination() == null) || (getPayload() == null) || (getNonce() <= 0)) throw new I2CPMessageException("Unable to write out the message as there is not enough data"); int len = 2 + getDestination().size() + getPayload().getSize() + 4 + 4 + DataHelper.DATE_LENGTH; @@ -79,7 +113,7 @@ public class SendMessageExpiresMessage extends SendMessageMessage { getDestination().writeBytes(out); getPayload().writeBytes(out); DataHelper.writeLong(out, 4, getNonce()); - DataHelper.writeDate(out, _expiration); + _daf.writeBytes(out); } catch (DataFormatException dfe) { throw new I2CPMessageException("Error writing the msg", dfe); } @@ -96,7 +130,7 @@ public class SendMessageExpiresMessage extends SendMessageMessage { if ((object != null) && (object instanceof SendMessageExpiresMessage)) { SendMessageExpiresMessage msg = (SendMessageExpiresMessage) object; return super.equals(object) - && DataHelper.eq(getExpiration(), msg.getExpiration()); + && _daf.equals(msg._daf); } return false; diff --git a/router/java/src/net/i2p/router/ClientMessage.java b/router/java/src/net/i2p/router/ClientMessage.java index ec7820d69..5b5a228a1 100644 --- a/router/java/src/net/i2p/router/ClientMessage.java +++ b/router/java/src/net/i2p/router/ClientMessage.java @@ -28,16 +28,10 @@ public class ClientMessage { private Hash _destinationHash; private MessageId _messageId; private long _expiration; + /** only for outbound messages */ + private int _flags; public ClientMessage() { - setPayload(null); - setDestination(null); - setFromDestination(null); - setReceptionInfo(null); - setSenderConfig(null); - setDestinationHash(null); - setMessageId(null); - setExpiration(0); } /** @@ -101,4 +95,17 @@ public class ClientMessage { */ public long getExpiration() { return _expiration; } public void setExpiration(long e) { _expiration = e; } + + /** + * Flags requested by the client that sent the message. This will only be available + * for locally originated messages. + * + * @since 0.8.4 + */ + public int getFlags() { return _flags; } + + /** + * @since 0.8.4 + */ + public void setFlags(int f) { _flags = f; } } diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index 8bef2776d..a73dcde83 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -280,8 +280,12 @@ class ClientConnectionRunner { MessageId id = new MessageId(); id.setMessageId(getNextMessageId()); long expiration = 0; - if (message instanceof SendMessageExpiresMessage) - expiration = ((SendMessageExpiresMessage) message).getExpiration().getTime(); + int flags = 0; + if (message.getType() == SendMessageExpiresMessage.MESSAGE_TYPE) { + SendMessageExpiresMessage msg = (SendMessageExpiresMessage) message; + expiration = msg.getExpirationTime(); + flags = msg.getFlags(); + } if (!_dontSendMSM) _acceptedPending.add(id); @@ -289,16 +293,17 @@ class ClientConnectionRunner { _log.debug("** Receiving message [" + id.getMessageId() + "] with payload of size [" + payload.getSize() + "]" + " for session [" + _sessionId.getSessionId() + "]"); - long beforeDistribute = _context.clock().now(); + //long beforeDistribute = _context.clock().now(); // the following blocks as described above SessionConfig cfg = _config; if (cfg != null) - _manager.distributeMessage(cfg.getDestination(), dest, payload, id, expiration); - long timeToDistribute = _context.clock().now() - beforeDistribute; - if (_log.shouldLog(Log.DEBUG)) - _log.warn("Time to distribute in the manager to " - + dest.calculateHash().toBase64() + ": " - + timeToDistribute); + _manager.distributeMessage(cfg.getDestination(), dest, payload, id, expiration, flags); + // else log error? + //long timeToDistribute = _context.clock().now() - beforeDistribute; + //if (_log.shouldLog(Log.DEBUG)) + // _log.warn("Time to distribute in the manager to " + // + dest.calculateHash().toBase64() + ": " + // + timeToDistribute); return id; } diff --git a/router/java/src/net/i2p/router/client/ClientManager.java b/router/java/src/net/i2p/router/client/ClientManager.java index bc2d09135..6f4d4414f 100644 --- a/router/java/src/net/i2p/router/client/ClientManager.java +++ b/router/java/src/net/i2p/router/client/ClientManager.java @@ -193,7 +193,11 @@ class ClientManager { } } - void distributeMessage(Destination fromDest, Destination toDest, Payload payload, MessageId msgId, long expiration) { + /** + * Distribute message to a local or remote destination. + * @param flags ignored for local + */ + void distributeMessage(Destination fromDest, Destination toDest, Payload payload, MessageId msgId, long expiration, int flags) { // check if there is a runner for it ClientConnectionRunner runner = getRunner(toDest); if (runner != null) { @@ -204,6 +208,7 @@ class ClientManager { // sender went away return; } + // TODO can we just run this inline instead? _ctx.jobQueue().addJob(new DistributeLocal(toDest, runner, sender, fromDest, payload, msgId)); } else { // remote. w00t @@ -217,22 +222,22 @@ class ClientManager { ClientMessage msg = new ClientMessage(); msg.setDestination(toDest); msg.setPayload(payload); - msg.setReceptionInfo(null); msg.setSenderConfig(runner.getConfig()); msg.setFromDestination(runner.getConfig().getDestination()); msg.setMessageId(msgId); msg.setExpiration(expiration); + msg.setFlags(flags); _ctx.clientMessagePool().add(msg, true); } } private class DistributeLocal extends JobImpl { - private Destination _toDest; - private ClientConnectionRunner _to; - private ClientConnectionRunner _from; - private Destination _fromDest; - private Payload _payload; - private MessageId _msgId; + private final Destination _toDest; + private final ClientConnectionRunner _to; + private final ClientConnectionRunner _from; + private final Destination _fromDest; + private final Payload _payload; + private final MessageId _msgId; public DistributeLocal(Destination toDest, ClientConnectionRunner to, ClientConnectionRunner from, Destination fromDest, Payload payload, MessageId id) { super(_ctx); diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java index 5b3e3e507..d1e86db95 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java @@ -47,21 +47,21 @@ import net.i2p.util.SimpleTimer; * */ public class OutboundClientMessageOneShotJob extends JobImpl { - private Log _log; + private final Log _log; private long _overallExpiration; private ClientMessage _clientMessage; - private MessageId _clientMessageId; - private int _clientMessageSize; - private Destination _from; - private Destination _to; - private String _toString; + private final MessageId _clientMessageId; + private final int _clientMessageSize; + private final Destination _from; + private final Destination _to; + private final String _toString; /** target destination's leaseSet, if known */ private LeaseSet _leaseSet; /** Actual lease the message is being routed through */ private Lease _lease; private PayloadGarlicConfig _clove; private long _cloveId; - private long _start; + private final long _start; private boolean _finished; private long _leaseSetLookupBegin; private TunnelInfo _outTunnel;