diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java index 60367d922..b7a5732d0 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java @@ -158,7 +158,31 @@ public interface I2PSocketManager { */ public boolean ping(Destination peer, int localPort, int remotePort, long timeoutMs); + /** + * Ping the specified peer, returning true if they replied to the ping within + * the timeout specified, false otherwise. This call blocks. + * + * Uses the ports specified. + * + * @param peer Destination to ping + * @param localPort 0 - 65535 + * @param remotePort 0 - 65535 + * @param timeoutMs timeout in ms, greater than zero + * @param payload to include in the ping + * @return the payload received in the pong, zero-length if none, null on failure or timeout + * @throws IllegalArgumentException + * @since 0.9.18 + */ + public byte[] ping(Destination peer, int localPort, int remotePort, long timeoutMs, byte[] payload); + + /** + * For logging / diagnostics only + */ public String getName(); + + /** + * For logging / diagnostics only + */ public void setName(String name); /** diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java index de2eb86e2..a45323cb7 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java @@ -11,6 +11,7 @@ import java.util.concurrent.atomic.AtomicInteger; import net.i2p.I2PAppContext; import net.i2p.I2PException; import net.i2p.client.I2PSession; +import net.i2p.data.ByteArray; import net.i2p.data.Destination; import net.i2p.data.Hash; import net.i2p.data.SessionKey; @@ -58,6 +59,8 @@ class ConnectionManager { /** @since 0.9.3 */ public static final String PROP_BLACKLIST = "i2p.streaming.blacklist"; + private static final long MAX_PING_TIMEOUT = 5*60*1000; + private static final int MAX_PONG_PAYLOAD = 32; /** * Manage all conns for this session @@ -328,6 +331,13 @@ class ConnectionManager { pong.setReceiveStreamId(ping.getSendStreamId()); pong.setLocalPort(ping.getLocalPort()); pong.setRemotePort(ping.getRemotePort()); + // as of 0.9.18, return the payload + ByteArray payload = ping.getPayload(); + if (payload != null) { + if (payload.getValid() > MAX_PONG_PAYLOAD) + payload.setValid(MAX_PONG_PAYLOAD); + pong.setPayload(payload); + } _outboundQueue.enqueue(pong); return true; } @@ -728,16 +738,13 @@ class ConnectionManager { packet.setOptionalFrom(_session.getMyDestination()); packet.setLocalPort(fromPort); packet.setRemotePort(toPort); - //if ( (keyToUse != null) && (tagsToSend != null) ) { - // packet.setKeyUsed(keyToUse); - // packet.setTagsSent(tagsToSend); - //} + if (timeoutMs > MAX_PING_TIMEOUT) + timeoutMs = MAX_PING_TIMEOUT; if (_log.shouldLog(Log.INFO)) { _log.info(String.format("about to ping %s port %d from port %d timeout=%d blocking=%b", peer.calculateHash().toString(), toPort, fromPort, timeoutMs, blocking)); } - _outboundQueue.enqueue(packet); packet.releasePayload(); @@ -756,7 +763,64 @@ class ConnectionManager { return ok; } + /** + * blocking + * + * @param timeoutMs greater than zero + * @param notifier may be null + * @param payload non-null, include in packet, up to 32 bytes may be returned in pong + * not copied, do not modify + * @return the payload received in the pong, zero-length if none, null on failure or timeout + * @since 0.9.18 + */ + public byte[] ping(Destination peer, int fromPort, int toPort, long timeoutMs, + byte[] payload) { + PingRequest req = new PingRequest(null); + long id = assignPingId(req); + PacketLocal packet = new PacketLocal(_context, peer); + packet.setSendStreamId(id); + packet.setFlag(Packet.FLAG_ECHO | + Packet.FLAG_NO_ACK | + Packet.FLAG_SIGNATURE_INCLUDED); + packet.setOptionalFrom(_session.getMyDestination()); + packet.setLocalPort(fromPort); + packet.setRemotePort(toPort); + packet.setPayload(new ByteArray(payload)); + if (timeoutMs > MAX_PING_TIMEOUT) + timeoutMs = MAX_PING_TIMEOUT; + if (_log.shouldLog(Log.INFO)) { + _log.info(String.format("about to ping %s port %d from port %d timeout=%d payload=%d", + peer.calculateHash().toString(), toPort, fromPort, timeoutMs, payload.length)); + } + + _outboundQueue.enqueue(packet); + packet.releasePayload(); + + synchronized (req) { + if (!req.pongReceived()) + try { req.wait(timeoutMs); } catch (InterruptedException ie) {} + } + _pendingPings.remove(id); + + boolean ok = req.pongReceived(); + if (!ok) + return null; + ByteArray ba = req.getPayload(); + if (ba == null) + return new byte[0]; + byte[] rv = new byte[ba.getValid()]; + System.arraycopy(ba, ba.getOffset(), rv, 0, ba.getValid()); + return rv; + } + + /** + * The callback interface for a pong. + * Unused? Not part of the public streaming API. + */ public interface PingNotifier { + /** + * @param ok true if pong received; false if timed out + */ public void pingComplete(boolean ok); } @@ -783,6 +847,7 @@ class ConnectionManager { private static class PingRequest { private boolean _ponged; + private ByteArray _payload; private final PingNotifier _notifier; /** @param notifier may be null */ @@ -790,23 +855,37 @@ class ConnectionManager { _notifier = notifier; } - public void pong() { + /** + * @param payload may be null + */ + public void pong(ByteArray payload) { // static, no log //_log.debug("Ping successful"); //_context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent()); synchronized (this) { _ponged = true; + _payload = payload; notifyAll(); } if (_notifier != null) _notifier.pingComplete(true); } + public synchronized boolean pongReceived() { return _ponged; } + + /** + * @return null if no payload or no pong received + * @since 0.9.18 + */ + public synchronized ByteArray getPayload() { return _payload; } } - void receivePong(long pingId) { + /** + * @param payload may be null + */ + void receivePong(long pingId, ByteArray payload) { PingRequest req = _pendingPings.remove(Long.valueOf(pingId)); if (req != null) - req.pong(); + req.pong(payload); } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java index 6d948141a..41396edb8 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java @@ -178,6 +178,30 @@ public class I2PSocketManagerFull implements I2PSocketManager { return _connectionManager.ping(peer, localPort, remotePort, timeoutMs); } + /** + * Ping the specified peer, returning true if they replied to the ping within + * the timeout specified, false otherwise. This call blocks. + * + * Uses the ports specified. + * + * @param peer Destination to ping + * @param localPort 0 - 65535 + * @param remotePort 0 - 65535 + * @param timeoutMs timeout in ms, greater than zero + * @param payload to include in the ping + * @return the payload received in the pong, zero-length if none, null on failure or timeout + * @throws IllegalArgumentException + * @since 0.9.18 + */ + public byte[] ping(Destination peer, int localPort, int remotePort, long timeoutMs, byte[] payload) { + if (localPort < 0 || localPort > 65535 || + remotePort < 0 || remotePort > 65535) + throw new IllegalArgumentException("bad port"); + if (timeoutMs <= 0) + throw new IllegalArgumentException("bad timeout"); + return _connectionManager.ping(peer, localPort, remotePort, timeoutMs, payload); + } + /** * How long should we wait for the client to .accept() a socket before * sending back a NACK/Close? @@ -394,7 +418,14 @@ public class I2PSocketManagerFull implements I2PSocketManager { return rv; } + /** + * For logging / diagnostics only + */ public String getName() { return _name; } + + /** + * For logging / diagnostics only + */ public void setName(String name) { _name = name; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketHandler.java index b2983114f..af1a0839f 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketHandler.java @@ -361,7 +361,7 @@ class PacketHandler { } private void receivePong(Packet packet) { - _manager.receivePong(packet.getReceiveStreamId()); + _manager.receivePong(packet.getReceiveStreamId(), packet.getPayload()); } private static final boolean isValidMatch(long conStreamId, long packetStreamId) {