Transport:

- Maintain a router hash -> IP map in transport,
    to support additional IP checks
    (unused for now)
  - Catch error on pre-2.6 kernels
  - Some concurrent conversion
  - Fix an HTML error on peers.jsp
This commit is contained in:
zzz
2009-04-02 20:33:54 +00:00
parent 49c7fc30c0
commit 3a12182838
8 changed files with 62 additions and 20 deletions

View File

@@ -133,6 +133,10 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
return _manager.wasUnreachable(dest);
}
public byte[] getIP(Hash dest) {
return _manager.getIP(dest);
}
public List getMostRecentErrorMessages() {
return _manager.getMostRecentErrorMessages();
}

View File

@@ -20,6 +20,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.data.Hash;
import net.i2p.data.RouterAddress;
@@ -34,6 +35,7 @@ import net.i2p.router.OutNetMessage;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.Log;
/**
@@ -47,8 +49,10 @@ public abstract class TransportImpl implements Transport {
private List _sendPool;
protected RouterContext _context;
/** map from routerIdentHash to timestamp (Long) that the peer was last unreachable */
private Map _unreachableEntries;
private Set _wasUnreachableEntries;
private Map<Hash, Long> _unreachableEntries;
private Set<Hash> _wasUnreachableEntries;
/** global router ident -> IP */
private static Map<Hash, byte[]> _IPMap = new ConcurrentHashMap(128);
/**
* Initialize the new transport
@@ -67,7 +71,7 @@ public abstract class TransportImpl implements Transport {
_context.statManager().createRateStat("transport.expiredOnQueueLifetime", "How long a message that expires on our outbound queue is processed", "Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } );
_sendPool = new ArrayList(16);
_unreachableEntries = new HashMap(16);
_wasUnreachableEntries = new HashSet(16);
_wasUnreachableEntries = new ConcurrentHashSet(16);
_currentAddress = null;
}
@@ -483,10 +487,8 @@ public abstract class TransportImpl implements Transport {
* This is NOT reset if the peer contacts us and it is never expired.
*/
public boolean wasUnreachable(Hash peer) {
synchronized (_wasUnreachableEntries) {
if (_wasUnreachableEntries.contains(peer))
return true;
}
if (_wasUnreachableEntries.contains(peer))
return true;
RouterInfo ri = _context.netDb().lookupRouterInfoLocally(peer);
if (ri == null)
return false;
@@ -496,16 +498,22 @@ public abstract class TransportImpl implements Transport {
* Maintain the WasUnreachable list
*/
public void markWasUnreachable(Hash peer, boolean yes) {
synchronized (_wasUnreachableEntries) {
if (yes)
_wasUnreachableEntries.add(peer);
else
_wasUnreachableEntries.remove(peer);
}
if (yes)
_wasUnreachableEntries.add(peer);
else
_wasUnreachableEntries.remove(peer);
if (_log.shouldLog(Log.WARN))
_log.warn(this.getStyle() + " setting wasUnreachable to " + yes + " for " + peer);
}
public static void setIP(Hash peer, byte[] ip) {
_IPMap.put(peer, ip);
}
public static byte[] getIP(Hash peer) {
return _IPMap.get(peer);
}
public static boolean isPubliclyRoutable(byte addr[]) {
if (addr.length == 4) {
if ((addr[0]&0xFF) == 127) return false;

View File

@@ -33,7 +33,7 @@ import net.i2p.util.Log;
public class TransportManager implements TransportEventListener {
private Log _log;
private List _transports;
private List<Transport> _transports;
private RouterContext _context;
private final static String PROP_ENABLE_UDP = "i2np.udp.enable";
@@ -229,6 +229,19 @@ public class TransportManager implements TransportEventListener {
return true;
}
/**
* IP of the peer from the last connection (in or out, any transport).
* This may be different from that advertised in the netDb,
* as the peer may be hidden, or connect from a different IP, or
* change his netDb later, in an attempt to avoid restrictions.
*
* For blocking purposes, etc. it's worth checking both
* the netDb addresses and this address.
*/
public byte[] getIP(Hash dest) {
return TransportImpl.getIP(dest);
}
Map getAddresses() {
Map rv = new HashMap(_transports.size());
for (int i = 0; i < _transports.size(); i++) {

View File

@@ -472,6 +472,8 @@ public class EstablishState {
byte nextReadIV[] = new byte[16];
System.arraycopy(_e_bobSig, _e_bobSig.length-16, nextReadIV, 0, nextReadIV.length);
_con.finishOutboundEstablishment(_dh.getSessionKey(), (_tsA-_tsB), nextWriteIV, nextReadIV); // skew in seconds
_transport.setIP(_con.getRemotePeer().calculateHash(),
_con.getChannel().socket().getInetAddress().getAddress());
return;
}
}
@@ -546,15 +548,17 @@ public class EstablishState {
Signature sig = new Signature(s);
_verified = _context.dsa().verifySignature(sig, toVerify, alice.getSigningPublicKey());
if (_verified) {
byte[] ip = _con.getChannel().socket().getInetAddress().getAddress();
if (_context.shitlist().isShitlistedForever(alice.calculateHash())) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping inbound connection from permanently shitlisted peer: " + alice.calculateHash().toBase64());
// So next time we will not accept the con from this IP,
// rather than doing the whole handshake
_context.blocklist().add(_con.getChannel().socket().getInetAddress().getAddress());
_context.blocklist().add(ip);
fail("Peer is shitlisted forever: " + alice.calculateHash().toBase64());
return;
}
_transport.setIP(alice.calculateHash(), ip);
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix() + "verification successful for " + _con);

View File

@@ -66,22 +66,25 @@ public class EventPumper implements Runnable {
public void startPumping() {
if (_log.shouldLog(Log.INFO))
_log.info("Starting pumper");
_alive = true;
_wantsRead = new ArrayList(16);
_wantsWrite = new ArrayList(4);
_wantsRegister = new ArrayList(1);
_wantsConRegister = new ArrayList(4);
try {
_selector = Selector.open();
_alive = true;
new I2PThread(this, "NTCP Pumper", true).start();
} catch (IOException ioe) {
_log.error("Error opening the selector", ioe);
_log.log(Log.CRIT, "Error opening the NTCP selector", ioe);
} catch (java.lang.InternalError jlie) {
// "unable to get address of epoll functions, pre-2.6 kernel?"
_log.log(Log.CRIT, "Error opening the NTCP selector", jlie);
}
new I2PThread(this, "NTCP Pumper", true).start();
}
public void stopPumping() {
_alive = false;
if (_selector.isOpen())
if (_selector != null && _selector.isOpen())
_selector.wakeup();
}

View File

@@ -241,6 +241,8 @@ public class NTCPTransport extends TransportImpl {
super.afterSend(msg, sendSuccessful, allowRequeue, msToSend);
}
public TransportBid bid(RouterInfo toAddress, long dataSize) {
if (!isAlive())
return null;
Hash peer = toAddress.getIdentity().calculateHash();
if (_context.shitlist().isShitlisted(peer, STYLE)) {
// we aren't shitlisted in general (since we are trying to get a bid), but we have
@@ -591,7 +593,10 @@ public class NTCPTransport extends TransportImpl {
for (Iterator iter = peers.iterator(); iter.hasNext(); ) {
NTCPConnection con = (NTCPConnection)iter.next();
String name = con.getRemotePeer().calculateHash().toBase64().substring(0,6);
buf.append("<tr><td><code><a href=\"netdb.jsp?r=").append(name).append("\">").append(name);
buf.append("<tr><td><code><a href=\"netdb.jsp?r=").append(name).append("\">").append(name).append("</a>");
//byte[] ip = getIP(con.getRemotePeer().calculateHash());
//if (ip != null)
// buf.append(' ').append(_context.blocklist().toStr(ip));
buf.append("</code></td><td align=\"center\"><code>");
if (con.isInbound())
buf.append("in");

View File

@@ -450,6 +450,7 @@ public class EstablishmentManager {
_transport.addRemotePeerState(peer);
_transport.inboundConnectionReceived();
_transport.setIP(remote.calculateHash(), state.getSentIP());
_context.statManager().addRateData("udp.inboundEstablishTime", state.getLifetime(), 0);
sendInboundComplete(peer);
@@ -531,6 +532,7 @@ public class EstablishmentManager {
_transport.addRemotePeerState(peer);
_transport.setIP(remote.calculateHash(), state.getSentIP());
_context.statManager().addRateData("udp.outboundEstablishTime", state.getLifetime(), 0);
sendOurInfo(peer, false);

View File

@@ -1762,6 +1762,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
buf.append(" [shitlisted]");
appended = true;
}
//byte[] ip = getIP(peer.getRemotePeer());
//if (ip != null)
// buf.append(' ').append(_context.blocklist().toStr(ip));
buf.append("</code></td>");
long idleIn = (now-peer.getLastReceiveTime())/1000;