From 05f2a62cbb197e2809c924e2f679f4217015fd77 Mon Sep 17 00:00:00 2001 From: zzz Date: Tue, 9 Mar 2010 17:32:29 +0000 Subject: [PATCH] * Job Queue: - Replace some locks with concurrent - Change job ID to a long so it won't wrap - Remove some unused stats - Java 5 and debug cleanup --- router/java/src/net/i2p/router/Job.java | 2 +- router/java/src/net/i2p/router/JobImpl.java | 6 +- router/java/src/net/i2p/router/JobQueue.java | 194 ++++++++++-------- .../src/net/i2p/router/JobQueueRunner.java | 52 ++--- 4 files changed, 135 insertions(+), 119 deletions(-) diff --git a/router/java/src/net/i2p/router/Job.java b/router/java/src/net/i2p/router/Job.java index 2a9f12a061..6c85c1af46 100644 --- a/router/java/src/net/i2p/router/Job.java +++ b/router/java/src/net/i2p/router/Job.java @@ -19,7 +19,7 @@ public interface Job { */ public String getName(); /** unique id */ - public int getJobId(); + public long getJobId(); /** * Timing criteria for the task */ diff --git a/router/java/src/net/i2p/router/JobImpl.java b/router/java/src/net/i2p/router/JobImpl.java index 2934855c8d..38aa860259 100644 --- a/router/java/src/net/i2p/router/JobImpl.java +++ b/router/java/src/net/i2p/router/JobImpl.java @@ -15,8 +15,8 @@ import net.i2p.util.Log; public abstract class JobImpl implements Job { private RouterContext _context; private JobTiming _timing; - private static int _idSrc = 0; - private int _id; + private static long _idSrc = 0; + private long _id; private Exception _addedBy; private long _madeReadyOn; @@ -28,7 +28,7 @@ public abstract class JobImpl implements Job { _madeReadyOn = 0; } - public int getJobId() { return _id; } + public long getJobId() { return _id; } public JobTiming getTiming() { return _timing; } public final RouterContext getContext() { return _context; } diff --git a/router/java/src/net/i2p/router/JobQueue.java b/router/java/src/net/i2p/router/JobQueue.java index 54d6b41f39..06a9004efe 100644 --- a/router/java/src/net/i2p/router/JobQueue.java +++ b/router/java/src/net/i2p/router/JobQueue.java @@ -12,10 +12,14 @@ import java.io.IOException; import java.io.Writer; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import net.i2p.data.DataHelper; import net.i2p.router.networkdb.HandleDatabaseLookupMessageJob; @@ -33,15 +37,15 @@ public class JobQueue { private RouterContext _context; /** Integer (runnerId) to JobQueueRunner for created runners */ - private final HashMap _queueRunners; + private final Map _queueRunners; /** a counter to identify a job runner */ private volatile static int _runnerId = 0; /** list of jobs that are ready to run ASAP */ - private ArrayList _readyJobs; + private BlockingQueue _readyJobs; /** list of jobs that are scheduled for running in the future */ - private ArrayList _timedJobs; + private List _timedJobs; /** job name to JobStat for that job */ - private final SortedMap _jobStats; + private final Map _jobStats; /** how many job queue runners can go concurrently */ private int _maxRunners = 1; private QueuePumper _pumper; @@ -52,9 +56,12 @@ public class JobQueue { private final Object _jobLock; + /** how many when we go parallel */ + private static final int RUNNERS = 4; + /** default max # job queue runners operating */ private final static int DEFAULT_MAX_RUNNERS = 1; - /** router.config parameter to override the max runners */ + /** router.config parameter to override the max runners @deprecated unimplemented */ private final static String PROP_MAX_RUNNERS = "router.maxJobRunners"; /** how frequently should we check and update the max runners */ @@ -63,33 +70,39 @@ public class JobQueue { /** if a job is this lagged, spit out a warning, but keep going */ private long _lagWarning = DEFAULT_LAG_WARNING; private final static long DEFAULT_LAG_WARNING = 5*1000; + /** @deprecated unimplemented */ private final static String PROP_LAG_WARNING = "router.jobLagWarning"; - /** if a job is this lagged, the router is hosed, so shut it down */ + /** if a job is this lagged, the router is hosed, so spit out a warning (dont shut it down) */ private long _lagFatal = DEFAULT_LAG_FATAL; private final static long DEFAULT_LAG_FATAL = 30*1000; + /** @deprecated unimplemented */ private final static String PROP_LAG_FATAL = "router.jobLagFatal"; /** if a job takes this long to run, spit out a warning, but keep going */ private long _runWarning = DEFAULT_RUN_WARNING; private final static long DEFAULT_RUN_WARNING = 5*1000; + /** @deprecated unimplemented */ private final static String PROP_RUN_WARNING = "router.jobRunWarning"; - /** if a job takes this long to run, the router is hosed, so shut it down */ + /** if a job takes this long to run, the router is hosed, so spit out a warning (dont shut it down) */ private long _runFatal = DEFAULT_RUN_FATAL; private final static long DEFAULT_RUN_FATAL = 30*1000; + /** @deprecated unimplemented */ private final static String PROP_RUN_FATAL = "router.jobRunFatal"; /** don't enforce fatal limits until the router has been up for this long */ private long _warmupTime = DEFAULT_WARMUP_TIME; private final static long DEFAULT_WARMUP_TIME = 10*60*1000; - private final static String PROP_WARMUM_TIME = "router.jobWarmupTime"; + /** @deprecated unimplemented */ + private final static String PROP_WARMUP_TIME = "router.jobWarmupTime"; /** max ready and waiting jobs before we start dropping 'em */ private int _maxWaitingJobs = DEFAULT_MAX_WAITING_JOBS; private final static int DEFAULT_MAX_WAITING_JOBS = 100; + /** @deprecated unimplemented */ private final static String PROP_MAX_WAITING_JOBS = "router.maxWaitingJobs"; - + /** * queue runners wait on this whenever they're not doing anything, and * this gets notified *once* whenever there are ready jobs @@ -109,16 +122,14 @@ public class JobQueue { new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); _alive = true; - _readyJobs = new ArrayList(16); + _readyJobs = new LinkedBlockingQueue(); _timedJobs = new ArrayList(64); _jobLock = new Object(); - _queueRunners = new HashMap(); - _jobStats = Collections.synchronizedSortedMap(new TreeMap()); + _queueRunners = new ConcurrentHashMap(RUNNERS); + _jobStats = new ConcurrentHashMap(); _allowParallelOperation = false; _pumper = new QueuePumper(); - I2PThread pumperThread = new I2PThread(_pumper); - pumperThread.setDaemon(true); - pumperThread.setName("QueuePumper"); + I2PThread pumperThread = new I2PThread(_pumper, "Job Queue Pumper", true); //pumperThread.setPriority(I2PThread.NORM_PRIORITY+1); pumperThread.start(); } @@ -128,7 +139,7 @@ public class JobQueue { * */ public void addJob(Job job) { - if (job == null) return; + if (job == null || !_alive) return; if (job instanceof JobImpl) ((JobImpl)job).addedToQueue(); @@ -136,6 +147,7 @@ public class JobQueue { long numReady = 0; boolean alreadyExists = false; boolean dropped = false; + // getNext() is now outside the jobLock, is that ok? synchronized (_jobLock) { if (_readyJobs.contains(job)) alreadyExists = true; @@ -155,7 +167,7 @@ public class JobQueue { job.getTiming().setStartAfter(_context.clock().now()); if (job instanceof JobImpl) ((JobImpl)job).madeReady(); - _readyJobs.add(job); + _readyJobs.offer(job); } else { _timedJobs.add(job); } @@ -167,12 +179,10 @@ public class JobQueue { _context.statManager().addRateData("jobQueue.readyJobs", numReady, 0); if (dropped) { _context.statManager().addRateData("jobQueue.droppedJobs", 1, 1); - if (_log.shouldLog(Log.WARN)) - _log.warn("Dropping job due to overload! # ready jobs: " + if (_log.shouldLog(Log.ERROR)) + _log.error("Dropping job due to overload! # ready jobs: " + numReady + ": job = " + job); } - - return; } public void removeJob(Job job) { @@ -189,17 +199,15 @@ public class JobQueue { } public int getReadyCount() { - synchronized (_jobLock) { return _readyJobs.size(); - } } + public long getMaxLag() { - synchronized (_jobLock) { - if (_readyJobs.size() <= 0) return 0; + Job j = _readyJobs.peek(); + if (j == null) return 0; // first job is the one that has been waiting the longest - long startAfter = ((Job)_readyJobs.get(0)).getTiming().getStartAfter(); + long startAfter = j.getTiming().getStartAfter(); return _context.clock().now() - startAfter; - } } /** @@ -228,9 +236,10 @@ public class JobQueue { public void allowParallelOperation() { _allowParallelOperation = true; - runQueue(4); + runQueue(RUNNERS); } + /** @deprecated do you really want to do this? */ public void restart() { synchronized (_jobLock) { _timedJobs.clear(); @@ -241,14 +250,21 @@ public class JobQueue { void shutdown() { _alive = false; - synchronized (_jobLock) { - _jobLock.notifyAll(); - } + _timedJobs.clear(); + _readyJobs.clear(); + // The JobQueueRunners are NOT daemons, + // so they must be stopped. + Job poison = new PoisonJob(); + for (int i = 0; i < _queueRunners.size(); i++) + _readyJobs.offer(poison); + + + /******** if (_log.shouldLog(Log.WARN)) { StringBuilder buf = new StringBuilder(1024); buf.append("current jobs: \n"); for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); ) { - JobQueueRunner runner = (JobQueueRunner)iter.next(); + JobQueueRunner runner = iter.next(); Job j = runner.getCurrentJob(); buf.append("Runner ").append(runner.getRunnerId()).append(": "); @@ -279,7 +295,9 @@ public class JobQueue { buf.append(_timedJobs.get(i).toString()).append("\n\t"); _log.log(Log.WARN, buf.toString()); } + ********/ } + boolean isAlive() { return _alive; } /** @@ -287,9 +305,8 @@ public class JobQueue { */ public long getLastJobBegin() { long when = -1; - // not synchronized, so might b0rk if the runners are changed - for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); ) { - long cur = ((JobQueueRunner)iter.next()).getLastBegin(); + for (JobQueueRunner runner : _queueRunners.values()) { + long cur = runner.getLastBegin(); if (cur > when) cur = when; } @@ -300,9 +317,8 @@ public class JobQueue { */ public long getLastJobEnd() { long when = -1; - // not synchronized, so might b0rk if the runners are changed - for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); ) { - long cur = ((JobQueueRunner)iter.next()).getLastEnd(); + for (JobQueueRunner runner : _queueRunners.values()) { + long cur = runner.getLastEnd(); if (cur > when) cur = when; } @@ -315,9 +331,7 @@ public class JobQueue { public Job getLastJob() { Job j = null; long when = -1; - // not synchronized, so might b0rk if the runners are changed - for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); ) { - JobQueueRunner cur = (JobQueueRunner)iter.next(); + for (JobQueueRunner cur : _queueRunners.values()) { if (cur.getLastBegin() > when) { j = cur.getCurrentJob(); when = cur.getLastBegin(); @@ -333,13 +347,10 @@ public class JobQueue { Job getNext() { while (_alive) { try { - synchronized (_jobLock) { - if (_readyJobs.size() > 0) { - return (Job)_readyJobs.remove(0); - } else { - _jobLock.wait(); - } - } + Job j = _readyJobs.take(); + if (j.getJobId() == POISON_ID) + break; + return j; } catch (InterruptedException ie) {} } if (_log.shouldLog(Log.WARN)) @@ -355,8 +366,7 @@ public class JobQueue { * the current job. * */ - public void runQueue(int numThreads) { - synchronized (_queueRunners) { + public synchronized void runQueue(int numThreads) { // we're still starting up [serially] and we've got at least one runner, // so dont do anything if ( (_queueRunners.size() > 0) && (!_allowParallelOperation) ) return; @@ -377,8 +387,7 @@ public class JobQueue { t.start(); } } else if (_queueRunners.size() == numThreads) { - for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); ) { - JobQueueRunner runner = (JobQueueRunner)iter.next(); + for (JobQueueRunner runner : _queueRunners.values()) { runner.startRunning(); } } else { // numThreads < # runners, so shrink @@ -387,7 +396,6 @@ public class JobQueue { // runner.stopRunning(); //} } - } } void removeRunner(int id) { _queueRunners.remove(Integer.valueOf(id)); } @@ -407,11 +415,11 @@ public class JobQueue { while (_alive) { long now = _context.clock().now(); long timeToWait = -1; - ArrayList toAdd = null; + List toAdd = null; try { synchronized (_jobLock) { for (int i = 0; i < _timedJobs.size(); i++) { - Job j = (Job)_timedJobs.get(i); + Job j = _timedJobs.get(i); // find jobs due to start before now long timeLeft = j.getTiming().getStartAfter() - now; if (timeLeft <= 0) { @@ -437,7 +445,7 @@ public class JobQueue { // extra alloc. (no, i'm not just being insane - i'm updating this based // on some profiling data ;) for (int i = 0; i < toAdd.size(); i++) - _readyJobs.add(toAdd.get(i)); + _readyJobs.offer(toAdd.get(i)); _jobLock.notifyAll(); } else { if (timeToWait < 0) @@ -476,17 +484,15 @@ public class JobQueue { private void updateJobTimings(long delta) { synchronized (_jobLock) { for (int i = 0; i < _timedJobs.size(); i++) { - Job j = (Job)_timedJobs.get(i); + Job j = _timedJobs.get(i); j.getTiming().offsetChanged(delta); } - for (int i = 0; i < _readyJobs.size(); i++) { - Job j = (Job)_readyJobs.get(i); + for (Job j : _readyJobs) { j.getTiming().offsetChanged(delta); } } synchronized (_runnerLock) { - for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); ) { - JobQueueRunner runner = (JobQueueRunner)iter.next(); + for (JobQueueRunner runner : _queueRunners.values()) { Job job = runner.getCurrentJob(); if (job != null) job.getTiming().offsetChanged(delta); @@ -509,14 +515,14 @@ public class JobQueue { if (lag < 0) lag = 0; if (duration < 0) duration = 0; - JobStats stats = null; - if (!_jobStats.containsKey(key)) { - _jobStats.put(key, new JobStats(key)); + JobStats stats = _jobStats.get(key); + if (stats == null) { + stats = new JobStats(key); + _jobStats.put(key, stats); // yes, if two runners finish the same job at the same time, this could // create an extra object. but, who cares, its pushed out of the map // immediately anyway. } - stats = (JobStats)_jobStats.get(key); stats.jobRan(duration, lag); String dieMsg = null; @@ -555,26 +561,39 @@ public class JobQueue { } + /** job ID counter changed from int to long so it won't wrap negative */ + private static final int POISON_ID = -99999; + + private static class PoisonJob implements Job { + public String getName() { return null; } + public long getJobId() { return POISON_ID; } + public JobTiming getTiming() { return null; } + public void runJob() {} + public Exception getAddedBy() { return null; } + public void dropped() {} + } + //// // the remainder are utility methods for dumping status info //// public void renderStatusHTML(Writer out) throws IOException { - ArrayList readyJobs = null; - ArrayList timedJobs = null; - ArrayList activeJobs = new ArrayList(1); - ArrayList justFinishedJobs = new ArrayList(4); + List readyJobs = null; + List timedJobs = null; + List activeJobs = new ArrayList(RUNNERS); + List justFinishedJobs = new ArrayList(RUNNERS); //out.write("\n"); out.flush(); - int states[] = null; + //int states[] = null; int numRunners = 0; - synchronized (_queueRunners) { - states = new int[_queueRunners.size()]; + + { + //states = new int[_queueRunners.size()]; int i = 0; - for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); i++) { - JobQueueRunner runner = (JobQueueRunner)iter.next(); - states[i] = runner.getState(); + for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); i++) { + JobQueueRunner runner = iter.next(); + //states[i] = runner.getState(); Job job = runner.getCurrentJob(); if (job != null) { activeJobs.add(job); @@ -621,21 +640,21 @@ public class JobQueue { buf.append("
Active jobs: ").append(activeJobs.size()).append("
    \n"); for (int i = 0; i < activeJobs.size(); i++) { - Job j = (Job)activeJobs.get(i); + Job j = activeJobs.get(i); buf.append("
  1. [started ").append(DataHelper.formatDuration(now-j.getTiming().getStartAfter())).append(" ago]: "); buf.append(j.toString()).append("
  2. \n"); } buf.append("
\n"); buf.append("
Just finished jobs: ").append(justFinishedJobs.size()).append("
    \n"); for (int i = 0; i < justFinishedJobs.size(); i++) { - Job j = (Job)justFinishedJobs.get(i); + Job j = justFinishedJobs.get(i); buf.append("
  1. [finished ").append(DataHelper.formatDuration(now-j.getTiming().getActualEnd())).append(" ago]: "); buf.append(j.toString()).append("
  2. \n"); } buf.append("
\n"); buf.append("
Ready/waiting jobs: ").append(readyJobs.size()).append("
    \n"); for (int i = 0; i < readyJobs.size(); i++) { - Job j = (Job)readyJobs.get(i); + Job j = readyJobs.get(i); buf.append("
  1. [waiting "); buf.append(DataHelper.formatDuration(now-j.getTiming().getStartAfter())); buf.append("]: "); @@ -645,13 +664,13 @@ public class JobQueue { out.flush(); buf.append("
    Scheduled jobs: ").append(timedJobs.size()).append("
      \n"); - TreeMap ordered = new TreeMap(); + TreeMap ordered = new TreeMap(); for (int i = 0; i < timedJobs.size(); i++) { - Job j = (Job)timedJobs.get(i); + Job j = timedJobs.get(i); ordered.put(new Long(j.getTiming().getStartAfter()), j); } - for (Iterator iter = ordered.values().iterator(); iter.hasNext(); ) { - Job j = (Job)iter.next(); + for (Iterator iter = ordered.values().iterator(); iter.hasNext(); ) { + Job j = iter.next(); long time = j.getTiming().getStartAfter() - now; buf.append("
    1. ").append(j.getName()).append(" in "); buf.append(DataHelper.formatDuration(time)).append("
    2. \n"); @@ -685,13 +704,10 @@ public class JobQueue { long maxPendingTime = -1; long minPendingTime = -1; - TreeMap tstats = null; - synchronized (_jobStats) { - tstats = new TreeMap(_jobStats); - } + TreeMap tstats = new TreeMap(_jobStats); - for (Iterator iter = tstats.values().iterator(); iter.hasNext(); ) { - JobStats stats = (JobStats)iter.next(); + for (Iterator iter = tstats.values().iterator(); iter.hasNext(); ) { + JobStats stats = iter.next(); buf.append(""); buf.append("").append(stats.getName()).append(""); buf.append("").append(stats.getRuns()).append(""); diff --git a/router/java/src/net/i2p/router/JobQueueRunner.java b/router/java/src/net/i2p/router/JobQueueRunner.java index c6633dbb2d..8b55a8c6ca 100644 --- a/router/java/src/net/i2p/router/JobQueueRunner.java +++ b/router/java/src/net/i2p/router/JobQueueRunner.java @@ -23,12 +23,12 @@ class JobQueueRunner implements Runnable { _currentJob = null; _lastJob = null; _log = _context.logManager().getLog(JobQueueRunner.class); - _context.statManager().createRateStat("jobQueue.jobRun", "How long jobs take", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); - _context.statManager().createRateStat("jobQueue.jobRunSlow", "How long jobs that take over a second take", "JobQueue", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); + _context.statManager().createRateStat("jobQueue.jobRun", "How long jobs take", "JobQueue", new long[] { 60*60*1000l, 24*60*60*1000l }); + _context.statManager().createRateStat("jobQueue.jobRunSlow", "How long jobs that take over a second take", "JobQueue", new long[] { 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("jobQueue.jobLag", "How long jobs have to wait before running", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); - _context.statManager().createRateStat("jobQueue.jobWait", "How long does a job sit on the job queue?", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); - _context.statManager().createRateStat("jobQueue.jobRunnerInactive", "How long are runners inactive?", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); - _state = 1; + _context.statManager().createRateStat("jobQueue.jobWait", "How long does a job sit on the job queue?", "JobQueue", new long[] { 60*60*1000l, 24*60*60*1000l }); + //_context.statManager().createRateStat("jobQueue.jobRunnerInactive", "How long are runners inactive?", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); + //_state = 1; } final int getState() { return _state; } @@ -41,16 +41,16 @@ class JobQueueRunner implements Runnable { public long getLastBegin() { return _lastBegin; } public long getLastEnd() { return _lastEnd; } public void run() { - _state = 2; + //_state = 2; long lastActive = _context.clock().now(); long jobNum = 0; while ( (_keepRunning) && (_context.jobQueue().isAlive()) ) { - _state = 3; + //_state = 3; try { Job job = _context.jobQueue().getNext(); - _state = 4; + //_state = 4; if (job == null) { - _state = 5; + //_state = 5; if (_context.router().isAlive()) if (_log.shouldLog(Log.ERROR)) _log.error("getNext returned null - dead?"); @@ -60,14 +60,14 @@ class JobQueueRunner implements Runnable { long enqueuedTime = 0; if (job instanceof JobImpl) { - _state = 6; + //_state = 6; long when = ((JobImpl)job).getMadeReadyOn(); if (when <= 0) { - _state = 7; + //_state = 7; _log.error("Job was not made ready?! " + job, new Exception("Not made ready?!")); } else { - _state = 8; + //_state = 8; enqueuedTime = now - when; } } @@ -75,27 +75,27 @@ class JobQueueRunner implements Runnable { long betweenJobs = now - lastActive; _currentJob = job; _lastJob = null; - _state = 9; + //_state = 9; if (_log.shouldLog(Log.DEBUG)) _log.debug("Runner " + _id + " running job " + job.getJobId() + ": " + job.getName()); long origStartAfter = job.getTiming().getStartAfter(); long doStart = _context.clock().now(); - _state = 10; + //_state = 10; job.getTiming().start(); runCurrentJob(); job.getTiming().end(); - _state = 11; + //_state = 11; long duration = job.getTiming().getActualEnd() - job.getTiming().getActualStart(); long beforeUpdate = _context.clock().now(); - _state = 12; + //_state = 12; _context.jobQueue().updateStats(job, doStart, origStartAfter, duration); - _state = 13; + //_state = 13; long diff = _context.clock().now() - beforeUpdate; long lag = doStart - origStartAfter; if (lag < 0) lag = 0; - _context.statManager().addRateData("jobQueue.jobRunnerInactive", betweenJobs, betweenJobs); + //_context.statManager().addRateData("jobQueue.jobRunnerInactive", betweenJobs, betweenJobs); _context.statManager().addRateData("jobQueue.jobRun", duration, duration); _context.statManager().addRateData("jobQueue.jobLag", lag, 0); _context.statManager().addRateData("jobQueue.jobWait", enqueuedTime, enqueuedTime); @@ -107,7 +107,7 @@ class JobQueueRunner implements Runnable { + ") on job " + _currentJob); } - _state = 14; + //_state = 14; if (diff > 100) { if (_log.shouldLog(Log.WARN)) @@ -121,7 +121,7 @@ class JobQueueRunner implements Runnable { _currentJob = null; _lastEnd = lastActive; jobNum++; - _state = 15; + //_state = 15; //if ( (jobNum % 10) == 0) // System.gc(); @@ -130,22 +130,22 @@ class JobQueueRunner implements Runnable { _log.log(Log.CRIT, "WTF, error running?", t); } } - _state = 16; + //_state = 16; if (_context.router().isAlive()) if (_log.shouldLog(Log.CRIT)) _log.log(Log.CRIT, "Queue runner " + _id + " exiting"); _context.jobQueue().removeRunner(_id); - _state = 17; + //_state = 17; } private void runCurrentJob() { try { - _state = 18; + //_state = 18; _lastBegin = _context.clock().now(); _currentJob.runJob(); - _state = 19; + //_state = 19; } catch (OutOfMemoryError oom) { - _state = 20; + //_state = 20; try { if (_log.shouldLog(Log.CRIT)) _log.log(Log.CRIT, "Router ran out of memory, shutting down", oom); @@ -157,7 +157,7 @@ class JobQueueRunner implements Runnable { try { Thread.sleep(1000); } catch (InterruptedException ie) {} System.exit(-1); } catch (Throwable t) { - _state = 21; + //_state = 21; if (_log.shouldLog(Log.CRIT)) _log.log(Log.CRIT, "Error processing job [" + _currentJob.getName() + "] on thread " + _id + ": " + t.getMessage(), t);