blambov commented on code in PR #1723: URL: https://github.com/apache/cassandra/pull/1723#discussion_r972075808
########## src/java/org/apache/cassandra/db/memtable/TrieMemtable.java: ########## @@ -0,0 +1,793 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.memtable; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterators; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.BufferDecoratedKey; +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionInfo; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.RegularAndStaticColumns; +import org.apache.cassandra.db.Slices; +import org.apache.cassandra.db.commitlog.CommitLogPosition; +import org.apache.cassandra.db.filter.ClusteringIndexFilter; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.BTreePartitionData; +import org.apache.cassandra.db.partitions.BTreePartitionUpdater; +import org.apache.cassandra.db.partitions.ImmutableBTreePartition; +import org.apache.cassandra.db.partitions.Partition; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.EncodingStats; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.tries.MemtableTrie; +import org.apache.cassandra.db.tries.Trie; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.IncludingExcludingBounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.index.transactions.UpdateTransaction; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.sstable.format.SSTableReadsListener; +import org.apache.cassandra.metrics.TableMetrics; +import org.apache.cassandra.metrics.TrieMemtableMetricsView; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableMetadataRef; +import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MBeanWrapper; +import org.apache.cassandra.utils.bytecomparable.ByteComparable; +import org.apache.cassandra.utils.bytecomparable.ByteSource; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.memory.EnsureOnHeap; +import org.apache.cassandra.utils.memory.MemtableAllocator; +import org.github.jamm.Unmetered; + +public class TrieMemtable extends AbstractAllocatorMemtable +{ + private static final Logger logger = LoggerFactory.getLogger(TrieMemtable.class); + public static final String TRIE_MEMTABLE_CONFIG_OBJECT_NAME = "org.apache.cassandra.db:type=TrieMemtableConfig"; + + /** Buffer type to use for memtable tries (on- vs off-heap) */ + public static final BufferType BUFFER_TYPE; + + public static final String SHARDS_OPTION = "shards"; + + static + { + switch (DatabaseDescriptor.getMemtableAllocationType()) + { + case unslabbed_heap_buffers: + case heap_buffers: + BUFFER_TYPE = BufferType.ON_HEAP; + break; + case offheap_buffers: + case offheap_objects: + BUFFER_TYPE = BufferType.OFF_HEAP; + break; + default: + throw new AssertionError(); + } + + MBeanWrapper.instance.registerMBean(new TrieMemtableConfig(), TRIE_MEMTABLE_CONFIG_OBJECT_NAME, MBeanWrapper.OnException.LOG); + } + + /** If keys is below this length, we will use a recursive procedure for inserting data in the memtable trie. */ + @VisibleForTesting + public static final int MAX_RECURSIVE_KEY_LENGTH = 128; + + /** The byte-ordering conversion version to use for memtables. */ + public static final ByteComparable.Version BYTE_COMPARABLE_VERSION = ByteComparable.Version.OSS42; + + // Set to true when the memtable requests a switch (e.g. for trie size limit being reached) to ensure only one + // thread calls cfs.switchMemtableIfCurrent. + private AtomicBoolean switchRequested = new AtomicBoolean(false); + + + // The boundaries for the keyspace as they were calculated when the memtable is created. + // The boundaries will be NONE for system keyspaces or if StorageService is not yet initialized. + // The fact this is fixed for the duration of the memtable lifetime, guarantees we'll always pick the same core + // for the a given key, even if we race with the StorageService initialization or with topology changes. + @Unmetered + private final ShardBoundaries boundaries; + + /** + * Core-specific memtable regions. All writes must go through the specific core. The data structures used + * are concurrent-read safe, thus reads can be carried out from any thread. + */ + private final MemtableShard[] shards; + + /** + * A merged view of the memtable map. Used for partition range queries and flush. + * For efficiency we serve single partition requests off the shard which offers more direct MemtableTrie methods. + */ + private final Trie<BTreePartitionData> mergedTrie; + + @Unmetered + private final TrieMemtableMetricsView metrics; + + @VisibleForTesting + public static final String SHARD_COUNT_PROPERTY = "cassandra.trie.memtable.shard.count"; + + // default shard count, used when a specific number of shards is not specified in the options + private static volatile int SHARD_COUNT = Integer.getInteger(SHARD_COUNT_PROPERTY, FBUtilities.getAvailableProcessors()); + + // only to be used by init(), to setup the very first memtable for the cfs + TrieMemtable(AtomicReference<CommitLogPosition> commitLogLowerBound, TableMetadataRef metadataRef, Owner owner, Integer shardCountOption) + { + super(commitLogLowerBound, metadataRef, owner); + int shardCount = shardCountOption != null ? shardCountOption : getShardCount(); + this.boundaries = owner.localRangeSplits(shardCount); + this.metrics = new TrieMemtableMetricsView(metadataRef.keyspace, metadataRef.name); + this.shards = generatePartitionShards(boundaries.shardCount(), allocator, metadataRef, metrics); + this.mergedTrie = makeMergedTrie(shards); + } + + private static MemtableShard[] generatePartitionShards(int splits, + MemtableAllocator allocator, + TableMetadataRef metadata, + TrieMemtableMetricsView metrics) + { + MemtableShard[] partitionMapContainer = new MemtableShard[splits]; + for (int i = 0; i < splits; i++) + partitionMapContainer[i] = new MemtableShard(metadata, allocator, metrics); + + return partitionMapContainer; + } + + private static Trie<BTreePartitionData> makeMergedTrie(MemtableShard[] shards) + { + List<Trie<BTreePartitionData>> tries = new ArrayList<>(shards.length); + for (MemtableShard shard : shards) + tries.add(shard.data); + return Trie.mergeDistinct(tries); + } + + @Override + public boolean isClean() + { + for (MemtableShard shard : shards) + if (!shard.isEmpty()) + return false; + return true; + } + + @Override + public void discard() + { + super.discard(); + // metrics here are not thread safe, but I think we can live with that + metrics.lastFlushShardDataSizes.reset(); + for (MemtableShard shard : shards) + { + metrics.lastFlushShardDataSizes.update(shard.liveDataSize()); + } + for (MemtableShard shard : shards) + { + shard.data.discardBuffers(); + } + } + + /** + * Should only be called by ColumnFamilyStore.apply via Keyspace.apply, which supplies the appropriate + * OpOrdering. + * + * commitLogSegmentPosition should only be null if this is a secondary index, in which case it is *expected* to be null + */ + @Override + public long put(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup) + { + DecoratedKey key = update.partitionKey(); + MemtableShard shard = shards[boundaries.getShardForKey(key)]; + long colUpdateTimeDelta = shard.put(key, update, indexer, opGroup); + + if (shard.data.reachedAllocatedSizeThreshold() && !switchRequested.getAndSet(true)) + { + logger.info("Scheduling flush due to trie size limit reached."); + owner.signalFlushRequired(this, ColumnFamilyStore.FlushReason.MEMTABLE_LIMIT); + } + + return colUpdateTimeDelta; + } + + /** + * Technically we should scatter gather on all the core threads because the size in following calls are not + * using volatile variables, but for metrics purpose this should be good enough. + */ + @Override + public long getLiveDataSize() + { + long total = 0L; + for (MemtableShard shard : shards) + total += shard.liveDataSize(); + return total; + } + + @Override + public long operationCount() + { + long total = 0L; + for (MemtableShard shard : shards) + total += shard.currentOperations(); + return total; + } + + @Override + public long partitionCount() + { + int total = 0; + for (MemtableShard shard : shards) + total += shard.size(); + return total; + } + + @Override + public long getMinTimestamp() + { + long min = Long.MAX_VALUE; + for (MemtableShard shard : shards) + min = Long.min(min, shard.minTimestamp()); + return min; + } + + @Override + public int getMinLocalDeletionTime() + { + int min = Integer.MAX_VALUE; + for (MemtableShard shard : shards) + min = Integer.min(min, shard.minLocalDeletionTime()); + return min; + } + + @Override + RegularAndStaticColumns columns() + { + for (MemtableShard shard : shards) + columnsCollector.update(shard.columnsCollector); + return columnsCollector.get(); + } + + @Override + EncodingStats encodingStats() + { + for (MemtableShard shard : shards) + statsCollector.update(shard.statsCollector.get()); + return statsCollector.get(); + } + + @Override + public MemtableUnfilteredPartitionIterator partitionIterator(final ColumnFilter columnFilter, + final DataRange dataRange, + SSTableReadsListener readsListener) + { + AbstractBounds<PartitionPosition> keyRange = dataRange.keyRange(); + + PartitionPosition left = keyRange.left; + PartitionPosition right = keyRange.right; + if (left.isMinimum()) + left = null; + if (right.isMinimum()) + right = null; + + boolean isBound = keyRange instanceof Bounds; + boolean includeStart = isBound || keyRange instanceof IncludingExcludingBounds; + boolean includeStop = isBound || keyRange instanceof Range; + + Trie<BTreePartitionData> subMap = mergedTrie.subtrie(left, includeStart, right, includeStop); + + return new MemtableUnfilteredPartitionIterator(metadata(), + allocator.ensureOnHeap(), + subMap, + columnFilter, + dataRange); + // readsListener is ignored as it only accepts sstable signals + } + + private Partition getPartition(DecoratedKey key) + { + int shardIndex = boundaries.getShardForKey(key); + BTreePartitionData data = shards[shardIndex].data.get(key); + if (data != null) + return createPartition(metadata(), allocator.ensureOnHeap(), key, data); + else + return null; + } + + @Override + public UnfilteredRowIterator rowIterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed, SSTableReadsListener listener) + { + Partition p = getPartition(key); + if (p == null) + return null; + else + return p.unfilteredIterator(selectedColumns, slices, reversed); + } + + @Override + public UnfilteredRowIterator rowIterator(DecoratedKey key) + { + Partition p = getPartition(key); + return p != null ? p.unfilteredIterator() : null; + } + + private static MemtablePartition createPartition(TableMetadata metadata, EnsureOnHeap ensureOnHeap, DecoratedKey key, BTreePartitionData data) + { + return new MemtablePartition(metadata, ensureOnHeap, key, data); + } + + private static MemtablePartition getPartitionFromTrieEntry(TableMetadata metadata, EnsureOnHeap ensureOnHeap, Map.Entry<ByteComparable, BTreePartitionData> en) + { + DecoratedKey key = BufferDecoratedKey.fromByteComparable(en.getKey(), + BYTE_COMPARABLE_VERSION, + metadata.partitioner); + return createPartition(metadata, ensureOnHeap, key, en.getValue()); + } + + + @Override + public FlushablePartitionSet<MemtablePartition> getFlushSet(PartitionPosition from, PartitionPosition to) + { + Trie<BTreePartitionData> toFlush = mergedTrie.subtrie(from, true, to, false); + long keySize = 0; + int keyCount = 0; + + for (Iterator<Map.Entry<ByteComparable, BTreePartitionData>> it = toFlush.entryIterator(); it.hasNext(); ) + { + Map.Entry<ByteComparable, BTreePartitionData> en = it.next(); + byte[] keyBytes = DecoratedKey.keyFromByteSource(ByteSource.peekable(en.getKey().asComparableBytes(BYTE_COMPARABLE_VERSION)), + BYTE_COMPARABLE_VERSION, + metadata().partitioner); + keySize += keyBytes.length; + keyCount++; + } + long partitionKeySize = keySize; + int partitionCount = keyCount; + + return new AbstractFlushablePartitionSet<MemtablePartition>() + { + public Memtable memtable() + { + return TrieMemtable.this; + } + + public PartitionPosition from() + { + return from; + } + + public PartitionPosition to() + { + return to; + } + + public long partitionCount() + { + return partitionCount; + } + + public Iterator<MemtablePartition> iterator() + { + return Iterators.transform(toFlush.entryIterator(), + // During flushing we are certain the memtable will remain at least until + // the flush completes. No copying to heap is necessary. + entry -> getPartitionFromTrieEntry(metadata(), EnsureOnHeap.NOOP, entry)); + } + + public long partitionKeysSize() + { + return partitionKeySize; + } + }; + } + + static class MemtableShard + { + // The following fields are volatile as we have to make sure that when we + // collect results from all sub-ranges, the thread accessing the value + // is guaranteed to see the changes to the values. + + // The smallest timestamp for all partitions stored in this shard + private volatile long minTimestamp = Long.MAX_VALUE; + + private volatile int minLocalDeletionTime = Integer.MAX_VALUE; + + private volatile long liveDataSize = 0; + + private volatile long currentOperations = 0; + + @Unmetered + private ReentrantLock writeLock = new ReentrantLock(); + + // Content map for the given shard. This is implemented as a memtable trie which uses the prefix-free + // byte-comparable ByteSource representations of the keys to address the partitions. + // + // This map is used in a single-producer, multi-consumer fashion: only one thread will insert items but + // several threads may read from it and iterate over it. Iterators are created when a the first item of + // a flow is requested for example, and then used asynchronously when sub-sequent items are requested. + // + // Therefore, iterators should not throw ConcurrentModificationExceptions if the underlying map is modified + // during iteration, they should provide a weakly consistent view of the map instead. + // + // Also, this data is backed by memtable memory, when accessing it callers must specify if it can be accessed + // unsafely, meaning that the memtable will not be discarded as long as the data is used, or whether the data + // should be copied on heap for off-heap allocators. + @VisibleForTesting + final MemtableTrie<BTreePartitionData> data; + + private final ColumnsCollector columnsCollector; + + private final StatsCollector statsCollector; + + @Unmetered // total pool size should not be included in memtable's deep size + private final MemtableAllocator allocator; + + @Unmetered + private final TrieMemtableMetricsView metrics; + + @VisibleForTesting + MemtableShard(TableMetadataRef metadata, MemtableAllocator allocator, TrieMemtableMetricsView metrics) + { + this.data = new MemtableTrie<>(BUFFER_TYPE); + this.columnsCollector = new AbstractMemtable.ColumnsCollector(metadata.get().regularAndStaticColumns()); + this.statsCollector = new AbstractMemtable.StatsCollector(); + this.allocator = allocator; + this.metrics = metrics; + } + + public long put(DecoratedKey key, PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup) + { + BTreePartitionUpdater updater = new BTreePartitionUpdater(allocator, allocator.cloner(opGroup), opGroup, indexer); + boolean locked = writeLock.tryLock(); + if (locked) + { + metrics.uncontendedPuts.inc(); + } + else + { + metrics.contendedPuts.inc(); + long lockStartTime = Clock.Global.nanoTime(); + writeLock.lock(); + metrics.contentionTime.addNano(Clock.Global.nanoTime() - lockStartTime); + } + try + { + try + { + long onHeap = data.sizeOnHeap(); + long offHeap = data.sizeOffHeap(); + // Use the fast recursive put if we know the key is small enough to not cause a stack overflow. + try + { + data.putSingleton(key, + update, + updater::mergePartitions, + key.getKeyLength() < MAX_RECURSIVE_KEY_LENGTH); + } + catch (MemtableTrie.SpaceExhaustedException e) + { + // This should never really happen as a flush would be triggered long before this limit is reached. + throw Throwables.propagate(e); + } + allocator.offHeap().adjust(data.sizeOffHeap() - offHeap, opGroup); + allocator.onHeap().adjust(data.sizeOnHeap() - onHeap, opGroup); + } + finally + { + updateMinTimestamp(update.stats().minTimestamp); + updateMinLocalDeletionTime(update.stats().minLocalDeletionTime); + updateLiveDataSize(updater.dataSize); + updateCurrentOperations(update.operationCount()); + + // TODO: lambov 2021-03-30: check if stats are further optimisable + columnsCollector.update(update.columns()); + statsCollector.update(update.stats()); + } + } + finally + { + writeLock.unlock(); + } + return updater.colUpdateTimeDelta; + } + + public boolean isEmpty() + { + return data.isEmpty(); + } + + private void updateMinTimestamp(long timestamp) + { + if (timestamp < minTimestamp) + minTimestamp = timestamp; + } + + private void updateMinLocalDeletionTime(int delTime) + { + if (delTime < minLocalDeletionTime) + minLocalDeletionTime = delTime; + } + + void updateLiveDataSize(long size) + { + liveDataSize = liveDataSize + size; + } + + private void updateCurrentOperations(long op) + { + currentOperations = currentOperations + op; + } + + public int size() + { + return data.valuesCount(); + } + + long minTimestamp() + { + return minTimestamp; + } + + long liveDataSize() + { + return liveDataSize; + } + + long currentOperations() + { + return currentOperations; + } + + int minLocalDeletionTime() + { + return minLocalDeletionTime; + } + } + + static class MemtableUnfilteredPartitionIterator extends AbstractUnfilteredPartitionIterator implements UnfilteredPartitionIterator + { + private final TableMetadata metadata; + private final EnsureOnHeap ensureOnHeap; + private final Trie<BTreePartitionData> source; + private final Iterator<Map.Entry<ByteComparable, BTreePartitionData>> iter; + private final ColumnFilter columnFilter; + private final DataRange dataRange; + + public MemtableUnfilteredPartitionIterator(TableMetadata metadata, + EnsureOnHeap ensureOnHeap, + Trie<BTreePartitionData> source, + ColumnFilter columnFilter, + DataRange dataRange) + { + this.metadata = metadata; + this.ensureOnHeap = ensureOnHeap; + this.iter = source.entryIterator(); + this.source = source; + this.columnFilter = columnFilter; + this.dataRange = dataRange; + } + + public TableMetadata metadata() + { + return metadata; + } + + public boolean hasNext() + { + return iter.hasNext(); + } + + public UnfilteredRowIterator next() + { + Partition partition = getPartitionFromTrieEntry(metadata(), ensureOnHeap, iter.next()); + DecoratedKey key = partition.partitionKey(); + ClusteringIndexFilter filter = dataRange.clusteringIndexFilter(key); + + return filter.getUnfilteredRowIterator(columnFilter, partition); + } + } + + static class MemtablePartition extends ImmutableBTreePartition + { + + private final EnsureOnHeap ensureOnHeap; + + private MemtablePartition(TableMetadata table, EnsureOnHeap ensureOnHeap, DecoratedKey key, BTreePartitionData data) + { + super(table, key, data); + this.ensureOnHeap = ensureOnHeap; + } + + @Override + protected boolean canHaveShadowedData() + { + // The BtreePartitionData we store in the memtable are build iteratively by BTreePartitionData.add(), which + // doesn't make sure there isn't shadowed data, so we'll need to eliminate any. + return true; + } + + + @Override + public DeletionInfo deletionInfo() + { + return ensureOnHeap.applyToDeletionInfo(super.deletionInfo()); + } + + @Override + public Row staticRow() + { + return ensureOnHeap.applyToStatic(super.staticRow()); + } + + @Override + public DecoratedKey partitionKey() + { + return ensureOnHeap.applyToPartitionKey(super.partitionKey()); + } + + @Override + public Row getRow(Clustering<?> clustering) + { + return ensureOnHeap.applyToRow(super.getRow(clustering)); + } + + @Override + public Row lastRow() + { + return ensureOnHeap.applyToRow(super.lastRow()); + } + + @Override + public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, Slices slices, boolean reversed) + { + return unfilteredIterator(holder(), selection, slices, reversed); + } + + @Override + public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, NavigableSet<Clustering<?>> clusteringsInQueryOrder, boolean reversed) + { + return ensureOnHeap + .applyToPartition(super.unfilteredIterator(selection, clusteringsInQueryOrder, reversed)); + } + + @Override + public UnfilteredRowIterator unfilteredIterator() + { + return unfilteredIterator(ColumnFilter.selection(super.columns()), Slices.ALL, false); + } + + @Override + public UnfilteredRowIterator unfilteredIterator(BTreePartitionData current, ColumnFilter selection, Slices slices, boolean reversed) + { + return ensureOnHeap + .applyToPartition(super.unfilteredIterator(current, selection, slices, reversed)); + } + + @Override + public Iterator<Row> iterator() + { + return ensureOnHeap.applyToPartition(super.iterator()); + } + } + + public static Factory factory(Map<String, String> optionsCopy) + { + String shardsString = optionsCopy.remove(SHARDS_OPTION); + Integer shardCount = shardsString != null ? Integer.parseInt(shardsString) : null; + return new Factory(shardCount); + } + + static class Factory implements Memtable.Factory + { + final Integer shardCount; + + Factory(Integer shardCount) + { + this.shardCount = shardCount; + } + + public Memtable create(AtomicReference<CommitLogPosition> commitLogLowerBound, + TableMetadataRef metadaRef, + Owner owner) + { + return new TrieMemtable(commitLogLowerBound, metadaRef, owner, shardCount); + } + + @Override + public TableMetrics.ReleasableMetric createMemtableMetrics(TableMetadataRef metadataRef) + { + TrieMemtableMetricsView metrics = new TrieMemtableMetricsView(metadataRef.keyspace, metadataRef.name); + return metrics::release; + } + + public boolean equals(Object o) + { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + Factory factory = (Factory) o; + return Objects.equals(shardCount, factory.shardCount); + } + + public int hashCode() + { + return Objects.hash(shardCount); + } + } + + @VisibleForTesting + public long unusedReservedMemory() + { + long size = 0; + for (MemtableShard shard : shards) + size += shard.data.unusedReservedMemory(); + return size; + } + + private static class TrieMemtableConfig implements TrieMemtableConfigMXBean + { + @Override + public void setShardCount(String shardCount) + { + if ("auto".equalsIgnoreCase(shardCount)) + { + SHARD_COUNT = FBUtilities.getAvailableProcessors(); + } + else + { + try + { + SHARD_COUNT = Integer.valueOf(shardCount); + } + catch (NumberFormatException ex) + { + logger.warn("Unable to parse {} as valid value for shard count", shardCount); + return; + } + } + logger.info("Requested setting shard count to {}; set to: {}", shardCount, SHARD_COUNT); + } + } + + @VisibleForTesting + public static int getShardCount() Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

