propagate from branch 'i2p.i2p.zab.2263' (head 5d4d46ea16b13a188d27ff31c81a5362f20b6d68)

to branch 'i2p.i2p' (head 3ba7c5b2e24e950c83d6370df3c814fd025add81)
This commit is contained in:
zzz
2018-07-08 11:36:53 +00:00
3 changed files with 126 additions and 67 deletions

View File

@ -0,0 +1,91 @@
package net.i2p.util;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* An object cache which is safe to use by multiple threads without blocking.
*
* @author zab
*
* @param <T>
*/
public class TryCache<T> {
/**
* Something that creates objects of the type cached by this cache
*
* @param <T>
*/
public static interface ObjectFactory<T> {
T newInstance();
}
private final ObjectFactory<T> factory;
private final int capacity;
private final List<T> items;
private final Lock lock = new ReentrantLock();
/**
* @param factory to be used for creating new instances
* @param capacity cache up to this many items
*/
public TryCache(ObjectFactory<T> factory, int capacity) {
this.factory = factory;
this.capacity = capacity;
this.items = new ArrayList<>(capacity);
}
/**
* @return a cached or newly created item from this cache
*/
public T acquire() {
T rv = null;
if (lock.tryLock()) {
try {
if (!items.isEmpty()) {
rv = items.remove(items.size() - 1);
}
} finally {
lock.unlock();
}
}
if (rv == null) {
rv = factory.newInstance();
}
return rv;
}
/**
* Tries to return this item to the cache but it may fail if
* the cache has reached capacity or it's lock is held by
* another thread.
*/
public void release(T item) {
if (lock.tryLock()) {
try {
if (items.size() < capacity) {
items.add(item);
}
} finally {
lock.unlock();
}
}
}
/**
* Clears all cached items. This is the only method
* that blocks until it acquires the lock.
*/
public void clear() {
lock.lock();
try {
items.clear();
} finally {
lock.unlock();
}
}
}

View File

