* i2psnark:

- Store received chunks in temp files
   - Don't allocate from heap for unneeded chunks
   - Remove peer count restriction for torrents with large pieces
   - Use priorities and rarest calculations to sort partials
   - Preserve p parameter in clear messages link
This commit is contained in:
zzz
2012-05-19 13:27:02 +00:00
parent 3685bf04d0
commit 6ca4b519bf
15 changed files with 430 additions and 166 deletions

View File

@@ -184,6 +184,9 @@ public class I2PSnarkUtil {
/** @since 0.8.9 */ /** @since 0.8.9 */
public void setFilesPublic(boolean yes) { _areFilesPublic = yes; } public void setFilesPublic(boolean yes) { _areFilesPublic = yes; }
/** @since 0.9.1 */
public File getTempDir() { return _tmpDir; }
/** /**
* Connect to the router, if we aren't already * Connect to the router, if we aren't already
*/ */

View File

@@ -423,6 +423,20 @@ public class MetaInfo
return true; return true;
} }
/**
* @since 0.9.1
*/
boolean checkPiece(PartialPiece pp) throws IOException {
MessageDigest sha1 = SHA1.getInstance();
int piece = pp.getPiece();
byte[] hash = pp.getHash();
for (int i = 0; i < 20; i++)
if (hash[i] != piece_hashes[20 * piece + i])
return false;
return true;
}
/** /**
* Returns the total length of the torrent in bytes. * Returns the total length of the torrent in bytes.
*/ */

View File

