forked from I2P_Developers/i2p.i2p
on kaffe i've periodically seen some hangs in the jobqueue, so lets try being a bit more conservative with the synchroniation, and include some debugging output in the router console to help track it down (if this doesnt fix it)
This commit is contained in:
@@ -120,7 +120,7 @@ public class JobQueue {
|
|||||||
I2PThread pumperThread = new I2PThread(_pumper);
|
I2PThread pumperThread = new I2PThread(_pumper);
|
||||||
pumperThread.setDaemon(true);
|
pumperThread.setDaemon(true);
|
||||||
pumperThread.setName("QueuePumper");
|
pumperThread.setName("QueuePumper");
|
||||||
//pumperThread.setPriority(I2PThread.MIN_PRIORITY);
|
//pumperThread.setPriority(I2PThread.NORM_PRIORITY+1);
|
||||||
pumperThread.start();
|
pumperThread.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -373,10 +373,13 @@ public class JobQueue {
|
|||||||
*/
|
*/
|
||||||
private void awaken(int numMadeReady) {
|
private void awaken(int numMadeReady) {
|
||||||
// notify a sufficient number of waiting runners
|
// notify a sufficient number of waiting runners
|
||||||
for (int i = 0; i < numMadeReady; i++) {
|
//for (int i = 0; i < numMadeReady; i++) {
|
||||||
synchronized (_runnerLock) {
|
// synchronized (_runnerLock) {
|
||||||
_runnerLock.notify();
|
// _runnerLock.notify();
|
||||||
}
|
// }
|
||||||
|
//}
|
||||||
|
synchronized (_runnerLock) {
|
||||||
|
_runnerLock.notify();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -414,17 +417,12 @@ public class JobQueue {
|
|||||||
timeToWait = timeLeft;
|
timeToWait = timeLeft;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (toAdd == null) {
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug("Waiting " + timeToWait + " before rechecking the timed queue");
|
|
||||||
try {
|
|
||||||
_timedJobs.wait(timeToWait);
|
|
||||||
} catch (InterruptedException ie) {}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (toAdd != null) {
|
if (toAdd != null) {
|
||||||
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
|
_log.debug("Not waiting - we have " + toAdd.size() + " newly ready jobs");
|
||||||
synchronized (_readyJobs) {
|
synchronized (_readyJobs) {
|
||||||
// rather than addAll, which allocs a byte array rv before adding,
|
// rather than addAll, which allocs a byte array rv before adding,
|
||||||
// we iterate, since toAdd is usually going to only be 1 or 2 entries
|
// we iterate, since toAdd is usually going to only be 1 or 2 entries
|
||||||
@@ -436,6 +434,14 @@ public class JobQueue {
|
|||||||
}
|
}
|
||||||
|
|
||||||
awaken(toAdd.size());
|
awaken(toAdd.size());
|
||||||
|
} else {
|
||||||
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
|
_log.debug("Waiting " + timeToWait + " before rechecking the timed queue");
|
||||||
|
try {
|
||||||
|
synchronized (_timedJobs) {
|
||||||
|
_timedJobs.wait(timeToWait);
|
||||||
|
}
|
||||||
|
} catch (InterruptedException ie) {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
@@ -548,8 +554,15 @@ public class JobQueue {
|
|||||||
ArrayList timedJobs = null;
|
ArrayList timedJobs = null;
|
||||||
ArrayList activeJobs = new ArrayList(1);
|
ArrayList activeJobs = new ArrayList(1);
|
||||||
ArrayList justFinishedJobs = new ArrayList(4);
|
ArrayList justFinishedJobs = new ArrayList(4);
|
||||||
|
out.write("<!-- jobQueue rendering -->\n".getBytes());
|
||||||
|
out.flush();
|
||||||
synchronized (_readyJobs) { readyJobs = new ArrayList(_readyJobs); }
|
synchronized (_readyJobs) { readyJobs = new ArrayList(_readyJobs); }
|
||||||
|
out.write("<!-- jobQueue rendering: after readyJobs sync -->\n".getBytes());
|
||||||
|
out.flush();
|
||||||
synchronized (_timedJobs) { timedJobs = new ArrayList(_timedJobs); }
|
synchronized (_timedJobs) { timedJobs = new ArrayList(_timedJobs); }
|
||||||
|
out.write("<!-- jobQueue rendering: after timedJobs sync -->\n".getBytes());
|
||||||
|
out.flush();
|
||||||
|
int numRunners = 0;
|
||||||
synchronized (_queueRunners) {
|
synchronized (_queueRunners) {
|
||||||
for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext();) {
|
for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext();) {
|
||||||
JobQueueRunner runner = (JobQueueRunner)iter.next();
|
JobQueueRunner runner = (JobQueueRunner)iter.next();
|
||||||
@@ -561,14 +574,15 @@ public class JobQueue {
|
|||||||
justFinishedJobs.add(job);
|
justFinishedJobs.add(job);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
numRunners = _queueRunners.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
out.write("<!-- jobQueue rendering: after queueRunners sync -->\n".getBytes());
|
||||||
|
out.flush();
|
||||||
|
|
||||||
StringBuffer buf = new StringBuffer(32*1024);
|
StringBuffer buf = new StringBuffer(32*1024);
|
||||||
buf.append("<h2>JobQueue</h2>");
|
buf.append("<h2>JobQueue</h2>");
|
||||||
buf.append("# runners: ");
|
buf.append("# runners: ").append(numRunners);
|
||||||
synchronized (_queueRunners) {
|
|
||||||
buf.append(_queueRunners.size());
|
|
||||||
}
|
|
||||||
buf.append("<br />\n");
|
buf.append("<br />\n");
|
||||||
|
|
||||||
long now = _context.clock().now();
|
long now = _context.clock().now();
|
||||||
@@ -607,7 +621,15 @@ public class JobQueue {
|
|||||||
buf.append(new Date(j.getTiming().getStartAfter())).append("</li>\n");
|
buf.append(new Date(j.getTiming().getStartAfter())).append("</li>\n");
|
||||||
}
|
}
|
||||||
buf.append("</ol>\n");
|
buf.append("</ol>\n");
|
||||||
|
|
||||||
|
out.write("<!-- jobQueue rendering: after main buffer, before stats -->\n".getBytes());
|
||||||
|
out.flush();
|
||||||
|
|
||||||
getJobStats(buf);
|
getJobStats(buf);
|
||||||
|
|
||||||
|
out.write("<!-- jobQueue rendering: after stats -->\n".getBytes());
|
||||||
|
out.flush();
|
||||||
|
|
||||||
out.write(buf.toString().getBytes());
|
out.write(buf.toString().getBytes());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user