compressed results

This commit is contained in:
Zlatin Balevsky
2019-10-20 00:54:32 +01:00
parent 13f7b8563c
commit 46de1baf88
6 changed files with 118 additions and 21 deletions

View File

@@ -133,6 +133,7 @@ abstract class Connection implements Closeable {
query.keywords = e.searchEvent.getSearchTerms() query.keywords = e.searchEvent.getSearchTerms()
query.oobInfohash = e.searchEvent.oobInfohash query.oobInfohash = e.searchEvent.oobInfohash
query.searchComments = e.searchEvent.searchComments query.searchComments = e.searchEvent.searchComments
query.compressedResults = e.searchEvent.compressedResults
if (e.searchEvent.searchHash != null) if (e.searchEvent.searchHash != null)
query.infohash = Base64.encode(e.searchEvent.searchHash) query.infohash = Base64.encode(e.searchEvent.searchHash)
query.replyTo = e.replyTo.toBase64() query.replyTo = e.replyTo.toBase64()
@@ -213,12 +214,16 @@ abstract class Connection implements Closeable {
boolean searchComments = false boolean searchComments = false
if (search.searchComments != null) if (search.searchComments != null)
searchComments = search.searchComments searchComments = search.searchComments
boolean compressedResults = false
if (search.compressedResults != null)
compressedResults = search.compressedResults
SearchEvent searchEvent = new SearchEvent(searchTerms : search.keywords, SearchEvent searchEvent = new SearchEvent(searchTerms : search.keywords,
searchHash : infohash, searchHash : infohash,
uuid : uuid, uuid : uuid,
oobInfohash : oob, oobInfohash : oob,
searchComments : searchComments) searchComments : searchComments,
compressedResults : compressedResults)
QueryEvent event = new QueryEvent ( searchEvent : searchEvent, QueryEvent event = new QueryEvent ( searchEvent : searchEvent,
replyTo : replyTo, replyTo : replyTo,
originator : originator, originator : originator,

View File

@@ -5,9 +5,11 @@ import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.logging.Level import java.util.logging.Level
import java.util.zip.DeflaterOutputStream import java.util.zip.DeflaterOutputStream
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream import java.util.zip.GZIPOutputStream
import java.util.zip.InflaterInputStream import java.util.zip.InflaterInputStream
import com.muwire.core.Constants
import com.muwire.core.EventBus import com.muwire.core.EventBus
import com.muwire.core.MuWireSettings import com.muwire.core.MuWireSettings
import com.muwire.core.Persona import com.muwire.core.Persona
@@ -28,6 +30,7 @@ import com.muwire.core.search.UnexpectedResultsException
import groovy.json.JsonOutput import groovy.json.JsonOutput
import groovy.json.JsonSlurper import groovy.json.JsonSlurper
import groovy.util.logging.Log import groovy.util.logging.Log
import net.i2p.data.Base64
@Log @Log
class ConnectionAcceptor { class ConnectionAcceptor {
@@ -131,6 +134,9 @@ class ConnectionAcceptor {
case (byte)'P': case (byte)'P':
processPOST(e) processPOST(e)
break break
case (byte)'R':
processRESULTS(e)
break
case (byte)'T': case (byte)'T':
processTRUST(e) processTRUST(e)
break break
@@ -237,7 +243,7 @@ class ConnectionAcceptor {
Persona sender = new Persona(dis) Persona sender = new Persona(dis)
if (sender.destination != e.getDestination()) if (sender.destination != e.getDestination())
throw new IOException("Sender destination mismatch expected $e.getDestination(), got $sender.destination") throw new IOException("Sender destination mismatch expected ${e.getDestination()}, got $sender.destination")
int nResults = dis.readUnsignedShort() int nResults = dis.readUnsignedShort()
UIResultEvent[] results = new UIResultEvent[nResults] UIResultEvent[] results = new UIResultEvent[nResults]
for (int i = 0; i < nResults; i++) { for (int i = 0; i < nResults; i++) {
@@ -255,6 +261,66 @@ class ConnectionAcceptor {
} }
} }
private void processRESULTS(Endpoint e) {
InputStream is = e.getInputStream()
DataInputStream dis = new DataInputStream(is)
byte[] esults = new byte[7]
dis.readFully(esults)
if (esults != "ESULTS ".getBytes(StandardCharsets.US_ASCII))
throw new IOException("Invalid RESULTS connection")
JsonSlurper slurper = new JsonSlurper()
try {
String uuid = DataUtil.readTillRN(dis)
UUID resultsUUID = UUID.fromString(uuid)
if (!searchManager.hasLocalSearch(resultsUUID))
throw new UnexpectedResultsException(resultsUUID.toString())
// parse all headers
Map<String,String> headers = new HashMap<>()
String header
while((header = DataUtil.readTillRN(is)) != "" && headers.size() < Constants.MAX_HEADERS) {
int colon = header.indexOf(':')
if (colon == -1 || colon == header.length() - 1)
throw new IOException("invalid header $header")
String key = header.substring(0, colon)
String value = header.substring(colon + 1)
headers[key] = value.trim()
}
if (!headers.containsKey("Sender"))
throw new IOException("No Sender header")
if (!headers.containsKey("Count"))
throw new IOException("No Count header")
byte [] personaBytes = Base64.decode(headers['Sender'])
Persona sender = new Persona(new ByteArrayInputStream(personaBytes))
if (sender.destination != e.getDestination())
throw new IOException("Sender destination mismatch expected ${e.getDestination()}, got $sender.destination")
int nResults = Integer.parseInt(headers['Count'])
if (nResults > Constants.MAX_RESULTS)
throw new IOException("too many results $nResults")
dis = new DataInputStream(new GZIPInputStream(dis))
UIResultEvent[] results = new UIResultEvent[nResults]
for (int i = 0; i < nResults; i++) {
int jsonSize = dis.readUnsignedShort()
byte [] payload = new byte[jsonSize]
dis.readFully(payload)
def json = slurper.parse(payload)
results[i] = ResultsParser.parse(sender, resultsUUID, json)
}
eventBus.publish(new UIResultBatchEvent(uuid: resultsUUID, results: results))
} catch (IOException bad) {
log.log(Level.WARNING, "failed to process RESULTS", bad)
} finally {
e.close()
}
}
private void processBROWSE(Endpoint e) { private void processBROWSE(Endpoint e) {
try { try {
byte [] rowse = new byte[7] byte [] rowse = new byte[7]

View File

@@ -14,6 +14,7 @@ import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import java.util.logging.Level import java.util.logging.Level
import java.util.stream.Collectors import java.util.stream.Collectors
import java.util.zip.GZIPOutputStream
import com.muwire.core.DownloadedFile import com.muwire.core.DownloadedFile
import com.muwire.core.EventBus import com.muwire.core.EventBus
@@ -53,7 +54,7 @@ class ResultsSender {
this.settings = settings this.settings = settings
} }
void sendResults(UUID uuid, SharedFile[] results, Destination target, boolean oobInfohash) { void sendResults(UUID uuid, SharedFile[] results, Destination target, boolean oobInfohash, boolean compressedResults) {
log.info("Sending $results.length results for uuid $uuid to ${target.toBase32()} oobInfohash : $oobInfohash") log.info("Sending $results.length results for uuid $uuid to ${target.toBase32()} oobInfohash : $oobInfohash")
if (target.equals(me.destination)) { if (target.equals(me.destination)) {
results.each { results.each {
@@ -81,7 +82,7 @@ class ResultsSender {
} }
} else { } else {
executor.execute(new ResultSendJob(uuid : uuid, results : results, executor.execute(new ResultSendJob(uuid : uuid, results : results,
target: target, oobInfohash : oobInfohash)) target: target, oobInfohash : oobInfohash, compressedResults : compressedResults))
} }
} }
@@ -90,27 +91,49 @@ class ResultsSender {
SharedFile [] results SharedFile [] results
Destination target Destination target
boolean oobInfohash boolean oobInfohash
boolean compressedResults
@Override @Override
public void run() { public void run() {
try { try {
JsonOutput jsonOutput = new JsonOutput() JsonOutput jsonOutput = new JsonOutput()
Endpoint endpoint = null; Endpoint endpoint = null;
try { if (!compressedResults) {
endpoint = connector.connect(target) try {
DataOutputStream os = new DataOutputStream(endpoint.getOutputStream()) endpoint = connector.connect(target)
os.write("POST $uuid\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) DataOutputStream os = new DataOutputStream(endpoint.getOutputStream())
me.write(os) os.write("POST $uuid\r\n\r\n".getBytes(StandardCharsets.US_ASCII))
os.writeShort((short)results.length) me.write(os)
results.each { os.writeShort((short)results.length)
def obj = sharedFileToObj(it, settings.browseFiles) results.each {
def json = jsonOutput.toJson(obj) def obj = sharedFileToObj(it, settings.browseFiles)
os.writeShort((short)json.length()) def json = jsonOutput.toJson(obj)
os.write(json.getBytes(StandardCharsets.US_ASCII)) os.writeShort((short)json.length())
os.write(json.getBytes(StandardCharsets.US_ASCII))
}
os.flush()
} finally {
endpoint?.close()
}
} else {
try {
endpoint = connector.connect(target)
OutputStream os = endpoint.getOutputStream()
os.write("RESULTS $uuid\r\n".getBytes(StandardCharsets.US_ASCII))
os.write("Sender: ${me.toBase64()}\r\n".getBytes(StandardCharsets.US_ASCII))
os.write("Count: $results.length\r\n".getBytes(StandardCharsets.US_ASCII))
os.write("\r\n".getBytes(StandardCharsets.US_ASCII))
DataOutputStream dos = new DataOutputStream(new GZIPOutputStream(os))
results.each {
def obj = sharedFileToObj(it, settings.browseFiles)
def json = jsonOutput.toJson(obj)
dos.writeShort((short)json.length())
dos.write(json.getBytes(StandardCharsets.US_ASCII))
}
dos.close()
} finally {
endpoint?.close()
} }
os.flush()
} finally {
endpoint?.close()
} }
} catch (Exception e) { } catch (Exception e) {
log.log(Level.WARNING, "problem sending results",e) log.log(Level.WARNING, "problem sending results",e)

View File

@@ -10,11 +10,12 @@ class SearchEvent extends Event {
UUID uuid UUID uuid
boolean oobInfohash boolean oobInfohash
boolean searchComments boolean searchComments
boolean compressedResults
String toString() { String toString() {
def infoHash = null def infoHash = null
if (searchHash != null) if (searchHash != null)
infoHash = new InfoHash(searchHash) infoHash = new InfoHash(searchHash)
"searchTerms: $searchTerms searchHash:$infoHash, uuid:$uuid oobInfohash:$oobInfohash searchComments:$searchComments" "searchTerms: $searchTerms searchHash:$infoHash, uuid:$uuid oobInfohash:$oobInfohash searchComments:$searchComments compressedResults:$compressedResults"
} }
} }

View File

@@ -8,4 +8,6 @@ public class Constants {
public static final int MAX_HEADER_SIZE = 0x1 << 14; public static final int MAX_HEADER_SIZE = 0x1 << 14;
public static final int MAX_HEADERS = 16; public static final int MAX_HEADERS = 16;
public static final int MAX_RESULTS = 0x1 << 16
} }

View File

@@ -78,7 +78,7 @@ class MainFrameController {
def searchEvent def searchEvent
if (hashSearch) { if (hashSearch) {
searchEvent = new SearchEvent(searchHash : root, uuid : uuid, oobInfohash: true) searchEvent = new SearchEvent(searchHash : root, uuid : uuid, oobInfohash: true, compressedResults : true)
} else { } else {
// this can be improved a lot // this can be improved a lot
def replaced = search.toLowerCase().trim().replaceAll(SplitPattern.SPLIT_PATTERN, " ") def replaced = search.toLowerCase().trim().replaceAll(SplitPattern.SPLIT_PATTERN, " ")
@@ -86,7 +86,7 @@ class MainFrameController {
def nonEmpty = [] def nonEmpty = []
terms.each { if (it.length() > 0) nonEmpty << it } terms.each { if (it.length() > 0) nonEmpty << it }
searchEvent = new SearchEvent(searchTerms : nonEmpty, uuid : uuid, oobInfohash: true, searchEvent = new SearchEvent(searchTerms : nonEmpty, uuid : uuid, oobInfohash: true,
searchComments : core.muOptions.searchComments) searchComments : core.muOptions.searchComments, compressedResults : true)
} }
boolean firstHop = core.muOptions.allowUntrusted || core.muOptions.searchExtraHop boolean firstHop = core.muOptions.allowUntrusted || core.muOptions.searchExtraHop
core.eventBus.publish(new QueryEvent(searchEvent : searchEvent, firstHop : firstHop, core.eventBus.publish(new QueryEvent(searchEvent : searchEvent, firstHop : firstHop,