work on handling pings
This commit is contained in:
@@ -6,6 +6,8 @@ import java.util.concurrent.atomic.AtomicBoolean
|
|||||||
import java.util.logging.Level
|
import java.util.logging.Level
|
||||||
|
|
||||||
import com.muwire.core.EventBus
|
import com.muwire.core.EventBus
|
||||||
|
import com.muwire.core.hostcache.HostCache
|
||||||
|
import com.muwire.core.hostcache.HostDiscoveredEvent
|
||||||
|
|
||||||
import groovy.util.logging.Log
|
import groovy.util.logging.Log
|
||||||
import net.i2p.data.Destination
|
import net.i2p.data.Destination
|
||||||
@@ -16,6 +18,7 @@ abstract class Connection implements Closeable {
|
|||||||
final EventBus eventBus
|
final EventBus eventBus
|
||||||
final Endpoint endpoint
|
final Endpoint endpoint
|
||||||
final boolean incoming
|
final boolean incoming
|
||||||
|
final HostCache hostCache
|
||||||
|
|
||||||
private final AtomicBoolean running = new AtomicBoolean()
|
private final AtomicBoolean running = new AtomicBoolean()
|
||||||
private final BlockingQueue messages = new LinkedBlockingQueue()
|
private final BlockingQueue messages = new LinkedBlockingQueue()
|
||||||
@@ -25,10 +28,11 @@ abstract class Connection implements Closeable {
|
|||||||
|
|
||||||
long lastPingSentTime, lastPingReceivedTime
|
long lastPingSentTime, lastPingReceivedTime
|
||||||
|
|
||||||
Connection(EventBus eventBus, Endpoint endpoint, boolean incoming) {
|
Connection(EventBus eventBus, Endpoint endpoint, boolean incoming, HostCache hostCache) {
|
||||||
this.eventBus = eventBus
|
this.eventBus = eventBus
|
||||||
this.incoming = incoming
|
this.incoming = incoming
|
||||||
this.endpoint = endpoint
|
this.endpoint = endpoint
|
||||||
|
this.hostCache = hostCache
|
||||||
|
|
||||||
this.name = endpoint.destination.toBase32().substring(0,8)
|
this.name = endpoint.destination.toBase32().substring(0,8)
|
||||||
|
|
||||||
@@ -102,4 +106,23 @@ abstract class Connection implements Closeable {
|
|||||||
ping.version = 1
|
ping.version = 1
|
||||||
messages.put(ping)
|
messages.put(ping)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void handlePing() {
|
||||||
|
log.fine("$name received ping")
|
||||||
|
def pong = [:]
|
||||||
|
pong.type = "Pong"
|
||||||
|
pong.version = 1
|
||||||
|
pong.pongs = hostCache.getGoodHosts(10).collect { d -> d.toBase64() }
|
||||||
|
messages.put(pong)
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void handlePong(def pong) {
|
||||||
|
log.fine("$name received pong")
|
||||||
|
if (pong.pongs == null)
|
||||||
|
throw new Exception("Pong doesn't have pongs")
|
||||||
|
pong.pongs.each {
|
||||||
|
def dest = new Destination(it)
|
||||||
|
eventBus.publish(new HostDiscoveredEvent(destination: dest))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -4,6 +4,7 @@ import java.io.InputStream
|
|||||||
import java.io.OutputStream
|
import java.io.OutputStream
|
||||||
|
|
||||||
import com.muwire.core.EventBus
|
import com.muwire.core.EventBus
|
||||||
|
import com.muwire.core.hostcache.HostCache
|
||||||
|
|
||||||
import net.i2p.data.Destination
|
import net.i2p.data.Destination
|
||||||
|
|
||||||
@@ -14,8 +15,8 @@ import net.i2p.data.Destination
|
|||||||
*/
|
*/
|
||||||
class LeafConnection extends Connection {
|
class LeafConnection extends Connection {
|
||||||
|
|
||||||
public LeafConnection(EventBus eventBus, Endpoint endpoint) {
|
public LeafConnection(EventBus eventBus, Endpoint endpoint, HostCache hostCache) {
|
||||||
super(eventBus, endpoint, true);
|
super(eventBus, endpoint, true, hostCache);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@@ -4,35 +4,73 @@ import java.io.InputStream
|
|||||||
import java.io.OutputStream
|
import java.io.OutputStream
|
||||||
|
|
||||||
import com.muwire.core.EventBus
|
import com.muwire.core.EventBus
|
||||||
|
import com.muwire.core.hostcache.HostCache
|
||||||
|
import com.muwire.core.util.DataUtil
|
||||||
|
|
||||||
|
import groovy.json.JsonOutput
|
||||||
|
import groovy.json.JsonSlurper
|
||||||
|
import groovy.util.logging.Log
|
||||||
import net.i2p.data.Destination
|
import net.i2p.data.Destination
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This side is an ultrapeer and the remote is an ultrapeer too
|
* This side is an ultrapeer and the remote is an ultrapeer too
|
||||||
* @author zab
|
* @author zab
|
||||||
*/
|
*/
|
||||||
|
@Log
|
||||||
class PeerConnection extends Connection {
|
class PeerConnection extends Connection {
|
||||||
|
|
||||||
private final DataInputStream dis
|
private final DataInputStream dis
|
||||||
private final DataOutputStream dos
|
private final DataOutputStream dos
|
||||||
|
|
||||||
|
private final byte[] readHeader = new byte[3]
|
||||||
|
private final byte[] writeHeader = new byte[3]
|
||||||
|
|
||||||
|
private final JsonSlurper slurper = new JsonSlurper()
|
||||||
|
|
||||||
public PeerConnection(EventBus eventBus, Endpoint endpoint,
|
public PeerConnection(EventBus eventBus, Endpoint endpoint,
|
||||||
boolean incoming) {
|
boolean incoming, HostCache hostCache) {
|
||||||
super(eventBus, endpoint, incoming)
|
super(eventBus, endpoint, incoming, hostCache)
|
||||||
this.dis = new DataInputStream(endpoint.inputStream)
|
this.dis = new DataInputStream(endpoint.inputStream)
|
||||||
this.dos = new DataOutputStream(endpoint.outputStream)
|
this.dos = new DataOutputStream(endpoint.outputStream)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void read() {
|
protected void read() {
|
||||||
// TODO Auto-generated method stub
|
dis.readFully(readHeader)
|
||||||
|
int length = DataUtil.readLength(readHeader)
|
||||||
|
log.fine("$name read length $length")
|
||||||
|
|
||||||
|
byte[] payload = new byte[length]
|
||||||
|
dis.readFully(payload)
|
||||||
|
|
||||||
|
if (readHeader[0] & 0x80 == 0x80) {
|
||||||
|
// TODO process binary
|
||||||
|
} else {
|
||||||
|
def json = slurper.parse(payload)
|
||||||
|
if (json.type == null)
|
||||||
|
throw new Exception("missing json type")
|
||||||
|
switch(json.type) {
|
||||||
|
case "Ping" : handlePing(); break;
|
||||||
|
case "Pong" : handlePong(json); break;
|
||||||
|
default :
|
||||||
|
throw new Exception("unknown json type ${json.type}")
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void write(Object message) {
|
protected void write(Object message) {
|
||||||
// TODO Auto-generated method stub
|
byte [] payload
|
||||||
|
if (message instanceof Map) {
|
||||||
|
payload = JsonOutput.toJson(message)
|
||||||
|
DataUtil.packHeader(payload.bytes.length, writeHeader)
|
||||||
|
writeHeader[0] &= 0x7F
|
||||||
|
} else {
|
||||||
|
// TODO: write binary
|
||||||
|
}
|
||||||
|
|
||||||
|
dos.write(writeHeader)
|
||||||
|
dos.write(payload)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@@ -4,6 +4,7 @@ import java.io.InputStream
|
|||||||
import java.io.OutputStream
|
import java.io.OutputStream
|
||||||
|
|
||||||
import com.muwire.core.EventBus
|
import com.muwire.core.EventBus
|
||||||
|
import com.muwire.core.hostcache.HostCache
|
||||||
|
|
||||||
import net.i2p.data.Destination
|
import net.i2p.data.Destination
|
||||||
|
|
||||||
@@ -15,8 +16,8 @@ import net.i2p.data.Destination
|
|||||||
*/
|
*/
|
||||||
class UltrapeerConnection extends Connection {
|
class UltrapeerConnection extends Connection {
|
||||||
|
|
||||||
public UltrapeerConnection(EventBus eventBus, Endpoint endpoint) {
|
public UltrapeerConnection(EventBus eventBus, Endpoint endpoint, HostCache hostCache) {
|
||||||
super(eventBus, endpoint, false)
|
super(eventBus, endpoint, false, hostCache)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
Reference in New Issue
Block a user