diff --git a/router/java/src/net/i2p/router/MultiRouter.java b/router/java/src/net/i2p/router/MultiRouter.java
new file mode 100644
index 000000000..4bd110ac1
--- /dev/null
+++ b/router/java/src/net/i2p/router/MultiRouter.java
@@ -0,0 +1,116 @@
+package net.i2p.router;
+
+import java.io.IOException;
+import java.io.FileInputStream;
+import java.util.ArrayList;
+import java.util.Properties;
+import net.i2p.I2PAppContext;
+import net.i2p.util.Log;
+
+/**
+ * Fire up multiple routers in the same VM, all with their own RouterContext
+ * (and all that entails). In addition, this creates a root I2PAppContext for
+ * any objects not booted through one of the RouterContexts. Each of these
+ * contexts are configured through a simple properties file (where the name=value
+ * contained in them are used for the context's getProperty(name)).
+ *
+ * Usage:
+ * MultiRouter globalContextFile routerContextFile[ routerContextFile]*
+ *
+ *
+ * Each routerContext specified is used to boot up a single router. It is HIGHLY
+ * recommended that those context files contain a few base env properties:
+ * - loggerFilenameOverride=rN/logs/log-router-#.txt
+ * - router.configLocation=rN/router.config
+ *
+ * (where "rN" is an instance number, such as r0 or r9).
+ * Additionally, two other properties might be useful:
+ * - i2p.vmCommSystem=true
+ * - i2p.encryption=off
+ *
+ * The first line tells the router to use an in-VM comm system for sending
+ * messages back and forth between routers (see net.i2p.transport.VMCommSystem),
+ * and the second tells the router to stub out ElGamal, AES, and DSA code, reducing
+ * the CPU load (but obviously making the router incapable of talking to things
+ * that need the encryption enabled). To run a client app through a router that
+ * has i2p.encryption=off, you should also add that line to the client's JVM
+ * (for example, java -Di2p.encryption=off -jar lib/i2ptunnel.jar
).
+ *
+ * The multirouter waits until all of the routers are shut down (which none will
+ * do on their own, so as before, you'll want to kill the proc or ^C it).
+ */
+public class MultiRouter {
+ private static Log _log;
+ private static ArrayList _routers = new ArrayList(8);
+ private static I2PAppContext _defaultContext;
+
+ public static void main(String args[]) {
+ if ( (args == null) || (args.length <= 1) ) {
+ usage();
+ return;
+ }
+ _defaultContext = new I2PAppContext(getEnv(args[0]));
+ _log = _defaultContext.logManager().getLog(MultiRouter.class);
+ try { Thread.sleep(5*1000); } catch (InterruptedException ie) {}
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() {
+ try { Thread.sleep(15*1000); } catch (InterruptedException ie) {}
+ Runtime.getRuntime().halt(-1);
+ }
+ });
+
+ for (int i = 1; i < args.length; i++) {
+ Router router = new Router(getEnv(args[i]));
+ router.setKillVMOnEnd(false);
+ _routers.add(router);
+ _log.info("Router " + i + " created from " + args[i]);
+ try { Thread.sleep(1*1000); } catch (InterruptedException ie) {}
+ }
+
+ for (int i = 0; i < _routers.size(); i++) {
+ ((Router)_routers.get(i)).runRouter();
+ //try { Thread.sleep(10*1000); } catch (InterruptedException ie) {}
+ }
+ _log.info("All " + _routers.size() + " routers started up");
+ waitForCompletion();
+ }
+
+ private static Properties getEnv(String filename) {
+ Properties props = new Properties();
+ try {
+ props.load(new FileInputStream(filename));
+ return props;
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ return null;
+ }
+ }
+
+ private static void waitForCompletion() {
+ while (true) {
+ int alive = 0;
+ for (int i = 0; i < _routers.size(); i++) {
+ Router r = (Router)_routers.get(i);
+ if (!r.isAlive()) {
+ _log.info("Router " + i + " is dead");
+ } else {
+ alive++;
+ }
+ }
+ if (alive > 0) {
+ try { Thread.sleep(30*1000); } catch (InterruptedException ie) {}
+ } else {
+ break;
+ }
+ }
+ _log.info("All routers shut down");
+ }
+
+ private static void usage() {
+ System.err.println("Usage: MultiRouter globalContextFile routerContextFile[ routerContextFile]*");
+ System.err.println(" The context files contain key=value entries specifying properties");
+ System.err.println(" to load into the given context. In addition, each routerContextFile");
+ System.err.println(" in turn is used to boot a router");
+ }
+}
\ No newline at end of file
diff --git a/router/java/src/net/i2p/router/MultiRouterBuilder.java b/router/java/src/net/i2p/router/MultiRouterBuilder.java
new file mode 100644
index 000000000..da50393b1
--- /dev/null
+++ b/router/java/src/net/i2p/router/MultiRouterBuilder.java
@@ -0,0 +1,204 @@
+package net.i2p.router;
+
+import java.io.IOException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStreamReader;
+import java.io.BufferedReader;
+
+/**
+ * Build a set of config files suitable for use by the multirouter as a
+ * simulation, as well as a pair of scripts for running the simulation.
+ * Usage:
+ * MultiRouterBuilder [routerDir routerPortStart]*
+ *
+ *
+ * Each router is configured with their own $routerDir/router.config file so
+ * that all of its data is stored under the $routerDir (profiles, keys, netDb,
+ * etc). In addition, each router has the i2cp port set to $routerPortStart+1,
+ * the admin port set to $routerPortStart+2, and some commented out clientApp
+ * lines (configured with the SAM bridge at $routerPortStart+3 and an EepProxy at
+ * $routerPortStart+4).
+ *
+ * It then builds a $routerDir/heartbeat.config containing the following lines:
+ * - i2cpHost=localhost
+ * - i2cpPort=$routerPortStart+1
+ * - numHops=2
+ * - privateDestinationFile=$routerDir/heartbeat.keys
+ * - publicDestinationFile=$routerDir/heartbeat.txt
+ *
+ *
+ * Then it goes on to create the $routerDir/routerEnv.txt:
+ * - loggerFilenameOverride=$routerDir/logs/log-router-#.txt
+ * - router.configLocation=$routerDir/router.config
+ * - i2p.vmCommSystem=true
+ * - i2p.encryption=off
+ *
+ *
+ * In addition, it creates a baseEnv.txt:
+ * - loggerFilenameOverride=logs/log-base-#.txt
+ *
+ *
+ * Finally, it creates the MultiRouter startup script to launch all of these
+ * routers, stored at runNetSim.bat / runNetSim.sh
+ *
+ */
+public class MultiRouterBuilder {
+ public static void main(String args[]) {
+ if (args.length <= 2) {
+ usage();
+ return;
+ }
+ for (int i = 0; i < args.length; i += 2) {
+ String dir = args[i];
+ try {
+ int basePortNum = Integer.parseInt(args[i+1]);
+ buildConfig(dir, basePortNum);
+ } catch (NumberFormatException nfe) {
+ nfe.printStackTrace();
+ }
+ }
+ buildBaseEnv();
+ buildStartupScript(args);
+ }
+
+ private static void buildBaseEnv() {
+ File envFile = new File("baseEnv.txt");
+ try {
+ FileOutputStream fos = new FileOutputStream(envFile);
+ fos.write(("loggerFilenameOverride=logs/log-base-#.txt\n").getBytes());
+ fos.close();
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ }
+ File f = new File("logs");
+ f.mkdirs();
+ }
+
+ private static void buildStartupScript(String args[]) {
+ buildStartupScriptWin(args);
+ buildStartupScriptNix(args);
+ }
+ private static void buildStartupScriptWin(String args[]) {
+ StringBuffer buf = new StringBuffer(4096);
+ buf.append("@echo off\ntitle I2P Router Sim\n");
+ buf.append("echo After all of the routers have started up, you should cross seed them\n");
+ buf.append("echo Simply copy */netDb/routerInfo-* to all of the various */netDb/ directories\n");
+ buf.append("java -cp lib\\i2p.jar;lib\\router.jar;lib\\mstreaming.jar;");
+ buf.append("lib\\heartbeat.jar;lib\\i2ptunnel.jar;lib\\netmonitor.jar;");
+ buf.append("lib\\sam.jar ");
+ buf.append("-Djava.library.path=. ");
+ buf.append("-DloggerFilenameOverride=logs\\log-sim-#.txt ");
+ buf.append("net.i2p.router.MultiRouter baseEnv.txt ");
+ for (int i = 0; i < args.length; i += 2)
+ buf.append(args[i]).append("\\routerEnv.txt ");
+ buf.append("\npause\n");
+ try {
+ FileOutputStream fos = new FileOutputStream("runNetSim.bat");
+ fos.write(buf.toString().getBytes());
+ fos.close();
+ } catch (IOException ioe) { ioe.printStackTrace(); }
+ }
+
+ private static void buildStartupScriptNix(String args[]) {
+ StringBuffer buf = new StringBuffer(4096);
+ buf.append("#!/bin/sh\n");
+ buf.append("nohup java -cp lib/i2p.jar:lib/router.jar:lib/mstreaming.jar:");
+ buf.append("lib/heartbeat.jar:lib/i2ptunnel.jar:lib/netmonitor.jar:");
+ buf.append("lib/sam.jar ");
+ buf.append("-Djava.library.path=. ");
+ buf.append("-DloggerFilenameOverride=logs/log-sim-#.txt ");
+ buf.append("net.i2p.router.MultiRouter baseEnv.txt ");
+ for (int i = 1; i < args.length; i += 2)
+ buf.append(args[i]).append("/routerEnv.txt ");
+ buf.append(" > sim.txt &\n");
+ buf.append("echo $! > sim.pid\n");
+ buf.append("echo \"After all of the routers have started up, you should cross seed them\"\n");
+ buf.append("echo \"Simply copy */netDb/routerInfo-* to all of the various */netDb/ directories\"\n");
+ try {
+ FileOutputStream fos = new FileOutputStream("runNetSim.sh");
+ fos.write(buf.toString().getBytes());
+ fos.close();
+ } catch (IOException ioe) { ioe.printStackTrace(); }
+ }
+
+ private static void buildConfig(String dir, int basePort) {
+ File baseDir = new File(dir);
+ baseDir.mkdirs();
+ File cfgFile = new File(baseDir, "router.config");
+ StringBuffer buf = new StringBuffer(8*1024);
+ buf.append("i2np.bandwidth.inboundBytesPerMinute=-60\n");
+ buf.append("i2np.bandwidth.outboundBytesPerMinute=-60\n");
+ buf.append("router.publishPeerRankings=true\n");
+ buf.append("router.keepHistory=true\n");
+ buf.append("router.submitHistory=false\n");
+ buf.append("router.maxJobRunners=1\n");
+ buf.append("router.jobLagWarning=10000\n");
+ buf.append("router.jobLagFatal=30000\n");
+ buf.append("router.jobRunWarning=10000\n");
+ buf.append("router.jobRunFatal=30000\n");
+ buf.append("router.jobWarmupTime=600000\n");
+ buf.append("router.targetClients=2\n");
+ buf.append("tunnels.numInbound=2\n");
+ buf.append("tunnels.numOutbound=2\n");
+ buf.append("tunnels.depthInbound=2\n");
+ buf.append("tunnels.depthOutbound=2\n");
+ buf.append("tunnels.tunnelDuration=600000\n");
+ buf.append("router.maxWaitingJobs=30\n");
+ buf.append("router.profileDir=").append(baseDir.getPath()).append("/peerProfiles\n");
+ buf.append("router.historyFilename=").append(baseDir.getPath()).append("/messageHistory.txt\n");
+ buf.append("router.sessionKeys.location=").append(baseDir.getPath()).append("/sessionKeys.dat\n");
+ buf.append("router.info.location=").append(baseDir.getPath()).append("/router.info\n");
+ buf.append("router.keys.location=").append(baseDir.getPath()).append("/router.keys\n");
+ buf.append("router.networkDatabase.dbDir=").append(baseDir.getPath()).append("/netDb\n");
+ buf.append("router.tunnelPoolFile=").append(baseDir.getPath()).append("/tunnelPool.dat\n");
+ buf.append("router.keyBackupDir=").append(baseDir.getPath()).append("/keyBackup\n");
+ buf.append("i2np.tcp.port=").append(basePort).append('\n');
+ buf.append("i2cp.port=").append(basePort+1).append('\n');
+ buf.append("router.adminPort=").append(basePort+2).append('\n');
+ buf.append("#clientApp.0.main=net.i2p.sam.SAMBridge\n");
+ buf.append("#clientApp.0.name=SAM\n");
+ buf.append("#clientApp.0.args=localhost ").append(basePort+3).append(" i2cp.tcp.host=localhost i2cp.tcp.port=").append(basePort+1).append("\n");
+ buf.append("#clientApp.1.main=net.i2p.i2ptunnel.I2PTunnel\n");
+ buf.append("#clientApp.1.name=EepProxy\n");
+ buf.append("#clientApp.1.args=-nogui -e \"config localhost ").append(basePort+1).append("\" -e \"httpclient ").append(basePort+4).append("\"\n");
+ buf.append("#clientApp.2.main=net.i2p.heartbeat.Heartbeat\n");
+ buf.append("#clientApp.2.name=Heartbeat\n");
+ buf.append("#clientApp.2.args=").append(baseDir.getPath()).append("/heartbeat.config\n");
+
+ try {
+ FileOutputStream fos = new FileOutputStream(cfgFile);
+ fos.write(buf.toString().getBytes());
+ fos.close();
+
+ fos = new FileOutputStream(new File(baseDir, "heartbeat.config"));
+ StringBuffer tbuf = new StringBuffer(1024);
+ tbuf.append("i2cpHost=localhost\n");
+ tbuf.append("i2cpPort=").append(basePort+1).append('\n');
+ tbuf.append("numHops=2\n");
+ tbuf.append("privateDestinationFile=").append(baseDir.getPath()).append("/heartbeat.keys\n");
+ tbuf.append("publicDestinationFile=").append(baseDir.getPath()).append("/heartbeat.txt\n");
+ fos.write(tbuf.toString().getBytes());
+ fos.close();
+
+
+ File envFile = new File(baseDir, "routerEnv.txt");
+ fos = new FileOutputStream(envFile);
+ fos.write(("loggerFilenameOverride="+baseDir+ "/logs/log-router-#.txt\n").getBytes());
+ fos.write(("router.configLocation="+baseDir+"/router.config\n").getBytes());
+ fos.write(("i2p.vmCommSystem=true\n").getBytes());
+ fos.write(("i2p.encryption=off\n").getBytes());
+ fos.close();
+
+ File f = new File(baseDir, "logs");
+ f.mkdirs();
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ }
+ }
+
+ private static void usage() {
+ System.err.println("Usage: MultiRouterBuilder [routerDir routerPortStart]*");
+ }
+}
\ No newline at end of file
diff --git a/router/java/src/net/i2p/router/Router.java b/router/java/src/net/i2p/router/Router.java
index 8161a5d85..5c2dcfeff 100644
--- a/router/java/src/net/i2p/router/Router.java
+++ b/router/java/src/net/i2p/router/Router.java
@@ -62,6 +62,10 @@ public class Router {
private long _started;
private boolean _higherVersionSeen;
private SessionKeyPersistenceHelper _sessionKeyPersistenceHelper;
+ private boolean _killVMOnEnd;
+ private boolean _isAlive;
+ private I2PThread.OOMEventListener _oomListener;
+ private ShutdownHook _shutdownHook;
public final static String PROP_CONFIG_FILE = "router.configLocation";
@@ -73,21 +77,47 @@ public class Router {
public final static String PROP_KEYS_FILENAME = "router.keys.location";
public final static String PROP_KEYS_FILENAME_DEFAULT = "router.keys";
- public Router() {
+ static {
// grumble about sun's java caching DNS entries *forever*
System.setProperty("sun.net.inetaddr.ttl", "0");
System.setProperty("networkaddress.cache.ttl", "0");
// (no need for keepalive)
System.setProperty("http.keepAlive", "false");
+ }
+
+ public Router() { this(null, null); }
+ public Router(Properties envProps) { this(null, envProps); }
+ public Router(String configFilename) { this(configFilename, null); }
+ public Router(String configFilename, Properties envProps) {
_config = new Properties();
- _context = new RouterContext(this);
- _configFilename = _context.getProperty(PROP_CONFIG_FILE, "router.config");
+ _context = new RouterContext(this, envProps);
+ if (configFilename == null)
+ _configFilename = _context.getProperty(PROP_CONFIG_FILE, "router.config");
+ else
+ _configFilename = configFilename;
_routerInfo = null;
_higherVersionSeen = false;
_log = _context.logManager().getLog(Router.class);
+ _log.info("New router created with config file " + _configFilename);
_sessionKeyPersistenceHelper = new SessionKeyPersistenceHelper(_context);
+ _killVMOnEnd = true;
+ _oomListener = new I2PThread.OOMEventListener() {
+ public void outOfMemory(OutOfMemoryError oom) {
+ _log.log(Log.CRIT, "Thread ran out of memory", oom);
+ shutdown();
+ }
+ };
+ _shutdownHook = new ShutdownHook();
}
+ /**
+ * Configure the router to kill the JVM when the router shuts down, as well
+ * as whether to explicitly halt the JVM during the hard fail process.
+ *
+ */
+ public void setKillVMOnEnd(boolean shouldDie) { _killVMOnEnd = shouldDie; }
+ public boolean getKillVMOnEnd() { return _killVMOnEnd; }
+
public String getConfigFilename() { return _configFilename; }
public void setConfigFilename(String filename) { _configFilename = filename; }
@@ -115,15 +145,11 @@ public class Router {
/** wall clock uptime */
public long getUptime() { return _context.clock().now() - _context.clock().getOffset() - _started; }
- private void runRouter() {
+ void runRouter() {
+ _isAlive = true;
_started = _context.clock().now();
- Runtime.getRuntime().addShutdownHook(new ShutdownHook());
- I2PThread.setOOMEventListener(new I2PThread.OOMEventListener() {
- public void outOfMemory(OutOfMemoryError oom) {
- _log.log(Log.CRIT, "Thread ran out of memory", oom);
- shutdown();
- }
- });
+ Runtime.getRuntime().addShutdownHook(_shutdownHook);
+ I2PThread.addOOMEventListener(_oomListener);
setupHandlers();
startupQueue();
_context.jobQueue().addJob(new CoallesceStatsJob());
@@ -133,6 +159,8 @@ public class Router {
_context.jobQueue().addJob(new StartupJob(_context));
}
+ public boolean isAlive() { return _isAlive; }
+
/**
* coallesce the stats framework every minute
*
@@ -355,6 +383,8 @@ public class Router {
}
public void shutdown() {
+ _isAlive = false;
+ I2PThread.removeOOMEventListener(_oomListener);
try { _context.jobQueue().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the job queue", t); }
try { _context.statPublisher().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the stats manager", t); }
try { _context.clientManager().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the client manager", t); }
@@ -366,8 +396,10 @@ public class Router {
dumpStats();
_log.log(Log.CRIT, "Shutdown complete", new Exception("Shutdown"));
try { _context.logManager().shutdown(); } catch (Throwable t) { }
- try { Thread.sleep(1000); } catch (InterruptedException ie) {}
- Runtime.getRuntime().halt(-1);
+ if (_killVMOnEnd) {
+ try { Thread.sleep(1000); } catch (InterruptedException ie) {}
+ Runtime.getRuntime().halt(-1);
+ }
}
private void dumpStats() {
diff --git a/router/java/src/net/i2p/router/RouterContext.java b/router/java/src/net/i2p/router/RouterContext.java
index 99e546dfc..5e82b333c 100644
--- a/router/java/src/net/i2p/router/RouterContext.java
+++ b/router/java/src/net/i2p/router/RouterContext.java
@@ -4,6 +4,7 @@ import net.i2p.data.Hash;
import net.i2p.router.client.ClientManagerFacadeImpl;
import net.i2p.router.transport.OutboundMessageRegistry;
import net.i2p.router.networkdb.kademlia.KademliaNetworkDatabaseFacade;
+import net.i2p.router.transport.VMCommSystem;
import net.i2p.router.transport.CommSystemFacadeImpl;
import net.i2p.router.transport.BandwidthLimiter;
import net.i2p.router.transport.TrivialBandwidthLimiter;
@@ -68,7 +69,10 @@ public class RouterContext extends I2PAppContext {
_messageRegistry = new OutboundMessageRegistry(this);
_netDb = new KademliaNetworkDatabaseFacade(this);
_keyManager = new KeyManager(this);
- _commSystem = new CommSystemFacadeImpl(this);
+ if ("false".equals(getProperty("i2p.vmCommSystem", "false")))
+ _commSystem = new CommSystemFacadeImpl(this);
+ else
+ _commSystem = new VMCommSystem(this);
_profileOrganizer = new ProfileOrganizer(this);
_peerManagerFacade = new PeerManagerFacadeImpl(this);
_profileManager = new ProfileManagerImpl(this);
@@ -184,4 +188,33 @@ public class RouterContext extends I2PAppContext {
public Calculator speedCalculator() { return _speedCalc; }
/** how do we rank the reliability of profiles? */
public Calculator reliabilityCalculator() { return _reliabilityCalc; }
+
+ public String toString() {
+ StringBuffer buf = new StringBuffer(512);
+ buf.append("RouterContext: ").append(super.toString()).append('\n');
+ buf.append(_router).append('\n');
+ buf.append(_clientManagerFacade).append('\n');
+ buf.append(_clientMessagePool).append('\n');
+ buf.append(_jobQueue).append('\n');
+ buf.append(_inNetMessagePool).append('\n');
+ buf.append(_outNetMessagePool).append('\n');
+ buf.append(_messageHistory).append('\n');
+ buf.append(_messageRegistry).append('\n');
+ buf.append(_netDb).append('\n');
+ buf.append(_keyManager).append('\n');
+ buf.append(_commSystem).append('\n');
+ buf.append(_profileOrganizer).append('\n');
+ buf.append(_peerManagerFacade).append('\n');
+ buf.append(_profileManager).append('\n');
+ buf.append(_bandwidthLimiter).append('\n');
+ buf.append(_tunnelManager).append('\n');
+ buf.append(_statPublisher).append('\n');
+ buf.append(_shitlist).append('\n');
+ buf.append(_messageValidator).append('\n');
+ buf.append(_isFailingCalc).append('\n');
+ buf.append(_integrationCalc).append('\n');
+ buf.append(_speedCalc).append('\n');
+ buf.append(_reliabilityCalc).append('\n');
+ return buf.toString();
+ }
}
\ No newline at end of file
diff --git a/router/java/src/net/i2p/router/transport/VMCommSystem.java b/router/java/src/net/i2p/router/transport/VMCommSystem.java
new file mode 100644
index 000000000..3592b4133
--- /dev/null
+++ b/router/java/src/net/i2p/router/transport/VMCommSystem.java
@@ -0,0 +1,126 @@
+package net.i2p.router.transport;
+
+import net.i2p.data.Hash;
+import net.i2p.data.i2np.I2NPMessage;
+import net.i2p.data.i2np.I2NPMessageHandler;
+import net.i2p.router.RouterContext;
+import net.i2p.router.CommSystemFacade;
+import net.i2p.router.OutNetMessage;
+import net.i2p.router.InNetMessage;
+import net.i2p.router.InNetMessagePool;
+import net.i2p.router.JobImpl;
+import net.i2p.util.Log;
+
+import java.io.ByteArrayInputStream;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Collections;
+
+/**
+ * Hacked up in-VM comm system for talking between contexts. It doesn't even
+ * generate any routerAddresses, but instead tracks the peers through a singleton.
+ * Currently, the comm system doesn't even inject any lag, though it could (later).
+ * It does honor the standard transport stats though, but not the TCP specific ones.
+ *
+ */
+public class VMCommSystem extends CommSystemFacade {
+ private Log _log;
+ private RouterContext _context;
+ /**
+ * Mapping from Hash to VMCommSystem for all routers hooked together
+ */
+ private static Map _commSystemFacades = Collections.synchronizedMap(new HashMap(16));
+
+ public VMCommSystem(RouterContext context) {
+ _context = context;
+ _log = context.logManager().getLog(VMCommSystem.class);
+ _context.statManager().createFrequencyStat("transport.sendMessageFailureFrequency", "How often do we fail to send messages?", "Transport", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
+ _context.statManager().createRateStat("transport.sendMessageSize", "How large are the messages sent?", "Transport", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
+ _context.statManager().createRateStat("transport.receiveMessageSize", "How large are the messages received?", "Transport", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
+ _context.statManager().createRateStat("transport.sendProcessingTime", "How long does it take from noticing that we want to send the message to having it completely sent (successfully or failed)?", "Transport", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
+ }
+
+ /**
+ * The router wants us to send the given message to the peer. Do so, or fire
+ * off the failing job.
+ */
+ public void processMessage(OutNetMessage msg) {
+ Hash peer = msg.getTarget().getIdentity().getHash();
+ VMCommSystem peerSys = (VMCommSystem)_commSystemFacades.get(peer);
+
+ long now = _context.clock().now();
+ long sendTime = now - msg.getSendBegin();
+
+ boolean sendSuccessful = false;
+
+ if (peerSys == null) {
+ _context.jobQueue().addJob(msg.getOnFailedSendJob());
+ _context.statManager().updateFrequency("transport.sendMessageFailureFrequency");
+ _context.profileManager().messageFailed(msg.getTarget().getIdentity().getHash(), "vm");
+ } else {
+ _context.jobQueue().addJob(msg.getOnSendJob());
+ _context.profileManager().messageSent(msg.getTarget().getIdentity().getHash(), "vm", sendTime, msg.getMessageSize());
+ _context.statManager().addRateData("transport.sendMessageSize", msg.getMessageSize(), sendTime);
+ peerSys.receive(msg.getMessage().toByteArray(), _context.routerHash());
+ //_context.jobQueue().addJob(new SendJob(peerSys, msg.getMessage(), _context));
+ sendSuccessful = true;
+ }
+
+ if (true) {
+ I2NPMessage dmsg = msg.getMessage();
+ String type = dmsg.getClass().getName();
+ _context.messageHistory().sendMessage(type, dmsg.getUniqueId(), dmsg.getMessageExpiration(), msg.getTarget().getIdentity().getHash(), sendSuccessful);
+ }
+
+ _context.statManager().addRateData("transport.sendProcessingTime", msg.getLifetime(), msg.getLifetime());
+ }
+
+ private class ReceiveJob extends JobImpl {
+ private Hash _from;
+ private byte _msg[];
+ private RouterContext _ctx;
+ public ReceiveJob(Hash from, byte msg[], RouterContext us) {
+ super(us);
+ _ctx = us;
+ _from = from;
+ _msg = msg;
+ // bah, uberspeed!
+ //getTiming().setStartAfter(us.clock().now() + 50);
+ }
+ public void runJob() {
+ I2NPMessageHandler handler = new I2NPMessageHandler(_ctx);
+ try {
+ I2NPMessage msg = handler.readMessage(new ByteArrayInputStream(_msg));
+ int size = _msg.length;
+ InNetMessage inMsg = new InNetMessage();
+ inMsg.setFromRouterHash(_from);
+ inMsg.setMessage(msg);
+ _ctx.profileManager().messageReceived(_from, "vm", 1, size);
+ _ctx.statManager().addRateData("transport.receiveMessageSize", size, 1);
+ _ctx.inNetMessagePool().add(inMsg);
+ } catch (Exception e) {
+ _log.error("wtf, error reading/formatting a VM message?", e);
+ }
+ }
+ public String getName() { return "Receive Message"; }
+ }
+
+ /**
+ * We send messages between comms as bytes so that we strip any router-local
+ * info. For example, a router tags the # attempts to send through a
+ * leaseSet, what type of tunnel a tunnelId is bound to, etc.
+ *
+ */
+ public void receive(byte message[], Hash fromPeer) {
+ _context.jobQueue().addJob(new ReceiveJob(fromPeer, message, _context));
+ }
+
+ public void shutdown() {
+ _commSystemFacades.remove(_context.routerHash());
+ }
+
+ public void startup() {
+ _commSystemFacades.put(_context.routerHash(), this);
+ }
+}
\ No newline at end of file