initial impl (heartbeat ping/pong works, no gui yet, but the stats generated are pretty readable)

This commit is contained in:
jrandom
2004-04-10 04:11:39 +00:00
committed by zzz
parent ee119de6c4
commit f8a47c3c6a
9 changed files with 1633 additions and 0 deletions

View File

@@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<project basedir="." default="all" name="heartbeat">
<target name="all" depends="clean, build" />
<target name="build" depends="builddep, jar" />
<target name="builddep">
<ant dir="../../../core/java/" target="build" />
</target>
<target name="compile">
<mkdir dir="./build" />
<mkdir dir="./build/obj" />
<javac srcdir="./src" debug="true" destdir="./build/obj" classpath="../../../core/java/build/i2p.jar" />
</target>
<target name="jar" depends="compile">
<jar destfile="./build/heartbeat.jar" basedir="./build/obj" includes="**/*.class">
<manifest>
<attribute name="Main-Class" value="net.i2p.heartbeat.Heartbeat" />
<attribute name="Class-Path" value="i2p.jar heartbeat.jar" />
</manifest>
</jar>
</target>
<target name="javadoc">
<mkdir dir="./build" />
<mkdir dir="./build/javadoc" />
<javadoc
sourcepath="./src:../../../core/java/src:../../../core/java/test" destdir="./build/javadoc"
packagenames="*"
use="true"
access="package"
splitindex="true"
windowtitle="I2P heartbeat monitor" />
</target>
<target name="clean">
<delete dir="./build" />
</target>
<target name="cleandep" depends="clean">
<ant dir="../../../core/java/" target="cleandep" />
</target>
<target name="distclean" depends="clean">
<ant dir="../../../core/java/" target="distclean" />
</target>
</project>

View File

