SSU: Improve handling of socket that gets closed (ticket #1385)

This commit is contained in:
zzz
2014-10-07 12:09:10 +00:00
parent 088290c544
commit 8270a92a44
8 changed files with 111 additions and 17 deletions

View File

@@ -1,3 +1,17 @@
2014-10-07 zzz
* CPUID: Remove Intel model 2 again, this is spoofed in the VM
* Graphs: Catch an error caused by missing fonts
* i2ptunnel: Handle named sig types from i2ptunnel.config in the GUI
* SSU: Improve handling of socket that gets closed (ticket #1385)
* Startup: Delay ReadConfigJob another minute
2014-10-05 zzz
* Crypto: EdDSA cleanup
2014-10-04 zzz
* i2psnark: Disable changing types for predefined trackers
* i2ptunnel: Fix js confirm for delete button
2014-10-03 zzz
* Console: New add-to-addressbook links on leaseset page
* CPUID: Fix Intel processor identification

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 6;
public final static long BUILD = 7;
/** for example "-test" */
public final static String EXTRA = "";

View File

@@ -0,0 +1,8 @@
package net.i2p.router.transport.udp;
/**
* @since 0.9.16
*/
interface SocketListener {
public void fail();
}

View File

@@ -15,7 +15,7 @@ import net.i2p.util.Log;
* Coordinate the low-level datagram socket, creating and managing the UDPSender and
* UDPReceiver.
*/
class UDPEndpoint {
class UDPEndpoint implements SocketListener {
private final RouterContext _context;
private final Log _log;
private int _listenPort;
@@ -42,7 +42,11 @@ class UDPEndpoint {
_isIPv6 = bindAddress == null || bindAddress instanceof Inet6Address;
}
/** caller should call getListenPort() after this to get the actual bound port and determine success */
/**
* Caller should call getListenPort() after this to get the actual bound port and determine success .
*
* Can be restarted.
*/
public synchronized void startup() throws SocketException {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Starting up the UDP endpoint");
@@ -53,10 +57,10 @@ class UDPEndpoint {
throw new SocketException("SSU Unable to bind to a port on " + _bindAddress);
}
int count = _counter.incrementAndGet();
_sender = new UDPSender(_context, _socket, "UDPSender " + count);
_sender = new UDPSender(_context, _socket, "UDPSender " + count, this);
_sender.startup();
if (_transport != null) {
_receiver = new UDPReceiver(_context, _transport, _socket, "UDPReceiver " + count);
_receiver = new UDPReceiver(_context, _transport, _socket, "UDPReceiver " + count, this);
_receiver.startup();
}
}
@@ -208,4 +212,25 @@ class UDPEndpoint {
public boolean isIPv6() {
return _isIPv6;
}
/**
* @since 0.9.16
*/
public void fail() {
shutdown();
_transport.fail(this);
}
/**
* @since 0.9.16
*/
@Override
public String toString() {
StringBuilder buf = new StringBuilder(64);
buf.append("UDP Socket ");
if (_bindAddress != null)
buf.append(_bindAddress.toString()).append(' ');
buf.append("port ").append(_listenPort);
return buf.toString();
}
}

View File

@@ -323,9 +323,9 @@ class UDPPacket implements CDQEntry {
if (_fragmentCount > 0)
buf.append(" fragCount=").append(_fragmentCount);
if (_enqueueTime >= 0)
if (_enqueueTime > 0)
buf.append(" sinceEnqueued=").append(_context.clock().now() - _enqueueTime);
if (_receivedTime >= 0)
if (_receivedTime > 0)
buf.append(" sinceReceived=").append(_context.clock().now() - _receivedTime);
//buf.append(" beforeReceiveFragments=").append((_beforeReceiveFragments > 0 ? _context.clock().now()-_beforeReceiveFragments : -1));
//buf.append(" sinceHandled=").append((_afterHandlingTime > 0 ? _context.clock().now()-_afterHandlingTime : -1));

View File

@@ -31,15 +31,18 @@ class UDPReceiver {
private final Runner _runner;
private final UDPTransport _transport;
private final PacketHandler _handler;
private final SocketListener _endpoint;
private static final boolean _isAndroid = SystemVersion.isAndroid();
public UDPReceiver(RouterContext ctx, UDPTransport transport, DatagramSocket socket, String name) {
public UDPReceiver(RouterContext ctx, UDPTransport transport, DatagramSocket socket, String name,
SocketListener lsnr) {
_context = ctx;
_log = ctx.logManager().getLog(UDPReceiver.class);
_name = name;
_socket = socket;
_transport = transport;
_endpoint = lsnr;
_handler = transport.getPacketHandler();
if (_handler == null)
throw new IllegalStateException();
@@ -51,6 +54,9 @@ class UDPReceiver {
_context.statManager().createRateStat("udp.ignorePacketFromDroplist", "Packet lifetime for those dropped on the drop list", "udp", UDPTransport.RATES);
}
/**
* Cannot be restarted (socket is final)
*/
public synchronized void startup() {
//adjustDropProbability();
_keepRunning = true;
@@ -281,12 +287,19 @@ class UDPReceiver {
_log.warn("Error receiving", ioe);
//}
packet.release();
// TODO count consecutive errors, give up after too many?
try { Thread.sleep(100); } catch (InterruptedException ie) {}
if (_socket.isClosed()) {
if (_keepRunning) {
_keepRunning = false;
_endpoint.fail();
}
} else if (_keepRunning) {
// TODO count consecutive errors, give up after too many?
try { Thread.sleep(100); } catch (InterruptedException ie) {}
}
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Stop receiving...");
if (_log.shouldLog(Log.WARN))
_log.warn("Stop receiving on " + _endpoint);
}
/******

View File

@@ -28,13 +28,14 @@ class UDPSender {
private volatile boolean _keepRunning;
private final Runner _runner;
private final boolean _dummy;
private final SocketListener _endpoint;
private static final int TYPE_POISON = 99999;
private static final int MIN_QUEUE_SIZE = 64;
private static final int MAX_QUEUE_SIZE = 384;
public UDPSender(RouterContext ctx, DatagramSocket socket, String name) {
public UDPSender(RouterContext ctx, DatagramSocket socket, String name, SocketListener lsnr) {
_context = ctx;
_dummy = false; // ctx.commSystem().isDummy();
_log = ctx.logManager().getLog(UDPSender.class);
@@ -44,6 +45,7 @@ class UDPSender {
_socket = socket;
_runner = new Runner();
_name = name;
_endpoint = lsnr;
_context.statManager().createRateStat("udp.pushTime", "How long a UDP packet takes to get pushed out", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendQueueSize", "How many packets are queued on the UDP sender", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendQueueFailed", "How often it was unable to add a new packet to the queue", "udp", UDPTransport.RATES);
@@ -69,6 +71,9 @@ class UDPSender {
_context.statManager().createRateStat("udp.sendPacketSize." + PacketBuilder.TYPE_CREAT, "session created packet size", "udp", UDPTransport.RATES);
}
/**
* Cannot be restarted (socket is final)
*/
public synchronized void startup() {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Starting the runner: " + _name);
@@ -78,6 +83,8 @@ class UDPSender {
}
public synchronized void shutdown() {
if (!_keepRunning)
return;
_keepRunning = false;
_outboundQueue.clear();
UDPPacket poison = UDPPacket.acquire(_context, false);
@@ -265,16 +272,23 @@ class UDPSender {
_context.statManager().addRateData("udp.sendPacketSize", size, packet.getLifetime());
} catch (IOException ioe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Error sending", ioe);
_log.warn("Error sending to " + packet.getPacket().getAddress(), ioe);
_context.statManager().addRateData("udp.sendException", 1, packet.getLifetime());
if (_socket.isClosed()) {
if (_keepRunning) {
_keepRunning = false;
_endpoint.fail();
}
}
}
// back to the cache
packet.release();
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Stop sending...");
if (_log.shouldLog(Log.WARN))
_log.warn("Stop sending on " + _endpoint);
_outboundQueue.clear();
}
/** @return next packet in queue. Will discard any packet older than MAX_HEAD_LIFETIME */

View File

@@ -370,7 +370,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (_log.shouldLog(Log.INFO))
_log.info("Binding to the port: " + port);
if (_endpoints.isEmpty()) {
// will always be empty since we are removing them above
// _endpoints will always be empty since we removed them above
if (bindToAddrs.isEmpty()) {
UDPEndpoint endpoint = new UDPEndpoint(_context, this, port, null);
_endpoints.add(endpoint);
@@ -420,8 +420,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (newPort < 0 && endpoint.isIPv4()) {
newPort = endpoint.getListenPort();
}
if (_log.shouldLog(Log.WARN))
_log.warn("Started " + endpoint);
} catch (SocketException se) {
_endpoints.remove(endpoint);
_log.error("Failed to start " + endpoint, se);
}
}
if (_endpoints.isEmpty()) {
@@ -494,6 +497,23 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
UDPAddress.clearCache();
}
/**
* The endpoint has failed. Remove it.
*
* @since 0.9.16
*/
public void fail(UDPEndpoint endpoint) {
if (_endpoints.remove(endpoint)) {
_log.log(Log.CRIT, "UDP port failure: " + endpoint);
if (_endpoints.isEmpty()) {
_log.log(Log.CRIT, "No more UDP sockets open");
setReachabilityStatus(CommSystemFacade.STATUS_HOSED);
// TODO restart?
}
rebuildExternalAddress();
}
}
/** @since IPv6 */
private boolean isAlive() {
return _inboundFragments.isAlive();