From 314316cee065671b9b971f95a3427acca4141805 Mon Sep 17 00:00:00 2001 From: jrandom Date: Sat, 6 Nov 2004 07:59:54 +0000 Subject: [PATCH] 2004-11-06 jrandom * Fix for a long standing synchronization bug in the SDK that in rare instances can add a few seconds of lag. --- .../src/net/i2p/client/I2PSessionImpl.java | 7 + .../src/net/i2p/client/I2PSessionImpl2.java | 4 +- .../java/src/net/i2p/client/MessageState.java | 233 +++++++++--------- history.txt | 6 +- .../src/net/i2p/router/RouterVersion.java | 4 +- .../router/message/SendTunnelMessageJob.java | 17 +- 6 files changed, 138 insertions(+), 133 deletions(-) diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index 6f1eb467d..ebb06526d 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -314,6 +314,13 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa public abstract void receiveStatus(int msgId, long nonce, int status); protected boolean isGuaranteed() { + if (_log.shouldLog(Log.DEBUG)) { + String str = _options.getProperty(I2PClient.PROP_RELIABILITY); + if (str == null) + _log.debug("reliability is not specified, fallback"); + else + _log.debug("reliability is specified: " + str); + } String reliability = _options.getProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_GUARANTEED); return I2PClient.PROP_RELIABILITY_GUARANTEED.equals(reliability); } diff --git a/core/java/src/net/i2p/client/I2PSessionImpl2.java b/core/java/src/net/i2p/client/I2PSessionImpl2.java index 0e5724616..aab11ad19 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl2.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl2.java @@ -168,7 +168,7 @@ class I2PSessionImpl2 extends I2PSessionImpl { long afterSendingSync = _context.clock().now(); if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Adding sending state " + state.getMessageId() + " / " - + state.getNonce() + + state.getNonce() + " for best effort " + " sync took " + (inSendingSync-beforeSendingSync) + " add took " + (afterSendingSync-inSendingSync)); _producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey); @@ -248,7 +248,7 @@ class I2PSessionImpl2 extends I2PSessionImpl { long afterSendingSync = _context.clock().now(); if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Adding sending state " + state.getMessageId() + " / " - + state.getNonce() + + state.getNonce() + " for guaranteed " + " sync took " + (inSendingSync-beforeSendingSync) + " add took " + (afterSendingSync-inSendingSync)); _producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey); diff --git a/core/java/src/net/i2p/client/MessageState.java b/core/java/src/net/i2p/client/MessageState.java index 8b1792ac3..74afc0f76 100644 --- a/core/java/src/net/i2p/client/MessageState.java +++ b/core/java/src/net/i2p/client/MessageState.java @@ -29,7 +29,6 @@ class MessageState { private Destination _to; private boolean _cancelled; private long _created; - private Object _lock = new Object(); private static long __stateId = 0; private long _stateId; @@ -51,9 +50,7 @@ class MessageState { public void receive(int status) { synchronized (_receivedStatus) { _receivedStatus.add(new Integer(status)); - } - synchronized (_lock) { - _lock.notifyAll(); + _receivedStatus.notifyAll(); } } @@ -116,150 +113,140 @@ class MessageState { _log.warn(_prefix + "Expired waiting for the status [" + status + "]"); return; } - if (isSuccess(status) || isFailure(status)) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(_prefix + "Received a confirm (one way or the other)"); - return; - } - if (timeToWait > 5000) { - timeToWait = 5000; - } - synchronized (_lock) { + synchronized (_receivedStatus) { + if (locked_isSuccess(status) || locked_isFailure(status)) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(_prefix + "Received a confirm (one way or the other)"); + return; + } + if (timeToWait > 5000) { + timeToWait = 5000; + } try { - _lock.wait(timeToWait); + _receivedStatus.wait(timeToWait); } catch (InterruptedException ie) { // nop } } } } - private boolean isSuccess(int wantedStatus) { - List received = null; - synchronized (_receivedStatus) { - received = new ArrayList(_receivedStatus); - //_receivedStatus.clear(); - } - + private boolean locked_isSuccess(int wantedStatus) { boolean rv = false; if (_log.shouldLog(Log.DEBUG)) - _log.debug(_prefix + "isSuccess(" + wantedStatus + "): " + received); - for (Iterator iter = received.iterator(); iter.hasNext();) { + _log.debug(_prefix + "isSuccess(" + wantedStatus + "): " + _receivedStatus); + for (Iterator iter = _receivedStatus.iterator(); iter.hasNext();) { Integer val = (Integer) iter.next(); int recv = val.intValue(); switch (recv) { - case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE: - if (_log.shouldLog(Log.WARN)) - _log.warn(_prefix + "Received best effort failure after " + getElapsed() + " from " - + toString()); - rv = false; - break; - case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE: - if (_log.shouldLog(Log.WARN)) - _log.warn(_prefix + "Received guaranteed failure after " + getElapsed() + " from " - + toString()); - rv = false; - break; - case MessageStatusMessage.STATUS_SEND_ACCEPTED: - if (wantedStatus == MessageStatusMessage.STATUS_SEND_ACCEPTED) { - return true; // if we're only looking for accepted, take it directly (don't let any GUARANTEED_* override it) - } - // ignore accepted, as we want something better - if (_log.shouldLog(Log.DEBUG)) - _log.debug(_prefix + "Got accepted, but we're waiting for more from " + toString()); - continue; - case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_SUCCESS: - if (_log.shouldLog(Log.DEBUG)) - _log.debug(_prefix + "Received best effort success after " + getElapsed() - + " from " + toString()); - if (wantedStatus == recv) { - rv = true; - } else { + case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE: + if (_log.shouldLog(Log.WARN)) + _log.warn(_prefix + "Received best effort failure after " + getElapsed() + " from " + + toString()); + rv = false; + break; + case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE: + if (_log.shouldLog(Log.WARN)) + _log.warn(_prefix + "Received guaranteed failure after " + getElapsed() + " from " + + toString()); + rv = false; + break; + case MessageStatusMessage.STATUS_SEND_ACCEPTED: + if (wantedStatus == MessageStatusMessage.STATUS_SEND_ACCEPTED) { + return true; // if we're only looking for accepted, take it directly (don't let any GUARANTEED_* override it) + } + // ignore accepted, as we want something better if (_log.shouldLog(Log.DEBUG)) - _log.debug(_prefix + "Not guaranteed success, but best effort after " - + getElapsed() + " will do... from " + toString()); + _log.debug(_prefix + "Got accepted, but we're waiting for more from " + toString()); + continue; + case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_SUCCESS: + if (_log.shouldLog(Log.DEBUG)) + _log.debug(_prefix + "Received best effort success after " + getElapsed() + + " from " + toString()); + if (wantedStatus == recv) { + rv = true; + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(_prefix + "Not guaranteed success, but best effort after " + + getElapsed() + " will do... from " + toString()); + rv = true; + } + break; + case MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS: + if (_log.shouldLog(Log.DEBUG)) + _log.debug(_prefix + "Received guaranteed success after " + getElapsed() + " from " + + toString()); + // even if we're waiting for best effort success, guaranteed is good enough rv = true; - } - break; - case MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS: - if (_log.shouldLog(Log.DEBUG)) - _log.debug(_prefix + "Received guaranteed success after " + getElapsed() + " from " - + toString()); - // even if we're waiting for best effort success, guaranteed is good enough - rv = true; - break; - case -1: - continue; - default: - if (_log.shouldLog(Log.DEBUG)) - _log.debug(_prefix + "Received something else [" + recv + "]..."); + break; + case -1: + continue; + default: + if (_log.shouldLog(Log.DEBUG)) + _log.debug(_prefix + "Received something else [" + recv + "]..."); } } return rv; } - private boolean isFailure(int wantedStatus) { - List received = null; - synchronized (_receivedStatus) { - received = new ArrayList(_receivedStatus); - //_receivedStatus.clear(); - } + private boolean locked_isFailure(int wantedStatus) { boolean rv = false; if (_log.shouldLog(Log.DEBUG)) - _log.debug(_prefix + "isFailure(" + wantedStatus + "): " + received); - for (Iterator iter = received.iterator(); iter.hasNext();) { + _log.debug(_prefix + "isFailure(" + wantedStatus + "): " + _receivedStatus); + + for (Iterator iter = _receivedStatus.iterator(); iter.hasNext();) { Integer val = (Integer) iter.next(); int recv = val.intValue(); switch (recv) { - case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE: - if (_log.shouldLog(Log.DEBUG)) - _log.warn(_prefix + "Received best effort failure after " + getElapsed() + " from " - + toString()); - rv = true; - break; - case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE: - if (_log.shouldLog(Log.DEBUG)) - _log.warn(_prefix + "Received guaranteed failure after " + getElapsed() + " from " - + toString()); - rv = true; - break; - case MessageStatusMessage.STATUS_SEND_ACCEPTED: - if (wantedStatus == MessageStatusMessage.STATUS_SEND_ACCEPTED) { - rv = false; - } else { + case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE: if (_log.shouldLog(Log.DEBUG)) - _log.debug(_prefix + "Got accepted, but we're waiting for more from " + _log.warn(_prefix + "Received best effort failure after " + getElapsed() + " from " + + toString()); + rv = true; + break; + case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE: + if (_log.shouldLog(Log.DEBUG)) + _log.warn(_prefix + "Received guaranteed failure after " + getElapsed() + " from " + + toString()); + rv = true; + break; + case MessageStatusMessage.STATUS_SEND_ACCEPTED: + if (wantedStatus == MessageStatusMessage.STATUS_SEND_ACCEPTED) { + rv = false; + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(_prefix + "Got accepted, but we're waiting for more from " + + toString()); + continue; + // ignore accepted, as we want something better + } + break; + case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_SUCCESS: + if (_log.shouldLog(Log.DEBUG)) + _log.debug(_prefix + "Received best effort success after " + getElapsed() + + " from " + toString()); + if (wantedStatus == recv) { + rv = false; + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(_prefix + "Not guaranteed success, but best effort after " + + getElapsed() + " will do... from " + toString()); + rv = false; + } + break; + case MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS: + if (_log.shouldLog(Log.DEBUG)) + _log.debug(_prefix + "Received guaranteed success after " + getElapsed() + " from " + toString()); + // even if we're waiting for best effort success, guaranteed is good enough + rv = false; + break; + case -1: continue; - // ignore accepted, as we want something better - } - break; - case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_SUCCESS: - if (_log.shouldLog(Log.DEBUG)) - _log.debug(_prefix + "Received best effort success after " + getElapsed() - + " from " + toString()); - if (wantedStatus == recv) { - rv = false; - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(_prefix + "Not guaranteed success, but best effort after " - + getElapsed() + " will do... from " + toString()); - rv = false; - } - break; - case MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS: - if (_log.shouldLog(Log.DEBUG)) - _log.debug(_prefix + "Received guaranteed success after " + getElapsed() + " from " - + toString()); - // even if we're waiting for best effort success, guaranteed is good enough - rv = false; - break; - case -1: - continue; - default: - if (_log.shouldLog(Log.DEBUG)) - _log.debug(_prefix + "Received something else [" + recv + "]..."); + default: + if (_log.shouldLog(Log.DEBUG)) + _log.debug(_prefix + "Received something else [" + recv + "]..."); } } return rv; @@ -267,13 +254,15 @@ class MessageState { /** true if the given status (or an equivilant) was received */ public boolean received(int status) { - return isSuccess(status); + synchronized (_receivedStatus) { + return locked_isSuccess(status); + } } public void cancel() { _cancelled = true; - synchronized (_lock) { - _lock.notifyAll(); + synchronized (_receivedStatus) { + _receivedStatus.notifyAll(); } } } \ No newline at end of file diff --git a/history.txt b/history.txt index 620b165bc..44c54334e 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,8 @@ -$Id: history.txt,v 1.62 2004/11/02 06:57:08 jrandom Exp $ +$Id: history.txt,v 1.63 2004/11/05 05:53:41 jrandom Exp $ + +2004-11-06 jrandom + * Fix for a long standing synchronization bug in the SDK that in rare + instances can add a few seconds of lag. 2004-11-05 jrandom * Bugfixes and unit tests for the SAM bridge to handle quoted message diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index e2f25d7e5..ec84241a0 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.68 $ $Date: 2004/11/02 06:57:08 $"; + public final static String ID = "$Revision: 1.69 $ $Date: 2004/11/05 05:53:40 $"; public final static String VERSION = "0.4.1.3"; - public final static long BUILD = 9; + public final static long BUILD = 10; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java b/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java index 757db9f3a..6d434bf37 100644 --- a/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java +++ b/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java @@ -186,6 +186,7 @@ public class SendTunnelMessageJob extends JobImpl { _state = 12; // we're the gateway, so sign, encrypt, and forward to info.getNextHop() TunnelMessage msg = prepareMessage(info); + _state = 66; if (msg == null) { if (_log.shouldLog(Log.ERROR)) _log.error("wtf, unable to prepare a tunnel message to the next hop, when we're the gateway and hops remain? tunnel: " + info); @@ -212,13 +213,17 @@ public class SendTunnelMessageJob extends JobImpl { _log.warn("Adding a tunnel message that will expire shortly [" + new Date(_expiration) + "]", getAddedBy()); } + _state = 67; msg.setMessageExpiration(new Date(_expiration)); - getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, - info.getNextHop(), _onSend, - _onReply, _onFailure, - _selector, - (int)(_expiration - getContext().clock().now()), - _priority)); + _state = 68; + Job j = new SendMessageDirectJob(getContext(), msg, + info.getNextHop(), _onSend, + _onReply, _onFailure, + _selector, + (int)(_expiration - getContext().clock().now()), + _priority); + _state = 69; + getContext().jobQueue().addJob(j); _state = 15; } }