From b64eff9bbb5a8a9d4ebe5c90de9f426465c7a1f7 Mon Sep 17 00:00:00 2001 From: zzz Date: Thu, 8 Sep 2011 13:56:19 +0000 Subject: [PATCH] * Ministreaming: Drop old classes replaced by streaming years ago. --- .../i2p/client/streaming/ByteCollector.java | 298 ------- .../client/streaming/I2PServerSocketImpl.java | 166 ---- .../i2p/client/streaming/I2PSocketImpl.java | 696 --------------- .../streaming/I2PSocketManagerImpl.java | 817 ------------------ 4 files changed, 1977 deletions(-) delete mode 100644 apps/ministreaming/java/src/net/i2p/client/streaming/ByteCollector.java delete mode 100644 apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java delete mode 100644 apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java delete mode 100644 apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerImpl.java diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/ByteCollector.java b/apps/ministreaming/java/src/net/i2p/client/streaming/ByteCollector.java deleted file mode 100644 index 38d96ddb7..000000000 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/ByteCollector.java +++ /dev/null @@ -1,298 +0,0 @@ -package net.i2p.client.streaming; - -/** - * Like a StringBuffer, but for bytes. This class is not internally synchronized, - * so care should be taken when using in a multithreaded environment. - * - * @deprecated Only used by deprecated I2PSocketImpl - */ -class ByteCollector { - byte[] contents; - int size; - - private static final int INITIAL_CAPACITY = 1024; - private static final int SHORT_CAPACITY = 80; - - /** - * New collector with the default initial capacity - * - */ - public ByteCollector() { - this(INITIAL_CAPACITY); - } - - /** - * New collector with an initial capacity as specified - * - */ - public ByteCollector(int capacity) { - contents = new byte[capacity]; - size = 0; - } - - - /** - * New collector containing the specified bytes - * - */ - public ByteCollector(byte[] b) { - this(); - append(b); - } - - /** - * New collector with the specified byte - * - */ - public ByteCollector(byte b) { - this(); - append(b); - } - - /** - * Add a new byte to the collector (extending the buffer if necessary) - * - * @param b byte to add - * @return this object - */ - public ByteCollector append(byte b) { - ensureCapacity(size + 1); - contents[size++] = b; - return this; - } - - /** - * Add new bytes to the collector (extending the buffer if necessary) - * - * @param b bytes to add - * @return this object - */ - public ByteCollector append(byte[] b) { - ensureCapacity(size + b.length); - System.arraycopy(b, 0, contents, size, b.length); - size += b.length; - return this; - } - - /** - * Add new bytes to the collector (extending the buffer if necessary) - * - * @param b byte array to add from - * @param len number of bytes in the array to add - * @return this object - */ - public ByteCollector append(byte[] b, int len) { - return append(b, 0, len); - } - - /** - * Add new bytes to the collector (extending the buffer if necessary) - * - * @param b byte array to add from - * @param off offset into the array to begin adding from - * @param len number of bytes in the array to add - * @return this object - */ - public ByteCollector append(byte[] b, int off, int len) { - ensureCapacity(size + len); - System.arraycopy(b, off, contents, size, len); - size += len; - return this; - } - - /** - * Add the contents of the byte collector to the current collector (extending the buffer if necessary) - * - * @param bc collector to copy - * @return this object - */ - public ByteCollector append(ByteCollector bc) { - // optimieren? - return append(bc.toByteArray()); - } - - /** - * Copy the contents of the collector into a new array and return it - * - * @return new array containing a copy of the current collector's data - */ - public byte[] toByteArray() { - byte[] result = new byte[size]; - System.arraycopy(contents, 0, result, 0, size); - return result; - } - - /** - * Pull off the first $maxlen bytes from the collector, shifting the remaining - * bytes into the beginning of the collector's array. - * - * @param maxlen max number of bytes we want to pull from the collector (we will get - * less if the collector doesnt have that many bytes yet) - * @return copy of the bytes pulled from the collector - */ - public byte[] startToByteArray(int maxlen) { - if (size < maxlen) { - byte[] res = toByteArray(); - clear(); - return res; - } else { - byte[] result = new byte[maxlen]; - System.arraycopy(contents, 0, result, 0, maxlen); - System.arraycopy(contents, maxlen, contents, 0, size - maxlen); - size -= maxlen; - return result; - } - } - - /** - * How many bytes are available for retrieval? - * - * @return number of bytes - */ - public int getCurrentSize() { - return size; - } - - /** - * Make sure we have sufficient storage space. - * - * @param cap minimum number of bytes that the buffer should contain - * @return true if the the collector was expanded to meet the minimum, - * false if it was already large enough - */ - public boolean ensureCapacity(int cap) { - if (contents.length < cap) { - int l = contents.length; - while (l < cap) { - l = (l * 3) / 2 + 1; - } - byte[] newcont = new byte[l]; - System.arraycopy(contents, 0, newcont, 0, size); - contents = newcont; - return true; - } - return false; - } - - /** - * Does the collector have meaningful data or is it empty? - * - * @return true if it has no data - */ - public boolean isEmpty() { - return size == 0; - } - - /** - * Search through the collector for the first occurrence of the sequence of - * bytes contained within the specified collector - * - * @param bc bytes that will be searched for - * @return index into the current collector, or -1 if it isn't found - */ - public int indexOf(ByteCollector bc) { - // optimieren? - return indexOf(bc.toByteArray()); - } - - /** - * Search through the collector for the first occurrence of the specified - * byte - * - * @param b byte that will be searched for - * @return index into the current collector, or -1 if it isn't found - */ - public int indexOf(byte b) { - // optimieren? - return indexOf(new byte[] { b}); - } - - /** - * Search through the collector for the first occurrence of the sequence of - * bytes - * - * @param ba bytes that will be searched for - * @return index into the current collector, or -1 if it isn't found - */ - public int indexOf(byte[] ba) { - loop: for (int i = 0; i < size - ba.length + 1; i++) { - for (int j = 0; j < ba.length; j++) { - if (contents[i + j] != ba[j]) continue loop; - } - return i; - } - return -1; - } - - /** - * Empty the collector. This does not affect its capacity. - * - */ - public void clear() { - size = 0; - } - - /** - * Empty the collector and reduce its capacity. - * - */ - public void clearAndShorten() { - size = 0; - contents = new byte[SHORT_CAPACITY]; - } - - /** - * Render the bytes as a string - * - * @return the, uh, string - */ - @Override - public String toString() { - return new String(toByteArray()); - } - - @Override - public int hashCode() { - int h = 0; - for (int i = 0; i < size; i++) { - h += contents[i] * contents[i]; - } - return h; - } - - /** - * Compare the collectors. - * - * @return true if and only if both are the same size and the - * byte arrays they contain are equal. - */ - @Override - public boolean equals(Object o) { - if (o instanceof ByteCollector) { - ByteCollector by = (ByteCollector) o; - if (size != by.size) return false; - for (int i = 0; i < size; i++) { - if (contents[i] != by.contents[i]) return false; - } - return true; - } else { - return false; - } - } - - /** - * Remove the first byte from the collector, shifting its contents accordingly. - * - * @return byte removed - * @throws IllegalArgumentException if the collector is empty - */ - public byte removeFirst() throws IllegalArgumentException { - byte bb = contents[0]; - if (size == 0) throw new IllegalArgumentException("ByteCollector is empty"); - if (size > 1) - System.arraycopy(contents, 1, contents, 0, --size); - else - size = 0; - return bb; - } -} diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java deleted file mode 100644 index c1378b3a1..000000000 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java +++ /dev/null @@ -1,166 +0,0 @@ -package net.i2p.client.streaming; - -import java.net.ConnectException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import net.i2p.I2PAppContext; -import net.i2p.I2PException; -import net.i2p.util.Clock; -import net.i2p.util.Log; - -/** - * Server socket implementation, allowing multiple threads to accept I2PSockets - * and pull from a queue populated by various threads (each of whom have their own - * timeout) - * - * @deprecated use I2PServerSocketFull - */ -class I2PServerSocketImpl implements I2PServerSocket { - private final static Log _log = new Log(I2PServerSocketImpl.class); - private I2PSocketManager mgr; - /** list of sockets waiting for the client to accept them */ - private final List pendingSockets = Collections.synchronizedList(new ArrayList(4)); - - /** have we been closed */ - private volatile boolean closing = false; - - /** lock on this when accepting a pending socket, and wait on it for notification of acceptance */ - private final Object socketAcceptedLock = new Object(); - /** lock on this when adding a new socket to the pending list, and wait on it accordingly */ - private final Object socketAddedLock = new Object(); - - /** - * Set Sock Option accept timeout stub, does nothing in ministreaming - * @param x - */ - public void setSoTimeout(long x) { - } - - /** - * Get Sock Option accept timeout stub, does nothing in ministreaming - * @return timeout - */ - public long getSoTimeout() { - return -1; - } - - public I2PServerSocketImpl(I2PSocketManager mgr) { - this.mgr = mgr; - } - - /** - * Waits for the next socket connecting. If a remote user tried to make a - * connection and the local application wasn't .accept()ing new connections, - * they should get refused (if .accept() doesnt occur in some small period - - * currently 5 seconds) - * - * @return a connected I2PSocket - * - * @throws I2PException if there is a problem with reading a new socket - * from the data available (aka the I2PSession closed, etc) - * @throws ConnectException if the I2PServerSocket is closed - */ - public I2PSocket accept() throws I2PException, ConnectException { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("accept() called, pending: " + pendingSockets.size()); - - I2PSocket ret = null; - - while ( (ret == null) && (!closing) ){ - while (pendingSockets.isEmpty()) { - if (closing) throw new ConnectException("I2PServerSocket closed"); - try { - synchronized(socketAddedLock) { - socketAddedLock.wait(); - } - } catch (InterruptedException ie) {} - } - synchronized (pendingSockets) { - if (!pendingSockets.isEmpty()) { - ret = (I2PSocket)pendingSockets.remove(0); - } - } - if (ret != null) { - synchronized (socketAcceptedLock) { - socketAcceptedLock.notifyAll(); - } - } - } - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("TIMING: handed out accept result " + ret.hashCode()); - return ret; - } - - /** - * Make the socket available and wait until the client app accepts it, or until - * the given timeout elapses. This doesn't have any limits on the queue size - - * perhaps it should add some choking (e.g. after 5 waiting for accept, refuse) - * - * @param timeoutMs how long to wait until accept - * @return true if the socket was accepted, false if the timeout expired - * or the socket was closed - */ - public boolean addWaitForAccept(I2PSocket s, long timeoutMs) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("addWaitForAccept [new socket arrived [" + s.toString() + "], pending: " + pendingSockets.size()); - - if (closing) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Already closing the socket"); - return false; - } - - Clock clock = I2PAppContext.getGlobalContext().clock(); - long start = clock.now(); - long end = start + timeoutMs; - pendingSockets.add(s); - synchronized (socketAddedLock) { - socketAddedLock.notifyAll(); - } - - // keep looping until the socket has been grabbed by the accept() - // (or the expiration passes, or the socket is closed) - while (pendingSockets.contains(s)) { - long now = clock.now(); - if (now >= end) { - if (_log.shouldLog(Log.INFO)) - _log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms) for socket " + s.toString()); - pendingSockets.remove(s); - return false; - } - if (closing) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Server socket closed while waiting for accept"); - pendingSockets.remove(s); - return false; - } - long remaining = end - now; - try { - synchronized (socketAcceptedLock) { - socketAcceptedLock.wait(remaining); - } - } catch (InterruptedException ie) {} - } - long now = clock.now(); - if (_log.shouldLog(Log.DEBUG)) - _log.info("Socket accepted after " + (now-start) + "ms for socket " + s.toString()); - return true; - } - - public void close() { - closing = true; - // let anyone .accept()ing know to fsck off - synchronized (socketAddedLock) { - socketAddedLock.notifyAll(); - } - // let anyone addWaitForAccept()ing know to fsck off - synchronized (socketAcceptedLock) { - socketAcceptedLock.notifyAll(); - } - } - - public I2PSocketManager getManager() { return mgr; } -} diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java deleted file mode 100644 index 05ff0631e..000000000 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java +++ /dev/null @@ -1,696 +0,0 @@ -package net.i2p.client.streaming; - -import java.io.IOException; -import java.io.InputStream; -import java.io.InterruptedIOException; -import java.io.OutputStream; - -import net.i2p.I2PAppContext; -import net.i2p.I2PException; -import net.i2p.client.I2PSession; -import net.i2p.client.I2PSessionException; -import net.i2p.data.Destination; -import net.i2p.util.Clock; -import net.i2p.util.I2PThread; -import net.i2p.util.Log; - - -/** - * Initial stub implementation for the socket - * - * @deprecated use I2PSocketFull - */ -class I2PSocketImpl implements I2PSocket { - private final static Log _log = new Log(I2PSocketImpl.class); - - public static final int MAX_PACKET_SIZE = 1024 * 32; - public static final int PACKET_DELAY = 100; - - private I2PSocketManagerImpl manager; - private Destination local; - private Destination remote; - private String localID; - private String remoteID; - private final Object remoteIDWaiter = new Object(); - private I2PInputStream in; - private I2POutputStream out; - private I2PSocket.SocketErrorListener _socketErrorListener; - private boolean outgoing; - private long _socketId; - private static long __socketId = 0; - private long _bytesRead = 0; - private long _bytesWritten = 0; - private long _createdOn; - private long _closedOn; - private long _remoteIdSetTime; - private I2PSocketOptions _options; - private final Object flagLock = new Object(); - - /** - * Whether the I2P socket has already been closed. - */ - private boolean closed = false; - - /** - * Whether to send out a close packet when the socket is - * closed. (If the socket is closed because of an incoming close - * packet, we need not send one.) - */ - private boolean sendClose = true; - - /** - * Whether the I2P socket has already been closed and all data - * (from I2P to the app, dunno whether to call this incoming or - * outgoing) has been processed. - */ - private boolean closed2 = false; - - /** - * @param peer who this socket is (or should be) connected to - * @param mgr how we talk to the network - * @param outgoing did we initiate the connection (true) or did we receive it (false)? - * @param localID what is our half of the socket ID? - */ - public I2PSocketImpl(Destination peer, I2PSocketManagerImpl mgr, boolean outgoing, String localID) { - this.outgoing = outgoing; - manager = mgr; - remote = peer; - _socketId = ++__socketId; - local = mgr.getSession().getMyDestination(); - String us = mgr.getSession().getMyDestination().calculateHash().toBase64().substring(0,4); - String name = us + (outgoing ? "->" : "<-") + peer.calculateHash().toBase64().substring(0,4); - in = new I2PInputStream(name + " in"); - I2PInputStream pin = new I2PInputStream(name + " out"); - out = new I2POutputStream(pin); - new I2PSocketRunner(pin); - this.localID = localID; - _createdOn = I2PAppContext.getGlobalContext().clock().now(); - _remoteIdSetTime = -1; - _closedOn = -1; - _options = mgr.getDefaultOptions(); - } - - /** - * Our half of the socket's unique ID - * - */ - public String getLocalID() { - return localID; - } - - /** - * We've received the other side's half of the socket's unique ID - */ - public void setRemoteID(String id) { - synchronized (remoteIDWaiter) { - remoteID = id; - _remoteIdSetTime = System.currentTimeMillis(); - remoteIDWaiter.notifyAll(); - } - } - - /** - * Retrieve the other side's half of the socket's unique ID, or null if it - * isn't known yet - * - * @param wait if true, we should wait until we receive it from the peer, otherwise - * return what we know immediately (which may be null) - */ - public String getRemoteID(boolean wait) { - try { - return getRemoteID(wait, -1); - } catch (InterruptedIOException iie) { - _log.error("wtf, we said we didn't want it to time out! you smell", iie); - return null; - } - } - - /** - * Retrieve the other side's half of the socket's unique ID, or null if it isn't - * known yet and we were instructed not to wait - * - * @param wait should we wait for the peer to send us their half of the ID, or - * just return immediately? - * @param maxWait if we're going to wait, after how long should we timeout and fail? - * (if this value is < 0, we wait indefinitely) - * @throws InterruptedIOException when the max waiting period has been exceeded - */ - public String getRemoteID(boolean wait, long maxWait) throws InterruptedIOException { - long dieAfter = System.currentTimeMillis() + maxWait; - synchronized (remoteIDWaiter) { - if (wait) { - if (remoteID == null) { - try { - if (maxWait >= 0) - remoteIDWaiter.wait(maxWait); - else - remoteIDWaiter.wait(); - } catch (InterruptedException ex) { - } - } - - long now = System.currentTimeMillis(); - if ((maxWait >= 0) && (now >= dieAfter)) { - long waitedExcess = now - dieAfter; - throw new InterruptedIOException("Timed out waiting for remote ID (waited " + waitedExcess - + "ms too long [" + maxWait + "ms, remId " + remoteID - + ", remId set " + (now-_remoteIdSetTime) + "ms ago])"); - } - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("TIMING: RemoteID set to " - + I2PSocketManagerImpl.getReadableForm(remoteID) + " for " - + this.hashCode()); - } - return remoteID; - } - } - - /** - * Retrieve the other side's half of the socket's unique ID, or null if it - * isn't known yet. This does not wait - * - */ - public String getRemoteID() { - return getRemoteID(false); - } - - /** - * The other side has given us some data, so inject it into our socket's - * inputStream - * - * @param data the data to inject into our local inputStream - */ - public void queueData(byte[] data) { - _bytesRead += data.length; - try { - in.queueData(data, false); - } catch (IOException ioe) { - _log.log(Log.CRIT, "wtf, we said DONT block, how can we timeout?", ioe); - } - } - - /** - * Return the Destination of this side of the socket. - */ - public Destination getThisDestination() { - return local; - } - - /** - * Return the destination of the peer. - */ - public Destination getPeerDestination() { - return remote; - } - - /** - * Return an InputStream to read from the socket. - */ - public InputStream getInputStream() throws IOException { - if ((in == null)) throw new IOException("Not connected"); - return in; - } - - /** - * Return an OutputStream to write into the socket. - */ - public OutputStream getOutputStream() throws IOException { - if ((out == null)) throw new IOException("Not connected"); - return out; - } - - /** - * Closes the socket if not closed yet (from the Application - * side). - */ - public void close() throws IOException { - synchronized (flagLock) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Closing connection"); - closed = true; - _closedOn = I2PAppContext.getGlobalContext().clock().now(); - } - out.close(); - in.notifyClosed(); - } - - public boolean isClosed() { return _closedOn > 0; } - - /** - * Close the socket from the I2P side (by a close packet) - */ - protected void internalClose() { - synchronized (flagLock) { - closed = true; - closed2 = true; - sendClose = false; - _closedOn = I2PAppContext.getGlobalContext().clock().now(); - } - out.close(); - in.notifyClosed(); - } - - private byte getMask(int add) { - if (outgoing) - return (byte)(I2PSocketManagerImpl.DATA_IN + (byte)add); - else - return (byte)(I2PSocketManagerImpl.DATA_OUT + (byte)add); - } - - public void setOptions(I2PSocketOptions options) { - _options = options; - in.setReadTimeout(options.getReadTimeout()); - } - - public I2PSocketOptions getOptions() { - return _options; - } - - /** - * How long we will wait blocked on a read() operation. This is simply a - * helper to query the I2PSocketOptions - * - * @return milliseconds to wait, or -1 if we will wait indefinitely - */ - public long getReadTimeout() { - return _options.getReadTimeout(); - } - - /** - * Define how long we will wait blocked on a read() operation (-1 will make - * the socket wait forever). This is simply a helper to adjust the - * I2PSocketOptions - * - */ - public void setReadTimeout(long ms) { - _options.setReadTimeout(ms); - in.setReadTimeout(ms); - } - - public void setSocketErrorListener(I2PSocket.SocketErrorListener lsnr) { - _socketErrorListener = lsnr; - } - - void errorOccurred() { - if (_socketErrorListener != null) - _socketErrorListener.errorOccurred(); - } - - public long getBytesSent() { return _bytesWritten; } - public long getBytesReceived() { return _bytesRead; } - public long getCreatedOn() { return _createdOn; } - public long getClosedOn() { return _closedOn; } - - /** - * The remote port. - * @return 0 always - * @since 0.8.9 - */ - public int getPort() { - return I2PSession.PORT_UNSPECIFIED; - } - - /** - * The local port. - * @return 0 always - * @since 0.8.9 - */ - public int getLocalPort() { - return I2PSession.PORT_UNSPECIFIED; - } - - - private String getPrefix() { return "[" + _socketId + "]: "; } - - //-------------------------------------------------- - private class I2PInputStream extends InputStream { - private String streamName; - private final ByteCollector bc = new ByteCollector(); - private boolean inStreamClosed = false; - - private long readTimeout = -1; - - public I2PInputStream(String name) { - streamName = name; - } - - public long getReadTimeout() { - return readTimeout; - } - - private String getStreamPrefix() { - return getPrefix() + streamName + ": "; - } - - public void setReadTimeout(long ms) { - readTimeout = ms; - } - - public int read() throws IOException { - byte[] b = new byte[1]; - int res = read(b); - if (res == 1) return b[0] & 0xff; - if (res == -1) return -1; - throw new RuntimeException("Incorrect read() result"); - } - - // I have to ask if this method is really needed, since the JDK has this already, - // including the timeouts. Perhaps the need is for debugging more than anything - // else? - @Override - public int read(byte[] b, int off, int len) throws IOException { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getStreamPrefix() + "Read called for " + len + " bytes (avail=" - + bc.getCurrentSize() + "): " + this.hashCode()); - if (len == 0) return 0; - long dieAfter = System.currentTimeMillis() + readTimeout; - byte[] read = null; - synchronized (bc) { - read = bc.startToByteArray(len); - bc.notifyAll(); - } - boolean timedOut = false; - - while ( (read.length == 0) && (!inStreamClosed) ) { - synchronized (flagLock) { - if (closed) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getStreamPrefix() + "Closed is set after reading " - + _bytesRead + " and writing " + _bytesWritten - + ", so closing stream: " + hashCode()); - return -1; - } - } - try { - synchronized (I2PSocketImpl.I2PInputStream.this) { - if (readTimeout >= 0) { - wait(readTimeout); - } else { - wait(); - } - } - } catch (InterruptedException ex) {} - - if ((readTimeout >= 0) - && (System.currentTimeMillis() >= dieAfter)) { - throw new InterruptedIOException(getStreamPrefix() + "Timeout reading from I2PSocket (" - + readTimeout + " msecs)"); - } - - synchronized (bc) { - read = bc.startToByteArray(len); - bc.notifyAll(); - } - } - if (read.length > len) throw new RuntimeException("BUG"); - if ( (inStreamClosed) && (read.length <= 0) ) - return -1; - - System.arraycopy(read, 0, b, off, read.length); - - if (_log.shouldLog(Log.DEBUG)) { - _log.debug(getStreamPrefix() + "Read from I2PInputStream " + hashCode() + " returned " - + read.length + " bytes"); - } - //if (_log.shouldLog(Log.DEBUG)) { - // _log.debug("Read from I2PInputStream " + this.hashCode() - // + " returned "+read.length+" bytes:\n" - // + HexDump.dump(read)); - //} - return read.length; - } - - /** - * @return 0 if empty, > 0 if there is data. - */ - @Override - public int available() { - synchronized (bc) { - return bc.getCurrentSize(); - } - } - - /** - * Add the data to the queue - * - * @param allowBlock if true, we will block if the buffer and the socket options - * say so, otherwise we simply take the data regardless. - * @throws InterruptedIOException if the queue's buffer is full, the socket has - * a write timeout, and that timeout is exceeded - * @throws IOException if the connection was closed while queueing up the data - */ - void queueData(byte[] data, boolean allowBlock) throws InterruptedIOException, IOException { - queueData(data, 0, data.length, allowBlock); - } - - /** - * Add the data to the queue - * - * @param allowBlock if true, we will block if the buffer and the socket options - * say so, otherwise we simply take the data regardless. - * @throws InterruptedIOException if the queue's buffer is full, the socket has - * a write timeout, and that timeout is exceeded - * @throws IOException if the connection was closed while queueing up the data - */ - public void queueData(byte[] data, int off, int len, boolean allowBlock) throws InterruptedIOException, IOException { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getStreamPrefix() + "Insert " + len + " bytes into queue: " + hashCode()); - Clock clock = I2PAppContext.getGlobalContext().clock(); - long endAfter = clock.now() + _options.getWriteTimeout(); - synchronized (bc) { - if (allowBlock) { - if (_options.getMaxBufferSize() > 0) { - while (bc.getCurrentSize() > _options.getMaxBufferSize()) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getStreamPrefix() + "Buffer size exceeded: pending " - + bc.getCurrentSize() + " limit " + _options.getMaxBufferSize()); - if (_options.getWriteTimeout() > 0) { - long timeLeft = endAfter - clock.now(); - if (timeLeft <= 0) { - long waited = _options.getWriteTimeout() - timeLeft; - throw new InterruptedIOException(getStreamPrefix() + "Waited too long (" - + waited + "ms) to write " - + len + " with a buffer at " + bc.getCurrentSize()); - } - } - if (inStreamClosed) - throw new IOException(getStreamPrefix() + "Stream closed while writing"); - if (_closedOn > 0) - throw new IOException(getStreamPrefix() + "I2PSocket closed while writing"); - try { - bc.wait(1000); - } catch (InterruptedException ie) {} - } - } - } - bc.append(data, off, len); - } - synchronized (I2PInputStream.this) { - I2PInputStream.this.notifyAll(); - } - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getStreamPrefix() + "After insert " + len + " bytes into queue: " + hashCode()); - } - - public void notifyClosed() { - synchronized (I2PInputStream.this) { - I2PInputStream.this.notifyAll(); - } - } - - @Override - public void close() throws IOException { - super.close(); - notifyClosed(); - synchronized (bc) { - inStreamClosed = true; - bc.notifyAll(); - } - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getStreamPrefix() + "After close"); - } - - } - - private class I2POutputStream extends OutputStream { - - public I2PInputStream sendTo; - - public I2POutputStream(I2PInputStream sendTo) { - this.sendTo = sendTo; - } - - public void write(int b) throws IOException { - write(new byte[] { (byte) b}); - } - - // This override is faster than the built in JDK, - // but there are other variations not handled - @Override - public void write(byte[] b, int off, int len) throws IOException { - _bytesWritten += len; - sendTo.queueData(b, off, len, true); - } - - @Override - public void close() { - sendTo.notifyClosed(); - } - } - - private static volatile long __runnerId = 0; - private class I2PSocketRunner extends I2PThread { - - public InputStream in; - - public I2PSocketRunner(InputStream in) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getPrefix() + "Runner's input stream is: " + in.hashCode()); - this.in = in; - String peer = I2PSocketImpl.this.remote.calculateHash().toBase64(); - setName("SocketRunner " + (++__runnerId) + "/" + _socketId + " " + peer.substring(0, 4)); - start(); - } - - /** - * Pump some more data - * - * @return true if we should keep on handling, false otherwise - */ - private boolean handleNextPacket(ByteCollector bc, byte buffer[]) - throws IOException, I2PSessionException { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "handleNextPacket"); - int len = in.read(buffer); - int bcsize = 0; - synchronized (bc) { - bcsize = bc.getCurrentSize(); - } - - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "handleNextPacket len=" + len + " bcsize=" + bcsize); - - if (len != -1) { - synchronized (bc) { - bc.append(buffer, len); - } - } else if (bcsize == 0) { - // nothing left in the buffer, and read(..) got EOF (-1). - // the bart the - return false; - } - if ((bcsize < MAX_PACKET_SIZE) && (in.available() == 0)) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "Runner Point d: " + hashCode()); - - try { - Thread.sleep(PACKET_DELAY); - } catch (InterruptedException e) { - _log.warn("wtf", e); - } - } - if ((bcsize >= MAX_PACKET_SIZE) || (in.available() == 0)) { - byte data[] = null; - synchronized (bc) { - data = bc.startToByteArray(MAX_PACKET_SIZE); - } - if (data.length > 0) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "Message size is: " + data.length); - boolean sent = sendBlock(data); - if (!sent) { - if (_log.shouldLog(Log.WARN)) - _log.warn(getPrefix() + ":" + Thread.currentThread().getName() + "Error sending message to peer. Killing socket runner"); - errorOccurred(); - return false; - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "Message sent to peer"); - } - } - } - return true; - } - - @Override - public void run() { - byte[] buffer = new byte[MAX_PACKET_SIZE]; - ByteCollector bc = new ByteCollector(); - boolean keepHandling = true; - int packetsHandled = 0; - try { - // try { - while (keepHandling) { - keepHandling = handleNextPacket(bc, buffer); - packetsHandled++; - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getPrefix() + ":" + Thread.currentThread().getName() - + "Packets handled: " + packetsHandled); - } - if (_log.shouldLog(Log.INFO)) - _log.info(getPrefix() + ":" + Thread.currentThread().getName() - + "After handling packets, we're done. Packets handled: " + packetsHandled); - - if ((bc.getCurrentSize() > 0) && (packetsHandled > 1)) { - if (_log.shouldLog(Log.WARN)) - _log.warn(getPrefix() + "We lost some data queued up due to a network send error (input stream: " - + in.hashCode() + "; " - + "queue size: " + bc.getCurrentSize() + ")"); - } - synchronized (flagLock) { - closed2 = true; - } - boolean sc; - synchronized (flagLock) { - sc = sendClose; - } // FIXME: Race here? - if (sc) { - if (_log.shouldLog(Log.INFO)) - _log.info(getPrefix() + ":" + Thread.currentThread().getName() - + "Sending close packet: (we started? " + outgoing - + ") after reading " + _bytesRead + " and writing " + _bytesWritten); - byte[] packet = I2PSocketManagerImpl.makePacket(getMask(0x02), remoteID, new byte[0]); - boolean sent = manager.getSession().sendMessage(remote, packet); - if (!sent) { - if (_log.shouldLog(Log.WARN)) - _log.warn(getPrefix() + ":" + Thread.currentThread().getName() - + "Error sending close packet to peer"); - errorOccurred(); - } - } - manager.removeSocket(I2PSocketImpl.this); - internalClose(); - } catch (InterruptedIOException ex) { - _log.error(getPrefix() + "BUG! read() operations should not timeout!", ex); - } catch (IOException ex) { - // WHOEVER removes this event on inconsistent - // state before fixing the inconsistent state (a - // reference on the socket in the socket manager - // etc.) will get hanged by me personally -- mihi - _log.error(getPrefix() + "Error running - **INCONSISTENT STATE!!!**", ex); - } catch (I2PException ex) { - _log.error(getPrefix() + "Error running - **INCONSISTENT STATE!!!**", ex); - } - } - - private boolean sendBlock(byte data[]) throws I2PSessionException { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getPrefix() + "TIMING: Block to send for " + I2PSocketImpl.this.hashCode()); - if (remoteID == null) { - _log.error(getPrefix() + "NULL REMOTEID"); - return false; - } - byte[] packet = I2PSocketManagerImpl.makePacket(getMask(0x00), remoteID, data); - boolean sent; - synchronized (flagLock) { - if (closed2) return false; - } - sent = manager.getSession().sendMessage(remote, packet); - return sent; - } - } - - @Override - public String toString() { return "" + hashCode(); } -} diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerImpl.java deleted file mode 100644 index e0a5b022f..000000000 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerImpl.java +++ /dev/null @@ -1,817 +0,0 @@ -/* - * licensed under BSD license... - * (if you know the proper clause for that, add it ...) - */ -package net.i2p.client.streaming; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.io.UnsupportedEncodingException; -import java.net.ConnectException; -import java.net.NoRouteToHostException; -import java.net.ServerSocket; -import java.net.Socket; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; -import java.util.Set; - -import net.i2p.I2PAppContext; -import net.i2p.I2PException; -import net.i2p.client.I2PSession; -import net.i2p.client.I2PSessionException; -import net.i2p.client.I2PSessionListener; -import net.i2p.data.Base64; -import net.i2p.data.DataFormatException; -import net.i2p.data.Destination; -import net.i2p.util.Log; - - -/** - * Centralize the coordination and multiplexing of the local client's streaming. - * There should be one I2PSocketManager for each I2PSession, and if an application - * is sending and receiving data through the streaming library using an - * I2PSocketManager, it should not attempt to call I2PSession's setSessionListener - * or receive any messages with its .receiveMessage - * - * @deprecated use I2PSocketManagerFull - */ -class I2PSocketManagerImpl implements I2PSocketManager, I2PSessionListener { - private I2PAppContext _context; - private Log _log; - private /* FIXME final FIXME */ I2PSession _session; - private I2PServerSocketImpl _serverSocket = null; - private final Object lock = new Object(); // for locking socket lists - private HashMap _outSockets; - private HashMap _inSockets; - private I2PSocketOptions _defaultOptions; - private long _acceptTimeout; - private String _name; - private final List _listeners = new ArrayList(1);; - private static int __managerId = 0; - - public static final short ACK = 0x51; - public static final short CLOSE_OUT = 0x52; - public static final short DATA_OUT = 0x50; - public static final short SYN = 0xA1; - public static final short CLOSE_IN = 0xA2; - public static final short DATA_IN = 0xA0; - public static final short CHAFF = 0xFF; - - /** - * How long to wait for the client app to accept() before sending back CLOSE? - * This includes the time waiting in the queue. Currently set to 5 seconds. - */ - private static final long ACCEPT_TIMEOUT_DEFAULT = 5*1000; - - public I2PSocketManagerImpl() { - this("SocketManager " + (++__managerId)); - } - public I2PSocketManagerImpl(String name) { - init(I2PAppContext.getGlobalContext(), null, null, name); - } - - public void init(I2PAppContext context, I2PSession session, Properties opts, String name) { - _name = name; - _context = context; - _log = _context.logManager().getLog(I2PSocketManager.class); - _inSockets = new HashMap(16); - _outSockets = new HashMap(16); - _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT; - // _listeners = new ArrayList(1); - setSession(session); - setDefaultOptions(buildOptions(opts)); - _context.statManager().createRateStat("streaming.lifetime", "How long before the socket is closed?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); - _context.statManager().createRateStat("streaming.sent", "How many bytes are sent in the stream?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); - _context.statManager().createRateStat("streaming.received", "How many bytes are received in the stream?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); - _context.statManager().createRateStat("streaming.transferBalance", "How many streams send more than they receive (positive means more sent, negative means more received)?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); - _context.statManager().createRateStat("streaming.synNoAck", "How many times have we sent a SYN but not received an ACK?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); - _context.statManager().createRateStat("streaming.ackSendFailed", "How many times have we tried to send an ACK to a SYN and failed?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); - _context.statManager().createRateStat("streaming.nackSent", "How many times have we refused a SYN with a NACK?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); - _context.statManager().createRateStat("streaming.nackReceived", "How many times have we received a NACK to our SYN?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); - } - - public I2PSession getSession() { - return _session; - } - - public void setSession(I2PSession session) { - _session = session; - if (session != null) session.setSessionListener(this); - } - - /** - * How long should we wait for the client to .accept() a socket before - * sending back a NACK/Close? - * - * @param ms milliseconds to wait, maximum - */ - public void setAcceptTimeout(long ms) { _acceptTimeout = ms; } - public long getAcceptTimeout() { return _acceptTimeout; } - - public void disconnected(I2PSession session) { - _log.info(getName() + ": Disconnected from the session"); - destroySocketManager(); - List listeners = null; - synchronized (_listeners) { - listeners = new ArrayList(_listeners); - _listeners.clear(); - } - for (int i = 0; i < listeners.size(); i++) { - I2PSocketManager.DisconnectListener lsnr = (I2PSocketManager.DisconnectListener)listeners.get(i); - lsnr.sessionDisconnected(); - } - } - - public void errorOccurred(I2PSession session, String message, Throwable error) { - _log.error(getName() + ": Error occurred: [" + message + "]", error); - } - - public void messageAvailable(I2PSession session, int msgId, long size) { - try { - I2PSocketImpl s; - byte msg[] = session.receiveMessage(msgId); - if (msg.length == 1 && msg[0] == -1) { - _log.debug(getName() + ": Ping received"); - return; - } - if (msg.length < 4) { - _log.warn(getName() + ": ==== packet too short ===="); - return; - } - int type = msg[0] & 0xff; - String id = toString(new byte[] { msg[1], msg[2], msg[3]}); - byte[] payload = new byte[msg.length - 4]; - System.arraycopy(msg, 4, payload, 0, payload.length); - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getName() + ": Message read: type = [" + Integer.toHexString(type) - + "] id = [" + getReadableForm(id) - + "] payload length: [" + payload.length + "]"); - switch (type) { - case ACK: - ackAvailable(id, payload); - return; - case CLOSE_OUT: - disconnectAvailable(id, payload); - return; - case DATA_OUT: - sendOutgoingAvailable(id, payload); - return; - case SYN: - synIncomingAvailable(id, payload, session); - return; - case CLOSE_IN: - disconnectIncoming(id, payload); - return; - case DATA_IN: - sendIncoming(id, payload); - return; - case CHAFF: - // ignore - return; - default: - handleUnknown(type, id, payload); - return; - } - } catch (I2PException ise) { - _log.warn(getName() + ": Error processing", ise); - } catch (IllegalStateException ise) { - _log.debug(getName() + ": Error processing", ise); - } - } - - /** - * We've received an ACK packet (hopefully, in response to a SYN that we - * recently sent out). Notify the associated I2PSocket that we now have - * the remote stream ID (which should get things going, since the handshake - * is complete). - * - */ - private void ackAvailable(String id, byte payload[]) { - long begin = _context.clock().now(); - I2PSocketImpl s = null; - synchronized (lock) { - s = (I2PSocketImpl) _outSockets.get(id); - } - - if (s == null) { - _log.warn(getName() + ": No socket responsible for ACK packet for id " + getReadableForm(id)); - return; - } - - long socketRetrieved = _context.clock().now(); - - String remoteId = null; - remoteId = s.getRemoteID(false); - - if ( (payload.length == 3) && (remoteId == null) ) { - String newID = toString(payload); - long beforeSetRemId = _context.clock().now(); - s.setRemoteID(newID); - if (_log.shouldLog(Log.DEBUG)) { - _log.debug(getName() + ": ackAvailable - socket retrieval took " - + (socketRetrieved-begin) + "ms, getRemoteId took " - + (beforeSetRemId-socketRetrieved) + "ms, setRemoteId took " - + (_context.clock().now()-beforeSetRemId) + "ms"); - } - return; - } else { - // (payload.length != 3 || getRemoteId != null) - if (_log.shouldLog(Log.WARN)) { - if (payload.length != 3) - _log.warn(getName() + ": Ack packet had " + payload.length + " bytes"); - else - _log.warn(getName() + ": Remote ID already exists? " + remoteId); - } - if (_log.shouldLog(Log.DEBUG)) { - _log.debug(getName() + ": invalid ack - socket retrieval took " - + (socketRetrieved-begin) + "ms, overall took " - + (_context.clock().now()-begin) + "ms"); - } - return; - } - } - - /** - * We received a disconnect packet, telling us to tear down the specified - * stream. - */ - private void disconnectAvailable(String id, byte payload[]) { - I2PSocketImpl s = null; - synchronized (lock) { - s = (I2PSocketImpl) _outSockets.get(id); - } - - _log.debug(getName() + ": *Disconnect outgoing for socket " + s + " on id " - + getReadableForm(id)); - try { - if (s != null) { - if (payload.length > 0) { - _log.debug(getName() + ": Disconnect packet had " - + payload.length + " bytes"); - } - if (s.getRemoteID(false) == null) { - s.setRemoteID(null); // Just to wake up socket - return; - } - s.internalClose(); - synchronized (lock) { - _outSockets.remove(id); - } - } - return; - } catch (Exception t) { - _log.warn(getName() + ": Ignoring error on disconnect for socket " + s, t); - } - } - - /** - * We've received data on a stream we created - toss the data onto - * the socket for handling. - * - * @throws IllegalStateException if the socket isn't open or isn't known - */ - private void sendOutgoingAvailable(String id, byte payload[]) throws IllegalStateException { - I2PSocketImpl s = null; - synchronized (lock) { - s = (I2PSocketImpl) _outSockets.get(id); - } - - // packet send outgoing - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getName() + ": *Packet send outgoing [" + payload.length + "] for socket " - + s + " on id " + getReadableForm(id)); - if (s != null) { - s.queueData(payload); - return; - } else { - if (_log.shouldLog(Log.WARN)) - _log.warn(getName() + ": Null socket with data available"); - throw new IllegalStateException("Null socket with data available"); - } - } - - /** - * We've received a SYN packet (a request for a new stream). If the client has - * said they want incoming sockets (by retrieving the serverSocket), the stream - * will be ACKed, but if they have not, they'll be NACKed) - * - * @throws DataFormatException if the destination in the SYN was invalid - * @throws I2PSessionException if there was an I2P error sending the ACK or NACK - */ - private void synIncomingAvailable(String id, byte payload[], I2PSession session) - throws DataFormatException, I2PSessionException { - Destination d = new Destination(); - d.fromByteArray(payload); - - I2PSocketImpl s = null; - boolean acceptConnections = (_serverSocket != null); - String newLocalID = null; - synchronized (lock) { - newLocalID = makeID(_inSockets); - if (acceptConnections) { - s = new I2PSocketImpl(d, this, false, newLocalID); - s.setRemoteID(id); - } - } - _log.debug(getName() + ": *Syn! for socket " + s + " on id " + getReadableForm(newLocalID) - + " from " + d.calculateHash().toBase64().substring(0,6)); - - if (!acceptConnections) { - // The app did not instantiate an I2PServerSocket - byte[] packet = makePacket((byte) CLOSE_OUT, id, toBytes(newLocalID)); - boolean replySentOk = false; - synchronized (_session) { - replySentOk = _session.sendMessage(d, packet); - } - if (!replySentOk) { - _log.warn(getName() + ": Error sending close to " + d.calculateHash().toBase64() - + " in response to a new con message", - new Exception("Failed creation")); - } - _context.statManager().addRateData("streaming.nackSent", 1, 1); - return; - } - - if (_serverSocket.addWaitForAccept(s, _acceptTimeout)) { - _inSockets.put(newLocalID, s); - byte[] packet = makePacket((byte) ACK, id, toBytes(newLocalID)); - boolean replySentOk = false; - replySentOk = _session.sendMessage(d, packet); - if (!replySentOk) { - if (_log.shouldLog(Log.WARN)) - _log.warn(getName() + ": Error sending reply to " + d.calculateHash().toBase64() - + " in response to a new con message for socket " + s, - new Exception("Failed creation")); - s.internalClose(); - _context.statManager().addRateData("streaming.ackSendFailed", 1, 1); - } - } else { - // timed out or serverSocket closed - byte[] packet = toBytes(" " + id); - packet[0] = CLOSE_OUT; - boolean nackSent = session.sendMessage(d, packet); - if (!nackSent) { - _log.warn(getName() + ": Error sending NACK for session creation for socket " + s); - } - s.internalClose(); - _context.statManager().addRateData("streaming,nackSent", 1, 1); - } - return; - } - - /** - * We've received a disconnect for a socket we didn't initiate, so kill - * the socket. - * - */ - private void disconnectIncoming(String id, byte payload[]) { - I2PSocketImpl s = null; - synchronized (lock) { - s = (I2PSocketImpl) _inSockets.get(id); - if (payload.length == 0 && s != null) { - _inSockets.remove(id); - } - } - - _log.debug(getName() + ": *Disconnect incoming for socket " + s); - - try { - if (payload.length == 0 && s != null) { - s.internalClose(); - return; - } else { - if ( (payload.length > 0) && (_log.shouldLog(Log.ERROR)) ) - _log.warn(getName() + ": Disconnect packet had " + payload.length + " bytes"); - if (s != null) - s.internalClose(); - return; - } - } catch (Exception t) { - _log.warn(getName() + ": Ignoring error on disconnect", t); - return; - } - } - - /** - * We've received data on a stream we received - toss the data onto - * the socket for handling. - * - * @throws IllegalStateException if the socket isn't open or isn't known - */ - private void sendIncoming(String id, byte payload[]) throws IllegalStateException { - I2PSocketImpl s = null; - synchronized (lock) { - s = (I2PSocketImpl) _inSockets.get(id); - } - - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getName() + ": *Packet send incoming [" + payload.length + "] for socket " + s); - - if (s != null) { - s.queueData(payload); - return; - } else { - _log.info(getName() + ": Null socket with data available"); - throw new IllegalStateException("Null socket with data available"); - } - } - - /** - * Unknown packet. moo. - * - */ - private void handleUnknown(int type, String id, byte payload[]) { - _log.error(getName() + ": \n\n=============== Unknown packet! " + "============" - + "\nType: " + type - + "\nID: " + getReadableForm(id) - + "\nBase64'ed Data: " + Base64.encode(payload) - + "\n\n\n"); - if (id != null) { - synchronized (lock) { - _inSockets.remove(id); - _outSockets.remove(id); - } - } - } - - public void reportAbuse(I2PSession session, int severity) { - _log.error(getName() + ": Abuse reported [" + severity + "]"); - } - - public void setDefaultOptions(I2PSocketOptions options) { - _defaultOptions = options; - } - - public I2PSocketOptions getDefaultOptions() { - return _defaultOptions; - } - - public I2PSocketOptions buildOptions() { return buildOptions(null); } - public I2PSocketOptions buildOptions(Properties opts) { - return new I2PSocketOptionsImpl(opts); - } - - public I2PServerSocket getServerSocket() { - if (_serverSocket == null) { - _serverSocket = new I2PServerSocketImpl(this); - } - return _serverSocket; - } - - /** - * @throws UnsupportedOperationException - * @since 0.8.4 - */ - public ServerSocket getStandardServerSocket() { - throw new UnsupportedOperationException(); - } - - /** - * Create a new connected socket (block until the socket is created) - * - * @param peer Destination to connect to - * @param options I2P socket options to be used for connecting - * - * @throws ConnectException if the peer refuses the connection - * @throws NoRouteToHostException if the peer is not found or not reachable - * @throws InterruptedIOException if the connection timeouts - * @throws I2PException if there is some other I2P-related problem - */ - public I2PSocket connect(Destination peer, I2PSocketOptions options) - throws I2PException, ConnectException, - NoRouteToHostException, InterruptedIOException { - String localID, lcID; - I2PSocketImpl s; - synchronized (lock) { - localID = makeID(_outSockets); - lcID = getReadableForm(localID); - s = new I2PSocketImpl(peer, this, true, localID); - _outSockets.put(localID, s); - } - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getName() + ": connect(" + peer.calculateHash().toBase64().substring(0,6) - + ", ...): localID = " + lcID); - - try { - ByteArrayOutputStream pubkey = new ByteArrayOutputStream(); - _session.getMyDestination().writeBytes(pubkey); - String remoteID; - byte[] packet = makePacket((byte) SYN, localID, pubkey.toByteArray()); - boolean sent = false; - sent = _session.sendMessage(peer, packet); - if (!sent) { - _log.info(getName() + ": Unable to send & receive ack for SYN packet for socket " - + s + " with localID = " + lcID); - synchronized (lock) { - _outSockets.remove(s.getLocalID()); - } - _context.statManager().addRateData("streaming.synNoAck", 1, 1); - throw new I2PException("Error sending through I2P network"); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getName() + ": syn sent ok to " - + peer.calculateHash().toBase64().substring(0,6) - + " with localID = " + lcID); - } - if (options != null) - remoteID = s.getRemoteID(true, options.getConnectTimeout()); - else - remoteID = s.getRemoteID(true, getDefaultOptions().getConnectTimeout()); - - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getName() + ": remoteID received from " - + peer.calculateHash().toBase64().substring(0,6) - + ": " + getReadableForm(remoteID) - + " with localID = " + lcID); - - if (remoteID == null) { - _context.statManager().addRateData("streaming.nackReceived", 1, 1); - throw new ConnectException("Connection refused by peer for socket " + s); - } - if ("".equals(remoteID)) { - _context.statManager().addRateData("streaming.synNoAck", 1, 1); - throw new NoRouteToHostException("Unable to reach peer for socket " + s); - } - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getName() + ": TIMING: s given out for remoteID " - + getReadableForm(remoteID) + " for socket " + s); - - return s; - } catch (InterruptedIOException ioe) { - if (_log.shouldLog(Log.WARN)) - _log.warn(getName() + ": Timeout waiting for ack from syn for id " - + lcID + " to " + peer.calculateHash().toBase64().substring(0,6) - + " for socket " + s, ioe); - synchronized (lock) { - _outSockets.remove(s.getLocalID()); - } - s.internalClose(); - _context.statManager().addRateData("streaming.synNoAck", 1, 1); - throw new InterruptedIOException("Timeout waiting for ack"); - } catch (ConnectException ex) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getName() + ": Connection error waiting for ack from syn for id " - + lcID + " to " + peer.calculateHash().toBase64().substring(0,6) - + " for socket " + s, ex); - s.internalClose(); - throw ex; - } catch (NoRouteToHostException ex) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getName() + ": No route to host waiting for ack from syn for id " - + lcID + " to " + peer.calculateHash().toBase64().substring(0,6) - + " for socket " + s, ex); - s.internalClose(); - throw ex; - } catch (IOException ex) { - if (_log.shouldLog(Log.WARN)) - _log.warn(getName() + ": Error sending syn on id " - + lcID + " to " + peer.calculateHash().toBase64().substring(0,6) - + " for socket " + s, ex); - synchronized (lock) { - _outSockets.remove(s.getLocalID()); - } - s.internalClose(); - throw new I2PException("Unhandled IOException occurred"); - } catch (I2PException ex) { - if (_log.shouldLog(Log.INFO)) - _log.info(getName() + ": Error sending syn on id " - + lcID + " to " + peer.calculateHash().toBase64().substring(0,6) - + " for socket " + s, ex); - synchronized (lock) { - _outSockets.remove(s.getLocalID()); - } - s.internalClose(); - throw ex; - } catch (Exception e) { - s.internalClose(); - _log.warn(getName() + ": Unhandled error connecting on " - + lcID + " to " + peer.calculateHash().toBase64().substring(0,6) - + " for socket " + s, e); - throw new ConnectException("Unhandled error connecting: " + e.getMessage()); - } - } - - /** - * Create a new connected socket (block until the socket is created) - * - * @param peer Destination to connect to - * - * @throws ConnectException if the peer refuses the connection - * @throws NoRouteToHostException if the peer is not found or not reachable - * @throws InterruptedIOException if the connection timeouts - * @throws I2PException if there is some other I2P-related problem - */ - public I2PSocket connect(Destination peer) throws I2PException, ConnectException, - NoRouteToHostException, InterruptedIOException { - return connect(peer, null); - } - - /** - * @throws UnsupportedOperationException - * @since 0.8.4 - */ - public Socket connectToSocket(Destination peer) { - throw new UnsupportedOperationException(); - } - - /** - * @throws UnsupportedOperationException - * @since 0.8.4 - */ - public Socket connectToSocket(Destination peer, int timeout) { - throw new UnsupportedOperationException(); - } - - /** - * Destroy the socket manager, freeing all the associated resources. This - * method will block untill all the managed sockets are closed. - * - */ - public void destroySocketManager() { - if (_serverSocket != null) { - _serverSocket.close(); - _serverSocket = null; - } - - synchronized (lock) { - Iterator iter; - String id = null; - I2PSocketImpl sock; - - iter = _inSockets.keySet().iterator(); - while (iter.hasNext()) { - id = (String)iter.next(); - sock = (I2PSocketImpl)_inSockets.get(id); - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getName() + ": Closing inSocket \"" - + getReadableForm(sock.getLocalID()) + "\""); - sock.internalClose(); - } - - iter = _outSockets.keySet().iterator(); - while (iter.hasNext()) { - id = (String)iter.next(); - sock = (I2PSocketImpl)_outSockets.get(id); - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getName() + ": Closing outSocket \"" - + getReadableForm(sock.getLocalID()) + "\""); - sock.internalClose(); - } - } - - _log.debug(getName() + ": Waiting for all open sockets to really close..."); - synchronized (lock) { - while ((_inSockets.size() != 0) || (_outSockets.size() != 0)) { - try { - lock.wait(); - } catch (InterruptedException e) {} - } - } - - try { - _log.debug(getName() + ": Destroying I2P session..."); - _session.destroySession(); - _log.debug(getName() + ": I2P session destroyed"); - } catch (I2PSessionException e) { - _log.warn(getName() + ": Error destroying I2P session", e); - } - } - - /** - * Retrieve a set of currently connected I2PSockets, either initiated locally or remotely. - * - */ - public Set listSockets() { - Set sockets = new HashSet(8); - synchronized (lock) { - sockets.addAll(_inSockets.values()); - sockets.addAll(_outSockets.values()); - } - return sockets; - } - - /** - * Ping the specified peer, returning true if they replied to the ping within - * the timeout specified, false otherwise. This call blocks. - * - * @deprecated timeout is ignored - use I2PSocketManagerFull.ping() - * @param timeoutMs ignored - */ - public boolean ping(Destination peer, long timeoutMs) { - try { - return _session.sendMessage(peer, new byte[] { (byte) CHAFF}); - } catch (I2PException ex) { - _log.warn(getName() + ": I2PException:", ex); - return false; - } - } - - public void removeSocket(I2PSocketImpl sock) { - String localId = sock.getLocalID(); - boolean removed = false; - synchronized (lock) { - removed = (null != _inSockets.remove(localId)); - removed = removed || (null != _outSockets.remove(localId)); - lock.notify(); - } - - long now = _context.clock().now(); - long lifetime = now - sock.getCreatedOn(); - long timeSinceClose = now - sock.getClosedOn(); - long sent = sock.getBytesSent(); - long recv = sock.getBytesReceived(); - - if (_log.shouldLog(Log.DEBUG)) { - _log.debug(getName() + ": Removing socket \"" + getReadableForm(localId) + "\" [" + sock - + ", send: " + sent + ", recv: " + recv - + ", lifetime: " + lifetime + "ms, time since close: " + timeSinceClose - + " removed? " + removed + ")]", - new Exception("removeSocket called")); - } - - _context.statManager().addRateData("streaming.lifetime", lifetime, lifetime); - _context.statManager().addRateData("streaming.sent", sent, lifetime); - _context.statManager().addRateData("streaming.received", recv, lifetime); - - if (sent > recv) { - _context.statManager().addRateData("streaming.transferBalance", 1, lifetime); - } else if (recv > sent) { - _context.statManager().addRateData("streaming.transferBalance", -1, lifetime); - } else { - // noop - } - } - - public String getName() { return _name; } - public void setName(String name) { _name = name; } - - public void addDisconnectListener(I2PSocketManager.DisconnectListener lsnr) { - synchronized (_listeners) { - _listeners.add(lsnr); - } - } - public void removeDisconnectListener(I2PSocketManager.DisconnectListener lsnr) { - synchronized (_listeners) { - _listeners.remove(lsnr); - } - } - - public static String getReadableForm(String id) { - if (id == null) return "(null)"; - if (id.length() != 3) return "Bogus"; - return Base64.encode(toBytes(id)); - } - - /** - * Create a new part the connection ID that is locally unique - * - * @param uniqueIn map of already known local IDs so we don't collide. WARNING - NOT THREADSAFE! - */ - private static String makeID(HashMap uniqueIn) { - String newID; - do { - int id = (int) (Math.random() * 16777215 + 1); - byte[] nid = new byte[3]; - nid[0] = (byte) (id / 65536); - nid[1] = (byte) ((id / 256) % 256); - nid[2] = (byte) (id % 256); - newID = toString(nid); - } while (uniqueIn.get(newID) != null); - return newID; - } - - /** - * Create a new packet of the given type for the specified connection containing - * the given payload - */ - public static byte[] makePacket(byte type, String id, byte[] payload) { - byte[] packet = new byte[payload.length + 4]; - packet[0] = type; - byte[] temp = toBytes(id); - if (temp.length != 3) throw new RuntimeException("Incorrect ID length: " + temp.length); - System.arraycopy(temp, 0, packet, 1, 3); - System.arraycopy(payload, 0, packet, 4, payload.length); - return packet; - } - - private static final String toString(byte data[]) { - try { - return new String(data, "ISO-8859-1"); - } catch (UnsupportedEncodingException uee) { - throw new RuntimeException("WTF! iso-8859-1 isn't supported?"); - } - } - - private static final byte[] toBytes(String str) { - try { - return str.getBytes("ISO-8859-1"); - } catch (UnsupportedEncodingException uee) { - throw new RuntimeException("WTF! iso-8859-1 isn't supported?"); - } - } -}