From dca66c8de88b57fe938f695363fa4306c4ebfd08 Mon Sep 17 00:00:00 2001 From: jrandom Date: Tue, 6 Jul 2004 14:38:35 +0000 Subject: [PATCH] leave all threads at base priority (except the client runner, where we push at max) don't consider a connection valid until it has been up for 30 seconds (so people who are simply establishing connections but whose nats are still messed up get the error) when dealing with expired after accepted, dont drop unless it expired outside the fudge factor increase the default maxWaitingJobs to 100, since we can get lots at once (and we dont gobble as much memory as we used to) also, don't wake up the jobQueueRunner in getNext once a second, instead just let the threads updating the queue notify --- router/java/src/net/i2p/router/JobQueue.java | 30 ++++++++++++------- .../net/i2p/router/admin/AdminListener.java | 2 +- .../net/i2p/router/admin/AdminManager.java | 2 +- .../message/HandleTunnelMessageJob.java | 12 ++++++-- .../tcp/RestrictiveTCPConnection.java | 2 +- .../router/transport/tcp/TCPTransport.java | 5 ++-- 6 files changed, 36 insertions(+), 17 deletions(-) diff --git a/router/java/src/net/i2p/router/JobQueue.java b/router/java/src/net/i2p/router/JobQueue.java index fa3465a5c..d261387d5 100644 --- a/router/java/src/net/i2p/router/JobQueue.java +++ b/router/java/src/net/i2p/router/JobQueue.java @@ -86,7 +86,7 @@ public class JobQueue { /** max ready and waiting jobs before we start dropping 'em */ private int _maxWaitingJobs = DEFAULT_MAX_WAITING_JOBS; - private final static int DEFAULT_MAX_WAITING_JOBS = 20; + private final static int DEFAULT_MAX_WAITING_JOBS = 100; private final static String PROP_MAX_WAITING_JOBS = "router.maxWaitingJobs"; /** @@ -117,7 +117,7 @@ public class JobQueue { I2PThread pumperThread = new I2PThread(_pumper); pumperThread.setDaemon(true); pumperThread.setName("QueuePumper"); - pumperThread.setPriority(I2PThread.MIN_PRIORITY); + //pumperThread.setPriority(I2PThread.MIN_PRIORITY); pumperThread.start(); } @@ -286,12 +286,12 @@ public class JobQueue { return rv; } else { if (_log.shouldLog(Log.DEBUG)) - _log.debug("No jobs pending, waiting a second"); + _log.debug("No jobs pending, waiting"); } try { synchronized (_runnerLock) { - _runnerLock.wait(1000); + _runnerLock.wait(); } } catch (InterruptedException ie) {} } @@ -367,6 +367,7 @@ public class JobQueue { _queueRunners.put(new Integer(i), runner); Thread t = new I2PThread(runner); t.setName("JobQueue"+(_runnerId++)); + //t.setPriority(I2PThread.MAX_PRIORITY-1); t.setDaemon(false); t.start(); } @@ -661,7 +662,7 @@ public class JobQueue { public String renderStatusHTML() { ArrayList readyJobs = null; ArrayList timedJobs = null; - ArrayList activeJobs = new ArrayList(4); + ArrayList activeJobs = new ArrayList(1); ArrayList justFinishedJobs = new ArrayList(4); synchronized (_readyJobs) { readyJobs = new ArrayList(_readyJobs); } synchronized (_timedJobs) { timedJobs = new ArrayList(_timedJobs); } @@ -670,10 +671,10 @@ public class JobQueue { JobQueueRunner runner = (JobQueueRunner)iter.next(); Job job = runner.getCurrentJob(); if (job != null) { - activeJobs.add(job.getName()); + activeJobs.add(job); } else { job = runner.getLastJob(); - justFinishedJobs.add(job.getName()); + justFinishedJobs.add(job); } } } @@ -684,19 +685,28 @@ public class JobQueue { buf.append(_queueRunners.size()); } buf.append("
\n"); + + long now = _context.clock().now(); + buf.append("# active jobs: ").append(activeJobs.size()).append("
    \n"); for (int i = 0; i < activeJobs.size(); i++) { - buf.append("
  1. ").append(activeJobs.get(i)).append("
  2. \n"); + Job j = (Job)activeJobs.get(i); + buf.append("
  3. [started ").append(now-j.getTiming().getStartAfter()).append("ms ago]: "); + buf.append(j.toString()).append("
  4. \n"); } buf.append("
\n"); buf.append("# just finished jobs: ").append(justFinishedJobs.size()).append("
    \n"); for (int i = 0; i < justFinishedJobs.size(); i++) { - buf.append("
  1. ").append(justFinishedJobs.get(i)).append("
  2. \n"); + Job j = (Job)justFinishedJobs.get(i); + buf.append("
  3. [finished ").append(now-j.getTiming().getActualEnd()).append("ms ago]: "); + buf.append(j.toString()).append("
  4. \n"); } buf.append("
\n"); buf.append("# ready/waiting jobs: ").append(readyJobs.size()).append(" (lots of these mean there's likely a big problem)
    \n"); for (int i = 0; i < readyJobs.size(); i++) { - buf.append("
  1. ").append(readyJobs.get(i)).append("
  2. \n"); + Job j = (Job)readyJobs.get(i); + buf.append("
  3. [waiting ").append(now-j.getTiming().getStartAfter()).append("ms]: "); + buf.append(j.toString()).append("
  4. \n"); } buf.append("
\n"); diff --git a/router/java/src/net/i2p/router/admin/AdminListener.java b/router/java/src/net/i2p/router/admin/AdminListener.java index 77cc81876..92b8bbcc3 100644 --- a/router/java/src/net/i2p/router/admin/AdminListener.java +++ b/router/java/src/net/i2p/router/admin/AdminListener.java @@ -97,7 +97,7 @@ public class AdminListener implements Runnable { AdminRunner runner = new AdminRunner(_context, socket); I2PThread t = new I2PThread(runner); t.setName("Admin Runner"); - t.setPriority(Thread.MIN_PRIORITY); + //t.setPriority(Thread.MIN_PRIORITY); t.setDaemon(true); t.start(); } diff --git a/router/java/src/net/i2p/router/admin/AdminManager.java b/router/java/src/net/i2p/router/admin/AdminManager.java index 0d916bcbf..39ddfa0b7 100644 --- a/router/java/src/net/i2p/router/admin/AdminManager.java +++ b/router/java/src/net/i2p/router/admin/AdminManager.java @@ -48,7 +48,7 @@ public class AdminManager implements Service { I2PThread t = new I2PThread(_listener); t.setName("Admin Listener:" + port); t.setDaemon(true); - t.setPriority(Thread.MIN_PRIORITY); + //t.setPriority(Thread.MIN_PRIORITY); t.start(); } } diff --git a/router/java/src/net/i2p/router/message/HandleTunnelMessageJob.java b/router/java/src/net/i2p/router/message/HandleTunnelMessageJob.java index d426f6c35..d40ee2155 100644 --- a/router/java/src/net/i2p/router/message/HandleTunnelMessageJob.java +++ b/router/java/src/net/i2p/router/message/HandleTunnelMessageJob.java @@ -31,6 +31,7 @@ import net.i2p.router.ClientMessage; import net.i2p.router.InNetMessage; import net.i2p.router.JobImpl; import net.i2p.router.MessageReceptionInfo; +import net.i2p.router.Router; import net.i2p.router.RouterContext; import net.i2p.router.TunnelInfo; import net.i2p.util.Log; @@ -64,18 +65,25 @@ public class HandleTunnelMessageJob extends JobImpl { TunnelId id = _message.getTunnelId(); long excessLag = _context.clock().now() - _message.getMessageExpiration().getTime(); - if (excessLag > 0) { + if (excessLag > Router.CLOCK_FUDGE_FACTOR) { // expired while on the queue if (_log.shouldLog(Log.WARN)) _log.warn("Accepted message (" + _message.getUniqueId() + ") expired on the queue for tunnel " + id.getTunnelId() + " expiring " - + (_context.clock().now() - _message.getMessageExpiration().getTime()) + + excessLag + "ms ago"); _context.statManager().addRateData("tunnel.expiredAfterAcceptTime", excessLag, excessLag); _context.messageHistory().messageProcessingError(_message.getUniqueId(), TunnelMessage.class.getName(), "tunnel message expired on the queue"); return; + } else if (excessLag > 0) { + // almost expired while on the queue + if (_log.shouldLog(Log.WARN)) + _log.warn("Accepted message (" + _message.getUniqueId() + ") *almost* expired on the queue for tunnel " + + id.getTunnelId() + " expiring " + + excessLag + + "ms ago"); } TunnelInfo info = _context.tunnelManager().getTunnelInfo(id); diff --git a/router/java/src/net/i2p/router/transport/tcp/RestrictiveTCPConnection.java b/router/java/src/net/i2p/router/transport/tcp/RestrictiveTCPConnection.java index 47ec67d2f..e555ceac5 100644 --- a/router/java/src/net/i2p/router/transport/tcp/RestrictiveTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/tcp/RestrictiveTCPConnection.java @@ -224,7 +224,7 @@ class RestrictiveTCPConnection extends TCPConnection { I2PThread sockCreator = new I2PThread(creator); sockCreator.setDaemon(true); sockCreator.setName("PeerCallback:" + _transport.getListenPort()); - sockCreator.setPriority(I2PThread.MIN_PRIORITY); + //sockCreator.setPriority(I2PThread.MIN_PRIORITY); sockCreator.start(); if (_log.shouldLog(Log.DEBUG)) diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java index b5e49a601..bfa381180 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java @@ -237,7 +237,7 @@ public class TCPTransport extends TransportImpl { I2PThread sockCreator = new I2PThread(creator); sockCreator.setDaemon(true); sockCreator.setName("SocketCreator_:" + _listenPort); - sockCreator.setPriority(I2PThread.MIN_PRIORITY); + //sockCreator.setPriority(I2PThread.MIN_PRIORITY); sockCreator.start(); try { @@ -536,7 +536,7 @@ public class TCPTransport extends TransportImpl { String lifetime = null; for (int i = 0; i < curCons.size(); i++) { TCPConnection con = (TCPConnection)curCons.get(i); - if (con.getLifetime() > 0) { + if (con.getLifetime() > 30*1000) { established++; lifetime = DataHelper.formatDuration(con.getLifetime()); } @@ -545,6 +545,7 @@ public class TCPTransport extends TransportImpl { buf.append(lifetime); else buf.append("[pending]"); + buf.append("\n"); } buf.append("\n");