i2ptunnel: Improve error handling for UDP tunnels,

Sink.send() may now throw RuntimeException,
converted from IOException or I2PSessionException;
interrupt runner threads on error;
ignore I2PSessionException in Pinger.close();
logging and javadoc improvements;
untested
This commit is contained in:
zzz
2015-05-09 22:22:13 +00:00
parent 5ee6826241
commit 7f30f481b2
16 changed files with 106 additions and 25 deletions

View File

@@ -22,6 +22,10 @@ public class MultiSink<S extends Sink> implements Source, Sink {
public void start() {} public void start() {}
/**
* May throw RuntimeException from underlying sinks
* @throws RuntimeException
*/
public void send(Destination from, byte[] data) { public void send(Destination from, byte[] data) {
Sink s = this.cache.get(from); Sink s = this.cache.get(from);
if (s == null) { if (s == null) {

View File

@@ -23,6 +23,10 @@ public class ReplyTracker<S extends Sink> implements Source, Sink {
public void start() {} public void start() {}
/**
* May throw RuntimeException from underlying sink
* @throws RuntimeException
*/
public void send(Destination to, byte[] data) { public void send(Destination to, byte[] data) {
this.cache.put(to, this.reply); this.cache.put(to, this.reply);
this.sink.send(to, data); this.sink.send(to, data);

View File

@@ -64,6 +64,10 @@ public class SOCKSUDPPort implements Source, Sink {
this.udpsource.stop(); this.udpsource.stop();
} }
/**
* May throw RuntimeException from underlying sink
* @throws RuntimeException
*/
public void send(Destination from, byte[] data) { public void send(Destination from, byte[] data) {
this.wrapper.send(from, data); this.wrapper.send(from, data);
} }

View File

@@ -30,6 +30,8 @@ public class SOCKSUDPUnwrapper implements Source, Sink {
/** /**
* *
* May throw RuntimeException from underlying sink
* @throws RuntimeException
*/ */
public void send(Destination ignored_from, byte[] data) { public void send(Destination ignored_from, byte[] data) {
SOCKSHeader h; SOCKSHeader h;

View File

@@ -25,6 +25,8 @@ public class SOCKSUDPWrapper implements Source, Sink {
/** /**
* Use the cached header, which should have the host string and port * Use the cached header, which should have the host string and port
* *
* May throw RuntimeException from underlying sink
* @throws RuntimeException
*/ */
public void send(Destination from, byte[] data) { public void send(Destination from, byte[] data) {
if (this.sink == null) if (this.sink == null)

View File

@@ -27,6 +27,10 @@ public class MultiSource implements Source, Sink {
this.sinks.clear(); this.sinks.clear();
} }
/**
* May throw RuntimeException from underlying sinks
* @throws RuntimeException
*/
public void send(Destination ignored_from, byte[] data) { public void send(Destination ignored_from, byte[] data) {
for(Destination dest : this.sinks) { for(Destination dest : this.sinks) {
this.sink.send(dest, data); this.sink.send(dest, data);

View File

@@ -1,7 +1,9 @@
package net.i2p.i2ptunnel.streamr; package net.i2p.i2ptunnel.streamr;
import net.i2p.i2ptunnel.udp.*; import net.i2p.i2ptunnel.udp.*;
import net.i2p.I2PAppContext;
import net.i2p.util.I2PAppThread; import net.i2p.util.I2PAppThread;
import net.i2p.util.Log;
/** /**
* *
@@ -31,7 +33,9 @@ public class Pinger implements Source, Runnable {
// send unsubscribe-message // send unsubscribe-message
byte[] data = new byte[1]; byte[] data = new byte[1];
data[0] = 1; data[0] = 1;
try {
this.sink.send(null, data); this.sink.send(null, data);
} catch (RuntimeException re) {}
} }
public void run() { public void run() {
@@ -41,7 +45,14 @@ public class Pinger implements Source, Runnable {
int i = 0; int i = 0;
while(this.running) { while(this.running) {
//System.out.print("p"); //System.out.print("p");
try {
this.sink.send(null, data); this.sink.send(null, data);
} catch (RuntimeException re) {
Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
if (log.shouldWarn())
log.warn("error sending", re);
break;
}
synchronized(this.waitlock) { synchronized(this.waitlock) {
int delay = 10000; int delay = 10000;
if (i < 5) { if (i < 5) {
@@ -50,7 +61,9 @@ public class Pinger implements Source, Runnable {
} }
try { try {
this.waitlock.wait(delay); this.waitlock.wait(delay);
} catch(InterruptedException ie) {} } catch(InterruptedException ie) {
break;
}
} }
} }
} }

View File

@@ -2,9 +2,11 @@ package net.i2p.i2ptunnel.streamr;
import java.util.Set; import java.util.Set;
import net.i2p.I2PAppContext;
import net.i2p.data.Destination; import net.i2p.data.Destination;
import net.i2p.i2ptunnel.udp.*; import net.i2p.i2ptunnel.udp.*;
import net.i2p.util.ConcurrentHashSet; import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.Log;
/** /**
* server-mode * server-mode
@@ -19,10 +21,18 @@ public class Subscriber implements Sink {
this.subscriptions = new ConcurrentHashSet<Destination>(); this.subscriptions = new ConcurrentHashSet<Destination>();
} }
/**
* Doesn't really "send" anywhere, just subscribes or unsubscribes the destination
*
* @param dest to subscribe or unsubscribe
* @param data must be a single byte, 0 to subscribe, 1 to unsubscribe
*/
public void send(Destination dest, byte[] data) { public void send(Destination dest, byte[] data) {
if(dest == null || data.length < 1) { if(dest == null || data.length < 1) {
// invalid packet // invalid packet
// TODO: write to log Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
if (log.shouldWarn())
log.warn("bad subscription from " + dest);
} else { } else {
byte ctrl = data[0]; byte ctrl = data[0];
if(ctrl == 0) { if(ctrl == 0) {
@@ -40,7 +50,9 @@ public class Subscriber implements Sink {
multi.remove(dest); multi.remove(dest);
} else { } else {
// invalid packet // invalid packet
// TODO: write to log Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
if (log.shouldWarn())
log.warn("bad subscription from " + dest);
} }
} }
} }

View File

@@ -32,7 +32,10 @@ public class I2PSink implements Sink {
} }
} }
/** @param src ignored */ /**
* @param src ignored
* @throws RuntimeException if session is closed
*/
public synchronized void send(Destination src, byte[] data) { public synchronized void send(Destination src, byte[] data) {
//System.out.print("w"); //System.out.print("w");
// create payload // create payload
@@ -49,9 +52,8 @@ public class I2PSink implements Sink {
this.sess.sendMessage(this.dest, payload, this.sess.sendMessage(this.dest, payload,
(this.raw ? I2PSession.PROTO_DATAGRAM_RAW : I2PSession.PROTO_DATAGRAM), (this.raw ? I2PSession.PROTO_DATAGRAM_RAW : I2PSession.PROTO_DATAGRAM),
I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
} catch(I2PSessionException exc) { } catch (I2PSessionException ise) {
// TODO: handle better throw new RuntimeException("failed to send data", ise);
exc.printStackTrace();
} }
} }

View File

@@ -31,7 +31,10 @@ public class I2PSinkAnywhere implements Sink {
} }
} }
/** @param to - where it's going */ /**
* @param to - where it's going
* @throws RuntimeException if session is closed
*/
public synchronized void send(Destination to, byte[] data) { public synchronized void send(Destination to, byte[] data) {
// create payload // create payload
byte[] payload; byte[] payload;
@@ -47,9 +50,8 @@ public class I2PSinkAnywhere implements Sink {
this.sess.sendMessage(to, payload, this.sess.sendMessage(to, payload,
(this.raw ? I2PSession.PROTO_DATAGRAM_RAW : I2PSession.PROTO_DATAGRAM), (this.raw ? I2PSession.PROTO_DATAGRAM_RAW : I2PSession.PROTO_DATAGRAM),
I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
} catch(I2PSessionException exc) { } catch (I2PSessionException ise) {
// TODO: handle better throw new RuntimeException("failed to send data", ise);
exc.printStackTrace();
} }
} }

View File

@@ -3,10 +3,12 @@ package net.i2p.i2ptunnel.udp;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import net.i2p.I2PAppContext;
import net.i2p.client.I2PSession; import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionListener; import net.i2p.client.I2PSessionListener;
import net.i2p.client.datagram.I2PDatagramDissector; import net.i2p.client.datagram.I2PDatagramDissector;
import net.i2p.util.I2PAppThread; import net.i2p.util.I2PAppThread;
import net.i2p.util.Log;
/** /**
* *
@@ -48,7 +50,8 @@ public class I2PSource implements Source, Runnable {
public void run() { public void run() {
// create dissector // create dissector
I2PDatagramDissector diss = new I2PDatagramDissector(); I2PDatagramDissector diss = new I2PDatagramDissector();
while(true) { _running = true;
while (_running) {
try { try {
// get id // get id
int id = this.queue.take(); int id = this.queue.take();
@@ -71,7 +74,10 @@ public class I2PSource implements Source, Runnable {
} }
//System.out.print("r"); //System.out.print("r");
} catch(Exception e) { } catch(Exception e) {
e.printStackTrace(); Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
if (log.shouldWarn())
log.warn("error sending", e);
break;
} }
} }
} }
@@ -91,11 +97,15 @@ public class I2PSource implements Source, Runnable {
} }
public void disconnected(I2PSession arg0) { public void disconnected(I2PSession arg0) {
// ignore _running = false;
thread.interrupt();
} }
public void errorOccurred(I2PSession arg0, String arg1, Throwable arg2) { public void errorOccurred(I2PSession arg0, String arg1, Throwable arg2) {
// ignore Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
log.error(arg1, arg2);
_running = false;
thread.interrupt();
} }
} }
@@ -106,4 +116,5 @@ public class I2PSource implements Source, Runnable {
protected final Thread thread; protected final Thread thread;
protected final boolean verify; protected final boolean verify;
protected final boolean raw; protected final boolean raw;
private volatile boolean _running;
} }

View File

@@ -7,5 +7,9 @@ import net.i2p.data.Destination;
* @author welterde * @author welterde
*/ */
public interface Sink { public interface Sink {
/**
* @param src some implementations may ignore
* @throws RuntimeException in some implementations
*/
public void send(Destination src, byte[] data); public void send(Destination src, byte[] data);
} }

View File

@@ -1,5 +1,6 @@
package net.i2p.i2ptunnel.udp; package net.i2p.i2ptunnel.udp;
import java.io.IOException;
import java.net.DatagramSocket; import java.net.DatagramSocket;
import java.net.DatagramPacket; import java.net.DatagramPacket;
import java.net.InetAddress; import java.net.InetAddress;
@@ -12,13 +13,16 @@ import net.i2p.data.Destination;
*/ */
public class UDPSink implements Sink { public class UDPSink implements Sink {
/**
* @param src ignored
* @throws IllegalArgumentException on DatagramSocket IOException
*/
public UDPSink(InetAddress host, int port) { public UDPSink(InetAddress host, int port) {
// create socket // create socket
try { try {
this.sock = new DatagramSocket(); this.sock = new DatagramSocket();
} catch(Exception e) { } catch (IOException e) {
// TODO: fail better throw new IllegalArgumentException("failed to open udp-socket", e);
throw new RuntimeException("failed to open udp-socket", e);
} }
this.remoteHost = host; this.remoteHost = host;
@@ -27,6 +31,10 @@ public class UDPSink implements Sink {
this.remotePort = port; this.remotePort = port;
} }
/**
* @param src ignored
* @throws RuntimeException on DatagramSocket IOException
*/
public void send(Destination src, byte[] data) { public void send(Destination src, byte[] data) {
// if data.length > this.sock.getSendBufferSize() ... // if data.length > this.sock.getSendBufferSize() ...
@@ -36,9 +44,8 @@ public class UDPSink implements Sink {
// send packet // send packet
try { try {
this.sock.send(packet); this.sock.send(packet);
} catch(Exception e) { } catch (IOException ioe) {
// TODO: fail a bit better throw new RuntimeException("failed to send data", ioe);
e.printStackTrace();
} }
} }

View File

@@ -1,9 +1,12 @@
package net.i2p.i2ptunnel.udp; package net.i2p.i2ptunnel.udp;
import java.io.IOException;
import java.net.DatagramSocket; import java.net.DatagramSocket;
import java.net.DatagramPacket; import java.net.DatagramPacket;
import net.i2p.I2PAppContext;
import net.i2p.util.I2PAppThread; import net.i2p.util.I2PAppThread;
import net.i2p.util.Log;
/** /**
* *
@@ -12,11 +15,14 @@ import net.i2p.util.I2PAppThread;
public class UDPSource implements Source, Runnable { public class UDPSource implements Source, Runnable {
public static final int MAX_SIZE = 15360; public static final int MAX_SIZE = 15360;
/**
* @throws RuntimeException on DatagramSocket IOException
*/
public UDPSource(int port) { public UDPSource(int port) {
// create udp-socket // create udp-socket
try { try {
this.sock = new DatagramSocket(port); this.sock = new DatagramSocket(port);
} catch(Exception e) { } catch (IOException e) {
throw new RuntimeException("failed to listen...", e); throw new RuntimeException("failed to listen...", e);
} }
@@ -57,7 +63,9 @@ public class UDPSource implements Source, Runnable {
this.sink.send(null, nbuf); this.sink.send(null, nbuf);
//System.out.print("i"); //System.out.print("i");
} catch(Exception e) { } catch(Exception e) {
e.printStackTrace(); Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
if (log.shouldWarn())
log.warn("error sending", e);
break; break;
} }
} }

View File

@@ -180,6 +180,7 @@ import net.i2p.util.EventDispatcher;
* *
* @param to - ignored if configured for a single destination * @param to - ignored if configured for a single destination
* (we use the dest specified in the constructor) * (we use the dest specified in the constructor)
* @throws RuntimeException if session is closed
*/ */
public void send(Destination to, byte[] data) { public void send(Destination to, byte[] data) {
_i2pSink.send(to, data); _i2pSink.send(to, data);

View File

@@ -195,6 +195,7 @@ public class I2PTunnelUDPServerBase extends I2PTunnelTask implements Source, Sin
* Sink Methods * Sink Methods
* *
* @param to * @param to
* @throws RuntimeException if session is closed
* *
*/ */
public void send(Destination to, byte[] data) { public void send(Destination to, byte[] data) {