Fix SafeMemoryWriter trimming and behaviour over 2G

patch by Branimir Lambov; reviewed by Benedict Elliott Smith for CASSANDRA-14649


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/49adbe7e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/49adbe7e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/49adbe7e

Branch: refs/heads/cassandra-3.11
Commit: 49adbe7e0f0c8a83f3b843b65612528498b5c9a5
Parents: 0e81892
Author: Branimir Lambov <branimir.lam...@datastax.com>
Authored: Thu Aug 16 16:15:07 2018 +0300
Committer: Branimir Lambov <branimir.lam...@datastax.com>
Committed: Tue Aug 21 11:53:30 2018 +0300

----------------------------------------------------------------------
 .../io/sstable/IndexSummaryBuilder.java         |  4 +-
 .../cassandra/io/util/DataOutputBuffer.java     |  8 +-
 .../io/util/DataOutputBufferFixed.java          |  2 +-
 .../cassandra/io/util/SafeMemoryWriter.java     | 16 ++--
 .../cassandra/io/util/DataOutputTest.java       |  4 +-
 .../cassandra/io/util/SafeMemoryWriterTest.java | 90 ++++++++++++++++++++
 6 files changed, 110 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/49adbe7e/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index 6110afe..0f604e0 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@ -207,8 +207,8 @@ public class IndexSummaryBuilder implements AutoCloseable
     {
         // this method should only be called when we've finished appending records, so we truncate the
         // memory we're using to the exact amount required to represent it before building our summary
-        entries.setCapacity(entries.length());
-        offsets.setCapacity(offsets.length());
+        entries.trim();
+        offsets.trim();
     }
 
     public IndexSummary build(IPartitioner partitioner)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/49adbe7e/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
index 6ea6d97..3f1e081 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
@@ -37,7 +37,7 @@ public class DataOutputBuffer extends BufferedDataOutputStreamPlus
     /*
      * Threshold at which resizing transitions from doubling to increasing by 50%
      */
-    private static final long DOUBLING_THRESHOLD = Long.getLong(Config.PROPERTY_PREFIX + "DOB_DOUBLING_THRESHOLD_MB", 64);
+    static final long DOUBLING_THRESHOLD = Long.getLong(Config.PROPERTY_PREFIX + "DOB_DOUBLING_THRESHOLD_MB", 64);
 
     public DataOutputBuffer()
     {
@@ -83,7 +83,7 @@ public class DataOutputBuffer extends BufferedDataOutputStreamPlus
     @Override
     protected void doFlush(int count) throws IOException
     {
-        reallocate(count);
+        expandToFit(count);
     }
 
     //Hack for test, make it possible to override checking the buffer capacity
@@ -119,7 +119,7 @@ public class DataOutputBuffer extends BufferedDataOutputStreamPlus
         return validateReallocation(newSize);
     }
 
