die phttp die

This commit is contained in:
jrandom
2004-09-26 15:32:24 +00:00
committed by zzz
parent b67b243ebd
commit 0f54ba59fb
3 changed files with 0 additions and 825 deletions

View File

@@ -1,240 +0,0 @@
package net.i2p.router.transport.phttp;
/*
* free (adj.): unencumbered; not under the control of others
* Written by jrandom in 2003 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Date;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
import net.i2p.data.Signature;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.I2NPMessageException;
import net.i2p.data.i2np.I2NPMessageHandler;
import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
class PHTTPPoller {
private Log _log;
private PHTTPTransport _transport;
private URL _pollURL;
private Poller _poller;
private RouterContext _context;
private boolean _polling;
public PHTTPPoller(RouterContext context, PHTTPTransport transport) {
_context = context;
_log = context.logManager().getLog(PHTTPPoller.class);
_transport = transport;
_pollURL = null;
_poller = new Poller();
_polling = false;
}
public synchronized void startPolling() {
if (_polling) return;
_polling = true;
try {
_pollURL = new URL(_transport.getMyPollURL());
} catch (MalformedURLException mue) {
_log.error("Invalid polling URL [" + _transport.getMyPollURL() + "]", mue);
return;
}
Thread t = new I2PThread(_poller);
t.setName("HTTP Poller");
t.setDaemon(true);
t.setPriority(I2PThread.MIN_PRIORITY);
t.start();
}
public void stopPolling() {
_poller.stopPolling();
}
private byte[] getAuthData() {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream(4);
long nonce = _context.random().nextInt(Integer.MAX_VALUE);
_log.debug("Creating nonce with value [" + nonce + "]");
DataHelper.writeLong(baos, 4, nonce);
byte nonceData[] = baos.toByteArray();
Signature sig = _context.dsa().sign(nonceData, _transport.getMySigningKey());
baos = new ByteArrayOutputStream(512);
DataHelper.writeLong(baos, 4, nonce);
sig.writeBytes(baos);
byte data[] = baos.toByteArray();
return data;
} catch (NumberFormatException nfe) {
_log.error("Error writing the authentication data", nfe);
return null;
} catch (DataFormatException dfe) {
_log.error("Error formatting the authentication data", dfe);
return null;
} catch (IOException ioe) {
_log.error("Error writing the authentication data", ioe);
return null;
}
}
public final static String CONFIG_POLL = "i2np.phttp.shouldPoll";
public final static boolean DEFAULT_POLL = false;
boolean shouldRejectMessages() {
String val = _context.router().getConfigSetting(CONFIG_POLL);
if (null == val) {
return !DEFAULT_POLL;
} else {
return !("true".equals(val));
}
}
class Poller implements Runnable {
private boolean _running;
private I2NPMessageHandler _handler = new I2NPMessageHandler(_context);
public void run() {
_running = true;
// wait 5 seconds before starting to poll so we don't drop too many messages
try { Thread.sleep(10*1000); } catch (InterruptedException ie) {}
_log.debug("Poller running with delay [" + _transport.getPollFrequencyMs() + "]");
try {
while (_running) {
int numRead = getMessages();
if (numRead > 0)
_log.info("# messages found: " + numRead);
try { Thread.sleep(_transport.getPollFrequencyMs()); } catch (InterruptedException ie) {}
}
} catch (Throwable t) {
_log.info("Error while polling", t);
}
}
private int getMessages() {
// open the _pollURL, authenticate ourselves, and get any messages available
byte authData[] = getAuthData();
if (authData == null) return 0;
//_context.bandwidthLimiter().delayOutbound(null, authData.length + 512, false); // HTTP overhead
try {
_log.debug("Before opening " + _pollURL.toExternalForm());
HttpURLConnection con = (HttpURLConnection)_pollURL.openConnection();
// send the info
con.setRequestMethod("POST");
con.setUseCaches(false);
con.setDoOutput(true);
con.setDoInput(true);
ByteArrayOutputStream baos = new ByteArrayOutputStream(authData.length + 64);
String target = _transport.getMyIdentity().getHash().toBase64();
baos.write("target=".getBytes());
baos.write(target.getBytes());
baos.write("&".getBytes());
baos.write(authData);
byte data[] = baos.toByteArray();
//_log.debug("Data to be sent: " + Base64.encode(data));
con.setRequestProperty("Content-length", ""+data.length);
con.getOutputStream().write(data);
_log.debug("Data sent, before reading results of poll for [" + target + "]");
con.connect();
// fetch the results
int rc = con.getResponseCode();
_log.debug("Response code: " + rc);
switch (rc) {
case 200: // ok
_log.debug("Polling can progress");
break;
case 401: // signature failed
_log.error("Signature failed during polling???");
return 0;
case 404: // not yet registered
_log.error("Not registered with the relay - reregistering (in case they failed)");
_transport.registerWithRelay();
return 0;
default: // unknown
_log.error("Invalid error code returned: " + rc);
return 0;
}
InputStream in = con.getInputStream();
Date peerTime = DataHelper.readDate(in);
long offset = peerTime.getTime() - System.currentTimeMillis();
if (_transport.getTrustTime()) {
_log.info("Updating time offset to " + offset + " (old offset: " + _context.clock().getOffset() + ")");
_context.clock().setOffset(offset);
}
boolean shouldReject = shouldRejectMessages();
if (shouldReject) {
_log.debug("Rejecting any messages [we just checked in so we could get the time]");
return 0;
}
int numMessages = (int)DataHelper.readLong(in, 2);
if ( (numMessages > 100) || (numMessages < 0) ) {
_log.error("Invalid # messages specified [" + numMessages + "], skipping");
return 0;
}
int bytesRead = 512; // HTTP overhead
int numSuccessful = 0;
for (int i = 0; i < numMessages; i++) {
_log.debug("Receiving message " + (i+1) + " of "+ numMessages + " pending");
long len = DataHelper.readLong(in, 4);
byte msgBuf[] = new byte[(int)len];
int read = DataHelper.read(in, msgBuf);
if (read == -1) {
_log.error("Unable to read the message as we encountered an EOF");
return i - 1;
} else if (read != len) {
_log.error("Unable to read the message fully [" + read + " read, " + len + " expected]");
return i - 1;
} else {
bytesRead += 4 + read;
try {
I2NPMessage msg = _handler.readMessage(new ByteArrayInputStream(msgBuf));
if (msg == null) {
_log.warn("PHTTP couldn't read a message from the peer out of a " + len + " byte buffer");
} else {
_log.info("Receive message " + (i+1) + " of " + numMessages + ": " + msg.getClass().getName());
_transport.messageReceived(msg, null, null, _handler.getLastReadTime(), (int)len);
numSuccessful++;
}
} catch (IOException ioe) {
_log.warn("Unable to read the message fully", ioe);
} catch (I2NPMessageException ime) {
_log.warn("Poorly formatted message", ime);
}
}
}
//_context.bandwidthLimiter().delayInbound(null, bytesRead);
return numSuccessful;
} catch (Throwable t) {
_log.debug("Error polling", t);
return 0;
}
}
public void stopPolling() { _running = false; }
}
}

View File

@@ -1,285 +0,0 @@
package net.i2p.router.transport.phttp;
/*
* free (adj.): unencumbered; not under the control of others
* Written by jrandom in 2003 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Date;
import java.util.Iterator;
import net.i2p.data.RouterAddress;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
class PHTTPSender {
private Log _log;
private RouterContext _context;
private PHTTPTransport _transport;
private volatile long _sendId = 0;
public final static long RECHECK_DELAY = 1000; // 1 sec
public final static long HARD_TIMEOUT = 30*1000; // no timeouts > 30 seconds
/** H(routerIdent).toBase64() of the target to receive the message */
public final static String PARAM_SEND_TARGET = "target";
/** # ms to wait for the message to be delivered before failing it */
public final static String PARAM_SEND_TIMEOUTMS = "timeoutMs";
/** # bytes to be sent in the message */
public final static String PARAM_SEND_DATA_LENGTH = "dataLength";
/** local time in ms */
public final static String PARAM_SEND_TIME = "localTime";
private final static String PROP_STATUS = "status";
private final static String STATUS_OK = "accepted";
private final static String STATUS_PENDING = "pending";
private final static String STATUS_CLOCKSKEW = "clockSkew_"; /** prefix for (remote-local) */
/** HTTP error code if the target is known and accepting messages */
public final static int CODE_OK = 201; // created
/** HTTP error code if the target is not known or is not accepting messages */
public final static int CODE_FAIL = 410; // gone
/* the URL to check to see when the message is delivered */
public final static String PROP_CHECK_URL = "statusCheckURL";
/** HTTP error code if the message was sent completely */
public final static int CODE_NOT_PENDING = 410; // gone
/** HTTP error code if the message is still pending */
public final static int CODE_PENDING = 204; // ok, but no content
public PHTTPSender(RouterContext context, PHTTPTransport transport) {
_context = context;
_log = context.logManager().getLog(PHTTPSender.class);
_transport = transport;
}
public void send(OutNetMessage msg) {
_log.debug("Sending message " + msg.getMessage().getClass().getName() + " to " + msg.getTarget().getIdentity().getHash().toBase64());
Thread t = new I2PThread(new Send(msg));
t.setName("PHTTP Sender " + (_sendId++));
t.setDaemon(true);
t.start();
}
class Send implements Runnable {
private OutNetMessage _msg;
public Send(OutNetMessage msg) {
_msg = msg;
}
public void run() {
boolean ok = false;
try {
ok = doSend(_msg);
} catch (IOException ioe) {
_log.error("Error sending the message", ioe);
}
_transport.afterSend(_msg, ok);
}
}
private boolean doSend(OutNetMessage msg) throws IOException {
long delay = 0; // _context.bandwidthLimiter().calculateDelayOutbound(msg.getTarget().getIdentity(), (int)msg.getMessageSize());
_log.debug("Delaying [" + delay + "ms]");
try { Thread.sleep(delay); } catch (InterruptedException ie) {}
_log.debug("Continuing with sending");
// now send
URL sendURL = getURL(msg);
if (sendURL == null) {
_log.debug("No URL to send");
return false;
} else {
_log.debug("Sending to " + sendURL.toExternalForm());
HttpURLConnection con = (HttpURLConnection)sendURL.openConnection();
// send the info
con.setRequestMethod("POST");
con.setUseCaches(false);
con.setDoOutput(true);
con.setDoInput(true);
byte data[] = getData(msg);
if (data == null) return false;
//_context.bandwidthLimiter().delayOutbound(msg.getTarget().getIdentity(), data.length+512, false); // HTTP overhead
con.setRequestProperty("Content-length", ""+data.length);
OutputStream out = con.getOutputStream();
out.write(data);
out.flush();
_log.debug("Data sent, before reading");
// fetch the results
String checkURL = getCheckURL(con);
if (checkURL != null) {
_log.debug("Message sent");
return checkDelivery(checkURL, msg);
} else {
_log.warn("Target not known or unable to send to " + msg.getTarget().getIdentity().getHash().toBase64());
return false;
}
}
}
private String getCheckURL(HttpURLConnection con) throws IOException {
BufferedReader reader = new BufferedReader(new InputStreamReader(con.getInputStream()));
String statusLine = reader.readLine();
if (statusLine == null) {
_log.error("Null response line when checking URL");
return null;
}
boolean statusOk = false;
if (!statusLine.startsWith(PROP_STATUS)) {
_log.warn("Response does not begin with status [" + statusLine + "]");
return null;
} else {
String statVal = statusLine.substring(PROP_STATUS.length() + 1);
statusOk = STATUS_OK.equals(statVal);
if (!statusOk) {
_log.info("Status was not ok for sending [" + statVal + "]");
return null;
}
}
String checkURL = reader.readLine();
if (!checkURL.startsWith(PROP_CHECK_URL)) {
_log.warn("Incorrect OK response: " + checkURL);
return null;
} else {
String checkURLStr = checkURL.substring(PROP_CHECK_URL.length()+1);
_log.debug("Check URL = [" + checkURLStr + "]");
return checkURLStr;
}
}
private boolean checkDelivery(String checkURLStr, OutNetMessage msg) {
long now = _context.clock().now();
long expiration = msg.getExpiration();
if (expiration <= now)
expiration = now + HARD_TIMEOUT;
_log.debug("Check delivery [expiration = " + new Date(expiration) + "]");
try {
URL checkStatusURL = new URL(checkURLStr);
long delay = RECHECK_DELAY;
do {
//_context.bandwidthLimiter().delayOutbound(msg.getTarget().getIdentity(), 512, false); // HTTP overhead
//_context.bandwidthLimiter().delayInbound(msg.getTarget().getIdentity(), 512); // HTTP overhead
_log.debug("Checking delivery at " + checkURLStr);
HttpURLConnection con = (HttpURLConnection)checkStatusURL.openConnection();
con.setRequestMethod("GET");
//con.setInstanceFollowRedirects(false); // kaffe doesn't support this (yet)
con.setDoInput(true);
con.setDoOutput(false);
con.setUseCaches(false);
con.connect();
boolean isPending = getIsPending(con);
if (!isPending) {
_log.info("Check delivery successful for message " + msg.getMessage().getClass().getName());
return true;
}
if (now + delay > expiration)
delay = expiration - now - 30; // 30 = kludgy # for the next 4 statements
_log.debug("Still pending (wait " + delay + "ms)");
Thread.sleep(delay);
//delay += RECHECK_DELAY;
now = _context.clock().now();
} while (now < expiration);
_log.warn("Timeout for checking delivery to " + checkURLStr + " for message " + msg.getMessage().getClass().getName());
} catch (Throwable t) {
_log.debug("Error checking for delivery", t);
}
return false;
}
private boolean getIsPending(HttpURLConnection con) throws IOException {
int len = con.getContentLength();
int rc = con.getResponseCode();
BufferedReader reader = new BufferedReader(new InputStreamReader(con.getInputStream()));
String statusLine = reader.readLine();
if (statusLine == null) {
_log.warn("Server didn't send back a status line [len = " + len + ", rc = " + rc + "]");
return false;
}
boolean statusPending = false;
if (!statusLine.startsWith(PROP_STATUS)) {
_log.warn("Response does not begin with status [" + statusLine + "]");
return false;
} else {
String statVal = statusLine.substring(PROP_STATUS.length() + 1);
statusPending = STATUS_PENDING.equals(statVal);
if (statVal.startsWith(STATUS_CLOCKSKEW)) {
long skew = Long.MAX_VALUE;
String skewStr = statVal.substring(STATUS_CLOCKSKEW.length()+1);
try {
skew = Long.parseLong(skewStr);
} catch (Throwable t) {
_log.error("Unable to decode the clock skew [" + skewStr + "]");
skew = Long.MAX_VALUE;
}
_log.error("Clock skew talking with phttp relay: " + skew + "ms (remote-local)");
}
return statusPending;
}
}
private byte[] getData(OutNetMessage msg) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream((int)(msg.getMessageSize() + 64));
String target = msg.getTarget().getIdentity().getHash().toBase64();
StringBuffer buf = new StringBuffer();
buf.append(PARAM_SEND_TARGET).append('=').append(target).append('&');
buf.append(PARAM_SEND_TIMEOUTMS).append('=').append(msg.getExpiration() - _context.clock().now()).append('&');
buf.append(PARAM_SEND_DATA_LENGTH).append('=').append(msg.getMessageSize()).append('&');
buf.append(PARAM_SEND_TIME).append('=').append(_context.clock().now()).append('&').append('\n');
baos.write(buf.toString().getBytes());
baos.write(msg.getMessageData());
byte data[] = baos.toByteArray();
_log.debug("Data to be sent: " + data.length);
return data;
} catch (Throwable t) {
_log.error("Error preparing the data", t);
return null;
}
}
private URL getURL(OutNetMessage msg) {
for (Iterator iter = msg.getTarget().getAddresses().iterator(); iter.hasNext(); ) {
RouterAddress addr = (RouterAddress)iter.next();
URL url = getURL(addr);
if (url != null) return url;
}
_log.warn("No URLs could be constructed to send to " + msg.getTarget().getIdentity().getHash().toBase64());
return null;
}
private URL getURL(RouterAddress addr) {
if (PHTTPTransport.STYLE.equals(addr.getTransportStyle())) {
String url = addr.getOptions().getProperty(PHTTPTransport.PROP_TO_SEND_URL);
if (url == null) return null;
try {
return new URL(url);
} catch (MalformedURLException mue) {
_log.info("Address has a bad url [" + url + "]", mue);
}
}
return null;
}
}

