diff --git a/core/src/main/groovy/com/muwire/core/Core.groovy b/core/src/main/groovy/com/muwire/core/Core.groovy index d41ed7b3..60c6923f 100644 --- a/core/src/main/groovy/com/muwire/core/Core.groovy +++ b/core/src/main/groovy/com/muwire/core/Core.groovy @@ -138,7 +138,7 @@ class Core { log.info("initializing connection manager") ConnectionManager connectionManager = props.isLeaf() ? - new LeafConnectionManager(eventBus,3, hostCache) : new UltrapeerConnectionManager(eventBus, 512, 512, hostCache, trustService) + new LeafConnectionManager(eventBus, me, 3, hostCache) : new UltrapeerConnectionManager(eventBus, me, 512, 512, hostCache, trustService) eventBus.register(TrustEvent.class, connectionManager) eventBus.register(ConnectionEvent.class, connectionManager) eventBus.register(DisconnectionEvent.class, connectionManager) diff --git a/core/src/main/groovy/com/muwire/core/connection/Connection.groovy b/core/src/main/groovy/com/muwire/core/connection/Connection.groovy index 8463fbec..633dedd8 100644 --- a/core/src/main/groovy/com/muwire/core/connection/Connection.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/Connection.groovy @@ -163,7 +163,8 @@ abstract class Connection implements Closeable { uuid : uuid) QueryEvent event = new QueryEvent ( searchEvent : searchEvent, replyTo : replyTo, - receivedOn : endpoint.destination ) + receivedOn : endpoint.destination, + firstHop : search.firstHop ) eventBus.publish(event) } diff --git a/core/src/main/groovy/com/muwire/core/connection/ConnectionManager.groovy b/core/src/main/groovy/com/muwire/core/connection/ConnectionManager.groovy index c7b82862..157b8025 100644 --- a/core/src/main/groovy/com/muwire/core/connection/ConnectionManager.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/ConnectionManager.groovy @@ -1,6 +1,7 @@ package com.muwire.core.connection import com.muwire.core.EventBus +import com.muwire.core.Persona import com.muwire.core.hostcache.HostCache import com.muwire.core.search.QueryEvent import com.muwire.core.trust.TrustEvent @@ -17,11 +18,13 @@ abstract class ConnectionManager { private final Timer timer protected final HostCache hostCache + protected final Persona me ConnectionManager() {} - ConnectionManager(EventBus eventBus, HostCache hostCache) { + ConnectionManager(EventBus eventBus, Persona me, HostCache hostCache) { this.eventBus = eventBus + this.me = me this.hostCache = hostCache this.timer = new Timer("connections-pinger",true) } @@ -40,13 +43,6 @@ abstract class ConnectionManager { drop(e.destination) } - void onQueryEvent(QueryEvent e) { - getConnections().each { - if (e.getReceivedOn() != it.getEndpoint().getDestination()) - it.sendQuery(e) - } - } - abstract void drop(Destination d) abstract Collection getConnections() diff --git a/core/src/main/groovy/com/muwire/core/connection/LeafConnectionManager.groovy b/core/src/main/groovy/com/muwire/core/connection/LeafConnectionManager.groovy index 19f9a79f..73d682b4 100644 --- a/core/src/main/groovy/com/muwire/core/connection/LeafConnectionManager.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/LeafConnectionManager.groovy @@ -3,7 +3,9 @@ package com.muwire.core.connection import java.util.concurrent.ConcurrentHashMap import com.muwire.core.EventBus +import com.muwire.core.Persona import com.muwire.core.hostcache.HostCache +import com.muwire.core.search.QueryEvent import groovy.util.logging.Log import net.i2p.data.Destination @@ -15,8 +17,8 @@ class LeafConnectionManager extends ConnectionManager { final Map connections = new ConcurrentHashMap() - public LeafConnectionManager(EventBus eventBus, int maxConnections, HostCache hostCache) { - super(eventBus, hostCache) + public LeafConnectionManager(EventBus eventBus, Persona me, int maxConnections, HostCache hostCache) { + super(eventBus, me, hostCache) this.maxConnections = maxConnections } @@ -25,6 +27,13 @@ class LeafConnectionManager extends ConnectionManager { // TODO Auto-generated method stub } + + void onQueryEvent(QueryEvent e) { + if (me.destination == e.receivedOn) { + connections.values().each { it.sendQuery(e) } + } + + } @Override public Collection getConnections() { diff --git a/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnectionManager.groovy b/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnectionManager.groovy index 9acb0d13..c4e9f4ec 100644 --- a/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnectionManager.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnectionManager.groovy @@ -4,6 +4,7 @@ import java.util.Collection import java.util.concurrent.ConcurrentHashMap import com.muwire.core.EventBus +import com.muwire.core.Persona import com.muwire.core.hostcache.HostCache import com.muwire.core.search.QueryEvent import com.muwire.core.trust.TrustService @@ -22,9 +23,9 @@ class UltrapeerConnectionManager extends ConnectionManager { UltrapeerConnectionManager() {} - public UltrapeerConnectionManager(EventBus eventBus, int maxPeers, int maxLeafs, + public UltrapeerConnectionManager(EventBus eventBus, Persona me, int maxPeers, int maxLeafs, HostCache hostCache, TrustService trustService) { - super(eventBus, hostCache) + super(eventBus, me, hostCache) this.maxPeers = maxPeers this.maxLeafs = maxLeafs this.trustService = trustService @@ -34,6 +35,19 @@ class UltrapeerConnectionManager extends ConnectionManager { // TODO Auto-generated method stub } + + void onQueryEvent(QueryEvent e) { + forwardQueryToLeafs(e) + if (!e.firstHop) + return + if (e.replyTo != me.destination && e.receivedOn != me.destination && + !leafConnections.containsKey(e.receivedOn)) + e.firstHop = false + peerConnections.values().each { + if (e.getReceivedOn() != it.getEndpoint().getDestination()) + it.sendQuery(e) + } + } @Override public Collection getConnections() { @@ -87,7 +101,7 @@ class UltrapeerConnectionManager extends ConnectionManager { log.severe("Removed connection not present in either leaf or peer map ${e.destination.toBase32()}") } - void forwardQueryToLeaf(Destination leaf, QueryEvent e) { + void forwardQueryToLeafs(QueryEvent e) { } }