diff --git a/apps/i2psnark/java/src/org/klomp/snark/ConnectionAcceptor.java b/apps/i2psnark/java/src/org/klomp/snark/ConnectionAcceptor.java index 8fe2f2142..653e97bc6 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/ConnectionAcceptor.java +++ b/apps/i2psnark/java/src/org/klomp/snark/ConnectionAcceptor.java @@ -27,19 +27,40 @@ import net.i2p.I2PException; import net.i2p.client.streaming.I2PServerSocket; import net.i2p.client.streaming.I2PSocket; import net.i2p.util.I2PThread; +import net.i2p.util.Log; /** * Accepts connections on a TCP port and routes them to sub-acceptors. */ public class ConnectionAcceptor implements Runnable { + private static final ConnectionAcceptor _instance = new ConnectionAcceptor(); + public static final ConnectionAcceptor instance() { return _instance; } + private Log _log = new Log(ConnectionAcceptor.class); private I2PServerSocket serverSocket; - private final PeerAcceptor peeracceptor; + private PeerAcceptor peeracceptor; private Thread thread; private boolean stop; private boolean socketChanged; + private ConnectionAcceptor() {} + + public synchronized void startAccepting(PeerCoordinatorSet set, I2PServerSocket socket) { + if (serverSocket != socket) { + if ( (peeracceptor == null) || (peeracceptor.coordinators != set) ) + peeracceptor = new PeerAcceptor(set); + serverSocket = socket; + stop = false; + socketChanged = true; + if (thread == null) { + thread = new I2PThread(this, "I2PSnark acceptor"); + thread.setDaemon(true); + thread.start(); + } + } + } + public ConnectionAcceptor(I2PServerSocket serverSocket, PeerAcceptor peeracceptor) { @@ -55,6 +76,7 @@ public class ConnectionAcceptor implements Runnable public void halt() { + if (true) throw new RuntimeException("wtf"); stop = true; I2PServerSocket ss = serverSocket; @@ -129,6 +151,8 @@ public class ConnectionAcceptor implements Runnable serverSocket.close(); } catch (I2PException ignored) { } + + throw new RuntimeException("wtf"); } private class Handler implements Runnable { @@ -140,11 +164,17 @@ public class ConnectionAcceptor implements Runnable try { InputStream in = _socket.getInputStream(); OutputStream out = _socket.getOutputStream(); - BufferedInputStream bis = new BufferedInputStream(in); - BufferedOutputStream bos = new BufferedOutputStream(out); - peeracceptor.connection(_socket, bis, bos); + if (true) { + in = new BufferedInputStream(in); + out = new BufferedOutputStream(out); + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Handling socket from " + _socket.getPeerDestination().calculateHash().toBase64()); + peeracceptor.connection(_socket, in, out); } catch (IOException ioe) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Error handling connection from " + _socket.getPeerDestination().calculateHash().toBase64(), ioe); try { _socket.close(); } catch (IOException ignored) { } } } diff --git a/apps/i2psnark/java/src/org/klomp/snark/Message.java b/apps/i2psnark/java/src/org/klomp/snark/Message.java index 1e95fda18..091353293 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Message.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Message.java @@ -23,6 +23,8 @@ package org.klomp.snark; import java.io.DataOutputStream; import java.io.IOException; +import net.i2p.util.SimpleTimer; + // Used to queue outgoing connections // sendMessage() should be used to translate them to wire format. class Message @@ -54,6 +56,8 @@ class Message int off; int len; + SimpleTimer.TimedEvent expireEvent; + /** Utility method for sending a message through a DataStream. */ void sendMessage(DataOutputStream dos) throws IOException { diff --git a/apps/i2psnark/java/src/org/klomp/snark/MetaInfo.java b/apps/i2psnark/java/src/org/klomp/snark/MetaInfo.java index 9cd7bd7d7..66e183720 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/MetaInfo.java +++ b/apps/i2psnark/java/src/org/klomp/snark/MetaInfo.java @@ -443,7 +443,7 @@ public class MetaInfo } _log.debug(buf.toString()); byte[] infoBytes = BEncoder.bencode(info); - _log.debug("info bencoded: [" + Base64.encode(infoBytes, true) + "]"); + //_log.debug("info bencoded: [" + Base64.encode(infoBytes, true) + "]"); try { MessageDigest digest = MessageDigest.getInstance("SHA"); diff --git a/apps/i2psnark/java/src/org/klomp/snark/Peer.java b/apps/i2psnark/java/src/org/klomp/snark/Peer.java index 5a8d6f09e..3958aed46 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Peer.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Peer.java @@ -28,6 +28,7 @@ import java.util.Map; import org.klomp.snark.bencode.*; import net.i2p.client.streaming.I2PSocket; +import net.i2p.data.DataHelper; import net.i2p.util.Log; public class Peer implements Comparable @@ -48,7 +49,11 @@ public class Peer implements Comparable // was successful, the connection setup and runs PeerState state; + private I2PSocket sock; + private boolean deregister = true; + private static long __id; + private long _id; /** * Creates a disconnected peer given a PeerID, your own id and the @@ -60,7 +65,8 @@ public class Peer implements Comparable this.peerID = peerID; this.my_id = my_id; this.metainfo = metainfo; - _log.debug("Creating a new peer with " + peerID.getAddress().calculateHash().toBase64(), new Exception("creating")); + _id = ++__id; + //_log.debug("Creating a new peer with " + peerID.getAddress().calculateHash().toBase64(), new Exception("creating")); } /** @@ -72,16 +78,17 @@ public class Peer implements Comparable * * @exception IOException when an error occurred during the handshake. */ - public Peer(final I2PSocket sock, BufferedInputStream bis, - BufferedOutputStream bos, byte[] my_id, MetaInfo metainfo) + public Peer(final I2PSocket sock, InputStream in, OutputStream out, byte[] my_id, MetaInfo metainfo) throws IOException { this.my_id = my_id; this.metainfo = metainfo; + this.sock = sock; - byte[] id = handshake(bis, bos); + byte[] id = handshake(in, out); this.peerID = new PeerID(id, sock.getPeerDestination()); - _log.debug("Creating a new peer with " + peerID.getAddress().calculateHash().toBase64(), new Exception("creating")); + _id = ++__id; + _log.debug("Creating a new peer with " + peerID.getAddress().calculateHash().toBase64(), new Exception("creating " + _id)); } /** @@ -97,7 +104,7 @@ public class Peer implements Comparable */ public String toString() { - return peerID.toString(); + return peerID.toString() + _id; } /** @@ -155,12 +162,19 @@ public class Peer implements Comparable // Do we need to handshake? if (din == null) { - I2PSocket sock = I2PSnarkUtil.instance().connect(peerID); - BufferedInputStream bis - = new BufferedInputStream(sock.getInputStream()); - BufferedOutputStream bos - = new BufferedOutputStream(sock.getOutputStream()); - byte [] id = handshake(bis, bos); + sock = I2PSnarkUtil.instance().connect(peerID); + if ((sock == null) || (sock.isClosed())) { + throw new IOException("Unable to reach " + peerID); + } + InputStream in = new BufferedInputStream(sock.getInputStream()); + OutputStream out = sock.getOutputStream(); //new BufferedOutputStream(sock.getOutputStream()); + if (true) + out = new BufferedOutputStream(out); + //BufferedInputStream bis + // = new BufferedInputStream(sock.getInputStream()); + //BufferedOutputStream bos + // = new BufferedOutputStream(sock.getOutputStream()); + byte [] id = handshake(in, out); //handshake(bis, bos); byte [] expected_id = peerID.getID(); if (!Arrays.equals(expected_id, id)) throw new IOException("Unexpected peerID '" @@ -189,11 +203,14 @@ public class Peer implements Comparable { // Ignore, probably just the other side closing the connection. // Or refusing the connection, timing out, etc. + if (_log.shouldLog(Log.DEBUG)) + _log.debug(this.toString(), eofe); } catch(Throwable t) { - Snark.debug(this + ": " + t, Snark.ERROR); - t.printStackTrace(); + _log.error(this + ": " + t.getMessage(), t); + if (t instanceof OutOfMemoryError) + throw (OutOfMemoryError)t; } finally { @@ -205,11 +222,11 @@ public class Peer implements Comparable * Sets DataIn/OutputStreams, does the handshake and returns the id * reported by the other side. */ - private byte[] handshake(BufferedInputStream bis, BufferedOutputStream bos) + private byte[] handshake(InputStream in, OutputStream out) //BufferedInputStream bis, BufferedOutputStream bos) throws IOException { - din = new DataInputStream(bis); - dout = new DataOutputStream(bos); + din = new DataInputStream(in); + dout = new DataOutputStream(out); // Handshake write - header dout.write(19); @@ -228,7 +245,7 @@ public class Peer implements Comparable byte b = din.readByte(); if (b != 19) throw new IOException("Handshake failure, expected 19, got " - + (b & 0xff)); + + (b & 0xff) + " on " + sock); byte[] bs = new byte[19]; din.readFully(bs); @@ -287,6 +304,15 @@ public class Peer implements Comparable if (pl != null) pl.disconnected(this); } + I2PSocket csock = sock; + sock = null; + if ( (csock != null) && (!csock.isClosed()) ) { + try { + csock.close(); + } catch (IOException ioe) { + _log.warn("Error disconnecting " + toString(), ioe); + } + } } /** @@ -393,4 +419,17 @@ public class Peer implements Comparable s.uploaded = 0; } } + + public long getInactiveTime() { + PeerState s = state; + if (s != null) { + PeerConnectionOut out = s.out; + if (out != null) + return System.currentTimeMillis() - out.lastSent; + else + return -1; //"state, no out"; + } else { + return -1; //"no state"; + } + } } diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerAcceptor.java b/apps/i2psnark/java/src/org/klomp/snark/PeerAcceptor.java index c09f92b3d..9e12db6ce 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerAcceptor.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerAcceptor.java @@ -27,6 +27,7 @@ import java.util.Iterator; import net.i2p.client.streaming.I2PSocket; import net.i2p.data.Base64; import net.i2p.data.DataHelper; +import net.i2p.util.Log; /** * Accepts incomming connections from peers. The ConnectionAcceptor @@ -36,8 +37,9 @@ import net.i2p.data.DataHelper; */ public class PeerAcceptor { + private static final Log _log = new Log(PeerAcceptor.class); private final PeerCoordinator coordinator; - private final PeerCoordinatorSet coordinators; + final PeerCoordinatorSet coordinators; public PeerAcceptor(PeerCoordinator coordinator) { @@ -51,13 +53,8 @@ public class PeerAcceptor this.coordinator = null; } - private static final int LOOKAHEAD_SIZE = "19".length() + - "BitTorrent protocol".length() + - 8 + // blank, reserved - 20; // infohash - public void connection(I2PSocket socket, - BufferedInputStream bis, BufferedOutputStream bos) + InputStream in, OutputStream out) throws IOException { // inside this Peer constructor's handshake is where you'd deal with the other @@ -65,16 +62,23 @@ public class PeerAcceptor // support, but because of how the protocol works, we can get away with just reading // ahead the first $LOOKAHEAD_SIZE bytes to figure out which infohash they want to // talk about, and we can just look for that in our list of active torrents. - bis.mark(LOOKAHEAD_SIZE); - byte peerInfoHash[] = readHash(bis); - bis.reset(); + byte peerInfoHash[] = null; + try { + peerInfoHash = readHash(in); + _log.info("infohash read from " + socket.getPeerDestination().calculateHash().toBase64() + + ": " + Base64.encode(peerInfoHash)); + } catch (IOException ioe) { + _log.info("Unable to read the infohash from " + socket.getPeerDestination().calculateHash().toBase64()); + throw ioe; + } + in = new SequenceInputStream(new ByteArrayInputStream(peerInfoHash), in); if (coordinator != null) { // single torrent capability MetaInfo meta = coordinator.getMetaInfo(); if (DataHelper.eq(meta.getInfoHash(), peerInfoHash)) { if (coordinator.needPeers()) { - Peer peer = new Peer(socket, bis, bos, coordinator.getID(), + Peer peer = new Peer(socket, in, out, coordinator.getID(), coordinator.getMetaInfo()); coordinator.addPeer(peer); } @@ -94,13 +98,15 @@ public class PeerAcceptor if (DataHelper.eq(meta.getInfoHash(), peerInfoHash)) { if (cur.needPeers()) { - Peer peer = new Peer(socket, bis, bos, cur.getID(), + Peer peer = new Peer(socket, in, out, cur.getID(), cur.getMetaInfo()); cur.addPeer(peer); return; } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Rejecting new peer for " + cur.snark.torrent); socket.close(); return; } @@ -112,6 +118,11 @@ public class PeerAcceptor } } + private static final int LOOKAHEAD_SIZE = "19".length() + + "BitTorrent protocol".length() + + 8 + // blank, reserved + 20; // infohash + /** * Read ahead to the infohash, throwing an exception if there isn't enough data */ diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionIn.java b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionIn.java index 83a04e0ce..a99edd79c 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionIn.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionIn.java @@ -169,10 +169,12 @@ class PeerConnectionIn implements Runnable catch (IOException ioe) { // Ignore, probably the other side closed connection. + if (_log.shouldLog(Log.INFO)) + _log.info("IOError talking with " + peer, ioe); } catch (Throwable t) { - I2PSnarkUtil.instance().debug(peer.toString(), Snark.ERROR, t); + _log.error("Error talking with " + peer, t); if (t instanceof OutOfMemoryError) throw (OutOfMemoryError)t; } diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java index 91562415e..8bbeb7172 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java @@ -42,6 +42,8 @@ class PeerConnectionOut implements Runnable private static long __id = 0; private long _id; + + long lastSent; public PeerConnectionOut(Peer peer, DataOutputStream dout) { @@ -49,6 +51,7 @@ class PeerConnectionOut implements Runnable this.dout = dout; _id = ++__id; + lastSent = System.currentTimeMillis(); quit = false; thread = new I2PThread(this, "Snark sender " + _id); thread.start(); @@ -73,8 +76,6 @@ class PeerConnectionOut implements Runnable try { // Make sure everything will reach the other side. - // i2p flushes passively, no need to force it - // ... maybe not though dout.flush(); // Wait till more data arrives. @@ -103,33 +104,38 @@ class PeerConnectionOut implements Runnable Message nm = (Message)it.next(); if (nm.type == Message.PIECE) { - if (state.choking) + if (state.choking) { it.remove(); + SimpleTimer.getInstance().removeEvent(nm.expireEvent); + } nm = null; } else if (nm.type == Message.REQUEST && state.choked) { it.remove(); + SimpleTimer.getInstance().removeEvent(nm.expireEvent); nm = null; } if (m == null && nm != null) { m = nm; + SimpleTimer.getInstance().removeEvent(nm.expireEvent); it.remove(); } } - if (m == null && sendQueue.size() > 0) + if (m == null && sendQueue.size() > 0) { m = (Message)sendQueue.remove(0); + SimpleTimer.getInstance().removeEvent(m.expireEvent); + } } } if (m != null) { - if (Snark.debug >= Snark.ALL) - Snark.debug("Send " + peer + ": " + m, Snark.ALL); if (_log.shouldLog(Log.DEBUG)) _log.debug("Send " + peer + ": " + m + " on " + peer.metainfo.getName()); m.sendMessage(dout); + lastSent = System.currentTimeMillis(); // Remove all piece messages after sending a choke message. if (m.type == Message.CHOKE) @@ -146,10 +152,12 @@ class PeerConnectionOut implements Runnable catch (IOException ioe) { // Ignore, probably other side closed connection. + if (_log.shouldLog(Log.INFO)) + _log.info("IOError sending to " + peer, ioe); } catch (Throwable t) { - I2PSnarkUtil.instance().debug(peer.toString(), Snark.ERROR, t); + _log.error("Error sending to " + peer, t); if (t instanceof OutOfMemoryError) throw (OutOfMemoryError)t; } @@ -164,8 +172,8 @@ class PeerConnectionOut implements Runnable { synchronized(sendQueue) { - if (quit == true) - return; + //if (quit == true) + // return; quit = true; thread.interrupt(); @@ -189,12 +197,13 @@ class PeerConnectionOut implements Runnable } } - /** remove messages not sent in 30s */ - private static final int SEND_TIMEOUT = 30*1000; + /** remove messages not sent in 2m */ + private static final int SEND_TIMEOUT = 120*1000; private class RemoveTooSlow implements SimpleTimer.TimedEvent { private Message _m; public RemoveTooSlow(Message m) { _m = m; + m.expireEvent = RemoveTooSlow.this; } public void timeReached() { diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java index 3fa5144cf..1d6970901 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java @@ -186,20 +186,27 @@ public class PeerCoordinator implements PeerListener return; } + Peer toDisconnect = null; synchronized(peers) { - if (peerIDInList(peer.getPeerID(), peers)) + Peer old = peerIDInList(peer.getPeerID(), peers); + if ( (old != null) && (old.getInactiveTime() > 2*60*1000) ) { + // idle for 2 minutes, kill the old con + peers.remove(old); + toDisconnect = old; + old = null; + } + if (old != null) { - if (Snark.debug >= Snark.INFO) - Snark.debug("Already connected to: " + peer, Snark.INFO); + if (_log.shouldLog(Log.WARN)) + _log.warn("Already connected to: " + peer + ": " + old + ", inactive for " + old.getInactiveTime()); peer.disconnect(false); // Don't deregister this connection/peer. } else { - if (Snark.debug >= Snark.INFO) - Snark.debug("New connection to peer: " + peer, Snark.INFO); + if (_log.shouldLog(Log.INFO)) + _log.info("New connection to peer: " + peer + " for " + metainfo.getName()); - _log.info("New connection to peer " + peer + " for " + metainfo.getName()); // Add it to the beginning of the list. // And try to optimistically make it a uploader. peers.add(0, peer); @@ -210,15 +217,21 @@ public class PeerCoordinator implements PeerListener listener.peerChange(this, peer); } } + if (toDisconnect != null) { + toDisconnect.disconnect(false); + removePeerFromPieces(toDisconnect); + } } - private static boolean peerIDInList(PeerID pid, List peers) + private static Peer peerIDInList(PeerID pid, List peers) { Iterator it = peers.iterator(); - while (it.hasNext()) - if (pid.sameID(((Peer)it.next()).getPeerID())) - return true; - return false; + while (it.hasNext()) { + Peer cur = (Peer)it.next(); + if (pid.sameID(cur.getPeerID())) + return cur; + } + return null; } public void addPeer(final Peer peer) @@ -253,12 +266,13 @@ public class PeerCoordinator implements PeerListener new I2PThread(r, threadName).start(); } else - if (Snark.debug >= Snark.INFO) + if (_log.shouldLog(Log.DEBUG)) { if (peer.isConnected()) - Snark.debug("Add peer already connected: " + peer, Snark.INFO); + _log.info("Add peer already connected: " + peer); else - Snark.debug("MAX_CONNECTIONS = " + MAX_CONNECTIONS - + " not accepting extra peer: " + peer, Snark.INFO); + _log.info("MAX_CONNECTIONS = " + MAX_CONNECTIONS + + " not accepting extra peer: " + peer); + } } @@ -288,8 +302,8 @@ public class PeerCoordinator implements PeerListener while (uploaders < MAX_UPLOADERS && interested.size() > 0) { Peer peer = (Peer)interested.remove(0); - if (Snark.debug >= Snark.INFO) - Snark.debug("Unchoke: " + peer, Snark.INFO); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Unchoke: " + peer); peer.setChoking(false); uploaders++; // Put peer back at the end of the list. @@ -448,10 +462,6 @@ public class PeerCoordinator implements PeerListener Piece p = new Piece(piece); if (!wantedPieces.contains(p)) { - if (Snark.debug >= Snark.INFO) - Snark.debug(peer + " piece " + piece + " no longer needed", - Snark.INFO); - _log.info("Got unwanted piece " + piece + "/" + metainfo.getPieces() +" from " + peer + " for " + metainfo.getName()); // No need to announce have piece to peers. @@ -463,17 +473,12 @@ public class PeerCoordinator implements PeerListener { if (storage.putPiece(piece, bs)) { - if (Snark.debug >= Snark.INFO) - Snark.debug("Recv p" + piece + " " + peer, Snark.INFO); _log.info("Got valid piece " + piece + "/" + metainfo.getPieces() +" from " + peer + " for " + metainfo.getName()); } else { // Oops. We didn't actually download this then... :( downloaded -= metainfo.getPieceLength(piece); - if (Snark.debug >= Snark.NOTICE) - Snark.debug("Got BAD piece " + piece + " from " + peer, - Snark.NOTICE); _log.warn("Got BAD piece " + piece + "/" + metainfo.getPieces() + " from " + peer + " for " + metainfo.getName()); return false; // No need to announce BAD piece to peers. } @@ -504,8 +509,8 @@ public class PeerCoordinator implements PeerListener public void gotChoke(Peer peer, boolean choke) { - if (Snark.debug >= Snark.INFO) - Snark.debug("Got choke(" + choke + "): " + peer, Snark.INFO); + if (_log.shouldLog(Log.INFO)) + _log.info("Got choke(" + choke + "): " + peer); if (listener != null) listener.peerChange(this, peer); @@ -523,8 +528,8 @@ public class PeerCoordinator implements PeerListener { uploaders++; peer.setChoking(false); - if (Snark.debug >= Snark.INFO) - Snark.debug("Unchoke: " + peer, Snark.INFO); + if (_log.shouldLog(Log.INFO)) + _log.info("Unchoke: " + peer); } } } @@ -537,7 +542,7 @@ public class PeerCoordinator implements PeerListener public void disconnected(Peer peer) { if (_log.shouldLog(Log.INFO)) - _log.info("Disconnected " + peer); + _log.info("Disconnected " + peer, new Exception("Disconnected by")); synchronized(peers) { diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerState.java b/apps/i2psnark/java/src/org/klomp/snark/PeerState.java index 75b64e443..6820999e7 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerState.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerState.java @@ -26,8 +26,11 @@ import java.util.List; import java.util.Set; import java.util.HashSet; +import net.i2p.util.Log; + class PeerState { + private Log _log = new Log(PeerState.class); final Peer peer; final PeerListener listener; final MetaInfo metainfo; @@ -59,7 +62,7 @@ class PeerState // If we have te resend outstanding requests (true after we got choked). private boolean resend = false; - private final static int MAX_PIPELINE = 5; + private final static int MAX_PIPELINE = 1; private final static int PARTSIZE = 64*1024; // default was 16K, i2p-bt uses 64KB PeerState(Peer peer, PeerListener listener, MetaInfo metainfo, @@ -77,16 +80,15 @@ class PeerState void keepAliveMessage() { - if (Snark.debug >= Snark.DEBUG) - Snark.debug(peer + " rcv alive", Snark.DEBUG); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(peer + " rcv alive"); /* XXX - ignored */ } void chokeMessage(boolean choke) { - if (Snark.debug >= Snark.DEBUG) - Snark.debug(peer + " rcv " + (choke ? "" : "un") + "choked", - Snark.DEBUG); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(peer + " rcv " + (choke ? "" : "un") + "choked"); choked = choke; if (choked) @@ -100,24 +102,23 @@ class PeerState void interestedMessage(boolean interest) { - if (Snark.debug >= Snark.DEBUG) - Snark.debug(peer + " rcv " + (interest ? "" : "un") - + "interested", Snark.DEBUG); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(peer + " rcv " + (interest ? "" : "un") + + "interested"); interested = interest; listener.gotInterest(peer, interest); } void haveMessage(int piece) { - if (Snark.debug >= Snark.DEBUG) - Snark.debug(peer + " rcv have(" + piece + ")", Snark.DEBUG); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(peer + " rcv have(" + piece + ")"); // Sanity check if (piece < 0 || piece >= metainfo.getPieces()) { // XXX disconnect? - if (Snark.debug >= Snark.INFO) - Snark.debug("Got strange 'have: " + piece + "' message from " + peer, - + Snark.INFO); + if (_log.shouldLog(Log.WARN)) + _log.warn("Got strange 'have: " + piece + "' message from " + peer); return; } @@ -138,14 +139,13 @@ class PeerState { synchronized(this) { - if (Snark.debug >= Snark.DEBUG) - Snark.debug(peer + " rcv bitfield", Snark.DEBUG); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(peer + " rcv bitfield"); if (bitfield != null) { // XXX - Be liberal in what you except? - if (Snark.debug >= Snark.INFO) - Snark.debug("Got unexpected bitfield message from " + peer, - Snark.INFO); + if (_log.shouldLog(Log.WARN)) + _log.warn("Got unexpected bitfield message from " + peer); return; } @@ -157,14 +157,13 @@ class PeerState void requestMessage(int piece, int begin, int length) { - if (Snark.debug >= Snark.DEBUG) - Snark.debug(peer + " rcv request(" - + piece + ", " + begin + ", " + length + ") ", - Snark.DEBUG); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(peer + " rcv request(" + + piece + ", " + begin + ", " + length + ") "); if (choking) { - if (Snark.debug >= Snark.INFO) - Snark.debug("Request received, but choking " + peer, Snark.INFO); + if (_log.shouldLog(Log.INFO)) + _log.info("Request received, but choking " + peer); return; } @@ -177,12 +176,11 @@ class PeerState || length > 4*PARTSIZE) { // XXX - Protocol error -> disconnect? - if (Snark.debug >= Snark.INFO) - Snark.debug("Got strange 'request: " + piece + if (_log.shouldLog(Log.WARN)) + _log.warn("Got strange 'request: " + piece + ", " + begin + ", " + length - + "' message from " + peer, - Snark.INFO); + + "' message from " + peer); return; } @@ -190,8 +188,8 @@ class PeerState if (pieceBytes == null) { // XXX - Protocol error-> diconnect? - if (Snark.debug >= Snark.INFO) - Snark.debug("Got request for unknown piece: " + piece, Snark.INFO); + if (_log.shouldLog(Log.WARN)) + _log.warn("Got request for unknown piece: " + piece); return; } @@ -199,25 +197,18 @@ class PeerState if (begin >= pieceBytes.length || begin + length > pieceBytes.length) { // XXX - Protocol error-> disconnect? - if (Snark.debug >= Snark.INFO) - Snark.debug("Got out of range 'request: " + piece + if (_log.shouldLog(Log.WARN)) + _log.warn("Got out of range 'request: " + piece + ", " + begin + ", " + length - + "' message from " + peer, - Snark.INFO); + + "' message from " + peer); return; } - if (Snark.debug >= Snark.DEBUG) - Snark.debug("Sending (" + piece + ", " + begin + ", " - + length + ")" + " to " + peer, Snark.DEBUG); + if (_log.shouldLog(Log.INFO)) + _log.info("Sending (" + piece + ", " + begin + ", " + + length + ")" + " to " + peer); out.sendPiece(piece, begin, length, pieceBytes); - - // Tell about last subpiece delivery. - if (begin + length == pieceBytes.length) - if (Snark.debug >= Snark.DEBUG) - Snark.debug("Send p" + piece + " " + peer, - Snark.DEBUG); } /** @@ -245,14 +236,13 @@ class PeerState { if (listener.gotPiece(peer, req.piece, req.bs)) { - if (Snark.debug >= Snark.DEBUG) - Snark.debug("Got " + req.piece + ": " + peer, Snark.DEBUG); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Got " + req.piece + ": " + peer); } else { - if (Snark.debug >= Snark.DEBUG) - Snark.debug("Got BAD " + req.piece + " from " + peer, - Snark.DEBUG); + if (_log.shouldLog(Log.WARN)) + _log.warn("Got BAD " + req.piece + " from " + peer); // XXX ARGH What now !?! downloaded = 0; } @@ -275,21 +265,20 @@ class PeerState */ Request getOutstandingRequest(int piece, int begin, int length) { - if (Snark.debug >= Snark.DEBUG) - Snark.debug("getChunk(" + if (_log.shouldLog(Log.DEBUG)) + _log.debug("getChunk(" + piece + "," + begin + "," + length + ") " - + peer, Snark.DEBUG); + + peer); int r = getFirstOutstandingRequest(piece); // Unrequested piece number? if (r == -1) { - if (Snark.debug >= Snark.INFO) - Snark.debug("Unrequested 'piece: " + piece + ", " + if (_log.shouldLog(Log.INFO)) + _log.info("Unrequested 'piece: " + piece + ", " + begin + ", " + length + "' received from " - + peer, - Snark.INFO); + + peer); downloaded = 0; // XXX - punishment? return null; } @@ -309,13 +298,12 @@ class PeerState // Something wrong? if (req.piece != piece || req.off != begin || req.len != length) { - if (Snark.debug >= Snark.INFO) - Snark.debug("Unrequested or unneeded 'piece: " + if (_log.shouldLog(Log.INFO)) + _log.info("Unrequested or unneeded 'piece: " + piece + ", " + begin + ", " + length + "' received from " - + peer, - Snark.INFO); + + peer); downloaded = 0; // XXX - punishment? return null; } @@ -323,9 +311,9 @@ class PeerState // Report missing requests. if (r != 0) { - if (Snark.debug >= Snark.INFO) - System.err.print("Some requests dropped, got " + req - + ", wanted:"); + if (_log.shouldLog(Log.WARN)) + _log.warn("Some requests dropped, got " + req + + ", wanted for peer: " + peer); for (int i = 0; i < r; i++) { Request dropReq = (Request)outstandingRequests.remove(0); @@ -338,11 +326,9 @@ class PeerState if (!choked) out.sendRequest(dropReq); */ - if (Snark.debug >= Snark.INFO) - System.err.print(" " + dropReq); + if (_log.shouldLog(Log.WARN)) + _log.warn("dropped " + dropReq + " with peer " + peer); } - if (Snark.debug >= Snark.INFO) - System.err.println(" " + peer); } outstandingRequests.remove(0); } @@ -356,24 +342,23 @@ class PeerState void cancelMessage(int piece, int begin, int length) { - if (Snark.debug >= Snark.DEBUG) - Snark.debug("Got cancel message (" - + piece + ", " + begin + ", " + length + ")", - Snark.DEBUG); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Got cancel message (" + + piece + ", " + begin + ", " + length + ")"); out.cancelRequest(piece, begin, length); } void unknownMessage(int type, byte[] bs) { - if (Snark.debug >= Snark.WARNING) - Snark.debug("Warning: Ignoring unknown message type: " + type - + " length: " + bs.length, Snark.WARNING); + if (_log.shouldLog(Log.WARN)) + _log.warn("Warning: Ignoring unknown message type: " + type + + " length: " + bs.length); } void havePiece(int piece) { - if (Snark.debug >= Snark.DEBUG) - Snark.debug("Tell " + peer + " havePiece(" + piece + ")", Snark.DEBUG); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Tell " + peer + " havePiece(" + piece + ")"); synchronized(this) { @@ -474,8 +459,8 @@ class PeerState } } - if (Snark.debug >= Snark.DEBUG) - Snark.debug(peer + " requests " + outstandingRequests, Snark.DEBUG); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(peer + " requests " + outstandingRequests); } // Starts requesting first chunk of next piece. Returns true if @@ -486,8 +471,8 @@ class PeerState if (bitfield != null) { int nextPiece = listener.wantPiece(peer, bitfield); - if (Snark.debug >= Snark.DEBUG) - Snark.debug(peer + " want piece " + nextPiece, Snark.DEBUG); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(peer + " want piece " + nextPiece); synchronized(this) { if (nextPiece != -1 @@ -512,8 +497,8 @@ class PeerState synchronized void setInteresting(boolean interest) { - if (Snark.debug >= Snark.DEBUG) - Snark.debug(peer + " setInteresting(" + interest + ")", Snark.DEBUG); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(peer + " setInteresting(" + interest + ")"); if (interest != interesting) { @@ -527,8 +512,8 @@ class PeerState synchronized void setChoking(boolean choke) { - if (Snark.debug >= Snark.DEBUG) - Snark.debug(peer + " setChoking(" + choke + ")", Snark.DEBUG); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(peer + " setChoking(" + choke + ")"); if (choking != choke) { diff --git a/apps/i2psnark/java/src/org/klomp/snark/Snark.java b/apps/i2psnark/java/src/org/klomp/snark/Snark.java index 3b5f4c4fb..4d9529602 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Snark.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Snark.java @@ -372,10 +372,9 @@ public class Snark coordinator = new PeerCoordinator(id, meta, storage, clistener, this); PeerCoordinatorSet set = PeerCoordinatorSet.instance(); set.add(coordinator); - PeerAcceptor peeracceptor = new PeerAcceptor(set); //coordinator); - ConnectionAcceptor acceptor = new ConnectionAcceptor(serversocket, - peeracceptor); - + ConnectionAcceptor acceptor = ConnectionAcceptor.instance(); + acceptor.startAccepting(set, serversocket); + trackerclient = new TrackerClient(meta, coordinator); if (start) startTorrent(); diff --git a/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java b/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java index 8d2967c59..4df41ff30 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java +++ b/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java @@ -462,7 +462,7 @@ public class I2PSnarkServlet extends HttpServlet { return bytes + "B"; else if (bytes < 5*1024*1024) return (bytes/1024) + "KB"; - else if (bytes < 5*1024*1024*1024) + else if (bytes < 5*1024*1024*1024l) return (bytes/(1024*1024)) + "MB"; else return (bytes/(1024*1024*1024)) + "GB"; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java index c2f412693..0daf0ba65 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java @@ -109,4 +109,11 @@ public class I2PSocketFull implements I2PSocket { _connection = null; _listener = null; } + public String toString() { + Connection c = _connection; + if (c == null) + return super.toString(); + else + return c.toString(); + } } diff --git a/apps/syndie/java/src/net/i2p/syndie/data/ArchiveIndex.java b/apps/syndie/java/src/net/i2p/syndie/data/ArchiveIndex.java index ca6410552..4d0704eb1 100644 --- a/apps/syndie/java/src/net/i2p/syndie/data/ArchiveIndex.java +++ b/apps/syndie/java/src/net/i2p/syndie/data/ArchiveIndex.java @@ -291,6 +291,7 @@ public class ArchiveIndex { if (!blog.equals(summary.blog)) continue; } + if ( (tag != null) && (tag.trim().length() > 0) ) { if (!tag.equals(summary.tag)) { if (_log.shouldLog(Log.DEBUG)) @@ -315,17 +316,21 @@ public class ArchiveIndex { for (int j = 0; j < summary.entries.size(); j++) { EntrySummary entry = (EntrySummary)summary.entries.get(j); - if (entry.entry.getEntryId() < lowestEntryId) + if (entry.entry.getEntryId() < lowestEntryId) { + long daysAgo1 = entry.entry.getEntryId() / (24*60*60*1000l); + long daysAgo2 = lowestEntryId / (24*60*60*1000l); continue; - String k = (Long.MAX_VALUE-entry.entry.getEntryId()) + "-" + entry.entry.getKeyHash().toBase64(); - ordered.put(k, entry.entry); - //System.err.println("Including match: " + k); + } else { + String k = (Long.MAX_VALUE-entry.entry.getEntryId()) + "-" + entry.entry.getKeyHash().toBase64(); + ordered.put(k, entry.entry); + } } } for (Iterator iter = ordered.values().iterator(); iter.hasNext(); ) { BlogURI entry = (BlogURI)iter.next(); - if (entry.getEntryId() < lowestEntryId) + if (entry.getEntryId() < lowestEntryId) { continue; + } if (!out.contains(entry)) out.add(entry); } diff --git a/apps/syndie/java/src/net/i2p/syndie/web/BaseServlet.java b/apps/syndie/java/src/net/i2p/syndie/web/BaseServlet.java index 728a0a354..0458a6a0a 100644 --- a/apps/syndie/java/src/net/i2p/syndie/web/BaseServlet.java +++ b/apps/syndie/java/src/net/i2p/syndie/web/BaseServlet.java @@ -679,7 +679,7 @@ public abstract class BaseServlet extends HttpServlet { for (Iterator iter = names.iterator(); iter.hasNext(); ) { String name = (String) iter.next(); PetName pn = db.getByName(name); - if ("syndieblog".equals(pn.getProtocol())) { + if ("syndieblog".equals(pn.getProtocol()) && pn.isMember(FilteredThreadIndex.GROUP_FAVORITE)) { if ( (author != null) && (author.equals(pn.getLocation())) ) out.write("\n"); else @@ -1151,6 +1151,16 @@ public abstract class BaseServlet extends HttpServlet { "}\n" + ".postReplyOptions {\n" + " background-color: #BBBBFF;\n" + +"}\n" + +".syndieBlogFavorites {\n" + +" float: left;\n" + +" margin: 5px 0px 0 0;\n" + +" display: inline;\n" + +"}\n" + +".syndieBlogList {\n" + +" float: right;\n" + +" margin: 5px 0px 0 0;\n" + +" display: inline;\n" + "}\n"; diff --git a/apps/syndie/java/src/net/i2p/syndie/web/ViewBlogsServlet.java b/apps/syndie/java/src/net/i2p/syndie/web/ViewBlogsServlet.java index 63b43c7cd..7b62a1a3f 100644 --- a/apps/syndie/java/src/net/i2p/syndie/web/ViewBlogsServlet.java +++ b/apps/syndie/java/src/net/i2p/syndie/web/ViewBlogsServlet.java @@ -19,9 +19,35 @@ import net.i2p.syndie.sml.*; * */ public class ViewBlogsServlet extends BaseServlet { - private static final int MAX_AUTHORS_AT_ONCE = 100; + private static final int MAX_AUTHORS_AT_ONCE = 20; private static final int MAX_TAGS = 50; + /** renders the posts from the last 3 days */ + private String getViewBlogLink(Hash blog, long lastPost) { + long dayBegin = BlogManager.instance().getDayBegin(); + int daysAgo = 2; + if ( (lastPost > 0) && (dayBegin - 3*24*60*6081000 > lastPost) ) // last post was old 3 days ago + daysAgo = (int)((dayBegin - lastPost + 24*60*60*1000-1)/(24*60*60*1000)); + daysAgo++; + return getControlTarget() + "?" + ThreadedHTMLRenderer.PARAM_AUTHOR + '=' + blog.toBase64() + + '&' + ThreadedHTMLRenderer.PARAM_THREAD_AUTHOR + "=true&daysBack=" + daysAgo; + } + + private String getPostDate(long when) { + String age = null; + long dayBegin = BlogManager.instance().getDayBegin(); + long postId = when; + if (postId >= dayBegin) { + age = "today"; + } else if (postId >= dayBegin - 24*60*60*1000) { + age = "yesterday"; + } else { + int daysAgo = (int)((dayBegin - postId + 24*60*60*1000-1)/(24*60*60*1000)); + age = daysAgo + " days ago"; + } + return age; + } + protected void renderServletDetails(User user, HttpServletRequest req, PrintWriter out, ThreadIndex index, int threadOffset, BlogURI visibleEntry, Archive archive) throws IOException { TreeSet orderedRoots = new TreeSet(new NewestEntryFirstComparator()); @@ -35,8 +61,45 @@ public class ViewBlogsServlet extends BaseServlet { TreeSet tags = new TreeSet(); List writtenAuthors = new ArrayList(); - out.write("Blogs:\n"); - out.write(""); + + + out.write(""); + if ( (user != null) && (user.getAuthenticated()) ) { + out.write("Favorite blogs:
\n"); + out.write("Your blog
\n"); + + PetNameDB db = user.getPetNameDB(); + for (Iterator iter = orderedRoots.iterator(); iter.hasNext() && writtenAuthors.size() < MAX_AUTHORS_AT_ONCE; ) { + BlogURI uri= (BlogURI)iter.next(); + if (writtenAuthors.contains(uri.getKeyHash())) { + // skip + } else { + PetName pn = db.getByLocation(uri.getKeyHash().toBase64()); + if (pn != null) { + if (pn.isMember(FilteredThreadIndex.GROUP_FAVORITE)) { + out.write(""); + out.write(HTMLRenderer.sanitizeString(pn.getName(), 32)); + out.write(" (" + getPostDate(uri.getEntryId()) + ")
\n"); + writtenAuthors.add(uri.getKeyHash()); + } else if (pn.isMember(FilteredThreadIndex.GROUP_IGNORE)) { + // ignore 'em + writtenAuthors.add(uri.getKeyHash()); + } else { + // bookmarked, but not a favorite... leave them for later + } + } else { + // not bookmarked, leave them for later + } + } + } + } + out.write("
\n"); + + // now for the non-bookmarked people + out.write(""); + out.write("Most recently updated blogs:
\n"); for (Iterator iter = orderedRoots.iterator(); iter.hasNext() && writtenAuthors.size() < MAX_AUTHORS_AT_ONCE; ) { BlogURI uri= (BlogURI)iter.next(); String curTags[] = archive.getEntry(uri).getTags(); @@ -67,18 +130,17 @@ public class ViewBlogsServlet extends BaseServlet { age = daysAgo + " days ago"; } - out.write(""); out.write(HTMLRenderer.sanitizeString(desc, 32)); - out.write(" \n"); + out.write(" (" + getPostDate(uri.getEntryId()) + ")
\n"); writtenAuthors.add(uri.getKeyHash()); } } - out.write("\n"); + out.write("
\n"); + /* out.write("Topics:\n"); out.write(""); for (Iterator iter = tags.iterator(); iter.hasNext(); ) { @@ -88,6 +150,7 @@ public class ViewBlogsServlet extends BaseServlet { out.write(HTMLRenderer.sanitizeString(tag, 32)); out.write(" "); } + */ out.write("\n"); } diff --git a/apps/syndie/java/src/net/i2p/syndie/web/ViewThreadedServlet.java b/apps/syndie/java/src/net/i2p/syndie/web/ViewThreadedServlet.java index 23b4ea0d2..375733a6a 100644 --- a/apps/syndie/java/src/net/i2p/syndie/web/ViewThreadedServlet.java +++ b/apps/syndie/java/src/net/i2p/syndie/web/ViewThreadedServlet.java @@ -54,19 +54,20 @@ public class ViewThreadedServlet extends BaseServlet { String tags = req.getParameter(ThreadedHTMLRenderer.PARAM_TAGS); String post = req.getParameter(ThreadedHTMLRenderer.PARAM_VIEW_POST); String thread = req.getParameter(ThreadedHTMLRenderer.PARAM_VIEW_THREAD); + boolean threadAuthorOnly = Boolean.valueOf(req.getParameter(ThreadedHTMLRenderer.PARAM_THREAD_AUTHOR) + "").booleanValue(); + long dayBegin = BlogManager.instance().getDayBegin(); + String daysStr = req.getParameter(ThreadedHTMLRenderer.PARAM_DAYS_BACK); + int days = 1; + try { + if (daysStr != null) + days = Integer.parseInt(daysStr); + } catch (NumberFormatException nfe) { + days = 1; + } + dayBegin -= (days-1) * 24*60*60*1000l; + if ( (author != null) && empty(post) && empty(thread) ) { - long dayBegin = BlogManager.instance().getDayBegin(); - String daysStr = req.getParameter(ThreadedHTMLRenderer.PARAM_DAYS_BACK); - int days = 1; - try { - if (daysStr != null) - days = Integer.parseInt(daysStr); - } catch (NumberFormatException nfe) { - days = 1; - } - dayBegin -= (days-1) * 24*60*60*1000; - ArchiveIndex aindex = archive.getIndex(); PetNameDB db = user.getPetNameDB(); if ("favorites".equals(author)) { @@ -91,6 +92,22 @@ public class ViewThreadedServlet extends BaseServlet { if ( (key != null) && (key.length == Hash.HASH_LENGTH) ) { loc.setData(key); aindex.selectMatchesOrderByEntryId(rv, loc, tags, dayBegin); + } else { + } + } + + // how inefficient can we get? + if (threadAuthorOnly && (rv.size() > 0)) { + // lets filter out any posts that are not roots + for (int i = 0; i < rv.size(); i++) { + BlogURI curURI = (BlogURI)rv.get(i); + ThreadNode node = index.getNode(curURI); + if ( (node != null) && (node.getParent() == null) ) { + // ok, its a root + } else { + rv.remove(i); + i--; + } } } } @@ -135,6 +152,7 @@ public class ViewThreadedServlet extends BaseServlet { } } } + return rv; } diff --git a/history.txt b/history.txt index 2f4f98d16..340377796 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,12 @@ -$Id: history.txt,v 1.362 2005/12/17 04:22:07 jrandom Exp $ +$Id: history.txt,v 1.363 2005/12/18 00:39:54 jrandom Exp $ + +2005-12-19 jrandom + * I2PSnark logging, disconnect old inactive peers rather than new ones, + memory usage reduction, better OOM handling, and a shared connection + acceptor. + * Cleaned up the Syndie blog page and the resulting filters (viewing a + blog from the blog page shows threads started by the selected author, + not those that they merely participate in) 2005-12-18 jrandom * Added a standalone runner for the I2PSnark web ui (build with the diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index ff54a0c3e..be418c92e 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.311 $ $Date: 2005/12/14 04:32:51 $"; + public final static String ID = "$Revision: 1.312 $ $Date: 2005/12/16 22:47:03 $"; public final static String VERSION = "0.6.1.7"; - public final static long BUILD = 5; + public final static long BUILD = 6; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID);