diff --git a/apps/i2psnark/java/src/org/klomp/snark/DataLoader.java b/apps/i2psnark/java/src/org/klomp/snark/DataLoader.java index 0bd974f53..c21685ac4 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/DataLoader.java +++ b/apps/i2psnark/java/src/org/klomp/snark/DataLoader.java @@ -1,5 +1,7 @@ package org.klomp.snark; +import net.i2p.data.ByteArray; + /** * Callback used to fetch data * @since 0.8.2 @@ -10,5 +12,5 @@ interface DataLoader * This is the callback that PeerConnectionOut calls to get the data from disk * @return bytes or null for errors */ - public byte[] loadData(int piece, int begin, int length); + public ByteArray loadData(int piece, int begin, int length); } diff --git a/apps/i2psnark/java/src/org/klomp/snark/Message.java b/apps/i2psnark/java/src/org/klomp/snark/Message.java index 42682b5f2..9f2197d2b 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Message.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Message.java @@ -23,8 +23,13 @@ package org.klomp.snark; import java.io.DataOutputStream; import java.io.IOException; -// Used to queue outgoing connections -// sendMessage() should be used to translate them to wire format. +import net.i2p.data.ByteArray; +import net.i2p.util.ByteCache; + +/** + * Used to queue outgoing connections + * sendMessage() should be used to translate them to wire format. + */ class Message { final static byte KEEP_ALIVE = -1; @@ -69,6 +74,9 @@ class Message // now unused //SimpleTimer.TimedEvent expireEvent; + private static final int BUFSIZE = PeerState.PARTSIZE; + private static final ByteCache _cache = ByteCache.getInstance(16, BUFSIZE); + /** Utility method for sending a message through a DataStream. */ void sendMessage(DataOutputStream dos) throws IOException { @@ -79,11 +87,15 @@ class Message return; } + ByteArray ba; // Get deferred data if (data == null && dataLoader != null) { - data = dataLoader.loadData(piece, begin, length); - if (data == null) + ba = dataLoader.loadData(piece, begin, length); + if (ba == null) return; // hmm will get retried, but shouldn't happen + data = ba.getData(); + } else { + ba = null; } // Calculate the total length in bytes @@ -139,6 +151,10 @@ class Message // Send actual data if (type == BITFIELD || type == PIECE || type == EXTENSION) dos.write(data, off, len); + + // Was pulled from cache in Storage.getPiece() via dataLoader + if (ba != null && ba.getData().length == BUFSIZE) + _cache.release(ba, false); } @Override diff --git a/apps/i2psnark/java/src/org/klomp/snark/PartialPiece.java b/apps/i2psnark/java/src/org/klomp/snark/PartialPiece.java index 3c746590f..bfeb49b7f 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PartialPiece.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PartialPiece.java @@ -9,6 +9,8 @@ import java.security.MessageDigest; import net.i2p.I2PAppContext; import net.i2p.crypto.SHA1; +import net.i2p.data.ByteArray; +import net.i2p.util.ByteCache; import net.i2p.util.Log; import net.i2p.util.SecureFile; @@ -42,6 +44,9 @@ class PartialPiece implements Comparable { private final int pclen; private final File tempDir; + private static final int BUFSIZE = PeerState.PARTSIZE; + private static final ByteCache _cache = ByteCache.getInstance(16, BUFSIZE); + // Any bigger than this, use temp file instead of heap private static final int MAX_IN_MEM = 128 * 1024; // May be reduced on OOM @@ -154,7 +159,16 @@ class PartialPiece implements Comparable { sha1.update(bs); } else { int read = 0; - byte[] buf = new byte[Math.min(pclen, 16384)]; + int buflen = Math.min(pclen, BUFSIZE); + ByteArray ba; + byte[] buf; + if (buflen == BUFSIZE) { + ba = _cache.acquire(); + buf = ba.getData(); + } else { + ba = null; + buf = new byte[buflen]; + } synchronized (this) { if (raf == null) throw new IOException(); @@ -167,6 +181,8 @@ class PartialPiece implements Comparable { sha1.update(buf, 0, rd); } } + if (ba != null) + _cache.release(ba, false); if (read < pclen) throw new IOException(); } @@ -182,7 +198,15 @@ class PartialPiece implements Comparable { din.readFully(bs, off, len); } else { // read in fully before synching on raf - byte[] tmp = new byte[len]; + ByteArray ba; + byte[] tmp; + if (len == BUFSIZE) { + ba = _cache.acquire(); + tmp = ba.getData(); + } else { + ba = null; + tmp = new byte[len]; + } din.readFully(tmp); synchronized (this) { if (raf == null) @@ -190,6 +214,8 @@ class PartialPiece implements Comparable { raf.seek(off); raf.write(tmp); } + if (ba != null) + _cache.release(ba, false); } } @@ -208,7 +234,16 @@ class PartialPiece implements Comparable { out.write(bs, offset, len); } else { int read = 0; - byte[] buf = new byte[Math.min(len, 16384)]; + int buflen = Math.min(len, BUFSIZE); + ByteArray ba; + byte[] buf; + if (buflen == BUFSIZE) { + ba = _cache.acquire(); + buf = ba.getData(); + } else { + ba = null; + buf = new byte[buflen]; + } synchronized (this) { if (raf == null) throw new IOException(); @@ -220,6 +255,8 @@ class PartialPiece implements Comparable { out.write(buf, 0, rd); } } + if (ba != null) + _cache.release(ba, false); } } diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java index 62e58ad8d..dc64f885f 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java @@ -437,6 +437,7 @@ class PeerConnectionOut implements Runnable */ void sendPiece(int piece, int begin, int length, DataLoader loader) { + /**** boolean sendNow = false; // are there any cases where we should? @@ -447,6 +448,7 @@ class PeerConnectionOut implements Runnable sendPiece(piece, begin, length, bytes); return; } + ****/ // queue a fake message... set everything up, // except save the PeerState instead of the bytes. diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java index 5c4fdda39..536865b0b 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java @@ -35,6 +35,7 @@ import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; import net.i2p.I2PAppContext; +import net.i2p.data.ByteArray; import net.i2p.data.DataHelper; import net.i2p.data.Destination; import net.i2p.util.ConcurrentHashSet; @@ -874,7 +875,7 @@ class PeerCoordinator implements PeerListener * * @throws RuntimeException on IOE getting the data */ - public byte[] gotRequest(Peer peer, int piece, int off, int len) + public ByteArray gotRequest(Peer peer, int piece, int off, int len) { if (halted) return null; diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java b/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java index 892ba4ff3..ee6801167 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java @@ -22,6 +22,8 @@ package org.klomp.snark; import java.util.List; +import net.i2p.data.ByteArray; + /** * Listener for Peer events. */ @@ -114,7 +116,7 @@ interface PeerListener * @return a byte array containing the piece or null when the piece * is not available (which is a protocol error). */ - byte[] gotRequest(Peer peer, int piece, int off, int len); + ByteArray gotRequest(Peer peer, int piece, int off, int len); /** * Called when a (partial) piece has been downloaded from the peer. diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerState.java b/apps/i2psnark/java/src/org/klomp/snark/PeerState.java index 8433242ff..8f80d4496 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerState.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerState.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Set; import net.i2p.I2PAppContext; +import net.i2p.data.ByteArray; import net.i2p.util.Log; class PeerState implements DataLoader @@ -245,8 +246,8 @@ class PeerState implements DataLoader * @return bytes or null for errors * @since 0.8.2 */ - public byte[] loadData(int piece, int begin, int length) { - byte[] pieceBytes = listener.gotRequest(peer, piece, begin, length); + public ByteArray loadData(int piece, int begin, int length) { + ByteArray pieceBytes = listener.gotRequest(peer, piece, begin, length); if (pieceBytes == null) { // XXX - Protocol error-> diconnect? @@ -256,7 +257,7 @@ class PeerState implements DataLoader } // More sanity checks - if (length != pieceBytes.length) + if (length != pieceBytes.getData().length) { // XXX - Protocol error-> disconnect? if (_log.shouldLog(Log.WARN)) diff --git a/apps/i2psnark/java/src/org/klomp/snark/Storage.java b/apps/i2psnark/java/src/org/klomp/snark/Storage.java index b343f3735..5ae23203a 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Storage.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Storage.java @@ -34,6 +34,8 @@ import java.util.StringTokenizer; import java.util.concurrent.ConcurrentHashMap; import net.i2p.crypto.SHA1; +import net.i2p.data.ByteArray; +import net.i2p.util.ByteCache; import net.i2p.util.Log; import net.i2p.util.SecureFile; import net.i2p.util.SystemVersion; @@ -80,6 +82,9 @@ public class Storage private static final boolean _isWindows = SystemVersion.isWindows(); + private static final int BUFSIZE = PeerState.PARTSIZE; + private static final ByteCache _cache = ByteCache.getInstance(16, BUFSIZE); + /** * Creates a new storage based on the supplied MetaInfo. This will * try to create and/or check all needed files in the MetaInfo. @@ -899,22 +904,28 @@ public class Storage * Returns a byte array containing a portion of the requested piece or null if * the storage doesn't contain the piece yet. */ - public byte[] getPiece(int piece, int off, int len) throws IOException + public ByteArray getPiece(int piece, int off, int len) throws IOException { if (!bitfield.get(piece)) return null; //Catch a common place for OOMs esp. on 1MB pieces + ByteArray rv; byte[] bs; try { - bs = new byte[len]; + // Will be restored to cache in Message.sendMessage() + if (len == BUFSIZE) + rv = _cache.acquire(); + else + rv = new ByteArray(new byte[len]); } catch (OutOfMemoryError oom) { if (_log.shouldLog(Log.WARN)) _log.warn("Out of memory, can't honor request for piece " + piece, oom); return null; } + bs = rv.getData(); getUncheckedPiece(piece, bs, off, len); - return bs; + return rv; } /**