2005-07-05

* Use a buffered PRNG, pulling the PRNG data off a larger precalculated
      buffer, rather than the underlying PRNG's (likely small) one, which in
      turn reduces the frequency of recalcing.
    * More tuning to reduce temporary allocation churn
This commit is contained in:
jrandom
2005-07-05 22:08:56 +00:00
committed by zzz
parent 18d3f5d25d
commit f688b9112d
22 changed files with 709 additions and 65 deletions

View File

@@ -22,7 +22,8 @@ import net.i2p.util.Log;
*/
public class TunnelDataMessage extends I2NPMessageImpl {
private Log _log;
private TunnelId _tunnelId;
private long _tunnelId;
private TunnelId _tunnelIdObj;
private byte[] _data;
public final static int MESSAGE_TYPE = 18;
@@ -48,8 +49,17 @@ public class TunnelDataMessage extends I2NPMessageImpl {
setMessageExpiration(context.clock().now() + EXPIRATION_PERIOD);
}
public TunnelId getTunnelId() { return _tunnelId; }
public void setTunnelId(TunnelId id) { _tunnelId = id; }
public long getTunnelId() { return _tunnelId; }
public void setTunnelId(long id) { _tunnelId = id; }
public TunnelId getTunnelIdObj() {
if (_tunnelIdObj == null)
_tunnelIdObj = new TunnelId(_tunnelId); // not thread safe, but immutable, so who cares
return _tunnelIdObj;
}
public void setTunnelId(TunnelId id) {
_tunnelIdObj = id;
_tunnelId = id.getTunnelId();
}
public byte[] getData() { return _data; }
public void setData(byte data[]) {
@@ -62,10 +72,10 @@ public class TunnelDataMessage extends I2NPMessageImpl {
if (type != MESSAGE_TYPE) throw new I2NPMessageException("Message type is incorrect for this message");
int curIndex = offset;
_tunnelId = new TunnelId(DataHelper.fromLong(data, curIndex, 4));
_tunnelId = DataHelper.fromLong(data, curIndex, 4);
curIndex += 4;
if (_tunnelId.getTunnelId() <= 0)
if (_tunnelId <= 0)
throw new I2NPMessageException("Invalid tunnel Id " + _tunnelId);
// we cant cache it in trivial form, as other components (e.g. HopProcessor)
@@ -82,12 +92,12 @@ public class TunnelDataMessage extends I2NPMessageImpl {
protected int calculateWrittenLength() { return 4 + DATA_SIZE; }
/** write the message body to the output array, starting at the given index */
protected int writeMessageBody(byte out[], int curIndex) throws I2NPMessageException {
if ( (_tunnelId == null) || (_data == null) )
if ( (_tunnelId <= 0) || (_data == null) )
throw new I2NPMessageException("Not enough data to write out (id=" + _tunnelId + " data=" + _data + ")");
if (_data.length <= 0)
throw new I2NPMessageException("Not enough data to write out (data.length=" + _data.length + ")");
DataHelper.toLong(out, curIndex, 4, _tunnelId.getTunnelId());
DataHelper.toLong(out, curIndex, 4, _tunnelId);
curIndex += 4;
System.arraycopy(_data, 0, out, curIndex, DATA_SIZE);
curIndex += _data.length;
@@ -99,7 +109,7 @@ public class TunnelDataMessage extends I2NPMessageImpl {
public int getType() { return MESSAGE_TYPE; }
public int hashCode() {
return DataHelper.hashCode(getTunnelId()) +
return (int)_tunnelId +
DataHelper.hashCode(_data);
}

View File

@@ -219,6 +219,11 @@ public class MessageHistory {
addEntry(getPrefix() + "message " + messageId + " on tunnel " + tunnelId + " / " + toTunnel + " as " + type);
}
public void tunnelDispatched(long messageId, long innerMessageId, long tunnelId, String type) {
if (!_doLog) return;
addEntry(getPrefix() + "message " + messageId + "/" + innerMessageId + " on " + tunnelId + " as " + type);
}
/**
* The local router has detected a failure in the given tunnel
*
@@ -472,13 +477,14 @@ public class MessageHistory {
buf.append("Break message ").append(messageId).append(" into fragments: ").append(numFragments);
addEntry(buf.toString());
}
public void fragmentMessage(long messageId, int numFragments, String tunnel) {
public void fragmentMessage(long messageId, int numFragments, Object tunnel) {
if (!_doLog) return;
if (messageId == -1) throw new IllegalArgumentException("why are you -1?");
StringBuffer buf = new StringBuffer(48);
buf.append(getPrefix());
buf.append("Break message ").append(messageId).append(" into fragments: ").append(numFragments);
buf.append(" on ").append(tunnel);
if (tunnel != null)
buf.append(" on ").append(tunnel.toString());
addEntry(buf.toString());
}
public void droppedTunnelDataMessageUnknown(long msgId, long tunnelId) {

View File

@@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.197 $ $Date: 2005/05/25 16:32:38 $";
public final static String ID = "$Revision: 1.198 $ $Date: 2005/07/04 15:44:21 $";
public final static String VERSION = "0.5.0.7";
public final static long BUILD = 9;
public final static long BUILD = 10;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@@ -61,6 +61,9 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
public TransportBid getBid(OutNetMessage msg) {
return _manager.getBid(msg);
}
public TransportBid getNextBid(OutNetMessage msg) {
return _manager.getNextBid(msg);
}
public void processMessage(OutNetMessage msg) {
//GetBidsJob j = new GetBidsJob(_context, this, msg);

View File

@@ -59,28 +59,13 @@ public class GetBidsJob extends JobImpl {
return;
}
List bids = facade.getBids(msg);
if ( (bids == null) || (bids.size() <= 0) ) {
context.shitlist().shitlistRouter(to, "No bids after " + (bids != null ? bids.size() + " tries" : "0 tries"));
TransportBid bid = facade.getNextBid(msg);
if (bid == null) {
context.shitlist().shitlistRouter(to, "No more bids available");
context.netDb().fail(to);
fail(context, msg);
} else {
int lowestCost = -1;
TransportBid winner = null;
for (int i = 0; i < bids.size(); i++) {
TransportBid bid = (TransportBid)bids.get(i);
if ( (lowestCost < 0) || (bid.getLatencyMs() < lowestCost) ) {
winner = bid;
lowestCost = bid.getLatencyMs();
}
}
if (winner != null) {
if (log.shouldLog(Log.INFO))
log.info("Winning bid: " + winner + " out of " + bids);
winner.getTransport().send(msg);
}
bid.getTransport().send(msg);
}
}

View File

@@ -159,6 +159,33 @@ public class TransportManager implements TransportEventListener {
return rv;
}
public TransportBid getNextBid(OutNetMessage msg) {
Set failedTransports = msg.getFailedTransports();
TransportBid rv = null;
for (int i = 0; i < _transports.size(); i++) {
Transport t = (Transport)_transports.get(i);
if (failedTransports.contains(t.getStyle())) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Skipping transport " + t.getStyle() + " as it already failed");
continue;
}
// we always want to try all transports, in case there is a faster bidirectional one
// already connected (e.g. peer only has a public PHTTP address, but they've connected
// to us via TCP, send via TCP)
TransportBid bid = t.bid(msg.getTarget(), msg.getMessageSize());
if (bid != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Transport " + t.getStyle() + " bid: " + bid);
if ( (rv == null) || (rv.getLatencyMs() > bid.getLatencyMs()) )
rv = bid;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Transport " + t.getStyle() + " did not produce a bid");
}
}
return rv;
}
public void messageReceived(I2NPMessage message, RouterIdentity fromRouter, Hash fromRouterHash) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("I2NPMessage received: " + message.getClass().getName(), new Exception("Where did I come from again?"));

View File

@@ -101,7 +101,7 @@ public class ACKSender implements Runnable {
_context.statManager().addRateData("udp.sendACKRemaining", remaining, 0);
now = _context.clock().now();
_context.statManager().addRateData("udp.ackFrequency", now-lastSend, now-wanted);
_context.statManager().getStatLog().addData(peer.getRemoteHostId().toString(), "udp.peer.sendACKCount", ackBitfields.size(), 0);
//_context.statManager().getStatLog().addData(peer.getRemoteHostId().toString(), "udp.peer.sendACKCount", ackBitfields.size(), 0);
UDPPacket ack = _builder.buildACK(peer, ackBitfields);
ack.markType(1);
if (_log.shouldLog(Log.INFO))

View File

@@ -34,7 +34,7 @@ public class OutboundMessageState {
private int _nextSendFragment;
public static final int MAX_FRAGMENTS = 32;
private static final ByteCache _cache = ByteCache.getInstance(64, MAX_FRAGMENTS*1024);
private static final ByteCache _cache = ByteCache.getInstance(128, MAX_FRAGMENTS*1024);
public OutboundMessageState(I2PAppContext context) {
_context = context;

View File

@@ -27,6 +27,7 @@ public class PacketBuilder {
private Log _log;
private static final ByteCache _ivCache = ByteCache.getInstance(64, UDPPacket.IV_SIZE);
private static final ByteCache _hmacCache = ByteCache.getInstance(64, Hash.HASH_LENGTH);
public PacketBuilder(I2PAppContext ctx) {
_context = ctx;
@@ -482,16 +483,20 @@ public class PacketBuilder {
int hmacOff = packet.getPacket().getOffset();
int hmacLen = encryptSize + UDPPacket.IV_SIZE + 2;
Hash hmac = _context.hmac().calculate(macKey, data, hmacOff, hmacLen);
//Hash hmac = _context.hmac().calculate(macKey, data, hmacOff, hmacLen);
ByteArray ba = _hmacCache.acquire();
_context.hmac().calculate(macKey, data, hmacOff, hmacLen, ba.getData(), 0);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Authenticating " + packet.getPacketDataLength() + // packet.getPacket().getLength() +
"\nIV: " + Base64.encode(iv.getData()) +
"\nraw mac: " + hmac.toBase64() +
"\nraw mac: " + Base64.encode(ba.getData()) +
"\nMAC key: " + macKey.toBase64());
// ok, now lets put it back where it belongs...
System.arraycopy(data, hmacOff, data, encryptOffset, encryptSize);
System.arraycopy(hmac.getData(), 0, data, hmacOff, UDPPacket.MAC_SIZE);
//System.arraycopy(hmac.getData(), 0, data, hmacOff, UDPPacket.MAC_SIZE);
System.arraycopy(ba.getData(), 0, data, hmacOff, UDPPacket.MAC_SIZE);
System.arraycopy(iv.getData(), 0, data, hmacOff + UDPPacket.MAC_SIZE, UDPPacket.IV_SIZE);
_hmacCache.release(ba);
}
}

View File

@@ -68,9 +68,9 @@ public class UDPPacket {
public static final byte BITFIELD_CONTINUATION = (byte)(1 << 7);
private static final int MAX_VALIDATE_SIZE = MAX_PACKET_SIZE;
private static final ByteCache _validateCache = ByteCache.getInstance(16, MAX_VALIDATE_SIZE);
private static final ByteCache _ivCache = ByteCache.getInstance(16, IV_SIZE);
private static final ByteCache _dataCache = ByteCache.getInstance(64, MAX_PACKET_SIZE);
private static final ByteCache _validateCache = ByteCache.getInstance(64, MAX_VALIDATE_SIZE);
private static final ByteCache _ivCache = ByteCache.getInstance(64, IV_SIZE);
private static final ByteCache _dataCache = ByteCache.getInstance(128, MAX_PACKET_SIZE);
private UDPPacket(I2PAppContext ctx) {
_context = ctx;
@@ -237,8 +237,11 @@ public class UDPPacket {
_released = true;
//_releasedBy = new Exception("released by");
//_acquiredBy = null;
//_dataCache.release(_dataBuf);
if (!CACHE) return;
//
if (!CACHE) {
_dataCache.release(_dataBuf);
return;
}
synchronized (_packetCache) {
if (_packetCache.size() <= 64) {
_packetCache.add(this);

View File

@@ -57,8 +57,8 @@ public class BatchedRouterPreprocessor extends BatchedPreprocessor {
protected void notePreprocessing(long messageId, int numFragments) {
if (_config != null)
_routerContext.messageHistory().fragmentMessage(messageId, numFragments, _config.toString());
_routerContext.messageHistory().fragmentMessage(messageId, numFragments, _config);
else
_routerContext.messageHistory().fragmentMessage(messageId, numFragments, _hopConfig.toString());
_routerContext.messageHistory().fragmentMessage(messageId, numFragments, _hopConfig);
}
}

View File

@@ -26,6 +26,7 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
protected static final int IV_SIZE = HopProcessor.IV_LENGTH;
protected static final ByteCache _dataCache = ByteCache.getInstance(512, PREPROCESSED_SIZE);
protected static final ByteCache _ivCache = ByteCache.getInstance(128, IV_SIZE);
protected static final ByteCache _hashCache = ByteCache.getInstance(128, Hash.HASH_LENGTH);
public TrivialPreprocessor(I2PAppContext ctx) {
_context = ctx;
@@ -104,7 +105,11 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
// payload ready, now H(instructions+payload+IV)
System.arraycopy(iv, 0, fragments, fragmentLength, IV_SIZE);
Hash h = _context.sha().calculateHash(fragments, 0, fragmentLength + IV_SIZE);
ByteArray hashBuf = _hashCache.acquire();
//Hash h = _context.sha().calculateHash(fragments, 0, fragmentLength + IV_SIZE);
_context.sha().calculateHash(fragments, 0, fragmentLength + IV_SIZE, hashBuf.getData(), 0);
//Hash h = _context.sha().calculateHash(target, 0, offset + IV_SIZE);
//_log.debug("before shift: " + Base64.encode(target));
// now shiiiiiift
@@ -121,10 +126,12 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
int offset = 0;
System.arraycopy(iv, 0, fragments, offset, IV_SIZE);
offset += IV_SIZE;
System.arraycopy(h.getData(), 0, fragments, offset, 4);
//System.arraycopy(h.getData(), 0, fragments, offset, 4);
System.arraycopy(hashBuf.getData(), 0, fragments, offset, 4);
offset += 4;
//_log.debug("before pad : " + Base64.encode(target));
_hashCache.release(hashBuf);
_ivCache.release(ivBuf);
// fits in a single message, so may be smaller than the full size

View File

@@ -349,35 +349,35 @@ public class TunnelDispatcher implements Service {
long before = _context.clock().now();
TunnelParticipant participant = null;
synchronized (_participants) {
participant = (TunnelParticipant)_participants.get(msg.getTunnelId());
participant = (TunnelParticipant)_participants.get(msg.getTunnelIdObj());
}
if (participant != null) {
// we are either just a random participant or the inbound endpoint
if (_log.shouldLog(Log.DEBUG))
_log.debug("dispatch to participant " + participant + ": " + msg.getUniqueId() + " from "
+ recvFrom.toBase64().substring(0,4));
_context.messageHistory().tunnelDispatched(msg.getUniqueId(), msg.getTunnelId().getTunnelId(), "participant");
_context.messageHistory().tunnelDispatched(msg.getUniqueId(), msg.getTunnelId(), "participant");
participant.dispatch(msg, recvFrom);
_context.statManager().addRateData("tunnel.dispatchParticipant", 1, 0);
} else {
OutboundTunnelEndpoint endpoint = null;
synchronized (_outboundEndpoints) {
endpoint = (OutboundTunnelEndpoint)_outboundEndpoints.get(msg.getTunnelId());
endpoint = (OutboundTunnelEndpoint)_outboundEndpoints.get(msg.getTunnelIdObj());
}
if (endpoint != null) {
// we are the outobund endpoint
if (_log.shouldLog(Log.DEBUG))
_log.debug("dispatch where we are the outbound endpoint: " + endpoint + ": "
+ msg + " from " + recvFrom.toBase64().substring(0,4));
_context.messageHistory().tunnelDispatched(msg.getUniqueId(), msg.getTunnelId().getTunnelId(), "outbound endpoint");
_context.messageHistory().tunnelDispatched(msg.getUniqueId(), msg.getTunnelId(), "outbound endpoint");
endpoint.dispatch(msg, recvFrom);
_context.statManager().addRateData("tunnel.dispatchEndpoint", 1, 0);
} else {
_context.messageHistory().droppedTunnelDataMessageUnknown(msg.getUniqueId(), msg.getTunnelId().getTunnelId());
_context.messageHistory().droppedTunnelDataMessageUnknown(msg.getUniqueId(), msg.getTunnelId());
int level = (_context.router().getUptime() > 10*60*1000 ? Log.ERROR : Log.WARN);
if (_log.shouldLog(level))
_log.log(level, "no matching participant/endpoint for id=" + msg.getTunnelId().getTunnelId()
_log.log(level, "no matching participant/endpoint for id=" + msg.getTunnelId()
+ " expiring in " + DataHelper.formatDuration(msg.getMessageExpiration()-_context.clock().now())
+ ": existing = " + _participants.size() + " / " + _outboundEndpoints.size());
}
@@ -410,8 +410,9 @@ public class TunnelDispatcher implements Service {
+ msg.getMessage().getClass().getName());
return;
}
_context.messageHistory().tunnelDispatched("message " + msg.getUniqueId() + "/" + msg.getMessage().getUniqueId() + " on tunnel "
+ msg.getTunnelId().getTunnelId() + " as inbound gateway");
//_context.messageHistory().tunnelDispatched("message " + msg.getUniqueId() + "/" + msg.getMessage().getUniqueId() + " on tunnel "
// + msg.getTunnelId().getTunnelId() + " as inbound gateway");
_context.messageHistory().tunnelDispatched(msg.getUniqueId(), msg.getMessage().getUniqueId(), msg.getTunnelId().getTunnelId(), "inbound gateway");
gw.add(msg);
_context.statManager().addRateData("tunnel.dispatchInbound", 1, 0);
} else {

View File

@@ -104,7 +104,7 @@ public class RequestTunnelJob extends JobImpl {
// inbound tunnel, which means we are the first person asked, and if
// it is a zero hop tunnel, then we are also the last person asked
long id = getContext().random().nextLong(TunnelId.MAX_ID_VALUE);
long id = getContext().random().nextLong(TunnelId.MAX_ID_VALUE-1) + 1;
_currentConfig.setReceiveTunnelId(DataHelper.toLong(4, id));
if (_config.getLength() > 1) {
if (_log.shouldLog(Log.DEBUG))