From 84b741ac98387f00e257357ad629fd9fb8e9a754 Mon Sep 17 00:00:00 2001 From: jrandom Date: Thu, 27 Jul 2006 06:20:25 +0000 Subject: [PATCH] 2006-07-27 jrandom * Further NTCP write status cleanup * Handle more oddly-timed NTCP disconnections (thanks bar!) --- history.txt | 6 +++- .../src/net/i2p/router/RouterVersion.java | 4 +-- .../router/transport/ntcp/EventPumper.java | 28 +++++++++++++++---- .../router/transport/ntcp/NTCPConnection.java | 12 +++++++- 4 files changed, 41 insertions(+), 9 deletions(-) diff --git a/history.txt b/history.txt index 74785cad1..0acd5398e 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,8 @@ -$Id: history.txt,v 1.496 2006-07-26 01:36:18 jrandom Exp $ +$Id: history.txt,v 1.497 2006-07-26 20:04:59 jrandom Exp $ + +2006-07-27 jrandom + * Further NTCP write status cleanup + * Handle more oddly-timed NTCP disconnections (thanks bar!) 2006-07-26 jrandom * When dropping a netDb router reference, only accept newer diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 8bbd79a54..6c2f1c05d 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.435 $ $Date: 2006-07-26 01:36:30 $"; + public final static String ID = "$Revision: 1.436 $ $Date: 2006-07-26 19:56:52 $"; public final static String VERSION = "0.6.1.22"; - public final static long BUILD = 2; + public final static long BUILD = 3; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java index 4ff75822e..e081a0fee 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java +++ b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java @@ -33,10 +33,13 @@ public class EventPumper implements Runnable { private static final int BUF_SIZE = 8*1024; private static final int MAX_CACHE_SIZE = 64; /** - * every 30s or so, iterate across all ntcp connections just to make sure - * we have their interestOps set properly (and to expire any looong idle cons) + * every few seconds, iterate across all ntcp connections just to make sure + * we have their interestOps set properly (and to expire any looong idle cons). + * as the number of connections grows, we should try to make this happen + * less frequently (or not at all), but while the connection count is small, + * the time to iterate across them to check a few flags shouldn't be a problem. */ - private static final long FAILSAFE_ITERATION_FREQ = 30*1000l; + private static final long FAILSAFE_ITERATION_FREQ = 2*1000l; public EventPumper(RouterContext ctx, NTCPTransport transport) { _context = ctx; @@ -408,6 +411,11 @@ public class EventPumper implements Runnable { con.recv(rbuf); } } + } catch (CancelledKeyException cke) { + if (_log.shouldLog(Log.WARN)) _log.warn("error reading", cke); + con.close(); + _context.statManager().addRateData("ntcp.readError", 1, 0); + if (buf != null) releaseBuf(buf); } catch (IOException ioe) { if (_log.shouldLog(Log.WARN)) _log.warn("error reading", ioe); con.close(); @@ -442,7 +450,7 @@ public class EventPumper implements Runnable { int written = con.getChannel().write(buf); totalWritten += written; if (written == 0) { - if ( (buf.remaining() > 0) || (con.getWriteBufCount() > 1) ) { + if ( (buf.remaining() > 0) || (con.getWriteBufCount() >= 1) ) { if (_log.shouldLog(Log.DEBUG)) _log.debug("done writing, but data remains..."); key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); } else { @@ -469,6 +477,10 @@ public class EventPumper implements Runnable { break; } } + } catch (CancelledKeyException cke) { + if (_log.shouldLog(Log.WARN)) _log.warn("error writing", ioe); + _context.statManager().addRateData("ntcp.writeError", 1, 0); + con.close(); } catch (IOException ioe) { if (_log.shouldLog(Log.WARN)) _log.warn("error writing", ioe); _context.statManager().addRateData("ntcp.writeError", 1, 0); @@ -490,7 +502,11 @@ public class EventPumper implements Runnable { while (buf.size() > 0) { NTCPConnection con = (NTCPConnection)buf.remove(0); SelectionKey key = con.getKey(); - key.interestOps(key.interestOps() | SelectionKey.OP_READ); + try { + key.interestOps(key.interestOps() | SelectionKey.OP_READ); + } catch (CancelledKeyException cke) { + // ignore, we remove/etc elsewhere + } } synchronized (_wantsWrite) { @@ -569,6 +585,8 @@ public class EventPumper implements Runnable { // _context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect/resolve: " + uae.getMessage(), NTCPTransport.STYLE); con.close(true); //} + } catch (CancelledKeyException cke) { + con.close(false); } } catch (ClosedChannelException cce) { if (_log.shouldLog(Log.WARN)) _log.warn("Error registering", cce); diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java index 5653a9bc5..eab454413 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java @@ -236,6 +236,16 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { } for (int i = 0; i < msgs.size(); i++) { OutNetMessage msg = (OutNetMessage)msgs.get(i); + Object buf = msg.releasePreparationBuffer(); + if (buf != null) + releaseBuf((PrepBuffer)buf); + _transport.afterSend(msg, false, allowRequeue, msg.getLifetime()); + } + OutNetMessage msg = _currentOutbound; + if (msg != null) { + Object buf = msg.releasePreparationBuffer(); + if (buf != null) + releaseBuf((PrepBuffer)buf); _transport.afterSend(msg, false, allowRequeue, msg.getLifetime()); } } @@ -806,7 +816,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { public ByteBuffer getNextWriteBuf() { synchronized (_writeBufs) { if (_writeBufs.size() > 0) - return (ByteBuffer)_writeBufs.remove(0); + return (ByteBuffer)_writeBufs.get(0); // not remove! we removeWriteBuf afterwards } return null; }