Streaming: Add API for sending/receiving payload in ping/pong

This commit is contained in:
zzz
2015-01-05 15:09:12 +00:00
parent 045f6dccf8
commit 6e847a4cc4
4 changed files with 143 additions and 9 deletions

View File

@@ -11,6 +11,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.client.I2PSession;
import net.i2p.data.ByteArray;
import net.i2p.data.Destination;
import net.i2p.data.Hash;
import net.i2p.data.SessionKey;
@@ -58,6 +59,8 @@ class ConnectionManager {
/** @since 0.9.3 */
public static final String PROP_BLACKLIST = "i2p.streaming.blacklist";
private static final long MAX_PING_TIMEOUT = 5*60*1000;
private static final int MAX_PONG_PAYLOAD = 32;
/**
* Manage all conns for this session
@@ -328,6 +331,13 @@ class ConnectionManager {
pong.setReceiveStreamId(ping.getSendStreamId());
pong.setLocalPort(ping.getLocalPort());
pong.setRemotePort(ping.getRemotePort());
// as of 0.9.18, return the payload
ByteArray payload = ping.getPayload();
if (payload != null) {
if (payload.getValid() > MAX_PONG_PAYLOAD)
payload.setValid(MAX_PONG_PAYLOAD);
pong.setPayload(payload);
}
_outboundQueue.enqueue(pong);
return true;
}
@@ -728,16 +738,13 @@ class ConnectionManager {
packet.setOptionalFrom(_session.getMyDestination());
packet.setLocalPort(fromPort);
packet.setRemotePort(toPort);
//if ( (keyToUse != null) && (tagsToSend != null) ) {
// packet.setKeyUsed(keyToUse);
// packet.setTagsSent(tagsToSend);
//}
if (timeoutMs > MAX_PING_TIMEOUT)
timeoutMs = MAX_PING_TIMEOUT;
if (_log.shouldLog(Log.INFO)) {
_log.info(String.format("about to ping %s port %d from port %d timeout=%d blocking=%b",
peer.calculateHash().toString(), toPort, fromPort, timeoutMs, blocking));
}
_outboundQueue.enqueue(packet);
packet.releasePayload();
@@ -756,7 +763,64 @@ class ConnectionManager {
return ok;
}
/**
* blocking
*
* @param timeoutMs greater than zero
* @param notifier may be null
* @param payload non-null, include in packet, up to 32 bytes may be returned in pong
* not copied, do not modify
* @return the payload received in the pong, zero-length if none, null on failure or timeout
* @since 0.9.18
*/
public byte[] ping(Destination peer, int fromPort, int toPort, long timeoutMs,
byte[] payload) {
PingRequest req = new PingRequest(null);
long id = assignPingId(req);
PacketLocal packet = new PacketLocal(_context, peer);
packet.setSendStreamId(id);
packet.setFlag(Packet.FLAG_ECHO |
Packet.FLAG_NO_ACK |
Packet.FLAG_SIGNATURE_INCLUDED);
packet.setOptionalFrom(_session.getMyDestination());
packet.setLocalPort(fromPort);
packet.setRemotePort(toPort);
packet.setPayload(new ByteArray(payload));
if (timeoutMs > MAX_PING_TIMEOUT)
timeoutMs = MAX_PING_TIMEOUT;
if (_log.shouldLog(Log.INFO)) {
_log.info(String.format("about to ping %s port %d from port %d timeout=%d payload=%d",
peer.calculateHash().toString(), toPort, fromPort, timeoutMs, payload.length));
}
_outboundQueue.enqueue(packet);
packet.releasePayload();
synchronized (req) {
if (!req.pongReceived())
try { req.wait(timeoutMs); } catch (InterruptedException ie) {}
}
_pendingPings.remove(id);
boolean ok = req.pongReceived();
if (!ok)
return null;
ByteArray ba = req.getPayload();
if (ba == null)
return new byte[0];
byte[] rv = new byte[ba.getValid()];
System.arraycopy(ba, ba.getOffset(), rv, 0, ba.getValid());
return rv;
}
/**
* The callback interface for a pong.
* Unused? Not part of the public streaming API.
*/
public interface PingNotifier {
/**
* @param ok true if pong received; false if timed out
*/
public void pingComplete(boolean ok);
}
@@ -783,6 +847,7 @@ class ConnectionManager {
private static class PingRequest {
private boolean _ponged;
private ByteArray _payload;
private final PingNotifier _notifier;
/** @param notifier may be null */
@@ -790,23 +855,37 @@ class ConnectionManager {
_notifier = notifier;
}
public void pong() {
/**
* @param payload may be null
*/
public void pong(ByteArray payload) {
// static, no log
//_log.debug("Ping successful");
//_context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent());
synchronized (this) {
_ponged = true;
_payload = payload;
notifyAll();
}
if (_notifier != null)
_notifier.pingComplete(true);
}
public synchronized boolean pongReceived() { return _ponged; }
/**
* @return null if no payload or no pong received
* @since 0.9.18
*/
public synchronized ByteArray getPayload() { return _payload; }
}
void receivePong(long pingId) {
/**
* @param payload may be null
*/
void receivePong(long pingId, ByteArray payload) {
PingRequest req = _pendingPings.remove(Long.valueOf(pingId));
if (req != null)
req.pong();
req.pong(payload);
}
}

View File

@@ -178,6 +178,30 @@ public class I2PSocketManagerFull implements I2PSocketManager {
return _connectionManager.ping(peer, localPort, remotePort, timeoutMs);
}
/**
* Ping the specified peer, returning true if they replied to the ping within
* the timeout specified, false otherwise. This call blocks.
*
* Uses the ports specified.
*
* @param peer Destination to ping
* @param localPort 0 - 65535
* @param remotePort 0 - 65535
* @param timeoutMs timeout in ms, greater than zero
* @param payload to include in the ping
* @return the payload received in the pong, zero-length if none, null on failure or timeout
* @throws IllegalArgumentException
* @since 0.9.18
*/
public byte[] ping(Destination peer, int localPort, int remotePort, long timeoutMs, byte[] payload) {
if (localPort < 0 || localPort > 65535 ||
remotePort < 0 || remotePort > 65535)
throw new IllegalArgumentException("bad port");
if (timeoutMs <= 0)
throw new IllegalArgumentException("bad timeout");
return _connectionManager.ping(peer, localPort, remotePort, timeoutMs, payload);
}
/**
* How long should we wait for the client to .accept() a socket before
* sending back a NACK/Close?
@@ -394,7 +418,14 @@ public class I2PSocketManagerFull implements I2PSocketManager {
return rv;
}
/**
* For logging / diagnostics only
*/
public String getName() { return _name; }
/**
* For logging / diagnostics only
*/
public void setName(String name) { _name = name; }

View File

@@ -361,7 +361,7 @@ class PacketHandler {
}
private void receivePong(Packet packet) {
_manager.receivePong(packet.getReceiveStreamId());
_manager.receivePong(packet.getReceiveStreamId(), packet.getPayload());
}
private static final boolean isValidMatch(long conStreamId, long packetStreamId) {