Fix protocol for V3 datagram and raw sessions

Add V3 datagram and raw sessions to send client
minor cleanups
This commit is contained in:
zzz
2015-11-27 15:59:42 +00:00
parent e77c5bd05c
commit 9f625a03fb
6 changed files with 89 additions and 42 deletions

View File

@@ -12,6 +12,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Properties; import java.util.Properties;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException; import net.i2p.client.I2PSessionException;
import net.i2p.client.datagram.I2PDatagramDissector; import net.i2p.client.datagram.I2PDatagramDissector;
import net.i2p.client.datagram.I2PDatagramMaker; import net.i2p.client.datagram.I2PDatagramMaker;
@@ -77,6 +78,7 @@ class SAMDatagramSession extends SAMMessageSession {
* *
* @param dest Destination * @param dest Destination
* @param data Bytes to be sent * @param data Bytes to be sent
* @param proto ignored, will always use PROTO_DATAGRAM (17)
* *
* @return True if the data was sent, false otherwise * @return True if the data was sent, false otherwise
* @throws DataFormatException on unknown / bad dest * @throws DataFormatException on unknown / bad dest
@@ -90,7 +92,7 @@ class SAMDatagramSession extends SAMMessageSession {
synchronized (dgramMaker) { synchronized (dgramMaker) {
dgram = dgramMaker.makeI2PDatagram(data); dgram = dgramMaker.makeI2PDatagram(data);
} }
return sendBytesThroughMessageSession(dest, dgram, proto, fromPort, toPort); return sendBytesThroughMessageSession(dest, dgram, I2PSession.PROTO_DATAGRAM, fromPort, toPort);
} }
protected void messageReceived(byte[] msg, int proto, int fromPort, int toPort) { protected void messageReceived(byte[] msg, int proto, int fromPort, int toPort) {

View File

@@ -22,7 +22,7 @@ import net.i2p.client.I2PSessionMuxedListener;
import net.i2p.data.Base64; import net.i2p.data.Base64;
import net.i2p.data.DataFormatException; import net.i2p.data.DataFormatException;
import net.i2p.data.Destination; import net.i2p.data.Destination;
import net.i2p.util.HexDump; //import net.i2p.util.HexDump;
import net.i2p.util.I2PAppThread; import net.i2p.util.I2PAppThread;
import net.i2p.util.Log; import net.i2p.util.Log;
@@ -261,10 +261,10 @@ abstract class SAMMessageSession {
byte msg[] = session.receiveMessage(msgId); byte msg[] = session.receiveMessage(msgId);
if (msg == null) if (msg == null)
return; return;
if (_log.shouldLog(Log.DEBUG)) { //if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Content of message " + msgId + ":\n" // _log.debug("Content of message " + msgId + ":\n"
+ HexDump.dump(msg)); // + HexDump.dump(msg));
} //}
messageReceived(msg, proto, fromPort, toPort); messageReceived(msg, proto, fromPort, toPort);
} catch (I2PSessionException e) { } catch (I2PSessionException e) {

View File

@@ -12,6 +12,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Properties; import java.util.Properties;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException; import net.i2p.client.I2PSessionException;
import net.i2p.data.DataFormatException; import net.i2p.data.DataFormatException;
import net.i2p.util.Log; import net.i2p.util.Log;
@@ -66,6 +67,7 @@ class SAMRawSession extends SAMMessageSession {
* Send bytes through a SAM RAW session. * Send bytes through a SAM RAW session.
* *
* @param data Bytes to be sent * @param data Bytes to be sent
* @param proto if 0, will use PROTO_DATAGRAM_RAW (18)
* *
* @return True if the data was sent, false otherwise * @return True if the data was sent, false otherwise
* @throws DataFormatException on unknown / bad dest * @throws DataFormatException on unknown / bad dest
@@ -75,6 +77,8 @@ class SAMRawSession extends SAMMessageSession {
int fromPort, int toPort) throws DataFormatException, I2PSessionException { int fromPort, int toPort) throws DataFormatException, I2PSessionException {
if (data.length > RAW_SIZE_MAX) if (data.length > RAW_SIZE_MAX)
throw new DataFormatException("Data size limit exceeded (" + data.length + ")"); throw new DataFormatException("Data size limit exceeded (" + data.length + ")");
if (proto == I2PSession.PROTO_UNSPECIFIED)
proto = I2PSession.PROTO_DATAGRAM_RAW;
return sendBytesThroughMessageSession(dest, data, proto, fromPort, toPort); return sendBytesThroughMessageSession(dest, data, proto, fromPort, toPort);
} }

View File

@@ -164,6 +164,7 @@ class SAMv3DatagramServer implements Handler {
else if (t.startsWith("TO_PORT=")) else if (t.startsWith("TO_PORT="))
tp = t.substring("TO_PORT=".length()); tp = t.substring("TO_PORT=".length());
} }
int proto = I2PSession.PROTO_UNSPECIFIED; int proto = I2PSession.PROTO_UNSPECIFIED;
int fromPort = I2PSession.PORT_UNSPECIFIED; int fromPort = I2PSession.PORT_UNSPECIFIED;
int toPort = I2PSession.PORT_UNSPECIFIED; int toPort = I2PSession.PORT_UNSPECIFIED;
@@ -196,7 +197,7 @@ class SAMv3DatagramServer implements Handler {
is.read(data); is.read(data);
SAMv3Handler.Session sess = rec.getHandler().getSession(); SAMv3Handler.Session sess = rec.getHandler().getSession();
if (sess != null) if (sess != null)
rec.getHandler().getSession().sendBytes(dest,data, proto, fromPort, toPort); sess.sendBytes(dest, data, proto, fromPort, toPort);
else else
warn("Dropping datagram, no session for " + nick); warn("Dropping datagram, no session for " + nick);
} else { } else {

View File

@@ -1,10 +1,14 @@
package net.i2p.sam.client; package net.i2p.sam.client;
import java.io.ByteArrayOutputStream;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.security.GeneralSecurityException; import java.security.GeneralSecurityException;
import java.util.HashMap; import java.util.HashMap;
@@ -37,6 +41,7 @@ public class SAMStreamSend {
private String _conOptions; private String _conOptions;
private SAMReader _reader, _reader2; private SAMReader _reader, _reader2;
private boolean _isV3; private boolean _isV3;
private boolean _isV32;
private String _v3ID; private String _v3ID;
//private boolean _dead; //private boolean _dead;
/** Connection id (Integer) to peer (Flooder) */ /** Connection id (Integer) to peer (Flooder) */
@@ -155,7 +160,7 @@ public class SAMStreamSend {
throw new IOException("handshake failed"); throw new IOException("handshake failed");
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Handshake complete. we are " + ourDest); _log.debug("Handshake complete. we are " + ourDest);
if (_isV3 && mode != V1DG && mode != V1RAW) { if (_isV3 && mode == STREAM) {
Socket sock2 = connect(isSSL); Socket sock2 = connect(isSSL);
eventHandler = new SendEventHandler(_context); eventHandler = new SendEventHandler(_context);
_reader2 = new SAMReader(_context, sock2.getInputStream(), eventHandler); _reader2 = new SAMReader(_context, sock2.getInputStream(), eventHandler);
@@ -169,9 +174,9 @@ public class SAMStreamSend {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Handshake2 complete."); _log.debug("Handshake2 complete.");
} }
if (ourDest != null) { if (mode == DG || mode == RAW)
send(out, eventHandler, mode); out = null;
} send(out, eventHandler, mode);
} catch (IOException e) { } catch (IOException e) {
_log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e); _log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e);
if (_reader != null) if (_reader != null)
@@ -241,6 +246,7 @@ public class SAMStreamSend {
return "OK"; return "OK";
_isV3 = VersionComparator.comp(hisVersion, "3") >= 0; _isV3 = VersionComparator.comp(hisVersion, "3") >= 0;
if (_isV3) { if (_isV3) {
_isV32 = VersionComparator.comp(hisVersion, "3.2") >= 0;
byte[] id = new byte[5]; byte[] id = new byte[5];
_context.random().nextBytes(id); _context.random().nextBytes(id);
_v3ID = Base32.encode(id); _v3ID = Base32.encode(id);
@@ -307,11 +313,21 @@ public class SAMStreamSend {
private final OutputStream _samOut; private final OutputStream _samOut;
private final SAMEventHandler _eventHandler; private final SAMEventHandler _eventHandler;
private final int _mode; private final int _mode;
private final DatagramSocket _dgSock;
private final InetSocketAddress _dgSAM;
public Sender(OutputStream samOut, SAMEventHandler eventHandler, int mode) { public Sender(OutputStream samOut, SAMEventHandler eventHandler, int mode) throws IOException {
_samOut = samOut; _samOut = samOut;
_eventHandler = eventHandler; _eventHandler = eventHandler;
_mode = mode; _mode = mode;
if (mode == DG || mode == RAW) {
// samOut will be null
_dgSock = new DatagramSocket();
_dgSAM = new InetSocketAddress(_samHost, 7655);
} else {
_dgSock = null;
_dgSAM = null;
}
synchronized (_remotePeers) { synchronized (_remotePeers) {
if (_v3ID != null) if (_v3ID != null)
_connectionId = _v3ID; _connectionId = _v3ID;
@@ -396,22 +412,42 @@ public class SAMStreamSend {
_log.debug("Sending " + read + " on " + _connectionId + " after " + (now-lastSend)); _log.debug("Sending " + read + " on " + _connectionId + " after " + (now-lastSend));
lastSend = now; lastSend = now;
synchronized (_samOut) { if (_samOut != null) {
if (!_isV3 || _mode == V1DG || _mode == V1RAW) { synchronized (_samOut) {
String m; if (!_isV3 || _mode == V1DG || _mode == V1RAW) {
if (_mode == STREAM) String m;
m = "STREAM SEND ID=" + _connectionId + " SIZE=" + read + "\n"; if (_mode == STREAM) {
else if (_mode == V1DG) m = "STREAM SEND ID=" + _connectionId + " SIZE=" + read + "\n";
m = "DATAGRAM SEND DESTINATION=" + _remoteDestination + " SIZE=" + read + "\n"; } else if (_mode == V1DG) {
else if (_mode == V1RAW) m = "DATAGRAM SEND DESTINATION=" + _remoteDestination + " SIZE=" + read + "\n";
m = "RAW SEND DESTINATION=" + _remoteDestination + " SIZE=" + read + "\n"; } else if (_mode == V1RAW) {
else m = "RAW SEND DESTINATION=" + _remoteDestination + " SIZE=" + read + "\n";
throw new IOException("unsupported mode " + _mode); } else {
byte msg[] = DataHelper.getASCII(m); throw new IOException("unsupported mode " + _mode);
_samOut.write(msg); }
byte msg[] = DataHelper.getASCII(m);
_samOut.write(msg);
}
_samOut.write(data, 0, read);
_samOut.flush();
} }
_samOut.write(data, 0, read); } else {
_samOut.flush(); // real datagrams
ByteArrayOutputStream baos = new ByteArrayOutputStream(read + 1024);
baos.write(DataHelper.getASCII("3.0 "));
baos.write(DataHelper.getASCII(_v3ID));
baos.write((byte) ' ');
baos.write(DataHelper.getASCII(_remoteDestination));
if (_isV32) {
// only set TO_PORT to test session setting of FROM_PORT
baos.write(DataHelper.getASCII(" TO_PORT=5678"));
}
baos.write((byte) '\n');
baos.write(data, 0, read);
byte[] pkt = baos.toByteArray();
DatagramPacket p = new DatagramPacket(pkt, pkt.length, _dgSAM);
_dgSock.send(p);
try { Thread.sleep(25); } catch (InterruptedException ie) {}
} }
_totalSent += read; _totalSent += read;
@@ -423,23 +459,27 @@ public class SAMStreamSend {
} }
} }
if (_isV3) { if (_samOut != null) {
try { if (_isV3) {
_samOut.close(); try {
} catch (IOException ioe) {
_log.info("Error closing", ioe);
}
} else {
byte msg[] = ("STREAM CLOSE ID=" + _connectionId + "\n").getBytes();
try {
synchronized (_samOut) {
_samOut.write(msg);
_samOut.flush();
_samOut.close(); _samOut.close();
} catch (IOException ioe) {
_log.info("Error closing", ioe);
}
} else {
byte msg[] = ("STREAM CLOSE ID=" + _connectionId + "\n").getBytes();
try {
synchronized (_samOut) {
_samOut.write(msg);
_samOut.flush();
_samOut.close();
}
} catch (IOException ioe) {
_log.info("Error closing", ioe);
} }
} catch (IOException ioe) {
_log.info("Error closing", ioe);
} }
} else if (_dgSock != null) {
_dgSock.close();
} }
closed(); closed();

View File

@@ -163,7 +163,7 @@ public class SAMStreamSink {
Thread t = new Pinger(out); Thread t = new Pinger(out);
t.start(); t.start();
} }
if (_isV3 && mode != V1DG && mode != V1RAW) { if (_isV3 && mode == STREAM) {
Socket sock2 = connect(isSSL); Socket sock2 = connect(isSSL);
out = sock2.getOutputStream(); out = sock2.getOutputStream();
eventHandler = new SinkEventHandler2(_context, sock2.getInputStream(), out); eventHandler = new SinkEventHandler2(_context, sock2.getInputStream(), out);