@@ -1,6 +1,22 @@
package org.klomp.snark; package org.klomp.snark;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.security.MessageDigest;
import net.i2p.I2PAppContext;
import net.i2p.crypto.SHA1;
import net.i2p.util.Log;
import net.i2p.util.SecureFile;
/** /**
* Store the received data either on the heap or in a temp file.
* The third option, to write chunks directly to the destination file,
* is unimplemented.
*
* This is the class passed from PeerCoordinator to PeerState so * This is the class passed from PeerCoordinator to PeerState so
* PeerState may start requests. * PeerState may start requests.
* *
@@ -8,45 +24,81 @@ package org.klomp.snark;
* a piece is not completely downloaded, for example * a piece is not completely downloaded, for example
* when the Peer disconnects or chokes. * when the Peer disconnects or chokes.
* *
* New objects for the same piece are created during the end game -
* this object should not be shared among multiple peers.
*
* @since 0.8.2 * @since 0.8.2
*/ */
class PartialPiece implements Comparable { class PartialPiece implements Comparable {
private final int piece; // we store the piece so we can use it in compareTo()
private final Piece piece;
// null if using temp file
private final byte[] bs; private final byte[] bs;
private final int off; private int off;
private final long createdTime; //private final long createdTime;
private File tempfile;
private RandomAccessFile raf;
private final int pclen;
private final File tempDir;
// Any bigger than this, use temp file instead of heap
private static final int MAX_IN_MEM = 128 * 1024;
// May be reduced on OOM
private static int _max_in_mem = MAX_IN_MEM;
/** /**
* Used by PeerCoordinator. * Used by PeerCoordinator.
* Creates a new PartialPiece, with no chunks yet downloaded. * Creates a new PartialPiece, with no chunks yet downloaded.
* Allocates the data. * Allocates the data storage area, either on the heap or in the
* temp directory, depending on size.
* *
* @param piece Piece number requested. * @param piece Piece number requested.
* @param len must be equal to the piece length * @param len must be equal to the piece length
*/ */
public PartialPiece (int piece, int len) throws OutOfMemoryError { public PartialPiece (Piece piece, int len, File tempDir) {
this.piece = piece; this.piece = piece;
this.bs = new byte[len]; this.pclen = len;
this.off = 0; //this.createdTime = 0;
this.createdTime = 0; this.tempDir = tempDir;
// temps for finals
byte[] tbs = null;
try {
if (len <= MAX_IN_MEM) {
try {
tbs = new byte[len];
return;
} catch (OutOfMemoryError oom) {
if (_max_in_mem > PeerState.PARTSIZE)
_max_in_mem /= 2;
Log log = I2PAppContext.getGlobalContext().logManager().getLog(PartialPiece.class);
log.logAlways(Log.WARN, "OOM creating new partial piece");
// fall through to use temp file
}
}
// delay creating temp file until required in read()
} finally {
// finals
this.bs = tbs;
}
} }
/** /**
* Used by PeerState. * Caller must synchronize
* Creates a new PartialPiece, with chunks up to but not including
* firstOutstandingRequest already downloaded and stored in the Request byte array.
* *
* Note that this cannot handle gaps; chunks after a missing chunk cannot be saved. * @since 0.9.1
* That would be harder.
*
* @param firstOutstandingRequest the first request not fulfilled for the piece
*/ */
public PartialPiece (Request firstOutstandingRequest) { private void createTemp() throws IOException {
this.piece = firstOutstandingRequest.piece; //tfile = SecureFile.createTempFile("piece", null, tempDir);
this.bs = firstOutstandingRequest.bs; // debug
this.off = firstOutstandingRequest.off; tempfile = SecureFile.createTempFile("piece_" + piece.getId() + '_', null, tempDir);
this.createdTime = System.currentTimeMillis(); //I2PAppContext.getGlobalContext().logManager().getLog(PartialPiece.class).warn("Created " + tempfile);
// tfile.deleteOnExit() ???
raf = new RandomAccessFile(tempfile, "rw");
// Do not preallocate the file space.
// Not necessary to call setLength(), file is extended when written
//traf.setLength(len);
} }
/** /**
@@ -55,33 +107,168 @@ class PartialPiece implements Comparable {
*/ */
public Request getRequest() { public Request getRequest() {
return new Request(this.piece, this.bs, this.off, Math.min(this.bs.length - this.off, PeerState.PARTSIZE)); return new Request(this, this.off, Math.min(this.pclen - this.off, PeerState.PARTSIZE));
} }
/** piece number */ /** piece number */
public int getPiece() { public int getPiece() {
return this.piece; return this.piece.getId();
} }
/** how many bytes are good */ /**
* @since 0.9.1
*/
public int getLength() {
return this.pclen;
}
/**
* How many bytes are good - only valid by setDownloaded()
*/
public int getDownloaded() { public int getDownloaded() {
return this.off; return this.off;
} }
/**
* Call this before returning a PartialPiece to the PeerCoordinator
* @since 0.9.1
*/
public void setDownloaded(int offset) {
this.off = offset;
}
/****
public long getCreated() { public long getCreated() {
return this.createdTime; return this.createdTime;
} }
****/
/** /**
* Highest downloaded first * Piece must be complete.
* The SHA1 hash of the completely read data.
* @since 0.9.1
*/
public byte[] getHash() throws IOException {
MessageDigest sha1 = SHA1.getInstance();
if (bs != null) {
sha1.update(bs);
} else {
int read = 0;
byte[] buf = new byte[Math.min(pclen, 16384)];
synchronized (this) {
if (raf == null)
throw new IOException();
raf.seek(0);
while (read < pclen) {
int rd = raf.read(buf, 0, Math.min(buf.length, pclen - read));
if (rd < 0)
break;
read += rd;
sha1.update(buf, 0, rd);
}
}
if (read < pclen)
throw new IOException();
}
return sha1.digest();
}
/**
* Blocking.
* @since 0.9.1
*/
public void read(DataInputStream din, int off, int len) throws IOException {
if (bs != null) {
din.readFully(bs, off, len);
} else {
// read in fully before synching on raf
byte[] tmp = new byte[len];
din.readFully(tmp);
synchronized (this) {
if (raf == null)
createTemp();
raf.seek(off);
raf.write(tmp);
}
}
}
/**
* Piece must be complete.
* Caller must synchronize on out and seek to starting point.
* Caller must call release() when done with the whole piece.
*
* @param start offset in the output file
* @param offset offset in the piece
* @param len length to write
* @since 0.9.1
*/
public void write(DataOutput out, int offset, int len) throws IOException {
if (bs != null) {
out.write(bs, offset, len);
} else {
int read = 0;
byte[] buf = new byte[Math.min(len, 16384)];
synchronized (this) {
if (raf == null)
throw new IOException();
raf.seek(offset);
while (read < len) {
int rd = Math.min(buf.length, len - read);
raf.readFully(buf, 0, rd);
read += rd;
out.write(buf, 0, rd);
}
}
}
}
/**
* Release all resources.
*
* @since 0.9.1
*/
public void release() {
if (bs == null) {
synchronized (this) {
if (raf != null)
locked_release();
}
//if (raf != null)
// I2PAppContext.getGlobalContext().logManager().getLog(PartialPiece.class).warn("Released " + tempfile);
}
}
/**
* Caller must synchronize
*
* @since 0.9.1
*/
private void locked_release() {
try {
raf.close();
} catch (IOException ioe) {
I2PAppContext.getGlobalContext().logManager().getLog(PartialPiece.class).warn("Error closing " + raf, ioe);
}
tempfile.delete();
}
/*
* Highest priority first,
* then rarest first,
* then highest downloaded first
*/ */
public int compareTo(Object o) throws ClassCastException { public int compareTo(Object o) throws ClassCastException {
return ((PartialPiece)o).off - this.off; // reverse PartialPiece opp = (PartialPiece)o;
int d = this.piece.compareTo(opp.piece);
if (d != 0)
return d;
return opp.off - this.off; // reverse
} }
@Override @Override
public int hashCode() { public int hashCode() {
return piece * 7777; return piece.getId() * 7777;
} }
/** /**
@@ -92,13 +279,13 @@ class PartialPiece implements Comparable {
public boolean equals(Object o) { public boolean equals(Object o) {
if (o instanceof PartialPiece) { if (o instanceof PartialPiece) {
PartialPiece pp = (PartialPiece)o; PartialPiece pp = (PartialPiece)o;
return pp.piece == this.piece; return pp.piece.getId() == this.piece.getId();
} }
return false; return false;
} }
@Override @Override
public String toString() { public String toString() {
return "Partial(" + piece + ',' + off + ',' + bs.length + ')'; return "Partial(" + piece.getId() + ',' + off + ',' + pclen + ')';
} }
} }

View File

@@ -460,7 +460,7 @@ public class Peer implements Comparable
if (this.deregister) { if (this.deregister) {
PeerListener p = s.listener; PeerListener p = s.listener;
if (p != null) { if (p != null) {
List<PartialPiece> pcs = s.returnPartialPieces(); List<Request> pcs = s.returnPartialPieces();
if (!pcs.isEmpty()) if (!pcs.isEmpty())
p.savePartialPieces(this, pcs); p.savePartialPieces(this, pcs);
// now covered by savePartialPieces // now covered by savePartialPieces

View File

@@ -148,11 +148,9 @@ class PeerConnectionIn implements Runnable
begin = din.readInt(); begin = din.readInt();
len = i-9; len = i-9;
Request req = ps.getOutstandingRequest(piece, begin, len); Request req = ps.getOutstandingRequest(piece, begin, len);
byte[] piece_bytes;
if (req != null) if (req != null)
{ {
piece_bytes = req.bs; req.read(din);
din.readFully(piece_bytes, begin, len);
ps.pieceMessage(req); ps.pieceMessage(req);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Received data(" + piece + "," + begin + ") from " + peer); _log.debug("Received data(" + piece + "," + begin + ") from " + peer);
@@ -160,8 +158,9 @@ class PeerConnectionIn implements Runnable
else else
{ {
// XXX - Consume but throw away afterwards. // XXX - Consume but throw away afterwards.
piece_bytes = new byte[len]; int rcvd = din.skipBytes(len);
din.readFully(piece_bytes); if (rcvd != len)
throw new IOException("EOF reading unwanted data");
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Received UNWANTED data(" + piece + "," + begin + ") from " + peer); _log.debug("Received UNWANTED data(" + piece + "," + begin + ") from " + peer);
} }

View File

@@ -395,7 +395,7 @@ class PeerConnectionOut implements Runnable
while (it.hasNext()) while (it.hasNext())
{ {
Message m = (Message)it.next(); Message m = (Message)it.next();
if (m.type == Message.REQUEST && m.piece == req.piece && if (m.type == Message.REQUEST && m.piece == req.getPiece() &&
m.begin == req.off && m.length == req.len) m.begin == req.off && m.length == req.len)
{ {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@@ -406,7 +406,7 @@ class PeerConnectionOut implements Runnable
} }
Message m = new Message(); Message m = new Message();
m.type = Message.REQUEST; m.type = Message.REQUEST;
m.piece = req.piece; m.piece = req.getPiece();
m.begin = req.off; m.begin = req.off;
m.length = req.len; m.length = req.len;
addMessage(m); addMessage(m);
@@ -492,7 +492,7 @@ class PeerConnectionOut implements Runnable
{ {
Message m = (Message)it.next(); Message m = (Message)it.next();
if (m.type == Message.REQUEST if (m.type == Message.REQUEST
&& m.piece == req.piece && m.piece == req.getPiece()
&& m.begin == req.off && m.begin == req.off
&& m.length == req.len) && m.length == req.len)
it.remove(); it.remove();
@@ -502,7 +502,7 @@ class PeerConnectionOut implements Runnable
// Always send, just to be sure it it is really canceled. // Always send, just to be sure it it is really canceled.
Message m = new Message(); Message m = new Message();
m.type = Message.CANCEL; m.type = Message.CANCEL;
m.piece = req.piece; m.piece = req.getPiece();
m.begin = req.off; m.begin = req.off;
m.length = req.len; m.length = req.len;
addMessage(m); addMessage(m);

View File

@@ -116,7 +116,7 @@ public class PeerCoordinator implements PeerListener
*/ */
private final List<Piece> wantedPieces; private final List<Piece> wantedPieces;
/** partial pieces - lock by synching on wantedPieces */ /** partial pieces - lock by synching on wantedPieces - TODO store Requests, not PartialPieces */
private final List<PartialPiece> partialPieces; private final List<PartialPiece> partialPieces;
private boolean halted = false; private boolean halted = false;
@@ -349,11 +349,12 @@ public class PeerCoordinator implements PeerListener
return 6; return 6;
int size = metainfo.getPieceLength(0); int size = metainfo.getPieceLength(0);
int max = _util.getMaxConnections(); int max = _util.getMaxConnections();
if (size <= 512*1024 || completed()) // Now that we use temp files, no memory concern
//if (size <= 512*1024 || completed())
return max; return max;
if (size <= 1024*1024) //if (size <= 1024*1024)
return (max + max + 2) / 3; // return (max + max + 2) / 3;
return (max + 2) / 3; //return (max + 2) / 3;
} }
public boolean halted() { return halted; } public boolean halted() { return halted; }
@@ -380,6 +381,9 @@ public class PeerCoordinator implements PeerListener
} }
// delete any saved orphan partial piece // delete any saved orphan partial piece
synchronized (partialPieces) { synchronized (partialPieces) {
for (PartialPiece pp : partialPieces) {
pp.release();
}
partialPieces.clear(); partialPieces.clear();
} }
} }
@@ -630,22 +634,23 @@ public class PeerCoordinator implements PeerListener
* -1 if none of the given pieces are wanted. * -1 if none of the given pieces are wanted.
*/ */
public int wantPiece(Peer peer, BitField havePieces) { public int wantPiece(Peer peer, BitField havePieces) {
return wantPiece(peer, havePieces, true); Piece pc = wantPiece(peer, havePieces, true);
return pc != null ? pc.getId() : -1;
} }
/** /**
* Returns one of pieces in the given BitField that is still wanted or * Returns one of pieces in the given BitField that is still wanted or
* -1 if none of the given pieces are wanted. * null if none of the given pieces are wanted.
* *
* @param record if true, actually record in our data structures that we gave the * @param record if true, actually record in our data structures that we gave the
* request to this peer. If false, do not update the data structures. * request to this peer. If false, do not update the data structures.
* @since 0.8.2 * @since 0.8.2
*/ */
private int wantPiece(Peer peer, BitField havePieces, boolean record) { private Piece wantPiece(Peer peer, BitField havePieces, boolean record) {
if (halted) { if (halted) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("We don't want anything from the peer, as we are halted! peer=" + peer); _log.warn("We don't want anything from the peer, as we are halted! peer=" + peer);
return -1; return null;
} }
Piece piece = null; Piece piece = null;
@@ -680,7 +685,7 @@ public class PeerCoordinator implements PeerListener
// If we do end game all the time, we generate lots of extra traffic // If we do end game all the time, we generate lots of extra traffic
// when the seeder is super-slow and all the peers are "caught up" // when the seeder is super-slow and all the peers are "caught up"
if (wantedSize > END_GAME_THRESHOLD) if (wantedSize > END_GAME_THRESHOLD)
return -1; // nothing to request and not in end game return null; // nothing to request and not in end game
// let's not all get on the same piece // let's not all get on the same piece
// Even better would be to sort by number of requests // Even better would be to sort by number of requests
if (record) if (record)
@@ -704,7 +709,7 @@ public class PeerCoordinator implements PeerListener
_log.warn("nothing to even rerequest from " + peer + ": requested = " + requested); _log.warn("nothing to even rerequest from " + peer + ": requested = " + requested);
// _log.warn("nothing to even rerequest from " + peer + ": requested = " + requested // _log.warn("nothing to even rerequest from " + peer + ": requested = " + requested
// + " wanted = " + wantedPieces + " peerHas = " + havePieces); // + " wanted = " + wantedPieces + " peerHas = " + havePieces);
return -1; //If we still can't find a piece we want, so be it. return null; //If we still can't find a piece we want, so be it.
} else { } else {
// Should be a lot smarter here - // Should be a lot smarter here -
// share blocks rather than starting from 0 with each peer. // share blocks rather than starting from 0 with each peer.
@@ -719,7 +724,7 @@ public class PeerCoordinator implements PeerListener
_log.info(peer + " is now requesting: piece " + piece + " priority " + piece.getPriority()); _log.info(peer + " is now requesting: piece " + piece + " priority " + piece.getPriority());
piece.setRequested(peer, true); piece.setRequested(peer, true);
} }
return piece.getId(); return piece;
} // synch } // synch
} }
@@ -846,10 +851,11 @@ public class PeerCoordinator implements PeerListener
* *
* @throws RuntimeException on IOE saving the piece * @throws RuntimeException on IOE saving the piece
*/ */
public boolean gotPiece(Peer peer, int piece, byte[] bs) public boolean gotPiece(Peer peer, PartialPiece pp)
{ {
if (metainfo == null || storage == null) if (metainfo == null || storage == null)
return true; return true;
int piece = pp.getPiece();
if (halted) { if (halted) {
_log.info("Got while-halted piece " + piece + "/" + metainfo.getPieces() +" from " + peer + " for " + metainfo.getName()); _log.info("Got while-halted piece " + piece + "/" + metainfo.getPieces() +" from " + peer + " for " + metainfo.getName());
return true; // We don't actually care anymore. return true; // We don't actually care anymore.
@@ -872,7 +878,7 @@ public class PeerCoordinator implements PeerListener
try try
{ {
if (storage.putPiece(piece, bs)) if (storage.putPiece(pp))
{ {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Got valid piece " + piece + "/" + metainfo.getPieces() +" from " + peer + " for " + metainfo.getName()); _log.info("Got valid piece " + piece + "/" + metainfo.getPieces() +" from " + peer + " for " + metainfo.getName());
@@ -922,6 +928,15 @@ public class PeerCoordinator implements PeerListener
p.disconnect(true); p.disconnect(true);
} }
if (completed()) {
synchronized (partialPieces) {
for (PartialPiece ppp : partialPieces) {
ppp.release();
}
partialPieces.clear();
}
}
return true; return true;
} }
@@ -998,17 +1013,24 @@ public class PeerCoordinator implements PeerListener
* Also mark the piece unrequested if this peer was the only one. * Also mark the piece unrequested if this peer was the only one.
* *
* @param peer partials, must include the zero-offset (empty) ones too * @param peer partials, must include the zero-offset (empty) ones too
* No dup pieces, piece.setDownloaded() must be set
* @since 0.8.2 * @since 0.8.2
*/ */
public void savePartialPieces(Peer peer, List<PartialPiece> partials) public void savePartialPieces(Peer peer, List<Request> partials)
{ {
if (halted)
return;
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Partials received from " + peer + ": " + partials); _log.info("Partials received from " + peer + ": " + partials);
if (halted || completed()) {
for (Request req : partials) {
PartialPiece pp = req.getPartialPiece();
pp.release();
}
return;
}
synchronized(wantedPieces) { synchronized(wantedPieces) {
for (PartialPiece pp : partials) { for (Request req : partials) {
if (pp.getDownloaded() > 0) { PartialPiece pp = req.getPartialPiece();
if (req.off > 0) {
// PartialPiece.equals() only compares piece number, which is what we want // PartialPiece.equals() only compares piece number, which is what we want
int idx = partialPieces.indexOf(pp); int idx = partialPieces.indexOf(pp);
if (idx < 0) { if (idx < 0) {
@@ -1017,10 +1039,12 @@ public class PeerCoordinator implements PeerListener
_log.info("Saving orphaned partial piece (new) " + pp); _log.info("Saving orphaned partial piece (new) " + pp);
} else if (idx >= 0 && pp.getDownloaded() > partialPieces.get(idx).getDownloaded()) { } else if (idx >= 0 && pp.getDownloaded() > partialPieces.get(idx).getDownloaded()) {
// replace what's there now // replace what's there now
partialPieces.get(idx).release();
partialPieces.set(idx, pp); partialPieces.set(idx, pp);
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Saving orphaned partial piece (bigger) " + pp); _log.info("Saving orphaned partial piece (bigger) " + pp);
} else { } else {
pp.release();
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Discarding partial piece (not bigger)" + pp); _log.info("Discarding partial piece (not bigger)" + pp);
} }
@@ -1029,10 +1053,14 @@ public class PeerCoordinator implements PeerListener
// sorts by remaining bytes, least first // sorts by remaining bytes, least first
Collections.sort(partialPieces); Collections.sort(partialPieces);
PartialPiece gone = partialPieces.remove(max); PartialPiece gone = partialPieces.remove(max);
gone.release();
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Discarding orphaned partial piece (list full)" + gone); _log.info("Discarding orphaned partial piece (list full)" + gone);
} }
} // else drop the empty partial piece } else {
// drop the empty partial piece
pp.release();
}
// synchs on wantedPieces... // synchs on wantedPieces...
markUnrequested(peer, pp.getPiece()); markUnrequested(peer, pp.getPiece());
} }
@@ -1079,14 +1107,9 @@ public class PeerCoordinator implements PeerListener
} }
// ...and this section turns this into the general move-requests-around code! // ...and this section turns this into the general move-requests-around code!
// Temporary? So PeerState never calls wantPiece() directly for now... // Temporary? So PeerState never calls wantPiece() directly for now...
int piece = wantPiece(peer, havePieces); Piece piece = wantPiece(peer, havePieces, true);
if (piece >= 0) { if (piece != null) {
try { return new PartialPiece(piece, metainfo.getPieceLength(piece.getId()), _util.getTempDir());
return new PartialPiece(piece, metainfo.getPieceLength(piece));
} catch (OutOfMemoryError oom) {
if (_log.shouldLog(Log.WARN))
_log.warn("OOM creating new partial piece");
}
} }
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("We have no partial piece to return"); _log.debug("We have no partial piece to return");
@@ -1121,7 +1144,7 @@ public class PeerCoordinator implements PeerListener
} }
} }
} }
return wantPiece(peer, havePieces, false) >= 0; return wantPiece(peer, havePieces, false) != null;
} }
/** /**

View File

@@ -95,12 +95,12 @@ interface PeerListener
* will be closed. * will be closed.
* *
* @param peer the Peer that got the piece. * @param peer the Peer that got the piece.
* @param piece the piece number received. * @param piece the piece received.
* @param bs the byte array containing the piece. * @param bs the byte array containing the piece.
* *
* @return true when the bytes represent the piece, false otherwise. * @return true when the bytes represent the piece, false otherwise.
*/ */
boolean gotPiece(Peer peer, int piece, byte[] bs); boolean gotPiece(Peer peer, PartialPiece piece);
/** /**
* Called when the peer wants (part of) a piece from us. Only called * Called when the peer wants (part of) a piece from us. Only called
@@ -167,7 +167,7 @@ interface PeerListener
* @param peer the peer * @param peer the peer
* @since 0.8.2 * @since 0.8.2
*/ */
void savePartialPieces(Peer peer, List<PartialPiece> pcs); void savePartialPieces(Peer peer, List<Request> pcs);
/** /**
* Called when a peer has connected and there may be a partially * Called when a peer has connected and there may be a partially

View File

@@ -107,7 +107,7 @@ class PeerState implements DataLoader
// The only problem with returning the partials to the coordinator // The only problem with returning the partials to the coordinator
// is that chunks above a missing request are lost. // is that chunks above a missing request are lost.
// Future enhancements to PartialPiece could keep track of the holes. // Future enhancements to PartialPiece could keep track of the holes.
List<PartialPiece> pcs = returnPartialPieces(); List<Request> pcs = returnPartialPieces();
if (!pcs.isEmpty()) { if (!pcs.isEmpty()) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " got choked, returning partial pieces to the PeerCoordinator: " + pcs); _log.debug(peer + " got choked, returning partial pieces to the PeerCoordinator: " + pcs);
@@ -304,22 +304,22 @@ class PeerState implements DataLoader
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("got end of Chunk(" _log.debug("got end of Chunk("
+ req.piece + "," + req.off + "," + req.len + ") from " + req.getPiece() + "," + req.off + "," + req.len + ") from "
+ peer); + peer);
// Last chunk needed for this piece? // Last chunk needed for this piece?
if (getFirstOutstandingRequest(req.piece) == -1) if (getFirstOutstandingRequest(req.getPiece()) == -1)
{ {
// warning - may block here for a while // warning - may block here for a while
if (listener.gotPiece(peer, req.piece, req.bs)) if (listener.gotPiece(peer, req.getPartialPiece()))
{ {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Got " + req.piece + ": " + peer); _log.debug("Got " + req.getPiece() + ": " + peer);
} }
else else
{ {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Got BAD " + req.piece + " from " + peer); _log.warn("Got BAD " + req.getPiece() + " from " + peer);
} }
} }
@@ -335,7 +335,7 @@ class PeerState implements DataLoader
synchronized private int getFirstOutstandingRequest(int piece) synchronized private int getFirstOutstandingRequest(int piece)
{ {
for (int i = 0; i < outstandingRequests.size(); i++) for (int i = 0; i < outstandingRequests.size(); i++)
if (outstandingRequests.get(i).piece == piece) if (outstandingRequests.get(i).getPiece() == piece)
return i; return i;
return -1; return -1;
} }
@@ -371,7 +371,7 @@ class PeerState implements DataLoader
synchronized(this) synchronized(this)
{ {
req = outstandingRequests.get(r); req = outstandingRequests.get(r);
while (req.piece == piece && req.off != begin while (req.getPiece() == piece && req.off != begin
&& r < outstandingRequests.size() - 1) && r < outstandingRequests.size() - 1)
{ {
r++; r++;
@@ -379,7 +379,7 @@ class PeerState implements DataLoader
} }
// Something wrong? // Something wrong?
if (req.piece != piece || req.off != begin || req.len != length) if (req.getPiece() != piece || req.off != begin || req.len != length)
{ {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Unrequested or unneeded 'piece: " _log.info("Unrequested or unneeded 'piece: "
@@ -427,13 +427,13 @@ class PeerState implements DataLoader
Request rv = null; Request rv = null;
int lowest = Integer.MAX_VALUE; int lowest = Integer.MAX_VALUE;
for (Request r : outstandingRequests) { for (Request r : outstandingRequests) {
if (r.piece == piece && r.off < lowest) { if (r.getPiece() == piece && r.off < lowest) {
lowest = r.off; lowest = r.off;
rv = r; rv = r;
} }
} }
if (pendingRequest != null && if (pendingRequest != null &&
pendingRequest.piece == piece && pendingRequest.off < lowest) pendingRequest.getPiece() == piece && pendingRequest.off < lowest)
rv = pendingRequest; rv = pendingRequest;
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@@ -447,14 +447,16 @@ class PeerState implements DataLoader
* @return List of PartialPieces, even those with an offset == 0, or empty list * @return List of PartialPieces, even those with an offset == 0, or empty list
* @since 0.8.2 * @since 0.8.2
*/ */
synchronized List<PartialPiece> returnPartialPieces() synchronized List<Request> returnPartialPieces()
{ {
Set<Integer> pcs = getRequestedPieces(); Set<Integer> pcs = getRequestedPieces();
List<PartialPiece> rv = new ArrayList(pcs.size()); List<Request> rv = new ArrayList(pcs.size());
for (Integer p : pcs) { for (Integer p : pcs) {
Request req = getLowestOutstandingRequest(p.intValue()); Request req = getLowestOutstandingRequest(p.intValue());
if (req != null) if (req != null) {
rv.add(new PartialPiece(req)); req.getPartialPiece().setDownloaded(req.off);
rv.add(req);
}
} }
outstandingRequests.clear(); outstandingRequests.clear();
pendingRequest = null; pendingRequest = null;
@@ -468,9 +470,9 @@ class PeerState implements DataLoader
synchronized private Set<Integer> getRequestedPieces() { synchronized private Set<Integer> getRequestedPieces() {
Set<Integer> rv = new HashSet(outstandingRequests.size() + 1); Set<Integer> rv = new HashSet(outstandingRequests.size() + 1);
for (Request req : outstandingRequests) { for (Request req : outstandingRequests) {
rv.add(Integer.valueOf(req.piece)); rv.add(Integer.valueOf(req.getPiece()));
if (pendingRequest != null) if (pendingRequest != null)
rv.add(Integer.valueOf(pendingRequest.piece)); rv.add(Integer.valueOf(pendingRequest.getPiece()));
} }
return rv; return rv;
} }
@@ -571,14 +573,14 @@ class PeerState implements DataLoader
* @since 0.8.1 * @since 0.8.1
*/ */
synchronized void cancelPiece(int piece) { synchronized void cancelPiece(int piece) {
if (lastRequest != null && lastRequest.piece == piece) if (lastRequest != null && lastRequest.getPiece() == piece)
lastRequest = null; lastRequest = null;
Iterator<Request> it = outstandingRequests.iterator(); Iterator<Request> it = outstandingRequests.iterator();
while (it.hasNext()) while (it.hasNext())
{ {
Request req = it.next(); Request req = it.next();
if (req.piece == piece) if (req.getPiece() == piece)
{ {
it.remove(); it.remove();
// Send cancel even when we are choked to make sure that it is // Send cancel even when we are choked to make sure that it is
@@ -594,10 +596,10 @@ class PeerState implements DataLoader
* @since 0.8.1 * @since 0.8.1
*/ */
synchronized boolean isRequesting(int piece) { synchronized boolean isRequesting(int piece) {
if (pendingRequest != null && pendingRequest.piece == piece) if (pendingRequest != null && pendingRequest.getPiece() == piece)
return true; return true;
for (Request req : outstandingRequests) { for (Request req : outstandingRequests) {
if (req.piece == piece) if (req.getPiece() == piece)
return true; return true;
} }
return false; return false;
@@ -679,7 +681,7 @@ class PeerState implements DataLoader
{ {
int pieceLength; int pieceLength;
boolean isLastChunk; boolean isLastChunk;
pieceLength = metainfo.getPieceLength(lastRequest.piece); pieceLength = metainfo.getPieceLength(lastRequest.getPiece());
isLastChunk = lastRequest.off + lastRequest.len == pieceLength; isLastChunk = lastRequest.off + lastRequest.len == pieceLength;
// Last part of a piece? // Last part of a piece?
@@ -687,14 +689,13 @@ class PeerState implements DataLoader
more_pieces = requestNextPiece(); more_pieces = requestNextPiece();
else else
{ {
int nextPiece = lastRequest.piece; PartialPiece nextPiece = lastRequest.getPartialPiece();
int nextBegin = lastRequest.off + PARTSIZE; int nextBegin = lastRequest.off + PARTSIZE;
byte[] bs = lastRequest.bs;
int maxLength = pieceLength - nextBegin; int maxLength = pieceLength - nextBegin;
int nextLength = maxLength > PARTSIZE ? PARTSIZE int nextLength = maxLength > PARTSIZE ? PARTSIZE
: maxLength; : maxLength;
Request req Request req
= new Request(nextPiece, bs, nextBegin, nextLength); = new Request(nextPiece,nextBegin, nextLength);
outstandingRequests.add(req); outstandingRequests.add(req);
if (!choked) if (!choked)
out.sendRequest(req); out.sendRequest(req);
@@ -740,7 +741,7 @@ class PeerState implements DataLoader
// what piece to give us next. // what piece to give us next.
int nextPiece = listener.wantPiece(peer, bitfield); int nextPiece = listener.wantPiece(peer, bitfield);
if (nextPiece != -1 if (nextPiece != -1
&& (lastRequest == null || lastRequest.piece != nextPiece)) { && (lastRequest == null || lastRequest.getPiece() != nextPiece)) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " want piece " + nextPiece); _log.debug(peer + " want piece " + nextPiece);
// Fail safe to make sure we are interested // Fail safe to make sure we are interested

View File

@@ -20,14 +20,16 @@
package org.klomp.snark; package org.klomp.snark;
import java.io.DataInputStream;
import java.io.IOException;
/** /**
* Holds all information needed for a partial piece request. * Holds all information needed for a partial piece request.
* This class should be used only by PeerState, PeerConnectionIn, and PeerConnectionOut. * This class should be used only by PeerState, PeerConnectionIn, and PeerConnectionOut.
*/ */
class Request class Request
{ {
final int piece; private final PartialPiece piece;
final byte[] bs;
final int off; final int off;
final int len; final int len;
long sendTime; long sendTime;
@@ -36,26 +38,49 @@ class Request
* Creates a new Request. * Creates a new Request.
* *
* @param piece Piece number requested. * @param piece Piece number requested.
* @param bs byte array where response should be stored.
* @param off the offset in the array. * @param off the offset in the array.
* @param len the number of bytes requested. * @param len the number of bytes requested.
*/ */
Request(int piece, byte[] bs, int off, int len) Request(PartialPiece piece, int off, int len)
{ {
// Sanity check
if (off < 0 || len <= 0 || off + len > piece.getLength())
throw new IndexOutOfBoundsException("Illegal Request " + toString());
this.piece = piece; this.piece = piece;
this.bs = bs;
this.off = off; this.off = off;
this.len = len; this.len = len;
}
// Sanity check /**
if (piece < 0 || off < 0 || len <= 0 || off + len > bs.length) * @since 0.9.1
throw new IndexOutOfBoundsException("Illegal Request " + toString()); */
public void read(DataInputStream din) throws IOException {
piece.read(din, off, len);
}
/**
* The piece number this Request is for
*
* @since 0.9.1
*/
public int getPiece() {
return piece.getPiece();
}
/**
* The PartialPiece this Request is for
*
* @since 0.9.1
*/
public PartialPiece getPartialPiece() {
return piece;
} }
@Override @Override
public int hashCode() public int hashCode()
{ {
return piece ^ off ^ len; return piece.getPiece() ^ off ^ len;
} }
@Override @Override
@@ -64,7 +89,7 @@ class Request
if (o instanceof Request) if (o instanceof Request)
{ {
Request req = (Request)o; Request req = (Request)o;
return req.piece == piece && req.off == off && req.len == len; return req.piece.equals(piece) && req.off == off && req.len == len;
} }
return false; return false;
@@ -73,6 +98,6 @@ class Request
@Override @Override
public String toString() public String toString()
{ {
return "(" + piece + "," + off + "," + len + ")"; return "(" + piece.getPiece() + "," + off + "," + len + ")";
} }
} }

View File

@@ -1214,7 +1214,7 @@ public class Snark
total += c.getCurrentUploadRate(); total += c.getCurrentUploadRate();
} }
long limit = 1024l * _util.getMaxUpBW(); long limit = 1024l * _util.getMaxUpBW();
debug("Total up bw: " + total + " Limit: " + limit, Snark.WARNING); debug("Total up bw: " + total + " Limit: " + limit, Snark.NOTICE);
return total > limit; return total > limit;
} }

View File

@@ -873,54 +873,55 @@ public class Storage
* matches), otherwise false. * matches), otherwise false.
* @exception IOException when some storage related error occurs. * @exception IOException when some storage related error occurs.
*/ */
public boolean putPiece(int piece, byte[] ba) throws IOException public boolean putPiece(PartialPiece pp) throws IOException
{ {
// First check if the piece is correct. int piece = pp.getPiece();
// Copy the array first to be paranoid. try {
byte[] bs = ba.clone(); synchronized(bitfield) {
int length = bs.length; if (bitfield.get(piece))
boolean correctHash = metainfo.checkPiece(piece, bs, 0, length); return true; // No need to store twice.
if (listener != null)
listener.storageChecked(this, piece, correctHash);
if (!correctHash)
return false;
synchronized(bitfield)
{
if (bitfield.get(piece))
return true; // No need to store twice.
}
// Early typecast, avoid possibly overflowing a temp integer
long start = (long) piece * (long) piece_size;
int i = 0;
long raflen = lengths[i];
while (start > raflen)
{
i++;
start -= raflen;
raflen = lengths[i];
}
int written = 0;
int off = 0;
while (written < length)
{
int need = length - written;
int len = (start + need < raflen) ? need : (int)(raflen - start);
synchronized(RAFlock[i])
{
checkRAF(i);
rafs[i].seek(start);
rafs[i].write(bs, off + written, len);
} }
written += len;
if (need - len > 0) // TODO alternative - check hash on the fly as we write to the file,
{ // to save another I/O pass
i++; boolean correctHash = metainfo.checkPiece(pp);
raflen = lengths[i]; if (listener != null)
start = 0; listener.storageChecked(this, piece, correctHash);
if (!correctHash) {
return false;
} }
// Early typecast, avoid possibly overflowing a temp integer
long start = (long) piece * (long) piece_size;
int i = 0;
long raflen = lengths[i];
while (start > raflen) {
i++;
start -= raflen;
raflen = lengths[i];
}
int written = 0;
int off = 0;
int length = metainfo.getPieceLength(piece);
while (written < length) {
int need = length - written;
int len = (start + need < raflen) ? need : (int)(raflen - start);
synchronized(RAFlock[i]) {
checkRAF(i);
rafs[i].seek(start);
//rafs[i].write(bs, off + written, len);
pp.write(rafs[i], off + written, len);
}
written += len;
if (need - len > 0) {
i++;
raflen = lengths[i];
start = 0;
}
}
} finally {
pp.release();
} }
changed = true; changed = true;

View File

@@ -150,6 +150,15 @@ public class I2PSnarkServlet extends DefaultServlet {
String path = req.getServletPath(); String path = req.getServletPath();
resp.setHeader("X-Frame-Options", "SAMEORIGIN"); resp.setHeader("X-Frame-Options", "SAMEORIGIN");
String peerParam = req.getParameter("p");
String peerString;
if (peerParam == null || (!_manager.util().connected()) ||
peerParam.replaceAll("[a-zA-Z0-9~=-]", "").length() > 0) { // XSS
peerString = "";
} else {
peerString = "?p=" + peerParam;
}
// AJAX for mainsection // AJAX for mainsection
if ("/.ajax/xhr1.html".equals(path)) { if ("/.ajax/xhr1.html".equals(path)) {
resp.setCharacterEncoding("UTF-8"); resp.setCharacterEncoding("UTF-8");
@@ -157,7 +166,7 @@ public class I2PSnarkServlet extends DefaultServlet {
PrintWriter out = resp.getWriter(); PrintWriter out = resp.getWriter();
//if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
// _manager.addMessage((_context.clock().now() / 1000) + " xhr1 p=" + req.getParameter("p")); // _manager.addMessage((_context.clock().now() / 1000) + " xhr1 p=" + req.getParameter("p"));
writeMessages(out, false); writeMessages(out, false, peerString);
writeTorrents(out, req); writeTorrents(out, req);
return; return;
} }
@@ -201,15 +210,6 @@ public class I2PSnarkServlet extends DefaultServlet {
_manager.addMessage("Please retry form submission (bad nonce)"); _manager.addMessage("Please retry form submission (bad nonce)");
} }
String peerParam = req.getParameter("p");
String peerString;
if (peerParam == null || (!_manager.util().connected()) ||
peerParam.replaceAll("[a-zA-Z0-9~=-]", "").length() > 0) { // XSS
peerString = "";
} else {
peerString = "?p=" + peerParam;
}
PrintWriter out = resp.getWriter(); PrintWriter out = resp.getWriter();
out.write(DOCTYPE + "<html>\n" + out.write(DOCTYPE + "<html>\n" +
"<head><link rel=\"shortcut icon\" href=\"" + _themePath + "favicon.ico\">\n" + "<head><link rel=\"shortcut icon\" href=\"" + _themePath + "favicon.ico\">\n" +
@@ -274,7 +274,7 @@ public class I2PSnarkServlet extends DefaultServlet {
_manager.addMessage(_("Click \"Add torrent\" button to fetch torrent")); _manager.addMessage(_("Click \"Add torrent\" button to fetch torrent"));
out.write("<div class=\"page\"><div id=\"mainsection\" class=\"mainsection\">"); out.write("<div class=\"page\"><div id=\"mainsection\" class=\"mainsection\">");
writeMessages(out, isConfigure); writeMessages(out, isConfigure, peerString);
if (isConfigure) { if (isConfigure) {
// end of mainsection div // end of mainsection div
@@ -294,7 +294,7 @@ public class I2PSnarkServlet extends DefaultServlet {
out.write(FOOTER); out.write(FOOTER);
} }
private void writeMessages(PrintWriter out, boolean isConfigure) throws IOException { private void writeMessages(PrintWriter out, boolean isConfigure, String peerString) throws IOException {
List<String> msgs = _manager.getMessages(); List<String> msgs = _manager.getMessages();
if (!msgs.isEmpty()) { if (!msgs.isEmpty()) {
out.write("<div class=\"snarkMessages\"><ul>"); out.write("<div class=\"snarkMessages\"><ul>");
@@ -302,11 +302,14 @@ public class I2PSnarkServlet extends DefaultServlet {
String msg = msgs.get(i); String msg = msgs.get(i);
out.write("<li>" + msg + "</li>\n"); out.write("<li>" + msg + "</li>\n");
} }
// lazy GET, lose p parameter
out.write("</ul><p><a href=\"/i2psnark/"); out.write("</ul><p><a href=\"/i2psnark/");
if (isConfigure) if (isConfigure)
out.write("configure"); out.write("configure");
out.write("?action=Clear&amp;nonce=" + _nonce + "\">" + _("clear messages") + "</a></p></div>"); if (peerString.length() > 0)
out.write(peerString + "&amp;");
else
out.write("?");
out.write("action=Clear&amp;nonce=" + _nonce + "\">" + _("clear messages") + "</a></p></div>");
} }
} }

View File

@@ -1,3 +1,11 @@
2012-05-19 zzz
* i2psnark:
- Store received chunks in temp files
- Don't allocate from heap for unneeded chunks
- Remove peer count restriction for torrents with large pieces
- Use priorities and rarest calculations to sort partials
- Preserve p parameter in clear messages link
2012-05-13 zzz 2012-05-13 zzz
* Console: Add X-Frame-Options to headers, * Console: Add X-Frame-Options to headers,
disable with routerconsole.disableXFrame=true disable with routerconsole.disableXFrame=true

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */ /** deprecated */
public final static String ID = "Monotone"; public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION; public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 1; public final static long BUILD = 2;
/** for example "-test" */ /** for example "-test" */
public final static String EXTRA = ""; public final static String EXTRA = "";