stub out flags support in I2CP SMES

This commit is contained in:
zzz
2011-01-14 17:12:44 +00:00
parent c92a8851b2
commit f679ef250b
11 changed files with 345 additions and 75 deletions

View File

@@ -130,19 +130,31 @@ class I2CPMessageProducer {
*/ */
public void sendMessage(I2PSessionImpl session, Destination dest, long nonce, byte[] payload, SessionTag tag, public void sendMessage(I2PSessionImpl session, Destination dest, long nonce, byte[] payload, SessionTag tag,
SessionKey key, Set tags, SessionKey newKey, long expires) throws I2PSessionException { SessionKey key, Set tags, SessionKey newKey, long expires) throws I2PSessionException {
sendMessage(session, dest, nonce, payload, expires, 0);
}
/**
* Package up and send the payload to the router for delivery
* @since 0.8.4
*/
public void sendMessage(I2PSessionImpl session, Destination dest, long nonce, byte[] payload,
long expires, int flags) throws I2PSessionException {
if (!updateBps(payload.length, expires)) if (!updateBps(payload.length, expires))
// drop the message... send fail notification? // drop the message... send fail notification?
return; return;
SendMessageMessage msg; SendMessageMessage msg;
if (expires > 0) { if (expires > 0 || flags > 0) {
msg = new SendMessageExpiresMessage(); SendMessageExpiresMessage smsg = new SendMessageExpiresMessage();
((SendMessageExpiresMessage)msg).setExpiration(new Date(expires)); smsg.setExpiration(expires);
smsg.setFlags(flags);
msg = smsg;
} else } else
msg = new SendMessageMessage(); msg = new SendMessageMessage();
msg.setDestination(dest); msg.setDestination(dest);
msg.setSessionId(session.getSessionId()); msg.setSessionId(session.getSessionId());
msg.setNonce(nonce); msg.setNonce(nonce);
Payload data = createPayload(dest, payload, tag, key, tags, newKey); Payload data = createPayload(dest, payload, null, null, null, null);
msg.setPayload(data); msg.setPayload(data);
session.sendMessage(msg); session.sendMessage(msg);
} }

View File

@@ -21,17 +21,20 @@ import net.i2p.data.SigningPrivateKey;
/** /**
* <p>Define the standard means of sending and receiving messages on the * <p>Define the standard means of sending and receiving messages on the
* I2P network by using the I2CP (the client protocol). This is done over a * I2P network by using the I2CP (the client protocol). This is done over a
* bidirectional TCP socket and never sends any private keys - all end to end * bidirectional TCP socket and never sends any private keys.
* encryption is done transparently within the client's I2PSession *
* itself. Periodically the router will ask the client to authorize a new set of * End to end encryption in I2PSession was disabled in release 0.6.
*
* Periodically the router will ask the client to authorize a new set of
* tunnels to be allocated to the client, which the client can accept by sending a * tunnels to be allocated to the client, which the client can accept by sending a
* {@link net.i2p.data.LeaseSet} signed by the {@link net.i2p.data.Destination}. * {@link net.i2p.data.LeaseSet} signed by the {@link net.i2p.data.Destination}.
* In addition, the router may on occation provide the client with an updated * In addition, the router may on occasion provide the client with an updated
* clock offset so that the client can stay in sync with the network (even if * clock offset so that the client can stay in sync with the network (even if
* the host computer's clock is off).</p> * the host computer's clock is off).</p>
* *
*/ */
public interface I2PSession { public interface I2PSession {
/** Send a new message to the given destination, containing the specified /** Send a new message to the given destination, containing the specified
* payload, returning true if the router feels confident that the message * payload, returning true if the router feels confident that the message
* was delivered. * was delivered.
@@ -40,11 +43,18 @@ public interface I2PSession {
* @return whether it was accepted by the router for delivery or not * @return whether it was accepted by the router for delivery or not
*/ */
public boolean sendMessage(Destination dest, byte[] payload) throws I2PSessionException; public boolean sendMessage(Destination dest, byte[] payload) throws I2PSessionException;
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size) throws I2PSessionException; public boolean sendMessage(Destination dest, byte[] payload, int offset, int size) throws I2PSessionException;
/** See I2PSessionMuxedImpl for details */
/**
* See I2PSessionMuxedImpl for proto/port details.
* @since 0.7.1
*/
public boolean sendMessage(Destination dest, byte[] payload, int proto, int fromport, int toport) throws I2PSessionException; public boolean sendMessage(Destination dest, byte[] payload, int proto, int fromport, int toport) throws I2PSessionException;
/** /**
* End-to-End Crypto is disabled, tags and keys are ignored!
*
* Like sendMessage above, except the key used and the tags sent are exposed to the * Like sendMessage above, except the key used and the tags sent are exposed to the
* application. <p /> * application. <p />
* *
@@ -62,25 +72,62 @@ public interface I2PSession {
* *
* @param dest location to send the message * @param dest location to send the message
* @param payload body of the message to be sent (unencrypted) * @param payload body of the message to be sent (unencrypted)
* @param keyUsed session key delivered to the destination for association with the tags sent. This is essentially * @param keyUsed UNUSED, IGNORED. Session key delivered to the destination for association with the tags sent. This is essentially
* an output parameter - keyUsed.getData() is ignored during this call, but after the call completes, * an output parameter - keyUsed.getData() is ignored during this call, but after the call completes,
* it will be filled with the bytes of the session key delivered. Typically the key delivered is the * it will be filled with the bytes of the session key delivered. Typically the key delivered is the
* same one as the key encrypted with, but not always. If this is null then the key data will not be * same one as the key encrypted with, but not always. If this is null then the key data will not be
* exposed. * exposed.
* @param tagsSent set of tags delivered to the peer and associated with the keyUsed. This is also an output parameter - * @param tagsSent UNUSED, IGNORED. Set of tags delivered to the peer and associated with the keyUsed. This is also an output parameter -
* the contents of the set is ignored during the call, but afterwards it contains a set of SessionTag * the contents of the set is ignored during the call, but afterwards it contains a set of SessionTag
* objects that were sent along side the given keyUsed. * objects that were sent along side the given keyUsed.
*/ */
public boolean sendMessage(Destination dest, byte[] payload, SessionKey keyUsed, Set tagsSent) throws I2PSessionException; public boolean sendMessage(Destination dest, byte[] payload, SessionKey keyUsed, Set tagsSent) throws I2PSessionException;
/**
* End-to-End Crypto is disabled, tags and keys are ignored.
* @param keyUsed UNUSED, IGNORED.
* @param tagsSent UNUSED, IGNORED.
*/
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent) throws I2PSessionException; public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent) throws I2PSessionException;
/**
* End-to-End Crypto is disabled, tags and keys are ignored.
* @param keyUsed UNUSED, IGNORED.
* @param tagsSent UNUSED, IGNORED.
* @since 0.7.1
*/
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire) throws I2PSessionException; public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire) throws I2PSessionException;
/** See I2PSessionMuxedImpl for details */
/**
* See I2PSessionMuxedImpl for proto/port details.
* End-to-End Crypto is disabled, tags and keys are ignored.
* @param keyUsed UNUSED, IGNORED.
* @param tagsSent UNUSED, IGNORED.
* @since 0.7.1
*/
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent,
int proto, int fromport, int toport) throws I2PSessionException; int proto, int fromport, int toport) throws I2PSessionException;
/** See I2PSessionMuxedImpl for details */
/**
* See I2PSessionMuxedImpl for proto/port details.
* End-to-End Crypto is disabled, tags and keys are ignored.
* @param keyUsed UNUSED, IGNORED.
* @param tagsSent UNUSED, IGNORED.
* @since 0.7.1
*/
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire, public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire,
int proto, int fromport, int toport) throws I2PSessionException; int proto, int fromport, int toport) throws I2PSessionException;
/**
* See I2PSessionMuxedImpl for proto/port details.
* End-to-End Crypto is disabled, tags and keys are ignored.
* @param keyUsed UNUSED, IGNORED.
* @param tagsSent UNUSED, IGNORED.
* @since 0.8.4
*/
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire,
int proto, int fromport, int toport, int flags) throws I2PSessionException;
/** Receive a message that the router has notified the client about, returning /** Receive a message that the router has notified the client about, returning
* the payload. * the payload.
* @param msgId message to fetch * @param msgId message to fetch
@@ -161,6 +208,7 @@ public interface I2PSession {
/** /**
* Get the current bandwidth limits. Blocking. * Get the current bandwidth limits. Blocking.
* @since 0.8.3
*/ */
public int[] bandwidthLimits() throws I2PSessionException; public int[] bandwidthLimits() throws I2PSessionException;

