HostCache client

This commit is contained in:
Zlatin Balevsky
2018-07-23 15:02:45 +01:00
parent e860ee1e20
commit 0b43be2bc4
4 changed files with 114 additions and 8 deletions

View File

@@ -21,5 +21,5 @@ abstract class ConnectionManager {
abstract void drop(Destination d) abstract void drop(Destination d)
abstract boolean hasConnection() abstract Collection<Connection> getConnections()
} }

View File

@@ -20,8 +20,8 @@ class LeafConnectionManager extends ConnectionManager {
} }
@Override @Override
public boolean hasConnection() { public Collection<Connection> getConnections() {
// TODO implement // TODO implement
false []
} }
} }

View File

@@ -1,5 +1,7 @@
package com.muwire.core.connection package com.muwire.core.connection
import java.util.Collection
import com.muwire.core.EventBus import com.muwire.core.EventBus
import net.i2p.data.Destination import net.i2p.data.Destination
@@ -20,8 +22,18 @@ class UltrapeerConnectionManager extends ConnectionManager {
} }
@Override @Override
public boolean hasConnection() { public Collection<Connection> getConnections() {
// TODO: implement // TODO implement
false []
}
boolean hasLeafSlots() {
// TODO implement
true
}
boolean hasPeerSlots() {
// TODO implement
true
} }
} }

View File

@@ -3,17 +3,23 @@ package com.muwire.core.hostcache
import com.muwire.core.EventBus import com.muwire.core.EventBus
import com.muwire.core.MuWireSettings import com.muwire.core.MuWireSettings
import com.muwire.core.connection.ConnectionManager import com.muwire.core.connection.ConnectionManager
import com.muwire.core.connection.UltrapeerConnectionManager
import groovy.json.JsonOutput import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import groovy.util.logging.Log import groovy.util.logging.Log
import net.i2p.client.I2PSession import net.i2p.client.I2PSession
import net.i2p.client.I2PSessionMuxedListener import net.i2p.client.I2PSessionMuxedListener
import net.i2p.client.SendMessageOptions import net.i2p.client.SendMessageOptions
import net.i2p.client.datagram.I2PDatagramDissector
import net.i2p.client.datagram.I2PDatagramMaker import net.i2p.client.datagram.I2PDatagramMaker
import net.i2p.data.Destination
@Log @Log
class CacheClient { class CacheClient {
private static final int CRAWLER_RETURN = 10
final EventBus eventBus final EventBus eventBus
final HostCache cache final HostCache cache
final ConnectionManager manager final ConnectionManager manager
@@ -42,7 +48,7 @@ class CacheClient {
} }
private void queryIfNeeded() { private void queryIfNeeded() {
if (manager.hasConnection()) if (!manager.getConnections().isEmpty())
return return
if (!cache.getHosts(1).isEmpty()) if (!cache.getHosts(1).isEmpty())
return return
@@ -62,6 +68,8 @@ class CacheClient {
} }
class Listener implements I2PSessionMuxedListener { class Listener implements I2PSessionMuxedListener {
private final JsonSlurper slurper = new JsonSlurper()
@Override @Override
public void messageAvailable(I2PSession session, int msgId, long size) { public void messageAvailable(I2PSession session, int msgId, long size) {
@@ -69,8 +77,35 @@ class CacheClient {
@Override @Override
public void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromport, int toport) { public void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromport, int toport) {
// TODO Auto-generated method stub
if (proto != I2PSession.PROTO_DATAGRAM) {
log.warning "Received unexpected protocol $proto"
return
}
def payload = session.receiveMessage(msgId)
def dissector = new I2PDatagramDissector()
try {
dissector.loadI2PDatagram(payload)
def sender = dissector.getSender()
log.info("Received something from ${sender.toBase32()}")
payload = dissector.getPayload()
payload = slurper.parse(payload)
if (payload.type == null) {
log.warning("type missing")
return
}
switch(payload.type) {
case "Pong" : handlePong(sender, payload); break
case "CrawlerPing": handleCrawlerPing(session, sender, payload); break
default : log.warning("unknown type ${payload.type}")
}
} catch (Exception e) {
log.warning("Invalid datagram $e")
}
} }
@Override @Override
@@ -88,5 +123,64 @@ class CacheClient {
} }
} }
private void handlePong(Destination from, def pong) {
if (!CacheServers.isRegistered(from)) {
log.warning("received pong from non-registered destination")
return
}
if (pong.pongs == null) {
log.warning("malformed pong - no pongs")
return
}
pong.pongs.asList().each { eventBus.publish(new HostDiscoveredEvent(destination: new Destination(it))) }
}
private void handleCrawlerPing(I2PSession session, Destination from, def ping) {
if (settings.isLeaf()) {
log.warning("Received crawler ping but I'm a leaf")
return
}
switch(settings.getCrawlerResponse()) {
case CrawlerResponse.NONE:
log.info("Responding to crawlers is disabled by user")
break
case CrawlerResponse.ALL:
respondToCrawler(session, from, ping)
break;
case CrawlerResponse.REGISTERED:
if (CacheServers.isRegistered(from))
respondToCrawler(session, from, ping)
else
log.warning("Ignoring crawler ping from non-registered crawler")
break
}
}
private void respondToCrawler(I2PSession session, Destination from, def ping) {
log.info "responding to crawler ping"
def neighbors = manager.getConnections().collect { c -> c.remoteSide.toBase64() }
Collections.shuffle(neighbors)
if (neighbors.size() > CRAWLER_RETURN)
neighbors = neighbors[0..CRAWLER_RETURN - 1]
def upManager = (UltrapeerConnectionManager) manager;
def pong = [:]
pong.peers = neighbors
pong.uuid = ping.uuid
pong.version = 1
pong.leafSlots = upManager.hasLeafSlots()
pong.peerSlots = upManager.hasPeerSlots()
pong = JsonOutput.toJson(pong)
def maker = new I2PDatagramMaker(session)
pong = maker.makeI2PDatagram(pong.bytes)
session.sendMessage(from, pong, I2PSession.PROTO_DATAGRAM, 0, 0)
}
} }