forked from I2P_Developers/i2p.i2p
* Ministreaming: Drop old classes replaced by streaming
years ago.
This commit is contained in:
@@ -1,298 +0,0 @@
|
|||||||
package net.i2p.client.streaming;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Like a StringBuffer, but for bytes. This class is not internally synchronized,
|
|
||||||
* so care should be taken when using in a multithreaded environment.
|
|
||||||
*
|
|
||||||
* @deprecated Only used by deprecated I2PSocketImpl
|
|
||||||
*/
|
|
||||||
class ByteCollector {
|
|
||||||
byte[] contents;
|
|
||||||
int size;
|
|
||||||
|
|
||||||
private static final int INITIAL_CAPACITY = 1024;
|
|
||||||
private static final int SHORT_CAPACITY = 80;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* New collector with the default initial capacity
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public ByteCollector() {
|
|
||||||
this(INITIAL_CAPACITY);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* New collector with an initial capacity as specified
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public ByteCollector(int capacity) {
|
|
||||||
contents = new byte[capacity];
|
|
||||||
size = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* New collector containing the specified bytes
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public ByteCollector(byte[] b) {
|
|
||||||
this();
|
|
||||||
append(b);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* New collector with the specified byte
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public ByteCollector(byte b) {
|
|
||||||
this();
|
|
||||||
append(b);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Add a new byte to the collector (extending the buffer if necessary)
|
|
||||||
*
|
|
||||||
* @param b byte to add
|
|
||||||
* @return this object
|
|
||||||
*/
|
|
||||||
public ByteCollector append(byte b) {
|
|
||||||
ensureCapacity(size + 1);
|
|
||||||
contents[size++] = b;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Add new bytes to the collector (extending the buffer if necessary)
|
|
||||||
*
|
|
||||||
* @param b bytes to add
|
|
||||||
* @return this object
|
|
||||||
*/
|
|
||||||
public ByteCollector append(byte[] b) {
|
|
||||||
ensureCapacity(size + b.length);
|
|
||||||
System.arraycopy(b, 0, contents, size, b.length);
|
|
||||||
size += b.length;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Add new bytes to the collector (extending the buffer if necessary)
|
|
||||||
*
|
|
||||||
* @param b byte array to add from
|
|
||||||
* @param len number of bytes in the array to add
|
|
||||||
* @return this object
|
|
||||||
*/
|
|
||||||
public ByteCollector append(byte[] b, int len) {
|
|
||||||
return append(b, 0, len);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Add new bytes to the collector (extending the buffer if necessary)
|
|
||||||
*
|
|
||||||
* @param b byte array to add from
|
|
||||||
* @param off offset into the array to begin adding from
|
|
||||||
* @param len number of bytes in the array to add
|
|
||||||
* @return this object
|
|
||||||
*/
|
|
||||||
public ByteCollector append(byte[] b, int off, int len) {
|
|
||||||
ensureCapacity(size + len);
|
|
||||||
System.arraycopy(b, off, contents, size, len);
|
|
||||||
size += len;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Add the contents of the byte collector to the current collector (extending the buffer if necessary)
|
|
||||||
*
|
|
||||||
* @param bc collector to copy
|
|
||||||
* @return this object
|
|
||||||
*/
|
|
||||||
public ByteCollector append(ByteCollector bc) {
|
|
||||||
// optimieren?
|
|
||||||
return append(bc.toByteArray());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Copy the contents of the collector into a new array and return it
|
|
||||||
*
|
|
||||||
* @return new array containing a copy of the current collector's data
|
|
||||||
*/
|
|
||||||
public byte[] toByteArray() {
|
|
||||||
byte[] result = new byte[size];
|
|
||||||
System.arraycopy(contents, 0, result, 0, size);
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Pull off the first $maxlen bytes from the collector, shifting the remaining
|
|
||||||
* bytes into the beginning of the collector's array.
|
|
||||||
*
|
|
||||||
* @param maxlen max number of bytes we want to pull from the collector (we will get
|
|
||||||
* less if the collector doesnt have that many bytes yet)
|
|
||||||
* @return copy of the bytes pulled from the collector
|
|
||||||
*/
|
|
||||||
public byte[] startToByteArray(int maxlen) {
|
|
||||||
if (size < maxlen) {
|
|
||||||
byte[] res = toByteArray();
|
|
||||||
clear();
|
|
||||||
return res;
|
|
||||||
} else {
|
|
||||||
byte[] result = new byte[maxlen];
|
|
||||||
System.arraycopy(contents, 0, result, 0, maxlen);
|
|
||||||
System.arraycopy(contents, maxlen, contents, 0, size - maxlen);
|
|
||||||
size -= maxlen;
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* How many bytes are available for retrieval?
|
|
||||||
*
|
|
||||||
* @return number of bytes
|
|
||||||
*/
|
|
||||||
public int getCurrentSize() {
|
|
||||||
return size;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Make sure we have sufficient storage space.
|
|
||||||
*
|
|
||||||
* @param cap minimum number of bytes that the buffer should contain
|
|
||||||
* @return true if the the collector was expanded to meet the minimum,
|
|
||||||
* false if it was already large enough
|
|
||||||
*/
|
|
||||||
public boolean ensureCapacity(int cap) {
|
|
||||||
if (contents.length < cap) {
|
|
||||||
int l = contents.length;
|
|
||||||
while (l < cap) {
|
|
||||||
l = (l * 3) / 2 + 1;
|
|
||||||
}
|
|
||||||
byte[] newcont = new byte[l];
|
|
||||||
System.arraycopy(contents, 0, newcont, 0, size);
|
|
||||||
contents = newcont;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Does the collector have meaningful data or is it empty?
|
|
||||||
*
|
|
||||||
* @return true if it has no data
|
|
||||||
*/
|
|
||||||
public boolean isEmpty() {
|
|
||||||
return size == 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Search through the collector for the first occurrence of the sequence of
|
|
||||||
* bytes contained within the specified collector
|
|
||||||
*
|
|
||||||
* @param bc bytes that will be searched for
|
|
||||||
* @return index into the current collector, or -1 if it isn't found
|
|
||||||
*/
|
|
||||||
public int indexOf(ByteCollector bc) {
|
|
||||||
// optimieren?
|
|
||||||
return indexOf(bc.toByteArray());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Search through the collector for the first occurrence of the specified
|
|
||||||
* byte
|
|
||||||
*
|
|
||||||
* @param b byte that will be searched for
|
|
||||||
* @return index into the current collector, or -1 if it isn't found
|
|
||||||
*/
|
|
||||||
public int indexOf(byte b) {
|
|
||||||
// optimieren?
|
|
||||||
return indexOf(new byte[] { b});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Search through the collector for the first occurrence of the sequence of
|
|
||||||
* bytes
|
|
||||||
*
|
|
||||||
* @param ba bytes that will be searched for
|
|
||||||
* @return index into the current collector, or -1 if it isn't found
|
|
||||||
*/
|
|
||||||
public int indexOf(byte[] ba) {
|
|
||||||
loop: for (int i = 0; i < size - ba.length + 1; i++) {
|
|
||||||
for (int j = 0; j < ba.length; j++) {
|
|
||||||
if (contents[i + j] != ba[j]) continue loop;
|
|
||||||
}
|
|
||||||
return i;
|
|
||||||
}
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Empty the collector. This does not affect its capacity.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public void clear() {
|
|
||||||
size = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Empty the collector and reduce its capacity.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public void clearAndShorten() {
|
|
||||||
size = 0;
|
|
||||||
contents = new byte[SHORT_CAPACITY];
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Render the bytes as a string
|
|
||||||
*
|
|
||||||
* @return the, uh, string
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return new String(toByteArray());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int hashCode() {
|
|
||||||
int h = 0;
|
|
||||||
for (int i = 0; i < size; i++) {
|
|
||||||
h += contents[i] * contents[i];
|
|
||||||
}
|
|
||||||
return h;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Compare the collectors.
|
|
||||||
*
|
|
||||||
* @return true if and only if both are the same size and the
|
|
||||||
* byte arrays they contain are equal.
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public boolean equals(Object o) {
|
|
||||||
if (o instanceof ByteCollector) {
|
|
||||||
ByteCollector by = (ByteCollector) o;
|
|
||||||
if (size != by.size) return false;
|
|
||||||
for (int i = 0; i < size; i++) {
|
|
||||||
if (contents[i] != by.contents[i]) return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Remove the first byte from the collector, shifting its contents accordingly.
|
|
||||||
*
|
|
||||||
* @return byte removed
|
|
||||||
* @throws IllegalArgumentException if the collector is empty
|
|
||||||
*/
|
|
||||||
public byte removeFirst() throws IllegalArgumentException {
|
|
||||||
byte bb = contents[0];
|
|
||||||
if (size == 0) throw new IllegalArgumentException("ByteCollector is empty");
|
|
||||||
if (size > 1)
|
|
||||||
System.arraycopy(contents, 1, contents, 0, --size);
|
|
||||||
else
|
|
||||||
size = 0;
|
|
||||||
return bb;
|
|
||||||
}
|
|
||||||
}
|
|
@@ -1,166 +0,0 @@
|
|||||||
package net.i2p.client.streaming;
|
|
||||||
|
|
||||||
import java.net.ConnectException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import net.i2p.I2PAppContext;
|
|
||||||
import net.i2p.I2PException;
|
|
||||||
import net.i2p.util.Clock;
|
|
||||||
import net.i2p.util.Log;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Server socket implementation, allowing multiple threads to accept I2PSockets
|
|
||||||
* and pull from a queue populated by various threads (each of whom have their own
|
|
||||||
* timeout)
|
|
||||||
*
|
|
||||||
* @deprecated use I2PServerSocketFull
|
|
||||||
*/
|
|
||||||
class I2PServerSocketImpl implements I2PServerSocket {
|
|
||||||
private final static Log _log = new Log(I2PServerSocketImpl.class);
|
|
||||||
private I2PSocketManager mgr;
|
|
||||||
/** list of sockets waiting for the client to accept them */
|
|
||||||
private final List<I2PSocket> pendingSockets = Collections.synchronizedList(new ArrayList<I2PSocket>(4));
|
|
||||||
|
|
||||||
/** have we been closed */
|
|
||||||
private volatile boolean closing = false;
|
|
||||||
|
|
||||||
/** lock on this when accepting a pending socket, and wait on it for notification of acceptance */
|
|
||||||
private final Object socketAcceptedLock = new Object();
|
|
||||||
/** lock on this when adding a new socket to the pending list, and wait on it accordingly */
|
|
||||||
private final Object socketAddedLock = new Object();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set Sock Option accept timeout stub, does nothing in ministreaming
|
|
||||||
* @param x
|
|
||||||
*/
|
|
||||||
public void setSoTimeout(long x) {
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get Sock Option accept timeout stub, does nothing in ministreaming
|
|
||||||
* @return timeout
|
|
||||||
*/
|
|
||||||
public long getSoTimeout() {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
public I2PServerSocketImpl(I2PSocketManager mgr) {
|
|
||||||
this.mgr = mgr;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Waits for the next socket connecting. If a remote user tried to make a
|
|
||||||
* connection and the local application wasn't .accept()ing new connections,
|
|
||||||
* they should get refused (if .accept() doesnt occur in some small period -
|
|
||||||
* currently 5 seconds)
|
|
||||||
*
|
|
||||||
* @return a connected I2PSocket
|
|
||||||
*
|
|
||||||
* @throws I2PException if there is a problem with reading a new socket
|
|
||||||
* from the data available (aka the I2PSession closed, etc)
|
|
||||||
* @throws ConnectException if the I2PServerSocket is closed
|
|
||||||
*/
|
|
||||||
public I2PSocket accept() throws I2PException, ConnectException {
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug("accept() called, pending: " + pendingSockets.size());
|
|
||||||
|
|
||||||
I2PSocket ret = null;
|
|
||||||
|
|
||||||
while ( (ret == null) && (!closing) ){
|
|
||||||
while (pendingSockets.isEmpty()) {
|
|
||||||
if (closing) throw new ConnectException("I2PServerSocket closed");
|
|
||||||
try {
|
|
||||||
synchronized(socketAddedLock) {
|
|
||||||
socketAddedLock.wait();
|
|
||||||
}
|
|
||||||
} catch (InterruptedException ie) {}
|
|
||||||
}
|
|
||||||
synchronized (pendingSockets) {
|
|
||||||
if (!pendingSockets.isEmpty()) {
|
|
||||||
ret = (I2PSocket)pendingSockets.remove(0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (ret != null) {
|
|
||||||
synchronized (socketAcceptedLock) {
|
|
||||||
socketAcceptedLock.notifyAll();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug("TIMING: handed out accept result " + ret.hashCode());
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Make the socket available and wait until the client app accepts it, or until
|
|
||||||
* the given timeout elapses. This doesn't have any limits on the queue size -
|
|
||||||
* perhaps it should add some choking (e.g. after 5 waiting for accept, refuse)
|
|
||||||
*
|
|
||||||
* @param timeoutMs how long to wait until accept
|
|
||||||
* @return true if the socket was accepted, false if the timeout expired
|
|
||||||
* or the socket was closed
|
|
||||||
*/
|
|
||||||
public boolean addWaitForAccept(I2PSocket s, long timeoutMs) {
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug("addWaitForAccept [new socket arrived [" + s.toString() + "], pending: " + pendingSockets.size());
|
|
||||||
|
|
||||||
if (closing) {
|
|
||||||
if (_log.shouldLog(Log.WARN))
|
|
||||||
_log.warn("Already closing the socket");
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
Clock clock = I2PAppContext.getGlobalContext().clock();
|
|
||||||
long start = clock.now();
|
|
||||||
long end = start + timeoutMs;
|
|
||||||
pendingSockets.add(s);
|
|
||||||
synchronized (socketAddedLock) {
|
|
||||||
socketAddedLock.notifyAll();
|
|
||||||
}
|
|
||||||
|
|
||||||
// keep looping until the socket has been grabbed by the accept()
|
|
||||||
// (or the expiration passes, or the socket is closed)
|
|
||||||
while (pendingSockets.contains(s)) {
|
|
||||||
long now = clock.now();
|
|
||||||
if (now >= end) {
|
|
||||||
if (_log.shouldLog(Log.INFO))
|
|
||||||
_log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms) for socket " + s.toString());
|
|
||||||
pendingSockets.remove(s);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (closing) {
|
|
||||||
if (_log.shouldLog(Log.WARN))
|
|
||||||
_log.warn("Server socket closed while waiting for accept");
|
|
||||||
pendingSockets.remove(s);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
long remaining = end - now;
|
|
||||||
try {
|
|
||||||
synchronized (socketAcceptedLock) {
|
|
||||||
socketAcceptedLock.wait(remaining);
|
|
||||||
}
|
|
||||||
} catch (InterruptedException ie) {}
|
|
||||||
}
|
|
||||||
long now = clock.now();
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.info("Socket accepted after " + (now-start) + "ms for socket " + s.toString());
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void close() {
|
|
||||||
closing = true;
|
|
||||||
// let anyone .accept()ing know to fsck off
|
|
||||||
synchronized (socketAddedLock) {
|
|
||||||
socketAddedLock.notifyAll();
|
|
||||||
}
|
|
||||||
// let anyone addWaitForAccept()ing know to fsck off
|
|
||||||
synchronized (socketAcceptedLock) {
|
|
||||||
socketAcceptedLock.notifyAll();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public I2PSocketManager getManager() { return mgr; }
|
|
||||||
}
|
|
@@ -1,696 +0,0 @@
|
|||||||
package net.i2p.client.streaming;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.InterruptedIOException;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
|
|
||||||
import net.i2p.I2PAppContext;
|
|
||||||
import net.i2p.I2PException;
|
|
||||||
import net.i2p.client.I2PSession;
|
|
||||||
import net.i2p.client.I2PSessionException;
|
|
||||||
import net.i2p.data.Destination;
|
|
||||||
import net.i2p.util.Clock;
|
|
||||||
import net.i2p.util.I2PThread;
|
|
||||||
import net.i2p.util.Log;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Initial stub implementation for the socket
|
|
||||||
*
|
|
||||||
* @deprecated use I2PSocketFull
|
|
||||||
*/
|
|
||||||
class I2PSocketImpl implements I2PSocket {
|
|
||||||
private final static Log _log = new Log(I2PSocketImpl.class);
|
|
||||||
|
|
||||||
public static final int MAX_PACKET_SIZE = 1024 * 32;
|
|
||||||
public static final int PACKET_DELAY = 100;
|
|
||||||
|
|
||||||
private I2PSocketManagerImpl manager;
|
|
||||||
private Destination local;
|
|
||||||
private Destination remote;
|
|
||||||
private String localID;
|
|
||||||
private String remoteID;
|
|
||||||
private final Object remoteIDWaiter = new Object();
|
|
||||||
private I2PInputStream in;
|
|
||||||
private I2POutputStream out;
|
|
||||||
private I2PSocket.SocketErrorListener _socketErrorListener;
|
|
||||||
private boolean outgoing;
|
|
||||||
private long _socketId;
|
|
||||||
private static long __socketId = 0;
|
|
||||||
private long _bytesRead = 0;
|
|
||||||
private long _bytesWritten = 0;
|
|
||||||
private long _createdOn;
|
|
||||||
private long _closedOn;
|
|
||||||
private long _remoteIdSetTime;
|
|
||||||
private I2PSocketOptions _options;
|
|
||||||
private final Object flagLock = new Object();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Whether the I2P socket has already been closed.
|
|
||||||
*/
|
|
||||||
private boolean closed = false;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Whether to send out a close packet when the socket is
|
|
||||||
* closed. (If the socket is closed because of an incoming close
|
|
||||||
* packet, we need not send one.)
|
|
||||||
*/
|
|
||||||
private boolean sendClose = true;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Whether the I2P socket has already been closed and all data
|
|
||||||
* (from I2P to the app, dunno whether to call this incoming or
|
|
||||||
* outgoing) has been processed.
|
|
||||||
*/
|
|
||||||
private boolean closed2 = false;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param peer who this socket is (or should be) connected to
|
|
||||||
* @param mgr how we talk to the network
|
|
||||||
* @param outgoing did we initiate the connection (true) or did we receive it (false)?
|
|
||||||
* @param localID what is our half of the socket ID?
|
|
||||||
*/
|
|
||||||
public I2PSocketImpl(Destination peer, I2PSocketManagerImpl mgr, boolean outgoing, String localID) {
|
|
||||||
this.outgoing = outgoing;
|
|
||||||
manager = mgr;
|
|
||||||
remote = peer;
|
|
||||||
_socketId = ++__socketId;
|
|
||||||
local = mgr.getSession().getMyDestination();
|
|
||||||
String us = mgr.getSession().getMyDestination().calculateHash().toBase64().substring(0,4);
|
|
||||||
String name = us + (outgoing ? "->" : "<-") + peer.calculateHash().toBase64().substring(0,4);
|
|
||||||
in = new I2PInputStream(name + " in");
|
|
||||||
I2PInputStream pin = new I2PInputStream(name + " out");
|
|
||||||
out = new I2POutputStream(pin);
|
|
||||||
new I2PSocketRunner(pin);
|
|
||||||
this.localID = localID;
|
|
||||||
_createdOn = I2PAppContext.getGlobalContext().clock().now();
|
|
||||||
_remoteIdSetTime = -1;
|
|
||||||
_closedOn = -1;
|
|
||||||
_options = mgr.getDefaultOptions();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Our half of the socket's unique ID
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public String getLocalID() {
|
|
||||||
return localID;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* We've received the other side's half of the socket's unique ID
|
|
||||||
*/
|
|
||||||
public void setRemoteID(String id) {
|
|
||||||
synchronized (remoteIDWaiter) {
|
|
||||||
remoteID = id;
|
|
||||||
_remoteIdSetTime = System.currentTimeMillis();
|
|
||||||
remoteIDWaiter.notifyAll();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Retrieve the other side's half of the socket's unique ID, or null if it
|
|
||||||
* isn't known yet
|
|
||||||
*
|
|
||||||
* @param wait if true, we should wait until we receive it from the peer, otherwise
|
|
||||||
* return what we know immediately (which may be null)
|
|
||||||
*/
|
|
||||||
public String getRemoteID(boolean wait) {
|
|
||||||
try {
|
|
||||||
return getRemoteID(wait, -1);
|
|
||||||
} catch (InterruptedIOException iie) {
|
|
||||||
_log.error("wtf, we said we didn't want it to time out! you smell", iie);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Retrieve the other side's half of the socket's unique ID, or null if it isn't
|
|
||||||
* known yet and we were instructed not to wait
|
|
||||||
*
|
|
||||||
* @param wait should we wait for the peer to send us their half of the ID, or
|
|
||||||
* just return immediately?
|
|
||||||
* @param maxWait if we're going to wait, after how long should we timeout and fail?
|
|
||||||
* (if this value is < 0, we wait indefinitely)
|
|
||||||
* @throws InterruptedIOException when the max waiting period has been exceeded
|
|
||||||
*/
|
|
||||||
public String getRemoteID(boolean wait, long maxWait) throws InterruptedIOException {
|
|
||||||
long dieAfter = System.currentTimeMillis() + maxWait;
|
|
||||||
synchronized (remoteIDWaiter) {
|
|
||||||
if (wait) {
|
|
||||||
if (remoteID == null) {
|
|
||||||
try {
|
|
||||||
if (maxWait >= 0)
|
|
||||||
remoteIDWaiter.wait(maxWait);
|
|
||||||
else
|
|
||||||
remoteIDWaiter.wait();
|
|
||||||
} catch (InterruptedException ex) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
long now = System.currentTimeMillis();
|
|
||||||
if ((maxWait >= 0) && (now >= dieAfter)) {
|
|
||||||
long waitedExcess = now - dieAfter;
|
|
||||||
throw new InterruptedIOException("Timed out waiting for remote ID (waited " + waitedExcess
|
|
||||||
+ "ms too long [" + maxWait + "ms, remId " + remoteID
|
|
||||||
+ ", remId set " + (now-_remoteIdSetTime) + "ms ago])");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug("TIMING: RemoteID set to "
|
|
||||||
+ I2PSocketManagerImpl.getReadableForm(remoteID) + " for "
|
|
||||||
+ this.hashCode());
|
|
||||||
}
|
|
||||||
return remoteID;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Retrieve the other side's half of the socket's unique ID, or null if it
|
|
||||||
* isn't known yet. This does not wait
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public String getRemoteID() {
|
|
||||||
return getRemoteID(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The other side has given us some data, so inject it into our socket's
|
|
||||||
* inputStream
|
|
||||||
*
|
|
||||||
* @param data the data to inject into our local inputStream
|
|
||||||
*/
|
|
||||||
public void queueData(byte[] data) {
|
|
||||||
_bytesRead += data.length;
|
|
||||||
try {
|
|
||||||
in.queueData(data, false);
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
_log.log(Log.CRIT, "wtf, we said DONT block, how can we timeout?", ioe);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the Destination of this side of the socket.
|
|
||||||
*/
|
|
||||||
public Destination getThisDestination() {
|
|
||||||
return local;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the destination of the peer.
|
|
||||||
*/
|
|
||||||
public Destination getPeerDestination() {
|
|
||||||
return remote;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Return an InputStream to read from the socket.
|
|
||||||
*/
|
|
||||||
public InputStream getInputStream() throws IOException {
|
|
||||||
if ((in == null)) throw new IOException("Not connected");
|
|
||||||
return in;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Return an OutputStream to write into the socket.
|
|
||||||
*/
|
|
||||||
public OutputStream getOutputStream() throws IOException {
|
|
||||||
if ((out == null)) throw new IOException("Not connected");
|
|
||||||
return out;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Closes the socket if not closed yet (from the Application
|
|
||||||
* side).
|
|
||||||
*/
|
|
||||||
public void close() throws IOException {
|
|
||||||
synchronized (flagLock) {
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug("Closing connection");
|
|
||||||
closed = true;
|
|
||||||
_closedOn = I2PAppContext.getGlobalContext().clock().now();
|
|
||||||
}
|
|
||||||
out.close();
|
|
||||||
in.notifyClosed();
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isClosed() { return _closedOn > 0; }
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Close the socket from the I2P side (by a close packet)
|
|
||||||
*/
|
|
||||||
protected void internalClose() {
|
|
||||||
synchronized (flagLock) {
|
|
||||||
closed = true;
|
|
||||||
closed2 = true;
|
|
||||||
sendClose = false;
|
|
||||||
_closedOn = I2PAppContext.getGlobalContext().clock().now();
|
|
||||||
}
|
|
||||||
out.close();
|
|
||||||
in.notifyClosed();
|
|
||||||
}
|
|
||||||
|
|
||||||
private byte getMask(int add) {
|
|
||||||
if (outgoing)
|
|
||||||
return (byte)(I2PSocketManagerImpl.DATA_IN + (byte)add);
|
|
||||||
else
|
|
||||||
return (byte)(I2PSocketManagerImpl.DATA_OUT + (byte)add);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setOptions(I2PSocketOptions options) {
|
|
||||||
_options = options;
|
|
||||||
in.setReadTimeout(options.getReadTimeout());
|
|
||||||
}
|
|
||||||
|
|
||||||
public I2PSocketOptions getOptions() {
|
|
||||||
return _options;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* How long we will wait blocked on a read() operation. This is simply a
|
|
||||||
* helper to query the I2PSocketOptions
|
|
||||||
*
|
|
||||||
* @return milliseconds to wait, or -1 if we will wait indefinitely
|
|
||||||
*/
|
|
||||||
public long getReadTimeout() {
|
|
||||||
return _options.getReadTimeout();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Define how long we will wait blocked on a read() operation (-1 will make
|
|
||||||
* the socket wait forever). This is simply a helper to adjust the
|
|
||||||
* I2PSocketOptions
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public void setReadTimeout(long ms) {
|
|
||||||
_options.setReadTimeout(ms);
|
|
||||||
in.setReadTimeout(ms);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setSocketErrorListener(I2PSocket.SocketErrorListener lsnr) {
|
|
||||||
_socketErrorListener = lsnr;
|
|
||||||
}
|
|
||||||
|
|
||||||
void errorOccurred() {
|
|
||||||
if (_socketErrorListener != null)
|
|
||||||
_socketErrorListener.errorOccurred();
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getBytesSent() { return _bytesWritten; }
|
|
||||||
public long getBytesReceived() { return _bytesRead; }
|
|
||||||
public long getCreatedOn() { return _createdOn; }
|
|
||||||
public long getClosedOn() { return _closedOn; }
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The remote port.
|
|
||||||
* @return 0 always
|
|
||||||
* @since 0.8.9
|
|
||||||
*/
|
|
||||||
public int getPort() {
|
|
||||||
return I2PSession.PORT_UNSPECIFIED;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The local port.
|
|
||||||
* @return 0 always
|
|
||||||
* @since 0.8.9
|
|
||||||
*/
|
|
||||||
public int getLocalPort() {
|
|
||||||
return I2PSession.PORT_UNSPECIFIED;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private String getPrefix() { return "[" + _socketId + "]: "; }
|
|
||||||
|
|
||||||
//--------------------------------------------------
|
|
||||||
private class I2PInputStream extends InputStream {
|
|
||||||
private String streamName;
|
|
||||||
private final ByteCollector bc = new ByteCollector();
|
|
||||||
private boolean inStreamClosed = false;
|
|
||||||
|
|
||||||
private long readTimeout = -1;
|
|
||||||
|
|
||||||
public I2PInputStream(String name) {
|
|
||||||
streamName = name;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getReadTimeout() {
|
|
||||||
return readTimeout;
|
|
||||||
}
|
|
||||||
|
|
||||||
private String getStreamPrefix() {
|
|
||||||
return getPrefix() + streamName + ": ";
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setReadTimeout(long ms) {
|
|
||||||
readTimeout = ms;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int read() throws IOException {
|
|
||||||
byte[] b = new byte[1];
|
|
||||||
int res = read(b);
|
|
||||||
if (res == 1) return b[0] & 0xff;
|
|
||||||
if (res == -1) return -1;
|
|
||||||
throw new RuntimeException("Incorrect read() result");
|
|
||||||
}
|
|
||||||
|
|
||||||
// I have to ask if this method is really needed, since the JDK has this already,
|
|
||||||
// including the timeouts. Perhaps the need is for debugging more than anything
|
|
||||||
// else?
|
|
||||||
@Override
|
|
||||||
public int read(byte[] b, int off, int len) throws IOException {
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug(getStreamPrefix() + "Read called for " + len + " bytes (avail="
|
|
||||||
+ bc.getCurrentSize() + "): " + this.hashCode());
|
|
||||||
if (len == 0) return 0;
|
|
||||||
long dieAfter = System.currentTimeMillis() + readTimeout;
|
|
||||||
byte[] read = null;
|
|
||||||
synchronized (bc) {
|
|
||||||
read = bc.startToByteArray(len);
|
|
||||||
bc.notifyAll();
|
|
||||||
}
|
|
||||||
boolean timedOut = false;
|
|
||||||
|
|
||||||
while ( (read.length == 0) && (!inStreamClosed) ) {
|
|
||||||
synchronized (flagLock) {
|
|
||||||
if (closed) {
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug(getStreamPrefix() + "Closed is set after reading "
|
|
||||||
+ _bytesRead + " and writing " + _bytesWritten
|
|
||||||
+ ", so closing stream: " + hashCode());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
synchronized (I2PSocketImpl.I2PInputStream.this) {
|
|
||||||
if (readTimeout >= 0) {
|
|
||||||
wait(readTimeout);
|
|
||||||
} else {
|
|
||||||
wait();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (InterruptedException ex) {}
|
|
||||||
|
|
||||||
if ((readTimeout >= 0)
|
|
||||||
&& (System.currentTimeMillis() >= dieAfter)) {
|
|
||||||
throw new InterruptedIOException(getStreamPrefix() + "Timeout reading from I2PSocket ("
|
|
||||||
+ readTimeout + " msecs)");
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized (bc) {
|
|
||||||
read = bc.startToByteArray(len);
|
|
||||||
bc.notifyAll();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (read.length > len) throw new RuntimeException("BUG");
|
|
||||||
if ( (inStreamClosed) && (read.length <= 0) )
|
|
||||||
return -1;
|
|
||||||
|
|
||||||
System.arraycopy(read, 0, b, off, read.length);
|
|
||||||
|
|
||||||
if (_log.shouldLog(Log.DEBUG)) {
|
|
||||||
_log.debug(getStreamPrefix() + "Read from I2PInputStream " + hashCode() + " returned "
|
|
||||||
+ read.length + " bytes");
|
|
||||||
}
|
|
||||||
//if (_log.shouldLog(Log.DEBUG)) {
|
|
||||||
// _log.debug("Read from I2PInputStream " + this.hashCode()
|
|
||||||
// + " returned "+read.length+" bytes:\n"
|
|
||||||
// + HexDump.dump(read));
|
|
||||||
//}
|
|
||||||
return read.length;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return 0 if empty, > 0 if there is data.
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public int available() {
|
|
||||||
synchronized (bc) {
|
|
||||||
return bc.getCurrentSize();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Add the data to the queue
|
|
||||||
*
|
|
||||||
* @param allowBlock if true, we will block if the buffer and the socket options
|
|
||||||
* say so, otherwise we simply take the data regardless.
|
|
||||||
* @throws InterruptedIOException if the queue's buffer is full, the socket has
|
|
||||||
* a write timeout, and that timeout is exceeded
|
|
||||||
* @throws IOException if the connection was closed while queueing up the data
|
|
||||||
*/
|
|
||||||
void queueData(byte[] data, boolean allowBlock) throws InterruptedIOException, IOException {
|
|
||||||
queueData(data, 0, data.length, allowBlock);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Add the data to the queue
|
|
||||||
*
|
|
||||||
* @param allowBlock if true, we will block if the buffer and the socket options
|
|
||||||
* say so, otherwise we simply take the data regardless.
|
|
||||||
* @throws InterruptedIOException if the queue's buffer is full, the socket has
|
|
||||||
* a write timeout, and that timeout is exceeded
|
|
||||||
* @throws IOException if the connection was closed while queueing up the data
|
|
||||||
*/
|
|
||||||
public void queueData(byte[] data, int off, int len, boolean allowBlock) throws InterruptedIOException, IOException {
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug(getStreamPrefix() + "Insert " + len + " bytes into queue: " + hashCode());
|
|
||||||
Clock clock = I2PAppContext.getGlobalContext().clock();
|
|
||||||
long endAfter = clock.now() + _options.getWriteTimeout();
|
|
||||||
synchronized (bc) {
|
|
||||||
if (allowBlock) {
|
|
||||||
if (_options.getMaxBufferSize() > 0) {
|
|
||||||
while (bc.getCurrentSize() > _options.getMaxBufferSize()) {
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug(getStreamPrefix() + "Buffer size exceeded: pending "
|
|
||||||
+ bc.getCurrentSize() + " limit " + _options.getMaxBufferSize());
|
|
||||||
if (_options.getWriteTimeout() > 0) {
|
|
||||||
long timeLeft = endAfter - clock.now();
|
|
||||||
if (timeLeft <= 0) {
|
|
||||||
long waited = _options.getWriteTimeout() - timeLeft;
|
|
||||||
throw new InterruptedIOException(getStreamPrefix() + "Waited too long ("
|
|
||||||
+ waited + "ms) to write "
|
|
||||||
+ len + " with a buffer at " + bc.getCurrentSize());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (inStreamClosed)
|
|
||||||
throw new IOException(getStreamPrefix() + "Stream closed while writing");
|
|
||||||
if (_closedOn > 0)
|
|
||||||
throw new IOException(getStreamPrefix() + "I2PSocket closed while writing");
|
|
||||||
try {
|
|
||||||
bc.wait(1000);
|
|
||||||
} catch (InterruptedException ie) {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
bc.append(data, off, len);
|
|
||||||
}
|
|
||||||
synchronized (I2PInputStream.this) {
|
|
||||||
I2PInputStream.this.notifyAll();
|
|
||||||
}
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug(getStreamPrefix() + "After insert " + len + " bytes into queue: " + hashCode());
|
|
||||||
}
|
|
||||||
|
|
||||||
public void notifyClosed() {
|
|
||||||
synchronized (I2PInputStream.this) {
|
|
||||||
I2PInputStream.this.notifyAll();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() throws IOException {
|
|
||||||
super.close();
|
|
||||||
notifyClosed();
|
|
||||||
synchronized (bc) {
|
|
||||||
inStreamClosed = true;
|
|
||||||
bc.notifyAll();
|
|
||||||
}
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug(getStreamPrefix() + "After close");
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
private class I2POutputStream extends OutputStream {
|
|
||||||
|
|
||||||
public I2PInputStream sendTo;
|
|
||||||
|
|
||||||
public I2POutputStream(I2PInputStream sendTo) {
|
|
||||||
this.sendTo = sendTo;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void write(int b) throws IOException {
|
|
||||||
write(new byte[] { (byte) b});
|
|
||||||
}
|
|
||||||
|
|
||||||
// This override is faster than the built in JDK,
|
|
||||||
// but there are other variations not handled
|
|
||||||
@Override
|
|
||||||
public void write(byte[] b, int off, int len) throws IOException {
|
|
||||||
_bytesWritten += len;
|
|
||||||
sendTo.queueData(b, off, len, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() {
|
|
||||||
sendTo.notifyClosed();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static volatile long __runnerId = 0;
|
|
||||||
private class I2PSocketRunner extends I2PThread {
|
|
||||||
|
|
||||||
public InputStream in;
|
|
||||||
|
|
||||||
public I2PSocketRunner(InputStream in) {
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug(getPrefix() + "Runner's input stream is: " + in.hashCode());
|
|
||||||
this.in = in;
|
|
||||||
String peer = I2PSocketImpl.this.remote.calculateHash().toBase64();
|
|
||||||
setName("SocketRunner " + (++__runnerId) + "/" + _socketId + " " + peer.substring(0, 4));
|
|
||||||
start();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Pump some more data
|
|
||||||
*
|
|
||||||
* @return true if we should keep on handling, false otherwise
|
|
||||||
*/
|
|
||||||
private boolean handleNextPacket(ByteCollector bc, byte buffer[])
|
|
||||||
throws IOException, I2PSessionException {
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "handleNextPacket");
|
|
||||||
int len = in.read(buffer);
|
|
||||||
int bcsize = 0;
|
|
||||||
synchronized (bc) {
|
|
||||||
bcsize = bc.getCurrentSize();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "handleNextPacket len=" + len + " bcsize=" + bcsize);
|
|
||||||
|
|
||||||
if (len != -1) {
|
|
||||||
synchronized (bc) {
|
|
||||||
bc.append(buffer, len);
|
|
||||||
}
|
|
||||||
} else if (bcsize == 0) {
|
|
||||||
// nothing left in the buffer, and read(..) got EOF (-1).
|
|
||||||
// the bart the
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if ((bcsize < MAX_PACKET_SIZE) && (in.available() == 0)) {
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "Runner Point d: " + hashCode());
|
|
||||||
|
|
||||||
try {
|
|
||||||
Thread.sleep(PACKET_DELAY);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
_log.warn("wtf", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if ((bcsize >= MAX_PACKET_SIZE) || (in.available() == 0)) {
|
|
||||||
byte data[] = null;
|
|
||||||
synchronized (bc) {
|
|
||||||
data = bc.startToByteArray(MAX_PACKET_SIZE);
|
|
||||||
}
|
|
||||||
if (data.length > 0) {
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "Message size is: " + data.length);
|
|
||||||
boolean sent = sendBlock(data);
|
|
||||||
if (!sent) {
|
|
||||||
if (_log.shouldLog(Log.WARN))
|
|
||||||
_log.warn(getPrefix() + ":" + Thread.currentThread().getName() + "Error sending message to peer. Killing socket runner");
|
|
||||||
errorOccurred();
|
|
||||||
return false;
|
|
||||||
} else {
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "Message sent to peer");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
byte[] buffer = new byte[MAX_PACKET_SIZE];
|
|
||||||
ByteCollector bc = new ByteCollector();
|
|
||||||
boolean keepHandling = true;
|
|
||||||
int packetsHandled = 0;
|
|
||||||
try {
|
|
||||||
// try {
|
|
||||||
while (keepHandling) {
|
|
||||||
keepHandling = handleNextPacket(bc, buffer);
|
|
||||||
packetsHandled++;
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug(getPrefix() + ":" + Thread.currentThread().getName()
|
|
||||||
+ "Packets handled: " + packetsHandled);
|
|
||||||
}
|
|
||||||
if (_log.shouldLog(Log.INFO))
|
|
||||||
_log.info(getPrefix() + ":" + Thread.currentThread().getName()
|
|
||||||
+ "After handling packets, we're done. Packets handled: " + packetsHandled);
|
|
||||||
|
|
||||||
if ((bc.getCurrentSize() > 0) && (packetsHandled > 1)) {
|
|
||||||
if (_log.shouldLog(Log.WARN))
|
|
||||||
_log.warn(getPrefix() + "We lost some data queued up due to a network send error (input stream: "
|
|
||||||
+ in.hashCode() + "; "
|
|
||||||
+ "queue size: " + bc.getCurrentSize() + ")");
|
|
||||||
}
|
|
||||||
synchronized (flagLock) {
|
|
||||||
closed2 = true;
|
|
||||||
}
|
|
||||||
boolean sc;
|
|
||||||
synchronized (flagLock) {
|
|
||||||
sc = sendClose;
|
|
||||||
} // FIXME: Race here?
|
|
||||||
if (sc) {
|
|
||||||
if (_log.shouldLog(Log.INFO))
|
|
||||||
_log.info(getPrefix() + ":" + Thread.currentThread().getName()
|
|
||||||
+ "Sending close packet: (we started? " + outgoing
|
|
||||||
+ ") after reading " + _bytesRead + " and writing " + _bytesWritten);
|
|
||||||
byte[] packet = I2PSocketManagerImpl.makePacket(getMask(0x02), remoteID, new byte[0]);
|
|
||||||
boolean sent = manager.getSession().sendMessage(remote, packet);
|
|
||||||
if (!sent) {
|
|
||||||
if (_log.shouldLog(Log.WARN))
|
|
||||||
_log.warn(getPrefix() + ":" + Thread.currentThread().getName()
|
|
||||||
+ "Error sending close packet to peer");
|
|
||||||
errorOccurred();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
manager.removeSocket(I2PSocketImpl.this);
|
|
||||||
internalClose();
|
|
||||||
} catch (InterruptedIOException ex) {
|
|
||||||
_log.error(getPrefix() + "BUG! read() operations should not timeout!", ex);
|
|
||||||
} catch (IOException ex) {
|
|
||||||
// WHOEVER removes this event on inconsistent
|
|
||||||
// state before fixing the inconsistent state (a
|
|
||||||
// reference on the socket in the socket manager
|
|
||||||
// etc.) will get hanged by me personally -- mihi
|
|
||||||
_log.error(getPrefix() + "Error running - **INCONSISTENT STATE!!!**", ex);
|
|
||||||
} catch (I2PException ex) {
|
|
||||||
_log.error(getPrefix() + "Error running - **INCONSISTENT STATE!!!**", ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean sendBlock(byte data[]) throws I2PSessionException {
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug(getPrefix() + "TIMING: Block to send for " + I2PSocketImpl.this.hashCode());
|
|
||||||
if (remoteID == null) {
|
|
||||||
_log.error(getPrefix() + "NULL REMOTEID");
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
byte[] packet = I2PSocketManagerImpl.makePacket(getMask(0x00), remoteID, data);
|
|
||||||
boolean sent;
|
|
||||||
synchronized (flagLock) {
|
|
||||||
if (closed2) return false;
|
|
||||||
}
|
|
||||||
sent = manager.getSession().sendMessage(remote, packet);
|
|
||||||
return sent;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() { return "" + hashCode(); }
|
|
||||||
}
|
|
@@ -1,817 +0,0 @@
|
|||||||
/*
|
|
||||||
* licensed under BSD license...
|
|
||||||
* (if you know the proper clause for that, add it ...)
|
|
||||||
*/
|
|
||||||
package net.i2p.client.streaming;
|
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InterruptedIOException;
|
|
||||||
import java.io.UnsupportedEncodingException;
|
|
||||||
import java.net.ConnectException;
|
|
||||||
import java.net.NoRouteToHostException;
|
|
||||||
import java.net.ServerSocket;
|
|
||||||
import java.net.Socket;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Properties;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import net.i2p.I2PAppContext;
|
|
||||||
import net.i2p.I2PException;
|
|
||||||
import net.i2p.client.I2PSession;
|
|
||||||
import net.i2p.client.I2PSessionException;
|
|
||||||
import net.i2p.client.I2PSessionListener;
|
|
||||||
import net.i2p.data.Base64;
|
|
||||||
import net.i2p.data.DataFormatException;
|
|
||||||
import net.i2p.data.Destination;
|
|
||||||
import net.i2p.util.Log;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Centralize the coordination and multiplexing of the local client's streaming.
|
|
||||||
* There should be one I2PSocketManager for each I2PSession, and if an application
|
|
||||||
* is sending and receiving data through the streaming library using an
|
|
||||||
* I2PSocketManager, it should not attempt to call I2PSession's setSessionListener
|
|
||||||
* or receive any messages with its .receiveMessage
|
|
||||||
*
|
|
||||||
* @deprecated use I2PSocketManagerFull
|
|
||||||
*/
|
|
||||||
class I2PSocketManagerImpl implements I2PSocketManager, I2PSessionListener {
|
|
||||||
private I2PAppContext _context;
|
|
||||||
private Log _log;
|
|
||||||
private /* FIXME final FIXME */ I2PSession _session;
|
|
||||||
private I2PServerSocketImpl _serverSocket = null;
|
|
||||||
private final Object lock = new Object(); // for locking socket lists
|
|
||||||
private HashMap<String,I2PSocket> _outSockets;
|
|
||||||
private HashMap<String,I2PSocket> _inSockets;
|
|
||||||
private I2PSocketOptions _defaultOptions;
|
|
||||||
private long _acceptTimeout;
|
|
||||||
private String _name;
|
|
||||||
private final List<DisconnectListener> _listeners = new ArrayList<DisconnectListener>(1);;
|
|
||||||
private static int __managerId = 0;
|
|
||||||
|
|
||||||
public static final short ACK = 0x51;
|
|
||||||
public static final short CLOSE_OUT = 0x52;
|
|
||||||
public static final short DATA_OUT = 0x50;
|
|
||||||
public static final short SYN = 0xA1;
|
|
||||||
public static final short CLOSE_IN = 0xA2;
|
|
||||||
public static final short DATA_IN = 0xA0;
|
|
||||||
public static final short CHAFF = 0xFF;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* How long to wait for the client app to accept() before sending back CLOSE?
|
|
||||||
* This includes the time waiting in the queue. Currently set to 5 seconds.
|
|
||||||
*/
|
|
||||||
private static final long ACCEPT_TIMEOUT_DEFAULT = 5*1000;
|
|
||||||
|
|
||||||
public I2PSocketManagerImpl() {
|
|
||||||
this("SocketManager " + (++__managerId));
|
|
||||||
}
|
|
||||||
public I2PSocketManagerImpl(String name) {
|
|
||||||
init(I2PAppContext.getGlobalContext(), null, null, name);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void init(I2PAppContext context, I2PSession session, Properties opts, String name) {
|
|
||||||
_name = name;
|
|
||||||
_context = context;
|
|
||||||
_log = _context.logManager().getLog(I2PSocketManager.class);
|
|
||||||
_inSockets = new HashMap<String,I2PSocket>(16);
|
|
||||||
_outSockets = new HashMap<String,I2PSocket>(16);
|
|
||||||
_acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
|
|
||||||
// _listeners = new ArrayList<DisconnectListener>(1);
|
|
||||||
setSession(session);
|
|
||||||
setDefaultOptions(buildOptions(opts));
|
|
||||||
_context.statManager().createRateStat("streaming.lifetime", "How long before the socket is closed?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
|
|
||||||
_context.statManager().createRateStat("streaming.sent", "How many bytes are sent in the stream?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
|
|
||||||
_context.statManager().createRateStat("streaming.received", "How many bytes are received in the stream?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
|
|
||||||
_context.statManager().createRateStat("streaming.transferBalance", "How many streams send more than they receive (positive means more sent, negative means more received)?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
|
|
||||||
_context.statManager().createRateStat("streaming.synNoAck", "How many times have we sent a SYN but not received an ACK?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
|
|
||||||
_context.statManager().createRateStat("streaming.ackSendFailed", "How many times have we tried to send an ACK to a SYN and failed?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
|
|
||||||
_context.statManager().createRateStat("streaming.nackSent", "How many times have we refused a SYN with a NACK?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
|
|
||||||
_context.statManager().createRateStat("streaming.nackReceived", "How many times have we received a NACK to our SYN?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
|
|
||||||
}
|
|
||||||
|
|
||||||
public I2PSession getSession() {
|
|
||||||
return _session;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setSession(I2PSession session) {
|
|
||||||
_session = session;
|
|
||||||
if (session != null) session.setSessionListener(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* How long should we wait for the client to .accept() a socket before
|
|
||||||
* sending back a NACK/Close?
|
|
||||||
*
|
|
||||||
* @param ms milliseconds to wait, maximum
|
|
||||||
*/
|
|
||||||
public void setAcceptTimeout(long ms) { _acceptTimeout = ms; }
|
|
||||||
public long getAcceptTimeout() { return _acceptTimeout; }
|
|
||||||
|
|
||||||
public void disconnected(I2PSession session) {
|
|
||||||
_log.info(getName() + ": Disconnected from the session");
|
|
||||||
destroySocketManager();
|
|
||||||
List<DisconnectListener> listeners = null;
|
|
||||||
synchronized (_listeners) {
|
|
||||||
listeners = new ArrayList<DisconnectListener>(_listeners);
|
|
||||||
_listeners.clear();
|
|
||||||
}
|
|
||||||
for (int i = 0; i < listeners.size(); i++) {
|
|
||||||
I2PSocketManager.DisconnectListener lsnr = (I2PSocketManager.DisconnectListener)listeners.get(i);
|
|
||||||
lsnr.sessionDisconnected();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void errorOccurred(I2PSession session, String message, Throwable error) {
|
|
||||||
_log.error(getName() + ": Error occurred: [" + message + "]", error);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void messageAvailable(I2PSession session, int msgId, long size) {
|
|
||||||
try {
|
|
||||||
I2PSocketImpl s;
|
|
||||||
byte msg[] = session.receiveMessage(msgId);
|
|
||||||
if (msg.length == 1 && msg[0] == -1) {
|
|
||||||
_log.debug(getName() + ": Ping received");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (msg.length < 4) {
|
|
||||||
_log.warn(getName() + ": ==== packet too short ====");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
int type = msg[0] & 0xff;
|
|
||||||
String id = toString(new byte[] { msg[1], msg[2], msg[3]});
|
|
||||||
byte[] payload = new byte[msg.length - 4];
|
|
||||||
System.arraycopy(msg, 4, payload, 0, payload.length);
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug(getName() + ": Message read: type = [" + Integer.toHexString(type)
|
|
||||||
+ "] id = [" + getReadableForm(id)
|
|
||||||
+ "] payload length: [" + payload.length + "]");
|
|
||||||
switch (type) {
|
|
||||||
case ACK:
|
|
||||||
ackAvailable(id, payload);
|
|
||||||
return;
|
|
||||||
case CLOSE_OUT:
|
|
||||||
disconnectAvailable(id, payload);
|
|
||||||
return;
|
|
||||||
case DATA_OUT:
|
|
||||||
sendOutgoingAvailable(id, payload);
|
|
||||||
return;
|
|
||||||
case SYN:
|
|
||||||
synIncomingAvailable(id, payload, session);
|
|
||||||
return;
|
|
||||||
case CLOSE_IN:
|
|
||||||
disconnectIncoming(id, payload);
|
|
||||||
return;
|
|
||||||
case DATA_IN:
|
|
||||||
sendIncoming(id, payload);
|
|
||||||
return;
|
|
||||||
case CHAFF:
|
|
||||||
// ignore
|
|
||||||
return;
|
|
||||||
default:
|
|
||||||
handleUnknown(type, id, payload);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
} catch (I2PException ise) {
|
|
||||||
_log.warn(getName() + ": Error processing", ise);
|
|
||||||
} catch (IllegalStateException ise) {
|
|
||||||
_log.debug(getName() + ": Error processing", ise);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* We've received an ACK packet (hopefully, in response to a SYN that we
|
|
||||||
* recently sent out). Notify the associated I2PSocket that we now have
|
|
||||||
* the remote stream ID (which should get things going, since the handshake
|
|
||||||
* is complete).
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
private void ackAvailable(String id, byte payload[]) {
|
|
||||||
long begin = _context.clock().now();
|
|
||||||
I2PSocketImpl s = null;
|
|
||||||
synchronized (lock) {
|
|
||||||
s = (I2PSocketImpl) _outSockets.get(id);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (s == null) {
|
|
||||||
_log.warn(getName() + ": No socket responsible for ACK packet for id " + getReadableForm(id));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
long socketRetrieved = _context.clock().now();
|
|
||||||
|
|
||||||
String remoteId = null;
|
|
||||||
remoteId = s.getRemoteID(false);
|
|
||||||
|
|
||||||
if ( (payload.length == 3) && (remoteId == null) ) {
|
|
||||||
String newID = toString(payload);
|
|
||||||
long beforeSetRemId = _context.clock().now();
|
|
||||||
s.setRemoteID(newID);
|
|
||||||
if (_log.shouldLog(Log.DEBUG)) {
|
|
||||||
_log.debug(getName() + ": ackAvailable - socket retrieval took "
|
|
||||||
+ (socketRetrieved-begin) + "ms, getRemoteId took "
|
|
||||||
+ (beforeSetRemId-socketRetrieved) + "ms, setRemoteId took "
|
|
||||||
+ (_context.clock().now()-beforeSetRemId) + "ms");
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
// (payload.length != 3 || getRemoteId != null)
|
|
||||||
if (_log.shouldLog(Log.WARN)) {
|
|
||||||
if (payload.length != 3)
|
|
||||||
_log.warn(getName() + ": Ack packet had " + payload.length + " bytes");
|
|
||||||
else
|
|
||||||
_log.warn(getName() + ": Remote ID already exists? " + remoteId);
|
|
||||||
}
|
|
||||||
if (_log.shouldLog(Log.DEBUG)) {
|
|
||||||
_log.debug(getName() + ": invalid ack - socket retrieval took "
|
|
||||||
+ (socketRetrieved-begin) + "ms, overall took "
|
|
||||||
+ (_context.clock().now()-begin) + "ms");
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* We received a disconnect packet, telling us to tear down the specified
|
|
||||||
* stream.
|
|
||||||
*/
|
|
||||||
private void disconnectAvailable(String id, byte payload[]) {
|
|
||||||
I2PSocketImpl s = null;
|
|
||||||
synchronized (lock) {
|
|
||||||
s = (I2PSocketImpl) _outSockets.get(id);
|
|
||||||
}
|
|
||||||
|
|
||||||
_log.debug(getName() + ": *Disconnect outgoing for socket " + s + " on id "
|
|
||||||
+ getReadableForm(id));
|
|
||||||
try {
|
|
||||||
if (s != null) {
|
|
||||||
if (payload.length > 0) {
|
|
||||||
_log.debug(getName() + ": Disconnect packet had "
|
|
||||||
+ payload.length + " bytes");
|
|
||||||
}
|
|
||||||
if (s.getRemoteID(false) == null) {
|
|
||||||
s.setRemoteID(null); // Just to wake up socket
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
s.internalClose();
|
|
||||||
synchronized (lock) {
|
|
||||||
_outSockets.remove(id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
} catch (Exception t) {
|
|
||||||
_log.warn(getName() + ": Ignoring error on disconnect for socket " + s, t);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* We've received data on a stream we created - toss the data onto
|
|
||||||
* the socket for handling.
|
|
||||||
*
|
|
||||||
* @throws IllegalStateException if the socket isn't open or isn't known
|
|
||||||
*/
|
|
||||||
private void sendOutgoingAvailable(String id, byte payload[]) throws IllegalStateException {
|
|
||||||
I2PSocketImpl s = null;
|
|
||||||
synchronized (lock) {
|
|
||||||
s = (I2PSocketImpl) _outSockets.get(id);
|
|
||||||
}
|
|
||||||
|
|
||||||
// packet send outgoing
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug(getName() + ": *Packet send outgoing [" + payload.length + "] for socket "
|
|
||||||
+ s + " on id " + getReadableForm(id));
|
|
||||||
if (s != null) {
|
|
||||||
s.queueData(payload);
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
if (_log.shouldLog(Log.WARN))
|
|
||||||
_log.warn(getName() + ": Null socket with data available");
|
|
||||||
throw new IllegalStateException("Null socket with data available");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* We've received a SYN packet (a request for a new stream). If the client has
|
|
||||||
* said they want incoming sockets (by retrieving the serverSocket), the stream
|
|
||||||
* will be ACKed, but if they have not, they'll be NACKed)
|
|
||||||
*
|
|
||||||
* @throws DataFormatException if the destination in the SYN was invalid
|
|
||||||
* @throws I2PSessionException if there was an I2P error sending the ACK or NACK
|
|
||||||
*/
|
|
||||||
private void synIncomingAvailable(String id, byte payload[], I2PSession session)
|
|
||||||
throws DataFormatException, I2PSessionException {
|
|
||||||
Destination d = new Destination();
|
|
||||||
d.fromByteArray(payload);
|
|
||||||
|
|
||||||
I2PSocketImpl s = null;
|
|
||||||
boolean acceptConnections = (_serverSocket != null);
|
|
||||||
String newLocalID = null;
|
|
||||||
synchronized (lock) {
|
|
||||||
newLocalID = makeID(_inSockets);
|
|
||||||
if (acceptConnections) {
|
|
||||||
s = new I2PSocketImpl(d, this, false, newLocalID);
|
|
||||||
s.setRemoteID(id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_log.debug(getName() + ": *Syn! for socket " + s + " on id " + getReadableForm(newLocalID)
|
|
||||||
+ " from " + d.calculateHash().toBase64().substring(0,6));
|
|
||||||
|
|
||||||
if (!acceptConnections) {
|
|
||||||
// The app did not instantiate an I2PServerSocket
|
|
||||||
byte[] packet = makePacket((byte) CLOSE_OUT, id, toBytes(newLocalID));
|
|
||||||
boolean replySentOk = false;
|
|
||||||
synchronized (_session) {
|
|
||||||
replySentOk = _session.sendMessage(d, packet);
|
|
||||||
}
|
|
||||||
if (!replySentOk) {
|
|
||||||
_log.warn(getName() + ": Error sending close to " + d.calculateHash().toBase64()
|
|
||||||
+ " in response to a new con message",
|
|
||||||
new Exception("Failed creation"));
|
|
||||||
}
|
|
||||||
_context.statManager().addRateData("streaming.nackSent", 1, 1);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (_serverSocket.addWaitForAccept(s, _acceptTimeout)) {
|
|
||||||
_inSockets.put(newLocalID, s);
|
|
||||||
byte[] packet = makePacket((byte) ACK, id, toBytes(newLocalID));
|
|
||||||
boolean replySentOk = false;
|
|
||||||
replySentOk = _session.sendMessage(d, packet);
|
|
||||||
if (!replySentOk) {
|
|
||||||
if (_log.shouldLog(Log.WARN))
|
|
||||||
_log.warn(getName() + ": Error sending reply to " + d.calculateHash().toBase64()
|
|
||||||
+ " in response to a new con message for socket " + s,
|
|
||||||
new Exception("Failed creation"));
|
|
||||||
s.internalClose();
|
|
||||||
_context.statManager().addRateData("streaming.ackSendFailed", 1, 1);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// timed out or serverSocket closed
|
|
||||||
byte[] packet = toBytes(" " + id);
|
|
||||||
packet[0] = CLOSE_OUT;
|
|
||||||
boolean nackSent = session.sendMessage(d, packet);
|
|
||||||
if (!nackSent) {
|
|
||||||
_log.warn(getName() + ": Error sending NACK for session creation for socket " + s);
|
|
||||||
}
|
|
||||||
s.internalClose();
|
|
||||||
_context.statManager().addRateData("streaming,nackSent", 1, 1);
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* We've received a disconnect for a socket we didn't initiate, so kill
|
|
||||||
* the socket.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
private void disconnectIncoming(String id, byte payload[]) {
|
|
||||||
I2PSocketImpl s = null;
|
|
||||||
synchronized (lock) {
|
|
||||||
s = (I2PSocketImpl) _inSockets.get(id);
|
|
||||||
if (payload.length == 0 && s != null) {
|
|
||||||
_inSockets.remove(id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_log.debug(getName() + ": *Disconnect incoming for socket " + s);
|
|
||||||
|
|
||||||
try {
|
|
||||||
if (payload.length == 0 && s != null) {
|
|
||||||
s.internalClose();
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
if ( (payload.length > 0) && (_log.shouldLog(Log.ERROR)) )
|
|
||||||
_log.warn(getName() + ": Disconnect packet had " + payload.length + " bytes");
|
|
||||||
if (s != null)
|
|
||||||
s.internalClose();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
} catch (Exception t) {
|
|
||||||
_log.warn(getName() + ": Ignoring error on disconnect", t);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* We've received data on a stream we received - toss the data onto
|
|
||||||
* the socket for handling.
|
|
||||||
*
|
|
||||||
* @throws IllegalStateException if the socket isn't open or isn't known
|
|
||||||
*/
|
|
||||||
private void sendIncoming(String id, byte payload[]) throws IllegalStateException {
|
|
||||||
I2PSocketImpl s = null;
|
|
||||||
synchronized (lock) {
|
|
||||||
s = (I2PSocketImpl) _inSockets.get(id);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug(getName() + ": *Packet send incoming [" + payload.length + "] for socket " + s);
|
|
||||||
|
|
||||||
if (s != null) {
|
|
||||||
s.queueData(payload);
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
_log.info(getName() + ": Null socket with data available");
|
|
||||||
throw new IllegalStateException("Null socket with data available");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Unknown packet. moo.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
private void handleUnknown(int type, String id, byte payload[]) {
|
|
||||||
_log.error(getName() + ": \n\n=============== Unknown packet! " + "============"
|
|
||||||
+ "\nType: " + type
|
|
||||||
+ "\nID: " + getReadableForm(id)
|
|
||||||
+ "\nBase64'ed Data: " + Base64.encode(payload)
|
|
||||||
+ "\n\n\n");
|
|
||||||
if (id != null) {
|
|
||||||
synchronized (lock) {
|
|
||||||
_inSockets.remove(id);
|
|
||||||
_outSockets.remove(id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void reportAbuse(I2PSession session, int severity) {
|
|
||||||
_log.error(getName() + ": Abuse reported [" + severity + "]");
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setDefaultOptions(I2PSocketOptions options) {
|
|
||||||
_defaultOptions = options;
|
|
||||||
}
|
|
||||||
|
|
||||||
public I2PSocketOptions getDefaultOptions() {
|
|
||||||
return _defaultOptions;
|
|
||||||
}
|
|
||||||
|
|
||||||
public I2PSocketOptions buildOptions() { return buildOptions(null); }
|
|
||||||
public I2PSocketOptions buildOptions(Properties opts) {
|
|
||||||
return new I2PSocketOptionsImpl(opts);
|
|
||||||
}
|
|
||||||
|
|
||||||
public I2PServerSocket getServerSocket() {
|
|
||||||
if (_serverSocket == null) {
|
|
||||||
_serverSocket = new I2PServerSocketImpl(this);
|
|
||||||
}
|
|
||||||
return _serverSocket;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @throws UnsupportedOperationException
|
|
||||||
* @since 0.8.4
|
|
||||||
*/
|
|
||||||
public ServerSocket getStandardServerSocket() {
|
|
||||||
throw new UnsupportedOperationException();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a new connected socket (block until the socket is created)
|
|
||||||
*
|
|
||||||
* @param peer Destination to connect to
|
|
||||||
* @param options I2P socket options to be used for connecting
|
|
||||||
*
|
|
||||||
* @throws ConnectException if the peer refuses the connection
|
|
||||||
* @throws NoRouteToHostException if the peer is not found or not reachable
|
|
||||||
* @throws InterruptedIOException if the connection timeouts
|
|
||||||
* @throws I2PException if there is some other I2P-related problem
|
|
||||||
*/
|
|
||||||
public I2PSocket connect(Destination peer, I2PSocketOptions options)
|
|
||||||
throws I2PException, ConnectException,
|
|
||||||
NoRouteToHostException, InterruptedIOException {
|
|
||||||
String localID, lcID;
|
|
||||||
I2PSocketImpl s;
|
|
||||||
synchronized (lock) {
|
|
||||||
localID = makeID(_outSockets);
|
|
||||||
lcID = getReadableForm(localID);
|
|
||||||
s = new I2PSocketImpl(peer, this, true, localID);
|
|
||||||
_outSockets.put(localID, s);
|
|
||||||
}
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug(getName() + ": connect(" + peer.calculateHash().toBase64().substring(0,6)
|
|
||||||
+ ", ...): localID = " + lcID);
|
|
||||||
|
|
||||||
try {
|
|
||||||
ByteArrayOutputStream pubkey = new ByteArrayOutputStream();
|
|
||||||
_session.getMyDestination().writeBytes(pubkey);
|
|
||||||
String remoteID;
|
|
||||||
byte[] packet = makePacket((byte) SYN, localID, pubkey.toByteArray());
|
|
||||||
boolean sent = false;
|
|
||||||
sent = _session.sendMessage(peer, packet);
|
|
||||||
if (!sent) {
|
|
||||||
_log.info(getName() + ": Unable to send & receive ack for SYN packet for socket "
|
|
||||||
+ s + " with localID = " + lcID);
|
|
||||||
synchronized (lock) {
|
|
||||||
_outSockets.remove(s.getLocalID());
|
|
||||||
}
|
|
||||||
_context.statManager().addRateData("streaming.synNoAck", 1, 1);
|
|
||||||
throw new I2PException("Error sending through I2P network");
|
|
||||||
} else {
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug(getName() + ": syn sent ok to "
|
|
||||||
+ peer.calculateHash().toBase64().substring(0,6)
|
|
||||||
+ " with localID = " + lcID);
|
|
||||||
}
|
|
||||||
if (options != null)
|
|
||||||
remoteID = s.getRemoteID(true, options.getConnectTimeout());
|
|
||||||
else
|
|
||||||
remoteID = s.getRemoteID(true, getDefaultOptions().getConnectTimeout());
|
|
||||||
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug(getName() + ": remoteID received from "
|
|
||||||
+ peer.calculateHash().toBase64().substring(0,6)
|
|
||||||
+ ": " + getReadableForm(remoteID)
|
|
||||||
+ " with localID = " + lcID);
|
|
||||||
|
|
||||||
if (remoteID == null) {
|
|
||||||
_context.statManager().addRateData("streaming.nackReceived", 1, 1);
|
|
||||||
throw new ConnectException("Connection refused by peer for socket " + s);
|
|
||||||
}
|
|
||||||
if ("".equals(remoteID)) {
|
|
||||||
_context.statManager().addRateData("streaming.synNoAck", 1, 1);
|
|
||||||
throw new NoRouteToHostException("Unable to reach peer for socket " + s);
|
|
||||||
}
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug(getName() + ": TIMING: s given out for remoteID "
|
|
||||||
+ getReadableForm(remoteID) + " for socket " + s);
|
|
||||||
|
|
||||||
return s;
|
|
||||||
} catch (InterruptedIOException ioe) {
|
|
||||||
if (_log.shouldLog(Log.WARN))
|
|
||||||
_log.warn(getName() + ": Timeout waiting for ack from syn for id "
|
|
||||||
+ lcID + " to " + peer.calculateHash().toBase64().substring(0,6)
|
|
||||||
+ " for socket " + s, ioe);
|
|
||||||
synchronized (lock) {
|
|
||||||
_outSockets.remove(s.getLocalID());
|
|
||||||
}
|
|
||||||
s.internalClose();
|
|
||||||
_context.statManager().addRateData("streaming.synNoAck", 1, 1);
|
|
||||||
throw new InterruptedIOException("Timeout waiting for ack");
|
|
||||||
} catch (ConnectException ex) {
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug(getName() + ": Connection error waiting for ack from syn for id "
|
|
||||||
+ lcID + " to " + peer.calculateHash().toBase64().substring(0,6)
|
|
||||||
+ " for socket " + s, ex);
|
|
||||||
s.internalClose();
|
|
||||||
throw ex;
|
|
||||||
} catch (NoRouteToHostException ex) {
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug(getName() + ": No route to host waiting for ack from syn for id "
|
|
||||||
+ lcID + " to " + peer.calculateHash().toBase64().substring(0,6)
|
|
||||||
+ " for socket " + s, ex);
|
|
||||||
s.internalClose();
|
|
||||||
throw ex;
|
|
||||||
} catch (IOException ex) {
|
|
||||||
if (_log.shouldLog(Log.WARN))
|
|
||||||
_log.warn(getName() + ": Error sending syn on id "
|
|
||||||
+ lcID + " to " + peer.calculateHash().toBase64().substring(0,6)
|
|
||||||
+ " for socket " + s, ex);
|
|
||||||
synchronized (lock) {
|
|
||||||
_outSockets.remove(s.getLocalID());
|
|
||||||
}
|
|
||||||
s.internalClose();
|
|
||||||
throw new I2PException("Unhandled IOException occurred");
|
|
||||||
} catch (I2PException ex) {
|
|
||||||
if (_log.shouldLog(Log.INFO))
|
|
||||||
_log.info(getName() + ": Error sending syn on id "
|
|
||||||
+ lcID + " to " + peer.calculateHash().toBase64().substring(0,6)
|
|
||||||
+ " for socket " + s, ex);
|
|
||||||
synchronized (lock) {
|
|
||||||
_outSockets.remove(s.getLocalID());
|
|
||||||
}
|
|
||||||
s.internalClose();
|
|
||||||
throw ex;
|
|
||||||
} catch (Exception e) {
|
|
||||||
s.internalClose();
|
|
||||||
_log.warn(getName() + ": Unhandled error connecting on "
|
|
||||||
+ lcID + " to " + peer.calculateHash().toBase64().substring(0,6)
|
|
||||||
+ " for socket " + s, e);
|
|
||||||
throw new ConnectException("Unhandled error connecting: " + e.getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a new connected socket (block until the socket is created)
|
|
||||||
*
|
|
||||||
* @param peer Destination to connect to
|
|
||||||
*
|
|
||||||
* @throws ConnectException if the peer refuses the connection
|
|
||||||
* @throws NoRouteToHostException if the peer is not found or not reachable
|
|
||||||
* @throws InterruptedIOException if the connection timeouts
|
|
||||||
* @throws I2PException if there is some other I2P-related problem
|
|
||||||
*/
|
|
||||||
public I2PSocket connect(Destination peer) throws I2PException, ConnectException,
|
|
||||||
NoRouteToHostException, InterruptedIOException {
|
|
||||||
return connect(peer, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @throws UnsupportedOperationException
|
|
||||||
* @since 0.8.4
|
|
||||||
*/
|
|
||||||
public Socket connectToSocket(Destination peer) {
|
|
||||||
throw new UnsupportedOperationException();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @throws UnsupportedOperationException
|
|
||||||
* @since 0.8.4
|
|
||||||
*/
|
|
||||||
public Socket connectToSocket(Destination peer, int timeout) {
|
|
||||||
throw new UnsupportedOperationException();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Destroy the socket manager, freeing all the associated resources. This
|
|
||||||
* method will block untill all the managed sockets are closed.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public void destroySocketManager() {
|
|
||||||
if (_serverSocket != null) {
|
|
||||||
_serverSocket.close();
|
|
||||||
_serverSocket = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized (lock) {
|
|
||||||
Iterator iter;
|
|
||||||
String id = null;
|
|
||||||
I2PSocketImpl sock;
|
|
||||||
|
|
||||||
iter = _inSockets.keySet().iterator();
|
|
||||||
while (iter.hasNext()) {
|
|
||||||
id = (String)iter.next();
|
|
||||||
sock = (I2PSocketImpl)_inSockets.get(id);
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug(getName() + ": Closing inSocket \""
|
|
||||||
+ getReadableForm(sock.getLocalID()) + "\"");
|
|
||||||
sock.internalClose();
|
|
||||||
}
|
|
||||||
|
|
||||||
iter = _outSockets.keySet().iterator();
|
|
||||||
while (iter.hasNext()) {
|
|
||||||
id = (String)iter.next();
|
|
||||||
sock = (I2PSocketImpl)_outSockets.get(id);
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
|
||||||
_log.debug(getName() + ": Closing outSocket \""
|
|
||||||
+ getReadableForm(sock.getLocalID()) + "\"");
|
|
||||||
sock.internalClose();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_log.debug(getName() + ": Waiting for all open sockets to really close...");
|
|
||||||
synchronized (lock) {
|
|
||||||
while ((_inSockets.size() != 0) || (_outSockets.size() != 0)) {
|
|
||||||
try {
|
|
||||||
lock.wait();
|
|
||||||
} catch (InterruptedException e) {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
_log.debug(getName() + ": Destroying I2P session...");
|
|
||||||
_session.destroySession();
|
|
||||||
_log.debug(getName() + ": I2P session destroyed");
|
|
||||||
} catch (I2PSessionException e) {
|
|
||||||
_log.warn(getName() + ": Error destroying I2P session", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Retrieve a set of currently connected I2PSockets, either initiated locally or remotely.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public Set<I2PSocket> listSockets() {
|
|
||||||
Set<I2PSocket> sockets = new HashSet<I2PSocket>(8);
|
|
||||||
synchronized (lock) {
|
|
||||||
sockets.addAll(_inSockets.values());
|
|
||||||
sockets.addAll(_outSockets.values());
|
|
||||||
}
|
|
||||||
return sockets;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Ping the specified peer, returning true if they replied to the ping within
|
|
||||||
* the timeout specified, false otherwise. This call blocks.
|
|
||||||
*
|
|
||||||
* @deprecated timeout is ignored - use I2PSocketManagerFull.ping()
|
|
||||||
* @param timeoutMs ignored
|
|
||||||
*/
|
|
||||||
public boolean ping(Destination peer, long timeoutMs) {
|
|
||||||
try {
|
|
||||||
return _session.sendMessage(peer, new byte[] { (byte) CHAFF});
|
|
||||||
} catch (I2PException ex) {
|
|
||||||
_log.warn(getName() + ": I2PException:", ex);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void removeSocket(I2PSocketImpl sock) {
|
|
||||||
String localId = sock.getLocalID();
|
|
||||||
boolean removed = false;
|
|
||||||
synchronized (lock) {
|
|
||||||
removed = (null != _inSockets.remove(localId));
|
|
||||||
removed = removed || (null != _outSockets.remove(localId));
|
|
||||||
lock.notify();
|
|
||||||
}
|
|
||||||
|
|
||||||
long now = _context.clock().now();
|
|
||||||
long lifetime = now - sock.getCreatedOn();
|
|
||||||
long timeSinceClose = now - sock.getClosedOn();
|
|
||||||
long sent = sock.getBytesSent();
|
|
||||||
long recv = sock.getBytesReceived();
|
|
||||||
|
|
||||||
if (_log.shouldLog(Log.DEBUG)) {
|
|
||||||
_log.debug(getName() + ": Removing socket \"" + getReadableForm(localId) + "\" [" + sock
|
|
||||||
+ ", send: " + sent + ", recv: " + recv
|
|
||||||
+ ", lifetime: " + lifetime + "ms, time since close: " + timeSinceClose
|
|
||||||
+ " removed? " + removed + ")]",
|
|
||||||
new Exception("removeSocket called"));
|
|
||||||
}
|
|
||||||
|
|
||||||
_context.statManager().addRateData("streaming.lifetime", lifetime, lifetime);
|
|
||||||
_context.statManager().addRateData("streaming.sent", sent, lifetime);
|
|
||||||
_context.statManager().addRateData("streaming.received", recv, lifetime);
|
|
||||||
|
|
||||||
if (sent > recv) {
|
|
||||||
_context.statManager().addRateData("streaming.transferBalance", 1, lifetime);
|
|
||||||
} else if (recv > sent) {
|
|
||||||
_context.statManager().addRateData("streaming.transferBalance", -1, lifetime);
|
|
||||||
} else {
|
|
||||||
// noop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getName() { return _name; }
|
|
||||||
public void setName(String name) { _name = name; }
|
|
||||||
|
|
||||||
public void addDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
|
|
||||||
synchronized (_listeners) {
|
|
||||||
_listeners.add(lsnr);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
public void removeDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
|
|
||||||
synchronized (_listeners) {
|
|
||||||
_listeners.remove(lsnr);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static String getReadableForm(String id) {
|
|
||||||
if (id == null) return "(null)";
|
|
||||||
if (id.length() != 3) return "Bogus";
|
|
||||||
return Base64.encode(toBytes(id));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a new part the connection ID that is locally unique
|
|
||||||
*
|
|
||||||
* @param uniqueIn map of already known local IDs so we don't collide. WARNING - NOT THREADSAFE!
|
|
||||||
*/
|
|
||||||
private static String makeID(HashMap uniqueIn) {
|
|
||||||
String newID;
|
|
||||||
do {
|
|
||||||
int id = (int) (Math.random() * 16777215 + 1);
|
|
||||||
byte[] nid = new byte[3];
|
|
||||||
nid[0] = (byte) (id / 65536);
|
|
||||||
nid[1] = (byte) ((id / 256) % 256);
|
|
||||||
nid[2] = (byte) (id % 256);
|
|
||||||
newID = toString(nid);
|
|
||||||
} while (uniqueIn.get(newID) != null);
|
|
||||||
return newID;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a new packet of the given type for the specified connection containing
|
|
||||||
* the given payload
|
|
||||||
*/
|
|
||||||
public static byte[] makePacket(byte type, String id, byte[] payload) {
|
|
||||||
byte[] packet = new byte[payload.length + 4];
|
|
||||||
packet[0] = type;
|
|
||||||
byte[] temp = toBytes(id);
|
|
||||||
if (temp.length != 3) throw new RuntimeException("Incorrect ID length: " + temp.length);
|
|
||||||
System.arraycopy(temp, 0, packet, 1, 3);
|
|
||||||
System.arraycopy(payload, 0, packet, 4, payload.length);
|
|
||||||
return packet;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static final String toString(byte data[]) {
|
|
||||||
try {
|
|
||||||
return new String(data, "ISO-8859-1");
|
|
||||||
} catch (UnsupportedEncodingException uee) {
|
|
||||||
throw new RuntimeException("WTF! iso-8859-1 isn't supported?");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static final byte[] toBytes(String str) {
|
|
||||||
try {
|
|
||||||
return str.getBytes("ISO-8859-1");
|
|
||||||
} catch (UnsupportedEncodingException uee) {
|
|
||||||
throw new RuntimeException("WTF! iso-8859-1 isn't supported?");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
Reference in New Issue
Block a user