diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java index 059823980..f8fb3a9d5 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java @@ -74,6 +74,9 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { protected int localPort = DEFAULT_LOCALPORT; /** + * Warning, blocks in constructor while connecting to router and building tunnels; + * TODO move that to startRunning() + * * @param privData Base64-encoded private key data, * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @throws IllegalArgumentException if the I2CP configuration is b0rked so @@ -87,6 +90,9 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { } /** + * Warning, blocks in constructor while connecting to router and building tunnels; + * TODO move that to startRunning() + * * @param privkey file containing the private key data, * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param privkeyname the name of the privKey file, not clear why we need this too @@ -111,6 +117,9 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { } /** + * Warning, blocks in constructor while connecting to router and building tunnels; + * TODO move that to startRunning() + * * @param privData stream containing the private key data, * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param privkeyname the name of the privKey file, not clear why we need this too @@ -124,6 +133,8 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { } /** + * Non-blocking + * * @param sktMgr the existing socket manager * @since 0.8.9 */ @@ -142,6 +153,9 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { private static final int MAX_RETRIES = 4; /** + * Warning, blocks while connecting to router and building tunnels; + * TODO move that to startRunning() + * * @param privData stream containing the private key data, * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param privkeyname the name of the privKey file, not clear why we need this too @@ -236,6 +250,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { /** * Start running the I2PTunnelServer. * + * TODO: Wait to connect to router until here. */ public void startRunning() { // prevent JVM exit when running outside the router diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelController.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelController.java index 5baef65c2..61b7c6432 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelController.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelController.java @@ -156,8 +156,8 @@ public class TunnelController implements Logging { } String type = getType(); if ( (type == null) || (type.length() <= 0) ) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Cannot start the tunnel - no type specified"); + if (_log.shouldLog(Log.ERROR)) + _log.error("Cannot start the tunnel - no type specified"); return; } // Config options may have changed since instantiation, so do this again. @@ -455,6 +455,25 @@ public class TunnelController implements Logging { } } _config = props; + + // Set up some per-type defaults + // This really isn't the best spot to do this but for servers in particular, + // it's hard to override settings in the subclass since the session connect + // is done in the I2PTunnelServer constructor. + String type = getType(); + if (type != null) { + if (type.equals("httpserver") || type.equals("streamrserver")) { + if (!_config.containsKey("option.shouldBundleReplyInfo")) + _config.setProperty("option.shouldBundleReplyInfo", "false"); + } else if (type.contains("irc") || type.equals("streamrclient")) { + // maybe a bad idea for ircclient if DCC is enabled + if (!_config.containsKey("option.crypto.tagsToSend")) + _config.setProperty("option.crypto.tagsToSend", "20"); + if (!_config.containsKey("option.crypto.lowTagThreshold")) + _config.setProperty("option.crypto.lowTagThreshold", "14"); + } + } + // tell i2ptunnel, who will tell the TunnelTask, who will tell the SocketManager setSessionOptions(); if (_running && _sessions != null) { @@ -467,6 +486,9 @@ public class TunnelController implements Logging { } } + /** + * @return a copy + */ public Properties getConfig(String prefix) { Properties rv = new Properties(); for (Map.Entry e : _config.entrySet()) { diff --git a/core/java/src/net/i2p/crypto/SessionKeyManager.java b/core/java/src/net/i2p/crypto/SessionKeyManager.java index 4a9456f7d..3a96497e2 100644 --- a/core/java/src/net/i2p/crypto/SessionKeyManager.java +++ b/core/java/src/net/i2p/crypto/SessionKeyManager.java @@ -24,14 +24,14 @@ import net.i2p.data.SessionTag; * unknown (and hence always forces a full ElGamal encryption for each message). * A more intelligent subclass should manage and persist keys and tags. * + * TODO if we aren't going to use this for testing, make it abstract. */ public class SessionKeyManager { - /** session key managers must be created through an app context */ - protected SessionKeyManager(I2PAppContext context) { // nop - } - /** see above */ - private SessionKeyManager() { // nop + /** + * Make this public if you need a dummy SessionKeyManager for testing + */ + protected SessionKeyManager(I2PAppContext context) { // nop } /** @@ -86,6 +86,31 @@ public class SessionKeyManager { return null; } + /** + * How many to send, IF we need to. + * @since 0.9.1 + */ + public int getTagsToSend() { return 0; }; + + /** + * @since 0.9.1 + */ + public int getLowThreshold() { return 0; }; + + /** + * @return true if we have less than the threshold or what we have is about to expire + * @since 0.9.1 + */ + public boolean shouldSendTags(PublicKey target, SessionKey key) { + return shouldSendTags(target, key, getLowThreshold()); + } + + /** + * @return true if we have less than the threshold or what we have is about to expire + * @since 0.9.1 + */ + public boolean shouldSendTags(PublicKey target, SessionKey key, int lowThreshold) { return false; } + /** * Determine (approximately) how many available session tags for the current target * have been confirmed and are available diff --git a/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java b/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java index e31adcada..fe0d72778 100644 --- a/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java +++ b/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java @@ -85,27 +85,74 @@ public class TransientSessionKeyManager extends SessionKeyManager { /** for debugging */ private final AtomicInteger _rcvTagSetID = new AtomicInteger(); private final AtomicInteger _sentTagSetID = new AtomicInteger(); + private final int _tagsToSend; + private final int _lowThreshold; /** - * Let session tags sit around for 10 minutes before expiring them. We can now have such a large + * Let session tags sit around for this long before expiring them. We can now have such a large * value since there is the persistent session key manager. This value is for outbound tags - * inbound tags are managed by SESSION_LIFETIME_MAX_MS - * */ - public final static long SESSION_TAG_DURATION_MS = 10 * 60 * 1000; + private final static long SESSION_TAG_DURATION_MS = 12 * 60 * 1000; + /** - * Keep unused inbound session tags around for up to 12 minutes (2 minutes longer than + * Keep unused inbound session tags around for this long (a few minutes longer than * session tags are used on the outbound side so that no reasonable network lag * can cause failed decrypts) - * */ - public final static long SESSION_LIFETIME_MAX_MS = SESSION_TAG_DURATION_MS + 5 * 60 * 1000; + private final static long SESSION_LIFETIME_MAX_MS = SESSION_TAG_DURATION_MS + 3 * 60 * 1000; + + /** + * Time to send more if we are this close to expiration + */ + private static final long SESSION_TAG_EXPIRATION_WINDOW = 90 * 1000; + /** * a few MB? how about 16MB! * This is the max size of _inboundTagSets. */ public final static int MAX_INBOUND_SESSION_TAGS = 500 * 1000; // this will consume at most a few MB + /** + * This was 100 since 0.6.1.10 (50 before that). It's important because: + *
+     *  - Tags are 32 bytes. So it previously added 3200 bytes to an initial message.
+     *  - Too many tags adds a huge overhead to short-duration connections
+     *    (like http, datagrams, etc.)
+     *  - Large messages have a much higher chance of being dropped due to
+     *    one of their 1KB fragments being discarded by a tunnel participant.
+     *  - This reduces the effective maximum datagram size because the client
+     *    doesn't know when tags will be bundled, so the tag size must be
+     *    subtracted from the maximum I2NP size or transport limit.
+     * 
+ * + * Issues with too small a value: + *
+     *  - When tags are sent, a reply leaseset (~1KB) is always bundled.
+     *    Maybe don't need to bundle more than every minute or so
+     *    rather than every time?
+     *  - Does the number of tags (and the threshold of 20) limit the effective
+     *    streaming lib window size? Should the threshold and the number of
+     *    sent tags be variable based on the message rate?
+     * 
+ * + * We have to be very careful if we implement an adaptive scheme, + * since the key manager is per-router, not per-local-dest. + * Or maybe that's a bad idea, and we need to move to a per-dest manager. + * This needs further investigation. + * + * So a value somewhat higher than the low threshold + * seems appropriate. + * + * Use care when adjusting these values. See ConnectionOptions in streaming, + * and TransientSessionKeyManager in crypto, for more information. + * + * @since 0.9.1 moved from GarlicMessageBuilder to per-SKM config + */ + public static final int DEFAULT_TAGS = 40; + /** ditto */ + public static final int LOW_THRESHOLD = 30; + /** * The session key manager should only be constructed and accessed through the * application context. This constructor should only be used by the @@ -113,11 +160,24 @@ public class TransientSessionKeyManager extends SessionKeyManager { * */ public TransientSessionKeyManager(I2PAppContext context) { + this(context, DEFAULT_TAGS, LOW_THRESHOLD); + } + + /** + * @param tagsToSend how many to send at a time, may be lower or higher than lowThreshold. 1-128 + * @param lowThreshold below this, send more. 1-128 + * @since 0.9.1 + */ + public TransientSessionKeyManager(I2PAppContext context, int tagsToSend, int lowThreshold) { super(context); + if (tagsToSend <= 0 || tagsToSend > 128 || lowThreshold <= 0 || lowThreshold > 128) + throw new IllegalArgumentException(); + _tagsToSend = tagsToSend; + _lowThreshold = lowThreshold; _log = context.logManager().getLog(TransientSessionKeyManager.class); _context = context; _outboundSessions = new HashMap(64); - _inboundTagSets = new HashMap(1024); + _inboundTagSets = new HashMap(128); context.statManager().createRateStat("crypto.sessionTagsExpired", "How many tags/sessions are expired?", "Encryption", new long[] { 10*60*1000, 60*60*1000, 3*60*60*1000 }); context.statManager().createRateStat("crypto.sessionTagsRemaining", "How many tags/sessions are remaining after a cleanup?", "Encryption", new long[] { 10*60*1000, 60*60*1000, 3*60*60*1000 }); _alive = true; @@ -291,6 +351,31 @@ public class TransientSessionKeyManager extends SessionKeyManager { return null; } + /** + * How many to send, IF we need to. + * @return the configured value (not adjusted for current available) + * @since 0.9.1 + */ + @Override + public int getTagsToSend() { return _tagsToSend; }; + + /** + * @return the configured value + * @since 0.9.1 + */ + @Override + public int getLowThreshold() { return _lowThreshold; }; + + /** + * @return true if we have less than the threshold or what we have is about to expire + * @since 0.9.1 + */ + @Override + public boolean shouldSendTags(PublicKey target, SessionKey key, int lowThreshold) { + return getAvailableTags(target, key) < lowThreshold || + getAvailableTimeLeft(target, key) < SESSION_TAG_EXPIRATION_WINDOW; + } + /** * Determine (approximately) how many available session tags for the current target * have been confirmed and are available diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index 83d5d51d5..9fba564f8 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -17,6 +17,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -95,6 +96,10 @@ class ClientConnectionRunner { // e.g. on local access private static final int MAX_MESSAGE_ID = 0x4000000; + /** @since 0.9.1 */ + private static final String PROP_TAGS = "crypto.tagsToSend"; + private static final String PROP_THRESH = "crypto.lowTagThreshold"; + /** * Create a new runner against the given socket * @@ -200,14 +205,29 @@ class ClientConnectionRunner { if (_log.shouldLog(Log.DEBUG)) _log.debug("SessionEstablished called for destination " + _destHashCache.toBase64()); _config = config; - // This is the only option that is interpreted here, not at the tunnel manager - if (config.getOptions() != null) + // We process a few options here, but most are handled by the tunnel manager. + // The ones here can't be changed later. + Properties opts = config.getOptions(); + if (opts != null) _dontSendMSM = "none".equals(config.getOptions().getProperty(I2PClient.PROP_RELIABILITY, "").toLowerCase(Locale.US)); // per-destination session key manager to prevent rather easy correlation - if (_sessionKeyManager == null) - _sessionKeyManager = new TransientSessionKeyManager(_context); - else + if (_sessionKeyManager == null) { + int tags = TransientSessionKeyManager.DEFAULT_TAGS; + int thresh = TransientSessionKeyManager.LOW_THRESHOLD; + if (opts != null) { + String ptags = opts.getProperty(PROP_TAGS); + if (ptags != null) { + try { tags = Integer.parseInt(ptags); } catch (NumberFormatException nfe) {} + } + String pthresh = opts.getProperty(PROP_THRESH); + if (pthresh != null) { + try { thresh = Integer.parseInt(pthresh); } catch (NumberFormatException nfe) {} + } + } + _sessionKeyManager = new TransientSessionKeyManager(_context, tags, thresh); + } else { _log.error("SessionEstablished called for twice for destination " + _destHashCache.toBase64().substring(0,4)); + } _manager.destinationEstablished(this); } diff --git a/router/java/src/net/i2p/router/client/ClientMessageEventListener.java b/router/java/src/net/i2p/router/client/ClientMessageEventListener.java index c74b38d04..fa762ecab 100644 --- a/router/java/src/net/i2p/router/client/ClientMessageEventListener.java +++ b/router/java/src/net/i2p/router/client/ClientMessageEventListener.java @@ -298,6 +298,9 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi * Message's Session ID ignored. This doesn't support removing previously set options. * Nor do we bother with message.getSessionConfig().verifySignature() ... should we? * Nor is the Date checked. + * + * Note that this does NOT update the few options handled in + * ClientConnectionRunner.sessionEstablished(). Those can't be changed later. */ private void handleReconfigureSession(I2CPMessageReader reader, ReconfigureSessionMessage message) { if (_log.shouldLog(Log.INFO)) diff --git a/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java b/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java index ad7c63627..998e267b5 100644 --- a/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java +++ b/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java @@ -33,52 +33,15 @@ import net.i2p.util.Log; */ public class GarlicMessageBuilder { - /** - * This was 100 since 0.6.1.10 (50 before that). It's important because: - *
-     *  - Tags are 32 bytes. So it previously added 3200 bytes to an initial message.
-     *  - Too many tags adds a huge overhead to short-duration connections
-     *    (like http, datagrams, etc.)
-     *  - Large messages have a much higher chance of being dropped due to
-     *    one of their 1KB fragments being discarded by a tunnel participant.
-     *  - This reduces the effective maximum datagram size because the client
-     *    doesn't know when tags will be bundled, so the tag size must be
-     *    subtracted from the maximum I2NP size or transport limit.
-     * 
- * - * Issues with too small a value: - *
-     *  - When tags are sent, a reply leaseset (~1KB) is always bundled.
-     *    Maybe don't need to bundle more than every minute or so
-     *    rather than every time?
-     *  - Does the number of tags (and the threshold of 20) limit the effective
-     *    streaming lib window size? Should the threshold and the number of
-     *    sent tags be variable based on the message rate?
-     * 
- * - * We have to be very careful if we implement an adaptive scheme, - * since the key manager is per-router, not per-local-dest. - * Or maybe that's a bad idea, and we need to move to a per-dest manager. - * This needs further investigation. - * - * So a value somewhat higher than the low threshold - * seems appropriate. - * - * Use care when adjusting these values. See ConnectionOptions in streaming, - * and TransientSessionKeyManager in crypto, for more information. - */ - private static final int DEFAULT_TAGS = 40; - private static final int LOW_THRESHOLD = 30; - /** @param local non-null; do not use this method for the router's SessionKeyManager */ - public static int estimateAvailableTags(RouterContext ctx, PublicKey key, Hash local) { + public static boolean needsTags(RouterContext ctx, PublicKey key, Hash local) { SessionKeyManager skm = ctx.clientManager().getClientSessionKeyManager(local); if (skm == null) - return 0; + return true; SessionKey curKey = skm.getCurrentKey(key); if (curKey == null) - return 0; - return skm.getAvailableTags(key, curKey); + return true; + return skm.shouldSendTags(key, curKey); } /** @@ -100,17 +63,19 @@ public class GarlicMessageBuilder { } /** - * called by OCMJH + * Now unused, since we have to generate a reply token first in OCMOSJ but we don't know if tags are required yet. * * @param ctx scope * @param config how/what to wrap * @param wrappedKey output parameter that will be filled with the sessionKey used - * @param wrappedTags output parameter that will be filled with the sessionTags used + * @param wrappedTags Output parameter that will be filled with the sessionTags used. + If non-empty on return you must call skm.tagsDelivered() when sent + and then call skm.tagsAcked() or skm.failTags() later. * @param skm non-null */ public static GarlicMessage buildMessage(RouterContext ctx, GarlicConfig config, SessionKey wrappedKey, Set wrappedTags, SessionKeyManager skm) { - return buildMessage(ctx, config, wrappedKey, wrappedTags, DEFAULT_TAGS, false, skm); + return buildMessage(ctx, config, wrappedKey, wrappedTags, skm.getTagsToSend(), skm); } /** unused */ @@ -122,33 +87,42 @@ public class GarlicMessageBuilder { ***/ /** - * called by above + * called by OCMJH * * @param ctx scope * @param config how/what to wrap * @param wrappedKey output parameter that will be filled with the sessionKey used - * @param wrappedTags output parameter that will be filled with the sessionTags used - * @param numTagsToDeliver only if the estimated available tags are below the threshold + * @param wrappedTags Output parameter that will be filled with the sessionTags used. + If non-empty on return you must call skm.tagsDelivered() when sent + and then call skm.tagsAcked() or skm.failTags() later. + * @param numTagsToDeliver Only if the estimated available tags are below the threshold. + Set to zero to disable tag delivery. You must set to zero if you are not + equipped to confirm delivery and call skm.tagsAcked() or skm.failTags() later. * @param skm non-null */ - private static GarlicMessage buildMessage(RouterContext ctx, GarlicConfig config, SessionKey wrappedKey, Set wrappedTags, - int numTagsToDeliver, boolean forceElGamal, SessionKeyManager skm) { - return buildMessage(ctx, config, wrappedKey, wrappedTags, numTagsToDeliver, LOW_THRESHOLD, false, skm); + public static GarlicMessage buildMessage(RouterContext ctx, GarlicConfig config, SessionKey wrappedKey, Set wrappedTags, + int numTagsToDeliver, SessionKeyManager skm) { + return buildMessage(ctx, config, wrappedKey, wrappedTags, numTagsToDeliver, skm.getLowThreshold(), skm); } /** - * called by netdb + * called by netdb and above * * @param ctx scope * @param config how/what to wrap * @param wrappedKey output parameter that will be filled with the sessionKey used - * @param wrappedTags output parameter that will be filled with the sessionTags used - * @param numTagsToDeliver only if the estimated available tags are below the threshold + * @param wrappedTags Output parameter that will be filled with the sessionTags used. + If non-empty on return you must call skm.tagsDelivered() when sent + and then call skm.tagsAcked() or skm.failTags() later. + * @param numTagsToDeliver only if the estimated available tags are below the threshold. + Set to zero to disable tag delivery. You must set to zero if you are not + equipped to confirm delivery and call skm.tagsAcked() or failTags() later. + If this is always 0, it forces ElGamal every time. * @param lowTagsThreshold the threshold * @param skm non-null */ public static GarlicMessage buildMessage(RouterContext ctx, GarlicConfig config, SessionKey wrappedKey, Set wrappedTags, - int numTagsToDeliver, int lowTagsThreshold, boolean forceElGamal, SessionKeyManager skm) { + int numTagsToDeliver, int lowTagsThreshold, SessionKeyManager skm) { Log log = ctx.logManager().getLog(GarlicMessageBuilder.class); PublicKey key = config.getRecipientPublicKey(); if (key == null) { @@ -167,29 +141,19 @@ public class GarlicMessageBuilder { SessionKey curKey = skm.getCurrentOrNewKey(key); SessionTag curTag = null; - if (!forceElGamal) { + curTag = skm.consumeNextAvailableTag(key, curKey); int availTags = skm.getAvailableTags(key, curKey); if (log.shouldLog(Log.DEBUG)) log.debug("Available tags for encryption to " + key + ": " + availTags); - if (availTags < lowTagsThreshold) { // arbitrary threshold + if (numTagsToDeliver > 0 && skm.shouldSendTags(key, curKey, lowTagsThreshold)) { for (int i = 0; i < numTagsToDeliver; i++) wrappedTags.add(new SessionTag(true)); if (log.shouldLog(Log.INFO)) - log.info("Too few are available (" + availTags + "), so we're including more"); - } else if (skm.getAvailableTimeLeft(key, curKey) < 60*1000) { - // if we have enough tags, but they expire in under 30 seconds, we want more - for (int i = 0; i < numTagsToDeliver; i++) - wrappedTags.add(new SessionTag(true)); - if (log.shouldLog(Log.INFO)) - log.info("Tags are almost expired, adding new ones"); - } else { - // always tack on at least one more - not necessary. - //wrappedTags.add(new SessionTag(true)); + log.info("Too few tags available so we're including " + numTagsToDeliver); } - } wrappedKey.setData(curKey.getData()); @@ -202,7 +166,9 @@ public class GarlicMessageBuilder { * @param ctx scope * @param config how/what to wrap * @param wrappedKey unused - why?? - * @param wrappedTags output parameter that will be filled with the sessionTags used + * @param wrappedTags Output parameter that will be filled with the sessionTags used. + If non-empty on return you must call skm.tagsDelivered() when sent + and then call skm.tagsAcked() or skm.failTags() later. * @param target public key of the location being garlic routed to (may be null if we * know the encryptKey and encryptTag) * @param encryptKey sessionKey used to encrypt the current message @@ -216,7 +182,7 @@ public class GarlicMessageBuilder { GarlicMessage msg = new GarlicMessage(ctx); - noteWrap(ctx, msg, config); + //noteWrap(ctx, msg, config); byte cloveSet[] = buildCloveSet(ctx, config); @@ -239,6 +205,7 @@ public class GarlicMessageBuilder { return msg; } +/**** private static void noteWrap(RouterContext ctx, GarlicMessage wrapper, GarlicConfig contained) { for (int i = 0; i < contained.getCloveCount(); i++) { GarlicConfig config = contained.getClove(i); @@ -249,6 +216,7 @@ public class GarlicMessageBuilder { } } } +****/ /** * Build an unencrypted set of cloves specified by the config. @@ -303,7 +271,7 @@ public class GarlicMessageBuilder { private static byte[] buildClove(RouterContext ctx, PayloadGarlicConfig config) throws DataFormatException, IOException { GarlicClove clove = new GarlicClove(ctx); clove.setData(config.getPayload()); - return buildCommonClove(ctx, clove, config); + return buildCommonClove(clove, config); } /** @@ -328,11 +296,10 @@ public class GarlicMessageBuilder { if (msg == null) throw new DataFormatException("Unable to build message from clove config"); clove.setData(msg); - return buildCommonClove(ctx, clove, config); + return buildCommonClove(clove, config); } - - private static byte[] buildCommonClove(RouterContext ctx, GarlicClove clove, GarlicConfig config) throws DataFormatException, IOException { + private static byte[] buildCommonClove(GarlicClove clove, GarlicConfig config) throws DataFormatException, IOException { clove.setCertificate(config.getCertificate()); clove.setCloveId(config.getId()); clove.setExpiration(new Date(config.getExpiration())); diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java b/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java index 7941616f5..f5a4d3f38 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java @@ -50,6 +50,8 @@ class OutboundClientMessageJobHelper { * * Unused? * + * @param wrappedKey output parameter that will be filled with the sessionKey used + * @param wrappedTags output parameter that will be filled with the sessionTags used * @param bundledReplyLeaseSet if specified, the given LeaseSet will be packaged with the message (allowing * much faster replies, since their netDb search will return almost instantly) * @return garlic, or null if no tunnels were found (or other errors) @@ -68,6 +70,8 @@ class OutboundClientMessageJobHelper { * * This is called from OCMOSJ * + * @param wrappedKey output parameter that will be filled with the sessionKey used + * @param wrappedTags output parameter that will be filled with the sessionTags used * @return garlic, or null if no tunnels were found (or other errors) */ static GarlicMessage createGarlicMessage(RouterContext ctx, long replyToken, long expiration, PublicKey recipientPK, @@ -79,8 +83,10 @@ class OutboundClientMessageJobHelper { SessionKeyManager skm = ctx.clientManager().getClientSessionKeyManager(from); if (skm == null) return null; + // no use sending tags unless we have a reply token set up already + int tagsToSend = replyToken >= 0 ? skm.getTagsToSend() : 0; GarlicMessage msg = GarlicMessageBuilder.buildMessage(ctx, config, wrappedKey, wrappedTags, - skm); + tagsToSend, skm); return msg; } diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java index ea0664ec6..d171b6b24 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java @@ -78,27 +78,38 @@ public class OutboundClientMessageOneShotJob extends JobImpl { private final static long OVERALL_TIMEOUT_MS_MIN = 8*1000; /** - * If the client's config specifies shouldBundleReplyInfo=true, messages sent from - * that client to any peers will probabalistically include the sending destination's - * current LeaseSet (allowing the recipient to reply without having to do a full - * netDb lookup). This should improve performance during the initial negotiations, - * but is not necessary for communication that isn't bidirectional. + * NOTE: Changed as of 0.9.1. * + * Defaults to true. + * + * If the client's config specifies shouldBundleReplyInfo=true, messages sent from + * that client to any peers will periodically include the sending destination's + * current LeaseSet (allowing the recipient to reply without having to do a full + * netDb lookup). This should improve performance during the initial negotiations. + * + * For clients that do not publish their LeaseSet, this option must be true + * for any reply to be possible. + * + * Setting to "false" may save significant outbound bandwidth, especially if + * the client is configured with a large number of inbound tunnels (Leases). + * If replies are still required, this may shift the bandwidth burden to + * the far-end client and the floodfill. + * + * There are several cases where "false" is may be appropriate: + *
  • + * Unidirectional communication, no reply required + *
  • + * LeaseSet is published and higher reply latency is acceptable + *
  • + * LeaseSet is published, client is a "server", all connections are inbound + * so the connecting far-end destination obviously has the leaseset already. + * Connections are either short, or it is acceptable for latency on a long-lived + * connection to temporarily increase while the other end re-fetches the LeaseSet + * after expiration. + * HTTP servers may fit these requirements. + *
*/ public static final String BUNDLE_REPLY_LEASESET = "shouldBundleReplyInfo"; - /** - * Allow the override of the frequency of bundling the reply info in with a message. - * The client app can specify bundleReplyInfoProbability=80 (for instance) and that - * will cause the router to include the sender's leaseSet with 80% of the messages - * sent to the peer. - * - */ - public static final String BUNDLE_PROBABILITY = "bundleReplyInfoProbability"; - /** - * How often do messages include the reply leaseSet (out of every 100 tries). - * Including it each time is probably overkill, but who knows. - */ - private static final int BUNDLE_PROBABILITY_DEFAULT = 100; private static final int REPLY_REQUEST_INTERVAL = 60*1000; @@ -212,47 +223,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl { if (newLS == null) return null; // punt - if (!force) { - // Don't send it every time unless configured to; default=false - Properties opts = _clientMessage.getSenderConfig().getOptions(); - String wantBundle = opts.getProperty(BUNDLE_REPLY_LEASESET, "false"); - if ("true".equals(wantBundle)) { - int probability = BUNDLE_PROBABILITY_DEFAULT; - String str = opts.getProperty(BUNDLE_PROBABILITY); - try { - if (str != null) - probability = Integer.parseInt(str); - } catch (NumberFormatException nfe) { - if (_log.shouldLog(Log.WARN)) - _log.warn(getJobId() + ": Bundle leaseSet probability overridden incorrectly [" - + str + "]", nfe); - } - if (probability >= 100) - return newLS; // do this every time so don't worry about cache - if (_log.shouldLog(Log.INFO)) - _log.info(getJobId() + ": Bundle leaseSet probability is " + probability); - if (probability >= getContext().random().nextInt(100)) - force = true; // just add newLS to cache below and return - // fall through to cache check and add - } - } - // If the last leaseSet we sent him is still good, don't bother sending again LeaseSet ls = _cache.leaseSetCache.put(_hashPair, newLS); if (!force) { if (ls != null) { if (ls.equals(newLS)) { - // still good, send it 10% of the time - // sendACK does 5% random which forces us, good enough - //if (10 >= getContext().random().nextInt(100)) { - // if (_log.shouldLog(Log.INFO)) - // _log.info("Found in cache - including reply leaseset for " + _toString); - // return ls; - //} else { if (_log.shouldLog(Log.INFO)) _log.info(getJobId() + ": Found in cache - NOT including reply leaseset for " + _toString); return null; - //} } else { if (_log.shouldLog(Log.INFO)) _log.info(getJobId() + ": Expired from cache - reply leaseset for " + _toString); @@ -441,35 +419,53 @@ public class OutboundClientMessageOneShotJob extends JobImpl { return; } - int existingTags = GarlicMessageBuilder.estimateAvailableTags(getContext(), _leaseSet.getEncryptionKey(), - _from.calculateHash()); _outTunnel = selectOutboundTunnel(_to); // boolean wantACK = _wantACK || existingTags <= 30 || getContext().random().nextInt(100) < 5; // what's the point of 5% random? possible improvements or replacements: // DONE (getNextLease() is called before this): wantACK if we changed their inbound lease (getNextLease() sets _wantACK) // DONE (selectOutboundTunnel() moved above here): wantACK if we changed our outbound tunnel (selectOutboundTunnel() sets _wantACK) // DONE (added new cache): wantACK if we haven't in last 1m (requires a new static cache probably) - boolean wantACK; - Long lastSent = _cache.lastReplyRequestCache.get(_hashPair); - wantACK = _wantACK || existingTags <= 30 || - lastSent == null || lastSent.longValue() < now - REPLY_REQUEST_INTERVAL; - if (wantACK) - _cache.lastReplyRequestCache.put(_hashPair, Long.valueOf(now)); + Long lastReplyRequestSent = _cache.lastReplyRequestCache.get(_hashPair); + boolean shouldRequestReply = lastReplyRequestSent == null || + lastReplyRequestSent.longValue() < now - REPLY_REQUEST_INTERVAL; + + boolean wantACK = _wantACK || + shouldRequestReply || + // TODO: check the per-message flags also + GarlicMessageBuilder.needsTags(getContext(), _leaseSet.getEncryptionKey(), _from.calculateHash()); PublicKey key = _leaseSet.getEncryptionKey(); SessionKey sessKey = new SessionKey(); Set tags = new HashSet(); - // If we want an ack, bundle a leaseSet... (so he can get back to us) - LeaseSet replyLeaseSet = getReplyLeaseSet(wantACK); - // ... and vice versa (so we know he got it) - if (replyLeaseSet != null) - wantACK = true; - long token = (wantACK ? getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE) : -1); - if (wantACK) - _inTunnel = selectInboundTunnel(); - boolean ok = (_clientMessage != null) && buildClove(); + LeaseSet replyLeaseSet; + // TODO: check the per-message flags also + String allow = _clientMessage.getSenderConfig().getOptions().getProperty(BUNDLE_REPLY_LEASESET); + boolean allowLeaseBundle = allow == null || Boolean.valueOf(allow).booleanValue(); + if (allowLeaseBundle) { + // If we want an ack, bundle a leaseSet... + //replyLeaseSet = getReplyLeaseSet(wantACK); + // Only when necessary. We don't need to force. + // ACKs find their own way back, they don't need a leaseset. + replyLeaseSet = getReplyLeaseSet(false); + // ... and vice versa (so we know he got it) + if (replyLeaseSet != null) + wantACK = true; + } else { + replyLeaseSet = null; + } + + long token; + if (wantACK) { + _cache.lastReplyRequestCache.put(_hashPair, Long.valueOf(now)); + token = getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE); + _inTunnel = selectInboundTunnel(); + } else { + token = -1; + } + + boolean ok = buildClove(); if (!ok) { dieFatal(); return; @@ -502,12 +498,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl { ReplySelector selector = null; if (wantACK) { TagSetHandle tsh = null; - if ( (sessKey != null) && (tags != null) && (!tags.isEmpty()) ) { - if (_leaseSet != null) { + if (!tags.isEmpty()) { SessionKeyManager skm = getContext().clientManager().getClientSessionKeyManager(_from.calculateHash()); if (skm != null) tsh = skm.tagsDelivered(_leaseSet.getEncryptionKey(), sessKey, tags); - } } onReply = new SendSuccessJob(getContext(), sessKey, tsh); onFail = new SendTimeoutJob(getContext(), sessKey, tsh); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/MessageWrapper.java b/router/java/src/net/i2p/router/networkdb/kademlia/MessageWrapper.java index f6fb57ae6..5bed6aec7 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/MessageWrapper.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/MessageWrapper.java @@ -62,7 +62,7 @@ class MessageWrapper { SessionKey sentKey = new SessionKey(); Set sentTags = new HashSet(); GarlicMessage msg = GarlicMessageBuilder.buildMessage(ctx, payload, sentKey, sentTags, - NETDB_TAGS_TO_DELIVER, NETDB_LOW_THRESHOLD, false, skm); + NETDB_TAGS_TO_DELIVER, NETDB_LOW_THRESHOLD, skm); if (msg == null) return null; TagSetHandle tsh = null;