* Router I2CP:

- Make classes extensible for router-side test stubs
   - Add router-side local-only test implementation, no full router required.
     Only tested with external clients, probably doesn't work in-JVM.
   - Don't start threads in ClientManager constructor
   - Remove unused Reader param in ClientMessageEventListener methods
   - Cleanups, volatiles, finals, javadocs
This commit is contained in:
zzz
2013-07-10 18:12:47 +00:00
parent d3e0161a6b
commit 2caa6ad975
8 changed files with 340 additions and 50 deletions

View File

@@ -58,7 +58,7 @@ import net.i2p.util.SimpleTimer;
class ClientConnectionRunner { class ClientConnectionRunner {
protected final Log _log; protected final Log _log;
protected final RouterContext _context; protected final RouterContext _context;
private final ClientManager _manager; protected final ClientManager _manager;
/** socket for this particular peer connection */ /** socket for this particular peer connection */
private final Socket _socket; private final Socket _socket;
/** output stream of the socket that I2CP messages bound to the client should be written to */ /** output stream of the socket that I2CP messages bound to the client should be written to */
@@ -137,7 +137,7 @@ class ClientConnectionRunner {
if (_dead || _reader != null) if (_dead || _reader != null)
throw new IllegalStateException(); throw new IllegalStateException();
_reader = new I2CPMessageReader(new BufferedInputStream(_socket.getInputStream(), BUF_SIZE), _reader = new I2CPMessageReader(new BufferedInputStream(_socket.getInputStream(), BUF_SIZE),
new ClientMessageEventListener(_context, this, true)); createListener());
_writer = new ClientWriterRunner(_context, this); _writer = new ClientWriterRunner(_context, this);
I2PThread t = new I2PThread(_writer); I2PThread t = new I2PThread(_writer);
t.setName("I2CP Writer " + __id.incrementAndGet()); t.setName("I2CP Writer " + __id.incrementAndGet());
@@ -148,6 +148,14 @@ class ClientConnectionRunner {
// TODO need a cleaner for unclaimed items in _messages, but we have no timestamps... // TODO need a cleaner for unclaimed items in _messages, but we have no timestamps...
} }
/**
* Allow override for testing
* @since 0.9.8
*/
protected I2CPMessageReader.I2CPMessageEventListener createListener() {
return new ClientMessageEventListener(_context, this, true);
}
/** /**
* Die a horrible death. Cannot be restarted. * Die a horrible death. Cannot be restarted.
*/ */
@@ -460,8 +468,8 @@ class ClientConnectionRunner {
* @param set LeaseSet with requested leases - this object must be updated to contain the * @param set LeaseSet with requested leases - this object must be updated to contain the
* signed version (as well as any changed/added/removed Leases) * signed version (as well as any changed/added/removed Leases)
* @param expirationTime ms to wait before failing * @param expirationTime ms to wait before failing
* @param onCreateJob Job to run after the LeaseSet is authorized * @param onCreateJob Job to run after the LeaseSet is authorized, null OK
* @param onFailedJob Job to run after the timeout passes without receiving authorization * @param onFailedJob Job to run after the timeout passes without receiving authorization, null OK
*/ */
void requestLeaseSet(LeaseSet set, long expirationTime, Job onCreateJob, Job onFailedJob) { void requestLeaseSet(LeaseSet set, long expirationTime, Job onCreateJob, Job onFailedJob) {
if (_dead) { if (_dead) {

View File

@@ -10,6 +10,7 @@ package net.i2p.router.client;
import java.io.IOException; import java.io.IOException;
import java.io.Writer; import java.io.Writer;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
@@ -44,7 +45,7 @@ import net.i2p.util.Log;
*/ */
class ClientManager { class ClientManager {
private final Log _log; private final Log _log;
private ClientListenerRunner _listener; protected ClientListenerRunner _listener;
// Destination --> ClientConnectionRunner // Destination --> ClientConnectionRunner
// Locked for adds/removes but not lookups // Locked for adds/removes but not lookups
private final Map<Destination, ClientConnectionRunner> _runners; private final Map<Destination, ClientConnectionRunner> _runners;
@@ -53,8 +54,9 @@ class ClientManager {
private final Map<Hash, ClientConnectionRunner> _runnersByHash; private final Map<Hash, ClientConnectionRunner> _runnersByHash;
// ClientConnectionRunner for clients w/out a Dest yet // ClientConnectionRunner for clients w/out a Dest yet
private final Set<ClientConnectionRunner> _pendingRunners; private final Set<ClientConnectionRunner> _pendingRunners;
private final RouterContext _ctx; protected final RouterContext _ctx;
private volatile boolean _isStarted; protected final int _port;
protected volatile boolean _isStarted;
/** Disable external interface, allow internal clients only @since 0.8.3 */ /** Disable external interface, allow internal clients only @since 0.8.3 */
private static final String PROP_DISABLE_EXTERNAL = "i2cp.disableInterface"; private static final String PROP_DISABLE_EXTERNAL = "i2cp.disableInterface";
@@ -65,6 +67,10 @@ class ClientManager {
private static final long REQUEST_LEASESET_TIMEOUT = 60*1000; private static final long REQUEST_LEASESET_TIMEOUT = 60*1000;
/**
* Does not start the listeners.
* Caller must call start()
*/
public ClientManager(RouterContext context, int port) { public ClientManager(RouterContext context, int port) {
_ctx = context; _ctx = context;
_log = context.logManager().getLog(ClientManager.class); _log = context.logManager().getLog(ClientManager.class);
@@ -75,22 +81,27 @@ class ClientManager {
_runners = new ConcurrentHashMap(); _runners = new ConcurrentHashMap();
_runnersByHash = new ConcurrentHashMap(); _runnersByHash = new ConcurrentHashMap();
_pendingRunners = new HashSet(); _pendingRunners = new HashSet();
startListeners(port); _port = port;
// following are for RequestLeaseSetJob // following are for RequestLeaseSetJob
_ctx.statManager().createRateStat("client.requestLeaseSetSuccess", "How frequently the router requests successfully a new leaseSet?", "ClientMessages", new long[] { 60*60*1000 }); _ctx.statManager().createRateStat("client.requestLeaseSetSuccess", "How frequently the router requests successfully a new leaseSet?", "ClientMessages", new long[] { 60*60*1000 });
_ctx.statManager().createRateStat("client.requestLeaseSetTimeout", "How frequently the router requests a new leaseSet but gets no reply?", "ClientMessages", new long[] { 60*60*1000 }); _ctx.statManager().createRateStat("client.requestLeaseSetTimeout", "How frequently the router requests a new leaseSet but gets no reply?", "ClientMessages", new long[] { 60*60*1000 });
_ctx.statManager().createRateStat("client.requestLeaseSetDropped", "How frequently the router requests a new leaseSet but the client drops?", "ClientMessages", new long[] { 60*60*1000 }); _ctx.statManager().createRateStat("client.requestLeaseSetDropped", "How frequently the router requests a new leaseSet but the client drops?", "ClientMessages", new long[] { 60*60*1000 });
} }
/** @since 0.9.8 */
public synchronized void start() {
startListeners();
}
/** Todo: Start a 3rd listener for IPV6? */ /** Todo: Start a 3rd listener for IPV6? */
private void startListeners(int port) { protected void startListeners() {
if (!_ctx.getBooleanProperty(PROP_DISABLE_EXTERNAL)) { if (!_ctx.getBooleanProperty(PROP_DISABLE_EXTERNAL)) {
// there's no option to start both an SSL and non-SSL listener // there's no option to start both an SSL and non-SSL listener
if (_ctx.getBooleanProperty(PROP_ENABLE_SSL)) if (_ctx.getBooleanProperty(PROP_ENABLE_SSL))
_listener = new SSLClientListenerRunner(_ctx, this, port); _listener = new SSLClientListenerRunner(_ctx, this, _port);
else else
_listener = new ClientListenerRunner(_ctx, this, port); _listener = new ClientListenerRunner(_ctx, this, _port);
Thread t = new I2PThread(_listener, "ClientListener:" + port, true); Thread t = new I2PThread(_listener, "ClientListener:" + _port, true);
t.start(); t.start();
} }
_isStarted = true; _isStarted = true;
@@ -102,9 +113,7 @@ class ClientManager {
// to let the old listener die // to let the old listener die
try { Thread.sleep(2*1000); } catch (InterruptedException ie) {} try { Thread.sleep(2*1000); } catch (InterruptedException ie) {}
int port = _ctx.getProperty(ClientManagerFacadeImpl.PROP_CLIENT_PORT, startListeners();
ClientManagerFacadeImpl.DEFAULT_PORT);
startListeners(port);
} }
/** /**
@@ -404,12 +413,18 @@ class ClientManager {
} }
} }
/**
* @return unmodifiable, not a copy
*/
Set<Destination> getRunnerDestinations() { Set<Destination> getRunnerDestinations() {
Set<Destination> dests = new HashSet(); return Collections.unmodifiableSet(_runners.keySet());
dests.addAll(_runners.keySet());
return dests;
} }
/**
* Unused
*
* @param dest null for all local destinations
*/
public void reportAbuse(Destination dest, String reason, int severity) { public void reportAbuse(Destination dest, String reason, int severity) {
if (dest != null) { if (dest != null) {
ClientConnectionRunner runner = getRunner(dest); ClientConnectionRunner runner = getRunner(dest);
@@ -417,9 +432,7 @@ class ClientManager {
runner.reportAbuse(reason, severity); runner.reportAbuse(reason, severity);
} }
} else { } else {
Set dests = getRunnerDestinations(); for (Destination d : _runners.keySet()) {
for (Iterator iter = dests.iterator(); iter.hasNext(); ) {
Destination d = (Destination)iter.next();
reportAbuse(d, reason, severity); reportAbuse(d, reason, severity);
} }
} }

View File

@@ -56,6 +56,7 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade implements Inte
_log.info("Starting up the client subsystem"); _log.info("Starting up the client subsystem");
int port = _context.getProperty(PROP_CLIENT_PORT, DEFAULT_PORT); int port = _context.getProperty(PROP_CLIENT_PORT, DEFAULT_PORT);
_manager = new ClientManager(_context, port); _manager = new ClientManager(_context, port);
_manager.start();
} }
public synchronized void shutdown() { public synchronized void shutdown() {
@@ -82,12 +83,12 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade implements Inte
public boolean isAlive() { return _manager != null && _manager.isAlive(); } public boolean isAlive() { return _manager != null && _manager.isAlive(); }
private static final long MAX_TIME_TO_REBUILD = 10*60*1000; private static final long MAX_TIME_TO_REBUILD = 10*60*1000;
@Override @Override
public boolean verifyClientLiveliness() { public boolean verifyClientLiveliness() {
if (_manager == null) return true; if (_manager == null) return true;
boolean lively = true; boolean lively = true;
for (Iterator iter = _manager.getRunnerDestinations().iterator(); iter.hasNext(); ) { for (Destination dest : _manager.getRunnerDestinations()) {
Destination dest = (Destination)iter.next();
ClientConnectionRunner runner = _manager.getRunner(dest); ClientConnectionRunner runner = _manager.getRunner(dest);
if ( (runner == null) || (runner.getIsDead())) continue; if ( (runner == null) || (runner.getIsDead())) continue;
LeaseSet ls = runner.getLeaseSet(); LeaseSet ls = runner.getLeaseSet();

View File

@@ -46,8 +46,8 @@ import net.i2p.util.RandomSource;
*/ */
class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventListener { class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventListener {
private final Log _log; private final Log _log;
private final RouterContext _context; protected final RouterContext _context;
private final ClientConnectionRunner _runner; protected final ClientConnectionRunner _runner;
private final boolean _enforceAuth; private final boolean _enforceAuth;
private static final String PROP_AUTH = "i2cp.auth"; private static final String PROP_AUTH = "i2cp.auth";
@@ -73,40 +73,40 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
_log.debug("Message received: \n" + message); _log.debug("Message received: \n" + message);
switch (message.getType()) { switch (message.getType()) {
case GetDateMessage.MESSAGE_TYPE: case GetDateMessage.MESSAGE_TYPE:
handleGetDate(reader, (GetDateMessage)message); handleGetDate((GetDateMessage)message);
break; break;
case SetDateMessage.MESSAGE_TYPE: case SetDateMessage.MESSAGE_TYPE:
handleSetDate(reader, (SetDateMessage)message); handleSetDate((SetDateMessage)message);
break; break;
case CreateSessionMessage.MESSAGE_TYPE: case CreateSessionMessage.MESSAGE_TYPE:
handleCreateSession(reader, (CreateSessionMessage)message); handleCreateSession((CreateSessionMessage)message);
break; break;
case SendMessageMessage.MESSAGE_TYPE: case SendMessageMessage.MESSAGE_TYPE:
handleSendMessage(reader, (SendMessageMessage)message); handleSendMessage((SendMessageMessage)message);
break; break;
case SendMessageExpiresMessage.MESSAGE_TYPE: case SendMessageExpiresMessage.MESSAGE_TYPE:
handleSendMessage(reader, (SendMessageExpiresMessage)message); handleSendMessage((SendMessageExpiresMessage)message);
break; break;
case ReceiveMessageBeginMessage.MESSAGE_TYPE: case ReceiveMessageBeginMessage.MESSAGE_TYPE:
handleReceiveBegin(reader, (ReceiveMessageBeginMessage)message); handleReceiveBegin((ReceiveMessageBeginMessage)message);
break; break;
case ReceiveMessageEndMessage.MESSAGE_TYPE: case ReceiveMessageEndMessage.MESSAGE_TYPE:
handleReceiveEnd(reader, (ReceiveMessageEndMessage)message); handleReceiveEnd((ReceiveMessageEndMessage)message);
break; break;
case CreateLeaseSetMessage.MESSAGE_TYPE: case CreateLeaseSetMessage.MESSAGE_TYPE:
handleCreateLeaseSet(reader, (CreateLeaseSetMessage)message); handleCreateLeaseSet((CreateLeaseSetMessage)message);
break; break;
case DestroySessionMessage.MESSAGE_TYPE: case DestroySessionMessage.MESSAGE_TYPE:
handleDestroySession(reader, (DestroySessionMessage)message); handleDestroySession((DestroySessionMessage)message);
break; break;
case DestLookupMessage.MESSAGE_TYPE: case DestLookupMessage.MESSAGE_TYPE:
handleDestLookup(reader, (DestLookupMessage)message); handleDestLookup((DestLookupMessage)message);
break; break;
case ReconfigureSessionMessage.MESSAGE_TYPE: case ReconfigureSessionMessage.MESSAGE_TYPE:
handleReconfigureSession(reader, (ReconfigureSessionMessage)message); handleReconfigureSession((ReconfigureSessionMessage)message);
break; break;
case GetBandwidthLimitsMessage.MESSAGE_TYPE: case GetBandwidthLimitsMessage.MESSAGE_TYPE:
handleGetBWLimits(reader, (GetBandwidthLimitsMessage)message); handleGetBWLimits((GetBandwidthLimitsMessage)message);
break; break;
default: default:
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
@@ -131,7 +131,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
_runner.disconnected(); _runner.disconnected();
} }
private void handleGetDate(I2CPMessageReader reader, GetDateMessage message) { private void handleGetDate(GetDateMessage message) {
// sent by clients >= 0.8.7 // sent by clients >= 0.8.7
String clientVersion = message.getVersion(); String clientVersion = message.getVersion();
if (clientVersion != null) if (clientVersion != null)
@@ -148,7 +148,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
/** /**
* As of 0.8.7, does nothing. Do not allow a client to set the router's clock. * As of 0.8.7, does nothing. Do not allow a client to set the router's clock.
*/ */
private void handleSetDate(I2CPMessageReader reader, SetDateMessage message) { private void handleSetDate(SetDateMessage message) {
//_context.clock().setNow(message.getDate().getTime()); //_context.clock().setNow(message.getDate().getTime());
} }
@@ -160,7 +160,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
* DisconnectMessage in return, and not wait around for our DisconnectMessage. * DisconnectMessage in return, and not wait around for our DisconnectMessage.
* So keep it simple. * So keep it simple.
*/ */
private void handleCreateSession(I2CPMessageReader reader, CreateSessionMessage message) { private void handleCreateSession(CreateSessionMessage message) {
SessionConfig in = message.getSessionConfig(); SessionConfig in = message.getSessionConfig();
if (in.verifySignature()) { if (in.verifySignature()) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@@ -209,17 +209,24 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("after sessionEstablished for " + message.getSessionConfig().getDestination().calculateHash().toBase64()); _log.debug("after sessionEstablished for " + message.getSessionConfig().getDestination().calculateHash().toBase64());
startCreateSessionJob();
_context.jobQueue().addJob(new CreateSessionJob(_context, _runner));
} }
/**
* Override for testing
* @since 0.9.8
*
*/
protected void startCreateSessionJob() {
_context.jobQueue().addJob(new CreateSessionJob(_context, _runner));
}
/** /**
* Handle a SendMessageMessage: give it a message Id, have the ClientManager distribute * Handle a SendMessageMessage: give it a message Id, have the ClientManager distribute
* it, and send the client an ACCEPTED message * it, and send the client an ACCEPTED message
* *
*/ */
private void handleSendMessage(I2CPMessageReader reader, SendMessageMessage message) { private void handleSendMessage(SendMessageMessage message) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("handleSendMessage called"); _log.debug("handleSendMessage called");
long beforeDistribute = _context.clock().now(); long beforeDistribute = _context.clock().now();
@@ -236,7 +243,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
* The client asked for a message, so we send it to them. * The client asked for a message, so we send it to them.
* *
*/ */
private void handleReceiveBegin(I2CPMessageReader reader, ReceiveMessageBeginMessage message) { private void handleReceiveBegin(ReceiveMessageBeginMessage message) {
if (_runner.isDead()) return; if (_runner.isDead()) return;
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Handling recieve begin: id = " + message.getMessageId()); _log.debug("Handling recieve begin: id = " + message.getMessageId());
@@ -266,17 +273,18 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
* pending queue, though it should. * pending queue, though it should.
* *
*/ */
private void handleReceiveEnd(I2CPMessageReader reader, ReceiveMessageEndMessage message) { private void handleReceiveEnd(ReceiveMessageEndMessage message) {
_runner.removePayload(new MessageId(message.getMessageId())); _runner.removePayload(new MessageId(message.getMessageId()));
} }
private void handleDestroySession(I2CPMessageReader reader, DestroySessionMessage message) { private void handleDestroySession(DestroySessionMessage message) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Destroying client session " + _runner.getSessionId()); _log.info("Destroying client session " + _runner.getSessionId());
_runner.stopRunning(); _runner.stopRunning();
} }
private void handleCreateLeaseSet(I2CPMessageReader reader, CreateLeaseSetMessage message) { /** override for testing */
protected void handleCreateLeaseSet(CreateLeaseSetMessage message) {
if ( (message.getLeaseSet() == null) || (message.getPrivateKey() == null) || (message.getSigningPrivateKey() == null) ) { if ( (message.getLeaseSet() == null) || (message.getPrivateKey() == null) || (message.getSigningPrivateKey() == null) ) {
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("Null lease set granted: " + message); _log.error("Null lease set granted: " + message);
@@ -293,7 +301,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
_runner.leaseSetCreated(message.getLeaseSet()); _runner.leaseSetCreated(message.getLeaseSet());
} }
private void handleDestLookup(I2CPMessageReader reader, DestLookupMessage message) { /** override for testing */
protected void handleDestLookup(DestLookupMessage message) {
_context.jobQueue().addJob(new LookupDestJob(_context, _runner, message.getHash())); _context.jobQueue().addJob(new LookupDestJob(_context, _runner, message.getHash()));
} }
@@ -305,7 +314,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
* Note that this does NOT update the few options handled in * Note that this does NOT update the few options handled in
* ClientConnectionRunner.sessionEstablished(). Those can't be changed later. * ClientConnectionRunner.sessionEstablished(). Those can't be changed later.
*/ */
private void handleReconfigureSession(I2CPMessageReader reader, ReconfigureSessionMessage message) { private void handleReconfigureSession(ReconfigureSessionMessage message) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Updating options - old: " + _runner.getConfig() + " new: " + message.getSessionConfig()); _log.info("Updating options - old: " + _runner.getConfig() + " new: " + message.getSessionConfig());
if (!message.getSessionConfig().getDestination().equals(_runner.getConfig().getDestination())) { if (!message.getSessionConfig().getDestination().equals(_runner.getConfig().getDestination())) {
@@ -343,7 +352,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
* This could someday give a different answer to each client. * This could someday give a different answer to each client.
* But it's not enforced anywhere. * But it's not enforced anywhere.
*/ */
private void handleGetBWLimits(I2CPMessageReader reader, GetBandwidthLimitsMessage message) { protected void handleGetBWLimits(GetBandwidthLimitsMessage message) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Got BW Limits request"); _log.info("Got BW Limits request");
int in = _context.bandwidthLimiter().getInboundKBytesPerSecond() * 4 / 7; int in = _context.bandwidthLimiter().getInboundKBytesPerSecond() * 4 / 7;

View File

@@ -0,0 +1,67 @@
package net.i2p.router.client;
import java.net.Socket;
import net.i2p.data.Destination;
import net.i2p.data.Hash;
import net.i2p.data.Lease;
import net.i2p.data.LeaseSet;
import net.i2p.data.i2cp.I2CPMessageException;
import net.i2p.data.i2cp.I2CPMessageReader;
import net.i2p.data.i2cp.RequestVariableLeaseSetMessage;
import net.i2p.router.Job;
import net.i2p.router.RouterContext;
/**
* For testing
*
* @since 0.9.8
*/
class LocalClientConnectionRunner extends ClientConnectionRunner {
/**
* Create a new runner with the given queues
*
*/
public LocalClientConnectionRunner(RouterContext context, ClientManager manager, Socket socket) {
super(context, manager, socket);
}
/**
* Custom listener
*/
@Override
protected I2CPMessageReader.I2CPMessageEventListener createListener() {
return new LocalClientMessageEventListener(_context, this, true);
}
/**
* Just send the message directly,
* don't instantiate a RequestLeaseSetJob
*/
@Override
void requestLeaseSet(LeaseSet set, long expirationTime, Job onCreateJob, Job onFailedJob) {
RequestVariableLeaseSetMessage msg = new RequestVariableLeaseSetMessage();
msg.setSessionId(getSessionId());
for (int i = 0; i < set.getLeaseCount(); i++) {
Lease lease = set.getLease(i);
msg.addEndpoint(lease);
}
try {
doSend(msg);
} catch (I2CPMessageException ime) {
ime.printStackTrace();
}
}
/**
* So LocalClientMessageEventListener can lookup other local dests
*/
public Destination localLookup(Hash h) {
for (Destination d : _manager.getRunnerDestinations()) {
if (d.calculateHash().equals(h))
return d;
}
return null;
}
}

View File

@@ -0,0 +1,21 @@
package net.i2p.router.client;
import net.i2p.router.RouterContext;
/**
* For testing
*
* @since 0.9.8
*/
class LocalClientListenerRunner extends ClientListenerRunner {
public LocalClientListenerRunner(RouterContext context, ClientManager manager, int port) {
super(context, manager, port);
}
@Override
protected void runConnection(Socket socket) {
ClientConnectionRunner runner = new LocalClientConnectionRunner(_context, _manager, socket);
_manager.registerConnection(runner);
}
}

View File

@@ -0,0 +1,77 @@
package net.i2p.router.client;
/*
* free (adj.): unencumbered; not under the control of others
* Written by jrandom in 2003 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import net.i2p.data.Destination;
import net.i2p.data.Payload;
import net.i2p.data.i2cp.MessageId;
import net.i2p.data.i2cp.MessageStatusMessage;
import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
/**
* For testing clients without a full router.
* A complete router-side I2CP implementation, without requiring a router
* or any RouterContext subsystems or threads.
* Clients may connect only to other local clients.
* Lookups and bw limit messages also supported.
*
* @since 0.9.8
*/
class LocalClientManager extends ClientManager {
/**
* @param context stub, may be constructed with new RouterContext(null),
* no initAll() necessary
*/
public LocalClientManager(RouterContext context, int port) {
super(context, port);
}
@Override
protected void startListeners() {
_listener = new LocalClientListenerRunner(_ctx, this, _port);
Thread t = new I2PThread(_listener, "ClientListener:" + _port, true);
t.start();
_isStarted = true;
}
/**
* Local only
* TODO: add simulated delay and random drops to test streaming.
*
* @param flags ignored for local
*/
@Override
void distributeMessage(Destination fromDest, Destination toDest, Payload payload, MessageId msgId, long expiration, int flags) {
// check if there is a runner for it
ClientConnectionRunner sender = getRunner(fromDest);
ClientConnectionRunner runner = getRunner(toDest);
if (runner != null) {
runner.receiveMessage(toDest, fromDest, payload);
if (sender != null)
sender.updateMessageDeliveryStatus(msgId, MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL);
} else {
// remote. ignore.
System.out.println("Message " + msgId + " is targeting a REMOTE destination - DROPPED");
if (sender != null)
sender.updateMessageDeliveryStatus(msgId, MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE);
}
}
public static void main(String args[]) {
RouterContext ctx = new RouterContext(null);
int port = ClientManagerFacadeImpl.DEFAULT_PORT;
ClientManager mgr = new LocalClientManager(ctx, port);
mgr.start();
System.out.println("Listening on port " + port);
try { Thread.sleep(5*60*1000); } catch (InterruptedException ie) {}
System.out.println("Done listening on port " + port);
}
}

View File

@@ -0,0 +1,94 @@
package net.i2p.router.client;
/*
* free (adj.): unencumbered; not under the control of others
* Written by jrandom in 2003 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.util.Date;
import net.i2p.data.Destination;
import net.i2p.data.Hash;
import net.i2p.data.Lease;
import net.i2p.data.LeaseSet;
import net.i2p.data.TunnelId;
import net.i2p.data.i2cp.BandwidthLimitsMessage;
import net.i2p.data.i2cp.CreateLeaseSetMessage;
import net.i2p.data.i2cp.DestLookupMessage;
import net.i2p.data.i2cp.DestReplyMessage;
import net.i2p.data.i2cp.GetBandwidthLimitsMessage;
import net.i2p.data.i2cp.I2CPMessageException;
import net.i2p.router.RouterContext;
/**
* For testing
*
* @since 0.9.8
*/
class LocalClientMessageEventListener extends ClientMessageEventListener {
public LocalClientMessageEventListener(RouterContext context, ClientConnectionRunner runner, boolean enforceAuth) {
super(context, runner, enforceAuth);
}
/**
* Immediately send a fake leaseset
*/
@Override
protected void startCreateSessionJob() {
long exp = _context.clock().now() + 10*60*1000;
LeaseSet ls = new LeaseSet();
Lease lease = new Lease();
lease.setGateway(Hash.FAKE_HASH);
TunnelId id = new TunnelId(1);
lease.setTunnelId(id);
Date date = new Date(exp);
lease.setEndDate(date);
ls.addLease(lease);
_runner.requestLeaseSet(ls, exp, null, null);
}
/**
* Don't tell the netdb or key manager
*/
@Override
protected void handleCreateLeaseSet(CreateLeaseSetMessage message) {
_runner.leaseSetCreated(message.getLeaseSet());
}
/**
* Look only in current local dests
*/
@Override
protected void handleDestLookup(DestLookupMessage message) {
Hash h = message.getHash();
DestReplyMessage msg;
Destination d = ((LocalClientConnectionRunner)_runner).localLookup(h);
if (d != null)
msg = new DestReplyMessage(d);
else
msg = new DestReplyMessage(h);
try {
_runner.doSend(msg);
} catch (I2CPMessageException ime) {
ime.printStackTrace();
}
}
/**
* Send dummy limits
*/
@Override
protected void handleGetBWLimits(GetBandwidthLimitsMessage message) {
int limit = 1024*1024;
BandwidthLimitsMessage msg = new BandwidthLimitsMessage(limit, limit);
try {
_runner.doSend(msg);
} catch (I2CPMessageException ime) {
ime.printStackTrace();
}
}
}