maedhroz commented on code in PR #1723: URL: https://github.com/apache/cassandra/pull/1723#discussion_r983724344
########## src/java/org/apache/cassandra/db/tries/MemtableTrie.java: ########## @@ -0,0 +1,1039 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.tries; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicReferenceArray; + +import com.google.common.annotations.VisibleForTesting; + +import org.agrona.concurrent.UnsafeBuffer; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.bytecomparable.ByteSource; +import org.apache.cassandra.utils.bytecomparable.ByteComparable; +import org.apache.cassandra.utils.ObjectSizes; +import org.github.jamm.MemoryLayoutSpecification; + +/** + * Memtable trie, i.e. an in-memory trie built for fast modification and reads executing concurrently with writes from + * a single mutator thread. + * + * Writes to this should be atomic (i.e. reads should see either the content before the write, or the content after the + * write; if any read sees the write, then any subsequent (i.e. started after it completed) read should also see it). 
+ * This implementation does not currently guarantee this, but we still get the desired result as `apply` is only used + * with singleton tries. + */ +public class MemtableTrie<T> extends MemtableReadTrie<T> +{ + // See the trie format description in MemtableReadTrie. + + /** + * Trie size limit. This is not enforced, but users must check from time to time that it is not exceeded (using + * reachedAllocatedSizeThreshold()) and start switching to a new trie if it is. + * This must be done to avoid tries growing beyond their hard 2GB size limit (due to the 32-bit pointers). + */ + private static final int ALLOCATED_SIZE_THRESHOLD; + static + { + String propertyName = "dse.trie_size_limit_mb"; + // Default threshold + 10% == 1 GB. Adjusted slightly up to avoid a tiny final allocation for the 2G max. + int limitInMB = Integer.parseInt(System.getProperty(propertyName, + Integer.toString(1024 * 10 / 11 + 1))); + if (limitInMB < 1 || limitInMB > 2047) + throw new AssertionError(propertyName + " must be within 1 and 2047"); + ALLOCATED_SIZE_THRESHOLD = 1024 * 1024 * limitInMB; + } + + private int allocatedPos = 0; + private int contentCount = 0; + + private final BufferType bufferType; // on or off heap + + // constants for space calculations + private static final long EMPTY_SIZE_ON_HEAP; + private static final long EMPTY_SIZE_OFF_HEAP; + private static final long REFERENCE_ARRAY_ON_HEAP_SIZE = ObjectSizes.measureDeep(new AtomicReferenceArray<>(0)); + + static + { + MemtableTrie<Object> empty = new MemtableTrie<>(BufferType.ON_HEAP); + EMPTY_SIZE_ON_HEAP = ObjectSizes.measureDeep(empty); + empty = new MemtableTrie<>(BufferType.OFF_HEAP); + EMPTY_SIZE_OFF_HEAP = ObjectSizes.measureDeep(empty); + } + + public MemtableTrie(BufferType bufferType) + { + super(new UnsafeBuffer[31 - BUF_START_SHIFT], // last one is 1G for a total of ~2G bytes + new AtomicReferenceArray[29 - CONTENTS_START_SHIFT], // takes at least 4 bytes to write pointer to one content -> 4 times smaller than 
buffers + NONE); + this.bufferType = bufferType; + assert INITIAL_BUFFER_CAPACITY % BLOCK_SIZE == 0; + } + + // Buffer, content list and block management + + public static class SpaceExhaustedException extends Exception + { + public SpaceExhaustedException() + { + super("The hard 2GB limit on trie size has been exceeded"); + } + } + + final void putInt(int pos, int value) + { + getChunk(pos).putInt(inChunkPointer(pos), value); + } + + final void putIntOrdered(int pos, int value) + { + getChunk(pos).putIntOrdered(inChunkPointer(pos), value); + } + + final void putIntVolatile(int pos, int value) + { + getChunk(pos).putIntVolatile(inChunkPointer(pos), value); + } + + final void putShort(int pos, short value) + { + getChunk(pos).putShort(inChunkPointer(pos), value); + } + + final void putShortVolatile(int pos, short value) + { + getChunk(pos).putShort(inChunkPointer(pos), value); + } + + final void putByte(int pos, byte value) + { + getChunk(pos).putByte(inChunkPointer(pos), value); + } + + + private int allocateBlock() throws SpaceExhaustedException + { + // Note: If this method is modified, please run MemtableTrieTest.testOver1GSize to verify it acts correctly + // close to the 2G limit. + int v = allocatedPos; + if (inChunkPointer(v) == 0) + { + int leadBit = getChunkIdx(v, BUF_START_SHIFT, BUF_START_SIZE); + if (leadBit == 31) + throw new SpaceExhaustedException(); + + assert buffers[leadBit] == null; + ByteBuffer newBuffer = bufferType.allocate(BUF_START_SIZE << leadBit); + buffers[leadBit] = new UnsafeBuffer(newBuffer); + // The above does not contain any happens-before enforcing writes, thus at this point the new buffer may be + // invisible to any concurrent readers. 
Touching the volatile root pointer (which any new read must go + // through) enforces a happens-before that makes it visible to all new reads (note: when the write completes + // it must do some volatile write, but that will be in the new buffer and without the line below could + // remain unreachable by other cores). + root = root; + } + + allocatedPos += BLOCK_SIZE; + return v; + } + + private int addContent(T value) + { + int index = contentCount++; + int leadBit = getChunkIdx(index, CONTENTS_START_SHIFT, CONTENTS_START_SIZE); + int ofs = inChunkPointer(index, leadBit, CONTENTS_START_SIZE); + AtomicReferenceArray<T> array = contentArrays[leadBit]; + if (array == null) + { + assert ofs == 0; + contentArrays[leadBit] = array = new AtomicReferenceArray<>(CONTENTS_START_SIZE << leadBit); + } + array.lazySet(ofs, value); // no need for a volatile set here; at this point the item is not referenced + // by any node in the trie, and a volatile set will be made to reference it. + return index; + } + + private void setContent(int index, T value) + { + int leadBit = getChunkIdx(index, CONTENTS_START_SHIFT, CONTENTS_START_SIZE); + int ofs = inChunkPointer(index, leadBit, CONTENTS_START_SIZE); + AtomicReferenceArray<T> array = contentArrays[leadBit]; + array.set(ofs, value); + } + + public void discardBuffers() + { + if (bufferType == BufferType.ON_HEAP) + return; // no cleaning needed + + for (UnsafeBuffer b : buffers) + { + if (b != null) + FileUtils.clean(b.byteBuffer()); + } + } + + // Write methods + + // Write visibility model: writes are not volatile, with the exception of the final write before a call returns + // the same value that was present before (e.g. content was updated in-place / existing node got a new child or had + // a child pointer updated); if the whole path including the root node changed, the root itself gets a volatile + // write. 
+ // This final write is the point where any new cells created during the write become visible for readers for the + // first time, and such readers must pass through reading that pointer, which forces a happens-before relationship + // that extends to all values written by this thread before it. + + /** + * Attach a child to the given non-content node. This may be an update for an existing branch, or a new child for + * the node. An update _is_ required (i.e. this is only called when the newChild pointer is not the same as the + * existing value). + */ + private int attachChild(int node, int trans, int newChild) throws SpaceExhaustedException + { + if (isLeaf(node)) + throw new AssertionError("attachChild cannot be used on content nodes."); + + switch (offset(node)) + { + case PREFIX_OFFSET: + throw new AssertionError("attachChild cannot be used on content nodes."); + case SPARSE_OFFSET: + return attachChildToSparse(node, trans, newChild); + case SPLIT_OFFSET: + attachChildToSplit(node, trans, newChild); + return node; + case LAST_POINTER_OFFSET - 1: + // If this is the last character in a Chain block, we can modify the child in-place + if (trans == getUnsignedByte(node)) + { + putIntVolatile(node + 1, newChild); + return node; + } + // else pass through + default: + return attachChildToChain(node, trans, newChild); + } + } + + /** + * Attach a child to the given split node. This may be an update for an existing branch, or a new child for the node. 
+ */ + private void attachChildToSplit(int node, int trans, int newChild) throws SpaceExhaustedException + { + int midPos = splitBlockPointerAddress(node, splitNodeMidIndex(trans), SPLIT_START_LEVEL_LIMIT); + int mid = getInt(midPos); + if (isNull(mid)) + { + mid = createEmptySplitNode(); + int tailPos = splitBlockPointerAddress(mid, splitNodeTailIndex(trans), SPLIT_OTHER_LEVEL_LIMIT); + int tail = createEmptySplitNode(); + int childPos = splitBlockPointerAddress(tail, splitNodeChildIndex(trans), SPLIT_OTHER_LEVEL_LIMIT); + putInt(childPos, newChild); + putInt(tailPos, tail); + putIntVolatile(midPos, mid); + return; + } + + int tailPos = splitBlockPointerAddress(mid, splitNodeTailIndex(trans), SPLIT_OTHER_LEVEL_LIMIT); + int tail = getInt(tailPos); + if (isNull(tail)) + { + tail = createEmptySplitNode(); + int childPos = splitBlockPointerAddress(tail, splitNodeChildIndex(trans), SPLIT_OTHER_LEVEL_LIMIT); + putInt(childPos, newChild); + putIntVolatile(tailPos, tail); + return; + } + + int childPos = splitBlockPointerAddress(tail, splitNodeChildIndex(trans), SPLIT_OTHER_LEVEL_LIMIT); + putIntVolatile(childPos, newChild); + } + + /** + * Attach a child to the given sparse node. This may be an update for an existing branch, or a new child for the node. + */ + private int attachChildToSparse(int node, int trans, int newChild) throws SpaceExhaustedException + { + int i; + // first check if this is an update and modify in-place if so + for (i = 0; i < SPARSE_CHILD_COUNT; ++i) + { + if (isNull(getInt(node + SPARSE_CHILDREN_OFFSET + i * 4))) + break; + if ((getUnsignedByte(node + SPARSE_BYTES_OFFSET + i)) == trans) + { + putIntVolatile(node + SPARSE_CHILDREN_OFFSET + i * 4, newChild); + return node; + } + } + + if (i == SPARSE_CHILD_COUNT) + { + // Node is full. 
Switch to split + int split = createEmptySplitNode(); + for (i = 0; i < SPARSE_CHILD_COUNT; ++i) + { + int t = getUnsignedByte(node + SPARSE_BYTES_OFFSET + i); + int p = getInt(node + SPARSE_CHILDREN_OFFSET + i * 4); + attachChildToSplitNonVolatile(split, t, p); + } + attachChildToSplitNonVolatile(split, trans, newChild); + return split; + } + + // Add a new transition. They are not kept in order, so append it at the first free position. + putByte(node + SPARSE_BYTES_OFFSET + i, (byte) trans); + + // Update order word. + int order = getUnsignedShort(node + SPARSE_ORDER_OFFSET); + int newOrder = insertInOrderWord(order, i, trans, node + SPARSE_BYTES_OFFSET); + + // Sparse nodes have two access modes: via the order word, when listing transitions, or directly to characters + // and addresses. + // To support the former, we volatile write to the order word last, and everything is correctly set up. + // The latter does not touch the order word. To support that too, we volatile write the address, as the reader + // can't determine if the position is in use based on the character byte alone (00 is also a valid transition). + // Note that this means that reader must check the transition byte AFTER the address, to ensure they get the + // correct value (see getSparseChild). + + // setting child enables reads to start seeing the new branch + putIntVolatile(node + SPARSE_CHILDREN_OFFSET + i * 4, newChild); + + // some readers will decide whether to check the pointer based on the order word + // write that volatile to make sure they see the new change too + putShortVolatile(node + SPARSE_ORDER_OFFSET, (short) newOrder); + return node; + } + + /** + * Insert the given newIndex in the base-6 encoded order word in the correct position with respect to the ordering. + * + * E.g. 
if the existing bytes were 20, 50, 30 with order word 120 (decimal 48), then + * - insertInOrderWord(120, 3, 5, ptr) must return 1203 (decimal 48*6 + 3) + * - insertInOrderWord(120, 3, 25, ptr) must return 1230 (decimal 8*36 + 3*6 + 0) + * - insertInOrderWord(120, 3, 35, ptr) must return 1320 (decimal 1*216 + 3*36 + 12) + * - insertInOrderWord(120, 3, 55, ptr) must return 3120 (decimal 3*216 + 48) + */ + private int insertInOrderWord(int order, int newIndex, int transitionByte, int bytesPosition) + { + int s = order; + int r = 1; + while (s != 0) + { + int b = getUnsignedByte(bytesPosition + s % SPARSE_CHILD_COUNT); + if (b > transitionByte) + break; + + assert b < transitionByte; + r *= 6; + s /= 6; + } + // insert newIndex after the ones we have passed (order % r) and before the remaining (s) + return order % r + (s * 6 + newIndex) * r; + } + + /** + * Non-volatile version of attachChildToSplit. Used when the split node is not reachable yet (during the conversion + * from sparse). + */ + private void attachChildToSplitNonVolatile(int node, int trans, int newChild) throws SpaceExhaustedException + { + assert offset(node) == SPLIT_OFFSET; + int midPos = splitBlockPointerAddress(node, splitNodeMidIndex(trans), SPLIT_START_LEVEL_LIMIT); + int mid = getInt(midPos); + if (isNull(mid)) + { + mid = createEmptySplitNode(); + putInt(midPos, mid); + } + + assert offset(mid) == SPLIT_OFFSET; + int tailPos = splitBlockPointerAddress(mid, splitNodeTailIndex(trans), SPLIT_OTHER_LEVEL_LIMIT); + int tail = getInt(tailPos); + if (isNull(tail)) + { + tail = createEmptySplitNode(); + putInt(tailPos, tail); + } + + assert offset(tail) == SPLIT_OFFSET; + int childPos = splitBlockPointerAddress(tail, splitNodeChildIndex(trans), SPLIT_OTHER_LEVEL_LIMIT); + putInt(childPos, newChild); + } + + /** + * Attach a child to the given chain node. This may be an update for an existing branch with different target + * address, or a second child for the node.
+ * This method always copies the node -- with the exception of updates that change the child of the last node in a + * chain block with matching transition byte (which this method is not used for, see attachChild), modifications to + * chain nodes cannot be done in place, either because we introduce a new transition byte and have to convert from + * the single-transition chain type to sparse, or because we have to remap the child from the implicit node + 1 to + * something else. + */ + private int attachChildToChain(int node, int transitionByte, int newChild) throws SpaceExhaustedException + { + int existingByte = getUnsignedByte(node); + if (transitionByte == existingByte) + { + // This will only be called if new child is different from old, and the update is not on the final child + // where we can change it in place (see attachChild). We must always create something new. + // If the child is a chain, we can expand it (since it's a different value, its branch must be new and + // nothing can already reside in the rest of the block). + return expandOrCreateChainNode(transitionByte, newChild); + } + + // The new transition is different, so we no longer have only one transition. Change type. + int existingChild = node + 1; + if (offset(existingChild) == LAST_POINTER_OFFSET) + { + existingChild = getInt(existingChild); + } + return createSparseNode(existingByte, existingChild, transitionByte, newChild); + } + + private boolean isExpandableChain(int newChild) + { + int newOffset = offset(newChild); + return newChild > 0 && newChild - 1 > NONE && newOffset > CHAIN_MIN_OFFSET && newOffset <= CHAIN_MAX_OFFSET; + } + + /** + * Create a sparse node with two children. + */ + private int createSparseNode(int byte1, int child1, int byte2, int child2) throws SpaceExhaustedException + { + assert byte1 != byte2; + if (byte1 > byte2) + { + // swap them so the smaller is byte1, i.e. 
there's always something bigger than child 0 so 0 never is + // at the end of the order + int t = byte1; byte1 = byte2; byte2 = t; + t = child1; child1 = child2; child2 = t; + } + + int node = allocateBlock() + SPARSE_OFFSET; + putByte(node + SPARSE_BYTES_OFFSET + 0, (byte) byte1); + putByte(node + SPARSE_BYTES_OFFSET + 1, (byte) byte2); + putInt(node + SPARSE_CHILDREN_OFFSET + 0 * 4, child1); + putInt(node + SPARSE_CHILDREN_OFFSET + 1 * 4, child2); + putShort(node + SPARSE_ORDER_OFFSET, (short) (1 * 6 + 0)); + // Note: this does not need a volatile write as it is a new node, returning a new pointer, which needs to be + // put in an existing node or the root. That action ends in a happens-before enforcing write. + return node; + } + + /** + * Creates a chain node with the single provided transition (pointing to the provided child). + * Note that to avoid creating inefficient tries with under-utilized chain nodes, this should only be called from + * {@link #expandOrCreateChainNode} and other call-sites should call {@link #expandOrCreateChainNode}. + */ + private int createNewChainNode(int transitionByte, int newChild) throws SpaceExhaustedException + { + int newNode = allocateBlock() + LAST_POINTER_OFFSET - 1; + putByte(newNode, (byte) transitionByte); + putInt(newNode + 1, newChild); + // Note: this does not need a volatile write as it is a new node, returning a new pointer, which needs to be + // put in an existing node or the root. That action ends in a happens-before enforcing write. + return newNode; + } + + /** Like {@link #createNewChainNode}, but if the new child is already a chain node and has room, expand + * it instead of creating a brand new node. 
*/ + private int expandOrCreateChainNode(int transitionByte, int newChild) throws SpaceExhaustedException + { + if (isExpandableChain(newChild)) + { + // attach as a new character in child node + int newNode = newChild - 1; + putByte(newNode, (byte) transitionByte); + return newNode; + } + + return createNewChainNode(transitionByte, newChild); + } + + private int createEmptySplitNode() throws SpaceExhaustedException + { + return allocateBlock() + SPLIT_OFFSET; + } + + private int createContentNode(int contentIndex, int child, boolean isSafeChain) throws SpaceExhaustedException + { + assert !isLeaf(child); + if (isNull(child)) + return ~contentIndex; Review Comment: nit: Is this case tested via TrieMemtable? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

