adelapena commented on code in PR #1723: URL: https://github.com/apache/cassandra/pull/1723#discussion_r970644316
########## test/unit/org/apache/cassandra/db/memtable/MemtableSizeTestBase.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.memtable; + +import java.lang.reflect.Field; +import java.util.List; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runners.Parameterized; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.Util; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.utils.FBUtilities; +import org.github.jamm.MemoryMeter; + +// Note: This test can be run in idea with the allocation type configured in the test yaml and memtable using the +// value memtableClass is initialized with. +public class MemtableSizeTestBase extends CQLTester +{ + // Note: To see a printout of the usage for each object, add .enableDebug() here (most useful with smaller number of + // partitions). + static MemoryMeter meter = new MemoryMeter().ignoreKnownSingletons() + .withGuessing(MemoryMeter.Guess.FALLBACK_UNSAFE); + + static final Logger logger = LoggerFactory.getLogger(MemtableSizeTestBase.class); + + static final int partitions = 50_000; + static final int rowsPerPartition = 4; + + static final int deletedPartitions = 10_000; + static final int deletedRows = 5_000; + + @Parameterized.Parameter(0) + public String memtableClass = "skiplist"; + + @Parameterized.Parameters(name = "{0}") + public static List<Object> parameters() + { + return ImmutableList.of("skiplist", + "skiplist_sharded", + "trie"); + } + + // Must be within 3% of the real usage. We are actually more precise than this, but the threshold is set higher to + // avoid flakes. For on-heap allocators we allow for extra overheads below. + final int MAX_DIFFERENCE_PERCENT = 3; + // Slab overhead, added when the memtable uses heap_buffers. + final int SLAB_OVERHEAD = 1024 * 1024; + + public static void setup(Config.MemtableAllocationType allocationType) + { + try + { + Field confField = DatabaseDescriptor.class.getDeclaredField("conf"); + confField.setAccessible(true); + Config conf = (Config) confField.get(null); + conf.memtable_allocation_type = allocationType; + conf.memtable_cleanup_threshold = 0.8f; // give us more space to fit test data without flushing + } + catch (NoSuchFieldException | IllegalAccessException e) + { + throw Throwables.propagate(e); Review Comment: `Throwables.propagate` is deprecated, info [here](https://github.com/google/guava/wiki/Why-we-deprecated-Throwables.propagate). We should probably just do `throw new RuntimeException(e)`. ########## src/java/org/apache/cassandra/db/memtable/TrieMemtable.java: ########## @@ -0,0 +1,793 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.memtable; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterators; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.BufferDecoratedKey; +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionInfo; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.RegularAndStaticColumns; +import org.apache.cassandra.db.Slices; +import org.apache.cassandra.db.commitlog.CommitLogPosition; +import org.apache.cassandra.db.filter.ClusteringIndexFilter; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.BTreePartitionData; +import org.apache.cassandra.db.partitions.BTreePartitionUpdater; +import org.apache.cassandra.db.partitions.ImmutableBTreePartition; +import org.apache.cassandra.db.partitions.Partition; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.EncodingStats; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.tries.MemtableTrie; +import org.apache.cassandra.db.tries.Trie; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.IncludingExcludingBounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.index.transactions.UpdateTransaction; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.sstable.format.SSTableReadsListener; +import org.apache.cassandra.metrics.TableMetrics; +import org.apache.cassandra.metrics.TrieMemtableMetricsView; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableMetadataRef; +import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MBeanWrapper; +import org.apache.cassandra.utils.bytecomparable.ByteComparable; +import org.apache.cassandra.utils.bytecomparable.ByteSource; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.memory.EnsureOnHeap; +import org.apache.cassandra.utils.memory.MemtableAllocator; +import org.github.jamm.Unmetered; + +public class TrieMemtable extends AbstractAllocatorMemtable +{ + private static final Logger logger = LoggerFactory.getLogger(TrieMemtable.class); + public static final String TRIE_MEMTABLE_CONFIG_OBJECT_NAME = "org.apache.cassandra.db:type=TrieMemtableConfig"; + + /** Buffer type to use for memtable tries (on- vs off-heap) */ + public static final BufferType BUFFER_TYPE; + + public static final String SHARDS_OPTION = "shards"; + + static + { + switch (DatabaseDescriptor.getMemtableAllocationType()) + { + case unslabbed_heap_buffers: + case heap_buffers: + BUFFER_TYPE = BufferType.ON_HEAP; + break; + case offheap_buffers: + case offheap_objects: + BUFFER_TYPE = BufferType.OFF_HEAP; + break; + default: + throw new AssertionError(); + } + + MBeanWrapper.instance.registerMBean(new TrieMemtableConfig(), TRIE_MEMTABLE_CONFIG_OBJECT_NAME, MBeanWrapper.OnException.LOG); + } + + /** If keys is below this length, we will use a recursive procedure for inserting data in the memtable trie. */ + @VisibleForTesting + public static final int MAX_RECURSIVE_KEY_LENGTH = 128; + + /** The byte-ordering conversion version to use for memtables. */ + public static final ByteComparable.Version BYTE_COMPARABLE_VERSION = ByteComparable.Version.OSS42; + + // Set to true when the memtable requests a switch (e.g. for trie size limit being reached) to ensure only one + // thread calls cfs.switchMemtableIfCurrent. + private AtomicBoolean switchRequested = new AtomicBoolean(false); + + + // The boundaries for the keyspace as they were calculated when the memtable is created. + // The boundaries will be NONE for system keyspaces or if StorageService is not yet initialized. + // The fact this is fixed for the duration of the memtable lifetime, guarantees we'll always pick the same core + // for the a given key, even if we race with the StorageService initialization or with topology changes. + @Unmetered + private final ShardBoundaries boundaries; + + /** + * Core-specific memtable regions. All writes must go through the specific core. The data structures used + * are concurrent-read safe, thus reads can be carried out from any thread. + */ + private final MemtableShard[] shards; + + /** + * A merged view of the memtable map. Used for partition range queries and flush. + * For efficiency we serve single partition requests off the shard which offers more direct MemtableTrie methods. + */ + private final Trie<BTreePartitionData> mergedTrie; + + @Unmetered + private final TrieMemtableMetricsView metrics; + + @VisibleForTesting + public static final String SHARD_COUNT_PROPERTY = "cassandra.trie.memtable.shard.count"; + + // default shard count, used when a specific number of shards is not specified in the options + private static volatile int SHARD_COUNT = Integer.getInteger(SHARD_COUNT_PROPERTY, FBUtilities.getAvailableProcessors()); + + // only to be used by init(), to setup the very first memtable for the cfs + TrieMemtable(AtomicReference<CommitLogPosition> commitLogLowerBound, TableMetadataRef metadataRef, Owner owner, Integer shardCountOption) + { + super(commitLogLowerBound, metadataRef, owner); + int shardCount = shardCountOption != null ? shardCountOption : getShardCount(); + this.boundaries = owner.localRangeSplits(shardCount); + this.metrics = new TrieMemtableMetricsView(metadataRef.keyspace, metadataRef.name); + this.shards = generatePartitionShards(boundaries.shardCount(), allocator, metadataRef, metrics); + this.mergedTrie = makeMergedTrie(shards); + } + + private static MemtableShard[] generatePartitionShards(int splits, + MemtableAllocator allocator, + TableMetadataRef metadata, + TrieMemtableMetricsView metrics) + { + MemtableShard[] partitionMapContainer = new MemtableShard[splits]; + for (int i = 0; i < splits; i++) + partitionMapContainer[i] = new MemtableShard(metadata, allocator, metrics); + + return partitionMapContainer; + } + + private static Trie<BTreePartitionData> makeMergedTrie(MemtableShard[] shards) + { + List<Trie<BTreePartitionData>> tries = new ArrayList<>(shards.length); + for (MemtableShard shard : shards) + tries.add(shard.data); + return Trie.mergeDistinct(tries); + } + + @Override + public boolean isClean() + { + for (MemtableShard shard : shards) + if (!shard.isEmpty()) + return false; + return true; + } + + @Override + public void discard() + { + super.discard(); + // metrics here are not thread safe, but I think we can live with that + metrics.lastFlushShardDataSizes.reset(); + for (MemtableShard shard : shards) + { + metrics.lastFlushShardDataSizes.update(shard.liveDataSize()); + } + for (MemtableShard shard : shards) + { + shard.data.discardBuffers(); + } + } + + /** + * Should only be called by ColumnFamilyStore.apply via Keyspace.apply, which supplies the appropriate + * OpOrdering. + * + * commitLogSegmentPosition should only be null if this is a secondary index, in which case it is *expected* to be null + */ + @Override + public long put(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup) + { + DecoratedKey key = update.partitionKey(); + MemtableShard shard = shards[boundaries.getShardForKey(key)]; + long colUpdateTimeDelta = shard.put(key, update, indexer, opGroup); + + if (shard.data.reachedAllocatedSizeThreshold() && !switchRequested.getAndSet(true)) + { + logger.info("Scheduling flush due to trie size limit reached."); + owner.signalFlushRequired(this, ColumnFamilyStore.FlushReason.MEMTABLE_LIMIT); + } + + return colUpdateTimeDelta; + } + + /** + * Technically we should scatter gather on all the core threads because the size in following calls are not + * using volatile variables, but for metrics purpose this should be good enough. + */ + @Override + public long getLiveDataSize() + { + long total = 0L; + for (MemtableShard shard : shards) + total += shard.liveDataSize(); + return total; + } + + @Override + public long operationCount() + { + long total = 0L; + for (MemtableShard shard : shards) + total += shard.currentOperations(); + return total; + } + + @Override + public long partitionCount() + { + int total = 0; + for (MemtableShard shard : shards) + total += shard.size(); + return total; + } + + @Override + public long getMinTimestamp() + { + long min = Long.MAX_VALUE; + for (MemtableShard shard : shards) + min = Long.min(min, shard.minTimestamp()); + return min; + } + + @Override + public int getMinLocalDeletionTime() + { + int min = Integer.MAX_VALUE; + for (MemtableShard shard : shards) + min = Integer.min(min, shard.minLocalDeletionTime()); + return min; + } + + @Override + RegularAndStaticColumns columns() + { + for (MemtableShard shard : shards) + columnsCollector.update(shard.columnsCollector); + return columnsCollector.get(); + } + + @Override + EncodingStats encodingStats() + { + for (MemtableShard shard : shards) + statsCollector.update(shard.statsCollector.get()); + return statsCollector.get(); + } + + @Override + public MemtableUnfilteredPartitionIterator partitionIterator(final ColumnFilter columnFilter, + final DataRange dataRange, + SSTableReadsListener readsListener) + { + AbstractBounds<PartitionPosition> keyRange = dataRange.keyRange(); + + PartitionPosition left = keyRange.left; + PartitionPosition right = keyRange.right; + if (left.isMinimum()) + left = null; + if (right.isMinimum()) + right = null; + + boolean isBound = keyRange instanceof Bounds; + boolean includeStart = isBound || keyRange instanceof IncludingExcludingBounds; + boolean includeStop = isBound || keyRange instanceof Range; + + Trie<BTreePartitionData> subMap = mergedTrie.subtrie(left, includeStart, right, includeStop); + + return new MemtableUnfilteredPartitionIterator(metadata(), + allocator.ensureOnHeap(), + subMap, + columnFilter, + dataRange); + // readsListener is ignored as it only accepts sstable signals + } + + private Partition getPartition(DecoratedKey key) + { + int shardIndex = boundaries.getShardForKey(key); + BTreePartitionData data = shards[shardIndex].data.get(key); + if (data != null) + return createPartition(metadata(), allocator.ensureOnHeap(), key, data); + else + return null; + } + + @Override + public UnfilteredRowIterator rowIterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed, SSTableReadsListener listener) + { + Partition p = getPartition(key); + if (p == null) + return null; + else + return p.unfilteredIterator(selectedColumns, slices, reversed); + } + + @Override + public UnfilteredRowIterator rowIterator(DecoratedKey key) + { + Partition p = getPartition(key); + return p != null ? p.unfilteredIterator() : null; + } + + private static MemtablePartition createPartition(TableMetadata metadata, EnsureOnHeap ensureOnHeap, DecoratedKey key, BTreePartitionData data) + { + return new MemtablePartition(metadata, ensureOnHeap, key, data); + } + + private static MemtablePartition getPartitionFromTrieEntry(TableMetadata metadata, EnsureOnHeap ensureOnHeap, Map.Entry<ByteComparable, BTreePartitionData> en) + { + DecoratedKey key = BufferDecoratedKey.fromByteComparable(en.getKey(), + BYTE_COMPARABLE_VERSION, + metadata.partitioner); + return createPartition(metadata, ensureOnHeap, key, en.getValue()); + } + + + @Override + public FlushablePartitionSet<MemtablePartition> getFlushSet(PartitionPosition from, PartitionPosition to) + { + Trie<BTreePartitionData> toFlush = mergedTrie.subtrie(from, true, to, false); + long keySize = 0; + int keyCount = 0; + + for (Iterator<Map.Entry<ByteComparable, BTreePartitionData>> it = toFlush.entryIterator(); it.hasNext(); ) + { + Map.Entry<ByteComparable, BTreePartitionData> en = it.next(); + byte[] keyBytes = DecoratedKey.keyFromByteSource(ByteSource.peekable(en.getKey().asComparableBytes(BYTE_COMPARABLE_VERSION)), + BYTE_COMPARABLE_VERSION, + metadata().partitioner); + keySize += keyBytes.length; + keyCount++; + } + long partitionKeySize = keySize; + int partitionCount = keyCount; + + return new AbstractFlushablePartitionSet<MemtablePartition>() + { + public Memtable memtable() + { + return TrieMemtable.this; + } + + public PartitionPosition from() + { + return from; + } + + public PartitionPosition to() + { + return to; + } + + public long partitionCount() + { + return partitionCount; + } + + public Iterator<MemtablePartition> iterator() + { + return Iterators.transform(toFlush.entryIterator(), + // During flushing we are certain the memtable will remain at least until + // the flush completes. No copying to heap is necessary. + entry -> getPartitionFromTrieEntry(metadata(), EnsureOnHeap.NOOP, entry)); + } + + public long partitionKeysSize() + { + return partitionKeySize; + } + }; + } + + static class MemtableShard + { + // The following fields are volatile as we have to make sure that when we + // collect results from all sub-ranges, the thread accessing the value + // is guaranteed to see the changes to the values. + + // The smallest timestamp for all partitions stored in this shard + private volatile long minTimestamp = Long.MAX_VALUE; + + private volatile int minLocalDeletionTime = Integer.MAX_VALUE; + + private volatile long liveDataSize = 0; + + private volatile long currentOperations = 0; + + @Unmetered + private ReentrantLock writeLock = new ReentrantLock(); + + // Content map for the given shard. This is implemented as a memtable trie which uses the prefix-free + // byte-comparable ByteSource representations of the keys to address the partitions. + // + // This map is used in a single-producer, multi-consumer fashion: only one thread will insert items but + // several threads may read from it and iterate over it. Iterators are created when a the first item of + // a flow is requested for example, and then used asynchronously when sub-sequent items are requested. + // + // Therefore, iterators should not throw ConcurrentModificationExceptions if the underlying map is modified + // during iteration, they should provide a weakly consistent view of the map instead. + // + // Also, this data is backed by memtable memory, when accessing it callers must specify if it can be accessed + // unsafely, meaning that the memtable will not be discarded as long as the data is used, or whether the data + // should be copied on heap for off-heap allocators. + @VisibleForTesting + final MemtableTrie<BTreePartitionData> data; + + private final ColumnsCollector columnsCollector; + + private final StatsCollector statsCollector; + + @Unmetered // total pool size should not be included in memtable's deep size + private final MemtableAllocator allocator; + + @Unmetered + private final TrieMemtableMetricsView metrics; + + @VisibleForTesting + MemtableShard(TableMetadataRef metadata, MemtableAllocator allocator, TrieMemtableMetricsView metrics) + { + this.data = new MemtableTrie<>(BUFFER_TYPE); + this.columnsCollector = new AbstractMemtable.ColumnsCollector(metadata.get().regularAndStaticColumns()); + this.statsCollector = new AbstractMemtable.StatsCollector(); + this.allocator = allocator; + this.metrics = metrics; + } + + public long put(DecoratedKey key, PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup) + { + BTreePartitionUpdater updater = new BTreePartitionUpdater(allocator, allocator.cloner(opGroup), opGroup, indexer); + boolean locked = writeLock.tryLock(); + if (locked) + { + metrics.uncontendedPuts.inc(); + } + else + { + metrics.contendedPuts.inc(); + long lockStartTime = Clock.Global.nanoTime(); + writeLock.lock(); + metrics.contentionTime.addNano(Clock.Global.nanoTime() - lockStartTime); + } + try + { + try + { + long onHeap = data.sizeOnHeap(); + long offHeap = data.sizeOffHeap(); + // Use the fast recursive put if we know the key is small enough to not cause a stack overflow. + try + { + data.putSingleton(key, + update, + updater::mergePartitions, + key.getKeyLength() < MAX_RECURSIVE_KEY_LENGTH); + } + catch (MemtableTrie.SpaceExhaustedException e) + { + // This should never really happen as a flush would be triggered long before this limit is reached. + throw Throwables.propagate(e); + } + allocator.offHeap().adjust(data.sizeOffHeap() - offHeap, opGroup); + allocator.onHeap().adjust(data.sizeOnHeap() - onHeap, opGroup); + } + finally + { + updateMinTimestamp(update.stats().minTimestamp); + updateMinLocalDeletionTime(update.stats().minLocalDeletionTime); + updateLiveDataSize(updater.dataSize); + updateCurrentOperations(update.operationCount()); + + // TODO: lambov 2021-03-30: check if stats are further optimisable Review Comment: Would this be done in a followup ticket? ########## src/java/org/apache/cassandra/db/memtable/TrieMemtable.java: ########## @@ -0,0 +1,793 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.memtable; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterators; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.BufferDecoratedKey; +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionInfo; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.RegularAndStaticColumns; +import org.apache.cassandra.db.Slices; +import org.apache.cassandra.db.commitlog.CommitLogPosition; +import org.apache.cassandra.db.filter.ClusteringIndexFilter; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.BTreePartitionData; +import org.apache.cassandra.db.partitions.BTreePartitionUpdater; +import org.apache.cassandra.db.partitions.ImmutableBTreePartition; +import org.apache.cassandra.db.partitions.Partition; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.EncodingStats; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.tries.MemtableTrie; +import org.apache.cassandra.db.tries.Trie; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.IncludingExcludingBounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.index.transactions.UpdateTransaction; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.sstable.format.SSTableReadsListener; +import org.apache.cassandra.metrics.TableMetrics; +import org.apache.cassandra.metrics.TrieMemtableMetricsView; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableMetadataRef; +import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MBeanWrapper; +import org.apache.cassandra.utils.bytecomparable.ByteComparable; +import org.apache.cassandra.utils.bytecomparable.ByteSource; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.memory.EnsureOnHeap; +import org.apache.cassandra.utils.memory.MemtableAllocator; +import org.github.jamm.Unmetered; + +public class TrieMemtable extends AbstractAllocatorMemtable +{ + private static final Logger logger = LoggerFactory.getLogger(TrieMemtable.class); + public static final String TRIE_MEMTABLE_CONFIG_OBJECT_NAME = "org.apache.cassandra.db:type=TrieMemtableConfig"; + + /** Buffer type to use for memtable tries (on- vs off-heap) */ + public static final BufferType BUFFER_TYPE; + + public static final String SHARDS_OPTION = "shards"; + + static + { + switch (DatabaseDescriptor.getMemtableAllocationType()) + { + case unslabbed_heap_buffers: + case heap_buffers: + BUFFER_TYPE = BufferType.ON_HEAP; + break; + case offheap_buffers: + case offheap_objects: + BUFFER_TYPE = BufferType.OFF_HEAP; + break; + default: + throw new AssertionError(); + } + + MBeanWrapper.instance.registerMBean(new TrieMemtableConfig(), TRIE_MEMTABLE_CONFIG_OBJECT_NAME, MBeanWrapper.OnException.LOG); + } + + /** If keys is below this length, we will use a recursive procedure for inserting data in the memtable trie. */ + @VisibleForTesting + public static final int MAX_RECURSIVE_KEY_LENGTH = 128; + + /** The byte-ordering conversion version to use for memtables. */ + public static final ByteComparable.Version BYTE_COMPARABLE_VERSION = ByteComparable.Version.OSS42; + + // Set to true when the memtable requests a switch (e.g. for trie size limit being reached) to ensure only one + // thread calls cfs.switchMemtableIfCurrent. + private AtomicBoolean switchRequested = new AtomicBoolean(false); + + + // The boundaries for the keyspace as they were calculated when the memtable is created. + // The boundaries will be NONE for system keyspaces or if StorageService is not yet initialized. + // The fact this is fixed for the duration of the memtable lifetime, guarantees we'll always pick the same core + // for the a given key, even if we race with the StorageService initialization or with topology changes. + @Unmetered + private final ShardBoundaries boundaries; + + /** + * Core-specific memtable regions. All writes must go through the specific core. The data structures used Review Comment: These aren't necessarily core-specific, are they? ########## src/java/org/apache/cassandra/db/memtable/TrieMemtableConfigMXBean.java: ########## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.memtable; + +public interface TrieMemtableConfigMXBean +{ + /** + * Adjust the shard count for trie memtables that do not specify it explicitly in the memtable options. + * Changes will apply on the next memtable flush. + */ + public void setShardCount(String numShards); Review Comment: Shouldn't we also expose `getShardCount`? ########## src/java/org/apache/cassandra/db/memtable/TrieMemtable.java: ########## @@ -0,0 +1,793 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.memtable; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterators; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.BufferDecoratedKey; +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionInfo; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.RegularAndStaticColumns; +import org.apache.cassandra.db.Slices; +import org.apache.cassandra.db.commitlog.CommitLogPosition; +import org.apache.cassandra.db.filter.ClusteringIndexFilter; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.BTreePartitionData; +import org.apache.cassandra.db.partitions.BTreePartitionUpdater; +import org.apache.cassandra.db.partitions.ImmutableBTreePartition; +import org.apache.cassandra.db.partitions.Partition; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.EncodingStats; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.tries.MemtableTrie; +import org.apache.cassandra.db.tries.Trie; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.IncludingExcludingBounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.index.transactions.UpdateTransaction; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.sstable.format.SSTableReadsListener; +import org.apache.cassandra.metrics.TableMetrics; +import org.apache.cassandra.metrics.TrieMemtableMetricsView; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableMetadataRef; +import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MBeanWrapper; +import org.apache.cassandra.utils.bytecomparable.ByteComparable; +import org.apache.cassandra.utils.bytecomparable.ByteSource; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.memory.EnsureOnHeap; +import org.apache.cassandra.utils.memory.MemtableAllocator; +import org.github.jamm.Unmetered; + +public class TrieMemtable extends AbstractAllocatorMemtable +{ + private static final Logger logger = LoggerFactory.getLogger(TrieMemtable.class); + public static final String TRIE_MEMTABLE_CONFIG_OBJECT_NAME = "org.apache.cassandra.db:type=TrieMemtableConfig"; + + /** Buffer type to use for memtable tries (on- vs off-heap) */ + public static final BufferType BUFFER_TYPE; + + public static final String SHARDS_OPTION = "shards"; + + static + { + switch (DatabaseDescriptor.getMemtableAllocationType()) + { + case unslabbed_heap_buffers: + case heap_buffers: + BUFFER_TYPE = BufferType.ON_HEAP; + break; + case offheap_buffers: + case offheap_objects: + BUFFER_TYPE = BufferType.OFF_HEAP; + break; + default: + throw new AssertionError(); + } + + MBeanWrapper.instance.registerMBean(new TrieMemtableConfig(), TRIE_MEMTABLE_CONFIG_OBJECT_NAME, MBeanWrapper.OnException.LOG); + } + + /** If keys is below this length, we will use a recursive procedure for inserting data in the memtable trie. */ + @VisibleForTesting + public static final int MAX_RECURSIVE_KEY_LENGTH = 128; + + /** The byte-ordering conversion version to use for memtables. */ + public static final ByteComparable.Version BYTE_COMPARABLE_VERSION = ByteComparable.Version.OSS42; + + // Set to true when the memtable requests a switch (e.g. for trie size limit being reached) to ensure only one + // thread calls cfs.switchMemtableIfCurrent. + private AtomicBoolean switchRequested = new AtomicBoolean(false); + + + // The boundaries for the keyspace as they were calculated when the memtable is created. + // The boundaries will be NONE for system keyspaces or if StorageService is not yet initialized. + // The fact this is fixed for the duration of the memtable lifetime, guarantees we'll always pick the same core + // for the a given key, even if we race with the StorageService initialization or with topology changes. + @Unmetered + private final ShardBoundaries boundaries; + + /** + * Core-specific memtable regions. All writes must go through the specific core. The data structures used + * are concurrent-read safe, thus reads can be carried out from any thread. + */ + private final MemtableShard[] shards; + + /** + * A merged view of the memtable map. Used for partition range queries and flush. + * For efficiency we serve single partition requests off the shard which offers more direct MemtableTrie methods. + */ + private final Trie<BTreePartitionData> mergedTrie; + + @Unmetered + private final TrieMemtableMetricsView metrics; + + @VisibleForTesting + public static final String SHARD_COUNT_PROPERTY = "cassandra.trie.memtable.shard.count"; + + // default shard count, used when a specific number of shards is not specified in the options + private static volatile int SHARD_COUNT = Integer.getInteger(SHARD_COUNT_PROPERTY, FBUtilities.getAvailableProcessors()); + + // only to be used by init(), to setup the very first memtable for the cfs + TrieMemtable(AtomicReference<CommitLogPosition> commitLogLowerBound, TableMetadataRef metadataRef, Owner owner, Integer shardCountOption) + { + super(commitLogLowerBound, metadataRef, owner); + int shardCount = shardCountOption != null ? shardCountOption : getShardCount(); + this.boundaries = owner.localRangeSplits(shardCount); + this.metrics = new TrieMemtableMetricsView(metadataRef.keyspace, metadataRef.name); + this.shards = generatePartitionShards(boundaries.shardCount(), allocator, metadataRef, metrics); + this.mergedTrie = makeMergedTrie(shards); + } + + private static MemtableShard[] generatePartitionShards(int splits, + MemtableAllocator allocator, + TableMetadataRef metadata, + TrieMemtableMetricsView metrics) + { + MemtableShard[] partitionMapContainer = new MemtableShard[splits]; + for (int i = 0; i < splits; i++) + partitionMapContainer[i] = new MemtableShard(metadata, allocator, metrics); + + return partitionMapContainer; + } + + private static Trie<BTreePartitionData> makeMergedTrie(MemtableShard[] shards) + { + List<Trie<BTreePartitionData>> tries = new ArrayList<>(shards.length); + for (MemtableShard shard : shards) + tries.add(shard.data); + return Trie.mergeDistinct(tries); + } + + @Override + public boolean isClean() + { + for (MemtableShard shard : shards) + if (!shard.isEmpty()) + return false; + return true; + } + + @Override + public void discard() + { + super.discard(); + // metrics here are not thread safe, but I think we can live with that + metrics.lastFlushShardDataSizes.reset(); + for (MemtableShard shard : shards) + { + metrics.lastFlushShardDataSizes.update(shard.liveDataSize()); + } + for (MemtableShard shard : shards) + { + shard.data.discardBuffers(); + } + } + + /** + * Should only be called by ColumnFamilyStore.apply via Keyspace.apply, which supplies the appropriate + * OpOrdering. + * + * commitLogSegmentPosition should only be null if this is a secondary index, in which case it is *expected* to be null + */ + @Override + public long put(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup) + { + DecoratedKey key = update.partitionKey(); + MemtableShard shard = shards[boundaries.getShardForKey(key)]; + long colUpdateTimeDelta = shard.put(key, update, indexer, opGroup); + + if (shard.data.reachedAllocatedSizeThreshold() && !switchRequested.getAndSet(true)) + { + logger.info("Scheduling flush due to trie size limit reached."); + owner.signalFlushRequired(this, ColumnFamilyStore.FlushReason.MEMTABLE_LIMIT); + } + + return colUpdateTimeDelta; + } + + /** + * Technically we should scatter gather on all the core threads because the size in following calls are not + * using volatile variables, but for metrics purpose this should be good enough. + */ + @Override + public long getLiveDataSize() + { + long total = 0L; + for (MemtableShard shard : shards) + total += shard.liveDataSize(); + return total; + } + + @Override + public long operationCount() + { + long total = 0L; + for (MemtableShard shard : shards) + total += shard.currentOperations(); + return total; + } + + @Override + public long partitionCount() + { + int total = 0; + for (MemtableShard shard : shards) + total += shard.size(); + return total; + } + + @Override + public long getMinTimestamp() + { + long min = Long.MAX_VALUE; + for (MemtableShard shard : shards) + min = Long.min(min, shard.minTimestamp()); + return min; + } + + @Override + public int getMinLocalDeletionTime() + { + int min = Integer.MAX_VALUE; + for (MemtableShard shard : shards) + min = Integer.min(min, shard.minLocalDeletionTime()); + return min; + } + + @Override + RegularAndStaticColumns columns() + { + for (MemtableShard shard : shards) + columnsCollector.update(shard.columnsCollector); + return columnsCollector.get(); + } + + @Override + EncodingStats encodingStats() + { + for (MemtableShard shard : shards) + statsCollector.update(shard.statsCollector.get()); + return statsCollector.get(); + } + + @Override + public MemtableUnfilteredPartitionIterator partitionIterator(final ColumnFilter columnFilter, + final DataRange dataRange, + SSTableReadsListener readsListener) + { + AbstractBounds<PartitionPosition> keyRange = dataRange.keyRange(); + + PartitionPosition left = keyRange.left; + PartitionPosition right = keyRange.right; + if (left.isMinimum()) + left = null; + if (right.isMinimum()) + right = null; + + boolean isBound = keyRange instanceof Bounds; + boolean includeStart = isBound || keyRange instanceof IncludingExcludingBounds; + boolean includeStop = isBound || keyRange instanceof Range; + + Trie<BTreePartitionData> subMap = mergedTrie.subtrie(left, includeStart, right, includeStop); + + return new MemtableUnfilteredPartitionIterator(metadata(), + allocator.ensureOnHeap(), + subMap, + columnFilter, + dataRange); + // readsListener is ignored as it only accepts sstable signals + } + + private Partition getPartition(DecoratedKey key) + { + int shardIndex = boundaries.getShardForKey(key); + BTreePartitionData data = shards[shardIndex].data.get(key); + if (data != null) + return createPartition(metadata(), allocator.ensureOnHeap(), key, data); + else + return null; + } + + @Override + public UnfilteredRowIterator rowIterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed, SSTableReadsListener listener) + { + Partition p = getPartition(key); + if (p == null) + return null; + else + return p.unfilteredIterator(selectedColumns, slices, reversed); + } + + @Override + public UnfilteredRowIterator rowIterator(DecoratedKey key) + { + Partition p = getPartition(key); + return p != null ? p.unfilteredIterator() : null; + } + + private static MemtablePartition createPartition(TableMetadata metadata, EnsureOnHeap ensureOnHeap, DecoratedKey key, BTreePartitionData data) + { + return new MemtablePartition(metadata, ensureOnHeap, key, data); + } + + private static MemtablePartition getPartitionFromTrieEntry(TableMetadata metadata, EnsureOnHeap ensureOnHeap, Map.Entry<ByteComparable, BTreePartitionData> en) + { + DecoratedKey key = BufferDecoratedKey.fromByteComparable(en.getKey(), + BYTE_COMPARABLE_VERSION, + metadata.partitioner); + return createPartition(metadata, ensureOnHeap, key, en.getValue()); + } + + + @Override + public FlushablePartitionSet<MemtablePartition> getFlushSet(PartitionPosition from, PartitionPosition to) + { + Trie<BTreePartitionData> toFlush = mergedTrie.subtrie(from, true, to, false); + long keySize = 0; + int keyCount = 0; + + for (Iterator<Map.Entry<ByteComparable, BTreePartitionData>> it = toFlush.entryIterator(); it.hasNext(); ) + { + Map.Entry<ByteComparable, BTreePartitionData> en = it.next(); + byte[] keyBytes = DecoratedKey.keyFromByteSource(ByteSource.peekable(en.getKey().asComparableBytes(BYTE_COMPARABLE_VERSION)), + BYTE_COMPARABLE_VERSION, + metadata().partitioner); + keySize += keyBytes.length; + keyCount++; + } + long partitionKeySize = keySize; + int partitionCount = keyCount; + + return new AbstractFlushablePartitionSet<MemtablePartition>() + { + public Memtable memtable() + { + return TrieMemtable.this; + } + + public PartitionPosition from() + { + return from; + } + + public PartitionPosition to() + { + return to; + } + + public long partitionCount() + { + return partitionCount; + } + + public Iterator<MemtablePartition> iterator() + { + return Iterators.transform(toFlush.entryIterator(), + // During flushing we are certain the memtable will remain at least until + // the flush completes. No copying to heap is necessary. + entry -> getPartitionFromTrieEntry(metadata(), EnsureOnHeap.NOOP, entry)); + } + + public long partitionKeysSize() + { + return partitionKeySize; + } + }; + } + + static class MemtableShard + { + // The following fields are volatile as we have to make sure that when we + // collect results from all sub-ranges, the thread accessing the value + // is guaranteed to see the changes to the values. + + // The smallest timestamp for all partitions stored in this shard + private volatile long minTimestamp = Long.MAX_VALUE; + + private volatile int minLocalDeletionTime = Integer.MAX_VALUE; + + private volatile long liveDataSize = 0; + + private volatile long currentOperations = 0; + + @Unmetered + private ReentrantLock writeLock = new ReentrantLock(); + + // Content map for the given shard. This is implemented as a memtable trie which uses the prefix-free + // byte-comparable ByteSource representations of the keys to address the partitions. + // + // This map is used in a single-producer, multi-consumer fashion: only one thread will insert items but + // several threads may read from it and iterate over it. Iterators are created when a the first item of + // a flow is requested for example, and then used asynchronously when sub-sequent items are requested. + // + // Therefore, iterators should not throw ConcurrentModificationExceptions if the underlying map is modified + // during iteration, they should provide a weakly consistent view of the map instead. + // + // Also, this data is backed by memtable memory, when accessing it callers must specify if it can be accessed + // unsafely, meaning that the memtable will not be discarded as long as the data is used, or whether the data + // should be copied on heap for off-heap allocators. + @VisibleForTesting + final MemtableTrie<BTreePartitionData> data; + + private final ColumnsCollector columnsCollector; + + private final StatsCollector statsCollector; + + @Unmetered // total pool size should not be included in memtable's deep size + private final MemtableAllocator allocator; + + @Unmetered + private final TrieMemtableMetricsView metrics; + + @VisibleForTesting + MemtableShard(TableMetadataRef metadata, MemtableAllocator allocator, TrieMemtableMetricsView metrics) + { + this.data = new MemtableTrie<>(BUFFER_TYPE); + this.columnsCollector = new AbstractMemtable.ColumnsCollector(metadata.get().regularAndStaticColumns()); + this.statsCollector = new AbstractMemtable.StatsCollector(); + this.allocator = allocator; + this.metrics = metrics; + } + + public long put(DecoratedKey key, PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup) + { + BTreePartitionUpdater updater = new BTreePartitionUpdater(allocator, allocator.cloner(opGroup), opGroup, indexer); + boolean locked = writeLock.tryLock(); + if (locked) + { + metrics.uncontendedPuts.inc(); + } + else + { + metrics.contendedPuts.inc(); + long lockStartTime = Clock.Global.nanoTime(); + writeLock.lock(); + metrics.contentionTime.addNano(Clock.Global.nanoTime() - lockStartTime); + } + try + { + try + { + long onHeap = data.sizeOnHeap(); + long offHeap = data.sizeOffHeap(); + // Use the fast recursive put if we know the key is small enough to not cause a stack overflow. + try + { + data.putSingleton(key, + update, + updater::mergePartitions, + key.getKeyLength() < MAX_RECURSIVE_KEY_LENGTH); + } + catch (MemtableTrie.SpaceExhaustedException e) + { + // This should never really happen as a flush would be triggered long before this limit is reached. + throw Throwables.propagate(e); + } + allocator.offHeap().adjust(data.sizeOffHeap() - offHeap, opGroup); + allocator.onHeap().adjust(data.sizeOnHeap() - onHeap, opGroup); + } + finally + { + updateMinTimestamp(update.stats().minTimestamp); + updateMinLocalDeletionTime(update.stats().minLocalDeletionTime); + updateLiveDataSize(updater.dataSize); + updateCurrentOperations(update.operationCount()); + + // TODO: lambov 2021-03-30: check if stats are further optimisable + columnsCollector.update(update.columns()); + statsCollector.update(update.stats()); + } + } + finally + { + writeLock.unlock(); + } + return updater.colUpdateTimeDelta; + } + + public boolean isEmpty() + { + return data.isEmpty(); + } + + private void updateMinTimestamp(long timestamp) + { + if (timestamp < minTimestamp) + minTimestamp = timestamp; + } + + private void updateMinLocalDeletionTime(int delTime) + { + if (delTime < minLocalDeletionTime) + minLocalDeletionTime = delTime; + } + + void updateLiveDataSize(long size) + { + liveDataSize = liveDataSize + size; + } + + private void updateCurrentOperations(long op) + { + currentOperations = currentOperations + op; Review Comment: Is it ok to update the volatile `currentOperations` this way? ########## src/java/org/apache/cassandra/db/memtable/TrieMemtable.java: ########## @@ -0,0 +1,793 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.memtable; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterators; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.BufferDecoratedKey; +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionInfo; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.RegularAndStaticColumns; +import org.apache.cassandra.db.Slices; +import org.apache.cassandra.db.commitlog.CommitLogPosition; +import org.apache.cassandra.db.filter.ClusteringIndexFilter; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.BTreePartitionData; +import org.apache.cassandra.db.partitions.BTreePartitionUpdater; +import org.apache.cassandra.db.partitions.ImmutableBTreePartition; +import org.apache.cassandra.db.partitions.Partition; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.EncodingStats; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.tries.MemtableTrie; +import org.apache.cassandra.db.tries.Trie; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.IncludingExcludingBounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.index.transactions.UpdateTransaction; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.sstable.format.SSTableReadsListener; +import org.apache.cassandra.metrics.TableMetrics; +import org.apache.cassandra.metrics.TrieMemtableMetricsView; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableMetadataRef; +import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MBeanWrapper; +import org.apache.cassandra.utils.bytecomparable.ByteComparable; +import org.apache.cassandra.utils.bytecomparable.ByteSource; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.memory.EnsureOnHeap; +import org.apache.cassandra.utils.memory.MemtableAllocator; +import org.github.jamm.Unmetered; + +public class TrieMemtable extends AbstractAllocatorMemtable +{ + private static final Logger logger = LoggerFactory.getLogger(TrieMemtable.class); + public static final String TRIE_MEMTABLE_CONFIG_OBJECT_NAME = "org.apache.cassandra.db:type=TrieMemtableConfig"; + + /** Buffer type to use for memtable tries (on- vs off-heap) */ + public static final BufferType BUFFER_TYPE; + + public static final String SHARDS_OPTION = "shards"; + + static + { + switch (DatabaseDescriptor.getMemtableAllocationType()) + { + case unslabbed_heap_buffers: + case heap_buffers: + BUFFER_TYPE = BufferType.ON_HEAP; + break; + case offheap_buffers: + case offheap_objects: + BUFFER_TYPE = BufferType.OFF_HEAP; + break; + default: + throw new AssertionError(); + } + + MBeanWrapper.instance.registerMBean(new TrieMemtableConfig(), TRIE_MEMTABLE_CONFIG_OBJECT_NAME, MBeanWrapper.OnException.LOG); + } + + /** If keys is below this length, we will use a recursive procedure for inserting data in the memtable trie. */ + @VisibleForTesting + public static final int MAX_RECURSIVE_KEY_LENGTH = 128; + + /** The byte-ordering conversion version to use for memtables. */ + public static final ByteComparable.Version BYTE_COMPARABLE_VERSION = ByteComparable.Version.OSS42; + + // Set to true when the memtable requests a switch (e.g. for trie size limit being reached) to ensure only one + // thread calls cfs.switchMemtableIfCurrent. + private AtomicBoolean switchRequested = new AtomicBoolean(false); + + + // The boundaries for the keyspace as they were calculated when the memtable is created. + // The boundaries will be NONE for system keyspaces or if StorageService is not yet initialized. + // The fact this is fixed for the duration of the memtable lifetime, guarantees we'll always pick the same core + // for the a given key, even if we race with the StorageService initialization or with topology changes. + @Unmetered + private final ShardBoundaries boundaries; + + /** + * Core-specific memtable regions. All writes must go through the specific core. The data structures used + * are concurrent-read safe, thus reads can be carried out from any thread. + */ + private final MemtableShard[] shards; + + /** + * A merged view of the memtable map. Used for partition range queries and flush. + * For efficiency we serve single partition requests off the shard which offers more direct MemtableTrie methods. + */ + private final Trie<BTreePartitionData> mergedTrie; + + @Unmetered + private final TrieMemtableMetricsView metrics; + + @VisibleForTesting + public static final String SHARD_COUNT_PROPERTY = "cassandra.trie.memtable.shard.count"; + + // default shard count, used when a specific number of shards is not specified in the options + private static volatile int SHARD_COUNT = Integer.getInteger(SHARD_COUNT_PROPERTY, FBUtilities.getAvailableProcessors()); + + // only to be used by init(), to setup the very first memtable for the cfs + TrieMemtable(AtomicReference<CommitLogPosition> commitLogLowerBound, TableMetadataRef metadataRef, Owner owner, Integer shardCountOption) + { + super(commitLogLowerBound, metadataRef, owner); + int shardCount = shardCountOption != null ? shardCountOption : getShardCount(); + this.boundaries = owner.localRangeSplits(shardCount); + this.metrics = new TrieMemtableMetricsView(metadataRef.keyspace, metadataRef.name); + this.shards = generatePartitionShards(boundaries.shardCount(), allocator, metadataRef, metrics); + this.mergedTrie = makeMergedTrie(shards); + } + + private static MemtableShard[] generatePartitionShards(int splits, + MemtableAllocator allocator, + TableMetadataRef metadata, + TrieMemtableMetricsView metrics) + { + MemtableShard[] partitionMapContainer = new MemtableShard[splits]; + for (int i = 0; i < splits; i++) + partitionMapContainer[i] = new MemtableShard(metadata, allocator, metrics); + + return partitionMapContainer; + } + + private static Trie<BTreePartitionData> makeMergedTrie(MemtableShard[] shards) + { + List<Trie<BTreePartitionData>> tries = new ArrayList<>(shards.length); + for (MemtableShard shard : shards) + tries.add(shard.data); + return Trie.mergeDistinct(tries); + } + + @Override + public boolean isClean() + { + for (MemtableShard shard : shards) + if (!shard.isEmpty()) + return false; + return true; + } + + @Override + public void discard() + { + super.discard(); + // metrics here are not thread safe, but I think we can live with that + metrics.lastFlushShardDataSizes.reset(); + for (MemtableShard shard : shards) + { + metrics.lastFlushShardDataSizes.update(shard.liveDataSize()); + } + for (MemtableShard shard : shards) + { + shard.data.discardBuffers(); + } + } + + /** + * Should only be called by ColumnFamilyStore.apply via Keyspace.apply, which supplies the appropriate + * OpOrdering. + * + * commitLogSegmentPosition should only be null if this is a secondary index, in which case it is *expected* to be null + */ + @Override + public long put(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup) + { + DecoratedKey key = update.partitionKey(); + MemtableShard shard = shards[boundaries.getShardForKey(key)]; + long colUpdateTimeDelta = shard.put(key, update, indexer, opGroup); + + if (shard.data.reachedAllocatedSizeThreshold() && !switchRequested.getAndSet(true)) + { + logger.info("Scheduling flush due to trie size limit reached."); + owner.signalFlushRequired(this, ColumnFamilyStore.FlushReason.MEMTABLE_LIMIT); + } + + return colUpdateTimeDelta; + } + + /** + * Technically we should scatter gather on all the core threads because the size in following calls are not + * using volatile variables, but for metrics purpose this should be good enough. + */ + @Override + public long getLiveDataSize() + { + long total = 0L; + for (MemtableShard shard : shards) + total += shard.liveDataSize(); + return total; + } + + @Override + public long operationCount() + { + long total = 0L; + for (MemtableShard shard : shards) + total += shard.currentOperations(); + return total; + } + + @Override + public long partitionCount() + { + int total = 0; + for (MemtableShard shard : shards) + total += shard.size(); + return total; + } + + @Override + public long getMinTimestamp() + { + long min = Long.MAX_VALUE; + for (MemtableShard shard : shards) + min = Long.min(min, shard.minTimestamp()); + return min; + } + + @Override + public int getMinLocalDeletionTime() + { + int min = Integer.MAX_VALUE; + for (MemtableShard shard : shards) + min = Integer.min(min, shard.minLocalDeletionTime()); + return min; + } + + @Override + RegularAndStaticColumns columns() + { + for (MemtableShard shard : shards) + columnsCollector.update(shard.columnsCollector); + return columnsCollector.get(); + } + + @Override + EncodingStats encodingStats() + { + for (MemtableShard shard : shards) + statsCollector.update(shard.statsCollector.get()); + return statsCollector.get(); + } + + @Override + public MemtableUnfilteredPartitionIterator partitionIterator(final ColumnFilter columnFilter, + final DataRange dataRange, + SSTableReadsListener readsListener) + { + AbstractBounds<PartitionPosition> keyRange = dataRange.keyRange(); + + PartitionPosition left = keyRange.left; + PartitionPosition right = keyRange.right; + if (left.isMinimum()) + left = null; + if (right.isMinimum()) + right = null; + + boolean isBound = keyRange instanceof Bounds; + boolean includeStart = isBound || keyRange instanceof IncludingExcludingBounds; + boolean includeStop = isBound || keyRange instanceof Range; + + Trie<BTreePartitionData> subMap = mergedTrie.subtrie(left, includeStart, right, includeStop); + + return new MemtableUnfilteredPartitionIterator(metadata(), + allocator.ensureOnHeap(), + subMap, + columnFilter, + dataRange); + // readsListener is ignored as it only accepts sstable signals + } + + private Partition getPartition(DecoratedKey key) + { + int shardIndex = boundaries.getShardForKey(key); + BTreePartitionData data = shards[shardIndex].data.get(key); + if (data != null) + return createPartition(metadata(), allocator.ensureOnHeap(), key, data); + else + return null; + } + + @Override + public UnfilteredRowIterator rowIterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed, SSTableReadsListener listener) + { + Partition p = getPartition(key); + if (p == null) + return null; + else + return p.unfilteredIterator(selectedColumns, slices, reversed); + } + + @Override + public UnfilteredRowIterator rowIterator(DecoratedKey key) + { + Partition p = getPartition(key); + return p != null ? p.unfilteredIterator() : null; + } + + private static MemtablePartition createPartition(TableMetadata metadata, EnsureOnHeap ensureOnHeap, DecoratedKey key, BTreePartitionData data) + { + return new MemtablePartition(metadata, ensureOnHeap, key, data); + } + + private static MemtablePartition getPartitionFromTrieEntry(TableMetadata metadata, EnsureOnHeap ensureOnHeap, Map.Entry<ByteComparable, BTreePartitionData> en) + { + DecoratedKey key = BufferDecoratedKey.fromByteComparable(en.getKey(), + BYTE_COMPARABLE_VERSION, + metadata.partitioner); + return createPartition(metadata, ensureOnHeap, key, en.getValue()); + } + + + @Override + public FlushablePartitionSet<MemtablePartition> getFlushSet(PartitionPosition from, PartitionPosition to) + { + Trie<BTreePartitionData> toFlush = mergedTrie.subtrie(from, true, to, false); + long keySize = 0; + int keyCount = 0; + + for (Iterator<Map.Entry<ByteComparable, BTreePartitionData>> it = toFlush.entryIterator(); it.hasNext(); ) + { + Map.Entry<ByteComparable, BTreePartitionData> en = it.next(); + byte[] keyBytes = DecoratedKey.keyFromByteSource(ByteSource.peekable(en.getKey().asComparableBytes(BYTE_COMPARABLE_VERSION)), + BYTE_COMPARABLE_VERSION, + metadata().partitioner); + keySize += keyBytes.length; + keyCount++; + } + long partitionKeySize = keySize; + int partitionCount = keyCount; + + return new AbstractFlushablePartitionSet<MemtablePartition>() + { + public Memtable memtable() + { + return TrieMemtable.this; + } + + public PartitionPosition from() + { + return from; + } + + public PartitionPosition to() + { + return to; + } + + public long partitionCount() + { + return partitionCount; + } + + public Iterator<MemtablePartition> iterator() + { + return Iterators.transform(toFlush.entryIterator(), + // During flushing we are certain the memtable will remain at least until + // the flush completes. No copying to heap is necessary. + entry -> getPartitionFromTrieEntry(metadata(), EnsureOnHeap.NOOP, entry)); + } + + public long partitionKeysSize() + { + return partitionKeySize; + } + }; + } + + static class MemtableShard + { + // The following fields are volatile as we have to make sure that when we + // collect results from all sub-ranges, the thread accessing the value + // is guaranteed to see the changes to the values. + + // The smallest timestamp for all partitions stored in this shard + private volatile long minTimestamp = Long.MAX_VALUE; + + private volatile int minLocalDeletionTime = Integer.MAX_VALUE; + + private volatile long liveDataSize = 0; + + private volatile long currentOperations = 0; + + @Unmetered + private ReentrantLock writeLock = new ReentrantLock(); + + // Content map for the given shard. This is implemented as a memtable trie which uses the prefix-free + // byte-comparable ByteSource representations of the keys to address the partitions. + // + // This map is used in a single-producer, multi-consumer fashion: only one thread will insert items but + // several threads may read from it and iterate over it. Iterators are created when a the first item of + // a flow is requested for example, and then used asynchronously when sub-sequent items are requested. + // + // Therefore, iterators should not throw ConcurrentModificationExceptions if the underlying map is modified + // during iteration, they should provide a weakly consistent view of the map instead. + // + // Also, this data is backed by memtable memory, when accessing it callers must specify if it can be accessed + // unsafely, meaning that the memtable will not be discarded as long as the data is used, or whether the data + // should be copied on heap for off-heap allocators. + @VisibleForTesting + final MemtableTrie<BTreePartitionData> data; + + private final ColumnsCollector columnsCollector; + + private final StatsCollector statsCollector; + + @Unmetered // total pool size should not be included in memtable's deep size + private final MemtableAllocator allocator; + + @Unmetered + private final TrieMemtableMetricsView metrics; + + @VisibleForTesting + MemtableShard(TableMetadataRef metadata, MemtableAllocator allocator, TrieMemtableMetricsView metrics) + { + this.data = new MemtableTrie<>(BUFFER_TYPE); + this.columnsCollector = new AbstractMemtable.ColumnsCollector(metadata.get().regularAndStaticColumns()); + this.statsCollector = new AbstractMemtable.StatsCollector(); + this.allocator = allocator; + this.metrics = metrics; + } + + public long put(DecoratedKey key, PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup) + { + BTreePartitionUpdater updater = new BTreePartitionUpdater(allocator, allocator.cloner(opGroup), opGroup, indexer); + boolean locked = writeLock.tryLock(); + if (locked) + { + metrics.uncontendedPuts.inc(); + } + else + { + metrics.contendedPuts.inc(); + long lockStartTime = Clock.Global.nanoTime(); + writeLock.lock(); + metrics.contentionTime.addNano(Clock.Global.nanoTime() - lockStartTime); + } + try + { + try + { + long onHeap = data.sizeOnHeap(); + long offHeap = data.sizeOffHeap(); + // Use the fast recursive put if we know the key is small enough to not cause a stack overflow. + try + { + data.putSingleton(key, + update, + updater::mergePartitions, + key.getKeyLength() < MAX_RECURSIVE_KEY_LENGTH); + } + catch (MemtableTrie.SpaceExhaustedException e) + { + // This should never really happen as a flush would be triggered long before this limit is reached. + throw Throwables.propagate(e); Review Comment: `Throwables.propagate` is deprecated, info [here](https://github.com/google/guava/wiki/Why-we-deprecated-Throwables.propagate). We should probably just do `throw new RuntimeException(e)`. ########## test/unit/org/apache/cassandra/db/tries/MemtableTrieTestBase.java: ########## @@ -0,0 +1,630 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.tries; + +import java.nio.ByteBuffer; +import java.util.*; +import java.util.function.Function; +import java.util.stream.Stream; + +import com.google.common.base.Throwables; +import com.google.common.collect.HashMultiset; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Multiset; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.bytecomparable.ByteComparable; +import org.apache.cassandra.utils.ObjectSizes; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public abstract class MemtableTrieTestBase +{ + // Set this to true (in combination with smaller count) to dump the tries while debugging a problem. + // Do not commit the code with VERBOSE = true. + private static final boolean VERBOSE = false; + + private static final int COUNT = 100000; + private static final int KEY_CHOICE = 25; + private static final int MIN_LENGTH = 10; + private static final int MAX_LENGTH = 50; + + Random rand = new Random(); + + static final ByteComparable.Version VERSION = MemtableTrie.BYTE_COMPARABLE_VERSION; + + abstract boolean usePut(); + + @Test + public void testSingle() + { + ByteComparable e = ByteComparable.of("test"); + MemtableTrie<String> trie = new MemtableTrie<>(BufferType.OFF_HEAP); + putSimpleResolve(trie, e, "test", (x, y) -> y); + System.out.println("Trie " + trie.dump()); + assertEquals("test", trie.get(e)); + assertEquals(null, trie.get(ByteComparable.of("teste"))); + } + + @Test + public void testSplitMulti() + { + testEntries(new String[] { "testing", "tests", "trials", "trial", "aaaa", "aaaab", "abdddd", "abeeee" }); + } + + @Test + public void testSplitMultiBug() + { + testEntriesHex(new String[] { "0c4143aeff", "0c4143ae69ff" }); + } + + + @Test + public void testSparse00bug() + { + String[] tests = new String[] { + "40bd256e6fd2adafc44033303000", + "40bdd47ec043641f2b403131323400", + "40bd00bf5ae8cf9d1d403133323800", + }; + MemtableTrie<String> trie = new MemtableTrie<>(BufferType.OFF_HEAP); + for (String test : tests) + { + ByteComparable e = ByteComparable.fixedLength(ByteBufferUtil.hexToBytes(test)); + System.out.println("Adding " + asString(e) + ": " + test); + putSimpleResolve(trie, e, test, (x, y) -> y); + } + + System.out.println(trie.dump()); + + for (String test : tests) + assertEquals(test, trie.get(ByteComparable.fixedLength(ByteBufferUtil.hexToBytes(test)))); + + Arrays.sort(tests); + + int idx = 0; + for (String s : trie.values()) + { + if (s != tests[idx]) + throw new AssertionError("" + s + "!=" + tests[idx]); + ++idx; + } + assertEquals(tests.length, idx); + } + + @Test + public void testUpdateContent() + { + String[] tests = new String[] {"testing", "tests", "trials", "trial", "testing", "trial", "trial"}; + String[] values = new String[] {"testing", "tests", "trials", "trial", "t2", "x2", "y2"}; + MemtableTrie<String> trie = new MemtableTrie<>(BufferType.OFF_HEAP); + for (int i = 0; i < tests.length; ++i) + { + String test = tests[i]; + String v = values[i]; + ByteComparable e = ByteComparable.of(test); + System.out.println("Adding " + asString(e) + ": " + v); + putSimpleResolve(trie, e, v, (x, y) -> "" + x + y); + System.out.println("Trie " + trie.dump()); + } + + for (int i = 0; i < tests.length; ++i) + { + String test = tests[i]; + assertEquals(Stream.iterate(0, x -> x + 1) + .limit(tests.length) + .filter(x -> tests[x] == test) + .map(x -> values[x]) + .reduce("", (x, y) -> "" + x + y), + trie.get(ByteComparable.of(test))); + } + } + + static class SpecStackEntry + { + Object[] children; + int curChild; + Object content; + SpecStackEntry parent; + + public SpecStackEntry(Object[] spec, Object content, SpecStackEntry parent) + { + this.children = spec; + this.content = content; + this.parent = parent; + this.curChild = -1; + } + } + + public static class CursorFromSpec implements Trie.Cursor<ByteBuffer> + { + SpecStackEntry stack; + int depth; + + CursorFromSpec(Object[] spec) + { + stack = new SpecStackEntry(spec, null, null); + depth = 0; + } + + public int advance() + { + SpecStackEntry current = stack; + while (current != null && ++current.curChild >= current.children.length) + { + current = current.parent; + --depth; + } + if (current == null) + { + assert depth == -1; + return depth; + } + + Object child = current.children[current.curChild]; + if (child instanceof Object[]) + stack = new SpecStackEntry((Object[]) child, null, current); + else + stack = new SpecStackEntry(new Object[0], child, current); + + return ++depth; + } + + public int advanceMultiple() + { + if (++stack.curChild >= stack.children.length) + return skipChildren(); + + Object child = stack.children[stack.curChild]; + while (child instanceof Object[]) + { + stack = new SpecStackEntry((Object[]) child, null, stack); + ++depth; + if (stack.children.length == 0) + return depth; + child = stack.children[0]; + } + stack = new SpecStackEntry(new Object[0], child, stack); + + + return ++depth; + } + + public int skipChildren() + { + --depth; + stack = stack.parent; + return advance(); + } + + public int depth() + { + return depth; + } + + public ByteBuffer content() + { + return (ByteBuffer) stack.content; + } + + public int incomingTransition() + { + SpecStackEntry parent = stack.parent; + return parent != null ? parent.curChild + 0x30 : -1; + } + } + + static Trie<ByteBuffer> specifiedTrie(Object[] nodeDef) + { + return new Trie<ByteBuffer>() + { + @Override + protected Cursor<ByteBuffer> cursor() + { + return new CursorFromSpec(nodeDef); + } + }; + } + + @Test + public void testEntriesNullChildBug() + { + Object[] trieDef = new Object[] + { + new Object[] { // 0 + ByteBufferUtil.bytes(1), // 01 + ByteBufferUtil.bytes(2) // 02 + }, + // If requestChild returns null, bad things can happen (DB-2982) + null, // 1 + ByteBufferUtil.bytes(3), // 2 + new Object[] { // 3 + ByteBufferUtil.bytes(4), // 30 + // Also try null on the Remaining.ONE path + null // 31 + }, + ByteBufferUtil.bytes(5), // 4 + // Also test requestUniqueDescendant returning null + new Object[] { // 5 + new Object[] { // 50 + new Object[] { // 500 + null // 5000 + } + } + }, + ByteBufferUtil.bytes(6) // 6 + }; + + SortedMap<ByteComparable, ByteBuffer> expected = new TreeMap<>((bytes1, bytes2) -> ByteComparable.compare(bytes1, bytes2, VERSION)); + expected.put(comparable("00"), ByteBufferUtil.bytes(1)); + expected.put(comparable("01"), ByteBufferUtil.bytes(2)); + expected.put(comparable("2"), ByteBufferUtil.bytes(3)); + expected.put(comparable("30"), ByteBufferUtil.bytes(4)); + expected.put(comparable("4"), ByteBufferUtil.bytes(5)); + expected.put(comparable("6"), ByteBufferUtil.bytes(6)); + + Trie<ByteBuffer> trie = specifiedTrie(trieDef); + System.out.println(trie.dump()); + assertSameContent(trie, expected); + } + + static ByteComparable comparable(String s) + { + ByteBuffer b = ByteBufferUtil.bytes(s); + return ByteComparable.fixedLength(b); + } + + @Test + public void testDirect() + { + ByteComparable[] src = generateKeys(rand, COUNT); + SortedMap<ByteComparable, ByteBuffer> content = new TreeMap<>((bytes1, bytes2) -> ByteComparable.compare(bytes1, bytes2, VERSION)); + MemtableTrie<ByteBuffer> trie = makeMemtableTrie(src, content, usePut()); + int keysize = Arrays.stream(src) + .mapToInt(src1 -> ByteComparable.length(src1, VERSION)) + .sum(); + long ts = ObjectSizes.measureDeep(content); + long onh = ObjectSizes.measureDeep(trie.contentArrays); + System.out.format("Trie size on heap %,d off heap %,d measured %,d keys %,d treemap %,d\n", + trie.sizeOnHeap(), trie.sizeOffHeap(), onh, keysize, ts); + System.out.format("per entry on heap %.2f off heap %.2f measured %.2f keys %.2f treemap %.2f\n", + trie.sizeOnHeap() * 1.0 / COUNT, trie.sizeOffHeap() * 1.0 / COUNT, onh * 1.0 / COUNT, keysize * 1.0 / COUNT, ts * 1.0 / COUNT); + if (VERBOSE) + System.out.println("Trie " + trie.dump(ByteBufferUtil::bytesToHex)); + + assertSameContent(trie, content); + checkGet(trie, content); + } + + @Test + public void testPrefixEvolution() + { + testEntries(new String[] { "testing", + "test", + "tests", + "tester", + "testers", + // test changing type with prefix + "types", + "types1", + "types", + "types2", + "types3", + "types4", + "types", + "types5", + "types6", + "types7", + "types8", + "types", + // test adding prefix to chain + "chain123", + "chain", + // test adding prefix to sparse + "sparse1", + "sparse2", + "sparse3", + "sparse", + // test adding prefix to split + "split1", + "split2", + "split3", + "split4", + "split5", + "split6", + "split7", + "split8", + "split" + }); + } + + @Test + public void testPrefixUnsafeMulti() + { + // Make sure prefixes on inside a multi aren't overwritten by embedded metadata node. + + testEntries(new String[] { "test89012345678901234567890", + "test8", + "test89", + "test890", + "test8901", + "test89012", + "test890123", + "test8901234", + }); + } + + private void testEntries(String[] tests) + { + for (Function<String, ByteComparable> mapping : + ImmutableList.<Function<String, ByteComparable>>of(ByteComparable::of, + s -> ByteComparable.fixedLength(s.getBytes()))) + { + testEntries(tests, mapping); + } + } + + private void testEntriesHex(String[] tests) + { + testEntries(tests, s -> ByteComparable.fixedLength(ByteBufferUtil.hexToBytes(s))); + // Run the other translations just in case. + testEntries(tests); + } + + private void testEntries(String[] tests, Function<String, ByteComparable> mapping) + + { + MemtableTrie<String> trie = new MemtableTrie<>(BufferType.OFF_HEAP); + for (String test : tests) + { + ByteComparable e = mapping.apply(test); + System.out.println("Adding " + asString(e) + ": " + test); + putSimpleResolve(trie, e, test, (x, y) -> y); + System.out.println("Trie\n" + trie.dump()); + } + + for (String test : tests) + assertEquals(test, trie.get(mapping.apply(test))); + } + + static MemtableTrie<ByteBuffer> makeMemtableTrie(ByteComparable[] src, + Map<ByteComparable, ByteBuffer> content, + boolean usePut) + + { + MemtableTrie<ByteBuffer> trie = new MemtableTrie<>(BufferType.OFF_HEAP); + addToMemtableTrie(src, content, trie, usePut); + return trie; + } + + static void addToMemtableTrie(ByteComparable[] src, + Map<ByteComparable, ByteBuffer> content, + MemtableTrie<ByteBuffer> trie, + boolean usePut) + + { + for (ByteComparable b : src) + { + // Note: Because we don't ensure order when calling resolve, just use a hash of the key as payload + // (so that all sources have the same value). + int payload = asString(b).hashCode(); + ByteBuffer v = ByteBufferUtil.bytes(payload); + content.put(b, v); + if (VERBOSE) + System.out.println("Adding " + asString(b) + ": " + ByteBufferUtil.bytesToHex(v)); + putSimpleResolve(trie, b, v, (x, y) -> y, usePut); + if (VERBOSE) + System.out.println(trie.dump(ByteBufferUtil::bytesToHex)); + } + } + + static void checkGet(MemtableTrie<ByteBuffer> trie, Map<ByteComparable, ByteBuffer> items) + { + for (Map.Entry<ByteComparable, ByteBuffer> en : items.entrySet()) + { + assertEquals(en.getValue(), trie.get(en.getKey())); + } + } + + static void assertSameContent(Trie<ByteBuffer> trie, SortedMap<ByteComparable, ByteBuffer> map) + { + assertMapEquals(trie, map); + assertForEachEntryEquals(trie, map); + assertValuesEqual(trie, map); + assertForEachValueEquals(trie, map); + assertUnorderedValuesEqual(trie, map); + } + + private static void assertValuesEqual(Trie<ByteBuffer> trie, SortedMap<ByteComparable, ByteBuffer> map) + { + assertIterablesEqual(trie.values(), map.values()); + } + + private static void assertUnorderedValuesEqual(Trie<ByteBuffer> trie, SortedMap<ByteComparable, ByteBuffer> map) + { + Multiset<ByteBuffer> unordered = HashMultiset.create(); + StringBuilder errors = new StringBuilder(); + for (ByteBuffer b : trie.valuesUnordered()) + unordered.add(b); + + for (ByteBuffer b : map.values()) + if (!unordered.remove(b)) + errors.append("\nMissing value in valuesUnordered: " + ByteBufferUtil.bytesToHex(b)); + + for (ByteBuffer b : unordered) + errors.append("\nExtra value in valuesUnordered: " + ByteBufferUtil.bytesToHex(b)); + + assertEquals("", errors.toString()); + } + + private static void assertForEachEntryEquals(Trie<ByteBuffer> trie, SortedMap<ByteComparable, ByteBuffer> map) + { + Iterator<Map.Entry<ByteComparable, ByteBuffer>> it = map.entrySet().iterator(); + trie.forEachEntry((key, value) -> { + Assert.assertTrue("Map exhausted first, key " + asString(key), it.hasNext()); + Map.Entry<ByteComparable, ByteBuffer> entry = it.next(); + assertEquals(0, ByteComparable.compare(entry.getKey(), key, Trie.BYTE_COMPARABLE_VERSION)); + assertEquals(entry.getValue(), value); + }); + Assert.assertFalse("Trie exhausted first", it.hasNext()); + } + + private static void assertForEachValueEquals(Trie<ByteBuffer> trie, SortedMap<ByteComparable, ByteBuffer> map) + { + Iterator<ByteBuffer> it = map.values().iterator(); + trie.forEachValue(value -> { + Assert.assertTrue("Map exhausted first, value " + ByteBufferUtil.bytesToHex(value), it.hasNext()); + ByteBuffer entry = it.next(); + assertEquals(entry, value); + }); + Assert.assertFalse("Trie exhausted first", it.hasNext()); + } + + static void assertMapEquals(Trie<ByteBuffer> trie, SortedMap<ByteComparable, ByteBuffer> map) + { + assertMapEquals(trie.entrySet(), map.entrySet()); + } + + static void assertMapEquals(Iterable<Map.Entry<ByteComparable, ByteBuffer>> container1, + Iterable<Map.Entry<ByteComparable, ByteBuffer>> container2) + { + Iterator<Map.Entry<ByteComparable, ByteBuffer>> it1 = container1.iterator(); + Iterator<Map.Entry<ByteComparable, ByteBuffer>> it2 = container2.iterator(); + List<ByteComparable> failedAt = new ArrayList<>(); + StringBuilder b = new StringBuilder(); + while (it1.hasNext() && it2.hasNext()) + { + Map.Entry<ByteComparable, ByteBuffer> en1 = it1.next(); + Map.Entry<ByteComparable, ByteBuffer> en2 = it2.next(); + b.append(String.format("TreeSet %s:%s\n", asString(en2.getKey()), ByteBufferUtil.bytesToHex(en2.getValue()))); + b.append(String.format("Trie %s:%s\n", asString(en1.getKey()), ByteBufferUtil.bytesToHex(en1.getValue()))); + if (ByteComparable.compare(en1.getKey(), en2.getKey(), VERSION) != 0 || ByteBufferUtil.compareUnsigned(en1.getValue(), en2.getValue()) != 0) + failedAt.add(en1.getKey()); + } + while (it1.hasNext()) + { + Map.Entry<ByteComparable, ByteBuffer> en1 = it1.next(); + b.append(String.format("Trie %s:%s\n", asString(en1.getKey()), ByteBufferUtil.bytesToHex(en1.getValue()))); + failedAt.add(en1.getKey()); + } + while (it2.hasNext()) + { + Map.Entry<ByteComparable, ByteBuffer> en2 = it2.next(); + b.append(String.format("TreeSet %s:%s\n", asString(en2.getKey()), ByteBufferUtil.bytesToHex(en2.getValue()))); + failedAt.add(en2.getKey()); + } + if (!failedAt.isEmpty()) + { + String message = "Failed at " + Lists.transform(failedAt, MemtableTrieTestBase::asString); + System.err.println(message); + System.err.println(b); + Assert.fail(message); + } + } + + static <E extends Comparable<E>> void assertIterablesEqual(Iterable<E> expectedIterable, Iterable<E> actualIterable) + { + Iterator<E> expected = expectedIterable.iterator(); + Iterator<E> actual = actualIterable.iterator(); + while (actual.hasNext() && expected.hasNext()) + { + Assert.assertEquals(actual.next(), expected.next()); + } + if (expected.hasNext()) + Assert.fail("Remaing values in expected, starting with " + expected.next()); + else if (actual.hasNext()) + Assert.fail("Remaing values in actual, starting with " + actual.next()); + } + + static ByteComparable[] generateKeys(Random rand, int count) + { + ByteComparable[] sources = new ByteComparable[count]; + TreeSet<ByteComparable> added = new TreeSet<>((bytes1, bytes2) -> ByteComparable.compare(bytes1, bytes2, VERSION)); + for (int i = 0; i < count; ++i) + { + sources[i] = generateKey(rand); + if (!added.add(sources[i])) + --i; + } + + // note: not sorted! + return sources; + } + + static ByteComparable generateKey(Random rand) + { + return generateKey(rand, MIN_LENGTH, MAX_LENGTH); + } + + static ByteComparable generateKey(Random rand, int minLength, int maxLength) + { + int len = rand.nextInt(maxLength - minLength + 1) + minLength; + byte[] bytes = new byte[len]; + int p = 0; + int length = bytes.length; + while (p < length) + { + int seed = rand.nextInt(KEY_CHOICE); + Random r2 = new Random(seed); + int m = r2.nextInt(5) + 2 + p; + if (m > length) + m = length; + while (p < m) + bytes[p++] = (byte) r2.nextInt(256); + } + return ByteComparable.fixedLength(bytes); + } + + static String asString(ByteComparable bc) + { + return bc != null ? bc.byteComparableAsString(VERSION) : "null"; + } + + <T, M> void putSimpleResolve(MemtableTrie<T> trie, + ByteComparable key, + T value, + Trie.MergeResolver<T> resolver) + { + putSimpleResolve(trie, key, value, resolver, usePut()); + } + + static <T, M> void putSimpleResolve(MemtableTrie<T> trie, + ByteComparable key, + T value, + Trie.MergeResolver<T> resolver, + boolean usePut) + { + try + { + trie.putSingleton(key, + value, + (existing, update) -> existing != null ? resolver.resolve(existing, update) : update, + usePut); + } + catch (MemtableTrie.SpaceExhaustedException e) + { + throw Throwables.propagate(e); Review Comment: `Throwables.propagate` is deprecated, info [here](https://github.com/google/guava/wiki/Why-we-deprecated-Throwables.propagate). We should probably just do `throw new RuntimeException(e)`. ########## src/java/org/apache/cassandra/db/memtable/TrieMemtable.java: ########## @@ -0,0 +1,793 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.memtable; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterators; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.BufferDecoratedKey; +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionInfo; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.RegularAndStaticColumns; +import org.apache.cassandra.db.Slices; +import org.apache.cassandra.db.commitlog.CommitLogPosition; +import org.apache.cassandra.db.filter.ClusteringIndexFilter; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.BTreePartitionData; +import org.apache.cassandra.db.partitions.BTreePartitionUpdater; +import org.apache.cassandra.db.partitions.ImmutableBTreePartition; +import org.apache.cassandra.db.partitions.Partition; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.EncodingStats; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.tries.MemtableTrie; +import org.apache.cassandra.db.tries.Trie; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.IncludingExcludingBounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.index.transactions.UpdateTransaction; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.sstable.format.SSTableReadsListener; +import org.apache.cassandra.metrics.TableMetrics; +import org.apache.cassandra.metrics.TrieMemtableMetricsView; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableMetadataRef; +import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MBeanWrapper; +import org.apache.cassandra.utils.bytecomparable.ByteComparable; +import org.apache.cassandra.utils.bytecomparable.ByteSource; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.memory.EnsureOnHeap; +import org.apache.cassandra.utils.memory.MemtableAllocator; +import org.github.jamm.Unmetered; + +public class TrieMemtable extends AbstractAllocatorMemtable +{ + private static final Logger logger = LoggerFactory.getLogger(TrieMemtable.class); + public static final String TRIE_MEMTABLE_CONFIG_OBJECT_NAME = "org.apache.cassandra.db:type=TrieMemtableConfig"; + + /** Buffer type to use for memtable tries (on- vs off-heap) */ + public static final BufferType BUFFER_TYPE; + + public static final String SHARDS_OPTION = "shards"; + + static + { + switch (DatabaseDescriptor.getMemtableAllocationType()) + { + case unslabbed_heap_buffers: + case heap_buffers: + BUFFER_TYPE = BufferType.ON_HEAP; + break; + case offheap_buffers: + case offheap_objects: + BUFFER_TYPE = BufferType.OFF_HEAP; + break; + default: + throw new AssertionError(); + } + + MBeanWrapper.instance.registerMBean(new TrieMemtableConfig(), TRIE_MEMTABLE_CONFIG_OBJECT_NAME, MBeanWrapper.OnException.LOG); + } + + /** If keys is below this length, we will use a recursive procedure for inserting data in the memtable trie. */ + @VisibleForTesting + public static final int MAX_RECURSIVE_KEY_LENGTH = 128; + + /** The byte-ordering conversion version to use for memtables. */ + public static final ByteComparable.Version BYTE_COMPARABLE_VERSION = ByteComparable.Version.OSS42; + + // Set to true when the memtable requests a switch (e.g. for trie size limit being reached) to ensure only one + // thread calls cfs.switchMemtableIfCurrent. + private AtomicBoolean switchRequested = new AtomicBoolean(false); + + + // The boundaries for the keyspace as they were calculated when the memtable is created. + // The boundaries will be NONE for system keyspaces or if StorageService is not yet initialized. + // The fact this is fixed for the duration of the memtable lifetime, guarantees we'll always pick the same core + // for the a given key, even if we race with the StorageService initialization or with topology changes. + @Unmetered + private final ShardBoundaries boundaries; + + /** + * Core-specific memtable regions. All writes must go through the specific core. The data structures used + * are concurrent-read safe, thus reads can be carried out from any thread. + */ + private final MemtableShard[] shards; + + /** + * A merged view of the memtable map. Used for partition range queries and flush. + * For efficiency we serve single partition requests off the shard which offers more direct MemtableTrie methods. + */ + private final Trie<BTreePartitionData> mergedTrie; + + @Unmetered + private final TrieMemtableMetricsView metrics; + + @VisibleForTesting + public static final String SHARD_COUNT_PROPERTY = "cassandra.trie.memtable.shard.count"; + + // default shard count, used when a specific number of shards is not specified in the options + private static volatile int SHARD_COUNT = Integer.getInteger(SHARD_COUNT_PROPERTY, FBUtilities.getAvailableProcessors()); Review Comment: This isn't a constant, so we should probably name it `shardCount` or, even better, `defaultShardCount` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

