This commit is contained in:
mkvore-commit
2009-04-04 10:28:31 +00:00
parent bc086a78eb
commit de6edc6a99
6 changed files with 55 additions and 47 deletions

View File

@@ -44,7 +44,7 @@ public interface I2PServerSocket {
* @throws ConnectException if the I2PServerSocket is closed * @throws ConnectException if the I2PServerSocket is closed
* @throws SocketTimeoutException * @throws SocketTimeoutException
*/ */
public I2PSocket accept(boolean blocking) throws I2PException, ConnectException, SocketTimeoutException; public I2PSocket accept(long timeout) throws I2PException, ConnectException, SocketTimeoutException, InterruptedException;
/** /**
* Waits until there is a socket waiting for acception or the timeout is * Waits until there is a socket waiting for acception or the timeout is
@@ -58,7 +58,7 @@ public interface I2PServerSocket {
* from the data available (aka the I2PSession closed, etc) * from the data available (aka the I2PSession closed, etc)
* @throws ConnectException if the I2PServerSocket is closed * @throws ConnectException if the I2PServerSocket is closed
*/ */
public boolean waitIncoming(long timeoutMs) throws I2PException, ConnectException, InterruptedException; public void waitIncoming(long timeoutMs) throws I2PException, ConnectException, InterruptedException;
/** /**
* Set Sock Option accept timeout * Set Sock Option accept timeout

View File

@@ -60,28 +60,24 @@ class I2PServerSocketImpl implements I2PServerSocket {
* *
* @param timeoutMs timeout in ms. A negative value waits forever. * @param timeoutMs timeout in ms. A negative value waits forever.
* *
* @return true if a socket is available, false if not
*
* @throws I2PException if there is a problem with reading a new socket * @throws I2PException if there is a problem with reading a new socket
* from the data available (aka the I2PSession closed, etc) * from the data available (aka the I2PSession closed, etc)
* @throws ConnectException if the I2PServerSocket is closed * @throws ConnectException if the I2PServerSocket is closed
*/ */
public boolean waitIncoming(long timeoutMs) throws I2PException, ConnectException { public void waitIncoming(long timeoutMs) throws I2PException, ConnectException, InterruptedException {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("waitIncoming() called, pending: " + pendingSockets.size()); _log.debug("waitIncoming() called, pending: " + pendingSockets.size());
boolean isTimed = (timeoutMs>=0); boolean isTimed = (timeoutMs>0);
if (isTimed) { if (isTimed) {
Clock clock = I2PAppContext.getGlobalContext().clock(); Clock clock = I2PAppContext.getGlobalContext().clock();
long now = clock.now(); long now = clock.now();
long end = now + timeoutMs; long end = now + timeoutMs;
while (pendingSockets.size() <= 0 && now<end) { while (pendingSockets.size() <= 0 && now<end) {
if (closing) throw new ConnectException("I2PServerSocket closed"); if (closing) throw new ConnectException("I2PServerSocket closed");
try {
synchronized(socketAddedLock) { synchronized(socketAddedLock) {
socketAddedLock.wait(end - now); socketAddedLock.wait(end - now);
} }
} catch (InterruptedException ie) {}
now = clock.now(); now = clock.now();
} }
} else { } else {
@@ -94,7 +90,6 @@ class I2PServerSocketImpl implements I2PServerSocket {
} catch (InterruptedException ie) {} } catch (InterruptedException ie) {}
} }
} }
return (pendingSockets.size()>0);
} }
@@ -112,16 +107,20 @@ class I2PServerSocketImpl implements I2PServerSocket {
* @throws ConnectException if the I2PServerSocket is closed * @throws ConnectException if the I2PServerSocket is closed
*/ */
public I2PSocket accept(boolean blocking) throws I2PException, ConnectException { public I2PSocket accept(long timeout) throws I2PException, ConnectException, InterruptedException {
I2PSocket ret = null; I2PSocket ret = null;
if (blocking) { if (timeout<=0) {
ret = accept(); ret = accept();
} else { } else {
long now = I2PAppContext.getGlobalContext().clock().now();
long expiration = timeout + now ;
synchronized (pendingSockets) { synchronized (pendingSockets) {
if (pendingSockets.size() > 0) { while (pendingSockets.size() == 0 && expiration>now) {
ret = (I2PSocket)pendingSockets.remove(0); pendingSockets.wait(expiration-now);
now = I2PAppContext.getGlobalContext().clock().now();
} }
ret = (I2PSocket)pendingSockets.remove(0);
} }
if (ret != null) { if (ret != null) {
synchronized (socketAcceptedLock) { synchronized (socketAcceptedLock) {
@@ -151,10 +150,12 @@ class I2PServerSocketImpl implements I2PServerSocket {
I2PSocket ret = null; I2PSocket ret = null;
while ( (ret == null) && (!closing) ){ while ( (ret == null) && (!closing) ){
try {
this.waitIncoming(-1); this.waitIncoming(0);
ret = accept(1);
ret = accept(false); } catch (InterruptedException e) {
throw new I2PException("Thread interrupted") ;
}
} }
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))

View File

@@ -16,7 +16,7 @@ else :
if len(sys.argv)==3 : if len(sys.argv)==3 :
name = sys.argv[2] name = sys.argv[2]
else : else :
name = "essaiSamForward" name = "datagramSamForward"
sess = socket.socket( sess = socket.socket(
socket.AF_INET, socket.SOCK_STREAM) socket.AF_INET, socket.SOCK_STREAM)

View File

@@ -269,10 +269,9 @@ public class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handle
{ {
while (session.socketServer!=null) { while (session.socketServer!=null) {
boolean available = false ;
I2PSocket i2ps = null ; I2PSocket i2ps = null ;
try { try {
available = session.socketServer.waitIncoming(-1); session.socketServer.waitIncoming(0);
} catch (ConnectException e) { } catch (ConnectException e) {
_log.debug("ConnectException"); _log.debug("ConnectException");
break ; break ;
@@ -283,7 +282,6 @@ public class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handle
_log.debug("InterruptedException"); _log.debug("InterruptedException");
break ; break ;
} }
if ( !available ) continue ;
java.net.InetSocketAddress addr = new java.net.InetSocketAddress(host,port); java.net.InetSocketAddress addr = new java.net.InetSocketAddress(host,port);
@@ -296,7 +294,7 @@ public class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handle
} }
try { try {
i2ps = session.socketServer.accept(false); i2ps = session.socketServer.accept(1);
} catch (Exception e) {} } catch (Exception e) {}
if (i2ps==null) { if (i2ps==null) {

View File

@@ -23,6 +23,7 @@ class ConnectionHandler {
private Log _log; private Log _log;
private ConnectionManager _manager; private ConnectionManager _manager;
private LinkedBlockingQueue<Packet> _synQueue; private LinkedBlockingQueue<Packet> _synQueue;
private Object _synSignal;
private boolean _active; private boolean _active;
private int _acceptTimeout; private int _acceptTimeout;
@@ -81,6 +82,12 @@ class ConnectionHandler {
boolean success = _synQueue.offer(packet); // fail immediately if full boolean success = _synQueue.offer(packet); // fail immediately if full
if (success) { if (success) {
SimpleScheduler.getInstance().addEvent(new TimeoutSyn(packet), _acceptTimeout); SimpleScheduler.getInstance().addEvent(new TimeoutSyn(packet), _acceptTimeout);
if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE))
synchronized (this._synSignal)
{
this._synSignal.notifyAll();
}
} else { } else {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Dropping new SYN request, as the queue is full"); _log.warn("Dropping new SYN request, as the queue is full");
@@ -89,8 +96,17 @@ class ConnectionHandler {
} }
} }
public boolean waitSyn( long ms ) throws InterruptedException { /**
throw new InterruptedException(); * Wait until some SYN packet is available
* @param ms max amount of time to wait for a connection (if negative or null,
* wait indefinitely)
* @throws InterruptedException
*/
public void waitSyn( long ms ) throws InterruptedException {
synchronized (this._synSignal)
{
this._synSignal.wait(ms);
}
} }
/** /**
@@ -120,6 +136,9 @@ class ConnectionHandler {
return null; return null;
} }
if ( (timeoutMs > 0) && (expiration < _context.clock().now()) )
return null;
Packet syn = null; Packet syn = null;
while ( _active && syn == null) { while ( _active && syn == null) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@@ -162,8 +181,6 @@ class ConnectionHandler {
} }
} }
// keep looping... // keep looping...
if ( (timeoutMs >= 0) && (expiration < _context.clock().now()) )
return null;
} }
} }

View File

@@ -65,28 +65,20 @@ public class I2PServerSocketFull implements I2PServerSocket {
* @throws SocketTimeoutException if the timeout has been reached * @throws SocketTimeoutException if the timeout has been reached
*/ */
public I2PSocket accept(boolean blocking) throws I2PException, SocketTimeoutException { public I2PSocket accept(long timeout) throws I2PException {
long timeout = this.getSoTimeout(); long reset_timeout = this.getSoTimeout();
try { try {
if (blocking) this.setSoTimeout(timeout);
{
this.setSoTimeout(-1);
} else {
this.setSoTimeout(0);
}
try {
return this.accept(); return this.accept();
} catch (SocketTimeoutException e) { } catch (SocketTimeoutException e) {
if (blocking) throw e; return null ;
else return null ;
}
} finally { } finally {
this.setSoTimeout(timeout); this.setSoTimeout(reset_timeout);
} }
} }
public boolean waitIncoming(long timeoutMs) throws InterruptedException { public void waitIncoming(long timeoutMs) throws InterruptedException {
return this._socketManager.getConnectionManager().getConnectionHandler().waitSyn(timeoutMs); this._socketManager.getConnectionManager().getConnectionHandler().waitSyn(timeoutMs);
} }
} }