* ministreaming:

- small pedantic fix
    * streaming:
      - Fix a deadly race condition.
      - Some small pedantic fixes.
    * core:
      - Fix a deadly race condition.
    * BOB:
      - Fixed some races that occured from fixing races in streaming and core.
      - Some badly needed code refactoring to depend less on the database.
This commit is contained in:
sponge
2009-07-16 03:03:33 +00:00
parent 5d40ad1749
commit 5106c37ac4
18 changed files with 1157 additions and 1078 deletions

View File

@@ -1,7 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project-private xmlns="http://www.netbeans.org/ns/project-private/1"> <project-private xmlns="http://www.netbeans.org/ns/project-private/1">
<editor-bookmarks xmlns="http://www.netbeans.org/ns/editor-bookmarks/1"/> <editor-bookmarks xmlns="http://www.netbeans.org/ns/editor-bookmarks/1"/>
<open-files xmlns="http://www.netbeans.org/ns/projectui-open-files/1">
<file>file:/root/NetBeansProjects/i2p.i2p/apps/BOB/src/net/i2p/BOB/I2PtoTCP.java</file>
</open-files>
</project-private> </project-private>

View File

@@ -166,6 +166,7 @@ public class BOB {
public static void stop() { public static void stop() {
spin.set(false); spin.set(false);
} }
/** /**
* Listen for incoming connections and handle them * Listen for incoming connections and handle them
* *
@@ -188,9 +189,10 @@ public class BOB {
i = Y2.hashCode(); i = Y2.hashCode();
try { try {
{ {
File cfg = new File(configLocation); File cfg = new File(configLocation);
if (!cfg.isAbsolute()) if (!cfg.isAbsolute()) {
cfg = new File(I2PAppContext.getGlobalContext().getConfigDir(), configLocation); cfg = new File(I2PAppContext.getGlobalContext().getConfigDir(), configLocation);
}
try { try {
FileInputStream fi = new FileInputStream(cfg); FileInputStream fi = new FileInputStream(cfg);
props.load(fi); props.load(fi);
@@ -233,9 +235,10 @@ public class BOB {
props.setProperty(PROP_BOB_HOST, "localhost"); props.setProperty(PROP_BOB_HOST, "localhost");
} }
if (save) { if (save) {
File cfg = new File(configLocation); File cfg = new File(configLocation);
if (!cfg.isAbsolute()) if (!cfg.isAbsolute()) {
cfg = new File(I2PAppContext.getGlobalContext().getConfigDir(), configLocation); cfg = new File(I2PAppContext.getGlobalContext().getConfigDir(), configLocation);
}
try { try {
warn("Writing new defaults file " + cfg.getAbsolutePath()); warn("Writing new defaults file " + cfg.getAbsolutePath());
FileOutputStream fo = new FileOutputStream(cfg); FileOutputStream fo = new FileOutputStream(cfg);

File diff suppressed because it is too large Load Diff

View File

@@ -25,6 +25,7 @@ package net.i2p.BOB;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import net.i2p.I2PException; import net.i2p.I2PException;
import net.i2p.client.streaming.I2PServerSocket; import net.i2p.client.streaming.I2PServerSocket;
import net.i2p.client.streaming.I2PSocket; import net.i2p.client.streaming.I2PSocket;
@@ -43,6 +44,7 @@ public class I2Plistener implements Runnable {
// private int tgwatch; // private int tgwatch;
public I2PSocketManager socketManager; public I2PSocketManager socketManager;
public I2PServerSocket serverSocket; public I2PServerSocket serverSocket;
private AtomicBoolean lives;
/** /**
* Constructor * Constructor
@@ -52,13 +54,14 @@ public class I2Plistener implements Runnable {
* @param database * @param database
* @param _log * @param _log
*/ */
I2Plistener(I2PServerSocket SS, I2PSocketManager S, NamedDB info, NamedDB database, Log _log) { I2Plistener(I2PServerSocket SS, I2PSocketManager S, NamedDB info, NamedDB database, Log _log, AtomicBoolean lives) {
this.database = database; this.database = database;
this.info = info; this.info = info;
this._log = _log; this._log = _log;
this.socketManager = S; this.socketManager = S;
serverSocket = SS; this.serverSocket = SS;
// tgwatch = 1; // tgwatch = 1;
this.lives = lives;
} }
private void rlock() throws Exception { private void rlock() throws Exception {
@@ -82,27 +85,10 @@ public class I2Plistener implements Runnable {
try { try {
die: die:
{ {
try {
serverSocket.setSoTimeout(50);
serverSocket.setSoTimeout(50); while (lives.get()) {
boolean spin = true;
while (spin) {
try {
rlock();
} catch (Exception e) {
break die;
}
try {
spin = info.get("RUNNING").equals(Boolean.TRUE);
} catch (Exception e) {
try {
runlock();
} catch (Exception e2) {
break die;
}
break die;
}
try {
try { try {
sessSocket = serverSocket.accept(); sessSocket = serverSocket.accept();
g = true; g = true;
@@ -115,14 +101,15 @@ public class I2Plistener implements Runnable {
g = false; g = false;
conn++; conn++;
// toss the connection to a new thread. // toss the connection to a new thread.
I2PtoTCP conn_c = new I2PtoTCP(sessSocket, info, database); I2PtoTCP conn_c = new I2PtoTCP(sessSocket, info, database, lives);
Thread t = new Thread(conn_c, Thread.currentThread().getName() + " I2PtoTCP " + conn); Thread t = new Thread(conn_c, Thread.currentThread().getName() + " I2PtoTCP " + conn);
t.start(); t.start();
} }
} catch (Exception e) {
// System.out.println("Exception " + e);
} }
} catch (I2PException e) {
// bad shit
System.out.println("Exception " + e);
} }
} }
} finally { } finally {

View File

@@ -26,6 +26,7 @@ package net.i2p.BOB;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.Socket; import java.net.Socket;
import java.util.concurrent.atomic.AtomicBoolean;
import net.i2p.client.streaming.I2PSocket; import net.i2p.client.streaming.I2PSocket;
/** /**
@@ -38,6 +39,7 @@ public class I2PtoTCP implements Runnable {
private I2PSocket I2P; private I2PSocket I2P;
private NamedDB info, database; private NamedDB info, database;
private Socket sock; private Socket sock;
private AtomicBoolean lives;
/** /**
* Constructor * Constructor
@@ -46,10 +48,11 @@ public class I2PtoTCP implements Runnable {
* @param info * @param info
* @param database * @param database
*/ */
I2PtoTCP(I2PSocket I2Psock, NamedDB info, NamedDB database) { I2PtoTCP(I2PSocket I2Psock, NamedDB info, NamedDB database, AtomicBoolean lives) {
this.I2P = I2Psock; this.I2P = I2Psock;
this.info = info; this.info = info;
this.database = database; this.database = database;
this.lives = lives;
} }
private void rlock() { private void rlock() {
@@ -113,35 +116,19 @@ public class I2PtoTCP implements Runnable {
out.flush(); // not really needed, but... out.flush(); // not really needed, but...
} }
// setup to cross the streams // setup to cross the streams
TCPio conn_c = new TCPio(in, Iout /*, info, database */); // app -> I2P TCPio conn_c = new TCPio(in, Iout, lives); // app -> I2P
TCPio conn_a = new TCPio(Iin, out /* , info, database */); // I2P -> app TCPio conn_a = new TCPio(Iin, out, lives); // I2P -> app
t = new Thread(conn_c, Thread.currentThread().getName() + " TCPioA"); t = new Thread(conn_c, Thread.currentThread().getName() + " TCPioA");
q = new Thread(conn_a, Thread.currentThread().getName() + " TCPioB"); q = new Thread(conn_a, Thread.currentThread().getName() + " TCPioB");
// Fire! // Fire!
t.start(); t.start();
q.start(); q.start();
boolean spin = true; while (t.isAlive() && q.isAlive() && lives.get()) { // AND is used here to kill off the other thread
while (t.isAlive() && q.isAlive() && spin) { // AND is used here to kill off the other thread
try { try {
Thread.sleep(10); //sleep for 10 ms Thread.sleep(10); //sleep for 10 ms
} catch (InterruptedException e) { } catch (InterruptedException e) {
break die; break die;
} }
try {
rlock();
} catch (Exception e) {
break die;
}
try {
spin = info.get("RUNNING").equals(Boolean.TRUE);
} catch (Exception e) {
try {
runlock();
} catch (Exception e2) {
break die;
}
break die;
}
} }
// System.out.println("I2PtoTCP: Going away..."); // System.out.println("I2PtoTCP: Going away...");
} catch (Exception e) { } catch (Exception e) {
@@ -150,14 +137,6 @@ public class I2PtoTCP implements Runnable {
} }
} // die } // die
} finally { } finally {
try {
t.interrupt();
} catch (Exception e) {
}
try {
q.interrupt();
} catch (Exception e) {
}
try { try {
in.close(); in.close();
} catch (Exception ex) { } catch (Exception ex) {
@@ -174,6 +153,14 @@ public class I2PtoTCP implements Runnable {
Iout.close(); Iout.close();
} catch (Exception ex) { } catch (Exception ex) {
} }
try {
t.interrupt();
} catch (Exception e) {
}
try {
q.interrupt();
} catch (Exception e) {
}
try { try {
// System.out.println("I2PtoTCP: Close I2P"); // System.out.println("I2PtoTCP: Close I2P");
I2P.close(); I2P.close();

View File

@@ -43,7 +43,7 @@ import net.i2p.util.Log;
*/ */
public class MUXlisten implements Runnable { public class MUXlisten implements Runnable {
private NamedDB database, info; private NamedDB database, info;
private Log _log; private Log _log;
private I2PSocketManager socketManager; private I2PSocketManager socketManager;
private ByteArrayInputStream prikey; private ByteArrayInputStream prikey;
@@ -54,6 +54,7 @@ public class MUXlisten implements Runnable {
boolean go_out; boolean go_out;
boolean come_in; boolean come_in;
private AtomicBoolean lock; private AtomicBoolean lock;
private AtomicBoolean lives;
/** /**
* Constructor Will fail if INPORT is occupied. * Constructor Will fail if INPORT is occupied.
@@ -65,47 +66,86 @@ public class MUXlisten implements Runnable {
* @throws java.io.IOException * @throws java.io.IOException
*/ */
MUXlisten(AtomicBoolean lock, NamedDB database, NamedDB info, Log _log) throws I2PException, IOException, RuntimeException { MUXlisten(AtomicBoolean lock, NamedDB database, NamedDB info, Log _log) throws I2PException, IOException, RuntimeException {
int port = 0; try {
InetAddress host = null; int port = 0;
this.lock = lock; InetAddress host = null;
this.tg = null; this.lock = lock;
this.database = database; this.tg = null;
this.info = info; this.database = database;
this._log = _log; this.info = info;
this._log = _log;
lives = new AtomicBoolean(false);
this.database.getReadLock(); this.database.getWriteLock();
this.info.getReadLock(); this.info.getWriteLock();
N = this.info.get("NICKNAME").toString(); this.info.add("STARTING", new Boolean(true));
prikey = new ByteArrayInputStream((byte[]) info.get("KEYS")); this.info.releaseWriteLock();
// Make a new copy so that anything else won't muck with our database. this.database.releaseWriteLock();
Properties R = (Properties) info.get("PROPERTIES"); this.database.getReadLock();
Properties Q = new Properties(); this.info.getReadLock();
Lifted.copyProperties(R, Q);
this.database.releaseReadLock();
this.info.releaseReadLock();
this.database.getReadLock(); N = this.info.get("NICKNAME").toString();
this.info.getReadLock(); prikey = new ByteArrayInputStream((byte[]) info.get("KEYS"));
this.go_out = info.exists("OUTPORT"); // Make a new copy so that anything else won't muck with our database.
this.come_in = info.exists("INPORT"); Properties R = (Properties) info.get("PROPERTIES");
if (this.come_in) { Properties Q = new Properties();
port = Integer.parseInt(info.get("INPORT").toString()); Lifted.copyProperties(R, Q);
host = InetAddress.getByName(info.get("INHOST").toString()); this.database.releaseReadLock();
this.info.releaseReadLock();
this.database.getReadLock();
this.info.getReadLock();
this.go_out = info.exists("OUTPORT");
this.come_in = info.exists("INPORT");
if (this.come_in) {
port = Integer.parseInt(info.get("INPORT").toString());
host = InetAddress.getByName(info.get("INHOST").toString());
}
this.database.releaseReadLock();
this.info.releaseReadLock();
socketManager = I2PSocketManagerFactory.createManager(prikey, Q);
if (this.come_in) {
this.listener = new ServerSocket(port, backlog, host);
}
// I2PException, IOException, RuntimeException
// To bad we can't just catch and enumerate....
// } catch (I2PException e) {
// Something went bad.
// this.database.getWriteLock();
// this.info.getWriteLock();
// this.info.add("STARTING", new Boolean(false));
// this.info.releaseWriteLock();
// this.database.releaseWriteLock();
// throw new I2PException(e);
} catch (IOException e) {
// Something went bad.
this.database.getWriteLock();
this.info.getWriteLock();
this.info.add("STARTING", new Boolean(false));
this.info.releaseWriteLock();
this.database.releaseWriteLock();
throw new IOException(e);
} catch (RuntimeException e) {
// Something went bad.
this.database.getWriteLock();
this.info.getWriteLock();
this.info.add("STARTING", new Boolean(false));
this.info.releaseWriteLock();
this.database.releaseWriteLock();
throw new RuntimeException(e);
} catch (Exception e) {
// Something else went bad.
this.database.getWriteLock();
this.info.getWriteLock();
this.info.add("STARTING", new Boolean(false));
this.info.releaseWriteLock();
this.database.releaseWriteLock();
// throw new Exception(e);
// Debugging, I guess.
e.printStackTrace();
throw new RuntimeException(e);
} }
this.database.releaseReadLock();
this.info.releaseReadLock();
socketManager = I2PSocketManagerFactory.createManager(prikey, Q);
if (this.come_in) {
this.listener = new ServerSocket(port, backlog, host);
}
// Everything is OK as far as we can tell.
this.database.getWriteLock();
this.info.getWriteLock();
this.info.add("STARTING", new Boolean(true));
this.info.releaseWriteLock();
this.database.releaseWriteLock();
} }
private void rlock() throws Exception { private void rlock() throws Exception {
@@ -142,18 +182,22 @@ public class MUXlisten implements Runnable {
try { try {
info.add("RUNNING", new Boolean(true)); info.add("RUNNING", new Boolean(true));
} catch (Exception e) { } catch (Exception e) {
lock.set(false);
wunlock(); wunlock();
return; return;
} }
} catch (Exception e) { } catch (Exception e) {
lock.set(false);
return; return;
} }
try { try {
wunlock(); wunlock();
} catch (Exception e) { } catch (Exception e) {
lock.set(false);
return; return;
} }
// socketManager.addDisconnectListener(new DisconnectListener()); // socketManager.addDisconnectListener(new DisconnectListener());
lives.set(true);
lock.set(false); lock.set(false);
quit: quit:
{ {
@@ -166,14 +210,14 @@ public class MUXlisten implements Runnable {
if (go_out) { if (go_out) {
// I2P -> TCP // I2P -> TCP
SS = socketManager.getServerSocket(); SS = socketManager.getServerSocket();
I2Plistener conn = new I2Plistener(SS, socketManager, info, database, _log); I2Plistener conn = new I2Plistener(SS, socketManager, info, database, _log, lives);
t = new Thread(tg, conn, "BOBI2Plistener " + N); t = new Thread(tg, conn, "BOBI2Plistener " + N);
t.start(); t.start();
} }
if (come_in) { if (come_in) {
// TCP -> I2P // TCP -> I2P
TCPlistener conn = new TCPlistener(listener, socketManager, info, database, _log); TCPlistener conn = new TCPlistener(listener, socketManager, info, database, _log, lives);
q = new Thread(tg, conn, "BOBTCPlistener " + N); q = new Thread(tg, conn, "BOBTCPlistener " + N);
q.start(); q.start();
} }
@@ -195,7 +239,7 @@ public class MUXlisten implements Runnable {
break quit; break quit;
} }
boolean spin = true; boolean spin = true;
while (spin) { while (spin && lives.get()) {
try { try {
Thread.sleep(1000); //sleep for 1 second Thread.sleep(1000); //sleep for 1 second
} catch (InterruptedException e) { } catch (InterruptedException e) {
@@ -226,26 +270,30 @@ public class MUXlisten implements Runnable {
} }
} // quit } // quit
} finally { } finally {
// Start cleanup. lives.set(false);
while (!lock.compareAndSet(false, true)) { // Some grace time.
// wait try {
Thread.sleep(100);
} catch (InterruptedException ex) {
} }
// zero out everything.
try { try {
wlock(); wlock();
try { try {
info.add("STARTING", new Boolean(false)); info.add("STARTING", new Boolean(false));
info.add("STOPPING", new Boolean(false)); info.add("STOPPING", new Boolean(true));
info.add("RUNNING", new Boolean(false)); info.add("RUNNING", new Boolean(false));
} catch (Exception e) { } catch (Exception e) {
lock.set(false);
wunlock(); wunlock();
return; return;
} }
wunlock(); wunlock();
} catch (Exception e) { } catch (Exception e) {
} }
// Start cleanup.
while (!lock.compareAndSet(false, true)) {
// wait
}
if (SS != null) { if (SS != null) {
try { try {
SS.close(); SS.close();
@@ -261,11 +309,28 @@ public class MUXlisten implements Runnable {
// Some grace time. // Some grace time.
try { try {
Thread.sleep(250); Thread.sleep(100);
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
} }
// zero out everything.
try {
wlock();
try {
info.add("STARTING", new Boolean(false));
info.add("STOPPING", new Boolean(false));
info.add("RUNNING", new Boolean(false));
} catch (Exception e) {
lock.set(false);
wunlock();
return;
}
wunlock();
} catch (Exception e) {
}
lock.set(false); // Should we force waiting for all threads?? lock.set(false); // Should we force waiting for all threads??
// Wait around till all threads are collected. // Wait around till all threads are collected.
if (tg != null) { if (tg != null) {
String boner = tg.getName(); String boner = tg.getName();
@@ -276,7 +341,7 @@ public class MUXlisten implements Runnable {
// visit(tg, 0, boner); // visit(tg, 0, boner);
int foo = tg.activeCount() + tg.activeGroupCount(); int foo = tg.activeCount() + tg.activeGroupCount();
// hopefully no longer needed! // hopefully no longer needed!
// int bar = foo; // int bar = lives;
// System.out.println("BOB: MUXlisten: Waiting on threads for " + boner); // System.out.println("BOB: MUXlisten: Waiting on threads for " + boner);
// System.out.println("\nBOB: MUXlisten: ThreadGroup dump BEGIN " + boner); // System.out.println("\nBOB: MUXlisten: ThreadGroup dump BEGIN " + boner);
// visit(tg, 0, boner); // visit(tg, 0, boner);
@@ -284,12 +349,12 @@ public class MUXlisten implements Runnable {
// Happily spin forever :-( // Happily spin forever :-(
while (foo != 0) { while (foo != 0) {
foo = tg.activeCount() + tg.activeGroupCount(); foo = tg.activeCount() + tg.activeGroupCount();
// if (foo != bar && foo != 0) { // if (lives != bar && lives != 0) {
// System.out.println("\nBOB: MUXlisten: ThreadGroup dump BEGIN " + boner); // System.out.println("\nBOB: MUXlisten: ThreadGroup dump BEGIN " + boner);
// visit(tg, 0, boner); // visit(tg, 0, boner);
// System.out.println("BOB: MUXlisten: ThreadGroup dump END " + boner + "\n"); // System.out.println("BOB: MUXlisten: ThreadGroup dump END " + boner + "\n");
// } // }
// bar = foo; // bar = lives;
try { try {
Thread.sleep(100); //sleep for 100 ms (One tenth second) Thread.sleep(100); //sleep for 100 ms (One tenth second)
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
@@ -356,5 +421,4 @@ public class MUXlisten implements Runnable {
visit(groups[i], level + 1, groups[i].getName()); visit(groups[i], level + 1, groups[i].getName());
} }
} }
} }

View File

@@ -26,6 +26,7 @@ package net.i2p.BOB;
import net.i2p.client.streaming.RetransmissionTimer; import net.i2p.client.streaming.RetransmissionTimer;
import net.i2p.util.SimpleScheduler; import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer2; import net.i2p.util.SimpleTimer2;
/** /**
* Start from command line * Start from command line
* *

View File

@@ -43,10 +43,10 @@ public class NamedDB {
} }
synchronized public void getReadLock() { synchronized public void getReadLock() {
while((writersWaiting != 0)) { while ((writersWaiting != 0)) {
try { try {
wait(); wait();
} catch(InterruptedException ie) { } catch (InterruptedException ie) {
} }
} }
readers++; readers++;
@@ -59,10 +59,10 @@ public class NamedDB {
synchronized public void getWriteLock() { synchronized public void getWriteLock() {
writersWaiting++; writersWaiting++;
while(readers != 0 && writersWaiting != 1 ) { while (readers != 0 && writersWaiting != 1) {
try { try {
wait(); wait();
} catch(InterruptedException ie) { } catch (InterruptedException ie) {
} }
} }
} }
@@ -79,8 +79,8 @@ public class NamedDB {
* @throws ArrayIndexOutOfBoundsException when key does not exist * @throws ArrayIndexOutOfBoundsException when key does not exist
*/ */
public int idx(Object key) throws ArrayIndexOutOfBoundsException { public int idx(Object key) throws ArrayIndexOutOfBoundsException {
for(int i = 0; i < index; i++) { for (int i = 0; i < index; i++) {
if(key.equals(data[i][0])) { if (key.equals(data[i][0])) {
return i; return i;
} }
} }
@@ -100,17 +100,17 @@ public class NamedDB {
try { try {
k = idx(key); k = idx(key);
} catch(ArrayIndexOutOfBoundsException b) { } catch (ArrayIndexOutOfBoundsException b) {
return; return;
} }
olddata = new Object[index + 2][2]; olddata = new Object[index + 2][2];
// copy to olddata, skipping 'k' // copy to olddata, skipping 'k'
for(i = 0 , l = 0; l < index; i++, l++) { for (i = 0, l = 0; l < index; i++, l++) {
if(i == k) { if (i == k) {
l++; l++;
didsomething++; didsomething++;
} }
for(j = 0; j < 2; j++) { for (j = 0; j < 2; j++) {
olddata[i][j] = data[l][j]; olddata[i][j] = data[l][j];
} }
} }
@@ -132,13 +132,13 @@ public class NamedDB {
olddata = new Object[index + 2][2]; olddata = new Object[index + 2][2];
// copy to olddata // copy to olddata
for(i = 0; i < index; i++) { for (i = 0; i < index; i++) {
for(j = 0; j < 2; j++) { for (j = 0; j < 2; j++) {
olddata[i][j] = data[i][j]; olddata[i][j] = data[i][j];
} }
} }
data = olddata; data = olddata;
data[index++] = new Object[] {key, val}; data[index++] = new Object[]{key, val};
} }
/** /**
@@ -149,8 +149,8 @@ public class NamedDB {
* @throws java.lang.RuntimeException * @throws java.lang.RuntimeException
*/ */
public Object get(Object key) throws RuntimeException { public Object get(Object key) throws RuntimeException {
for(int i = 0; i < index; i++) { for (int i = 0; i < index; i++) {
if(key.equals(data[i][0])) { if (key.equals(data[i][0])) {
return data[i][1]; return data[i][1];
} }
} }
@@ -164,8 +164,8 @@ public class NamedDB {
* @return true if an object exists, else returns false * @return true if an object exists, else returns false
*/ */
public boolean exists(Object key) { public boolean exists(Object key) {
for(int i = 0; i < index; i++) { for (int i = 0; i < index; i++) {
if(key.equals(data[i][0])) { if (key.equals(data[i][0])) {
return true; return true;
} }
} }
@@ -180,7 +180,7 @@ public class NamedDB {
* @throws java.lang.RuntimeException * @throws java.lang.RuntimeException
*/ */
public Object getnext(int i) throws RuntimeException { public Object getnext(int i) throws RuntimeException {
if(i < index && i > -1) { if (i < index && i > -1) {
return data[i][1]; return data[i][1];
} }
throw new RuntimeException("No more data"); throw new RuntimeException("No more data");

View File

@@ -26,6 +26,7 @@ package net.i2p.BOB;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* Shove data from one stream to the other. * Shove data from one stream to the other.
@@ -36,7 +37,7 @@ public class TCPio implements Runnable {
private InputStream Ain; private InputStream Ain;
private OutputStream Aout; private OutputStream Aout;
// private NamedDB info, database; private AtomicBoolean lives;
/** /**
* Constructor * Constructor
@@ -46,11 +47,10 @@ public class TCPio implements Runnable {
* *
* param database * param database
*/ */
TCPio(InputStream Ain, OutputStream Aout /*, NamedDB info , NamedDB database */) { TCPio(InputStream Ain, OutputStream Aout, AtomicBoolean lives) {
this.Ain = Ain; this.Ain = Ain;
this.Aout = Aout; this.Aout = Aout;
// this.info = info; this.lives = lives;
// this.database = database;
} }
/** /**
@@ -84,10 +84,9 @@ public class TCPio implements Runnable {
int b; int b;
byte a[] = new byte[1]; byte a[] = new byte[1];
boolean spin = true;
try { try {
try { try {
while (spin) { while (lives.get()) {
b = Ain.read(a, 0, 1); b = Ain.read(a, 0, 1);
if (b > 0) { if (b > 0) {
Aout.write(a, 0, b); Aout.write(a, 0, b);
@@ -105,9 +104,6 @@ public class TCPio implements Runnable {
* *
*/ */
// System.out.println("TCPio: End Of Stream"); // System.out.println("TCPio: End Of Stream");
// Ain.close();
// Aout.close();
//return;
break; break;
} }
} }

View File

@@ -29,6 +29,7 @@ import java.net.Socket;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
// import net.i2p.client.I2PSession; // import net.i2p.client.I2PSession;
// import net.i2p.client.I2PSessionException; // import net.i2p.client.I2PSessionException;
import java.util.concurrent.atomic.AtomicBoolean;
import net.i2p.client.streaming.I2PServerSocket; import net.i2p.client.streaming.I2PServerSocket;
import net.i2p.client.streaming.I2PSocketManager; import net.i2p.client.streaming.I2PSocketManager;
import net.i2p.util.Log; import net.i2p.util.Log;
@@ -45,6 +46,7 @@ public class TCPlistener implements Runnable {
public I2PSocketManager socketManager; public I2PSocketManager socketManager;
public I2PServerSocket serverSocket; public I2PServerSocket serverSocket;
private ServerSocket listener; private ServerSocket listener;
private AtomicBoolean lives;
/** /**
* Constructor * Constructor
@@ -53,12 +55,13 @@ public class TCPlistener implements Runnable {
* @param database * @param database
* @param _log * @param _log
*/ */
TCPlistener(ServerSocket listener, I2PSocketManager S, NamedDB info, NamedDB database, Log _log) { TCPlistener(ServerSocket listener, I2PSocketManager S, NamedDB info, NamedDB database, Log _log, AtomicBoolean lives) {
this.database = database; this.database = database;
this.info = info; this.info = info;
this._log = _log; this._log = _log;
this.socketManager = S; this.socketManager = S;
this.listener = listener; this.listener = listener;
this.lives = lives;
} }
private void rlock() throws Exception { private void rlock() throws Exception {
@@ -77,7 +80,6 @@ public class TCPlistener implements Runnable {
*/ */
public void run() { public void run() {
boolean g = false; boolean g = false;
boolean spin = true;
int conn = 0; int conn = 0;
try { try {
die: die:
@@ -85,22 +87,7 @@ public class TCPlistener implements Runnable {
try { try {
Socket server = new Socket(); Socket server = new Socket();
listener.setSoTimeout(50); // We don't block, we cycle and check. listener.setSoTimeout(50); // We don't block, we cycle and check.
while (spin) { while (lives.get()) {
try {
rlock();
} catch (Exception e) {
break die;
}
try {
spin = info.get("RUNNING").equals(Boolean.TRUE);
} catch (Exception e) {
try {
runlock();
} catch (Exception e2) {
break die;
}
break die;
}
try { try {
server = listener.accept(); server = listener.accept();
g = true; g = true;
@@ -110,7 +97,7 @@ public class TCPlistener implements Runnable {
if (g) { if (g) {
conn++; conn++;
// toss the connection to a new thread. // toss the connection to a new thread.
TCPtoI2P conn_c = new TCPtoI2P(socketManager, server, info, database); TCPtoI2P conn_c = new TCPtoI2P(socketManager, server, info, database, lives);
Thread t = new Thread(conn_c, Thread.currentThread().getName() + " TCPtoI2P " + conn); Thread t = new Thread(conn_c, Thread.currentThread().getName() + " TCPtoI2P " + conn);
t.start(); t.start();
g = false; g = false;

View File

@@ -30,6 +30,7 @@ import java.io.OutputStream;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.NoRouteToHostException; import java.net.NoRouteToHostException;
import java.net.Socket; import java.net.Socket;
import java.util.concurrent.atomic.AtomicBoolean;
import net.i2p.I2PException; import net.i2p.I2PException;
import net.i2p.client.streaming.I2PSocket; import net.i2p.client.streaming.I2PSocket;
import net.i2p.client.streaming.I2PSocketManager; import net.i2p.client.streaming.I2PSocketManager;
@@ -49,6 +50,7 @@ public class TCPtoI2P implements Runnable {
private NamedDB info, database; private NamedDB info, database;
private Socket sock; private Socket sock;
private I2PSocketManager socketManager; private I2PSocketManager socketManager;
private AtomicBoolean lives;
/** /**
* Constructor * Constructor
@@ -57,11 +59,12 @@ public class TCPtoI2P implements Runnable {
* param info * param info
* param database * param database
*/ */
TCPtoI2P(I2PSocketManager i2p, Socket socket , NamedDB info, NamedDB database) { TCPtoI2P(I2PSocketManager i2p, Socket socket, NamedDB info, NamedDB database, AtomicBoolean lives) {
this.sock = socket; this.sock = socket;
this.info = info; this.info = info;
this.database = database; this.database = database;
this.socketManager = i2p; this.socketManager = i2p;
this.lives = lives;
} }
/** /**
@@ -157,19 +160,15 @@ public class TCPtoI2P implements Runnable {
Iin = I2P.getInputStream(); Iin = I2P.getInputStream();
Iout = I2P.getOutputStream(); Iout = I2P.getOutputStream();
// setup to cross the streams // setup to cross the streams
TCPio conn_c = new TCPio(in, Iout /*, info, database */); // app -> I2P TCPio conn_c = new TCPio(in, Iout, lives); // app -> I2P
TCPio conn_a = new TCPio(Iin, out /*, info, database */); // I2P -> app TCPio conn_a = new TCPio(Iin, out, lives); // I2P -> app
t = new Thread(conn_c, Thread.currentThread().getName() + " TCPioA"); t = new Thread(conn_c, Thread.currentThread().getName() + " TCPioA");
q = new Thread(conn_a, Thread.currentThread().getName() + " TCPioB"); q = new Thread(conn_a, Thread.currentThread().getName() + " TCPioB");
// Fire! // Fire!
t.start(); t.start();
q.start(); q.start();
boolean spin = true; while (t.isAlive() && q.isAlive() && lives.get()) { // AND is used here to kill off the other thread
while (t.isAlive() && q.isAlive()) { // AND is used here to kill off the other thread
Thread.sleep(10); //sleep for 10 ms Thread.sleep(10); //sleep for 10 ms
rlock();
spin = info.get("RUNNING").equals(Boolean.TRUE);
runlock();
} }
} catch (I2PException e) { } catch (I2PException e) {
Emsg(e.toString(), out); Emsg(e.toString(), out);

View File

@@ -78,26 +78,26 @@ public class UDPIOthread implements I2PSessionListener, Runnable {
try { try {
in = new DataInputStream(socket.getInputStream()); in = new DataInputStream(socket.getInputStream());
out = new DataOutputStream(socket.getOutputStream()); out = new DataOutputStream(socket.getOutputStream());
while(up) { while (up) {
int c = in.read(data); int c = in.read(data);
// Note: could do a loopback test here with a wrapper. // Note: could do a loopback test here with a wrapper.
boolean ok = _session.sendMessage(_peerDestination, data, 0, c); boolean ok = _session.sendMessage(_peerDestination, data, 0, c);
if(!ok) { if (!ok) {
up = false; // Is this the right thing to do?? up = false; // Is this the right thing to do??
} }
} }
} catch(IOException ioe) { } catch (IOException ioe) {
_log.error("Error running", ioe); _log.error("Error running", ioe);
} catch(I2PSessionException ise) { } catch (I2PSessionException ise) {
_log.error("Error communicating", ise); _log.error("Error communicating", ise);
// } catch(DataFormatException dfe) { // } catch(DataFormatException dfe) {
// _log.error("Peer destination file is not valid", dfe); // _log.error("Peer destination file is not valid", dfe);
} finally { } finally {
if(_session != null) { if (_session != null) {
try { try {
_session.destroySession(); _session.destroySession();
} catch(I2PSessionException ise) { } catch (I2PSessionException ise) {
// ignored // ignored
} }
} }
@@ -116,9 +116,9 @@ public class UDPIOthread implements I2PSessionListener, Runnable {
byte msg[] = session.receiveMessage(msgId); byte msg[] = session.receiveMessage(msgId);
out.write(msg); out.write(msg);
out.flush(); out.flush();
} catch(I2PSessionException ise) { } catch (I2PSessionException ise) {
up = false; up = false;
} catch(IOException ioe) { } catch (IOException ioe) {
up = false; up = false;
} }
} }

View File

@@ -20,15 +20,15 @@ class I2PServerSocketImpl implements I2PServerSocket {
private final static Log _log = new Log(I2PServerSocketImpl.class); private final static Log _log = new Log(I2PServerSocketImpl.class);
private I2PSocketManager mgr; private I2PSocketManager mgr;
/** list of sockets waiting for the client to accept them */ /** list of sockets waiting for the client to accept them */
private List<I2PSocket> pendingSockets = Collections.synchronizedList(new ArrayList<I2PSocket>(4)); private final List<I2PSocket> pendingSockets = Collections.synchronizedList(new ArrayList<I2PSocket>(4));
/** have we been closed */ /** have we been closed */
private volatile boolean closing = false; private volatile boolean closing = false;
/** lock on this when accepting a pending socket, and wait on it for notification of acceptance */ /** lock on this when accepting a pending socket, and wait on it for notification of acceptance */
private Object socketAcceptedLock = new Object(); private final Object socketAcceptedLock = new Object();
/** lock on this when adding a new socket to the pending list, and wait on it accordingly */ /** lock on this when adding a new socket to the pending list, and wait on it accordingly */
private Object socketAddedLock = new Object(); private final Object socketAddedLock = new Object();
/** /**
* Set Sock Option accept timeout stub, does nothing in ministreaming * Set Sock Option accept timeout stub, does nothing in ministreaming

View File

@@ -40,15 +40,15 @@ import net.i2p.util.Log;
class I2PSocketManagerImpl implements I2PSocketManager, I2PSessionListener { class I2PSocketManagerImpl implements I2PSocketManager, I2PSessionListener {
private I2PAppContext _context; private I2PAppContext _context;
private Log _log; private Log _log;
private I2PSession _session; private /* final */ I2PSession _session;
private I2PServerSocketImpl _serverSocket = null; private I2PServerSocketImpl _serverSocket = null;
private Object lock = new Object(); // for locking socket lists private final Object lock = new Object(); // for locking socket lists
private HashMap<String,I2PSocket> _outSockets; private HashMap<String,I2PSocket> _outSockets;
private HashMap<String,I2PSocket> _inSockets; private HashMap<String,I2PSocket> _inSockets;
private I2PSocketOptions _defaultOptions; private I2PSocketOptions _defaultOptions;
private long _acceptTimeout; private long _acceptTimeout;
private String _name; private String _name;
private List<DisconnectListener> _listeners; private final List<DisconnectListener> _listeners = new ArrayList<DisconnectListener>(1);;
private static int __managerId = 0; private static int __managerId = 0;
public static final short ACK = 0x51; public static final short ACK = 0x51;
@@ -79,7 +79,7 @@ class I2PSocketManagerImpl implements I2PSocketManager, I2PSessionListener {
_inSockets = new HashMap<String,I2PSocket>(16); _inSockets = new HashMap<String,I2PSocket>(16);
_outSockets = new HashMap<String,I2PSocket>(16); _outSockets = new HashMap<String,I2PSocket>(16);
_acceptTimeout = ACCEPT_TIMEOUT_DEFAULT; _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
_listeners = new ArrayList<DisconnectListener>(1); // _listeners = new ArrayList<DisconnectListener>(1);
setSession(session); setSession(session);
setDefaultOptions(buildOptions(opts)); setDefaultOptions(buildOptions(opts));
_context.statManager().createRateStat("streaming.lifetime", "How long before the socket is closed?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("streaming.lifetime", "How long before the socket is closed?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });

View File

@@ -230,8 +230,8 @@ public class I2PSocketManagerFull implements I2PSocketManager {
* *
*/ */
public void destroySocketManager() { public void destroySocketManager() {
_connectionManager.disconnectAllHard();
_connectionManager.setAllowIncomingConnections(false); _connectionManager.setAllowIncomingConnections(false);
_connectionManager.disconnectAllHard();
// should we destroy the _session too? // should we destroy the _session too?
// yes, since the old lib did (and SAM wants it to, and i dont know why not) // yes, since the old lib did (and SAM wants it to, and i dont know why not)
if ( (_session != null) && (!_session.isClosed()) ) { if ( (_session != null) && (!_session.isClosed()) ) {

View File

@@ -101,7 +101,12 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
private boolean _dateReceived; private boolean _dateReceived;
/** lock that we wait upon, that the SetDateMessageHandler notifies */ /** lock that we wait upon, that the SetDateMessageHandler notifies */
private final Object _dateReceivedLock = new Object(); private final Object _dateReceivedLock = new Object();
/** whether the session connection is in the process of being opened */
protected boolean _opening;
/** monitor for waiting until opened */
private final Object _openingWait = new Object();
/** /**
* thread that we tell when new messages are available who then tells us * thread that we tell when new messages are available who then tells us
* to fetch them. The point of this is so that the fetch doesn't block the * to fetch them. The point of this is so that the fetch doesn't block the
@@ -136,6 +141,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
_log = context.logManager().getLog(I2PSessionImpl.class); _log = context.logManager().getLog(I2PSessionImpl.class);
_handlerMap = new I2PClientMessageHandlerMap(context); _handlerMap = new I2PClientMessageHandlerMap(context);
_closed = true; _closed = true;
_opening = false;
_closing = false; _closing = false;
_producer = new I2CPMessageProducer(context); _producer = new I2CPMessageProducer(context);
_availabilityNotifier = new AvailabilityNotifier(); _availabilityNotifier = new AvailabilityNotifier();
@@ -212,6 +218,17 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
return _leaseSet; return _leaseSet;
} }
void setOpening(boolean ls) {
_opening = ls;
synchronized (_openingWait) {
_openingWait.notifyAll();
}
}
boolean getOpening() {
return _opening;
}
/** /**
* Load up the destKeyFile for our Destination, PrivateKey, and SigningPrivateKey * Load up the destKeyFile for our Destination, PrivateKey, and SigningPrivateKey
* *
@@ -235,6 +252,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
* not reachable * not reachable
*/ */
public void connect() throws I2PSessionException { public void connect() throws I2PSessionException {
setOpening(true);
_closed = false; _closed = false;
_availabilityNotifier.stopNotifying(); _availabilityNotifier.stopNotifying();
I2PThread notifier = new I2PThread(_availabilityNotifier); I2PThread notifier = new I2PThread(_availabilityNotifier);
@@ -294,11 +312,14 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
+ (connected - startConnect) + (connected - startConnect)
+ "ms - ready to participate in the network!"); + "ms - ready to participate in the network!");
startIdleMonitor(); startIdleMonitor();
setOpening(false);
} catch (UnknownHostException uhe) { } catch (UnknownHostException uhe) {
_closed = true; _closed = true;
setOpening(false);
throw new I2PSessionException(getPrefix() + "Invalid session configuration", uhe); throw new I2PSessionException(getPrefix() + "Invalid session configuration", uhe);
} catch (IOException ioe) { } catch (IOException ioe) {
_closed = true; _closed = true;
setOpening(false);
throw new I2PSessionException(getPrefix() + "Problem connecting to " + _hostname + " on port " + _portNum, ioe); throw new I2PSessionException(getPrefix() + "Problem connecting to " + _hostname + " on port " + _portNum, ioe);
} }
} }
@@ -547,13 +568,28 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
} }
/** /**
* Tear down the session, and do NOT reconnect * Tear down the session, and do NOT reconnect.
*
* Blocks if session has not been fully started.
*/ */
public void destroySession() { public void destroySession() {
destroySession(true); destroySession(true);
} }
/**
* Tear down the session, and do NOT reconnect.
*
* Blocks if session has not been fully started.
*/
public void destroySession(boolean sendDisconnect) { public void destroySession(boolean sendDisconnect) {
while (_opening) {
synchronized (_openingWait) {
try {
_openingWait.wait(1000);
} catch (InterruptedException ie) { // nop
}
}
}
if (_closed) return; if (_closed) return;
if (_log.shouldLog(Log.INFO)) _log.info(getPrefix() + "Destroy the session", new Exception("DestroySession()")); if (_log.shouldLog(Log.INFO)) _log.info(getPrefix() + "Destroy the session", new Exception("DestroySession()"));

View File

@@ -1,3 +1,15 @@
2009-07-16 sponge
* ministreaming:
- small pedantic fix
* streaming:
- Fix a deadly race condition.
- Some small pedantic fixes.
* core:
- Fix a deadly race condition.
* BOB:
- Fixed some races that occured from fixing races in streaming and core.
- Some badly needed code refactoring to depend less on the database.
2009-07-15 zzz 2009-07-15 zzz
* Console: * Console:
- Make light the default theme - Make light the default theme

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */ /** deprecated */
public final static String ID = "Monotone"; public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION; public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 12; public final static long BUILD = 13;
/** for example "-test" */ /** for example "-test" */
public final static String EXTRA = ""; public final static String EXTRA = "";
public final static String FULL_VERSION = VERSION + "-" + BUILD + EXTRA; public final static String FULL_VERSION = VERSION + "-" + BUILD + EXTRA;