diff --git a/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java b/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java index bd0a23d4e..21a0e97d2 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java +++ b/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java @@ -112,12 +112,15 @@ public class SAMHandlerFactory { case 1: handler = new SAMv1Handler(s, verMajor, verMinor, i2cpProps); break; + case 2: + handler = new SAMv2Handler(s, verMajor, verMinor, i2cpProps); + break; default: _log.error("BUG! Trying to initialize the wrong SAM version!"); throw new SAMException("BUG! (in handler instantiation)"); } } catch (IOException e) { - _log.error("Error creating the v1 handler", e); + _log.error("Error creating the handler for version "+verMajor, e); throw new SAMException("IOException caught during SAM handler instantiation"); } return handler; @@ -133,15 +136,16 @@ public class SAMHandlerFactory { || (maxMajor == -1) || (maxMinor == -1)) { return null; } - if (minMajor > maxMajor) { - return null; - } else if ((minMajor == maxMajor) && (minMinor > maxMinor)) { - return null; - } - if ((minMajor >= 1) && (minMinor >= 0)) { - return "1.0"; - } + if ((minMinor >= 10) || (maxMinor >= 10)) return null ; + + float fminVer = (float) minMajor + (float) minMinor / 10 ; + float fmaxVer = (float) maxMajor + (float) maxMinor / 10 ; + + + if ( ( fminVer <= 2.0 ) && ( fmaxVer >= 2.0 ) ) return "2.0" ; + + if ( ( fminVer <= 1.0 ) && ( fmaxVer >= 1.0 ) ) return "1.0" ; return null; } diff --git a/apps/sam/java/src/net/i2p/sam/SAMStreamReceiver.java b/apps/sam/java/src/net/i2p/sam/SAMStreamReceiver.java index 1a0fd4823..3ee48e56c 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMStreamReceiver.java +++ b/apps/sam/java/src/net/i2p/sam/SAMStreamReceiver.java @@ -15,14 +15,31 @@ import net.i2p.data.Destination; /** * Interface for sending streaming data to a SAM client */ + public interface SAMStreamReceiver { + /** + * Sends the result of a stream send operation + */ + public void streamSendAnswer( int id, String result, String bufferState ) throws IOException; + + /** + * Notifies that the outwards buffer is free for writing + */ + public void notifyStreamSendBufferFree( int id ) throws IOException; /** * Notify about a new incoming connection * * @param id New connection id */ - public void notifyStreamConnection(int id, Destination dest) throws IOException; + public void notifyStreamIncomingConnection ( int id, Destination dest ) throws IOException; + + /** + * Notify about a new outgoing connection + * + * @param id New connection id + */ + public void notifyStreamOutgoingConnection(int id, String result, String msg) throws IOException; /** * Send a byte array to a SAM client. diff --git a/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java b/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java index 82373ed06..0c946471d 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java @@ -47,13 +47,13 @@ public class SAMStreamSession { private final static Log _log = new Log(SAMStreamSession.class); - private final static int SOCKET_HANDLER_BUF_SIZE = 32768; + protected final static int SOCKET_HANDLER_BUF_SIZE = 32768; - private SAMStreamReceiver recv = null; + protected SAMStreamReceiver recv = null; private SAMStreamSessionServer server = null; - private I2PSocketManager socketMgr = null; + protected I2PSocketManager socketMgr = null; private Object handlersMapLock = new Object(); /** stream id (Long) to SAMStreamSessionSocketReader */ @@ -65,13 +65,14 @@ public class SAMStreamSession { private int lastNegativeId = 0; // Can we create outgoing connections? - private boolean canCreate = false; + protected boolean canCreate = false; /** * should we flush every time we get a STREAM SEND, or leave that up to * the streaming lib to decide? */ - private boolean forceFlush = false; + protected boolean forceFlush = false; + public static String PROP_FORCE_FLUSH = "sam.forceFlush"; public static String DEFAULT_FORCE_FLUSH = "false"; @@ -189,7 +190,7 @@ public class SAMStreamSession { * @throws InterruptedIOException if the connection timeouts * @throws I2PException if there's another I2P-related error */ - public boolean connect(int id, String dest, Properties props) throws I2PException, ConnectException, NoRouteToHostException, DataFormatException, InterruptedIOException, SAMInvalidDirectionException { + public boolean connect ( int id, String dest, Properties props ) throws I2PException, ConnectException, NoRouteToHostException, DataFormatException, InterruptedIOException, SAMInvalidDirectionException, IOException { if (!canCreate) { _log.debug("Trying to create an outgoing connection using a receive-only session"); throw new SAMInvalidDirectionException("Trying to create connections through a receive-only session"); @@ -208,10 +209,15 @@ public class SAMStreamSession { opts.setConnectTimeout(60 * 1000); _log.debug("Connecting new I2PSocket..."); + + // blocking connection (SAMv1) + I2PSocket i2ps = socketMgr.connect(d, opts); createSocketHandler(i2ps, id); + recv.notifyStreamOutgoingConnection ( id, "OK", null ); + return true; } @@ -277,7 +283,7 @@ public class SAMStreamSession { * * @return An id associated to the socket handler */ - private int createSocketHandler(I2PSocket s, int id) { + protected int createSocketHandler ( I2PSocket s, int id ) { SAMStreamSessionSocketReader reader = null; StreamSender sender = null; if (id == 0) { @@ -285,8 +291,8 @@ public class SAMStreamSession { } try { - reader = new SAMStreamSessionSocketReader(s, id); - sender = new StreamSender(s, id); + reader = newSAMStreamSessionSocketReader(s, id); + sender = newStreamSender(s, id); } catch (IOException e) { _log.error("IOException when creating SAM STREAM session socket handler", e); recv.stopStreamReceiving(); @@ -318,7 +324,7 @@ public class SAMStreamSession { * * @param id Handler id */ - private SAMStreamSessionSocketReader getSocketReader(int id) { + protected SAMStreamSessionSocketReader getSocketReader ( int id ) { synchronized (handlersMapLock) { return (SAMStreamSessionSocketReader)handlersMap.get(new Integer(id)); } @@ -334,7 +340,7 @@ public class SAMStreamSession { * * @param id Handler id */ - private boolean checkSocketHandlerId(int id) { + protected boolean checkSocketHandlerId ( int id ) { synchronized (handlersMapLock) { return (!(handlersMap.get(new Integer(id)) == null)); } @@ -345,7 +351,7 @@ public class SAMStreamSession { * * @param id Handler id to be removed */ - private void removeSocketHandler(int id) { + protected void removeSocketHandler ( int id ) { SAMStreamSessionSocketReader reader = null; StreamSender sender = null; @@ -446,7 +452,8 @@ public class SAMStreamSession { } _log.debug("New connection id: " + id); - recv.notifyStreamConnection(id, i2ps.getPeerDestination()); + + recv.notifyStreamIncomingConnection ( id, i2ps.getPeerDestination() ); } catch (I2PException e) { _log.debug("Caught I2PException", e); break; @@ -469,29 +476,62 @@ public class SAMStreamSession { } + + boolean setReceiveLimit ( int id, long limit, boolean nolimit ) + { + _log.debug ( "Protocol v1 does not support a receive limit for streams" ); + return false ; + } + /** - * SAM STREAM socket handler, running in its own thread. It forwards + * SAM STREAM socket reader, running in its own thread. It forwards * forward data to/from an I2P socket. * * @author human */ public class SAMStreamSessionSocketReader implements Runnable { - private I2PSocket i2pSocket = null; + protected I2PSocket i2pSocket = null; - private Object runningLock = new Object(); - private boolean stillRunning = true; + protected Object runningLock = new Object(); + + protected boolean stillRunning = true; + + protected int id; - private int id; - /** * Create a new SAM STREAM session socket reader * * @param s Socket to be handled * @param id Unique id assigned to the handler */ - public SAMStreamSessionSocketReader(I2PSocket s, int id) throws IOException { - _log.debug("Instantiating new SAM STREAM session socket handler"); + public SAMStreamSessionSocketReader ( I2PSocket s, int id ) throws IOException {} + + /** + * Stop a SAM STREAM session socket reader thread immediately. + */ + public void stopRunning() {} + + public void run() {} + + } + + protected SAMStreamSessionSocketReader + newSAMStreamSessionSocketReader ( I2PSocket s, int id ) throws IOException { + return new SAMv1StreamSessionSocketReader ( s, id ); + } + + public class SAMv1StreamSessionSocketReader extends SAMStreamSessionSocketReader { + /** + * Create a new SAM STREAM session socket reader + * + * @param s Socket to be handled + * @param id Unique id assigned to the handler + */ + + public SAMv1StreamSessionSocketReader ( I2PSocket s, int id ) throws IOException { + super(s, id); + _log.debug("Instantiating new SAM STREAM session socket reader"); i2pSocket = s; this.id = id; @@ -507,6 +547,7 @@ public class SAMStreamSession { if (stillRunning) { stillRunning = false; } + runningLock.notifyAll() ; } } @@ -558,7 +599,40 @@ public class SAMStreamSession { * Lets us push data through the stream without blocking, (even after exceeding * the I2PSocket's buffer) */ - private class StreamSender implements Runnable { + protected class StreamSender implements Runnable { + public StreamSender ( I2PSocket s, int id ) throws IOException {} + + /** + * Send bytes through the SAM STREAM session socket sender + * + * @param data Data to be sent + * + * @throws IOException if the client didnt provide enough data + */ + public void sendBytes ( InputStream in, int size ) throws IOException {} + + + /** + * Stop a SAM STREAM session socket sender thread immediately + * + */ + public void stopRunning() {} + + /** + * Stop a SAM STREAM session socket sender gracefully: stop the + * sender thread once all pending data has been sent. + */ + public void shutDownGracefully() {} + + public void run() {} + } + + protected StreamSender newStreamSender ( I2PSocket s, int id ) throws IOException { + return new v1StreamSender ( s, id ) ; + } + + protected class v1StreamSender extends StreamSender + { private List _data; private int _id; private ByteCache _cache; @@ -567,7 +641,8 @@ public class SAMStreamSession { private Object runningLock = new Object(); private I2PSocket i2pSocket = null; - public StreamSender(I2PSocket s, int id) throws IOException { + public v1StreamSender ( I2PSocket s, int id ) throws IOException { + super ( s, id ); _data = new ArrayList(1); _id = id; _cache = ByteCache.getInstance(4, 32*1024); diff --git a/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java b/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java index dfca50e6d..cd3861f9d 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java @@ -35,6 +35,8 @@ import net.i2p.util.Log; * @author human */ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramReceiver, SAMStreamReceiver { + protected int verMajorId = 1; + protected int verMinorId = 0; private final static Log _log = new Log(SAMv1Handler.class); @@ -42,7 +44,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag private SAMRawSession rawSession = null; private SAMDatagramSession datagramSession = null; - private SAMStreamSession streamSession = null; + protected SAMStreamSession streamSession = null; private long _id; private static volatile long __id = 0; @@ -74,11 +76,15 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag _id = ++__id; _log.debug("SAM version 1 handler instantiated"); - if ((this.verMajor != 1) || (this.verMinor != 0)) { + if ( ! verifVersion() ) { throw new SAMException("BUG! Wrong protocol version!"); } } + public boolean verifVersion() { + return ( verMajor == 1 && verMinor == 0 ) ; + } + public void handle() { String msg = null; String domain = null; @@ -248,7 +254,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag } props.remove("DIRECTION"); - streamSession = new SAMStreamSession(destKeystream, dir,props,this); + streamSession = newSAMStreamSession(destKeystream, dir,props); } else { _log.debug("Unrecognized SESSION STYLE: \"" + style +"\""); return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Unrecognized SESSION STYLE\"\n"); @@ -275,6 +281,13 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag } } + + SAMStreamSession newSAMStreamSession(String destKeystream, String direction, Properties props ) + throws IOException, DataFormatException, SAMException + { + return new SAMStreamSession(destKeystream, direction, props, this) ; + } + /* Parse and execute a DEST message*/ private boolean execDestMessage(String opcode, Properties props) { @@ -489,7 +502,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag } /* Parse and execute a STREAM message */ - private boolean execStreamMessage(String opcode, Properties props) { + protected boolean execStreamMessage(String opcode, Properties props) { if (streamSession == null) { _log.error("STREAM message received, but no STREAM session exists"); return false; @@ -508,7 +521,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag } } - private boolean execStreamSend(Properties props) { + protected boolean execStreamSend(Properties props) { if (props == null) { _log.debug("No parameters specified in STREAM SEND message"); return false; @@ -570,7 +583,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag } } - private boolean execStreamConnect(Properties props) { + protected boolean execStreamConnect(Properties props) { if (props == null) { _log.debug("No parameters specified in STREAM CONNECT message"); return false; @@ -604,39 +617,38 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag props.remove("DESTINATION"); try { - if (!streamSession.connect(id, dest, props)) { - _log.debug("STREAM connection failed"); - return false; + try { + if (!streamSession.connect(id, dest, props)) { + _log.debug("STREAM connection failed"); + return false; + } + } catch (DataFormatException e) { + _log.debug("Invalid destination in STREAM CONNECT message"); + notifyStreamOutgoingConnection ( id, "INVALID_KEY", null ); + } catch (SAMInvalidDirectionException e) { + _log.debug("STREAM CONNECT failed: " + e.getMessage()); + notifyStreamOutgoingConnection ( id, "INVALID_DIRECTION", null ); + } catch (ConnectException e) { + _log.debug("STREAM CONNECT failed: " + e.getMessage()); + notifyStreamOutgoingConnection ( id, "CONNECTION_REFUSED", null ); + } catch (NoRouteToHostException e) { + _log.debug("STREAM CONNECT failed: " + e.getMessage()); + notifyStreamOutgoingConnection ( id, "CANT_REACH_PEER", null ); + } catch (InterruptedIOException e) { + _log.debug("STREAM CONNECT failed: " + e.getMessage()); + notifyStreamOutgoingConnection ( id, "TIMEOUT", null ); + } catch (I2PException e) { + _log.debug("STREAM CONNECT failed: " + e.getMessage()); + notifyStreamOutgoingConnection ( id, "I2P_ERROR", null ); } - return writeString("STREAM STATUS RESULT=OK ID=" + id + "\n"); - } catch (DataFormatException e) { - _log.debug("Invalid destination in STREAM CONNECT message"); - return writeString("STREAM STATUS RESULT=INVALID_KEY ID=" - + id + "\n"); - } catch (SAMInvalidDirectionException e) { - _log.debug("STREAM CONNECT failed: " + e.getMessage()); - return writeString("STREAM STATUS RESULT=INVALID_DIRECTION ID=" - + id + "\n"); - } catch (ConnectException e) { - _log.debug("STREAM CONNECT failed: " + e.getMessage()); - return writeString("STREAM STATUS RESULT=CONNECTION_REFUSED ID=" - + id + "\n"); - } catch (NoRouteToHostException e) { - _log.debug("STREAM CONNECT failed: " + e.getMessage()); - return writeString("STREAM STATUS RESULT=CANT_REACH_PEER ID=" - + id + "\n"); - } catch (InterruptedIOException e) { - _log.debug("STREAM CONNECT failed: " + e.getMessage()); - return writeString("STREAM STATUS RESULT=TIMEOUT ID=" - + id + "\n"); - } catch (I2PException e) { - _log.debug("STREAM CONNECT failed: " + e.getMessage()); - return writeString("STREAM STATUS RESULT=I2P_ERROR ID=" - + id + "\n"); + } catch (IOException e) { + return false ; } + + return true ; } - private boolean execStreamClose(Properties props) { + protected boolean execStreamClose(Properties props) { if (props == null) { _log.debug("No parameters specified in STREAM CLOSE message"); return false; @@ -745,7 +757,41 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag } // SAMStreamReceiver implementation - public void notifyStreamConnection(int id, Destination d) throws IOException { + + public void streamSendAnswer( int id, String result, String bufferState ) throws IOException + { + if ( streamSession == null ) + { + _log.error ( "BUG! Want to answer to stream SEND, but session is null!" ); + throw new NullPointerException ( "BUG! STREAM session is null!" ); + } + + if ( !writeString ( "STREAM SEND ID=" + id + + " RESULT=" + result + + " STATE=" + bufferState + + "\n" ) ) + { + throw new IOException ( "Error notifying connection to SAM client" ); + } + } + + + public void notifyStreamSendBufferFree( int id ) throws IOException + { + if ( streamSession == null ) + { + _log.error ( "BUG! Stream outgoing buffer is free, but session is null!" ); + throw new NullPointerException ( "BUG! STREAM session is null!" ); + } + + if ( !writeString ( "STREAM READY_TO_SEND ID=" + id + "\n" ) ) + { + throw new IOException ( "Error notifying connection to SAM client" ); + } + } + + + public void notifyStreamIncomingConnection(int id, Destination d) throws IOException { if (streamSession == null) { _log.error("BUG! Received stream connection, but session is null!"); throw new NullPointerException("BUG! STREAM session is null!"); @@ -758,6 +804,28 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag } } + public void notifyStreamOutgoingConnection ( int id, String result, String msg ) throws IOException + { + if ( streamSession == null ) + { + _log.error ( "BUG! Received stream connection, but session is null!" ); + throw new NullPointerException ( "BUG! STREAM session is null!" ); + } + + String msgString = "" ; + + if ( msg != null ) msgString = " MESSAGE=\"" + msg + "\""; + + if ( !writeString ( "STREAM STATUS RESULT=" + + result + + " ID=" + id + + msgString + + "\n" ) ) + { + throw new IOException ( "Error notifying connection to SAM client" ); + } + } + public void receiveStreamBytes(int id, byte data[], int len) throws IOException { if (streamSession == null) { _log.error("Received stream bytes, but session is null!"); diff --git a/apps/sam/java/src/net/i2p/sam/SAMv2Handler.java b/apps/sam/java/src/net/i2p/sam/SAMv2Handler.java new file mode 100644 index 000000000..5d52b7fce --- /dev/null +++ b/apps/sam/java/src/net/i2p/sam/SAMv2Handler.java @@ -0,0 +1,196 @@ +package net.i2p.sam; +/* + * free (adj.): unencumbered; not under the control of others + * Written by human in 2004 and released into the public domain + * with no warranty of any kind, either expressed or implied. + * It probably won't make your computer catch on fire, or eat + * your children, but it might. Use at your own risk. + * + */ + +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.net.ConnectException; +import java.net.NoRouteToHostException; +import java.net.Socket; +import java.util.Properties; +import java.util.StringTokenizer; + +import net.i2p.I2PException; +import net.i2p.client.I2PSessionException; +import net.i2p.data.Base64; +import net.i2p.data.DataFormatException; +import net.i2p.data.DataHelper; +import net.i2p.data.Destination; +import net.i2p.util.Log; + +/** + * Class able to handle a SAM version 2 client connection. + * + * @author mkvore + */ + +public class SAMv2Handler extends SAMv1Handler implements SAMRawReceiver, SAMDatagramReceiver, SAMStreamReceiver +{ + + private final static Log _log = new Log ( SAMv2Handler.class ); + + + /** + * Create a new SAM version 2 handler. This constructor expects + * that the SAM HELLO message has been still answered (and + * stripped) from the socket input stream. + * + * @param s Socket attached to a SAM client + * @param verMajor SAM major version to manage (should be 2) + * @param verMinor SAM minor version to manage + */ + public SAMv2Handler ( Socket s, int verMajor, int verMinor ) throws SAMException, IOException + { + this ( s, verMajor, verMinor, new Properties() ); + } + + /** + * Create a new SAM version 2 handler. This constructor expects + * that the SAM HELLO message has been still answered (and + * stripped) from the socket input stream. + * + * @param s Socket attached to a SAM client + * @param verMajor SAM major version to manage (should be 2) + * @param verMinor SAM minor version to manage + * @param i2cpProps properties to configure the I2CP connection (host, port, etc) + */ + + public SAMv2Handler ( Socket s, int verMajor, int verMinor, Properties i2cpProps ) throws SAMException, IOException + { + super ( s, verMajor, verMinor, i2cpProps ); + } + + public boolean verifVersion() + { + return (verMajor == 2 && verMinor == 0) ; + } + + SAMStreamSession newSAMStreamSession(String destKeystream, String direction, Properties props ) + throws IOException, DataFormatException, SAMException + { + return new SAMv2StreamSession(destKeystream, direction, props, this) ; + } + + + /* Parse and execute a STREAM message */ + protected boolean execStreamMessage ( String opcode, Properties props ) + { + if ( streamSession == null ) + { + _log.error ( "STREAM message received, but no STREAM session exists" ); + return false; + } + + if ( opcode.equals ( "SEND" ) ) + { + return execStreamSend ( props ); + } + else if ( opcode.equals ( "CONNECT" ) ) + { + return execStreamConnect ( props ); + } + else if ( opcode.equals ( "CLOSE" ) ) + { + return execStreamClose ( props ); + } + else if ( opcode.equals ( "RECEIVE") ) + { + return execStreamReceive( props ); + } + else + { + _log.debug ( "Unrecognized RAW message opcode: \"" + + opcode + "\"" ); + return false; + } + } + + + + + + private boolean execStreamReceive ( Properties props ) + { + if ( props == null ) + { + _log.debug ( "No parameters specified in STREAM RECEIVE message" ); + return false; + } + + int id; + + { + String strid = props.getProperty ( "ID" ); + + if ( strid == null ) + { + _log.debug ( "ID not specified in STREAM RECEIVE message" ); + return false; + } + + try + { + id = Integer.parseInt ( strid ); + } + catch ( NumberFormatException e ) + { + _log.debug ( "Invalid STREAM RECEIVE ID specified: " + strid ); + return false; + } + } + + boolean nolimit = false; + + long limit = 0; + { + String strsize = props.getProperty ( "LIMIT" ); + + if ( strsize == null ) + { + _log.debug ( "Limit not specified in STREAM RECEIVE message" ); + return false; + } + + if ( strsize.equals( "NONE" ) ) + { + nolimit = true ; + } + else + { + try + { + limit = Long.parseLong ( strsize ); + } + catch ( NumberFormatException e ) + { + _log.debug ( "Invalid STREAM RECEIVE size specified: " + strsize ); + return false; + } + + if ( limit < 0 ) + { + _log.debug ( "Specified limit (" + limit + + ") is out of protocol limits" ); + return false; + } + } + } + + streamSession.setReceiveLimit ( id, limit, nolimit ) ; + + return true; + } + + +} diff --git a/apps/sam/java/src/net/i2p/sam/SAMv2StreamSession.java b/apps/sam/java/src/net/i2p/sam/SAMv2StreamSession.java new file mode 100644 index 000000000..198b79fe2 --- /dev/null +++ b/apps/sam/java/src/net/i2p/sam/SAMv2StreamSession.java @@ -0,0 +1,574 @@ +package net.i2p.sam; +/* + * free (adj.): unencumbered; not under the control of others + * Written by human in 2004 and released into the public domain + * with no warranty of any kind, either expressed or implied. + * It probably won't make your computer catch on fire, or eat + * your children, but it might. Use at your own risk. + * + */ + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.net.ConnectException; +import java.net.NoRouteToHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import net.i2p.I2PException; +import net.i2p.client.I2PClient; +import net.i2p.client.streaming.I2PServerSocket; +import net.i2p.client.streaming.I2PSocket; +import net.i2p.client.streaming.I2PSocketManager; +import net.i2p.client.streaming.I2PSocketManagerFactory; +import net.i2p.client.streaming.I2PSocketOptions; +import net.i2p.data.Base64; +import net.i2p.data.ByteArray; +import net.i2p.data.DataHelper; +import net.i2p.data.DataFormatException; +import net.i2p.data.Destination; +import net.i2p.util.ByteCache; +import net.i2p.util.I2PThread; +import net.i2p.util.Log; + +/** + * SAMv2 STREAM session class. + * + * @author mkvore + */ + +public class SAMv2StreamSession extends SAMStreamSession +{ + + private final static Log _log = new Log ( SAMv2StreamSession.class ); + + /** + * Create a new SAM STREAM session. + * + * @param dest Base64-encoded destination (private key) + * @param dir Session direction ("RECEIVE", "CREATE" or "BOTH") + * @param props Properties to setup the I2P session + * @param recv Object that will receive incoming data + */ + public SAMv2StreamSession ( String dest, String dir, Properties props, + SAMStreamReceiver recv ) throws IOException, DataFormatException, SAMException + { + super ( dest, dir, props, recv ); + } + + /** + * Create a new SAM STREAM session. + * + * @param destStream Input stream containing the destination keys + * @param dir Session direction ("RECEIVE", "CREATE" or "BOTH") + * @param props Properties to setup the I2P session + * @param recv Object that will receive incoming data + */ + public SAMv2StreamSession ( InputStream destStream, String dir, + Properties props, SAMStreamReceiver recv ) throws IOException, DataFormatException, SAMException + { + super ( destStream, dir, props, recv ); + } + + /** + * Connect the SAM STREAM session to the specified Destination + * + * @param id Unique id for the connection + * @param dest Base64-encoded Destination to connect to + * @param props Options to be used for connection + * + * @throws DataFormatException if the destination is not valid + * @throws SAMInvalidDirectionException if trying to connect through a + * receive-only session + * @return true if the communication with the SAM client is ok + */ + + public boolean connect ( int id, String dest, Properties props ) + throws DataFormatException, SAMInvalidDirectionException + { + if ( !canCreate ) + { + _log.debug ( "Trying to create an outgoing connection using a receive-only session" ); + throw new SAMInvalidDirectionException ( "Trying to create connections through a receive-only session" ); + } + + if ( checkSocketHandlerId ( id ) ) + { + _log.debug ( "The specified id (" + id + ") is already in use" ); + return false ; + } + + Destination d = new Destination(); + + d.fromBase64 ( dest ); + + I2PSocketOptions opts = socketMgr.buildOptions ( props ); + + if ( props.getProperty ( I2PSocketOptions.PROP_CONNECT_TIMEOUT ) == null ) + opts.setConnectTimeout ( 60 * 1000 ); + + _log.debug ( "Connecting new I2PSocket..." ); + + + // non-blocking connection (SAMv2) + + StreamConnector connector ; + + connector = new StreamConnector ( id, d, opts ); + + I2PThread connectThread = new I2PThread ( connector, "StreamConnector" + id ) ; + + connectThread.start() ; + + return true ; + } + + + + + /** + * SAM STREAM socket connecter, running in its own thread. + * + * @author mkvore + */ + + public class StreamConnector implements Runnable + { + + private Object runningLock = new Object(); + private boolean stillRunning = true; + + private int id; + private Destination dest ; + private I2PSocketOptions opts ; + + /** + * Create a new SAM STREAM session socket reader + * + * @param id Unique id assigned to the handler + * @param dest Destination to reach + * @param opts Socket options (I2PSocketOptions) + */ + + + public StreamConnector ( int id, Destination dest, I2PSocketOptions opts )// throws IOException + { + _log.debug ( "Instantiating new SAM STREAM connector" ); + + this.id = id ; + this.opts = opts ; + this.dest = dest ; + } + + + public void run() + { + _log.debug ( "run() called for socket connector " + id ); + + try + { + try + { + I2PSocket i2ps = socketMgr.connect ( dest, opts ); + + createSocketHandler ( i2ps, id ); + + recv.notifyStreamOutgoingConnection ( id, "OK", null ); + } + + catch ( DataFormatException e ) + { + _log.debug ( "Invalid destination in STREAM CONNECT message" ); + recv.notifyStreamOutgoingConnection ( id, "INVALID_KEY", e.getMessage() ); + } + catch ( ConnectException e ) + { + _log.debug ( "STREAM CONNECT failed: " + e.getMessage() ); + recv.notifyStreamOutgoingConnection ( id, "CONNECTION_REFUSED", e.getMessage() ); + } + catch ( NoRouteToHostException e ) + { + _log.debug ( "STREAM CONNECT failed: " + e.getMessage() ); + recv.notifyStreamOutgoingConnection ( id, "CANT_REACH_PEER", e.getMessage() ); + } + catch ( InterruptedIOException e ) + { + _log.debug ( "STREAM CONNECT failed: " + e.getMessage() ); + recv.notifyStreamOutgoingConnection ( id, "TIMEOUT", e.getMessage() ); + } + catch ( I2PException e ) + { + _log.debug ( "STREAM CONNECT failed: " + e.getMessage() ); + recv.notifyStreamOutgoingConnection ( id, "I2P_ERROR", e.getMessage() ); + } + } + catch ( IOException e ) + { + _log.debug ( "Error sending disconnection notice for handler " + + id, e ); + } + + _log.debug ( "Shutting down SAM STREAM session connector " + id ); + } + } + + + + /** + * Lets us push data through the stream without blocking, (even after exceeding + * the I2PSocket's buffer) + */ + + protected StreamSender newStreamSender ( I2PSocket s, int id ) throws IOException + { + return new v2StreamSender ( s, id ) ; + } + + protected SAMStreamSessionSocketReader + newSAMStreamSessionSocketReader(I2PSocket s, int id ) throws IOException + { + return new SAMv2StreamSessionSocketReader(s,id); + } + + protected class v2StreamSender extends StreamSender + + { + private List _data; + private int _dataSize; + private int _id; + private ByteCache _cache; + private OutputStream _out = null; + private boolean _stillRunning, _shuttingDownGracefully; + private Object runningLock = new Object(); + private I2PSocket i2pSocket = null; + + public v2StreamSender ( I2PSocket s, int id ) throws IOException + { + super ( s, id ); + _data = new ArrayList ( 1 ); + _dataSize = 0; + _id = id; + _cache = ByteCache.getInstance ( 10, 32 * 1024 ); + _out = s.getOutputStream(); + _stillRunning = true; + _shuttingDownGracefully = false; + i2pSocket = s; + } + + /** + * Send bytes through the SAM STREAM session socket sender + * + * @param data Data to be sent + * + * @throws IOException if the client didnt provide enough data + */ + public void sendBytes ( InputStream in, int size ) throws IOException + { + if ( _log.shouldLog ( Log.DEBUG ) ) + _log.debug ( "Handler " + _id + ": sending " + size + " bytes" ); + + ByteArray ba = _cache.acquire(); + + int read = DataHelper.read ( in, ba.getData(), 0, size ); + + if ( read != size ) + throw new IOException ( "Insufficient data from the SAM client (" + read + "/" + size + ")" ); + + ba.setValid ( read ); + + synchronized ( _data ) + { + if ( _dataSize >= SOCKET_HANDLER_BUF_SIZE ) + { + _cache.release ( ba, false ); + recv.streamSendAnswer ( _id, "FAILED", "BUFFER_FULL" ) ; + } + else + { + _dataSize += size ; + _data.add ( ba ); + _data.notifyAll(); + + if ( _dataSize >= SOCKET_HANDLER_BUF_SIZE ) + { + recv.streamSendAnswer ( _id, "OK", "BUFFER_FULL" ) ; + } + else + { + recv.streamSendAnswer ( _id, "OK", "READY" ); + } + } + } + } + + /** + * Stop a SAM STREAM session socket sender thread immediately + * + */ + public void stopRunning() + { + _log.debug ( "stopRunning() invoked on socket sender " + _id ); + + synchronized ( runningLock ) + { + if ( _stillRunning ) + { + _stillRunning = false; + + try + { + i2pSocket.close(); + } + catch ( IOException e ) + { + _log.debug ( "Caught IOException", e ); + } + + synchronized ( _data ) + { + _data.clear(); + _data.notifyAll(); + } + } + } + } + + /** + * Stop a SAM STREAM session socket sender gracefully: stop the + * sender thread once all pending data has been sent. + */ + public void shutDownGracefully() + { + _log.debug ( "shutDownGracefully() invoked on socket sender " + _id ); + _shuttingDownGracefully = true; + } + + public void run() + { + _log.debug ( "run() called for socket sender " + _id ); + ByteArray data = null; + + while ( _stillRunning ) + { + data = null; + + try + { + synchronized ( _data ) + { + if ( _data.size() > 0 ) + { + int formerSize = _dataSize ; + data = ( ByteArray ) _data.remove ( 0 ); + _dataSize -= data.getValid(); + + if ( ( formerSize >= SOCKET_HANDLER_BUF_SIZE ) && ( _dataSize < SOCKET_HANDLER_BUF_SIZE ) ) + recv.notifyStreamSendBufferFree ( _id ); + } + else if ( _shuttingDownGracefully ) + { + /* No data left and shutting down gracefully? + If so, stop the sender. */ + stopRunning(); + break; + } + else + { + /* Wait for data. */ + _data.wait ( 5000 ); + } + } + + if ( data != null ) + { + try + { + _out.write ( data.getData(), 0, data.getValid() ); + + if ( forceFlush ) + { + // i dont like doing this, but it clears the buffer issues + _out.flush(); + } + } + catch ( IOException ioe ) + { + // ok, the stream failed, but the SAM client didn't + + if ( _log.shouldLog ( Log.WARN ) ) + _log.warn ( "Stream failed", ioe ); + + removeSocketHandler ( _id ); + + stopRunning(); + + } + finally + { + _cache.release ( data, false ); + } + } + } + catch ( InterruptedException ie ) {} + catch ( IOException e ) {}} + + synchronized ( _data ) + { + _data.clear(); + } + } + } + + + + /** + * Send bytes through a SAM STREAM session. + * + * @param data Bytes to be sent + * + * @return True if the data was queued for sending, false otherwise + */ + public boolean setReceiveLimit ( int id, long limit, boolean nolimit ) + { + SAMStreamSessionSocketReader reader = getSocketReader ( id ); + + if ( reader == null ) + { + if ( _log.shouldLog ( Log.WARN ) ) + _log.warn ( "Trying to set a limit to a nonexistent reader " + id ); + + return false; + } + + ( (SAMv2StreamSessionSocketReader) reader).setLimit ( limit, nolimit ); + + return true; + } + + + /** + * SAM STREAM socket reader, running in its own thread. It forwards + * forward data to/from an I2P socket. + * + * @author human + */ + + + + public class SAMv2StreamSessionSocketReader extends SAMv1StreamSessionSocketReader + { + + protected boolean nolimit ; + protected long limit ; + protected long totalReceived ; + + + /** + * Create a new SAM STREAM session socket reader + * + * @param s Socket to be handled + * @param id Unique id assigned to the handler + */ + public SAMv2StreamSessionSocketReader ( I2PSocket s, int id ) throws IOException + { + super ( s, id ); + nolimit = false ; + limit = 0 ; + totalReceived = 0 ; + } + + public void setLimit ( long limit, boolean nolimit ) + { + synchronized (runningLock) + { + this.limit = limit ; + this.nolimit = nolimit ; + runningLock.notify() ; + } + _log.debug ( "new limit set for socket reader " + id + " : " + (nolimit ? "NOLIMIT" : limit + " bytes" ) ); + } + + public void run() + { + _log.debug ( "run() called for socket reader " + id ); + + int read = -1; + byte[] data = new byte[SOCKET_HANDLER_BUF_SIZE]; + + try + { + InputStream in = i2pSocket.getInputStream(); + + while ( stillRunning ) + { + synchronized (runningLock) + { + while ( stillRunning && ( !nolimit && totalReceived >= limit) ) + { + try{ + runningLock.wait() ; + } + catch (InterruptedException ie) + {} + } + if ( !stillRunning ) + break ; + } + + read = in.read ( data ); + + if ( read == -1 ) + { + _log.debug ( "Handler " + id + ": connection closed" ); + break; + } + + totalReceived += read ; + + recv.receiveStreamBytes ( id, data, read ); + } + } + catch ( IOException e ) + { + _log.debug ( "Caught IOException", e ); + } + + try + { + i2pSocket.close(); + } + catch ( IOException e ) + { + _log.debug ( "Caught IOException", e ); + } + + if ( stillRunning ) + { + removeSocketHandler ( id ); + // FIXME: we need error reporting here! + + try + { + recv.notifyStreamDisconnection ( id, "OK", null ); + } + catch ( IOException e ) + { + _log.debug ( "Error sending disconnection notice for handler " + + id, e ); + } + } + + _log.debug ( "Shutting down SAM STREAM session socket handler " + id ); + } + } + + + +} diff --git a/history.txt b/history.txt index e870b83f9..b8694c2ef 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,9 @@ -$Id: history.txt,v 1.598 2007-11-26 16:54:00 zzz Exp $ +$Id: history.txt,v 1.599 2007-12-01 22:13:15 complication Exp $ + +2007-12-02 Complication + * Commit SAM v2 patch from mkvore (thank you!) + * Minor reformatting to preserve consistent whitespace + in old SAM classes (new classes unaltered) 2007-12-01 Complication * Separate the checks "does Jetty .zip file need downloading" diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 806fd5df3..87062b470 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.533 $ $Date: 2007-11-26 16:53:58 $"; + public final static String ID = "$Revision: 1.534 $ $Date: 2007-12-01 22:13:18 $"; public final static String VERSION = "0.6.1.30"; - public final static long BUILD = 5; + public final static long BUILD = 6; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID);