From 6ca4b519bf0ffb863d27bb28ec6b3667dc6dadd7 Mon Sep 17 00:00:00 2001 From: zzz Date: Sat, 19 May 2012 13:27:02 +0000 Subject: [PATCH] * i2psnark: - Store received chunks in temp files - Don't allocate from heap for unneeded chunks - Remove peer count restriction for torrents with large pieces - Use priorities and rarest calculations to sort partials - Preserve p parameter in clear messages link --- .../src/org/klomp/snark/I2PSnarkUtil.java | 3 + .../java/src/org/klomp/snark/MetaInfo.java | 14 + .../src/org/klomp/snark/PartialPiece.java | 243 ++++++++++++++++-- .../java/src/org/klomp/snark/Peer.java | 2 +- .../src/org/klomp/snark/PeerConnectionIn.java | 9 +- .../org/klomp/snark/PeerConnectionOut.java | 8 +- .../src/org/klomp/snark/PeerCoordinator.java | 81 +++--- .../src/org/klomp/snark/PeerListener.java | 6 +- .../java/src/org/klomp/snark/PeerState.java | 53 ++-- .../java/src/org/klomp/snark/Request.java | 47 +++- .../java/src/org/klomp/snark/Snark.java | 2 +- .../java/src/org/klomp/snark/Storage.java | 87 +++---- .../org/klomp/snark/web/I2PSnarkServlet.java | 31 ++- history.txt | 8 + .../src/net/i2p/router/RouterVersion.java | 2 +- 15 files changed, 430 insertions(+), 166 deletions(-) diff --git a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java index 81c56a194..55c151b6b 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java +++ b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java @@ -184,6 +184,9 @@ public class I2PSnarkUtil { /** @since 0.8.9 */ public void setFilesPublic(boolean yes) { _areFilesPublic = yes; } + /** @since 0.9.1 */ + public File getTempDir() { return _tmpDir; } + /** * Connect to the router, if we aren't already */ diff --git a/apps/i2psnark/java/src/org/klomp/snark/MetaInfo.java b/apps/i2psnark/java/src/org/klomp/snark/MetaInfo.java index 00d0abfaf..7eef4e49b 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/MetaInfo.java +++ b/apps/i2psnark/java/src/org/klomp/snark/MetaInfo.java @@ -422,6 +422,20 @@ public class MetaInfo return false; return true; } + + /** + * @since 0.9.1 + */ + boolean checkPiece(PartialPiece pp) throws IOException { + MessageDigest sha1 = SHA1.getInstance(); + int piece = pp.getPiece(); + + byte[] hash = pp.getHash(); + for (int i = 0; i < 20; i++) + if (hash[i] != piece_hashes[20 * piece + i]) + return false; + return true; + } /** * Returns the total length of the torrent in bytes. diff --git a/apps/i2psnark/java/src/org/klomp/snark/PartialPiece.java b/apps/i2psnark/java/src/org/klomp/snark/PartialPiece.java index 3a7154899..2321398be 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PartialPiece.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PartialPiece.java @@ -1,6 +1,22 @@ package org.klomp.snark; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.security.MessageDigest; + +import net.i2p.I2PAppContext; +import net.i2p.crypto.SHA1; +import net.i2p.util.Log; +import net.i2p.util.SecureFile; + /** + * Store the received data either on the heap or in a temp file. + * The third option, to write chunks directly to the destination file, + * is unimplemented. + * * This is the class passed from PeerCoordinator to PeerState so * PeerState may start requests. * @@ -8,45 +24,81 @@ package org.klomp.snark; * a piece is not completely downloaded, for example * when the Peer disconnects or chokes. * + * New objects for the same piece are created during the end game - + * this object should not be shared among multiple peers. + * * @since 0.8.2 */ class PartialPiece implements Comparable { - private final int piece; + // we store the piece so we can use it in compareTo() + private final Piece piece; + // null if using temp file private final byte[] bs; - private final int off; - private final long createdTime; + private int off; + //private final long createdTime; + private File tempfile; + private RandomAccessFile raf; + private final int pclen; + private final File tempDir; + + // Any bigger than this, use temp file instead of heap + private static final int MAX_IN_MEM = 128 * 1024; + // May be reduced on OOM + private static int _max_in_mem = MAX_IN_MEM; /** * Used by PeerCoordinator. * Creates a new PartialPiece, with no chunks yet downloaded. - * Allocates the data. + * Allocates the data storage area, either on the heap or in the + * temp directory, depending on size. * * @param piece Piece number requested. * @param len must be equal to the piece length */ - public PartialPiece (int piece, int len) throws OutOfMemoryError { + public PartialPiece (Piece piece, int len, File tempDir) { this.piece = piece; - this.bs = new byte[len]; - this.off = 0; - this.createdTime = 0; + this.pclen = len; + //this.createdTime = 0; + this.tempDir = tempDir; + + // temps for finals + byte[] tbs = null; + try { + if (len <= MAX_IN_MEM) { + try { + tbs = new byte[len]; + return; + } catch (OutOfMemoryError oom) { + if (_max_in_mem > PeerState.PARTSIZE) + _max_in_mem /= 2; + Log log = I2PAppContext.getGlobalContext().logManager().getLog(PartialPiece.class); + log.logAlways(Log.WARN, "OOM creating new partial piece"); + // fall through to use temp file + } + } + // delay creating temp file until required in read() + } finally { + // finals + this.bs = tbs; + } } /** - * Used by PeerState. - * Creates a new PartialPiece, with chunks up to but not including - * firstOutstandingRequest already downloaded and stored in the Request byte array. + * Caller must synchronize * - * Note that this cannot handle gaps; chunks after a missing chunk cannot be saved. - * That would be harder. - * - * @param firstOutstandingRequest the first request not fulfilled for the piece + * @since 0.9.1 */ - public PartialPiece (Request firstOutstandingRequest) { - this.piece = firstOutstandingRequest.piece; - this.bs = firstOutstandingRequest.bs; - this.off = firstOutstandingRequest.off; - this.createdTime = System.currentTimeMillis(); + private void createTemp() throws IOException { + //tfile = SecureFile.createTempFile("piece", null, tempDir); + // debug + tempfile = SecureFile.createTempFile("piece_" + piece.getId() + '_', null, tempDir); + //I2PAppContext.getGlobalContext().logManager().getLog(PartialPiece.class).warn("Created " + tempfile); + // tfile.deleteOnExit() ??? + raf = new RandomAccessFile(tempfile, "rw"); + // Do not preallocate the file space. + // Not necessary to call setLength(), file is extended when written + //traf.setLength(len); } /** @@ -55,33 +107,168 @@ class PartialPiece implements Comparable { */ public Request getRequest() { - return new Request(this.piece, this.bs, this.off, Math.min(this.bs.length - this.off, PeerState.PARTSIZE)); + return new Request(this, this.off, Math.min(this.pclen - this.off, PeerState.PARTSIZE)); } /** piece number */ public int getPiece() { - return this.piece; + return this.piece.getId(); } - /** how many bytes are good */ + /** + * @since 0.9.1 + */ + public int getLength() { + return this.pclen; + } + + /** + * How many bytes are good - only valid by setDownloaded() + */ public int getDownloaded() { return this.off; } + /** + * Call this before returning a PartialPiece to the PeerCoordinator + * @since 0.9.1 + */ + public void setDownloaded(int offset) { + this.off = offset; + } + +/**** public long getCreated() { return this.createdTime; } +****/ /** - * Highest downloaded first + * Piece must be complete. + * The SHA1 hash of the completely read data. + * @since 0.9.1 + */ + public byte[] getHash() throws IOException { + MessageDigest sha1 = SHA1.getInstance(); + if (bs != null) { + sha1.update(bs); + } else { + int read = 0; + byte[] buf = new byte[Math.min(pclen, 16384)]; + synchronized (this) { + if (raf == null) + throw new IOException(); + raf.seek(0); + while (read < pclen) { + int rd = raf.read(buf, 0, Math.min(buf.length, pclen - read)); + if (rd < 0) + break; + read += rd; + sha1.update(buf, 0, rd); + } + } + if (read < pclen) + throw new IOException(); + } + return sha1.digest(); + } + + /** + * Blocking. + * @since 0.9.1 + */ + public void read(DataInputStream din, int off, int len) throws IOException { + if (bs != null) { + din.readFully(bs, off, len); + } else { + // read in fully before synching on raf + byte[] tmp = new byte[len]; + din.readFully(tmp); + synchronized (this) { + if (raf == null) + createTemp(); + raf.seek(off); + raf.write(tmp); + } + } + } + + /** + * Piece must be complete. + * Caller must synchronize on out and seek to starting point. + * Caller must call release() when done with the whole piece. + * + * @param start offset in the output file + * @param offset offset in the piece + * @param len length to write + * @since 0.9.1 + */ + public void write(DataOutput out, int offset, int len) throws IOException { + if (bs != null) { + out.write(bs, offset, len); + } else { + int read = 0; + byte[] buf = new byte[Math.min(len, 16384)]; + synchronized (this) { + if (raf == null) + throw new IOException(); + raf.seek(offset); + while (read < len) { + int rd = Math.min(buf.length, len - read); + raf.readFully(buf, 0, rd); + read += rd; + out.write(buf, 0, rd); + } + } + } + } + + /** + * Release all resources. + * + * @since 0.9.1 + */ + public void release() { + if (bs == null) { + synchronized (this) { + if (raf != null) + locked_release(); + } + //if (raf != null) + // I2PAppContext.getGlobalContext().logManager().getLog(PartialPiece.class).warn("Released " + tempfile); + } + } + + /** + * Caller must synchronize + * + * @since 0.9.1 + */ + private void locked_release() { + try { + raf.close(); + } catch (IOException ioe) { + I2PAppContext.getGlobalContext().logManager().getLog(PartialPiece.class).warn("Error closing " + raf, ioe); + } + tempfile.delete(); + } + + /* + * Highest priority first, + * then rarest first, + * then highest downloaded first */ public int compareTo(Object o) throws ClassCastException { - return ((PartialPiece)o).off - this.off; // reverse + PartialPiece opp = (PartialPiece)o; + int d = this.piece.compareTo(opp.piece); + if (d != 0) + return d; + return opp.off - this.off; // reverse } @Override public int hashCode() { - return piece * 7777; + return piece.getId() * 7777; } /** @@ -92,13 +279,13 @@ class PartialPiece implements Comparable { public boolean equals(Object o) { if (o instanceof PartialPiece) { PartialPiece pp = (PartialPiece)o; - return pp.piece == this.piece; + return pp.piece.getId() == this.piece.getId(); } return false; } @Override public String toString() { - return "Partial(" + piece + ',' + off + ',' + bs.length + ')'; + return "Partial(" + piece.getId() + ',' + off + ',' + pclen + ')'; } } diff --git a/apps/i2psnark/java/src/org/klomp/snark/Peer.java b/apps/i2psnark/java/src/org/klomp/snark/Peer.java index 1330365ce..02fb63560 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Peer.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Peer.java @@ -460,7 +460,7 @@ public class Peer implements Comparable if (this.deregister) { PeerListener p = s.listener; if (p != null) { - List pcs = s.returnPartialPieces(); + List pcs = s.returnPartialPieces(); if (!pcs.isEmpty()) p.savePartialPieces(this, pcs); // now covered by savePartialPieces diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionIn.java b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionIn.java index 33da75263..c92e30bf9 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionIn.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionIn.java @@ -148,11 +148,9 @@ class PeerConnectionIn implements Runnable begin = din.readInt(); len = i-9; Request req = ps.getOutstandingRequest(piece, begin, len); - byte[] piece_bytes; if (req != null) { - piece_bytes = req.bs; - din.readFully(piece_bytes, begin, len); + req.read(din); ps.pieceMessage(req); if (_log.shouldLog(Log.DEBUG)) _log.debug("Received data(" + piece + "," + begin + ") from " + peer); @@ -160,8 +158,9 @@ class PeerConnectionIn implements Runnable else { // XXX - Consume but throw away afterwards. - piece_bytes = new byte[len]; - din.readFully(piece_bytes); + int rcvd = din.skipBytes(len); + if (rcvd != len) + throw new IOException("EOF reading unwanted data"); if (_log.shouldLog(Log.DEBUG)) _log.debug("Received UNWANTED data(" + piece + "," + begin + ") from " + peer); } diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java index 61bc21864..62e58ad8d 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java @@ -395,7 +395,7 @@ class PeerConnectionOut implements Runnable while (it.hasNext()) { Message m = (Message)it.next(); - if (m.type == Message.REQUEST && m.piece == req.piece && + if (m.type == Message.REQUEST && m.piece == req.getPiece() && m.begin == req.off && m.length == req.len) { if (_log.shouldLog(Log.DEBUG)) @@ -406,7 +406,7 @@ class PeerConnectionOut implements Runnable } Message m = new Message(); m.type = Message.REQUEST; - m.piece = req.piece; + m.piece = req.getPiece(); m.begin = req.off; m.length = req.len; addMessage(m); @@ -492,7 +492,7 @@ class PeerConnectionOut implements Runnable { Message m = (Message)it.next(); if (m.type == Message.REQUEST - && m.piece == req.piece + && m.piece == req.getPiece() && m.begin == req.off && m.length == req.len) it.remove(); @@ -502,7 +502,7 @@ class PeerConnectionOut implements Runnable // Always send, just to be sure it it is really canceled. Message m = new Message(); m.type = Message.CANCEL; - m.piece = req.piece; + m.piece = req.getPiece(); m.begin = req.off; m.length = req.len; addMessage(m); diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java index c9f70ba25..3e766b5f7 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java @@ -116,7 +116,7 @@ public class PeerCoordinator implements PeerListener */ private final List wantedPieces; - /** partial pieces - lock by synching on wantedPieces */ + /** partial pieces - lock by synching on wantedPieces - TODO store Requests, not PartialPieces */ private final List partialPieces; private boolean halted = false; @@ -349,11 +349,12 @@ public class PeerCoordinator implements PeerListener return 6; int size = metainfo.getPieceLength(0); int max = _util.getMaxConnections(); - if (size <= 512*1024 || completed()) + // Now that we use temp files, no memory concern + //if (size <= 512*1024 || completed()) return max; - if (size <= 1024*1024) - return (max + max + 2) / 3; - return (max + 2) / 3; + //if (size <= 1024*1024) + // return (max + max + 2) / 3; + //return (max + 2) / 3; } public boolean halted() { return halted; } @@ -380,6 +381,9 @@ public class PeerCoordinator implements PeerListener } // delete any saved orphan partial piece synchronized (partialPieces) { + for (PartialPiece pp : partialPieces) { + pp.release(); + } partialPieces.clear(); } } @@ -630,22 +634,23 @@ public class PeerCoordinator implements PeerListener * -1 if none of the given pieces are wanted. */ public int wantPiece(Peer peer, BitField havePieces) { - return wantPiece(peer, havePieces, true); + Piece pc = wantPiece(peer, havePieces, true); + return pc != null ? pc.getId() : -1; } /** * Returns one of pieces in the given BitField that is still wanted or - * -1 if none of the given pieces are wanted. + * null if none of the given pieces are wanted. * * @param record if true, actually record in our data structures that we gave the * request to this peer. If false, do not update the data structures. * @since 0.8.2 */ - private int wantPiece(Peer peer, BitField havePieces, boolean record) { + private Piece wantPiece(Peer peer, BitField havePieces, boolean record) { if (halted) { if (_log.shouldLog(Log.WARN)) _log.warn("We don't want anything from the peer, as we are halted! peer=" + peer); - return -1; + return null; } Piece piece = null; @@ -680,7 +685,7 @@ public class PeerCoordinator implements PeerListener // If we do end game all the time, we generate lots of extra traffic // when the seeder is super-slow and all the peers are "caught up" if (wantedSize > END_GAME_THRESHOLD) - return -1; // nothing to request and not in end game + return null; // nothing to request and not in end game // let's not all get on the same piece // Even better would be to sort by number of requests if (record) @@ -704,7 +709,7 @@ public class PeerCoordinator implements PeerListener _log.warn("nothing to even rerequest from " + peer + ": requested = " + requested); // _log.warn("nothing to even rerequest from " + peer + ": requested = " + requested // + " wanted = " + wantedPieces + " peerHas = " + havePieces); - return -1; //If we still can't find a piece we want, so be it. + return null; //If we still can't find a piece we want, so be it. } else { // Should be a lot smarter here - // share blocks rather than starting from 0 with each peer. @@ -719,7 +724,7 @@ public class PeerCoordinator implements PeerListener _log.info(peer + " is now requesting: piece " + piece + " priority " + piece.getPriority()); piece.setRequested(peer, true); } - return piece.getId(); + return piece; } // synch } @@ -846,10 +851,11 @@ public class PeerCoordinator implements PeerListener * * @throws RuntimeException on IOE saving the piece */ - public boolean gotPiece(Peer peer, int piece, byte[] bs) + public boolean gotPiece(Peer peer, PartialPiece pp) { if (metainfo == null || storage == null) return true; + int piece = pp.getPiece(); if (halted) { _log.info("Got while-halted piece " + piece + "/" + metainfo.getPieces() +" from " + peer + " for " + metainfo.getName()); return true; // We don't actually care anymore. @@ -872,7 +878,7 @@ public class PeerCoordinator implements PeerListener try { - if (storage.putPiece(piece, bs)) + if (storage.putPiece(pp)) { if (_log.shouldLog(Log.INFO)) _log.info("Got valid piece " + piece + "/" + metainfo.getPieces() +" from " + peer + " for " + metainfo.getName()); @@ -922,6 +928,15 @@ public class PeerCoordinator implements PeerListener p.disconnect(true); } + if (completed()) { + synchronized (partialPieces) { + for (PartialPiece ppp : partialPieces) { + ppp.release(); + } + partialPieces.clear(); + } + } + return true; } @@ -998,17 +1013,24 @@ public class PeerCoordinator implements PeerListener * Also mark the piece unrequested if this peer was the only one. * * @param peer partials, must include the zero-offset (empty) ones too + * No dup pieces, piece.setDownloaded() must be set * @since 0.8.2 */ - public void savePartialPieces(Peer peer, List partials) + public void savePartialPieces(Peer peer, List partials) { - if (halted) - return; if (_log.shouldLog(Log.INFO)) _log.info("Partials received from " + peer + ": " + partials); + if (halted || completed()) { + for (Request req : partials) { + PartialPiece pp = req.getPartialPiece(); + pp.release(); + } + return; + } synchronized(wantedPieces) { - for (PartialPiece pp : partials) { - if (pp.getDownloaded() > 0) { + for (Request req : partials) { + PartialPiece pp = req.getPartialPiece(); + if (req.off > 0) { // PartialPiece.equals() only compares piece number, which is what we want int idx = partialPieces.indexOf(pp); if (idx < 0) { @@ -1017,10 +1039,12 @@ public class PeerCoordinator implements PeerListener _log.info("Saving orphaned partial piece (new) " + pp); } else if (idx >= 0 && pp.getDownloaded() > partialPieces.get(idx).getDownloaded()) { // replace what's there now + partialPieces.get(idx).release(); partialPieces.set(idx, pp); if (_log.shouldLog(Log.INFO)) _log.info("Saving orphaned partial piece (bigger) " + pp); } else { + pp.release(); if (_log.shouldLog(Log.INFO)) _log.info("Discarding partial piece (not bigger)" + pp); } @@ -1029,10 +1053,14 @@ public class PeerCoordinator implements PeerListener // sorts by remaining bytes, least first Collections.sort(partialPieces); PartialPiece gone = partialPieces.remove(max); + gone.release(); if (_log.shouldLog(Log.INFO)) _log.info("Discarding orphaned partial piece (list full)" + gone); } - } // else drop the empty partial piece + } else { + // drop the empty partial piece + pp.release(); + } // synchs on wantedPieces... markUnrequested(peer, pp.getPiece()); } @@ -1079,14 +1107,9 @@ public class PeerCoordinator implements PeerListener } // ...and this section turns this into the general move-requests-around code! // Temporary? So PeerState never calls wantPiece() directly for now... - int piece = wantPiece(peer, havePieces); - if (piece >= 0) { - try { - return new PartialPiece(piece, metainfo.getPieceLength(piece)); - } catch (OutOfMemoryError oom) { - if (_log.shouldLog(Log.WARN)) - _log.warn("OOM creating new partial piece"); - } + Piece piece = wantPiece(peer, havePieces, true); + if (piece != null) { + return new PartialPiece(piece, metainfo.getPieceLength(piece.getId()), _util.getTempDir()); } if (_log.shouldLog(Log.DEBUG)) _log.debug("We have no partial piece to return"); @@ -1121,7 +1144,7 @@ public class PeerCoordinator implements PeerListener } } } - return wantPiece(peer, havePieces, false) >= 0; + return wantPiece(peer, havePieces, false) != null; } /** diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java b/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java index c7650a552..f0285aaef 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java @@ -95,12 +95,12 @@ interface PeerListener * will be closed. * * @param peer the Peer that got the piece. - * @param piece the piece number received. + * @param piece the piece received. * @param bs the byte array containing the piece. * * @return true when the bytes represent the piece, false otherwise. */ - boolean gotPiece(Peer peer, int piece, byte[] bs); + boolean gotPiece(Peer peer, PartialPiece piece); /** * Called when the peer wants (part of) a piece from us. Only called @@ -167,7 +167,7 @@ interface PeerListener * @param peer the peer * @since 0.8.2 */ - void savePartialPieces(Peer peer, List pcs); + void savePartialPieces(Peer peer, List pcs); /** * Called when a peer has connected and there may be a partially diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerState.java b/apps/i2psnark/java/src/org/klomp/snark/PeerState.java index 42a67ea7b..04e92ba28 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerState.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerState.java @@ -107,7 +107,7 @@ class PeerState implements DataLoader // The only problem with returning the partials to the coordinator // is that chunks above a missing request are lost. // Future enhancements to PartialPiece could keep track of the holes. - List pcs = returnPartialPieces(); + List pcs = returnPartialPieces(); if (!pcs.isEmpty()) { if (_log.shouldLog(Log.DEBUG)) _log.debug(peer + " got choked, returning partial pieces to the PeerCoordinator: " + pcs); @@ -304,22 +304,22 @@ class PeerState implements DataLoader if (_log.shouldLog(Log.DEBUG)) _log.debug("got end of Chunk(" - + req.piece + "," + req.off + "," + req.len + ") from " + + req.getPiece() + "," + req.off + "," + req.len + ") from " + peer); // Last chunk needed for this piece? - if (getFirstOutstandingRequest(req.piece) == -1) + if (getFirstOutstandingRequest(req.getPiece()) == -1) { // warning - may block here for a while - if (listener.gotPiece(peer, req.piece, req.bs)) + if (listener.gotPiece(peer, req.getPartialPiece())) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Got " + req.piece + ": " + peer); + _log.debug("Got " + req.getPiece() + ": " + peer); } else { if (_log.shouldLog(Log.WARN)) - _log.warn("Got BAD " + req.piece + " from " + peer); + _log.warn("Got BAD " + req.getPiece() + " from " + peer); } } @@ -335,7 +335,7 @@ class PeerState implements DataLoader synchronized private int getFirstOutstandingRequest(int piece) { for (int i = 0; i < outstandingRequests.size(); i++) - if (outstandingRequests.get(i).piece == piece) + if (outstandingRequests.get(i).getPiece() == piece) return i; return -1; } @@ -371,7 +371,7 @@ class PeerState implements DataLoader synchronized(this) { req = outstandingRequests.get(r); - while (req.piece == piece && req.off != begin + while (req.getPiece() == piece && req.off != begin && r < outstandingRequests.size() - 1) { r++; @@ -379,7 +379,7 @@ class PeerState implements DataLoader } // Something wrong? - if (req.piece != piece || req.off != begin || req.len != length) + if (req.getPiece() != piece || req.off != begin || req.len != length) { if (_log.shouldLog(Log.INFO)) _log.info("Unrequested or unneeded 'piece: " @@ -427,13 +427,13 @@ class PeerState implements DataLoader Request rv = null; int lowest = Integer.MAX_VALUE; for (Request r : outstandingRequests) { - if (r.piece == piece && r.off < lowest) { + if (r.getPiece() == piece && r.off < lowest) { lowest = r.off; rv = r; } } if (pendingRequest != null && - pendingRequest.piece == piece && pendingRequest.off < lowest) + pendingRequest.getPiece() == piece && pendingRequest.off < lowest) rv = pendingRequest; if (_log.shouldLog(Log.DEBUG)) @@ -447,14 +447,16 @@ class PeerState implements DataLoader * @return List of PartialPieces, even those with an offset == 0, or empty list * @since 0.8.2 */ - synchronized List returnPartialPieces() + synchronized List returnPartialPieces() { Set pcs = getRequestedPieces(); - List rv = new ArrayList(pcs.size()); + List rv = new ArrayList(pcs.size()); for (Integer p : pcs) { Request req = getLowestOutstandingRequest(p.intValue()); - if (req != null) - rv.add(new PartialPiece(req)); + if (req != null) { + req.getPartialPiece().setDownloaded(req.off); + rv.add(req); + } } outstandingRequests.clear(); pendingRequest = null; @@ -468,9 +470,9 @@ class PeerState implements DataLoader synchronized private Set getRequestedPieces() { Set rv = new HashSet(outstandingRequests.size() + 1); for (Request req : outstandingRequests) { - rv.add(Integer.valueOf(req.piece)); + rv.add(Integer.valueOf(req.getPiece())); if (pendingRequest != null) - rv.add(Integer.valueOf(pendingRequest.piece)); + rv.add(Integer.valueOf(pendingRequest.getPiece())); } return rv; } @@ -571,14 +573,14 @@ class PeerState implements DataLoader * @since 0.8.1 */ synchronized void cancelPiece(int piece) { - if (lastRequest != null && lastRequest.piece == piece) + if (lastRequest != null && lastRequest.getPiece() == piece) lastRequest = null; Iterator it = outstandingRequests.iterator(); while (it.hasNext()) { Request req = it.next(); - if (req.piece == piece) + if (req.getPiece() == piece) { it.remove(); // Send cancel even when we are choked to make sure that it is @@ -594,10 +596,10 @@ class PeerState implements DataLoader * @since 0.8.1 */ synchronized boolean isRequesting(int piece) { - if (pendingRequest != null && pendingRequest.piece == piece) + if (pendingRequest != null && pendingRequest.getPiece() == piece) return true; for (Request req : outstandingRequests) { - if (req.piece == piece) + if (req.getPiece() == piece) return true; } return false; @@ -679,7 +681,7 @@ class PeerState implements DataLoader { int pieceLength; boolean isLastChunk; - pieceLength = metainfo.getPieceLength(lastRequest.piece); + pieceLength = metainfo.getPieceLength(lastRequest.getPiece()); isLastChunk = lastRequest.off + lastRequest.len == pieceLength; // Last part of a piece? @@ -687,14 +689,13 @@ class PeerState implements DataLoader more_pieces = requestNextPiece(); else { - int nextPiece = lastRequest.piece; + PartialPiece nextPiece = lastRequest.getPartialPiece(); int nextBegin = lastRequest.off + PARTSIZE; - byte[] bs = lastRequest.bs; int maxLength = pieceLength - nextBegin; int nextLength = maxLength > PARTSIZE ? PARTSIZE : maxLength; Request req - = new Request(nextPiece, bs, nextBegin, nextLength); + = new Request(nextPiece,nextBegin, nextLength); outstandingRequests.add(req); if (!choked) out.sendRequest(req); @@ -740,7 +741,7 @@ class PeerState implements DataLoader // what piece to give us next. int nextPiece = listener.wantPiece(peer, bitfield); if (nextPiece != -1 - && (lastRequest == null || lastRequest.piece != nextPiece)) { + && (lastRequest == null || lastRequest.getPiece() != nextPiece)) { if (_log.shouldLog(Log.DEBUG)) _log.debug(peer + " want piece " + nextPiece); // Fail safe to make sure we are interested diff --git a/apps/i2psnark/java/src/org/klomp/snark/Request.java b/apps/i2psnark/java/src/org/klomp/snark/Request.java index 6c086ebae..d6a621b1f 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Request.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Request.java @@ -20,14 +20,16 @@ package org.klomp.snark; +import java.io.DataInputStream; +import java.io.IOException; + /** * Holds all information needed for a partial piece request. * This class should be used only by PeerState, PeerConnectionIn, and PeerConnectionOut. */ class Request { - final int piece; - final byte[] bs; + private final PartialPiece piece; final int off; final int len; long sendTime; @@ -36,26 +38,49 @@ class Request * Creates a new Request. * * @param piece Piece number requested. - * @param bs byte array where response should be stored. * @param off the offset in the array. * @param len the number of bytes requested. */ - Request(int piece, byte[] bs, int off, int len) + Request(PartialPiece piece, int off, int len) { + // Sanity check + if (off < 0 || len <= 0 || off + len > piece.getLength()) + throw new IndexOutOfBoundsException("Illegal Request " + toString()); + this.piece = piece; - this.bs = bs; this.off = off; this.len = len; + } - // Sanity check - if (piece < 0 || off < 0 || len <= 0 || off + len > bs.length) - throw new IndexOutOfBoundsException("Illegal Request " + toString()); + /** + * @since 0.9.1 + */ + public void read(DataInputStream din) throws IOException { + piece.read(din, off, len); + } + + /** + * The piece number this Request is for + * + * @since 0.9.1 + */ + public int getPiece() { + return piece.getPiece(); + } + + /** + * The PartialPiece this Request is for + * + * @since 0.9.1 + */ + public PartialPiece getPartialPiece() { + return piece; } @Override public int hashCode() { - return piece ^ off ^ len; + return piece.getPiece() ^ off ^ len; } @Override @@ -64,7 +89,7 @@ class Request if (o instanceof Request) { Request req = (Request)o; - return req.piece == piece && req.off == off && req.len == len; + return req.piece.equals(piece) && req.off == off && req.len == len; } return false; @@ -73,6 +98,6 @@ class Request @Override public String toString() { - return "(" + piece + "," + off + "," + len + ")"; + return "(" + piece.getPiece() + "," + off + "," + len + ")"; } } diff --git a/apps/i2psnark/java/src/org/klomp/snark/Snark.java b/apps/i2psnark/java/src/org/klomp/snark/Snark.java index 5ae2fbe1f..75b667269 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Snark.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Snark.java @@ -1214,7 +1214,7 @@ public class Snark total += c.getCurrentUploadRate(); } long limit = 1024l * _util.getMaxUpBW(); - debug("Total up bw: " + total + " Limit: " + limit, Snark.WARNING); + debug("Total up bw: " + total + " Limit: " + limit, Snark.NOTICE); return total > limit; } diff --git a/apps/i2psnark/java/src/org/klomp/snark/Storage.java b/apps/i2psnark/java/src/org/klomp/snark/Storage.java index 63057a18b..34b81bbea 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Storage.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Storage.java @@ -873,54 +873,55 @@ public class Storage * matches), otherwise false. * @exception IOException when some storage related error occurs. */ - public boolean putPiece(int piece, byte[] ba) throws IOException + public boolean putPiece(PartialPiece pp) throws IOException { - // First check if the piece is correct. - // Copy the array first to be paranoid. - byte[] bs = ba.clone(); - int length = bs.length; - boolean correctHash = metainfo.checkPiece(piece, bs, 0, length); - if (listener != null) - listener.storageChecked(this, piece, correctHash); - if (!correctHash) - return false; + int piece = pp.getPiece(); + try { + synchronized(bitfield) { + if (bitfield.get(piece)) + return true; // No need to store twice. + } - synchronized(bitfield) - { - if (bitfield.get(piece)) - return true; // No need to store twice. - } + // TODO alternative - check hash on the fly as we write to the file, + // to save another I/O pass + boolean correctHash = metainfo.checkPiece(pp); + if (listener != null) + listener.storageChecked(this, piece, correctHash); + if (!correctHash) { + return false; + } - // Early typecast, avoid possibly overflowing a temp integer - long start = (long) piece * (long) piece_size; - int i = 0; - long raflen = lengths[i]; - while (start > raflen) - { - i++; - start -= raflen; - raflen = lengths[i]; - } + // Early typecast, avoid possibly overflowing a temp integer + long start = (long) piece * (long) piece_size; + int i = 0; + long raflen = lengths[i]; + while (start > raflen) { + i++; + start -= raflen; + raflen = lengths[i]; + } - int written = 0; - int off = 0; - while (written < length) - { - int need = length - written; - int len = (start + need < raflen) ? need : (int)(raflen - start); - synchronized(RAFlock[i]) - { - checkRAF(i); - rafs[i].seek(start); - rafs[i].write(bs, off + written, len); - } - written += len; - if (need - len > 0) - { - i++; - raflen = lengths[i]; - start = 0; + int written = 0; + int off = 0; + int length = metainfo.getPieceLength(piece); + while (written < length) { + int need = length - written; + int len = (start + need < raflen) ? need : (int)(raflen - start); + synchronized(RAFlock[i]) { + checkRAF(i); + rafs[i].seek(start); + //rafs[i].write(bs, off + written, len); + pp.write(rafs[i], off + written, len); + } + written += len; + if (need - len > 0) { + i++; + raflen = lengths[i]; + start = 0; + } } + } finally { + pp.release(); } changed = true; diff --git a/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java b/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java index 9c90639cb..06e4a6f43 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java +++ b/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java @@ -150,6 +150,15 @@ public class I2PSnarkServlet extends DefaultServlet { String path = req.getServletPath(); resp.setHeader("X-Frame-Options", "SAMEORIGIN"); + String peerParam = req.getParameter("p"); + String peerString; + if (peerParam == null || (!_manager.util().connected()) || + peerParam.replaceAll("[a-zA-Z0-9~=-]", "").length() > 0) { // XSS + peerString = ""; + } else { + peerString = "?p=" + peerParam; + } + // AJAX for mainsection if ("/.ajax/xhr1.html".equals(path)) { resp.setCharacterEncoding("UTF-8"); @@ -157,7 +166,7 @@ public class I2PSnarkServlet extends DefaultServlet { PrintWriter out = resp.getWriter(); //if (_log.shouldLog(Log.DEBUG)) // _manager.addMessage((_context.clock().now() / 1000) + " xhr1 p=" + req.getParameter("p")); - writeMessages(out, false); + writeMessages(out, false, peerString); writeTorrents(out, req); return; } @@ -201,15 +210,6 @@ public class I2PSnarkServlet extends DefaultServlet { _manager.addMessage("Please retry form submission (bad nonce)"); } - String peerParam = req.getParameter("p"); - String peerString; - if (peerParam == null || (!_manager.util().connected()) || - peerParam.replaceAll("[a-zA-Z0-9~=-]", "").length() > 0) { // XSS - peerString = ""; - } else { - peerString = "?p=" + peerParam; - } - PrintWriter out = resp.getWriter(); out.write(DOCTYPE + "\n" + "\n" + @@ -274,7 +274,7 @@ public class I2PSnarkServlet extends DefaultServlet { _manager.addMessage(_("Click \"Add torrent\" button to fetch torrent")); out.write("
"); - writeMessages(out, isConfigure); + writeMessages(out, isConfigure, peerString); if (isConfigure) { // end of mainsection div @@ -294,7 +294,7 @@ public class I2PSnarkServlet extends DefaultServlet { out.write(FOOTER); } - private void writeMessages(PrintWriter out, boolean isConfigure) throws IOException { + private void writeMessages(PrintWriter out, boolean isConfigure, String peerString) throws IOException { List msgs = _manager.getMessages(); if (!msgs.isEmpty()) { out.write("
    "); @@ -302,11 +302,14 @@ public class I2PSnarkServlet extends DefaultServlet { String msg = msgs.get(i); out.write("
  • " + msg + "
  • \n"); } - // lazy GET, lose p parameter out.write("

" + _("clear messages") + "

"); + if (peerString.length() > 0) + out.write(peerString + "&"); + else + out.write("?"); + out.write("action=Clear&nonce=" + _nonce + "\">" + _("clear messages") + "

"); } } diff --git a/history.txt b/history.txt index af1ec21bf..1cf1b77a4 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,11 @@ +2012-05-19 zzz + * i2psnark: + - Store received chunks in temp files + - Don't allocate from heap for unneeded chunks + - Remove peer count restriction for torrents with large pieces + - Use priorities and rarest calculations to sort partials + - Preserve p parameter in clear messages link + 2012-05-13 zzz * Console: Add X-Frame-Options to headers, disable with routerconsole.disableXFrame=true diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 87e5bffca..282c18b42 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 1; + public final static long BUILD = 2; /** for example "-test" */ public final static String EXTRA = "";