forked from I2P_Developers/i2p.i2p
* Streaming:
- Channel cleanups and comments - New I2PSocketAddress
This commit is contained in:
@@ -13,16 +13,21 @@ import java.nio.channels.spi.AbstractSelectionKey;
|
||||
import java.nio.channels.spi.SelectorProvider;
|
||||
|
||||
/**
|
||||
* As this does not (yet) extend ServerSocketChannel it cannot be returned by StandardServerSocket.getChannel(),
|
||||
* until we implement an I2P SocketAddress class.
|
||||
*
|
||||
* Warning, this interface and implementation is preliminary and subject to change without notice.
|
||||
*
|
||||
* @since 0.8.11
|
||||
*/
|
||||
public class AcceptingChannelImpl extends AcceptingChannel {
|
||||
boolean _isRegistered = false;
|
||||
SelectionKey whichKey = null;
|
||||
SelectorProvider provider = null;
|
||||
Selector sel = null;
|
||||
Object lock = null;
|
||||
I2PSocket next = null;
|
||||
I2PServerSocket socket;
|
||||
class AcceptingChannelImpl extends AcceptingChannel {
|
||||
private boolean _isRegistered;
|
||||
private SelectionKey whichKey;
|
||||
private SelectorProvider provider;
|
||||
private Selector sel;
|
||||
private Object lock;
|
||||
private volatile I2PSocket next;
|
||||
private final I2PServerSocket socket;
|
||||
|
||||
I2PSocket accept() throws I2PException, ConnectException {
|
||||
I2PSocket sock;
|
||||
@@ -31,9 +36,11 @@ public class AcceptingChannelImpl extends AcceptingChannel {
|
||||
} catch(SocketTimeoutException ex) {
|
||||
return null;
|
||||
}
|
||||
I2PSocket temp = next;
|
||||
next = sock;
|
||||
return temp;
|
||||
synchronized (this) {
|
||||
I2PSocket temp = next;
|
||||
next = sock;
|
||||
return temp;
|
||||
}
|
||||
}
|
||||
|
||||
AcceptingChannelImpl(I2PSocketManager manager) {
|
||||
@@ -96,7 +103,7 @@ public class AcceptingChannelImpl extends AcceptingChannel {
|
||||
|
||||
@Override
|
||||
public int readyOps() {
|
||||
if((operations & OP_ACCEPT) != 0)
|
||||
if((operations & OP_ACCEPT) != 0) {
|
||||
if(next != null) {
|
||||
return OP_ACCEPT;
|
||||
} else {
|
||||
@@ -107,6 +114,7 @@ public class AcceptingChannelImpl extends AcceptingChannel {
|
||||
if(next != null)
|
||||
return OP_ACCEPT;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
@@ -136,8 +144,9 @@ public class AcceptingChannelImpl extends AcceptingChannel {
|
||||
|
||||
@Override
|
||||
protected void implCloseChannel() throws IOException {
|
||||
if(next != null) {
|
||||
next.close();
|
||||
I2PSocket nxt = next;
|
||||
if(nxt != null) {
|
||||
nxt.close();
|
||||
}
|
||||
_socketManager.destroySocketManager();
|
||||
}
|
||||
|
@@ -9,6 +9,7 @@ import net.i2p.I2PException;
|
||||
*/
|
||||
class I2PServerSocketFull implements I2PServerSocket {
|
||||
private final I2PSocketManagerFull _socketManager;
|
||||
private volatile AcceptingChannel _channel;
|
||||
|
||||
public I2PServerSocketFull(I2PSocketManagerFull mgr) {
|
||||
_socketManager = mgr;
|
||||
@@ -28,8 +29,10 @@ class I2PServerSocketFull implements I2PServerSocket {
|
||||
/**
|
||||
* @since 0.8.11
|
||||
*/
|
||||
public AcceptingChannel getChannel() {
|
||||
return new AcceptingChannelImpl(_socketManager);
|
||||
public synchronized AcceptingChannel getChannel() {
|
||||
if (_channel == null)
|
||||
_channel = new AcceptingChannelImpl(_socketManager);
|
||||
return _channel;
|
||||
}
|
||||
|
||||
public long getSoTimeout() {
|
||||
|
@@ -16,6 +16,7 @@ class I2PSocketFull implements I2PSocket {
|
||||
private Connection _connection;
|
||||
private Destination _remotePeer;
|
||||
private Destination _localPeer;
|
||||
private volatile MessageChannel _channel;
|
||||
|
||||
public I2PSocketFull(Connection con) {
|
||||
_connection = con;
|
||||
@@ -70,8 +71,10 @@ class I2PSocketFull implements I2PSocket {
|
||||
/**
|
||||
* @since 0.8.9
|
||||
*/
|
||||
public SelectableChannel getChannel() {
|
||||
return new MessageChannel(this);
|
||||
public synchronized SelectableChannel getChannel() {
|
||||
if (_channel == null)
|
||||
_channel = new MessageChannel(this);
|
||||
return _channel;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@@ -15,18 +15,23 @@ import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
/**
|
||||
* As this does not (yet) extend SocketChannel it cannot be returned by StandardSocket.getChannel(),
|
||||
* until we implement an I2P SocketAddress class.
|
||||
*
|
||||
* Warning, this interface and implementation is preliminary and subject to change without notice.
|
||||
*
|
||||
* @since 0.8.9
|
||||
*/
|
||||
public class MessageChannel extends SelectableChannel implements ReadableByteChannel, WritableByteChannel {
|
||||
|
||||
final MessageInputStream in;
|
||||
final MessageOutputStream out;
|
||||
boolean _isRegistered = false;
|
||||
SelectionKey whichKey = null;
|
||||
SelectorProvider provider = null;
|
||||
Selector sel = null;
|
||||
Object lock = null;
|
||||
I2PSocket socket;
|
||||
private final MessageInputStream in;
|
||||
private final MessageOutputStream out;
|
||||
private boolean _isRegistered;
|
||||
private SelectionKey whichKey;
|
||||
private SelectorProvider provider;
|
||||
private Selector sel;
|
||||
private Object lock;
|
||||
private final I2PSocket socket;
|
||||
|
||||
MessageChannel(I2PSocket socket) {
|
||||
try {
|
||||
@@ -145,10 +150,10 @@ public class MessageChannel extends SelectableChannel implements ReadableByteCha
|
||||
* returns 0, which happens when there's
|
||||
* no more data available.
|
||||
*/
|
||||
|
||||
public int read(ByteBuffer buf) throws IOException {
|
||||
int amount = 0;
|
||||
for (;;) {
|
||||
// TODO if buf.hasArray() ... getArray() ... getArrayOffset() ...
|
||||
byte[] lbuf = new byte[buf.remaining()];
|
||||
int samount = in.read(lbuf);
|
||||
if (samount <= 0) {
|
||||
@@ -167,12 +172,12 @@ public class MessageChannel extends SelectableChannel implements ReadableByteCha
|
||||
* already set buffer size. Once it starts to fail
|
||||
* (wait timeout is 0) then put the bytes back and return.
|
||||
*/
|
||||
|
||||
public int write(ByteBuffer buf) throws IOException {
|
||||
int written = 0;
|
||||
for (;;) {
|
||||
if(buf.remaining()==0)
|
||||
return written;
|
||||
// TODO if buf.hasArray() ... getArray() ... getArrayOffset() ...
|
||||
byte[] lbuf = new byte[Math.min(buf.remaining(), 0x1000)];
|
||||
buf.get(lbuf);
|
||||
try {
|
||||
|
@@ -72,10 +72,11 @@ class StandardServerSocket extends ServerSocket {
|
||||
}
|
||||
|
||||
/**
|
||||
* @return null always
|
||||
* @return null always, see AcceptingChannelImpl for more info
|
||||
*/
|
||||
@Override
|
||||
public ServerSocketChannel getChannel() {
|
||||
//return _socket.getChannel();
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@@ -65,10 +65,11 @@ class StandardSocket extends Socket {
|
||||
}
|
||||
|
||||
/**
|
||||
* @return null always
|
||||
* @return null always, see MessageChannel for more info
|
||||
*/
|
||||
@Override
|
||||
public SocketChannel getChannel() {
|
||||
//return _socket.getChannel();
|
||||
return null;
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user