diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java index 833ba62b4..0226b476d 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java @@ -15,6 +15,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Set; +import net.i2p.I2PAppContext; import net.i2p.I2PException; import net.i2p.client.I2PSession; import net.i2p.client.I2PSessionException; @@ -22,8 +23,10 @@ import net.i2p.client.I2PSessionListener; import net.i2p.data.Base64; import net.i2p.data.DataFormatException; import net.i2p.data.Destination; +import net.i2p.stat.StatManager; import net.i2p.util.Log; + /** * Centralize the coordination and multiplexing of the local client's streaming. * There should be one I2PSocketManager for each I2PSession, and if an application @@ -33,7 +36,8 @@ import net.i2p.util.Log; * */ public class I2PSocketManager implements I2PSessionListener { - private final static Log _log = new Log(I2PSocketManager.class); + private I2PAppContext _context; + private Log _log; private I2PSession _session; private I2PServerSocketImpl _serverSocket = null; private Object lock = new Object(); // for locking socket lists @@ -41,6 +45,8 @@ public class I2PSocketManager implements I2PSessionListener { private HashMap _inSockets; private I2PSocketOptions _defaultOptions; private long _acceptTimeout; + private String _name; + private static int __managerId = 0; public static final short ACK = 0x51; public static final short CLOSE_OUT = 0x52; @@ -57,10 +63,24 @@ public class I2PSocketManager implements I2PSessionListener { private static final long ACCEPT_TIMEOUT_DEFAULT = 5*1000; public I2PSocketManager() { + this("SocketManager " + (++__managerId)); + } + public I2PSocketManager(String name) { + _name = name; _session = null; _inSockets = new HashMap(16); _outSockets = new HashMap(16); _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT; + _context = I2PAppContext.getGlobalContext(); + _log = _context.logManager().getLog(I2PSocketManager.class); + _context.statManager().createRateStat("streaming.lifetime", "How long before the socket is closed?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("streaming.sent", "How many bytes are sent in the stream?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("streaming.received", "How many bytes are received in the stream?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("streaming.transferBalance", "How many streams send more than they receive (positive means more sent, negative means more received)?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("streaming.synNoAck", "How many times have we sent a SYN but not received an ACK?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("streaming.ackSendFailed", "How many times have we tried to send an ACK to a SYN and failed?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("streaming.nackSent", "How many times have we refused a SYN with a NACK?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("streaming.nackReceived", "How many times have we received a NACK to our SYN?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); } public I2PSession getSession() { @@ -82,12 +102,12 @@ public class I2PSocketManager implements I2PSessionListener { public long getAcceptTimeout() { return _acceptTimeout; } public void disconnected(I2PSession session) { - _log.info("Disconnected from the session"); + _log.info(getName() + ": Disconnected from the session"); destroySocketManager(); } public void errorOccurred(I2PSession session, String message, Throwable error) { - _log.error("Error occurred: [" + message + "]", error); + _log.error(getName() + ": Error occurred: [" + message + "]", error); } public void messageAvailable(I2PSession session, int msgId, long size) { @@ -95,11 +115,11 @@ public class I2PSocketManager implements I2PSessionListener { I2PSocketImpl s; byte msg[] = session.receiveMessage(msgId); if (msg.length == 1 && msg[0] == -1) { - _log.debug("Ping received"); + _log.debug(getName() + ": Ping received"); return; } if (msg.length < 4) { - _log.error("==== packet too short ===="); + _log.error(getName() + ": ==== packet too short ===="); return; } int type = msg[0] & 0xff; @@ -107,7 +127,7 @@ public class I2PSocketManager implements I2PSessionListener { byte[] payload = new byte[msg.length - 4]; System.arraycopy(msg, 4, payload, 0, payload.length); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Message read: type = [" + Integer.toHexString(type) + _log.debug(getName() + ": Message read: type = [" + Integer.toHexString(type) + "] id = [" + getReadableForm(id) + "] payload length: [" + payload.length + "]"); switch (type) { @@ -136,9 +156,9 @@ public class I2PSocketManager implements I2PSessionListener { return; } } catch (I2PException ise) { - _log.error("Error processing", ise); + _log.error(getName() + ": Error processing", ise); } catch (IllegalStateException ise) { - _log.debug("Error processing", ise); + _log.debug(getName() + ": Error processing", ise); } } @@ -150,30 +170,45 @@ public class I2PSocketManager implements I2PSessionListener { * */ private void ackAvailable(String id, byte payload[]) { + long begin = _context.clock().now(); I2PSocketImpl s = null; synchronized (lock) { s = (I2PSocketImpl) _outSockets.get(id); } if (s == null) { - _log.warn("No socket responsible for ACK packet"); + _log.warn(getName() + ": No socket responsible for ACK packet"); return; } + long socketRetrieved = _context.clock().now(); + String remoteId = null; remoteId = s.getRemoteID(false); if ( (payload.length == 3) && (remoteId == null) ) { String newID = toString(payload); + long beforeSetRemId = _context.clock().now(); s.setRemoteID(newID); + if (_log.shouldLog(Log.DEBUG)) { + _log.debug(getName() + ": ackAvailable - socket retrieval took " + + (socketRetrieved-begin) + "ms, getRemoteId took " + + (beforeSetRemId-socketRetrieved) + "ms, setRemoteId took " + + (_context.clock().now()-beforeSetRemId) + "ms"); + } return; } else { // (payload.length != 3 || getRemoteId != null) if (_log.shouldLog(Log.WARN)) { if (payload.length != 3) - _log.warn("Ack packet had " + payload.length + " bytes"); + _log.warn(getName() + ": Ack packet had " + payload.length + " bytes"); else - _log.warn("Remote ID already exists? " + remoteId); + _log.warn(getName() + ": Remote ID already exists? " + remoteId); + } + if (_log.shouldLog(Log.DEBUG)) { + _log.debug(getName() + ": invalid ack - socket retrieval took " + + (socketRetrieved-begin) + "ms, overall took " + + (_context.clock().now()-begin) + "ms"); } return; } @@ -189,11 +224,11 @@ public class I2PSocketManager implements I2PSessionListener { s = (I2PSocketImpl) _outSockets.get(id); } - _log.debug("*Disconnect outgoing for socket " + s); + _log.debug(getName() + ": *Disconnect outgoing for socket " + s); try { if (s != null) { if (payload.length > 0) { - _log.debug("Disconnect packet had " + _log.debug(getName() + ": Disconnect packet had " + payload.length + " bytes"); } if (s.getRemoteID(false) == null) { @@ -207,7 +242,7 @@ public class I2PSocketManager implements I2PSessionListener { } return; } catch (Exception t) { - _log.error("Ignoring error on disconnect for socket " + s, t); + _log.error(getName() + ": Ignoring error on disconnect for socket " + s, t); } } @@ -225,12 +260,12 @@ public class I2PSocketManager implements I2PSessionListener { // packet send outgoing if (_log.shouldLog(Log.DEBUG)) - _log.debug("*Packet send outgoing [" + payload.length + "] for socket " + s); + _log.debug(getName() + ": *Packet send outgoing [" + payload.length + "] for socket " + s); if (s != null) { s.queueData(payload); return; } else { - _log.error("Null socket with data available"); + _log.error(getName() + ": Null socket with data available"); throw new IllegalStateException("Null socket with data available"); } } @@ -258,7 +293,7 @@ public class I2PSocketManager implements I2PSessionListener { s.setRemoteID(id); } } - _log.debug("*Syn! for socket " + s); + _log.debug(getName() + ": *Syn! for socket " + s); if (!acceptConnections) { // The app did not instantiate an I2PServerSocket @@ -268,10 +303,11 @@ public class I2PSocketManager implements I2PSessionListener { replySentOk = _session.sendMessage(d, packet); } if (!replySentOk) { - _log.error("Error sending close to " + d.calculateHash().toBase64() + _log.error(getName() + ": Error sending close to " + d.calculateHash().toBase64() + " in response to a new con message", new Exception("Failed creation")); } + _context.statManager().addRateData("streaming.nackSent", 1, 1); return; } @@ -282,10 +318,11 @@ public class I2PSocketManager implements I2PSessionListener { replySentOk = _session.sendMessage(d, packet); if (!replySentOk) { if (_log.shouldLog(Log.WARN)) - _log.warn("Error sending reply to " + d.calculateHash().toBase64() + _log.warn(getName() + ": Error sending reply to " + d.calculateHash().toBase64() + " in response to a new con message for socket " + s, new Exception("Failed creation")); s.internalClose(); + _context.statManager().addRateData("streaming.ackSendFailed", 1, 1); } } else { // timed out or serverSocket closed @@ -293,9 +330,10 @@ public class I2PSocketManager implements I2PSessionListener { packet[0] = CLOSE_OUT; boolean nackSent = session.sendMessage(d, packet); if (!nackSent) { - _log.warn("Error sending NACK for session creation for socket " + s); + _log.warn(getName() + ": Error sending NACK for session creation for socket " + s); } s.internalClose(); + _context.statManager().addRateData("streaming,nackSent", 1, 1); } return; } @@ -314,7 +352,7 @@ public class I2PSocketManager implements I2PSessionListener { } } - _log.debug("*Disconnect incoming for socket " + s); + _log.debug(getName() + ": *Disconnect incoming for socket " + s); try { if (payload.length == 0 && s != null) { @@ -322,13 +360,13 @@ public class I2PSocketManager implements I2PSessionListener { return; } else { if ( (payload.length > 0) && (_log.shouldLog(Log.ERROR)) ) - _log.error("Disconnect packet had " + payload.length + " bytes"); + _log.error(getName() + ": Disconnect packet had " + payload.length + " bytes"); if (s != null) s.internalClose(); return; } } catch (Exception t) { - _log.error("Ignoring error on disconnect", t); + _log.error(getName() + ": Ignoring error on disconnect", t); return; } } @@ -346,13 +384,13 @@ public class I2PSocketManager implements I2PSessionListener { } if (_log.shouldLog(Log.DEBUG)) - _log.debug("*Packet send incoming [" + payload.length + "] for socket " + s); + _log.debug(getName() + ": *Packet send incoming [" + payload.length + "] for socket " + s); if (s != null) { s.queueData(payload); return; } else { - _log.info("Null socket with data available"); + _log.info(getName() + ": Null socket with data available"); throw new IllegalStateException("Null socket with data available"); } } @@ -362,7 +400,7 @@ public class I2PSocketManager implements I2PSessionListener { * */ private void handleUnknown(int type, String id, byte payload[]) { - _log.error("\n\n=============== Unknown packet! " + "============" + _log.error(getName() + ": \n\n=============== Unknown packet! " + "============" + "\nType: " + (int) type + "\nID: " + getReadableForm(id) + "\nBase64'ed Data: " + Base64.encode(payload) @@ -376,7 +414,7 @@ public class I2PSocketManager implements I2PSessionListener { } public void reportAbuse(I2PSession session, int severity) { - _log.error("Abuse reported [" + severity + "]"); + _log.error(getName() + ": Abuse reported [" + severity + "]"); } public void setDefaultOptions(I2PSocketOptions options) { @@ -424,31 +462,37 @@ public class I2PSocketManager implements I2PSessionListener { boolean sent = false; sent = _session.sendMessage(peer, packet); if (!sent) { - _log.info("Unable to send & receive ack for SYN packet for socket " + s); + _log.info(getName() + ": Unable to send & receive ack for SYN packet for socket " + s); synchronized (lock) { _outSockets.remove(s.getLocalID()); } + _context.statManager().addRateData("streaming.synNoAck", 1, 1); throw new I2PException("Error sending through I2P network"); } remoteID = s.getRemoteID(true, options.getConnectTimeout()); - if (remoteID == null) + if (remoteID == null) { + _context.statManager().addRateData("streaming.nackReceived", 1, 1); throw new ConnectException("Connection refused by peer for socket " + s); - if ("".equals(remoteID)) + } + if ("".equals(remoteID)) { + _context.statManager().addRateData("streaming.synNoAck", 1, 1); throw new NoRouteToHostException("Unable to reach peer for socket " + s); + } if (_log.shouldLog(Log.DEBUG)) - _log.debug("TIMING: s given out for remoteID " + _log.debug(getName() + ": TIMING: s given out for remoteID " + getReadableForm(remoteID) + " for socket " + s); return s; } catch (InterruptedIOException ioe) { if (_log.shouldLog(Log.ERROR)) - _log.error("Timeout waiting for ack from syn for id " + _log.error(getName() + ": Timeout waiting for ack from syn for id " + getReadableForm(lcID) + " for socket " + s, ioe); synchronized (lock) { _outSockets.remove(s.getLocalID()); } s.internalClose(); + _context.statManager().addRateData("streaming.synNoAck", 1, 1); throw new InterruptedIOException("Timeout waiting for ack"); } catch (ConnectException ex) { s.internalClose(); @@ -458,7 +502,7 @@ public class I2PSocketManager implements I2PSessionListener { throw ex; } catch (IOException ex) { if (_log.shouldLog(Log.ERROR)) - _log.error("Error sending syn on id " + getReadableForm(lcID) + " for socket " + s, ex); + _log.error(getName() + ": Error sending syn on id " + getReadableForm(lcID) + " for socket " + s, ex); synchronized (lock) { _outSockets.remove(s.getLocalID()); } @@ -466,7 +510,7 @@ public class I2PSocketManager implements I2PSessionListener { throw new I2PException("Unhandled IOException occurred"); } catch (I2PException ex) { if (_log.shouldLog(Log.INFO)) - _log.info("Error sending syn on id " + getReadableForm(lcID) + " for socket " + s, ex); + _log.info(getName() + ": Error sending syn on id " + getReadableForm(lcID) + " for socket " + s, ex); synchronized (lock) { _outSockets.remove(s.getLocalID()); } @@ -474,7 +518,7 @@ public class I2PSocketManager implements I2PSessionListener { throw ex; } catch (Exception e) { s.internalClose(); - _log.error("Unhandled error connecting", e); + _log.error(getName() + ": Unhandled error connecting", e); throw new ConnectException("Unhandled error connecting: " + e.getMessage()); } } @@ -515,7 +559,7 @@ public class I2PSocketManager implements I2PSessionListener { id = (String)iter.next(); sock = (I2PSocketImpl)_inSockets.get(id); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Closing inSocket \"" + _log.debug(getName() + ": Closing inSocket \"" + getReadableForm(sock.getLocalID()) + "\""); sock.internalClose(); } @@ -525,13 +569,13 @@ public class I2PSocketManager implements I2PSessionListener { id = (String)iter.next(); sock = (I2PSocketImpl)_outSockets.get(id); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Closing outSocket \"" + _log.debug(getName() + ": Closing outSocket \"" + getReadableForm(sock.getLocalID()) + "\""); sock.internalClose(); } } - _log.debug("Waiting for all open sockets to really close..."); + _log.debug(getName() + ": Waiting for all open sockets to really close..."); synchronized (lock) { while ((_inSockets.size() != 0) || (_outSockets.size() != 0)) { try { @@ -541,11 +585,11 @@ public class I2PSocketManager implements I2PSessionListener { } try { - _log.debug("Destroying I2P session..."); + _log.debug(getName() + ": Destroying I2P session..."); _session.destroySession(); - _log.debug("I2P session destroyed"); + _log.debug(getName() + ": I2P session destroyed"); } catch (I2PSessionException e) { - _log.error("Error destroying I2P session", e); + _log.error(getName() + ": Error destroying I2P session", e); } } @@ -571,21 +615,47 @@ public class I2PSocketManager implements I2PSessionListener { try { return _session.sendMessage(peer, new byte[] { (byte) CHAFF}); } catch (I2PException ex) { - _log.error("I2PException:", ex); + _log.error(getName() + ": I2PException:", ex); return false; } } public void removeSocket(I2PSocketImpl sock) { synchronized (lock) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Removing socket \"" + getReadableForm(sock.getLocalID()) + "\" [" + sock + "]"); _inSockets.remove(sock.getLocalID()); _outSockets.remove(sock.getLocalID()); lock.notify(); } + + long now = _context.clock().now(); + long lifetime = now - sock.getCreatedOn(); + long timeSinceClose = now - sock.getClosedOn(); + long sent = sock.getBytesSent(); + long recv = sock.getBytesReceived(); + + if (_log.shouldLog(Log.DEBUG)) { + _log.debug(getName() + ": Removing socket \"" + getReadableForm(sock.getLocalID()) + "\" [" + sock + + ", send: " + sent + ", recv: " + recv + + ", lifetime: " + lifetime + "ms, time since close: " + timeSinceClose + ")]", + new Exception("removeSocket called")); + } + + _context.statManager().addRateData("streaming.lifetime", lifetime, lifetime); + _context.statManager().addRateData("streaming.sent", sent, lifetime); + _context.statManager().addRateData("streaming.received", recv, lifetime); + + if (sent > recv) { + _context.statManager().addRateData("streaming.transferBalance", 1, lifetime); + } else if (recv > sent) { + _context.statManager().addRateData("streaming.transferBalance", -1, lifetime); + } else { + // noop + } } + public String getName() { return _name; } + public void setName(String name) { _name = name; } + public static String getReadableForm(String id) { if (id == null) return "(null)"; if (id.length() != 3) return "Bogus";