forked from I2P_Developers/i2p.i2p
propagate from branch 'i2p.i2p' (head 28b73161ea8915467ac9a4a8eb910d8fef1d42cd)
to branch 'i2p.i2p.zzz.test' (head 4470e8eb34649523d9f0055e754d90226f0d7bcb)
@@ -23,6 +23,8 @@ package org.klomp.snark;
 import java.util.Iterator;
 import java.util.TimerTask;
 
+import net.i2p.data.DataHelper;
+
 /**
  * TimerTask that monitors the peers and total up/download speeds.
  * Works together with the main Snark class to report periodical statistics.
@@ -83,21 +85,12 @@ class PeerMonitorTask extends TimerTask
 
         // Print some statistics
         long downloaded = coordinator.getDownloaded();
-        String totalDown;
-        if (downloaded >= 10 * 1024 * 1024)
-          totalDown = (downloaded / (1024 * 1024)) + "MB";
-        else
-          totalDown = (downloaded / 1024 )+ "KB";
+        String totalDown = DataHelper.formatSize(downloaded) + "B";
         long uploaded = coordinator.getUploaded();
-        String totalUp;
-        if (uploaded >= 10 * 1024 * 1024)
-          totalUp = (uploaded / (1024 * 1024)) + "MB";
-        else
-          totalUp = (uploaded / 1024) + "KB";
+        String totalUp = DataHelper.formatSize(uploaded) + "B";
 
         int needP = coordinator.storage.needed();
-        long needMB
-          = needP * coordinator.metainfo.getPieceLength(0) / (1024 * 1024);
+        long needMB = needP * coordinator.metainfo.getPieceLength(0) / (1024 * 1024);
         int totalP = coordinator.metainfo.getPieces();
         long totalMB = coordinator.metainfo.getTotalLength() / (1024 * 1024);
 
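
The hunk above swaps snark's hand-rolled KB/MB math for DataHelper.formatSize(). A minimal sketch of the difference, assuming formatSize() returns the value with a unit prefix (the callers above append the trailing "B"); the class and method names here are illustrative only:

    // Illustrative only -- reproduces the removed formatting logic for comparison.
    public class SizeFormatSketch {
        // what the removed lines above did by hand
        static String oldFormat(long bytes) {
            if (bytes >= 10 * 1024 * 1024)
                return (bytes / (1024 * 1024)) + "MB";
            return (bytes / 1024) + "KB";
        }

        public static void main(String[] args) {
            long downloaded = 7 * 1024 * 1024 + 300 * 1024;
            System.out.println(oldFormat(downloaded));  // "7468KB" -- always KB below 10 MB
            // new style, one code path for KB/MB/GB (requires the I2P libs on the classpath):
            // System.out.println(net.i2p.data.DataHelper.formatSize(downloaded) + "B");
        }
    }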
@@ -60,9 +60,9 @@ class PeerState
   // If we have te resend outstanding requests (true after we got choked).
   private boolean resend = false;
 
-  private final static int MAX_PIPELINE = 3; // this is for outbound requests
+  private final static int MAX_PIPELINE = 5; // this is for outbound requests
   private final static int MAX_PIPELINE_BYTES = 128*1024; // this is for inbound requests
-  public final static int PARTSIZE = 32*1024; // Snark was 16K, i2p-bt uses 64KB
+  public final static int PARTSIZE = 16*1024; // outbound request
   private final static int MAX_PARTSIZE = 64*1024; // Don't let anybody request more than this
 
   PeerState(Peer peer, PeerListener listener, MetaInfo metainfo,
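
A rough worked comparison of the constants changed above (a back-of-the-envelope sketch, not from the source): the outstanding outbound request data per peer is bounded by roughly MAX_PIPELINE * PARTSIZE.

    // Back-of-the-envelope only: upper bound on outstanding outbound request data per peer.
    public class PipelineSketch {
        public static void main(String[] args) {
            int before = 3 * 32 * 1024;  // old constants: 3 requests x 32 KB = 98304 bytes (96 KB)
            int after  = 5 * 16 * 1024;  // new constants: 5 requests x 16 KB = 81920 bytes (80 KB)
            System.out.println(before + " -> " + after);
        }
    }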
@@ -362,7 +362,7 @@ public class SnarkManager implements Snark.CompleteListener {
     public Properties getConfig() { return _config; }
 
     /** hardcoded for sanity. perhaps this should be customizable, for people who increase their ulimit, etc. */
-    private static final int MAX_FILES_PER_TORRENT = 256;
+    private static final int MAX_FILES_PER_TORRENT = 512;
 
     /** set of filenames that we are dealing with */
     public Set listTorrentFiles() { synchronized (_snarks) { return new HashSet(_snarks.keySet()); } }
@@ -543,16 +543,18 @@ public class SnarkManager implements Snark.CompleteListener {
             return "Too many files in " + info.getName() + " (" + files.size() + "), deleting it";
         } else if (info.getPieces() <= 0) {
             return "No pieces in " + info.getName() + "? deleting it";
-        } else if (info.getPieceLength(0) > 1*1024*1024) {
-            return "Pieces are too large in " + info.getName() + " (" + info.getPieceLength(0)/1024 + "KB), deleting it";
-        } else if (info.getTotalLength() > 10*1024*1024*1024l) {
+        } else if (info.getPieceLength(0) > Storage.MAX_PIECE_SIZE) {
+            return "Pieces are too large in " + info.getName() + " (" + DataHelper.formatSize(info.getPieceLength(0)) +
+                   "B), deleting it";
+        } else if (info.getTotalLength() > Storage.MAX_TOTAL_SIZE) {
             System.out.println("torrent info: " + info.toString());
             List lengths = info.getLengths();
             if (lengths != null)
                 for (int i = 0; i < lengths.size(); i++)
                     System.out.println("File " + i + " is " + lengths.get(i) + " long");
 
-            return "Torrents larger than 10GB are not supported yet (because we're paranoid): " + info.getName() + ", deleting it";
+            return "Torrents larger than " + DataHelper.formatSize(Storage.MAX_TOTAL_SIZE) +
+                   "B are not supported yet (because we're paranoid): " + info.getName() + ", deleting it";
         } else {
             // ok
             return null;
@@ -637,8 +639,7 @@ public class SnarkManager implements Snark.CompleteListener {
     public void torrentComplete(Snark snark) {
         File f = new File(snark.torrent);
         long len = snark.meta.getTotalLength();
-        addMessage("Download complete of " + f.getName()
-                   + (len < 5*1024*1024 ? " (size: " + (len/1024) + "KB)" : " (size: " + (len/(1024*1024l)) + "MB)"));
+        addMessage("Download complete of " + f.getName() + " (size: " + DataHelper.formatSize(len) + "B)");
         updateStatus(snark);
     }
 
@@ -56,10 +56,11 @@ public class Storage
   boolean changed;
 
   /** The default piece size. */
-  private static int MIN_PIECE_SIZE = 256*1024;
-  private static int MAX_PIECE_SIZE = 1024*1024;
+  private static final int MIN_PIECE_SIZE = 256*1024;
+  public static final int MAX_PIECE_SIZE = 1024*1024;
   /** The maximum number of pieces in a torrent. */
-  private static long MAX_PIECES = 100*1024/20;
+  public static final int MAX_PIECES = 10*1024;
+  public static final long MAX_TOTAL_SIZE = MAX_PIECE_SIZE * (long) MAX_PIECES;
 
   /**
   * Creates a new storage based on the supplied MetaInfo. This will
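
The new MAX_TOTAL_SIZE works out to the same 10 GB cap that the old SnarkManager message hard-coded. A small sketch (constants copied from the hunk above, class name hypothetical) showing the arithmetic and why the (long) cast matters:

    public class StorageLimitSketch {
        static final int MAX_PIECE_SIZE = 1024 * 1024;  // 1 MiB
        static final int MAX_PIECES = 10 * 1024;        // 10240 pieces

        public static void main(String[] args) {
            long max = MAX_PIECE_SIZE * (long) MAX_PIECES;  // 10737418240 bytes = 10 GiB
            long bad = MAX_PIECE_SIZE * MAX_PIECES;         // int multiplication overflows to -2147483648
            System.out.println(max + " vs " + bad);
        }
    }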
@@ -159,6 +159,9 @@ public class EepGet {
                markSize = Integer.parseInt(args[i+1]);
                lineLen = Integer.parseInt(args[i+2]);
                i += 2;
+            } else if (args[i].startsWith("-")) {
+                usage();
+                return;
            } else {
                url = args[i];
            }
@@ -19,6 +19,7 @@ import net.i2p.data.Hash;
 import net.i2p.data.RouterAddress;
 import net.i2p.data.RouterInfo;
 import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
+import net.i2p.util.ConcurrentHashSet;
 import net.i2p.util.Log;
 
 /**
@@ -55,20 +56,16 @@ public class Blocklist {
     private int _blocklistSize;
     private final Object _lock = new Object();
     private Entry _wrapSave;
-    private final Set _inProcess = new HashSet(0);
-    private Map _peerBlocklist = new HashMap(0);
-    private final Set _singleIPBlocklist = new HashSet(0);
+    private final Set<Hash> _inProcess = new HashSet(0);
+    private Map<Hash, String> _peerBlocklist = new HashMap(0);
+    private final Set<Integer> _singleIPBlocklist = new ConcurrentHashSet(0);
 
     public Blocklist(RouterContext context) {
         _context = context;
         _log = context.logManager().getLog(Blocklist.class);
         _blocklist = null;
         _blocklistSize = 0;
-        // _lock = new Object();
         _wrapSave = null;
-        // _inProcess = new HashSet(0);
-        // _peerBlocklist = new HashMap(0);
-        // _singleIPBlocklist = new HashSet(0);
     }
 
     public Blocklist() {
@@ -446,15 +443,11 @@ public class Blocklist {
     }
 
     private boolean add(int ip) {
-        synchronized(_singleIPBlocklist) {
-            return _singleIPBlocklist.add(new Integer(ip));
-        }
+        return _singleIPBlocklist.add(Integer.valueOf(ip));
     }
 
     private boolean isOnSingleList(int ip) {
-        synchronized(_singleIPBlocklist) {
-            return _singleIPBlocklist.contains(new Integer(ip));
-        }
+        return _singleIPBlocklist.contains(Integer.valueOf(ip));
     }
 
     /**
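
The synchronized blocks above become unnecessary once _singleIPBlocklist is a concurrent set. A self-contained sketch of the same idea using a JDK stand-in for net.i2p.util.ConcurrentHashSet (the field and method names mirror the diff, but the class itself is hypothetical):

    import java.util.Collections;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class SingleIPBlocklistSketch {
        // JDK stand-in for net.i2p.util.ConcurrentHashSet
        private final Set<Integer> singleIPBlocklist =
            Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>());

        boolean add(int ip) {
            return singleIPBlocklist.add(Integer.valueOf(ip));       // thread-safe without an external lock
        }

        boolean isOnSingleList(int ip) {
            return singleIPBlocklist.contains(Integer.valueOf(ip));  // ditto for lookups
        }
    }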
@@ -586,11 +579,11 @@ public class Blocklist {
 
     // methods to get and store the from/to values in the array
 
-    private int getFrom(long entry) {
+    private static int getFrom(long entry) {
         return (int) ((entry >> 32) & 0xffffffff);
     }
 
-    private int getTo(long entry) {
+    private static int getTo(long entry) {
         return (int) (entry & 0xffffffff);
     }
 
@@ -602,7 +595,7 @@ public class Blocklist {
      * So the size is (cough) almost 2MB for the 240,000 line splist.txt.
      *
      */
-    private long toEntry(byte ip1[], byte ip2[]) {
+    private static long toEntry(byte ip1[], byte ip2[]) {
         long entry = 0;
         for (int i = 0; i < 4; i++)
             entry |= ((long) (ip2[i] & 0xff)) << ((3-i)*8);
@@ -621,14 +614,18 @@ public class Blocklist {
         _blocklist[idx] = entry;
     }
 
-    private int toInt(byte ip[]) {
+    private static int toInt(byte ip[]) {
         int rv = 0;
         for (int i = 0; i < 4; i++)
             rv |= (ip[i] & 0xff) << ((3-i)*8);
         return rv;
     }
 
-    private String toStr(long entry) {
+    public static String toStr(byte[] ip) {
+        return toStr(toInt(ip));
+    }
+
+    private static String toStr(long entry) {
         StringBuffer buf = new StringBuffer(32);
         for (int i = 7; i >= 0; i--) {
             buf.append((entry >> (8*i)) & 0xff);
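
For context on the static helpers above: each blocklist range is packed into a single long, with the "from" IP in the high 32 bits and the "to" IP in the low 32 bits (hence getFrom/getTo earlier in the file). A compact sketch of the equivalent packing, hypothetical class name:

    public class BlocklistEntrySketch {
        static int toInt(byte[] ip) {                 // same shape as toInt() above
            int rv = 0;
            for (int i = 0; i < 4; i++)
                rv |= (ip[i] & 0xff) << ((3 - i) * 8);
            return rv;
        }

        static long toEntry(byte[] from, byte[] to) { // equivalent of the byte loop in toEntry()
            return (((long) toInt(from)) << 32) | (toInt(to) & 0xffffffffL);
        }

        public static void main(String[] args) {
            long entry = toEntry(new byte[] {10, 0, 0, 0}, new byte[] {10, 0, 0, (byte) 255});
            System.out.println(Long.toHexString(entry)); // prints a0000000a0000ff: 10.0.0.0 - 10.0.0.255
        }
    }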
@@ -640,7 +637,7 @@ public class Blocklist {
         return buf.toString();
     }
 
-    private String toStr(int ip) {
+    private static String toStr(int ip) {
         StringBuffer buf = new StringBuffer(16);
         for (int i = 3; i >= 0; i--) {
             buf.append((ip >> (8*i)) & 0xff);
@@ -756,9 +753,7 @@ public class Blocklist {
     public void renderStatusHTML(Writer out) throws IOException {
         out.write("<h2>IP Blocklist</h2>");
         Set singles = new TreeSet();
-        synchronized(_singleIPBlocklist) {
-            singles.addAll(_singleIPBlocklist);
-        }
+        singles.addAll(_singleIPBlocklist);
         if (singles.size() > 0) {
             out.write("<table><tr><td><b>Transient IPs</b></td></tr>");
             for (Iterator iter = singles.iterator(); iter.hasNext(); ) {
@@ -58,6 +58,7 @@ public abstract class CommSystemFacade implements Service {
     public boolean isBacklogged(Hash dest) { return false; }
     public boolean wasUnreachable(Hash dest) { return false; }
     public boolean isEstablished(Hash dest) { return false; }
+    public byte[] getIP(Hash dest) { return null; }
 
     /**
      * Tell other transports our address changed
@@ -19,6 +19,8 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import net.i2p.data.Hash;
 import net.i2p.data.RouterAddress;
@@ -40,17 +42,17 @@ public class ProfileOrganizer {
     private Log _log;
     private RouterContext _context;
     /** H(routerIdentity) to PeerProfile for all peers that are fast and high capacity*/
-    private Map _fastPeers;
+    private Map<Hash, PeerProfile> _fastPeers;
     /** H(routerIdentity) to PeerProfile for all peers that have high capacities */
-    private Map _highCapacityPeers;
+    private Map<Hash, PeerProfile> _highCapacityPeers;
     /** H(routerIdentity) to PeerProfile for all peers that well integrated into the network and not failing horribly */
-    private Map _wellIntegratedPeers;
+    private Map<Hash, PeerProfile> _wellIntegratedPeers;
     /** H(routerIdentity) to PeerProfile for all peers that are not failing horribly */
-    private Map _notFailingPeers;
+    private Map<Hash, PeerProfile> _notFailingPeers;
     /** H(routerIdnetity), containing elements in _notFailingPeers */
-    private List _notFailingPeersList;
+    private List<Hash> _notFailingPeersList;
     /** H(routerIdentity) to PeerProfile for all peers that ARE failing horribly (but that we haven't dropped reference to yet) */
-    private Map _failingPeers;
+    private Map<Hash, PeerProfile> _failingPeers;
     /** who are we? */
     private Hash _us;
     private ProfilePersistenceHelper _persistenceHelper;
@@ -84,7 +86,7 @@ public class ProfileOrganizer {
     public static final int DEFAULT_MINIMUM_HIGH_CAPACITY_PEERS = 10;
 
     /** synchronized against this lock when updating the tier that peers are located in (and when fetching them from a peer) */
-    private final Object _reorganizeLock = new Object();
+    private final ReentrantReadWriteLock _reorganizeLock = new ReentrantReadWriteLock(true);
 
     /** incredibly weak PRNG, just used for shuffling peers. no need to waste the real PRNG on this */
     private Random _random = new Random();
@@ -112,6 +114,29 @@ public class ProfileOrganizer {
         _context.statManager().createRateStat("peer.profileReorgTime", "How long the reorg takes overall", "Peers", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
     }
 
+    private void getReadLock() {
+        _reorganizeLock.readLock().lock();
+    }
+
+    private void releaseReadLock() {
+        _reorganizeLock.readLock().unlock();
+    }
+
+    /** @return true if the lock was acquired */
+    private boolean getWriteLock() {
+        try {
+            boolean rv = _reorganizeLock.writeLock().tryLock(5000, TimeUnit.MILLISECONDS);
+            if (!rv)
+                _log.error("no lock, size is: " + _reorganizeLock.getQueueLength(), new Exception("rats"));
+            return rv;
+        } catch (InterruptedException ie) {}
+        return false;
+    }
+
+    private void releaseWriteLock() {
+        _reorganizeLock.writeLock().unlock();
+    }
+
     public void setUs(Hash us) { _us = us; }
     Hash getUs() { return _us; }
 
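
The helpers above replace every `synchronized (_reorganizeLock)` block with a fair ReentrantReadWriteLock: many readers may hold the read lock at once, and the single writer (reorganize) gives up after a bounded wait instead of blocking forever. A stand-alone sketch of that pattern (names are illustrative, not the ProfileOrganizer API):

    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class ReorgLockSketch {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); // fair, as in the diff

        int countUnderReadLock(Map<?, ?> m) {
            lock.readLock().lock();              // cheap, shared among concurrent readers
            try {
                return m.size();
            } finally {
                lock.readLock().unlock();
            }
        }

        boolean updateUnderWriteLock(Runnable update) {
            try {
                if (!lock.writeLock().tryLock(5000, TimeUnit.MILLISECONDS))
                    return false;                // mirrors getWriteLock() giving up after 5 seconds
            } catch (InterruptedException ie) {
                return false;
            }
            try {
                update.run();
            } finally {
                lock.writeLock().unlock();
            }
            return true;
        }
    }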
@@ -124,42 +149,52 @@ public class ProfileOrganizer {
      *
      */
     public PeerProfile getProfile(Hash peer) {
-        synchronized (_reorganizeLock) {
+        getReadLock();
+        try {
             return locked_getProfile(peer);
-        }
+        } finally { releaseReadLock(); }
     }
 
     /**
      * Add the new profile, returning the old value (or null if no profile existed)
      *
      */
-    public PeerProfile addProfile(PeerProfile profile) throws IllegalStateException {
+    public PeerProfile addProfile(PeerProfile profile) {
         if ( (profile == null) || (profile.getPeer() == null) ) return null;
 
         if (_log.shouldLog(Log.DEBUG))
             _log.debug("New profile created for " + profile.getPeer().toBase64());
 
-        synchronized (_reorganizeLock) {
-            PeerProfile old = locked_getProfile(profile.getPeer());
-            profile.coalesceStats();
+        PeerProfile old = getProfile(profile.getPeer());
+        profile.coalesceStats();
+        if (!getWriteLock())
+            return old;
+        try {
             locked_placeProfile(profile);
             _strictCapacityOrder.add(profile);
-            return old;
-        }
+        } finally { releaseWriteLock(); }
+        return old;
     }
 
-    public int countFastPeers() { synchronized (_reorganizeLock) { return _fastPeers.size(); } }
-    public int countHighCapacityPeers() { synchronized (_reorganizeLock) { return _highCapacityPeers.size(); } }
-    public int countWellIntegratedPeers() { synchronized (_reorganizeLock) { return _wellIntegratedPeers.size(); } }
-    public int countNotFailingPeers() { synchronized (_reorganizeLock) { return _notFailingPeers.size(); } }
-    public int countFailingPeers() { synchronized (_reorganizeLock) { return _failingPeers.size(); } }
+    private int count(Map m) {
+        getReadLock();
+        try {
+            return m.size();
+        } finally { releaseReadLock(); }
+    }
+
+    public int countFastPeers() { return count(_fastPeers); }
+    public int countHighCapacityPeers() { return count(_highCapacityPeers); }
+    public int countWellIntegratedPeers() { return count(_wellIntegratedPeers); }
+    public int countNotFailingPeers() { return count(_notFailingPeers); }
+    public int countFailingPeers() { return count(_failingPeers); }
 
     public int countActivePeers() {
-        synchronized (_reorganizeLock) {
-            int activePeers = 0;
-
-            long hideBefore = _context.clock().now() - 6*60*60*1000;
-
+        int activePeers = 0;
+        long hideBefore = _context.clock().now() - 6*60*60*1000;
+
+        getReadLock();
+        try {
             for (Iterator iter = _failingPeers.values().iterator(); iter.hasNext(); ) {
                 PeerProfile profile = (PeerProfile)iter.next();
                 if (profile.getLastSendSuccessful() >= hideBefore)
@@ -174,15 +209,21 @@ public class ProfileOrganizer {
                 else if (profile.getLastHeardFrom() >= hideBefore)
                     activePeers++;
             }
-
-            return activePeers;
-        }
+        } finally { releaseReadLock(); }
+        return activePeers;
     }
 
-    public boolean isFast(Hash peer) { synchronized (_reorganizeLock) { return _fastPeers.containsKey(peer); } }
-    public boolean isHighCapacity(Hash peer) { synchronized (_reorganizeLock) { return _highCapacityPeers.containsKey(peer); } }
-    public boolean isWellIntegrated(Hash peer) { synchronized (_reorganizeLock) { return _wellIntegratedPeers.containsKey(peer); } }
-    public boolean isFailing(Hash peer) { synchronized (_reorganizeLock) { return _failingPeers.containsKey(peer); } }
+    private boolean isX(Map m, Hash peer) {
+        getReadLock();
+        try {
+            return m.containsKey(peer);
+        } finally { releaseReadLock(); }
+    }
+
+    public boolean isFast(Hash peer) { return isX(_fastPeers, peer); }
+    public boolean isHighCapacity(Hash peer) { return isX(_highCapacityPeers, peer); }
+    public boolean isWellIntegrated(Hash peer) { return isX(_wellIntegratedPeers, peer); }
+    public boolean isFailing(Hash peer) { return isX(_failingPeers, peer); }
 
     /**
      * if a peer sends us more than 5 replies in a searchReply that we cannot
@@ -236,9 +277,10 @@ public class ProfileOrganizer {
         selectFastPeers(howMany, exclude, matches, 0);
     }
     public void selectFastPeers(int howMany, Set exclude, Set matches, int mask) {
-        synchronized (_reorganizeLock) {
+        getReadLock();
+        try {
             locked_selectPeers(_fastPeers, howMany, exclude, matches, mask);
-        }
+        } finally { releaseReadLock(); }
         if (matches.size() < howMany) {
             if (_log.shouldLog(Log.INFO))
                 _log.info("selectFastPeers("+howMany+"), not enough fast (" + matches.size() + ") going on to highCap");
@@ -258,7 +300,8 @@ public class ProfileOrganizer {
         selectHighCapacityPeers(howMany, exclude, matches, 0);
     }
     public void selectHighCapacityPeers(int howMany, Set exclude, Set matches, int mask) {
-        synchronized (_reorganizeLock) {
+        getReadLock();
+        try {
             // we only use selectHighCapacityPeers when we are selecting for PURPOSE_TEST
             // or we are falling back due to _fastPeers being too small, so we can always
             // exclude the fast peers
@@ -269,7 +312,7 @@ public class ProfileOrganizer {
             exclude.addAll(_fastPeers.keySet());
             */
             locked_selectPeers(_highCapacityPeers, howMany, exclude, matches, mask);
-        }
+        } finally { releaseReadLock(); }
         if (matches.size() < howMany) {
             if (_log.shouldLog(Log.INFO))
                 _log.info("selectHighCap("+howMany+"), not enough fast (" + matches.size() + ") going on to notFailing");
@@ -288,9 +331,10 @@ public class ProfileOrganizer {
         selectWellIntegratedPeers(howMany, exclude, matches, 0);
     }
     public void selectWellIntegratedPeers(int howMany, Set exclude, Set matches, int mask) {
-        synchronized (_reorganizeLock) {
+        getReadLock();
+        try {
             locked_selectPeers(_wellIntegratedPeers, howMany, exclude, matches, mask);
-        }
+        } finally { releaseReadLock(); }
         if (matches.size() < howMany) {
             if (_log.shouldLog(Log.INFO))
                 _log.info("selectWellIntegrated("+howMany+"), not enough integrated (" + matches.size() + ") going on to notFailing");
@@ -375,7 +419,8 @@ public class ProfileOrganizer {
         int needed = howMany - orig;
         int start = 0;
         List selected = new ArrayList(needed);
-        synchronized (_reorganizeLock) {
+        getReadLock();
+        try {
             // we randomize the whole list when rebuilding it, but randomizing
             // the entire list on each peer selection is a bit crazy
             start = _context.random().nextInt(_notFailingPeersList.size());
@@ -397,7 +442,7 @@ public class ProfileOrganizer {
                         _log.debug("Not selectable: " + cur.toBase64());
                 }
             }
-        }
+        } finally { releaseReadLock(); }
         if (_log.shouldLog(Log.INFO))
             _log.info("Selecting all not failing (strict? " + onlyNotFailing + " start=" + start
                       + ") found " + selected.size() + " new peers: " + selected + " all=" + _notFailingPeersList.size() + " strict=" + _strictCapacityOrder.size());
@@ -418,25 +463,27 @@ public class ProfileOrganizer {
      *
      */
     public void selectFailingPeers(int howMany, Set exclude, Set matches) {
-        synchronized (_reorganizeLock) {
+        getReadLock();
+        try {
             locked_selectPeers(_failingPeers, howMany, exclude, matches);
-        }
-        return;
-    }
-
-    /**
+        } finally { releaseReadLock(); }
+        return;
+    }
+
+    /**
      * Get the peers the transport layer thinks are unreachable, and
      * add in the peers with the SSU peer testing bug,
      * and peers requiring introducers.
-     *
-     */
+     *
+     */
     public List selectPeersLocallyUnreachable() {
         List n;
         int count;
-        synchronized (_reorganizeLock) {
+        getReadLock();
+        try {
             count = _notFailingPeers.size();
             n = new ArrayList(_notFailingPeers.keySet());
-        }
+        } finally { releaseReadLock(); }
         List l = new ArrayList(count / 4);
         for (Iterator iter = n.iterator(); iter.hasNext(); ) {
             Hash peer = (Hash)iter.next();
@@ -483,7 +530,8 @@ public class ProfileOrganizer {
      *
      */
     public List selectPeersRecentlyRejecting() {
-        synchronized (_reorganizeLock) {
+        getReadLock();
+        try {
             long cutoff = _context.clock().now() - (20*1000);
             int count = _notFailingPeers.size();
             List l = new ArrayList(count / 128);
@@ -493,7 +541,7 @@ public class ProfileOrganizer {
                     l.add(prof.getPeer());
             }
             return l;
-        }
+        } finally { releaseReadLock(); }
     }
 
     /**
@@ -501,14 +549,15 @@ public class ProfileOrganizer {
      *
      */
     public Set selectAllPeers() {
-        synchronized (_reorganizeLock) {
+        getReadLock();
+        try {
             Set allPeers = new HashSet(_failingPeers.size() + _notFailingPeers.size() + _highCapacityPeers.size() + _fastPeers.size());
             allPeers.addAll(_failingPeers.keySet());
             allPeers.addAll(_notFailingPeers.keySet());
             allPeers.addAll(_highCapacityPeers.keySet());
             allPeers.addAll(_fastPeers.keySet());
             return allPeers;
-        }
+        } finally { releaseReadLock(); }
     }
 
     /**
@@ -532,8 +581,10 @@ public class ProfileOrganizer {
             expireOlderThan = _context.clock().now() - 6*60*60*1000;
         }
 
+        if (!getWriteLock())
+            return;
         long start = System.currentTimeMillis();
-        synchronized (_reorganizeLock) {
+        try {
             Set allPeers = _strictCapacityOrder; //new HashSet(_failingPeers.size() + _notFailingPeers.size() + _highCapacityPeers.size() + _fastPeers.size());
             //allPeers.addAll(_failingPeers.values());
             //allPeers.addAll(_notFailingPeers.values());
@@ -557,35 +608,37 @@ public class ProfileOrganizer {
             }
             sortTime = System.currentTimeMillis() - sortStart;
             _strictCapacityOrder = reordered;
-
+
             long thresholdStart = System.currentTimeMillis();
             locked_calculateThresholds(allPeers);
             thresholdTime = System.currentTimeMillis()-thresholdStart;
-
+
             _failingPeers.clear();
             _fastPeers.clear();
             _highCapacityPeers.clear();
             _notFailingPeers.clear();
             _notFailingPeersList.clear();
             _wellIntegratedPeers.clear();
-
+
             long placeStart = System.currentTimeMillis();
-
+
             for (Iterator iter = allPeers.iterator(); iter.hasNext(); ) {
                 PeerProfile profile = (PeerProfile)iter.next();
                 locked_placeProfile(profile);
             }
-
+
             locked_unfailAsNecessary();
             locked_promoteFastAsNecessary();
 
             Collections.shuffle(_notFailingPeersList, _context.random());
 
-            placeTime = System.currentTimeMillis()-placeStart;
-
-            if (_log.shouldLog(Log.INFO))
-                _log.info("Profiles reorganized. averages: [integration: " + _thresholdIntegrationValue
-                          + ", capacity: " + _thresholdCapacityValue + ", speed: " + _thresholdSpeedValue + "]");
+            placeTime = System.currentTimeMillis()-placeStart;
+        } finally { releaseWriteLock(); }
+
+        if (_log.shouldLog(Log.INFO))
+            _log.info("Profiles reorganized. averages: [integration: " + _thresholdIntegrationValue
+                      + ", capacity: " + _thresholdCapacityValue + ", speed: " + _thresholdSpeedValue + "]");
         /*****
         if (_log.shouldLog(Log.DEBUG)) {
             StringBuffer buf = new StringBuffer(512);
@@ -597,7 +650,6 @@ public class ProfileOrganizer {
             _log.debug("fast: " + _fastPeers.values());
         }
         *****/
-        }
 
         long total = System.currentTimeMillis()-start;
         _context.statManager().addRateData("peer.profileSortTime", sortTime, profileCount);
@@ -899,11 +951,12 @@ public class ProfileOrganizer {
         all.removeAll(matches);
         all.remove(_us);
         Collections.shuffle(all, _random);
+        Set IPSet = new HashSet(8);
         for (int i = 0; (matches.size() < howMany) && (i < all.size()); i++) {
             Hash peer = (Hash)all.get(i);
             boolean ok = isSelectable(peer);
             if (ok) {
-                ok = mask <= 0 || notRestricted(peer, matches, mask);
+                ok = mask <= 0 || notRestricted(peer, IPSet, mask);
                 if ((!ok) && _log.shouldLog(Log.WARN))
                     _log.warn("IP restriction prevents " + peer + " from joining " + matches);
             }
@@ -917,79 +970,69 @@ public class ProfileOrganizer {
     /**
      * Does the peer's IP address NOT match the IP address of any peer already in the set,
      * on any transport, within a given mask?
-     * mask is 1-4 (number of bytes to match) or 0 to disable
-     * Perhaps rewrite this to just make a set of all the IP addresses rather than loop.
+     * @param mask is 1-4 (number of bytes to match)
+     * @param IPMatches all IPs so far, modified by this routine
      */
-    private boolean notRestricted(Hash peer, Set matches, int mask) {
-        if (mask <= 0) return true;
-        if (matches.size() <= 0) return true;
-        RouterInfo pinfo = _context.netDb().lookupRouterInfoLocally(peer);
-        if (pinfo == null) return false;
-        Set paddr = pinfo.getAddresses();
-        if (paddr == null || paddr.size() == 0)
+    private boolean notRestricted(Hash peer, Set IPSet, int mask) {
+        Set peerIPs = maskedIPSet(peer, mask);
+        if (containsAny(IPSet, peerIPs))
             return false;
-        List pladdr = new ArrayList(paddr);
-        List lmatches = new ArrayList(matches);
-        // for each match
-        for (int i = 0; i < matches.size(); i++) {
-            RouterInfo minfo = _context.netDb().lookupRouterInfoLocally((Hash) lmatches.get(i));
-            if (minfo == null) continue;
-            Set maddr = minfo.getAddresses();
-            if (maddr == null || maddr.size() == 0)
-                continue;
-            List mladdr = new ArrayList(maddr);
-            String oldphost = null;
-            // for each peer address
-            for (int j = 0; j < paddr.size(); j++) {
-                RouterAddress pa = (RouterAddress) pladdr.get(j);
-                if (pa == null) continue;
-                Properties pprops = pa.getOptions();
-                if (pprops == null) continue;
-                String phost = pprops.getProperty("host");
-                if (phost == null) continue;
-                if (oldphost != null && oldphost.equals(phost)) continue;
-                oldphost = phost;
-                InetAddress pi;
-                try {
-                    pi = InetAddress.getByName(phost);
-                } catch (UnknownHostException uhe) {
-                    continue;
-                }
-                if (pi == null) continue;
-                byte[] pib = pi.getAddress();
-                String oldmhost = null;
-                // for each match address
-                for (int k = 0; k < maddr.size(); k++) {
-                    RouterAddress ma = (RouterAddress) mladdr.get(k);
-                    if (ma == null) continue;
-                    Properties mprops = ma.getOptions();
-                    if (mprops == null) continue;
-                    String mhost = mprops.getProperty("host");
-                    if (mhost == null) continue;
-                    if (oldmhost != null && oldmhost.equals(mhost)) continue;
-                    oldmhost = mhost;
-                    InetAddress mi;
-                    try {
-                        mi = InetAddress.getByName(mhost);
-                    } catch (UnknownHostException uhe) {
-                        continue;
-                    }
-                    if (mi == null) continue;
-                    byte[] mib = mi.getAddress();
-                    // assume ipv4, compare 1 to 4 bytes
-                    // log.info("Comparing " + pi + " with " + mi);
-                    for (int m = 0; m < mask; m++) {
-                        if (pib[m] != mib[m])
-                            break;
-                        if (m == mask-1)
-                            return false; // IP match
-                    }
-                }
-            }
-        }
+        IPSet.addAll(peerIPs);
         return true;
     }
 
+    /**
+     * The Set of IPs for this peer, with a given mask.
+     * Includes the comm system's record of the IP, and all netDb addresses.
+     *
+     * @return an opaque set of masked IPs for this peer
+     */
+    private Set maskedIPSet(Hash peer, int mask) {
+        Set rv = new HashSet(2);
+        byte[] commIP = _context.commSystem().getIP(peer);
+        if (commIP != null)
+            rv.add(maskedIP(commIP, mask));
+        RouterInfo pinfo = _context.netDb().lookupRouterInfoLocally(peer);
+        if (pinfo == null)
+            return rv;
+        Set<RouterAddress> paddr = pinfo.getAddresses();
+        if (paddr == null)
+            return rv;
+        for (RouterAddress pa : paddr) {
+            Properties pprops = pa.getOptions();
+            if (pprops == null) continue;
+            String phost = pprops.getProperty("host");
+            if (phost == null) continue;
+            InetAddress pi;
+            try {
+                pi = InetAddress.getByName(phost);
+            } catch (UnknownHostException uhe) {
+                continue;
+            }
+            if (pi == null) continue;
+            byte[] pib = pi.getAddress();
+            rv.add(maskedIP(pib, mask));
+        }
+        return rv;
+    }
+
+    /** generate an arbitrary unique value for this ip/mask (mask = 1-4) */
+    private Integer maskedIP(byte[] ip, int mask) {
+        int rv = 0;
+        for (int i = 0; i < mask; i++)
+            rv = (rv << 8) | (ip[i] & 0xff);
+        return Integer.valueOf(rv);
+    }
+
+    /** does a contain any of the elements in b? */
+    private boolean containsAny(Set a, Set b) {
+        for (Object o : b) {
+            if (a.contains(o))
+                return true;
+        }
+        return false;
+    }
+
     public boolean isSelectable(Hash peer) {
         NetworkDatabaseFacade netDb = _context.netDb();
         // the CLI shouldn't depend upon the netDb
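
The rewrite above replaces the nested address-comparison loops with a set of masked IPs per peer. The trick is in maskedIP(): the first `mask` bytes of an address collapse into one Integer key, so two peers inside the same /8, /16, /24 or /32 produce the same key and the second candidate is filtered out. A self-contained sketch (hypothetical class; the helper is copied in shape from the diff):

    import java.util.HashSet;
    import java.util.Set;

    public class MaskedIPSketch {
        static Integer maskedIP(byte[] ip, int mask) {   // same shape as the helper added above
            int rv = 0;
            for (int i = 0; i < mask; i++)
                rv = (rv << 8) | (ip[i] & 0xff);
            return Integer.valueOf(rv);
        }

        public static void main(String[] args) {
            Set<Integer> ipSet = new HashSet<Integer>();
            ipSet.add(maskedIP(new byte[] {(byte) 192, (byte) 168, 1, 10}, 2));
            // same /16, different host -> same key, so this candidate would be rejected
            System.out.println(ipSet.contains(maskedIP(new byte[] {(byte) 192, (byte) 168, 7, 99}, 2))); // true
            // different /16 -> allowed
            System.out.println(ipSet.contains(maskedIP(new byte[] {10, 0, 0, 1}, 2)));                   // false
        }
    }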
@@ -36,6 +36,8 @@ import net.i2p.router.RouterContext;
 import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
 import net.i2p.util.ConcurrentHashSet;
 import net.i2p.util.Log;
+import net.i2p.util.SimpleScheduler;
+import net.i2p.util.SimpleTimer;
 
 /**
  * Defines a way to send a message to another peer and start listening for messages
@@ -72,6 +74,7 @@ public abstract class TransportImpl implements Transport {
         _unreachableEntries = new HashMap(16);
         _wasUnreachableEntries = new ConcurrentHashSet(16);
         _currentAddress = null;
+        SimpleScheduler.getInstance().addPeriodicEvent(new CleanupUnreachable(), 2 * UNREACHABLE_PERIOD, UNREACHABLE_PERIOD / 2);
     }
 
     /**
@@ -462,13 +465,10 @@ public abstract class TransportImpl implements Transport {
         if (!isInbound)
             markWasUnreachable(peer, false);
     }
-    private class CleanupUnreachable extends JobImpl {
-        public CleanupUnreachable(RouterContext ctx) {
-            super(ctx);
-        }
-        public String getName() { return "Cleanup " + getStyle() + " unreachable list"; }
-        public void runJob() {
-            long now = getContext().clock().now();
+
+    private class CleanupUnreachable implements SimpleTimer.TimedEvent {
+        public void timeReached() {
+            long now = _context.clock().now();
             synchronized (_unreachableEntries) {
                 for (Iterator iter = _unreachableEntries.keySet().iterator(); iter.hasNext(); ) {
                     Hash peer = (Hash)iter.next();
@@ -477,7 +477,6 @@ public abstract class TransportImpl implements Transport {
                         iter.remove();
                 }
             }
-            requeue(60*1000);
         }
     }
 
@@ -398,6 +398,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
                 continue;
 
             RouterInfo info = fac.lookupRouterInfoLocally(peer);
+            if (info == null)
+                continue;
 
             OutNetMessage infoMsg = new OutNetMessage(_context);
             infoMsg.setExpiration(_context.clock().now()+10*1000);
@@ -106,7 +106,18 @@ public class FragmentHandler {
             if (_log.shouldLog(Log.ERROR))
                 _log.error("Corrupt fragment received: offset = " + offset, e);
             _context.statManager().addRateData("tunnel.corruptMessage", 1, 1);
-            throw e;
+            // java.lang.IllegalStateException: wtf, don't get the completed size when we're not complete - null fragment i=0 of 1
+            // at net.i2p.router.tunnel.FragmentedMessage.getCompleteSize(FragmentedMessage.java:194)
+            // at net.i2p.router.tunnel.FragmentedMessage.toByteArray(FragmentedMessage.java:223)
+            // at net.i2p.router.tunnel.FragmentHandler.receiveComplete(FragmentHandler.java:380)
+            // at net.i2p.router.tunnel.FragmentHandler.receiveSubsequentFragment(FragmentHandler.java:353)
+            // at net.i2p.router.tunnel.FragmentHandler.receiveFragment(FragmentHandler.java:208)
+            // at net.i2p.router.tunnel.FragmentHandler.receiveTunnelMessage(FragmentHandler.java:92)
+            // ...
+            // still trying to find root cause
+            // let's limit the damage here and skip the:
+            // .transport.udp.MessageReceiver: b0rked receiving a message.. wazza huzza hmm?
+            //throw e;
         } finally {
             // each of the FragmentedMessages populated make a copy out of the
             // payload, which they release separately, so we can release