@@ -0,0 +1,251 @@
package net.i2p.heartbeat;
import net.i2p.data.Destination;
import net.i2p.data.DataFormatException;
import net.i2p.util.Log;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.StringTokenizer;
/**
* Define the configuration for testing against one particular peer as a client
*
*/
public class ClientConfig {
private static final Log _log = new Log(ClientConfig.class);
private Destination _peer;
private Destination _us;
private String _statFile;
private int _statDuration;
private int _statFrequency;
private int _sendFrequency;
private int _sendSize;
private int _numHops;
private String _comment;
private int _averagePeriods[];
public static final String PROP_PREFIX = "peer.";
public static final String PROP_PEER = ".peer";
public static final String PROP_STATFILE = ".statFile";
public static final String PROP_STATDURATION = ".statDuration";
public static final String PROP_STATFREQUENCY = ".statFrequency";
public static final String PROP_SENDFREQUENCY = ".sendFrequency";
public static final String PROP_SENDSIZE = ".sendSize";
public static final String PROP_COMMENT = ".comment";
public static final String PROP_AVERAGEPERIODS = ".averagePeriods";
public ClientConfig() {
this(null, null, null, -1, -1, -1, -1, 0, null, null);
}
/**
* @param peer who we will test against
* @param us who we are
* @param duration how many minutes to keep events for
* @param statFreq how often to write out stats
* @param sendFreq how often to send pings
* @param sendSize how large the pings should be
* @param numHops how many hops is the current Heartbeat app using
* @param comment describe this test
* @param averagePeriods list of minutes to summarize over
*/
public ClientConfig(Destination peer, Destination us, String statFile, int duration, int statFreq, int sendFreq, int sendSize, int numHops, String comment, int averagePeriods[]) {
_peer = peer;
_us = us;
_statFile = statFile;
_statDuration = duration;
_statFrequency = statFreq;
_sendFrequency = sendFreq;
_sendSize = sendSize;
_numHops = numHops;
_comment = comment;
_averagePeriods = averagePeriods;
}
/** peer to test against */
public Destination getPeer() { return _peer; }
public void setPeer(Destination peer) { _peer = peer; }
/** who we are when we test */
public Destination getUs() { return _us; }
public void setUs(Destination us) { _us = us; }
/** location to write the current stats to */
public String getStatFile() { return _statFile; }
public void setStatFile(String statFile) { _statFile = statFile; }
/** how many minutes of statistics should be maintained within the window for this client? */
public int getStatDuration() { return _statDuration; }
public void setStatDuration(int durationMinutes) { _statDuration = durationMinutes; }
/** how frequenty the stats are written out (in seconds) */
public int getStatFrequency() { return _statFrequency; }
public void setStatFrequency(int freqSeconds) { _statFrequency = freqSeconds; }
/** how frequenty we send messages to the peer (in seconds) */
public int getSendFrequency() { return _sendFrequency; }
public void setSendFrequency(int freqSeconds) { _sendFrequency = freqSeconds; }
/**
* How many bytes should the ping messages be (min values ~700, max ~32KB)?
*
*/
public int getSendSize() { return _sendSize; }
public void setSendSize(int numBytes) { _sendSize = numBytes; }
/**
* Brief 1 line description of the test. Useful comments are along the lines
* of "The peer is located on a fast router and connection with 2 hop tunnels".
*
*/
public String getComment() { return _comment; }
public void setComment(String comment) { _comment = comment; }
/**
* Periods that the client's tests should be averaged over.
*
* @return list of periods (in minutes) that the data should be averaged over, or null
*/
public int[] getAveragePeriods() { return _averagePeriods; }
public void setAveragePeriods(int periods[]) { _averagePeriods = periods; }
/**
* How many hops is this test engine configured to use for its outbound and inbound tunnels?
*
*/
public int getNumHops() { return _numHops; }
public void setNumHops(int numHops) { _numHops = numHops; }
/**
* Load the client config from the properties specified, deriving the current
* config entry from the peer number.
*
* @return true if it was loaded correctly, false if there were errors
*/
public boolean load(Properties clientConfig, int peerNum) {
if ( (clientConfig == null) || (peerNum < 0) ) return false;
String peerVal = clientConfig.getProperty(PROP_PREFIX + peerNum + PROP_PEER);
String statFileVal = clientConfig.getProperty(PROP_PREFIX + peerNum + PROP_STATFILE);
String statDurationVal = clientConfig.getProperty(PROP_PREFIX + peerNum + PROP_STATDURATION);
String statFrequencyVal = clientConfig.getProperty(PROP_PREFIX + peerNum + PROP_STATFREQUENCY);
String sendFrequencyVal = clientConfig.getProperty(PROP_PREFIX + peerNum + PROP_SENDFREQUENCY);
String sendSizeVal = clientConfig.getProperty(PROP_PREFIX + peerNum + PROP_SENDSIZE);
String commentVal = clientConfig.getProperty(PROP_PREFIX + peerNum + PROP_COMMENT);
String periodsVal = clientConfig.getProperty(PROP_PREFIX + peerNum + PROP_AVERAGEPERIODS);
if ( (peerVal == null) || (statFileVal == null) || (statDurationVal == null) ||
(statFrequencyVal == null) || (sendFrequencyVal == null) || (sendSizeVal == null) ) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Peer number "+ peerNum + " does not exist");
return false;
}
try {
int duration = getInt(statDurationVal);
int statFreq = getInt(statFrequencyVal);
int sendFreq = getInt(sendFrequencyVal);
int sendSize = getInt(sendSizeVal);
if ( (duration <= 0) || (statFreq <= 0) || (sendFreq <= 0) || (sendSize <= 0) ) {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid client config: duration [" + statDurationVal + "] stat frequency [" + statFrequencyVal +
"] send frequency [" + sendFrequencyVal + "] send size [" + sendSizeVal + "]");
return false;
}
statFileVal = statFileVal.trim();
if (statFileVal.length() <= 0) {
if (_log.shouldLog(Log.WARN))
_log.warn("Stat file is blank for peer " + peerNum);
return false;
}
Destination d = new Destination();
d.fromBase64(peerVal);
if (commentVal == null)
commentVal = "";
commentVal = commentVal.trim();
commentVal = commentVal.replace('\n', '_');
List periods = new ArrayList(4);
if (periodsVal != null) {
StringTokenizer tok = new StringTokenizer(periodsVal);
while (tok.hasMoreTokens()) {
String periodVal = tok.nextToken();
int minutes = getInt(periodVal);
if (minutes > 0)
periods.add(new Integer(minutes));
}
}
int avgPeriods[] = new int[periods.size()];
for (int i = 0; i < periods.size(); i++)
avgPeriods[i] = ((Integer)periods.get(i)).intValue();
_comment = commentVal;
_statDuration = duration;
_statFrequency = statFreq;
_sendFrequency = sendFreq;
_sendSize = sendSize;
_statFile = statFileVal;
_peer = d;
_averagePeriods = avgPeriods;
return true;
} catch (DataFormatException dfe) {
_log.error("Peer destination for " + peerNum + " was invalid: " + peerVal);
return false;
}
}
/**
* Store the client config to the properties specified, deriving the current
* config entry from the peer number.
*
* @return true if it was stored correctly, false if there were errors
*/
public boolean store(Properties clientConfig, int peerNum) {
if ( (_peer == null) || (_sendFrequency <= 0) || (_sendSize <= 0) ||
(_statDuration <= 0) || (_statFrequency <= 0) || (_statFile == null) ) {
return false;
}
String comment = _comment;
if (comment == null)
comment = "";
comment = comment.trim();
comment = comment.replace('\n', '_');
StringBuffer buf = new StringBuffer(32);
if (_averagePeriods != null) {
for (int i = 0; i < _averagePeriods.length; i++) {
buf.append(_averagePeriods[i]).append(' ');
}
}
clientConfig.setProperty(PROP_PREFIX + peerNum + PROP_PEER, _peer.toBase64());
clientConfig.setProperty(PROP_PREFIX + peerNum + PROP_STATFILE, _statFile);
clientConfig.setProperty(PROP_PREFIX + peerNum + PROP_STATDURATION, _statDuration + "");
clientConfig.setProperty(PROP_PREFIX + peerNum + PROP_STATFREQUENCY, _statFrequency + "");
clientConfig.setProperty(PROP_PREFIX + peerNum + PROP_SENDFREQUENCY, _sendFrequency + "");
clientConfig.setProperty(PROP_PREFIX + peerNum + PROP_SENDSIZE, _sendSize + "");
clientConfig.setProperty(PROP_PREFIX + peerNum + PROP_COMMENT, comment);
clientConfig.setProperty(PROP_PREFIX + peerNum + PROP_AVERAGEPERIODS, buf.toString());
return true;
}
private static final int getInt(String val) {
if (val == null) return -1;
try {
int i = Integer.parseInt(val);
return i;
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Value [" + val + "] is not a valid integer");
return -1;
}
}
}

View File

