* ElGamal/AES/SessionTag:

- Increase TX expire from 10 to 12 min, while keeping RX expire at 15 min.
    3 minutes should be plenty of clock skew + delay.
  - Move tags-to-send and low-threshold values to be per-SKM
  - New session config options crypto.tagsToSend and crypto.lowTagThreshold
  - Prep for per-packet override of tags and thresholds
  - Cleanups and Javadocs
* I2PTunnel: Add some defaults for the new session config options
* OCMOSJ:
  - Don't bundle LeaseSet just because we're requesting an ACK
  - Changed session config option shouldBundleReplyInfo to default to true
    and be used to disable bundling altogether when set to false.
    Was previously an undocumented option to force bundling with a certain probability.
  - Don't send tags unless we've already generated a reply token (race)
  - Cleanups and Javadocs
This commit is contained in:
zzz
2012-06-24 13:17:52 +00:00
parent 97b05b1dbf
commit e497859587
10 changed files with 302 additions and 165 deletions

View File

@@ -74,6 +74,9 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
protected int localPort = DEFAULT_LOCALPORT;
/**
* Warning, blocks in constructor while connecting to router and building tunnels;
* TODO move that to startRunning()
*
* @param privData Base64-encoded private key data,
* format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
* @throws IllegalArgumentException if the I2CP configuration is b0rked so
@@ -87,6 +90,9 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
}
/**
* Warning, blocks in constructor while connecting to router and building tunnels;
* TODO move that to startRunning()
*
* @param privkey file containing the private key data,
* format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
* @param privkeyname the name of the privKey file, not clear why we need this too
@@ -111,6 +117,9 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
}
/**
* Warning, blocks in constructor while connecting to router and building tunnels;
* TODO move that to startRunning()
*
* @param privData stream containing the private key data,
* format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
* @param privkeyname the name of the privKey file, not clear why we need this too
@@ -124,6 +133,8 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
}
/**
* Non-blocking
*
* @param sktMgr the existing socket manager
* @since 0.8.9
*/
@@ -142,6 +153,9 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
private static final int MAX_RETRIES = 4;
/**
* Warning, blocks while connecting to router and building tunnels;
* TODO move that to startRunning()
*
* @param privData stream containing the private key data,
* format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
* @param privkeyname the name of the privKey file, not clear why we need this too
@@ -236,6 +250,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable {
/**
* Start running the I2PTunnelServer.
*
* TODO: Wait to connect to router until here.
*/
public void startRunning() {
// prevent JVM exit when running outside the router

View File

@@ -156,8 +156,8 @@ public class TunnelController implements Logging {
}
String type = getType();
if ( (type == null) || (type.length() <= 0) ) {
if (_log.shouldLog(Log.WARN))
_log.warn("Cannot start the tunnel - no type specified");
if (_log.shouldLog(Log.ERROR))
_log.error("Cannot start the tunnel - no type specified");
return;
}
// Config options may have changed since instantiation, so do this again.
@@ -455,6 +455,25 @@ public class TunnelController implements Logging {
}
}
_config = props;
// Set up some per-type defaults
// This really isn't the best spot to do this but for servers in particular,
// it's hard to override settings in the subclass since the session connect
// is done in the I2PTunnelServer constructor.
String type = getType();
if (type != null) {
if (type.equals("httpserver") || type.equals("streamrserver")) {
if (!_config.containsKey("option.shouldBundleReplyInfo"))
_config.setProperty("option.shouldBundleReplyInfo", "false");
} else if (type.contains("irc") || type.equals("streamrclient")) {
// maybe a bad idea for ircclient if DCC is enabled
if (!_config.containsKey("option.crypto.tagsToSend"))
_config.setProperty("option.crypto.tagsToSend", "20");
if (!_config.containsKey("option.crypto.lowTagThreshold"))
_config.setProperty("option.crypto.lowTagThreshold", "14");
}
}
// tell i2ptunnel, who will tell the TunnelTask, who will tell the SocketManager
setSessionOptions();
if (_running && _sessions != null) {
@@ -467,6 +486,9 @@ public class TunnelController implements Logging {
}
}
/**
* @return a copy
*/
public Properties getConfig(String prefix) {
Properties rv = new Properties();
for (Map.Entry e : _config.entrySet()) {

View File

@@ -24,14 +24,14 @@ import net.i2p.data.SessionTag;
* unknown (and hence always forces a full ElGamal encryption for each message).
* A more intelligent subclass should manage and persist keys and tags.
*
* TODO if we aren't going to use this for testing, make it abstract.
*/
public class SessionKeyManager {
/** session key managers must be created through an app context */
protected SessionKeyManager(I2PAppContext context) { // nop
}
/** see above */
private SessionKeyManager() { // nop
/**
* Make this public if you need a dummy SessionKeyManager for testing
*/
protected SessionKeyManager(I2PAppContext context) { // nop
}
/**
@@ -86,6 +86,31 @@ public class SessionKeyManager {
return null;
}
/**
* How many to send, IF we need to.
* @since 0.9.1
*/
public int getTagsToSend() { return 0; };
/**
* @since 0.9.1
*/
public int getLowThreshold() { return 0; };
/**
* @return true if we have less than the threshold or what we have is about to expire
* @since 0.9.1
*/
public boolean shouldSendTags(PublicKey target, SessionKey key) {
return shouldSendTags(target, key, getLowThreshold());
}
/**
* @return true if we have less than the threshold or what we have is about to expire
* @since 0.9.1
*/
public boolean shouldSendTags(PublicKey target, SessionKey key, int lowThreshold) { return false; }
/**
* Determine (approximately) how many available session tags for the current target
* have been confirmed and are available

View File

@@ -85,27 +85,74 @@ public class TransientSessionKeyManager extends SessionKeyManager {
/** for debugging */
private final AtomicInteger _rcvTagSetID = new AtomicInteger();
private final AtomicInteger _sentTagSetID = new AtomicInteger();
private final int _tagsToSend;
private final int _lowThreshold;
/**
* Let session tags sit around for 10 minutes before expiring them. We can now have such a large
* Let session tags sit around for this long before expiring them. We can now have such a large
* value since there is the persistent session key manager. This value is for outbound tags -
* inbound tags are managed by SESSION_LIFETIME_MAX_MS
*
*/
public final static long SESSION_TAG_DURATION_MS = 10 * 60 * 1000;
private final static long SESSION_TAG_DURATION_MS = 12 * 60 * 1000;
/**
* Keep unused inbound session tags around for up to 12 minutes (2 minutes longer than
* Keep unused inbound session tags around for this long (a few minutes longer than
* session tags are used on the outbound side so that no reasonable network lag
* can cause failed decrypts)
*
*/
public final static long SESSION_LIFETIME_MAX_MS = SESSION_TAG_DURATION_MS + 5 * 60 * 1000;
private final static long SESSION_LIFETIME_MAX_MS = SESSION_TAG_DURATION_MS + 3 * 60 * 1000;
/**
* Time to send more if we are this close to expiration
*/
private static final long SESSION_TAG_EXPIRATION_WINDOW = 90 * 1000;
/**
* a few MB? how about 16MB!
* This is the max size of _inboundTagSets.
*/
public final static int MAX_INBOUND_SESSION_TAGS = 500 * 1000; // this will consume at most a few MB
/**
* This was 100 since 0.6.1.10 (50 before that). It's important because:
* <pre>
* - Tags are 32 bytes. So it previously added 3200 bytes to an initial message.
* - Too many tags adds a huge overhead to short-duration connections
* (like http, datagrams, etc.)
* - Large messages have a much higher chance of being dropped due to
* one of their 1KB fragments being discarded by a tunnel participant.
* - This reduces the effective maximum datagram size because the client
* doesn't know when tags will be bundled, so the tag size must be
* subtracted from the maximum I2NP size or transport limit.
* </pre>
*
* Issues with too small a value:
* <pre>
* - When tags are sent, a reply leaseset (~1KB) is always bundled.
* Maybe don't need to bundle more than every minute or so
* rather than every time?
* - Does the number of tags (and the threshold of 20) limit the effective
* streaming lib window size? Should the threshold and the number of
* sent tags be variable based on the message rate?
* </pre>
*
* We have to be very careful if we implement an adaptive scheme,
* since the key manager is per-router, not per-local-dest.
* Or maybe that's a bad idea, and we need to move to a per-dest manager.
* This needs further investigation.
*
* So a value somewhat higher than the low threshold
* seems appropriate.
*
* Use care when adjusting these values. See ConnectionOptions in streaming,
* and TransientSessionKeyManager in crypto, for more information.
*
* @since 0.9.1 moved from GarlicMessageBuilder to per-SKM config
*/
public static final int DEFAULT_TAGS = 40;
/** ditto */
public static final int LOW_THRESHOLD = 30;
/**
* The session key manager should only be constructed and accessed through the
* application context. This constructor should only be used by the
@@ -113,11 +160,24 @@ public class TransientSessionKeyManager extends SessionKeyManager {
*
*/
public TransientSessionKeyManager(I2PAppContext context) {
this(context, DEFAULT_TAGS, LOW_THRESHOLD);
}
/**
* @param tagsToSend how many to send at a time, may be lower or higher than lowThreshold. 1-128
* @param lowThreshold below this, send more. 1-128
* @since 0.9.1
*/
public TransientSessionKeyManager(I2PAppContext context, int tagsToSend, int lowThreshold) {
super(context);
if (tagsToSend <= 0 || tagsToSend > 128 || lowThreshold <= 0 || lowThreshold > 128)
throw new IllegalArgumentException();
_tagsToSend = tagsToSend;
_lowThreshold = lowThreshold;
_log = context.logManager().getLog(TransientSessionKeyManager.class);
_context = context;
_outboundSessions = new HashMap(64);
_inboundTagSets = new HashMap(1024);
_inboundTagSets = new HashMap(128);
context.statManager().createRateStat("crypto.sessionTagsExpired", "How many tags/sessions are expired?", "Encryption", new long[] { 10*60*1000, 60*60*1000, 3*60*60*1000 });
context.statManager().createRateStat("crypto.sessionTagsRemaining", "How many tags/sessions are remaining after a cleanup?", "Encryption", new long[] { 10*60*1000, 60*60*1000, 3*60*60*1000 });
_alive = true;
@@ -291,6 +351,31 @@ public class TransientSessionKeyManager extends SessionKeyManager {
return null;
}
/**
* How many to send, IF we need to.
* @return the configured value (not adjusted for current available)
* @since 0.9.1
*/
@Override
public int getTagsToSend() { return _tagsToSend; };
/**
* @return the configured value
* @since 0.9.1
*/
@Override
public int getLowThreshold() { return _lowThreshold; };
/**
* @return true if we have less than the threshold or what we have is about to expire
* @since 0.9.1
*/
@Override
public boolean shouldSendTags(PublicKey target, SessionKey key, int lowThreshold) {
return getAvailableTags(target, key) < lowThreshold ||
getAvailableTimeLeft(target, key) < SESSION_TAG_EXPIRATION_WINDOW;
}
/**
* Determine (approximately) how many available session tags for the current target
* have been confirmed and are available

View File

@@ -17,6 +17,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -95,6 +96,10 @@ class ClientConnectionRunner {
// e.g. on local access
private static final int MAX_MESSAGE_ID = 0x4000000;
/** @since 0.9.1 */
private static final String PROP_TAGS = "crypto.tagsToSend";
private static final String PROP_THRESH = "crypto.lowTagThreshold";
/**
* Create a new runner against the given socket
*
@@ -200,14 +205,29 @@ class ClientConnectionRunner {
if (_log.shouldLog(Log.DEBUG))
_log.debug("SessionEstablished called for destination " + _destHashCache.toBase64());
_config = config;
// This is the only option that is interpreted here, not at the tunnel manager
if (config.getOptions() != null)
// We process a few options here, but most are handled by the tunnel manager.
// The ones here can't be changed later.
Properties opts = config.getOptions();
if (opts != null)
_dontSendMSM = "none".equals(config.getOptions().getProperty(I2PClient.PROP_RELIABILITY, "").toLowerCase(Locale.US));
// per-destination session key manager to prevent rather easy correlation
if (_sessionKeyManager == null)
_sessionKeyManager = new TransientSessionKeyManager(_context);
else
if (_sessionKeyManager == null) {
int tags = TransientSessionKeyManager.DEFAULT_TAGS;
int thresh = TransientSessionKeyManager.LOW_THRESHOLD;
if (opts != null) {
String ptags = opts.getProperty(PROP_TAGS);
if (ptags != null) {
try { tags = Integer.parseInt(ptags); } catch (NumberFormatException nfe) {}
}
String pthresh = opts.getProperty(PROP_THRESH);
if (pthresh != null) {
try { thresh = Integer.parseInt(pthresh); } catch (NumberFormatException nfe) {}
}
}
_sessionKeyManager = new TransientSessionKeyManager(_context, tags, thresh);
} else {
_log.error("SessionEstablished called for twice for destination " + _destHashCache.toBase64().substring(0,4));
}
_manager.destinationEstablished(this);
}

View File

@@ -298,6 +298,9 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
* Message's Session ID ignored. This doesn't support removing previously set options.
* Nor do we bother with message.getSessionConfig().verifySignature() ... should we?
* Nor is the Date checked.
*
* Note that this does NOT update the few options handled in
* ClientConnectionRunner.sessionEstablished(). Those can't be changed later.
*/
private void handleReconfigureSession(I2CPMessageReader reader, ReconfigureSessionMessage message) {
if (_log.shouldLog(Log.INFO))

View File

@@ -33,52 +33,15 @@ import net.i2p.util.Log;
*/
public class GarlicMessageBuilder {
/**
* This was 100 since 0.6.1.10 (50 before that). It's important because:
* <pre>
* - Tags are 32 bytes. So it previously added 3200 bytes to an initial message.
* - Too many tags adds a huge overhead to short-duration connections
* (like http, datagrams, etc.)
* - Large messages have a much higher chance of being dropped due to
* one of their 1KB fragments being discarded by a tunnel participant.
* - This reduces the effective maximum datagram size because the client
* doesn't know when tags will be bundled, so the tag size must be
* subtracted from the maximum I2NP size or transport limit.
* </pre>
*
* Issues with too small a value:
* <pre>
* - When tags are sent, a reply leaseset (~1KB) is always bundled.
* Maybe don't need to bundle more than every minute or so
* rather than every time?
* - Does the number of tags (and the threshold of 20) limit the effective
* streaming lib window size? Should the threshold and the number of
* sent tags be variable based on the message rate?
* </pre>
*
* We have to be very careful if we implement an adaptive scheme,
* since the key manager is per-router, not per-local-dest.
* Or maybe that's a bad idea, and we need to move to a per-dest manager.
* This needs further investigation.
*
* So a value somewhat higher than the low threshold
* seems appropriate.
*
* Use care when adjusting these values. See ConnectionOptions in streaming,
* and TransientSessionKeyManager in crypto, for more information.
*/
private static final int DEFAULT_TAGS = 40;
private static final int LOW_THRESHOLD = 30;
/** @param local non-null; do not use this method for the router's SessionKeyManager */
public static int estimateAvailableTags(RouterContext ctx, PublicKey key, Hash local) {
public static boolean needsTags(RouterContext ctx, PublicKey key, Hash local) {
SessionKeyManager skm = ctx.clientManager().getClientSessionKeyManager(local);
if (skm == null)
return 0;
return true;
SessionKey curKey = skm.getCurrentKey(key);
if (curKey == null)
return 0;
return skm.getAvailableTags(key, curKey);
return true;
return skm.shouldSendTags(key, curKey);
}
/**
@@ -100,17 +63,19 @@ public class GarlicMessageBuilder {
}
/**
* called by OCMJH
* Now unused, since we have to generate a reply token first in OCMOSJ but we don't know if tags are required yet.
*
* @param ctx scope
* @param config how/what to wrap
* @param wrappedKey output parameter that will be filled with the sessionKey used
* @param wrappedTags output parameter that will be filled with the sessionTags used
* @param wrappedTags Output parameter that will be filled with the sessionTags used.
If non-empty on return you must call skm.tagsDelivered() when sent
and then call skm.tagsAcked() or skm.failTags() later.
* @param skm non-null
*/
public static GarlicMessage buildMessage(RouterContext ctx, GarlicConfig config, SessionKey wrappedKey, Set<SessionTag> wrappedTags,
SessionKeyManager skm) {
return buildMessage(ctx, config, wrappedKey, wrappedTags, DEFAULT_TAGS, false, skm);
return buildMessage(ctx, config, wrappedKey, wrappedTags, skm.getTagsToSend(), skm);
}
/** unused */
@@ -122,33 +87,42 @@ public class GarlicMessageBuilder {
***/
/**
* called by above
* called by OCMJH
*
* @param ctx scope
* @param config how/what to wrap
* @param wrappedKey output parameter that will be filled with the sessionKey used
* @param wrappedTags output parameter that will be filled with the sessionTags used
* @param numTagsToDeliver only if the estimated available tags are below the threshold
* @param wrappedTags Output parameter that will be filled with the sessionTags used.
If non-empty on return you must call skm.tagsDelivered() when sent
and then call skm.tagsAcked() or skm.failTags() later.
* @param numTagsToDeliver Only if the estimated available tags are below the threshold.
Set to zero to disable tag delivery. You must set to zero if you are not
equipped to confirm delivery and call skm.tagsAcked() or skm.failTags() later.
* @param skm non-null
*/
private static GarlicMessage buildMessage(RouterContext ctx, GarlicConfig config, SessionKey wrappedKey, Set<SessionTag> wrappedTags,
int numTagsToDeliver, boolean forceElGamal, SessionKeyManager skm) {
return buildMessage(ctx, config, wrappedKey, wrappedTags, numTagsToDeliver, LOW_THRESHOLD, false, skm);
public static GarlicMessage buildMessage(RouterContext ctx, GarlicConfig config, SessionKey wrappedKey, Set<SessionTag> wrappedTags,
int numTagsToDeliver, SessionKeyManager skm) {
return buildMessage(ctx, config, wrappedKey, wrappedTags, numTagsToDeliver, skm.getLowThreshold(), skm);
}
/**
* called by netdb
* called by netdb and above
*
* @param ctx scope
* @param config how/what to wrap
* @param wrappedKey output parameter that will be filled with the sessionKey used
* @param wrappedTags output parameter that will be filled with the sessionTags used
* @param numTagsToDeliver only if the estimated available tags are below the threshold
* @param wrappedTags Output parameter that will be filled with the sessionTags used.
If non-empty on return you must call skm.tagsDelivered() when sent
and then call skm.tagsAcked() or skm.failTags() later.
* @param numTagsToDeliver only if the estimated available tags are below the threshold.
Set to zero to disable tag delivery. You must set to zero if you are not
equipped to confirm delivery and call skm.tagsAcked() or failTags() later.
If this is always 0, it forces ElGamal every time.
* @param lowTagsThreshold the threshold
* @param skm non-null
*/
public static GarlicMessage buildMessage(RouterContext ctx, GarlicConfig config, SessionKey wrappedKey, Set<SessionTag> wrappedTags,
int numTagsToDeliver, int lowTagsThreshold, boolean forceElGamal, SessionKeyManager skm) {
int numTagsToDeliver, int lowTagsThreshold, SessionKeyManager skm) {
Log log = ctx.logManager().getLog(GarlicMessageBuilder.class);
PublicKey key = config.getRecipientPublicKey();
if (key == null) {
@@ -167,29 +141,19 @@ public class GarlicMessageBuilder {
SessionKey curKey = skm.getCurrentOrNewKey(key);
SessionTag curTag = null;
if (!forceElGamal) {
curTag = skm.consumeNextAvailableTag(key, curKey);
int availTags = skm.getAvailableTags(key, curKey);
if (log.shouldLog(Log.DEBUG))
log.debug("Available tags for encryption to " + key + ": " + availTags);
if (availTags < lowTagsThreshold) { // arbitrary threshold
if (numTagsToDeliver > 0 && skm.shouldSendTags(key, curKey, lowTagsThreshold)) {
for (int i = 0; i < numTagsToDeliver; i++)
wrappedTags.add(new SessionTag(true));
if (log.shouldLog(Log.INFO))
log.info("Too few are available (" + availTags + "), so we're including more");
} else if (skm.getAvailableTimeLeft(key, curKey) < 60*1000) {
// if we have enough tags, but they expire in under 30 seconds, we want more
for (int i = 0; i < numTagsToDeliver; i++)
wrappedTags.add(new SessionTag(true));
if (log.shouldLog(Log.INFO))
log.info("Tags are almost expired, adding new ones");
} else {
// always tack on at least one more - not necessary.
//wrappedTags.add(new SessionTag(true));
log.info("Too few tags available so we're including " + numTagsToDeliver);
}
}
wrappedKey.setData(curKey.getData());
@@ -202,7 +166,9 @@ public class GarlicMessageBuilder {
* @param ctx scope
* @param config how/what to wrap
* @param wrappedKey unused - why??
* @param wrappedTags output parameter that will be filled with the sessionTags used
* @param wrappedTags Output parameter that will be filled with the sessionTags used.
If non-empty on return you must call skm.tagsDelivered() when sent
and then call skm.tagsAcked() or skm.failTags() later.
* @param target public key of the location being garlic routed to (may be null if we
* know the encryptKey and encryptTag)
* @param encryptKey sessionKey used to encrypt the current message
@@ -216,7 +182,7 @@ public class GarlicMessageBuilder {
GarlicMessage msg = new GarlicMessage(ctx);
noteWrap(ctx, msg, config);
//noteWrap(ctx, msg, config);
byte cloveSet[] = buildCloveSet(ctx, config);
@@ -239,6 +205,7 @@ public class GarlicMessageBuilder {
return msg;
}
/****
private static void noteWrap(RouterContext ctx, GarlicMessage wrapper, GarlicConfig contained) {
for (int i = 0; i < contained.getCloveCount(); i++) {
GarlicConfig config = contained.getClove(i);
@@ -249,6 +216,7 @@ public class GarlicMessageBuilder {
}
}
}
****/
/**
* Build an unencrypted set of cloves specified by the config.
@@ -303,7 +271,7 @@ public class GarlicMessageBuilder {
private static byte[] buildClove(RouterContext ctx, PayloadGarlicConfig config) throws DataFormatException, IOException {
GarlicClove clove = new GarlicClove(ctx);
clove.setData(config.getPayload());
return buildCommonClove(ctx, clove, config);
return buildCommonClove(clove, config);
}
/**
@@ -328,11 +296,10 @@ public class GarlicMessageBuilder {
if (msg == null)
throw new DataFormatException("Unable to build message from clove config");
clove.setData(msg);
return buildCommonClove(ctx, clove, config);
return buildCommonClove(clove, config);
}
private static byte[] buildCommonClove(RouterContext ctx, GarlicClove clove, GarlicConfig config) throws DataFormatException, IOException {
private static byte[] buildCommonClove(GarlicClove clove, GarlicConfig config) throws DataFormatException, IOException {
clove.setCertificate(config.getCertificate());
clove.setCloveId(config.getId());
clove.setExpiration(new Date(config.getExpiration()));

View File

@@ -50,6 +50,8 @@ class OutboundClientMessageJobHelper {
*
* Unused?
*
* @param wrappedKey output parameter that will be filled with the sessionKey used
* @param wrappedTags output parameter that will be filled with the sessionTags used
* @param bundledReplyLeaseSet if specified, the given LeaseSet will be packaged with the message (allowing
* much faster replies, since their netDb search will return almost instantly)
* @return garlic, or null if no tunnels were found (or other errors)
@@ -68,6 +70,8 @@ class OutboundClientMessageJobHelper {
*
* This is called from OCMOSJ
*
* @param wrappedKey output parameter that will be filled with the sessionKey used
* @param wrappedTags output parameter that will be filled with the sessionTags used
* @return garlic, or null if no tunnels were found (or other errors)
*/
static GarlicMessage createGarlicMessage(RouterContext ctx, long replyToken, long expiration, PublicKey recipientPK,
@@ -79,8 +83,10 @@ class OutboundClientMessageJobHelper {
SessionKeyManager skm = ctx.clientManager().getClientSessionKeyManager(from);
if (skm == null)
return null;
// no use sending tags unless we have a reply token set up already
int tagsToSend = replyToken >= 0 ? skm.getTagsToSend() : 0;
GarlicMessage msg = GarlicMessageBuilder.buildMessage(ctx, config, wrappedKey, wrappedTags,
skm);
tagsToSend, skm);
return msg;
}

View File

@@ -78,27 +78,38 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
private final static long OVERALL_TIMEOUT_MS_MIN = 8*1000;
/**
* If the client's config specifies shouldBundleReplyInfo=true, messages sent from
* that client to any peers will probabalistically include the sending destination's
* current LeaseSet (allowing the recipient to reply without having to do a full
* netDb lookup). This should improve performance during the initial negotiations,
* but is not necessary for communication that isn't bidirectional.
* NOTE: Changed as of 0.9.1.
*
* Defaults to true.
*
* If the client's config specifies shouldBundleReplyInfo=true, messages sent from
* that client to any peers will periodically include the sending destination's
* current LeaseSet (allowing the recipient to reply without having to do a full
* netDb lookup). This should improve performance during the initial negotiations.
*
* For clients that do not publish their LeaseSet, this option must be true
* for any reply to be possible.
*
* Setting to "false" may save significant outbound bandwidth, especially if
* the client is configured with a large number of inbound tunnels (Leases).
* If replies are still required, this may shift the bandwidth burden to
* the far-end client and the floodfill.
*
* There are several cases where "false" is may be appropriate:
* <ul><li>
* Unidirectional communication, no reply required
* <li>
* LeaseSet is published and higher reply latency is acceptable
* <li>
* LeaseSet is published, client is a "server", all connections are inbound
* so the connecting far-end destination obviously has the leaseset already.
* Connections are either short, or it is acceptable for latency on a long-lived
* connection to temporarily increase while the other end re-fetches the LeaseSet
* after expiration.
* HTTP servers may fit these requirements.
* </li></ul>
*/
public static final String BUNDLE_REPLY_LEASESET = "shouldBundleReplyInfo";
/**
* Allow the override of the frequency of bundling the reply info in with a message.
* The client app can specify bundleReplyInfoProbability=80 (for instance) and that
* will cause the router to include the sender's leaseSet with 80% of the messages
* sent to the peer.
*
*/
public static final String BUNDLE_PROBABILITY = "bundleReplyInfoProbability";
/**
* How often do messages include the reply leaseSet (out of every 100 tries).
* Including it each time is probably overkill, but who knows.
*/
private static final int BUNDLE_PROBABILITY_DEFAULT = 100;
private static final int REPLY_REQUEST_INTERVAL = 60*1000;
@@ -212,47 +223,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
if (newLS == null)
return null; // punt
if (!force) {
// Don't send it every time unless configured to; default=false
Properties opts = _clientMessage.getSenderConfig().getOptions();
String wantBundle = opts.getProperty(BUNDLE_REPLY_LEASESET, "false");
if ("true".equals(wantBundle)) {
int probability = BUNDLE_PROBABILITY_DEFAULT;
String str = opts.getProperty(BUNDLE_PROBABILITY);
try {
if (str != null)
probability = Integer.parseInt(str);
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": Bundle leaseSet probability overridden incorrectly ["
+ str + "]", nfe);
}
if (probability >= 100)
return newLS; // do this every time so don't worry about cache
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Bundle leaseSet probability is " + probability);
if (probability >= getContext().random().nextInt(100))
force = true; // just add newLS to cache below and return
// fall through to cache check and add
}
}
// If the last leaseSet we sent him is still good, don't bother sending again
LeaseSet ls = _cache.leaseSetCache.put(_hashPair, newLS);
if (!force) {
if (ls != null) {
if (ls.equals(newLS)) {
// still good, send it 10% of the time
// sendACK does 5% random which forces us, good enough
//if (10 >= getContext().random().nextInt(100)) {
// if (_log.shouldLog(Log.INFO))
// _log.info("Found in cache - including reply leaseset for " + _toString);
// return ls;
//} else {
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Found in cache - NOT including reply leaseset for " + _toString);
return null;
//}
} else {
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Expired from cache - reply leaseset for " + _toString);
@@ -441,35 +419,53 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
return;
}
int existingTags = GarlicMessageBuilder.estimateAvailableTags(getContext(), _leaseSet.getEncryptionKey(),
_from.calculateHash());
_outTunnel = selectOutboundTunnel(_to);
// boolean wantACK = _wantACK || existingTags <= 30 || getContext().random().nextInt(100) < 5;
// what's the point of 5% random? possible improvements or replacements:
// DONE (getNextLease() is called before this): wantACK if we changed their inbound lease (getNextLease() sets _wantACK)
// DONE (selectOutboundTunnel() moved above here): wantACK if we changed our outbound tunnel (selectOutboundTunnel() sets _wantACK)
// DONE (added new cache): wantACK if we haven't in last 1m (requires a new static cache probably)
boolean wantACK;
Long lastSent = _cache.lastReplyRequestCache.get(_hashPair);
wantACK = _wantACK || existingTags <= 30 ||
lastSent == null || lastSent.longValue() < now - REPLY_REQUEST_INTERVAL;
if (wantACK)
_cache.lastReplyRequestCache.put(_hashPair, Long.valueOf(now));
Long lastReplyRequestSent = _cache.lastReplyRequestCache.get(_hashPair);
boolean shouldRequestReply = lastReplyRequestSent == null ||
lastReplyRequestSent.longValue() < now - REPLY_REQUEST_INTERVAL;
boolean wantACK = _wantACK ||
shouldRequestReply ||
// TODO: check the per-message flags also
GarlicMessageBuilder.needsTags(getContext(), _leaseSet.getEncryptionKey(), _from.calculateHash());
PublicKey key = _leaseSet.getEncryptionKey();
SessionKey sessKey = new SessionKey();
Set<SessionTag> tags = new HashSet();
// If we want an ack, bundle a leaseSet... (so he can get back to us)
LeaseSet replyLeaseSet = getReplyLeaseSet(wantACK);
// ... and vice versa (so we know he got it)
if (replyLeaseSet != null)
wantACK = true;
long token = (wantACK ? getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE) : -1);
if (wantACK)
_inTunnel = selectInboundTunnel();
boolean ok = (_clientMessage != null) && buildClove();
LeaseSet replyLeaseSet;
// TODO: check the per-message flags also
String allow = _clientMessage.getSenderConfig().getOptions().getProperty(BUNDLE_REPLY_LEASESET);
boolean allowLeaseBundle = allow == null || Boolean.valueOf(allow).booleanValue();
if (allowLeaseBundle) {
// If we want an ack, bundle a leaseSet...
//replyLeaseSet = getReplyLeaseSet(wantACK);
// Only when necessary. We don't need to force.
// ACKs find their own way back, they don't need a leaseset.
replyLeaseSet = getReplyLeaseSet(false);
// ... and vice versa (so we know he got it)
if (replyLeaseSet != null)
wantACK = true;
} else {
replyLeaseSet = null;
}
long token;
if (wantACK) {
_cache.lastReplyRequestCache.put(_hashPair, Long.valueOf(now));
token = getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE);
_inTunnel = selectInboundTunnel();
} else {
token = -1;
}
boolean ok = buildClove();
if (!ok) {
dieFatal();
return;
@@ -502,12 +498,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
ReplySelector selector = null;
if (wantACK) {
TagSetHandle tsh = null;
if ( (sessKey != null) && (tags != null) && (!tags.isEmpty()) ) {
if (_leaseSet != null) {
if (!tags.isEmpty()) {
SessionKeyManager skm = getContext().clientManager().getClientSessionKeyManager(_from.calculateHash());
if (skm != null)
tsh = skm.tagsDelivered(_leaseSet.getEncryptionKey(), sessKey, tags);
}
}
onReply = new SendSuccessJob(getContext(), sessKey, tsh);
onFail = new SendTimeoutJob(getContext(), sessKey, tsh);

View File

@@ -62,7 +62,7 @@ class MessageWrapper {
SessionKey sentKey = new SessionKey();
Set<SessionTag> sentTags = new HashSet();
GarlicMessage msg = GarlicMessageBuilder.buildMessage(ctx, payload, sentKey, sentTags,
NETDB_TAGS_TO_DELIVER, NETDB_LOW_THRESHOLD, false, skm);
NETDB_TAGS_TO_DELIVER, NETDB_LOW_THRESHOLD, skm);
if (msg == null)
return null;
TagSetHandle tsh = null;