-    protected void reallocate(long count)
+    protected void expandToFit(long count)
     {
         if (count <= 0)
             return;
@@ -141,7 +141,7 @@ public class DataOutputBuffer extends BufferedDataOutputStreamPlus
         public int write(ByteBuffer src) throws IOException
         {
             int count = src.remaining();
-            reallocate(count);
+            expandToFit(count);
             buffer.put(src);
             return count;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/49adbe7e/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java b/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
index c815c9e..c9767fc 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
@@ -58,7 +58,7 @@ public class DataOutputBufferFixed extends DataOutputBuffer
      * @see org.apache.cassandra.io.util.DataOutputBuffer#reallocate(long)
      */
     @Override
-    protected void reallocate(long newSize)
+    protected void expandToFit(long newSize)
     {
         throw new BufferOverflowException();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/49adbe7e/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
index 24eb93c..a2b8f20 100644
--- a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
@@ -43,9 +43,13 @@ public class SafeMemoryWriter extends DataOutputBuffer
     }
 
     @Override
-    protected void reallocate(long count)
+    protected void expandToFit(long count)
+    {
+        resizeTo(calculateNewSize(count));
+    }
+
+    private void resizeTo(long newCapacity)
     {
-        long newCapacity = calculateNewSize(count);
         if (newCapacity != capacity())
         {
             long position = length();
@@ -63,9 +67,9 @@ public class SafeMemoryWriter extends DataOutputBuffer
         }
     }
 
-    public void setCapacity(long newCapacity)
+    public void trim()
     {
-        reallocate(newCapacity);
+        resizeTo(length());
     }
 
     public void close()
@@ -98,7 +102,9 @@ public class SafeMemoryWriter extends DataOutputBuffer
     @Override
     public long validateReallocation(long newSize)
     {
-        return newSize;
+        // Make sure size does not grow by more than the max buffer size, otherwise we'll hit an exception
+        // when setting up the buffer position.
+        return Math.min(newSize, length() + Integer.MAX_VALUE);
     }
 
     private static long tailOffset(Memory memory)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/49adbe7e/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index 1fb5597..90e77d6 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -134,11 +134,11 @@ public class DataOutputTest
 
         void superReallocate(int count) throws IOException
         {
-            super.reallocate(count);
+            super.expandToFit(count);
         }
 
         @Override
-        protected void reallocate(long count)
+        protected void expandToFit(long count)
         {
             if (count <= 0)
                 return;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/49adbe7e/test/unit/org/apache/cassandra/io/util/SafeMemoryWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/SafeMemoryWriterTest.java b/test/unit/org/apache/cassandra/io/util/SafeMemoryWriterTest.java
new file mode 100644
index 0000000..12c8c98
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/SafeMemoryWriterTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import sun.misc.VM;
+
+import static org.junit.Assert.assertEquals;
+
+public class SafeMemoryWriterTest
+{
+    Random rand = new Random();
+    static final int CHUNK = 54321;
+
+    @Test
+    public void testTrim() throws IOException
+    {
+        testSafeMemoryWriter(CHUNK * 5, CHUNK, 65536);
+    }
+
+    @Test
+    public void testOver2GBuffer() throws IOException
+    {
+        // we want the last resize to happen at this size, so that calculateNewSize wants to expand by over 2G
+        long initialSize = (Integer.MAX_VALUE * 33L / 32) * 2;
+        // a little more than the value above
+        long testSize = initialSize * 33 / 32;
+
+        // start with smaller initial size, but make sure it would grow to the required value above
+        while (initialSize * 2 / 3 > 1024L * 1024L * DataOutputBuffer.DOUBLING_THRESHOLD)
+            initialSize = initialSize * 2 / 3;
+
+        if (VM.maxDirectMemory() * 2 / 3 < testSize)
+        {
+            testSize = VM.maxDirectMemory() * 2 / 3;
+            System.err.format("Insufficient direct memory for full test, reducing to: %,d %x\n", testSize, testSize);
+        }
+
+        testSafeMemoryWriter(testSize, CHUNK, initialSize);
+    }
+
+    public void testSafeMemoryWriter(long toSize, int chunkSize, long initialSize) throws IOException
+    {
+        byte[] data = new byte[chunkSize];
+        rand.nextBytes(data);
+        try (SafeMemoryWriter writer = new SafeMemoryWriter(initialSize))
+        {
+
+            long l;
+            for (l = 0; l < toSize; l += data.length)
+            {
+                writer.write(data);
+            }
+            writer.trim();
+
+            try (SafeMemory written = writer.currentBuffer().sharedCopy())
+            {
+                assertEquals(l, written.size);
+
+                byte[] writtenBytes = new byte[chunkSize];
+                for (l = 0; l < toSize; l += writtenBytes.length)
+                {
+                    written.getBytes(l, writtenBytes, 0, writtenBytes.length);
+                    Assert.assertTrue(Arrays.equals(data, writtenBytes));   // assertArrayEquals is too slow for this
+                }
+            }
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to