diff --git a/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java b/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java index 205086472..05db0b0ce 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java @@ -2,6 +2,8 @@ package net.i2p.router.tunnel; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import net.i2p.router.RouterContext; import net.i2p.util.I2PThread; @@ -12,27 +14,36 @@ import net.i2p.util.I2PThread; */ public class TunnelGatewayPumper implements Runnable { private RouterContext _context; - private final List _wantsPumping; + private final BlockingQueue _wantsPumping; private boolean _stop; + private static final int PUMPERS = 4; /** Creates a new instance of TunnelGatewayPumper */ public TunnelGatewayPumper(RouterContext ctx) { _context = ctx; - _wantsPumping = new ArrayList(64); + _wantsPumping = new LinkedBlockingQueue(); _stop = false; - for (int i = 0; i < 4; i++) - new I2PThread(this, "GW pumper " + i, true).start(); + for (int i = 0; i < PUMPERS; i++) + new I2PThread(this, "Tunnel GW pumper " + i + '/' + PUMPERS, true).start(); } + public void stopPumping() { _stop=true; - synchronized (_wantsPumping) { _wantsPumping.notifyAll(); } + _wantsPumping.clear(); + PumpedTunnelGateway poison = new PoisonPTG(_context); + for (int i = 0; i < PUMPERS; i++) + _wantsPumping.offer(poison); + for (int i = 1; i <= 5 && !_wantsPumping.isEmpty(); i++) { + try { + Thread.sleep(i * 50); + } catch (InterruptedException ie) {} + } + _wantsPumping.clear(); } public void wantsPumping(PumpedTunnelGateway gw) { - synchronized (_wantsPumping) { - _wantsPumping.add(gw); - _wantsPumping.notify(); - } + if (!_stop) + _wantsPumping.offer(gw); } public void run() { @@ -40,17 +51,25 @@ public class TunnelGatewayPumper implements Runnable { List queueBuf = new ArrayList(32); while (!_stop) { try { - synchronized (_wantsPumping) { - if (_wantsPumping.size() > 0) - gw = _wantsPumping.remove(0); - else - _wantsPumping.wait(); - } + gw = _wantsPumping.take(); } catch (InterruptedException ie) {} if (gw != null) { + if (gw.getMessagesSent() == POISON_PTG) + break; gw.pump(queueBuf); gw = null; } } } + + private static final int POISON_PTG = -99999; + + private static class PoisonPTG extends PumpedTunnelGateway { + public PoisonPTG(RouterContext ctx) { + super(ctx, null, null, null, null); + } + + @Override + public int getMessagesSent() { return POISON_PTG; } + } }