implement routing of queries

This commit is contained in:
Zlatin Balevsky
2019-05-25 18:25:43 +01:00
parent 1b27e22941
commit eae1b01c1c
5 changed files with 35 additions and 15 deletions

View File

@@ -138,7 +138,7 @@ class Core {
log.info("initializing connection manager") log.info("initializing connection manager")
ConnectionManager connectionManager = props.isLeaf() ? ConnectionManager connectionManager = props.isLeaf() ?
new LeafConnectionManager(eventBus,3, hostCache) : new UltrapeerConnectionManager(eventBus, 512, 512, hostCache, trustService) new LeafConnectionManager(eventBus, me, 3, hostCache) : new UltrapeerConnectionManager(eventBus, me, 512, 512, hostCache, trustService)
eventBus.register(TrustEvent.class, connectionManager) eventBus.register(TrustEvent.class, connectionManager)
eventBus.register(ConnectionEvent.class, connectionManager) eventBus.register(ConnectionEvent.class, connectionManager)
eventBus.register(DisconnectionEvent.class, connectionManager) eventBus.register(DisconnectionEvent.class, connectionManager)

View File

@@ -163,7 +163,8 @@ abstract class Connection implements Closeable {
uuid : uuid) uuid : uuid)
QueryEvent event = new QueryEvent ( searchEvent : searchEvent, QueryEvent event = new QueryEvent ( searchEvent : searchEvent,
replyTo : replyTo, replyTo : replyTo,
receivedOn : endpoint.destination ) receivedOn : endpoint.destination,
firstHop : search.firstHop )
eventBus.publish(event) eventBus.publish(event)
} }

View File

@@ -1,6 +1,7 @@
package com.muwire.core.connection package com.muwire.core.connection
import com.muwire.core.EventBus import com.muwire.core.EventBus
import com.muwire.core.Persona
import com.muwire.core.hostcache.HostCache import com.muwire.core.hostcache.HostCache
import com.muwire.core.search.QueryEvent import com.muwire.core.search.QueryEvent
import com.muwire.core.trust.TrustEvent import com.muwire.core.trust.TrustEvent
@@ -17,11 +18,13 @@ abstract class ConnectionManager {
private final Timer timer private final Timer timer
protected final HostCache hostCache protected final HostCache hostCache
protected final Persona me
ConnectionManager() {} ConnectionManager() {}
ConnectionManager(EventBus eventBus, HostCache hostCache) { ConnectionManager(EventBus eventBus, Persona me, HostCache hostCache) {
this.eventBus = eventBus this.eventBus = eventBus
this.me = me
this.hostCache = hostCache this.hostCache = hostCache
this.timer = new Timer("connections-pinger",true) this.timer = new Timer("connections-pinger",true)
} }
@@ -40,13 +43,6 @@ abstract class ConnectionManager {
drop(e.destination) drop(e.destination)
} }
void onQueryEvent(QueryEvent e) {
getConnections().each {
if (e.getReceivedOn() != it.getEndpoint().getDestination())
it.sendQuery(e)
}
}
abstract void drop(Destination d) abstract void drop(Destination d)
abstract Collection<Connection> getConnections() abstract Collection<Connection> getConnections()

View File

@@ -3,7 +3,9 @@ package com.muwire.core.connection
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import com.muwire.core.EventBus import com.muwire.core.EventBus
import com.muwire.core.Persona
import com.muwire.core.hostcache.HostCache import com.muwire.core.hostcache.HostCache
import com.muwire.core.search.QueryEvent
import groovy.util.logging.Log import groovy.util.logging.Log
import net.i2p.data.Destination import net.i2p.data.Destination
@@ -15,8 +17,8 @@ class LeafConnectionManager extends ConnectionManager {
final Map<Destination, UltrapeerConnection> connections = new ConcurrentHashMap() final Map<Destination, UltrapeerConnection> connections = new ConcurrentHashMap()
public LeafConnectionManager(EventBus eventBus, int maxConnections, HostCache hostCache) { public LeafConnectionManager(EventBus eventBus, Persona me, int maxConnections, HostCache hostCache) {
super(eventBus, hostCache) super(eventBus, me, hostCache)
this.maxConnections = maxConnections this.maxConnections = maxConnections
} }
@@ -26,6 +28,13 @@ class LeafConnectionManager extends ConnectionManager {
} }
void onQueryEvent(QueryEvent e) {
if (me.destination == e.receivedOn) {
connections.values().each { it.sendQuery(e) }
}
}
@Override @Override
public Collection<Connection> getConnections() { public Collection<Connection> getConnections() {
connections.values() connections.values()

View File

@@ -4,6 +4,7 @@ import java.util.Collection
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import com.muwire.core.EventBus import com.muwire.core.EventBus
import com.muwire.core.Persona
import com.muwire.core.hostcache.HostCache import com.muwire.core.hostcache.HostCache
import com.muwire.core.search.QueryEvent import com.muwire.core.search.QueryEvent
import com.muwire.core.trust.TrustService import com.muwire.core.trust.TrustService
@@ -22,9 +23,9 @@ class UltrapeerConnectionManager extends ConnectionManager {
UltrapeerConnectionManager() {} UltrapeerConnectionManager() {}
public UltrapeerConnectionManager(EventBus eventBus, int maxPeers, int maxLeafs, public UltrapeerConnectionManager(EventBus eventBus, Persona me, int maxPeers, int maxLeafs,
HostCache hostCache, TrustService trustService) { HostCache hostCache, TrustService trustService) {
super(eventBus, hostCache) super(eventBus, me, hostCache)
this.maxPeers = maxPeers this.maxPeers = maxPeers
this.maxLeafs = maxLeafs this.maxLeafs = maxLeafs
this.trustService = trustService this.trustService = trustService
@@ -35,6 +36,19 @@ class UltrapeerConnectionManager extends ConnectionManager {
} }
void onQueryEvent(QueryEvent e) {
forwardQueryToLeafs(e)
if (!e.firstHop)
return
if (e.replyTo != me.destination && e.receivedOn != me.destination &&
!leafConnections.containsKey(e.receivedOn))
e.firstHop = false
peerConnections.values().each {
if (e.getReceivedOn() != it.getEndpoint().getDestination())
it.sendQuery(e)
}
}
@Override @Override
public Collection<Connection> getConnections() { public Collection<Connection> getConnections() {
def rv = new ArrayList(peerConnections.size() + leafConnections.size()) def rv = new ArrayList(peerConnections.size() + leafConnections.size())
@@ -87,7 +101,7 @@ class UltrapeerConnectionManager extends ConnectionManager {
log.severe("Removed connection not present in either leaf or peer map ${e.destination.toBase32()}") log.severe("Removed connection not present in either leaf or peer map ${e.destination.toBase32()}")
} }
void forwardQueryToLeaf(Destination leaf, QueryEvent e) { void forwardQueryToLeafs(QueryEvent e) {
} }
} }