synchronization reduction and keep track of the 'last' job for each runner (to help debug something i see once a week on kaffe)

This commit is contained in:
jrandom
2004-05-07 17:51:28 +00:00
committed by zzz
parent 07b6a8ba92
commit 303e257841
3 changed files with 90 additions and 61 deletions

View File

@@ -13,6 +13,8 @@ import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.SortedMap;
import java.util.Collections;
import net.i2p.router.message.HandleSourceRouteReplyMessageJob; import net.i2p.router.message.HandleSourceRouteReplyMessageJob;
import net.i2p.router.networkdb.HandleDatabaseLookupMessageJob; import net.i2p.router.networkdb.HandleDatabaseLookupMessageJob;
@@ -43,7 +45,7 @@ public class JobQueue {
/** when true, don't run any new jobs or update any limits, etc */ /** when true, don't run any new jobs or update any limits, etc */
private boolean _paused; private boolean _paused;
/** job name to JobStat for that job */ /** job name to JobStat for that job */
private TreeMap _jobStats; private SortedMap _jobStats;
/** how many job queue runners can go concurrently */ /** how many job queue runners can go concurrently */
private int _maxRunners; private int _maxRunners;
private QueuePumper _pumper; private QueuePumper _pumper;
@@ -116,7 +118,7 @@ public class JobQueue {
_timedJobs = new ArrayList(); _timedJobs = new ArrayList();
_queueRunners = new HashMap(); _queueRunners = new HashMap();
_paused = false; _paused = false;
_jobStats = new TreeMap(); _jobStats = Collections.synchronizedSortedMap(new TreeMap());
_allowParallelOperation = false; _allowParallelOperation = false;
_pumper = new QueuePumper(); _pumper = new QueuePumper();
I2PThread pumperThread = new I2PThread(_pumper); I2PThread pumperThread = new I2PThread(_pumper);
@@ -436,13 +438,15 @@ public class JobQueue {
MessageHistory hist = _context.messageHistory(); MessageHistory hist = _context.messageHistory();
long uptime = _context.router().getUptime(); long uptime = _context.router().getUptime();
synchronized (_jobStats) { JobStats stats = null;
if (!_jobStats.containsKey(key)) if (!_jobStats.containsKey(key)) {
_jobStats.put(key, new JobStats(key)); _jobStats.put(key, new JobStats(key));
JobStats stats = (JobStats)_jobStats.get(key); // yes, if two runners finish the same job at the same time, this could
// create an extra object. but, who cares, its pushed out of the map
stats.jobRan(duration, lag); // immediately anyway.
} }
stats = (JobStats)_jobStats.get(key);
stats.jobRan(duration, lag);
String dieMsg = null; String dieMsg = null;
@@ -599,15 +603,20 @@ public class JobQueue {
ArrayList readyJobs = null; ArrayList readyJobs = null;
ArrayList timedJobs = null; ArrayList timedJobs = null;
ArrayList activeJobs = new ArrayList(4); ArrayList activeJobs = new ArrayList(4);
ArrayList justFinishedJobs = new ArrayList(4);
synchronized (_readyJobs) { readyJobs = new ArrayList(_readyJobs); } synchronized (_readyJobs) { readyJobs = new ArrayList(_readyJobs); }
synchronized (_timedJobs) { timedJobs = new ArrayList(_timedJobs); } synchronized (_timedJobs) { timedJobs = new ArrayList(_timedJobs); }
synchronized (_queueRunners) { synchronized (_queueRunners) {
for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext();) { for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext();) {
JobQueueRunner runner = (JobQueueRunner)iter.next(); JobQueueRunner runner = (JobQueueRunner)iter.next();
Job job = runner.getCurrentJob(); Job job = runner.getCurrentJob();
if (job != null) if (job != null) {
activeJobs.add(job.getName()); activeJobs.add(job.getName());
} else {
job = runner.getLastJob();
justFinishedJobs.add(job.getName());
} }
}
} }
StringBuffer buf = new StringBuffer(20*1024); StringBuffer buf = new StringBuffer(20*1024);
buf.append("<h2>JobQueue</h2>"); buf.append("<h2>JobQueue</h2>");
@@ -621,6 +630,11 @@ public class JobQueue {
buf.append("<li>").append(activeJobs.get(i)).append("</li>\n"); buf.append("<li>").append(activeJobs.get(i)).append("</li>\n");
} }
buf.append("</ol>\n"); buf.append("</ol>\n");
buf.append("# just finished jobs: ").append(justFinishedJobs.size()).append("<ol>\n");
for (int i = 0; i < justFinishedJobs.size(); i++) {
buf.append("<li>").append(justFinishedJobs.get(i)).append("</li>\n");
}
buf.append("</ol>\n");
buf.append("# ready/waiting jobs: ").append(readyJobs.size()).append(" <i>(lots of these mean there's likely a big problem)</i><ol>\n"); buf.append("# ready/waiting jobs: ").append(readyJobs.size()).append(" <i>(lots of these mean there's likely a big problem)</i><ol>\n");
for (int i = 0; i < readyJobs.size(); i++) { for (int i = 0; i < readyJobs.size(); i++) {
buf.append("<li>").append(readyJobs.get(i)).append("</li>\n"); buf.append("<li>").append(readyJobs.get(i)).append("</li>\n");
@@ -662,7 +676,7 @@ public class JobQueue {
TreeMap tstats = null; TreeMap tstats = null;
synchronized (_jobStats) { synchronized (_jobStats) {
tstats = (TreeMap)_jobStats.clone(); tstats = new TreeMap(_jobStats);
} }
for (Iterator iter = tstats.values().iterator(); iter.hasNext(); ) { for (Iterator iter = tstats.values().iterator(); iter.hasNext(); ) {

View File

@@ -12,6 +12,7 @@ class JobQueueRunner implements Runnable {
private int _id; private int _id;
private long _numJobs; private long _numJobs;
private Job _currentJob; private Job _currentJob;
private Job _lastJob;
public JobQueueRunner(RouterContext context, int id) { public JobQueueRunner(RouterContext context, int id) {
_context = context; _context = context;
@@ -19,6 +20,7 @@ class JobQueueRunner implements Runnable {
_keepRunning = true; _keepRunning = true;
_numJobs = 0; _numJobs = 0;
_currentJob = null; _currentJob = null;
_lastJob = null;
_log = _context.logManager().getLog(JobQueueRunner.class); _log = _context.logManager().getLog(JobQueueRunner.class);
_context.statManager().createRateStat("jobQueue.jobRun", "How long jobs take", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("jobQueue.jobRun", "How long jobs take", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("jobQueue.jobLag", "How long jobs have to wait before running", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("jobQueue.jobLag", "How long jobs have to wait before running", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
@@ -27,6 +29,7 @@ class JobQueueRunner implements Runnable {
} }
public Job getCurrentJob() { return _currentJob; } public Job getCurrentJob() { return _currentJob; }
public Job getLastJob() { return _lastJob; }
public int getRunnerId() { return _id; } public int getRunnerId() { return _id; }
public void stopRunning() { _keepRunning = false; } public void stopRunning() { _keepRunning = false; }
public void run() { public void run() {
@@ -51,6 +54,7 @@ class JobQueueRunner implements Runnable {
long betweenJobs = now - lastActive; long betweenJobs = now - lastActive;
_context.statManager().addRateData("jobQueue.jobRunnerInactive", betweenJobs, betweenJobs); _context.statManager().addRateData("jobQueue.jobRunnerInactive", betweenJobs, betweenJobs);
_currentJob = job; _currentJob = job;
_lastJob = null;
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Runner " + _id + " running job " + job.getJobId() + ": " + job.getName()); _log.debug("Runner " + _id + " running job " + job.getJobId() + ": " + job.getName());
long origStartAfter = job.getTiming().getStartAfter(); long origStartAfter = job.getTiming().getStartAfter();
@@ -75,6 +79,7 @@ class JobQueueRunner implements Runnable {
_log.debug("Job duration " + duration + "ms for " + job.getName() _log.debug("Job duration " + duration + "ms for " + job.getName()
+ " with lag of " + (doStart-origStartAfter) + "ms"); + " with lag of " + (doStart-origStartAfter) + "ms");
lastActive = _context.clock().now(); lastActive = _context.clock().now();
_lastJob = _currentJob;
_currentJob = null; _currentJob = null;
} catch (Throwable t) { } catch (Throwable t) {
if (_log.shouldLog(Log.CRIT)) if (_log.shouldLog(Log.CRIT))

View File

@@ -5,71 +5,81 @@ import net.i2p.data.DataHelper;
/** glorified struct to contain basic job stats */ /** glorified struct to contain basic job stats */
class JobStats { class JobStats {
private String _job; private String _job;
private long _numRuns; private volatile long _numRuns;
private long _totalTime; private volatile long _totalTime;
private long _maxTime; private volatile long _maxTime;
private long _minTime; private volatile long _minTime;
private long _totalPendingTime; private volatile long _totalPendingTime;
private long _maxPendingTime; private volatile long _maxPendingTime;
private long _minPendingTime; private volatile long _minPendingTime;
public JobStats(String name) { public JobStats(String name) {
_job = name; _job = name;
_numRuns = 0; _numRuns = 0;
_totalTime = 0; _totalTime = 0;
_maxTime = -1; _maxTime = -1;
_minTime = -1; _minTime = -1;
_totalPendingTime = 0; _totalPendingTime = 0;
_maxPendingTime = -1; _maxPendingTime = -1;
_minPendingTime = -1; _minPendingTime = -1;
} }
public void jobRan(long runTime, long lag) { public void jobRan(long runTime, long lag) {
_numRuns++; _numRuns++;
_totalTime += runTime; _totalTime += runTime;
if ( (_maxTime < 0) || (runTime > _maxTime) ) if ( (_maxTime < 0) || (runTime > _maxTime) )
_maxTime = runTime; _maxTime = runTime;
if ( (_minTime < 0) || (runTime < _minTime) ) if ( (_minTime < 0) || (runTime < _minTime) )
_minTime = runTime; _minTime = runTime;
_totalPendingTime += lag; _totalPendingTime += lag;
if ( (_maxPendingTime < 0) || (lag > _maxPendingTime) ) if ( (_maxPendingTime < 0) || (lag > _maxPendingTime) )
_maxPendingTime = lag; _maxPendingTime = lag;
if ( (_minPendingTime < 0) || (lag < _minPendingTime) ) if ( (_minPendingTime < 0) || (lag < _minPendingTime) )
_minPendingTime = lag; _minPendingTime = lag;
} }
public String getName() { return _job; } public String getName() { return _job; }
public long getRuns() { return _numRuns; } public long getRuns() { return _numRuns; }
public long getTotalTime() { return _totalTime; } public long getTotalTime() { return _totalTime; }
public long getMaxTime() { return _maxTime; } public long getMaxTime() { return _maxTime; }
public long getMinTime() { return _minTime; } public long getMinTime() { return _minTime; }
public long getAvgTime() { if (_numRuns > 0) return _totalTime / _numRuns; else return 0; } public long getAvgTime() {
if (_numRuns > 0)
return _totalTime / _numRuns;
else
return 0;
}
public long getTotalPendingTime() { return _totalPendingTime; } public long getTotalPendingTime() { return _totalPendingTime; }
public long getMaxPendingTime() { return _maxPendingTime; } public long getMaxPendingTime() { return _maxPendingTime; }
public long getMinPendingTime() { return _minPendingTime; } public long getMinPendingTime() { return _minPendingTime; }
public long getAvgPendingTime() { if (_numRuns > 0) return _totalPendingTime / _numRuns; else return 0; } public long getAvgPendingTime() {
if (_numRuns > 0)
return _totalPendingTime / _numRuns;
else
return 0;
}
public int hashCode() { return _job.hashCode(); } public int hashCode() { return _job.hashCode(); }
public boolean equals(Object obj) { public boolean equals(Object obj) {
if ( (obj != null) && (obj instanceof JobStats) ) { if ( (obj != null) && (obj instanceof JobStats) ) {
JobStats stats = (JobStats)obj; JobStats stats = (JobStats)obj;
return DataHelper.eq(getName(), stats.getName()) && return DataHelper.eq(getName(), stats.getName()) &&
getRuns() == stats.getRuns() && getRuns() == stats.getRuns() &&
getTotalTime() == stats.getTotalTime() && getTotalTime() == stats.getTotalTime() &&
getMaxTime() == stats.getMaxTime() && getMaxTime() == stats.getMaxTime() &&
getMinTime() == stats.getMinTime(); getMinTime() == stats.getMinTime();
} else { } else {
return false; return false;
} }
} }
public String toString() { public String toString() {
StringBuffer buf = new StringBuffer(); StringBuffer buf = new StringBuffer();
buf.append("Over ").append(getRuns()).append(" runs, job <b>").append(getName()).append("</b> took "); buf.append("Over ").append(getRuns()).append(" runs, job <b>").append(getName()).append("</b> took ");
buf.append(getTotalTime()).append("ms (").append(getAvgTime()).append("ms/").append(getMaxTime()).append("ms/"); buf.append(getTotalTime()).append("ms (").append(getAvgTime()).append("ms/").append(getMaxTime()).append("ms/");
buf.append(getMinTime()).append("ms avg/max/min) after a total lag of "); buf.append(getMinTime()).append("ms avg/max/min) after a total lag of ");
buf.append(getTotalPendingTime()).append("ms (").append(getAvgPendingTime()).append("ms/"); buf.append(getTotalPendingTime()).append("ms (").append(getAvgPendingTime()).append("ms/");
buf.append(getMaxPendingTime()).append("ms/").append(getMinPendingTime()).append("ms avg/max/min)"); buf.append(getMaxPendingTime()).append("ms/").append(getMinPendingTime()).append("ms avg/max/min)");
return buf.toString(); return buf.toString();
} }
} }