diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java index f5f1520f3..4797901be 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java @@ -101,11 +101,17 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna this.l = l; this.handlerName = handlerName + _clientId; - synchronized (sockLock) { - if (ownDest) { - sockMgr = buildSocketManager(); - } else { - sockMgr = getSocketManager(); + while (sockMgr == null) { + synchronized (sockLock) { + if (ownDest) { + sockMgr = buildSocketManager(); + } else { + sockMgr = getSocketManager(); + } + } + if (sockMgr == null) { + _log.log(Log.CRIT, "Unable to create socket manager"); + try { Thread.sleep(10*1000); } catch (InterruptedException ie) {} } } if (sockMgr == null) { diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java index 1f1e02157..12d7824ea 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java @@ -30,7 +30,7 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL * Sun's impl of BufferedOutputStream), but that is the streaming * api's job... */ - static int MAX_PACKET_SIZE = 1024 * 16; + static int MAX_PACKET_SIZE = 1024 * 4; static final int NETWORK_BUFFER_SIZE = MAX_PACKET_SIZE; @@ -215,7 +215,7 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL this.out = out; _toI2P = toI2P; direction = (toI2P ? "toI2P" : "fromI2P"); - _cache = ByteCache.getInstance(16, NETWORK_BUFFER_SIZE); + _cache = ByteCache.getInstance(32, NETWORK_BUFFER_SIZE); setName("StreamForwarder " + _runnerId + "." + (++__forwarderId)); start(); } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java index a1bec018a..d0497fbfd 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java @@ -75,10 +75,16 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { I2PClient client = I2PClientFactory.createClient(); Properties props = new Properties(); props.putAll(getTunnel().getClientOptions()); - synchronized (slock) { - sockMgr = I2PSocketManagerFactory.createManager(privData, getTunnel().host, Integer.parseInt(getTunnel().port), - props); + while (sockMgr == null) { + synchronized (slock) { + sockMgr = I2PSocketManagerFactory.createManager(privData, getTunnel().host, Integer.parseInt(getTunnel().port), + props); + } + if (sockMgr == null) { + _log.log(Log.CRIT, "Unable to create socket manager"); + try { Thread.sleep(10*1000); } catch (InterruptedException ie) {} + } } sockMgr.setName("Server"); getTunnel().addSession(sockMgr.getSession()); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java index 4c2841be5..6bbb3d94d 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java @@ -52,10 +52,12 @@ class PacketQueue { if (_log.shouldLog(Log.DEBUG)) conStr = (packet.getConnection() != null ? packet.getConnection().toString() : ""); if (packet.getAckTime() > 0) { - _log.debug("Not resending " + packet); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Not resending " + packet); return; } else { - _log.debug("Sending... " + packet); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Sending... " + packet); } ByteArray ba = _cache.acquire(); diff --git a/core/java/src/net/i2p/data/DataHelper.java b/core/java/src/net/i2p/data/DataHelper.java index 7b2dd00a8..e3095926b 100644 --- a/core/java/src/net/i2p/data/DataHelper.java +++ b/core/java/src/net/i2p/data/DataHelper.java @@ -31,11 +31,12 @@ import java.util.List; import java.util.Properties; import java.util.TreeMap; import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; import net.i2p.util.ByteCache; import net.i2p.util.CachingByteArrayOutputStream; import net.i2p.util.OrderedProperties; +import net.i2p.util.ReusableGZIPInputStream; +import net.i2p.util.ReusableGZIPOutputStream; /** * Defines some simple IO routines for dealing with marshalling data structures @@ -826,20 +827,21 @@ public class DataHelper { return rv; } + private static final int MAX_UNCOMPRESSED = 40*1024; /** compress the data and return a new GZIP compressed array */ public static byte[] compress(byte orig[]) { return compress(orig, 0, orig.length); } public static byte[] compress(byte orig[], int offset, int size) { if ((orig == null) || (orig.length <= 0)) return orig; + if (size >= MAX_UNCOMPRESSED) + throw new IllegalArgumentException("tell jrandom size=" + size); + ReusableGZIPOutputStream out = ReusableGZIPOutputStream.acquire(); try { - CachingByteArrayOutputStream baos = new CachingByteArrayOutputStream(16, 40*1024); - GZIPOutputStream out = new GZIPOutputStream(baos, size); out.write(orig, offset, size); out.finish(); out.flush(); - byte rv[] = baos.toByteArray(); - baos.releaseBuffer(); + byte rv[] = out.getData(); //if (_log.shouldLog(Log.DEBUG)) // _log.debug("Compression of " + orig.length + " into " + rv.length + " (or " + 100.0d // * (((double) orig.length) / ((double) rv.length)) + "% savings)"); @@ -847,31 +849,34 @@ public class DataHelper { } catch (IOException ioe) { //_log.error("Error compressing?!", ioe); return null; + } finally { + ReusableGZIPOutputStream.release(out); } + } - + /** decompress the GZIP compressed data (returning null on error) */ public static byte[] decompress(byte orig[]) throws IOException { return (orig != null ? decompress(orig, 0, orig.length) : null); } public static byte[] decompress(byte orig[], int offset, int length) throws IOException { if ((orig == null) || (orig.length <= 0)) return orig; - GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(orig, offset, length), length); - CachingByteArrayOutputStream baos = new CachingByteArrayOutputStream(16, 40*1024); - ByteCache cache = ByteCache.getInstance(10, 4*1024); - ByteArray ba = cache.acquire(); - byte buf[] = ba.getData(); // new byte[4 * 1024]; + + ReusableGZIPInputStream in = ReusableGZIPInputStream.acquire(); + in.initialize(new ByteArrayInputStream(orig, offset, length)); + + ByteCache cache = ByteCache.getInstance(16, MAX_UNCOMPRESSED); + ByteArray outBuf = cache.acquire(); + int written = 0; while (true) { - int read = in.read(buf); - if (read == -1) break; - baos.write(buf, 0, read); + int read = in.read(outBuf.getData(), written, MAX_UNCOMPRESSED-written); + if (read == -1) + break; + written += read; } - byte rv[] = baos.toByteArray(); - cache.release(ba); - baos.releaseBuffer(); - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("Decompression of " + orig.length + " into " + rv.length + " (or " + 100.0d - // * (((double) rv.length) / ((double) orig.length)) + "% savings)"); + byte rv[] = new byte[written]; + System.arraycopy(outBuf.getData(), 0, rv, 0, written); + cache.release(outBuf); return rv; } diff --git a/core/java/src/net/i2p/data/LeaseSet.java b/core/java/src/net/i2p/data/LeaseSet.java index 6e480ea52..214b7df7e 100644 --- a/core/java/src/net/i2p/data/LeaseSet.java +++ b/core/java/src/net/i2p/data/LeaseSet.java @@ -298,6 +298,16 @@ public class LeaseSet extends DataStructureImpl { } _signature.writeBytes(out); } + + public int size() { + return PublicKey.KEYSIZE_BYTES //destination.pubKey + + SigningPublicKey.KEYSIZE_BYTES // destination.signPubKey + + 2 // destination.certificate + + PublicKey.KEYSIZE_BYTES // encryptionKey + + SigningPublicKey.KEYSIZE_BYTES // signingKey + + 1 + + _leases.size() * (Hash.HASH_LENGTH + 4 + 8); + } public boolean equals(Object object) { if ((object == null) || !(object instanceof LeaseSet)) return false; diff --git a/core/java/src/net/i2p/data/i2cp/CreateLeaseSetMessage.java b/core/java/src/net/i2p/data/i2cp/CreateLeaseSetMessage.java index 596796311..25c5f7945 100644 --- a/core/java/src/net/i2p/data/i2cp/CreateLeaseSetMessage.java +++ b/core/java/src/net/i2p/data/i2cp/CreateLeaseSetMessage.java @@ -91,7 +91,11 @@ public class CreateLeaseSetMessage extends I2CPMessageImpl { protected byte[] doWriteMessage() throws I2CPMessageException, IOException { if ((_sessionId == null) || (_signingPrivateKey == null) || (_privateKey == null) || (_leaseSet == null)) throw new I2CPMessageException("Unable to write out the message as there is not enough data"); - ByteArrayOutputStream os = new ByteArrayOutputStream(512); + int size = 4 // sessionId + + SigningPrivateKey.KEYSIZE_BYTES + + PrivateKey.KEYSIZE_BYTES + + _leaseSet.size(); + ByteArrayOutputStream os = new ByteArrayOutputStream(size); try { _sessionId.writeBytes(os); _signingPrivateKey.writeBytes(os); diff --git a/core/java/src/net/i2p/util/LookaheadInputStream.java b/core/java/src/net/i2p/util/LookaheadInputStream.java new file mode 100644 index 000000000..774f09be4 --- /dev/null +++ b/core/java/src/net/i2p/util/LookaheadInputStream.java @@ -0,0 +1,128 @@ +package net.i2p.util; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.FilterInputStream; +import java.util.Arrays; + +/** + * Simple lookahead buffer to keep the last K bytes in reserve, + * configured to easily be reused. Currently only used by the + * ResettableGZIPInputStream + * + */ +public class LookaheadInputStream extends FilterInputStream { + private boolean _eofReached; + private byte[] _footerLookahead; + private static final InputStream _fakeInputStream = new ByteArrayInputStream(new byte[0]); + + public LookaheadInputStream(int lookaheadSize) { + super(_fakeInputStream); + _eofReached = false; + _footerLookahead = new byte[lookaheadSize]; + } + + public boolean getEOFReached() { return _eofReached; } + + public void initialize(InputStream src) throws IOException { + in = src; + _eofReached = false; + Arrays.fill(_footerLookahead, (byte)0x00); + int footerRead = 0; + while (footerRead < _footerLookahead.length) { + int read = in.read(_footerLookahead); + if (read == -1) throw new IOException("EOF reading the footer lookahead"); + footerRead += read; + } + boolean f = true; + } + + public int read() throws IOException { + if (_eofReached) + return -1; //throw new IOException("Already past the EOF"); + int c = in.read(); + if (c == -1) { + _eofReached = true; + return -1; + } + int rv = _footerLookahead[0]; + System.arraycopy(_footerLookahead, 1, _footerLookahead, 0, _footerLookahead.length-1); + _footerLookahead[_footerLookahead.length-1] = (byte)c; + if (rv < 0) rv += 256; + return rv; + } + public int read(byte buf[]) throws IOException { + return read(buf, 0, buf.length); + } + public int read(byte buf[], int off, int len) throws IOException { + if (_eofReached) + return -1; + for (int i = 0; i < len; i++) { + int c = read(); + if (c == -1) { + if (i == 0) + return -1; + else + return i; + } else { + buf[off+i] = (byte)c; + } + } + return len; + } + + /** grab the lookahead footer */ + public byte[] getFooter() { return _footerLookahead; } + + public static void main(String args[]) { + byte buf[] = new byte[32]; + for (int i = 0; i < 32; i++) + buf[i] = (byte)i; + ByteArrayInputStream bais = new ByteArrayInputStream(buf); + try { + LookaheadInputStream lis = new LookaheadInputStream(8); + lis.initialize(bais); + byte rbuf[] = new byte[32]; + int read = lis.read(rbuf); + if (read != 24) throw new RuntimeException("Should have stopped (read=" + read + ")"); + for (int i = 0; i < 24; i++) + if (rbuf[i] != (byte)i) + throw new RuntimeException("Error at " + i + " [" + rbuf[i] + "]"); + for (int i = 0; i < 8; i++) + if (lis.getFooter()[i] != (byte)(i+24)) + throw new RuntimeException("Error at footer " + i + " [" + lis.getFooter()[i] + "]"); + System.out.println("Everything is fine in general"); + } catch (Exception e) { + e.printStackTrace(); + } + + for (int i = 9; i < 32*1024; i++) { + if (!test(i)) break; + } + } + + private static boolean test(int size) { + byte buf[] = new byte[size]; + new java.util.Random().nextBytes(buf); + ByteArrayInputStream bais = new ByteArrayInputStream(buf); + try { + LookaheadInputStream lis = new LookaheadInputStream(8); + lis.initialize(bais); + byte rbuf[] = new byte[size]; + int read = lis.read(rbuf); + if (read != (size-8)) throw new RuntimeException("Should have stopped (read=" + read + ")"); + for (int i = 0; i < (size-8); i++) + if (rbuf[i] != buf[i]) + throw new RuntimeException("Error at " + i + " [" + rbuf[i] + "]"); + for (int i = 0; i < 8; i++) + if (lis.getFooter()[i] != buf[i+(size-8)]) + throw new RuntimeException("Error at footer " + i + " [" + lis.getFooter()[i] + "]"); + System.out.println("Everything is fine at size=" + size); + return true; + } catch (Exception e) { + e.printStackTrace(); + return false; + } + } +} diff --git a/core/java/src/net/i2p/util/ResettableGZIPInputStream.java b/core/java/src/net/i2p/util/ResettableGZIPInputStream.java new file mode 100644 index 000000000..2896fa587 --- /dev/null +++ b/core/java/src/net/i2p/util/ResettableGZIPInputStream.java @@ -0,0 +1,281 @@ +package net.i2p.util; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; + +import java.util.zip.CRC32; +import java.util.zip.Inflater; +import java.util.zip.InflaterInputStream; +import java.util.zip.GZIPInputStream; + +/** + * GZIP implementation per + * RFC 1952, reusing + * java's standard CRC32 and Inflater and InflaterInputStream implementations. + * The main difference is that this implementation allows its state to be + * reset to initial values, and hence reused, while the standard + * GZIPInputStream reads the GZIP header from the stream on instantiation. + * + */ +public class ResettableGZIPInputStream extends InflaterInputStream { + private static final int FOOTER_SIZE = 8; // CRC32 + ISIZE + private static final boolean DEBUG = false; + /** keep a typesafe copy of (LookaheadInputStream)in */ + private LookaheadInputStream _lookaheadStream; + private CRC32 _crc32; + private byte _buf1[] = new byte[1]; + private boolean _complete; + + /** + * Build a new GZIP stream without a bound compressed stream. You need + * to initialize this with initialize(compressedStream) when you want to + * decompress a stream. + */ + public ResettableGZIPInputStream() { + super(new LookaheadInputStream(FOOTER_SIZE), new Inflater(true)); + _lookaheadStream = (LookaheadInputStream)in; + _crc32 = new CRC32(); + _complete = false; + } + public ResettableGZIPInputStream(InputStream compressedStream) throws IOException { + this(); + initialize(compressedStream); + } + + /** + * Blocking call to initialize this stream with the data from the given + * compressed stream. + * + */ + public void initialize(InputStream compressedStream) throws IOException { + len = 0; + inf.reset(); + _complete = false; + _crc32.reset(); + _buf1[0] = 0x0; + // blocking call to read the footer/lookahead, and use the compressed + // stream as the source for further lookahead bytes + _lookaheadStream.initialize(compressedStream); + // now blocking call to read and verify the GZIP header from the + // lookahead stream + verifyHeader(); + } + + public int read() throws IOException { + if (_complete) { + // shortcircuit so the inflater doesn't try to refill + // with the footer's data (which would fail, causing ZLIB err) + return -1; + } + int read = read(_buf1, 0, 1); + if (read == -1) + return -1; + else + return _buf1[0]; + } + + public int read(byte buf[]) throws IOException { + return read(buf, 0, buf.length); + } + public int read(byte buf[], int off, int len) throws IOException { + if (_complete) { + // shortcircuit so the inflater doesn't try to refill + // with the footer's data (which would fail, causing ZLIB err) + return -1; + } + int read = super.read(buf, off, len); + if (read == -1) { + verifyFooter(); + return -1; + } else { + _crc32.update(buf, off, read); + if (_lookaheadStream.getEOFReached()) { + verifyFooter(); + inf.reset(); // so it doesn't bitch about missing data... + _complete = true; + } + return read; + } + } + + long getCurrentCRCVal() { return _crc32.getValue(); } + + void verifyFooter() throws IOException { + byte footer[] = _lookaheadStream.getFooter(); + + long expectedCRCVal = _crc32.getValue(); + + // damn RFC writing their bytes backwards... + if (!(footer[0] == (byte)(expectedCRCVal & 0xFF))) + throw new IOException("footer[0]=" + footer[0] + " expectedCRC[0]=" + + (expectedCRCVal & 0xFF)); + if (!(footer[1] == (byte)(expectedCRCVal >>> 8))) + throw new IOException("footer[1]=" + footer[1] + " expectedCRC[1]=" + + ((expectedCRCVal >>> 8) & 0xFF)); + if (!(footer[2] == (byte)(expectedCRCVal >>> 16))) + throw new IOException("footer[2]=" + footer[2] + " expectedCRC[2]=" + + ((expectedCRCVal >>> 16) & 0xFF)); + if (!(footer[3] == (byte)(expectedCRCVal >>> 24))) + throw new IOException("footer[3]=" + footer[3] + " expectedCRC[3]=" + + ((expectedCRCVal >>> 24) & 0xFF)); + + int expectedSizeVal = inf.getTotalOut(); + + if (!(footer[4] == (byte)expectedSizeVal)) + throw new IOException("footer[4]=" + footer[4] + " expectedSize[0]=" + + (expectedSizeVal & 0xFF)); + if (!(footer[5] == (byte)(expectedSizeVal >>> 8))) + throw new IOException("footer[5]=" + footer[5] + " expectedSize[1]=" + + ((expectedSizeVal >>> 8) & 0xFF)); + if (!(footer[6] == (byte)(expectedSizeVal >>> 16))) + throw new IOException("footer[6]=" + footer[6] + " expectedSize[2]=" + + ((expectedSizeVal >>> 16) & 0xFF)); + if (!(footer[7] == (byte)(expectedSizeVal >>> 24))) + throw new IOException("footer[7]=" + footer[7] + " expectedSize[3]=" + + ((expectedSizeVal >>> 24) & 0xFF)); + } + + /** + * Make sure the header is valid, throwing an IOException if its b0rked. + */ + private void verifyHeader() throws IOException { + int c = in.read(); + if (c != 0x1F) throw new IOException("First magic byte was wrong [" + c + "]"); + c = in.read(); + if (c != 0x8B) throw new IOException("Second magic byte was wrong [" + c + "]"); + c = in.read(); + if (c != 0x08) throw new IOException("Compression format is invalid [" + c + "]"); + + int flags = in.read(); + + // snag (and ignore) the MTIME + c = in.read(); + if (c == -1) throw new IOException("EOF on MTIME0 [" + c + "]"); + c = in.read(); + if (c == -1) throw new IOException("EOF on MTIME1 [" + c + "]"); + c = in.read(); + if (c == -1) throw new IOException("EOF on MTIME2 [" + c + "]"); + c = in.read(); + if (c == -1) throw new IOException("EOF on MTIME3 [" + c + "]"); + + c = in.read(); + if ( (c != 0x00) && (c != 0x02) && (c != 0x04) ) + throw new IOException("Invalid extended flags [" + c + "]"); + + c = in.read(); // ignore creator OS + + // handle flags... + if (0 != (flags & (1<<5))) { + // extra header, read and ignore + int len = 0; + c = in.read(); + if (c == -1) throw new IOException("EOF reading the extra header"); + len = c; + c = in.read(); + if (c == -1) throw new IOException("EOF reading the extra header"); + len += (c << 8); + + // now skip that data + for (int i = 0; i < len; i++) { + c = in.read(); + if (c == -1) throw new IOException("EOF reading the extra header's body"); + } + } + + if (0 != (flags & (1 << 4))) { + // ignore the name + c = in.read(); + while (c != 0) { + if (c == -1) throw new IOException("EOF reading the name"); + c = in.read(); + } + } + + if (0 != (flags & (1 << 3))) { + // ignore the comment + c = in.read(); + while (c != 0) { + if (c == -1) throw new IOException("EOF reading the comment"); + c = in.read(); + } + } + + if (0 != (flags & (1 << 6))) { + // ignore the header CRC16 (we still check the body CRC32) + c = in.read(); + if (c == -1) throw new IOException("EOF reading the CRC16"); + c = in.read(); + if (c == -1) throw new IOException("EOF reading the CRC16"); + } + } + + public static void main(String args[]) { + for (int i = 129; i < 64*1024; i++) { + if (!test(i)) return; + } + + byte orig[] = "ho ho ho, merry christmas".getBytes(); + try { + java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(64); + java.util.zip.GZIPOutputStream o = new java.util.zip.GZIPOutputStream(baos); + o.write(orig); + o.finish(); + o.flush(); + o.close(); + byte compressed[] = baos.toByteArray(); + + ResettableGZIPInputStream i = new ResettableGZIPInputStream(); + i.initialize(new ByteArrayInputStream(compressed)); + byte readBuf[] = new byte[128]; + int read = i.read(readBuf); + if (read != orig.length) + throw new RuntimeException("read=" + read); + for (int j = 0; j < read; j++) + if (readBuf[j] != orig[j]) + throw new RuntimeException("wtf, j=" + j + " readBuf=" + readBuf[j] + " orig=" + orig[j]); + boolean ok = (-1 == i.read()); + if (!ok) throw new RuntimeException("wtf, not EOF after the data?"); + System.out.println("Match ok"); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private static boolean test(int size) { + byte b[] = new byte[size]; + new java.util.Random().nextBytes(b); + try { + java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(size); + java.util.zip.GZIPOutputStream o = new java.util.zip.GZIPOutputStream(baos); + o.write(b); + o.finish(); + o.flush(); + byte compressed[] = baos.toByteArray(); + + ResettableGZIPInputStream in = new ResettableGZIPInputStream(new ByteArrayInputStream(compressed)); + java.io.ByteArrayOutputStream baos2 = new java.io.ByteArrayOutputStream(size); + byte rbuf[] = new byte[512]; + while (true) { + int read = in.read(rbuf); + if (read == -1) + break; + baos2.write(rbuf, 0, read); + } + byte rv[] = baos2.toByteArray(); + if (rv.length != b.length) + throw new RuntimeException("read length: " + rv.length + " expected: " + b.length); + + if (!net.i2p.data.DataHelper.eq(rv, 0, b, 0, b.length)) { + throw new RuntimeException("foo, read=" + rv.length); + } else { + System.out.println("match, w00t @ " + size); + return true; + } + } catch (Exception e) { + System.out.println("Error dealing with size=" + size + ": " + e.getMessage()); + e.printStackTrace(); + return false; + } + } +} diff --git a/core/java/src/net/i2p/util/ResettableGZIPOutputStream.java b/core/java/src/net/i2p/util/ResettableGZIPOutputStream.java new file mode 100644 index 000000000..3d5184fb5 --- /dev/null +++ b/core/java/src/net/i2p/util/ResettableGZIPOutputStream.java @@ -0,0 +1,171 @@ +package net.i2p.util; + +import java.io.ByteArrayOutputStream; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.OutputStream; + +import java.util.zip.CRC32; +import java.util.zip.Deflater; +import java.util.zip.DeflaterOutputStream; +import java.util.zip.GZIPOutputStream; +import java.util.zip.GZIPInputStream; +import net.i2p.data.DataHelper; + +/** + * GZIP implementation per + * RFC 1952, reusing + * java's standard CRC32 and Deflater implementations. The main difference + * is that this implementation allows its state to be reset to initial + * values, and hence reused, while the standard GZIPOutputStream writes the + * GZIP header to the stream on instantiation, rather than on first write. + * + */ +public class ResettableGZIPOutputStream extends DeflaterOutputStream { + /** has the header been written out yet? */ + private boolean _headerWritten; + /** how much data is in the uncompressed stream? */ + private long _writtenSize; + private CRC32 _crc32; + private static final boolean DEBUG = false; + + public ResettableGZIPOutputStream(OutputStream o) { + super(o, new Deflater(9, true)); + _headerWritten = false; + _crc32 = new CRC32(); + } + /** + * Reinitialze everything so we can write a brand new gzip output stream + * again. + */ + public void reset() { + if (DEBUG) + System.out.println("Resetting (writtenSize=" + _writtenSize + ")"); + def.reset(); + _crc32.reset(); + _writtenSize = 0; + _headerWritten = false; + } + + private static final byte[] HEADER = new byte[] { + (byte)0x1F, (byte)0x8b, // magic bytes + 0x08, // compression format == DEFLATE + 0x00, // flags (NOT using CRC16, filename, etc) + 0x00, 0x00, 0x00, 0x00, // no modification time available (don't leak this!) + 0x02, // maximum compression + (byte)0xFF // unknown creator OS (!!!) + }; + + /** + * obviously not threadsafe, but its a stream, thats standard + */ + private void ensureHeaderIsWritten() throws IOException { + if (_headerWritten) return; + if (DEBUG) System.out.println("Writing header"); + out.write(HEADER); + _headerWritten = true; + } + + private void writeFooter() throws IOException { + // damn RFC writing their bytes backwards... + long crcVal = _crc32.getValue(); + out.write((int)(crcVal & 0xFF)); + out.write((int)((crcVal >>> 8) & 0xFF)); + out.write((int)((crcVal >>> 16) & 0xFF)); + out.write((int)((crcVal >>> 24) & 0xFF)); + + long sizeVal = _writtenSize; // % (1 << 31) // *redundant* + out.write((int)(sizeVal & 0xFF)); + out.write((int)((sizeVal >>> 8) & 0xFF)); + out.write((int)((sizeVal >>> 16) & 0xFF)); + out.write((int)((sizeVal >>> 24) & 0xFF)); + out.flush(); + if (DEBUG) { + System.out.println("Footer written: crcVal=" + crcVal + " sizeVal=" + sizeVal + " written=" + _writtenSize); + System.out.println("size hex: " + Long.toHexString(sizeVal)); + System.out.print( "size2 hex:" + Long.toHexString((int)(sizeVal & 0xFF))); + System.out.print( Long.toHexString((int)((sizeVal >>> 8) & 0xFF))); + System.out.print( Long.toHexString((int)((sizeVal >>> 16) & 0xFF))); + System.out.print( Long.toHexString((int)((sizeVal >>> 24) & 0xFF))); + System.out.println(); + } + } + + public void close() throws IOException { + finish(); + super.close(); + } + public void finish() throws IOException { + ensureHeaderIsWritten(); + super.finish(); + writeFooter(); + } + + public void write(int b) throws IOException { + ensureHeaderIsWritten(); + _crc32.update(b); + _writtenSize++; + super.write(b); + } + public void write(byte buf[]) throws IOException { + write(buf, 0, buf.length); + } + public void write(byte buf[], int off, int len) throws IOException { + ensureHeaderIsWritten(); + _crc32.update(buf, off, len); + _writtenSize += len; + super.write(buf, off, len); + } + + public static void main(String args[]) { + for (int i = 0; i < 2; i++) + test(); + } + private static void test() { + byte b[] = "hi, how are you today?".getBytes(); + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(64); + ResettableGZIPOutputStream o = new ResettableGZIPOutputStream(baos); + o.write(b); + o.finish(); + o.flush(); + byte compressed[] = baos.toByteArray(); + + ByteArrayOutputStream baos2 = new ByteArrayOutputStream(); + SnoopGZIPOutputStream gzo = new SnoopGZIPOutputStream(baos2); + gzo.write(b); + gzo.finish(); + gzo.flush(); + long value = gzo.getCRC().getValue(); + byte compressed2[] = baos2.toByteArray(); + System.out.println("CRC32 values: Resettable = " + o._crc32.getValue() + + " GZIP = " + value); + + System.out.print("Resettable compressed data: "); + for (int i = 0; i < compressed.length; i++) + System.out.print(Integer.toHexString(compressed[i] & 0xFF) + " "); + System.out.println(); + System.out.print(" GZIP compressed data: "); + for (int i = 0; i < compressed2.length; i++) + System.out.print(Integer.toHexString(compressed2[i] & 0xFF) + " "); + System.out.println(); + + GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed)); + byte rv[] = new byte[128]; + int read = in.read(rv); + if (!DataHelper.eq(rv, 0, b, 0, b.length)) + throw new RuntimeException("foo, read=" + read); + else + System.out.println("match, w00t"); + } catch (Exception e) { e.printStackTrace(); } + } + + /** just for testing/verification, expose the CRC32 values */ + private static final class SnoopGZIPOutputStream extends GZIPOutputStream { + public SnoopGZIPOutputStream(OutputStream o) throws IOException { + super(o); + } + public CRC32 getCRC() { return crc; } + } +} + diff --git a/core/java/src/net/i2p/util/ReusableGZIPInputStream.java b/core/java/src/net/i2p/util/ReusableGZIPInputStream.java new file mode 100644 index 000000000..4e330fe55 --- /dev/null +++ b/core/java/src/net/i2p/util/ReusableGZIPInputStream.java @@ -0,0 +1,126 @@ +package net.i2p.util; + +import java.io.ByteArrayOutputStream; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.zip.GZIPOutputStream; +import java.util.zip.GZIPInputStream; +import net.i2p.data.DataHelper; + +/** + * Provide a cache of reusable GZIP streams, each handling up to 32KB without + * expansion. + * + */ +public class ReusableGZIPInputStream extends ResettableGZIPInputStream { + private static ArrayList _available = new ArrayList(16); + /** + * Pull a cached instance + */ + public static ReusableGZIPInputStream acquire() { + ReusableGZIPInputStream rv = null; + synchronized (_available) { + if (_available.size() > 0) + rv = (ReusableGZIPInputStream)_available.remove(0); + } + if (rv == null) { + rv = new ReusableGZIPInputStream(); + } + return rv; + } + /** + * Release an instance back into the cache (this will reset the + * state) + */ + public static void release(ReusableGZIPInputStream released) { + synchronized (_available) { + if (_available.size() < 16) + _available.add(released); + } + } + + private ReusableGZIPInputStream() { super(); } + + public static void main(String args[]) { + for (int i = 0; i < 2; i++) + test(); + for (int i = 0; i < 64*1024; i++) { + if (!test(i)) break; + } + } + private static void test() { + byte b[] = "hi, how are you today?".getBytes(); + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(64); + GZIPOutputStream o = new GZIPOutputStream(baos); + o.write(b); + o.finish(); + o.flush(); + byte compressed[] = baos.toByteArray(); + + ReusableGZIPInputStream in = ReusableGZIPInputStream.acquire(); + in.initialize(new ByteArrayInputStream(compressed)); + byte rv[] = new byte[128]; + int read = in.read(rv); + if (!DataHelper.eq(rv, 0, b, 0, b.length)) + throw new RuntimeException("foo, read=" + read); + else + System.out.println("match, w00t"); + ReusableGZIPInputStream.release(in); + } catch (Exception e) { e.printStackTrace(); } + } + + private static boolean test(int size) { + byte b[] = new byte[size]; + new java.util.Random().nextBytes(b); + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(size); + GZIPOutputStream o = new GZIPOutputStream(baos); + o.write(b); + o.finish(); + o.flush(); + byte compressed[] = baos.toByteArray(); + + ReusableGZIPInputStream in = ReusableGZIPInputStream.acquire(); + in.initialize(new ByteArrayInputStream(compressed)); + ByteArrayOutputStream baos2 = new ByteArrayOutputStream(size); + byte rbuf[] = new byte[128]; + try { + while (true) { + int read = in.read(rbuf); + if (read == -1) + break; + baos2.write(rbuf, 0, read); + } + } catch (IOException ioe) { + ioe.printStackTrace(); + long crcVal = in.getCurrentCRCVal(); + //try { in.verifyFooter(); } catch (IOException ioee) { + // ioee.printStackTrace(); + //} + throw ioe; + } catch (RuntimeException re) { + re.printStackTrace(); + throw re; + } + ReusableGZIPInputStream.release(in); + byte rv[] = baos2.toByteArray(); + if (rv.length != b.length) + throw new RuntimeException("read length: " + rv.length + " expected: " + b.length); + + if (!DataHelper.eq(rv, 0, b, 0, b.length)) { + throw new RuntimeException("foo, read=" + rv.length); + } else { + System.out.println("match, w00t"); + return true; + } + } catch (Exception e) { + System.out.println("Error dealing with size=" + size + ": " + e.getMessage()); + e.printStackTrace(); + return false; + } + } +} + diff --git a/core/java/src/net/i2p/util/ReusableGZIPOutputStream.java b/core/java/src/net/i2p/util/ReusableGZIPOutputStream.java new file mode 100644 index 000000000..ebdd1f391 --- /dev/null +++ b/core/java/src/net/i2p/util/ReusableGZIPOutputStream.java @@ -0,0 +1,124 @@ +package net.i2p.util; + +import java.io.ByteArrayOutputStream; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.zip.GZIPOutputStream; +import java.util.zip.GZIPInputStream; +import net.i2p.data.DataHelper; + +/** + * Provide a cache of reusable GZIP streams, each handling up to 32KB without + * expansion. + * + */ +public class ReusableGZIPOutputStream extends ResettableGZIPOutputStream { + private static ArrayList _available = new ArrayList(16); + /** + * Pull a cached instance + */ + public static ReusableGZIPOutputStream acquire() { + ReusableGZIPOutputStream rv = null; + synchronized (_available) { + if (_available.size() > 0) + rv = (ReusableGZIPOutputStream)_available.remove(0); + } + if (rv == null) { + rv = new ReusableGZIPOutputStream(); + } + return rv; + } + + /** + * Release an instance back into the cache (this will discard any + * state) + */ + public static void release(ReusableGZIPOutputStream out) { + out.reset(); + synchronized (_available) { + if (_available.size() < 16) + _available.add(out); + } + } + + private ByteArrayOutputStream _buffer = null; + private ReusableGZIPOutputStream() { + super(new ByteArrayOutputStream(40*1024)); + _buffer = (ByteArrayOutputStream)out; + } + /** clear the data so we can start again afresh */ + public void reset() { + super.reset(); + _buffer.reset(); + } + /** pull the contents of the stream written */ + public byte[] getData() { return _buffer.toByteArray(); } + + public static void main(String args[]) { + try { + for (int i = 0; i < 2; i++) + test(); + for (int i = 500; i < 64*1024; i++) { + if (!test(i)) break; + } + } catch (Exception e) { e.printStackTrace(); } + try { Thread.sleep(10*1000); } catch (InterruptedException ie){} + System.out.println("After all tests are complete..."); + } + private static void test() { + byte b[] = "hi, how are you today?".getBytes(); + try { + ReusableGZIPOutputStream o = ReusableGZIPOutputStream.acquire(); + o.write(b); + o.finish(); + o.flush(); + byte compressed[] = o.getData(); + ReusableGZIPOutputStream.release(o); + + GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed)); + byte rv[] = new byte[128]; + int read = in.read(rv); + if (!DataHelper.eq(rv, 0, b, 0, b.length)) + throw new RuntimeException("foo, read=" + read); + else + System.out.println("match, w00t"); + } catch (Exception e) { e.printStackTrace(); } + } + + private static boolean test(int size) { + byte b[] = new byte[size]; + new java.util.Random().nextBytes(b); + try { + ReusableGZIPOutputStream o = ReusableGZIPOutputStream.acquire(); + o.write(b); + o.finish(); + o.flush(); + byte compressed[] = o.getData(); + ReusableGZIPOutputStream.release(o); + + GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed)); + ByteArrayOutputStream baos2 = new ByteArrayOutputStream(256*1024); + byte rbuf[] = new byte[128]; + while (true) { + int read = in.read(rbuf); + if (read == -1) + break; + baos2.write(rbuf, 0, read); + } + byte rv[] = baos2.toByteArray(); + if (!DataHelper.eq(rv, 0, b, 0, b.length)) { + throw new RuntimeException("foo, read=" + rv.length); + } else { + System.out.println("match, w00t @ " + size); + return true; + } + } catch (Exception e) { + System.out.println("Error on size=" + size + ": " + e.getMessage()); + e.printStackTrace(); + return false; + } + } +} + diff --git a/core/java/test/net/i2p/data/DataHelperTest.java b/core/java/test/net/i2p/data/DataHelperTest.java index 486fe3ba4..05f21bbc7 100644 --- a/core/java/test/net/i2p/data/DataHelperTest.java +++ b/core/java/test/net/i2p/data/DataHelperTest.java @@ -23,6 +23,8 @@ public class DataHelperTest { } public void runTests() { + // compress + testCompress(); // long (read/write/to/from) testLong(); // date (read/write/to/from) @@ -31,7 +33,6 @@ public class DataHelperTest { // properties // boolean // readline - // compress } /** @@ -153,6 +154,28 @@ public class DataHelperTest { } } + private void testCompress() { + for (int i = 0; i < 32*1024; i++) + testCompress(i); + } + + private void testCompress(int size) { + byte data[] = new byte[size]; + _context.random().nextBytes(data); + byte compressed[] = DataHelper.compress(data); + try { + byte decompressed[] = DataHelper.decompress(compressed); + boolean ok = DataHelper.eq(data, decompressed); + if (!ok) + throw new RuntimeException("failed match at size=" + size); + else + System.out.println("Match at size=" + size); + } catch (java.io.IOException ioe) { + ioe.printStackTrace(); + throw new RuntimeException("Error at size=" + size +":" + ioe.getMessage()); + } + } + public static void main(String args[]) { DataHelperTest test = new DataHelperTest(I2PAppContext.getGlobalContext()); test.runTests(); diff --git a/history.txt b/history.txt index 758c9a87c..c60663492 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,15 @@ -$Id: history.txt,v 1.161 2005/03/01 12:50:54 jrandom Exp $ +$Id: history.txt,v 1.162 2005/03/02 22:36:52 jrandom Exp $ + +2005-03-03 jrandom + * Loop while starting up the I2PTunnel instances, in case the I2CP + listener isn't up yet (thanks detonate!) + * Implement custom reusable GZIP streams to both reduce memory churn + and prevent the exposure of data in the standard GZIP header (creation + time, OS, etc). This is RFC1952 compliant, and backwards compatible, + though has only been tested within the confines of I2P's compression use + (DataHelper.[de]compress). + * Preemptively support the next protocol version, so that after the 0.5.0.2 + release, we'll be able to drop protocol=2 to get rid of 0.5 users. 2005-03-02 jrandom * Fix one substantial OOM cause (session tag manager was only dropping diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index b32052883..b259edd3e 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.156 $ $Date: 2005/03/01 12:50:54 $"; + public final static String ID = "$Revision: 1.157 $ $Date: 2005/03/02 22:36:53 $"; public final static String VERSION = "0.5.0.1"; - public final static long BUILD = 8; + public final static long BUILD = 9; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java index 3665f3fa7..d08e77653 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java @@ -87,7 +87,9 @@ public class TCPTransport extends TransportImpl { public static final int DEFAULT_ESTABLISHERS = 3; /** Ordered list of supported I2NP protocols */ - public static final int[] SUPPORTED_PROTOCOLS = new int[] { 2 }; + public static final int[] SUPPORTED_PROTOCOLS = new int[] { 2 + , 3 // forward compat, so we can drop 0.5 builds after 0.5.0.2 + }; /** blah, people shouldnt use defaults... */ public static final int DEFAULT_LISTEN_PORT = 8887;