SAM v3.3: Fixes after testing

- Set Master properties in handler, not in session, so they take
- Create subhandlers for the subsessions
- Create socket manager with preferred createDisconectedManager()
  so we get exceptions
- Fix check for master session
- Enhance error messages
- Add basic master session test for SAMStreamSend
- Add check for DESTINATION in SESSION ADD
- Don't return DESTINATION in an I2P_ERROR response
Next to do: master support in SAMStreamSink
This commit is contained in:
zzz
2016-02-06 00:21:37 +00:00
parent 270bc24b62
commit 62ad7996f1
6 changed files with 164 additions and 47 deletions

View File

@@ -56,10 +56,6 @@ class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, S
public MasterSession(String nick, SAMv3DatagramServer dgServer, SAMv3Handler handler, Properties props)
throws IOException, DataFormatException, SAMException {
super(nick);
props.setProperty("net.i2p.streaming.enforceProtocol", "true");
props.setProperty("i2cp.dontPublishLeaseSet", "false");
props.setProperty("FROM_PORT", Integer.toString(I2PSession.PORT_UNSPECIFIED));
props.setProperty("TO_PORT", Integer.toString(I2PSession.PORT_UNSPECIFIED));
dgs = dgServer;
sessions = new ConcurrentHashMap<String, SAMMessageSess>(4);
this.handler = handler;
@@ -84,6 +80,8 @@ class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, S
* @return null for success, or error message
*/
public synchronized String add(String nick, String style, Properties props) {
if (props.containsKey("DESTINATION"))
return "SESSION ADD may not contain DESTINATION";
SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
if (rec != null || sessions.containsKey(nick))
return "Duplicate ID " + nick;
@@ -103,9 +101,11 @@ class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, S
}
int listenProtocol;
SAMMessageSess sess;
// temp
SAMv3Handler subhandler;
try {
I2PSession isess = socketMgr.getSession();
subhandler = new SAMv3Handler(handler.getClientSocket(), handler.verMajor,
handler.verMinor, handler.getBridge());
if (style.equals("RAW")) {
if (!props.containsKey("PORT"))
return "RAW subsession must specify PORT";
@@ -124,21 +124,32 @@ class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, S
return "Bad LISTEN_PROTOCOL " + spr;
}
}
sess = new SAMv3RawSession(nick, props, handler, isess, listenProtocol, listenPort, dgs);
SAMv3RawSession ssess = new SAMv3RawSession(nick, props, handler, isess, listenProtocol, listenPort, dgs);
subhandler.setSession(ssess);
sess = ssess;
} else if (style.equals("DATAGRAM")) {
if (!props.containsKey("PORT"))
return "DATAGRAM subsession must specify PORT";
listenProtocol = I2PSession.PROTO_DATAGRAM;
sess = new SAMv3DatagramSession(nick, props, handler, isess, listenPort, dgs);
SAMv3DatagramSession ssess = new SAMv3DatagramSession(nick, props, handler, isess, listenPort, dgs);
subhandler.setSession(ssess);
sess = ssess;
} else if (style.equals("STREAM")) {
listenProtocol = I2PSession.PROTO_STREAMING;
// FIXME need something that hangs off an existing dest
sess = new SAMv3StreamSession(nick, props, handler, socketMgr, listenPort);
SAMv3StreamSession ssess = new SAMv3StreamSession(nick, props, handler, socketMgr, listenPort);
subhandler.setSession(ssess);
sess = ssess;
} else {
return "Unrecognized SESSION STYLE " + style;
}
} catch (Exception e) {
// temp
} catch (IOException e) {
return e.toString();
} catch (DataFormatException e) {
return e.toString();
} catch (SAMException e) {
return e.toString();
} catch (I2PSessionException e) {
return e.toString();
}
@@ -148,8 +159,7 @@ class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, S
return "Duplicate protocol " + listenProtocol + " and port " + listenPort;
}
// add to session db and our map
rec = new SessionRecord(getDestination().toBase64(), props, handler);
rec = new SessionRecord(getDestination().toBase64(), props, subhandler);
try {
if (!SAMv3Handler.sSessionsHash.put(nick, rec))
return "Duplicate ID " + nick;

View File

@@ -29,6 +29,7 @@ import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.client.I2PClient;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.client.streaming.I2PServerSocket;
import net.i2p.client.streaming.I2PSocket;
import net.i2p.client.streaming.I2PSocketManager;
@@ -51,13 +52,9 @@ import net.i2p.util.Log;
class SAMStreamSession implements SAMMessageSess {
protected final Log _log;
protected final static int SOCKET_HANDLER_BUF_SIZE = 32768;
protected final SAMStreamReceiver recv;
protected final SAMStreamSessionServer server;
protected final I2PSocketManager socketMgr;
/** stream id (Long) to SAMStreamSessionSocketReader */
@@ -163,12 +160,13 @@ class SAMStreamSession implements SAMMessageSess {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Creating I2PSocketManager...");
socketMgr = I2PSocketManagerFactory.createManager(destStream,
i2cpHost,
i2cpPort,
allprops);
if (socketMgr == null) {
throw new SAMException("Error creating I2PSocketManager");
try {
// we do it this way so we get exceptions
socketMgr = I2PSocketManagerFactory.createDisconnectedManager(destStream,
i2cpHost, i2cpPort, allprops);
socketMgr.getSession().connect();
} catch (I2PSessionException ise) {
throw new SAMException("Error creating I2PSocketManager: " + ise.getMessage(), ise);
}
socketMgr.addDisconnectListener(new DisconnectListener());

View File

@@ -133,7 +133,31 @@ class SAMv3Handler extends SAMv1Handler
Session getSession() {
return session;
}
/**
* For subsessions created by MasterSession
* @since 0.9.25
*/
void setSession(SAMv3RawSession sess) {
rawSession = sess; session = sess;
}
/**
* For subsessions created by MasterSession
* @since 0.9.25
*/
void setSession(SAMv3DatagramSession sess) {
datagramSession = sess; session = sess;
}
/**
* For subsessions created by MasterSession
* @since 0.9.25
*/
void setSession(SAMv3StreamSession sess) {
streamSession = sess; session = sess;
}
@Override
public void handle() {
String msg = null;
@@ -435,7 +459,16 @@ class SAMv3Handler extends SAMv1Handler
Properties allProps = new Properties();
allProps.putAll(i2cpProps);
allProps.putAll(props);
if (style.equals("MASTER")) {
// We must put these here, as SessionRecord.getProps() makes a copy,
// and the socket manager is instantiated in the
// SAMStreamSession constructor.
allProps.setProperty("i2p.streaming.enforceProtocol", "true");
allProps.setProperty("i2cp.dontPublishLeaseSet", "false");
allProps.setProperty("FROM_PORT", Integer.toString(I2PSession.PORT_UNSPECIFIED));
allProps.setProperty("TO_PORT", Integer.toString(I2PSession.PORT_UNSPECIFIED));
}
try {
sSessionsHash.put( nick, new SessionRecord(dest, allProps, this) ) ;
@@ -486,7 +519,7 @@ class SAMv3Handler extends SAMv1Handler
} else if (opcode.equals("ADD") || opcode.equals("REMOVE")) {
// prevent trouble in finally block
ok = true;
if (streamSession != null || datagramSession != null || rawSession != null)
if (streamSession == null || datagramSession == null || rawSession == null)
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Not a MASTER session\"\n");
MasterSession msess = (MasterSession) session;
String msg;
@@ -512,14 +545,14 @@ class SAMv3Handler extends SAMv1Handler
} catch (I2PSessionException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("I2P error when instantiating session", e);
return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"" + e.getMessage() + "\"\n");
} catch (SAMException e) {
if (_log.shouldLog(Log.INFO))
_log.info("Funny SAM error", e);
return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"" + e.getMessage() + "\"\n");
} catch (IOException e) {
_log.error("Unexpected IOException", e);
return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"" + e.getMessage() + "\"\n");
} finally {
// unregister the session if it has not been created
if ( !ok && nick!=null ) {
@@ -566,23 +599,21 @@ class SAMv3Handler extends SAMv1Handler
}
rec = sSessionsHash.get(nick);
if ( rec==null ) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("STREAM SESSION ID does not exist");
try {
notifyStreamResult(true, "INVALID_ID", "STREAM SESSION ID does not exist");
notifyStreamResult(true, "INVALID_ID", "STREAM SESSION ID " + nick + " does not exist");
} catch (IOException e) {}
return false ;
}
streamSession = rec.getHandler().streamSession ;
if (streamSession==null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("specified ID is not a stream session");
try {
notifyStreamResult(true, "I2P_ERROR", "specified ID is not a STREAM session");
notifyStreamResult(true, "I2P_ERROR", "specified ID " + nick + " is not a STREAM session");
} catch (IOException e) {}
return false ;
}
@@ -638,19 +669,19 @@ class SAMv3Handler extends SAMv1Handler
} catch (DataFormatException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Invalid destination in STREAM CONNECT message");
notifyStreamResult ( verbose, "INVALID_KEY", null );
notifyStreamResult ( verbose, "INVALID_KEY", e.getMessage());
} catch (ConnectException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("STREAM CONNECT failed", e);
notifyStreamResult ( verbose, "CONNECTION_REFUSED", null );
notifyStreamResult ( verbose, "CONNECTION_REFUSED", e.getMessage());
} catch (NoRouteToHostException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("STREAM CONNECT failed", e);
notifyStreamResult ( verbose, "CANT_REACH_PEER", null );
notifyStreamResult ( verbose, "CANT_REACH_PEER", e.getMessage());
} catch (InterruptedIOException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("STREAM CONNECT failed", e);
notifyStreamResult ( verbose, "TIMEOUT", null );
notifyStreamResult ( verbose, "TIMEOUT", e.getMessage());
} catch (I2PException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("STREAM CONNECT failed", e);
@@ -702,7 +733,7 @@ class SAMv3Handler extends SAMv1Handler
} catch (SAMException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("STREAM ACCEPT failed", e);
notifyStreamResult ( verbose, "ALREADY_ACCEPTING", null );
notifyStreamResult ( verbose, "ALREADY_ACCEPTING", e.getMessage());
}
} catch (IOException e) {
}
@@ -710,6 +741,11 @@ class SAMv3Handler extends SAMv1Handler
}
/**
* @param verbose if false, does nothing
* @param result non-null
* @param message may be null
*/
public void notifyStreamResult(boolean verbose, String result, String message) throws IOException {
if (!verbose) return ;
String msgString = createMessageString(message);

View File

@@ -42,6 +42,10 @@ class SessionRecord {
return m_dest;
}
/**
* Warning - returns a copy.
* @return a copy
*/
synchronized public Properties getProps()
{
Properties p = new Properties();

View File

@@ -18,6 +18,7 @@ public class SAMEventHandler extends SAMClientEventListenerImpl {
private String _version;
private final Object _helloLock = new Object();
private Boolean _sessionCreateOk;
private Boolean _sessionAddOk;
private Boolean _streamStatusOk;
private final Object _sessionCreateLock = new Object();
private final Object _namingReplyLock = new Object();
@@ -41,13 +42,19 @@ public class SAMEventHandler extends SAMClientEventListenerImpl {
}
}
/** may be called twice, first for CREATE and second for ADD */
@Override
public void sessionStatusReceived(String result, String destination, String msg) {
synchronized (_sessionCreateLock) {
Boolean ok;
if (SAMReader.SAMClientEventListener.SESSION_STATUS_OK.equals(result))
_sessionCreateOk = Boolean.TRUE;
ok = Boolean.TRUE;
else
_sessionCreateOk = Boolean.FALSE;
ok = Boolean.FALSE;
if (_sessionCreateOk == null)
_sessionCreateOk = ok;
else if (_sessionAddOk == null)
_sessionAddOk = ok;
_sessionCreateLock.notifyAll();
}
}
@@ -120,6 +127,25 @@ public class SAMEventHandler extends SAMClientEventListenerImpl {
}
}
/**
* Wait for the session to be added, returning true if everything went ok
*
* @return true if everything ok
* @since 0.9.25
*/
public boolean waitForSessionAddReply() {
while (true) {
try {
synchronized (_sessionCreateLock) {
if (_sessionAddOk == null)
_sessionCreateLock.wait();
else
return _sessionAddOk.booleanValue();
}
} catch (InterruptedException ie) { return false; }
}
}
/**
* Wait for the stream to be created, returning true if everything went ok
*

View File

@@ -55,14 +55,17 @@ public class SAMStreamSend {
private static I2PSSLSocketFactory _sslSocketFactory;
private static final int STREAM=0, DG=1, V1DG=2, RAW=3, V1RAW=4;
private static final String USAGE = "Usage: SAMStreamSend [-s] [-m mode] [-v version] [-b samHost] [-p samPort] [-o opt=val] [-u user] [-w password] peerDestFile dataDir\n" +
private static final int MASTER=8;
private static final String USAGE = "Usage: SAMStreamSend [-s] [-x] [-m mode] [-v version] [-b samHost] [-p samPort] [-o opt=val] [-u user] [-w password] peerDestFile dataDir\n" +
" modes: stream: 0; datagram: 1; v1datagram: 2; raw: 3; v1raw: 4\n" +
" -s: use SSL\n" +
" -x: use master session (forces -v 3.3)\n" +
" multiple -o session options are allowed";
public static void main(String args[]) {
Getopt g = new Getopt("SAM", args, "sb:m:o:p:u:v:w:");
Getopt g = new Getopt("SAM", args, "sxb:m:o:p:u:v:w:");
boolean isSSL = false;
boolean isMaster = false;
int mode = STREAM;
String version = "1.0";
String host = "127.0.0.1";
@@ -77,6 +80,10 @@ public class SAMStreamSend {
isSSL = true;
break;
case 'x':
isMaster = true;
break;
case 'm':
mode = Integer.parseInt(g.getOptarg());
if (mode < 0 || mode > V1RAW) {
@@ -123,6 +130,10 @@ public class SAMStreamSend {
System.err.println(USAGE);
return;
}
if (isMaster) {
mode += MASTER;
version = "3.3";
}
if ((user == null && password != null) ||
(user != null && password == null)) {
System.err.println("both user and password or neither");
@@ -162,6 +173,8 @@ public class SAMStreamSend {
_log.debug("Reader created");
OutputStream out = sock.getOutputStream();
String ourDest = handshake(out, version, true, eventHandler, mode, user, password, sessionOpts);
if (mode >= MASTER)
mode -= MASTER;
if (ourDest == null)
throw new IOException("handshake failed");
if (_log.shouldLog(Log.DEBUG))
@@ -230,7 +243,10 @@ public class SAMStreamSend {
return sock;
}
/** @return our b64 dest or null */
/**
* @param isMaster is this the control socket
* @return our b64 dest or null
*/
private String handshake(OutputStream samOut, String version, boolean isMaster,
SAMEventHandler eventHandler, int mode, String user, String password,
String opts) {
@@ -261,6 +277,16 @@ public class SAMStreamSend {
_v3ID = "xx€€xx" + _v3ID;
_conOptions = "ID=" + _v3ID;
}
boolean masterMode; // are we using v3.3 master session
String command;
if (mode >= MASTER) {
masterMode = true;
command = "ADD";
mode -= MASTER;
} else {
masterMode = false;
command = "CREATE DESTINATION=TRANSIENT";
}
String style;
if (mode == STREAM)
style = "STREAM";
@@ -268,16 +294,33 @@ public class SAMStreamSend {
style = "DATAGRAM";
else
style = "RAW";
String req = "SESSION CREATE STYLE=" + style + " DESTINATION=TRANSIENT " + _conOptions + ' ' + opts + '\n';
if (masterMode) {
String req = "SESSION CREATE DESTINATION=TRANSIENT STYLE=MASTER ID=master " + opts + '\n';
samOut.write(req.getBytes("UTF-8"));
samOut.flush();
if (_log.shouldLog(Log.DEBUG))
_log.debug("SESSION CREATE STYLE=MASTER sent");
boolean ok = eventHandler.waitForSessionCreateReply();
if (!ok)
throw new IOException("SESSION CREATE STYLE=MASTER failed");
if (_log.shouldLog(Log.DEBUG))
_log.debug("SESSION CREATE STYLE=MASTER reply found: " + ok);
}
String req = "SESSION " + command + " STYLE=" + style + ' ' + _conOptions + ' ' + opts + '\n';
samOut.write(req.getBytes("UTF-8"));
samOut.flush();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Session create sent");
boolean ok = eventHandler.waitForSessionCreateReply();
_log.debug("SESSION " + command + " sent");
boolean ok;
if (masterMode)
ok = eventHandler.waitForSessionAddReply();
else
ok = eventHandler.waitForSessionCreateReply();
if (!ok)
throw new IOException("Session create failed");
throw new IOException("SESSION " + command + " failed");
if (_log.shouldLog(Log.DEBUG))
_log.debug("Session create reply found: " + ok);
_log.debug("SESSION " + command + " reply found: " + ok);
req = "NAMING LOOKUP NAME=ME\n";
samOut.write(req.getBytes("UTF-8"));