forked from I2P_Developers/i2p.i2p
beginning of branch i2p.i2p.i2p
This commit is contained in:
35
apps/ministreaming/java/build.xml
Normal file
35
apps/ministreaming/java/build.xml
Normal file
@@ -0,0 +1,35 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project basedir="." default="all" name="ministreaming">
|
||||
<target name="all" depends="clean, build" />
|
||||
<target name="build" depends="builddep, jar" />
|
||||
<target name="builddep">
|
||||
<ant dir="../../../core/java/" target="build" />
|
||||
</target>
|
||||
<target name="compile">
|
||||
<mkdir dir="./build" />
|
||||
<mkdir dir="./build/obj" />
|
||||
<javac srcdir="./src" debug="true" destdir="./build/obj" classpath="../../../core/java/build/i2p.jar" />
|
||||
</target>
|
||||
<target name="jar" depends="compile">
|
||||
<jar destfile="./build/mstreaming.jar" basedir="./build/obj" includes="**/*.class" />
|
||||
</target>
|
||||
<target name="javadoc">
|
||||
<mkdir dir="./build" />
|
||||
<mkdir dir="./build/javadoc" />
|
||||
<javadoc
|
||||
sourcepath="./src:../../../core/java/src:../../../core/java/test" destdir="./build/javadoc"
|
||||
packagenames="*"
|
||||
use="true"
|
||||
splitindex="true"
|
||||
windowtitle="I2P ministreaming library" />
|
||||
</target>
|
||||
<target name="clean">
|
||||
<delete dir="./build" />
|
||||
</target>
|
||||
<target name="cleandep" depends="clean">
|
||||
<ant dir="../../../core/java/" target="cleandep" />
|
||||
</target>
|
||||
<target name="distclean" depends="clean">
|
||||
<ant dir="../../../core/java/" target="distclean" />
|
||||
</target>
|
||||
</project>
|
@@ -0,0 +1,159 @@
|
||||
package net.i2p.client.streaming;
|
||||
|
||||
/** Like a StringBuffer, but for bytes */
|
||||
public class ByteCollector {
|
||||
byte[] contents;
|
||||
int size;
|
||||
|
||||
public ByteCollector() {
|
||||
contents=new byte[80];
|
||||
size=0;
|
||||
}
|
||||
|
||||
public ByteCollector(byte[] b) {
|
||||
this();
|
||||
append(b);
|
||||
}
|
||||
|
||||
public ByteCollector(byte b) {
|
||||
this();
|
||||
append(b);
|
||||
}
|
||||
|
||||
public ByteCollector append (byte b) {
|
||||
ensureCapacity(size+1);
|
||||
contents[size++]=b;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ByteCollector append (byte[] b) {
|
||||
ensureCapacity(size+b.length);
|
||||
System.arraycopy(b,0,contents,size,b.length);
|
||||
size+=b.length;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ByteCollector append(byte[] b, int len) {
|
||||
return append(b,0,len);
|
||||
}
|
||||
|
||||
public ByteCollector append(byte[] b, int off, int len) {
|
||||
ensureCapacity(size+len);
|
||||
System.arraycopy(b,off,contents,size,len);
|
||||
size+=len;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ByteCollector append(ByteCollector bc) {
|
||||
// optimieren?
|
||||
return append(bc.toByteArray());
|
||||
}
|
||||
|
||||
public byte[] toByteArray() {
|
||||
byte[] result=new byte[size];
|
||||
System.arraycopy(contents,0,result,0,size);
|
||||
return result;
|
||||
}
|
||||
|
||||
public byte[] startToByteArray(int maxlen) {
|
||||
if (size < maxlen) {
|
||||
byte[] res = toByteArray();
|
||||
clear();
|
||||
return res;
|
||||
} else {
|
||||
byte[] result = new byte[maxlen];
|
||||
System.arraycopy(contents,0,result,0,maxlen);
|
||||
System.arraycopy(contents,maxlen,contents,0,size-maxlen);
|
||||
size-=maxlen;
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
public int getCurrentSize() {
|
||||
return size;
|
||||
}
|
||||
|
||||
public boolean ensureCapacity(int cap) {
|
||||
if (contents.length<cap) {
|
||||
int l=contents.length;
|
||||
while (l<cap) {
|
||||
l=(l*3)/2+1;
|
||||
}
|
||||
byte[] newcont=new byte[l];
|
||||
System.arraycopy(contents,0,newcont,0,size);
|
||||
contents=newcont;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
return size==0;
|
||||
}
|
||||
|
||||
public int indexOf(ByteCollector bc) {
|
||||
// optimieren?
|
||||
return indexOf(bc.toByteArray());
|
||||
}
|
||||
|
||||
public int indexOf(byte b) {
|
||||
// optimieren?
|
||||
return indexOf(new byte[] {b});
|
||||
}
|
||||
|
||||
public int indexOf(byte[] ba) {
|
||||
loop:
|
||||
for (int i=0;i<size-ba.length+1;i++) {
|
||||
for (int j=0;j<ba.length;j++) {
|
||||
if (contents[i+j]!=ba[j]) continue loop;
|
||||
}
|
||||
return i;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
public void clear() {
|
||||
size=0;
|
||||
}
|
||||
|
||||
public void clearAndShorten() {
|
||||
size=0;
|
||||
contents=new byte[80];
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return new String(toByteArray());
|
||||
}
|
||||
|
||||
public int hashCode() {
|
||||
int h =0;
|
||||
for (int i=0;i<size;i++) {
|
||||
h+=contents[i]*contents[i];
|
||||
}
|
||||
return h;
|
||||
}
|
||||
|
||||
public boolean equals(Object o) {
|
||||
if (o instanceof ByteCollector) {
|
||||
ByteCollector by=(ByteCollector)o;
|
||||
if (size!=by.size) return false;
|
||||
for (int i=0;i<size;i++) {
|
||||
if (contents[i]!=by.contents[i]) return false;
|
||||
}
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public byte removeFirst() {
|
||||
byte bb=contents[0];
|
||||
if (size==0)
|
||||
throw new IllegalArgumentException("ByteCollector is empty");
|
||||
if(size>1)
|
||||
System.arraycopy(contents,1,contents,0,--size);
|
||||
else
|
||||
size=0;
|
||||
return bb;
|
||||
}
|
||||
}
|
@@ -0,0 +1,29 @@
|
||||
package net.i2p.client.streaming;
|
||||
|
||||
import net.i2p.I2PException;
|
||||
|
||||
/**
|
||||
* Defines how to listen for streaming peer connections
|
||||
*
|
||||
*/
|
||||
public interface I2PServerSocket {
|
||||
/**
|
||||
* Closes the socket.
|
||||
*/
|
||||
public void close() throws I2PException;
|
||||
|
||||
/**
|
||||
* Waits for the next socket connecting. If a remote user tried to make a
|
||||
* connection and the local application wasn't .accept()ing new connections,
|
||||
* they should get refused (if .accept() doesnt occur in some small period)
|
||||
*
|
||||
* @throws I2PException if there is a problem with reading a new socket
|
||||
* from the data available (aka the I2PSession closed, etc)
|
||||
*/
|
||||
public I2PSocket accept() throws I2PException;
|
||||
|
||||
/**
|
||||
* Access the manager which is coordinating the server socket
|
||||
*/
|
||||
public I2PSocketManager getManager();
|
||||
}
|
@@ -0,0 +1,50 @@
|
||||
package net.i2p.client.streaming;
|
||||
|
||||
import net.i2p.I2PException;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
/**
|
||||
* Initial stub implementation for the server socket
|
||||
*
|
||||
*/
|
||||
class I2PServerSocketImpl implements I2PServerSocket {
|
||||
private final static Log _log = new Log(I2PServerSocketImpl.class);
|
||||
private I2PSocketManager mgr;
|
||||
private I2PSocket cached=null; // buffer one socket here
|
||||
|
||||
public I2PServerSocketImpl(I2PSocketManager mgr) {
|
||||
this.mgr = mgr;
|
||||
}
|
||||
|
||||
public synchronized I2PSocket accept() throws I2PException {
|
||||
while(cached == null) {
|
||||
myWait();
|
||||
}
|
||||
I2PSocket ret=cached;
|
||||
cached=null;
|
||||
notifyAll();
|
||||
_log.debug("TIMING: handed out accept result "+ret.hashCode());
|
||||
return ret;
|
||||
}
|
||||
|
||||
public synchronized boolean getNewSocket(I2PSocket s){
|
||||
while(cached != null) {
|
||||
myWait();
|
||||
}
|
||||
cached=s;
|
||||
notifyAll();
|
||||
return true;
|
||||
}
|
||||
|
||||
public void close() throws I2PException {
|
||||
//noop
|
||||
}
|
||||
|
||||
private void myWait() {
|
||||
try{
|
||||
wait();
|
||||
} catch (InterruptedException ex) {}
|
||||
}
|
||||
|
||||
public I2PSocketManager getManager() { return mgr; }
|
||||
}
|
@@ -0,0 +1,39 @@
|
||||
package net.i2p.client.streaming;
|
||||
|
||||
import net.i2p.data.Destination;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Minimalistic adapter between the socket api and I2PTunnel's way.
|
||||
* Note that this interface is a "subinterface" of the interface
|
||||
* defined in the "official" streaming api.
|
||||
*/
|
||||
public interface I2PSocket {
|
||||
/**
|
||||
* Return the Destination of this side of the socket.
|
||||
*/
|
||||
public Destination getThisDestination();
|
||||
|
||||
/**
|
||||
* Return the destination of the peer.
|
||||
*/
|
||||
public Destination getPeerDestination();
|
||||
|
||||
/**
|
||||
* Return an InputStream to read from the socket.
|
||||
*/
|
||||
public InputStream getInputStream() throws IOException;
|
||||
|
||||
/**
|
||||
* Return an OutputStream to write into the socket.
|
||||
*/
|
||||
public OutputStream getOutputStream() throws IOException;
|
||||
|
||||
/**
|
||||
* Closes the socket if not closed yet
|
||||
*/
|
||||
public void close() throws IOException;
|
||||
}
|
@@ -0,0 +1,335 @@
|
||||
package net.i2p.client.streaming;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.io.OutputStream;
|
||||
|
||||
import net.i2p.I2PException;
|
||||
import net.i2p.client.I2PSessionException;
|
||||
import net.i2p.data.Destination;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
/**
|
||||
* Initial stub implementation for the socket
|
||||
*
|
||||
*/
|
||||
class I2PSocketImpl implements I2PSocket {
|
||||
private final static Log _log = new Log(I2PSocketImpl.class);
|
||||
|
||||
public static final int MAX_PACKET_SIZE = 1024*32;
|
||||
public static final int PACKET_DELAY=100;
|
||||
|
||||
private I2PSocketManager manager;
|
||||
private Destination local;
|
||||
private Destination remote;
|
||||
private String localID;
|
||||
private String remoteID;
|
||||
private Object remoteIDWaiter = new Object();
|
||||
private I2PInputStream in;
|
||||
private I2POutputStream out;
|
||||
private boolean outgoing;
|
||||
private Object flagLock = new Object();
|
||||
private boolean closed = false, sendClose=true, closed2=false;
|
||||
|
||||
public I2PSocketImpl(Destination peer, I2PSocketManager mgr,
|
||||
boolean outgoing, String localID) {
|
||||
this.outgoing=outgoing;
|
||||
manager = mgr;
|
||||
remote = peer;
|
||||
local = mgr.getSession().getMyDestination();
|
||||
in = new I2PInputStream();
|
||||
I2PInputStream pin = new I2PInputStream();
|
||||
out = new I2POutputStream(pin);
|
||||
new I2PSocketRunner(pin);
|
||||
this.localID = localID;
|
||||
}
|
||||
|
||||
public String getLocalID() {
|
||||
return localID;
|
||||
}
|
||||
|
||||
public void setRemoteID(String id) {
|
||||
synchronized(remoteIDWaiter) {
|
||||
remoteID=id;
|
||||
remoteIDWaiter.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
public String getRemoteID(boolean wait) throws InterruptedIOException {
|
||||
return getRemoteID(wait, -1);
|
||||
}
|
||||
public String getRemoteID(boolean wait, long maxWait) throws InterruptedIOException {
|
||||
long dieAfter = System.currentTimeMillis() + maxWait;
|
||||
synchronized(remoteIDWaiter) {
|
||||
while (wait && remoteID==null) {
|
||||
try {
|
||||
if (maxWait > 0)
|
||||
remoteIDWaiter.wait(maxWait);
|
||||
else
|
||||
remoteIDWaiter.wait();
|
||||
} catch (InterruptedException ex) {}
|
||||
|
||||
if ( (maxWait > 0) && (System.currentTimeMillis() > dieAfter) )
|
||||
throw new InterruptedIOException("Timed out waiting for remote ID");
|
||||
}
|
||||
if (wait) {
|
||||
_log.debug("TIMING: RemoteID set to " + I2PSocketManager.getReadableForm(remoteID) +" for "+this.hashCode());
|
||||
}
|
||||
return remoteID;
|
||||
}
|
||||
}
|
||||
|
||||
public String getRemoteID() throws InterruptedIOException {
|
||||
return getRemoteID(false);
|
||||
}
|
||||
|
||||
public void queueData(byte[] data) {
|
||||
in.queueData(data);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the Destination of this side of the socket.
|
||||
*/
|
||||
public Destination getThisDestination() { return local; }
|
||||
|
||||
/**
|
||||
* Return the destination of the peer.
|
||||
*/
|
||||
public Destination getPeerDestination() { return remote; }
|
||||
|
||||
/**
|
||||
* Return an InputStream to read from the socket.
|
||||
*/
|
||||
public InputStream getInputStream() throws IOException {
|
||||
if ( (in == null) )
|
||||
throw new IOException("Not connected");
|
||||
return in;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return an OutputStream to write into the socket.
|
||||
*/
|
||||
public OutputStream getOutputStream() throws IOException {
|
||||
if ( (out == null) )
|
||||
throw new IOException("Not connected");
|
||||
return out;
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the socket if not closed yet
|
||||
*/
|
||||
public void close() throws IOException {
|
||||
synchronized(flagLock) {
|
||||
_log.debug("Closing connection");
|
||||
closed=true;
|
||||
}
|
||||
out.close();
|
||||
in.notifyClosed();
|
||||
}
|
||||
|
||||
public void internalClose() {
|
||||
synchronized(flagLock) {
|
||||
closed=true;
|
||||
closed2=true;
|
||||
sendClose=false;
|
||||
}
|
||||
out.close();
|
||||
in.notifyClosed();
|
||||
}
|
||||
|
||||
|
||||
private byte getMask(int add) {
|
||||
return (byte)((outgoing?(byte)0xA0:(byte)0x50)+(byte)add);
|
||||
}
|
||||
|
||||
//--------------------------------------------------
|
||||
public class I2PInputStream extends InputStream {
|
||||
|
||||
private ByteCollector bc = new ByteCollector();
|
||||
|
||||
public int read() throws IOException {
|
||||
byte[] b = new byte[1];
|
||||
int res = read(b);
|
||||
if (res == 1) return b[0] & 0xff;
|
||||
if (res == -1) return -1;
|
||||
throw new RuntimeException("Incorrect read() result");
|
||||
}
|
||||
|
||||
public synchronized int read(byte[] b, int off, int len) throws IOException {
|
||||
_log.debug("Read called: "+this.hashCode());
|
||||
if (len==0) return 0;
|
||||
byte[] read = bc.startToByteArray(len);
|
||||
while (read.length==0) {
|
||||
synchronized(flagLock) {
|
||||
if (closed){
|
||||
_log.debug("Closed is set, so closing stream: "+this.hashCode());
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
try {
|
||||
wait();
|
||||
} catch (InterruptedException ex) {}
|
||||
read = bc.startToByteArray(len);
|
||||
}
|
||||
if (read.length>len) throw new RuntimeException("BUG");
|
||||
System.arraycopy(read,0,b,off,read.length);
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG)) {
|
||||
_log.debug("Read from I2PInputStream " + this.hashCode()
|
||||
+ " returned "+read.length+" bytes");
|
||||
}
|
||||
//if (_log.shouldLog(Log.DEBUG)) {
|
||||
// _log.debug("Read from I2PInputStream " + this.hashCode()
|
||||
// + " returned "+read.length+" bytes:\n"
|
||||
// + HexDump.dump(read));
|
||||
//}
|
||||
return read.length;
|
||||
}
|
||||
|
||||
public int available() {
|
||||
return bc.getCurrentSize();
|
||||
}
|
||||
|
||||
public void queueData(byte[] data) {
|
||||
queueData(data,0,data.length);
|
||||
}
|
||||
|
||||
public synchronized void queueData(byte[] data, int off, int len) {
|
||||
_log.debug("Insert "+len+" bytes into queue: "+this.hashCode());
|
||||
bc.append(data, off, len);
|
||||
notifyAll();
|
||||
}
|
||||
|
||||
public synchronized void notifyClosed() {
|
||||
notifyAll();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public class I2POutputStream extends OutputStream {
|
||||
|
||||
public I2PInputStream sendTo;
|
||||
|
||||
public I2POutputStream(I2PInputStream sendTo) {
|
||||
this.sendTo=sendTo;
|
||||
}
|
||||
public void write(int b) throws IOException {
|
||||
write(new byte[] {(byte)b});
|
||||
}
|
||||
|
||||
public void write (byte[] b, int off, int len) throws IOException {
|
||||
sendTo.queueData(b,off,len);
|
||||
}
|
||||
|
||||
public void close() {
|
||||
sendTo.notifyClosed();
|
||||
}
|
||||
}
|
||||
|
||||
public class I2PSocketRunner extends Thread {
|
||||
|
||||
public InputStream in;
|
||||
|
||||
public I2PSocketRunner(InputStream in) {
|
||||
_log.debug("Runner's input stream is: "+in.hashCode());
|
||||
this.in=in;
|
||||
start();
|
||||
}
|
||||
|
||||
public void run() {
|
||||
byte[] buffer = new byte[MAX_PACKET_SIZE];
|
||||
ByteCollector bc = new ByteCollector();
|
||||
boolean sent = true;
|
||||
try {
|
||||
int len, bcsize;
|
||||
// try {
|
||||
while (true) {
|
||||
len = in.read(buffer);
|
||||
bcsize = bc.getCurrentSize();
|
||||
if (len != -1) {
|
||||
bc.append(buffer,len);
|
||||
} else if (bcsize == 0) {
|
||||
break;
|
||||
}
|
||||
if ((bcsize < MAX_PACKET_SIZE)
|
||||
&& (in.available()==0)) {
|
||||
_log.debug("Runner Point d: "+this.hashCode());
|
||||
|
||||
try {
|
||||
Thread.sleep(PACKET_DELAY);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
if ((bcsize >= MAX_PACKET_SIZE)
|
||||
|| (in.available()==0) ) {
|
||||
byte[] data = bc.startToByteArray(MAX_PACKET_SIZE);
|
||||
if (data.length > 0) {
|
||||
_log.debug("Message size is: "+data.length);
|
||||
sent = sendBlock(data);
|
||||
if (!sent) {
|
||||
_log.error("Error sending message to peer. Killing socket runner");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if ((bc.getCurrentSize() > 0) && sent) {
|
||||
_log.error("A SCARY MONSTER HAS EATEN SOME DATA! "
|
||||
+ "(input stream: " + in.hashCode() + "; "
|
||||
+ "queue size: " + bc.getCurrentSize() + ")");
|
||||
}
|
||||
synchronized(flagLock) {
|
||||
closed2=true;
|
||||
}
|
||||
// } catch (IOException ex) {
|
||||
// if (_log.shouldLog(Log.INFO))
|
||||
// _log.info("Error reading and writing", ex);
|
||||
// }
|
||||
boolean sc;
|
||||
synchronized(flagLock) {
|
||||
sc=sendClose;
|
||||
} // FIXME: Race here?
|
||||
if (sc) {
|
||||
_log.info("Sending close packet: "+outgoing);
|
||||
byte[] packet = I2PSocketManager.makePacket
|
||||
((byte)(getMask(0x02)),remoteID, new byte[0]);
|
||||
synchronized(manager.getSession()) {
|
||||
sent = manager.getSession().sendMessage(remote, packet);
|
||||
}
|
||||
if (!sent) {
|
||||
_log.error("Error sending close packet to peer");
|
||||
}
|
||||
}
|
||||
manager.removeSocket(I2PSocketImpl.this);
|
||||
} catch (IOException ex) {
|
||||
// WHOEVER removes this event on inconsistent
|
||||
// state before fixing the inconsistent state (a
|
||||
// reference on the socket in the socket manager
|
||||
// etc.) will get hanged by me personally -- mihi
|
||||
_log.error("Error running - **INCONSISTENT STATE!!!**", ex);
|
||||
} catch (I2PException ex) {
|
||||
_log.error("Error running - **INCONSISTENT STATE!!!**" , ex);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean sendBlock(byte data[]) throws I2PSessionException {
|
||||
_log.debug("TIMING: Block to send for "+I2PSocketImpl.this.hashCode());
|
||||
if (remoteID==null) {
|
||||
_log.error("NULL REMOTEID");
|
||||
return false;
|
||||
}
|
||||
byte[] packet = I2PSocketManager.makePacket(getMask(0x00), remoteID,
|
||||
data);
|
||||
boolean sent;
|
||||
synchronized(flagLock) {
|
||||
if (closed2) return false;
|
||||
}
|
||||
synchronized(manager.getSession()) {
|
||||
sent = manager.getSession().sendMessage(remote, packet);
|
||||
}
|
||||
return sent;
|
||||
}
|
||||
}
|
||||
}
|
@@ -0,0 +1,386 @@
|
||||
/*
|
||||
* licensed under BSD license...
|
||||
* (if you know the proper clause for that, add it ...)
|
||||
*/
|
||||
package net.i2p.client.streaming;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import net.i2p.I2PException;
|
||||
import net.i2p.client.I2PSession;
|
||||
import net.i2p.client.I2PSessionListener;
|
||||
import net.i2p.data.Base64;
|
||||
import net.i2p.data.Destination;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
/**
|
||||
* Centralize the coordination and multiplexing of the local client's streaming.
|
||||
* There should be one I2PSocketManager for each I2PSession, and if an application
|
||||
* is sending and receiving data through the streaming library using an
|
||||
* I2PSocketManager, it should not attempt to call I2PSession's setSessionListener
|
||||
* or receive any messages with its .receiveMessage
|
||||
*
|
||||
*/
|
||||
public class I2PSocketManager implements I2PSessionListener {
|
||||
private final static Log _log = new Log(I2PSocketManager.class);
|
||||
private I2PSession _session;
|
||||
private I2PServerSocketImpl _serverSocket;
|
||||
private Object lock = new Object(); // for locking socket lists
|
||||
private HashMap _outSockets;
|
||||
private HashMap _inSockets;
|
||||
private I2PSocketOptions _defaultOptions;
|
||||
|
||||
public static final int PUBKEY_LENGTH=387;
|
||||
|
||||
|
||||
public I2PSocketManager() {
|
||||
_session=null;
|
||||
_serverSocket = new I2PServerSocketImpl(this);
|
||||
_inSockets = new HashMap(16);
|
||||
_outSockets = new HashMap(16);
|
||||
}
|
||||
|
||||
public I2PSession getSession() {
|
||||
return _session;
|
||||
}
|
||||
|
||||
public void setSession(I2PSession session) {
|
||||
_session = session;
|
||||
if (session != null)
|
||||
session.setSessionListener(this);
|
||||
}
|
||||
|
||||
public void disconnected(I2PSession session) {
|
||||
_log.error("Disconnected from the session");
|
||||
}
|
||||
|
||||
public void errorOccurred(I2PSession session, String message, Throwable error) {
|
||||
_log.error("Error occurred: [" + message + "]", error);
|
||||
}
|
||||
|
||||
public void messageAvailable(I2PSession session, int msgId, long size) {
|
||||
try {
|
||||
I2PSocketImpl s;
|
||||
byte msg[] = session.receiveMessage(msgId);
|
||||
if (msg.length == 1 && msg[0] == -1) {
|
||||
_log.debug("Ping received");
|
||||
return;
|
||||
}
|
||||
if (msg.length <4) {
|
||||
_log.error("==== packet too short ====");
|
||||
return;
|
||||
}
|
||||
int type = msg[0] & 0xff;
|
||||
String id = new String(new byte[] {msg[1], msg[2], msg[3]},
|
||||
"ISO-8859-1");
|
||||
byte[] payload = new byte[msg.length-4];
|
||||
System.arraycopy(msg, 4, payload, 0, payload.length);
|
||||
_log.debug("Message read: type = [" + Integer.toHexString(type) +
|
||||
"] id = [" + getReadableForm(id)+
|
||||
"] payload length: " + payload.length + "]");
|
||||
synchronized(lock) {
|
||||
switch(type) {
|
||||
case 0x51: // ACK outgoing
|
||||
s = (I2PSocketImpl) _outSockets.get(id);
|
||||
if (s == null) {
|
||||
_log.warn("No socket responsible for ACK packet");
|
||||
return;
|
||||
}
|
||||
if (payload.length==3 && s.getRemoteID(false)==null) {
|
||||
String newID = new String(payload,
|
||||
"ISO-8859-1");
|
||||
s.setRemoteID(newID);
|
||||
return;
|
||||
} else {
|
||||
if (payload.length != 3)
|
||||
_log.warn("Ack packet had " + payload.length + " bytes");
|
||||
else
|
||||
_log.warn("Remote ID already exists? " + s.getRemoteID());
|
||||
return;
|
||||
}
|
||||
case 0x52: // disconnect outgoing
|
||||
_log.debug("*Disconnect outgoing!");
|
||||
try {
|
||||
s = (I2PSocketImpl) _outSockets.get(id);
|
||||
if (payload.length==0 && s != null) {
|
||||
s.internalClose();
|
||||
_outSockets.remove(id);
|
||||
return;
|
||||
} else {
|
||||
if (payload.length > 0)
|
||||
_log.warn("Disconnect packet had " + payload.length + " bytes");
|
||||
return;
|
||||
}
|
||||
} catch (Exception t) {
|
||||
_log.error("Ignoring error on disconnect", t);
|
||||
}
|
||||
case 0x50: // packet send outgoing
|
||||
_log.debug("*Packet send outgoing [" + payload.length + "]");
|
||||
s = (I2PSocketImpl) _outSockets.get(id);
|
||||
if (s != null) {
|
||||
s.queueData(payload);
|
||||
return;
|
||||
} else {
|
||||
_log.error("Null socket with data available");
|
||||
throw new IllegalStateException("Null socket with data available");
|
||||
}
|
||||
case 0xA1: // SYN incoming
|
||||
_log.debug("*Syn!");
|
||||
if (payload.length==PUBKEY_LENGTH) {
|
||||
String newLocalID = makeID(_inSockets);
|
||||
Destination d = new Destination();
|
||||
d.readBytes(new ByteArrayInputStream(payload));
|
||||
|
||||
s = new I2PSocketImpl(d, this, false,
|
||||
newLocalID);
|
||||
s.setRemoteID(id);
|
||||
if (_serverSocket.getNewSocket(s)) {
|
||||
_inSockets.put(newLocalID, s);
|
||||
byte[] packet = makePacket
|
||||
((byte)0x51, id,
|
||||
newLocalID.getBytes("ISO-8859-1"));
|
||||
boolean replySentOk = false;
|
||||
synchronized(_session) {
|
||||
replySentOk = _session.sendMessage(d, packet);
|
||||
}
|
||||
if (!replySentOk) {
|
||||
_log.error("Error sending reply to " +
|
||||
d.calculateHash().toBase64() +
|
||||
" in response to a new con message",
|
||||
new Exception("Failed creation"));
|
||||
s.internalClose();
|
||||
}
|
||||
} else {
|
||||
byte[] packet =
|
||||
(" "+id).getBytes("ISO-8859-1");
|
||||
packet[0]=0x52;
|
||||
boolean nackSent = session.sendMessage(d, packet);
|
||||
if (!nackSent) {
|
||||
_log.error("Error sending NACK for session creation");
|
||||
}
|
||||
s.internalClose();
|
||||
}
|
||||
return;
|
||||
} else {
|
||||
_log.error("Syn packet that has a payload not equal to the pubkey length (" + payload.length + " != " + PUBKEY_LENGTH + ")");
|
||||
return;
|
||||
}
|
||||
case 0xA2: // disconnect incoming
|
||||
_log.debug("*Disconnect incoming!");
|
||||
try {
|
||||
s = (I2PSocketImpl) _inSockets.get(id);
|
||||
if (payload.length==0 && s != null) {
|
||||
s.internalClose();
|
||||
_inSockets.remove(id);
|
||||
return;
|
||||
} else {
|
||||
if (payload.length > 0)
|
||||
_log.warn("Disconnect packet had " + payload.length + " bytes");
|
||||
return;
|
||||
}
|
||||
} catch (Exception t) {
|
||||
_log.error("Ignoring error on disconnect", t);
|
||||
return;
|
||||
}
|
||||
case 0xA0: // packet send incoming
|
||||
_log.debug("*Packet send incoming [" + payload.length + "]");
|
||||
s = (I2PSocketImpl) _inSockets.get(id);
|
||||
if (s != null) {
|
||||
s.queueData(payload);
|
||||
return;
|
||||
} else {
|
||||
_log.error("Null socket with data available");
|
||||
throw new IllegalStateException("Null socket with data available");
|
||||
}
|
||||
case 0xFF: // ignore
|
||||
return;
|
||||
}
|
||||
_log.error("\n\n=============== Unknown packet! "+
|
||||
"============"+
|
||||
"\nType: "+(int)type+
|
||||
"\nID: " + getReadableForm(id)+
|
||||
"\nBase64'ed Data: "+Base64.encode(payload)+
|
||||
"\n\n\n");
|
||||
if (id != null) {
|
||||
_inSockets.remove(id);
|
||||
_outSockets.remove(id);
|
||||
}
|
||||
}
|
||||
} catch (I2PException ise) {
|
||||
_log.error("Error processing", ise);
|
||||
} catch (IOException ioe) {
|
||||
_log.error("Error processing", ioe);
|
||||
} catch (IllegalStateException ise) {
|
||||
_log.debug("Error processing", ise);
|
||||
}
|
||||
}
|
||||
|
||||
public void reportAbuse(I2PSession session, int severity) {
|
||||
_log.error("Abuse reported [" + severity + "]");
|
||||
}
|
||||
|
||||
public void setDefaultOptions(I2PSocketOptions options) { _defaultOptions = options; }
|
||||
|
||||
public I2PSocketOptions getDefaultOptions() { return _defaultOptions ; }
|
||||
|
||||
public I2PServerSocket getServerSocket() { return _serverSocket; }
|
||||
|
||||
/**
|
||||
* Create a new connected socket (block until the socket is created)
|
||||
*
|
||||
* @throws I2PException if there is a problem connecting
|
||||
*/
|
||||
public I2PSocket connect(Destination peer, I2PSocketOptions options) throws I2PException {
|
||||
|
||||
String localID, lcID;
|
||||
I2PSocketImpl s;
|
||||
synchronized(lock) {
|
||||
localID=makeID(_outSockets);
|
||||
lcID=getReadableForm(localID);
|
||||
s = new I2PSocketImpl(peer, this, true, localID);
|
||||
_outSockets.put(s.getLocalID(),s);
|
||||
}
|
||||
try {
|
||||
ByteArrayOutputStream pubkey = new ByteArrayOutputStream();
|
||||
_session.getMyDestination().writeBytes(pubkey);
|
||||
String remoteID;
|
||||
byte[] packet = makePacket((byte)0xA1, localID,
|
||||
pubkey.toByteArray());
|
||||
boolean sent = false;
|
||||
synchronized(_session) {
|
||||
sent = _session.sendMessage(peer, packet);
|
||||
}
|
||||
if (!sent) {
|
||||
_log.info("Unable to send & receive ack for SYN packet");
|
||||
synchronized(lock) {
|
||||
_outSockets.remove(s.getLocalID());
|
||||
}
|
||||
throw new I2PException("Unable to reach peer");
|
||||
}
|
||||
remoteID = s.getRemoteID(true, options.getConnectTimeout());
|
||||
if ("".equals(remoteID)) {
|
||||
throw new I2PException("Unable to reach peer");
|
||||
}
|
||||
_log.debug("TIMING: s given out for remoteID "+getReadableForm(remoteID));
|
||||
return s;
|
||||
} catch (InterruptedIOException ioe) {
|
||||
_log.error("Timeout waiting for ack from syn for id " + getReadableForm(lcID), ioe);
|
||||
synchronized(lock) {
|
||||
_outSockets.remove(s.getLocalID());
|
||||
}
|
||||
throw new I2PException("Timeout waiting for ack");
|
||||
} catch (IOException ex) {
|
||||
_log.error("Error sending syn on id " + getReadableForm(lcID), ex);
|
||||
synchronized(lock) {
|
||||
_outSockets.remove(s.getLocalID());
|
||||
}
|
||||
throw new I2PException("IOException occurred");
|
||||
} catch (I2PException ex) {
|
||||
_log.info("Error sending syn on id " + getReadableForm(lcID), ex);
|
||||
synchronized(lock) {
|
||||
_outSockets.remove(s.getLocalID());
|
||||
}
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
public I2PSocket connect(Destination peer) throws I2PException {
|
||||
return connect(peer, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve a set of currently connected I2PSockets, either initiated locally or remotely.
|
||||
*
|
||||
*/
|
||||
public Set listSockets() {
|
||||
Set sockets = new HashSet(8);
|
||||
synchronized (lock) {
|
||||
sockets.addAll(_inSockets.values());
|
||||
sockets.addAll(_outSockets.values());
|
||||
}
|
||||
return sockets;
|
||||
}
|
||||
|
||||
/**
|
||||
* Ping the specified peer, returning true if they replied to the ping within
|
||||
* the timeout specified, false otherwise. This call blocks.
|
||||
*
|
||||
*/
|
||||
public boolean ping(Destination peer, long timeoutMs) {
|
||||
try {
|
||||
return _session.sendMessage(peer, new byte[] {(byte)0xFF});
|
||||
} catch (I2PException ex) {
|
||||
_log.error("I2PException:",ex);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public void removeSocket(I2PSocketImpl sock) {
|
||||
synchronized(lock) {
|
||||
_inSockets.remove(sock.getLocalID());
|
||||
_outSockets.remove(sock.getLocalID());
|
||||
}
|
||||
}
|
||||
|
||||
public static String getReadableForm(String id) {
|
||||
try {
|
||||
if (id.length() != 3) return "Bogus";
|
||||
return Base64.encode(id.getBytes("ISO-8859-1"));
|
||||
} catch (UnsupportedEncodingException ex) {
|
||||
ex.printStackTrace();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new part the connection ID that is locally unique
|
||||
*
|
||||
* @param uniqueIn map of already known local IDs so we don't collide. WARNING - NOT THREADSAFE!
|
||||
*/
|
||||
public static String makeID(HashMap uniqueIn) {
|
||||
String newID;
|
||||
try {
|
||||
do {
|
||||
int id = (int)(Math.random()*16777215+1);
|
||||
byte[] nid = new byte[3];
|
||||
nid[0]=(byte)(id / 65536);
|
||||
nid[1] = (byte)((id/256) % 256);
|
||||
nid[2]= (byte)(id %256);
|
||||
newID = new String(nid, "ISO-8859-1");
|
||||
} while (uniqueIn.get(newID) != null);
|
||||
return newID;
|
||||
} catch (UnsupportedEncodingException ex) {
|
||||
ex.printStackTrace();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new packet of the given type for the specified connection containing
|
||||
* the given payload
|
||||
*/
|
||||
public static byte[] makePacket(byte type, String id, byte[] payload) {
|
||||
try {
|
||||
byte[] packet = new byte[payload.length+4];
|
||||
packet[0]=type;
|
||||
byte[] temp = id.getBytes("ISO-8859-1");
|
||||
if (temp.length != 3)
|
||||
throw new RuntimeException("Incorrect ID length: "+
|
||||
temp.length);
|
||||
System.arraycopy(temp,0,packet,1,3);
|
||||
System.arraycopy(payload,0,packet,4,payload.length);
|
||||
return packet;
|
||||
} catch (UnsupportedEncodingException ex) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Error building the packet", ex);
|
||||
return new byte[0];
|
||||
}
|
||||
}
|
||||
}
|
@@ -0,0 +1,86 @@
|
||||
package net.i2p.client.streaming;
|
||||
|
||||
import net.i2p.client.I2PSession;
|
||||
import net.i2p.client.I2PClient;
|
||||
import net.i2p.client.I2PClientFactory;
|
||||
import net.i2p.client.I2PSessionException;
|
||||
import net.i2p.I2PException;
|
||||
import net.i2p.data.Destination;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* Simplify the creation of I2PSession and transient I2P Destination objects if
|
||||
* necessary to create a socket manager. This class is most likely how classes
|
||||
* will begin their use of the socket library
|
||||
*
|
||||
*/
|
||||
public class I2PSocketManagerFactory {
|
||||
private final static Log _log = new Log(I2PSocketManagerFactory.class);
|
||||
|
||||
/**
|
||||
* Create a socket manager using a brand new destination connected to the
|
||||
* I2CP router on the local machine on the default port (7654).
|
||||
*
|
||||
* @return the newly created socket manager, or null if there were errors
|
||||
*/
|
||||
public static I2PSocketManager createManager() {
|
||||
return createManager("localhost", 7654, new Properties());
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a socket manager using a brand new destination connected to the
|
||||
* I2CP router on the given machine reachable through the given port.
|
||||
*
|
||||
* @return the newly created socket manager, or null if there were errors
|
||||
*/
|
||||
public static I2PSocketManager createManager(String i2cpHost, int i2cpPort, Properties opts) {
|
||||
I2PClient client = I2PClientFactory.createClient();
|
||||
ByteArrayOutputStream keyStream = new ByteArrayOutputStream(512);
|
||||
try {
|
||||
Destination dest = client.createDestination(keyStream);
|
||||
ByteArrayInputStream in = new ByteArrayInputStream(keyStream.toByteArray());
|
||||
return createManager(in, i2cpHost, i2cpPort, opts);
|
||||
} catch (IOException ioe) {
|
||||
_log.error("Error creating the destination for socket manager", ioe);
|
||||
return null;
|
||||
} catch (I2PException ie) {
|
||||
_log.error("Error creating the destination for socket manager", ie);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a socket manager using the destination loaded from the given private key
|
||||
* stream and connected to the I2CP router on the specified machine on the given
|
||||
* port
|
||||
*
|
||||
* @return the newly created socket manager, or null if there were errors
|
||||
*/
|
||||
public static I2PSocketManager createManager(InputStream myPrivateKeyStream, String i2cpHost, int i2cpPort, Properties opts) {
|
||||
I2PClient client = I2PClientFactory.createClient();
|
||||
opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_GUARANTEED);
|
||||
opts.setProperty(I2PClient.PROP_TCP_HOST, i2cpHost);
|
||||
opts.setProperty(I2PClient.PROP_TCP_PORT, ""+i2cpPort);
|
||||
try {
|
||||
I2PSession session = client.createSession(myPrivateKeyStream, opts);
|
||||
session.connect();
|
||||
return createManager(session);
|
||||
} catch (I2PSessionException ise) {
|
||||
_log.error("Error creating session for socket manager", ise);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private static I2PSocketManager createManager(I2PSession session) {
|
||||
I2PSocketManager mgr = new I2PSocketManager();
|
||||
mgr.setSession(session);
|
||||
return mgr;
|
||||
}
|
||||
}
|
@@ -0,0 +1,21 @@
|
||||
package net.i2p.client.streaming;
|
||||
|
||||
/**
|
||||
* Define the configuration for streaming and verifying data on the socket.
|
||||
* No options available...
|
||||
*
|
||||
*/
|
||||
public class I2PSocketOptions {
|
||||
private long _connectTimeout;
|
||||
public I2PSocketOptions() {
|
||||
_connectTimeout = -1;
|
||||
}
|
||||
|
||||
/**
|
||||
* How long we will wait for the ACK from a SYN, in milliseconds.
|
||||
*
|
||||
* @return milliseconds to wait, or -1 if we will wait indefinitely
|
||||
*/
|
||||
public long getConnectTimeout() { return _connectTimeout; }
|
||||
public void setConnectTimeout(long ms) { _connectTimeout = ms; }
|
||||
}
|
Reference in New Issue
Block a user