diff --git a/core/src/main/groovy/com/muwire/core/connection/ConnectionManager.groovy b/core/src/main/groovy/com/muwire/core/connection/ConnectionManager.groovy index 0f9d6144..7af84855 100644 --- a/core/src/main/groovy/com/muwire/core/connection/ConnectionManager.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/ConnectionManager.groovy @@ -21,5 +21,5 @@ abstract class ConnectionManager { abstract void drop(Destination d) - abstract boolean hasConnection() + abstract Collection getConnections() } diff --git a/core/src/main/groovy/com/muwire/core/connection/LeafConnectionManager.groovy b/core/src/main/groovy/com/muwire/core/connection/LeafConnectionManager.groovy index 6ce4babc..78cfd7d2 100644 --- a/core/src/main/groovy/com/muwire/core/connection/LeafConnectionManager.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/LeafConnectionManager.groovy @@ -20,8 +20,8 @@ class LeafConnectionManager extends ConnectionManager { } @Override - public boolean hasConnection() { + public Collection getConnections() { // TODO implement - false + [] } } diff --git a/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnectionManager.groovy b/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnectionManager.groovy index cc760f20..a6b2ee3b 100644 --- a/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnectionManager.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnectionManager.groovy @@ -1,5 +1,7 @@ package com.muwire.core.connection +import java.util.Collection + import com.muwire.core.EventBus import net.i2p.data.Destination @@ -20,8 +22,18 @@ class UltrapeerConnectionManager extends ConnectionManager { } @Override - public boolean hasConnection() { - // TODO: implement - false + public Collection getConnections() { + // TODO implement + [] + } + + boolean hasLeafSlots() { + // TODO implement + true + } + + boolean hasPeerSlots() { + // TODO implement + true } } diff --git a/core/src/main/groovy/com/muwire/core/hostcache/CacheClient.groovy b/core/src/main/groovy/com/muwire/core/hostcache/CacheClient.groovy index 36d8f120..340e96e8 100644 --- a/core/src/main/groovy/com/muwire/core/hostcache/CacheClient.groovy +++ b/core/src/main/groovy/com/muwire/core/hostcache/CacheClient.groovy @@ -3,17 +3,23 @@ package com.muwire.core.hostcache import com.muwire.core.EventBus import com.muwire.core.MuWireSettings import com.muwire.core.connection.ConnectionManager +import com.muwire.core.connection.UltrapeerConnectionManager import groovy.json.JsonOutput +import groovy.json.JsonSlurper import groovy.util.logging.Log import net.i2p.client.I2PSession import net.i2p.client.I2PSessionMuxedListener import net.i2p.client.SendMessageOptions +import net.i2p.client.datagram.I2PDatagramDissector import net.i2p.client.datagram.I2PDatagramMaker +import net.i2p.data.Destination @Log class CacheClient { + private static final int CRAWLER_RETURN = 10 + final EventBus eventBus final HostCache cache final ConnectionManager manager @@ -42,7 +48,7 @@ class CacheClient { } private void queryIfNeeded() { - if (manager.hasConnection()) + if (!manager.getConnections().isEmpty()) return if (!cache.getHosts(1).isEmpty()) return @@ -62,6 +68,8 @@ class CacheClient { } class Listener implements I2PSessionMuxedListener { + + private final JsonSlurper slurper = new JsonSlurper() @Override public void messageAvailable(I2PSession session, int msgId, long size) { @@ -69,8 +77,35 @@ class CacheClient { @Override public void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromport, int toport) { - // TODO Auto-generated method stub + if (proto != I2PSession.PROTO_DATAGRAM) { + log.warning "Received unexpected protocol $proto" + return + } + + def payload = session.receiveMessage(msgId) + def dissector = new I2PDatagramDissector() + try { + dissector.loadI2PDatagram(payload) + def sender = dissector.getSender() + log.info("Received something from ${sender.toBase32()}") + + payload = dissector.getPayload() + payload = slurper.parse(payload) + + if (payload.type == null) { + log.warning("type missing") + return + } + + switch(payload.type) { + case "Pong" : handlePong(sender, payload); break + case "CrawlerPing": handleCrawlerPing(session, sender, payload); break + default : log.warning("unknown type ${payload.type}") + } + } catch (Exception e) { + log.warning("Invalid datagram $e") + } } @Override @@ -88,5 +123,64 @@ class CacheClient { } } + + private void handlePong(Destination from, def pong) { + if (!CacheServers.isRegistered(from)) { + log.warning("received pong from non-registered destination") + return + } + + if (pong.pongs == null) { + log.warning("malformed pong - no pongs") + return + } + + pong.pongs.asList().each { eventBus.publish(new HostDiscoveredEvent(destination: new Destination(it))) } + + } + + private void handleCrawlerPing(I2PSession session, Destination from, def ping) { + if (settings.isLeaf()) { + log.warning("Received crawler ping but I'm a leaf") + return + } + + switch(settings.getCrawlerResponse()) { + case CrawlerResponse.NONE: + log.info("Responding to crawlers is disabled by user") + break + case CrawlerResponse.ALL: + respondToCrawler(session, from, ping) + break; + case CrawlerResponse.REGISTERED: + if (CacheServers.isRegistered(from)) + respondToCrawler(session, from, ping) + else + log.warning("Ignoring crawler ping from non-registered crawler") + break + } + } + + private void respondToCrawler(I2PSession session, Destination from, def ping) { + log.info "responding to crawler ping" + + def neighbors = manager.getConnections().collect { c -> c.remoteSide.toBase64() } + Collections.shuffle(neighbors) + if (neighbors.size() > CRAWLER_RETURN) + neighbors = neighbors[0..CRAWLER_RETURN - 1] + + def upManager = (UltrapeerConnectionManager) manager; + def pong = [:] + pong.peers = neighbors + pong.uuid = ping.uuid + pong.version = 1 + pong.leafSlots = upManager.hasLeafSlots() + pong.peerSlots = upManager.hasPeerSlots() + pong = JsonOutput.toJson(pong) + + def maker = new I2PDatagramMaker(session) + pong = maker.makeI2PDatagram(pong.bytes) + session.sendMessage(from, pong, I2PSession.PROTO_DATAGRAM, 0, 0) + } }