diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java index 1dc336b71..a822498b2 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java @@ -9,12 +9,16 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InterruptedIOException; import java.io.UnsupportedEncodingException; +import java.net.ConnectException; +import java.net.NoRouteToHostException; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.Set; import net.i2p.I2PException; import net.i2p.client.I2PSession; +import net.i2p.client.I2PSessionException; import net.i2p.client.I2PSessionListener; import net.i2p.data.Base64; import net.i2p.data.Destination; @@ -37,8 +41,6 @@ public class I2PSocketManager implements I2PSessionListener { private HashMap _inSockets; private I2PSocketOptions _defaultOptions; - public static final int PUBKEY_LENGTH = 387; - public I2PSocketManager() { _session = null; _inSockets = new HashMap(16); @@ -135,54 +137,48 @@ public class I2PSocketManager implements I2PSessionListener { case 0xA1: // SYN incoming _log.debug("*Syn!"); - if (payload.length == PUBKEY_LENGTH) { - String newLocalID = makeID(_inSockets); - Destination d = new Destination(); - d.readBytes(new ByteArrayInputStream(payload)); - - if (_serverSocket == null) { - // The app did not instantiate an I2PServerSocket - byte[] packet = makePacket((byte) 0x52, id, newLocalID.getBytes("ISO-8859-1")); - boolean replySentOk = false; - synchronized (_session) { - replySentOk = _session.sendMessage(d, packet); - } - if (!replySentOk) { - _log.error("Error sending close to " + d.calculateHash().toBase64() - + " in response to a new con message", new Exception("Failed creation")); - } - return; + String newLocalID = makeID(_inSockets); + Destination d = new Destination(); + d.readBytes(new ByteArrayInputStream(payload)); + + if (_serverSocket == null) { + // The app did not instantiate an I2PServerSocket + byte[] packet = makePacket((byte) 0x52, id, newLocalID.getBytes("ISO-8859-1")); + boolean replySentOk = false; + synchronized (_session) { + replySentOk = _session.sendMessage(d, packet); } - - s = new I2PSocketImpl(d, this, false, newLocalID); - s.setRemoteID(id); - if (_serverSocket.getNewSocket(s)) { - _inSockets.put(newLocalID, s); - byte[] packet = makePacket((byte) 0x51, id, newLocalID.getBytes("ISO-8859-1")); - boolean replySentOk = false; - synchronized (_session) { - replySentOk = _session.sendMessage(d, packet); - } - if (!replySentOk) { - _log.error("Error sending reply to " + d.calculateHash().toBase64() - + " in response to a new con message", new Exception("Failed creation")); - s.internalClose(); - } - } else { - byte[] packet = (" " + id).getBytes("ISO-8859-1"); - packet[0] = 0x52; - boolean nackSent = session.sendMessage(d, packet); - if (!nackSent) { - _log.error("Error sending NACK for session creation"); - } - s.internalClose(); + if (!replySentOk) { + _log.error("Error sending close to " + d.calculateHash().toBase64() + + " in response to a new con message", new Exception("Failed creation")); } return; - } else { - _log.error("Syn packet that has a payload not equal to the pubkey length (" + payload.length - + " != " + PUBKEY_LENGTH + ")"); - return; } + + s = new I2PSocketImpl(d, this, false, newLocalID); + s.setRemoteID(id); + if (_serverSocket.getNewSocket(s)) { + _inSockets.put(newLocalID, s); + byte[] packet = makePacket((byte) 0x51, id, newLocalID.getBytes("ISO-8859-1")); + boolean replySentOk = false; + synchronized (_session) { + replySentOk = _session.sendMessage(d, packet); + } + if (!replySentOk) { + _log.error("Error sending reply to " + d.calculateHash().toBase64() + + " in response to a new con message", new Exception("Failed creation")); + s.internalClose(); + } + } else { + byte[] packet = (" " + id).getBytes("ISO-8859-1"); + packet[0] = 0x52; + boolean nackSent = session.sendMessage(d, packet); + if (!nackSent) { + _log.error("Error sending NACK for session creation"); + } + s.internalClose(); + } + return; case 0xA2: // disconnect incoming _log.debug("*Disconnect incoming!"); @@ -254,9 +250,15 @@ public class I2PSocketManager implements I2PSessionListener { /** * Create a new connected socket (block until the socket is created) * - * @throws I2PException if there is a problem connecting + * @param peer Destination to connect to + * @param options I2P socket options to be used for connecting + * + * @throws ConnectException if the peer refuses the connection + * @throws NoRouteToHostException if the peer is not found or not reachable + * @throws InterruptedException if the connection timeouts + * @throws I2PException if there is some other I2P-related problem */ - public I2PSocket connect(Destination peer, I2PSocketOptions options) throws I2PException { + public I2PSocket connect(Destination peer, I2PSocketOptions options) throws I2PException, ConnectException, NoRouteToHostException, InterruptedException { String localID, lcID; I2PSocketImpl s; @@ -280,11 +282,11 @@ public class I2PSocketManager implements I2PSessionListener { synchronized (lock) { _outSockets.remove(s.getLocalID()); } - throw new I2PException("Unable to reach peer"); + throw new I2PException("Error sending through I2P network"); } remoteID = s.getRemoteID(true, options.getConnectTimeout()); - if (remoteID == null) { throw new I2PException("Peer refused connection"); } - if ("".equals(remoteID)) { throw new I2PException("Unable to reach peer"); } + if (remoteID == null) { throw new ConnectException("Connection refused by peer"); } + if ("".equals(remoteID)) { throw new NoRouteToHostException("Unable to reach peer"); } _log.debug("TIMING: s given out for remoteID " + getReadableForm(remoteID)); return s; } catch (InterruptedIOException ioe) { @@ -292,13 +294,17 @@ public class I2PSocketManager implements I2PSessionListener { synchronized (lock) { _outSockets.remove(s.getLocalID()); } - throw new I2PException("Timeout waiting for ack"); + throw new InterruptedException("Timeout waiting for ack"); + } catch (ConnectException ex) { + throw ex; + } catch (NoRouteToHostException ex) { + throw ex; } catch (IOException ex) { _log.error("Error sending syn on id " + getReadableForm(lcID), ex); synchronized (lock) { _outSockets.remove(s.getLocalID()); } - throw new I2PException("IOException occurred"); + throw new I2PException("Unhandled IOException occurred"); } catch (I2PException ex) { _log.info("Error sending syn on id " + getReadableForm(lcID), ex); synchronized (lock) { @@ -308,10 +314,78 @@ public class I2PSocketManager implements I2PSessionListener { } } - public I2PSocket connect(Destination peer) throws I2PException { + /** + * Create a new connected socket (block until the socket is created) + * + * @param peer Destination to connect to + * + * @throws ConnectException if the peer refuses the connection + * @throws NoRouteToHostException if the peer is not found or not reachable + * @throws InterruptedException if the connection timeouts + * @throws I2PException if there is some other I2P-related problem + */ + public I2PSocket connect(Destination peer) throws I2PException, ConnectException, NoRouteToHostException, InterruptedException { return connect(peer, null); } + /** + * Destroy the socket manager, freeing all the associated resources. This + * method will block untill all the managed sockets are closed. + * + */ + public void destroySocketManager() { + + try { + if (_serverSocket != null) { + _serverSocket.close(); + _serverSocket = null; + } + } catch (I2PException ex) { + _log.error("Error closing I2PServerSocket", ex); + } + + synchronized (lock) { + Iterator iter; + String id = null; + I2PSocketImpl sock; + + iter = _inSockets.keySet().iterator(); + while (iter.hasNext()) { + id = (String)iter.next(); + sock = (I2PSocketImpl)_inSockets.get(id); + _log.debug("Closing inSocket \"" + + getReadableForm(sock.getLocalID()) + "\""); + sock.internalClose(); + } + + iter = _outSockets.keySet().iterator(); + while (iter.hasNext()) { + id = (String)iter.next(); + sock = (I2PSocketImpl)_outSockets.get(id); + _log.debug("Closing outSocket \"" + + getReadableForm(sock.getLocalID()) + "\""); + sock.internalClose(); + } + } + + _log.debug("Waiting for all open sockets to really close..."); + synchronized (lock) { + while ((_inSockets.size() != 0) || (_outSockets.size() != 0)) { + try { + lock.wait(); + } catch (InterruptedException e) {} + } + } + + try { + _log.debug("Destroying I2P session..."); + _session.destroySession(); + _log.debug("I2P session destroyed"); + } catch (I2PSessionException e) { + _log.error("Error destroying I2P session", e); + } + } + /** * Retrieve a set of currently connected I2PSockets, either initiated locally or remotely. * @@ -341,8 +415,11 @@ public class I2PSocketManager implements I2PSessionListener { public void removeSocket(I2PSocketImpl sock) { synchronized (lock) { + _log.debug("Removing socket \"" + + getReadableForm(sock.getLocalID()) + "\""); _inSockets.remove(sock.getLocalID()); _outSockets.remove(sock.getLocalID()); + lock.notify(); } }