forked from I2P_Developers/i2p.i2p
SimpleTimer2:
- Fix bug in forceReschedule() that caused subsequent uncaught IllegalStateException; forceReschedule() is only used by streaming timers - Log uncaught exceptions - Enforce 5 second minimum delay for periodic events - atomic count - de-wtf
This commit is contained in:
@@ -6,6 +6,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import net.i2p.I2PAppContext;
|
||||
|
||||
@@ -38,7 +39,7 @@ public class SimpleTimer2 {
|
||||
private static final int MAX_THREADS = 4;
|
||||
private final ScheduledThreadPoolExecutor _executor;
|
||||
private final String _name;
|
||||
private volatile int _count;
|
||||
private final AtomicInteger _count = new AtomicInteger();
|
||||
private final int _threads;
|
||||
|
||||
/**
|
||||
@@ -102,7 +103,7 @@ public class SimpleTimer2 {
|
||||
super.afterExecute(r, t);
|
||||
if (t != null) { // shoudn't happen, caught in RunnableEvent.run()
|
||||
Log log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer2.class);
|
||||
log.log(Log.CRIT, "wtf, event borked: " + r, t);
|
||||
log.log(Log.CRIT, "event borked: " + r, t);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -110,7 +111,7 @@ public class SimpleTimer2 {
|
||||
private class CustomThreadFactory implements ThreadFactory {
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread rv = Executors.defaultThreadFactory().newThread(r);
|
||||
rv.setName(_name + ' ' + (++_count) + '/' + _threads);
|
||||
rv.setName(_name + ' ' + _count.incrementAndGet() + '/' + _threads);
|
||||
// Uncomment this to test threadgrouping, but we should be all safe now that the constructor preallocates!
|
||||
// String name = rv.getThreadGroup().getName();
|
||||
// if(!name.equals("main")) {
|
||||
@@ -164,7 +165,8 @@ public class SimpleTimer2 {
|
||||
* New code should use SimpleTimer2.TimedEvent.
|
||||
*
|
||||
* @since 0.9.20
|
||||
* @param timeoutMs run first and subsequent iterations of this event every timeoutMs ms
|
||||
* @param timeoutMs run subsequent iterations of this event every timeoutMs ms, 5000 minimum
|
||||
* @throws IllegalArgumentException if timeoutMs less than 5000
|
||||
*/
|
||||
public void addPeriodicEvent(final SimpleTimer.TimedEvent event, final long timeoutMs) {
|
||||
addPeriodicEvent(event, timeoutMs, timeoutMs);
|
||||
@@ -183,7 +185,8 @@ public class SimpleTimer2 {
|
||||
*
|
||||
* @since 0.9.20
|
||||
* @param delay run the first iteration of this event after delay ms
|
||||
* @param timeoutMs run subsequent iterations of this event every timeoutMs ms
|
||||
* @param timeoutMs run subsequent iterations of this event every timeoutMs ms, 5000 minimum
|
||||
* @throws IllegalArgumentException if timeoutMs less than 5000
|
||||
*/
|
||||
public void addPeriodicEvent(final SimpleTimer.TimedEvent event, final long delay, final long timeoutMs) {
|
||||
|
||||
@@ -286,9 +289,12 @@ public class SimpleTimer2 {
|
||||
public synchronized void schedule(long timeoutMs) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Scheduling: " + this + " timeout = " + timeoutMs + " state: " + _state);
|
||||
if (timeoutMs <= 0 && _log.shouldLog(Log.WARN))
|
||||
if (timeoutMs <= 0) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Timeout <= 0: " + this + " timeout = " + timeoutMs + " state: " + _state);
|
||||
timeoutMs = 1; // otherwise we may execute before _future is updated, which is fine
|
||||
// except it triggers 'early execution' warning logging
|
||||
}
|
||||
|
||||
// always set absolute time of execution
|
||||
_nextRun = timeoutMs + System.currentTimeMillis();
|
||||
@@ -352,11 +358,13 @@ public class SimpleTimer2 {
|
||||
* @param timeoutMs
|
||||
*/
|
||||
public synchronized void forceReschedule(long timeoutMs) {
|
||||
// don't cancel while running!
|
||||
if (_state == TimedEventState.SCHEDULED)
|
||||
cancel();
|
||||
schedule(timeoutMs);
|
||||
}
|
||||
|
||||
/** returns true if cancelled */
|
||||
/** @return true if cancelled */
|
||||
public synchronized boolean cancel() {
|
||||
// always clear
|
||||
_rescheduleAfterRun = false;
|
||||
@@ -378,20 +386,29 @@ public class SimpleTimer2 {
|
||||
}
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
run2();
|
||||
} catch (RuntimeException re) {
|
||||
_log.error("timer error", re);
|
||||
throw re;
|
||||
}
|
||||
}
|
||||
|
||||
private void run2() {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Running: " + this);
|
||||
long before = System.currentTimeMillis();
|
||||
long delay = 0;
|
||||
synchronized(this) {
|
||||
if (_rescheduleAfterRun)
|
||||
throw new IllegalStateException("rescheduleAfterRun cannot be true here");
|
||||
throw new IllegalStateException(this + " rescheduleAfterRun cannot be true here");
|
||||
|
||||
switch(_state) {
|
||||
case CANCELLED:
|
||||
return; // goodbye
|
||||
case IDLE: // fall through
|
||||
case RUNNING:
|
||||
throw new IllegalStateException("not possible to be in " + _state);
|
||||
throw new IllegalStateException(this + " not possible to be in " + _state);
|
||||
case SCHEDULED: // proceed, switch to IDLE in case I need to reschedule
|
||||
_state = TimedEventState.IDLE;
|
||||
}
|
||||
@@ -411,12 +428,12 @@ public class SimpleTimer2 {
|
||||
if (_future != null)
|
||||
delay = _future.getDelay(TimeUnit.MILLISECONDS);
|
||||
else if (_log.shouldLog(Log.WARN))
|
||||
_log.warn(_pool + " wtf, no _future " + this);
|
||||
_log.warn(_pool + " no _future " + this);
|
||||
// This can be an incorrect warning especially after a schedule(0)
|
||||
if (_log.shouldLog(Log.WARN) && delay > 100)
|
||||
_log.warn(_pool + " wtf, early execution " + delay + ": " + this);
|
||||
_log.warn(_pool + " early execution " + delay + ": " + this);
|
||||
else if (_log.shouldLog(Log.WARN) && delay < -1000)
|
||||
_log.warn(" wtf, late execution " + (0 - delay) + ": " + this + _pool.debug());
|
||||
_log.warn(" late execution " + (0 - delay) + ": " + this + _pool.debug());
|
||||
try {
|
||||
timeReached();
|
||||
} catch (Throwable t) {
|
||||
@@ -426,7 +443,7 @@ public class SimpleTimer2 {
|
||||
switch(_state) {
|
||||
case SCHEDULED: // fall through
|
||||
case IDLE:
|
||||
throw new IllegalStateException("can't be " + _state);
|
||||
throw new IllegalStateException(this + " can't be " + _state);
|
||||
case CANCELLED:
|
||||
break; // nothing
|
||||
case RUNNING:
|
||||
@@ -441,7 +458,7 @@ public class SimpleTimer2 {
|
||||
}
|
||||
long time = System.currentTimeMillis() - before;
|
||||
if (time > 500 && _log.shouldLog(Log.WARN))
|
||||
_log.warn(_pool + " wtf, event execution took " + time + ": " + this);
|
||||
_log.warn(_pool + " event execution took " + time + ": " + this);
|
||||
if (_log.shouldLog(Log.INFO)) {
|
||||
// this call is slow - iterates through a HashMap -
|
||||
// would be better to have a local AtomicLong if we care
|
||||
@@ -470,6 +487,7 @@ public class SimpleTimer2 {
|
||||
return _executor.getCompletedTaskCount();
|
||||
}
|
||||
|
||||
/** warning - slow */
|
||||
private String debug() {
|
||||
_executor.purge(); // Remove cancelled tasks from the queue so we get a good queue size stat
|
||||
return
|
||||
@@ -490,10 +508,13 @@ public class SimpleTimer2 {
|
||||
* Schedule periodic event
|
||||
*
|
||||
* @param delay run the first iteration of this event after delay ms
|
||||
* @param timeoutMs run subsequent iterations of this event every timeoutMs ms
|
||||
* @param timeoutMs run subsequent iterations of this event every timeoutMs ms, 5000 minimum
|
||||
* @throws IllegalArgumentException if timeoutMs less than 5000
|
||||
*/
|
||||
public PeriodicTimedEvent(SimpleTimer2 pool, long delay, long timeoutMs) {
|
||||
super(pool, delay);
|
||||
if (timeoutMs < 5000)
|
||||
throw new IllegalArgumentException("timeout minimum 5000");
|
||||
_timeoutMs = timeoutMs;
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user