propagate from branch 'i2p.i2p' (head 2634e424f06e58231c47f1ec318e9ae21c94a8b3)

to branch 'i2p.i2p.zzz.test2' (head 68ca880caf69a06c0fd01fc70675df795aef1de2)
This commit is contained in:
zzz
2013-12-05 16:07:32 +00:00
25 changed files with 308 additions and 187 deletions

View File

@@ -229,13 +229,28 @@ public class Router implements RouterClock.ClockShiftListener {
// for the ping file
// Check for other router but do not start a thread yet so the update doesn't cause
// a NCDFE
if (!isOnlyRouterRunning()) {
_eventLog.addEvent(EventLog.ABORTED, "Another router running");
System.err.println("ERROR: There appears to be another router already running!");
System.err.println(" Please make sure to shut down old instances before starting up");
System.err.println(" a new one. If you are positive that no other instance is running,");
System.err.println(" please delete the file " + getPingFile().getAbsolutePath());
System.exit(-1);
for (int i = 0; i < 14; i++) {
// Wrapper can start us up too quickly after a crash, the ping file
// may still be less than LIVELINESS_DELAY (60s) old.
// So wait at least 60s to be sure.
if (isOnlyRouterRunning()) {
if (i > 0)
System.err.println("INFO: No, there wasn't another router already running. Proceeding with startup.");
break;
}
if (i < 13) {
if (i == 0)
System.err.println("WARN: There may be another router already running. Waiting a while to be sure...");
// yes this is ugly to sleep in the constructor.
try { Thread.sleep(5000); } catch (InterruptedException ie) {}
} else {
_eventLog.addEvent(EventLog.ABORTED, "Another router running");
System.err.println("ERROR: There appears to be another router already running!");
System.err.println(" Please make sure to shut down old instances before starting up");
System.err.println(" a new one. If you are positive that no other instance is running,");
System.err.println(" please delete the file " + getPingFile().getAbsolutePath());
System.exit(-1);
}
}
if (_config.get("router.firstVersion") == null) {

View File

@@ -25,6 +25,7 @@ import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer2;
@@ -89,6 +90,24 @@ class PeerManager {
super(_context.simpleTimer2(), REORGANIZE_TIME);
}
public void timeReached() {
(new ReorgThread(this)).start();
}
}
/**
* This takes too long to run on the SimpleTimer2 queue
* @since 0.9.10
*/
private class ReorgThread extends I2PThread {
private SimpleTimer2.TimedEvent _event;
public ReorgThread(SimpleTimer2.TimedEvent event) {
super("PeerManager Reorg");
setDaemon(true);
_event = event;
}
public void run() {
long start = System.currentTimeMillis();
try {
_organizer.reorganize(true);
@@ -104,7 +123,7 @@ class PeerManager {
delay = REORGANIZE_TIME_MEDIUM;
else
delay = REORGANIZE_TIME;
schedule(delay);
_event.schedule(delay);
}
}

View File

@@ -26,6 +26,7 @@ import net.i2p.router.RouterContext;
import net.i2p.router.transport.udp.UDPTransport;
import net.i2p.router.util.EventLog;
import net.i2p.util.Addresses;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
@@ -223,6 +224,7 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
/* We hope the routerinfos are read in and things have settled down by now, but it's not required to be so */
private static final int START_DELAY = 5*60*1000;
private static final int LOOKUP_TIME = 30*60*1000;
private void startGeoIP() {
_context.simpleScheduler().addEvent(new QueueAll(), START_DELAY);
}
@@ -248,7 +250,26 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
private class Lookup implements SimpleTimer.TimedEvent {
public void timeReached() {
(new LookupThread()).start();
}
}
/**
* This takes too long to run on the SimpleTimer2 queue
* @since 0.9.10
*/
private class LookupThread extends I2PThread {
public LookupThread() {
super("GeoIP Lookup");
setDaemon(true);
}
public void run() {
long start = System.currentTimeMillis();
_geoIP.blockingLookup();
if (_log.shouldLog(Log.INFO))
_log.info("GeoIP lookup took " + (System.currentTimeMillis() - start));
}
}

View File

@@ -211,7 +211,7 @@ class EventPumper implements Runnable {
int failsafeInvalid = 0;
// Increase allowed idle time if we are well under allowed connections, otherwise decrease
if (_transport.haveCapacity(60))
if (_transport.haveCapacity(45))
_expireIdleWriteTime = Math.min(_expireIdleWriteTime + 1000, MAX_EXPIRE_IDLE_TIME);
else
_expireIdleWriteTime = Math.max(_expireIdleWriteTime - 3000, MIN_EXPIRE_IDLE_TIME);

View File

@@ -11,6 +11,7 @@ import java.util.Map;
import java.util.Set;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import net.i2p.data.Hash;
import net.i2p.data.SessionKey;
@@ -119,9 +120,9 @@ class PeerState {
*/
//private boolean _remoteWantsPreviousACKs;
/** how many bytes should we send to the peer in a second */
private volatile int _sendWindowBytes;
private int _sendWindowBytes;
/** how many bytes can we send to the peer in the current second */
private volatile int _sendWindowBytesRemaining;
private int _sendWindowBytesRemaining;
private long _lastSendRefill;
private int _sendBps;
private int _sendBytes;
@@ -225,13 +226,13 @@ class PeerState {
/** Make sure a 4229 byte TunnelBuildMessage can be sent in one volley with small MTU */
private static final int MIN_CONCURRENT_MSGS = 8;
/** how many concurrent outbound messages do we allow throws OutboundMessageFragments to send */
private volatile int _concurrentMessagesAllowed = MIN_CONCURRENT_MSGS;
private int _concurrentMessagesAllowed = MIN_CONCURRENT_MSGS;
/**
* how many outbound messages are currently being transmitted. Not thread safe, as we're not strict
*/
private volatile int _concurrentMessagesActive = 0;
private int _concurrentMessagesActive;
/** how many concurrency rejections have we had in a row */
private volatile int _consecutiveRejections = 0;
private int _consecutiveRejections;
/** is it inbound? **/
private final boolean _isInbound;
/** Last time it was made an introducer **/
@@ -436,9 +437,19 @@ class PeerState {
//public boolean getRemoteWantsPreviousACKs() { return _remoteWantsPreviousACKs; }
/** how many bytes should we send to the peer in a second */
public int getSendWindowBytes() { return _sendWindowBytes; }
public int getSendWindowBytes() {
synchronized(_outboundMessages) {
return _sendWindowBytes;
}
}
/** how many bytes can we send to the peer in the current second */
public int getSendWindowBytesRemaining() { return _sendWindowBytesRemaining; }
public int getSendWindowBytesRemaining() {
synchronized(_outboundMessages) {
return _sendWindowBytesRemaining;
}
}
/** what IP is the peer sending and receiving packets on? */
public byte[] getRemoteIP() { return _remoteIP; }
@@ -580,20 +591,24 @@ class PeerState {
/** return the smoothed send transfer rate */
public int getSendBps() { return _sendBps; }
public int getReceiveBps() { return _receiveBps; }
public int incrementConsecutiveFailedSends() {
_concurrentMessagesActive--;
if (_concurrentMessagesActive < 0)
_concurrentMessagesActive = 0;
//long now = _context.clock().now()/(10*1000);
//if (_lastFailedSendPeriod >= now) {
// // ignore... too fast
//} else {
// _lastFailedSendPeriod = now;
_consecutiveFailedSends++;
//}
return _consecutiveFailedSends;
synchronized(_outboundMessages) {
_concurrentMessagesActive--;
if (_concurrentMessagesActive < 0)
_concurrentMessagesActive = 0;
//long now = _context.clock().now()/(10*1000);
//if (_lastFailedSendPeriod >= now) {
// // ignore... too fast
//} else {
// _lastFailedSendPeriod = now;
_consecutiveFailedSends++;
//}
return _consecutiveFailedSends;
}
}
public long getInactivityTime() {
long now = _context.clock().now();
long lastActivity = Math.max(_lastReceiveTime, _lastSendFullyTime);
@@ -620,15 +635,17 @@ class PeerState {
* returning true if the full size can be decremented, false if it
* cannot. If it is not decremented, the window size remaining is
* not adjusted at all.
*
* Caller should synch
*/
public boolean allocateSendingBytes(int size, int messagePushCount) { return allocateSendingBytes(size, false, messagePushCount); }
private boolean allocateSendingBytes(int size, int messagePushCount) { return allocateSendingBytes(size, false, messagePushCount); }
public boolean allocateSendingBytes(int size, boolean isForACK) { return allocateSendingBytes(size, isForACK, -1); }
//private boolean allocateSendingBytes(int size, boolean isForACK) { return allocateSendingBytes(size, isForACK, -1); }
/**
* Caller should synch
*/
public boolean allocateSendingBytes(int size, boolean isForACK, int messagePushCount) {
private boolean allocateSendingBytes(int size, boolean isForACK, int messagePushCount) {
long now = _context.clock().now();
long duration = now - _lastSendRefill;
if (duration >= 1000) {
@@ -694,9 +711,25 @@ class PeerState {
****/
public int getSlowStartThreshold() { return _slowStartThreshold; }
public int getConcurrentSends() { return _concurrentMessagesActive; }
public int getConcurrentSendWindow() { return _concurrentMessagesAllowed; }
public int getConsecutiveSendRejections() { return _consecutiveRejections; }
public int getConcurrentSends() {
synchronized(_outboundMessages) {
return _concurrentMessagesActive;
}
}
public int getConcurrentSendWindow() {
synchronized(_outboundMessages) {
return _concurrentMessagesAllowed;
}
}
public int getConsecutiveSendRejections() {
synchronized(_outboundMessages) {
return _consecutiveRejections;
}
}
public boolean isInbound() { return _isInbound; }
/** @since IPv6 */
@@ -1674,6 +1707,8 @@ class PeerState {
/**
* Have 3 return values, because if allocateSendingBytes() returns false,
* then allocateSend() can stop iterating
*
* Caller should synch
*/
private ShouldSend locked_shouldSend(OutboundMessageState state) {
long now = _context.clock().now();

View File

@@ -2848,7 +2848,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
public void timeReached() {
// Increase allowed idle time if we are well under allowed connections, otherwise decrease
if (haveCapacity(60)) {
if (haveCapacity(45)) {
long inc;
// don't adjust too quickly if we are looping fast
if (_lastLoopShort)