diff --git a/core/src/main/groovy/com/muwire/core/connection/Connection.groovy b/core/src/main/groovy/com/muwire/core/connection/Connection.groovy index 9f470f8c..b1e9cd30 100644 --- a/core/src/main/groovy/com/muwire/core/connection/Connection.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/Connection.groovy @@ -133,6 +133,7 @@ abstract class Connection implements Closeable { query.keywords = e.searchEvent.getSearchTerms() query.oobInfohash = e.searchEvent.oobInfohash query.searchComments = e.searchEvent.searchComments + query.compressedResults = e.searchEvent.compressedResults if (e.searchEvent.searchHash != null) query.infohash = Base64.encode(e.searchEvent.searchHash) query.replyTo = e.replyTo.toBase64() @@ -213,12 +214,16 @@ abstract class Connection implements Closeable { boolean searchComments = false if (search.searchComments != null) searchComments = search.searchComments + boolean compressedResults = false + if (search.compressedResults != null) + compressedResults = search.compressedResults SearchEvent searchEvent = new SearchEvent(searchTerms : search.keywords, searchHash : infohash, uuid : uuid, oobInfohash : oob, - searchComments : searchComments) + searchComments : searchComments, + compressedResults : compressedResults) QueryEvent event = new QueryEvent ( searchEvent : searchEvent, replyTo : replyTo, originator : originator, diff --git a/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy b/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy index c5fd9b6f..c9e86d53 100644 --- a/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy @@ -5,9 +5,11 @@ import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.logging.Level import java.util.zip.DeflaterOutputStream +import java.util.zip.GZIPInputStream import java.util.zip.GZIPOutputStream import java.util.zip.InflaterInputStream +import com.muwire.core.Constants import com.muwire.core.EventBus import com.muwire.core.MuWireSettings import com.muwire.core.Persona @@ -28,6 +30,7 @@ import com.muwire.core.search.UnexpectedResultsException import groovy.json.JsonOutput import groovy.json.JsonSlurper import groovy.util.logging.Log +import net.i2p.data.Base64 @Log class ConnectionAcceptor { @@ -131,6 +134,9 @@ class ConnectionAcceptor { case (byte)'P': processPOST(e) break + case (byte)'R': + processRESULTS(e) + break case (byte)'T': processTRUST(e) break @@ -237,7 +243,7 @@ class ConnectionAcceptor { Persona sender = new Persona(dis) if (sender.destination != e.getDestination()) - throw new IOException("Sender destination mismatch expected $e.getDestination(), got $sender.destination") + throw new IOException("Sender destination mismatch expected ${e.getDestination()}, got $sender.destination") int nResults = dis.readUnsignedShort() UIResultEvent[] results = new UIResultEvent[nResults] for (int i = 0; i < nResults; i++) { @@ -255,6 +261,66 @@ class ConnectionAcceptor { } } + private void processRESULTS(Endpoint e) { + InputStream is = e.getInputStream() + DataInputStream dis = new DataInputStream(is) + byte[] esults = new byte[7] + dis.readFully(esults) + if (esults != "ESULTS ".getBytes(StandardCharsets.US_ASCII)) + throw new IOException("Invalid RESULTS connection") + + JsonSlurper slurper = new JsonSlurper() + try { + String uuid = DataUtil.readTillRN(dis) + UUID resultsUUID = UUID.fromString(uuid) + if (!searchManager.hasLocalSearch(resultsUUID)) + throw new UnexpectedResultsException(resultsUUID.toString()) + + + // parse all headers + Map headers = new HashMap<>() + String header + while((header = DataUtil.readTillRN(is)) != "" && headers.size() < Constants.MAX_HEADERS) { + int colon = header.indexOf(':') + if (colon == -1 || colon == header.length() - 1) + throw new IOException("invalid header $header") + String key = header.substring(0, colon) + String value = header.substring(colon + 1) + headers[key] = value.trim() + } + + if (!headers.containsKey("Sender")) + throw new IOException("No Sender header") + if (!headers.containsKey("Count")) + throw new IOException("No Count header") + + byte [] personaBytes = Base64.decode(headers['Sender']) + Persona sender = new Persona(new ByteArrayInputStream(personaBytes)) + if (sender.destination != e.getDestination()) + throw new IOException("Sender destination mismatch expected ${e.getDestination()}, got $sender.destination") + + int nResults = Integer.parseInt(headers['Count']) + if (nResults > Constants.MAX_RESULTS) + throw new IOException("too many results $nResults") + + dis = new DataInputStream(new GZIPInputStream(dis)) + UIResultEvent[] results = new UIResultEvent[nResults] + for (int i = 0; i < nResults; i++) { + int jsonSize = dis.readUnsignedShort() + byte [] payload = new byte[jsonSize] + dis.readFully(payload) + def json = slurper.parse(payload) + results[i] = ResultsParser.parse(sender, resultsUUID, json) + } + eventBus.publish(new UIResultBatchEvent(uuid: resultsUUID, results: results)) + } catch (IOException bad) { + log.log(Level.WARNING, "failed to process RESULTS", bad) + } finally { + e.close() + } + + } + private void processBROWSE(Endpoint e) { try { byte [] rowse = new byte[7] diff --git a/core/src/main/groovy/com/muwire/core/search/ResultsSender.groovy b/core/src/main/groovy/com/muwire/core/search/ResultsSender.groovy index 1289c58e..5a688f75 100644 --- a/core/src/main/groovy/com/muwire/core/search/ResultsSender.groovy +++ b/core/src/main/groovy/com/muwire/core/search/ResultsSender.groovy @@ -14,6 +14,7 @@ import java.util.concurrent.ThreadFactory import java.util.concurrent.atomic.AtomicInteger import java.util.logging.Level import java.util.stream.Collectors +import java.util.zip.GZIPOutputStream import com.muwire.core.DownloadedFile import com.muwire.core.EventBus @@ -53,7 +54,7 @@ class ResultsSender { this.settings = settings } - void sendResults(UUID uuid, SharedFile[] results, Destination target, boolean oobInfohash) { + void sendResults(UUID uuid, SharedFile[] results, Destination target, boolean oobInfohash, boolean compressedResults) { log.info("Sending $results.length results for uuid $uuid to ${target.toBase32()} oobInfohash : $oobInfohash") if (target.equals(me.destination)) { results.each { @@ -81,7 +82,7 @@ class ResultsSender { } } else { executor.execute(new ResultSendJob(uuid : uuid, results : results, - target: target, oobInfohash : oobInfohash)) + target: target, oobInfohash : oobInfohash, compressedResults : compressedResults)) } } @@ -90,27 +91,49 @@ class ResultsSender { SharedFile [] results Destination target boolean oobInfohash + boolean compressedResults @Override public void run() { try { JsonOutput jsonOutput = new JsonOutput() Endpoint endpoint = null; - try { - endpoint = connector.connect(target) - DataOutputStream os = new DataOutputStream(endpoint.getOutputStream()) - os.write("POST $uuid\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) - me.write(os) - os.writeShort((short)results.length) - results.each { - def obj = sharedFileToObj(it, settings.browseFiles) - def json = jsonOutput.toJson(obj) - os.writeShort((short)json.length()) - os.write(json.getBytes(StandardCharsets.US_ASCII)) + if (!compressedResults) { + try { + endpoint = connector.connect(target) + DataOutputStream os = new DataOutputStream(endpoint.getOutputStream()) + os.write("POST $uuid\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) + me.write(os) + os.writeShort((short)results.length) + results.each { + def obj = sharedFileToObj(it, settings.browseFiles) + def json = jsonOutput.toJson(obj) + os.writeShort((short)json.length()) + os.write(json.getBytes(StandardCharsets.US_ASCII)) + } + os.flush() + } finally { + endpoint?.close() + } + } else { + try { + endpoint = connector.connect(target) + OutputStream os = endpoint.getOutputStream() + os.write("RESULTS $uuid\r\n".getBytes(StandardCharsets.US_ASCII)) + os.write("Sender: ${me.toBase64()}\r\n".getBytes(StandardCharsets.US_ASCII)) + os.write("Count: $results.length\r\n".getBytes(StandardCharsets.US_ASCII)) + os.write("\r\n".getBytes(StandardCharsets.US_ASCII)) + DataOutputStream dos = new DataOutputStream(new GZIPOutputStream(os)) + results.each { + def obj = sharedFileToObj(it, settings.browseFiles) + def json = jsonOutput.toJson(obj) + dos.writeShort((short)json.length()) + dos.write(json.getBytes(StandardCharsets.US_ASCII)) + } + dos.close() + } finally { + endpoint?.close() } - os.flush() - } finally { - endpoint?.close() } } catch (Exception e) { log.log(Level.WARNING, "problem sending results",e) diff --git a/core/src/main/groovy/com/muwire/core/search/SearchEvent.groovy b/core/src/main/groovy/com/muwire/core/search/SearchEvent.groovy index 0d1fea56..541df9b9 100644 --- a/core/src/main/groovy/com/muwire/core/search/SearchEvent.groovy +++ b/core/src/main/groovy/com/muwire/core/search/SearchEvent.groovy @@ -10,11 +10,12 @@ class SearchEvent extends Event { UUID uuid boolean oobInfohash boolean searchComments + boolean compressedResults String toString() { def infoHash = null if (searchHash != null) infoHash = new InfoHash(searchHash) - "searchTerms: $searchTerms searchHash:$infoHash, uuid:$uuid oobInfohash:$oobInfohash searchComments:$searchComments" + "searchTerms: $searchTerms searchHash:$infoHash, uuid:$uuid oobInfohash:$oobInfohash searchComments:$searchComments compressedResults:$compressedResults" } } diff --git a/core/src/main/java/com/muwire/core/Constants.java b/core/src/main/java/com/muwire/core/Constants.java index e2c3019d..6541d84b 100644 --- a/core/src/main/java/com/muwire/core/Constants.java +++ b/core/src/main/java/com/muwire/core/Constants.java @@ -8,4 +8,6 @@ public class Constants { public static final int MAX_HEADER_SIZE = 0x1 << 14; public static final int MAX_HEADERS = 16; + + public static final int MAX_RESULTS = 0x1 << 16 } diff --git a/gui/griffon-app/controllers/com/muwire/gui/MainFrameController.groovy b/gui/griffon-app/controllers/com/muwire/gui/MainFrameController.groovy index c4e9b087..f4e76813 100644 --- a/gui/griffon-app/controllers/com/muwire/gui/MainFrameController.groovy +++ b/gui/griffon-app/controllers/com/muwire/gui/MainFrameController.groovy @@ -78,7 +78,7 @@ class MainFrameController { def searchEvent if (hashSearch) { - searchEvent = new SearchEvent(searchHash : root, uuid : uuid, oobInfohash: true) + searchEvent = new SearchEvent(searchHash : root, uuid : uuid, oobInfohash: true, compressedResults : true) } else { // this can be improved a lot def replaced = search.toLowerCase().trim().replaceAll(SplitPattern.SPLIT_PATTERN, " ") @@ -86,7 +86,7 @@ class MainFrameController { def nonEmpty = [] terms.each { if (it.length() > 0) nonEmpty << it } searchEvent = new SearchEvent(searchTerms : nonEmpty, uuid : uuid, oobInfohash: true, - searchComments : core.muOptions.searchComments) + searchComments : core.muOptions.searchComments, compressedResults : true) } boolean firstHop = core.muOptions.allowUntrusted || core.muOptions.searchExtraHop core.eventBus.publish(new QueryEvent(searchEvent : searchEvent, firstHop : firstHop,