forked from I2P_Developers/i2p.i2p
* BloomSHA1, DecayingBloomFilter:
  - Refactor for concurrency, at some small risk of false negatives
  - Optimizations to cache objects and reuse offsets
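The core of the change is replacing per-call synchronized blocks with a ReentrantReadWriteLock: many threads add entries under the shared read lock, and the periodic decay (the double-buffer swap) briefly takes the write lock. A minimal sketch of that discipline, independent of the I2P classes (all names here are illustrative; the per-buffer structure must itself tolerate concurrent adders, as the Bloom filter's tolerated bit races and the ConcurrentHashSet do in the real code):

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Illustrative double-buffered "decaying" set using the same lock discipline as the patch.
    class DecayingSetSketch {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        private Set<Long> current = ConcurrentHashMap.newKeySet();
        private Set<Long> previous = ConcurrentHashMap.newKeySet();

        /** @return true if the entry was already present in either buffer */
        boolean add(long entry) {
            lock.readLock().lock();               // adds run concurrently with each other
            try {
                return previous.contains(entry) || !current.add(entry);
            } finally {
                lock.readLock().unlock();
            }
        }

        /** Called once per period: recycle the old buffer and start a fresh one. */
        void decay() throws InterruptedException {
            // as in the patch, give up after a bounded wait rather than blocking forever
            if (!lock.writeLock().tryLock(5000, TimeUnit.MILLISECONDS))
                return;
            try {
                Set<Long> tmp = previous;
                previous = current;
                current = tmp;
                current.clear();
            } finally {
                lock.writeLock().unlock();
            }
        }
    }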
net/i2p/util/DecayingBloomFilter.java

@@ -1,6 +1,8 @@
package net.i2p.util;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
@@ -9,15 +11,17 @@ import org.xlattice.crypto.filters.BloomSHA1;

/**
* Series of bloom filters which decay over time, allowing their continual use
* for time sensitive data. This has a fixed size (currently 1MB per decay
* for time sensitive data. This has a fixed size (per
* period, using two periods overall), allowing this to pump through hundreds of
* entries per second with virtually no false positive rate. Down the line,
* this may be refactored to allow tighter control of the size necessary for the
* contained bloom filters, but a fixed 2MB overhead isn't that bad.
* contained bloom filters.
*
* NOTE: At 1MBps, the tunnel IVV will see an unacceptable false positive rate
* of almost 0.1% with the current m and k values; however using DHS instead will use 30MB.
* Further analysis and tweaking for the tunnel IVV may be required.
* See main() for an analysis of false positive rate.
* See BloomFilterIVValidator for instantiation parameters.
* See DecayingHashSet for a smaller and simpler version.
* @see net.i2p.router.tunnel.BloomFilterIVValidator
* @see net.i2p.util.DecayingHashSet
*/
public class DecayingBloomFilter {
protected final I2PAppContext _context;
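For orientation, this is how the class is meant to be driven; the constructor and add() calls mirror the ones in testByLong() further down in this file, and the duration/entry-size values are only examples:

    // Example usage (values are illustrative; see testByLong() below for the real call).
    I2PAppContext ctx = I2PAppContext.getGlobalContext();
    DecayingBloomFilter filter = new DecayingBloomFilter(ctx, 10*60*1000, 8);
    if (filter.add(12345L)) {
        // already seen in the current or previous period (or a rare false positive)
    } else {
        // first sighting within the last two periods
    }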
@@ -26,18 +30,21 @@ public class DecayingBloomFilter {
private BloomSHA1 _previous;
protected final int _durationMs;
protected final int _entryBytes;
private byte _extenders[][];
private byte _extended[];
private byte _longToEntry[];
private long _longToEntryMask;
private final byte _extenders[][];
private final byte _extended[];
private final byte _longToEntry[];
private final long _longToEntryMask;
protected long _currentDuplicates;
protected volatile boolean _keepDecaying;
protected SimpleTimer.TimedEvent _decayEvent;
protected final SimpleTimer.TimedEvent _decayEvent;
/** just for logging */
protected final String _name;
/** synchronize against this lock when switching double buffers */
protected final ReentrantReadWriteLock _reorganizeLock = new ReentrantReadWriteLock();

private static final int DEFAULT_M = 23;
private static final int DEFAULT_K = 11;
/** true for debugging */
private static final boolean ALWAYS_MISS = false;

/** only for extension by DHS */
@@ -47,6 +54,15 @@ public class DecayingBloomFilter {
_entryBytes = entryBytes;
_name = name;
_durationMs = durationMs;
// all final
_extenders = null;
_extended = null;
_longToEntry = null;
_longToEntryMask = 0;
context.addShutdownTask(new Shutdown());
_decayEvent = new DecayEvent();
_keepDecaying = true;
SimpleTimer.getInstance().addEvent(_decayEvent, _durationMs);
}

/**
@@ -92,6 +108,11 @@ public class DecayingBloomFilter {
_extended = new byte[32];
_longToEntry = new byte[_entryBytes];
_longToEntryMask = (1l << (_entryBytes * 8l)) -1;
} else {
// final
_extended = null;
_longToEntry = null;
_longToEntryMask = 0;
}
_decayEvent = new DecayEvent();
_keepDecaying = true;
@@ -101,12 +122,12 @@ public class DecayingBloomFilter {
" numExtenders = " + numExtenders + " cycle (s) = " + (durationMs / 1000));
// try to get a handle on memory usage vs. false positives
context.statManager().createRateStat("router.decayingBloomFilter." + name + ".size",
"Size", "Router", new long[] { Math.max(60*1000, durationMs) });
"Size", "Router", new long[] { 10 * Math.max(60*1000, durationMs) });
context.statManager().createRateStat("router.decayingBloomFilter." + name + ".dups",
"1000000 * Duplicates/Size", "Router", new long[] { Math.max(60*1000, durationMs) });
"1000000 * Duplicates/Size", "Router", new long[] { 10 * Math.max(60*1000, durationMs) });
context.statManager().createRateStat("router.decayingBloomFilter." + name + ".log10(falsePos)",
"log10 of the false positive rate (must have net.i2p.util.DecayingBloomFilter=DEBUG)",
"Router", new long[] { Math.max(60*1000, durationMs) });
"Router", new long[] { 10 * Math.max(60*1000, durationMs) });
context.addShutdownTask(new Shutdown());
}

@@ -121,16 +142,14 @@ public class DecayingBloomFilter {

public long getCurrentDuplicateCount() { return _currentDuplicates; }

/** unsynchronized but only used for logging elsewhere */
public int getInsertedCount() {
synchronized (this) {
return _current.size() + _previous.size();
}
}

/** unsynchronized, only used for logging elsewhere */
public double getFalsePositiveRate() {
synchronized (this) {
return _current.falsePositives();
}
}

/**
@@ -150,9 +169,10 @@ public class DecayingBloomFilter {
if (len != _entryBytes)
throw new IllegalArgumentException("Bad entry [" + len + ", expected "
+ _entryBytes + "]");
synchronized (this) {
getReadLock();
try {
return locked_add(entry, off, len, true);
}
} finally { releaseReadLock(); }
}

/**
@@ -172,9 +192,10 @@ public class DecayingBloomFilter {
} else {
DataHelper.toLong(_longToEntry, 0, _entryBytes, entry);
}
synchronized (this) {
getReadLock();
try {
return locked_add(_longToEntry, 0, _longToEntry.length, true);
}
} finally { releaseReadLock(); }
}

/**
@@ -192,9 +213,10 @@ public class DecayingBloomFilter {
} else {
DataHelper.toLong(_longToEntry, 0, _entryBytes, entry);
}
synchronized (this) {
getReadLock();
try {
return locked_add(_longToEntry, 0, _longToEntry.length, false);
}
} finally { releaseReadLock(); }
}

private boolean locked_add(byte entry[], int offset, int len, boolean addIfNew) {
@@ -204,38 +226,48 @@ public class DecayingBloomFilter {
for (int i = 0; i < _extenders.length; i++)
DataHelper.xor(entry, offset, _extenders[i], 0, _extended, _entryBytes * (i+1), _entryBytes);

boolean seen = _current.locked_member(_extended);
seen = seen || _previous.locked_member(_extended);
BloomSHA1.FilterKey key = _current.getFilterKey(_extended, 0, 32);
boolean seen = _current.locked_member(key);
if (!seen)
seen = _previous.locked_member(key);
if (seen) {
_currentDuplicates++;
_current.release(key);
return true;
} else {
if (addIfNew) {
_current.locked_insert(_extended);
_current.locked_insert(key);
}
_current.release(key);
return false;
}
} else {
boolean seen = _current.locked_member(entry, offset, len);
seen = seen || _previous.locked_member(entry, offset, len);
BloomSHA1.FilterKey key = _current.getFilterKey(entry, offset, len);
boolean seen = _current.locked_member(key);
if (!seen)
seen = _previous.locked_member(key);
if (seen) {
_currentDuplicates++;
_current.release(key);
return true;
} else {
if (addIfNew) {
_current.locked_insert(entry, offset, len);
_current.locked_insert(key);
}
_current.release(key);
return false;
}
}
}
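What changed inside locked_add(): the k bit/word offsets are now computed once as a pooled FilterKey and reused for both membership probes and the insert, instead of re-hashing the entry for each call. Condensed from the code above (the duplicate counter is omitted here):

    // Condensed from the new locked_add() path above.
    BloomSHA1.FilterKey key = _current.getFilterKey(entry, offset, len); // hash once
    boolean seen = _current.locked_member(key);
    if (!seen)
        seen = _previous.locked_member(key);
    if (!seen && addIfNew)
        _current.locked_insert(key);   // reuses the same offsets
    _current.release(key);             // return the pooled arrays
    return seen;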
public void clear() {
synchronized (this) {
if (!getWriteLock())
return;
try {
_current.clear();
_previous.clear();
_currentDuplicates = 0;
}
} finally { releaseWriteLock(); }
}

public void stopDecaying() {
@@ -243,11 +275,13 @@ public class DecayingBloomFilter {
SimpleTimer.getInstance().removeEvent(_decayEvent);
}

private void decay() {
protected void decay() {
int currentCount = 0;
long dups = 0;
double fpr = 0d;
synchronized (this) {
if (!getWriteLock())
return;
try {
BloomSHA1 tmp = _previous;
currentCount = _current.size();
if (_log.shouldLog(Log.DEBUG) && currentCount > 0)
@@ -257,20 +291,20 @@ public class DecayingBloomFilter {
_current.clear();
dups = _currentDuplicates;
_currentDuplicates = 0;
}
} finally { releaseWriteLock(); }
if (_log.shouldLog(Log.DEBUG))
_log.debug("Decaying the filter " + _name + " after inserting " + currentCount
+ " elements and " + dups + " false positives with FPR = " + fpr);
_context.statManager().addRateData("router.decayingBloomFilter." + _name + ".size",
currentCount, 0);
currentCount);
if (currentCount > 0)
_context.statManager().addRateData("router.decayingBloomFilter." + _name + ".dups",
1000l*1000*dups/currentCount, 0);
1000l*1000*dups/currentCount);
if (fpr > 0d) {
// only if log.shouldLog(Log.DEBUG) ...
long exponent = (long) Math.log10(fpr);
_context.statManager().addRateData("router.decayingBloomFilter." + _name + ".log10(falsePos)",
exponent, 0);
exponent);
}
}
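The buffer swap itself sits in unchanged lines between the two hunks above, so it does not appear in this diff; conceptually, decay() does the following under the write lock (a paraphrase, not the verbatim elided lines):

    // Paraphrase of the double-buffer swap performed under the write lock.
    BloomSHA1 tmp = _previous;
    _previous = _current;   // last period's entries stay queryable for one more period
    _current = tmp;
    _current.clear();       // the new period starts empty
    _currentDuplicates = 0;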
@@ -283,12 +317,42 @@ public class DecayingBloomFilter {
}
}

/** @since 0.8.11 moved from DecayingHashSet */
protected void getReadLock() {
_reorganizeLock.readLock().lock();
}

/** @since 0.8.11 moved from DecayingHashSet */
protected void releaseReadLock() {
_reorganizeLock.readLock().unlock();
}

/**
* @return true if the lock was acquired
* @since 0.8.11 moved from DecayingHashSet
*/
protected boolean getWriteLock() {
try {
boolean rv = _reorganizeLock.writeLock().tryLock(5000, TimeUnit.MILLISECONDS);
if (!rv)
_log.error("no lock, size is: " + _reorganizeLock.getQueueLength(), new Exception("rats"));
return rv;
} catch (InterruptedException ie) {}
return false;
}

/** @since 0.8.11 moved from DecayingHashSet */
protected void releaseWriteLock() {
_reorganizeLock.writeLock().unlock();
}

/**
* This filter is used only for participants and OBEPs, not
* IBGWs, so depending on your assumptions of avg. tunnel length,
* the performance is somewhat better than the gross share BW
* would indicate.
*
*<pre>
* Following stats for m=23, k=11:
* Theoretical false positive rate for 16 KBps: 1.17E-21
* Theoretical false positive rate for 24 KBps: 9.81E-20
@@ -302,18 +366,37 @@ public class DecayingBloomFilter {
* 1280 4.5E-5; 1792 5.6E-4; 2048 0.14%
*
* Following stats for m=25, k=10:
* 1792 2.4E-6; 4096 0.14%
* 1792 2.4E-6; 4096 0.14%; 5120 0.6%; 6144 1.7%; 8192 6.8%; 10240 15%
*</pre>
*/
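The theoretical rates in this table follow the usual Bloom-filter approximation p ≈ (1 - e^(-k*n/M))^k, with M = 2^m filter bits and n inserted entries; falsePositives() in BloomSHA1 computes essentially the same thing. A standalone sketch (not part of the patch) that reproduces, for example, the 16 KBps row, taking n = 60*10*16 one-kilobyte messages per 10-minute period as testByLong() below does:

    // Standalone sketch: approximate Bloom filter false positive rate.
    public class BloomFprSketch {
        static double fpr(int m, int k, long n) {
            double bits = Math.pow(2, m);               // M = 2^m bits
            return Math.pow(1 - Math.exp(-k * (double) n / bits), k);
        }
        public static void main(String[] args) {
            // 16 KBps of 1KB messages for 10 minutes: n = 60 * 10 * 16 = 9600
            System.out.println(fpr(23, 11, 9600));      // ~1.17E-21, matching the row above
        }
    }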
public static void main(String args[]) {
System.out.println("Usage: DecayingBloomFilter [kbps [m [iterations]]] (default 256 23 10)");
int kbps = 256;
if (args.length >= 1) {
try {
kbps = Integer.parseInt(args[0]);
} catch (NumberFormatException nfe) {}
}
int m = DEFAULT_M;
if (args.length >= 2) {
try {
m = Integer.parseInt(args[1]);
} catch (NumberFormatException nfe) {}
}
int iterations = 10;
testByLong(kbps, iterations);
testByBytes(kbps, iterations);
if (args.length >= 3) {
try {
iterations = Integer.parseInt(args[2]);
} catch (NumberFormatException nfe) {}
}
testByLong(kbps, m, iterations);
testByBytes(kbps, m, iterations);
}
private static void testByLong(int kbps, int numRuns) {

private static void testByLong(int kbps, int m, int numRuns) {
int messages = 60 * 10 * kbps;
Random r = new Random();
DecayingBloomFilter filter = new DecayingBloomFilter(I2PAppContext.getGlobalContext(), 600*1000, 8);
DecayingBloomFilter filter = new DecayingBloomFilter(I2PAppContext.getGlobalContext(), 600*1000, 8, "test", m);
int falsePositives = 0;
long totalTime = 0;
double fpr = 0d;
@@ -322,7 +405,7 @@ public class DecayingBloomFilter {
for (int i = 0; i < messages; i++) {
if (filter.add(r.nextLong())) {
falsePositives++;
System.out.println("False positive " + falsePositives + " (testByLong j=" + j + " i=" + i + ")");
//System.out.println("False positive " + falsePositives + " (testByLong j=" + j + " i=" + i + ")");
}
}
totalTime += System.currentTimeMillis() - start;
@@ -336,13 +419,14 @@ public class DecayingBloomFilter {
+ falsePositives + " false positives");

}
private static void testByBytes(int kbps, int numRuns) {

private static void testByBytes(int kbps, int m, int numRuns) {
byte iv[][] = new byte[60*10*kbps][16];
Random r = new Random();
for (int i = 0; i < iv.length; i++)
r.nextBytes(iv[i]);

DecayingBloomFilter filter = new DecayingBloomFilter(I2PAppContext.getGlobalContext(), 600*1000, 16);
DecayingBloomFilter filter = new DecayingBloomFilter(I2PAppContext.getGlobalContext(), 600*1000, 16, "test", m);
int falsePositives = 0;
long totalTime = 0;
double fpr = 0d;
@@ -351,7 +435,7 @@ public class DecayingBloomFilter {
for (int i = 0; i < iv.length; i++) {
if (filter.add(iv[i])) {
falsePositives++;
System.out.println("False positive " + falsePositives + " (testByBytes j=" + j + " i=" + i + ")");
//System.out.println("False positive " + falsePositives + " (testByBytes j=" + j + " i=" + i + ")");
}
}
totalTime += System.currentTimeMillis() - start;

net/i2p/util/DecayingHashSet.java

@@ -1,8 +1,6 @@
package net.i2p.util;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.Random;

import net.i2p.I2PAppContext;
@@ -62,8 +60,6 @@ import net.i2p.data.DataHelper;
public class DecayingHashSet extends DecayingBloomFilter {
private ConcurrentHashSet<ArrayWrapper> _current;
private ConcurrentHashSet<ArrayWrapper> _previous;
/** synchronize against this lock when switching double buffers */
private final ReentrantReadWriteLock _reorganizeLock = new ReentrantReadWriteLock(true);

/**
* Create a double-buffered hash set that will decay its entries over time.
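DecayingHashSet is the exact-membership counterpart for small, short-lived sets. A typical construction, assuming the four-argument (context, duration, entry size, name) constructor implied by the logging in the next hunk; the values are only illustrative:

    // Illustrative only: a 10-minute window of 16-byte entries, named for the stat output.
    DecayingHashSet dhs = new DecayingHashSet(I2PAppContext.getGlobalContext(),
                                              10*60*1000, 16, "exampleSet");
    byte[] entry = new byte[16];
    new java.util.Random().nextBytes(entry);
    boolean duplicate = dhs.add(entry);  // true if seen in the current or previous period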
@@ -82,35 +78,16 @@ public class DecayingHashSet extends DecayingBloomFilter {
throw new IllegalArgumentException("Bad size");
_current = new ConcurrentHashSet(128);
_previous = new ConcurrentHashSet(128);
_decayEvent = new DecayEvent();
_keepDecaying = true;
SimpleScheduler.getInstance().addEvent(_decayEvent, _durationMs);
if (_log.shouldLog(Log.WARN))
_log.warn("New DHS " + name + " entryBytes = " + entryBytes +
" cycle (s) = " + (durationMs / 1000));
// try to get a handle on memory usage vs. false positives
context.statManager().createRateStat("router.decayingHashSet." + name + ".size",
"Size", "Router", new long[] { Math.max(60*1000, durationMs) });
"Size", "Router", new long[] { 10 * Math.max(60*1000, durationMs) });
context.statManager().createRateStat("router.decayingHashSet." + name + ".dups",
"1000000 * Duplicates/Size", "Router", new long[] { Math.max(60*1000, durationMs) });
context.addShutdownTask(new Shutdown());
"1000000 * Duplicates/Size", "Router", new long[] { 10 * Math.max(60*1000, durationMs) });
}

/**
* @since 0.8.8
*/
private class Shutdown implements Runnable {
public void run() {
clear();
}
}

/** unsynchronized but only used for logging elsewhere */
@Override
public int getInsertedCount() {
return _current.size() + _previous.size();
}

/** pointless, only used for logging elsewhere */
@Override
public double getFalsePositiveRate() {
@@ -166,19 +143,19 @@ public class DecayingHashSet extends DecayingBloomFilter {
}

/**
* @param addIfNew if true, add the element to current if it is not already there;
* @param addIfNew if true, add the element to current if it is not already there or in previous;
* if false, only check
* @return if the element is in either the current or previous set
*/
private boolean locked_add(ArrayWrapper w, boolean addIfNew) {
boolean seen;
// only access _current once. This adds to _current even if seen in _previous.
if (addIfNew)
seen = !_current.add(w);
else
seen = _current.contains(w);
if (!seen)
seen = _previous.contains(w);
boolean seen = _previous.contains(w);
// only access _current once.
if (!seen) {
if (addIfNew)
seen = !_current.add(w);
else
seen = _current.contains(w);
}
if (seen) {
// why increment if addIfNew == false? Only used for stats...
_currentDuplicates++;
@@ -200,7 +177,8 @@ public class DecayingHashSet extends DecayingBloomFilter {
clear();
}

private void decay() {
@Override
protected void decay() {
int currentCount = 0;
long dups = 0;
if (!getWriteLock())
@@ -219,45 +197,12 @@ public class DecayingHashSet extends DecayingBloomFilter {
_log.debug("Decaying the filter " + _name + " after inserting " + currentCount
+ " elements and " + dups + " false positives");
_context.statManager().addRateData("router.decayingHashSet." + _name + ".size",
currentCount, 0);
currentCount);
if (currentCount > 0)
_context.statManager().addRateData("router.decayingHashSet." + _name + ".dups",
1000l*1000*dups/currentCount, 0);
1000l*1000*dups/currentCount);
}

/** if decay() ever blows up, we won't reschedule, and will grow unbounded, but it seems unlikely */
private class DecayEvent implements SimpleTimer.TimedEvent {
public void timeReached() {
if (_keepDecaying) {
decay();
SimpleScheduler.getInstance().addEvent(DecayEvent.this, _durationMs);
}
}
}

private void getReadLock() {
_reorganizeLock.readLock().lock();
}

private void releaseReadLock() {
_reorganizeLock.readLock().unlock();
}

/** @return true if the lock was acquired */
private boolean getWriteLock() {
try {
boolean rv = _reorganizeLock.writeLock().tryLock(5000, TimeUnit.MILLISECONDS);
if (!rv)
_log.error("no lock, size is: " + _reorganizeLock.getQueueLength(), new Exception("rats"));
return rv;
} catch (InterruptedException ie) {}
return false;
}

private void releaseWriteLock() {
_reorganizeLock.writeLock().unlock();
}

/**
* This saves the data as-is if the length is <= 8 bytes,
* otherwise it stores an 8-byte hash.
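The ArrayWrapper javadoc is cut off by the end of this file's hunks; the idea it describes is collapsing each entry to a single long so equals()/hashCode() stay cheap. A minimal sketch of that idea (not the actual ArrayWrapper code, whose exact hashing is not shown in this diff):

    // Sketch: reduce a byte[] entry to one long -- pack it directly when it fits in
    // 8 bytes, otherwise fold the bytes down to a 64-bit value (illustration only).
    static long toLongKey(byte[] b, int off, int len) {
        long v = 0;
        if (len <= 8) {
            for (int i = 0; i < len; i++)
                v = (v << 8) | (b[off + i] & 0xff);   // store the data as-is
        } else {
            for (int i = 0; i < len; i++)
                v = 31 * v + (b[off + i] & 0xff);     // 8-byte "hash" of the entry
        }
        return v;
    }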
org/xlattice/crypto/filters/BloomSHA1.java

@@ -1,6 +1,9 @@
/* BloomSHA1.java */
package org.xlattice.crypto.filters;

import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
* A Bloom filter for sets of SHA1 digests. A Bloom filter uses a set
* of k hash functions to determine set membership. Each hash function
@@ -31,6 +34,13 @@ package org.xlattice.crypto.filters;
*
* minor tweaks by jrandom, exposing unsynchronized access and
* allowing larger M and K. changes released into the public domain.
*
* Note that this is used only by DecayingBloomFilter, which uses only
* the unsynchronized locked_foo() methods.
*
* As of 0.8.11, the locked_foo() methods are thread-safe, in that they work,
* but there is a minor risk of false-negatives if two threads are
* accessing the same bloom filter integer.
*/
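The "minor risk of false-negatives" called out above is a lost-update race on one int of the bit array: two unsynchronized inserters read the same word, each ORs in its own bit, and the later write discards the earlier one. A deliberately sequential illustration of that interleaving (not code from this file):

    // Simulates the interleaving two unsynchronized inserters could produce.
    class LostBitRace {
        static final int[] filter = new int[1];
        public static void main(String[] args) {
            int seenByA = filter[0];         // inserter A reads the word
            int seenByB = filter[0];         // inserter B reads it before A writes back
            filter[0] = seenByA | (1 << 3);  // A sets bit 3
            filter[0] = seenByB | (1 << 9);  // B sets bit 9 and silently drops A's bit 3
            // A probe for A's key now misses: a false negative, accepted by the patch
            // in exchange for not serializing every insert.
            System.out.println((filter[0] & (1 << 3)) != 0);   // false
        }
    }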
public class BloomSHA1 {
@@ -39,14 +49,14 @@ public class BloomSHA1 {
protected int count;

protected final int[] filter;
protected KeySelector ks;
protected final int[] wordOffset;
protected final int[] bitOffset;
protected final KeySelector ks;

// convenience variables
protected final int filterBits;
protected final int filterWords;

private final BlockingQueue<int[]> buf;

/* (24,11) too big - see KeySelector

public static void main(String args[]) {
@@ -80,15 +90,11 @@ public class BloomSHA1 {
//}
this.m = m;
this.k = k;
count = 0;
filterBits = 1 << m;
filterWords = (filterBits + 31)/32; // round up
filter = new int[filterWords];
doClear();
// offsets into the filter
wordOffset = new int[k];
bitOffset = new int[k];
ks = new KeySelector(m, k, bitOffset, wordOffset);
ks = new KeySelector(m, k);
buf = new LinkedBlockingQueue(16);

// DEBUG
//System.out.println("Bloom constructor: m = " + m + ", k = " + k
@@ -114,9 +120,7 @@ public class BloomSHA1 {
}
/** Clear the filter, unsynchronized */
protected void doClear() {
for (int i = 0; i < filterWords; i++) {
filter[i] = 0;
}
Arrays.fill(filter, 0);
count = 0;
}
/** Synchronized version */
@@ -154,19 +158,25 @@ public class BloomSHA1 {
* @param b byte array representing a key (SHA1 digest)
*/
public void insert (byte[]b) { insert(b, 0, b.length); }

public void insert (byte[]b, int offset, int len) {
synchronized(this) {
locked_insert(b);
locked_insert(b, offset, len);
}
}

public final void locked_insert(byte[]b) { locked_insert(b, 0, b.length); }

public final void locked_insert(byte[]b, int offset, int len) {
ks.getOffsets(b, offset, len);
int[] bitOffset = acquire();
int[] wordOffset = acquire();
ks.getOffsets(b, offset, len, bitOffset, wordOffset);
for (int i = 0; i < k; i++) {
filter[wordOffset[i]] |= 1 << bitOffset[i];
}
count++;
buf.offer(bitOffset);
buf.offer(wordOffset);
}

/**
@@ -176,13 +186,20 @@ public class BloomSHA1 {
* @return true if b is in the filter
*/
protected final boolean isMember(byte[] b) { return isMember(b, 0, b.length); }

protected final boolean isMember(byte[] b, int offset, int len) {
ks.getOffsets(b, offset, len);
int[] bitOffset = acquire();
int[] wordOffset = acquire();
ks.getOffsets(b, offset, len, bitOffset, wordOffset);
for (int i = 0; i < k; i++) {
if (! ((filter[wordOffset[i]] & (1 << bitOffset[i])) != 0) ) {
buf.offer(bitOffset);
buf.offer(wordOffset);
return false;
}
}
buf.offer(bitOffset);
buf.offer(wordOffset);
return true;
}

@@ -202,6 +219,75 @@ public class BloomSHA1 {
}
}

/**
* Get the bloom filter offsets for reuse.
* Caller should call rv.release() when done.
* @since 0.8.11
*/
public FilterKey getFilterKey(byte[] b, int offset, int len) {
int[] bitOffset = acquire();
int[] wordOffset = acquire();
ks.getOffsets(b, offset, len, bitOffset, wordOffset);
return new FilterKey(bitOffset, wordOffset);
}
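The intended lifecycle, as used by DecayingBloomFilter.locked_add() above: fetch a key once, probe and insert with it, then hand it back so the two int[k] arrays return to the pool. In the fragment below, bloom is a BloomSHA1 and entry is a key of at least 20 bytes:

    BloomSHA1.FilterKey key = bloom.getFilterKey(entry, 0, entry.length);  // offsets computed once
    boolean seen = bloom.locked_member(key);
    if (!seen)
        bloom.locked_insert(key);   // same offsets, no second hashing pass
    bloom.release(key);             // recycle the bitOffset/wordOffset arrays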
/**
* Add the key to the filter.
* @since 0.8.11
*/
public void locked_insert(FilterKey fk) {
for (int i = 0; i < k; i++) {
filter[fk.wordOffset[i]] |= 1 << fk.bitOffset[i];
}
count++;
}


/**
* Is the key in the filter.
* @since 0.8.11
*/
public boolean locked_member(FilterKey fk) {
for (int i = 0; i < k; i++) {
if (! ((filter[fk.wordOffset[i]] & (1 << fk.bitOffset[i])) != 0) )
return false;
}
return true;
}

/**
* @since 0.8.11
*/
private int[] acquire() {
int[] rv = buf.poll();
if (rv != null)
return rv;
return new int[k];
}

/**
* @since 0.8.11
*/
public void release(FilterKey fk) {
buf.offer(fk.bitOffset);
buf.offer(fk.wordOffset);
}

/**
* Store the (opaque) bloom filter offsets for reuse.
* @since 0.8.11
*/
public static class FilterKey {

private final int[] bitOffset;
private final int[] wordOffset;

private FilterKey(int[] bitOffset, int[] wordOffset) {
this.bitOffset = bitOffset;
this.wordOffset = wordOffset;
}
}

/**
* @param n number of set members
* @return approximate false positive rate
@@ -215,6 +301,8 @@ public class BloomSHA1 {
public final double falsePositives() {
return falsePositives(count);
}

/*****
// DEBUG METHODS
public static String keyToString(byte[] key) {
StringBuilder sb = new StringBuilder().append(key[0]);
@@ -223,23 +311,32 @@ public class BloomSHA1 {
}
return sb.toString();
}
*****/

/** convert 64-bit integer to hex String */
/*****
public static String ltoh (long i) {
StringBuilder sb = new StringBuilder().append("#")
.append(Long.toString(i, 16));
return sb.toString();
}
*****/

/** convert 32-bit integer to String */
/*****
public static String itoh (int i) {
StringBuilder sb = new StringBuilder().append("#")
.append(Integer.toString(i, 16));
return sb.toString();
}
*****/

/** convert single byte to String */
/*****
public static String btoh (byte b) {
int i = 0xff & b;
return itoh(i);
}
*****/
}


org/xlattice/crypto/filters/KeySelector.java

@@ -1,4 +1,3 @@
/* KeySelector.java */
package org.xlattice.crypto.filters;

/**
@@ -12,25 +11,34 @@ package org.xlattice.crypto.filters;
*
* minor tweaks by jrandom, exposing unsynchronized access and
* allowing larger M and K. changes released into the public domain.
*
* As of 0.8.11, bitoffset and wordoffset out parameters moved from fields
* to selector arguments, to allow concurrency.
* All methods are now thread-safe.
*/
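Concretely, callers now own the scratch arrays instead of the selector caching them in fields, which is what makes a single KeySelector safe to share between threads. The call shape before and after (m, k and key assumed in scope):

    // Before 0.8.11: offsets were written into arrays captured at construction time.
    //   KeySelector ks = new KeySelector(m, k, bitOffset, wordOffset);
    //   ks.getOffsets(key, 0, key.length);

    // As of 0.8.11: the caller supplies per-call out parameters.
    int[] bitOffset = new int[k];
    int[] wordOffset = new int[k];
    KeySelector ks = new KeySelector(m, k);
    ks.getOffsets(key, 0, key.length, bitOffset, wordOffset);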
public class KeySelector {

private int m;
private int k;
private byte[] b;
private int offset; // index into b to select
private int length; // length into b to select
private int[] bitOffset;
private int[] wordOffset;
private BitSelector bitSel;
private WordSelector wordSel;
private final int m;
private final int k;
private final BitSelector bitSel;
private final WordSelector wordSel;

public interface BitSelector {
public void getBitSelectors();
/**
* @param bitOffset Out parameter of length k
* @since 0.8.11 out parameter added
*/
public void getBitSelectors(byte[] b, int offset, int length, int[] bitOffset);
}

public interface WordSelector {
public void getWordSelectors();
/**
* @param wordOffset Out parameter of length k
* @since 0.8.11 out parameter added
*/
public void getWordSelectors(byte[] b, int offset, int length, int[] wordOffset);
}

/** AND with byte to expose index-many bits */
public final static int[] UNMASK = {
// 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
@@ -49,8 +57,6 @@ public class KeySelector {
*
* @param m size of the filter as a power of 2
* @param k number of 'hash functions'
* @param bitOffset array of k bit offsets (offset of flag bit in word)
* @param wordOffset array of k word offsets (offset of word flag is in)
*
* Note that if k and m are too big, the GenericWordSelector blows up -
* The max for 32-byte keys is m=23 and k=11.
@@ -59,15 +65,13 @@ public class KeySelector {
*
* It isn't clear how to fix this.
*/
public KeySelector (int m, int k, int[] bitOffset, int [] wordOffset) {
public KeySelector (int m, int k) {
//if ( (m < 2) || (m > 20)|| (k < 1)
// || (bitOffset == null) || (wordOffset == null)) {
// throw new IllegalArgumentException();
//}
this.m = m;
this.k = k;
this.bitOffset = bitOffset;
this.wordOffset = wordOffset;
bitSel = new GenericBitSelector();
wordSel = new GenericWordSelector();
}
@@ -78,7 +82,7 @@ public class KeySelector {
*/
public class GenericBitSelector implements BitSelector {
/** Do the extraction */
public void getBitSelectors() {
public void getBitSelectors(byte[] b, int offset, int length, int[] bitOffset) {
int curBit = 8 * offset;
int curByte;
for (int j = 0; j < k; j++) {
@@ -132,7 +136,7 @@ public class KeySelector {
*/
public class GenericWordSelector implements WordSelector {
/** Extract the k offsets into the word offset array */
public void getWordSelectors() {
public void getWordSelectors(byte[] b, int offset, int length, int[] wordOffset) {
int stride = m - 5;
//assert true: stride<16;
int curBit = (k * 5) + (offset * 8);
@@ -221,32 +225,47 @@ public class KeySelector {
}
}
}

/**
* Given a key, populate the word and bit offset arrays, each
* of which has k elements.
*
* @param key cryptographic key used in populating the arrays
* @param bitOffset Out parameter of length k
* @param wordOffset Out parameter of length k
* @since 0.8.11 out parameters added
*/
public void getOffsets (byte[] key) { getOffsets(key, 0, key.length); }
public void getOffsets (byte[] key, int off, int len) {
if (key == null) {
throw new IllegalArgumentException("null key");
}
if (len < 20) {
throw new IllegalArgumentException(
"key must be at least 20 bytes long");
}
b = key;
offset = off;
length = len;
public void getOffsets (byte[] key, int[] bitOffset, int[] wordOffset) {
getOffsets(key, 0, key.length, bitOffset, wordOffset);
}

/**
* Given a key, populate the word and bit offset arrays, each
* of which has k elements.
*
* @param key cryptographic key used in populating the arrays
* @param bitOffset Out parameter of length k
* @param wordOffset Out parameter of length k
* @since 0.8.11 out parameters added
*/
public void getOffsets (byte[] key, int off, int len, int[] bitOffset, int[] wordOffset) {
// skip these checks for speed
//if (key == null) {
// throw new IllegalArgumentException("null key");
//}
//if (len < 20) {
// throw new IllegalArgumentException(
// "key must be at least 20 bytes long");
//}
// // DEBUG
// System.out.println("KeySelector.getOffsets for "
// + BloomSHA1.keyToString(b));
// // END
bitSel.getBitSelectors();
wordSel.getWordSelectors();
bitSel.getBitSelectors(key, off, len, bitOffset);
wordSel.getWordSelectors(key, off, len, wordOffset);
}

/*****
// DEBUG METHODS ////////////////////////////////////////////////
String itoh(int i) {
return BloomSHA1.itoh(i);
@@ -254,6 +273,7 @@ public class KeySelector {
String btoh(byte b) {
return BloomSHA1.btoh(b);
}
*****/
}