View File

@@ -438,21 +438,6 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
_producer.reportAbuse(this, msgId, severity); _producer.reportAbuse(this, msgId, severity);
} }
/**
* Send the data to the destination.
* TODO: this currently always returns true, regardless of whether the message was
* delivered successfully. make this wait for at least ACCEPTED
*
*/
public abstract boolean sendMessage(Destination dest, byte[] payload) throws I2PSessionException;
/**
* @param keyUsed unused - no end-to-end crypto
* @param tagsSent unused - no end-to-end crypto
*/
public abstract boolean sendMessage(Destination dest, byte[] payload, SessionKey keyUsed,
Set tagsSent) throws I2PSessionException;
public abstract void receiveStatus(int msgId, long nonce, int status); public abstract void receiveStatus(int msgId, long nonce, int status);
/****** no end-to-end crypto /****** no end-to-end crypto

View File

@@ -130,6 +130,10 @@ class I2PSessionImpl2 extends I2PSessionImpl {
int proto, int fromport, int toport) throws I2PSessionException { int proto, int fromport, int toport) throws I2PSessionException {
throw new IllegalArgumentException("Use MuxedImpl"); throw new IllegalArgumentException("Use MuxedImpl");
} }
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire,
int proto, int fromport, int toport, int flags) throws I2PSessionException {
throw new IllegalArgumentException("Use MuxedImpl");
}
@Override @Override
public boolean sendMessage(Destination dest, byte[] payload) throws I2PSessionException { public boolean sendMessage(Destination dest, byte[] payload) throws I2PSessionException {
@@ -222,14 +226,23 @@ class I2PSessionImpl2 extends I2PSessionImpl {
private static final int NUM_TAGS = 50; private static final int NUM_TAGS = 50;
/** /**
* TODO - Don't need to save MessageState since actuallyWait is false...
* But for now just use sendNoEffort() instead.
*
* @param keyUsed unused - no end-to-end crypto * @param keyUsed unused - no end-to-end crypto
* @param tagsSent unused - no end-to-end crypto * @param tagsSent unused - no end-to-end crypto
*/ */
protected boolean sendBestEffort(Destination dest, byte payload[], SessionKey keyUsed, Set tagsSent, long expires) protected boolean sendBestEffort(Destination dest, byte payload[], SessionKey keyUsed, Set tagsSent, long expires)
throws I2PSessionException { throws I2PSessionException {
return sendBestEffort(dest, payload, expires, 0);
}
/**
* TODO - Don't need to save MessageState since actuallyWait is false...
* But for now just use sendNoEffort() instead.
*
* @param flags to be passed to the router
* @since 0.8.4
*/
protected boolean sendBestEffort(Destination dest, byte payload[], long expires, int flags)
throws I2PSessionException {
//SessionKey key = null; //SessionKey key = null;
//SessionKey newKey = null; //SessionKey newKey = null;
//SessionTag tag = null; //SessionTag tag = null;
@@ -324,7 +337,7 @@ class I2PSessionImpl2 extends I2PSessionImpl {
+ " sync took " + (inSendingSync-beforeSendingSync) + " sync took " + (inSendingSync-beforeSendingSync)
+ " add took " + (afterSendingSync-inSendingSync)); + " add took " + (afterSendingSync-inSendingSync));
//_producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey, expires); //_producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey, expires);
_producer.sendMessage(this, dest, nonce, payload, null, null, null, null, expires); _producer.sendMessage(this, dest, nonce, payload, expires, flags);
// since this is 'best effort', all we're waiting for is a status update // since this is 'best effort', all we're waiting for is a status update
// saying that the router received it - in theory, that should come back // saying that the router received it - in theory, that should come back

View File

@@ -162,12 +162,34 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession {
* 255 disallowed * 255 disallowed
* @param fromPort 1-65535 or 0 for unset * @param fromPort 1-65535 or 0 for unset
* @param toPort 1-65535 or 0 for unset * @param toPort 1-65535 or 0 for unset
* @since 0.7.1
*/ */
@Override @Override
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, public boolean sendMessage(Destination dest, byte[] payload, int offset, int size,
SessionKey keyUsed, Set tagsSent, long expires, SessionKey keyUsed, Set tagsSent, long expires,
int proto, int fromPort, int toPort) int proto, int fromPort, int toPort)
throws I2PSessionException { throws I2PSessionException {
return sendMessage(dest, payload, offset, size, keyUsed, tagsSent, 0, proto, fromPort, toPort, 0);
}
/**
* @param keyUsed unused - no end-to-end crypto
* @param tagsSent unused - no end-to-end crypto
* @param proto 1-254 or 0 for unset; recommended:
* I2PSession.PROTO_UNSPECIFIED
* I2PSession.PROTO_STREAMING
* I2PSession.PROTO_DATAGRAM
* 255 disallowed
* @param fromPort 1-65535 or 0 for unset
* @param toPort 1-65535 or 0 for unset
* @param flags to be passed to the router
* @since 0.8.4
*/
@Override
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size,
SessionKey keyUsed, Set tagsSent, long expires,
int proto, int fromPort, int toPort, int flags)
throws I2PSessionException {
if (isClosed()) throw new I2PSessionException("Already closed"); if (isClosed()) throw new I2PSessionException("Already closed");
updateActivity(); updateActivity();
@@ -183,7 +205,7 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession {
_context.statManager().addRateData("i2cp.tx.msgCompressed", payload.length, 0); _context.statManager().addRateData("i2cp.tx.msgCompressed", payload.length, 0);
_context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0); _context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0);
return sendBestEffort(dest, payload, keyUsed, tagsSent, expires); return sendBestEffort(dest, payload, expires, flags);
} }
/** /**

View File

@@ -0,0 +1,139 @@
package net.i2p.data;
/*
* free (adj.): unencumbered; not under the control of others
* Released into the public domain
* with no warranty of any kind, either expressed or implied.
*
*/
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Date;
/**
* A six-byte Date and 2 bytes of flags, since a Date won't encroach
* on the top two bytes until the year 10889.
*
* The flag format is not specified here. The bits may be used in
* an application-specific manner. The application should
* be designed so that a flags value of 0 is the default, for
* compatibility with an 8-byte Date.
*
* If we really need some more bits we could use the first few bits
* of the third byte.
*
* @author zzz
* @since 0.8.4
*/
public class DateAndFlags extends DataStructureImpl {
private int _flags;
private long _date;
public DateAndFlags() {}
/**
* @param flags 0 - 65535
*/
public DateAndFlags(int flags, long date) {
_flags = flags;
_date = date;
}
/**
* @param flags 0 - 65535
*/
public DateAndFlags(int flags, Date date) {
_flags = flags;
_date = date.getTime();
}
public int getFlags() {
return _flags;
}
/**
* @param flags 0 - 65535
*/
public void setFlags(int flags) {
_flags = flags;
}
/**
* The Date object is created here, it is not cached.
* Use getTime() if you only need the long value.
*/
public Date getDate() {
return new Date(_date);
}
public long getTime() {
return (_date);
}
public void setDate(long date) {
_date = date;
}
public void setDate(Date date) {
_date = date.getTime();
}
public void readBytes(InputStream in) throws DataFormatException, IOException {
_flags = (int) DataHelper.readLong(in, 2);
_date = DataHelper.readLong(in, 6);
}
public void writeBytes(OutputStream out) throws DataFormatException, IOException {
DataHelper.writeLong(out, 2, _flags);
DataHelper.writeLong(out, 6, _date);
}
/**
* Overridden for efficiency.
*/
@Override
public byte[] toByteArray() {
byte[] rv = DataHelper.toLong(8, _date);
rv[0] = (byte) ((_flags >> 8) & 0xff);
rv[1] = (byte) (_flags & 0xff);
return rv;
}
/**
* Overridden for efficiency.
* @param data non-null
* @throws DataFormatException if null or wrong length
*/
@Override
public void fromByteArray(byte data[]) throws DataFormatException {
if (data == null) throw new DataFormatException("Null data passed in");
if (data.length != 8) throw new DataFormatException("Bad data length");
_flags = (int) DataHelper.fromLong(data, 0, 2);
_date = DataHelper.fromLong(data, 2, 6);
}
@Override
public boolean equals(Object object) {
if ((object == null) || !(object instanceof DateAndFlags)) return false;
DateAndFlags daf = (DateAndFlags) object;
return _date == daf._date && _flags == daf._flags;
}
@Override
public int hashCode() {
return _flags + (int) _date;
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder(64);
buf.append("[DateAndFlags: ");
buf.append("\n\tDate: ").append((new Date(_date)).toString());
buf.append("\n\tFlags: 0x").append(Integer.toHexString(_flags));
buf.append("]");
return buf.toString();
}
}

