From 0c049f39d96cf62b9f8058619ba2591b804f993f Mon Sep 17 00:00:00 2001 From: jrandom Date: Mon, 8 Nov 2004 05:40:20 +0000 Subject: [PATCH] 2004-11-08 jrandom * Remove spurious flush calls from I2PTunnel, and work with the I2PSocket's output stream directly (as the various implementations do their own buffering). * Another pass at a long standing JobQueue bug - dramatically simplify the job management synchronization since we dont need to deal with high contention (unlike last year when we had dozens of queue runners going at once). * Logging --- .../net/i2p/i2ptunnel/I2PTunnelRunner.java | 72 +++++---- .../client/streaming/StreamSinkServer.java | 9 +- history.txt | 12 +- router/java/src/net/i2p/router/JobQueue.java | 141 ++++++++---------- .../src/net/i2p/router/RouterVersion.java | 4 +- 5 files changed, 131 insertions(+), 107 deletions(-) diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java index 7df83d0ab..c850549f7 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java @@ -12,7 +12,6 @@ import java.net.Socket; import java.net.SocketException; import java.util.HashMap; -import net.i2p.client.I2PSession; import net.i2p.client.streaming.I2PSocket; import net.i2p.util.Clock; import net.i2p.util.I2PThread; @@ -38,13 +37,14 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL Object slock, finishLock = new Object(); boolean finished = false; HashMap ostreams, sockets; - I2PSession session; byte[] initialData; /** when the last data was sent/received (or -1 if never) */ private long lastActivityOn; /** when the runner started up */ private long startedOn; + private volatile long __forwarderId; + public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialData) { this.s = s; this.i2ps = i2ps; @@ -55,6 +55,7 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL if (_log.shouldLog(Log.INFO)) _log.info("I2PTunnelRunner started"); _runnerId = ++__runnerId; + __forwarderId = i2ps.hashCode(); setName("I2PTunnelRunner " + _runnerId); start(); } @@ -96,15 +97,15 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL OutputStream out = new BufferedOutputStream(s.getOutputStream(), NETWORK_BUFFER_SIZE); i2ps.setSocketErrorListener(this); InputStream i2pin = i2ps.getInputStream(); - OutputStream i2pout = new BufferedOutputStream(i2ps.getOutputStream(), MAX_PACKET_SIZE); + OutputStream i2pout = i2ps.getOutputStream(); //new BufferedOutputStream(i2ps.getOutputStream(), MAX_PACKET_SIZE); if (initialData != null) { synchronized (slock) { i2pout.write(initialData); - i2pout.flush(); + //i2pout.flush(); } } - Thread t1 = new StreamForwarder(in, i2pout); - Thread t2 = new StreamForwarder(i2pin, out); + Thread t1 = new StreamForwarder(in, i2pout, "toI2P"); + Thread t2 = new StreamForwarder(i2pin, out, "fromI2P"); synchronized (finishLock) { while (!finished) { finishLock.wait(); @@ -118,19 +119,21 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL t1.join(); t2.join(); } catch (InterruptedException ex) { - _log.error("Interrupted", ex); + if (_log.shouldLog(Log.ERROR)) + _log.error("Interrupted", ex); } catch (IOException ex) { - ex.printStackTrace(); - _log.debug("Error forwarding", ex); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Error forwarding", ex); } catch (Exception e) { - 
_log.error("Internal error", e); + if (_log.shouldLog(Log.ERROR)) + _log.error("Internal error", e); } finally { try { if (s != null) s.close(); if (i2ps != null) i2ps.close(); } catch (IOException ex) { - ex.printStackTrace(); - _log.error("Could not close socket", ex); + if (_log.shouldLog(Log.ERROR)) + _log.error("Could not close socket", ex); } } } @@ -142,21 +145,31 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL } } - private volatile long __forwarderId = 0; - private class StreamForwarder extends I2PThread { InputStream in; OutputStream out; + String direction; - private StreamForwarder(InputStream in, OutputStream out) { + private StreamForwarder(InputStream in, OutputStream out, String dir) { this.in = in; this.out = out; + direction = dir; setName("StreamForwarder " + _runnerId + "." + (++__forwarderId)); start(); } public void run() { + if (_log.shouldLog(Log.DEBUG)) { + String from = i2ps.getThisDestination().calculateHash().toBase64().substring(0,6); + String to = i2ps.getPeerDestination().calculateHash().toBase64().substring(0,6); + + _log.debug(direction + ": Forwarding between " + + from + + " and " + + to); + } + byte[] buffer = new byte[NETWORK_BUFFER_SIZE]; try { int len; @@ -166,30 +179,39 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL if (len > 0) updateActivity(); if (in.available() == 0) { + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Flushing after sending " + len + " bytes through"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(direction + ": " + len + " bytes flushed through to " + + i2ps.getPeerDestination().calculateHash().toBase64().substring(0,6)); try { Thread.sleep(I2PTunnel.PACKET_DELAY); } catch (InterruptedException e) { e.printStackTrace(); } - } - if (in.available() == 0) { - out.flush(); // make sure the data get though + + if (in.available() <= 0) + out.flush(); // make sure the data get though } } } catch (SocketException ex) { // this *will* occur when the other threads closes the socket synchronized (finishLock) { if (!finished) { - _log.debug("Socket closed - error reading and writing", - ex); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(direction + ": Socket closed - error reading and writing", + ex); } } } catch (InterruptedIOException ex) { - _log.warn("Closing connection due to timeout (error: \"" - + ex.getMessage() + "\")"); + if (_log.shouldLog(Log.WARN)) + _log.warn(direction + ": Closing connection due to timeout (error: \"" + + ex.getMessage() + "\")"); } catch (IOException ex) { - if (!finished) - _log.error("Error forwarding", ex); + if (!finished) { + if (_log.shouldLog(Log.ERROR)) + _log.error(direction + ": Error forwarding", ex); + } //else // _log.warn("You may ignore this", ex); } finally { @@ -198,7 +220,7 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL in.close(); } catch (IOException ex) { if (_log.shouldLog(Log.WARN)) - _log.warn("Error closing streams", ex); + _log.warn(direction + ": Error closing streams", ex); } synchronized (finishLock) { finished = true; diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java index f9aae66e1..4d586ad3d 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java @@ -118,11 +118,15 @@ public class StreamSinkServer { try { InputStream in = 
_sock.getInputStream(); byte buf[] = new byte[4096]; + long written = 0; int read = 0; while ( (read = in.read(buf)) != -1) { _fos.write(buf, 0, read); + written += read; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("read and wrote " + read); } - _log.error("Got EOF from client socket"); + _log.error("Got EOF from client socket [written=" + written + "]"); } catch (IOException ioe) { _log.error("Error writing the sink", ioe); } finally { @@ -143,6 +147,9 @@ public class StreamSinkServer { public static void main(String args[]) { StreamSinkServer server = null; switch (args.length) { + case 0: + server = new StreamSinkServer("dataDir", "server.key", "localhost", 10001); + break; case 2: server = new StreamSinkServer(args[0], args[1]); break; diff --git a/history.txt b/history.txt index 538aefc65..4885cc8c2 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,14 @@ -$Id: history.txt,v 1.66 2004/11/06 22:00:56 jrandom Exp $ +$Id: history.txt,v 1.67 2004/11/07 22:18:01 jrandom Exp $ + +2004-11-08 jrandom + * Remove spurious flush calls from I2PTunnel, and work with the + I2PSocket's output stream directly (as the various implementations + do their own buffering). + * Another pass at a long standing JobQueue bug - dramatically simplify + the job management synchronization since we dont need to deal with + high contention (unlike last year when we had dozens of queue runners + going at once). + * Logging 2004-11-08 jrandom * Make the SAM bridge more resiliant to bad handshakes (thanks duck!) diff --git a/router/java/src/net/i2p/router/JobQueue.java b/router/java/src/net/i2p/router/JobQueue.java index 0f105699a..1bff65959 100644 --- a/router/java/src/net/i2p/router/JobQueue.java +++ b/router/java/src/net/i2p/router/JobQueue.java @@ -52,6 +52,8 @@ public class JobQueue { /** have we been killed or are we alive? */ private boolean _alive; + private Object _jobLock; + /** default max # job queue runners operating */ private final static int DEFAULT_MAX_RUNNERS = 1; /** router.config parameter to override the max runners */ @@ -109,8 +111,9 @@ public class JobQueue { new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); _alive = true; - _readyJobs = new ArrayList(); - _timedJobs = new ArrayList(); + _readyJobs = new ArrayList(16); + _timedJobs = new ArrayList(64); + _jobLock = new Object(); _queueRunners = new HashMap(); _jobStats = Collections.synchronizedSortedMap(new TreeMap()); _allowParallelOperation = false; @@ -134,68 +137,59 @@ public class JobQueue { long numReady = 0; boolean alreadyExists = false; - synchronized (_readyJobs) { + synchronized (_jobLock) { if (_readyJobs.contains(job)) alreadyExists = true; numReady = _readyJobs.size(); - } - if (!alreadyExists) { - synchronized (_timedJobs) { + if (!alreadyExists) { if (_timedJobs.contains(job)) alreadyExists = true; } - } - _context.statManager().addRateData("jobQueue.readyJobs", numReady, 0); - if (shouldDrop(job, numReady)) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Dropping job due to overload! # ready jobs: " - + numReady + ": job = " + job); - job.dropped(); - _context.statManager().addRateData("jobQueue.droppedJobs", 1, 1); - synchronized (_readyJobs) { - _readyJobs.notifyAll(); + _context.statManager().addRateData("jobQueue.readyJobs", numReady, 0); + if (shouldDrop(job, numReady)) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Dropping job due to overload! 
# ready jobs: " + + numReady + ": job = " + job); + job.dropped(); + _context.statManager().addRateData("jobQueue.droppedJobs", 1, 1); + _jobLock.notifyAll(); + return; } - return; - } - if (!alreadyExists) { - if (job.getTiming().getStartAfter() <= _context.clock().now()) { - // don't skew us - its 'start after' its been queued, or later - job.getTiming().setStartAfter(_context.clock().now()); - if (job instanceof JobImpl) - ((JobImpl)job).madeReady(); - synchronized (_readyJobs) { + if (!alreadyExists) { + if (job.getTiming().getStartAfter() <= _context.clock().now()) { + // don't skew us - its 'start after' its been queued, or later + job.getTiming().setStartAfter(_context.clock().now()); + if (job instanceof JobImpl) + ((JobImpl)job).madeReady(); _readyJobs.add(job); - _readyJobs.notifyAll(); + _jobLock.notifyAll(); + } else { + _timedJobs.add(job); + _jobLock.notifyAll(); } } else { - synchronized (_timedJobs) { - _timedJobs.add(job); - _timedJobs.notifyAll(); - } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Not adding already enqueued job " + job.getName()); } - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Not adding already enqueued job " + job.getName()); } - return; } public void timingUpdated() { - synchronized (_timedJobs) { - _timedJobs.notifyAll(); + synchronized (_jobLock) { + _jobLock.notifyAll(); } } public int getReadyCount() { - synchronized (_readyJobs) { + synchronized (_jobLock) { return _readyJobs.size(); } } public long getMaxLag() { - synchronized (_readyJobs) { + synchronized (_jobLock) { if (_readyJobs.size() <= 0) return 0; // first job is the one that has been waiting the longest long startAfter = ((Job)_readyJobs.get(0)).getTiming().getStartAfter(); @@ -237,19 +231,17 @@ public class JobQueue { public void allowParallelOperation() { _allowParallelOperation = true; } public void restart() { - synchronized (_timedJobs) { + synchronized (_jobLock) { _timedJobs.clear(); - } - synchronized (_readyJobs) { _readyJobs.clear(); - _readyJobs.notifyAll(); + _jobLock.notifyAll(); } } void shutdown() { _alive = false; - synchronized (_readyJobs) { - _readyJobs.notifyAll(); + synchronized (_jobLock) { + _jobLock.notifyAll(); } if (_log.shouldLog(Log.WARN)) { StringBuffer buf = new StringBuffer(1024); @@ -339,11 +331,11 @@ public class JobQueue { */ Job getNext() { while (_alive) { - synchronized (_readyJobs) { + synchronized (_jobLock) { if (_readyJobs.size() > 0) { return (Job)_readyJobs.remove(0); } else { - try { _readyJobs.wait(); } catch (InterruptedException ie) {} + try { _jobLock.wait(); } catch (InterruptedException ie) {} } } } @@ -413,7 +405,7 @@ public class JobQueue { long now = _context.clock().now(); long timeToWait = 0; ArrayList toAdd = null; - synchronized (_timedJobs) { + synchronized (_jobLock) { for (int i = 0; i < _timedJobs.size(); i++) { Job j = (Job)_timedJobs.get(i); // find jobs due to start before now @@ -431,13 +423,10 @@ public class JobQueue { timeToWait = timeLeft; } } - } - - if (toAdd != null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Not waiting - we have " + toAdd.size() + " newly ready jobs"); - synchronized (_readyJobs) { + if (toAdd != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Not waiting - we have " + toAdd.size() + " newly ready jobs"); // rather than addAll, which allocs a byte array rv before adding, // we iterate, since toAdd is usually going to only be 1 or 2 entries // and since readyJobs will often have the space, we can avoid the @@ -445,22 +434,20 @@ public class JobQueue { // on some profiling 
data ;) for (int i = 0; i < toAdd.size(); i++) _readyJobs.add(toAdd.get(i)); - _readyJobs.notifyAll(); + _jobLock.notifyAll(); + } else { + if (timeToWait < 100) + timeToWait = 100; + if (timeToWait > 10*1000) + timeToWait = 10*1000; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Waiting " + timeToWait + " before rechecking the timed queue"); + try { + _jobLock.wait(timeToWait); + } catch (InterruptedException ie) {} } - } else { - if (timeToWait < 100) - timeToWait = 100; - if (timeToWait > 10*1000) - timeToWait = 10*1000; - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Waiting " + timeToWait + " before rechecking the timed queue"); - try { - synchronized (_timedJobs) { - _timedJobs.wait(timeToWait); - } - } catch (InterruptedException ie) {} - } - } + } // synchronize (_jobLock) + } // while (_alive) } catch (Throwable t) { _context.clock().removeUpdateListener(this); if (_log.shouldLog(Log.ERROR)) @@ -470,8 +457,8 @@ public class JobQueue { public void offsetChanged(long delta) { updateJobTimings(delta); - synchronized (_timedJobs) { - _timedJobs.notifyAll(); + synchronized (_jobLock) { + _jobLock.notifyAll(); } } @@ -482,13 +469,11 @@ public class JobQueue { * completion. */ private void updateJobTimings(long delta) { - synchronized (_timedJobs) { + synchronized (_jobLock) { for (int i = 0; i < _timedJobs.size(); i++) { Job j = (Job)_timedJobs.get(i); j.getTiming().offsetChanged(delta); } - } - synchronized (_readyJobs) { for (int i = 0; i < _readyJobs.size(); i++) { Job j = (Job)_readyJobs.get(i); j.getTiming().offsetChanged(delta); @@ -605,11 +590,11 @@ public class JobQueue { out.write(str.toString()); out.flush(); - synchronized (_readyJobs) { readyJobs = new ArrayList(_readyJobs); } - out.write("\n"); - out.flush(); - synchronized (_timedJobs) { timedJobs = new ArrayList(_timedJobs); } - out.write("\n"); + synchronized (_jobLock) { + readyJobs = new ArrayList(_readyJobs); + timedJobs = new ArrayList(_timedJobs); + } + out.write("\n"); out.flush(); StringBuffer buf = new StringBuffer(32*1024); diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 9f2057d3e..cad0b6445 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.71 $ $Date: 2004/11/06 21:25:13 $"; + public final static String ID = "$Revision: 1.72 $ $Date: 2004/11/06 22:00:57 $"; public final static String VERSION = "0.4.1.4"; - public final static long BUILD = 0; + public final static long BUILD = 1; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID);
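
For readers skimming the JobQueue hunks above, here is a minimal, self-contained sketch of the pattern this patch converges on: a single monitor (the real code calls it _jobLock) guarding both the ready and timed job lists, with every wait/notifyAll going through that one lock instead of two separately synchronized ArrayLists. The class and method names below are illustrative only, not the actual net.i2p.router API, and the sketch omits the stat publishing, overload dropping, and clock-offset handling that the real JobQueue keeps.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for a scheduled unit of work (illustration only). */
class SketchJob {
    final Runnable task;
    final long startAfter; // epoch ms when the job may run
    SketchJob(Runnable task, long startAfter) {
        this.task = task;
        this.startAfter = startAfter;
    }
}

/**
 * Sketch of the single-lock pattern: one monitor guards both lists, and all
 * wait/notifyAll calls go through it.
 */
class SingleLockJobQueue {
    private final Object jobLock = new Object();
    private final List readyJobs = new ArrayList(16);
    private final List timedJobs = new ArrayList(64);
    private volatile boolean alive = true;

    /** Queue a job; it is ready immediately if its start time has passed. */
    void addJob(Runnable task, long startAfter) {
        synchronized (jobLock) {
            if (startAfter <= System.currentTimeMillis())
                readyJobs.add(new SketchJob(task, startAfter));
            else
                timedJobs.add(new SketchJob(task, startAfter));
            jobLock.notifyAll(); // wake the runner and the timed-job pumper
        }
    }

    /** Queue runner side: block until a ready job is available (or shutdown). */
    Runnable getNext() {
        while (alive) {
            synchronized (jobLock) {
                if (!readyJobs.isEmpty())
                    return ((SketchJob) readyJobs.remove(0)).task;
                try {
                    jobLock.wait();
                } catch (InterruptedException ie) { // ignored, loop re-checks
                }
            }
        }
        return null;
    }

    /** Pumper side: promote due timed jobs, otherwise nap briefly on the lock. */
    void pumpOnce() {
        synchronized (jobLock) {
            long now = System.currentTimeMillis();
            boolean promoted = false;
            for (Iterator iter = timedJobs.iterator(); iter.hasNext(); ) {
                SketchJob j = (SketchJob) iter.next();
                if (j.startAfter <= now) {
                    iter.remove();
                    readyJobs.add(j);
                    promoted = true;
                }
            }
            if (promoted) {
                jobLock.notifyAll(); // ready work appeared; wake any waiting runner
            } else {
                try {
                    jobLock.wait(1000); // bounded recheck, as with the 100ms..10s clamp above
                } catch (InterruptedException ie) {
                }
            }
        }
    }

    void shutdown() {
        alive = false;
        synchronized (jobLock) {
            jobLock.notifyAll();
        }
    }
}

The design tradeoff is the one the commit message states: collapsing two locks into one removes the window where a job is checked against one list while in flight between the other list and a runner, at the cost of slightly coarser contention, which is acceptable now that only a single queue runner is active by default.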