Compare commits

..

11 Commits

Author SHA1 Message Date
Zlatin Balevsky
c4f48c02b6 delete incomplete file on cancel 2019-06-17 12:33:44 +01:00
Zlatin Balevsky
c3d9e852ba separate incomplete files 2019-06-17 07:49:06 +01:00
Zlatin Balevsky
0db7077a45 Release 0.2.4 2019-06-17 03:22:52 +01:00
Zlatin Balevsky
614ecc85fe new piece selection logic to avoid high cpu bug 2019-06-17 03:21:37 +01:00
Zlatin Balevsky
af66a79376 fix sorting by progress 2019-06-17 00:56:16 +01:00
Zlatin Balevsky
465171c81d prevent multiple identical shared files 2019-06-17 00:38:05 +01:00
Zlatin Balevsky
b507361c58 close the file before marking pieces complete 2019-06-16 23:45:23 +01:00
Zlatin Balevsky
4d001ae74b thread-safe access to the pieces file 2019-06-16 22:56:09 +01:00
Zlatin Balevsky
36a6e2769f Release 0.2.3 2019-06-16 19:05:12 +01:00
Zlatin Balevsky
69eeb7d77a fix 2019-06-16 18:58:52 +01:00
Zlatin Balevsky
551982b72a batch results sent to the GUI to prevent freeze 2019-06-16 18:51:07 +01:00
13 changed files with 159 additions and 92 deletions

View File