@@ -0,0 +1,110 @@
package net.i2p.heartbeat;
import net.i2p.data.Destination;
import net.i2p.util.Clock;
import net.i2p.util.Log;
import net.i2p.util.I2PThread;
/**
* Responsible for actually conducting the tests, coordinating the storing of the
* stats, and the management of the rates. This has its own thread specific for
* pumping data around as well.
*
*/
class ClientEngine {
private static final Log _log = new Log(ClientEngine.class);
/** who can send our pings/ */
private Heartbeat _heartbeat;
/** actual test state */
private PeerData _data;
/** have we been stopped? */
private boolean _active;
/** used to generate engine IDs */
private static int __id = 0;
/** this engine's id, unique to the {test,sendingClient,startTime} */
private int _id;
private static PeerDataWriter writer = new PeerDataWriter();
/**
* Create a new engine that will send its pings through the given heartbeat
* system, and will coordinate the test according to the configuration specified.
*
*/
public ClientEngine(Heartbeat heartbeat, ClientConfig config) {
_heartbeat = heartbeat;
_data = new PeerData(config);
_active = false;
_id = ++__id;
}
/** stop sending any more pings or writing any more state */
public void stopEngine() {
_active = false;
if (_log.shouldLog(Log.INFO))
_log.info("Stopping engine talking to peer " + _data.getConfig().getPeer().calculateHash().toBase64());
}
/** start up the test (this does not block, as it fires up the test thread) */
public void startEngine() {
_active = true;
I2PThread t = new I2PThread(new ClientRunner());
t.setName("HeartbeatClient " + _id);
t.start();
}
/** who are we testing? */
public Destination getPeer() { return _data.getConfig().getPeer(); }
/** what is our series identifier (used to locally identify a test) */
public int getSeriesNum() { return _id; }
/**
* receive notification from the heartbeat system that a pong was received in
* reply to a ping we have sent.
*
* @param sentOn when did we send the ping?
* @param replyOn when did the peer send the pong?
*/
public void receivePong(long sentOn, long replyOn) {
_data.pongReceived(sentOn, replyOn);
}
/** fire off a new ping */
private void doSend() {
long now = Clock.getInstance().now();
_heartbeat.sendPing(_data.getConfig().getPeer(), _id, now, _data.getConfig().getSendSize());
_data.addPing(now);
}
/** our actual heartbeat pumper - this drives the test */
private class ClientRunner implements Runnable {
public void run() {
if (_log.shouldLog(Log.INFO))
_log.info("Starting engine talking to peer " + _data.getConfig().getPeer().calculateHash().toBase64());
// when do we need to send the next PING?
long nextSend = Clock.getInstance().now();
// when do we need to write out the next state data?
long nextWrite = Clock.getInstance().now();
while (_active) {
if (Clock.getInstance().now() >= nextSend) {
doSend();
nextSend = Clock.getInstance().now() + _data.getConfig().getSendFrequency()*1000;
}
if (Clock.getInstance().now() >= nextWrite) {
boolean written = writer.persist(_data);
if (!written) {
if (_log.shouldLog(Log.ERROR))
_log.error("Unable to write the client state data");
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Client state data written");
}
}
_data.cleanup();
try { Thread.sleep(1000); } catch (InterruptedException ie) {}
}
}
}
}

View File

@@ -0,0 +1,233 @@
package net.i2p.heartbeat;
import net.i2p.util.Log;
import net.i2p.util.Clock;
import net.i2p.data.Destination;
import java.io.IOException;
import java.io.File;
import java.io.FileInputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Iterator;
import java.util.Date;
/**
* Main driver for the heartbeat engine, loading 0 or more tests, firing
* up a ClientEngine for each, and serving as a pong server. If there isn't
* a configuration file, or if the configuration file doesn't specify any tests,
* it simply sits around as a pong server, passively responding to whatever is
* sent its way. <p />
*
* The config file format is examplified below:
* <pre>
* # where the router is located (default is localhost)
* i2cpHost=localhost
* # I2CP port for the router (default is 7654)
* i2cpPort=4001
* # How many hops we want the router to put in our tunnels (default is 2)
* numHops=2
* # where our private destination keys are located - if this doesn't exist,
* # a new one will be created and saved there (by default, heartbeat.keys)
* privateDestinationFile=heartbeat_r2.keys
*
* ## peer tests configured below:
*
* # destination peer for test 0
* peer.0.peer=[destination in base64]
* # where will we write out the stat data?
* peer.0.statFile=heartbeatStat_khWY_30s_1kb.txt
* # how many minutes will we keep stats for?
* peer.0.statDuration=30
* # how often will we write out new stat data (in seconds)?
* peer.0.statFrequency=60
* # how often will we send a ping to the peer (in seconds)?
* peer.0.sendFrequency=30
* # how many bytes will be included in the ping?
* peer.0.sendSize=1024
* # take a guess...
* peer.0.comment=Test with localhost sending 1KB of data every 30 seconds
* # we can keep track of a few moving averages - this value includes a whitespace
* # delimited list of numbers, each specifying a period to calculate the average
* # over (in minutes)
* peer.0.averagePeriods=1 5 30
* ## repeat the peer.0.* for as many tests as desired, incrementing as necessary
* </pre>
*
*/
public class Heartbeat {
private static final Log _log = new Log(Heartbeat.class);
/** location containing this heartbeat's config */
private String _configFile;
/** clientNum (Integer) to ClientConfig mapping */
private Map _clientConfigs;
/** series num (Integer) to ClientEngine mapping */
private Map _clientEngines;
/** helper class for managing our I2P send/receive and message formatting */
private I2PAdapter _adapter;
/** our own callback that the I2PAdapter notifies on ping or pong messages */
private PingPongAdapter _eventAdapter;
/** if there are no command line arguments, load the config from "heartbeat.config" */
public static final String CONFIG_FILE_DEFAULT = "heartbeat.config";
/** build up a new heartbeat manager, but don't actually do anything */
public Heartbeat(String configFile) {
_configFile = configFile;
_clientConfigs = new HashMap();
_clientEngines = new HashMap();
_eventAdapter = new PingPongAdapter();
_adapter = new I2PAdapter();
_adapter.setListener(_eventAdapter);
}
private Heartbeat() {}
/** load up the config data (but don't build any engines or start them up) */
public void loadConfig() {
Properties props = new Properties();
FileInputStream fin = null;
File configFile = new File (_configFile);
if (configFile.exists()) {
try {
fin = new FileInputStream(_configFile);
props.load(fin);
} catch (IOException ioe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error reading the config data", ioe);
} finally {
if (fin != null) try { fin.close(); } catch (IOException ioe) {}
}
}
loadBaseConfig(props);
loadClientConfigs(props);
}
/**
* send a ping message to the peer
*
* @param peer peer to ping
* @param seriesNum id used to keep track of multiple pings (of different size/frequency) to a peer
* @param now current time to be sent in the ping (so we can watch for it in the pong)
* @param size total message size to send
*/
void sendPing(Destination peer, int seriesNum, long now, int size) {
if (_adapter.getIsConnected())
_adapter.sendPing(peer, seriesNum, now, size);
}
/** load up the base data (I2CP config, etc) */
private void loadBaseConfig(Properties props) {
_adapter.loadConfig(props);
}
/** load up all of the test config data */
private void loadClientConfigs(Properties props) {
int i = 0;
while (true) {
ClientConfig config = new ClientConfig();
if (!config.load(props, i))
break;
_clientConfigs.put(new Integer(i), config);
i++;
}
}
/** connect to the network */
private void connect() {
boolean connected = _adapter.connect();
if (!connected)
_log.error("Unable to connect to the router");
}
/** disconnect from the network */
private void disconnect() {
_adapter.disconnect();
}
/** start up all of the tests */
public void startEngines() {
for (Iterator iter = _clientConfigs.values().iterator(); iter.hasNext(); ) {
ClientConfig config = (ClientConfig)iter.next();
ClientEngine engine = new ClientEngine(this, config);
config.setUs(_adapter.getLocalDestination());
config.setNumHops(_adapter.getNumHops());
_clientEngines.put(new Integer(engine.getSeriesNum()), engine);
engine.startEngine();
}
}
/** stop all of the tests */
public void stopEngines() {
for (Iterator iter = _clientEngines.values().iterator(); iter.hasNext(); ) {
ClientEngine engine = (ClientEngine)iter.next();
engine.stopEngine();
}
_clientEngines.clear();
}
/**
* Fire up a new heartbeat system, waiting until, well, forever. Builds
* a new heartbeat system, loads the config, connects to the network, starts
* the engines, and then sits back and relaxes, responding to any pings and
* running any tests. <p />
*
* <code> <b>Usage: </b> Heartbeat [<i>configFileName</i>]</code> <p />
*/
public static void main(String args[]) {
String configFile = CONFIG_FILE_DEFAULT;
if (args.length == 1)
configFile = args[0];
if (_log.shouldLog(Log.INFO))
_log.info("Starting up with config file " + configFile);
Heartbeat heartbeat = new Heartbeat(configFile);
heartbeat.loadConfig();
heartbeat.connect();
heartbeat.startEngines();
Object o = new Object();
while (true) {
try {
synchronized (o) {
o.wait();
}
} catch (InterruptedException ie) {}
}
}
/**
* Receive event notification from the I2PAdapter
*
*/
private class PingPongAdapter implements I2PAdapter.PingPongEventListener {
/**
* We were pinged, so always just send a pong back.
*
* @param from who sent us the ping?
* @param seriesNum what series did the sender specify?
* @param sentOn when did the sender say they sent their ping?
* @param data arbitrary payload data
*/
public void receivePing(Destination from, int seriesNum, Date sentOn, byte[] data) {
if (_adapter.getIsConnected())
_adapter.sendPong(from, seriesNum, sentOn, data);
}
/**
* We received a pong, so find the right client engine and tell it about the pong.
*
* @param from who sent us the pong
* @param seriesNum our client ID
* @param sentOn when did we send the ping?
* @param replyOn when did they send their pong?
* @param data the arbitrary data we sent in the ping (that they sent back in the pong)
*/
public void receivePong(Destination from, int seriesNum, Date sentOn, Date replyOn, byte[] data) {
ClientEngine engine = (ClientEngine)_clientEngines.get(new Integer(seriesNum));
if (engine.getPeer().equals(from))
engine.receivePong(sentOn.getTime(), replyOn.getTime());
}
}
}

