Allow multiple simultaneous ACCEPT sockets.

Add support for parallel accepts in sink client
This commit is contained in:
zzz
2015-11-27 19:39:32 +00:00
parent bafec18093
commit cb979fb685
2 changed files with 59 additions and 43 deletions

View File

@ -14,8 +14,14 @@ import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.NoRouteToHostException;
import java.net.SocketTimeoutException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.security.GeneralSecurityException;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSocket;
@ -30,11 +36,6 @@ import net.i2p.data.Destination;
import net.i2p.util.I2PAppThread;
import net.i2p.util.I2PSSLSocketFactory;
import net.i2p.util.Log;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* SAMv3 STREAM session class.
@ -48,7 +49,11 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
private static final int BUFFER_SIZE = 1024 ;
private final Object socketServerLock = new Object();
/** this is ONLY set for FORWARD, not for ACCEPT */
private I2PServerSocket socketServer;
/** this is the count of active ACCEPT sockets */
private final AtomicInteger _acceptors = new AtomicInteger();
private static I2PSSLSocketFactory _sslSocketFactory;
private final String nick ;
@ -154,6 +159,8 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
/**
* Accept a single incoming STREAM on the socket stolen from the handler.
* As of version 3.2 (0.9.24), multiple simultaneous accepts are allowed.
* Accepts and forwarding may not be done at the same time.
*
* @param handler The handler that communicates with the requesting client
* @param verbose If true, SAM will send the Base64-encoded peer Destination of an
@ -170,23 +177,22 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
public void accept(SAMv3Handler handler, boolean verbose)
throws I2PException, InterruptedIOException, IOException, SAMException {
synchronized( this.socketServerLock )
{
if (this.socketServer!=null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("a socket server is already defined for this destination");
throw new SAMException("a socket server is already defined for this destination");
}
this.socketServer = this.socketMgr.getServerSocket();
}
I2PSocket i2ps = this.socketServer.accept();
synchronized(this.socketServerLock) {
if (this.socketServer != null) {
if (_log.shouldWarn())
_log.warn("a forwarding server is already defined for this destination");
throw new SAMException("a forwarding server is already defined for this destination");
}
}
I2PSocket i2ps;
_acceptors.incrementAndGet();
try {
i2ps = socketMgr.getServerSocket().accept();
} finally {
_acceptors.decrementAndGet();
}
synchronized( this.socketServerLock )
{
this.socketServer = null ;
}
SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
if ( rec==null || i2ps==null ) throw new InterruptedIOException() ;
@ -212,7 +218,8 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
/**
* Forward sockets from I2P to the host/port provided
* Forward sockets from I2P to the host/port provided.
* Accepts and forwarding may not be done at the same time.
*/
public void startForwardingIncoming(Properties props, boolean sendPorts) throws SAMException, InterruptedIOException
{
@ -236,14 +243,17 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
_log.debug("no host specified. Taken from the client socket : " + host +':'+port);
}
boolean isSSL = Boolean.parseBoolean(props.getProperty("SSL"));
synchronized( this.socketServerLock )
{
if (this.socketServer!=null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("a socket server is already defined for this destination");
throw new SAMException("a socket server is already defined for this destination");
}
if (_acceptors.get() > 0) {
if (_log.shouldWarn())
_log.warn("an accepting server is already defined for this destination");
throw new SAMException("an accepting server is already defined for this destination");
}
synchronized(this.socketServerLock) {
if (this.socketServer!=null) {
if (_log.shouldWarn())
_log.warn("a forwarding server is already defined for this destination");
throw new SAMException("a forwarding server is already defined for this destination");
}
this.socketServer = this.socketMgr.getServerSocket();
}
@ -427,7 +437,7 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
}
}
public I2PServerSocket getSocketServer()
private I2PServerSocket getSocketServer()
{
synchronized ( this.socketServerLock ) {
return this.socketServer ;

View File

@ -168,18 +168,22 @@ public class SAMStreamSink {
t.start();
}
if (_isV3 && mode == STREAM) {
Socket sock2 = connect(isSSL);
out = sock2.getOutputStream();
eventHandler = new SinkEventHandler2(_context, sock2.getInputStream(), out);
_reader2 = new SAMReader(_context, sock2.getInputStream(), eventHandler);
_reader2.startReading();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Reader2 created");
String ok = handshake(out, version, false, eventHandler, mode, user, password, "");
if (ok == null)
throw new IOException("2nd handshake failed");
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handshake2 complete.");
// test multiple acceptors, only works in 3.2
int acceptors = isV32 ? 4 : 1;
for (int i = 0; i < acceptors; i++) {
Socket sock2 = connect(isSSL);
out = sock2.getOutputStream();
eventHandler = new SinkEventHandler2(_context, sock2.getInputStream(), out);
_reader2 = new SAMReader(_context, sock2.getInputStream(), eventHandler);
_reader2.startReading();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Reader " + (2 + i) + " created");
String ok = handshake(out, version, false, eventHandler, mode, user, password, "");
if (ok == null)
throw new IOException("handshake " + (2 + i) + " failed");
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handshake " + (2 + i) + " complete.");
}
} else if (_isV3 && (mode == DG || mode == RAW || mode == RAWHDR)) {
// set up a listening DatagramSocket
(new DGRcvr(mode)).start();
@ -622,6 +626,8 @@ public class SAMStreamSink {
sinkDir.mkdirs();
File out = File.createTempFile("sink", ".dat", sinkDir);
if (_log.shouldWarn())
_log.warn("outputting to " + out);
_out = new FileOutputStream(out);
_started = _context.clock().now();
}