diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnel.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnel.java index 069199ef1..3c5c187b3 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnel.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnel.java @@ -49,6 +49,7 @@ import java.util.List; import java.util.Properties; import java.util.Set; import java.util.StringTokenizer; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; import net.i2p.I2PAppContext; @@ -70,16 +71,18 @@ import net.i2p.util.EventDispatcherImpl; import net.i2p.util.Log; /** + * An I2PTunnel tracks one or more I2PTunnelTasks and one or more I2PSessions. + * Usually one of each. + * * Todo: Most events are not listened to elsewhere, so error propagation is poor */ -public class I2PTunnel implements Logging, EventDispatcher { +public class I2PTunnel extends EventDispatcherImpl implements Logging { private final Log _log; - private final EventDispatcherImpl _event; private final I2PAppContext _context; private static long __tunnelId = 0; private final long _tunnelId; private final Properties _clientOptions; - private final List _sessions; + private final Set _sessions; public static final int PACKET_DELAY = 100; @@ -96,7 +99,7 @@ public class I2PTunnel implements Logging, EventDispatcher { private static final String nocli_args[] = { "-nocli", "-die"}; - private final List tasks = new ArrayList(); + private final List tasks = new ArrayList(); private int next_task_id = 1; private final Set listeners = new CopyOnWriteArraySet(); @@ -114,14 +117,14 @@ public class I2PTunnel implements Logging, EventDispatcher { } public I2PTunnel(String[] args, ConnectionEventListener lsnr) { + super(); _context = I2PAppContext.getGlobalContext(); // new I2PAppContext(); _tunnelId = ++__tunnelId; _log = _context.logManager().getLog(I2PTunnel.class); - _event = new EventDispatcherImpl(); // as of 0.8.4, include context properties Properties p = _context.getProperties(); _clientOptions = p; - _sessions = new ArrayList(1); + _sessions = new CopyOnWriteArraySet(); addConnectionEventListener(lsnr); boolean gui = true; @@ -193,22 +196,17 @@ public class I2PTunnel implements Logging, EventDispatcher { /** @return non-null */ List getSessions() { - synchronized (_sessions) { return new ArrayList(_sessions); - } } + void addSession(I2PSession session) { if (session == null) return; - synchronized (_sessions) { - if (!_sessions.contains(session)) - _sessions.add(session); - } + _sessions.add(session); } + void removeSession(I2PSession session) { if (session == null) return; - synchronized (_sessions) { - _sessions.remove(session); - } + _sessions.remove(session); } public Properties getClientOptions() { return _clientOptions; } @@ -218,9 +216,7 @@ public class I2PTunnel implements Logging, EventDispatcher { if (tsk.isOpen()) { tsk.setId(next_task_id); next_task_id++; - synchronized (tasks) { - tasks.add(tsk); - } + tasks.add(tsk); } } @@ -1261,10 +1257,8 @@ public class I2PTunnel implements Logging, EventDispatcher { */ public void runQuit(Logging l) { purgetasks(l); - synchronized (tasks) { - if (tasks.isEmpty()) { - System.exit(0); - } + if (tasks.isEmpty()) { + System.exit(0); } l.log("There are running tasks. Try 'list'."); notifyEvent("quitResult", "error"); @@ -1280,11 +1274,8 @@ public class I2PTunnel implements Logging, EventDispatcher { */ public void runList(Logging l) { purgetasks(l); - synchronized (tasks) { - for (int i = 0; i < tasks.size(); i++) { - I2PTunnelTask t = (I2PTunnelTask) tasks.get(i); - l.log("[" + t.getId() + "] " + t.toString()); - } + for (I2PTunnelTask t : tasks) { + l.log("[" + t.getId() + "] " + t.toString()); } notifyEvent("listDone", "done"); } @@ -1313,14 +1304,8 @@ public class I2PTunnel implements Logging, EventDispatcher { argindex++; } if (args[argindex].equalsIgnoreCase("all")) { - List curTasks = null; - synchronized (tasks) { - curTasks = new LinkedList(tasks); - } - boolean error = false; - for (int i = 0; i < curTasks.size(); i++) { - I2PTunnelTask t = (I2PTunnelTask) curTasks.get(i); + for (I2PTunnelTask t : tasks) { if (!closetask(t, forced, l)) { notifyEvent("closeResult", "error"); error = true; @@ -1442,9 +1427,7 @@ public class I2PTunnel implements Logging, EventDispatcher { boolean closed = false; _log.debug(getPrefix() + "closetask(): looking for task " + num); - synchronized (tasks) { - for (Iterator it = tasks.iterator(); it.hasNext();) { - I2PTunnelTask t = (I2PTunnelTask) it.next(); + for (I2PTunnelTask t : tasks) { int id = t.getId(); if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "closetask(): parsing task " + id + " (" + t.toString() + ")"); @@ -1454,7 +1437,6 @@ public class I2PTunnel implements Logging, EventDispatcher { } else if (id > num) { break; } - } } return closed; } @@ -1482,15 +1464,14 @@ public class I2PTunnel implements Logging, EventDispatcher { * */ private void purgetasks(Logging l) { - synchronized (tasks) { - for (Iterator it = tasks.iterator(); it.hasNext();) { - I2PTunnelTask t = (I2PTunnelTask) it.next(); + List removed = new ArrayList(); + for (I2PTunnelTask t : tasks) { if (!t.isOpen()) { _log.debug(getPrefix() + "Purging inactive tunnel: [" + t.getId() + "] " + t.toString()); - it.remove(); + removed.add(t); } } - } + tasks.removeAll(removed); } /** @@ -1657,41 +1638,4 @@ public class I2PTunnel implements Logging, EventDispatcher { public interface ConnectionEventListener { public void routerDisconnected(); } - - /* Required by the EventDispatcher interface */ - public EventDispatcher getEventDispatcher() { - return _event; - } - - public void attachEventDispatcher(EventDispatcher e) { - _event.attachEventDispatcher(e.getEventDispatcher()); - } - - public void detachEventDispatcher(EventDispatcher e) { - _event.detachEventDispatcher(e.getEventDispatcher()); - } - - public void notifyEvent(String e, Object a) { - _event.notifyEvent(e, a); - } - - public Object getEventValue(String n) { - return _event.getEventValue(n); - } - - public Set getEvents() { - return _event.getEvents(); - } - - public void ignoreEvents() { - _event.ignoreEvents(); - } - - public void unIgnoreEvents() { - _event.unIgnoreEvents(); - } - - public Object waitEventValue(String n) { - return _event.waitEventValue(n); - } } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java index 4c7076b8d..fa7b5e2e1 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java @@ -51,7 +51,7 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna protected long _clientId; protected final Object sockLock = new Object(); // Guards sockMgr and mySockets protected I2PSocketManager sockMgr; // should be final and use a factory. LINT - protected List mySockets = new ArrayList(); + protected final List mySockets = new ArrayList(); protected boolean _ownDest; protected Destination dest = null; @@ -59,7 +59,7 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna private boolean listenerReady = false; - private ServerSocket ss; + protected ServerSocket ss; private final Object startLock = new Object(); private boolean startRunning = false; @@ -196,7 +196,10 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna // no need to load the netDb with leaseSets for destinations that will never // be looked up - tunnel.getClientOptions().setProperty("i2cp.dontPublishLeaseSet", "true"); + boolean dccEnabled = (this instanceof I2PTunnelIRCClient) && + Boolean.valueOf(tunnel.getClientOptions().getProperty(I2PTunnelIRCClient.PROP_DCC)).booleanValue(); + if (!dccEnabled) + tunnel.getClientOptions().setProperty("i2cp.dontPublishLeaseSet", "true"); boolean openNow = !Boolean.valueOf(tunnel.getClientOptions().getProperty("i2cp.delayOpen")).booleanValue(); if (openNow) { @@ -683,11 +686,11 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna synchronized (sockLock) { if (sockMgr != null) { mySockets.retainAll(sockMgr.listSockets()); - if (!forced && mySockets.size() != 0) { - l.log("There are still active connections!"); + if ((!forced) && (!mySockets.isEmpty())) { + l.log("Not closing, there are still active connections!"); _log.debug("can't close: there are still active connections!"); - for (Iterator it = mySockets.iterator(); it.hasNext();) { - l.log("->" + it.next()); + for (I2PSocket s : mySockets) { + l.log(" -> " + s.toString()); } return false; } @@ -703,7 +706,8 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna try { if (ss != null) ss.close(); } catch (IOException ex) { - ex.printStackTrace(); + if (_log.shouldLog(Log.WARN)) + _log.warn("error closing", ex); return false; } //l.log("Client closed."); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClientRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClientRunner.java index 6afc940f7..648b54266 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClientRunner.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClientRunner.java @@ -25,7 +25,8 @@ import net.i2p.util.Log; */ public class I2PTunnelHTTPClientRunner extends I2PTunnelRunner { private Log _log; - public I2PTunnelHTTPClientRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, List sockList, Runnable onTimeout) { + public I2PTunnelHTTPClientRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, + List sockList, Runnable onTimeout) { super(s, i2ps, slock, initialI2PData, sockList, onTimeout); _log = I2PAppContext.getGlobalContext().logManager().getLog(I2PTunnelHTTPClientRunner.class); } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java index ebf6cc0a1..a329ad704 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java @@ -10,7 +10,13 @@ import java.util.List; import java.util.StringTokenizer; import net.i2p.client.streaming.I2PSocket; +import net.i2p.data.Base32; import net.i2p.data.Destination; +import net.i2p.i2ptunnel.irc.DCCClientManager; +import net.i2p.i2ptunnel.irc.DCCHelper; +import net.i2p.i2ptunnel.irc.I2PTunnelDCCServer; +import net.i2p.i2ptunnel.irc.IrcInboundFilter; +import net.i2p.i2ptunnel.irc.IrcOutboundFilter; import net.i2p.util.EventDispatcher; import net.i2p.util.I2PAppThread; import net.i2p.util.Log; @@ -18,7 +24,7 @@ import net.i2p.util.Log; /** * Todo: Can we extend I2PTunnelClient instead and remove some duplicated code? */ -public class I2PTunnelIRCClient extends I2PTunnelClientBase implements Runnable { +public class I2PTunnelIRCClient extends I2PTunnelClientBase { /** used to assign unique IDs to the threads / clients. no logic or functionality */ private static volatile long __clientId = 0; @@ -27,6 +33,14 @@ public class I2PTunnelIRCClient extends I2PTunnelClientBase implements Runnable protected List dests; private static final long DEFAULT_READ_TIMEOUT = 5*60*1000; // -1 protected long readTimeout = DEFAULT_READ_TIMEOUT; + private final boolean _dccEnabled; + private I2PTunnelDCCServer _DCCServer; + private DCCClientManager _DCCClientManager; + + /** + * @since 0.8.9 + */ + public static final String PROP_DCC = "i2ptunnel.ircclient.enableDCC"; /** * @throws IllegalArgumentException if the I2PTunnel does not contain @@ -75,23 +89,28 @@ public class I2PTunnelIRCClient extends I2PTunnelClientBase implements Runnable setName("IRC Client on " + tunnel.listenHost + ':' + localPort); + _dccEnabled = Boolean.valueOf(tunnel.getClientOptions().getProperty(PROP_DCC)).booleanValue(); + // TODO add some prudent tunnel options (or is it too late?) + startRunning(); notifyEvent("openIRCClientResult", "ok"); } protected void clientConnectionRun(Socket s) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("got a connection."); + if (_log.shouldLog(Log.INFO)) + _log.info("New connection local addr is: " + s.getLocalAddress() + + " from: " + s.getInetAddress()); Destination clientDest = pickDestination(); I2PSocket i2ps = null; try { i2ps = createI2PSocket(clientDest); i2ps.setReadTimeout(readTimeout); StringBuffer expectedPong = new StringBuffer(); - Thread in = new I2PAppThread(new IrcInboundFilter(s,i2ps, expectedPong, _log), "IRC Client " + __clientId + " in", true); + DCCHelper dcc = _dccEnabled ? new DCC(s.getLocalAddress().getAddress()) : null; + Thread in = new I2PAppThread(new IrcInboundFilter(s,i2ps, expectedPong, _log, dcc), "IRC Client " + __clientId + " in", true); in.start(); - Thread out = new I2PAppThread(new IrcOutboundFilter(s,i2ps, expectedPong, _log), "IRC Client " + __clientId + " out", true); + Thread out = new I2PAppThread(new IrcOutboundFilter(s,i2ps, expectedPong, _log, dcc), "IRC Client " + __clientId + " out", true); out.start(); } catch (Exception ex) { if (_log.shouldLog(Log.ERROR)) @@ -120,388 +139,112 @@ public class I2PTunnelIRCClient extends I2PTunnelClientBase implements Runnable return dests.get(index); } - /************************************************************************* - * - */ - public static class IrcInboundFilter implements Runnable { - - private final Socket local; - private final I2PSocket remote; - private final StringBuffer expectedPong; - private final Log _log; - - public IrcInboundFilter(Socket _local, I2PSocket _remote, StringBuffer pong, Log log) { - local=_local; - remote=_remote; - expectedPong=pong; - _log = log; - } - - public void run() { - // Todo: Don't use BufferedReader - IRC spec limits line length to 512 but... - BufferedReader in; - OutputStream output; - try { - in = new BufferedReader(new InputStreamReader(remote.getInputStream(), "ISO-8859-1")); - output=local.getOutputStream(); - } catch (IOException e) { - if (_log.shouldLog(Log.ERROR)) - _log.error("IrcInboundFilter: no streams",e); - return; + @Override + public boolean close(boolean forced) { + synchronized(this) { + if (_DCCServer != null) { + _DCCServer.close(forced); + _DCCServer = null; } - if (_log.shouldLog(Log.DEBUG)) - _log.debug("IrcInboundFilter: Running."); - try { - while(true) - { - try { - String inmsg = in.readLine(); - if(inmsg==null) - break; - if(inmsg.endsWith("\r")) - inmsg=inmsg.substring(0,inmsg.length()-1); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("in: [" + inmsg + "]"); - String outmsg = inboundFilter(inmsg, expectedPong); - if(outmsg!=null) - { - if(!inmsg.equals(outmsg)) { - if (_log.shouldLog(Log.WARN)) { - _log.warn("inbound FILTERED: "+outmsg); - _log.warn(" - inbound was: "+inmsg); - } - } else { - if (_log.shouldLog(Log.INFO)) - _log.info("inbound: "+outmsg); - } - outmsg=outmsg+"\r\n"; // rfc1459 sec. 2.3 - output.write(outmsg.getBytes("ISO-8859-1")); - // probably doesn't do much but can't hurt - output.flush(); - } else { - if (_log.shouldLog(Log.WARN)) - _log.warn("inbound BLOCKED: "+inmsg); - } - } catch (IOException e1) { - if (_log.shouldLog(Log.WARN)) - _log.warn("IrcInboundFilter: disconnected",e1); - break; - } - } - } catch (RuntimeException re) { - _log.error("Error filtering inbound data", re); - } finally { - try { local.close(); } catch (IOException e) {} - } - if(_log.shouldLog(Log.DEBUG)) - _log.debug("IrcInboundFilter: Done."); - } - - } - - /************************************************************************* - * - */ - public static class IrcOutboundFilter implements Runnable { - - private final Socket local; - private final I2PSocket remote; - private final StringBuffer expectedPong; - private final Log _log; - - public IrcOutboundFilter(Socket _local, I2PSocket _remote, StringBuffer pong, Log log) { - local=_local; - remote=_remote; - expectedPong=pong; - _log = log; - } - - public void run() { - // Todo: Don't use BufferedReader - IRC spec limits line length to 512 but... - BufferedReader in; - OutputStream output; - try { - in = new BufferedReader(new InputStreamReader(local.getInputStream(), "ISO-8859-1")); - output=remote.getOutputStream(); - } catch (IOException e) { - if (_log.shouldLog(Log.ERROR)) - _log.error("IrcOutboundFilter: no streams",e); - return; - } - if (_log.shouldLog(Log.DEBUG)) - _log.debug("IrcOutboundFilter: Running."); - try { - while(true) - { - try { - String inmsg = in.readLine(); - if(inmsg==null) - break; - if(inmsg.endsWith("\r")) - inmsg=inmsg.substring(0,inmsg.length()-1); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("out: [" + inmsg + "]"); - String outmsg = outboundFilter(inmsg, expectedPong); - if(outmsg!=null) - { - if(!inmsg.equals(outmsg)) { - if (_log.shouldLog(Log.WARN)) { - _log.warn("outbound FILTERED: "+outmsg); - _log.warn(" - outbound was: "+inmsg); - } - } else { - if (_log.shouldLog(Log.INFO)) - _log.info("outbound: "+outmsg); - } - outmsg=outmsg+"\r\n"; // rfc1459 sec. 2.3 - output.write(outmsg.getBytes("ISO-8859-1")); - // save 250 ms in streaming - output.flush(); - } else { - if (_log.shouldLog(Log.WARN)) - _log.warn("outbound BLOCKED: "+"\""+inmsg+"\""); - } - } catch (IOException e1) { - if (_log.shouldLog(Log.WARN)) - _log.warn("IrcOutboundFilter: disconnected",e1); - break; - } - } - } catch (RuntimeException re) { - _log.error("Error filtering outbound data", re); - } finally { - try { remote.close(); } catch (IOException e) {} - } - if (_log.shouldLog(Log.DEBUG)) - _log.debug("IrcOutboundFilter: Done."); + if (_DCCClientManager != null) { + _DCCClientManager.close(forced); + _DCCClientManager = null; } } - - - /************************************************************************* - * - */ - - public static String inboundFilter(String s, StringBuffer expectedPong) { - - String field[]=s.split(" ",4); - String command; - int idx=0; - final String[] allowedCommands = - { - // "NOTICE", // can contain CTCP - //"PING", - //"PONG", - "MODE", - "JOIN", - "NICK", - "QUIT", - "PART", - "WALLOPS", - "ERROR", - "KICK", - "H", // "hide operator status" (after kicking an op) - "TOPIC" - }; - - if(field[0].charAt(0)==':') - idx++; - - try { command = field[idx++]; } - catch (IndexOutOfBoundsException ioobe) // wtf, server sent borked command? - { - //_log.warn("Dropping defective message: index out of bounds while extracting command."); - return null; - } - - idx++; //skip victim - - // Allow numerical responses - try { - new Integer(command); - return s; - } catch(NumberFormatException nfe){} - - - if ("PING".equalsIgnoreCase(command)) - return "PING 127.0.0.1"; // no way to know what the ircd to i2ptunnel server con is, so localhost works - if ("PONG".equalsIgnoreCase(command)) { - // Turn the received ":irc.freshcoffee.i2p PONG irc.freshcoffee.i2p :127.0.0.1" - // into ":127.0.0.1 PONG 127.0.0.1 " so that the caller can append the client's extra parameter - // though, does 127.0.0.1 work for irc clients connecting remotely? and for all of them? sure would - // be great if irc clients actually followed the RFCs here, but i guess thats too much to ask. - // If we haven't PINGed them, or the PING we sent isn't something we know how to filter, this - // is blank. - // - // String pong = expectedPong.length() > 0 ? expectedPong.toString() : null; - // If we aren't going to rewrite it, pass it through - String pong = expectedPong.length() > 0 ? expectedPong.toString() : s; - expectedPong.setLength(0); - return pong; - } - - // Allow all allowedCommands - for(int i=0;i= 0) // CTCP marker ^A can be anywhere, not just immediately after the ':' - { - // CTCP - msg=msg.substring(2); - if(msg.startsWith("ACTION ")) { - // /me says hello - return s; - } - return null; // Block all other ctcp - } - return s; - } - - // Block the rest - return null; + return super.close(forced); } - - public static String outboundFilter(String s, StringBuffer expectedPong) { - - String field[]=s.split(" ",3); - String command; - final String[] allowedCommands = - { - // "NOTICE", // can contain CTCP - "MODE", - "JOIN", - "NICK", - "WHO", - "WHOIS", - "LIST", - "NAMES", - "NICK", - // "QUIT", // replace with a filtered QUIT to hide client quit messages - "SILENCE", - "MAP", // seems safe enough, the ircd should protect themselves though - // "PART", // replace with filtered PART to hide client part messages - "OPER", - // "PONG", // replaced with a filtered PING/PONG since some clients send the server IP (thanks aardvax!) - // "PING", - "KICK", - "HELPME", - "RULES", - "TOPIC", - "ISON", // jIRCii uses this for a ping (response is 303) - "INVITE" - }; - if(field[0].length()==0) - return null; // W T F? - - - if(field[0].charAt(0)==':') - return null; // wtf - - command = field[0].toUpperCase(); + // + // Start of the DCCHelper interface + // - if ("PING".equals(command)) { - // Most clients just send a PING and are happy with any old PONG. Others, - // like BitchX, actually expect certain behavior. It sends two different pings: - // "PING :irc.freshcoffee.i2p" and "PING 1234567890 127.0.0.1" (where the IP is the proxy) - // the PONG to the former seems to be "PONG 127.0.0.1", while the PONG to the later is - // ":irc.freshcoffee.i2p PONG irc.freshcoffe.i2p :1234567890". - // We don't want to send them our proxy's IP address, so we need to rewrite the PING - // sent to the server, but when we get a PONG back, use what we expected, rather than - // what they sent. - // - // Yuck. + private class DCC implements DCCHelper { - String rv = null; - expectedPong.setLength(0); - if (field.length == 1) { // PING - rv = "PING"; - // If we aren't rewriting the PING don't rewrite the PONG - // expectedPong.append("PONG 127.0.0.1"); - } else if (field.length == 2) { // PING nonce - rv = "PING " + field[1]; - // If we aren't rewriting the PING don't rewrite the PONG - // expectedPong.append("PONG ").append(field[1]); - } else if (field.length == 3) { // PING nonce serverLocation - rv = "PING " + field[1]; - expectedPong.append("PONG ").append(field[2]).append(" :").append(field[1]); // PONG serverLocation nonce - } else { - //if (_log.shouldLog(Log.ERROR)) - // _log.error("IRC client sent a PING we don't understand, filtering it (\"" + s + "\")"); - rv = null; - } - - //if (_log.shouldLog(Log.WARN)) - // _log.warn("sending ping [" + rv + "], waiting for [" + expectedPong + "] orig was [" + s + "]"); - - return rv; - } - if ("PONG".equals(command)) - return "PONG 127.0.0.1"; // no way to know what the ircd to i2ptunnel server con is, so localhost works + private final byte[] _localAddr; - // Allow all allowedCommands - for(int i=0;i= 0) // CTCP marker ^A can be anywhere, not just immediately after the ':' - { - // CTCP - msg=msg.substring(2); - if(msg.startsWith("ACTION ")) { - // /me says hello - return s; - } - return null; // Block all other ctcp - } - return s; - } - - if("USER".equals(command)) { - int idx = field[2].lastIndexOf(":"); - if(idx<0) - return "USER user hostname localhost :realname"; - String realname = field[2].substring(idx+1); - String ret = "USER "+field[1]+" hostname localhost :"+realname; - return ret; - } - - if ("PART".equals(command)) { - // hide client message - return "PART " + field[1] + " :leaving"; - } - - if ("QUIT".equals(command)) { - return "QUIT :leaving"; - } - - // Block the rest - return null; + /** + * @param local Our IP address, from the IRC client's perspective + */ + public DCC(byte[] local) { + if (local.length == 4) + _localAddr = local; + else + _localAddr = new byte[] {127, 0, 0, 1}; } + + public boolean isEnabled() { + return _dccEnabled; + } + + public String getB32Hostname() { + return Base32.encode(sockMgr.getSession().getMyDestination().calculateHash().getData()) + ".b32.i2p"; + } + + public byte[] getLocalAddress() { + return _localAddr; + } + + public int newOutgoing(byte[] ip, int port, String type) { + I2PTunnelDCCServer server; + synchronized(this) { + if (_DCCServer == null) { + if (_log.shouldLog(Log.INFO)) + _log.info("Starting DCC Server"); + _DCCServer = new I2PTunnelDCCServer(sockMgr, l, I2PTunnelIRCClient.this, getTunnel()); + // TODO add some prudent tunnel options (or is it too late?) + _DCCServer.startRunning(); + } + server = _DCCServer; + } + int rv = server.newOutgoing(ip, port, type); + if (_log.shouldLog(Log.INFO)) + _log.info("New outgoing " + type + ' ' + port + " returns " + rv); + return rv; + } + + public int newIncoming(String b32, int port, String type) { + DCCClientManager tracker; + synchronized(this) { + if (_DCCClientManager == null) { + if (_log.shouldLog(Log.INFO)) + _log.info("Starting DCC Client"); + _DCCClientManager = new DCCClientManager(sockMgr, l, I2PTunnelIRCClient.this, getTunnel()); + } + tracker = _DCCClientManager; + } + // The tracker starts our client + int rv = tracker.newIncoming(b32, port, type); + if (_log.shouldLog(Log.INFO)) + _log.info("New incoming " + type + ' ' + b32 + ' ' + port + " returns " + rv); + return rv; + } + + public int resumeOutgoing(int port) { + DCCClientManager tracker = _DCCClientManager; + if (tracker != null) + return tracker.resumeOutgoing(port); + return -1; + } + + public int resumeIncoming(int port) { + I2PTunnelDCCServer server = _DCCServer; + if (server != null) + return server.resumeIncoming(port); + return -1; + } + + public int acceptOutgoing(int port) { + I2PTunnelDCCServer server = _DCCServer; + if (server != null) + return server.acceptOutgoing(port); + return -1; + } + + public int acceptIncoming(int port) { + DCCClientManager tracker = _DCCClientManager; + if (tracker != null) + return tracker.acceptIncoming(port); + return -1; + } + } } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java index 0e1c9049f..81d894f23 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java @@ -20,10 +20,10 @@ import net.i2p.util.I2PAppThread; import net.i2p.util.Log; public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErrorListener { - private final static Log _log = new Log(I2PTunnelRunner.class); + private final Log _log = new Log(I2PTunnelRunner.class); private static volatile long __runnerId; - private long _runnerId; + private final long _runnerId; /** * max bytes streamed in a packet - smaller ones might be filled * up to this size. Larger ones are not split (at least not on @@ -34,35 +34,51 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr static final int NETWORK_BUFFER_SIZE = MAX_PACKET_SIZE; - private Socket s; - private I2PSocket i2ps; - final Object slock, finishLock = new Object(); + private final Socket s; + private final I2PSocket i2ps; + private final Object slock, finishLock = new Object(); boolean finished = false; - HashMap ostreams, sockets; - byte[] initialI2PData; - byte[] initialSocketData; + private final byte[] initialI2PData; + private final byte[] initialSocketData; /** when the last data was sent/received (or -1 if never) */ private long lastActivityOn; /** when the runner started up */ - private long startedOn; - private List sockList; + private final long startedOn; + private final List sockList; /** if we die before receiving any data, run this job */ - private Runnable onTimeout; + private final Runnable onTimeout; private long totalSent; private long totalReceived; private volatile long __forwarderId; - public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, List sockList) { + public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, + List sockList) { this(s, i2ps, slock, initialI2PData, null, sockList, null); } - public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, byte[] initialSocketData, List sockList) { + + public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, + byte[] initialSocketData, List sockList) { this(s, i2ps, slock, initialI2PData, initialSocketData, sockList, null); } - public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, List sockList, Runnable onTimeout) { + + public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, + List sockList, Runnable onTimeout) { this(s, i2ps, slock, initialI2PData, null, sockList, onTimeout); } - public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, byte[] initialSocketData, List sockList, Runnable onTimeout) { + + /** + * Starts itself + * + * @param slock the socket lock, non-null + * @param initialI2PData may be null + * @param initialSocketData may be null + * @param sockList may be null. Caller must add i2ps to the list! It will be removed here on completion. + * Will synchronize on slock when removing. + * @param onTImeout may be null + */ + public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, + byte[] initialSocketData, List sockList, Runnable onTimeout) { this.sockList = sockList; this.s = s; this.i2ps = i2ps; @@ -84,6 +100,7 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr * have we closed at least one (if not both) of the streams * [aka we're done running the streams]? * + * @deprecated unused */ public boolean isFinished() { return finished; @@ -93,7 +110,7 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr * When was the last data for this runner sent or received? * * @return date (ms since the epoch), or -1 if no data has been transferred yet - * + * @deprecated unused */ public long getLastActivityOn() { return lastActivityOn; @@ -237,11 +254,11 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr private class StreamForwarder extends I2PAppThread { - InputStream in; - OutputStream out; - String direction; - private boolean _toI2P; - private ByteCache _cache; + private final InputStream in; + private final OutputStream out; + private final String direction; + private final boolean _toI2P; + private final ByteCache _cache; private StreamForwarder(InputStream in, OutputStream out, boolean toI2P) { this.in = in; diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java index 6947b520f..94edfac46 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java @@ -47,7 +47,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { protected int remotePort; private boolean _usePool; - private Logging l; + protected Logging l; private static final long DEFAULT_READ_TIMEOUT = -1; // 3*60*1000; /** default timeout to 3 minutes - override if desired */ @@ -69,10 +69,13 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { protected boolean bidir = false; private ThreadPoolExecutor _executor; + /** unused? port should always be specified */ private int DEFAULT_LOCALPORT = 4488; protected int localPort = DEFAULT_LOCALPORT; /** + * @param privData Base64-encoded private key data, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @throws IllegalArgumentException if the I2CP configuration is b0rked so * badly that we cant create a socketManager */ @@ -84,6 +87,9 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { } /** + * @param privkey file containing the private key data, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} + * @param privkeyname the name of the privKey file, not clear why we need this too * @throws IllegalArgumentException if the I2CP configuration is b0rked so * badly that we cant create a socketManager */ @@ -105,6 +111,9 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { } /** + * @param privData stream containing the private key data, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} + * @param privkeyname the name of the privKey file, not clear why we need this too * @throws IllegalArgumentException if the I2CP configuration is b0rked so * badly that we cant create a socketManager */ @@ -114,10 +123,28 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { init(host, port, privData, privkeyname, l); } + /** + * @param sktMgr the existing socket manager + * @since 0.8.9 + */ + public I2PTunnelServer(InetAddress host, int port, I2PSocketManager sktMgr, + Logging l, EventDispatcher notifyThis, I2PTunnel tunnel) { + super("Server at " + host + ':' + port, notifyThis, tunnel); + this.l = l; + this.remoteHost = host; + this.remotePort = port; + _log = tunnel.getContext().logManager().getLog(getClass()); + sockMgr = sktMgr; + open = true; + } + private static final int RETRY_DELAY = 20*1000; private static final int MAX_RETRIES = 4; /** + * @param privData stream containing the private key data, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} + * @param privkeyname the name of the privKey file, not clear why we need this too * @throws IllegalArgumentException if the I2CP configuration is b0rked so * badly that we cant create a socketManager */ diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelTask.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelTask.java index 4a6a0bb66..e81e58ddd 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelTask.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelTask.java @@ -13,9 +13,7 @@ import net.i2p.util.EventDispatcherImpl; * Either a Server or a Client. */ -public abstract class I2PTunnelTask implements EventDispatcher { - - private final EventDispatcherImpl _event = new EventDispatcherImpl(); +public abstract class I2PTunnelTask extends EventDispatcherImpl { private int id; private String name; @@ -77,41 +75,4 @@ public abstract class I2PTunnelTask implements EventDispatcher { public String toString() { return name; } - - /* Required by the EventDispatcher interface */ - public EventDispatcher getEventDispatcher() { - return _event; - } - - public void attachEventDispatcher(EventDispatcher e) { - _event.attachEventDispatcher(e.getEventDispatcher()); - } - - public void detachEventDispatcher(EventDispatcher e) { - _event.detachEventDispatcher(e.getEventDispatcher()); - } - - public void notifyEvent(String e, Object a) { - _event.notifyEvent(e, a); - } - - public Object getEventValue(String n) { - return _event.getEventValue(n); - } - - public Set getEvents() { - return _event.getEvents(); - } - - public void ignoreEvents() { - _event.ignoreEvents(); - } - - public void unIgnoreEvents() { - _event.unIgnoreEvents(); - } - - public Object waitEventValue(String n) { - return _event.waitEventValue(n); - } -} \ No newline at end of file +} diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelController.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelController.java index 1f1f9e971..6844a4ba7 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelController.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelController.java @@ -22,16 +22,19 @@ import net.i2p.util.Log; import net.i2p.util.SecureFileOutputStream; /** - * Coordinate the runtime operation and configuration of a tunnel. + * Coordinate the runtime operation and configuration of a single I2PTunnel. + * An I2PTunnel tracks one or more I2PTunnelTasks and one or more I2PSessions. + * Usually one of each. + * * These objects are bundled together under a TunnelControllerGroup where the * entire group is stored / loaded from a single config file. * */ public class TunnelController implements Logging { - private Log _log; + private final Log _log; private Properties _config; - private I2PTunnel _tunnel; - private List _messages; + private final I2PTunnel _tunnel; + private final List _messages; private List _sessions; private boolean _running; private boolean _starting; diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/DCCClientManager.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/DCCClientManager.java new file mode 100644 index 000000000..d6a0f7958 --- /dev/null +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/DCCClientManager.java @@ -0,0 +1,236 @@ +package net.i2p.i2ptunnel.irc; + +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import net.i2p.client.streaming.I2PSocketManager; +import net.i2p.data.Base32; +import net.i2p.i2ptunnel.I2PTunnel; +import net.i2p.i2ptunnel.Logging; +import net.i2p.util.EventDispatcher; +import net.i2p.util.Log; + +/** + * Start, track, and expire the I2PTunnelDCCClients. + * + *
+ *
+ *                                            direct conn
+ *                <---> I2PTunnelDCCServer <--------------->I2PTunnelDCCClient <---->
+ *   originating                                                                     responding
+ *   chat client                                                                     chat client
+ *        CHAT    ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
+ *        SEND    ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
+ *        RESUME  <--- I2PTunnelIRCClient <-- IRC server <-- I2TunnelIRCClient <-----
+ *        ACCEPT  ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
+ *
+ * 
+ * + * @since 0.8.9 + */ +public class DCCClientManager extends EventReceiver { + private final I2PSocketManager sockMgr; + private final EventDispatcher _dispatch; + private final Logging l; + private final I2PTunnel _tunnel; + private final Log _log; + + /** key is the DCC client's local port */ + private final ConcurrentHashMap _incoming; + /** key is the DCC client's local port */ + private final ConcurrentHashMap _active; + /** key is the DCC client's local port */ + private final ConcurrentHashMap _complete; + + // list of client tunnels? + private static long _id; + + private static final int MAX_INCOMING_PENDING = 10; + private static final int MAX_INCOMING_ACTIVE = 10; + private static final long ACTIVE_EXPIRE = 60*60*1000; + + public DCCClientManager(I2PSocketManager sktMgr, Logging logging, + EventDispatcher dispatch, I2PTunnel tunnel) { + sockMgr = sktMgr; + l = logging; + _dispatch = dispatch; + _tunnel = tunnel; + _log = tunnel.getContext().logManager().getLog(DCCClientManager.class); + _incoming = new ConcurrentHashMap(8); + _active = new ConcurrentHashMap(8); + _complete = new ConcurrentHashMap(8); + } + + public boolean close(boolean forced) { + for (I2PTunnelDCCClient c : _incoming.values()) { + c.stop(); + } + _incoming.clear(); + for (I2PTunnelDCCClient c : _active.values()) { + c.stop(); + } + _active.clear(); + _complete.clear(); + return true; + } + + /** + * An incoming DCC request + * + * @param b32 remote dcc server b32 address + * @param port remote dcc server I2P port + * @param type ignored + * @return local DCC client tunnel port or -1 on error + */ + public int newIncoming(String b32, int port, String type) { + return newIncoming(b32, port, type, 0); + } + + /** + * @param localPort bind to port or 0; if nonzero it will be the rv + */ + private int newIncoming(String b32, int port, String type, int localPort) { + b32 = b32.toLowerCase(); + // do some basic verification before starting the client + if (b32.length() != 60 || !b32.endsWith(".b32.i2p")) + return -1; + byte[] dec = Base32.decode(b32.substring(0, 52)); + if (dec == null || dec.length != 32) + return -1; + expireInbound(); + if (_incoming.size() >= MAX_INCOMING_PENDING || + _active.size() >= MAX_INCOMING_PENDING) { + _log.error("Too many incoming DCC, max is " + MAX_INCOMING_PENDING + + '/' + MAX_INCOMING_ACTIVE + " pending/active"); + return -1; + } + try { + // Transparent tunnel used for all types... + // Do we need to do any filtering for chat? + I2PTunnelDCCClient cTunnel = new I2PTunnelDCCClient(b32, localPort, port, l, sockMgr, + _dispatch, _tunnel, ++_id); + cTunnel.attachEventDispatcher(this); + int lport = cTunnel.getLocalPort(); + if (_log.shouldLog(Log.WARN)) + _log.warn("Opened client tunnel at port " + lport + + " pointing to " + b32 + ':' + port); + _incoming.put(Integer.valueOf(lport), cTunnel); + return lport; + } catch (IllegalArgumentException uhe) { + l.log("Could not find listen host to bind to [" + _tunnel.host + "]"); + _log.error("Error finding host to bind", uhe); + return -1; + } + } + + /** + * An outgoing RESUME request + * + * @param port local DCC client tunnel port + * @return remote DCC server i2p port or -1 on error + */ + public int resumeOutgoing(int port) { + Integer lport = Integer.valueOf(port); + I2PTunnelDCCClient tun = _complete.get(lport); + if (tun == null) { + tun = _active.get(lport); + if (tun == null) + // shouldn't happen + tun = _incoming.get(lport); + } + if (tun != null) { + tun.stop(); + return tun.getLocalPort(); + } + return -1; + } + + /** + * An incoming ACCEPT response + * + * @param port remote dcc server I2P port + * @return local DCC client tunnel port or -1 on error + */ + public int acceptIncoming(int port) { + // do a reverse lookup + for (I2PTunnelDCCClient tun : _complete.values()) { + if (tun.getRemotePort() == port) + return newIncoming(tun.getDest(), port, "ACCEPT", tun.getLocalPort()); + } + for (I2PTunnelDCCClient tun : _active.values()) { + if (tun.getRemotePort() == port) + return newIncoming(tun.getDest(), port, "ACCEPT", tun.getLocalPort()); + } + for (I2PTunnelDCCClient tun : _incoming.values()) { + if (tun.getRemotePort() == port) { + // shouldn't happen + tun.stop(); + return newIncoming(tun.getDest(), port, "ACCEPT", tun.getLocalPort()); + } + } + return -1; + } + + /** + * The EventReceiver callback + */ + public void notifyEvent(String eventName, Object args) { + if (eventName.equals(I2PTunnelDCCClient.CONNECT_START_EVENT)) { + try { + I2PTunnelDCCClient client = (I2PTunnelDCCClient) args; + connStarted(client); + } catch (ClassCastException cce) {} + } else if (eventName.equals(I2PTunnelDCCClient.CONNECT_STOP_EVENT)) { + try { + Integer port = (Integer) args; + connStopped(port); + } catch (ClassCastException cce) {} + } + } + + private void connStarted(I2PTunnelDCCClient client) { + Integer lport = Integer.valueOf(client.getLocalPort()); + I2PTunnelDCCClient c = _incoming.remove(lport); + if (c != null) { + _active.put(lport, client); + if (_log.shouldLog(Log.WARN)) + _log.warn("Added client tunnel for port " + lport + + " pending count now: " + _incoming.size() + + " active count now: " + _active.size() + + " complete count now: " + _complete.size()); + } + } + + private void connStopped(Integer lport) { + I2PTunnelDCCClient tun = _incoming.remove(lport); + if (tun != null) + _complete.put(lport, tun); + tun = _active.remove(lport); + if (tun != null) + _complete.put(lport, tun); + if (_log.shouldLog(Log.WARN)) + _log.warn("Removed client tunnel for port " + lport + + " pending count now: " + _incoming.size() + + " active count now: " + _active.size() + + " complete count now: " + _complete.size()); + } + + private void expireInbound() { + for (Iterator iter = _incoming.values().iterator(); iter.hasNext(); ) { + I2PTunnelDCCClient c = iter.next(); + if (c.getExpires() < _tunnel.getContext().clock().now()) { + iter.remove(); + c.stop(); + } + } + // shouldn't need to expire active + for (Iterator iter = _complete.values().iterator(); iter.hasNext(); ) { + I2PTunnelDCCClient c = iter.next(); + if (c.getExpires() < _tunnel.getContext().clock().now()) { + iter.remove(); + c.stop(); + } + } + } +} diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/DCCHelper.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/DCCHelper.java new file mode 100644 index 000000000..d7e6656f5 --- /dev/null +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/DCCHelper.java @@ -0,0 +1,74 @@ +package net.i2p.i2ptunnel.irc; + +/** + * Hooks to create and maintain DCC client and server tunnels + * + * @since 0.8.9 + */ +public interface DCCHelper { + + public boolean isEnabled(); + + /** + * String to put in the outgoing DCC + */ + public String getB32Hostname(); + + /** + * Our IP address (taken from the socket), must be IPv4 + */ + public byte[] getLocalAddress(); + + /** + * An outgoing DCC request + * + * @param ip local irc client IP + * @param port local irc client port + * @param type string + * @return local DCC server i2p port or -1 on error + */ + public int newOutgoing(byte[] ip, int port, String type); + + /** + * An incoming DCC request + * + * @param b32 remote dcc server b32 address + * @param port remote dcc server I2P port + * @param type string + * @return local DCC client tunnel port or -1 on error + */ + public int newIncoming(String b32, int port, String type); + + /** + * An outgoing RESUME request + * + * @param port local DCC client tunnel port + * @return remote DCC server i2p port or -1 on error + */ + public int resumeOutgoing(int port); + + /** + * An incoming RESUME request + * + * @param port local dcc server I2P port + * @return local IRC client DCC port or -1 on error + */ + public int resumeIncoming(int port); + + /** + * An outgoing ACCEPT response + * + * @param port local irc client DCC port + * @return local DCC server i2p port or -1 on error + */ + public int acceptOutgoing(int port); + + /** + * An incoming ACCEPT response + * + * @param port remote dcc server I2P port + * @return local DCC client tunnel port or -1 on error + */ + public int acceptIncoming(int port); + +} diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/EventReceiver.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/EventReceiver.java new file mode 100644 index 000000000..f03c8b90c --- /dev/null +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/EventReceiver.java @@ -0,0 +1,76 @@ +package net.i2p.i2ptunnel.irc; + +/* + * free (adj.): unencumbered; not under the control of others Written + * by human & jrandom in 2004 and released into the public domain with + * no warranty of any kind, either expressed or implied. It probably + * won't make your computer catch on fire, or eat your children, but + * it might. Use at your own risk. + * + */ + +import java.util.Set; + +import net.i2p.util.EventDispatcher; + +/** + * An implementation of the EventDispatcher interface for + * receiving events via in-line notifyEvent() only. + * Does not support chaining to additional dispatchers. + * Does not support waitEventValue(). + * Does not support ignoring. + * + * @since 0.8.9 + */ +public abstract class EventReceiver implements EventDispatcher { + + public EventDispatcher getEventDispatcher() { + return this; + } + + /** + * @throws UnsupportedOperationException always + */ + public void attachEventDispatcher(EventDispatcher ev) { + throw new UnsupportedOperationException(); + } + + /** + * @throws UnsupportedOperationException always + */ + public void detachEventDispatcher(EventDispatcher ev) { + throw new UnsupportedOperationException(); + } + + public abstract void notifyEvent(String eventName, Object args); + + /** + * @throws UnsupportedOperationException always + */ + public Object getEventValue(String name) { + throw new UnsupportedOperationException(); + } + + /** + * @throws UnsupportedOperationException always + */ + public Set getEvents() { + throw new UnsupportedOperationException(); + } + + /** + * @throws UnsupportedOperationException always + */ + public void ignoreEvents() { + throw new UnsupportedOperationException(); + } + + public void unIgnoreEvents() {} + + /** + * @throws UnsupportedOperationException always + */ + public Object waitEventValue(String name) { + throw new UnsupportedOperationException(); + } +} diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCClient.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCClient.java new file mode 100644 index 000000000..4d70d2ee2 --- /dev/null +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCClient.java @@ -0,0 +1,132 @@ +/* I2PTunnel is GPL'ed (with the exception mentioned in I2PTunnel.java) + * (c) 2003 - 2004 mihi + */ +package net.i2p.i2ptunnel.irc; + +import java.net.Socket; +import java.io.IOException; + +import net.i2p.client.streaming.I2PSocket; +import net.i2p.client.streaming.I2PSocketManager; +import net.i2p.client.streaming.I2PSocketOptions; +import net.i2p.data.Destination; +import net.i2p.i2ptunnel.I2PTunnel; +import net.i2p.i2ptunnel.I2PTunnelClientBase; +import net.i2p.i2ptunnel.I2PTunnelRunner; +import net.i2p.i2ptunnel.Logging; +import net.i2p.util.EventDispatcher; +import net.i2p.util.Log; + +/** + * A standard client, using an existing socket manager. + * Targets a single destination and port. + * Naming resolution is delayed until connect time. + * + * @since 0.8.9 + */ +public class I2PTunnelDCCClient extends I2PTunnelClientBase { + + // delay resolution until connect time + private final String _dest; + private final int _remotePort; + private long _expires; + + private static final long INBOUND_EXPIRE = 30*60*1000; + private static final long INBOUND_STOP_EXPIRE = 30*60*1000; + public static final String CONNECT_START_EVENT = "connectionStarted"; + public static final String CONNECT_STOP_EVENT = "connectionStopped"; + + /** + * @param dest the target, presumably b32 + * @param localPort if 0, use any port, get actual port selected with getLocalPort() + * @throws IllegalArgumentException if the I2PTunnel does not contain + * valid config to contact the router + */ + public I2PTunnelDCCClient(String dest, int localPort, int remotePort, Logging l, + I2PSocketManager sktMgr, EventDispatcher notifyThis, + I2PTunnel tunnel, long clientId) throws IllegalArgumentException { + super(localPort, l, sktMgr, tunnel, notifyThis, clientId); + _dest = dest; + _remotePort = remotePort; + _expires = tunnel.getContext().clock().now() + INBOUND_EXPIRE; + + setName("DCC send -> " + dest + ':' + remotePort); + + startRunning(); + } + + /** + * Accept one connection only. + */ + protected void clientConnectionRun(Socket s) { + I2PSocket i2ps = null; + if (_log.shouldLog(Log.INFO)) + _log.info("Opening DCC connection to " + _dest + ':' + _remotePort); + Destination dest = _context.namingService().lookup(_dest); + if (dest == null) { + _log.error("Could not find leaseset for DCC connection to " + _dest + ':' + _remotePort); + closeSocket(s); + stop(); + notifyEvent(CONNECT_STOP_EVENT, Integer.valueOf(getLocalPort())); + return; + } + + I2PSocketOptions opts = sockMgr.buildOptions(); + opts.setPort(_remotePort); + try { + i2ps = createI2PSocket(dest, opts); + new Runner(s, i2ps); + } catch (Exception ex) { + _log.error("Could not make DCC connection to " + _dest + ':' + _remotePort, ex); + closeSocket(s); + if (i2ps != null) { + try { i2ps.close(); } catch (IOException ioe) {} + } + notifyEvent(CONNECT_STOP_EVENT, Integer.valueOf(getLocalPort())); + } + stop(); + } + + public long getExpires() { + return _expires; + } + + public String getDest() { + return _dest; + } + + public int getRemotePort() { + return _remotePort; + } + + /** + * Stop listening for new sockets. + * We can't call super.close() as it kills all sockets in the sockMgr + */ + public void stop() { + open = false; + try { + ss.close(); + } catch (IOException ioe) {} + } + + /** + * Just so we can do the callbacks + */ + private class Runner extends I2PTunnelRunner { + + public Runner(Socket s, I2PSocket i2ps) { + // super calls start() + super(s, i2ps, sockLock, null, mySockets); + } + + @Override + public void run() { + _expires = getTunnel().getContext().clock().now() + INBOUND_STOP_EXPIRE; + notifyEvent(CONNECT_START_EVENT, I2PTunnelDCCClient.this); + super.run(); + _expires = getTunnel().getContext().clock().now() + INBOUND_STOP_EXPIRE; + notifyEvent(CONNECT_STOP_EVENT, Integer.valueOf(getLocalPort())); + } + } +} diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCServer.java new file mode 100644 index 000000000..0c3ec96ff --- /dev/null +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCServer.java @@ -0,0 +1,272 @@ +package net.i2p.i2ptunnel.irc; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.Socket; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; + +import net.i2p.client.streaming.I2PSocket; +import net.i2p.client.streaming.I2PSocketManager; +import net.i2p.i2ptunnel.I2PTunnel; +import net.i2p.i2ptunnel.I2PTunnelRunner; +import net.i2p.i2ptunnel.I2PTunnelServer; +import net.i2p.i2ptunnel.Logging; +import net.i2p.util.EventDispatcher; +import net.i2p.util.Log; + +/** + * A standard server that only answers for registered ports, + * and each port can only be used once. + * + *
+ *
+ *                                            direct conn
+ *                <---> I2PTunnelDCCServer <--------------->I2PTunnelDCCClient <---->
+ *   originating                                                                     responding
+ *   chat client                                                                     chat client
+ *        CHAT    ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
+ *        SEND    ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
+ *        RESUME  <--- I2PTunnelIRCClient <-- IRC server <-- I2TunnelIRCClient <-----
+ *        ACCEPT  ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
+ *
+ * 
+ * + * @since 0.8.9 + */ +public class I2PTunnelDCCServer extends I2PTunnelServer { + + /** key is the server's local I2P port */ + private final ConcurrentHashMap _outgoing; + /** key is the server's local I2P port */ + private final ConcurrentHashMap _active; + /** key is the server's local I2P port */ + private final ConcurrentHashMap _resume; + private final List _sockList; + + // list of client tunnels? + private static long _id; + + /** just to keep super() happy */ + private static final InetAddress DUMMY; + static { + InetAddress dummy = null; + try { + dummy = InetAddress.getByAddress(new byte[4]); + } catch (UnknownHostException uhe) {} + DUMMY = dummy; + } + + private static final int MIN_I2P_PORT = 1; + private static final int MAX_I2P_PORT = 65535; + private static final int MAX_OUTGOING_PENDING = 20; + private static final int MAX_OUTGOING_ACTIVE = 20; + private static final long OUTBOUND_EXPIRE = 30*60*1000; + private static final long ACTIVE_EXPIRE = 60*60*1000; + + /** + * There's no support for unsolicited incoming I2P connections, + * so there's no server host or port parameters. + * + * @param sktMgr an existing socket manager + * @throws IllegalArgumentException if the I2PTunnel does not contain + * valid config to contact the router + */ + public I2PTunnelDCCServer(I2PSocketManager sktMgr, Logging l, + EventDispatcher notifyThis, I2PTunnel tunnel) { + super(DUMMY, 0, sktMgr, l, notifyThis, tunnel); + _outgoing = new ConcurrentHashMap(8); + _active = new ConcurrentHashMap(8); + _resume = new ConcurrentHashMap(8); + _sockList = new CopyOnWriteArrayList(); + } + + /** + * An incoming DCC connection, only accept for a known port. + * Passed through without filtering. + */ + @Override + protected void blockingHandle(I2PSocket socket) { + if (_log.shouldLog(Log.INFO)) + _log.info("Incoming connection to '" + toString() + "' from: " + socket.getPeerDestination().calculateHash().toBase64()); + + try { + expireOutbound(); + int myPort = socket.getLocalPort(); + // Port is a one-time-use only + LocalAddress local = _outgoing.remove(Integer.valueOf(myPort)); + if (local == null) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Rejecting incoming DCC connection for unknown port " + myPort); + try { + socket.close(); + } catch (IOException ioe) {} + return; + } + if (_log.shouldLog(Log.WARN)) + _log.warn("Incoming DCC connection for I2P port " + myPort + + " sending to " + local.ia + ':' + local.port); + Socket s = new Socket(local.ia, local.port); + _sockList.add(socket); + new I2PTunnelRunner(s, socket, slock, null, _sockList); + local.socket = socket; + local.expire = getTunnel().getContext().clock().now() + OUTBOUND_EXPIRE; + _active.put(Integer.valueOf(myPort), local); + } catch (SocketException ex) { + try { + socket.close(); + } catch (IOException ioe) {} + if (_log.shouldLog(Log.ERROR)) + _log.error("Error connecting to server " + remoteHost + ':' + remotePort, ex); + } catch (IOException ex) { + _log.error("Error while waiting for I2PConnections", ex); + } + } + + @Override + public boolean close(boolean forced) { + _outgoing.clear(); + _active.clear(); + for (I2PSocket s : _sockList) { + try { + s.close(); + } catch (IOException ioe) {} + } + _sockList.clear(); + return super.close(forced); + } + + /** + * An outgoing DCC request + * + * @param ip local irc client IP + * @param port local irc client port + * @param type ignored + * @return i2p port or -1 on error + */ + public int newOutgoing(byte[] ip, int port, String type) { + return newOutgoing(ip, port, type, 0); + } + + /** + * @param port local dcc server I2P port or 0 to pick one at random + */ + private int newOutgoing(byte[] ip, int port, String type, int i2pPort) { + expireOutbound(); + if (_outgoing.size() >= MAX_OUTGOING_PENDING || + _active.size() >= MAX_OUTGOING_ACTIVE) { + _log.error("Too many outgoing DCC, max is " + MAX_OUTGOING_PENDING + + '/' + MAX_OUTGOING_ACTIVE + " pending/active"); + return -1; + } + InetAddress ia; + try { + ia = InetAddress.getByAddress(ip); + } catch (UnknownHostException uhe) { + return -1; + } + int limit = i2pPort > 0 ? 10 : 1; + LocalAddress client = new LocalAddress(ia, port, getTunnel().getContext().clock().now() + OUTBOUND_EXPIRE); + for (int i = 0; i < limit; i++) { + int iport; + if (i2pPort > 0) + iport = i2pPort; + else + iport = MIN_I2P_PORT + getTunnel().getContext().random().nextInt(1 + MAX_I2P_PORT - MIN_I2P_PORT); + if (_active.containsKey(Integer.valueOf(iport))) + continue; + LocalAddress old = _outgoing.putIfAbsent(Integer.valueOf(iport), client); + if (old != null) + continue; + // TODO expire in a few minutes + return iport; + } + // couldn't find an unused i2p port + return -1; + } + + /** + * An incoming RESUME request + * + * @param port local dcc server I2P port + * @return local IRC client DCC port or -1 on error + */ + public int resumeIncoming(int port) { + Integer iport = Integer.valueOf(port); + LocalAddress local = _active.remove(iport); + if (local != null) { + local.expire = getTunnel().getContext().clock().now() + OUTBOUND_EXPIRE; + _resume.put(Integer.valueOf(local.port), local); + return local.port; + } + local = _outgoing.get(iport); + if (local != null) { + // shouldn't happen + local.expire = getTunnel().getContext().clock().now() + OUTBOUND_EXPIRE; + return local.port; + } + return -1; + } + + /** + * An outgoing ACCEPT response + * + * @param port local irc client DCC port + * @return local DCC server i2p port or -1 on error + */ + public int acceptOutgoing(int port) { + // do a reverse lookup + for (Iterator> iter = _resume.entrySet().iterator(); iter.hasNext(); ) { + Map.Entry e = iter.next(); + LocalAddress local = e.getValue(); + if (local.port == port) { + iter.remove(); + return newOutgoing(local.ia.getAddress(), port, "ACCEPT", e.getKey().intValue()); + } + } + return -1; + } + + private InetAddress getListenHost(Logging l) { + try { + return InetAddress.getByName(getTunnel().listenHost); + } catch (UnknownHostException uhe) { + l.log("Could not find listen host to bind to [" + getTunnel().host + "]"); + _log.error("Error finding host to bind", uhe); + notifyEvent("openBaseClientResult", "error"); + return null; + } + } + + private void expireOutbound() { + for (Iterator iter = _outgoing.values().iterator(); iter.hasNext(); ) { + LocalAddress a = iter.next(); + if (a.expire < getTunnel().getContext().clock().now()) + iter.remove(); + } + for (Iterator iter = _active.values().iterator(); iter.hasNext(); ) { + LocalAddress a = iter.next(); + I2PSocket s = a.socket; + if (s != null && s.isClosed()) + iter.remove(); + } + } + + private static class LocalAddress { + public final InetAddress ia; + public final int port; + public long expire; + public I2PSocket socket; + + public LocalAddress(InetAddress a, int p, long exp) { + ia = a; + port = p; + expire = exp; + } + } +} diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IRCFilter.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IRCFilter.java new file mode 100644 index 000000000..f1a5c05d7 --- /dev/null +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IRCFilter.java @@ -0,0 +1,483 @@ +package net.i2p.i2ptunnel.irc; + +import net.i2p.data.DataHelper; + + +/** + * Static methods to filter individual lines. + * Moved from I2PTunnelIRCClient.java + * + * @since 0.8.9 + */ +abstract class IRCFilter { + + private static final boolean ALLOW_ALL_DCC_IN = false; + private static final boolean ALLOW_ALL_DCC_OUT = false; + /** does not override DCC handling */ + private static final boolean ALLOW_ALL_CTCP_IN = false; + /** does not override DCC handling */ + private static final boolean ALLOW_ALL_CTCP_OUT = false; + + /************************************************************************* + * + * Modify or filter a single inbound line. + * + * @param helper may be null + * @return the original or modified line, or null if it should be dropped. + */ + public static String inboundFilter(String s, StringBuffer expectedPong, DCCHelper helper) { + + String field[]=s.split(" ",4); + String command; + int idx=0; + final String[] allowedCommands = + { + // "NOTICE", // can contain CTCP + //"PING", + //"PONG", + "MODE", + "JOIN", + "NICK", + "QUIT", + "PART", + "WALLOPS", + "ERROR", + "KICK", + "H", // "hide operator status" (after kicking an op) + "TOPIC", + // http://tools.ietf.org/html/draft-mitchell-irc-capabilities-01 + "CAP" + }; + + if(field[0].charAt(0)==':') + idx++; + + try { command = field[idx++]; } + catch (IndexOutOfBoundsException ioobe) // wtf, server sent borked command? + { + //_log.warn("Dropping defective message: index out of bounds while extracting command."); + return null; + } + + idx++; //skip victim + + // Allow numerical responses + try { + new Integer(command); + return s; + } catch(NumberFormatException nfe){} + + + if ("PING".equalsIgnoreCase(command)) + return "PING 127.0.0.1"; // no way to know what the ircd to i2ptunnel server con is, so localhost works + if ("PONG".equalsIgnoreCase(command)) { + // Turn the received ":irc.freshcoffee.i2p PONG irc.freshcoffee.i2p :127.0.0.1" + // into ":127.0.0.1 PONG 127.0.0.1 " so that the caller can append the client's extra parameter + // though, does 127.0.0.1 work for irc clients connecting remotely? and for all of them? sure would + // be great if irc clients actually followed the RFCs here, but i guess thats too much to ask. + // If we haven't PINGed them, or the PING we sent isn't something we know how to filter, this + // is blank. + // + // String pong = expectedPong.length() > 0 ? expectedPong.toString() : null; + // If we aren't going to rewrite it, pass it through + String pong = expectedPong.length() > 0 ? expectedPong.toString() : s; + expectedPong.setLength(0); + return pong; + } + + // Allow all allowedCommands + for(int i=0;i= 0) // CTCP marker ^A can be anywhere, not just immediately after the ':' + { + // CTCP + + // don't even try to parse multiple CTCP in the same message + int count = 0; + for (int i = 0; i < msg.length(); i++) { + if (msg.charAt(i) == 0x01) + count++; + } + if (count != 2) + return null; + + msg=msg.substring(2); + if(msg.startsWith("ACTION ")) { + // /me says hello + return s; + } + if (msg.startsWith("DCC ")) { + StringBuilder buf = new StringBuilder(128); + for (int i = 0; i <= idx - 2; i++) { + buf.append(field[i]).append(' '); + } + buf.append(":\001DCC "); + return filterDCCIn(buf.toString(), msg.substring(4), helper); + } + // XDCC looks safe, ip/port happens over regular DCC + // http://en.wikipedia.org/wiki/XDCC + if (msg.toUpperCase().startsWith("XDCC ") && helper != null && helper.isEnabled()) + return s; + if (ALLOW_ALL_CTCP_IN) + return s; + return null; // Block all other ctcp + } + return s; + } + + // Block the rest + return null; + } + + /************************************************************************* + * + * Modify or filter a single outbound line. + * + * @param helper may be null + * @return the original or modified line, or null if it should be dropped. + */ + public static String outboundFilter(String s, StringBuffer expectedPong, DCCHelper helper) { + + String field[]=s.split(" ",3); + String command; + final String[] allowedCommands = + { + // "NOTICE", // can contain CTCP + "MODE", + "JOIN", + "NICK", + "WHO", + "WHOIS", + "LIST", + "NAMES", + "NICK", + // "QUIT", // replace with a filtered QUIT to hide client quit messages + "SILENCE", + "MAP", // seems safe enough, the ircd should protect themselves though + // "PART", // replace with filtered PART to hide client part messages + "OPER", + // "PONG", // replaced with a filtered PING/PONG since some clients send the server IP (thanks aardvax!) + // "PING", + "KICK", + "HELPME", + "RULES", + "TOPIC", + "ISON", // jIRCii uses this for a ping (response is 303) + "INVITE", + "AWAY", // should be harmless + // http://tools.ietf.org/html/draft-mitchell-irc-capabilities-01 + "CAP" + }; + + if(field[0].length()==0) + return null; // W T F? + + + if(field[0].charAt(0)==':') + return null; // wtf + + command = field[0].toUpperCase(); + + if ("PING".equals(command)) { + // Most clients just send a PING and are happy with any old PONG. Others, + // like BitchX, actually expect certain behavior. It sends two different pings: + // "PING :irc.freshcoffee.i2p" and "PING 1234567890 127.0.0.1" (where the IP is the proxy) + // the PONG to the former seems to be "PONG 127.0.0.1", while the PONG to the later is + // ":irc.freshcoffee.i2p PONG irc.freshcoffe.i2p :1234567890". + // We don't want to send them our proxy's IP address, so we need to rewrite the PING + // sent to the server, but when we get a PONG back, use what we expected, rather than + // what they sent. + // + // Yuck. + + String rv = null; + expectedPong.setLength(0); + if (field.length == 1) { // PING + rv = "PING"; + // If we aren't rewriting the PING don't rewrite the PONG + // expectedPong.append("PONG 127.0.0.1"); + } else if (field.length == 2) { // PING nonce + rv = "PING " + field[1]; + // If we aren't rewriting the PING don't rewrite the PONG + // expectedPong.append("PONG ").append(field[1]); + } else if (field.length == 3) { // PING nonce serverLocation + rv = "PING " + field[1]; + expectedPong.append("PONG ").append(field[2]).append(" :").append(field[1]); // PONG serverLocation nonce + } else { + //if (_log.shouldLog(Log.ERROR)) + // _log.error("IRC client sent a PING we don't understand, filtering it (\"" + s + "\")"); + rv = null; + } + + //if (_log.shouldLog(Log.WARN)) + // _log.warn("sending ping [" + rv + "], waiting for [" + expectedPong + "] orig was [" + s + "]"); + + return rv; + } + if ("PONG".equals(command)) + return "PONG 127.0.0.1"; // no way to know what the ircd to i2ptunnel server con is, so localhost works + + // Allow all allowedCommands + for(int i=0;i= 0) // CTCP marker ^A can be anywhere, not just immediately after the ':' + { + // CTCP + + // don't even try to parse multiple CTCP in the same message + int count = 0; + for (int i = 0; i < msg.length(); i++) { + if (msg.charAt(i) == 0x01) + count++; + } + if (count != 2) + return null; + + msg=msg.substring(2); + if(msg.startsWith("ACTION ")) { + // /me says hello + return s; + } + if (msg.startsWith("DCC ")) + return filterDCCOut(field[0] + ' ' + field[1] + " :\001DCC ", msg.substring(4), helper); + // XDCC looks safe, ip/port happens over regular DCC + // http://en.wikipedia.org/wiki/XDCC + if (msg.toUpperCase().startsWith("XDCC ") && helper != null && helper.isEnabled()) + return s; + if (ALLOW_ALL_CTCP_OUT) + return s; + return null; // Block all other ctcp + } + return s; + } + + if("USER".equals(command)) { + int idx = field[2].lastIndexOf(":"); + if(idx<0) + return "USER user hostname localhost :realname"; + String realname = field[2].substring(idx+1); + String ret = "USER "+field[1]+" hostname localhost :"+realname; + return ret; + } + + if ("PART".equals(command)) { + // hide client message + return "PART " + field[1] + " :leaving"; + } + + if ("QUIT".equals(command)) { + return "QUIT :leaving"; + } + + // Block the rest + return null; + } + + /** + *
+     *  DCC CHAT chat xxx.b32.i2p i2p-port        -> DCC CHAT chat IP port
+     *  DCC SEND file xxx.b32.i2p i2p-port length -> DCC SEND file IP port length
+     *  DCC RESUME file i2p-port offset           -> DCC RESUME file port offset
+     *  DCC ACCEPT file i2p-port offset           -> DCC ACCEPT file port offset
+     *  DCC xxx                                   -> null
+     *
+ * + * @param pfx the message through the "DCC " part + * @param msg the message after the "DCC " part + * @param helper may be null + * @return the sanitized message or null to block + * @since 0.8.9 + */ + private static String filterDCCIn(String pfx, String msg, DCCHelper helper) { + // strip trailing ctcp (other one is in pfx) + int ctcp = msg.indexOf(0x01); + if (ctcp > 0) + msg = msg.substring(0, ctcp); + String[] args = msg.split(" ", 5); + if (args.length <= 0) + return null; + String type = args[0]; + boolean haveIP = true; + // no IP in these, replace port only + if (type == "RESUME" || type == "ACCEPT") { + haveIP = false; + } else if (!(type.equals("CHAT") || type.equals("SEND"))) { + if (ALLOW_ALL_DCC_IN) { + if (ctcp > 0) + return pfx + msg + (char) 0x01; + return pfx + msg; + } + return null; + } + if (helper == null || !helper.isEnabled()) + return null; + if (args.length < 3) + return null; + if (haveIP && args.length < 4) + return null; + String arg = args[1]; + int nextArg = 2; + String b32 = null; + if (haveIP) + b32 = args[nextArg++]; + int cPort; + try { + String cp = args[nextArg++]; + cPort = Integer.parseInt(cp); + } catch (NumberFormatException nfe) { + return null; + } + + int port = -1; + if (haveIP) { + port = helper.newIncoming(b32, cPort, type); + } else if (type.equals("ACCEPT")) { + port = helper.acceptIncoming(cPort); + } else if (type.equals("RESUME")) { + port = helper.resumeIncoming(cPort); + } + if (port < 0) + return null; + StringBuilder buf = new StringBuilder(256); + buf.append(pfx) + .append(type).append(' ').append(arg).append(' '); + if (haveIP) { + byte[] myIP = helper.getLocalAddress(); + buf.append(DataHelper.fromLong(myIP, 0, myIP.length)).append(' '); + } + buf.append(port); + while (args.length > nextArg) { + buf.append(' ').append(args[nextArg++]); + } + if (pfx.indexOf(0x01) >= 0) + buf.append((char) 0x01); + return buf.toString(); + } + + /** + *
+     *  DCC CHAT chat IP port        -> DCC CHAT chat xxx.b32.i2p i2p-port
+     *  DCC SEND file IP port length -> DCC SEND file xxx.b32.i2p i2p-port length
+     *  DCC RESUME file port offset  -> DCC RESUME file i2p-port offset
+     *  DCC ACCEPT file port offset  -> DCC ACCEPT file i2p-port offset
+     *  DCC xxx                      -> null
+     *
+ * + * @param pfx the message through the "DCC " part + * @param msg the message after the "DCC " part + * @param helper may be null + * @return the sanitized message or null to block + * @since 0.8.9 + */ + private static String filterDCCOut(String pfx, String msg, DCCHelper helper) { + // strip trailing ctcp (other one is in pfx) + int ctcp = msg.indexOf(0x01); + if (ctcp > 0) + msg = msg.substring(0, ctcp); + String[] args = msg.split(" ", 5); + if (args.length <= 0) + return null; + String type = args[0]; + boolean haveIP = true; + // no IP in these, replace port only + if (type == "RESUME" || type == "ACCEPT") { + haveIP = false; + } else if (!(type.equals("CHAT") || type.equals("SEND"))) { + if (ALLOW_ALL_DCC_OUT) { + if (ctcp > 0) + return pfx + msg + (char) 0x01; + return pfx + msg; + } + } + if (helper == null || !helper.isEnabled()) + return null; + if (args.length < 3) + return null; + if (haveIP && args.length < 4) + return null; + String arg = args[1]; + byte[] ip = null; + int nextArg = 2; + if (haveIP) { + try { + String ips = args[nextArg++]; + long ipl = Long.parseLong(ips); + if (ipl < 0x01000000) { + // "reverse/firewall DCC" + // http://en.wikipedia.org/wiki/Direct_Client-to-Client + // xchat sends an IP of 199 and a port of 0 + System.err.println("Reverse / Firewall DCC not supported IP = 0x" + Long.toHexString(ipl)); + return null; + } + ip = DataHelper.toLong(4, ipl); + } catch (NumberFormatException nfe) { + return null; + } + } + int cPort; + try { + String cp = args[nextArg++]; + cPort = Integer.parseInt(cp); + } catch (NumberFormatException nfe) { + return null; + } + if (cPort <= 0) { + // "reverse/firewall DCC" + // http://en.wikipedia.org/wiki/Direct_Client-to-Client + System.err.println("Reverse / Firewall DCC not supported"); + return null; + } + + int port = -1; + if (haveIP) { + port = helper.newOutgoing(ip, cPort, type); + } else if (type.equals("ACCEPT")) { + port = helper.acceptOutgoing(cPort); + } else if (type.equals("RESUME")) { + port = helper.resumeOutgoing(cPort); + } + if (port < 0) + return null; + StringBuilder buf = new StringBuilder(256); + buf.append(pfx) + .append(type).append(' ').append(arg).append(' '); + if (haveIP) + buf.append(helper.getB32Hostname()).append(' '); + buf.append(port); + while (args.length > nextArg) { + buf.append(' ').append(args[nextArg++]); + } + if (pfx.indexOf(0x01) >= 0) + buf.append((char) 0x01); + return buf.toString(); + } +} diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcInboundFilter.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcInboundFilter.java new file mode 100644 index 000000000..ce301a48e --- /dev/null +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcInboundFilter.java @@ -0,0 +1,101 @@ +package net.i2p.i2ptunnel.irc; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.Socket; + +import net.i2p.client.streaming.I2PSocket; +import net.i2p.util.Log; + +/** + * Thread to do inbound filtering. + * Moved from I2PTunnelIRCClient.java + * + * @since 0.8.9 + */ +public class IrcInboundFilter implements Runnable { + + private final Socket local; + private final I2PSocket remote; + private final StringBuffer expectedPong; + private final Log _log; + private final DCCHelper _dccHelper; + + public IrcInboundFilter(Socket lcl, I2PSocket rem, StringBuffer pong, Log log) { + this(lcl, rem, pong, log, null); + } + + /** + * @param helper may be null + * @since 0.8.9 + */ + public IrcInboundFilter(Socket lcl, I2PSocket rem, StringBuffer pong, Log log, DCCHelper helper) { + local = lcl; + remote = rem; + expectedPong = pong; + _log = log; + _dccHelper = helper; + } + + public void run() { + // Todo: Don't use BufferedReader - IRC spec limits line length to 512 but... + BufferedReader in; + OutputStream output; + try { + in = new BufferedReader(new InputStreamReader(remote.getInputStream(), "ISO-8859-1")); + output=local.getOutputStream(); + } catch (IOException e) { + if (_log.shouldLog(Log.ERROR)) + _log.error("IrcInboundFilter: no streams",e); + return; + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("IrcInboundFilter: Running."); + try { + while(true) + { + try { + String inmsg = in.readLine(); + if(inmsg==null) + break; + if(inmsg.endsWith("\r")) + inmsg=inmsg.substring(0,inmsg.length()-1); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("in: [" + inmsg + "]"); + String outmsg = IRCFilter.inboundFilter(inmsg, expectedPong, _dccHelper); + if(outmsg!=null) + { + if(!inmsg.equals(outmsg)) { + if (_log.shouldLog(Log.WARN)) { + _log.warn("inbound FILTERED: "+outmsg); + _log.warn(" - inbound was: "+inmsg); + } + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("inbound: "+outmsg); + } + outmsg=outmsg+"\r\n"; // rfc1459 sec. 2.3 + output.write(outmsg.getBytes("ISO-8859-1")); + // probably doesn't do much but can't hurt + output.flush(); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("inbound BLOCKED: "+inmsg); + } + } catch (IOException e1) { + if (_log.shouldLog(Log.WARN)) + _log.warn("IrcInboundFilter: disconnected",e1); + break; + } + } + } catch (RuntimeException re) { + _log.error("Error filtering inbound data", re); + } finally { + try { local.close(); } catch (IOException e) {} + } + if(_log.shouldLog(Log.DEBUG)) + _log.debug("IrcInboundFilter: Done."); + } +} diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcOutboundFilter.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcOutboundFilter.java new file mode 100644 index 000000000..ce68835c2 --- /dev/null +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcOutboundFilter.java @@ -0,0 +1,101 @@ +package net.i2p.i2ptunnel.irc; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.Socket; + +import net.i2p.client.streaming.I2PSocket; +import net.i2p.util.Log; + +/** + * Thread to do inbound filtering. + * Moved from I2PTunnelIRCClient.java + * + * @since 0.8.9 + */ +public class IrcOutboundFilter implements Runnable { + + private final Socket local; + private final I2PSocket remote; + private final StringBuffer expectedPong; + private final Log _log; + private final DCCHelper _dccHelper; + + public IrcOutboundFilter(Socket lcl, I2PSocket rem, StringBuffer pong, Log log) { + this(lcl, rem, pong, log, null); + } + + /** + * @param helper may be null + * @since 0.8.9 + */ + public IrcOutboundFilter(Socket lcl, I2PSocket rem, StringBuffer pong, Log log, DCCHelper helper) { + local = lcl; + remote = rem; + expectedPong = pong; + _log = log; + _dccHelper = helper; + } + + public void run() { + // Todo: Don't use BufferedReader - IRC spec limits line length to 512 but... + BufferedReader in; + OutputStream output; + try { + in = new BufferedReader(new InputStreamReader(local.getInputStream(), "ISO-8859-1")); + output=remote.getOutputStream(); + } catch (IOException e) { + if (_log.shouldLog(Log.ERROR)) + _log.error("IrcOutboundFilter: no streams",e); + return; + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("IrcOutboundFilter: Running."); + try { + while(true) + { + try { + String inmsg = in.readLine(); + if(inmsg==null) + break; + if(inmsg.endsWith("\r")) + inmsg=inmsg.substring(0,inmsg.length()-1); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("out: [" + inmsg + "]"); + String outmsg = IRCFilter.outboundFilter(inmsg, expectedPong, _dccHelper); + if(outmsg!=null) + { + if(!inmsg.equals(outmsg)) { + if (_log.shouldLog(Log.WARN)) { + _log.warn("outbound FILTERED: "+outmsg); + _log.warn(" - outbound was: "+inmsg); + } + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("outbound: "+outmsg); + } + outmsg=outmsg+"\r\n"; // rfc1459 sec. 2.3 + output.write(outmsg.getBytes("ISO-8859-1")); + // save 250 ms in streaming + output.flush(); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("outbound BLOCKED: "+"\""+inmsg+"\""); + } + } catch (IOException e1) { + if (_log.shouldLog(Log.WARN)) + _log.warn("IrcOutboundFilter: disconnected",e1); + break; + } + } + } catch (RuntimeException re) { + _log.error("Error filtering outbound data", re); + } finally { + try { remote.close(); } catch (IOException e) {} + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("IrcOutboundFilter: Done."); + } +} diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/I2PSOCKSIRCTunnel.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/I2PSOCKSIRCTunnel.java index a19763678..ee5156f8d 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/I2PSOCKSIRCTunnel.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/I2PSOCKSIRCTunnel.java @@ -11,7 +11,8 @@ import java.net.Socket; import net.i2p.I2PAppContext; import net.i2p.client.streaming.I2PSocket; import net.i2p.i2ptunnel.I2PTunnel; -import net.i2p.i2ptunnel.I2PTunnelIRCClient; +import net.i2p.i2ptunnel.irc.IrcInboundFilter; +import net.i2p.i2ptunnel.irc.IrcOutboundFilter; import net.i2p.i2ptunnel.Logging; import net.i2p.util.EventDispatcher; import net.i2p.util.I2PAppThread; @@ -50,10 +51,10 @@ public class I2PSOCKSIRCTunnel extends I2PSOCKSTunnel { Socket clientSock = serv.getClientSocket(); I2PSocket destSock = serv.getDestinationI2PSocket(this); StringBuffer expectedPong = new StringBuffer(); - Thread in = new I2PAppThread(new I2PTunnelIRCClient.IrcInboundFilter(clientSock, destSock, expectedPong, _log), + Thread in = new I2PAppThread(new IrcInboundFilter(clientSock, destSock, expectedPong, _log), "SOCKS IRC Client " + (++__clientId) + " in", true); in.start(); - Thread out = new I2PAppThread(new I2PTunnelIRCClient.IrcOutboundFilter(clientSock, destSock, expectedPong, _log), + Thread out = new I2PAppThread(new IrcOutboundFilter(clientSock, destSock, expectedPong, _log), "SOCKS IRC Client " + __clientId + " out", true); out.start(); } catch (SOCKSException e) { diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/web/EditBean.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/web/EditBean.java index 63f119bca..100e87bb6 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/web/EditBean.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/web/EditBean.java @@ -21,6 +21,7 @@ import net.i2p.data.Signature; import net.i2p.data.SigningPrivateKey; import net.i2p.i2ptunnel.I2PTunnelHTTPClient; import net.i2p.i2ptunnel.I2PTunnelHTTPClientBase; +import net.i2p.i2ptunnel.I2PTunnelIRCClient; import net.i2p.i2ptunnel.TunnelController; import net.i2p.i2ptunnel.TunnelControllerGroup; import net.i2p.util.Addresses; @@ -170,6 +171,11 @@ public class EditBean extends IndexBean { return getBooleanProperty(tunnel, "i2cp.encryptLeaseSet"); } + /** @since 0.8.9 */ + public boolean getDCC(int tunnel) { + return getBooleanProperty(tunnel, I2PTunnelIRCClient.PROP_DCC); + } + public String getEncryptKey(int tunnel) { return getProperty(tunnel, "i2cp.leaseSetKey", ""); } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/web/IndexBean.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/web/IndexBean.java index 10f80aa15..cae459d88 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/web/IndexBean.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/web/IndexBean.java @@ -28,6 +28,7 @@ import net.i2p.data.PrivateKeyFile; import net.i2p.data.SessionKey; import net.i2p.i2ptunnel.I2PTunnelHTTPClient; import net.i2p.i2ptunnel.I2PTunnelHTTPClientBase; +import net.i2p.i2ptunnel.I2PTunnelIRCClient; import net.i2p.i2ptunnel.TunnelController; import net.i2p.i2ptunnel.TunnelControllerGroup; import net.i2p.util.ConcurrentHashSet; @@ -711,6 +712,11 @@ public class IndexBean { _booleanOptions.add("i2cp.encryptLeaseSet"); } + /** @since 0.8.9 */ + public void setDCC(String moo) { + _booleanOptions.add(I2PTunnelIRCClient.PROP_DCC); + } + protected static final String PROP_ENABLE_ACCESS_LIST = "i2cp.enableAccessList"; protected static final String PROP_ENABLE_BLACKLIST = "i2cp.enableBlackList"; @@ -1020,13 +1026,28 @@ public class IndexBean { else config.setProperty("interface", ""); } + + if ("ircclient".equals(_type)) { + boolean dcc = _booleanOptions.contains(I2PTunnelIRCClient.PROP_DCC); + config.setProperty("option." + I2PTunnelIRCClient.PROP_DCC, + "" + dcc); + // add some sane server options since they aren't in the GUI (yet) + if (dcc) { + config.setProperty("options." + PROP_MAX_CONNS_MIN, "3"); + config.setProperty("options." + PROP_MAX_CONNS_HOUR, "10"); + config.setProperty("options." + PROP_MAX_TOTAL_CONNS_MIN, "5"); + config.setProperty("options." + PROP_MAX_TOTAL_CONNS_HOUR, "25"); + } + } + return config; } private static final String _noShowOpts[] = { "inbound.length", "outbound.length", "inbound.lengthVariance", "outbound.lengthVariance", "inbound.backupQuantity", "outbound.backupQuantity", "inbound.quantity", "outbound.quantity", - "inbound.nickname", "outbound.nickname", "i2p.streaming.connectDelay", "i2p.streaming.maxWindowSize" + "inbound.nickname", "outbound.nickname", "i2p.streaming.connectDelay", "i2p.streaming.maxWindowSize", + I2PTunnelIRCClient.PROP_DCC }; private static final String _booleanClientOpts[] = { "i2cp.reduceOnIdle", "i2cp.closeOnIdle", "i2cp.newDestOnResume", "persistentClientKey", "i2cp.delayOpen" @@ -1048,6 +1069,7 @@ public class IndexBean { PROP_MAX_TOTAL_CONNS_MIN, PROP_MAX_TOTAL_CONNS_HOUR, PROP_MAX_TOTAL_CONNS_DAY, PROP_MAX_STREAMS }; + protected static final Set _noShowSet = new HashSet(64); static { _noShowSet.addAll(Arrays.asList(_noShowOpts)); diff --git a/apps/i2ptunnel/jsp/editClient.jsp b/apps/i2ptunnel/jsp/editClient.jsp index b95452b12..ad2d14aa2 100644 --- a/apps/i2ptunnel/jsp/editClient.jsp +++ b/apps/i2ptunnel/jsp/editClient.jsp @@ -173,6 +173,15 @@ class="tickbox" /> <%=intl._("(Check the Box for 'YES')")%> + <% if ("ircclient".equals(tunnelType)) { %> +
+ + class="tickbox" /> + <%=intl._("(Check the Box for 'YES')")%> +
+ <% } // ircclient %> diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java index fee9fe667..595385cc4 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java @@ -70,6 +70,21 @@ public interface I2PSocket { public boolean isClosed(); public void setSocketErrorListener(SocketErrorListener lsnr); + + /** + * The remote port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getPort(); + + /** + * The local port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getLocalPort(); + /** * Allow notification of underlying errors communicating across I2P without * waiting for any sort of cleanup process. For example, if some data could diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java index 98c0a2cbf..05ff0631e 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java @@ -7,6 +7,7 @@ import java.io.OutputStream; import net.i2p.I2PAppContext; import net.i2p.I2PException; +import net.i2p.client.I2PSession; import net.i2p.client.I2PSessionException; import net.i2p.data.Destination; import net.i2p.util.Clock; @@ -301,6 +302,24 @@ class I2PSocketImpl implements I2PSocket { public long getCreatedOn() { return _createdOn; } public long getClosedOn() { return _closedOn; } + /** + * The remote port. + * @return 0 always + * @since 0.8.9 + */ + public int getPort() { + return I2PSession.PORT_UNSPECIFIED; + } + + /** + * The local port. + * @return 0 always + * @since 0.8.9 + */ + public int getLocalPort() { + return I2PSession.PORT_UNSPECIFIED; + } + private String getPrefix() { return "[" + _socketId + "]: "; } @@ -671,7 +690,7 @@ class I2PSocketImpl implements I2PSocket { return sent; } } - + @Override public String toString() { return "" + hashCode(); } } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java index 34dc8ac59..57f5f7bb4 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java @@ -90,7 +90,7 @@ public class I2PSocketManagerFactory { * Create a socket manager using the destination loaded from the given private key * stream and connected to the default I2CP host and port. * - * @param myPrivateKeyStream private key stream + * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @return the newly created socket manager, or null if there were errors */ public static I2PSocketManager createManager(InputStream myPrivateKeyStream) { @@ -101,7 +101,7 @@ public class I2PSocketManagerFactory { * Create a socket manager using the destination loaded from the given private key * stream and connected to the default I2CP host and port. * - * @param myPrivateKeyStream private key stream + * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param opts I2CP options * @return the newly created socket manager, or null if there were errors */ @@ -114,7 +114,7 @@ public class I2PSocketManagerFactory { * stream and connected to the I2CP router on the specified machine on the given * port * - * @param myPrivateKeyStream private key stream + * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param i2cpHost I2CP host * @param i2cpPort I2CP port * @param opts I2CP options diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java index 94532e51b..d926eb831 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java @@ -2,7 +2,7 @@ package net.i2p.client.streaming; /** * Define the configuration for streaming and verifying data on the socket. - * + * Use I2PSocketManager.buildOptions() to get one of these. */ public interface I2PSocketOptions { /** How much data will we accept that hasn't been written out yet. */ @@ -81,4 +81,32 @@ public interface I2PSocketOptions { * @param ms wait time to block on the output stream while waiting for the data to flush. */ public void setWriteTimeout(long ms); + + /** + * The remote port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getPort(); + + /** + * The remote port. + * @param port 0 - 65535 + * @since 0.8.9 + */ + public void setPort(int port); + + /** + * The local port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getLocalPort(); + + /** + * The local port. + * @param port 0 - 65535 + * @since 0.8.9 + */ + public void setLocalPort(int port); } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java index b1fedcea7..cb66b1486 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java @@ -4,22 +4,32 @@ import java.util.Properties; /** * Define the configuration for streaming and verifying data on the socket. - * + * Use I2PSocketManager.buildOptions() to get one of these. */ class I2PSocketOptionsImpl implements I2PSocketOptions { private long _connectTimeout; private long _readTimeout; private long _writeTimeout; private int _maxBufferSize; + private int _localPort; + private int _remotePort; public static final int DEFAULT_BUFFER_SIZE = 1024*64; public static final int DEFAULT_WRITE_TIMEOUT = -1; public static final int DEFAULT_CONNECT_TIMEOUT = 60*1000; + /** + * Sets max buffer size, connect timeout, read timeout, and write timeout + * from System properties. Does not set local port or remote port. + */ public I2PSocketOptionsImpl() { this(System.getProperties()); } + /** + * Initializes from System properties then copies over all options. + * @param opts may be null + */ public I2PSocketOptionsImpl(I2PSocketOptions opts) { this(System.getProperties()); if (opts != null) { @@ -27,13 +37,25 @@ class I2PSocketOptionsImpl implements I2PSocketOptions { _readTimeout = opts.getReadTimeout(); _writeTimeout = opts.getWriteTimeout(); _maxBufferSize = opts.getMaxBufferSize(); + _localPort = opts.getLocalPort(); + _remotePort = opts.getPort(); } } + /** + * Sets max buffer size, connect timeout, read timeout, and write timeout + * from properties. Does not set local port or remote port. + * @param opts may be null + */ public I2PSocketOptionsImpl(Properties opts) { init(opts); } + /** + * Sets max buffer size, connect timeout, read timeout, and write timeout + * from properties. Does not set local port or remote port. + * @param opts may be null + */ public void setProperties(Properties opts) { if (opts == null) return; if (opts.containsKey(PROP_BUFFER_SIZE)) @@ -46,6 +68,10 @@ class I2PSocketOptionsImpl implements I2PSocketOptions { _writeTimeout = getInt(opts, PROP_WRITE_TIMEOUT, DEFAULT_WRITE_TIMEOUT); } + /** + * Sets max buffer size, connect timeout, read timeout, and write timeout + * from properties. Does not set local port or remote port. + */ protected void init(Properties opts) { _maxBufferSize = getInt(opts, PROP_BUFFER_SIZE, DEFAULT_BUFFER_SIZE); _connectTimeout = getInt(opts, PROP_CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT); @@ -144,4 +170,40 @@ class I2PSocketOptionsImpl implements I2PSocketOptions { public void setWriteTimeout(long ms) { _writeTimeout = ms; } + + /** + * The remote port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getPort() { + return _remotePort; + } + + /** + * The remote port. + * @param port 0 - 65535 + * @since 0.8.9 + */ + public void setPort(int port) { + _remotePort = port; + } + + /** + * The local port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getLocalPort() { + return _localPort; + } + + /** + * The local port. + * @param port 0 - 65535 + * @since 0.8.9 + */ + public void setLocalPort(int port) { + _localPort = port; + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 0eb8fe66f..20105da77 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -23,25 +23,25 @@ import net.i2p.util.SimpleTimer2; * */ class Connection { - private I2PAppContext _context; - private Log _log; - private ConnectionManager _connectionManager; + private final I2PAppContext _context; + private final Log _log; + private final ConnectionManager _connectionManager; private Destination _remotePeer; private long _sendStreamId; private long _receiveStreamId; private long _lastSendTime; - private AtomicLong _lastSendId; + private final AtomicLong _lastSendId; private boolean _resetReceived; private boolean _resetSent; private long _resetSentOn; private boolean _connected; private boolean _hardDisconnected; - private MessageInputStream _inputStream; - private MessageOutputStream _outputStream; - private SchedulerChooser _chooser; + private final MessageInputStream _inputStream; + private final MessageOutputStream _outputStream; + private final SchedulerChooser _chooser; private long _nextSendTime; private long _ackedPackets; - private long _createdOn; + private final long _createdOn; private long _closeSentOn; private long _closeReceivedOn; private int _unackedPacketsReceived; @@ -51,10 +51,10 @@ class Connection { private boolean _updatedShareOpts; /** Packet ID (Long) to PacketLocal for sent but unacked packets */ private final Map _outboundPackets; - private PacketQueue _outboundQueue; - private ConnectionPacketHandler _handler; + private final PacketQueue _outboundQueue; + private final ConnectionPacketHandler _handler; private ConnectionOptions _options; - private ConnectionDataReceiver _receiver; + private final ConnectionDataReceiver _receiver; private I2PSocketFull _socket; /** set to an error cause if the connection could not be established */ private String _connectionError; @@ -70,8 +70,10 @@ class Connection { private final Object _connectLock; /** how many messages have been resent and not yet ACKed? */ private int _activeResends; - private ConEvent _connectionEvent; - private int _randomWait; + private final ConEvent _connectionEvent; + private final int _randomWait; + private int _localPort; + private int _remotePort; private long _lifetimeBytesSent; private long _lifetimeBytesReceived; @@ -86,10 +88,13 @@ class Connection { public static final int MAX_WINDOW_SIZE = 128; - public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler) { + public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, + PacketQueue queue, ConnectionPacketHandler handler) { this(ctx, manager, chooser, queue, handler, null); } - public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts) { + + public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, + PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts) { _context = ctx; _connectionManager = manager; _chooser = chooser; @@ -101,34 +106,31 @@ class Connection { // FIXME pass through a passive flush delay setting as the 4th arg _outputStream = new MessageOutputStream(_context, _receiver, (opts == null ? Packet.MAX_PAYLOAD_SIZE : opts.getMaxMessageSize())); _outboundPackets = new TreeMap(); + if (opts != null) { + _localPort = opts.getLocalPort(); + _remotePort = opts.getPort(); + } _options = (opts != null ? opts : new ConnectionOptions()); _outputStream.setWriteTimeout((int)_options.getWriteTimeout()); _inputStream.setReadTimeout((int)_options.getReadTimeout()); _lastSendId = new AtomicLong(-1); _nextSendTime = -1; - _ackedPackets = 0; _createdOn = _context.clock().now(); _closeSentOn = -1; _closeReceivedOn = -1; - _unackedPacketsReceived = 0; _congestionWindowEnd = _options.getWindowSize()-1; _highestAckedThrough = -1; _lastCongestionSeenAt = MAX_WINDOW_SIZE*2; // lets allow it to grow _lastCongestionTime = -1; _lastCongestionHighestUnacked = -1; - _resetReceived = false; _connected = true; _disconnectScheduledOn = -1; _lastReceivedOn = -1; _activityTimer = new ActivityTimer(); _ackSinceCongestion = true; _connectLock = new Object(); - _activeResends = 0; _resetSentOn = -1; - _isInbound = false; - _updatedShareOpts = false; _connectionEvent = new ConEvent(); - _hardDisconnected = false; _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage _context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); @@ -678,6 +680,23 @@ class Connection { public I2PSocketFull getSocket() { return _socket; } public void setSocket(I2PSocketFull socket) { _socket = socket; } + /** + * The remote port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getPort() { + return _remotePort; + } + + /** + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getLocalPort() { + return _localPort; + } + public String getConnectionError() { return _connectionError; } public void setConnectionError(String err) { _connectionError = err; } @@ -781,7 +800,7 @@ class Connection { } public int getLastCongestionSeenAt() { return _lastCongestionSeenAt; } - + void congestionOccurred() { // if we hit congestion and e.g. 5 packets are resent, // dont set the size to (winSize >> 4). only set the @@ -962,12 +981,13 @@ class Connection { * @return the inbound message stream */ public MessageInputStream getInputStream() { return _inputStream; } + /** stream that the local peer sends data to the remote peer on * @return the outbound message stream */ public MessageOutputStream getOutputStream() { return _outputStream; } - - @Override + + @Override public String toString() { StringBuilder buf = new StringBuilder(128); buf.append("[Connection "); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java index 07247e670..900fb9626 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java @@ -13,11 +13,14 @@ import net.i2p.util.Log; * */ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { - private I2PAppContext _context; - private Log _log; - private Connection _connection; + private final I2PAppContext _context; + private final Log _log; + private final Connection _connection; private static final MessageOutputStream.WriteStatus _dummyStatus = new DummyStatus(); + /** + * @param con non-null + */ public ConnectionDataReceiver(I2PAppContext ctx, Connection con) { _context = ctx; _log = ctx.logManager().getLog(ConnectionDataReceiver.class); @@ -41,10 +44,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { * @return !flush */ public boolean writeInProcess() { - Connection con = _connection; - if (con != null) - return con.getUnackedPacketsSent() >= con.getOptions().getWindowSize(); - return false; + return _connection.getUnackedPacketsSent() >= _connection.getOptions().getWindowSize(); } /** @@ -60,7 +60,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { */ public MessageOutputStream.WriteStatus writeData(byte[] buf, int off, int size) { Connection con = _connection; - if (con == null) return _dummyStatus; + //if (con == null) return _dummyStatus; boolean doSend = true; if ( (size <= 0) && (con.getLastSendId() >= 0) ) { if (con.getOutputStream().getClosed()) { @@ -121,7 +121,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { */ public PacketLocal send(byte buf[], int off, int size, boolean forceIncrement) { Connection con = _connection; - if (con == null) return null; + //if (con == null) return null; long before = System.currentTimeMillis(); PacketLocal packet = buildPacket(con, buf, off, size, forceIncrement); long built = System.currentTimeMillis(); @@ -185,6 +185,8 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { packet.setFlag(Packet.FLAG_SYNCHRONIZE); packet.setOptionalFrom(con.getSession().getMyDestination()); packet.setOptionalMaxSize(con.getOptions().getMaxMessageSize()); + packet.setLocalPort(con.getLocalPort()); + packet.setRemotePort(con.getPort()); } if (con.getSendStreamId() == Packet.STREAM_ID_UNKNOWN) { packet.setFlag(Packet.FLAG_NO_ACK); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java index 6ba876dd9..e2c9d3f55 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java @@ -18,10 +18,10 @@ import net.i2p.util.SimpleTimer; * @author zzz modded to use concurrent and bound queue size */ class ConnectionHandler { - private I2PAppContext _context; - private Log _log; - private ConnectionManager _manager; - private LinkedBlockingQueue _synQueue; + private final I2PAppContext _context; + private final Log _log; + private final ConnectionManager _manager; + private final LinkedBlockingQueue _synQueue; private boolean _active; private int _acceptTimeout; @@ -41,7 +41,6 @@ class ConnectionHandler { _log = context.logManager().getLog(ConnectionHandler.class); _manager = mgr; _synQueue = new LinkedBlockingQueue(MAX_QUEUE_SIZE); - _active = false; _acceptTimeout = DEFAULT_ACCEPT_TIMEOUT; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index 434f09585..f50a4b5cc 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -21,24 +21,24 @@ import net.i2p.util.SimpleTimer; * */ class ConnectionManager { - private I2PAppContext _context; - private Log _log; - private I2PSession _session; - private MessageHandler _messageHandler; - private PacketHandler _packetHandler; - private ConnectionHandler _connectionHandler; - private PacketQueue _outboundQueue; - private SchedulerChooser _schedulerChooser; - private ConnectionPacketHandler _conPacketHandler; - private TCBShare _tcbShare; + private final I2PAppContext _context; + private final Log _log; + private final I2PSession _session; + private final MessageHandler _messageHandler; + private final PacketHandler _packetHandler; + private final ConnectionHandler _connectionHandler; + private final PacketQueue _outboundQueue; + private final SchedulerChooser _schedulerChooser; + private final ConnectionPacketHandler _conPacketHandler; + private final TCBShare _tcbShare; /** Inbound stream ID (Long) to Connection map */ - private ConcurrentHashMap _connectionByInboundId; + private final ConcurrentHashMap _connectionByInboundId; /** Ping ID (Long) to PingRequest */ private final Map _pendingPings; private boolean _allowIncoming; private boolean _throttlersInitialized; private int _maxConcurrentStreams; - private ConnectionOptions _defaultOptions; + private final ConnectionOptions _defaultOptions; private volatile int _numWaiting; private long _soTimeout; private ConnThrottler _minuteThrottler; @@ -59,10 +59,12 @@ class ConnectionManager { _schedulerChooser = new SchedulerChooser(_context); _conPacketHandler = new ConnectionPacketHandler(_context); _tcbShare = new TCBShare(_context); - _session.setSessionListener(_messageHandler); + // PROTO_ANY is for backward compatibility (pre-0.7.1) + // TODO change proto to PROTO_STREAMING someday. + // Right now we get everything, and rely on Datagram to specify PROTO_UDP. + // PacketQueue has sent PROTO_STREAMING since the beginning of mux support (0.7.1) + _session.addMuxedSessionListener(_messageHandler, I2PSession.PROTO_ANY, I2PSession.PORT_ANY); _outboundQueue = new PacketQueue(_context, _session, this); - _allowIncoming = false; - _numWaiting = 0; /** Socket timeout for accept() */ _soTimeout = -1; @@ -141,7 +143,10 @@ class ConnectionManager { * it, or null if the syn's streamId was already taken */ public Connection receiveConnection(Packet synPacket) { - Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions)); + ConnectionOptions opts = new ConnectionOptions(_defaultOptions); + opts.setPort(synPacket.getRemotePort()); + opts.setLocalPort(synPacket.getLocalPort()); + Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); _tcbShare.updateOptsFromShare(con); con.setInbound(); long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index ae14daa15..e6ec6fa42 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -106,6 +106,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { * This is based on documentation, the code, and logging, however there are still * some parts that could use more research. * + *
      *  1024 Tunnel Message
      *  - 21 Header (see router/tunnel/BatchedPreprocessor.java)
      * -----
@@ -169,7 +170,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
      * Similarly:
      *   3 msgs: 2722
      *   4 msgs: 3714
-     *
+     *
* * Before release 0.6.1.14 this was 4096. * From release 0.6.1.14 through release 0.6.4, this was 960. @@ -205,18 +206,35 @@ class ConnectionOptions extends I2PSocketOptionsImpl { public static final int DEFAULT_MAX_MESSAGE_SIZE = 1730; public static final int MIN_MESSAGE_SIZE = 512; + /** + * Sets max buffer size, connect timeout, read timeout, and write timeout + * from System properties. Does not set local port or remote port. + */ public ConnectionOptions() { super(); } + /** + * Sets max buffer size, connect timeout, read timeout, and write timeout + * from properties. Does not set local port or remote port. + * @param opts may be null + */ public ConnectionOptions(Properties opts) { super(opts); } + /** + * Initializes from System properties then copies over all options. + * @param opts may be null + */ public ConnectionOptions(I2PSocketOptions opts) { super(opts); } + /** + * Initializes from System properties then copies over all options. + * @param opts may be null + */ public ConnectionOptions(ConnectionOptions opts) { super(opts); if (opts != null) { @@ -235,8 +253,10 @@ class ConnectionOptions extends I2PSocketOptionsImpl { setInboundBufferSize(opts.getInboundBufferSize()); setCongestionAvoidanceGrowthRateFactor(opts.getCongestionAvoidanceGrowthRateFactor()); setSlowStartGrowthRateFactor(opts.getSlowStartGrowthRateFactor()); - setWriteTimeout(opts.getWriteTimeout()); - setReadTimeout(opts.getReadTimeout()); + // handled in super() + // not clear why added by jr 12/22/2005 + //setWriteTimeout(opts.getWriteTimeout()); + //setReadTimeout(opts.getReadTimeout()); setAnswerPings(opts.getAnswerPings()); initLists(opts); _maxConnsPerMinute = opts.getMaxConnsPerMinute(); @@ -248,7 +268,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { } } - @Override + @Override protected void init(Properties opts) { super.init(opts); _trend = new int[TREND_COUNT]; @@ -262,12 +282,14 @@ class ConnectionOptions extends I2PSocketOptionsImpl { setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, DEFAULT_INITIAL_ACK_DELAY)); setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE)); setMaxResends(getInt(opts, PROP_MAX_RESENDS, DEFAULT_MAX_SENDS)); - setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); + // handled in super() + //setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 90*1000)); setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_SEND)); setInboundBufferSize(getMaxMessageSize() * (Connection.MAX_WINDOW_SIZE + 2)); setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR, 1)); setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 1)); + // overrides default in super() setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT)); setAnswerPings(getBool(opts, PROP_ANSWER_PINGS, DEFAULT_ANSWER_PINGS)); initLists(opts); @@ -279,7 +301,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { _maxTotalConnsPerDay = getInt(opts, PROP_MAX_TOTAL_CONNS_DAY, 0); } - @Override + @Override public void setProperties(Properties opts) { super.setProperties(opts); if (opts == null) return; @@ -303,8 +325,9 @@ class ConnectionOptions extends I2PSocketOptionsImpl { setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE)); if (opts.containsKey(PROP_MAX_RESENDS)) setMaxResends(getInt(opts, PROP_MAX_RESENDS, DEFAULT_MAX_SENDS)); - if (opts.containsKey(PROP_WRITE_TIMEOUT)) - setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); + // handled in super() + //if (opts.containsKey(PROP_WRITE_TIMEOUT)) + // setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); if (opts.containsKey(PROP_INACTIVITY_TIMEOUT)) setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 90*1000)); if (opts.containsKey(PROP_INACTIVITY_ACTION)) @@ -316,6 +339,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 2)); if (opts.containsKey(PROP_CONNECT_TIMEOUT)) // wow 5 minutes!!! FIXME!! + // overrides default in super() setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT)); if (opts.containsKey(PROP_ANSWER_PINGS)) setAnswerPings(getBool(opts, PROP_ANSWER_PINGS, DEFAULT_ANSWER_PINGS)); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 526fae372..53ebb17e1 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -15,8 +15,8 @@ import net.i2p.util.SimpleTimer; * */ class ConnectionPacketHandler { - private I2PAppContext _context; - private Log _log; + private final I2PAppContext _context; + private final Log _log; public ConnectionPacketHandler(I2PAppContext context) { _context = context; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java index 71e1dd3ac..f40dbd0c7 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java @@ -8,7 +8,7 @@ import net.i2p.I2PException; * */ class I2PServerSocketFull implements I2PServerSocket { - private I2PSocketManagerFull _socketManager; + private final I2PSocketManagerFull _socketManager; public I2PServerSocketFull(I2PSocketManagerFull mgr) { _socketManager = mgr; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java index f8dbe74ea..5cd76a864 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java @@ -4,6 +4,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import net.i2p.client.I2PSession; import net.i2p.data.Destination; /** @@ -127,7 +128,28 @@ class I2PSocketFull implements I2PSocket { if (c != null) c.disconnectComplete(); } - @Override + + /** + * The remote port. + * @return the port or 0 if unknown + * @since 0.8.9 + */ + public int getPort() { + Connection c = _connection; + return c == null ? I2PSession.PORT_UNSPECIFIED : c.getPort(); + } + + /** + * The local port. + * @return the port or 0 if unknown + * @since 0.8.9 + */ + public int getLocalPort() { + Connection c = _connection; + return c == null ? I2PSession.PORT_UNSPECIFIED : c.getLocalPort(); + } + + @Override public String toString() { Connection c = _connection; if (c == null) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java index 791aa13ac..d9ca691b4 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java @@ -7,7 +7,7 @@ import java.util.concurrent.CopyOnWriteArraySet; import net.i2p.I2PAppContext; import net.i2p.client.I2PSession; import net.i2p.client.I2PSessionException; -import net.i2p.client.I2PSessionListener; +import net.i2p.client.I2PSessionMuxedListener; import net.i2p.util.Log; /** @@ -15,7 +15,7 @@ import net.i2p.util.Log; * Packets, if we can. * */ -class MessageHandler implements I2PSessionListener { +class MessageHandler implements I2PSessionMuxedListener { private final ConnectionManager _manager; private final I2PAppContext _context; private final Log _log; @@ -31,11 +31,23 @@ class MessageHandler implements I2PSessionListener { /** Instruct the client that the given session has received a message with * size # of bytes. + * This shouldn't be called anymore since we are registering as a muxed listener. * @param session session to notify * @param msgId message number available * @param size size of the message */ public void messageAvailable(I2PSession session, int msgId, long size) { + messageAvailable(session, msgId, size, I2PSession.PROTO_UNSPECIFIED, + I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); + } + + /** Instruct the client that the given session has received a message with + * size # of bytes. + * @param session session to notify + * @param msgId message number available + * @param size size of the message + */ + public void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromPort, int toPort) { byte data[] = null; try { data = session.receiveMessage(msgId); @@ -49,6 +61,8 @@ class MessageHandler implements I2PSessionListener { Packet packet = new Packet(); try { packet.readPacket(data, 0, data.length); + packet.setRemotePort(fromPort); + packet.setLocalPort(toPort); _manager.getPacketHandler().receivePacket(packet); } catch (IllegalArgumentException iae) { _context.statManager().addRateData("stream.packetReceiveFailure", 1, 0); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 71c9ebce9..737e0f7b2 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -63,14 +63,10 @@ class MessageOutputStream extends OutputStream { _buf = _dataCache.acquire().getData(); // new byte[bufSize]; _dataReceiver = receiver; _dataLock = new Object(); - _written = 0; - _closed = false; _writeTimeout = -1; _passiveFlushDelay = passiveFlushDelay; _nextBufferSize = -1; _sendPeriodBeginTime = ctx.clock().now(); - _sendPeriodBytes = 0; - _sendBps = 0; _context.statManager().createRateStat("stream.sendBps", "How fast we pump data through the stream", "Stream", new long[] { 60*1000, 5*60*1000, 60*60*1000 }); _flusher = new Flusher(); if (_log.shouldLog(Log.DEBUG)) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index 67fdf1549..437eb5dee 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -13,6 +13,13 @@ import net.i2p.data.SigningPrivateKey; import net.i2p.util.Log; /** + * This contains solely the data that goes out on the wire, + * including the local and remote port which is embedded in + * the I2CP overhead, not in the packet itself. + * For local state saved for outbound packets, see PacketLocal. + * + *

+ * * Contain a single packet transferred as part of a streaming connection. * The data format is as follows:

    *
  • {@link #getSendStreamId sendStreamId} [4 byte value]
  • @@ -67,6 +74,8 @@ class Packet { private Destination _optionFrom; private int _optionDelay; private int _optionMaxSize; + private int _localPort; + private int _remotePort; /** * The receiveStreamId will be set to this when the packet doesn't know @@ -148,6 +157,10 @@ class Packet { public static final int DEFAULT_MAX_SIZE = 32*1024; protected static final int MAX_DELAY_REQUEST = 65535; + /** + * Does no initialization. + * See readPacket() for inbound packets, and the setters for outbound packets. + */ public Packet() { } private boolean _sendStreamIdSet = false; @@ -322,6 +335,40 @@ class Packet { _optionMaxSize = numBytes; } + /** + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getLocalPort() { + return _localPort; + } + + /** + * Must be called to change the port, not set by readPacket() + * as the port is out-of-band in the I2CP header. + * @since 0.8.9 + */ + public void setLocalPort(int port) { + _localPort = port; + } + + /** + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getRemotePort() { + return _remotePort; + } + + /** + * Must be called to change the port, not set by readPacket() + * as the port is out-of-band in the I2CP header. + * @since 0.8.9 + */ + public void setRemotePort(int port) { + _remotePort = port; + } + /** * Write the packet to the buffer (starting at the offset) and return * the number of bytes written. diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index 4f977712c..ef145179c 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -16,9 +16,9 @@ import net.i2p.util.Log; * */ class PacketHandler { - private ConnectionManager _manager; - private I2PAppContext _context; - private Log _log; + private final ConnectionManager _manager; + private final I2PAppContext _context; + private final Log _log; //private int _lastDelay; //private int _dropped; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index dd5fe1ceb..ca2e25d42 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -13,13 +13,13 @@ import net.i2p.util.SimpleTimer2; * retries, etc. */ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { - private I2PAppContext _context; - private Log _log; - private Connection _connection; + private final I2PAppContext _context; + private final Log _log; + private final Connection _connection; private Destination _to; private SessionKey _keyUsed; private Set _tagsSent; - private long _createdOn; + private final long _createdOn; private int _numSends; private long _lastSend; private long _acceptedOn; @@ -29,9 +29,11 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { private volatile boolean _retransmitted; private SimpleTimer2.TimedEvent _resendEvent; + /** not bound to a connection */ public PacketLocal(I2PAppContext ctx, Destination to) { this(ctx, to, null); } + public PacketLocal(I2PAppContext ctx, Destination to, Connection con) { _context = ctx; _createdOn = ctx.clock().now(); @@ -40,8 +42,6 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { _connection = con; _lastSend = -1; _cancelledOn = -1; - _nackCount = 0; - _retransmitted = false; } public Destination getTo() { return _to; } @@ -138,6 +138,8 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { } public int getNumSends() { return _numSends; } public long getLastSend() { return _lastSend; } + + /** @return null if not bound */ public Connection getConnection() { return _connection; } public void incrementNACKs() { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java index fa0aa87ce..4de4c6e16 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java @@ -19,11 +19,11 @@ import net.i2p.util.Log; * */ class PacketQueue { - private I2PAppContext _context; - private Log _log; - private I2PSession _session; - private ConnectionManager _connectionManager; - private ByteCache _cache = ByteCache.getInstance(64, 36*1024); + private final I2PAppContext _context; + private final Log _log; + private final I2PSession _session; + private final ConnectionManager _connectionManager; + private final ByteCache _cache = ByteCache.getInstance(64, 36*1024); public PacketQueue(I2PAppContext context, I2PSession session, ConnectionManager mgr) { _context = context; @@ -98,7 +98,7 @@ class PacketQueue { // I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); // I2PSessionMuxedImpl no tags sent = _session.sendMessage(packet.getTo(), buf, 0, size, null, null, expires, - I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); + I2PSession.PROTO_STREAMING, packet.getLocalPort(), packet.getRemotePort()); else // I2PSessionImpl2 //sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent, 0); @@ -107,7 +107,7 @@ class PacketQueue { // I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); // I2PSessionMuxedImpl no tags sent = _session.sendMessage(packet.getTo(), buf, 0, size, null, null, - I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); + I2PSession.PROTO_STREAMING, packet.getLocalPort(), packet.getRemotePort()); end = _context.clock().now(); if ( (end-begin > 1000) && (_log.shouldLog(Log.WARN)) ) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/StandardSocket.java b/apps/streaming/java/src/net/i2p/client/streaming/StandardSocket.java index 6ba78bfd4..b0ffbf250 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/StandardSocket.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/StandardSocket.java @@ -107,11 +107,11 @@ class StandardSocket extends Socket { } /** - * @return -1 always + * @return the port or 0 if unknown */ @Override public int getLocalPort() { - return -1; + return _socket.getLocalPort(); } /** @@ -139,11 +139,11 @@ class StandardSocket extends Socket { } /** - * @return 0 always + * @return the port or 0 if unknown */ @Override public int getPort() { - return 0; + return _socket.getPort(); } @Override diff --git a/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java b/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java index d2d02021a..d33e7a741 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java @@ -21,10 +21,10 @@ import net.i2p.util.SimpleTimer2; * */ class TCBShare { - private I2PAppContext _context; - private Log _log; - private Map _cache; - private CleanEvent _cleaner; + private final I2PAppContext _context; + private final Log _log; + private final Map _cache; + private final CleanEvent _cleaner; private static final long EXPIRE_TIME = 30*60*1000; private static final long CLEAN_TIME = 10*60*1000; diff --git a/core/java/src/net/i2p/client/I2PClient.java b/core/java/src/net/i2p/client/I2PClient.java index adc03bb27..6521cf258 100644 --- a/core/java/src/net/i2p/client/I2PClient.java +++ b/core/java/src/net/i2p/client/I2PClient.java @@ -44,7 +44,9 @@ public interface I2PClient { * using the specified options to both connect to the router, to instruct * the router how to handle the new session, and to configure the end to end * encryption. - * @param destKeyStream location from which to read the Destination, PrivateKey, and SigningPrivateKey from + * + * @param destKeyStream location from which to read the Destination, PrivateKey, and SigningPrivateKey from, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param options set of options to configure the router with, if null will use System properties * @return new session allowing a Destination to recieve all of its messages and send messages to any other Destination. */ @@ -52,8 +54,10 @@ public interface I2PClient { /** Create a new destination with the default certificate creation properties and store * it, along with the private encryption and signing keys at the specified location + * * @param destKeyStream create a new destination and write out the object to the given stream, * formatted as Destination, PrivateKey, and SigningPrivateKey + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @return new destination */ public Destination createDestination(OutputStream destKeyStream) throws I2PException, IOException; @@ -61,7 +65,8 @@ public interface I2PClient { /** Create a new destination with the given certificate and store it, along with the private * encryption and signing keys at the specified location * - * @param destKeyStream location to write out the destination, PrivateKey, and SigningPrivateKey + * @param destKeyStream location to write out the destination, PrivateKey, and SigningPrivateKey, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param cert certificate to tie to the destination * @return newly created destination */ diff --git a/core/java/src/net/i2p/client/I2PClientImpl.java b/core/java/src/net/i2p/client/I2PClientImpl.java index d0147f0d7..23033dbf0 100644 --- a/core/java/src/net/i2p/client/I2PClientImpl.java +++ b/core/java/src/net/i2p/client/I2PClientImpl.java @@ -30,8 +30,12 @@ import net.i2p.data.SigningPublicKey; * @author jrandom */ class I2PClientImpl implements I2PClient { + /** * Create the destination with a null payload + * + * @param destKeyStream location to write out the destination, PrivateKey, and SigningPrivateKey, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} */ public Destination createDestination(OutputStream destKeyStream) throws I2PException, IOException { Certificate cert = new Certificate(); @@ -44,6 +48,8 @@ class I2PClientImpl implements I2PClient { * Create the destination with the given payload and write it out along with * the PrivateKey and SigningPrivateKey to the destKeyStream * + * @param destKeyStream location to write out the destination, PrivateKey, and SigningPrivateKey, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} */ public Destination createDestination(OutputStream destKeyStream, Certificate cert) throws I2PException, IOException { Destination d = new Destination(); @@ -67,13 +73,20 @@ class I2PClientImpl implements I2PClient { /** * Create a new session (though do not connect it yet) + * + * @param destKeyStream location from which to read the Destination, PrivateKey, and SigningPrivateKey from, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param options set of options to configure the router with, if null will use System properties */ public I2PSession createSession(InputStream destKeyStream, Properties options) throws I2PSessionException { return createSession(I2PAppContext.getGlobalContext(), destKeyStream, options); } + /** * Create a new session (though do not connect it yet) + * + * @param destKeyStream location from which to read the Destination, PrivateKey, and SigningPrivateKey from, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param options set of options to configure the router with, if null will use System properties */ public I2PSession createSession(I2PAppContext context, InputStream destKeyStream, Properties options) throws I2PSessionException { diff --git a/core/java/src/net/i2p/client/I2PSession.java b/core/java/src/net/i2p/client/I2PSession.java index d21c858d7..06b094488 100644 --- a/core/java/src/net/i2p/client/I2PSession.java +++ b/core/java/src/net/i2p/client/I2PSession.java @@ -217,11 +217,34 @@ public interface I2PSession { */ public int[] bandwidthLimits() throws I2PSessionException; - /** See I2PSessionMuxedImpl for details */ + /** + * Listen on specified protocol and port. + * + * An existing listener with the same proto and port is replaced. + * Only the listener with the best match is called back for each message. + * + * @param proto 1-254 or PROTO_ANY (0) for all; recommended: + * I2PSession.PROTO_STREAMING + * I2PSession.PROTO_DATAGRAM + * 255 disallowed + * @param port 1-65535 or PORT_ANY (0) for all + * @since 0.7.1 + */ public void addSessionListener(I2PSessionListener lsnr, int proto, int port); - /** See I2PSessionMuxedImpl for details */ + + /** + * Listen on specified protocol and port, and receive notification + * of proto, fromPort, and toPort for every message. + * @param proto 1-254 or PROTO_ANY (0) for all; 255 disallowed + * @param port 1-65535 or PORT_ANY (0) for all + * @since 0.7.1 + */ public void addMuxedSessionListener(I2PSessionMuxedListener l, int proto, int port); - /** See I2PSessionMuxedImpl for details */ + + /** + * removes the specified listener (only) + * @since 0.7.1 + */ public void removeListener(int proto, int port); public static final int PORT_ANY = 0; diff --git a/core/java/src/net/i2p/client/I2PSessionDemultiplexer.java b/core/java/src/net/i2p/client/I2PSessionDemultiplexer.java index 28d211fa5..5d28d1a21 100644 --- a/core/java/src/net/i2p/client/I2PSessionDemultiplexer.java +++ b/core/java/src/net/i2p/client/I2PSessionDemultiplexer.java @@ -16,13 +16,14 @@ import net.i2p.util.Log; * depending on whether they want to hear about the * protocol, from port, and to port for every received message. * - * This only calls one listener, not all that apply. + * messageAvailable() only calls one listener, not all that apply. + * The others call all listeners. * * @author zzz */ public class I2PSessionDemultiplexer implements I2PSessionMuxedListener { - private Log _log; - private Map _listeners; + private final Log _log; + private final Map _listeners; public I2PSessionDemultiplexer(I2PAppContext ctx) { _log = ctx.logManager().getLog(I2PSessionDemultiplexer.class); diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index 0a29ff9d8..04dde2050 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -159,6 +159,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa * Create a new session, reading the Destination, PrivateKey, and SigningPrivateKey * from the destKeyStream, and using the specified options to connect to the router * + * @param destKeyStream stream containing the private key data, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param options set of options to configure the router with, if null will use System properties * @throws I2PSessionException if there is a problem loading the private keys or */ diff --git a/core/java/src/net/i2p/client/I2PSessionImpl2.java b/core/java/src/net/i2p/client/I2PSessionImpl2.java index 3af551eaa..0f397f68e 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl2.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl2.java @@ -49,6 +49,8 @@ class I2PSessionImpl2 extends I2PSessionImpl { * Create a new session, reading the Destination, PrivateKey, and SigningPrivateKey * from the destKeyStream, and using the specified options to connect to the router * + * @param destKeyStream stream containing the private key data, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param options set of options to configure the router with, if null will use System properties * @throws I2PSessionException if there is a problem loading the private keys or */ diff --git a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java index fcf11d0da..e125fc49b 100644 --- a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java @@ -65,9 +65,12 @@ import net.i2p.util.SimpleScheduler; * @author zzz */ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession { - private I2PSessionDemultiplexer _demultiplexer; + + private final I2PSessionDemultiplexer _demultiplexer; /* + * @param destKeyStream stream containing the private key data, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param options set of options to configure the router with, if null will use System properties */ public I2PSessionMuxedImpl(I2PAppContext ctx, InputStream destKeyStream, Properties options) throws I2PSessionException { @@ -92,11 +95,11 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession { * An existing listener with the same proto and port is replaced. * Only the listener with the best match is called back for each message. * - * @param proto 1-254 or PROTO_ANY for all; recommended: + * @param proto 1-254 or PROTO_ANY (0) for all; recommended: * I2PSession.PROTO_STREAMING * I2PSession.PROTO_DATAGRAM * 255 disallowed - * @param port 1-65535 or PORT_ANY for all + * @param port 1-65535 or PORT_ANY (0) for all */ @Override public void addSessionListener(I2PSessionListener lsnr, int proto, int port) { @@ -106,8 +109,8 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession { /** * Listen on specified protocol and port, and receive notification * of proto, fromPort, and toPort for every message. - * @param proto 1-254 or 0 for all; 255 disallowed - * @param port 1-65535 or 0 for all + * @param proto 1-254 or PROTO_ANY (0) for all; 255 disallowed + * @param port 1-65535 or PORT_ANY (0) for all */ @Override public void addMuxedSessionListener(I2PSessionMuxedListener l, int proto, int port) { diff --git a/core/java/src/net/i2p/util/EventDispatcherImpl.java b/core/java/src/net/i2p/util/EventDispatcherImpl.java index 306925aff..700cf8e0f 100644 --- a/core/java/src/net/i2p/util/EventDispatcherImpl.java +++ b/core/java/src/net/i2p/util/EventDispatcherImpl.java @@ -9,13 +9,13 @@ package net.i2p.util; * */ -import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; -import java.util.ListIterator; +import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; /** * An implementation of the EventDispatcher interface. Since Java @@ -34,11 +34,9 @@ import java.util.Set; */ public class EventDispatcherImpl implements EventDispatcher { - //private final static Log _log = new Log(EventDispatcherImpl.class); - private boolean _ignore = false; - private final HashMap _events = new HashMap(4); - private final ArrayList _attached = new ArrayList(); + private final Map _events = new ConcurrentHashMap(4); + private final List _attached = new CopyOnWriteArrayList(); public EventDispatcher getEventDispatcher() { return this; @@ -46,23 +44,12 @@ public class EventDispatcherImpl implements EventDispatcher { public void attachEventDispatcher(EventDispatcher ev) { if (ev == null) return; - synchronized (_attached) { - //_log.debug(this.hashCode() + ": attaching EventDispatcher " + ev.hashCode()); - _attached.add(ev); - } + _attached.add(ev); } public void detachEventDispatcher(EventDispatcher ev) { if (ev == null) return; - synchronized (_attached) { - ListIterator it = _attached.listIterator(); - while (it.hasNext()) { - if (((EventDispatcher) it.next()) == ev) { - it.remove(); - break; - } - } - } + _attached.remove(ev); } public void notifyEvent(String eventName, Object args) { @@ -70,50 +57,28 @@ public class EventDispatcherImpl implements EventDispatcher { if (args == null) { args = "[null value]"; } - //_log.debug(this.hashCode() + ": got notification [" + eventName + "] = [" + args + "]"); + _events.put(eventName, args); synchronized (_events) { - _events.put(eventName, args); _events.notifyAll(); - synchronized (_attached) { - Iterator it = _attached.iterator(); - EventDispatcher e; - while (it.hasNext()) { - e = (EventDispatcher) it.next(); - //_log.debug(this.hashCode() + ": notifying attached EventDispatcher " + e.hashCode() + ": [" - // + eventName + "] = [" + args + "]"); - e.notifyEvent(eventName, args); - } - } + } + for (EventDispatcher e : _attached) { + e.notifyEvent(eventName, args); } } public Object getEventValue(String name) { if (_ignore) return null; - Object val; - - synchronized (_events) { - val = _events.get(name); - } - - return val; + return _events.get(name); } - public Set getEvents() { + public Set getEvents() { if (_ignore) return Collections.EMPTY_SET; - Set set; - - synchronized (_events) { - set = new HashSet(_events.keySet()); - } - - return set; + return new HashSet(_events.keySet()); } public void ignoreEvents() { _ignore = true; - synchronized (_events) { - _events.clear(); - } + _events.clear(); } public void unIgnoreEvents() { @@ -122,22 +87,15 @@ public class EventDispatcherImpl implements EventDispatcher { public Object waitEventValue(String name) { if (_ignore) return null; - Object val; - - //_log.debug(this.hashCode() + ": waiting for [" + name + "]"); do { synchronized (_events) { - if (_events.containsKey(name)) { - val = _events.get(name); - break; - } + Object val = _events.get(name); + if (val != null) + return val; try { - _events.wait(1 * 1000); - } catch (InterruptedException e) { // nop - } + _events.wait(5 * 1000); + } catch (InterruptedException e) {} } } while (true); - - return val; } }