View File

@@ -0,0 +1,462 @@
package net.i2p.heartbeat;
import java.io.ByteArrayOutputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Properties;
import java.util.Date;
import java.util.Arrays;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
import net.i2p.client.I2PClientFactory;
import net.i2p.client.I2PClient;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.client.I2PSessionListener;
import net.i2p.I2PException;
import net.i2p.data.Destination;
import net.i2p.util.Log;
import net.i2p.util.Clock;
/**
* Tie-in to the I2P SDK for the Heartbeat system, talking to the I2PSession and
* dealing with the raw ping and pong messages.
*
*/
class I2PAdapter {
private final static Log _log = new Log(I2PAdapter.class);
/** I2CP host */
private String _i2cpHost;
/** I2CP port */
private int _i2cpPort;
/** how long do we want our tunnels to be? */
private int _numHops;
/** filename containing the heartbeat engine's private destination info */
private String _privateDestFile;
/** our destination */
private Destination _localDest;
/** who do we tell? */
private PingPongEventListener _listener;
/** how do we talk to the router */
private I2PSession _session;
/** object that receives our i2cp notifications from the session and tells us */
private I2PListener _i2pListener;
/**
* This config property tells us where the private destination data for our
* connection (or if it doesn't exist, where will we save it)
*/
private static final String DEST_FILE_PROP = "privateDestinationFile";
/** by default, the private destination data is in "heartbeat.keys" */
private static final String DEST_FILE_DEFAULT = "heartbeat.keys";
/** This config property defines where the I2P router is */
private static final String I2CP_HOST_PROP = "i2cpHost";
/** by default, the I2P host is "localhost" */
private static final String I2CP_HOST_DEFAULT = "localhost";
/** This config property defines the I2CP port on the router */
private static final String I2CP_PORT_PROP = "i2cpPort";
/** by default, the I2CP port is 7654 */
private static final int I2CP_PORT_DEFAULT = 7654;
/** This property defines how many hops we want in our tunnels. */
public static final String NUMHOPS_PROP = "numHops";
/** by default, use 2 hop tunnels */
public static final int NUMHOPS_DEFAULT = 2;
public I2PAdapter() {
_privateDestFile = null;
_i2cpHost = null;
_i2cpPort = -1;
_localDest = null;
_listener = null;
_session = null;
_numHops = 0;
}
/** who are we? */
public Destination getLocalDestination() { return _localDest; }
/** who gets notified when we receive a ping or a pong? */
public PingPongEventListener getListener() { return _listener; }
public void setListener(PingPongEventListener listener) { _listener = listener; }
/** how many hops do we want in our tunnels? */
public int getNumHops() { return _numHops; }
/** are we connected? */
public boolean getIsConnected() { return _session != null; }
/**
* Read in all of the config data
*
*/
void loadConfig(Properties props) {
String privDestFile = props.getProperty(DEST_FILE_PROP, DEST_FILE_DEFAULT);
String host = props.getProperty(I2CP_HOST_PROP, I2CP_HOST_DEFAULT);
String port = props.getProperty(I2CP_PORT_PROP, ""+I2CP_PORT_DEFAULT);
String numHops = props.getProperty(NUMHOPS_PROP, ""+NUMHOPS_DEFAULT);
int portNum = -1;
try {
portNum = Integer.parseInt(port);
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid I2CP port specified [" + port + "]");
portNum = I2CP_PORT_DEFAULT;
}
int hops = -1;
try {
hops = Integer.parseInt(numHops);
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid # hops specified [" + numHops + "]");
hops = NUMHOPS_DEFAULT;
}
_numHops = hops;
_privateDestFile = privDestFile;
_i2cpHost = host;
_i2cpPort = portNum;
}
/** write out the config to the props */
void storeConfig(Properties props) {
if (_privateDestFile != null)
props.setProperty(DEST_FILE_PROP, _privateDestFile);
else
props.setProperty(DEST_FILE_PROP, DEST_FILE_DEFAULT);
if (_i2cpHost != null)
props.setProperty(I2CP_HOST_PROP, _i2cpHost);
else
props.setProperty(I2CP_HOST_PROP, I2CP_HOST_DEFAULT);
if (_i2cpPort > 0)
props.setProperty(I2CP_PORT_PROP, ""+_i2cpPort);
else
props.setProperty(I2CP_PORT_PROP, ""+I2CP_PORT_DEFAULT);
props.setProperty(NUMHOPS_PROP, ""+_numHops);
}
private static final int TYPE_PING = 0;
private static final int TYPE_PONG = 1;
/**
* send a ping message to the peer
*
* @param peer peer to ping
* @param seriesNum id used to keep track of multiple pings (of different size/frequency) to a peer
* @param now current time to be sent in the ping (so we can watch for it in the pong)
* @param size total message size to send
*
* @throws IllegalStateException if we are not connected to the router
*/
public void sendPing(Destination peer, int seriesNum, long now, int size) {
if (_session == null) throw new IllegalStateException("Not connected to the router");
ByteArrayOutputStream baos = new ByteArrayOutputStream(size);
try {
_localDest.writeBytes(baos);
DataHelper.writeLong(baos, 2, seriesNum);
DataHelper.writeLong(baos, 1, TYPE_PING);
DataHelper.writeDate(baos, new Date(now));
int padding = size - baos.size();
byte paddingData[] = new byte[padding];
Arrays.fill(paddingData, (byte)0x2A);
DataHelper.writeLong(baos, 2, padding);
baos.write(paddingData);
boolean sent = _session.sendMessage(peer, baos.toByteArray());
if (!sent) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error sending the ping to " + peer.calculateHash().toBase64() + " for series " + seriesNum);
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Ping sent to " + peer.calculateHash().toBase64() + " for series " + seriesNum);
}
} catch (IOException ioe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error sending the ping", ioe);
} catch (DataFormatException dfe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error writing out the ping message", dfe);
} catch (I2PSessionException ise) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error writing out the ping message", ise);
}
}
/**
* send a pong message to the peer
*
* @param peer peer to pong
* @param seriesNum id given to us in the ping
* @param sentOn date the peer said they sent us the message
* @param data payload the peer sent us in the ping
*
* @throws IllegalStateException if we are not connected to the router
*/
public void sendPong(Destination peer, int seriesNum, Date sentOn, byte data[]) {
if (_session == null) throw new IllegalStateException("Not connected to the router");
ByteArrayOutputStream baos = new ByteArrayOutputStream(data.length + 768);
try {
_localDest.writeBytes(baos);
DataHelper.writeLong(baos, 2, seriesNum);
DataHelper.writeLong(baos, 1, TYPE_PONG);
DataHelper.writeDate(baos, sentOn);
DataHelper.writeDate(baos, new Date(Clock.getInstance().now()));
DataHelper.writeLong(baos, 2, data.length);
baos.write(data);
boolean sent = _session.sendMessage(peer, baos.toByteArray());
if (!sent) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error sending the pong to " + peer.calculateHash().toBase64() + " for series " + seriesNum + " which was sent on " + sentOn);
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Pong sent to " + peer.calculateHash().toBase64() + " for series " + seriesNum + " which was sent on " + sentOn);
}
} catch (IOException ioe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error sending the ping", ioe);
} catch (DataFormatException dfe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error writing out the pong message", dfe);
} catch (I2PSessionException ise) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error writing out the pong message", ise);
}
}
/**
* We've received this data from I2P - parse it into a ping or a pong
* and notify accordingly
*/
private void handleMessage(byte data[]) {
ByteArrayInputStream bais = new ByteArrayInputStream(data);
try {
Destination from = new Destination();
from.readBytes(bais);
int series = (int)DataHelper.readLong(bais, 2);
long type = DataHelper.readLong(bais, 1);
Date sentOn = DataHelper.readDate(bais);
Date receivedOn = null;
if (type == TYPE_PONG) {
receivedOn = DataHelper.readDate(bais);
}
int size = (int)DataHelper.readLong(bais, 2);
byte payload[] = new byte[size];
int read = DataHelper.read(bais, payload);
if (read != size)
throw new IOException("Malformed payload - read " + read + " instead of " + size);
if (_listener == null) {
if (_log.shouldLog(Log.ERROR))
_log.error("Listener isn't set, but we received a valid message of type " + type + " sent from " + from.calculateHash().toBase64());
return;
}
if (type == TYPE_PING) {
if (_log.shouldLog(Log.INFO))
_log.info("Ping received from " + from.calculateHash().toBase64() + " on series " + series + " sent on " + sentOn + " containing " + size + " bytes");
_listener.receivePing(from, series, sentOn, payload);
} else if (type == TYPE_PONG) {
if (_log.shouldLog(Log.INFO))
_log.info("Pong received from " + from.calculateHash().toBase64() + " on series " + series + " sent on " + sentOn + " with pong sent on " + receivedOn + " containing " + size + " bytes");
_listener.receivePong(from, series, sentOn, receivedOn, payload);
} else {
throw new IOException("Invalid message type " + type);
}
} catch (IOException ioe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error handling the message", ioe);
} catch (DataFormatException dfe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error parsing the message", dfe);
}
}
/**
* connect to the I2P router and either authenticate ourselves with the
* destination we're given, or create a new one and write that to the
* destination file.
*
* @return true if we connect successfully, false otherwise
*/
boolean connect() {
I2PClient client = I2PClientFactory.createClient();
Destination us = null;
File destFile = new File(_privateDestFile);
us = verifyDestination(client, destFile);
if (us == null) return false;
// if we're here, we got a destination. lets connect
FileInputStream fin = null;
try {
fin = new FileInputStream(destFile);
Properties options = getOptions();
I2PSession session = client.createSession(fin, options);
I2PListener lsnr = new I2PListener();
session.setSessionListener(lsnr);
session.connect();
_localDest = session.getMyDestination();
if (_log.shouldLog(Log.INFO))
_log.info("I2CP Session created and connected as " + _localDest.calculateHash().toBase64());
_session = session;
_i2pListener = lsnr;
} catch (I2PSessionException ise) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error connecting", ise);
return false;
} catch (IOException ioe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error loading the destionation", ioe);
return false;
} finally {
if (fin != null) try { fin.close(); } catch (IOException ioe) {}
}
return true;
}
/**
* load, verify, or create a destination
*
* @return the destination loaded, or null if there was an error
*/
private Destination verifyDestination(I2PClient client, File destFile) {
Destination us = null;
FileInputStream fin = null;
if (destFile.exists()) {
try {
fin = new FileInputStream(destFile);
us = new Destination();
us.readBytes(fin);
if (_log.shouldLog(Log.INFO))
_log.info("Existing destination loaded: [" + us.toBase64() + "]");
} catch (IOException ioe) {
if (fin != null) try { fin.close(); } catch (IOException ioe2) {}
fin = null;
destFile.delete();
us = null;
} catch (DataFormatException dfe) {
if (fin != null) try { fin.close(); } catch (IOException ioe2) {}
fin = null;
destFile.delete();
us = null;
} finally {
if (fin != null) try { fin.close(); } catch (IOException ioe2) {}
fin = null;
}
}
if (us == null) {
// need to create a new one
FileOutputStream fos = null;
try {
fos = new FileOutputStream(destFile);
us = client.createDestination(fos);
if (_log.shouldLog(Log.INFO))
_log.info("New destination created: [" + us.toBase64() + "]");
} catch (IOException ioe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error writing out the destination keys being created", ioe);
return null;
} catch (I2PException ie) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error creating the destination", ie);
return null;
} finally {
if (fos != null) try { fos.close(); } catch (IOException ioe) {}
}
}
return us;
}
/**
* I2PSession connect options
*/
private Properties getOptions() {
Properties props = new Properties();
props.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_BEST_EFFORT);
props.setProperty(I2PClient.PROP_TCP_HOST, _i2cpHost);
props.setProperty(I2PClient.PROP_TCP_PORT, _i2cpPort + "");
props.setProperty("tunnels.depthInbound", ""+_numHops);
props.setProperty("tunnels.depthOutbound", ""+_numHops);
return props;
}
/** disconnect from the I2P router */
void disconnect() {
if (_session != null) {
try {
_session.destroySession();
} catch (I2PSessionException ise) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error destroying the session", ise);
}
_session = null;
}
}
/**
* Defines an event notification system for receiving pings and pongs
*
*/
public interface PingPongEventListener {
/**
* receive a ping message from the peer
*
* @param from peer that sent us the ping
* @param seriesNum id the peer sent us in the ping
* @param sentOn date the peer said they sent us the message
* @param data payload from the ping
*/
void receivePing(Destination from, int seriesNum, Date sentOn, byte data[]);
/**
* receive a pong message from the peer
*
* @param from peer that sent us the pong
* @param seriesNum id the peer sent us in the pong (that we sent them in the ping)
* @param sentOn when we sent out the ping
* @param replyOn when they sent out the pong
* @param data payload from the ping/pong
*/
void receivePong(Destination from, int seriesNum, Date sentOn, Date replyOn, byte data[]);
}
/**
* Receive data from the session and pass it along to handleMessage for parsing/dispersal
*
*/
private class I2PListener implements I2PSessionListener {
public void disconnected(I2PSession session) {
if (_log.shouldLog(Log.ERROR))
_log.error("Session disconnected");
disconnect();
}
public void errorOccurred(I2PSession session, String message, Throwable error) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error occurred", error);
}
public void reportAbuse(I2PSession session, int severity) {
if (_log.shouldLog(Log.ERROR))
_log.error("Abuse reported");
}
public void messageAvailable(I2PSession session, int msgId, long size) {
try {
byte data[] = session.receiveMessage(msgId);
handleMessage(data);
} catch (I2PSessionException ise) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error receiving the message", ise);
disconnect();
}
}
}
}

