From 5f7982540fdadd92481b6aef12a34dc9ae17394a Mon Sep 17 00:00:00 2001 From: jrandom Date: Sat, 13 Nov 2004 09:43:35 +0000 Subject: [PATCH] 2004-11-13 jrandom * Added throttles on how many I2PTunnel client connections we open at once * Replaced some buffered streams in I2PTunnel with unbuffered streams, as the streaming library used should take care of any buffering. * Added a cache for some objects used in I2PTunnel, especially useful when there are many short lived connections. * Trimmed the SimpleTimer's processing a bit --- .../net/i2p/i2ptunnel/I2PTunnelRunner.java | 10 +- core/java/src/net/i2p/util/ByteCache.java | 108 ++++++++++++++++++ core/java/src/net/i2p/util/SimpleTimer.java | 20 ++-- history.txt | 10 +- .../src/net/i2p/router/RouterVersion.java | 4 +- 5 files changed, 134 insertions(+), 18 deletions(-) create mode 100644 core/java/src/net/i2p/util/ByteCache.java diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java index c850549f7..7aedf19b0 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java @@ -13,6 +13,8 @@ import java.net.SocketException; import java.util.HashMap; import net.i2p.client.streaming.I2PSocket; +import net.i2p.data.ByteArray; +import net.i2p.util.ByteCache; import net.i2p.util.Clock; import net.i2p.util.I2PThread; import net.i2p.util.Log; @@ -94,7 +96,7 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL public void run() { try { InputStream in = s.getInputStream(); - OutputStream out = new BufferedOutputStream(s.getOutputStream(), NETWORK_BUFFER_SIZE); + OutputStream out = s.getOutputStream(); // = new BufferedOutputStream(s.getOutputStream(), NETWORK_BUFFER_SIZE); i2ps.setSocketErrorListener(this); InputStream i2pin = i2ps.getInputStream(); OutputStream i2pout = i2ps.getOutputStream(); //new BufferedOutputStream(i2ps.getOutputStream(), MAX_PACKET_SIZE); @@ -150,11 +152,13 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL InputStream in; OutputStream out; String direction; + private ByteCache _cache; private StreamForwarder(InputStream in, OutputStream out, String dir) { this.in = in; this.out = out; direction = dir; + _cache = ByteCache.getInstance(256, NETWORK_BUFFER_SIZE); setName("StreamForwarder " + _runnerId + "." + (++__forwarderId)); start(); } @@ -170,7 +174,8 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL + to); } - byte[] buffer = new byte[NETWORK_BUFFER_SIZE]; + ByteArray ba = _cache.acquire(); + byte[] buffer = ba.getData(); // new byte[NETWORK_BUFFER_SIZE]; try { int len; while ((len = in.read(buffer)) != -1) { @@ -227,6 +232,7 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL finishLock.notifyAll(); // the main thread will close sockets etc. now } + _cache.release(ba); } } } diff --git a/core/java/src/net/i2p/util/ByteCache.java b/core/java/src/net/i2p/util/ByteCache.java new file mode 100644 index 000000000..e961d2ecd --- /dev/null +++ b/core/java/src/net/i2p/util/ByteCache.java @@ -0,0 +1,108 @@ +package net.i2p.util; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import net.i2p.I2PAppContext; +import net.i2p.data.ByteArray; + +/** + * Cache the objects frequently used to reduce memory churn. The ByteArray + * should be held onto as long as the data referenced in it is needed. + * + */ +public final class ByteCache { + private static Map _caches = new HashMap(16); + /** + * Get a cache responsible for objects of the given size + * + * @param cacheSize how large we want the cache to grow before using on + * demand allocation + * @param size how large should the objects cached be? + */ + public static ByteCache getInstance(int cacheSize, int size) { + Integer sz = new Integer(size); + synchronized (_caches) { + if (!_caches.containsKey(sz)) + _caches.put(sz, new ByteCache(cacheSize, size)); + return (ByteCache)_caches.get(sz); + } + } + private Log _log; + /** list of available and available entries */ + private List _available; + private int _maxCached; + private int _entrySize; + private long _lastOverflow; + + /** do we actually want to cache? */ + private static final boolean _cache = true; + + /** how often do we cleanup the cache */ + private static final int CLEANUP_FREQUENCY = 30*1000; + /** if we haven't exceeded the cache size in 2 minutes, cut our cache in half */ + private static final long EXPIRE_PERIOD = 2*60*1000; + + private ByteCache(int maxCachedEntries, int entrySize) { + if (_cache) + _available = new ArrayList(maxCachedEntries); + _maxCached = maxCachedEntries; + _entrySize = entrySize; + _lastOverflow = -1; + SimpleTimer.getInstance().addEvent(new Cleanup(), CLEANUP_FREQUENCY); + _log = I2PAppContext.getGlobalContext().logManager().getLog(ByteCache.class); + } + + /** + * Get the next available structure, either from the cache or a brand new one + * + */ + public final ByteArray acquire() { + if (_cache) { + synchronized (_available) { + if (_available.size() > 0) + return (ByteArray)_available.remove(0); + } + } + _lastOverflow = System.currentTimeMillis(); + byte data[] = new byte[_entrySize]; + return new ByteArray(data); + } + + /** + * Put this structure back onto the available cache for reuse + * + */ + public final void release(ByteArray entry) { + if (_cache) { + if ( (entry == null) || (entry.getData() == null) ) + return; + + Arrays.fill(entry.getData(), (byte)0x0); + synchronized (_available) { + if (_available.size() < _maxCached) + _available.add(entry); + } + } + } + + private class Cleanup implements SimpleTimer.TimedEvent { + public void timeReached() { + if (System.currentTimeMillis() - _lastOverflow > EXPIRE_PERIOD) { + // we haven't exceeded the cache size in a few minutes, so lets + // shrink the cache + synchronized (_available) { + int toRemove = _available.size() / 2; + for (int i = 0; i < toRemove; i++) + _available.remove(0); + if ( (toRemove > 0) && (_log.shouldLog(Log.DEBUG)) ) + _log.debug("Removing " + toRemove + " cached entries of size " + _entrySize); + } + } + SimpleTimer.getInstance().addEvent(Cleanup.this, CLEANUP_FREQUENCY); + } + } +} diff --git a/core/java/src/net/i2p/util/SimpleTimer.java b/core/java/src/net/i2p/util/SimpleTimer.java index 722207776..21c522ad5 100644 --- a/core/java/src/net/i2p/util/SimpleTimer.java +++ b/core/java/src/net/i2p/util/SimpleTimer.java @@ -20,7 +20,7 @@ public class SimpleTimer { public static SimpleTimer getInstance() { return _instance; } private Log _log; /** event time (Long) to event (TimedEvent) mapping */ - private Map _events; + private TreeMap _events; /** event (TimedEvent) to event time (Long) mapping */ private Map _eventTimes; @@ -74,7 +74,6 @@ public class SimpleTimer { private class SimpleTimerRunner implements Runnable { public void run() { List eventsToFire = new ArrayList(1); - List timesToRemove = new ArrayList(1); while (true) { try { synchronized (_events) { @@ -82,23 +81,19 @@ public class SimpleTimer { _events.wait(); long now = System.currentTimeMillis(); long nextEventDelay = -1; - for (Iterator iter = _events.keySet().iterator(); iter.hasNext(); ) { - Long when = (Long)iter.next(); + while (true) { + if (_events.size() <= 0) break; + Long when = (Long)_events.firstKey(); if (when.longValue() <= now) { - TimedEvent evt = (TimedEvent)_events.get(when); + TimedEvent evt = (TimedEvent)_events.remove(when); + _eventTimes.remove(when); eventsToFire.add(evt); - timesToRemove.add(when); } else { nextEventDelay = when.longValue() - now; break; } } - if (timesToRemove.size() > 0) { - for (int i = 0; i < timesToRemove.size(); i++) - _events.remove(timesToRemove.get(i)); - for (int i = 0; i < eventsToFire.size(); i++) - _eventTimes.remove(eventsToFire.get(i)); - } else { + if (eventsToFire.size() <= 0) { if (nextEventDelay != -1) _events.wait(nextEventDelay); else @@ -125,7 +120,6 @@ public class SimpleTimer { } } eventsToFire.clear(); - timesToRemove.clear(); } } } diff --git a/history.txt b/history.txt index e62d01ccb..27c9e71f6 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,12 @@ -$Id: history.txt,v 1.68 2004/11/08 00:40:21 jrandom Exp $ +$Id: history.txt,v 1.69 2004/11/10 07:33:02 jrandom Exp $ + +2004-11-13 jrandom + * Added throttles on how many I2PTunnel client connections we open at once + * Replaced some buffered streams in I2PTunnel with unbuffered streams, as + the streaming library used should take care of any buffering. + * Added a cache for some objects used in I2PTunnel, especially useful when + there are many short lived connections. + * Trimmed the SimpleTimer's processing a bit 2004-11-10 jrandom * Allow loading the (mini)streaming connection options from the diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 7f079ec62..e09d0c244 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.73 $ $Date: 2004/11/08 00:40:20 $"; + public final static String ID = "$Revision: 1.74 $ $Date: 2004/11/10 07:33:01 $"; public final static String VERSION = "0.4.1.4"; - public final static long BUILD = 2; + public final static long BUILD = 3; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID);