maedhroz commented on code in PR #1723:
URL: https://github.com/apache/cassandra/pull/1723#discussion_r983724344


##########
src/java/org/apache/cassandra/db/tries/MemtableTrie.java:
##########
@@ -0,0 +1,1039 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.tries;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.agrona.concurrent.UnsafeBuffer;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.bytecomparable.ByteSource;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.github.jamm.MemoryLayoutSpecification;
+
+/**
+ * Memtable trie, i.e. an in-memory trie built for fast modification and reads 
executing concurrently with writes from
+ * a single mutator thread.
+ *
+ * Writes to this should be atomic (i.e. reads should see either the content 
before the write, or the content after the
+ * write; if any read sees the write, then any subsequent (i.e. started after 
it completed) read should also see it).
+ * This implementation does not currently guarantee this, but we still get the 
desired result as `apply` is only used
+ * with singleton tries.
+ */
+public class MemtableTrie<T> extends MemtableReadTrie<T>
+{
+    // See the trie format description in MemtableReadTrie.
+
+    /**
+     * Trie size limit. This is not enforced, but users must check from time 
to time that it is not exceeded (using
+     * reachedAllocatedSizeThreshold()) and start switching to a new trie if 
it is.
+     * This must be done to avoid tries growing beyond their hard 2GB size 
limit (due to the 32-bit pointers).
+     */
+    private static final int ALLOCATED_SIZE_THRESHOLD;
+    static
+    {
+        final String propertyName = "dse.trie_size_limit_mb";
+        // Default threshold + 10% == 1 GB. Adjusted slightly up to avoid a tiny final allocation for the 2G max.
+        final String defaultLimitInMB = Integer.toString(1024 * 10 / 11 + 1);
+        final String configuredLimit = System.getProperty(propertyName, defaultLimitInMB);
+        final int limitInMB = Integer.parseInt(configuredLimit);
+
+        // Reject anything outside [1, 2047] MB before the value is scaled to bytes.
+        final boolean withinRange = limitInMB >= 1 && limitInMB <= 2047;
+        if (!withinRange)
+            throw new AssertionError(propertyName + " must be within 1 and 2047");
+
+        ALLOCATED_SIZE_THRESHOLD = 1024 * 1024 * limitInMB;
+    }
+
+    // Position at which the next block will be allocated; allocateBlock() returns it and advances it by BLOCK_SIZE.
+    private int allocatedPos = 0;
+    // Number of content slots handed out so far; addContent() uses it as the index of the next slot.
+    private int contentCount = 0;
+
+    private final BufferType bufferType;    // on or off heap
+
+    // constants for space calculations
+    private static final long EMPTY_SIZE_ON_HEAP;
+    private static final long EMPTY_SIZE_OFF_HEAP;
+    private static final long REFERENCE_ARRAY_ON_HEAP_SIZE = 
ObjectSizes.measureDeep(new AtomicReferenceArray<>(0));
+
+    static
+    {
+        // Measure the deep size of a freshly constructed trie once for each buffer type.
+        MemtableTrie<Object> empty = new MemtableTrie<>(BufferType.ON_HEAP);
+        EMPTY_SIZE_ON_HEAP = ObjectSizes.measureDeep(empty);
+        empty = new MemtableTrie<>(BufferType.OFF_HEAP);
+        EMPTY_SIZE_OFF_HEAP = ObjectSizes.measureDeep(empty);
+    }
+
+    /**
+     * Creates a trie with empty buffer and content-array tables and the root set to NONE.
+     * Buffers are allocated later, on demand, through the given buffer type (see allocateBlock).
+     *
+     * @param bufferType the buffer type used for all buffer allocations (on or off heap)
+     */
+    public MemtableTrie(BufferType bufferType)
+    {
+        super(new UnsafeBuffer[31 - BUF_START_SHIFT],  // last one is 1G for a 
total of ~2G bytes
+              new AtomicReferenceArray[29 - CONTENTS_START_SHIFT],  // takes 
at least 4 bytes to write pointer to one content -> 4 times smaller than buffers
+              NONE);
+        this.bufferType = bufferType;
+        // Sanity check that the initial buffer capacity is a whole multiple of the block size.
+        assert INITIAL_BUFFER_CAPACITY % BLOCK_SIZE == 0;
+    }
+
+    // Buffer, content list and block management
+
+    /**
+     * Checked exception raised by block allocation when no further buffer can be added to the trie.
+     */
+    public static class SpaceExhaustedException extends Exception
+    {
+        private static final String MESSAGE = "The hard 2GB limit on trie size has been exceeded";
+
+        public SpaceExhaustedException()
+        {
+            super(MESSAGE);
+        }
+    }
+
+    /** Plain (non-volatile) write of an int at the given trie position. */
+    final void putInt(int pos, int value)
+    {
+        final UnsafeBuffer chunk = getChunk(pos);
+        chunk.putInt(inChunkPointer(pos), value);
+    }
+
+    /** Ordered write of an int at the given trie position. */
+    final void putIntOrdered(int pos, int value)
+    {
+        final UnsafeBuffer chunk = getChunk(pos);
+        chunk.putIntOrdered(inChunkPointer(pos), value);
+    }
+
+    /** Volatile write of an int at the given trie position. */
+    final void putIntVolatile(int pos, int value)
+    {
+        final UnsafeBuffer chunk = getChunk(pos);
+        chunk.putIntVolatile(inChunkPointer(pos), value);
+    }
+
+    /** Plain (non-volatile) write of a short at the given trie position. */
+    final void putShort(int pos, short value)
+    {
+        final UnsafeBuffer chunk = getChunk(pos);
+        chunk.putShort(inChunkPointer(pos), value);
+    }
+
+    /**
+     * Volatile write of a short at the given trie position.
+     *
+     * This must delegate to the buffer's volatile put: attachChildToSparse relies on this write to publish the
+     * updated order word to concurrent readers, which a plain putShort does not guarantee.
+     *
+     * @param pos   position in the trie's address space
+     * @param value value to store
+     */
+    final void putShortVolatile(int pos, short value)
+    {
+        getChunk(pos).putShortVolatile(inChunkPointer(pos), value);
+    }
+
+    /** Plain (non-volatile) write of a byte at the given trie position. */
+    final void putByte(int pos, byte value)
+    {
+        final UnsafeBuffer chunk = getChunk(pos);
+        chunk.putByte(inChunkPointer(pos), value);
+    }
+
+
+    /**
+     * Reserves the next block of BLOCK_SIZE bytes and returns its position.
+     * When the position falls at the very start of a chunk, the buffer for that chunk is allocated first.
+     *
+     * @return the position of the newly reserved block
+     * @throws SpaceExhaustedException if a new buffer is needed but the chunk index has reached 31
+     */
+    private int allocateBlock() throws SpaceExhaustedException
+    {
+        // Note: If this method is modified, please run 
MemtableTrieTest.testOver1GSize to verify it acts correctly
+        // close to the 2G limit.
+        int v = allocatedPos;
+        // An in-chunk offset of 0 means the previous chunk is used up (or nothing has been allocated yet).
+        if (inChunkPointer(v) == 0)
+        {
+            int leadBit = getChunkIdx(v, BUF_START_SHIFT, BUF_START_SIZE);
+            if (leadBit == 31)
+                throw new SpaceExhaustedException();
+
+            assert buffers[leadBit] == null;
+            // The size of the buffer for chunk leadBit is BUF_START_SIZE shifted left by the chunk index.
+            ByteBuffer newBuffer = bufferType.allocate(BUF_START_SIZE << 
leadBit);
+            buffers[leadBit] = new UnsafeBuffer(newBuffer);
+            // The above does not contain any happens-before enforcing writes, 
thus at this point the new buffer may be
+            // invisible to any concurrent readers. Touching the volatile root 
pointer (which any new read must go
+            // through) enforces a happens-before that makes it visible to all 
new reads (note: when the write completes
+            // it must do some volatile write, but that will be in the new 
buffer and without the line below could
+            // remain unreachable by other cores).
+            root = root;
+        }
+
+        allocatedPos += BLOCK_SIZE;
+        return v;
+    }
+
+    /**
+     * Stores the given value in the next unused content slot and returns the index of that slot.
+     * The backing array for the slot's chunk is created when the chunk is used for the first time.
+     *
+     * @param value the content to store
+     * @return the index assigned to the value
+     */
+    private int addContent(T value)
+    {
+        int index = contentCount++;
+        // Split the index into the chunk (array) number and the offset inside that array.
+        int leadBit = getChunkIdx(index, CONTENTS_START_SHIFT, 
CONTENTS_START_SIZE);
+        int ofs = inChunkPointer(index, leadBit, CONTENTS_START_SIZE);
+        AtomicReferenceArray<T> array = contentArrays[leadBit];
+        if (array == null)
+        {
+            // A missing array is only expected when the index is the first slot of its chunk.
+            assert ofs == 0;
+            contentArrays[leadBit] = array = new 
AtomicReferenceArray<>(CONTENTS_START_SIZE << leadBit);
+        }
+        array.lazySet(ofs, value); // no need for a volatile set here; at this 
point the item is not referenced
+                                   // by any node in the trie, and a volatile 
set will be made to reference it.
+        return index;
+    }
+
+    private void setContent(int index, T value)
+    {
+        int leadBit = getChunkIdx(index, CONTENTS_START_SHIFT, 
CONTENTS_START_SIZE);
+        int ofs = inChunkPointer(index, leadBit, CONTENTS_START_SIZE);
+        AtomicReferenceArray<T> array = contentArrays[leadBit];
+        array.set(ofs, value);
+    }
+
+    /** Cleans every allocated buffer of this trie; does nothing when the buffers are on heap. */
+    public void discardBuffers()
+    {
+        if (bufferType != BufferType.ON_HEAP) // on-heap buffers need no cleaning
+        {
+            for (int i = 0; i < buffers.length; ++i)
+            {
+                final UnsafeBuffer buffer = buffers[i];
+                if (buffer == null)
+                    continue;
+                FileUtils.clean(buffer.byteBuffer());
+            }
+        }
+    }
+
+    // Write methods
+
+    // Write visibility model: writes are not volatile, with the exception of 
the final write before a call returns
+    // the same value that was present before (e.g. content was updated 
in-place / existing node got a new child or had
+    // a child pointer updated); if the whole path including the root node 
changed, the root itself gets a volatile
+    // write.
+    // This final write is the point where any new cells created during the 
write become visible for readers for the
+    // first time, and such readers must pass through reading that pointer, 
which forces a happens-before relationship
+    // that extends to all values written by this thread before it.
+
+    /**
+     * Attach a child to the given non-content node. This may be an update for 
an existing branch, or a new child for
+     * the node. An update _is_ required (i.e. this is only called when the 
newChild pointer is not the same as the
+     * existing value).
+     */
+    private int attachChild(int node, int trans, int newChild) throws 
SpaceExhaustedException
+    {
+        if (isLeaf(node))
+            throw new AssertionError("attachChild cannot be used on content 
nodes.");
+
+        switch (offset(node))
+        {
+            case PREFIX_OFFSET:
+                throw new AssertionError("attachChild cannot be used on 
content nodes.");
+            case SPARSE_OFFSET:
+                return attachChildToSparse(node, trans, newChild);
+            case SPLIT_OFFSET:
+                attachChildToSplit(node, trans, newChild);
+                return node;
+            case LAST_POINTER_OFFSET - 1:
+                // If this is the last character in a Chain block, we can 
modify the child in-place
+                if (trans == getUnsignedByte(node))
+                {
+                    putIntVolatile(node + 1, newChild);
+                    return node;
+                }
+                // else pass through
+            default:
+                return attachChildToChain(node, trans, newChild);
+        }
+    }
+
+    /**
+     * Attach a child to the given split node. This may be an update for an 
existing branch, or a new child for the node.
+     */
+    private void attachChildToSplit(int node, int trans, int newChild) throws 
SpaceExhaustedException
+    {
+        int midPos = splitBlockPointerAddress(node, splitNodeMidIndex(trans), 
SPLIT_START_LEVEL_LIMIT);
+        int mid = getInt(midPos);
+        if (isNull(mid))
+        {
+            // No mid-level block yet: build the new mid and tail blocks with plain writes, then link the mid
+            // block into the existing node with a single volatile write as the last step.
+            mid = createEmptySplitNode();
+            int tailPos = splitBlockPointerAddress(mid, 
splitNodeTailIndex(trans), SPLIT_OTHER_LEVEL_LIMIT);
+            int tail = createEmptySplitNode();
+            int childPos = splitBlockPointerAddress(tail, 
splitNodeChildIndex(trans), SPLIT_OTHER_LEVEL_LIMIT);
+            putInt(childPos, newChild);
+            putInt(tailPos, tail);
+            putIntVolatile(midPos, mid);
+            return;
+        }
+
+        int tailPos = splitBlockPointerAddress(mid, splitNodeTailIndex(trans), 
SPLIT_OTHER_LEVEL_LIMIT);
+        int tail = getInt(tailPos);
+        if (isNull(tail))
+        {
+            // Mid block exists but the tail block does not: fill a new tail block with a plain write, then
+            // link it into the mid block with a volatile write.
+            tail = createEmptySplitNode();
+            int childPos = splitBlockPointerAddress(tail, 
splitNodeChildIndex(trans), SPLIT_OTHER_LEVEL_LIMIT);
+            putInt(childPos, newChild);
+            putIntVolatile(tailPos, tail);
+            return;
+        }
+
+        // Both levels exist: only the child pointer itself needs a (volatile) write.
+        int childPos = splitBlockPointerAddress(tail, 
splitNodeChildIndex(trans), SPLIT_OTHER_LEVEL_LIMIT);
+        putIntVolatile(childPos, newChild);
+    }
+
+    /**
+     * Attach a child to the given sparse node. This may be an update for an 
existing branch, or a new child for the node.
+     */
+    private int attachChildToSparse(int node, int trans, int newChild) throws 
SpaceExhaustedException
+    {
+        int i;
+        // first check if this is an update and modify in-place if so
+        for (i = 0; i < SPARSE_CHILD_COUNT; ++i)
+        {
+            // A null child pointer marks the first unused position; i is left pointing at it.
+            if (isNull(getInt(node + SPARSE_CHILDREN_OFFSET + i * 4)))
+                break;
+            if ((getUnsignedByte(node + SPARSE_BYTES_OFFSET + i)) == trans)
+            {
+                putIntVolatile(node + SPARSE_CHILDREN_OFFSET + i * 4, 
newChild);
+                return node;
+            }
+        }
+
+        if (i == SPARSE_CHILD_COUNT)
+        {
+            // Node is full. Switch to split
+            // The existing children and the new one are copied into a new split node with non-volatile
+            // writes, and the new node's pointer is returned instead of the old one.
+            int split = createEmptySplitNode();
+            for (i = 0; i < SPARSE_CHILD_COUNT; ++i)
+            {
+                int t = getUnsignedByte(node + SPARSE_BYTES_OFFSET + i);
+                int p = getInt(node + SPARSE_CHILDREN_OFFSET + i * 4);
+                attachChildToSplitNonVolatile(split, t, p);
+            }
+            attachChildToSplitNonVolatile(split, trans, newChild);
+            return split;
+        }
+
+        // Add a new transition. They are not kept in order, so append it at 
the first free position.
+        putByte(node + SPARSE_BYTES_OFFSET + i,  (byte) trans);
+
+        // Update order word.
+        int order = getUnsignedShort(node + SPARSE_ORDER_OFFSET);
+        int newOrder = insertInOrderWord(order, i, trans, node + 
SPARSE_BYTES_OFFSET);
+
+        // Sparse nodes have two access modes: via the order word, when 
listing transitions, or directly to characters
+        // and addresses.
+        // To support the former, we volatile write to the order word last, 
and everything is correctly set up.
+        // The latter does not touch the order word. To support that too, we 
volatile write the address, as the reader
+        // can't determine if the position is in use based on the character 
byte alone (00 is also a valid transition).
+        // Note that this means that reader must check the transition byte 
AFTER the address, to ensure they get the
+        // correct value (see getSparseChild).
+
+        // setting child enables reads to start seeing the new branch
+        putIntVolatile(node + SPARSE_CHILDREN_OFFSET + i * 4, newChild);
+
+        // some readers will decide whether to check the pointer based on the 
order word
+        // write that volatile to make sure they see the new change too
+        putShortVolatile(node + SPARSE_ORDER_OFFSET,  (short) newOrder);
+        return node;
+    }
+
+    /**
+     * Insert the given newIndex in the base-6 encoded order word in the correct position with respect to the
+     * ordering.
+     *
+     * E.g. if the existing bytes were 20, 50, 30 with order word 120 (decimal 48), then
+     *   - insertInOrderWord(120, 3, 5, ptr)  must return 1203 (decimal 48*6 + 3)
+     *   - insertInOrderWord(120, 3, 25, ptr) must return 1230 (decimal 8*36 + 3*6 + 0)
+     *   - insertInOrderWord(120, 3, 35, ptr) must return 1320 (decimal 1*216 + 3*36 + 12)
+     *   - insertInOrderWord(120, 3, 55, ptr) must return 3120 (decimal 3*216 + 48)
+     */
+    private int insertInOrderWord(int order, int newIndex, int transitionByte, 
int bytesPosition)
+    {
+        int s = order;
+        int r = 1;
+        while (s != 0)
+        {
+            int b = getUnsignedByte(bytesPosition + s % SPARSE_CHILD_COUNT);
+            if (b > transitionByte)
+                break;
+
+            assert b < transitionByte;
+            r *= 6;
+            s /= 6;
+        }
+        // insert i after the ones we have passed (order % r) and before the 
remaining (s)
+        return order % r + (s * 6 + newIndex) * r;
+    }
+
+    /**
+     * Non-volatile version of attachChildToSplit. Used when the split node is 
not reachable yet (during the conversion
+     * from sparse).
+     */
+    private void attachChildToSplitNonVolatile(int node, int trans, int 
newChild) throws SpaceExhaustedException
+    {
+        assert offset(node) == SPLIT_OFFSET;
+        int midPos = splitBlockPointerAddress(node, splitNodeMidIndex(trans), 
SPLIT_START_LEVEL_LIMIT);
+        int mid = getInt(midPos);
+        if (isNull(mid))
+        {
+            mid = createEmptySplitNode();
+            putInt(midPos, mid);
+        }
+
+        assert offset(mid) == SPLIT_OFFSET;
+        int tailPos = splitBlockPointerAddress(mid, splitNodeTailIndex(trans), 
SPLIT_OTHER_LEVEL_LIMIT);
+        int tail = getInt(tailPos);
+        if (isNull(tail))
+        {
+            tail = createEmptySplitNode();
+            putInt(tailPos, tail);
+        }
+
+        assert offset(tail) == SPLIT_OFFSET;
+        int childPos = splitBlockPointerAddress(tail, 
splitNodeChildIndex(trans), SPLIT_OTHER_LEVEL_LIMIT);
+        putInt(childPos, newChild);
+    }
+
+    /**
+     * Attach a child to the given chain node. This may be an update for an 
existing branch with different target
+     * address, or a second child for the node.
+     * This method always copies the node -- with the exception of updates 
that change the child of the last node in a
+     * chain block with matching transition byte (which this method is not 
used for, see attachChild), modifications to
+     * chain nodes cannot be done in place, either because we introduce a new 
transition byte and have to convert from
+     * the single-transition chain type to sparse, or because we have to remap 
the child from the implicit node + 1 to
+     * something else.
+     */
+    private int attachChildToChain(int node, int transitionByte, int newChild) 
throws SpaceExhaustedException
+    {
+        int existingByte = getUnsignedByte(node);
+        if (transitionByte == existingByte)
+        {
+            // This will only be called if new child is different from old, 
and the update is not on the final child
+            // where we can change it in place (see attachChild). We must 
always create something new.
+            // If the child is a chain, we can expand it (since it's a 
different value, its branch must be new and
+            // nothing can already reside in the rest of the block).
+            return expandOrCreateChainNode(transitionByte, newChild);
+        }
+
+        // The new transition is different, so we no longer have only one 
transition. Change type.
+        int existingChild = node + 1;
+        if (offset(existingChild) == LAST_POINTER_OFFSET)
+        {
+            existingChild = getInt(existingChild);
+        }
+        return createSparseNode(existingByte, existingChild, transitionByte, 
newChild);
+    }
+
+    private boolean isExpandableChain(int newChild)
+    {
+        int newOffset = offset(newChild);
+        return newChild > 0 && newChild - 1 > NONE && newOffset > 
CHAIN_MIN_OFFSET && newOffset <= CHAIN_MAX_OFFSET;
+    }
+
+    /**
+     * Create a sparse node with two children.
+     */
+    private int createSparseNode(int byte1, int child1, int byte2, int child2) 
throws SpaceExhaustedException
+    {
+        assert byte1 != byte2;
+        if (byte1 > byte2)
+        {
+            // swap them so the smaller is byte1, i.e. there's always 
something bigger than child 0 so 0 never is
+            // at the end of the order
+            int t = byte1; byte1 = byte2; byte2 = t;
+            t = child1; child1 = child2; child2 = t;
+        }
+
+        int node = allocateBlock() + SPARSE_OFFSET;
+        putByte(node + SPARSE_BYTES_OFFSET + 0,  (byte) byte1);
+        putByte(node + SPARSE_BYTES_OFFSET + 1,  (byte) byte2);
+        putInt(node + SPARSE_CHILDREN_OFFSET + 0 * 4, child1);
+        putInt(node + SPARSE_CHILDREN_OFFSET + 1 * 4, child2);
+        putShort(node + SPARSE_ORDER_OFFSET,  (short) (1 * 6 + 0));
+        // Note: this does not need a volatile write as it is a new node, 
returning a new pointer, which needs to be
+        // put in an existing node or the root. That action ends in a 
happens-before enforcing write.
+        return node;
+    }
+
+    /**
+     * Creates a chain node with the single provided transition (pointing to 
the provided child).
+     * Note that to avoid creating inefficient tries with under-utilized chain 
nodes, this should only be called from
+     * {@link #expandOrCreateChainNode} and other call-sites should call 
{@link #expandOrCreateChainNode}.
+     */
+    private int createNewChainNode(int transitionByte, int newChild) throws 
SpaceExhaustedException
+    {
+        int newNode = allocateBlock() + LAST_POINTER_OFFSET - 1;
+        putByte(newNode, (byte) transitionByte);
+        putInt(newNode + 1, newChild);
+        // Note: this does not need a volatile write as it is a new node, 
returning a new pointer, which needs to be
+        // put in an existing node or the root. That action ends in a 
happens-before enforcing write.
+        return newNode;
+    }
+
+    /** Like {@link #createNewChainNode}, but if the new child is already a 
chain node and has room, expand
+     * it instead of creating a brand new node. */
+    private int expandOrCreateChainNode(int transitionByte, int newChild) 
throws SpaceExhaustedException
+    {
+        if (isExpandableChain(newChild))
+        {
+            // attach as a new character in child node
+            int newNode = newChild - 1;
+            putByte(newNode, (byte) transitionByte);
+            return newNode;
+        }
+
+        return createNewChainNode(transitionByte, newChild);
+    }
+
+    private int createEmptySplitNode() throws SpaceExhaustedException
+    {
+        return allocateBlock() + SPLIT_OFFSET;
+    }
+
+    private int createContentNode(int contentIndex, int child, boolean 
isSafeChain) throws SpaceExhaustedException
+    {
+        assert !isLeaf(child);
+        if (isNull(child))
+            return ~contentIndex;

Review Comment:
   nit: Is this case tested via TrieMemtable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to