View File

@@ -0,0 +1,268 @@
package net.i2p.heartbeat;
import net.i2p.util.Clock;
import net.i2p.util.Log;
import net.i2p.stat.Rate;
import net.i2p.stat.RateStat;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
/**
* Contain the current window of data for a particular series of ping/pong stats
* sent to a peer. This should be periodically kept clean by calling cleanup()
* to timeout expired pings and to drop data outside the window.
*
*/
public class PeerData {
private final static Log _log = new Log(PeerData.class);
/** peer / sequence / config in this data series */
private ClientConfig _peer;
/** date sent (Long) to EventDataPoint containing the datapoints sent in the current period */
private Map _dataPoints;
/** date sent (Long) to EventDataPoint containing pings that haven't yet timed out or been ponged */
private Map _pendingPings;
private long _sessionStart;
private long _lifetimeSent;
private long _lifetimeReceived;
/** rate averaging the time to send over a variety of periods */
private RateStat _sendRate;
/** rate averaging the time to receive over a variety of periods */
private RateStat _receiveRate;
/** rate averaging the frequency of lost messages over a variety of periods */
private RateStat _lostRate;
/** how long we wait before timing out pending pings (30 seconds) */
private static final long TIMEOUT_PERIOD = 30*1000;
/** synchronize on this when updating _dataPoints or _pendingPings */
private Object _updateLock = new Object();
public PeerData(ClientConfig config) {
_peer = config;
_dataPoints = new TreeMap();
_pendingPings = new TreeMap();
_sessionStart = Clock.getInstance().now();
_lifetimeSent = 0;
_lifetimeReceived = 0;
_sendRate = new RateStat("sendRate", "How long it takes to send", "peer", getPeriods(config.getAveragePeriods()));
_receiveRate = new RateStat("receiveRate", "How long it takes to receive", "peer", getPeriods(config.getAveragePeriods()));
_lostRate = new RateStat("lostRate", "How frequently we lose messages", "peer", getPeriods(config.getAveragePeriods()));
}
/** turn the periods (# minutes) into rate periods (# milliseconds) */
private static long[] getPeriods(int periods[]) {
long rv[] = null;
if (periods == null) periods = new int[0];
rv = new long[periods.length];
for (int i = 0; i < periods.length; i++)
rv[i] = (long)periods[i] * 60*1000; // they're in minutes
Arrays.sort(rv);
return rv;
}
/** how many pings are still outstanding? */
public int getPendingCount() { synchronized (_updateLock) { return _pendingPings.size(); } }
/** how many data points are available in the current window? */
public int getDataPointCount() { synchronized (_updateLock) { return _dataPoints.size(); } }
/** when did this test begin? */
public long getSessionStart() { return _sessionStart; }
/** how many pings have we sent for this test? */
public long getLifetimeSent() { return _lifetimeSent; }
/** how many pongs have we received for this test? */
public long getLifetimeReceived() { return _lifetimeReceived; }
public ClientConfig getConfig() { return _peer; }
/**
* What periods are we averaging the data over (in minutes)?
*/
public int[] getAveragePeriods() { return (_peer.getAveragePeriods() != null ? _peer.getAveragePeriods() : new int[0]); }
/**
* average time to send over the given period.
*
* @param period number of minutes to retrieve the average for
* @return milliseconds average, or -1 if we dont track that period
*/
public double getAverageSendTime(int period) { return getAverage(_sendRate, period); }
/**
* average time to receive over the given period.
*
* @param period number of minutes to retrieve the average for
* @return milliseconds average, or -1 if we dont track that period
*/
public double getAverageReceiveTime(int period) { return getAverage(_receiveRate, period); }
/**
* number of lost messages over the given period.
*
* @param period number of minutes to retrieve the average for
* @return number of lost messages in the period, or -1 if we dont track that period
*/
public double getLostMessages(int period) {
Rate rate = _lostRate.getRate(period * 60*1000);
if (rate == null)
return -1;
return rate.getCurrentTotalValue();
}
private double getAverage(RateStat stat, int period) {
Rate rate = stat.getRate(period * 60*1000);
if (rate == null)
return -1;
return rate.getAverageValue();
}
/**
* Return an ordered list of data points in the current window (after doing a cleanup)
*
* @return list of EventDataPoint objects
*/
public List getDataPoints() {
cleanup();
synchronized (_updateLock) {
return new ArrayList(_dataPoints.values());
}
}
/**
* We have sent the peer a ping on this series (using the send time as given)
*
*/
public void addPing(long dateSent) {
EventDataPoint sent = new EventDataPoint(dateSent);
synchronized (_updateLock) {
_pendingPings.put(new Long(dateSent), sent);
}
_lifetimeSent++;
}
/**
* we have received a pong from the peer on this series
*
* @param dateSent when we sent the ping
* @param pongSent when the peer received the ping and sent the pong
*/
public void pongReceived(long dateSent, long pongSent) {
long now = Clock.getInstance().now();
synchronized (_updateLock) {
EventDataPoint data = (EventDataPoint)_pendingPings.remove(new Long(dateSent));
if (data != null) {
data.setPongReceived(now);
data.setPongSent(pongSent);
data.setWasPonged(true);
_dataPoints.put(new Long(dateSent), data);
}
}
_sendRate.addData(pongSent-dateSent, 0);
_receiveRate.addData(now-pongSent, 0);
_lifetimeReceived++;
}
/**
* drop all datapoints outside the window we're watching, and timeout all
* pending pings not ponged in the TIMEOUT_PERIOD, both updating the lost message
* rate and coallescing all of the rates.
*
*/
public void cleanup() {
long dropBefore = Clock.getInstance().now() - _peer.getStatDuration() * 60*1000;
long timeoutBefore = Clock.getInstance().now() - TIMEOUT_PERIOD;
long numDropped = 0;
long numTimedOut = 0;
synchronized (_updateLock) {
List toTimeout = new ArrayList(4);
List toDrop = new ArrayList(4);
for (Iterator iter = _pendingPings.keySet().iterator(); iter.hasNext(); ) {
Long when = (Long)iter.next();
if (when.longValue() < dropBefore)
toDrop.add(when);
else if (when.longValue() < timeoutBefore)
toTimeout.add(when);
else
break; // its ordered, so once we are past timeoutBefore, no need
}
for (Iterator iter = toDrop.iterator(); iter.hasNext(); ) {
_pendingPings.remove(iter.next());
}
List toAdd = new ArrayList(toTimeout.size());
for (Iterator iter = toTimeout.iterator(); iter.hasNext(); ) {
Long when = (Long)iter.next();
EventDataPoint data = (EventDataPoint)_pendingPings.remove(when);
data.setWasPonged(false);
toAdd.add(data);
}
numDropped = toDrop.size();
numTimedOut = toDrop.size();
toDrop.clear();
for (Iterator iter = _dataPoints.keySet().iterator(); iter.hasNext(); ) {
Long when = (Long)iter.next();
if (when.longValue() < dropBefore)
toDrop.add(when);
else
break; // ordered
}
for (Iterator iter = toDrop.iterator(); iter.hasNext(); ) {
_dataPoints.remove(iter.next());
}
numDropped += toDrop.size();
for (Iterator iter = toAdd.iterator(); iter.hasNext(); ) {
EventDataPoint data = (EventDataPoint)iter.next();
_dataPoints.put(new Long(data.getPingSent()), data);
}
numTimedOut += toAdd.size();
}
_lostRate.addData(numTimedOut, 0);
_receiveRate.coallesceStats();
_sendRate.coallesceStats();
_lostRate.coallesceStats();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Peer data cleaned up " + numTimedOut + " timed out pings and removed " + numDropped + " old entries");
}
/** actual data point for the peer */
public class EventDataPoint {
private boolean _wasPonged;
private long _pingSent;
private long _pongSent;
private long _pongReceived;
public EventDataPoint() {
this(-1);
}
public EventDataPoint(long pingSentOn) {
_wasPonged = false;
_pingSent = pingSentOn;
_pongSent = -1;
_pongReceived = -1;
}
/** when did we send this ping? */
public long getPingSent() { return _pingSent; }
public void setPingSent(long when) { _pingSent = when; }
/** when did the peer receive the ping? */
public long getPongSent() { return _pongSent; }
public void setPongSent(long when) { _pongSent = when; }
/** when did we receive the peer's pong? */
public long getPongReceived() { return _pongReceived; }
public void setPongReceived(long when) { _pongReceived = when; }
/** did the peer reply in time? */
public boolean getWasPonged() { return _wasPonged; }
public void setWasPonged(boolean pong) { _wasPonged = pong; }
}
}

