Repository: cassandra Updated Branches: refs/heads/cassandra-3.1 5b6a368c9 -> 4c6f32569
Fix integer overflow in DataOutputBuffer doubling and test as best as possible given that allocating 2 gigs in a unit test is problematic. Patch by Ariel Weisberg; reviewed by tjake for CASSANDRA-10592 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a320737b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a320737b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a320737b Branch: refs/heads/cassandra-3.1 Commit: a320737b18c19e3ec59035e5e487f2af1dcd0172 Parents: 2491ede Author: Ariel Weisberg <ariel.weisb...@datastax.com> Authored: Tue Oct 27 12:19:14 2015 -0400 Committer: T Jake Luciani <j...@apache.org> Committed: Tue Dec 1 22:34:28 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../io/util/BufferedDataOutputStreamPlus.java | 20 +- .../cassandra/io/util/DataOutputBuffer.java | 78 ++++++- .../io/util/DataOutputBufferFixed.java | 2 +- .../cassandra/io/util/SafeMemoryWriter.java | 10 +- .../cassandra/io/util/DataOutputTest.java | 202 +++++++++++++++++++ 6 files changed, 296 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a320737b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7541212..cf73f57 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -16,6 +16,7 @@ * Fix RangeNamesQueryPager (CASSANDRA-10509) * Deprecate Pig support (CASSANDRA-10542) * Reduce contention getting instances of CompositeType (CASSANDRA-10433) + * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large buffers (CASSANDRA-10592) Merged from 2.1: * Add proper error handling to stream receiver (CASSANDRA-10774) * Warn or fail when changing cluster topology live (CASSANDRA-10243) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a320737b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java index 5669a8d..d55db47 100644 --- a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java +++ b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java @@ -118,7 +118,7 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus } else { - doFlush(); + doFlush(len - copied); } } } @@ -142,11 +142,12 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus else { assert toWrite.isDirect(); - if (toWrite.remaining() > buffer.remaining()) + int toWriteRemaining = toWrite.remaining(); + if (toWriteRemaining > buffer.remaining()) { - doFlush(); + doFlush(toWriteRemaining); MemoryUtil.duplicateDirectByteBuffer(toWrite, hollowBuffer); - if (toWrite.remaining() > buffer.remaining()) + if (toWriteRemaining > buffer.remaining()) { while (hollowBuffer.hasRemaining()) channel.write(hollowBuffer); @@ -254,7 +255,10 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus write(buffer); } - protected void doFlush() throws IOException + /* + * Count is the number of bytes remaining to write ignoring already remaining capacity + */ + protected void doFlush(int count) throws IOException { buffer.flip(); @@ -267,13 +271,13 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus @Override public void flush() throws IOException { - doFlush(); + doFlush(0); } @Override public void close() throws IOException { - doFlush(); + doFlush(0); channel.close(); FileUtils.clean(buffer); buffer = null; @@ -282,7 +286,7 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus protected void ensureRemaining(int minimum) throws IOException { if (buffer.remaining() < minimum) - doFlush(); + doFlush(minimum); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/a320737b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java index 6ffc895..6ea6d97 100644 --- a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java +++ b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java @@ -21,6 +21,10 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; +import org.apache.cassandra.config.Config; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; /** * An implementation of the DataOutputStream interface using a FastByteArrayOutputStream and exposing @@ -30,6 +34,11 @@ import java.nio.channels.WritableByteChannel; */ public class DataOutputBuffer extends BufferedDataOutputStreamPlus { + /* + * Threshold at which resizing transitions from doubling to increasing by 50% + */ + private static final long DOUBLING_THRESHOLD = Long.getLong(Config.PROPERTY_PREFIX + "DOB_DOUBLING_THRESHOLD_MB", 64); + public DataOutputBuffer() { this(128); @@ -51,16 +60,70 @@ public class DataOutputBuffer extends BufferedDataOutputStreamPlus throw new UnsupportedOperationException(); } + //The actual value observed in Hotspot is only -2 + //ByteArrayOutputStream uses -8 + @VisibleForTesting + static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; + + @VisibleForTesting + static int saturatedArraySizeCast(long size) + { + Preconditions.checkArgument(size >= 0); + return (int)Math.min(MAX_ARRAY_SIZE, size); + } + + @VisibleForTesting + static int checkedArraySizeCast(long size) + { + Preconditions.checkArgument(size >= 0); + Preconditions.checkArgument(size <= MAX_ARRAY_SIZE); + return (int)size; + } + @Override - protected void doFlush() throws IOException + protected void doFlush(int count) throws IOException + { + reallocate(count); + } + + //Hack for test, make it possible to override checking the buffer capacity + @VisibleForTesting + long capacity() + { + return buffer.capacity(); + } + + @VisibleForTesting + long validateReallocation(long newSize) + { + int saturatedSize = saturatedArraySizeCast(newSize); + if (saturatedSize <= capacity()) + throw new RuntimeException(); + return saturatedSize; + } + + @VisibleForTesting + long calculateNewSize(long count) { - reallocate(buffer.capacity() * 2); + long capacity = capacity(); + //Both sides of this max expression need to use long arithmetic to avoid integer overflow + //count and capacity are longs so that ensures it right now. + long newSize = capacity + count; + + //For large buffers don't double, increase by 50% + if (capacity > 1024L * 1024L * DOUBLING_THRESHOLD) + newSize = Math.max((capacity * 3L) / 2L, newSize); + else + newSize = Math.max(capacity * 2L, newSize); + + return validateReallocation(newSize); } - protected void reallocate(long newSize) + protected void reallocate(long count) { - assert newSize <= Integer.MAX_VALUE; - ByteBuffer newBuffer = ByteBuffer.allocate((int) newSize); + if (count <= 0) + return; + ByteBuffer newBuffer = ByteBuffer.allocate(checkedArraySizeCast(calculateNewSize(count))); buffer.flip(); newBuffer.put(buffer); buffer = newBuffer; @@ -72,12 +135,13 @@ public class DataOutputBuffer extends BufferedDataOutputStreamPlus return new GrowingChannel(); } - private final class GrowingChannel implements WritableByteChannel + @VisibleForTesting + final class GrowingChannel implements WritableByteChannel { public int write(ByteBuffer src) throws IOException { int count = src.remaining(); - reallocate(Math.max((buffer.capacity() * 3) / 2, buffer.capacity() + count)); + reallocate(count); buffer.put(src); return count; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a320737b/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java b/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java index fb8d671..c815c9e 100644 --- a/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java +++ b/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java @@ -47,7 +47,7 @@ public class DataOutputBufferFixed extends DataOutputBuffer } @Override - protected void doFlush() throws IOException + protected void doFlush(int count) throws IOException { throw new BufferOverflowException(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a320737b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java index aad3266..24eb93c 100644 --- a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java +++ b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java @@ -42,8 +42,10 @@ public class SafeMemoryWriter extends DataOutputBuffer return memory; } - protected void reallocate(long newCapacity) + @Override + protected void reallocate(long count) { + long newCapacity = calculateNewSize(count); if (newCapacity != capacity()) { long position = length(); @@ -93,6 +95,12 @@ public class SafeMemoryWriter extends DataOutputBuffer return this; } + @Override + public long validateReallocation(long newSize) + { + return newSize; + } + private static long tailOffset(Memory memory) { return Math.max(0, memory.size - Integer.MAX_VALUE); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a320737b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java index bbdf4e1..1fb5597 100644 --- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java +++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java @@ -31,7 +31,10 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.Channels; +import java.util.ArrayDeque; +import java.util.Deque; import java.util.Random; +import java.util.concurrent.Callable; import java.util.concurrent.ThreadLocalRandom; import org.junit.Assert; @@ -83,6 +86,18 @@ public class DataOutputTest } @Test + public void testDataOutputBufferZeroReallocate() throws IOException + { + try (DataOutputBufferSpy write = new DataOutputBufferSpy()) + { + for (int ii = 0; ii < 1000000; ii++) + { + write.superReallocate(0); + } + } + } + + @Test public void testDataOutputDirectByteBuffer() throws IOException { ByteBuffer buf = wrap(new byte[345], true); @@ -102,6 +117,193 @@ public class DataOutputTest testRead(test, canon); } + private static class DataOutputBufferSpy extends DataOutputBuffer + { + Deque<Long> sizes = new ArrayDeque<>(); + + DataOutputBufferSpy() + { + sizes.offer(128L); + } + + void publicFlush() throws IOException + { + //Going to allow it to double instead of specifying a count + doFlush(1); + } + + void superReallocate(int count) throws IOException + { + super.reallocate(count); + } + + @Override + protected void reallocate(long count) + { + if (count <= 0) + return; + Long lastSize = sizes.peekLast(); + long newSize = calculateNewSize(count); + sizes.offer(newSize); + if (newSize > DataOutputBuffer.MAX_ARRAY_SIZE) + throw new RuntimeException(); + if (newSize < 0) + throw new AssertionError(); + if (lastSize != null && newSize <= lastSize) + throw new AssertionError(); + } + + @Override + protected long capacity() + { + return sizes.peekLast().intValue(); + } + } + + //Check for overflow at the max size, without actually allocating all the memory + @Test + public void testDataOutputBufferMaxSizeFake() throws IOException + { + try (DataOutputBufferSpy write = new DataOutputBufferSpy()) + { + boolean threw = false; + try + { + while (true) + write.publicFlush(); + } + catch (RuntimeException e) { + if (e.getClass() == RuntimeException.class) + threw = true; + } + Assert.assertTrue(threw); + Assert.assertTrue(write.sizes.peekLast() >= DataOutputBuffer.MAX_ARRAY_SIZE); + } + } + + @Test + public void testDataOutputBufferMaxSize() throws IOException + { + //Need a lot of heap to run this test for real. + //Tested everything else as much as possible since we can't do it all the time + if (Runtime.getRuntime().maxMemory() < 5033164800L) + return; + + try (DataOutputBuffer write = new DataOutputBuffer()) + { + //Doesn't throw up to DataOuptutBuffer.MAX_ARRAY_SIZE which is the array size limit in Java + for (int ii = 0; ii < DataOutputBuffer.MAX_ARRAY_SIZE / 8; ii++) + write.writeLong(0); + write.write(new byte[7]); + + //Should fail due to validation + checkThrowsRuntimeException(validateReallocationCallable( write, DataOutputBuffer.MAX_ARRAY_SIZE + 1)); + //Check that it does throw + checkThrowsRuntimeException(new Callable<Object>() + { + public Object call() throws Exception + { + write.write(42); + return null; + } + }); + } + } + + //Can't test it for real without tons of heap so test as much validation as possible + @Test + public void testDataOutputBufferBigReallocation() throws Exception + { + //Check saturating cast behavior + Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE, DataOutputBuffer.saturatedArraySizeCast(DataOutputBuffer.MAX_ARRAY_SIZE + 1L)); + Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE, DataOutputBuffer.saturatedArraySizeCast(DataOutputBuffer.MAX_ARRAY_SIZE)); + Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE - 1, DataOutputBuffer.saturatedArraySizeCast(DataOutputBuffer.MAX_ARRAY_SIZE - 1)); + Assert.assertEquals(0, DataOutputBuffer.saturatedArraySizeCast(0)); + Assert.assertEquals(1, DataOutputBuffer.saturatedArraySizeCast(1)); + checkThrowsIAE(saturatedArraySizeCastCallable(-1)); + + //Check checked cast behavior + checkThrowsIAE(checkedArraySizeCastCallable(DataOutputBuffer.MAX_ARRAY_SIZE + 1L)); + Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE, DataOutputBuffer.checkedArraySizeCast(DataOutputBuffer.MAX_ARRAY_SIZE)); + Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE - 1, DataOutputBuffer.checkedArraySizeCast(DataOutputBuffer.MAX_ARRAY_SIZE - 1)); + Assert.assertEquals(0, DataOutputBuffer.checkedArraySizeCast(0)); + Assert.assertEquals(1, DataOutputBuffer.checkedArraySizeCast(1)); + checkThrowsIAE(checkedArraySizeCastCallable(-1)); + + + try (DataOutputBuffer write = new DataOutputBuffer()) + { + //Checked validation performed by DOB + Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE, write.validateReallocation(DataOutputBuffer.MAX_ARRAY_SIZE + 1L)); + Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE, write.validateReallocation(DataOutputBuffer.MAX_ARRAY_SIZE)); + Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE - 1, write.validateReallocation(DataOutputBuffer.MAX_ARRAY_SIZE - 1)); + checkThrowsRuntimeException(validateReallocationCallable( write, 0)); + checkThrowsRuntimeException(validateReallocationCallable( write, 1)); + checkThrowsIAE(validateReallocationCallable( write, -1)); + } + } + + Callable<Object> saturatedArraySizeCastCallable(final long value) + { + return new Callable<Object>() + { + @Override + public Object call() throws Exception + { + return DataOutputBuffer.saturatedArraySizeCast(value); + } + }; + } + + Callable<Object> checkedArraySizeCastCallable(final long value) + { + return new Callable<Object>() + { + @Override + public Object call() throws Exception + { + return DataOutputBuffer.checkedArraySizeCast(value); + } + }; + } + + Callable<Object> validateReallocationCallable(final DataOutputBuffer write, final long value) + { + return new Callable<Object>() + { + @Override + public Object call() throws Exception + { + return write.validateReallocation(value); + } + }; + } + + private static void checkThrowsIAE(Callable<Object> c) + { + checkThrowsException(c, IllegalArgumentException.class); + } + + private static void checkThrowsRuntimeException(Callable<Object> c) + { + checkThrowsException(c, RuntimeException.class); + } + + private static void checkThrowsException(Callable<Object> c, Class<?> exceptionClass) + { + boolean threw = false; + try + { + c.call(); + } + catch (Throwable t) + { + if (t.getClass() == exceptionClass) + threw = true; + } + Assert.assertTrue(threw); + } + @Test public void testSafeMemoryWriter() throws IOException {