* ByteCache:

- Add a per-cache stat
      - Limit each cache based on max memory
      - Disable in UDP MessageReceiver
      - Add clearAll() method to be called when under
        severe memory pressure; call from Router
This commit is contained in:
zzz
2010-05-02 12:14:14 +00:00
parent 949a8901fb
commit b6cb90d731
4 changed files with 105 additions and 11 deletions

View File

@@ -1078,6 +1078,7 @@ public class DataHelper {
ReusableGZIPInputStream in = ReusableGZIPInputStream.acquire(); ReusableGZIPInputStream in = ReusableGZIPInputStream.acquire();
in.initialize(new ByteArrayInputStream(orig, offset, length)); in.initialize(new ByteArrayInputStream(orig, offset, length));
// don't make this a static field, or else I2PAppContext gets initialized too early
ByteCache cache = ByteCache.getInstance(8, MAX_UNCOMPRESSED); ByteCache cache = ByteCache.getInstance(8, MAX_UNCOMPRESSED);
ByteArray outBuf = cache.acquire(); ByteArray outBuf = cache.acquire();
int written = 0; int written = 0;

View File

@@ -13,27 +13,98 @@ import net.i2p.data.ByteArray;
* Cache the objects frequently used to reduce memory churn. The ByteArray * Cache the objects frequently used to reduce memory churn. The ByteArray
* should be held onto as long as the data referenced in it is needed. * should be held onto as long as the data referenced in it is needed.
* *
* Heap size control - survey of usage (April 2010) :
*
* </pre>
Size Max MaxMem From
16 16 256 CryptixAESEngine
16 32 512 BloomFilterIVValidator
16 64 1K UDP PacketBuilder
16 128 2K tunnel HopProcessor
16 128 2K tunnel TrivialPreprocessor
16 128 2K tunnel InboundEndpointProcessor
16 128 2K tunnel OutboundGatewayProcessor
32 64 2K UDP PacketBuilder
32 128 4K tunnel TrivialPreprocessor
1K 32 32K tunnel TrivialPreprocessor
1K 512 512K tunnel FragmentHandler
1K 512 512K I2NP TunnelDataMessage
1K 512 512K tunnel FragmentedMessage
1730 128 216K streaming MessageOutputStream
2K 64 128K UDP IMS
4K 32 128K I2PTunnelRunner
8K 8 64K I2PTunnel HTTPResponseOutputStream
32K 4 128K SAM StreamSession
32K 10 320K SAM v2StreamSession
32K 64 2M UDP OMS
32K 128 4M streaming MessageInputStream
36K 64 2.25M streaming PacketQueue
40K 8 320K DataHelper decompress
64K 64 4M UDP MessageReceiver - disabled in 0.7.14
* </pre>
*
*/ */
public final class ByteCache { public final class ByteCache {
private final static Map _caches = new HashMap(16);
private static final Map<Integer, ByteCache> _caches = new HashMap(16);
/**
* max size in bytes of each cache
* Set to max memory / 128, with a min of 128KB and a max of 4MB
*
* @since 0.7.14
*/
private static final int MAX_CACHE;
static {
long maxMemory = Runtime.getRuntime().maxMemory();
MAX_CACHE = (int) Math.min(4*1024*1024l, Math.max(128*1024l, maxMemory / 128));
}
/** /**
* Get a cache responsible for objects of the given size * Get a cache responsible for objects of the given size
* *
* @param cacheSize how large we want the cache to grow before using on * @param cacheSize how large we want the cache to grow before using on
* demand allocation * demand allocation
* Since 0.7.14, a limit of 1MB / size is enforced
* for the typical 128MB max memory JVM
* @param size how large should the objects cached be? * @param size how large should the objects cached be?
*/ */
public static ByteCache getInstance(int cacheSize, int size) { public static ByteCache getInstance(int cacheSize, int size) {
if (cacheSize * size > MAX_CACHE)
cacheSize = MAX_CACHE / size;
Integer sz = Integer.valueOf(size); Integer sz = Integer.valueOf(size);
ByteCache cache = null; ByteCache cache = null;
synchronized (_caches) { synchronized (_caches) {
if (!_caches.containsKey(sz)) if (!_caches.containsKey(sz))
_caches.put(sz, new ByteCache(cacheSize, size)); _caches.put(sz, new ByteCache(cacheSize, size));
cache = (ByteCache)_caches.get(sz); cache = _caches.get(sz);
} }
cache.resize(cacheSize); cache.resize(cacheSize);
//I2PAppContext.getGlobalContext().logManager().getLog(ByteCache.class).error("ByteCache size: " + size + " max: " + cacheSize, new Exception("from"));
return cache; return cache;
} }
/**
* Clear everything (memory pressure)
* @since 0.7.14
*/
public static void clearAll() {
for (ByteCache bc : _caches.values())
bc.clear();
I2PAppContext.getGlobalContext().logManager().getLog(ByteCache.class).error("WARNING: Low memory, clearing byte caches");
}
private Log _log; private Log _log;
/** list of available and available entries */ /** list of available and available entries */
private Queue<ByteArray> _available; private Queue<ByteArray> _available;
@@ -57,13 +128,14 @@ public final class ByteCache {
_lastOverflow = -1; _lastOverflow = -1;
SimpleScheduler.getInstance().addPeriodicEvent(new Cleanup(), CLEANUP_FREQUENCY); SimpleScheduler.getInstance().addPeriodicEvent(new Cleanup(), CLEANUP_FREQUENCY);
_log = I2PAppContext.getGlobalContext().logManager().getLog(ByteCache.class); _log = I2PAppContext.getGlobalContext().logManager().getLog(ByteCache.class);
I2PAppContext.getGlobalContext().statManager().createRateStat("byteCache.memory." + entrySize, "Memory usage (B)", "Router", new long[] { 60*1000 });
} }
private void resize(int maxCachedEntries) { private void resize(int maxCachedEntries) {
if (_maxCached >= maxCachedEntries) return; if (_maxCached >= maxCachedEntries) return;
_maxCached = maxCachedEntries; _maxCached = maxCachedEntries;
// make a bigger one, move the cached items over // make a bigger one, move the cached items over
Queue newLBQ = new LinkedBlockingQueue(maxCachedEntries); Queue<ByteArray> newLBQ = new LinkedBlockingQueue(maxCachedEntries);
ByteArray ba; ByteArray ba;
while ((ba = _available.poll()) != null) while ((ba = _available.poll()) != null)
newLBQ.offer(ba); newLBQ.offer(ba);
@@ -109,8 +181,17 @@ public final class ByteCache {
} }
} }
/**
* Clear everything (memory pressure)
* @since 0.7.14
*/
private void clear() {
_available.clear();
}
private class Cleanup implements SimpleTimer.TimedEvent { private class Cleanup implements SimpleTimer.TimedEvent {
public void timeReached() { public void timeReached() {
I2PAppContext.getGlobalContext().statManager().addRateData("byteCache.memory." + _entrySize, _entrySize * _available.size(), 0);
if (System.currentTimeMillis() - _lastOverflow > EXPIRE_PERIOD) { if (System.currentTimeMillis() - _lastOverflow > EXPIRE_PERIOD) {
// we haven't exceeded the cache size in a few minutes, so lets // we haven't exceeded the cache size in a few minutes, so lets
// shrink the cache // shrink the cache

View File

@@ -41,6 +41,7 @@ import net.i2p.router.transport.udp.UDPTransport;
import net.i2p.stat.Rate; import net.i2p.stat.Rate;
import net.i2p.stat.RateStat; import net.i2p.stat.RateStat;
import net.i2p.stat.StatManager; import net.i2p.stat.StatManager;
import net.i2p.util.ByteCache;
import net.i2p.util.FileUtil; import net.i2p.util.FileUtil;
import net.i2p.util.I2PAppThread; import net.i2p.util.I2PAppThread;
import net.i2p.util.I2PThread; import net.i2p.util.I2PThread;
@@ -224,6 +225,7 @@ public class Router {
_killVMOnEnd = true; _killVMOnEnd = true;
_oomListener = new I2PThread.OOMEventListener() { _oomListener = new I2PThread.OOMEventListener() {
public void outOfMemory(OutOfMemoryError oom) { public void outOfMemory(OutOfMemoryError oom) {
ByteCache.clearAll();
_log.log(Log.CRIT, "Thread ran out of memory", oom); _log.log(Log.CRIT, "Thread ran out of memory", oom);
for (int i = 0; i < 5; i++) { // try this 5 times, in case it OOMs for (int i = 0; i < 5; i++) { // try this 5 times, in case it OOMs
try { try {
@@ -252,6 +254,8 @@ public class Router {
* *
*/ */
public void setKillVMOnEnd(boolean shouldDie) { _killVMOnEnd = shouldDie; } public void setKillVMOnEnd(boolean shouldDie) { _killVMOnEnd = shouldDie; }
/** @deprecated unused */
public boolean getKillVMOnEnd() { return _killVMOnEnd; } public boolean getKillVMOnEnd() { return _killVMOnEnd; }
public String getConfigFilename() { return _configFilename; } public String getConfigFilename() { return _configFilename; }
@@ -923,7 +927,7 @@ public class Router {
private static final boolean ALLOW_DYNAMIC_KEYS = false; private static final boolean ALLOW_DYNAMIC_KEYS = false;
private void finalShutdown(int exitCode) { private void finalShutdown(int exitCode) {
_log.log(Log.CRIT, "Shutdown(" + exitCode + ") complete", new Exception("Shutdown")); _log.log(Log.CRIT, "Shutdown(" + exitCode + ") complete" /* , new Exception("Shutdown") */ );
try { _context.logManager().shutdown(); } catch (Throwable t) { } try { _context.logManager().shutdown(); } catch (Throwable t) { }
if (ALLOW_DYNAMIC_KEYS) { if (ALLOW_DYNAMIC_KEYS) {
if (Boolean.valueOf(_context.getProperty(PROP_DYNAMIC_KEYS)).booleanValue()) if (Boolean.valueOf(_context.getProperty(PROP_DYNAMIC_KEYS)).booleanValue())
@@ -1357,12 +1361,16 @@ public class Router {
/* following classes are now private static inner classes, didn't bother to reindent */ /* following classes are now private static inner classes, didn't bother to reindent */
private static final long LOW_MEMORY_THRESHOLD = 5 * 1024 * 1024;
/** /**
* coalesce the stats framework every minute * coalesce the stats framework every minute
* *
*/ */
private static class CoalesceStatsEvent implements SimpleTimer.TimedEvent { private static class CoalesceStatsEvent implements SimpleTimer.TimedEvent {
private RouterContext _ctx; private RouterContext _ctx;
private long _maxMemory;
public CoalesceStatsEvent(RouterContext ctx) { public CoalesceStatsEvent(RouterContext ctx) {
_ctx = ctx; _ctx = ctx;
ctx.statManager().createRateStat("bw.receiveBps", "How fast we receive data (in KBps)", "Bandwidth", new long[] { 60*1000, 5*60*1000, 60*60*1000 }); ctx.statManager().createRateStat("bw.receiveBps", "How fast we receive data (in KBps)", "Bandwidth", new long[] { 60*1000, 5*60*1000, 60*60*1000 });
@@ -1373,8 +1381,8 @@ private static class CoalesceStatsEvent implements SimpleTimer.TimedEvent {
ctx.statManager().createRateStat("router.activeSendPeers", "How many peers we've sent to this minute", "Throttle", new long[] { 60*1000, 5*60*1000, 60*60*1000 }); ctx.statManager().createRateStat("router.activeSendPeers", "How many peers we've sent to this minute", "Throttle", new long[] { 60*1000, 5*60*1000, 60*60*1000 });
ctx.statManager().createRateStat("router.highCapacityPeers", "How many high capacity peers we know", "Throttle", new long[] { 5*60*1000, 60*60*1000 }); ctx.statManager().createRateStat("router.highCapacityPeers", "How many high capacity peers we know", "Throttle", new long[] { 5*60*1000, 60*60*1000 });
ctx.statManager().createRateStat("router.fastPeers", "How many fast peers we know", "Throttle", new long[] { 5*60*1000, 60*60*1000 }); ctx.statManager().createRateStat("router.fastPeers", "How many fast peers we know", "Throttle", new long[] { 5*60*1000, 60*60*1000 });
long max = Runtime.getRuntime().maxMemory() / (1024*1024); _maxMemory = Runtime.getRuntime().maxMemory();
ctx.statManager().createRateStat("router.memoryUsed", "(Bytes) Max is " + max + "MB", "Router", new long[] { 60*1000 }); ctx.statManager().createRateStat("router.memoryUsed", "(Bytes) Max is " + (_maxMemory / (1024*1024)) + "MB", "Router", new long[] { 60*1000 });
} }
private RouterContext getContext() { return _ctx; } private RouterContext getContext() { return _ctx; }
public void timeReached() { public void timeReached() {
@@ -1395,6 +1403,8 @@ private static class CoalesceStatsEvent implements SimpleTimer.TimedEvent {
long used = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); long used = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
getContext().statManager().addRateData("router.memoryUsed", used, 0); getContext().statManager().addRateData("router.memoryUsed", used, 0);
if (_maxMemory - used < LOW_MEMORY_THRESHOLD)
ByteCache.clearAll();
getContext().tunnelDispatcher().updateParticipatingStats(COALESCE_TIME); getContext().tunnelDispatcher().updateParticipatingStats(COALESCE_TIME);

View File

@@ -10,7 +10,7 @@ import net.i2p.data.i2np.I2NPMessageException;
import net.i2p.data.i2np.I2NPMessageHandler; import net.i2p.data.i2np.I2NPMessageHandler;
import net.i2p.data.i2np.I2NPMessageImpl; import net.i2p.data.i2np.I2NPMessageImpl;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.util.ByteCache; //import net.i2p.util.ByteCache;
import net.i2p.util.I2PThread; import net.i2p.util.I2PThread;
import net.i2p.util.Log; import net.i2p.util.Log;
@@ -26,7 +26,7 @@ public class MessageReceiver {
/** list of messages (InboundMessageState) fully received but not interpreted yet */ /** list of messages (InboundMessageState) fully received but not interpreted yet */
private final BlockingQueue<InboundMessageState> _completeMessages; private final BlockingQueue<InboundMessageState> _completeMessages;
private boolean _alive; private boolean _alive;
private ByteCache _cache; //private ByteCache _cache;
private static final int THREADS = 5; private static final int THREADS = 5;
private static final long POISON_IMS = -99999999999l; private static final long POISON_IMS = -99999999999l;
@@ -35,7 +35,8 @@ public class MessageReceiver {
_log = ctx.logManager().getLog(MessageReceiver.class); _log = ctx.logManager().getLog(MessageReceiver.class);
_transport = transport; _transport = transport;
_completeMessages = new LinkedBlockingQueue(); _completeMessages = new LinkedBlockingQueue();
_cache = ByteCache.getInstance(64, I2NPMessage.MAX_SIZE); // the runners run forever, no need to have a cache
//_cache = ByteCache.getInstance(64, I2NPMessage.MAX_SIZE);
_context.statManager().createRateStat("udp.inboundExpired", "How many messages were expired before reception?", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.inboundExpired", "How many messages were expired before reception?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.inboundRemaining", "How many messages were remaining when a message is pulled off the complete queue?", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.inboundRemaining", "How many messages were remaining when a message is pulled off the complete queue?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.inboundReady", "How many messages were ready when a message is added to the complete queue?", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.inboundReady", "How many messages were ready when a message is added to the complete queue?", "udp", UDPTransport.RATES);
@@ -91,7 +92,8 @@ public class MessageReceiver {
public void loop(I2NPMessageHandler handler) { public void loop(I2NPMessageHandler handler) {
InboundMessageState message = null; InboundMessageState message = null;
ByteArray buf = _cache.acquire(); //ByteArray buf = _cache.acquire();
ByteArray buf = new ByteArray(new byte[I2NPMessage.MAX_SIZE]);
while (_alive) { while (_alive) {
int expired = 0; int expired = 0;
long expiredLifetime = 0; long expiredLifetime = 0;
@@ -142,7 +144,7 @@ public class MessageReceiver {
} }
// no need to zero it out, as these buffers are only used with an explicit getCompleteSize // no need to zero it out, as these buffers are only used with an explicit getCompleteSize
_cache.release(buf, false); //_cache.release(buf, false);
} }
private I2NPMessage readMessage(ByteArray buf, InboundMessageState state, I2NPMessageHandler handler) { private I2NPMessage readMessage(ByteArray buf, InboundMessageState state, I2NPMessageHandler handler) {