View File

@@ -0,0 +1,107 @@
package net.i2p.heartbeat;
import net.i2p.util.Log;
import net.i2p.util.Clock;
import java.io.IOException;
import java.io.File;
import java.io.FileOutputStream;
import java.text.SimpleDateFormat;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.Locale;
import java.util.Date;
import java.util.Iterator;
/**
* Actually write out the stats for peer test
*
*/
class PeerDataWriter {
private final static Log _log = new Log(PeerDataWriter.class);
/**
* persist the peer state to the location specified in the peer config
*
* @return true if it was persisted correctly, false on error
*/
public boolean persist(PeerData data) {
String filename = data.getConfig().getStatFile();
String header = getHeader(data);
File statFile = new File(filename);
FileOutputStream fos = null;
try {
fos = new FileOutputStream(statFile);
fos.write(header.getBytes());
fos.write("#action\tstatus\tdate and time sent \tsendMs\treplyMs\n".getBytes());
for (Iterator iter = data.getDataPoints().iterator(); iter.hasNext(); ) {
PeerData.EventDataPoint point = (PeerData.EventDataPoint)iter.next();
String line = getEvent(point);
fos.write(line.getBytes());
}
} catch (IOException ioe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error persisting the peer data for " + data.getConfig().getPeer().calculateHash().toBase64(), ioe);
return false;
} finally {
if (fos != null) try { fos.close(); } catch (IOException ioe) {}
}
return true;
}
private String getHeader(PeerData data) {
StringBuffer buf = new StringBuffer(1024);
buf.append("peer \t").append(data.getConfig().getPeer().calculateHash().toBase64()).append('\n');
buf.append("local \t").append(data.getConfig().getUs().calculateHash().toBase64()).append('\n');
buf.append("peerDest \t").append(data.getConfig().getPeer().toBase64()).append('\n');
buf.append("localDest \t").append(data.getConfig().getUs().toBase64()).append('\n');
buf.append("numTunnelHops\t").append(data.getConfig().getNumHops()).append('\n');
buf.append("comment \t").append(data.getConfig().getComment()).append('\n');
buf.append("sendFrequency\t").append(data.getConfig().getSendFrequency()).append('\n');
buf.append("sendSize \t").append(data.getConfig().getSendSize()).append('\n');
buf.append("sessionStart \t").append(getTime(data.getSessionStart())).append('\n');
buf.append("currentTime \t").append(getTime(Clock.getInstance().now())).append('\n');
buf.append("numPending \t").append(data.getPendingCount()).append('\n');
buf.append("lifetimeSent \t").append(data.getLifetimeSent()).append('\n');
buf.append("lifetimeRecv \t").append(data.getLifetimeReceived()).append('\n');
int periods[] = data.getAveragePeriods();
buf.append("#averages\tminutes\tsendMs\trecvMs\tnumLost\n");
for (int i = 0; i < periods.length; i++) {
buf.append("periodAverage\t").append(periods[i]).append('\t');
buf.append(getNum(data.getAverageSendTime(periods[i]))).append('\t');
buf.append(getNum(data.getAverageReceiveTime(periods[i]))).append('\t');
buf.append(getNum(data.getLostMessages(periods[i]))).append('\n');
}
return buf.toString();
}
private String getEvent(PeerData.EventDataPoint point) {
StringBuffer buf = new StringBuffer(128);
buf.append("EVENT\t");
if (point.getWasPonged())
buf.append("OK\t");
else
buf.append("LOST\t");
buf.append(getTime(point.getPingSent())).append('\t');
if (point.getWasPonged()) {
buf.append(point.getPongSent() - point.getPingSent()).append('\t');
buf.append(point.getPongReceived() - point.getPongSent()).append('\t');
}
buf.append('\n');
return buf.toString();
}
private static final SimpleDateFormat _fmt = new SimpleDateFormat("yyyyMMdd.HH:mm:ss.SSS", Locale.UK);
public String getTime(long when) {
synchronized (_fmt) {
return _fmt.format(new Date(when));
}
}
private static final DecimalFormat _numFmt = new DecimalFormat("#0", new DecimalFormatSymbols(Locale.UK));
public String getNum(double val) {
synchronized (_numFmt) {
return _numFmt.format(val);
}
}
}