diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java index 2cd067375..8111d8c74 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java @@ -13,6 +13,7 @@ import java.net.NoRouteToHostException; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.Properties; import java.util.Set; import net.i2p.I2PAppContext; @@ -99,4 +100,6 @@ public interface I2PSocketManager { public String getName(); public void setName(String name); + + public void init(I2PAppContext context, I2PSession session, Properties opts, String name); } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java index 7cc709479..076ee99fa 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java @@ -6,6 +6,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.Properties; +import net.i2p.I2PAppContext; import net.i2p.I2PException; import net.i2p.client.I2PClient; import net.i2p.client.I2PClientFactory; @@ -23,6 +24,9 @@ import net.i2p.util.Log; public class I2PSocketManagerFactory { private final static Log _log = new Log(I2PSocketManagerFactory.class); + public static final String PROP_MANAGER = "i2p.streaming.manager"; + public static final String DEFAULT_MANAGER = "net.i2p.client.streaming.I2PSocketManagerImpl"; + /** * Create a socket manager using a brand new destination connected to the * I2CP router on the local machine on the default port (7654). @@ -76,23 +80,60 @@ public class I2PSocketManagerFactory { public static I2PSocketManager createManager(InputStream myPrivateKeyStream, String i2cpHost, int i2cpPort, Properties opts) { I2PClient client = I2PClientFactory.createClient(); - opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_GUARANTEED); + if (true) { + // for the old streaming lib + opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_GUARANTEED); + } else { + // for new streaming lib: + opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_BEST_EFFORT); + } + opts.setProperty(I2PClient.PROP_TCP_HOST, i2cpHost); opts.setProperty(I2PClient.PROP_TCP_PORT, "" + i2cpPort); try { I2PSession session = client.createSession(myPrivateKeyStream, opts); session.connect(); - return createManager(session); + return createManager(session, opts, "manager"); } catch (I2PSessionException ise) { _log.error("Error creating session for socket manager", ise); return null; } } - private static I2PSocketManager createManager(I2PSession session) { - I2PSocketManagerImpl mgr = new I2PSocketManagerImpl(); - mgr.setSession(session); - mgr.setDefaultOptions(new I2PSocketOptions()); - return mgr; + private static I2PSocketManager createManager(I2PSession session, Properties opts, String name) { + if (false) { + I2PSocketManagerImpl mgr = new I2PSocketManagerImpl(); + mgr.setSession(session); + mgr.setDefaultOptions(new I2PSocketOptions()); + return mgr; + } else { + String classname = opts.getProperty(PROP_MANAGER, DEFAULT_MANAGER); + if (classname != null) { + try { + Class cls = Class.forName(classname); + Object obj = cls.newInstance(); + if (obj instanceof I2PSocketManager) { + I2PSocketManager mgr = (I2PSocketManager)obj; + I2PAppContext context = I2PAppContext.getGlobalContext(); + mgr.init(context, session, opts, name); + return mgr; + } else { + throw new IllegalStateException("Invalid manager class [" + classname + "]"); + } + } catch (ClassNotFoundException cnfe) { + _log.error("Error loading " + classname, cnfe); + throw new IllegalStateException("Invalid manager class [" + classname + "] - not found"); + } catch (InstantiationException ie) { + _log.error("Error loading " + classname, ie); + throw new IllegalStateException("Invalid manager class [" + classname + "] - unable to instantiate"); + } catch (IllegalAccessException iae) { + _log.error("Error loading " + classname, iae); + throw new IllegalStateException("Invalid manager class [" + classname + "] - illegal access"); + } + } else { + throw new IllegalStateException("No manager class specified"); + } + } + } } \ No newline at end of file diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerImpl.java index 1165b7b8a..3bfb656a8 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerImpl.java @@ -13,6 +13,7 @@ import java.net.NoRouteToHostException; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.Properties; import java.util.Set; import net.i2p.I2PAppContext; @@ -65,13 +66,18 @@ public class I2PSocketManagerImpl implements I2PSocketManager, I2PSessionListene this("SocketManager " + (++__managerId)); } public I2PSocketManagerImpl(String name) { + init(I2PAppContext.getGlobalContext(), null, null, name); + } + + public void init(I2PAppContext context, I2PSession session, Properties opts, String name) { _name = name; - _session = null; + _context = context; + _log = _context.logManager().getLog(I2PSocketManager.class); _inSockets = new HashMap(16); _outSockets = new HashMap(16); _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT; - _context = I2PAppContext.getGlobalContext(); - _log = _context.logManager().getLog(I2PSocketManager.class); + setSession(session); + setDefaultOptions(new I2PSocketOptions()); _context.statManager().createRateStat("streaming.lifetime", "How long before the socket is closed?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("streaming.sent", "How many bytes are sent in the stream?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("streaming.received", "How many bytes are received in the stream?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkSend.java b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkSend.java new file mode 100644 index 000000000..f56c4c1b2 --- /dev/null +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkSend.java @@ -0,0 +1,132 @@ +package net.i2p.client.streaming; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.OutputStream; + +import java.net.ConnectException; +import java.net.NoRouteToHostException; + +import java.util.Random; + +import net.i2p.I2PAppContext; +import net.i2p.I2PException; +import net.i2p.data.Destination; +import net.i2p.data.DataFormatException; +import net.i2p.util.Log; + +/** + * Simple streaming lib test app that connects to a given destination and sends + * the contents of a file, then disconnects. See the {@link #main} + * + */ +public class StreamSinkSend { + private Log _log; + private String _sendFile; + private int _writeDelay; + private String _peerDestFile; + + /** + * Build the client but don't fire it up. + * @param filename file to send + * @param writeDelayMs how long to wait between each .write (0 for no delay) + * @param serverDestFile file containing the StreamSinkServer's binary Destination + */ + public StreamSinkSend(String filename, int writeDelayMs, String serverDestFile) { + _sendFile = filename; + _writeDelay = writeDelayMs; + _peerDestFile = serverDestFile; + _log = I2PAppContext.getGlobalContext().logManager().getLog(StreamSinkClient.class); + } + + /** + * Actually connect and run the client - this call blocks until completion. + * + */ + public void runClient() { + I2PSocketManager mgr = I2PSocketManagerFactory.createManager(); + Destination peer = null; + FileInputStream fis = null; + try { + fis = new FileInputStream(_peerDestFile); + peer = new Destination(); + peer.readBytes(fis); + } catch (IOException ioe) { + _log.error("Error finding the peer destination to contact in " + _peerDestFile, ioe); + return; + } catch (DataFormatException dfe) { + _log.error("Peer destination is not valid in " + _peerDestFile, dfe); + return; + } finally { + if (fis == null) try { fis.close(); } catch (IOException ioe) {} + } + + + System.out.println("Send " + _sendFile + " to " + peer.calculateHash().toBase64()); + + try { + I2PSocket sock = mgr.connect(peer); + byte buf[] = new byte[32*1024]; + OutputStream out = sock.getOutputStream(); + long beforeSending = System.currentTimeMillis(); + fis = new FileInputStream(_sendFile); + long size = 0; + while (true) { + int read = fis.read(buf); + if (read < 0) + break; + out.write(buf, 0, read); + size += read; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Wrote " + read); + if (_writeDelay > 0) { + try { Thread.sleep(_writeDelay); } catch (InterruptedException ie) {} + } + } + fis.close(); + long afterSending = System.currentTimeMillis(); + System.out.println("Sent " + (size / 1024) + "KB in " + (afterSending-beforeSending) + "ms"); + sock.close(); + } catch (InterruptedIOException iie) { + _log.error("Timeout connecting to the peer", iie); + return; + } catch (NoRouteToHostException nrthe) { + _log.error("Unable to connect to the peer", nrthe); + return; + } catch (ConnectException ce) { + _log.error("Connection already dropped", ce); + return; + } catch (I2PException ie) { + _log.error("Error connecting to the peer", ie); + return; + } catch (IOException ioe) { + _log.error("IO error sending", ioe); + return; + } + } + + /** + * Fire up the client. Usage: StreamSinkClient sendFile writeDelayMs serverDestFile
+ * + */ + public static void main(String args[]) { + if (args.length != 3) { + System.out.println("Usage: StreamSinkClient sendFile writeDelayMs serverDestFile"); + } else { + int writeDelayMs = -1; + try { + writeDelayMs = Integer.parseInt(args[1]); + } catch (NumberFormatException nfe) { + System.err.println("Write delay ms invalid [" + args[1] + "]"); + return; + } + StreamSinkSend client = new StreamSinkSend(args[0], writeDelayMs, args[2]); + client.runClient(); + } + } +} diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java index faf5a86e1..8567f2976 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java @@ -109,10 +109,13 @@ public class StreamSinkServer { while ( (read = in.read(buf)) != -1) { _fos.write(buf, 0, read); } + _log.error("Got EOF from client socket"); } catch (IOException ioe) { _log.error("Error writing the sink", ioe); } finally { if (_fos != null) try { _fos.close(); } catch (IOException ioe) {} + if (_sock != null) try { _sock.close(); } catch (IOException ioe) {} + _log.error("Client socket closed"); } } } diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index 12abfe271..6f1eb467d 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -386,8 +386,13 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa } } if ( (msgId != null) && (size != null) ) { - if (_sessionListener != null) - _sessionListener.messageAvailable(I2PSessionImpl.this, msgId.intValue(), size.intValue()); + if (_sessionListener != null) { + try { + _sessionListener.messageAvailable(I2PSessionImpl.this, msgId.intValue(), size.intValue()); + } catch (Exception e) { + _log.log(Log.CRIT, "Error notifying app of message availability", e); + } + } } } } diff --git a/core/java/src/net/i2p/client/I2PSessionImpl2.java b/core/java/src/net/i2p/client/I2PSessionImpl2.java index 05c6fe677..7d9ed1e70 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl2.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl2.java @@ -82,7 +82,8 @@ class I2PSessionImpl2 extends I2PSessionImpl { // success or accepted). we may want to break this out into a seperate // attribute, allowing both nonblocking sends and transparently managed keys, // as well as the nonblocking sends with application managed keys. Later. - if (isGuaranteed() || true) { + if (isGuaranteed() || false) { + //_log.error("sendGuaranteed"); return sendGuaranteed(dest, payload, keyUsed, tagsSent); } return sendBestEffort(dest, payload, keyUsed, tagsSent); @@ -111,16 +112,28 @@ class I2PSessionImpl2 extends I2PSessionImpl { if (key == null) key = _context.sessionKeyManager().createSession(dest.getPublicKey()); SessionTag tag = _context.sessionKeyManager().consumeNextAvailableTag(dest.getPublicKey(), key); Set sentTags = null; - if (_context.sessionKeyManager().getAvailableTags(dest.getPublicKey(), key) < 10) { + int oldTags = _context.sessionKeyManager().getAvailableTags(dest.getPublicKey(), key); + long availTimeLeft = _context.sessionKeyManager().getAvailableTimeLeft(dest.getPublicKey(), key); + if (oldTags < 10) { sentTags = createNewTags(50); - } else if (_context.sessionKeyManager().getAvailableTimeLeft(dest.getPublicKey(), key) < 30 * 1000) { + //_log.error("** sendBestEffort only had " + oldTags + " adding 50"); + } else if (availTimeLeft < 30 * 1000) { // if we have > 10 tags, but they expire in under 30 seconds, we want more sentTags = createNewTags(50); if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Tags are almost expired, adding 50 new ones"); + //_log.error("** sendBestEffort available time left " + availTimeLeft); + } else { + //_log.error("sendBestEffort old tags: " + oldTags + " available time left: " + availTimeLeft); } SessionKey newKey = null; if (false) // rekey newKey = _context.keyGenerator().generateSessionKey(); + + if ( (tagsSent != null) && (tagsSent.size() > 0) ) { + if (sentTags == null) + sentTags = new HashSet(); + sentTags.addAll(tagsSent); + } long nonce = _context.random().nextInt(Integer.MAX_VALUE); MessageState state = new MessageState(nonce, getPrefix()); diff --git a/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java b/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java index 13bc32d33..5a44f2a19 100644 --- a/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java +++ b/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java @@ -207,6 +207,8 @@ class TransientSessionKeyManager extends SessionKeyManager { sess.addTags(set); if (_log.shouldLog(Log.DEBUG)) _log.debug("Tags delivered to set " + set + " on session " + sess); + if (sessionTags.size() > 0) + _log.debug("Tags delivered: " + sessionTags.size() + " total = " + sess.availableTags()); } /** diff --git a/history.txt b/history.txt index f431aab91..3910c51b7 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,17 @@ -$Id: history.txt,v 1.54 2004/10/18 14:08:01 jrandom Exp $ +$Id: history.txt,v 1.55 2004/10/23 20:42:35 jrandom Exp $ + +2004-10-24 jrandom + * Allow explicit inclusion of session tags in the SDK, enabling the + resending of tags bundled with messages that would not otherwise + be ACKed. + * Don't force mode=guaranteed for end to end delivery - if mode=bestEffort + no DeliveryStatusMessage will be bundled (and as such, client apps using + it will need to do their own session tag ack/nack). + * Handle client errors when notifying them of message availability. + * New StreamSinkSend which sends a file to a destination and disconnects. + * Update the I2PSocketManagerFactory to build the specific + I2PSocketManager instance based on the "i2p.streaming.manager" property, + containing the class name of the I2PSocketManager to instantiate. 2004-10-23 jrandom * Minor ministreaming lib refactoring to simplify integration of the full diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 0e741725c..715fdd67e 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.60 $ $Date: 2004/10/18 14:08:00 $"; + public final static String ID = "$Revision: 1.61 $ $Date: 2004/10/23 20:42:35 $"; public final static String VERSION = "0.4.1.3"; - public final static long BUILD = 1; + public final static long BUILD = 2; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID);