diff --git a/apps/sam/java/src/net/i2p/sam/ReadLine.java b/apps/sam/java/src/net/i2p/sam/ReadLine.java index 2af5f19d3..88c9a2553 100644 --- a/apps/sam/java/src/net/i2p/sam/ReadLine.java +++ b/apps/sam/java/src/net/i2p/sam/ReadLine.java @@ -3,13 +3,14 @@ package net.i2p.sam; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; +import java.io.Reader; import java.net.Socket; import java.net.SocketException; import java.net.SocketTimeoutException; /** * Modified from I2PTunnelHTTPServer + * Handles UTF-8. Does not buffer past the end of line. * * @since 0.9.24 */ @@ -24,39 +25,44 @@ class ReadLine { * Warning - 8KB line length limit as of 0.7.13, @throws IOException if exceeded * * @param buf output - * @param timeout throws SocketTimeoutException immediately if zero or negative + * @param timeout forever if if zero or negative * @throws SocketTimeoutException if timeout is reached before newline * @throws EOFException if EOF is reached before newline * @throws LineTooLongException if too long * @throws IOException on other errors in the underlying stream */ public static void readLine(Socket socket, StringBuilder buf, int timeout) throws IOException { - if (timeout <= 0) - throw new SocketTimeoutException(); - long expires = System.currentTimeMillis() + timeout; - // this reads and buffers extra bytes, so we can't use it - // unless we're going to decode UTF-8 on-the-fly, we're stuck with ASCII - //InputStreamReader in = new InputStreamReader(socket.getInputStream(), "UTF-8"); - InputStream in = socket.getInputStream(); + final int origTimeout = timeout; int c; int i = 0; - socket.setSoTimeout(timeout); - while ( (c = in.read()) != -1) { + final long expires; + if (origTimeout > 0) { + socket.setSoTimeout(timeout); + expires = System.currentTimeMillis() + timeout; + } else { + expires = 0; + } + final Reader reader = new UTF8Reader(socket.getInputStream()); + while ( (c = reader.read()) != -1) { if (++i > MAX_LINE_LENGTH) throw new LineTooLongException("Line too long - max " + MAX_LINE_LENGTH); if (c == '\n') break; - int newTimeout = (int) (expires - System.currentTimeMillis()); - if (newTimeout <= 0) - throw new SocketTimeoutException(); - buf.append((char)c); - if (newTimeout != timeout) { - timeout = newTimeout; - socket.setSoTimeout(timeout); + if (origTimeout > 0) { + int newTimeout = (int) (expires - System.currentTimeMillis()); + if (newTimeout <= 0) + throw new SocketTimeoutException(); + buf.append((char)c); + if (newTimeout != timeout) { + timeout = newTimeout; + socket.setSoTimeout(timeout); + } + } else { + buf.append((char)c); } } if (c == -1) { - if (System.currentTimeMillis() >= expires) + if (origTimeout > 0 && System.currentTimeMillis() >= expires) throw new SocketTimeoutException(); else throw new EOFException(); diff --git a/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java b/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java index c484c1b84..b814124f8 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java +++ b/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java @@ -57,6 +57,8 @@ class SAMHandlerFactory { } catch (RuntimeException e) { throw new SAMException("Unexpected error", e); } + if (log.shouldDebug()) + log.debug("New message received: [" + line + ']'); // Message format: HELLO VERSION [MIN=v1] [MAX=v2] Properties props = SAMUtils.parseParams(line); diff --git a/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java b/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java index 5eb235448..e779e05fa 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java @@ -98,6 +98,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece String opcode = null; boolean canContinue = false; Properties props; + final StringBuilder buf = new StringBuilder(128); this.thread.setName("SAMv1Handler " + _id); if (_log.shouldLog(Log.DEBUG)) @@ -120,19 +121,13 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece _log.info("Connection closed by client"); break; } - java.io.InputStream is = clientSocketChannel.socket().getInputStream(); - if (is == null) { - _log.info("Connection closed by client"); - break; - } - msg = DataHelper.readLine(is); - if (msg == null) { - _log.info("Connection closed by client (line read : null)"); - break; - } + buf.setLength(0); + // TODO set timeout first time + ReadLine.readLine(clientSocketChannel.socket(), buf, 0); + msg = buf.toString(); if (_log.shouldLog(Log.DEBUG)) { - _log.debug("New message received: [" + msg + "]"); + _log.debug("New message received: [" + msg + ']'); } props = SAMUtils.parseParams(msg); domain = props.getProperty(SAMUtils.COMMAND); diff --git a/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java b/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java index 5d7bff33f..6723c2624 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java @@ -329,21 +329,13 @@ class SAMv3Handler extends SAMv1Handler } } else { buf.setLength(0); - if (DataHelper.readLine(in, buf)) - line = buf.toString(); - else - line = null; - } - if (line==null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Connection closed by client (line read : null)"); - break; + // TODO first time, set a timeout + ReadLine.readLine(socket, buf, 0); + line = buf.toString(); } - if (_log.shouldLog(Log.DEBUG)) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("New message received: [" + msg + "]"); - } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("New message received: [" + line + ']'); props = SAMUtils.parseParams(line); domain = props.getProperty(SAMUtils.COMMAND); if (domain == null) { diff --git a/apps/sam/java/src/net/i2p/sam/UTF8Reader.java b/apps/sam/java/src/net/i2p/sam/UTF8Reader.java new file mode 100644 index 000000000..da375f529 --- /dev/null +++ b/apps/sam/java/src/net/i2p/sam/UTF8Reader.java @@ -0,0 +1,152 @@ +package net.i2p.sam; + +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CoderResult; + + +/** + * An unbuffered version of InputStreamReader. + * + * Does not read any extra characters, as long as input is well-formed. + * This permits the partial reading of an InputStream as UTF-8 + * and then passing the remainder of the input stream elsewhere. + * This isn't the most robust for malformed input, so it + * may not be appropriate for e.g. HTTP headers. + * + * Not thread-safe, obviously. + * + * May be moved to net.i2p.util if anybody else needs it. + * + * @since 0.9.24 somewhat adapted from net.i2p.util.TranslateReader + */ +public class UTF8Reader extends Reader { + + private final InputStream _in; + // following three are lazily initialized when needed + private ByteBuffer _bb; + private CharBuffer _cb; + private CharsetDecoder _dc; + + // Charset.forName("UTF-8").newDecoder().replacement().charAt(0) & 0xffff + private static final int REPLACEMENT = 0xfffd; + + /** + * @param in UTF-8 + */ + public UTF8Reader(InputStream in) { + super(); + _in = in; + } + + /** + * @return replacement character on decoding error + */ + @Override + public int read() throws IOException { + int b = _in.read(); + if (b < 0) + return b; + // https://en.wikipedia.org/wiki/Utf-8 + if ((b & 0x80) == 0) + return b; + if (_bb == null) { + _bb = ByteBuffer.allocate(6); + _cb = CharBuffer.allocate(1); + _dc = Charset.forName("UTF-8").newDecoder(); + } else { + _bb.clear(); + _cb.clear(); + } + _bb.put((byte) b); + int end; // how many more + if ((b & 0xe0) == 0xc0) + end = 1; + else if ((b & 0xf0) == 0xe0) + end = 2; + else if ((b & 0xf8) == 0xf0) + end = 3; + else if ((b & 0xfc) == 0xf8) + end = 4; + else if ((b & 0xfe) == 0xfc) + end = 5; + else // error, 10xxxxxx + return REPLACEMENT; + for (int i = 0; i < end; i++) { + b = _in.read(); + if (b < 0) + return REPLACEMENT; // next read will return EOF + // we aren't going to check for all errors, + // but let's fail fast on this one + if ((b & 0x80) == 0) + return REPLACEMENT; + _bb.put((byte) b); + } + _dc.reset(); + _bb.flip(); + CoderResult result = _dc.decode(_bb, _cb, true); + // Overflow and underflow are not errors. + // It seems to return underflow every time. + // So just check if we got a character back in the buffer. + _cb.flip(); + if (result.isError() || !_cb.hasRemaining()) + return REPLACEMENT; + // let underflow and overflow go, return first + return _cb.get() & 0xffff; + } + + @Override + public int read(char cbuf[]) throws IOException { + return read(cbuf, 0, cbuf.length); + } + + public int read(char cbuf[], int off, int len) throws IOException { + for (int i = 0; i < len; i++) { + int c = read(); + if (c < 0) { + if (i == 0) + return -1; + return i; + } + cbuf[off + i] = (char) c; + } + return len; + } + + public void close() throws IOException { + _in.close(); + } + +/**** + public static void main(String[] args) { + try { + String s = "Consider the encoding of the Euro sign, €." + + " The Unicode code point for \"€\" is U+20AC."; + byte[] test = s.getBytes("UTF-8"); + InputStream bais = new java.io.ByteArrayInputStream(test); + UTF8Reader r = new UTF8Reader(bais); + int b; + StringBuilder buf = new StringBuilder(128); + while ((b = r.read()) >= 0) { + buf.append((char) b); + } + System.out.println("Received: " + buf); + System.out.println("Test passed? " + buf.toString().equals(s)); + buf.setLength(0); + bais = new java.io.ByteArrayInputStream(new byte[] { 'x', (byte) 0xcc, 'x' } ); + r = new UTF8Reader(bais); + while ((b = r.read()) >= 0) { + buf.append((char) b); + } + System.out.println("Received: " + buf); + } catch (IOException ioe) { + ioe.printStackTrace(); + } + } +****/ +} diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java index d8f743208..8b912f401 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java @@ -237,9 +237,10 @@ public class SAMStreamSend { synchronized (samOut) { try { if (user != null && password != null) - samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + " USER=" + user + " PASSWORD=" + password + '\n').getBytes()); + samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + " USER=\"" + user.replace("\"", "\\\"") + + "\" PASSWORD=\"" + password.replace("\"", "\\\"") + "\"\n").getBytes("UTF-8")); else - samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes()); + samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes("UTF-8")); samOut.flush(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Hello sent"); @@ -256,6 +257,8 @@ public class SAMStreamSend { byte[] id = new byte[5]; _context.random().nextBytes(id); _v3ID = Base32.encode(id); + if (_isV32) + _v3ID = "xx€€xx" + _v3ID; _conOptions = "ID=" + _v3ID; } String style; @@ -266,7 +269,7 @@ public class SAMStreamSend { else style = "RAW"; String req = "SESSION CREATE STYLE=" + style + " DESTINATION=TRANSIENT " + _conOptions + ' ' + opts + '\n'; - samOut.write(req.getBytes()); + samOut.write(req.getBytes("UTF-8")); samOut.flush(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Session create sent"); @@ -277,7 +280,7 @@ public class SAMStreamSend { _log.debug("Session create reply found: " + ok); req = "NAMING LOOKUP NAME=ME\n"; - samOut.write(req.getBytes()); + samOut.write(req.getBytes("UTF-8")); samOut.flush(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Naming lookup sent"); @@ -363,7 +366,7 @@ public class SAMStreamSend { if (_isV3) buf.append(" FROM_PORT=1234 TO_PORT=5678"); buf.append('\n'); - byte[] msg = DataHelper.getASCII(buf.toString()); + byte[] msg = DataHelper.getUTF8(buf.toString()); synchronized (_samOut) { _samOut.write(msg); _samOut.flush(); @@ -431,7 +434,7 @@ public class SAMStreamSend { } else { throw new IOException("unsupported mode " + _mode); } - byte msg[] = DataHelper.getASCII(m); + byte msg[] = DataHelper.getUTF8(m); _samOut.write(msg); } _samOut.write(data, 0, read); @@ -440,16 +443,16 @@ public class SAMStreamSend { } else { // real datagrams ByteArrayOutputStream baos = new ByteArrayOutputStream(read + 1024); - baos.write(DataHelper.getASCII("3.0 ")); - baos.write(DataHelper.getASCII(_v3ID)); + baos.write(DataHelper.getUTF8("3.0 ")); + baos.write(DataHelper.getUTF8(_v3ID)); baos.write((byte) ' '); - baos.write(DataHelper.getASCII(_remoteDestination)); + baos.write(DataHelper.getUTF8(_remoteDestination)); if (_isV32) { // only set TO_PORT to test session setting of FROM_PORT if (_mode == RAW) - baos.write(DataHelper.getASCII(" PROTOCOL=123 TO_PORT=5678")); + baos.write(DataHelper.getUTF8(" PROTOCOL=123 TO_PORT=5678")); else - baos.write(DataHelper.getASCII(" TO_PORT=5678")); + baos.write(DataHelper.getUTF8(" TO_PORT=5678")); } baos.write((byte) '\n'); baos.write(data, 0, read); @@ -476,12 +479,13 @@ public class SAMStreamSend { _log.info("Error closing", ioe); } } else { - byte msg[] = ("STREAM CLOSE ID=" + _connectionId + "\n").getBytes(); try { + byte msg[] = ("STREAM CLOSE ID=" + _connectionId + "\n").getBytes("UTF-8"); synchronized (_samOut) { _samOut.write(msg); _samOut.flush(); - _samOut.close(); + // we can't close this yet, we will lose data + //_samOut.close(); } } catch (IOException ioe) { _log.info("Error closing", ioe); @@ -492,20 +496,18 @@ public class SAMStreamSend { } closed(); + // stop the reader, since we're only doing this once for testing + // you wouldn't do this in a real application + // closing the master socket too fast will kill the data socket flushing through + try { + Thread.sleep(10000); + } catch (InterruptedException ie) {} if (_log.shouldLog(Log.DEBUG)) _log.debug("Runner exiting"); if (toSend != _totalSent) _log.error("Only sent " + _totalSent + " of " + toSend + " bytes"); if (_reader2 != null) _reader2.stopReading(); - // stop the reader, since we're only doing this once for testing - // you wouldn't do this in a real application - if (_isV3) { - // closing the master socket too fast will kill the data socket flushing through - try { - Thread.sleep(10000); - } catch (InterruptedException ie) {} - } _reader.stopReading(); } } diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java index ef865afcc..d8cd8a359 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java @@ -307,7 +307,7 @@ public class SAMStreamSink { try { Thread.sleep(127*1000); synchronized(_out) { - _out.write(DataHelper.getASCII("PING " + System.currentTimeMillis() + '\n')); + _out.write(DataHelper.getUTF8("PING " + System.currentTimeMillis() + '\n')); _out.flush(); } } catch (InterruptedException ie) { @@ -377,7 +377,7 @@ public class SAMStreamSink { _log.info("Got PING " + data + ", sending PONG " + data); synchronized (_out) { try { - _out.write(("PONG " + data + '\n').getBytes()); + _out.write(("PONG " + data + '\n').getBytes("UTF-8")); _out.flush(); } catch (IOException ioe) { _log.error("PONG fail", ioe); @@ -514,9 +514,9 @@ public class SAMStreamSink { synchronized (samOut) { try { if (user != null && password != null) - samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + " USER=" + user + " PASSWORD=" + password + '\n').getBytes()); + samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + " USER=" + user + " PASSWORD=" + password + '\n').getBytes("UTF-8")); else - samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes()); + samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes("UTF-8")); samOut.flush(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Hello sent"); @@ -536,7 +536,7 @@ public class SAMStreamSink { req = "STREAM FORWARD ID=" + _v3ID + " PORT=" + V3FORWARDPORT + '\n'; else throw new IllegalStateException("mode " + mode); - samOut.write(req.getBytes()); + samOut.write(req.getBytes("UTF-8")); samOut.flush(); if (_log.shouldLog(Log.DEBUG)) _log.debug("STREAM ACCEPT/FORWARD sent"); @@ -600,7 +600,7 @@ public class SAMStreamSink { else style = "RAW HEADER=true PORT=" + V3DGPORT; String req = "SESSION CREATE STYLE=" + style + " DESTINATION=" + dest + ' ' + _conOptions + ' ' + sopts + '\n'; - samOut.write(req.getBytes()); + samOut.write(req.getBytes("UTF-8")); samOut.flush(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Session create sent"); @@ -612,7 +612,7 @@ public class SAMStreamSink { _log.debug("Session create reply found: " + ok); } req = "NAMING LOOKUP NAME=ME\n"; - samOut.write(req.getBytes()); + samOut.write(req.getBytes("UTF-8")); samOut.flush(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Naming lookup sent"); @@ -649,7 +649,7 @@ public class SAMStreamSink { FileOutputStream fos = null; try { fos = new FileOutputStream(f); - fos.write(dest.getBytes()); + fos.write(dest.getBytes("UTF-8")); if (_log.shouldLog(Log.DEBUG)) _log.debug("My destination written to " + _destFile); } catch (IOException e) {