Fix integer overflow in DataOutputBuffer capacity doubling, and test it as thoroughly as possible given that allocating 2 GB in a unit test is impractical.
Patch by Ariel Weisberg; reviewed by tjake for CASSANDRA-10592 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f7aaea01 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f7aaea01 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f7aaea01 Branch: refs/heads/cassandra-3.0 Commit: f7aaea013e98178064103d9b4cd39f66bad083f3 Parents: f785f8b Author: Ariel Weisberg <ariel.weisb...@datastax.com> Authored: Tue Dec 1 12:33:46 2015 -0500 Committer: T Jake Luciani <j...@apache.org> Committed: Tue Dec 1 22:43:03 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../io/compress/CompressedSequentialWriter.java | 2 +- .../io/util/BufferedDataOutputStreamPlus.java | 22 +- .../cassandra/io/util/DataOutputBuffer.java | 78 ++++++- .../io/util/DataOutputBufferFixed.java | 2 +- .../cassandra/io/util/SafeMemoryWriter.java | 10 +- .../cassandra/io/util/SequentialWriter.java | 4 +- .../cassandra/io/util/DataOutputTest.java | 202 +++++++++++++++++++ 8 files changed, 301 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7aaea01/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a01011b..1af2745 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -14,6 +14,7 @@ * Keep the file open in trySkipCache (CASSANDRA-10669) * Updated trigger example (CASSANDRA-10257) Merged from 2.2: + * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large buffers (CASSANDRA-10592) * Show CQL help in cqlsh in web browser (CASSANDRA-7225) * Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775) * Reject index queries while the index is building (CASSANDRA-8505) @@ -90,6 +91,7 @@ Merged from 2.2: * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628) 
* Expose phi values from failure detector via JMX and tweak debug and trace logging (CASSANDRA-9526) + * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large buffers (CASSANDRA-10592) Merged from 2.1: * Shutdown compaction in drain to prevent leak (CASSANDRA-10079) * (cqlsh) fix COPY using wrong variable name for time_format (CASSANDRA-10633) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7aaea01/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java index bbec6f5..14f1ba7 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java @@ -156,7 +156,7 @@ public class CompressedSequentialWriter extends SequentialWriter public FileMark mark() { if (!buffer.hasRemaining()) - doFlush(); + doFlush(0); return new CompressedFileWriterMark(chunkOffset, current(), buffer.position(), chunkCount + 1); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7aaea01/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java index 9434219..54122ee 100644 --- a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java +++ b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java @@ -129,7 +129,7 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus } else { - doFlush(); + doFlush(len - copied); } } } @@ -154,8 +154,9 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus { assert 
toWrite.isDirect(); MemoryUtil.duplicateDirectByteBuffer(toWrite, hollowBuffer); + int toWriteRemaining = toWrite.remaining(); - if (toWrite.remaining() > buffer.remaining()) + if (toWriteRemaining > buffer.remaining()) { if (strictFlushing) { @@ -163,7 +164,7 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus } else { - doFlush(); + doFlush(toWriteRemaining - buffer.remaining()); while (hollowBuffer.remaining() > buffer.capacity()) channel.write(hollowBuffer); } @@ -182,7 +183,7 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus { hollowBuffer.limit(hollowBuffer.position() + buffer.remaining()); buffer.put(hollowBuffer); - doFlush(); + doFlush(originalLimit - hollowBuffer.position()); } hollowBuffer.limit(originalLimit); } @@ -191,7 +192,7 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus public void write(int b) throws IOException { if (!buffer.hasRemaining()) - doFlush(); + doFlush(1); buffer.put((byte) (b & 0xFF)); } @@ -199,7 +200,7 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus public void writeBoolean(boolean v) throws IOException { if (!buffer.hasRemaining()) - doFlush(); + doFlush(1); buffer.put(v ? 
(byte)1 : (byte)0); } @@ -310,8 +311,11 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus write(buffer); } + /* + * Count is the number of bytes remaining to write ignoring already remaining capacity + */ @DontInline - protected void doFlush() throws IOException + protected void doFlush(int count) throws IOException { buffer.flip(); @@ -324,13 +328,13 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus @Override public void flush() throws IOException { - doFlush(); + doFlush(0); } @Override public void close() throws IOException { - doFlush(); + doFlush(0); channel.close(); FileUtils.clean(buffer); buffer = null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7aaea01/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java index 4ba5546..80a7fe2 100644 --- a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java +++ b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java @@ -21,6 +21,10 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; +import org.apache.cassandra.config.Config; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; /** * An implementation of the DataOutputStream interface using a FastByteArrayOutputStream and exposing @@ -30,6 +34,11 @@ import java.nio.channels.WritableByteChannel; */ public class DataOutputBuffer extends BufferedDataOutputStreamPlus { + /* + * Threshold at which resizing transitions from doubling to increasing by 50% + */ + private static final long DOUBLING_THRESHOLD = Long.getLong(Config.PROPERTY_PREFIX + "DOB_DOUBLING_THRESHOLD_MB", 64); + public DataOutputBuffer() { this(128); @@ -51,16 +60,70 @@ public class DataOutputBuffer extends 
BufferedDataOutputStreamPlus throw new UnsupportedOperationException(); } + //The actual value observed in Hotspot is only -2 + //ByteArrayOutputStream uses -8 + @VisibleForTesting + static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; + + @VisibleForTesting + static int saturatedArraySizeCast(long size) + { + Preconditions.checkArgument(size >= 0); + return (int)Math.min(MAX_ARRAY_SIZE, size); + } + + @VisibleForTesting + static int checkedArraySizeCast(long size) + { + Preconditions.checkArgument(size >= 0); + Preconditions.checkArgument(size <= MAX_ARRAY_SIZE); + return (int)size; + } + @Override - protected void doFlush() throws IOException + protected void doFlush(int count) throws IOException + { + reallocate(count); + } + + //Hack for test, make it possible to override checking the buffer capacity + @VisibleForTesting + long capacity() + { + return buffer.capacity(); + } + + @VisibleForTesting + long validateReallocation(long newSize) + { + int saturatedSize = saturatedArraySizeCast(newSize); + if (saturatedSize <= capacity()) + throw new RuntimeException(); + return saturatedSize; + } + + @VisibleForTesting + long calculateNewSize(long count) { - reallocate(buffer.capacity() * 2); + long capacity = capacity(); + //Both sides of this max expression need to use long arithmetic to avoid integer overflow + //count and capacity are longs so that ensures it right now. 
+ long newSize = capacity + count; + + //For large buffers don't double, increase by 50% + if (capacity > 1024L * 1024L * DOUBLING_THRESHOLD) + newSize = Math.max((capacity * 3L) / 2L, newSize); + else + newSize = Math.max(capacity * 2L, newSize); + + return validateReallocation(newSize); } - protected void reallocate(long newSize) + protected void reallocate(long count) { - assert newSize <= Integer.MAX_VALUE; - ByteBuffer newBuffer = ByteBuffer.allocate((int) newSize); + if (count <= 0) + return; + ByteBuffer newBuffer = ByteBuffer.allocate(checkedArraySizeCast(calculateNewSize(count))); buffer.flip(); newBuffer.put(buffer); buffer = newBuffer; @@ -72,12 +135,13 @@ public class DataOutputBuffer extends BufferedDataOutputStreamPlus return new GrowingChannel(); } - private final class GrowingChannel implements WritableByteChannel + @VisibleForTesting + final class GrowingChannel implements WritableByteChannel { public int write(ByteBuffer src) throws IOException { int count = src.remaining(); - reallocate(Math.max((buffer.capacity() * 3) / 2, buffer.capacity() + count)); + reallocate(count); buffer.put(src); return count; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7aaea01/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java b/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java index fb8d671..c815c9e 100644 --- a/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java +++ b/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java @@ -47,7 +47,7 @@ public class DataOutputBufferFixed extends DataOutputBuffer } @Override - protected void doFlush() throws IOException + protected void doFlush(int count) throws IOException { throw new BufferOverflowException(); } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7aaea01/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java index aad3266..24eb93c 100644 --- a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java +++ b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java @@ -42,8 +42,10 @@ public class SafeMemoryWriter extends DataOutputBuffer return memory; } - protected void reallocate(long newCapacity) + @Override + protected void reallocate(long count) { + long newCapacity = calculateNewSize(count); if (newCapacity != capacity()) { long position = length(); @@ -93,6 +95,12 @@ public class SafeMemoryWriter extends DataOutputBuffer return this; } + @Override + public long validateReallocation(long newSize) + { + return newSize; + } + private static long tailOffset(Memory memory) { return Math.max(0, memory.size - Integer.MAX_VALUE); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7aaea01/src/java/org/apache/cassandra/io/util/SequentialWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java index 5bdc15a..dd49868 100644 --- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java +++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java @@ -196,12 +196,12 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr */ protected void syncInternal() { - doFlush(); + doFlush(0); syncDataOnlyInternal(); } @Override - protected void doFlush() + protected void doFlush(int count) { flushData(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7aaea01/test/unit/org/apache/cassandra/io/util/DataOutputTest.java 
---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java index bbdf4e1..1fb5597 100644 --- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java +++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java @@ -31,7 +31,10 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.Channels; +import java.util.ArrayDeque; +import java.util.Deque; import java.util.Random; +import java.util.concurrent.Callable; import java.util.concurrent.ThreadLocalRandom; import org.junit.Assert; @@ -83,6 +86,18 @@ public class DataOutputTest } @Test + public void testDataOutputBufferZeroReallocate() throws IOException + { + try (DataOutputBufferSpy write = new DataOutputBufferSpy()) + { + for (int ii = 0; ii < 1000000; ii++) + { + write.superReallocate(0); + } + } + } + + @Test public void testDataOutputDirectByteBuffer() throws IOException { ByteBuffer buf = wrap(new byte[345], true); @@ -102,6 +117,193 @@ public class DataOutputTest testRead(test, canon); } + private static class DataOutputBufferSpy extends DataOutputBuffer + { + Deque<Long> sizes = new ArrayDeque<>(); + + DataOutputBufferSpy() + { + sizes.offer(128L); + } + + void publicFlush() throws IOException + { + //Going to allow it to double instead of specifying a count + doFlush(1); + } + + void superReallocate(int count) throws IOException + { + super.reallocate(count); + } + + @Override + protected void reallocate(long count) + { + if (count <= 0) + return; + Long lastSize = sizes.peekLast(); + long newSize = calculateNewSize(count); + sizes.offer(newSize); + if (newSize > DataOutputBuffer.MAX_ARRAY_SIZE) + throw new RuntimeException(); + if (newSize < 0) + throw new AssertionError(); + if (lastSize != null && newSize <= lastSize) + throw new AssertionError(); + } + + @Override + protected long capacity() + 
{ + return sizes.peekLast().intValue(); + } + } + + //Check for overflow at the max size, without actually allocating all the memory + @Test + public void testDataOutputBufferMaxSizeFake() throws IOException + { + try (DataOutputBufferSpy write = new DataOutputBufferSpy()) + { + boolean threw = false; + try + { + while (true) + write.publicFlush(); + } + catch (RuntimeException e) { + if (e.getClass() == RuntimeException.class) + threw = true; + } + Assert.assertTrue(threw); + Assert.assertTrue(write.sizes.peekLast() >= DataOutputBuffer.MAX_ARRAY_SIZE); + } + } + + @Test + public void testDataOutputBufferMaxSize() throws IOException + { + //Need a lot of heap to run this test for real. + //Tested everything else as much as possible since we can't do it all the time + if (Runtime.getRuntime().maxMemory() < 5033164800L) + return; + + try (DataOutputBuffer write = new DataOutputBuffer()) + { + //Doesn't throw up to DataOuptutBuffer.MAX_ARRAY_SIZE which is the array size limit in Java + for (int ii = 0; ii < DataOutputBuffer.MAX_ARRAY_SIZE / 8; ii++) + write.writeLong(0); + write.write(new byte[7]); + + //Should fail due to validation + checkThrowsRuntimeException(validateReallocationCallable( write, DataOutputBuffer.MAX_ARRAY_SIZE + 1)); + //Check that it does throw + checkThrowsRuntimeException(new Callable<Object>() + { + public Object call() throws Exception + { + write.write(42); + return null; + } + }); + } + } + + //Can't test it for real without tons of heap so test as much validation as possible + @Test + public void testDataOutputBufferBigReallocation() throws Exception + { + //Check saturating cast behavior + Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE, DataOutputBuffer.saturatedArraySizeCast(DataOutputBuffer.MAX_ARRAY_SIZE + 1L)); + Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE, DataOutputBuffer.saturatedArraySizeCast(DataOutputBuffer.MAX_ARRAY_SIZE)); + Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE - 1, 
DataOutputBuffer.saturatedArraySizeCast(DataOutputBuffer.MAX_ARRAY_SIZE - 1)); + Assert.assertEquals(0, DataOutputBuffer.saturatedArraySizeCast(0)); + Assert.assertEquals(1, DataOutputBuffer.saturatedArraySizeCast(1)); + checkThrowsIAE(saturatedArraySizeCastCallable(-1)); + + //Check checked cast behavior + checkThrowsIAE(checkedArraySizeCastCallable(DataOutputBuffer.MAX_ARRAY_SIZE + 1L)); + Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE, DataOutputBuffer.checkedArraySizeCast(DataOutputBuffer.MAX_ARRAY_SIZE)); + Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE - 1, DataOutputBuffer.checkedArraySizeCast(DataOutputBuffer.MAX_ARRAY_SIZE - 1)); + Assert.assertEquals(0, DataOutputBuffer.checkedArraySizeCast(0)); + Assert.assertEquals(1, DataOutputBuffer.checkedArraySizeCast(1)); + checkThrowsIAE(checkedArraySizeCastCallable(-1)); + + + try (DataOutputBuffer write = new DataOutputBuffer()) + { + //Checked validation performed by DOB + Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE, write.validateReallocation(DataOutputBuffer.MAX_ARRAY_SIZE + 1L)); + Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE, write.validateReallocation(DataOutputBuffer.MAX_ARRAY_SIZE)); + Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE - 1, write.validateReallocation(DataOutputBuffer.MAX_ARRAY_SIZE - 1)); + checkThrowsRuntimeException(validateReallocationCallable( write, 0)); + checkThrowsRuntimeException(validateReallocationCallable( write, 1)); + checkThrowsIAE(validateReallocationCallable( write, -1)); + } + } + + Callable<Object> saturatedArraySizeCastCallable(final long value) + { + return new Callable<Object>() + { + @Override + public Object call() throws Exception + { + return DataOutputBuffer.saturatedArraySizeCast(value); + } + }; + } + + Callable<Object> checkedArraySizeCastCallable(final long value) + { + return new Callable<Object>() + { + @Override + public Object call() throws Exception + { + return DataOutputBuffer.checkedArraySizeCast(value); + } + }; + } + + 
Callable<Object> validateReallocationCallable(final DataOutputBuffer write, final long value) + { + return new Callable<Object>() + { + @Override + public Object call() throws Exception + { + return write.validateReallocation(value); + } + }; + } + + private static void checkThrowsIAE(Callable<Object> c) + { + checkThrowsException(c, IllegalArgumentException.class); + } + + private static void checkThrowsRuntimeException(Callable<Object> c) + { + checkThrowsException(c, RuntimeException.class); + } + + private static void checkThrowsException(Callable<Object> c, Class<?> exceptionClass) + { + boolean threw = false; + try + { + c.call(); + } + catch (Throwable t) + { + if (t.getClass() == exceptionClass) + threw = true; + } + Assert.assertTrue(threw); + } + @Test public void testSafeMemoryWriter() throws IOException {