From 0b49fa98f91150f70fc9706014537af599b2cde3 Mon Sep 17 00:00:00 2001 From: zzz Date: Sat, 18 May 2013 18:00:17 +0000 Subject: [PATCH] * SSU: Fixes for i2np.udp.allowLocal, log tweaks, sender/receiver thread name tweaks * Limit tunnel GW pumper threads when testing --- .../net/i2p/router/transport/TransportImpl.java | 3 ++- .../router/transport/udp/EstablishmentManager.java | 2 +- .../transport/udp/OutboundMessageFragments.java | 4 ++-- .../net/i2p/router/transport/udp/UDPEndpoint.java | 7 +++++-- .../net/i2p/router/transport/udp/UDPReceiver.java | 7 ++----- .../net/i2p/router/transport/udp/UDPTransport.java | 14 +++++++++++++- .../net/i2p/router/tunnel/TunnelGatewayPumper.java | 12 ++++++++---- router/java/test/junit/net/i2p/router/SSUDemo.java | 3 ++- 8 files changed, 35 insertions(+), 17 deletions(-) diff --git a/router/java/src/net/i2p/router/transport/TransportImpl.java b/router/java/src/net/i2p/router/transport/TransportImpl.java index e82580b19..c6d49580f 100644 --- a/router/java/src/net/i2p/router/transport/TransportImpl.java +++ b/router/java/src/net/i2p/router/transport/TransportImpl.java @@ -687,7 +687,8 @@ public abstract class TransportImpl implements Transport { else _wasUnreachableEntries.remove(peer); if (_log.shouldLog(Log.INFO)) - _log.info(this.getStyle() + " setting wasUnreachable to " + yes + " for " + peer); + _log.info(this.getStyle() + " setting wasUnreachable to " + yes + " for " + peer, + yes ? new Exception() : null); } /** diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java index f9b9e2d1f..4afd49b7b 100644 --- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java +++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java @@ -249,7 +249,7 @@ class EstablishmentManager { maybeTo = new RemoteHostId(remAddr.getAddress(), port); if ((!_transport.isValid(maybeTo.getIP())) || - Arrays.equals(maybeTo.getIP(), _transport.getExternalIP())) { + (Arrays.equals(maybeTo.getIP(), _transport.getExternalIP()) && !_transport.allowLocal())) { _transport.failed(msg, "Remote peer's IP isn't valid"); _transport.markUnreachable(toHash); //_context.banlist().banlistRouter(msg.getTarget().getIdentity().calculateHash(), "Invalid SSU address", UDPTransport.STYLE); diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index d4c8be833..96e9a6686 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -309,8 +309,8 @@ class OutboundMessageFragments { // use max of 1 second so finishMessages() and/or PeerState.finishMessages() // gets called regularly int toWait = Math.min(Math.max(nextSendDelay, 10), MAX_WAIT); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("wait for " + toWait); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("wait for " + toWait); // wait.. or somethin' synchronized (_activePeers) { try { diff --git a/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java b/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java index 206eaee7b..b750df660 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java @@ -6,6 +6,7 @@ import java.net.InetAddress; import java.net.Inet4Address; import java.net.Inet6Address; import java.net.SocketException; +import java.util.concurrent.atomic.AtomicInteger; import net.i2p.router.RouterContext; import net.i2p.util.Log; @@ -24,6 +25,7 @@ class UDPEndpoint { private DatagramSocket _socket; private final InetAddress _bindAddress; private final boolean _isIPv4, _isIPv6; + private static final AtomicInteger _counter = new AtomicInteger(); /** * @param transport may be null for unit testing ONLY @@ -50,10 +52,11 @@ class UDPEndpoint { _log.log(Log.CRIT, "UDP Unable to open a port"); throw new SocketException("SSU Unable to bind to a port on " + _bindAddress); } - _sender = new UDPSender(_context, _socket, "UDPSender"); + int count = _counter.incrementAndGet(); + _sender = new UDPSender(_context, _socket, "UDPSender " + count); _sender.startup(); if (_transport != null) { - _receiver = new UDPReceiver(_context, _transport, _socket, "UDPReceiver"); + _receiver = new UDPReceiver(_context, _transport, _socket, "UDPReceiver " + count); _receiver.startup(); } } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java index 468c780ec..d8976d483 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java @@ -31,15 +31,12 @@ class UDPReceiver { private final Runner _runner; private final UDPTransport _transport; private final PacketHandler _handler; - private static int __id; - private final int _id; private static final boolean _isAndroid = SystemVersion.isAndroid(); public UDPReceiver(RouterContext ctx, UDPTransport transport, DatagramSocket socket, String name) { _context = ctx; _log = ctx.logManager().getLog(UDPReceiver.class); - _id = ++__id; _name = name; _socket = socket; _transport = transport; @@ -57,7 +54,7 @@ class UDPReceiver { public synchronized void startup() { //adjustDropProbability(); _keepRunning = true; - I2PThread t = new I2PThread(_runner, _name + '.' + _id, true); + I2PThread t = new I2PThread(_runner, _name, true); t.start(); } @@ -154,7 +151,7 @@ class UDPReceiver { } // drop anything apparently from our IP (any port) - if (Arrays.equals(from.getIP(), _transport.getExternalIP())) { + if (Arrays.equals(from.getIP(), _transport.getExternalIP()) && !_transport.allowLocal()) { if (_log.shouldLog(Log.WARN)) _log.warn("Dropping (spoofed?) packet from ourselves"); packet.release(); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index b98396379..fe7d7bfc7 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -888,6 +888,15 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if (isPubliclyRoutable(addr) && (addr.length != 16 || _haveIPv6Address)) return true; + return allowLocal(); + } + + /** + * Are we allowed to connect to local addresses? + * + * @since IPv6 + */ + boolean allowLocal() { return _context.getBooleanProperty("i2np.udp.allowLocal"); } @@ -1522,7 +1531,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority int port = addr.getPort(); if (ip == null || port < MIN_PEER_PORT || (!isValid(ip)) || - Arrays.equals(ip, getExternalIP())) { + (Arrays.equals(ip, getExternalIP()) && !allowLocal())) { continue; } } @@ -1960,6 +1969,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority // dropPeer(msg.getPeer(), false); //else if (consecutive > 2 * MAX_CONSECUTIVE_FAILED) // they're sending us data, but we cant reply? // dropPeer(msg.getPeer(), false); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Failed sending " + msg + " to " + msg.getPeer()); } noteSend(msg, false); if (m != null) diff --git a/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java b/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java index 7dfe8dc76..680a23966 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java @@ -42,10 +42,14 @@ class TunnelGatewayPumper implements Runnable { _context = ctx; _wantsPumping = new LinkedHashSet(16); _backlogged = new HashSet(16); - long maxMemory = Runtime.getRuntime().maxMemory(); - if (maxMemory == Long.MAX_VALUE) - maxMemory = 96*1024*1024l; - _pumpers = (int) Math.max(MIN_PUMPERS, Math.min(MAX_PUMPERS, 1 + (maxMemory / (32*1024*1024)))); + if (ctx.getBooleanProperty("i2p.dummyTunnelManager")) { + _pumpers = 1; + } else { + long maxMemory = Runtime.getRuntime().maxMemory(); + if (maxMemory == Long.MAX_VALUE) + maxMemory = 96*1024*1024l; + _pumpers = (int) Math.max(MIN_PUMPERS, Math.min(MAX_PUMPERS, 1 + (maxMemory / (32*1024*1024)))); + } for (int i = 0; i < _pumpers; i++) new I2PThread(this, "Tunnel GW pumper " + (i+1) + '/' + _pumpers, true).start(); } diff --git a/router/java/test/junit/net/i2p/router/SSUDemo.java b/router/java/test/junit/net/i2p/router/SSUDemo.java index 9b48e548d..f505fea60 100644 --- a/router/java/test/junit/net/i2p/router/SSUDemo.java +++ b/router/java/test/junit/net/i2p/router/SSUDemo.java @@ -234,7 +234,7 @@ public class SSUDemo { // we know its a FooMessage, since thats the type of message that the handler // is registered as FooMessage m = (FooMessage)_msg; - System.out.println("RECV: " + Base64.encode(m.getData()) + " from " + _from); + System.out.println("RECV FooMessage: " + Base64.encode(m.getData()) + " from " + _from); } public String getName() { return "Handle Foo message"; } } @@ -301,6 +301,7 @@ public class SSUDemo { // we know its a DatabaseStoreMessage, since thats the type of message that the handler // is registered as DatabaseStoreMessage m = (DatabaseStoreMessage)_msg; + System.out.println("RECV: " + m); try { _us.netDb().store(m.getKey(), (RouterInfo) m.getEntry()); } catch (IllegalArgumentException iae) {