diff --git a/apps/i2psnark/java/src/org/klomp/snark/ExtensionHandler.java b/apps/i2psnark/java/src/org/klomp/snark/ExtensionHandler.java index 3748b6239..7b2979b8a 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/ExtensionHandler.java +++ b/apps/i2psnark/java/src/org/klomp/snark/ExtensionHandler.java @@ -3,10 +3,13 @@ package org.klomp.snark; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import net.i2p.I2PAppContext; +import net.i2p.data.DataHelper; import net.i2p.util.Log; import org.klomp.snark.bencode.BDecoder; @@ -23,10 +26,11 @@ abstract class ExtensionHandler { private static final Log _log = I2PAppContext.getGlobalContext().logManager().getLog(ExtensionHandler.class); + public static final int ID_HANDSHAKE = 0; public static final int ID_METADATA = 1; - private static final String TYPE_METADATA = "ut_metadata"; + public static final String TYPE_METADATA = "ut_metadata"; public static final int ID_PEX = 2; - private static final String TYPE_PEX = "ut_pex"; + public static final String TYPE_PEX = "ut_pex"; /** Pieces * SHA1 Hash length, + 25% extra for file names, benconding overhead, etc */ private static final int MAX_METADATA_SIZE = Storage.MAX_PIECES * 20 * 5 / 4; private static final int PARALLEL_REQUESTS = 3; @@ -40,6 +44,7 @@ abstract class ExtensionHandler { Map handshake = new HashMap(); Map m = new HashMap(); m.put(TYPE_METADATA, Integer.valueOf(ID_METADATA)); + m.put(TYPE_PEX, Integer.valueOf(ID_PEX)); if (metasize >= 0) handshake.put("metadata_size", Integer.valueOf(metasize)); handshake.put("m", m); @@ -52,10 +57,12 @@ abstract class ExtensionHandler { public static void handleMessage(Peer peer, PeerListener listener, int id, byte[] bs) { if (_log.shouldLog(Log.INFO)) _log.info("Got extension msg " + id + " length " + bs.length + " from " + peer); - if (id == 0) + if (id == ID_HANDSHAKE) handleHandshake(peer, listener, bs); else if (id == ID_METADATA) handleMetadata(peer, listener, bs); + else if (id == ID_PEX) + handlePEX(peer, listener, bs); else if (_log.shouldLog(Log.INFO)) _log.info("Unknown extension msg " + id + " from " + peer); } @@ -72,6 +79,12 @@ abstract class ExtensionHandler { peer.setHandshakeMap(map); Map msgmap = map.get("m").getMap(); + if (msgmap.get(TYPE_PEX) != null) { + if (_log.shouldLog(Log.WARN)) + _log.debug("Peer supports PEX extension: " + peer); + // peer state calls peer listener calls sendPEX() + } + if (msgmap.get(TYPE_METADATA) == null) { if (_log.shouldLog(Log.WARN)) _log.debug("Peer does not support metadata extension: " + peer); @@ -216,6 +229,7 @@ abstract class ExtensionHandler { } catch (Exception e) { if (_log.shouldLog(Log.WARN)) _log.info("Metadata ext. msg. exception from " + peer, e); + // fatal ? peer.disconnect(false); } } @@ -262,4 +276,72 @@ abstract class ExtensionHandler { _log.info("Metadata send piece msg exception to " + peer, e); } } + + private static final int HASH_LENGTH = 32; + + /** + * Can't find a published standard for this anywhere. + * See the libtorrent code. + * Here we use the "added" key as a single string of concatenated + * 32-byte peer hashes. + * added.f and dropped unsupported + * @since 0.8.4 + */ + private static void handlePEX(Peer peer, PeerListener listener, byte[] bs) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Got PEX msg from " + peer); + try { + InputStream is = new ByteArrayInputStream(bs); + BDecoder dec = new BDecoder(is); + BEValue bev = dec.bdecodeMap(); + Map map = bev.getMap(); + byte[] ids = map.get("added").getBytes(); + if (ids.length < HASH_LENGTH) + return; + int len = Math.min(ids.length, (I2PSnarkUtil.MAX_CONNECTIONS - 1) * HASH_LENGTH); + List peers = new ArrayList(len / HASH_LENGTH); + for (int off = 0; off < len; off += HASH_LENGTH) { + byte[] hash = new byte[HASH_LENGTH]; + System.arraycopy(ids, off, hash, 0, HASH_LENGTH); + if (DataHelper.eq(hash, peer.getPeerID().getDestHash())) + continue; + PeerID pID = new PeerID(hash); + peers.add(pID); + } + // could include ourselves, listener must remove + listener.gotPeers(peer, peers); + } catch (Exception e) { + if (_log.shouldLog(Log.WARN)) + _log.info("PEX msg exception from " + peer, e); + //peer.disconnect(false); + } + } + + /** + * added.f and dropped unsupported + * @param pList non-null + * @since 0.8.4 + */ + public static void sendPEX(Peer peer, List pList) { + if (pList.isEmpty()) + return; + Map map = new HashMap(); + byte[] peers = new byte[HASH_LENGTH * pList.size()]; + int off = 0; + for (Peer p : pList) { + System.arraycopy(p.getPeerID().getDestHash(), 0, peers, off, HASH_LENGTH); + off += HASH_LENGTH; + } + map.put("added", peers); + byte[] payload = BEncoder.bencode(map); + try { + int hisMsgCode = peer.getHandshakeMap().get("m").getMap().get(TYPE_PEX).getInt(); + peer.sendExtension(hisMsgCode, payload); + } catch (Exception e) { + // NPE, no PEX caps + if (_log.shouldLog(Log.WARN)) + _log.info("PEX msg exception to " + peer, e); + } + } + } diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java index dbecc757e..7852840cc 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java @@ -1171,6 +1171,16 @@ public class PeerCoordinator implements PeerListener listener.gotMetaInfo(this, metainfo); } } + } else if (id == ExtensionHandler.ID_HANDSHAKE) { + try { + if (peer.getHandshakeMap().get("m").getMap().get(ExtensionHandler.TYPE_PEX) != null) { + List pList = peerList(); + pList.remove(peer); + ExtensionHandler.sendPEX(peer, pList); + } + } catch (Exception e) { + // NPE, no map + } } } @@ -1199,6 +1209,14 @@ public class PeerCoordinator implements PeerListener dht.ping(peer.getDestination(), port); } + /** + * PeerListener callback + * @since 0.8.4 + */ + public void gotPeers(Peer peer, List peers) { + + } + /** Return number of allowed uploaders for this torrent. ** Check with Snark to see if we are over the total upload limit. */ diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java b/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java index 6d9e681cb..c7650a552 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java @@ -191,11 +191,20 @@ interface PeerListener void gotExtension(Peer peer, int id, byte[] bs); /** - * Called when an extension message is received. + * Called when a port message is received. * * @param peer the Peer that got the message. * @param port the port * @since 0.8.4 */ void gotPort(Peer peer, int port); + + /** + * Called when peers are received via PEX + * + * @param peer the Peer that got the message. + * @param pIDList the peer IDs (dest hashes) + * @since 0.8.4 + */ + void gotPeers(Peer peer, List pIDList); }