@ -28,6 +28,7 @@ import net.i2p.data.router.RouterIdentity;
import net.i2p.router.CommSystemFacade.Status;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.util.TryCache;
import net.i2p.util.Addresses;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.I2PThread;
@ -53,7 +54,7 @@ class EventPumper implements Runnable {
private final NTCPTransport _transport;
private final ObjectCounter<ByteArray> _blockedIPs;
private long _expireIdleWriteTime;
private boolean _useDirect;
private static boolean _useDirect;
/**
* This probably doesn't need to be bigger than the largest typical
@ -63,13 +64,16 @@ class EventPumper implements Runnable {
private static final int BUF_SIZE = 8*1024;
private static final int MAX_CACHE_SIZE = 64;
/**
* Read buffers. (write buffers use wrap())
* Shared if there are multiple routers in the JVM
* Note that if the routers have different PROP_DIRECT settings this will have a mix,
* so don't do that.
*/
private static final LinkedBlockingQueue<ByteBuffer> _bufCache = new LinkedBlockingQueue<ByteBuffer>(MAX_CACHE_SIZE);
private static class BufferFactory implements TryCache.ObjectFactory<ByteBuffer> {
public ByteBuffer newInstance() {
if (_useDirect)
return ByteBuffer.allocateDirect(BUF_SIZE);
else
return ByteBuffer.allocate(BUF_SIZE);
}
}
private static final TryCache<ByteBuffer> _bufferCache = new TryCache<>(new BufferFactory(), MAX_CACHE_SIZE);
/**
* every few seconds, iterate across all ntcp connections just to make sure
@ -319,13 +323,7 @@ class EventPumper implements Runnable {
}
// Clear the cache if the user changes the setting,
// so we can test the effect.
boolean newUseDirect = _context.getBooleanProperty(PROP_DIRECT);
if (_useDirect != newUseDirect) {
_useDirect = newUseDirect;
_bufCache.clear();
}
_useDirect = _context.getBooleanProperty(PROP_DIRECT);
} catch (RuntimeException re) {
_log.error("Error in the event pumper", re);
}
@ -363,7 +361,6 @@ class EventPumper implements Runnable {
_wantsRead.clear();
_wantsRegister.clear();
_wantsWrite.clear();
_bufCache.clear();
}
/**
@ -461,27 +458,11 @@ class EventPumper implements Runnable {
_selector.wakeup();
}
/**
* How many to keep in reserve.
* Shared if there are multiple routers in the JVM
*/
private static int _numBufs = MIN_BUFS;
private static int __consecutiveExtra;
/**
* High-frequency path in thread.
*/
private ByteBuffer acquireBuf() {
ByteBuffer rv = _bufCache.poll();
// discard buffer if _useDirect setting changes
if (rv == null || rv.isDirect() != _useDirect) {
if (_useDirect)
rv = ByteBuffer.allocateDirect(BUF_SIZE);
else
rv = ByteBuffer.allocate(BUF_SIZE);
_numBufs++;
}
return rv;
return _bufferCache.acquire();
}
/**
@ -496,21 +477,7 @@ class EventPumper implements Runnable {
return;
}
buf.clear();
int extra = _bufCache.size();
boolean cached = extra < _numBufs;
// TODO always offer if direct?
if (cached) {
_bufCache.offer(buf);
if (extra > MIN_BUFS) {
__consecutiveExtra++;
if (__consecutiveExtra >= 20) {
if (_numBufs > MIN_BUFS)
_numBufs--;
__consecutiveExtra = 0;
}
}
}
_bufferCache.release(buf);
}
private void processAccept(SelectionKey key) {

View File

@ -12,6 +12,7 @@ import net.i2p.data.SessionKey;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.router.util.CDQEntry;
import net.i2p.util.TryCache;
import net.i2p.util.Addresses;
import net.i2p.util.Log;
import net.i2p.util.SystemVersion;
@ -45,18 +46,25 @@ class UDPPacket implements CDQEntry {
// private boolean _isInbound;
private FIFOBandwidthLimiter.Request _bandwidthRequest;
private static class PacketFactory implements TryCache.ObjectFactory<UDPPacket> {
static RouterContext context;
public UDPPacket newInstance() {
return new UDPPacket(context);
}
}
// Warning - this mixes contexts in a multi-router JVM
private static final Queue<UDPPacket> _packetCache;
private static final TryCache<UDPPacket> _packetCache;
private static final TryCache.ObjectFactory<UDPPacket> _packetFactory;
private static final boolean CACHE = true;
private static final int MIN_CACHE_SIZE = 64;
private static final int MAX_CACHE_SIZE = 256;
static {
if (CACHE) {
long maxMemory = SystemVersion.getMaxMemory();
int csize = (int) Math.max(MIN_CACHE_SIZE, Math.min(MAX_CACHE_SIZE, maxMemory / (1024*1024)));
_packetCache = new LinkedBlockingQueue<UDPPacket>(csize);
_packetFactory = new PacketFactory();
_packetCache = new TryCache<>(_packetFactory, MAX_CACHE_SIZE);
} else {
_packetCache = null;
_packetFactory = null;
}
}
@ -398,19 +406,10 @@ class UDPPacket implements CDQEntry {
public static UDPPacket acquire(RouterContext ctx, boolean inbound) {
UDPPacket rv = null;
if (CACHE) {
rv = _packetCache.poll();
if (rv != null) {
synchronized(rv) {
if (!rv._released) {
Log log = rv._context.logManager().getLog(UDPPacket.class);
log.error("Unreleased cached packet", new Exception());
rv = null;
} else {
PacketFactory.context = ctx;
rv = _packetCache.acquire();
rv.init(ctx);
}
}
}
}
if (rv == null)
rv = new UDPPacket(ctx);
return rv;
@ -440,7 +439,7 @@ class UDPPacket implements CDQEntry {
}
if (!CACHE)
return;
_packetCache.offer(this);
_packetCache.release(this);
}
/**
@ -448,9 +447,11 @@ class UDPPacket implements CDQEntry {
* @since 0.9.2
*/
public static void clearCache() {
if (CACHE)
if (CACHE) {
PacketFactory.context = null;
_packetCache.clear();
}
}
private synchronized void verifyNotReleased() {
if (!CACHE) return;