* TunnelGatewayPumper: Refactor for concurrent

This commit is contained in:
zzz
2010-03-09 20:43:30 +00:00
parent 05f2a62cbb
commit d79387bd92

View File

@@ -2,6 +2,8 @@ package net.i2p.router.tunnel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread; import net.i2p.util.I2PThread;
@@ -12,27 +14,36 @@ import net.i2p.util.I2PThread;
*/ */
public class TunnelGatewayPumper implements Runnable { public class TunnelGatewayPumper implements Runnable {
private RouterContext _context; private RouterContext _context;
private final List<PumpedTunnelGateway> _wantsPumping; private final BlockingQueue<PumpedTunnelGateway> _wantsPumping;
private boolean _stop; private boolean _stop;
private static final int PUMPERS = 4;
/** Creates a new instance of TunnelGatewayPumper */ /** Creates a new instance of TunnelGatewayPumper */
public TunnelGatewayPumper(RouterContext ctx) { public TunnelGatewayPumper(RouterContext ctx) {
_context = ctx; _context = ctx;
_wantsPumping = new ArrayList(64); _wantsPumping = new LinkedBlockingQueue();
_stop = false; _stop = false;
for (int i = 0; i < 4; i++) for (int i = 0; i < PUMPERS; i++)
new I2PThread(this, "GW pumper " + i, true).start(); new I2PThread(this, "Tunnel GW pumper " + i + '/' + PUMPERS, true).start();
} }
public void stopPumping() { public void stopPumping() {
_stop=true; _stop=true;
synchronized (_wantsPumping) { _wantsPumping.notifyAll(); } _wantsPumping.clear();
PumpedTunnelGateway poison = new PoisonPTG(_context);
for (int i = 0; i < PUMPERS; i++)
_wantsPumping.offer(poison);
for (int i = 1; i <= 5 && !_wantsPumping.isEmpty(); i++) {
try {
Thread.sleep(i * 50);
} catch (InterruptedException ie) {}
}
_wantsPumping.clear();
} }
public void wantsPumping(PumpedTunnelGateway gw) { public void wantsPumping(PumpedTunnelGateway gw) {
synchronized (_wantsPumping) { if (!_stop)
_wantsPumping.add(gw); _wantsPumping.offer(gw);
_wantsPumping.notify();
}
} }
public void run() { public void run() {
@@ -40,17 +51,25 @@ public class TunnelGatewayPumper implements Runnable {
List<TunnelGateway.Pending> queueBuf = new ArrayList(32); List<TunnelGateway.Pending> queueBuf = new ArrayList(32);
while (!_stop) { while (!_stop) {
try { try {
synchronized (_wantsPumping) { gw = _wantsPumping.take();
if (_wantsPumping.size() > 0)
gw = _wantsPumping.remove(0);
else
_wantsPumping.wait();
}
} catch (InterruptedException ie) {} } catch (InterruptedException ie) {}
if (gw != null) { if (gw != null) {
if (gw.getMessagesSent() == POISON_PTG)
break;
gw.pump(queueBuf); gw.pump(queueBuf);
gw = null; gw = null;
} }
} }
} }
private static final int POISON_PTG = -99999;
private static class PoisonPTG extends PumpedTunnelGateway {
public PoisonPTG(RouterContext ctx) {
super(ctx, null, null, null, null);
}
@Override
public int getMessagesSent() { return POISON_PTG; }
}
} }