From 736a4a74243a76ba083687424f9e4fcb0c939150 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 27 Jul 2018 15:48:23 +0100 Subject: [PATCH] Configure deflater output streams for sync flush, flush after each message --- .../com/muwire/core/connection/ConnectionAcceptor.groovy | 2 +- .../muwire/core/connection/ConnectionEstablisher.groovy | 3 ++- .../com/muwire/core/connection/PeerConnection.groovy | 8 +++++--- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy b/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy index 5434dc1a..e34cba9b 100644 --- a/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy @@ -139,7 +139,7 @@ class ConnectionAcceptor { log.info("accepting connection, leaf:$leaf") e.outputStream.write("OK".bytes) e.outputStream.flush() - def wrapped = new Endpoint(e.destination, new InflaterInputStream(e.inputStream), new DeflaterOutputStream(e.outputStream)) + def wrapped = new Endpoint(e.destination, new InflaterInputStream(e.inputStream), new DeflaterOutputStream(e.outputStream, true)) eventBus.publish(new ConnectionEvent(endpoint: wrapped, incoming: true, leaf: leaf, status: ConnectionAttemptStatus.SUCCESSFUL)) } else { log.info("rejecting connection, leaf:$leaf") diff --git a/core/src/main/groovy/com/muwire/core/connection/ConnectionEstablisher.groovy b/core/src/main/groovy/com/muwire/core/connection/ConnectionEstablisher.groovy index 9c13a79e..2efae001 100644 --- a/core/src/main/groovy/com/muwire/core/connection/ConnectionEstablisher.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/ConnectionEstablisher.groovy @@ -4,6 +4,7 @@ import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.concurrent.ThreadFactory import java.util.logging.Level +import java.util.zip.Deflater import java.util.zip.DeflaterInputStream import java.util.zip.DeflaterOutputStream import java.util.zip.InflaterInputStream @@ -132,7 +133,7 @@ class ConnectionEstablisher { log.info("connection to ${e.destination.toBase32()} established") // wrap into deflater / inflater streams and publish - def wrapped = new Endpoint(e.destination, new InflaterInputStream(e.inputStream), new DeflaterOutputStream(e.outputStream)) + def wrapped = new Endpoint(e.destination, new InflaterInputStream(e.inputStream), new DeflaterOutputStream(e.outputStream, true)) eventBus.publish(new ConnectionEvent(endpoint: wrapped, incoming: false, leaf: false, status: ConnectionAttemptStatus.SUCCESSFUL)) } diff --git a/core/src/main/groovy/com/muwire/core/connection/PeerConnection.groovy b/core/src/main/groovy/com/muwire/core/connection/PeerConnection.groovy index 2dc1a078..64d9b33a 100644 --- a/core/src/main/groovy/com/muwire/core/connection/PeerConnection.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/PeerConnection.groovy @@ -43,7 +43,7 @@ class PeerConnection extends Connection { byte[] payload = new byte[length] dis.readFully(payload) - if (readHeader[0] & 0x80 == 0x80) { + if ((readHeader[0] & (byte)0x80) == 0x80) { // TODO process binary } else { def json = slurper.parse(payload) @@ -60,9 +60,10 @@ class PeerConnection extends Connection { @Override protected void write(Object message) { - byte [] payload + byte[] payload if (message instanceof Map) { - payload = JsonOutput.toJson(message) + log.fine "$name writing message type ${message.type}" + payload = JsonOutput.toJson(message).bytes DataUtil.packHeader(payload.length, writeHeader) writeHeader[0] &= 0x7F } else { @@ -71,6 +72,7 @@ class PeerConnection extends Connection { dos.write(writeHeader) dos.write(payload) + dos.flush() } }