View File

@@ -1,300 +0,0 @@
package net.i2p.router.transport.phttp;
/*
* free (adj.): unencumbered; not under the control of others
* Written by jrandom in 2003 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Date;
import java.util.Properties;
import net.i2p.data.DataHelper;
import net.i2p.data.RouterAddress;
import net.i2p.data.RouterIdentity;
import net.i2p.data.RouterInfo;
import net.i2p.data.SigningPrivateKey;
import net.i2p.router.JobImpl;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.TransportBid;
import net.i2p.router.transport.TransportImpl;
import net.i2p.util.Log;
/**
*
*
*/
public class PHTTPTransport extends TransportImpl {
private Log _log;
public final static String STYLE = "PHTTP";
private RouterAddress _myAddress;
private String _mySendURL;
private String _myPollURL;
private String _myRegisterURL;
private long _timeOffset;
private long _pollFrequencyMs;
private int _transportCost;
private PHTTPPoller _poller;
private PHTTPSender _sender;
private boolean _trustTime;
/** how long after a registration failure should we delay? this gets doubled each time */
private long _nextRegisterDelay = 1000;
/** if the phttp relay is down, check it up to once every 5 minutes */
private final static long MAX_REGISTER_DELAY = 5*60*1000;
/** URL to which registration with the server can occur */
public final static String PROP_TO_REGISTER_URL = "registerURL";
/** URL to which messages destined for this address can be sent */
public final static String PROP_TO_SEND_URL = "sendURL";
public final static String PROP_LOCALTIME = "localtime";
/* key=val keys sent back on registration */
public final static String PROP_STATUS = "status";
public final static String PROP_POLL_URL = "pollURL";
public final static String PROP_SEND_URL = "sendURL";
public final static String PROP_TIME_OFFSET = "timeOffset"; // ms (remote-local)
/* values for the PROP_STATUS */
public final static String STATUS_FAILED = "failed";
public final static String STATUS_REGISTERED = "registered";
public final static String CONFIG_POLL_FREQUENCY = "i2np.phttp.pollFrequencySeconds";
public final static long DEFAULT_POLL_FREQUENCY = 60*1000; // every 60 seconds
/**
* do we want to assume that the relay's clock is sync'ed with NTP and update
* our offset according to what they say?
*/
public final static String CONFIG_TRUST_TIME = "i2np.phttp.trustRelayTime";
public final static boolean DEFAULT_TRUST_TIME = true;
public PHTTPTransport(RouterContext ctx, RouterAddress myAddress) {
super(ctx);
_log = ctx.logManager().getLog(PHTTPTransport.class);
_myAddress = myAddress;
if (myAddress != null) {
Properties opts = myAddress.getOptions();
_myRegisterURL = opts.getProperty(PROP_TO_REGISTER_URL);
_mySendURL = opts.getProperty(PROP_TO_SEND_URL);
_pollFrequencyMs = DEFAULT_POLL_FREQUENCY;
String pollFreq = _context.router().getConfigSetting(CONFIG_POLL_FREQUENCY);
if (pollFreq != null) {
try {
long val = Long.parseLong(pollFreq);
_pollFrequencyMs = val*1000;
_log.info("PHTTP Polling Frequency specified as once every " + val + " seconds");
} catch (NumberFormatException nfe) {
_log.error("Poll frequency is not valid (" + pollFreq + ")", nfe);
}
} else {
_log.info("PHTTP Polling Frequency not specified via (" + CONFIG_POLL_FREQUENCY + "), defaulting to once every " + (DEFAULT_POLL_FREQUENCY/1000) + " seconds");
}
String trustTime = _context.router().getConfigSetting(CONFIG_TRUST_TIME);
if (trustTime != null) {
_trustTime = Boolean.TRUE.toString().equalsIgnoreCase(trustTime);
} else {
_trustTime = DEFAULT_TRUST_TIME;
}
_context.jobQueue().addJob(new RegisterJob());
}
_sender = new PHTTPSender(_context, this);
_poller = new PHTTPPoller(_context, this);
}
public String getMySendURL() { return _mySendURL; }
SigningPrivateKey getMySigningKey() { return _context.keyManager().getSigningPrivateKey(); }
RouterIdentity getMyIdentity() { return _context.router().getRouterInfo().getIdentity(); }
String getMyPollURL() { return _myPollURL; }
long getPollFrequencyMs() { return _pollFrequencyMs; }
private class RegisterJob extends JobImpl {
public RegisterJob() {
super(PHTTPTransport.this._context);
}
public String getName() { return "Register with PHTTP relay"; }
public void runJob() {
boolean ok = doRegisterWithRelay();
if (ok) {
_log.debug("Registration successful with the last registration delay of " + _nextRegisterDelay + "ms");
_poller.startPolling();
} else {
_nextRegisterDelay = _nextRegisterDelay * 2;
if (_nextRegisterDelay > MAX_REGISTER_DELAY)
_nextRegisterDelay = MAX_REGISTER_DELAY;
long nextRegister = _context.clock().now() + _nextRegisterDelay;
_log.debug("Registration failed, next registration attempt in " + _nextRegisterDelay + "ms");
requeue(nextRegister);
}
}
}
boolean registerWithRelay() {
boolean ok = doRegisterWithRelay();
if (ok) {
_log.info("Registered with PHTTP relay");
return ok;
}
_log.error("Unable to register with relay");
return false;
}
synchronized boolean doRegisterWithRelay() {
_log.debug("Beginning registration");
ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
try {
DataHelper.writeDate(baos, new Date(_context.clock().now()));
_context.router().getRouterInfo().getIdentity().writeBytes(baos);
int postLength = baos.size();
//_context.bandwidthLimiter().delayOutbound(null, postLength+512, false); // HTTP overhead
//_context.bandwidthLimiter().delayInbound(null, 2048+512); // HTTP overhead
long now = _context.clock().now();
_log.debug("Before opening " + _myRegisterURL);
URL url = new URL(_myRegisterURL);
HttpURLConnection con = (HttpURLConnection)url.openConnection();
// send the info
con.setRequestMethod("POST");
con.setUseCaches(false);
con.setDoOutput(true);
con.setDoInput(true);
con.setRequestProperty("Content-length", ""+postLength);
baos.writeTo(con.getOutputStream());
_log.debug("Data sent, before reading");
con.connect();
// fetch the results
BufferedReader reader = new BufferedReader(new InputStreamReader(con.getInputStream()));
String line = null;
String stat = null;
boolean ok = false;
while ( (line = reader.readLine()) != null) {
if (line.startsWith(PROP_SEND_URL)) {
_mySendURL = line.substring(PROP_SEND_URL.length()+1).trim();
} else if (line.startsWith(PROP_POLL_URL)) {
_myPollURL = line.substring(PROP_POLL_URL.length()+1).trim();
} else if (line.startsWith(PROP_STATUS)) {
stat = line.substring(PROP_STATUS.length()+1).trim();
if (STATUS_REGISTERED.equals(stat.toLowerCase()))
ok = true;
} else if (line.startsWith(PROP_TIME_OFFSET)) {
String offset = line.substring(PROP_TIME_OFFSET.length()+1).trim();
try {
_timeOffset = Long.parseLong(offset);
} catch (Throwable t) {
_log.warn("Unable to parse time offset [" + offset + "] - treating as MAX");
_timeOffset = Long.MAX_VALUE;
}
}
if ( (_myPollURL != null) && (_mySendURL != null) && (stat != null) )
break;
}
if (_trustTime) {
_log.info("Setting time offset to " + _timeOffset + " (old offset: " + _context.clock().getOffset() + ")");
_context.clock().setOffset(_timeOffset);
}
//if ( (_timeOffset > Router.CLOCK_FUDGE_FACTOR) || (_timeOffset < 0 - Router.CLOCK_FUDGE_FACTOR) ) {
// _log.error("Unable to register with PHTTP relay, as there is too much clock skew! " + _timeOffset + "ms difference (them-us)", new Exception("Too much clock skew with phttp relay!"));
// return false;
//}
if (ok) {
_log.info("Registered with the PHTTP relay [" + _myRegisterURL + "]");
_log.info("Registered sending url: [" + _mySendURL + "]");
_log.info("Registered polling url: [" + _myPollURL + "]");
return true;
} else {
_log.warn("PHTTP relay [" + _myRegisterURL + "] rejected registration");
}
} catch (Throwable t) {
_log.warn("Error registering", t);
}
return false;
}
protected void outboundMessageReady() {
OutNetMessage msg = getNextMessage();
if (msg != null) {
_context.jobQueue().addJob(new PushNewMessageJob(msg));
} else {
_log.debug("OutboundMessageReady called, but none were available");
}
}
public TransportBid bid(RouterInfo toAddress, long dataSize) {
if (_poller.shouldRejectMessages())
return null; // we're not using phttp
long latencyStartup = 0; //_context.bandwidthLimiter().calculateDelayOutbound(toAddress.getIdentity(), (int)dataSize);
latencyStartup += _pollFrequencyMs / 2; // average distance until the next poll
long sendTime = (int)((dataSize)/(16*1024)); // 16K/sec ARBITRARY
int bytes = (int)dataSize+1024;
// lets seriously penalize phttp to heavily prefer TCP
bytes += 1024*100;
latencyStartup += 1000*600;
TransportBid bid = new TransportBid();
bid.setBandwidthBytes(bytes);
bid.setExpiration(new Date(_context.clock().now()+1000*60)); // 1 minute, since the bwlimiter goes per minute
bid.setLatencyMs((int) (latencyStartup + sendTime));
bid.setMessageSize((int)dataSize);
bid.setRouter(toAddress);
bid.setTransport(this);
RouterAddress addr = getTargetAddress(toAddress);
if (addr == null)
return null;
return bid;
}
public RouterAddress startListening() {
_log.debug("Start listening");
return _myAddress;
}
public void stopListening() {
if (_poller != null)
_poller.stopPolling();
}
public void rotateAddresses() {}
public void addAddressInfo(Properties infoForNewAddress) {}
public String getStyle() { return STYLE; }
boolean getTrustTime() { return _trustTime; }
private class PushNewMessageJob extends JobImpl {
private OutNetMessage _msg;
public PushNewMessageJob(OutNetMessage msg) {
super(PHTTPTransport.this._context);
_msg = msg;
}
public String getName() { return "Push New PHTTP Message"; }
public void runJob() {
long delay = 0; // _context.bandwidthLimiter().calculateDelayOutbound(_msg.getTarget().getIdentity(), (int)_msg.getMessageSize());
if (delay > 0) {
getTiming().setStartAfter(delay + _context.clock().now());
PHTTPTransport.this._context.jobQueue().addJob(this);
} else {
_sender.send(_msg);
}
}
}
}