2004-12-21 jrandom

* Cleaned up the postinstall/startup scripts a bit more to handle winME,
      and added windows info to the headless docs. (thanks ardvark!)
    * Fixed a harmless (yet NPE inspiring) race during the final shutdown of
      a stream (thanks frosk!)
    * Add a pair of new stats for monitoring tunnel participation -
      tunnel.participatingBytesProcessed (total # bytes transferred) and
      tunnel.participatingBytesProcessedActive (total # bytes transferred for
      tunnels whose byte count exceed the 10m average).  This should help
      further monitor congestion issues.
    * Made the NamingService factory property public (thanks susi!)
This commit is contained in:
jrandom
2004-12-21 16:32:49 +00:00
committed by zzz
parent 6cb316b33e
commit 758293dc02
13 changed files with 96 additions and 89 deletions

View File

@@ -17,17 +17,20 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
private I2PAppContext _context;
private Log _log;
private Connection _connection;
private MessageOutputStream.WriteStatus _dummyStatus;
private static final MessageOutputStream.WriteStatus _dummyStatus = new DummyStatus();
public ConnectionDataReceiver(I2PAppContext ctx, Connection con) {
_context = ctx;
_log = ctx.logManager().getLog(ConnectionDataReceiver.class);
_connection = con;
_dummyStatus = new DummyStatus();
}
public boolean writeInProcess() {
return _connection.getUnackedPacketsSent() > 0;
Connection con = _connection;
if (con != null)
return con.getUnackedPacketsSent() > 0;
else
return false;
}
/**
@@ -42,10 +45,12 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
* delivery.
*/
public MessageOutputStream.WriteStatus writeData(byte[] buf, int off, int size) {
Connection con = _connection;
if (con == null) return _dummyStatus;
boolean doSend = true;
if ( (size <= 0) && (_connection.getLastSendId() >= 0) ) {
if (_connection.getOutputStream().getClosed()) {
if (_connection.getCloseSentOn() < 0) {
if ( (size <= 0) && (con.getLastSendId() >= 0) ) {
if (con.getOutputStream().getClosed()) {
if (con.getCloseSentOn() < 0) {
doSend = true;
} else {
// closed, no new data, and we've already sent a close packet
@@ -57,16 +62,18 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
}
}
if (_connection.getUnackedPacketsReceived() > 0)
if (con.getUnackedPacketsReceived() > 0)
doSend = true;
if (_log.shouldLog(Log.INFO) && !doSend)
_log.info("writeData called: size="+size + " doSend=" + doSend
+ " unackedReceived: " + _connection.getUnackedPacketsReceived()
+ " con: " + _connection, new Exception("write called by"));
+ " unackedReceived: " + con.getUnackedPacketsReceived()
+ " con: " + con, new Exception("write called by"));
if (doSend) {
PacketLocal packet = send(buf, off, size);
if (packet == null) return _dummyStatus;
//dont wait for non-acks
if ( (packet.getSequenceNum() > 0) || (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) )
return packet;
@@ -85,7 +92,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
* @param buf data to be sent - may be null
* @param off offset into the buffer to start writing from
* @param size how many bytes of the buffer to write (may be 0)
* @return the packet sent
* @return the packet sent, or null if the connection died
*/
public PacketLocal send(byte buf[], int off, int size) {
return send(buf, off, size, false);
@@ -99,10 +106,12 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
* @return the packet sent
*/
public PacketLocal send(byte buf[], int off, int size, boolean forceIncrement) {
Connection con = _connection;
if (con == null) return null;
long before = System.currentTimeMillis();
PacketLocal packet = buildPacket(buf, off, size, forceIncrement);
PacketLocal packet = buildPacket(con, buf, off, size, forceIncrement);
long built = System.currentTimeMillis();
_connection.sendPacket(packet);
con.sendPacket(packet);
long sent = System.currentTimeMillis();
if ( (built-before > 1000) && (_log.shouldLog(Log.WARN)) )
@@ -112,18 +121,18 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
return packet;
}
private boolean isAckOnly(int size) {
private boolean isAckOnly(Connection con, int size) {
boolean ackOnly = ( (size <= 0) && // no data
(_connection.getLastSendId() >= 0) && // not a SYN
( (!_connection.getOutputStream().getClosed()) || // not a CLOSE
(_connection.getOutputStream().getClosed() &&
_connection.getCloseSentOn() > 0) )); // or it is a dup CLOSE
(con.getLastSendId() >= 0) && // not a SYN
( (!con.getOutputStream().getClosed()) || // not a CLOSE
(con.getOutputStream().getClosed() &&
con.getCloseSentOn() > 0) )); // or it is a dup CLOSE
return ackOnly;
}
private PacketLocal buildPacket(byte buf[], int off, int size, boolean forceIncrement) {
boolean ackOnly = isAckOnly(size);
PacketLocal packet = new PacketLocal(_context, _connection.getRemotePeer(), _connection);
private PacketLocal buildPacket(Connection con, byte buf[], int off, int size, boolean forceIncrement) {
boolean ackOnly = isAckOnly(con, size);
PacketLocal packet = new PacketLocal(_context, con.getRemotePeer(), con);
byte data[] = new byte[size];
if (size > 0)
System.arraycopy(buf, off, data, 0, size);
@@ -131,36 +140,36 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
if (ackOnly && !forceIncrement)
packet.setSequenceNum(0);
else
packet.setSequenceNum(_connection.getNextOutboundPacketNum());
packet.setSendStreamId(_connection.getSendStreamId());
packet.setReceiveStreamId(_connection.getReceiveStreamId());
packet.setSequenceNum(con.getNextOutboundPacketNum());
packet.setSendStreamId(con.getSendStreamId());
packet.setReceiveStreamId(con.getReceiveStreamId());
_connection.getInputStream().updateAcks(packet);
packet.setOptionalDelay(_connection.getOptions().getChoke());
packet.setOptionalMaxSize(_connection.getOptions().getMaxMessageSize());
packet.setResendDelay(_connection.getOptions().getResendDelay());
con.getInputStream().updateAcks(packet);
packet.setOptionalDelay(con.getOptions().getChoke());
packet.setOptionalMaxSize(con.getOptions().getMaxMessageSize());
packet.setResendDelay(con.getOptions().getResendDelay());
if (_connection.getOptions().getProfile() == ConnectionOptions.PROFILE_INTERACTIVE)
if (con.getOptions().getProfile() == ConnectionOptions.PROFILE_INTERACTIVE)
packet.setFlag(Packet.FLAG_PROFILE_INTERACTIVE, true);
else
packet.setFlag(Packet.FLAG_PROFILE_INTERACTIVE, false);
packet.setFlag(Packet.FLAG_SIGNATURE_REQUESTED, _connection.getOptions().getRequireFullySigned());
packet.setFlag(Packet.FLAG_SIGNATURE_REQUESTED, con.getOptions().getRequireFullySigned());
if ( (!ackOnly) && (packet.getSequenceNum() <= 0) ) {
packet.setFlag(Packet.FLAG_SYNCHRONIZE);
packet.setOptionalFrom(_connection.getSession().getMyDestination());
packet.setOptionalFrom(con.getSession().getMyDestination());
}
// don't set the closed flag if this is a plain ACK and there are outstanding
// packets sent, otherwise the other side could receive the CLOSE prematurely,
// since this ACK could arrive before the unacked payload message.
if (_connection.getOutputStream().getClosed() &&
( (size > 0) || (_connection.getUnackedPacketsSent() <= 0) ) ) {
if (con.getOutputStream().getClosed() &&
( (size > 0) || (con.getUnackedPacketsSent() <= 0) ) ) {
packet.setFlag(Packet.FLAG_CLOSE);
_connection.setCloseSentOn(_context.clock().now());
con.setCloseSentOn(_context.clock().now());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Closed is set for a new packet on " + _connection + ": " + packet);
_log.debug("Closed is set for a new packet on " + con + ": " + packet);
} else {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Closed is not set for a new packet on " + _connection + ": " + packet);