* Added support for SESSION CREATE STYLE=STREAM DIRECTION={CREATE|RECEIVE|BOTH}

* M-x untabify
(human)
This commit is contained in:
human
2004-04-16 03:42:05 +00:00
committed by zzz
parent 031338d84d
commit c0bb3da22f
4 changed files with 911 additions and 868 deletions

View File

@@ -43,8 +43,8 @@ public abstract class SAMHandler implements Runnable {
* *
*/ */
public void startHandling() { public void startHandling() {
thread = new I2PThread(this, "SAMHandler"); thread = new I2PThread(this, "SAMHandler");
thread.start(); thread.start();
} }
/** /**
@@ -61,13 +61,13 @@ public abstract class SAMHandler implements Runnable {
* @param data A byte array to be written * @param data A byte array to be written
*/ */
protected void writeBytes(byte[] data) throws IOException { protected void writeBytes(byte[] data) throws IOException {
synchronized (socketWLock) { synchronized (socketWLock) {
if (socketOS == null) { if (socketOS == null) {
socketOS = socket.getOutputStream(); socketOS = socket.getOutputStream();
} }
socketOS.write(data); socketOS.write(data);
socketOS.flush(); socketOS.flush();
} }
} }
/** /**
@@ -80,14 +80,14 @@ public abstract class SAMHandler implements Runnable {
* @return True is the string was successfully written, false otherwise * @return True is the string was successfully written, false otherwise
*/ */
protected boolean writeString(String str) { protected boolean writeString(String str) {
try { try {
writeBytes(str.getBytes("ISO-8859-1")); writeBytes(str.getBytes("ISO-8859-1"));
} catch (IOException e) { } catch (IOException e) {
_log.debug("Caught IOException", e); _log.debug("Caught IOException", e);
return false; return false;
} }
return true; return true;
} }
/** /**
@@ -95,9 +95,9 @@ public abstract class SAMHandler implements Runnable {
* *
*/ */
public void stopHandling() { public void stopHandling() {
synchronized (stopLock) { synchronized (stopLock) {
stopHandler = true; stopHandler = true;
} }
} }
/** /**
@@ -106,9 +106,9 @@ public abstract class SAMHandler implements Runnable {
* @return True if the handler should be stopped, false otherwise * @return True if the handler should be stopped, false otherwise
*/ */
protected boolean shouldStop() { protected boolean shouldStop() {
synchronized (stopLock) { synchronized (stopLock) {
return stopHandler; return stopHandler;
} }
} }
/** /**
@@ -119,6 +119,6 @@ public abstract class SAMHandler implements Runnable {
public abstract String toString(); public abstract String toString();
public final void run() { public final void run() {
handle(); handle();
} }
} }

View File

@@ -35,145 +35,145 @@ public class SAMHandlerFactory {
* @return A SAM protocol handler * @return A SAM protocol handler
*/ */
public static SAMHandler createSAMHandler(Socket s) throws SAMException { public static SAMHandler createSAMHandler(Socket s) throws SAMException {
BufferedReader br; BufferedReader br;
StringTokenizer tok; StringTokenizer tok;
try { try {
br = new BufferedReader(new InputStreamReader(s.getInputStream(), br = new BufferedReader(new InputStreamReader(s.getInputStream(),
"ISO-8859-1")); "ISO-8859-1"));
tok = new StringTokenizer(br.readLine(), " "); tok = new StringTokenizer(br.readLine(), " ");
} catch (IOException e) { } catch (IOException e) {
throw new SAMException("Error reading from socket: " throw new SAMException("Error reading from socket: "
+ e.getMessage()); + e.getMessage());
} catch (Exception e) { } catch (Exception e) {
throw new SAMException("Unexpected error: " throw new SAMException("Unexpected error: "
+ e.getMessage()); + e.getMessage());
} }
// Message format: HELLO VERSION MIN=v1 MAX=v2 // Message format: HELLO VERSION MIN=v1 MAX=v2
if (tok.countTokens() != 4) { if (tok.countTokens() != 4) {
throw new SAMException("Bad format in HELLO message"); throw new SAMException("Bad format in HELLO message");
} }
if (!tok.nextToken().equals("HELLO")) { if (!tok.nextToken().equals("HELLO")) {
throw new SAMException("Bad domain in HELLO message"); throw new SAMException("Bad domain in HELLO message");
} }
{ {
String opcode; String opcode;
if (!(opcode = tok.nextToken()).equals("VERSION")) { if (!(opcode = tok.nextToken()).equals("VERSION")) {
throw new SAMException("Unrecognized HELLO message opcode: \"" throw new SAMException("Unrecognized HELLO message opcode: \""
+ opcode + "\""); + opcode + "\"");
} }
} }
Properties props; Properties props;
props = SAMUtils.parseParams(tok); props = SAMUtils.parseParams(tok);
if (props == null) { if (props == null) {
throw new SAMException("No parameters in HELLO VERSION message"); throw new SAMException("No parameters in HELLO VERSION message");
} }
String minVer = props.getProperty("MIN"); String minVer = props.getProperty("MIN");
if (minVer == null) { if (minVer == null) {
throw new SAMException("Missing MIN parameter in HELLO VERSION message"); throw new SAMException("Missing MIN parameter in HELLO VERSION message");
} }
String maxVer = props.getProperty("MAX"); String maxVer = props.getProperty("MAX");
if (maxVer == null) { if (maxVer == null) {
throw new SAMException("Missing MAX parameter in HELLO VERSION message"); throw new SAMException("Missing MAX parameter in HELLO VERSION message");
} }
String ver = chooseBestVersion(minVer, maxVer); String ver = chooseBestVersion(minVer, maxVer);
if (ver == null) { if (ver == null) {
// Let's answer negatively // Let's answer negatively
try { try {
OutputStream out = s.getOutputStream(); OutputStream out = s.getOutputStream();
out.write("HELLO REPLY RESULT=NOVERSION\n".getBytes("ISO-8859-1")); out.write("HELLO REPLY RESULT=NOVERSION\n".getBytes("ISO-8859-1"));
return null; return null;
} catch (UnsupportedEncodingException e) { } catch (UnsupportedEncodingException e) {
_log.error("Caught UnsupportedEncodingException (" _log.error("Caught UnsupportedEncodingException ("
+ e.getMessage() + ")"); + e.getMessage() + ")");
throw new SAMException("Character encoding error: " throw new SAMException("Character encoding error: "
+ e.getMessage()); + e.getMessage());
} catch (IOException e) { } catch (IOException e) {
throw new SAMException("Error reading from socket: " throw new SAMException("Error reading from socket: "
+ e.getMessage()); + e.getMessage());
} }
} }
// Let's answer positively // Let's answer positively
try { try {
OutputStream out = s.getOutputStream(); OutputStream out = s.getOutputStream();
out.write(("HELLO REPLY RESULT=OK VERSION=" out.write(("HELLO REPLY RESULT=OK VERSION="
+ ver + "\n").getBytes("ISO-8859-1")); + ver + "\n").getBytes("ISO-8859-1"));
} catch (UnsupportedEncodingException e) { } catch (UnsupportedEncodingException e) {
_log.error("Caught UnsupportedEncodingException (" _log.error("Caught UnsupportedEncodingException ("
+ e.getMessage() + ")"); + e.getMessage() + ")");
throw new SAMException("Character encoding error: " throw new SAMException("Character encoding error: "
+ e.getMessage()); + e.getMessage());
} catch (IOException e) { } catch (IOException e) {
throw new SAMException("Error writing to socket: " throw new SAMException("Error writing to socket: "
+ e.getMessage()); + e.getMessage());
} }
// ...and instantiate the right SAM handler // ...and instantiate the right SAM handler
int verMajor = getMajor(ver); int verMajor = getMajor(ver);
int verMinor = getMinor(ver); int verMinor = getMinor(ver);
SAMHandler handler; SAMHandler handler;
switch (verMajor) { switch (verMajor) {
case 1: case 1:
handler = new SAMv1Handler(s, verMajor, verMinor); handler = new SAMv1Handler(s, verMajor, verMinor);
break; break;
default: default:
_log.error("BUG! Trying to initialize the wrong SAM version!"); _log.error("BUG! Trying to initialize the wrong SAM version!");
throw new SAMException("BUG triggered! (handler instantiation)"); throw new SAMException("BUG triggered! (handler instantiation)");
} }
return handler; return handler;
} }
/* Return the best version we can use, or null on failure */ /* Return the best version we can use, or null on failure */
private static String chooseBestVersion(String minVer, String maxVer) { private static String chooseBestVersion(String minVer, String maxVer) {
int minMajor = getMajor(minVer), minMinor = getMinor(minVer); int minMajor = getMajor(minVer), minMinor = getMinor(minVer);
int maxMajor = getMajor(maxVer), maxMinor = getMinor(maxVer); int maxMajor = getMajor(maxVer), maxMinor = getMinor(maxVer);
// Consistency checks // Consistency checks
if ((minMajor == -1) || (minMinor == -1) if ((minMajor == -1) || (minMinor == -1)
|| (maxMajor == -1) || (maxMinor == -1)) { || (maxMajor == -1) || (maxMinor == -1)) {
return null; return null;
} }
if (minMajor > maxMajor) { if (minMajor > maxMajor) {
return null; return null;
} else if ((minMajor == maxMajor) && (minMinor > maxMinor)) { } else if ((minMajor == maxMajor) && (minMinor > maxMinor)) {
return null; return null;
} }
if ((minMajor >= 1) && (minMinor >= 0)) { if ((minMajor >= 1) && (minMinor >= 0)) {
return "1.0"; return "1.0";
} }
return null; return null;
} }
/* Get the major protocol version from a string */ /* Get the major protocol version from a string */
private static int getMajor(String ver) { private static int getMajor(String ver) {
try { try {
String major = ver.substring(0, ver.indexOf(".")); String major = ver.substring(0, ver.indexOf("."));
return Integer.parseInt(major); return Integer.parseInt(major);
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
return -1; return -1;
} catch (ArrayIndexOutOfBoundsException e) { } catch (ArrayIndexOutOfBoundsException e) {
return -1; return -1;
} }
} }
/* Get the minor protocol version from a string */ /* Get the minor protocol version from a string */
private static int getMinor(String ver) { private static int getMinor(String ver) {
try { try {
String major = ver.substring(ver.indexOf(".") + 1); String major = ver.substring(ver.indexOf(".") + 1);
return Integer.parseInt(major); return Integer.parseInt(major);
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
return -1; return -1;
} catch (ArrayIndexOutOfBoundsException e) { } catch (ArrayIndexOutOfBoundsException e) {
return -1; return -1;
} }
} }
} }

