From 26c4f983d7b5f039ce3834898e1add7f43fc9eb5 Mon Sep 17 00:00:00 2001 From: sponge Date: Fri, 29 May 2009 21:14:08 +0000 Subject: [PATCH] * added big fat start/stop lock into BOB * added zap command to shut down BOB... now we need a way to start it after it stops. :-) --- apps/BOB/src/net/i2p/BOB/BOB.java | 255 +++++++++++++----- apps/BOB/src/net/i2p/BOB/DoCMDS.java | 27 +- apps/BOB/src/net/i2p/BOB/I2Plistener.java | 2 - apps/BOB/src/net/i2p/BOB/MUXlisten.java | 96 ++++--- apps/BOB/src/net/i2p/BOB/TCPio.java | 50 ++-- apps/BOB/src/net/i2p/BOB/TCPlistener.java | 7 - history.txt | 5 + .../src/net/i2p/router/RouterVersion.java | 2 +- 8 files changed, 294 insertions(+), 150 deletions(-) diff --git a/apps/BOB/src/net/i2p/BOB/BOB.java b/apps/BOB/src/net/i2p/BOB/BOB.java index 830808a54..9f87e4f88 100644 --- a/apps/BOB/src/net/i2p/BOB/BOB.java +++ b/apps/BOB/src/net/i2p/BOB/BOB.java @@ -27,14 +27,18 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; +import java.net.ConnectException; import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; +import java.net.SocketTimeoutException; import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; import net.i2p.client.I2PClient; import net.i2p.client.streaming.RetransmissionTimer; import net.i2p.util.Log; import net.i2p.util.SimpleScheduler; +import net.i2p.util.SimpleStore; import net.i2p.util.SimpleTimer2; /** @@ -114,10 +118,15 @@ public class BOB { public final static String PROP_CONFIG_LOCATION = "BOB.config"; public final static String PROP_BOB_PORT = "BOB.port"; public final static String PROP_BOB_HOST = "BOB.host"; - private static int maxConnections = 0; private static NamedDB database; private static Properties props = new Properties(); - + private static AtomicBoolean spin = new AtomicBoolean(true); + private static final String P_RUNNING = "RUNNING"; + private static final String P_STARTING = "STARTING"; + private static final String P_STOPPING = "STOPPING"; + private static AtomicBoolean lock = new AtomicBoolean(false); + // no longer used. + // private static int maxConnections = 0; /** * Log a warning @@ -149,6 +158,12 @@ public class BOB { _log.error(arg); } + /** + * Stop BOB gracefully + */ + public static void stop() { + spin.set(false); + } /** * Listen for incoming connections and handle them * @@ -156,6 +171,7 @@ public class BOB { */ public static void main(String[] args) { database = new NamedDB(); + ServerSocket listener = null; int i = 0; boolean save = false; // Set up all defaults to be passed forward to other threads. @@ -168,77 +184,174 @@ public class BOB { i = Y.hashCode(); i = Y1.hashCode(); i = Y2.hashCode(); - - { - try { - FileInputStream fi = new FileInputStream(configLocation); - props.load(fi); - fi.close(); - } catch(FileNotFoundException fnfe) { - warn("Unable to load up the BOB config file " + configLocation + ", Using defaults."); - warn(fnfe.toString()); - save = true; - } catch(IOException ioe) { - warn("IOException on BOB config file " + configLocation + ", using defaults."); - warn(ioe.toString()); - } - } - // Global router and client API configurations that are missing are set to defaults here. - if(!props.containsKey(I2PClient.PROP_TCP_HOST)) { - props.setProperty(I2PClient.PROP_TCP_HOST, "localhost"); - } - if(!props.containsKey(I2PClient.PROP_TCP_PORT)) { - props.setProperty(I2PClient.PROP_TCP_PORT, "7654"); - } - if(!props.containsKey(I2PClient.PROP_RELIABILITY)) { - props.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_BEST_EFFORT); - } - if(!props.containsKey(PROP_BOB_PORT)) { - props.setProperty(PROP_BOB_PORT, "2827"); // 0xB0B - } - if(!props.containsKey("inbound.length")) { - props.setProperty("inbound.length", "1"); - } - if(!props.containsKey("outbound.length")) { - props.setProperty("outbound.length", "1"); - } - if(!props.containsKey("inbound.lengthVariance")) { - props.setProperty("inbound.lengthVariance", "0"); - } - if(!props.containsKey("outbound.lengthVariance")) { - props.setProperty("outbound.lengthVariance", "0"); - } - if(!props.containsKey(PROP_BOB_HOST)) { - props.setProperty(PROP_BOB_HOST, "localhost"); - } - if(save) { - try { - warn("Writing new defaults file " + configLocation); - FileOutputStream fo = new FileOutputStream(configLocation); - props.store(fo, configLocation); - fo.close(); - } catch(IOException ioe) { - error("IOException on BOB config file " + configLocation + ", " + ioe); - } - } - - i = 0; try { - info("BOB is now running."); - ServerSocket listener = new ServerSocket(Integer.parseInt(props.getProperty(PROP_BOB_PORT)), 10, InetAddress.getByName(props.getProperty(PROP_BOB_HOST))); - Socket server; - - while((i++ < maxConnections) || (maxConnections == 0)) { - //DoCMDS connection; - - server = listener.accept(); - DoCMDS conn_c = new DoCMDS(server, props, database, _log); - Thread t = new Thread(conn_c); - t.start(); + { + try { + FileInputStream fi = new FileInputStream(configLocation); + props.load(fi); + fi.close(); + } catch (FileNotFoundException fnfe) { + warn("Unable to load up the BOB config file " + configLocation + ", Using defaults."); + warn(fnfe.toString()); + save = true; + } catch (IOException ioe) { + warn("IOException on BOB config file " + configLocation + ", using defaults."); + warn(ioe.toString()); + } } - } catch(IOException ioe) { - error("IOException on socket listen: " + ioe); - ioe.printStackTrace(); + // Global router and client API configurations that are missing are set to defaults here. + if (!props.containsKey(I2PClient.PROP_TCP_HOST)) { + props.setProperty(I2PClient.PROP_TCP_HOST, "localhost"); + } + if (!props.containsKey(I2PClient.PROP_TCP_PORT)) { + props.setProperty(I2PClient.PROP_TCP_PORT, "7654"); + } + if (!props.containsKey(I2PClient.PROP_RELIABILITY)) { + props.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_BEST_EFFORT); + } + if (!props.containsKey(PROP_BOB_PORT)) { + props.setProperty(PROP_BOB_PORT, "2827"); // 0xB0B + } + if (!props.containsKey("inbound.length")) { + props.setProperty("inbound.length", "1"); + } + if (!props.containsKey("outbound.length")) { + props.setProperty("outbound.length", "1"); + } + if (!props.containsKey("inbound.lengthVariance")) { + props.setProperty("inbound.lengthVariance", "0"); + } + if (!props.containsKey("outbound.lengthVariance")) { + props.setProperty("outbound.lengthVariance", "0"); + } + if (!props.containsKey(PROP_BOB_HOST)) { + props.setProperty(PROP_BOB_HOST, "localhost"); + } + if (save) { + try { + warn("Writing new defaults file " + configLocation); + FileOutputStream fo = new FileOutputStream(configLocation); + props.store(fo, configLocation); + fo.close(); + } catch (IOException ioe) { + error("IOException on BOB config file " + configLocation + ", " + ioe); + } + } + + i = 0; + boolean g = false; + try { + info("BOB is now running."); + listener = new ServerSocket(Integer.parseInt(props.getProperty(PROP_BOB_PORT)), 10, InetAddress.getByName(props.getProperty(PROP_BOB_HOST))); + Socket server = null; + listener.setSoTimeout(500); // .5 sec + while (spin.get()) { + //DoCMDS connection; + + try { + server = listener.accept(); + g = true; + } catch (ConnectException ce) { + g = false; + } catch (SocketTimeoutException ste) { + g = false; + } + + if (g) { + DoCMDS conn_c = new DoCMDS(spin, lock, server, props, database, _log); + Thread t = new Thread(conn_c); + t.setName("BOB.DoCMDS " + i); + t.start(); + i++; + } + } + } catch (IOException ioe) { + error("IOException on socket listen: " + ioe); + ioe.printStackTrace(); + } + } finally { + info("BOB is now shutting down..."); + // Clean up everything. + try { + listener.close(); + } catch (Exception ex) { + // nop + } + // Find all our "BOB.DoCMDS" threads, wait for them to be finished. + // We could order them to stop, but that could cause nasty issues in the locks. + visitAllThreads(); + database.getReadLock(); + int all = database.getcount(); + database.releaseReadLock(); + NamedDB nickinfo; + for (i = 0; i < all; i++) { + database.getReadLock(); + nickinfo = (NamedDB) database.getnext(i); + nickinfo.getReadLock(); + if (nickinfo.get(P_RUNNING).equals(Boolean.TRUE) && nickinfo.get(P_STOPPING).equals(Boolean.FALSE) && nickinfo.get(P_STARTING).equals(Boolean.FALSE)) { + nickinfo.releaseReadLock(); + database.releaseReadLock(); + database.getWriteLock(); + nickinfo.getWriteLock(); + nickinfo.add(P_STOPPING, new Boolean(true)); + nickinfo.releaseWriteLock(); + database.releaseWriteLock(); + } else { + nickinfo.releaseReadLock(); + database.releaseReadLock(); + } + } + info("BOB is now stopped."); + + } + } + + /** + * Find the root thread group, + * then find all theads with certain names and wait for them all to be dead. + * + */ + private static void visitAllThreads() { + ThreadGroup root = Thread.currentThread().getThreadGroup().getParent(); + while (root.getParent() != null) { + root = root.getParent(); + } + + // Visit each thread group + waitjoin(root, 0, root.getName()); + } + + private static void waitjoin(ThreadGroup group, int level, String tn) { + // Get threads in `group' + int numThreads = group.activeCount(); + Thread[] threads = new Thread[numThreads * 2]; + numThreads = group.enumerate(threads, false); + // Enumerate each thread in `group' and wait for it to stop if it is one of ours. + for (int i = 0; i < numThreads; i++) { + // Get thread + Thread thread = threads[i]; + if (thread.getName().startsWith("BOB.DoCMDS ")) { + try { + if (thread.isAlive()) { + try { + thread.join(); + } catch (InterruptedException ex) { + } + } + } catch (SecurityException se) { + //nop + } + } + } + + // Get thread subgroups of `group' + int numGroups = group.activeGroupCount(); + ThreadGroup[] groups = new ThreadGroup[numGroups * 2]; + numGroups = group.enumerate(groups, false); + + // Recursively visit each subgroup + for (int i = 0; i < numGroups; i++) { + waitjoin(groups[i], level + 1, groups[i].getName()); } } } diff --git a/apps/BOB/src/net/i2p/BOB/DoCMDS.java b/apps/BOB/src/net/i2p/BOB/DoCMDS.java index 4a13844cb..c49be9826 100644 --- a/apps/BOB/src/net/i2p/BOB/DoCMDS.java +++ b/apps/BOB/src/net/i2p/BOB/DoCMDS.java @@ -31,10 +31,12 @@ import java.io.PrintStream; import java.net.Socket; import java.util.Properties; import java.util.StringTokenizer; +import java.util.concurrent.atomic.AtomicBoolean; import net.i2p.I2PException; import net.i2p.client.I2PClientFactory; import net.i2p.data.Destination; import net.i2p.util.Log; +import net.i2p.util.SimpleStore; /** * Simplistic command parser for BOB @@ -57,6 +59,8 @@ public class DoCMDS implements Runnable { private boolean dk, ns, ip, op; private NamedDB nickinfo; private Log _log; + private AtomicBoolean LIVE; + private AtomicBoolean lock; /* database strings */ private static final String P_DEST = "DESTINATION"; private static final String P_INHOST = "INHOST"; @@ -94,6 +98,7 @@ public class DoCMDS implements Runnable { private static final String C_status = "status"; private static final String C_stop = "stop"; private static final String C_verify = "verify"; + private static final String C_zap = "zap"; /* all the coomands available, plus description */ private static final String C_ALL[][] = { @@ -119,6 +124,7 @@ public class DoCMDS implements Runnable { {C_status, C_status + " nickname * Display status of a nicknamed tunnel."}, {C_stop, C_stop + " * Stops the current nicknamed tunnel."}, {C_verify, C_verify + " BASE64_key * Verifies BASE64 destination."}, + {C_zap, C_zap + " * Shuts down BOB."}, {"", "COMMANDS: " + // this is ugly, but... C_help + " " + C_clear + " " + @@ -141,19 +147,22 @@ public class DoCMDS implements Runnable { C_start + " " + C_status + " " + C_stop + " " + - C_verify + C_verify + " " + + C_zap }, {" ", " "} // end of list }; /** - * + * @parm LIVE * @param server * @param props * @param database * @param _log */ - DoCMDS(Socket server, Properties props, NamedDB database, Log _log) { + DoCMDS(AtomicBoolean LIVE, AtomicBoolean lock, Socket server, Properties props, NamedDB database, Log _log) { + this.lock = lock; + this.LIVE = LIVE; this.server = server; this.props = new Properties(); this.database = database; @@ -509,6 +518,11 @@ public class DoCMDS implements Runnable { } else if (Command.equals(C_quit)) { // End the command session break quit; + } else if (Command.equals(C_zap)) { + // Kill BOB!! (let's hope this works!) + LIVE.set(false); + // End the command session + break quit; } else if (Command.equals(C_newkeys)) { if (ns) { try { @@ -1260,7 +1274,10 @@ public class DoCMDS implements Runnable { } else { MUXlisten tunnel; try { - tunnel = new MUXlisten(database, nickinfo, _log); + while(!lock.compareAndSet(false, true)) { + // wait + } + tunnel = new MUXlisten(lock, database, nickinfo, _log); Thread t = new Thread(tunnel); t.start(); // try { @@ -1270,8 +1287,10 @@ public class DoCMDS implements Runnable { // } out.println("OK tunnel starting"); } catch (I2PException e) { + lock.set(false); out.println("ERROR starting tunnel: " + e); } catch (IOException e) { + lock.set(false); out.println("ERROR starting tunnel: " + e); } } diff --git a/apps/BOB/src/net/i2p/BOB/I2Plistener.java b/apps/BOB/src/net/i2p/BOB/I2Plistener.java index caaadc76d..b8d9145f2 100644 --- a/apps/BOB/src/net/i2p/BOB/I2Plistener.java +++ b/apps/BOB/src/net/i2p/BOB/I2Plistener.java @@ -25,8 +25,6 @@ package net.i2p.BOB; import java.net.ConnectException; import java.net.SocketTimeoutException; -import java.util.logging.Level; -import java.util.logging.Logger; import net.i2p.I2PException; import net.i2p.client.streaming.I2PServerSocket; import net.i2p.client.streaming.I2PSocket; diff --git a/apps/BOB/src/net/i2p/BOB/MUXlisten.java b/apps/BOB/src/net/i2p/BOB/MUXlisten.java index f77d5bc82..5f6885dd5 100644 --- a/apps/BOB/src/net/i2p/BOB/MUXlisten.java +++ b/apps/BOB/src/net/i2p/BOB/MUXlisten.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.ServerSocket; import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; import net.i2p.I2PException; import net.i2p.client.streaming.I2PServerSocket; import net.i2p.client.streaming.I2PSocketManager; @@ -52,6 +53,7 @@ public class MUXlisten implements Runnable { private int backlog = 50; // should this be more? less? boolean go_out; boolean come_in; + private AtomicBoolean lock; /** * Constructor Will fail if INPORT is occupied. @@ -62,9 +64,10 @@ public class MUXlisten implements Runnable { * @throws net.i2p.I2PException * @throws java.io.IOException */ - MUXlisten(NamedDB database, NamedDB info, Log _log) throws I2PException, IOException, RuntimeException { + MUXlisten(AtomicBoolean lock, NamedDB database, NamedDB info, Log _log) throws I2PException, IOException, RuntimeException { int port = 0; InetAddress host = null; + this.lock = lock; this.tg = null; this.database = database; this.info = info; @@ -151,7 +154,7 @@ public class MUXlisten implements Runnable { return; } // socketManager.addDisconnectListener(new DisconnectListener()); - + lock.set(false); quit: { try { @@ -216,7 +219,7 @@ public class MUXlisten implements Runnable { break die; } } - + /* cleared in the finally... try { wlock(); try { @@ -233,6 +236,7 @@ public class MUXlisten implements Runnable { } catch (Exception e) { break die; } + */ } // die } catch (Exception e) { @@ -241,11 +245,11 @@ public class MUXlisten implements Runnable { } } // quit } finally { - // Start cleanup. Allow threads above this one to catch the stop signal. - try { - Thread.sleep(250); - } catch (InterruptedException ex) { + // Start cleanup. + while (!lock.compareAndSet(false, true)) { + // wait } + // zero out everything. try { wlock(); @@ -261,7 +265,6 @@ public class MUXlisten implements Runnable { } catch (Exception e) { } - if (SS != null) { try { SS.close(); @@ -279,7 +282,14 @@ public class MUXlisten implements Runnable { socketManager.destroySocketManager(); } catch (Exception e) { // nop - } + } + // Some grace time. + try { + Thread.sleep(250); + } catch (InterruptedException ex) { + } + + lock.set(false); // Should we force waiting for all threads?? // Wait around till all threads are collected. if (tg != null) { String boner = tg.getName(); @@ -362,39 +372,41 @@ public class MUXlisten implements Runnable { } } + /* private static void nuke(ThreadGroup group, int level) { - // Get threads in `group' - int numThreads = group.activeCount(); - Thread[] threads = new Thread[numThreads * 2]; - numThreads = group.enumerate(threads, false); - // Enumerate each thread in `group' and stop it. - for (int i = 0; i < numThreads; i++) { - // Get thread - Thread thread = threads[i]; - try { - if (thread.isAlive()) { - thread.stop(); - } - } catch (SecurityException se) { - //nop - } - } - - // Get thread subgroups of `group' - int numGroups = group.activeGroupCount(); - ThreadGroup[] groups = new ThreadGroup[numGroups * 2]; - numGroups = group.enumerate(groups, false); - - // Recursively visit each subgroup - for (int i = 0; i < numGroups; i++) { - nuke(groups[i], level + 1); - } - try { - group.destroy(); - } catch (IllegalThreadStateException IE) { - //nop - } catch (SecurityException se) { - //nop - } + // Get threads in `group' + int numThreads = group.activeCount(); + Thread[] threads = new Thread[numThreads * 2]; + numThreads = group.enumerate(threads, false); + // Enumerate each thread in `group' and stop it. + for (int i = 0; i < numThreads; i++) { + // Get thread + Thread thread = threads[i]; + try { + if (thread.isAlive()) { + thread.stop(); } + } catch (SecurityException se) { + //nop + } + } + + // Get thread subgroups of `group' + int numGroups = group.activeGroupCount(); + ThreadGroup[] groups = new ThreadGroup[numGroups * 2]; + numGroups = group.enumerate(groups, false); + + // Recursively visit each subgroup + for (int i = 0; i < numGroups; i++) { + nuke(groups[i], level + 1); + } + try { + group.destroy(); + } catch (IllegalThreadStateException IE) { + //nop + } catch (SecurityException se) { + //nop + } + } + */ } diff --git a/apps/BOB/src/net/i2p/BOB/TCPio.java b/apps/BOB/src/net/i2p/BOB/TCPio.java index d92a5cef0..99d408c43 100644 --- a/apps/BOB/src/net/i2p/BOB/TCPio.java +++ b/apps/BOB/src/net/i2p/BOB/TCPio.java @@ -49,8 +49,8 @@ public class TCPio implements Runnable { TCPio(InputStream Ain, OutputStream Aout /*, NamedDB info , NamedDB database */) { this.Ain = Ain; this.Aout = Aout; - // this.info = info; - // this.database = database; + // this.info = info; + // this.database = database; } /** @@ -86,30 +86,35 @@ public class TCPio implements Runnable { byte a[] = new byte[1]; boolean spin = true; try { - while(spin) { - b = Ain.read(a, 0, 1); - if(b > 0) { - Aout.write(a, 0, b); - } else if(b == 0) { - Thread.yield(); // this should act like a mini sleep. - if(Ain.available() == 0) { + try { + while (spin) { + b = Ain.read(a, 0, 1); + if (b > 0) { + Aout.write(a, 0, b); + } else if (b == 0) { + Thread.yield(); // this should act like a mini sleep. + if (Ain.available() == 0) { Thread.sleep(10); + } + } else { + /* according to the specs: + * + * The total number of bytes read into the buffer, + * or -1 if there is no more data because the end of + * the stream has been reached. + * + */ + // System.out.println("TCPio: End Of Stream"); + // Ain.close(); + // Aout.close(); + //return; + break; } - } else { - /* according to the specs: - * - * The total number of bytes read into the buffer, - * or -1 if there is no more data because the end of - * the stream has been reached. - * - */ - // System.out.println("TCPio: End Of Stream"); - Ain.close(); - Aout.close(); - return; } + } catch (Exception e) { } - } catch(Exception e) { + // System.out.println("TCPio: Leaving."); + } finally { // Eject!!! Eject!!! //System.out.println("TCPio: Caught an exception " + e); try { @@ -122,6 +127,5 @@ public class TCPio implements Runnable { } return; } - // System.out.println("TCPio: Leaving."); } } diff --git a/apps/BOB/src/net/i2p/BOB/TCPlistener.java b/apps/BOB/src/net/i2p/BOB/TCPlistener.java index 0ac67d277..8f559c09f 100644 --- a/apps/BOB/src/net/i2p/BOB/TCPlistener.java +++ b/apps/BOB/src/net/i2p/BOB/TCPlistener.java @@ -29,8 +29,6 @@ import java.net.Socket; import java.net.SocketTimeoutException; // import net.i2p.client.I2PSession; // import net.i2p.client.I2PSessionException; -import java.util.logging.Level; -import java.util.logging.Logger; import net.i2p.client.streaming.I2PServerSocket; import net.i2p.client.streaming.I2PSocketManager; import net.i2p.util.Log; @@ -142,12 +140,7 @@ public class TCPlistener implements Runnable { g = false; } } - listener.close(); } catch (IOException ioe) { - try { - listener.close(); - } catch (IOException e) { - } } } } finally { diff --git a/history.txt b/history.txt index f61fa1148..32bf7cb33 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,8 @@ +2009-05-29 sponge + * added big fat start/stop lock into BOB + * added zap command to shut down BOB... now we need a way to start it + after it stops. :-) + 2009-05-27 Mathiasdm * Increase sendProcessingTime some more, add a property to configure. Configure with 'router.defaultProcessingTimeThrottle'. diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 0116da736..f47a425fd 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 8; + public final static long BUILD = 9; /** for example "-test" */ public final static String EXTRA = ""; public final static String FULL_VERSION = VERSION + "-" + BUILD + EXTRA;