2004-11-05 jrandom

* Bugfixes and unit tests for the SAM bridge to handle quoted message
      parameters, verify proper operation after multiple session lifetimes,
      as well as some synchronization problems.
    * New properties method on the DataHelper class.
    * Address a race on fast disconnecting clients
This commit is contained in:
jrandom
2004-11-05 10:53:40 +00:00
committed by zzz
parent 4341a0c198
commit 7a0236ad29
10 changed files with 172 additions and 75 deletions

View File

@@ -46,7 +46,7 @@ public class SAMBridge implements Runnable {
* app designated destination name to the base64 of the I2P formatted
* destination keys (Destination+PrivateKey+SigningPrivateKey)
*/
private Map nameToPrivKeys = Collections.synchronizedMap(new HashMap(8));
private Map nameToPrivKeys;
private boolean acceptConnections = true;
@@ -65,6 +65,7 @@ public class SAMBridge implements Runnable {
*/
public SAMBridge(String listenHost, int listenPort, Properties i2cpProps, String persistFile) {
persistFilename = persistFile;
nameToPrivKeys = new HashMap(8);
loadKeys();
try {
if ( (listenHost != null) && !("0.0.0.0".equals(listenHost)) ) {
@@ -93,16 +94,18 @@ public class SAMBridge implements Runnable {
* @return null if the name does not exist, or if it is improperly formatted
*/
public Destination getDestination(String name) {
String val = (String)nameToPrivKeys.get(name);
if (val == null) return null;
try {
Destination d = new Destination();
d.fromBase64(val);
return d;
} catch (DataFormatException dfe) {
_log.error("Error retrieving the destination from " + name, dfe);
nameToPrivKeys.remove(name);
return null;
synchronized (nameToPrivKeys) {
String val = (String)nameToPrivKeys.get(name);
if (val == null) return null;
try {
Destination d = new Destination();
d.fromBase64(val);
return d;
} catch (DataFormatException dfe) {
_log.error("Error retrieving the destination from " + name, dfe);
nameToPrivKeys.remove(name);
return null;
}
}
}
@@ -114,9 +117,11 @@ public class SAMBridge implements Runnable {
* @return null if the name does not exist, else the stream
*/
public String getKeystream(String name) {
String val = (String)nameToPrivKeys.get(name);
if (val == null) return null;
return val;
synchronized (nameToPrivKeys) {
String val = (String)nameToPrivKeys.get(name);
if (val == null) return null;
return val;
}
}
/**
@@ -124,7 +129,9 @@ public class SAMBridge implements Runnable {
*
*/
public void addKeystream(String name, String stream) {
nameToPrivKeys.put(name, stream);
synchronized (nameToPrivKeys) {
nameToPrivKeys.put(name, stream);
}
storeKeys();
}
@@ -132,49 +139,52 @@ public class SAMBridge implements Runnable {
* Load up the keys from the persistFilename
*
*/
private synchronized void loadKeys() {
Map keys = new HashMap(16);
FileInputStream in = null;
try {
in = new FileInputStream(persistFilename);
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
String line = null;
while ( (line = reader.readLine()) != null) {
int eq = line.indexOf('=');
String name = line.substring(0, eq);
String privKeys = line.substring(eq+1);
keys.put(name, privKeys);
private void loadKeys() {
synchronized (nameToPrivKeys) {
nameToPrivKeys.clear();
FileInputStream in = null;
try {
in = new FileInputStream(persistFilename);
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
String line = null;
while ( (line = reader.readLine()) != null) {
int eq = line.indexOf('=');
String name = line.substring(0, eq);
String privKeys = line.substring(eq+1);
nameToPrivKeys.put(name, privKeys);
}
} catch (FileNotFoundException fnfe) {
_log.warn("Key file does not exist at " + persistFilename);
} catch (IOException ioe) {
_log.error("Unable to read the keys from " + persistFilename, ioe);
} finally {
if (in != null) try { in.close(); } catch (IOException ioe) {}
}
} catch (FileNotFoundException fnfe) {
_log.warn("Key file does not exist at " + persistFilename);
} catch (IOException ioe) {
_log.error("Unable to read the keys from " + persistFilename, ioe);
} finally {
if (in != null) try { in.close(); } catch (IOException ioe) {}
}
nameToPrivKeys = Collections.synchronizedMap(keys);
}
/**
* Store the current keys to disk in the location specified on creation
*
*/
private synchronized void storeKeys() {
FileOutputStream out = null;
try {
out = new FileOutputStream(persistFilename);
for (Iterator iter = nameToPrivKeys.keySet().iterator(); iter.hasNext(); ) {
String name = (String)iter.next();
String privKeys = (String)nameToPrivKeys.get(name);
out.write(name.getBytes());
out.write('=');
out.write(privKeys.getBytes());
out.write('\n');
private void storeKeys() {
synchronized (nameToPrivKeys) {
FileOutputStream out = null;
try {
out = new FileOutputStream(persistFilename);
for (Iterator iter = nameToPrivKeys.keySet().iterator(); iter.hasNext(); ) {
String name = (String)iter.next();
String privKeys = (String)nameToPrivKeys.get(name);
out.write(name.getBytes());
out.write('=');
out.write(privKeys.getBytes());
out.write('\n');
}
} catch (IOException ioe) {
_log.error("Error writing out the SAM keys to " + persistFilename, ioe);
} finally {
if (out != null) try { out.close(); } catch (IOException ioe) {}
}
} catch (IOException ioe) {
_log.error("Error writing out the SAM keys to " + persistFilename, ioe);
} finally {
if (out != null) try { out.close(); } catch (IOException ioe) {}
}
}

View File

@@ -111,9 +111,10 @@ public class SAMUtils {
*/
public static Properties parseParams(StringTokenizer tok) throws SAMException {
int pos, nprops = 0, ntoks = tok.countTokens();
String token, param, value;
String token, param;
Properties props = new Properties();
StringBuffer value = new StringBuffer();
for (int i = 0; i < ntoks; ++i) {
token = tok.nextToken();
@@ -123,9 +124,16 @@ public class SAMUtils {
throw new SAMException("Bad formatting for param [" + token + "]");
}
param = token.substring(0, pos);
value = token.substring(pos + 1);
value.append(token.substring(pos+1));
if (value.charAt(0) == '"') {
while ( (i < ntoks) && (value.lastIndexOf("\"") <= 0) ) {
value.append(' ').append(tok.nextToken());
i++;
}
}
props.setProperty(param, value);
props.setProperty(param, value.toString());
value.setLength(0);
nprops += 1;
}
@@ -157,4 +165,19 @@ public class SAMUtils {
return msg;
}
public static void main(String args[]) {
try {
test("a=b c=d e=\"f g h\"");
test("a=\"b c d\" e=\"f g h\" i=\"j\"");
test("a=\"b c d\" e=f i=\"j\"");
} catch (Exception e) {
e.printStackTrace();
}
}
private static void test(String props) throws Exception {
StringTokenizer tok = new StringTokenizer(props);
Properties p = parseParams(tok);
System.out.println(p);
}
}

View File

@@ -107,7 +107,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
msg = buf.toString("ISO-8859-1").trim();
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("New message received: " + msg);
_log.debug("New message received: [" + msg + "]");
}
buf.reset();

View File

@@ -33,7 +33,11 @@ public class TestStreamTransfer {
private static void runTest(String samHost, int samPort, String conOptions) {
startAlice(samHost, samPort, conOptions);
testBob(samHost, samPort, conOptions);
for (int i = 0; i < 20; i++) {
testBob("bob" + i, samHost, samPort, conOptions);
if (i % 2 == 1)
try { Thread.sleep(10*1000); } catch (InterruptedException ie) {}
}
}
private static void startAlice(String host, int port, String conOptions) {
@@ -95,11 +99,13 @@ public class TestStreamTransfer {
try { _out.close(); } catch (IOException ioe) {}
try { _s.close(); } catch (IOException ioe) {}
_streams.clear();
_dead = true;
}
}
}
private void doRun() throws IOException, SAMException {
String line = _reader.readLine();
_log.debug("Read: " + line);
StringTokenizer tok = new StringTokenizer(line);
String maj = tok.nextToken();
String min = tok.nextToken();
@@ -146,12 +152,15 @@ public class TestStreamTransfer {
}
_log.info("Received from the stream " + id + ": [" + new String(payload) + "]");
try { Thread.sleep(5*1000); } catch (InterruptedException ie) {}
/*
// now echo it back
String reply = "STREAM SEND ID=" + id +
" SIZE=" + payloadSize +
"\n" + payload;
"\n" + new String(payload);
_out.write(reply.getBytes());
_out.flush();
_log.info("Reply sent back [" + new String(reply.getBytes()) + "]");
*/
} else {
_log.error("Received unsupported type [" + maj + "/"+ min + "]");
return;
@@ -159,8 +168,27 @@ public class TestStreamTransfer {
}
}
private static void testBob(String host, int port, String conOptions) {
_log.info("\n\nTesting Bob\n\n\n");
private static void testBob(String sessionName, String host, int port, String conOptions) {
I2PThread t = new I2PThread(new TestBob(sessionName, host, port, conOptions), sessionName);
t.start();
}
private static class TestBob implements Runnable {
private String _sessionName;
private String _host;
private int _port;
private String _opts;
public TestBob(String name, String host, int port, String opts) {
_sessionName = name;
_host = host;
_port = port;
_opts = opts;
}
public void run() {
doTestBob(_sessionName, _host, _port, _opts);
}
}
private static void doTestBob(String sessionName, String host, int port, String conOptions) {
_log.info("\n\nTesting " + sessionName + "\n\n\n");
try {
Socket s = new Socket(host, port);
OutputStream out = s.getOutputStream();
@@ -168,32 +196,37 @@ public class TestStreamTransfer {
BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
String line = reader.readLine();
_log.debug("line read for valid version: " + line);
String req = "SESSION CREATE STYLE=STREAM DESTINATION=Bob " + conOptions + "\n";
String req = "SESSION CREATE STYLE=STREAM DESTINATION=" + sessionName + " " + conOptions + "\n";
out.write(req.getBytes());
line = reader.readLine();
_log.info("Response to creating the session with destination Bob: " + line);
_log.info("Response to creating the session with destination "+ sessionName+": " + line);
req = "STREAM CONNECT ID=42 DESTINATION=" + _alice + "\n";
out.write(req.getBytes());
line = reader.readLine();
_log.info("Response to the stream connect from Bob to Alice: " + line);
_log.info("Response to the stream connect from "+sessionName+" to Alice: " + line);
StringTokenizer tok = new StringTokenizer(line);
String maj = tok.nextToken();
String min = tok.nextToken();
Properties props = SAMUtils.parseParams(tok);
_log.info("props = " + props);
String result = props.getProperty("RESULT");
if (!("OK".equals(result))) {
_log.error("Unable to connect!");
_dead = true;
//_dead = true;
return;
}
try { Thread.sleep(5*1000) ; } catch (InterruptedException ie) {}
req = "STREAM SEND ID=42 SIZE=10\nBlahBlah!!";
_log.info("Sending data");
out.write(req.getBytes());
out.flush();
try { Thread.sleep(20*1000); } catch (InterruptedException ie) {}
_log.info("Sending close");
req = "STREAM CLOSE ID=42\n";
out.write(req.getBytes());
try { Thread.sleep(3*1000); } catch (InterruptedException ie) {}
_dead = true;
out.flush();
try { Thread.sleep(30*1000); } catch (InterruptedException ie) {}
//_dead = true;
s.close();
} catch (Exception e) {
_log.error("Error testing for valid version", e);
@@ -203,7 +236,7 @@ public class TestStreamTransfer {
public static void main(String args[]) {
// "i2cp.tcp.host=www.i2p.net i2cp.tcp.port=7765 tunnels.inboundDepth=0";
// "i2cp.tcp.host=localhost i2cp.tcp.port=7654 tunnels.inboundDepth=0";
String conOptions = "i2cp.tcp.host=www.i2p.net i2cp.tcp.port=7765 tunnels.inboundDepth=0";
String conOptions = "i2cp.tcp.host=localhost i2cp.tcp.port=10001 tunnels.inboundDepth=0";
if (args.length > 0) {
conOptions = "";
for (int i = 0; i < args.length; i++)
@@ -215,8 +248,8 @@ public class TestStreamTransfer {
} catch (Throwable t) {
_log.error("Error running test", t);
}
try { Thread.sleep(5*1000); } catch (InterruptedException ie) {}
System.exit(0);
//try { Thread.sleep(5*1000); } catch (InterruptedException ie) {}
//System.exit(0);
}
}