View File

@@ -54,57 +54,78 @@ public class SAMStreamSession {
private Object idLock = new Object(); private Object idLock = new Object();
private int lastNegativeId = 0; private int lastNegativeId = 0;
// Can we create outgoing connections?
private boolean canCreate = false;
/** /**
* Create a new SAM STREAM session. * Create a new SAM STREAM session.
* *
* @param dest Base64-encoded destination (private key) * @param dest Base64-encoded destination (private key)
* @param dir Session direction ("RECEIVE", "CREATE" or "BOTH")
* @param props Properties to setup the I2P session * @param props Properties to setup the I2P session
* @param recv Object that will receive incoming data * @param recv Object that will receive incoming data
*/ */
public SAMStreamSession(String dest, Properties props, public SAMStreamSession(String dest, String dir, Properties props,
SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException { SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException {
ByteArrayInputStream bais; ByteArrayInputStream bais;
bais = new ByteArrayInputStream(Base64.decode(dest)); bais = new ByteArrayInputStream(Base64.decode(dest));
initSAMStreamSession(bais, props, recv); initSAMStreamSession(bais, dir, props, recv);
} }
/** /**
* Create a new SAM STREAM session. * Create a new SAM STREAM session.
* *
* @param destStream Input stream containing the destination keys * @param destStream Input stream containing the destination keys
* @param dir Session direction ("RECEIVE", "CREATE" or "BOTH")
* @param props Properties to setup the I2P session * @param props Properties to setup the I2P session
* @param recv Object that will receive incoming data * @param recv Object that will receive incoming data
*/ */
public SAMStreamSession(InputStream destStream, Properties props, public SAMStreamSession(InputStream destStream, String dir,
SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException { Properties props, SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException {
initSAMStreamSession(destStream, props, recv); initSAMStreamSession(destStream, dir, props, recv);
} }
private void initSAMStreamSession(InputStream destStream, Properties props, private void initSAMStreamSession(InputStream destStream, String dir,
SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException{ Properties props, SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException{
this.recv = recv; this.recv = recv;
_log.debug("SAM STREAM session instantiated"); _log.debug("SAM STREAM session instantiated");
Properties allprops = new Properties(); Properties allprops = new Properties();
allprops.putAll(System.getProperties()); allprops.putAll(System.getProperties());
allprops.putAll(props); allprops.putAll(props);
// FIXME: we should setup I2CP host and port, too // FIXME: we should setup I2CP host and port, too
_log.debug("Creating I2PSocketManager..."); _log.debug("Creating I2PSocketManager...");
socketMgr = I2PSocketManagerFactory.createManager(destStream, socketMgr = I2PSocketManagerFactory.createManager(destStream,
"127.0.0.1", "127.0.0.1",
7654, allprops); 7654, allprops);
if (socketMgr == null) { if (socketMgr == null) {
throw new SAMException("Error creating I2PSocketManager"); throw new SAMException("Error creating I2PSocketManager");
} }
server = new SAMStreamSessionServer(); boolean canReceive = false;
Thread t = new I2PThread(server, "SAMStreamSessionServer"); if (dir.equals("BOTH")) {
canCreate = true;
canReceive = true;
} else if (dir.equals("CREATE")) {
canCreate = true;
} else if (dir.equals("RECEIVE")) {
canReceive = true;
} else {
_log.error("BUG! Wrong direction passed to SAMStreamSession: "
+ dir);
throw new SAMException("BUG! Wrong direction specified!");
}
t.start(); if (canReceive) {
server = new SAMStreamSessionServer();
Thread t = new I2PThread(server, "SAMStreamSessionServer");
t.start();
}
} }
/** /**
@@ -113,7 +134,7 @@ public class SAMStreamSession {
* @return The SAM STREAM session Destination. * @return The SAM STREAM session Destination.
*/ */
public Destination getDestination() { public Destination getDestination() {
return socketMgr.getSession().getMyDestination(); return socketMgr.getSession().getMyDestination();
} }
/** /**
@@ -123,25 +144,30 @@ public class SAMStreamSession {
* @param dest Base64-encoded Destination to connect to * @param dest Base64-encoded Destination to connect to
* @param props Options to be used for connection * @param props Options to be used for connection
*/ */
public boolean connect(int id, String dest, Properties props) throws I2PException, DataFormatException { public boolean connect(int id, String dest, Properties props) throws I2PException, DataFormatException, SAMInvalidDirectionException {
if (checkSocketHandlerId(id)) { if (!canCreate) {
_log.debug("The specified id (" + id + ") is already in use"); _log.debug("Trying to create an outgoing connection using a receive-only session");
return false; throw new SAMInvalidDirectionException("Trying to create connections through a receive-only session");
} }
Destination d = new Destination(); if (checkSocketHandlerId(id)) {
d.fromBase64(dest); _log.debug("The specified id (" + id + ") is already in use");
return false;
}
// FIXME: we should config I2PSocketOptions here Destination d = new Destination();
d.fromBase64(dest);
// FIXME: we should config I2PSocketOptions here
I2PSocketOptions opts = new I2PSocketOptions(); I2PSocketOptions opts = new I2PSocketOptions();
opts.setConnectTimeout(60 * 1000); opts.setConnectTimeout(60 * 1000);
_log.debug("Connecting new I2PSocket..."); _log.debug("Connecting new I2PSocket...");
I2PSocket i2ps = socketMgr.connect(d, opts); I2PSocket i2ps = socketMgr.connect(d, opts);
createSocketHandler(i2ps, id); createSocketHandler(i2ps, id);
return true; return true;
} }
/** /**
@@ -152,15 +178,15 @@ public class SAMStreamSession {
* @return True if the data was sent, false otherwise * @return True if the data was sent, false otherwise
*/ */
public boolean sendBytes(int id, byte[] data) { public boolean sendBytes(int id, byte[] data) {
Destination d = new Destination(); Destination d = new Destination();
SAMStreamSessionSocketHandler handler = getSocketHandler(id); SAMStreamSessionSocketHandler handler = getSocketHandler(id);
if (handler == null) { if (handler == null) {
_log.error("Trying to send bytes through inexistent handler " +id); _log.error("Trying to send bytes through inexistent handler " +id);
return false; return false;
} }
return handler.sendBytes(data); return handler.sendBytes(data);
} }
/** /**
@@ -168,9 +194,11 @@ public class SAMStreamSession {
* *
*/ */
public void close() { public void close() {
server.stopRunning(); if (server != null) {
removeAllSocketHandlers(); server.stopRunning();
recv.stopStreamReceiving(); }
removeAllSocketHandlers();
recv.stopStreamReceiving();
} }
/** /**
@@ -179,13 +207,13 @@ public class SAMStreamSession {
* @param id Connection id * @param id Connection id
*/ */
public boolean closeConnection(int id) { public boolean closeConnection(int id) {
if (!checkSocketHandlerId(id)) { if (!checkSocketHandlerId(id)) {
_log.debug("The specified id (" + id + ") does not exist!"); _log.debug("The specified id (" + id + ") does not exist!");
return false; return false;
} }
removeSocketHandler(id); removeSocketHandler(id);
return true; return true;
} }
/** /**
@@ -197,34 +225,34 @@ public class SAMStreamSession {
* @return An id associated to the socket handler * @return An id associated to the socket handler
*/ */
private int createSocketHandler(I2PSocket s, int id) { private int createSocketHandler(I2PSocket s, int id) {
SAMStreamSessionSocketHandler handler; SAMStreamSessionSocketHandler handler;
if (id == 0) { if (id == 0) {
id = createUniqueId(); id = createUniqueId();
} }
try { try {
handler = new SAMStreamSessionSocketHandler(s, id); handler = new SAMStreamSessionSocketHandler(s, id);
} catch (IOException e) { } catch (IOException e) {
_log.error("IOException when creating SAM STREAM session socket handler", e); _log.error("IOException when creating SAM STREAM session socket handler", e);
recv.stopStreamReceiving(); recv.stopStreamReceiving();
return 0; return 0;
} }
synchronized (handlersMapLock) { synchronized (handlersMapLock) {
handlersMap.put(new Integer(id), handler); handlersMap.put(new Integer(id), handler);
} }
I2PThread t = new I2PThread(handler, "SAMStreamSessionSocketHandler"); I2PThread t = new I2PThread(handler, "SAMStreamSessionSocketHandler");
t.start(); t.start();
return id; return id;
} }
/* Create an unique id, either positive or negative */ /* Create an unique id, either positive or negative */
private int createUniqueId() { private int createUniqueId() {
synchronized (idLock) { synchronized (idLock) {
return --lastNegativeId; return --lastNegativeId;
} }
} }
/** /**
@@ -233,9 +261,9 @@ public class SAMStreamSession {
* @param id Handler id * @param id Handler id
*/ */
private SAMStreamSessionSocketHandler getSocketHandler(int id) { private SAMStreamSessionSocketHandler getSocketHandler(int id) {
synchronized (handlersMapLock) { synchronized (handlersMapLock) {
return (SAMStreamSessionSocketHandler)handlersMap.get(new Integer(id)); return (SAMStreamSessionSocketHandler)handlersMap.get(new Integer(id));
} }
} }
/** /**
@@ -244,9 +272,9 @@ public class SAMStreamSession {
* @param id Handler id * @param id Handler id
*/ */
private boolean checkSocketHandlerId(int id) { private boolean checkSocketHandlerId(int id) {
synchronized (handlersMapLock) { synchronized (handlersMapLock) {
return (!(handlersMap.get(new Integer(id)) == null)); return (!(handlersMap.get(new Integer(id)) == null));
} }
} }
/** /**
@@ -255,42 +283,41 @@ public class SAMStreamSession {
* @param id Handler id to be removed * @param id Handler id to be removed
*/ */
private void removeSocketHandler(int id) { private void removeSocketHandler(int id) {
SAMStreamSessionSocketHandler removed; SAMStreamSessionSocketHandler removed;
synchronized (handlersMapLock) { synchronized (handlersMapLock) {
removed = (SAMStreamSessionSocketHandler)handlersMap.remove(new Integer(id)); removed = (SAMStreamSessionSocketHandler)handlersMap.remove(new Integer(id));
} }
if (removed == null) { if (removed == null) {
_log.error("BUG! Trying to remove inexistent SAM STREAM session socket handler " + id); _log.error("BUG! Trying to remove inexistent SAM STREAM session socket handler " + id);
recv.stopStreamReceiving(); recv.stopStreamReceiving();
} else { } else {
removed.stopRunning(); removed.stopRunning();
_log.debug("Removed SAM STREAM session socket handler " + id); _log.debug("Removed SAM STREAM session socket handler " + id);
} }
} }
/** /**
* Remove and close all the socket handlers managed by this SAM * Remove and close all the socket handlers managed by this SAM
* STREAM session. * STREAM session.
* *
* @param id Handler id to be removed
*/ */
private void removeAllSocketHandlers() { private void removeAllSocketHandlers() {
Integer id; Integer id;
Set keySet; Set keySet;
Iterator iter; Iterator iter;
synchronized (handlersMapLock) { synchronized (handlersMapLock) {
keySet = handlersMap.keySet(); keySet = handlersMap.keySet();
iter = keySet.iterator(); iter = keySet.iterator();
while (iter.hasNext()) { while (iter.hasNext()) {
id = (Integer)iter.next(); id = (Integer)iter.next();
((SAMStreamSessionSocketHandler)handlersMap.get(id)).stopRunning(); ((SAMStreamSessionSocketHandler)handlersMap.get(id)).stopRunning();
} }
handlersMap.clear(); handlersMap.clear();
} }
} }
/** /**
@@ -301,76 +328,76 @@ public class SAMStreamSession {
*/ */
public class SAMStreamSessionServer implements Runnable { public class SAMStreamSessionServer implements Runnable {
private Object runningLock = new Object(); private Object runningLock = new Object();
private boolean stillRunning = true; private boolean stillRunning = true;
private I2PServerSocket serverSocket = null; private I2PServerSocket serverSocket = null;
/** /**
* Create a new SAM STREAM session server * Create a new SAM STREAM session server
* *
*/ */
public SAMStreamSessionServer() { public SAMStreamSessionServer() {
_log.debug("Instantiating new SAM STREAM session server"); _log.debug("Instantiating new SAM STREAM session server");
serverSocket = socketMgr.getServerSocket(); serverSocket = socketMgr.getServerSocket();
} }
/** /**
* Stop a SAM STREAM session server * Stop a SAM STREAM session server
* *
*/ */
public void stopRunning() { public void stopRunning() {
_log.debug("SAMStreamSessionServer.stopRunning() invoked"); _log.debug("SAMStreamSessionServer.stopRunning() invoked");
synchronized (runningLock) { synchronized (runningLock) {
if (stillRunning) { if (stillRunning) {
stillRunning = false; stillRunning = false;
try { try {
serverSocket.close(); serverSocket.close();
} catch (I2PException e) { } catch (I2PException e) {
_log.error("I2PException caught", e); _log.error("I2PException caught", e);
} }
} }
} }
} }
public void run() { public void run() {
_log.debug("SAM STREAM session server running"); _log.debug("SAM STREAM session server running");
I2PSocket i2ps; I2PSocket i2ps;
while (stillRunning) { while (stillRunning) {
try { try {
i2ps = serverSocket.accept(); i2ps = serverSocket.accept();
_log.debug("New incoming connection"); _log.debug("New incoming connection");
int id = createSocketHandler(i2ps, 0); int id = createSocketHandler(i2ps, 0);
if (id == 0) { if (id == 0) {
_log.error("SAM STREAM session handler not created!"); _log.error("SAM STREAM session handler not created!");
i2ps.close(); i2ps.close();
continue; continue;
} }
_log.debug("New connection id: " + id); _log.debug("New connection id: " + id);
recv.notifyStreamConnection(id, i2ps.getPeerDestination()); recv.notifyStreamConnection(id, i2ps.getPeerDestination());
} catch (I2PException e) { } catch (I2PException e) {
_log.debug("Caught I2PException", e); _log.debug("Caught I2PException", e);
break; break;
} catch (IOException e) { } catch (IOException e) {
_log.debug("Caught IOException", e); _log.debug("Caught IOException", e);
break; break;
} }
} }
try { try {
serverSocket.close(); // In case it wasn't closed, yet serverSocket.close(); // In case it wasn't closed, yet
} catch (I2PException e) { } catch (I2PException e) {
_log.debug("Caught I2PException", e); _log.debug("Caught I2PException", e);
} }
_log.debug("Shutting down SAM STREAM session server"); _log.debug("Shutting down SAM STREAM session server");
} }
} }
/** /**
@@ -380,109 +407,109 @@ public class SAMStreamSession {
* @author human * @author human
*/ */
public class SAMStreamSessionSocketHandler implements Runnable { public class SAMStreamSessionSocketHandler implements Runnable {
private I2PSocket i2pSocket = null; private I2PSocket i2pSocket = null;
private OutputStream i2pSocketOS = null; private OutputStream i2pSocketOS = null;
private Object runningLock = new Object(); private Object runningLock = new Object();
private boolean stillRunning = true; private boolean stillRunning = true;
private int id; private int id;
/** /**
* Create a new SAM STREAM session socket handler * Create a new SAM STREAM session socket handler
* *
* @param s Socket to be handled * @param s Socket to be handled
* @param id Unique id assigned to the handler * @param id Unique id assigned to the handler
*/ */
public SAMStreamSessionSocketHandler(I2PSocket s, int id) throws IOException { public SAMStreamSessionSocketHandler(I2PSocket s, int id) throws IOException {
_log.debug("Instantiating new SAM STREAM session socket handler"); _log.debug("Instantiating new SAM STREAM session socket handler");
i2pSocket = s; i2pSocket = s;
i2pSocketOS = s.getOutputStream(); i2pSocketOS = s.getOutputStream();
this.id = id; this.id = id;
} }
/** /**
* Send bytes through the SAM STREAM session socket handler * Send bytes through the SAM STREAM session socket handler
* *
* @param data Data to be sent * @param data Data to be sent
* *
* @return True if data has been sent without errors, false otherwise * @return True if data has been sent without errors, false otherwise
*/ */
public boolean sendBytes(byte[] data) { public boolean sendBytes(byte[] data) {
if (_log.shouldLog(Log.DEBUG)) { if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Handler " + id + ": sending " + data.length _log.debug("Handler " + id + ": sending " + data.length
+ " bytes"); + " bytes");
} }
try { try {
i2pSocketOS.write(data); i2pSocketOS.write(data);
} catch (IOException e) { } catch (IOException e) {
_log.error("Error sending data through I2P socket", e); _log.error("Error sending data through I2P socket", e);
return false; return false;
} }
return true; return true;
} }
/** /**
* Stop a SAM STREAM session socket handler * Stop a SAM STREAM session socket handler
* *
*/ */
public void stopRunning() { public void stopRunning() {
_log.debug("stopRunning() invoked on socket handler " + id); _log.debug("stopRunning() invoked on socket handler " + id);
synchronized (runningLock) { synchronized (runningLock) {
if (stillRunning) { if (stillRunning) {
stillRunning = false; stillRunning = false;
try { try {
i2pSocket.close(); i2pSocket.close();
} catch (IOException e) { } catch (IOException e) {
_log.debug("Caught IOException", e); _log.debug("Caught IOException", e);
} }
} }
} }
} }
public void run() { public void run() {
_log.debug("SAM STREAM session socket handler running"); _log.debug("SAM STREAM session socket handler running");
int read = -1; int read = -1;
byte[] data = new byte[SOCKET_HANDLER_BUF_SIZE]; byte[] data = new byte[SOCKET_HANDLER_BUF_SIZE];
try { try {
InputStream in = i2pSocket.getInputStream(); InputStream in = i2pSocket.getInputStream();
while (stillRunning) { while (stillRunning) {
read = in.read(data); read = in.read(data);
if (read == -1) { if (read == -1) {
_log.debug("Handler " + id + ": connection closed"); _log.debug("Handler " + id + ": connection closed");
break; break;
} }
recv.receiveStreamBytes(id, data, read); recv.receiveStreamBytes(id, data, read);
} }
} catch (IOException e) { } catch (IOException e) {
_log.debug("Caught IOException", e); _log.debug("Caught IOException", e);
} }
try { try {
i2pSocket.close(); i2pSocket.close();
} catch (IOException e) { } catch (IOException e) {
_log.debug("Caught IOException", e); _log.debug("Caught IOException", e);
} }
if (stillRunning) { if (stillRunning) {
removeSocketHandler(id); removeSocketHandler(id);
// FIXME: we need error reporting here! // FIXME: we need error reporting here!
try { try {
recv.notifyStreamDisconnection(id, "OK", null); recv.notifyStreamDisconnection(id, "OK", null);
} catch (IOException e) { } catch (IOException e) {
_log.debug("Error sending disconnection notice for handler " _log.debug("Error sending disconnection notice for handler "
+ id, e); + id, e);
} }
} }
_log.debug("Shutting down SAM STREAM session socket handler " +id); _log.debug("Shutting down SAM STREAM session socket handler " +id);
} }
} }
} }

File diff suppressed because it is too large Load Diff