maedhroz commented on code in PR #1723:
URL: https://github.com/apache/cassandra/pull/1723#discussion_r983721758


##########
src/java/org/apache/cassandra/db/tries/MemtableTrie.java:
##########
@@ -0,0 +1,1039 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.tries;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.agrona.concurrent.UnsafeBuffer;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.bytecomparable.ByteSource;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.github.jamm.MemoryLayoutSpecification;
+
+/**
+ * Memtable trie, i.e. an in-memory trie built for fast modification and reads executing concurrently with writes from
+ * a single mutator thread.
+ *
+ * Writes to this should be atomic (i.e. reads should see either the content before the write, or the content after the
+ * write; if any read sees the write, then any subsequent (i.e. started after it completed) read should also see it).
+ * This implementation does not currently guarantee this, but we still get the desired result as `apply` is only used
+ * with singleton tries.
+ */
+public class MemtableTrie<T> extends MemtableReadTrie<T>
+{
+    // See the trie format description in MemtableReadTrie.
+
+    /**
+     * Trie size limit. This is not enforced, but users must check from time to time that it is not exceeded (using
+     * reachedAllocatedSizeThreshold()) and start switching to a new trie if it is.
+     * This must be done to avoid tries growing beyond their hard 2GB size limit (due to the 32-bit pointers).
+     */
+    private static final int ALLOCATED_SIZE_THRESHOLD;
+    static
+    {
+        String propertyName = "dse.trie_size_limit_mb";
+        // Default threshold + 10% == 1 GB. Adjusted slightly up to avoid a tiny final allocation for the 2G max.
+        int limitInMB = Integer.parseInt(System.getProperty(propertyName,
+                                                            Integer.toString(1024 * 10 / 11 + 1)));
+        if (limitInMB < 1 || limitInMB > 2047)
+            throw new AssertionError(propertyName + " must be within 1 and 2047");
+        ALLOCATED_SIZE_THRESHOLD = 1024 * 1024 * limitInMB;
+    }
+
+    private int allocatedPos = 0;
+    private int contentCount = 0;
+
+    private final BufferType bufferType;    // on or off heap
+
+    // constants for space calculations
+    private static final long EMPTY_SIZE_ON_HEAP;
+    private static final long EMPTY_SIZE_OFF_HEAP;
+    private static final long REFERENCE_ARRAY_ON_HEAP_SIZE = ObjectSizes.measureDeep(new AtomicReferenceArray<>(0));
+
+    static
+    {
+        MemtableTrie<Object> empty = new MemtableTrie<>(BufferType.ON_HEAP);
+        EMPTY_SIZE_ON_HEAP = ObjectSizes.measureDeep(empty);
+        empty = new MemtableTrie<>(BufferType.OFF_HEAP);
+        EMPTY_SIZE_OFF_HEAP = ObjectSizes.measureDeep(empty);
+    }
+
+    public MemtableTrie(BufferType bufferType)
+    {
+        super(new UnsafeBuffer[31 - BUF_START_SHIFT],  // last one is 1G for a total of ~2G bytes
+              new AtomicReferenceArray[29 - CONTENTS_START_SHIFT],  // takes at least 4 bytes to write pointer to one content -> 4 times smaller than buffers
+              NONE);
+        this.bufferType = bufferType;
+        assert INITIAL_BUFFER_CAPACITY % BLOCK_SIZE == 0;
+    }
+
+    // Buffer, content list and block management
+
+    public static class SpaceExhaustedException extends Exception
+    {
+        public SpaceExhaustedException()
+        {
+            super("The hard 2GB limit on trie size has been exceeded");
+        }
+    }
+
+    final void putInt(int pos, int value)
+    {
+        getChunk(pos).putInt(inChunkPointer(pos), value);
+    }
+
+    final void putIntOrdered(int pos, int value)
+    {
+        getChunk(pos).putIntOrdered(inChunkPointer(pos), value);
+    }
+
+    final void putIntVolatile(int pos, int value)
+    {
+        getChunk(pos).putIntVolatile(inChunkPointer(pos), value);
+    }
+
+    final void putShort(int pos, short value)
+    {
+        getChunk(pos).putShort(inChunkPointer(pos), value);
+    }
+
+    final void putShortVolatile(int pos, short value)
+    {
+        getChunk(pos).putShort(inChunkPointer(pos), value);
+    }
+
+    final void putByte(int pos, byte value)
+    {
+        getChunk(pos).putByte(inChunkPointer(pos), value);
+    }
+
+
+    private int allocateBlock() throws SpaceExhaustedException
+    {
+        // Note: If this method is modified, please run MemtableTrieTest.testOver1GSize to verify it acts correctly
+        // close to the 2G limit.
+        int v = allocatedPos;
+        if (inChunkPointer(v) == 0)
+        {
+            int leadBit = getChunkIdx(v, BUF_START_SHIFT, BUF_START_SIZE);
+            if (leadBit == 31)
+                throw new SpaceExhaustedException();
+
+            assert buffers[leadBit] == null;
+            ByteBuffer newBuffer = bufferType.allocate(BUF_START_SIZE << leadBit);
+            buffers[leadBit] = new UnsafeBuffer(newBuffer);
+            // The above does not contain any happens-before enforcing writes, thus at this point the new buffer may be
+            // invisible to any concurrent readers. Touching the volatile root pointer (which any new read must go
+            // through) enforces a happens-before that makes it visible to all new reads (note: when the write completes
+            // it must do some volatile write, but that will be in the new buffer and without the line below could
+            // remain unreachable by other cores).
+            root = root;
+        }
+
+        allocatedPos += BLOCK_SIZE;
+        return v;
+    }
+
+    private int addContent(T value)
+    {
+        int index = contentCount++;
+        int leadBit = getChunkIdx(index, CONTENTS_START_SHIFT, CONTENTS_START_SIZE);
+        int ofs = inChunkPointer(index, leadBit, CONTENTS_START_SIZE);
+        AtomicReferenceArray<T> array = contentArrays[leadBit];
+        if (array == null)
+        {
+            assert ofs == 0;
+            contentArrays[leadBit] = array = new AtomicReferenceArray<>(CONTENTS_START_SIZE << leadBit);
+        }
+        array.lazySet(ofs, value); // no need for a volatile set here; at this point the item is not referenced
+                                   // by any node in the trie, and a volatile set will be made to reference it.
+        return index;
+    }
+
+    private void setContent(int index, T value)
+    {
+        int leadBit = getChunkIdx(index, CONTENTS_START_SHIFT, CONTENTS_START_SIZE);
+        int ofs = inChunkPointer(index, leadBit, CONTENTS_START_SIZE);
+        AtomicReferenceArray<T> array = contentArrays[leadBit];
+        array.set(ofs, value);
+    }
+
+    public void discardBuffers()
+    {
+        if (bufferType == BufferType.ON_HEAP)
+            return; // no cleaning needed
+
+        for (UnsafeBuffer b : buffers)
+        {
+            if (b != null)
+                FileUtils.clean(b.byteBuffer());
+        }
+    }
+
+    // Write methods
+
+    // Write visibility model: writes are not volatile, with the exception of the final write before a call returns
+    // the same value that was present before (e.g. content was updated in-place / existing node got a new child or had
+    // a child pointer updated); if the whole path including the root node changed, the root itself gets a volatile
+    // write.
+    // This final write is the point where any new cells created during the write become visible for readers for the
+    // first time, and such readers must pass through reading that pointer, which forces a happens-before relationship
+    // that extends to all values written by this thread before it.
+
+    /**
+     * Attach a child to the given non-content node. This may be an update for an existing branch, or a new child for
+     * the node. An update _is_ required (i.e. this is only called when the newChild pointer is not the same as the
+     * existing value).
+     */
+    private int attachChild(int node, int trans, int newChild) throws SpaceExhaustedException
+    {
+        if (isLeaf(node))
+            throw new AssertionError("attachChild cannot be used on content nodes.");

Review Comment:
   nit: Is this case tested via `TrieMemtable`?
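   For context on the `root = root` line in the quoted `allocateBlock` code, here is a minimal, self-contained sketch of the same Java memory model idea. It is illustrative only — the class and field names are made up and this is not the Cassandra implementation: a single mutator thread makes plain writes and then re-writes a volatile field that every reader goes through, which establishes the happens-before edge that makes the earlier plain writes visible to new reads.

   ```java
   // Illustrative sketch only -- not Cassandra code. Shows how a volatile self-write
   // publishes earlier plain writes to readers that subsequently read the volatile field.
   class VolatilePublishSketch
   {
       private volatile int root = 0;   // every read operation starts by reading this field
       private int[] newestBuffer;      // filled with plain (non-volatile) writes

       // Called from the single mutator thread.
       void growAndPublish()
       {
           int[] buffer = new int[1024];
           buffer[0] = 42;              // plain writes: not yet guaranteed visible to readers
           newestBuffer = buffer;

           // Volatile self-write: program order places the plain writes above before this
           // volatile write, and the JMM orders this write before any reader's later
           // volatile read of 'root', so reads started after this point see the buffer.
           root = root;
       }

       // Called from any reader thread.
       int readFirstSlot()
       {
           int unused = root;           // volatile read establishes the happens-before edge
           int[] buffer = newestBuffer; // guaranteed visible if 'root' was read after the
                                        // mutator's volatile write
           return buffer == null ? -1 : buffer[0];
       }
   }
   ```

   As the quoted comment notes, only reads that pass through `root` after the volatile write get this guarantee, which is why the root pointer is touched here instead of relying on a later volatile write inside the newly allocated buffer (which other cores might not yet be able to reach).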



##########
src/java/org/apache/cassandra/db/tries/MemtableTrie.java:
##########
@@ -0,0 +1,1039 @@
+    /**
+     * Attach a child to the given non-content node. This may be an update for an existing branch, or a new child for
+     * the node. An update _is_ required (i.e. this is only called when the newChild pointer is not the same as the
+     * existing value).
+     */
+    private int attachChild(int node, int trans, int newChild) throws SpaceExhaustedException
+    {
+        if (isLeaf(node))
+            throw new AssertionError("attachChild cannot be used on content nodes.");
+
+        switch (offset(node))
+        {
+            case PREFIX_OFFSET:
+                throw new AssertionError("attachChild cannot be used on content nodes.");

Review Comment:
   nit: Is this case tested via `TrieMemtable`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