View File

@@ -16,32 +16,66 @@ import java.util.Date;
import net.i2p.data.DataFormatException; import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper; import net.i2p.data.DataHelper;
import net.i2p.data.DateAndFlags;
import net.i2p.data.Destination; import net.i2p.data.Destination;
import net.i2p.data.Payload; import net.i2p.data.Payload;
/** /**
* Same as SendMessageMessage, but with an expiration to be passed to the router * Same as SendMessageMessage, but with an expiration to be passed to the router
* *
* As of 0.8.4, retrofitted to use DateAndFlags. Backwards compatible.
*
* @author zzz * @author zzz
*/ */
public class SendMessageExpiresMessage extends SendMessageMessage { public class SendMessageExpiresMessage extends SendMessageMessage {
/* FIXME hides another field FIXME */ /* FIXME hides another field FIXME */
public final static int MESSAGE_TYPE = 36; public final static int MESSAGE_TYPE = 36;
private SessionId _sessionId; private final DateAndFlags _daf;
private Destination _destination;
private Payload _payload;
private Date _expiration;
public SendMessageExpiresMessage() { public SendMessageExpiresMessage() {
super(); super();
_daf = new DateAndFlags();
} }
/**
* The Date object is created here, it is not cached.
* Use getExpirationTime() if you only need the long value.
*/
public Date getExpiration() { public Date getExpiration() {
return _expiration; return _daf.getDate();
}
/**
* Use this instead of getExpiration().getTime()
* @since 0.8.4
*/
public long getExpirationTime() {
return _daf.getTime();
} }
public void setExpiration(Date d) { public void setExpiration(Date d) {
_expiration = d; _daf.setDate(d);
}
/**
* @since 0.8.4
*/
public void setExpiration(long d) {
_daf.setDate(d);
}
/**
* @since 0.8.4
*/
public int getFlags() {
return _daf.getFlags();
}
/**
* @since 0.8.4
*/
public void setFlags(int f) {
_daf.setFlags(f);
} }
/** /**
@@ -54,7 +88,7 @@ public class SendMessageExpiresMessage extends SendMessageMessage {
super.readMessage(in, length, type); super.readMessage(in, length, type);
try { try {
_expiration = DataHelper.readDate(in); _daf.readBytes(in);
} catch (DataFormatException dfe) { } catch (DataFormatException dfe) {
throw new I2CPMessageException("Unable to load the message data", dfe); throw new I2CPMessageException("Unable to load the message data", dfe);
} }
@@ -68,7 +102,7 @@ public class SendMessageExpiresMessage extends SendMessageMessage {
*/ */
@Override @Override
public void writeMessage(OutputStream out) throws I2CPMessageException, IOException { public void writeMessage(OutputStream out) throws I2CPMessageException, IOException {
if ((getSessionId() == null) || (getDestination() == null) || (getPayload() == null) || (getNonce() <= 0) || (_expiration == null)) if ((getSessionId() == null) || (getDestination() == null) || (getPayload() == null) || (getNonce() <= 0))
throw new I2CPMessageException("Unable to write out the message as there is not enough data"); throw new I2CPMessageException("Unable to write out the message as there is not enough data");
int len = 2 + getDestination().size() + getPayload().getSize() + 4 + 4 + DataHelper.DATE_LENGTH; int len = 2 + getDestination().size() + getPayload().getSize() + 4 + 4 + DataHelper.DATE_LENGTH;
@@ -79,7 +113,7 @@ public class SendMessageExpiresMessage extends SendMessageMessage {
getDestination().writeBytes(out); getDestination().writeBytes(out);
getPayload().writeBytes(out); getPayload().writeBytes(out);
DataHelper.writeLong(out, 4, getNonce()); DataHelper.writeLong(out, 4, getNonce());
DataHelper.writeDate(out, _expiration); _daf.writeBytes(out);
} catch (DataFormatException dfe) { } catch (DataFormatException dfe) {
throw new I2CPMessageException("Error writing the msg", dfe); throw new I2CPMessageException("Error writing the msg", dfe);
} }
@@ -96,7 +130,7 @@ public class SendMessageExpiresMessage extends SendMessageMessage {
if ((object != null) && (object instanceof SendMessageExpiresMessage)) { if ((object != null) && (object instanceof SendMessageExpiresMessage)) {
SendMessageExpiresMessage msg = (SendMessageExpiresMessage) object; SendMessageExpiresMessage msg = (SendMessageExpiresMessage) object;
return super.equals(object) return super.equals(object)
&& DataHelper.eq(getExpiration(), msg.getExpiration()); && _daf.equals(msg._daf);
} }
return false; return false;

View File

@@ -28,16 +28,10 @@ public class ClientMessage {
private Hash _destinationHash; private Hash _destinationHash;
private MessageId _messageId; private MessageId _messageId;
private long _expiration; private long _expiration;
/** only for outbound messages */
private int _flags;
public ClientMessage() { public ClientMessage() {
setPayload(null);
setDestination(null);
setFromDestination(null);
setReceptionInfo(null);
setSenderConfig(null);
setDestinationHash(null);
setMessageId(null);
setExpiration(0);
} }
/** /**
@@ -101,4 +95,17 @@ public class ClientMessage {
*/ */
public long getExpiration() { return _expiration; } public long getExpiration() { return _expiration; }
public void setExpiration(long e) { _expiration = e; } public void setExpiration(long e) { _expiration = e; }
/**
* Flags requested by the client that sent the message. This will only be available
* for locally originated messages.
*
* @since 0.8.4
*/
public int getFlags() { return _flags; }
/**
* @since 0.8.4
*/
public void setFlags(int f) { _flags = f; }
} }

View File

@@ -280,8 +280,12 @@ class ClientConnectionRunner {
MessageId id = new MessageId(); MessageId id = new MessageId();
id.setMessageId(getNextMessageId()); id.setMessageId(getNextMessageId());
long expiration = 0; long expiration = 0;
if (message instanceof SendMessageExpiresMessage) int flags = 0;
expiration = ((SendMessageExpiresMessage) message).getExpiration().getTime(); if (message.getType() == SendMessageExpiresMessage.MESSAGE_TYPE) {
SendMessageExpiresMessage msg = (SendMessageExpiresMessage) message;
expiration = msg.getExpirationTime();
flags = msg.getFlags();
}
if (!_dontSendMSM) if (!_dontSendMSM)
_acceptedPending.add(id); _acceptedPending.add(id);
@@ -289,16 +293,17 @@ class ClientConnectionRunner {
_log.debug("** Receiving message [" + id.getMessageId() + "] with payload of size [" _log.debug("** Receiving message [" + id.getMessageId() + "] with payload of size ["
+ payload.getSize() + "]" + " for session [" + _sessionId.getSessionId() + payload.getSize() + "]" + " for session [" + _sessionId.getSessionId()
+ "]"); + "]");
long beforeDistribute = _context.clock().now(); //long beforeDistribute = _context.clock().now();
// the following blocks as described above // the following blocks as described above
SessionConfig cfg = _config; SessionConfig cfg = _config;
if (cfg != null) if (cfg != null)
_manager.distributeMessage(cfg.getDestination(), dest, payload, id, expiration); _manager.distributeMessage(cfg.getDestination(), dest, payload, id, expiration, flags);
long timeToDistribute = _context.clock().now() - beforeDistribute; // else log error?
if (_log.shouldLog(Log.DEBUG)) //long timeToDistribute = _context.clock().now() - beforeDistribute;
_log.warn("Time to distribute in the manager to " //if (_log.shouldLog(Log.DEBUG))
+ dest.calculateHash().toBase64() + ": " // _log.warn("Time to distribute in the manager to "
+ timeToDistribute); // + dest.calculateHash().toBase64() + ": "
// + timeToDistribute);
return id; return id;
} }

View File

@@ -193,7 +193,11 @@ class ClientManager {
} }
} }
void distributeMessage(Destination fromDest, Destination toDest, Payload payload, MessageId msgId, long expiration) { /**
* Distribute message to a local or remote destination.
* @param flags ignored for local
*/
void distributeMessage(Destination fromDest, Destination toDest, Payload payload, MessageId msgId, long expiration, int flags) {
// check if there is a runner for it // check if there is a runner for it
ClientConnectionRunner runner = getRunner(toDest); ClientConnectionRunner runner = getRunner(toDest);
if (runner != null) { if (runner != null) {
@@ -204,6 +208,7 @@ class ClientManager {
// sender went away // sender went away
return; return;
} }
// TODO can we just run this inline instead?
_ctx.jobQueue().addJob(new DistributeLocal(toDest, runner, sender, fromDest, payload, msgId)); _ctx.jobQueue().addJob(new DistributeLocal(toDest, runner, sender, fromDest, payload, msgId));
} else { } else {
// remote. w00t // remote. w00t
@@ -217,22 +222,22 @@ class ClientManager {
ClientMessage msg = new ClientMessage(); ClientMessage msg = new ClientMessage();
msg.setDestination(toDest); msg.setDestination(toDest);
msg.setPayload(payload); msg.setPayload(payload);
msg.setReceptionInfo(null);
msg.setSenderConfig(runner.getConfig()); msg.setSenderConfig(runner.getConfig());
msg.setFromDestination(runner.getConfig().getDestination()); msg.setFromDestination(runner.getConfig().getDestination());
msg.setMessageId(msgId); msg.setMessageId(msgId);
msg.setExpiration(expiration); msg.setExpiration(expiration);
msg.setFlags(flags);
_ctx.clientMessagePool().add(msg, true); _ctx.clientMessagePool().add(msg, true);
} }
} }
private class DistributeLocal extends JobImpl { private class DistributeLocal extends JobImpl {
private Destination _toDest; private final Destination _toDest;
private ClientConnectionRunner _to; private final ClientConnectionRunner _to;
private ClientConnectionRunner _from; private final ClientConnectionRunner _from;
private Destination _fromDest; private final Destination _fromDest;
private Payload _payload; private final Payload _payload;
private MessageId _msgId; private final MessageId _msgId;
public DistributeLocal(Destination toDest, ClientConnectionRunner to, ClientConnectionRunner from, Destination fromDest, Payload payload, MessageId id) { public DistributeLocal(Destination toDest, ClientConnectionRunner to, ClientConnectionRunner from, Destination fromDest, Payload payload, MessageId id) {
super(_ctx); super(_ctx);

View File

@@ -47,21 +47,21 @@ import net.i2p.util.SimpleTimer;
* *
*/ */
public class OutboundClientMessageOneShotJob extends JobImpl { public class OutboundClientMessageOneShotJob extends JobImpl {
private Log _log; private final Log _log;
private long _overallExpiration; private long _overallExpiration;
private ClientMessage _clientMessage; private ClientMessage _clientMessage;
private MessageId _clientMessageId; private final MessageId _clientMessageId;
private int _clientMessageSize; private final int _clientMessageSize;
private Destination _from; private final Destination _from;
private Destination _to; private final Destination _to;
private String _toString; private final String _toString;
/** target destination's leaseSet, if known */ /** target destination's leaseSet, if known */
private LeaseSet _leaseSet; private LeaseSet _leaseSet;
/** Actual lease the message is being routed through */ /** Actual lease the message is being routed through */
private Lease _lease; private Lease _lease;
private PayloadGarlicConfig _clove; private PayloadGarlicConfig _clove;
private long _cloveId; private long _cloveId;
private long _start; private final long _start;
private boolean _finished; private boolean _finished;
private long _leaseSetLookupBegin; private long _leaseSetLookupBegin;
private TunnelInfo _outTunnel; private TunnelInfo _outTunnel;