From 89745f50026b31609c2e654c23e19ed4dbd0049e Mon Sep 17 00:00:00 2001 From: zzz Date: Sat, 30 May 2015 13:19:29 +0000 Subject: [PATCH] HTTP Client: Greatly simplify decompression by using InflaterOutputStream, available since Java 6. Removes PipedInputStream, PipedOutputStream. Removes Pusher threads. Remove delay workaround for truncated pages, no longer required. --- .../net/i2p/i2ptunnel/GunzipOutputStream.java | 372 ++++++++++++++++++ .../i2ptunnel/HTTPResponseOutputStream.java | 117 +----- 2 files changed, 373 insertions(+), 116 deletions(-) create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/GunzipOutputStream.java diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/GunzipOutputStream.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/GunzipOutputStream.java new file mode 100644 index 000000000..291d903db --- /dev/null +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/GunzipOutputStream.java @@ -0,0 +1,372 @@ +package net.i2p.i2ptunnel; + +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.zip.CRC32; +import java.util.zip.Inflater; +import java.util.zip.InflaterOutputStream; + +import net.i2p.data.DataHelper; + +/** + * Gunzip implementation per + * RFC 1952, reusing + * java's standard CRC32 and Inflater and InflaterOutputStream implementations. + * + * Note that the underlying InflaterOutputStream cannot be reused after close(), + * so we don't have a Reusable version of this. + * + * Modified from net.i2p.util.ResettableGZIPInputStream to use Java 6 InflaterOutputstream + * @since 0.9.21 + */ +class GunzipOutputStream extends InflaterOutputStream { + private static final int FOOTER_SIZE = 8; // CRC32 + ISIZE + private final CRC32 _crc32; + private final byte _buf1[] = new byte[1]; + private boolean _complete; + private byte _footer[] = new byte[FOOTER_SIZE]; + private long _bytesReceived; + private long _bytesReceivedAtCompletion; + + private enum HeaderState { MB1, MB2, CF, MT0, MT1, MT2, MT3, EF, OS, FLAGS, + EH1, EH2, EHDATA, NAME, COMMENT, CRC1, CRC2, DONE } + private HeaderState _state = HeaderState.MB1; + private int _flags; + private int _extHdrToRead; + + /** + * Build a new Gunzip stream + */ + public GunzipOutputStream(OutputStream uncompressedStream) throws IOException { + super(uncompressedStream, new Inflater(true)); + _crc32 = new CRC32(); + } + + @Override + public void write(int b) throws IOException { + _buf1[0] = (byte) b; + write(_buf1, 0, 1); + } + + @Override + public void write(byte buf[]) throws IOException { + write(buf, 0, buf.length); + } + + @Override + public void write(byte buf[], int off, int len) throws IOException { + if (_complete) { + // shortcircuit so the inflater doesn't try to refill + // with the footer's data (which would fail, causing ZLIB err) + return; + } + boolean isFinished = inf.finished(); + for (int i = off; i < off + len; i++) { + if (!isFinished) { + if (_state != HeaderState.DONE) { + verifyHeader(buf[i]); + continue; + } + // ensure we call the same method variant so we don't depend on underlying implementation + super.write(buf, i, 1); + if (inf.finished()) { + isFinished = true; + _bytesReceivedAtCompletion = _bytesReceived; + } + } + _footer[(int) (_bytesReceived++ % FOOTER_SIZE)] = buf[i]; + if (isFinished) { + long footerSize = _bytesReceivedAtCompletion - _bytesReceived; + // could be at 7 or 8... + // we write the first byte of the footer to the Inflater if necessary... + // see comments in ResettableGZIPInputStream for details + if (footerSize >= FOOTER_SIZE - 1) { + try { + verifyFooter(); + inf.reset(); // so it doesn't bitch about missing data... + _complete = true; + return; + } catch (IOException ioe) { + // failed at 7, retry at 8 + if (footerSize == FOOTER_SIZE - 1 && i < off + len - 1) + continue; + _complete = true; + throw ioe; + } + } + } + } + } + + /** + * Inflater statistic + */ + public long getTotalRead() { + try { + return inf.getBytesRead(); + } catch (Exception e) { + return 0; + } + } + + /** + * Inflater statistic + */ + public long getTotalExpanded() { + try { + return inf.getBytesWritten(); + } catch (Exception e) { + // possible NPE in some implementations + return 0; + } + } + + /** + * Inflater statistic + */ + public long getRemaining() { + try { + return inf.getRemaining(); + } catch (Exception e) { + // possible NPE in some implementations + return 0; + } + } + + /** + * Inflater statistic + */ + public boolean getFinished() { + try { + return inf.finished(); + } catch (Exception e) { + // possible NPE in some implementations + return true; + } + } + + @Override + public void close() throws IOException { + _complete = true; + _state = HeaderState.DONE; + super.close(); + } + + @Override + public String toString() { + return "GOS read: " + getTotalRead() + " expanded: " + getTotalExpanded() + " remaining: " + getRemaining() + " finished: " + getFinished(); + } + + /** + * @throws IOException on CRC or length check fail + */ + private void verifyFooter() throws IOException { + int idx = (int) (_bytesReceivedAtCompletion % FOOTER_SIZE); + byte[] footer; + if (idx == 0) { + footer = _footer; + } else { + footer = new byte[FOOTER_SIZE]; + for (int i = 0; i < FOOTER_SIZE; i++) { + footer[i] = _footer[(int) ((_bytesReceivedAtCompletion + i) % FOOTER_SIZE)]; + } + } + + long actualSize = inf.getTotalOut(); + long expectedSize = DataHelper.fromLongLE(footer, 4, 4); + if (expectedSize != actualSize) + throw new IOException("gunzip expected " + expectedSize + " bytes, got " + actualSize); + + long actualCRC = _crc32.getValue(); + long expectedCRC = DataHelper.fromLongLE(footer, 0, 4); + if (expectedCRC != actualCRC) + throw new IOException("gunzip CRC fail expected 0x" + Long.toHexString(expectedCRC) + + " bytes, got 0x" + Long.toHexString(actualCRC)); + } + + /** + * Make sure the header is valid, throwing an IOException if it is bad. + * Pushes through the state machine, checking as we go. + * Call for each byte until HeaderState is DONE. + */ + private void verifyHeader(byte b) throws IOException { + int c = b & 0xff; + switch (_state) { + case MB1: + if (c != 0x1F) throw new IOException("First magic byte was wrong [" + c + "]"); + _state = HeaderState.MB2; + break; + + case MB2: + if (c != 0x8B) throw new IOException("Second magic byte was wrong [" + c + "]"); + _state = HeaderState.CF; + break; + + case CF: + if (c != 0x08) throw new IOException("Compression format is invalid [" + c + "]"); + _state = HeaderState.FLAGS; + break; + + case FLAGS: + _flags = c; + _state = HeaderState.MT0; + break; + + case MT0: + // ignore + _state = HeaderState.MT1; + break; + + case MT1: + // ignore + _state = HeaderState.MT2; + break; + + case MT2: + // ignore + _state = HeaderState.MT3; + break; + + case MT3: + // ignore + _state = HeaderState.EF; + break; + + case EF: + if ( (c != 0x00) && (c != 0x02) && (c != 0x04) ) + throw new IOException("Invalid extended flags [" + c + "]"); + _state = HeaderState.OS; + break; + + case OS: + // ignore + if (0 != (_flags & (1<<5))) + _state = HeaderState.EH1; + else if (0 != (_flags & (1<<4))) + _state = HeaderState.NAME; + else if (0 != (_flags & (1<<3))) + _state = HeaderState.COMMENT; + else if (0 != (_flags & (1<<6))) + _state = HeaderState.CRC1; + else + _state = HeaderState.DONE; + break; + + case EH1: + _extHdrToRead = c; + _state = HeaderState.EH2; + break; + + case EH2: + _extHdrToRead += (c << 8); + if (_extHdrToRead > 0) + _state = HeaderState.EHDATA; + else if (0 != (_flags & (1<<4))) + _state = HeaderState.NAME; + if (0 != (_flags & (1<<3))) + _state = HeaderState.COMMENT; + else if (0 != (_flags & (1<<6))) + _state = HeaderState.CRC1; + else + _state = HeaderState.DONE; + break; + + case EHDATA: + // ignore + if (--_extHdrToRead <= 0) { + if (0 != (_flags & (1<<4))) + _state = HeaderState.NAME; + if (0 != (_flags & (1<<3))) + _state = HeaderState.COMMENT; + else if (0 != (_flags & (1<<6))) + _state = HeaderState.CRC1; + else + _state = HeaderState.DONE; + } + break; + + case NAME: + // ignore + if (c == 0) { + if (0 != (_flags & (1<<3))) + _state = HeaderState.COMMENT; + else if (0 != (_flags & (1<<6))) + _state = HeaderState.CRC1; + else + _state = HeaderState.DONE; + } + break; + + case COMMENT: + // ignore + if (c == 0) { + if (0 != (_flags & (1<<6))) + _state = HeaderState.CRC1; + else + _state = HeaderState.DONE; + } + break; + + case CRC1: + // ignore + _state = HeaderState.CRC2; + break; + + case CRC2: + // ignore + _state = HeaderState.DONE; + break; + + case DONE: + default: + break; + } + } + +/**** + public static void main(String args[]) { + java.util.Random r = new java.util.Random(); + for (int i = 0; i < 1050; i++) { + byte[] b = new byte[i]; + r.nextBytes(b); + if (!test(b)) return; + } + for (int i = 1; i < 64*1024; i+= 29) { + byte[] b = new byte[i]; + r.nextBytes(b); + if (!test(b)) return; + } + } + + private static boolean test(byte[] b) { + int size = b.length; + try { + java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(size); + java.util.zip.GZIPOutputStream o = new java.util.zip.GZIPOutputStream(baos); + o.write(b); + o.finish(); + o.flush(); + byte compressed[] = baos.toByteArray(); + + java.io.ByteArrayOutputStream baos2 = new java.io.ByteArrayOutputStream(size); + GunzipOutputStream out = new GunzipOutputStream(baos2); + out.write(compressed); + byte rv[] = baos2.toByteArray(); + if (rv.length != b.length) + throw new RuntimeException("read length: " + rv.length + " expected: " + b.length); + + if (!net.i2p.data.DataHelper.eq(rv, 0, b, 0, b.length)) { + throw new RuntimeException("foo, read=" + rv.length); + } else { + System.out.println("match, w00t @ " + size); + return true; + } + } catch (Exception e) { + System.out.println("Error dealing with size=" + size + ": " + e.getMessage()); + e.printStackTrace(); + return false; + } + } +****/ +} diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java index 13ea4183f..25f7fcf68 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java @@ -10,20 +10,13 @@ package net.i2p.i2ptunnel; import java.io.FilterOutputStream; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; import java.util.Locale; -import java.util.concurrent.RejectedExecutionException; import net.i2p.I2PAppContext; import net.i2p.data.ByteArray; -import net.i2p.util.BigPipedInputStream; import net.i2p.util.ByteCache; -import net.i2p.util.I2PAppThread; import net.i2p.util.Log; -import net.i2p.util.ReusableGZIPInputStream; /** * This does the transparent gzip decompression on the client side. @@ -46,7 +39,6 @@ class HTTPResponseOutputStream extends FilterOutputStream { protected boolean _gzip; protected long _dataExpected; protected String _contentType; - private PipedInputStream _pipedInputStream; private static final int CACHE_SIZE = 8*1024; private static final ByteCache _cache = ByteCache.getInstance(8, CACHE_SIZE); @@ -251,124 +243,17 @@ class HTTPResponseOutputStream extends FilterOutputStream { public void close() throws IOException { if (_log.shouldLog(Log.INFO)) _log.info("Closing " + out + " threaded?? " + shouldCompress(), new Exception("I did it")); - PipedInputStream pi; synchronized(this) { // synch with changing out field below super.close(); - pi = _pipedInputStream; - } - // Prevent truncation of gunzipped data as - // I2PTunnelHTTPClientRunner.close() closes the Socket after this. - // Closing pipe only notifies read end, doesn't wait. - // TODO switch to Java 6 InflaterOutputStream and get rid of Pusher thread - if (pi != null) { - for (int i = 0; i < 50; i++) { - if (pi.available() <= 0) { - if (i > 0 && _log.shouldWarn()) - _log.warn("Waited " + (i*20) + " for read side to close"); - break; - } - try { - Thread.sleep(20); - } catch (InterruptedException ie) {} - } } } protected void beginProcessing() throws IOException { //out.flush(); - PipedInputStream pi = BigPipedInputStream.getInstance(); - PipedOutputStream po = new PipedOutputStream(pi); - Runnable r = new Pusher(pi, out); - if (_log.shouldLog(Log.INFO)) - _log.info("Starting threaded decompressing pusher to " + out); + OutputStream po = new GunzipOutputStream(out); synchronized(this) { out = po; - _pipedInputStream = pi; - } - // TODO we should be able to do this inline somehow - TunnelControllerGroup tcg = TunnelControllerGroup.getInstance(); - if (tcg != null) { - // Run in the client thread pool, as there should be an unused thread - // there after the accept(). - // Overridden in I2PTunnelHTTPServer, where it does not use the client pool. - try { - tcg.getClientExecutor().execute(r); - } catch (RejectedExecutionException ree) { - // shouldn't happen - throw ree; - } - } else { - // Fallback in case TCG.getInstance() is null, never instantiated - // and we were not started by TCG. - // Maybe a plugin loaded before TCG? Should be rare. - Thread t = new I2PAppThread(r, "Pusher"); - t.start(); - } - } - - private class Pusher implements Runnable { - private final InputStream _inRaw; - private final OutputStream _out; - - public Pusher(InputStream in, OutputStream out) { - _inRaw = in; - _out = out; - } - - public void run() { - if (_log.shouldLog(Log.INFO)) - _log.info("Starting pusher from " + _inRaw + " to: " + _out); - ReusableGZIPInputStream _in = ReusableGZIPInputStream.acquire(); - long written = 0; - ByteArray ba = null; - try { - // blocking - _in.initialize(_inRaw); - ba = _cache.acquire(); - byte buf[] = ba.getData(); - int read = -1; - while ( (read = _in.read(buf)) != -1) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Read " + read + " and writing it to the browser/streams"); - _out.write(buf, 0, read); - _out.flush(); - written += read; - } - } catch (IOException ioe) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Error decompressing: " + written + ", " + _in.getTotalRead() + - "/" + _in.getTotalExpanded() + - " from " + _inRaw + " to: " + _out, ioe); - } catch (OutOfMemoryError oom) { - _log.error("OOM in HTTP Decompressor", oom); - } finally { - if (_log.shouldInfo()) - _log.info("After decompression, written=" + written + - " read=" + _in.getTotalRead() - + ", expanded=" + _in.getTotalExpanded() + ", remaining=" + _in.getRemaining() - + ", finished=" + _in.getFinished() + - " from " + _inRaw + " to: " + _out); - if (ba != null) - _cache.release(ba); - if (_out != null) try { - _out.close(); - } catch (IOException ioe) {} - try { - _in.close(); - } catch (IOException ioe) {} - } - - double compressed = _in.getTotalRead(); - double expanded = _in.getTotalExpanded(); - ReusableGZIPInputStream.release(_in); - if (compressed > 0 && expanded > 0) { - // only update the stats if we did something - double ratio = compressed/expanded; - _context.statManager().addRateData("i2ptunnel.httpCompressionRatio", (int)(100d*ratio)); - _context.statManager().addRateData("i2ptunnel.httpCompressed", (long)compressed); - _context.statManager().addRateData("i2ptunnel.httpExpanded", (long)expanded); - } } }