forked from I2P_Developers/i2p.i2p
v3 sink working
This commit is contained in:
@@ -31,10 +31,10 @@ public class SAMStreamSink {
|
||||
private final String _destFile;
|
||||
private final String _sinkDir;
|
||||
private String _conOptions;
|
||||
private SAMReader _reader;
|
||||
private SAMReader _reader, _reader2;
|
||||
private boolean _isV3;
|
||||
private String _v3ID;
|
||||
//private boolean _dead;
|
||||
private final SAMEventHandler _eventHandler;
|
||||
/** Connection id (Integer) to peer (Flooder) */
|
||||
private final Map<String, Sink> _remotePeers;
|
||||
|
||||
@@ -58,7 +58,6 @@ public class SAMStreamSink {
|
||||
_destFile = destFile;
|
||||
_sinkDir = sinkDir;
|
||||
_conOptions = "";
|
||||
_eventHandler = new SinkEventHandler(_context);
|
||||
_remotePeers = new HashMap<String, Sink>();
|
||||
}
|
||||
|
||||
@@ -67,20 +66,32 @@ public class SAMStreamSink {
|
||||
_log.debug("Starting up");
|
||||
try {
|
||||
Socket sock = connect();
|
||||
_reader = new SAMReader(_context, sock.getInputStream(), _eventHandler);
|
||||
SAMEventHandler eventHandler = new SinkEventHandler(_context);
|
||||
_reader = new SAMReader(_context, sock.getInputStream(), eventHandler);
|
||||
_reader.startReading();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Reader created");
|
||||
OutputStream out = sock.getOutputStream();
|
||||
String ourDest = handshake(out, version, true);
|
||||
String ourDest = handshake(out, version, true, eventHandler);
|
||||
if (ourDest == null)
|
||||
throw new IOException("handshake failed");
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Handshake complete. we are " + ourDest);
|
||||
if (ourDest != null) {
|
||||
//boolean written =
|
||||
writeDest(ourDest);
|
||||
} else {
|
||||
_reader.stopReading();
|
||||
if (_isV3) {
|
||||
Socket sock2 = connect();
|
||||
eventHandler = new SinkEventHandler2(_context, sock2.getInputStream());
|
||||
_reader2 = new SAMReader(_context, sock2.getInputStream(), eventHandler);
|
||||
_reader2.startReading();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Reader2 created");
|
||||
out = sock2.getOutputStream();
|
||||
String ok = handshake(out, version, false, eventHandler);
|
||||
if (ok == null)
|
||||
throw new IOException("2nd handshake failed");
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Handshake2 complete.");
|
||||
}
|
||||
writeDest(ourDest);
|
||||
} catch (IOException e) {
|
||||
_log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e);
|
||||
}
|
||||
@@ -133,24 +144,115 @@ public class SAMStreamSink {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class SinkEventHandler2 extends SinkEventHandler {
|
||||
|
||||
private final InputStream _in;
|
||||
|
||||
public SinkEventHandler2(I2PAppContext ctx, InputStream in) {
|
||||
super(ctx);
|
||||
_in = in;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void streamStatusReceived(String result, String id, String message) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("got STREAM STATUS, result=" + result);
|
||||
super.streamStatusReceived(result, id, message);
|
||||
// with SILENT=true, there's nothing else coming, so fire up the Sink
|
||||
Sink sink = null;
|
||||
try {
|
||||
String dest = "TODO if not silent";
|
||||
sink = new Sink(_v3ID, dest);
|
||||
synchronized (_remotePeers) {
|
||||
_remotePeers.put(_v3ID, sink);
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
_log.error("Error creating a new sink", ioe);
|
||||
try { _in.close(); } catch (IOException ioe2) {}
|
||||
if (sink != null)
|
||||
sink.closed();
|
||||
return;
|
||||
}
|
||||
// inline so the reader doesn't grab the data
|
||||
try {
|
||||
boolean gotDest = false;
|
||||
byte[] dest = new byte[1024];
|
||||
int dlen = 0;
|
||||
byte buf[] = new byte[4096];
|
||||
int len;
|
||||
while((len = _in.read(buf)) >= 0) {
|
||||
if (!gotDest) {
|
||||
// eat the dest line
|
||||
for (int i = 0; i < len; i++) {
|
||||
byte b = buf[i];
|
||||
if (b == (byte) '\n') {
|
||||
gotDest = true;
|
||||
if (_log.shouldInfo()) {
|
||||
try {
|
||||
_log.info("Got incoming accept from: \"" + new String(dest, 0, dlen, "ISO-8859-1") + '"');
|
||||
} catch (IOException uee) {}
|
||||
}
|
||||
// feed any remaining to the sink
|
||||
i++;
|
||||
if (i < len)
|
||||
sink.received(buf, i, len - i);
|
||||
break;
|
||||
} else {
|
||||
if (dlen < dest.length) {
|
||||
dest[dlen++] = b;
|
||||
} else if (dlen == dest.length) {
|
||||
dlen++;
|
||||
_log.error("first line overflow on accept");
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
sink.received(buf, 0, len);
|
||||
}
|
||||
}
|
||||
sink.closed();
|
||||
} catch (IOException ioe) {
|
||||
_log.error("Error reading", ioe);
|
||||
} finally {
|
||||
try { _in.close(); } catch (IOException ioe) {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Socket connect() throws IOException {
|
||||
return new Socket(_samHost, Integer.parseInt(_samPort));
|
||||
}
|
||||
|
||||
/** @return our b64 dest or null */
|
||||
private String handshake(OutputStream samOut, String version, boolean isMaster) {
|
||||
private String handshake(OutputStream samOut, String version, boolean isMaster, SAMEventHandler eventHandler) {
|
||||
synchronized (samOut) {
|
||||
try {
|
||||
samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes());
|
||||
samOut.flush();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Hello sent");
|
||||
String hisVersion = _eventHandler.waitForHelloReply();
|
||||
String hisVersion = eventHandler.waitForHelloReply();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Hello reply found: " + hisVersion);
|
||||
if (hisVersion == null)
|
||||
throw new IOException("Hello failed");
|
||||
if (!isMaster) {
|
||||
// only for v3
|
||||
//String req = "STREAM ACCEPT SILENT=true ID=" + _v3ID + "\n";
|
||||
String req = "STREAM ACCEPT SILENT=false ID=" + _v3ID + "\n";
|
||||
samOut.write(req.getBytes());
|
||||
samOut.flush();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("STREAM ACCEPT sent");
|
||||
// docs were wrong, we do not get a STREAM STATUS if SILENT=true
|
||||
//boolean ok = eventHandler.waitForStreamStatusReply();
|
||||
//if (!ok)
|
||||
// throw new IOException("Stream status failed");
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("got STREAM STATUS, awaiting connetion");
|
||||
return "OK";
|
||||
}
|
||||
_isV3 = VersionComparator.comp(hisVersion, "3") >= 0;
|
||||
String dest;
|
||||
if (_isV3) {
|
||||
@@ -178,7 +280,8 @@ public class SAMStreamSink {
|
||||
if (isMaster) {
|
||||
byte[] id = new byte[5];
|
||||
_context.random().nextBytes(id);
|
||||
_conOptions = "ID=" + Base32.encode(id);
|
||||
_v3ID = Base32.encode(id);
|
||||
_conOptions = "ID=" + _v3ID;
|
||||
}
|
||||
} else {
|
||||
// we use the filename as the name in sam.keys
|
||||
@@ -190,7 +293,7 @@ public class SAMStreamSink {
|
||||
samOut.flush();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Session create sent");
|
||||
boolean ok = _eventHandler.waitForSessionCreateReply();
|
||||
boolean ok = eventHandler.waitForSessionCreateReply();
|
||||
if (!ok)
|
||||
throw new IOException("Session create failed");
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
@@ -201,7 +304,7 @@ public class SAMStreamSink {
|
||||
samOut.flush();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Naming lookup sent");
|
||||
String destination = _eventHandler.waitForNamingReply("ME");
|
||||
String destination = eventHandler.waitForNamingReply("ME");
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Naming lookup reply found: " + destination);
|
||||
if (destination == null) {
|
||||
|
Reference in New Issue
Block a user