@@ -34,7 +34,7 @@ class Cli {
Core core
try {
core = new Core(props, home, "0.2.2")
core = new Core(props, home, "0.2.4")
} catch (Exception bad) {
bad.printStackTrace(System.out)
println "Failed to initialize core, exiting"

View File

@@ -53,7 +53,7 @@ class CliDownloader {
Core core
try {
core = new Core(props, home, "0.2.2")
core = new Core(props, home, "0.2.4")
} catch (Exception bad) {
bad.printStackTrace(System.out)
println "Failed to initialize core, exiting"

View File

@@ -268,7 +268,7 @@ public class Core {
}
}
Core core = new Core(props, home, "0.2.2")
Core core = new Core(props, home, "0.2.4")
core.startServices()
// ... at the end, sleep or execute script

View File

@@ -17,6 +17,8 @@ import com.muwire.core.upload.UploadManager
import com.muwire.core.search.InvalidSearchResultException
import com.muwire.core.search.ResultsParser
import com.muwire.core.search.SearchManager
import com.muwire.core.search.UIResultBatchEvent
import com.muwire.core.search.UIResultEvent
import com.muwire.core.search.UnexpectedResultsException
import groovy.json.JsonOutput
@@ -225,13 +227,15 @@ class ConnectionAcceptor {
if (sender.destination != e.getDestination())
throw new IOException("Sender destination mismatch expected $e.getDestination(), got $sender.destination")
int nResults = dis.readUnsignedShort()
UIResultEvent[] results = new UIResultEvent[nResults]
for (int i = 0; i < nResults; i++) {
int jsonSize = dis.readUnsignedShort()
byte [] payload = new byte[jsonSize]
dis.readFully(payload)
def json = slurper.parse(payload)
eventBus.publish(ResultsParser.parse(sender, resultsUUID, json))
results[i] = ResultsParser.parse(sender, resultsUUID, json)
}
eventBus.publish(new UIResultBatchEvent(uuid: resultsUUID, results: results))
} catch (IOException | UnexpectedResultsException | InvalidSearchResultException bad) {
log.log(Level.WARNING, "failed to process POST", bad)
} finally {

View File

@@ -16,6 +16,7 @@ import java.nio.file.Files
import java.nio.file.StandardOpenOption
import java.security.MessageDigest
import java.security.NoSuchAlgorithmException
import java.util.logging.Level
@Log
class DownloadSession {
@@ -23,7 +24,7 @@ class DownloadSession {
private static int SAMPLES = 10
private final String meB64
private final Pieces downloaded, claimed
private final Pieces pieces
private final InfoHash infoHash
private final Endpoint endpoint
private final File file
@@ -36,11 +37,10 @@ class DownloadSession {
private ByteBuffer mapped
DownloadSession(String meB64, Pieces downloaded, Pieces claimed, InfoHash infoHash, Endpoint endpoint, File file,
DownloadSession(String meB64, Pieces pieces, InfoHash infoHash, Endpoint endpoint, File file,
int pieceSize, long fileLength) {
this.meB64 = meB64
this.downloaded = downloaded
this.claimed = claimed
this.pieces = pieces
this.endpoint = endpoint
this.infoHash = infoHash
this.file = file
@@ -63,20 +63,11 @@ class DownloadSession {
OutputStream os = endpoint.getOutputStream()
InputStream is = endpoint.getInputStream()
int piece
while(true) {
piece = downloaded.getRandomPiece()
if (claimed.isMarked(piece)) {
if (downloaded.donePieces() + claimed.donePieces() == downloaded.nPieces) {
log.info("all pieces claimed")
return false
}
continue
}
break
}
claimed.markDownloaded(piece)
int piece = pieces.claim()
if (piece == -1)
return false
boolean unclaim = true
log.info("will download piece $piece")
long start = piece * pieceSize
@@ -85,7 +76,6 @@ class DownloadSession {
String root = Base64.encode(infoHash.getRoot())
FileChannel channel
try {
os.write("GET $root\r\n".getBytes(StandardCharsets.US_ASCII))
os.write("Range: $start-$end\r\n".getBytes(StandardCharsets.US_ASCII))
@@ -135,41 +125,46 @@ class DownloadSession {
}
// start the download
channel = Files.newByteChannel(file.toPath(), EnumSet.of(StandardOpenOption.READ, StandardOpenOption.WRITE,
StandardOpenOption.SPARSE, StandardOpenOption.CREATE)) // TODO: double-check, maybe CREATE_NEW
mapped = channel.map(FileChannel.MapMode.READ_WRITE, start, end - start + 1)
byte[] tmp = new byte[0x1 << 13]
while(mapped.hasRemaining()) {
if (mapped.remaining() < tmp.length)
tmp = new byte[mapped.remaining()]
int read = is.read(tmp)
if (read == -1)
throw new IOException()
synchronized(this) {
mapped.put(tmp, 0, read)
if (timestamps.size() == SAMPLES) {
timestamps.removeFirst()
reads.removeFirst()
FileChannel channel
try {
channel = Files.newByteChannel(file.toPath(), EnumSet.of(StandardOpenOption.READ, StandardOpenOption.WRITE,
StandardOpenOption.SPARSE, StandardOpenOption.CREATE)) // TODO: double-check, maybe CREATE_NEW
mapped = channel.map(FileChannel.MapMode.READ_WRITE, start, end - start + 1)
byte[] tmp = new byte[0x1 << 13]
while(mapped.hasRemaining()) {
if (mapped.remaining() < tmp.length)
tmp = new byte[mapped.remaining()]
int read = is.read(tmp)
if (read == -1)
throw new IOException()
synchronized(this) {
mapped.put(tmp, 0, read)
if (timestamps.size() == SAMPLES) {
timestamps.removeFirst()
reads.removeFirst()
}
timestamps.addLast(System.currentTimeMillis())
reads.addLast(read)
}
timestamps.addLast(System.currentTimeMillis())
reads.addLast(read)
}
mapped.clear()
digest.update(mapped)
byte [] hash = digest.digest()
byte [] expected = new byte[32]
System.arraycopy(infoHash.getHashList(), piece * 32, expected, 0, 32)
if (hash != expected)
throw new BadHashException()
} finally {
try { channel?.close() } catch (IOException ignore) {}
}
mapped.clear()
digest.update(mapped)
byte [] hash = digest.digest()
byte [] expected = new byte[32]
System.arraycopy(infoHash.getHashList(), piece * 32, expected, 0, 32)
if (hash != expected)
throw new BadHashException()
downloaded.markDownloaded(piece)
pieces.markDownloaded(piece)
unclaim = false
} finally {
claimed.clear(piece)
try { channel?.close() } catch (IOException ignore) {}
if (unclaim)
pieces.unclaim(piece)
}
return true
}

View File

@@ -4,9 +4,13 @@ import com.muwire.core.InfoHash
import com.muwire.core.Persona
import com.muwire.core.connection.Endpoint
import java.nio.file.AtomicMoveNotSupportedException
import java.nio.file.Files
import java.nio.file.StandardCopyOption
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean
import java.util.logging.Level
import com.muwire.core.Constants
@@ -34,7 +38,7 @@ public class Downloader {
private final DownloadManager downloadManager
private final Persona me
private final File file
private final Pieces downloaded, claimed
private final Pieces pieces
private final long length
private InfoHash infoHash
private final int pieceSize
@@ -42,12 +46,14 @@ public class Downloader {
private final Set<Destination> destinations
private final int nPieces
private final File piecesFile
private final File incompleteFile
final int pieceSizePow2
private final Map<Destination, DownloadWorker> activeWorkers = new ConcurrentHashMap<>()
private volatile boolean cancelled
private volatile boolean eventFired
private final AtomicBoolean eventFired = new AtomicBoolean()
private boolean piecesFileClosed
public Downloader(EventBus eventBus, DownloadManager downloadManager,
Persona me, File file, long length, InfoHash infoHash,
@@ -62,6 +68,7 @@ public class Downloader {
this.connector = connector
this.destinations = destinations
this.piecesFile = new File(incompletes, file.getName()+".pieces")
this.incompleteFile = new File(incompletes, file.getName()+".part")
this.pieceSizePow2 = pieceSizePow2
this.pieceSize = 1 << pieceSizePow2
@@ -72,8 +79,7 @@ public class Downloader {
nPieces = length / pieceSize + 1
this.nPieces = nPieces
downloaded = new Pieces(nPieces, Constants.DOWNLOAD_SEQUENTIAL_RATIO)
claimed = new Pieces(nPieces)
pieces = new Pieces(nPieces, Constants.DOWNLOAD_SEQUENTIAL_RATIO)
}
public synchronized InfoHash getInfoHash() {
@@ -100,20 +106,24 @@ public class Downloader {
return
piecesFile.eachLine {
int piece = Integer.parseInt(it)
downloaded.markDownloaded(piece)
pieces.markDownloaded(piece)
}
}
void writePieces() {
piecesFile.withPrintWriter { writer ->
downloaded.getDownloaded().each { piece ->
writer.println(piece)
synchronized(piecesFile) {
if (piecesFileClosed)
return
piecesFile.withPrintWriter { writer ->
pieces.getDownloaded().each { piece ->
writer.println(piece)
}
}
}
}
public long donePieces() {
downloaded.donePieces()
pieces.donePieces()
}
@@ -136,7 +146,7 @@ public class Downloader {
allFinished &= it.currentState == WorkerState.FINISHED
}
if (allFinished) {
if (downloaded.isComplete())
if (pieces.isComplete())
return DownloadState.FINISHED
return DownloadState.FAILED
}
@@ -170,8 +180,11 @@ public class Downloader {
public void cancel() {
cancelled = true
stop()
file.delete()
piecesFile.delete()
synchronized(piecesFile) {
piecesFileClosed = true
piecesFile.delete()
}
incompleteFile.delete()
}
void stop() {
@@ -231,8 +244,8 @@ public class Downloader {
}
currentState = WorkerState.DOWNLOADING
boolean requestPerformed
while(!downloaded.isComplete()) {
currentSession = new DownloadSession(me.toBase64(), downloaded, claimed, getInfoHash(), endpoint, file, pieceSize, length)
while(!pieces.isComplete()) {
currentSession = new DownloadSession(me.toBase64(), pieces, getInfoHash(), endpoint, incompleteFile, pieceSize, length)
requestPerformed = currentSession.request()
if (!requestPerformed)
break
@@ -242,9 +255,17 @@ public class Downloader {
log.log(Level.WARNING,"Exception while downloading",bad)
} finally {
currentState = WorkerState.FINISHED
if (downloaded.isComplete() && !eventFired) {
piecesFile.delete()
eventFired = true
if (pieces.isComplete() && eventFired.compareAndSet(false, true)) {
synchronized(piecesFile) {
piecesFileClosed = true
piecesFile.delete()
}
try {
Files.move(incompleteFile.toPath(), file.toPath(), StandardCopyOption.ATOMIC_MOVE)
} catch (AtomicMoveNotSupportedException e) {
Files.copy(incompleteFile.toPath(), file.toPath(), StandardCopyOption.REPLACE_EXISTING)
incompleteFile.delete()
}
eventBus.publish(
new FileDownloadedEvent(
downloadedFile : new DownloadedFile(file, getInfoHash(), pieceSizePow2, Collections.emptySet()),

View File

@@ -1,7 +1,7 @@
package com.muwire.core.download
class Pieces {
private final BitSet bitSet
private final BitSet done, claimed
private final int nPieces
private final float ratio
private final Random random = new Random()
@@ -13,52 +13,53 @@ class Pieces {
Pieces(int nPieces, float ratio) {
this.nPieces = nPieces
this.ratio = ratio
bitSet = new BitSet(nPieces)
done = new BitSet(nPieces)
claimed = new BitSet(nPieces)
}
synchronized int getRandomPiece() {
int cardinality = bitSet.cardinality()
if (cardinality == nPieces)
synchronized int claim() {
int claimedCardinality = claimed.cardinality()
if (claimedCardinality == nPieces)
return -1
// if fuller than ratio just do sequential
if ( (1.0f * cardinality) / nPieces > ratio) {
return bitSet.nextClearBit(0)
if ( (1.0f * claimedCardinality) / nPieces > ratio) {
int rv = claimed.nextClearBit(0)
claimed.set(rv)
return rv
}
while(true) {
int start = random.nextInt(nPieces)
if (bitSet.get(start))
if (claimed.get(start))
continue
claimed.set(start)
return start
}
}
def getDownloaded() {
synchronized def getDownloaded() {
def rv = []
for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i+1)) {
for (int i = done.nextSetBit(0); i >= 0; i = done.nextSetBit(i+1)) {
rv << i
}
rv
}
synchronized void markDownloaded(int piece) {
bitSet.set(piece)
done.set(piece)
claimed.set(piece)
}
synchronized void clear(int piece) {
bitSet.clear(piece)
synchronized void unclaim(int piece) {
claimed.clear(piece)
}
synchronized boolean isComplete() {
bitSet.cardinality() == nPieces
}
synchronized boolean isMarked(int piece) {
bitSet.get(piece)
done.cardinality() == nPieces
}
synchronized int donePieces() {
bitSet.cardinality()
done.cardinality()
}
}

View File

@@ -0,0 +1,8 @@
package com.muwire.core.search
import com.muwire.core.Event
class UIResultBatchEvent extends Event {
UUID uuid
UIResultEvent[] results
}

View File

@@ -25,4 +25,17 @@ public class SharedFile {
public int getPieceSize() {
return pieceSize;
}
@Override
public int hashCode() {
return file.hashCode() ^ infoHash.hashCode();
}
@Override
public boolean equals(Object o) {
if (!(o instanceof SharedFile))
return false;
SharedFile other = (SharedFile)o;
return file.equals(other.file) && infoHash.equals(other.infoHash);
}
}

View File

@@ -1,5 +1,5 @@
group = com.muwire
version = 0.2.2
version = 0.2.4
groovyVersion = 2.4.15
slf4jVersion = 1.7.25
spockVersion = 1.1-groovy-2.4

View File

@@ -20,6 +20,7 @@ import com.muwire.core.files.FileHashedEvent
import com.muwire.core.files.FileLoadedEvent
import com.muwire.core.files.FileSharedEvent
import com.muwire.core.search.QueryEvent
import com.muwire.core.search.UIResultBatchEvent
import com.muwire.core.search.UIResultEvent
import com.muwire.core.trust.TrustEvent
import com.muwire.core.trust.TrustService
@@ -115,6 +116,7 @@ class MainFrameModel {
core = e.getNewValue()
me = core.me.getHumanReadableName()
core.eventBus.register(UIResultEvent.class, this)
core.eventBus.register(UIResultBatchEvent.class, this)
core.eventBus.register(DownloadStartedEvent.class, this)
core.eventBus.register(ConnectionEvent.class, this)
core.eventBus.register(DisconnectionEvent.class, this)
@@ -161,6 +163,11 @@ class MainFrameModel {
resultsGroup?.model.handleResult(e)
}
void onUIResultBatchEvent(UIResultBatchEvent e) {
MVCGroup resultsGroup = results.get(e.uuid)
resultsGroup?.model.handleResultBatch(e.results)
}
void onDownloadStartedEvent(DownloadStartedEvent e) {
runInsideUIAsync {
downloads << e

View File

@@ -52,4 +52,22 @@ class SearchTabModel {
table.model.fireTableDataChanged()
}
}
void handleResultBatch(UIResultEvent[] batch) {
runInsideUIAsync {
batch.each {
if (uiSettings.excludeLocalResult && it.sender == core.me)
return
def bucket = hashBucket.get(it.infohash)
if (bucket == null) {
bucket = []
hashBucket[it.infohash] = bucket
}
bucket << it
results << it
}
JTable table = builder.getVariable("results-table")
table.model.fireTableDataChanged()
}
}
}

View File

@@ -120,7 +120,7 @@ class MainFrameView {
closureColumn(header: "Progress", preferredWidth: 20, type: String, read: { row ->
int pieces = row.downloader.nPieces
int done = row.downloader.donePieces()
"$done/$pieces pieces"
"$done/$pieces pieces".toString()
})
closureColumn(header: "Sources", preferredWidth : 10, type: Integer, read : {row -> row.downloader.activeWorkers()})
closureColumn(header: "Speed", preferredWidth: 50, type:String, read :{row ->