Merge pull request 'I2CP: Add async lookup API' (#511) from zzz/i2p.i2p:i2cp-asynch-lookup into master
Some checks failed
Sync Primary Repository to GitHub Mirror / sync (push) Has been cancelled
Daily Workflow / daily-job (push) Has been cancelled
Daily Workflow / javadoc-latest (push) Has been cancelled
Daily Workflow / build-java7 (push) Has been cancelled
Java CI / build (push) Has been cancelled
Java CI / javadoc-latest (push) Has been cancelled
Java CI / build-java7 (push) Has been cancelled
Dockerhub / docker (push) Has been cancelled
Java with IzPack Snapshot Setup / setup (push) Has been cancelled

Reviewed-on: #511
Reviewed-by: idk <idki2p@mail.i2p>
This commit is contained in:
zzz
2025-06-18 11:35:35 -04:00
5 changed files with 292 additions and 8 deletions

View File

@ -422,6 +422,32 @@ public interface I2PSession {
*/
public LookupResult lookupDest2(String name, long maxWait) throws I2PSessionException;
/**
* Lookup a Destination by hostname.
* Non-blocking.
* If the result is cached or there is an immediate failure,
* the result code will be something other than RESULT_DEFERRED, and the callback will NOT be called.
*
* @param maxWait ms
* @param callback to return the result, non-null
* @return non-null. If result code is RESULT_DEFERRED, callback will be called later
* @since 0.9.67
*/
public LookupResult lookupDest(Hash h, long maxWait, LookupCallback callback) throws I2PSessionException;
/**
* Lookup a Destination by hash.
* Non-blocking.
* If the result is cached or there is an immediate failure,
* the result code will be something other than RESULT_DEFERRED, and the callback will NOT be called.
*
* @param maxWait ms
* @param callback to return the result, non-null
* @return non-null. If result code is RESULT_DEFERRED, callback will be called later
* @since 0.9.67
*/
public LookupResult lookupDest(String name, long maxWait, LookupCallback callback) throws I2PSessionException;
/**
* Pass updated options to the router.
* Does not remove properties previously present but missing from this options parameter.

View File

@ -0,0 +1,14 @@
package net.i2p.client;
/**
* Deferred callback for IPSession.lookupNonblocking()
*
* @since 0.9.67
*/
public interface LookupCallback {
/**
* The result
*/
public void complete(LookupResult result);
}

View File

@ -40,6 +40,14 @@ public interface LookupResult {
*/
public static final int RESULT_DECRYPTION_FAILURE = HostReplyMessage.RESULT_DECRYPTION_FAILURE;
/**
* For async calls only. Nonce will be non-zero and destination will be null.
* Callback will be called later with the final result and the same nonce.
*
* @since 0.9.67
*/
public static final int RESULT_DEFERRED = -1;
/**
* @return zero for success, nonzero for failure
*/
@ -50,4 +58,11 @@ public interface LookupResult {
*/
public Destination getDestination();
/**
* For async calls only. Nonce will be non-zero.
* Callback will be called later with the final result and the same nonce.
*
* @since 0.9.67
*/
public int getNonce();
}

View File

@ -38,6 +38,7 @@ import net.i2p.client.I2PClient;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.client.I2PSessionListener;
import net.i2p.client.LookupCallback;
import net.i2p.client.LookupResult;
import net.i2p.crypto.EncType;
import net.i2p.crypto.SigType;
@ -213,6 +214,8 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
*/
private static final Map<Object, Destination> _lookupCache = new LHMCache<Object, Destination>(CACHE_MAX_SIZE);
private static final String MIN_HOST_LOOKUP_VERSION = "0.9.11";
// cached failure
private static final LookupResult LOOKUP_FAILURE = new LkupResult(LookupResult.RESULT_FAILURE, null);
/**
* Use Unix domain socket (or similar) to connect to a router
@ -1339,6 +1342,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
}
if (_log.shouldLog(Log.INFO)) _log.info(getPrefix() + "Destroy the session", new Exception("DestroySession()"));
clearPendingLookups();
if (sendDisconnect) {
if (_producer != null) { // only null if overridden by I2PSimpleSession
try {
@ -1449,6 +1453,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
}
if (_log.shouldWarn())
_log.warn(getPrefix() + "Disconnected", new Exception("Disconnected"));
clearPendingLookups();
if (_sessionListener != null) _sessionListener.disconnected(this);
// don't try to reconnect if it failed before GETTDATE
if (oldState != State.OPENING && shouldReconnect()) {
@ -1576,6 +1581,31 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
}
}
/**
* Clear out all pending lookups and bw limit requests
* @since 0.9.67
*/
private void clearPendingLookups() {
LookupWaiter w;
while ((w = _pendingLookups.poll()) != null) {
if (w.callback != null) {
// asynch
LkupResult result = new LkupResult(LookupResult.RESULT_FAILURE, null, (int) w.nonce);
w.callback.complete(result);
} else {
// synch
synchronized (w) {
w.code = LookupResult.RESULT_FAILURE;
w.notifyAll();
}
}
}
// if anybody is waiting for a bw message
synchronized (_bwReceivedLock) {
_bwReceivedLock.notifyAll();
}
}
/**
* Called by the message handler
* on reception of HostReplyMessage
@ -1592,10 +1622,17 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
_lookupCache.put(w.name, d);
_lookupCache.put(h, d);
}
synchronized (w) {
w.destination = d;
w.code = LookupResult.RESULT_SUCCESS;
w.notifyAll();
if (w.callback != null) {
// asynch
LkupResult result = new LkupResult(LookupResult.RESULT_SUCCESS, d, (int) w.nonce);
w.callback.complete(result);
} else {
// synch
synchronized (w) {
w.destination = d;
w.code = LookupResult.RESULT_SUCCESS;
w.notifyAll();
}
}
}
}
@ -1609,9 +1646,16 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
void destLookupFailed(long nonce, int code) {
for (LookupWaiter w : _pendingLookups) {
if (nonce == w.nonce) {
synchronized (w) {
w.code = code;
w.notifyAll();
if (w.callback != null) {
// asynch
LkupResult result = new LkupResult(code, null, (int) nonce);
w.callback.complete(result);
} else {
// synch
synchronized (w) {
w.code = code;
w.notifyAll();
}
}
}
}
@ -1643,6 +1687,11 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
* @since 0.9.43
*/
public int code;
/**
* the callback
* @since 0.9.67
*/
public final LookupCallback callback;
public LookupWaiter(Hash h) {
this(h, -1);
@ -1653,6 +1702,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
this.hash = h;
this.name = null;
this.nonce = nonce;
callback = null;
}
/** @since 0.9.11 */
@ -1660,6 +1710,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
this.hash = null;
this.name = name;
this.nonce = nonce;
callback = null;
}
/** Dummy, completed
@ -1670,6 +1721,23 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
name = null;
nonce = 0;
destination = d;
callback = null;
}
/** @since 0.9.67 */
public LookupWaiter(Hash h, long nonce, LookupCallback callback) {
this.hash = h;
this.name = null;
this.nonce = nonce;
this.callback = callback;
}
/** @since 0.9.67 */
public LookupWaiter(String name, long nonce, LookupCallback callback) {
this.hash = null;
this.name = name;
this.nonce = nonce;
this.callback = callback;
}
}
@ -1800,7 +1868,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
public LookupResult lookupDest2(String name, long maxWait) throws I2PSessionException {
LookupWaiter waiter = x_lookupDest(name, maxWait);
if (waiter == null)
return new LkupResult(LookupResult.RESULT_FAILURE, null);
return LOOKUP_FAILURE;
synchronized(waiter) {
int code = waiter.code;
Destination d = waiter.destination;
@ -1873,6 +1941,140 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
}
}
/**
* Lookup a Destination by hostname.
* Non-blocking.
* If the result is cached or there is an immediate failure,
* the result code will be something other than RESULT_DEFERRED, and the callback will NOT be called.
*
* @param maxWait ms
* @param callback to return the result, non-null
* @return non-null. If result code is RESULT_DEFERRED, callback will be called later
* @since 0.9.67
*/
public LookupResult lookupDest(Hash h, long maxWait, LookupCallback callback) throws I2PSessionException {
synchronized (_lookupCache) {
Destination rv = _lookupCache.get(h);
if (rv != null)
return new LkupResult(LookupResult.RESULT_SUCCESS, rv);
}
synchronized (_stateLock) {
// not before GOTDATE
if (STATES_CLOSED_OR_OPENING.contains(_state))
return LOOKUP_FAILURE;
}
if (!_routerSupportsHostLookup) {
// older than 0.9.11, won't happen
throw new I2PSessionException("Router does not support HostLookup for " + h);
}
int nonce = _lookupID.incrementAndGet() & 0x7fffffff;
LookupWaiter waiter = new LookupWaiter(h, nonce, callback);
_pendingLookups.offer(waiter);
if (_log.shouldLog(Log.INFO))
_log.info("Sending HostLookup for " + h);
SessionId id = _sessionId;
if (id == null)
id = DUMMY_SESSION;
if (maxWait > 60*1000)
maxWait = 60*1000;
try {
sendMessage_unchecked(new HostLookupMessage(id, h, nonce, maxWait));
} catch (I2PSessionException ise) {
_pendingLookups.remove(waiter);
throw ise;
}
new LookupExpiration(waiter, maxWait);
return new LkupResult(nonce);
}
/**
* Lookup a Destination by hash.
* Non-blocking.
* If the result is cached or there is an immediate failure,
* the result code will be something other than RESULT_DEFERRED, and the callback will NOT be called.
*
* @param maxWait ms
* @param callback to return the result, non-null
* @return non-null. If result code is RESULT_DEFERRED, callback will be called later
* @since 0.9.67
*/
public LookupResult lookupDest(String name, long maxWait, LookupCallback callback) throws I2PSessionException {
if (name.length() == 0)
return LOOKUP_FAILURE;
// Shortcut for b64
if (name.length() >= 516) {
try {
Destination rv = new Destination(name);
return new LkupResult(LookupResult.RESULT_SUCCESS, rv);
} catch (DataFormatException dfe) {
return LOOKUP_FAILURE;
}
}
// won't fit in Mapping
if (name.length() >= 256 && !_context.isRouterContext())
return LOOKUP_FAILURE;
synchronized (_lookupCache) {
Destination rv = _lookupCache.get(name);
if (rv != null)
return new LkupResult(LookupResult.RESULT_SUCCESS, rv);
}
synchronized (_stateLock) {
// not before GOTDATE
if (STATES_CLOSED_OR_OPENING.contains(_state))
return LOOKUP_FAILURE;
}
if (!_routerSupportsHostLookup) {
// older than 0.9.11, won't happen
throw new I2PSessionException("Router does not support HostLookup for " + name);
}
int nonce = _lookupID.incrementAndGet() & 0x7fffffff;
LookupWaiter waiter = new LookupWaiter(name, nonce, callback);
_pendingLookups.offer(waiter);
if (_log.shouldLog(Log.INFO))
_log.info("Sending HostLookup for " + name);
SessionId id = _sessionId;
if (id == null)
id = DUMMY_SESSION;
if (maxWait > 60*1000)
maxWait = 60*1000;
try {
sendMessage_unchecked(new HostLookupMessage(id, name, nonce, maxWait));
} catch (I2PSessionException ise) {
_pendingLookups.remove(waiter);
throw ise;
}
new LookupExpiration(waiter, maxWait);
return new LkupResult(nonce);
}
/**
* Timeout for asynch lookup, if the router does not respond.
* Should rarely happen.
*
* @since 0.9.67
*/
private class LookupExpiration extends SimpleTimer2.TimedEvent {
private final LookupWaiter w;
public LookupExpiration(LookupWaiter waiter, long maxWait) {
super(_context.simpleTimer2(), maxWait + 100);
w = waiter;
}
public void timeReached() {
if (_pendingLookups.remove(w)) {
// router should always have responded
if (_log.shouldWarn())
_log.warn(getPrefix() + " Router did not respond to lookup " + w.nonce);
if (w.callback != null) {
// callback should always be present
LkupResult result = new LkupResult(LookupResult.RESULT_FAILURE, null, (int) w.nonce);
w.callback.complete(result);
}
}
}
}
/**
* Blocking. Waits a max of 5 seconds.
* But shouldn't take long.

View File

@ -12,10 +12,30 @@ public class LkupResult implements LookupResult {
private final int _code;
private final Destination _dest;
private final int _nonce;
LkupResult(int code, Destination dest) {
this(code, dest, 0);
}
/**
* Deferred
*
* @since 0.9.67
*/
LkupResult(int nonce) {
this(RESULT_DEFERRED, null, nonce);
}
/**
* Async
*
* @since 0.9.67
*/
LkupResult(int code, Destination dest, int nonce) {
_code = code;
_dest = dest;
_nonce = nonce;
}
/**
@ -28,4 +48,11 @@ public class LkupResult implements LookupResult {
*/
public Destination getDestination() { return _dest; }
/**
* For async calls only. Nonce will be non-zero.
* Callback will be called later with the final result and the same nonce.
*
* @since 0.9.67
*/
public int getNonce() { return _nonce; }
}