This is an automated email from the ASF dual-hosted git repository. adelapena pushed a commit to branch cassandra-5.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-5.0 by this push: new 9ce86e0ff8 SAI result retriever is filtering too many rows 9ce86e0ff8 is described below commit 9ce86e0ff8b6344b528a0640f9dafa23f97dd85a Author: Mike Adamson <madam...@datastax.com> AuthorDate: Tue Aug 8 17:07:01 2023 +0100 SAI result retriever is filtering too many rows This patch fixes a bug in the SegmentMetadata that was only storing the partition key for min and max primary keys for a segment. It also contains some refactoring of the PrimaryKey to remove the deferred loading of PrimaryKeys by the PrimaryKeyMaps. Patch by Mike Adamson; reviewed by Caleb Rackliffe and Andrés de la Peña for CASSANDRA-18734 --- CHANGES.txt | 1 + .../apache/cassandra/index/sai/IndexContext.java | 9 +- .../cassandra/index/sai/StorageAttachedIndex.java | 1 + .../index/sai/StorageAttachedIndexGroup.java | 2 +- .../cassandra/index/sai/disk/PrimaryKeyMap.java | 3 +- .../index/sai/disk/StorageAttachedIndexWriter.java | 3 +- .../index/sai/disk/format/IndexDescriptor.java | 11 +- .../index/sai/disk/v1/MemtableIndexWriter.java | 14 +- .../index/sai/disk/v1/SkinnyPrimaryKeyMap.java | 40 +-- .../index/sai/disk/v1/WidePrimaryKeyMap.java | 37 +-- .../index/sai/disk/v1/keystore/KeyLookup.java | 6 +- .../index/sai/disk/v1/segment/SegmentMetadata.java | 33 +- .../index/sai/memory/TrieMemoryIndex.java | 3 +- .../cassandra/index/sai/plan/QueryController.java | 13 +- .../sai/plan/StorageAttachedIndexSearcher.java | 24 +- .../cassandra/index/sai/utils/PrimaryKey.java | 368 ++++++++++++++------- .../test/microbench/sai/KeyLookupBench.java | 4 +- .../org/apache/cassandra/index/sai/SAITester.java | 4 +- .../index/sai/disk/format/IndexDescriptorTest.java | 5 +- .../sai/disk/v1/InvertedIndexSearcherTest.java | 10 +- .../index/sai/disk/v1/SegmentFlushTest.java | 16 +- .../index/sai/disk/v1/WideRowPrimaryKeyTest.java | 3 +- .../v1/bbtree/BlockBalancedTreeIndexBuilder.java | 8 +- .../index/sai/disk/v1/keystore/KeyLookupTest.java 
| 28 +- .../sai/iterators/KeyRangeConcatIteratorTest.java | 6 +- .../index/sai/iterators/LongIterator.java | 2 +- .../AbstractInMemoryKeyRangeIteratorTester.java | 16 +- .../PriorityInMemoryKeyRangeIteratorTest.java | 4 +- .../index/sai/memory/TrieMemoryIndexTest.java | 1 + .../index/sai/utils/AbstractPrimaryKeyTester.java | 15 +- .../index/sai/utils/IndexInputLeakDetector.java | 2 +- .../cassandra/index/sai/utils/PrimaryKeyTest.java | 61 +++- 32 files changed, 449 insertions(+), 304 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 953ca7f3dc..c65a1e6671 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.0-alpha2 + * Fix SAI's SegmentMetadata min and max primary keys (CASSANDRA-18734) * Remove commons-codec dependency (CASSANDRA-18772) Merged from 4.1: Merged from 4.0: diff --git a/src/java/org/apache/cassandra/index/sai/IndexContext.java b/src/java/org/apache/cassandra/index/sai/IndexContext.java index ac33c837da..61eb844b02 100644 --- a/src/java/org/apache/cassandra/index/sai/IndexContext.java +++ b/src/java/org/apache/cassandra/index/sai/IndexContext.java @@ -44,6 +44,7 @@ import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.index.sai.analyzer.AbstractAnalyzer; import org.apache.cassandra.index.sai.disk.SSTableIndex; import org.apache.cassandra.index.sai.disk.format.Version; @@ -96,6 +97,7 @@ public class IndexContext public IndexContext(String keyspace, String table, AbstractType<?> partitionKeyType, + IPartitioner partitioner, ClusteringComparator clusteringComparator, ColumnMetadata columnMetadata, IndexTarget.Type indexType, @@ -108,7 +110,7 @@ public class IndexContext this.columnMetadata = Objects.requireNonNull(columnMetadata); this.indexType = Objects.requireNonNull(indexType); this.validator = TypeUtil.cellValueType(columnMetadata, 
indexType); - this.primaryKeyFactory = new PrimaryKey.Factory(clusteringComparator); + this.primaryKeyFactory = new PrimaryKey.Factory(partitioner, clusteringComparator); this.indexMetadata = indexMetadata; this.memtableIndexManager = indexMetadata == null ? null : new MemtableIndexManager(this); @@ -122,6 +124,11 @@ public class IndexContext : AbstractAnalyzer.fromOptions(getValidator(), indexMetadata.options); } + public boolean hasClustering() + { + return clusteringComparator.size() > 0; + } + public AbstractType<?> keyValidator() { return partitionKeyType; diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java index 805a6b65e1..a0cdf9c3ac 100644 --- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java +++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java @@ -178,6 +178,7 @@ public class StorageAttachedIndex implements Index this.indexContext = new IndexContext(tableMetadata.keyspace, tableMetadata.name, tableMetadata.partitionKeyType, + tableMetadata.partitioner, tableMetadata.comparator, target.left, target.right, diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java index 7ce084d9db..de4c8b3570 100644 --- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java +++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java @@ -191,7 +191,7 @@ public class StorageAttachedIndexGroup implements Index.Group, INotificationCons @Override public SSTableFlushObserver getFlushObserver(Descriptor descriptor, LifecycleNewTracker tracker, TableMetadata tableMetadata) { - IndexDescriptor indexDescriptor = IndexDescriptor.create(descriptor, tableMetadata.comparator); + IndexDescriptor indexDescriptor = IndexDescriptor.create(descriptor, tableMetadata.partitioner, tableMetadata.comparator); try { return 
StorageAttachedIndexWriter.createFlushObserverWriter(indexDescriptor, indexes, tracker); diff --git a/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyMap.java b/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyMap.java index fa47b9f4a4..330acf6954 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyMap.java +++ b/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyMap.java @@ -22,6 +22,7 @@ import java.io.Closeable; import java.io.IOException; import javax.annotation.concurrent.NotThreadSafe; +import javax.annotation.concurrent.ThreadSafe; import org.apache.cassandra.index.sai.utils.PrimaryKey; @@ -36,7 +37,7 @@ public interface PrimaryKeyMap extends Closeable * A factory for creating {@link PrimaryKeyMap} instances. Implementations of this * interface are expected to be threadsafe. */ - @NotThreadSafe + @ThreadSafe interface Factory extends Closeable { /** diff --git a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java index f44a68bde1..f35ae67f93 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java @@ -247,7 +247,8 @@ public class StorageAttachedIndexWriter implements SSTableFlushObserver private void addRow(Row row) throws IOException, InMemoryTrie.SpaceExhaustedException { - PrimaryKey primaryKey = indexDescriptor.primaryKeyFactory.create(currentKey, row.clustering()); + PrimaryKey primaryKey = indexDescriptor.hasClustering() ? 
indexDescriptor.primaryKeyFactory.create(currentKey, row.clustering()) + : indexDescriptor.primaryKeyFactory.create(currentKey); perSSTableWriter.nextRow(primaryKey); rowMapping.add(primaryKey, sstableRowId); diff --git a/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java b/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java index 37364f6448..eda5cb7ed3 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java +++ b/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ClusteringComparator; import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.index.sai.IndexContext; import org.apache.cassandra.index.sai.IndexValidation; import org.apache.cassandra.index.sai.SSTableContext; @@ -74,17 +75,17 @@ public class IndexDescriptor public final ClusteringComparator clusteringComparator; public final PrimaryKey.Factory primaryKeyFactory; - private IndexDescriptor(Version version, Descriptor sstableDescriptor, ClusteringComparator clusteringComparator) + private IndexDescriptor(Version version, Descriptor sstableDescriptor, IPartitioner partitioner, ClusteringComparator clusteringComparator) { this.version = version; this.sstableDescriptor = sstableDescriptor; this.clusteringComparator = clusteringComparator; - this.primaryKeyFactory = new PrimaryKey.Factory(clusteringComparator); + this.primaryKeyFactory = new PrimaryKey.Factory(partitioner, clusteringComparator); } - public static IndexDescriptor create(Descriptor descriptor, ClusteringComparator clusteringComparator) + public static IndexDescriptor create(Descriptor descriptor, IPartitioner partitioner, ClusteringComparator clusteringComparator) { - return new IndexDescriptor(Version.LATEST, descriptor, clusteringComparator); + return new IndexDescriptor(Version.LATEST, 
descriptor, partitioner, clusteringComparator); } public static IndexDescriptor create(SSTableReader sstable) @@ -93,6 +94,7 @@ public class IndexDescriptor { IndexDescriptor indexDescriptor = new IndexDescriptor(version, sstable.descriptor, + sstable.getPartitioner(), sstable.metadata().comparator); if (version.onDiskFormat().isPerSSTableIndexBuildComplete(indexDescriptor)) @@ -102,6 +104,7 @@ public class IndexDescriptor } return new IndexDescriptor(Version.LATEST, sstable.descriptor, + sstable.getPartitioner(), sstable.metadata().comparator); } diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java index da57ac29e0..97e82ed805 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java @@ -27,7 +27,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.carrotsearch.hppc.LongArrayList; -import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.index.sai.IndexContext; @@ -105,14 +104,11 @@ public class MemtableIndexWriter implements PerColumnIndexWriter return; } - final DecoratedKey minKey = rowMapping.minKey.partitionKey(); - final DecoratedKey maxKey = rowMapping.maxKey.partitionKey(); - final Iterator<Pair<ByteComparable, LongArrayList>> iterator = rowMapping.merge(memtable); try (MemtableTermsIterator terms = new MemtableTermsIterator(memtable.getMinTerm(), memtable.getMaxTerm(), iterator)) { - long cellCount = flush(minKey, maxKey, indexContext.getValidator(), terms, rowMapping.maxSSTableRowId); + long cellCount = flush(rowMapping.minKey, rowMapping.maxKey, indexContext.getValidator(), terms, rowMapping.maxSSTableRowId); indexDescriptor.createComponentOnDisk(IndexComponent.COLUMN_COMPLETION_MARKER, indexContext); @@ -135,8 
+131,8 @@ public class MemtableIndexWriter implements PerColumnIndexWriter } } - private long flush(DecoratedKey minKey, - DecoratedKey maxKey, + private long flush(PrimaryKey minKey, + PrimaryKey maxKey, AbstractType<?> termComparator, MemtableTermsIterator terms, long maxSSTableRowId) throws IOException @@ -175,8 +171,8 @@ public class MemtableIndexWriter implements PerColumnIndexWriter numRows, terms.getMinSSTableRowId(), terms.getMaxSSTableRowId(), - indexDescriptor.primaryKeyFactory.createPartitionKeyOnly(minKey), - indexDescriptor.primaryKeyFactory.createPartitionKeyOnly(maxKey), + minKey, + maxKey, terms.getMinTerm(), terms.getMaxTerm(), indexMetas); diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SkinnyPrimaryKeyMap.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SkinnyPrimaryKeyMap.java index ad7e16751b..b535861290 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/SkinnyPrimaryKeyMap.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SkinnyPrimaryKeyMap.java @@ -18,8 +18,11 @@ package org.apache.cassandra.index.sai.disk.v1; -import org.apache.cassandra.db.BufferDecoratedKey; -import org.apache.cassandra.db.Clustering; +import java.io.IOException; +import java.util.Arrays; +import javax.annotation.concurrent.NotThreadSafe; +import javax.annotation.concurrent.ThreadSafe; + import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.index.sai.disk.PrimaryKeyMap; @@ -35,15 +38,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.FileHandle; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.Throwables; -import org.apache.cassandra.utils.bytecomparable.ByteComparable; -import org.apache.cassandra.utils.bytecomparable.ByteSource; -import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse; - -import javax.annotation.concurrent.NotThreadSafe; -import 
javax.annotation.concurrent.ThreadSafe; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; /** * A {@link PrimaryKeyMap} for skinny tables (those with no clustering columns). @@ -130,7 +124,6 @@ public class SkinnyPrimaryKeyMap implements PrimaryKeyMap protected final KeyLookup.Cursor partitionKeyCursor; protected final IPartitioner partitioner; protected final PrimaryKey.Factory primaryKeyFactory; - protected final ByteBuffer tokenBuffer = ByteBuffer.allocate(Long.BYTES); protected SkinnyPrimaryKeyMap(LongArray tokenArray, LongArray partitionArray, @@ -148,9 +141,7 @@ public class SkinnyPrimaryKeyMap implements PrimaryKeyMap @Override public PrimaryKey primaryKeyFromRowId(long sstableRowId) { - tokenBuffer.putLong(tokenArray.get(sstableRowId)); - tokenBuffer.rewind(); - return primaryKeyFactory.createDeferred(partitioner.getTokenFactory().fromByteArray(tokenBuffer), () -> supplier(sstableRowId)); + return primaryKeyFactory.create(readPartitionKey(sstableRowId)); } @Override @@ -159,7 +150,9 @@ public class SkinnyPrimaryKeyMap implements PrimaryKeyMap long rowId = tokenArray.indexOf(primaryKey.token().getLongValue()); // If the key is token only, the token is out of range, we are at the end of our keys, or we have skipped a token // we can return straight away. - if (primaryKey.isTokenOnly() || rowId < 0 || rowId + 1 == tokenArray.length() || tokenArray.get(rowId) != primaryKey.token().getLongValue()) + if (primaryKey.kind() == PrimaryKey.Kind.TOKEN || + rowId < 0 || + rowId + 1 == tokenArray.length() || tokenArray.get(rowId) != primaryKey.token().getLongValue()) return rowId; // Otherwise we need to check for token collision. 
return tokenCollisionDetection(primaryKey, rowId); @@ -188,21 +181,8 @@ public class SkinnyPrimaryKeyMap implements PrimaryKeyMap return rowId; } - protected PrimaryKey supplier(long sstableRowId) - { - return primaryKeyFactory.create(readPartitionKey(sstableRowId), Clustering.EMPTY); - } - protected DecoratedKey readPartitionKey(long sstableRowId) { - long partitionId = partitionArray.get(sstableRowId); - ByteSource.Peekable peekable = ByteSource.peekable(partitionKeyCursor.seekToPointId(partitionId).asComparableBytes(ByteComparable.Version.OSS50)); - - byte[] keyBytes = ByteSourceInverse.getUnescapedBytes(peekable); - - assert keyBytes != null : "Primary key from map did not contain a partition key"; - - ByteBuffer decoratedKey = ByteBuffer.wrap(keyBytes); - return new BufferDecoratedKey(partitioner.getToken(decoratedKey), decoratedKey); + return primaryKeyFactory.partitionKeyFromComparableBytes(partitionKeyCursor.seekToPointId(partitionArray.get(sstableRowId))); } } diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/WidePrimaryKeyMap.java b/src/java/org/apache/cassandra/index/sai/disk/v1/WidePrimaryKeyMap.java index 0016c49493..a93d6a7ff1 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/WidePrimaryKeyMap.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/WidePrimaryKeyMap.java @@ -18,9 +18,13 @@ package org.apache.cassandra.index.sai.disk.v1; +import java.io.IOException; +import java.util.Arrays; +import javax.annotation.concurrent.NotThreadSafe; +import javax.annotation.concurrent.ThreadSafe; + import org.apache.cassandra.db.Clustering; import org.apache.cassandra.db.ClusteringComparator; -import org.apache.cassandra.db.marshal.ByteBufferAccessor; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.index.sai.disk.PrimaryKeyMap; import org.apache.cassandra.index.sai.disk.format.IndexComponent; @@ -33,13 +37,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import 
org.apache.cassandra.io.util.FileHandle; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.Throwables; -import org.apache.cassandra.utils.bytecomparable.ByteComparable; -import org.apache.cassandra.utils.bytecomparable.ByteSource; - -import javax.annotation.concurrent.NotThreadSafe; -import javax.annotation.concurrent.ThreadSafe; -import java.io.IOException; -import java.util.Arrays; /** * An extension of the {@link SkinnyPrimaryKeyMap} for wide tables (those with clustering columns). @@ -125,6 +122,12 @@ public class WidePrimaryKeyMap extends SkinnyPrimaryKeyMap this.clusteringKeyCursor = clusteringKeyCursor; } + @Override + public PrimaryKey primaryKeyFromRowId(long sstableRowId) + { + return primaryKeyFactory.create(readPartitionKey(sstableRowId), readClusteringKey(sstableRowId)); + } + @Override public long rowIdFromPrimaryKey(PrimaryKey primaryKey) { @@ -132,7 +135,7 @@ public class WidePrimaryKeyMap extends SkinnyPrimaryKeyMap // If the key only has a token (initial range skip in the query), the token is out of range, // or we have skipped a token, return the rowId from the token array. 
- if (primaryKey.isTokenOnly() || rowId < 0 || tokenArray.get(rowId) != primaryKey.token().getLongValue()) + if (primaryKey.kind() == PrimaryKey.Kind.TOKEN || rowId < 0 || tokenArray.get(rowId) != primaryKey.token().getLongValue()) return rowId; rowId = tokenCollisionDetection(primaryKey, rowId); @@ -148,23 +151,9 @@ public class WidePrimaryKeyMap extends SkinnyPrimaryKeyMap FileUtils.closeQuietly(clusteringKeyCursor); } - @Override - protected PrimaryKey supplier(long sstableRowId) - { - return primaryKeyFactory.create(readPartitionKey(sstableRowId), readClusteringKey(sstableRowId)); - } - private Clustering<?> readClusteringKey(long sstableRowId) { - ByteSource.Peekable peekable = ByteSource.peekable(clusteringKeyCursor.seekToPointId(sstableRowId) - .asComparableBytes(ByteComparable.Version.OSS50)); - - Clustering<?> clustering = clusteringComparator.clusteringFromByteComparable(ByteBufferAccessor.instance, v -> peekable); - - if (clustering == null) - clustering = Clustering.EMPTY; - - return clustering; + return primaryKeyFactory.clusteringFromByteComparable(clusteringKeyCursor.seekToPointId(sstableRowId)); } // Returns the rowId of the next partition or the number of rows if supplied rowId is in the last partition diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/keystore/KeyLookup.java b/src/java/org/apache/cassandra/index/sai/disk/v1/keystore/KeyLookup.java index 7f6f8f3b82..7b8808e89e 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/keystore/KeyLookup.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/keystore/KeyLookup.java @@ -143,10 +143,10 @@ public class KeyLookup * in these cases the internal buffer is cleared. 
* * @param pointId point id to lookup - * @return The {@link ByteComparable} containing the key + * @return The {@link ByteSource} containing the key * @throws IndexOutOfBoundsException if the target point id is less than -1 or greater than the number of keys */ - public @Nonnull ByteComparable seekToPointId(long pointId) + public @Nonnull ByteSource seekToPointId(long pointId) { if (pointId < 0 || pointId >= keyLookupMeta.keyCount) throw new IndexOutOfBoundsException(String.format(INDEX_OUT_OF_BOUNDS, pointId, keyLookupMeta.keyCount)); @@ -170,7 +170,7 @@ public class KeyLookup updateCurrentBlockIndex(currentPointId); } - return ByteComparable.fixedLength(currentKey.bytes, currentKey.offset, currentKey.length); + return ByteSource.fixedLength(currentKey.bytes, currentKey.offset, currentKey.length); } /** diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentMetadata.java b/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentMetadata.java index e9510f945b..28aa9b2e4d 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentMetadata.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentMetadata.java @@ -18,6 +18,7 @@ package org.apache.cassandra.index.sai.disk.v1.segment; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; @@ -30,12 +31,14 @@ import java.util.stream.Stream; import com.google.common.collect.ImmutableMap; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.index.sai.disk.format.IndexComponent; import org.apache.cassandra.index.sai.disk.v1.MetadataSource; import org.apache.cassandra.index.sai.disk.v1.MetadataWriter; import org.apache.cassandra.index.sai.utils.PrimaryKey; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.bytecomparable.ByteComparable; +import org.apache.cassandra.utils.bytecomparable.ByteSource; 
+import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse; import org.apache.lucene.store.DataInput; import org.apache.lucene.store.IndexOutput; @@ -54,7 +57,7 @@ public class SegmentMetadata /** * Min and max sstable rowId in current segment. - * + * <p> * For index generated by compaction, minSSTableRowId is the same as segmentRowIdOffset. * But for flush, segmentRowIdOffset is taken from previous segment's maxSSTableRowId. */ @@ -80,7 +83,7 @@ public class SegmentMetadata /** * Root, offset, length for each index structure in the segment. - * + * <p> * Note: postings block offsets are stored in terms dictionary, no need to worry about its root. */ public final ComponentMetadataMap componentMetadatas; @@ -118,8 +121,8 @@ public class SegmentMetadata this.numRows = input.readLong(); this.minSSTableRowId = input.readLong(); this.maxSSTableRowId = input.readLong(); - this.minKey = primaryKeyFactory.createPartitionKeyOnly(DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input))); - this.maxKey = primaryKeyFactory.createPartitionKeyOnly(DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input))); + this.minKey = primaryKeyFactory.fromComparableBytes(ByteSource.fixedLength(readBytes(input))); + this.maxKey = primaryKeyFactory.fromComparableBytes(ByteSource.fixedLength(readBytes(input))); this.minTerm = readBytes(input); this.maxTerm = readBytes(input); this.componentMetadatas = new ComponentMetadataMap(input); @@ -158,9 +161,10 @@ public class SegmentMetadata output.writeLong(metadata.minSSTableRowId); output.writeLong(metadata.maxSSTableRowId); - Stream.of(metadata.minKey.partitionKey().getKey(), - metadata.maxKey.partitionKey().getKey(), - metadata.minTerm, metadata.maxTerm).forEach(bb -> writeBytes(bb, output)); + Stream.of(ByteSourceInverse.readBytes(metadata.minKey.asComparableBytes(ByteComparable.Version.OSS50)), + ByteSourceInverse.readBytes(metadata.maxKey.asComparableBytes(ByteComparable.Version.OSS50))) + .forEach(b -> writeBytes(b, 
output)); + Stream.of(metadata.minTerm, metadata.maxTerm).forEach(bb -> writeBytes(bb, output)); metadata.componentMetadatas.write(output); } @@ -195,6 +199,19 @@ public class SegmentMetadata out.writeInt(bytes.length); out.writeBytes(bytes, 0, bytes.length); } + catch (IOException e) + { + throw new UncheckedIOException(e); + } + } + + private static void writeBytes(byte[] bytes, IndexOutput out) + { + try + { + out.writeInt(bytes.length); + out.writeBytes(bytes, 0, bytes.length); + } catch (IOException ioe) { throw new RuntimeException(ioe); diff --git a/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java b/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java index 0df665d930..f992301df9 100644 --- a/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java +++ b/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java @@ -97,7 +97,8 @@ public class TrieMemoryIndex { value = TypeUtil.asIndexBytes(value, validator); analyzer.reset(value); - final PrimaryKey primaryKey = indexContext.keyFactory().create(key, clustering); + final PrimaryKey primaryKey = indexContext.hasClustering() ? indexContext.keyFactory().create(key, clustering) + : indexContext.keyFactory().create(key); final long initialSizeOnHeap = data.sizeOnHeap(); final long initialSizeOffHeap = data.sizeOffHeap(); final long reducerHeapSize = primaryKeysReducer.heapAllocations(); diff --git a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java index bf435678ac..2d01983219 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java +++ b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java @@ -119,6 +119,7 @@ public class QueryController return indexes.isEmpty() ? 
new IndexContext(cfs.metadata().keyspace, cfs.metadata().name, cfs.metadata().partitionKeyType, + cfs.getPartitioner(), cfs.metadata().comparator, expression.column(), IndexTarget.Type.VALUES, @@ -196,7 +197,7 @@ public class QueryController * The query does not select the key if both of the following statements are false: * 1. The table associated with the query is not using clustering keys * 2. The clustering index filter for the command wants the row. - * + * <p> * Item 2 is important in paged queries where the {@link org.apache.cassandra.db.filter.ClusteringIndexSliceFilter} for * subsequent paged queries may not select rows that are returned by the index * search because that is initially partition based. @@ -206,7 +207,7 @@ public class QueryController */ public boolean doesNotSelect(PrimaryKey key) { - return !key.hasEmptyClustering() && !command.clusteringIndexFilter(key.partitionKey()).selects(key.clustering()); + return key.kind() == PrimaryKey.Kind.WIDE && !command.clusteringIndexFilter(key.partitionKey()).selects(key.clustering()); } // Note: This method assumes that the selects method has already been called for the @@ -215,7 +216,13 @@ public class QueryController { ClusteringIndexFilter clusteringIndexFilter = command.clusteringIndexFilter(key.partitionKey()); - if (key.hasEmptyClustering()) + assert cfs.metadata().comparator.size() == 0 && !key.kind().hasClustering || + cfs.metadata().comparator.size() > 0 && key.kind().hasClustering : + "PrimaryKey " + key + " clustering does not match table. There should be a clustering of size " + cfs.metadata().comparator.size(); + + // If we have skinny partitions or the key is for a static row then we need to get the partition as + // requested by the original query. 
+ if (cfs.metadata().comparator.size() == 0 || key.kind() == PrimaryKey.Kind.STATIC) return clusteringIndexFilter; else return new ClusteringIndexNamesFilter(FBUtilities.singleton(key.clustering(), cfs.metadata().comparator), diff --git a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java index adcf36d157..295189b023 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java +++ b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java @@ -72,7 +72,7 @@ public class StorageAttachedIndexSearcher implements Index.Searcher this.command = command; this.queryContext = new QueryContext(command, executionQuotaMs); this.queryController = new QueryController(cfs, command, filterOperation, queryContext, tableQueryMetrics); - this.keyFactory = new PrimaryKey.Factory(cfs.metadata().comparator); + this.keyFactory = new PrimaryKey.Factory(cfs.getPartitioner(), cfs.metadata().comparator); } @Override @@ -138,8 +138,8 @@ public class StorageAttachedIndexSearcher implements Index.Searcher this.queryContext = queryContext; this.keyFactory = keyFactory; - this.firstPrimaryKey = keyFactory.createTokenOnly(queryController.mergeRange().left.getToken()); - this.lastPrimaryKey = keyFactory.createTokenOnly(queryController.mergeRange().right.getToken()); + this.firstPrimaryKey = keyFactory.create(queryController.mergeRange().left.getToken()); + this.lastPrimaryKey = keyFactory.create(queryController.mergeRange().right.getToken()); } @Override @@ -298,7 +298,7 @@ public class StorageAttachedIndexSearcher implements Index.Searcher */ private void skipTo(@Nonnull Token token) { - resultKeyIterator.skipTo(keyFactory.createTokenOnly(token)); + resultKeyIterator.skipTo(keyFactory.create(token)); } /** @@ -380,6 +380,13 @@ public class StorageAttachedIndexSearcher implements Index.Searcher private static UnfilteredRowIterator 
applyIndexFilter(UnfilteredRowIterator partition, FilterTree tree, QueryContext queryContext) { Row staticRow = partition.staticRow(); + + // We want to short-circuit the filtering of the whole partition if the static row + // satisfies the filter. If that is the case we just need to return the whole partition. + queryContext.rowsFiltered++; + if (tree.isSatisfiedBy(partition.partitionKey(), staticRow, staticRow)) + return partition; + List<Unfiltered> clusters = new ArrayList<>(); while (partition.hasNext()) @@ -393,15 +400,6 @@ public class StorageAttachedIndexSearcher implements Index.Searcher } } - if (clusters.isEmpty()) - { - queryContext.rowsFiltered++; - if (tree.isSatisfiedBy(partition.partitionKey(), staticRow, staticRow)) - { - clusters.add(staticRow); - } - } - /* * If {@code clusters} is empty, which means either all clustering row and static row pairs failed, * or static row and static row pair failed. In both cases, we should not return any partition. diff --git a/src/java/org/apache/cassandra/index/sai/utils/PrimaryKey.java b/src/java/org/apache/cassandra/index/sai/utils/PrimaryKey.java index fe541998e2..7918cbad0c 100644 --- a/src/java/org/apache/cassandra/index/sai/utils/PrimaryKey.java +++ b/src/java/org/apache/cassandra/index/sai/utils/PrimaryKey.java @@ -17,34 +17,58 @@ */ package org.apache.cassandra.index.sai.utils; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Objects; -import java.util.function.Supplier; import java.util.stream.Collectors; -import javax.annotation.Nullable; - +import org.apache.cassandra.db.BufferDecoratedKey; import org.apache.cassandra.db.Clustering; import org.apache.cassandra.db.ClusteringComparator; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.marshal.ByteBufferAccessor; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; import org.apache.cassandra.utils.ByteBufferUtil; import 
org.apache.cassandra.utils.bytecomparable.ByteComparable; import org.apache.cassandra.utils.bytecomparable.ByteSource; +import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse; /** * Representation of the primary key for a row consisting of the {@link DecoratedKey} and * {@link Clustering} associated with a {@link org.apache.cassandra.db.rows.Row}. + * The {@link Factory.TokenOnlyPrimaryKey} is used by the {@link org.apache.cassandra.index.sai.plan.StorageAttachedIndexSearcher} to + * position the search within the query range. */ public interface PrimaryKey extends Comparable<PrimaryKey>, ByteComparable { + /** + * See the javadoc for {@link #kind()} for how this enum is used. + */ + enum Kind + { + TOKEN(false), + SKINNY(false), + WIDE(true), + STATIC(true); + + public final boolean hasClustering; + + Kind(boolean hasClustering) + { + this.hasClustering = hasClustering; + } + } + class Factory { + private final IPartitioner partitioner; private final ClusteringComparator clusteringComparator; - public Factory(ClusteringComparator clusteringComparator) + public Factory(IPartitioner partitioner, ClusteringComparator clusteringComparator) { + this.partitioner = partitioner; this.clusteringComparator = clusteringComparator; } @@ -54,18 +78,22 @@ public interface PrimaryKey extends Comparable<PrimaryKey>, ByteComparable * {@link Token} only primary keys are used for defining the partition range * of a query. 
*/ - public PrimaryKey createTokenOnly(Token token) + public PrimaryKey create(Token token) { assert token != null : "Cannot create a primary key with a null token"; return new TokenOnlyPrimaryKey(token); } - public PrimaryKey createPartitionKeyOnly(DecoratedKey partitionKey) + /** + * Create a {@link PrimaryKey} for tables without clustering columns + */ + public PrimaryKey create(DecoratedKey partitionKey) { + assert clusteringComparator.size() == 0 : "Cannot create a skinny primary key for a table with clustering columns"; assert partitionKey != null : "Cannot create a primary key with a null partition key"; - return new ImmutablePrimaryKey(partitionKey, null); + return new SkinnyPrimaryKey(partitionKey); } /** @@ -74,66 +102,103 @@ public interface PrimaryKey extends Comparable<PrimaryKey>, ByteComparable */ public PrimaryKey create(DecoratedKey partitionKey, Clustering<?> clustering) { + assert clusteringComparator.size() > 0 : "Cannot create a wide primary key for a table without clustering columns"; assert partitionKey != null : "Cannot create a primary key with a null partition key"; assert clustering != null : "Cannot create a primary key with a null clustering"; - return new ImmutablePrimaryKey(partitionKey, clustering); + return clustering == Clustering.STATIC_CLUSTERING ? new StaticPrimaryKey(partitionKey) : new WidePrimaryKey(partitionKey, clustering); + } + + /** + * Create a {@link PrimaryKey} from a {@link ByteSource}. This should only be used with {@link ByteSource} instances + * created by calls to {@link PrimaryKey#asComparableBytes(Version)}. 
+ */ + public PrimaryKey fromComparableBytes(ByteSource byteSource) + { + if (clusteringComparator.size() > 0) + { + ByteSource.Peekable peekable = ByteSource.peekable(byteSource); + DecoratedKey partitionKey = partitionKeyFromComparableBytes(ByteSourceInverse.nextComponentSource(peekable)); + Clustering<?> clustering = clusteringFromByteComparable(ByteSourceInverse.nextComponentSource(peekable)); + return create(partitionKey, clustering); + } + else + { + return create(partitionKeyFromComparableBytes(byteSource)); + } + } + + /** + * Create a {@link DecoratedKey} from a {@link ByteSource}. This is a separate method because of its use by + * the {@link org.apache.cassandra.index.sai.disk.PrimaryKeyMap} implementations to create partition keys. + */ + public DecoratedKey partitionKeyFromComparableBytes(ByteSource byteSource) + { + ByteBuffer decoratedKey = ByteBuffer.wrap(ByteSourceInverse.getUnescapedBytes(ByteSource.peekable(byteSource))); + return new BufferDecoratedKey(partitioner.getToken(decoratedKey), decoratedKey); } - public PrimaryKey createDeferred(Token token, Supplier<PrimaryKey> primaryKeySupplier) + /** + * Create a {@link Clustering} from a {@link ByteSource}. This is a separate method because of its use by + * the {@link org.apache.cassandra.index.sai.disk.v1.WidePrimaryKeyMap} to create its clustering keys. + */ + public Clustering<?> clusteringFromByteComparable(ByteSource byteSource) { - assert token != null : "Cannot create a deferred primary key with a null token"; - assert primaryKeySupplier != null : "Cannot create a deferred primary key with a null key supplier"; + Clustering<?> clustering = clusteringComparator.clusteringFromByteComparable(ByteBufferAccessor.instance, v -> byteSource); - return new MutablePrimaryKey(token, primaryKeySupplier); + // Clustering is null for static rows + return (clustering == null) ? 
Clustering.STATIC_CLUSTERING : clustering; } - abstract class AbstractPrimaryKey implements PrimaryKey + class TokenOnlyPrimaryKey implements PrimaryKey { + protected final Token token; + + TokenOnlyPrimaryKey(Token token) + { + this.token = token; + } + @Override - @SuppressWarnings("ConstantConditions") - public ByteSource asComparableBytes(ByteComparable.Version version) + public Kind kind() { - ByteSource keyComparable = ByteSource.of(partitionKey().getKey(), version); - if (clusteringComparator.size() == 0) - return keyComparable; - // It is important that the ClusteringComparator.asBytesComparable method is used - // to maintain the correct clustering sort order - ByteSource clusteringComparable = clustering() == null || - clustering().isEmpty() ? null - : clusteringComparator.asByteComparable(clustering()) - .asComparableBytes(version); - return ByteSource.withTerminator(version == ByteComparable.Version.LEGACY ? ByteSource.END_OF_STREAM - : ByteSource.TERMINATOR, - keyComparable, - clusteringComparable); + return Kind.TOKEN; } @Override - @SuppressWarnings("ConstantConditions") - public int compareTo(PrimaryKey o) + public Token token() { - int cmp = token().compareTo(o.token()); + return token; + } - // If the tokens don't match then we don't need to compare any more of the key. - // Otherwise, if either of the keys are token only we can only compare tokens - if ((cmp != 0) || isTokenOnly() || o.isTokenOnly()) - return cmp; + @Override + public DecoratedKey partitionKey() + { + throw new UnsupportedOperationException(); + } - // Next compare the partition keys. 
If they are not equal or - // this is a single row partition key or there are no - // clusterings then we can return the result of this without - // needing to compare the clusterings - cmp = partitionKey().compareTo(o.partitionKey()); - if (cmp != 0 || hasEmptyClustering() || o.hasEmptyClustering()) - return cmp; - return clusteringComparator.compare(clustering(), o.clustering()); + @Override + public Clustering<?> clustering() + { + throw new UnsupportedOperationException(); + } + + @Override + public ByteSource asComparableBytes(Version version) + { + throw new UnsupportedOperationException(); + } + + @Override + public int compareTo(PrimaryKey o) + { + return token().compareTo(o.token()); } @Override public int hashCode() { - return Objects.hash(token(), partitionKey(), clustering(), clusteringComparator); + return Objects.hash(token(), clusteringComparator); } @Override @@ -145,165 +210,219 @@ public interface PrimaryKey extends Comparable<PrimaryKey>, ByteComparable } @Override - @SuppressWarnings("ConstantConditions") public String toString() { - return isTokenOnly() ? String.format("PrimaryKey: { token: %s }", token()) - : String.format("PrimaryKey: { token: %s, partition: %s, clustering: %s:%s } ", - token(), - partitionKey(), - clustering() == null ? null : clustering().kind(), - clustering() == null ? 
null : Arrays.stream(clustering().getBufferArray()) - .map(ByteBufferUtil::bytesToHex) - .collect(Collectors.joining(", "))); + return String.format("PrimaryKey: { token: %s }", token()); } } - class TokenOnlyPrimaryKey extends AbstractPrimaryKey + class SkinnyPrimaryKey extends TokenOnlyPrimaryKey { - private final Token token; + protected final DecoratedKey partitionKey; - TokenOnlyPrimaryKey(Token token) + SkinnyPrimaryKey(DecoratedKey partitionKey) { - this.token = token; + super(partitionKey.getToken()); + this.partitionKey = partitionKey; } @Override - public boolean isTokenOnly() + public Kind kind() { - return true; + return Kind.SKINNY; } @Override - public Token token() + public DecoratedKey partitionKey() { - return token; + return partitionKey; } @Override - public DecoratedKey partitionKey() + public ByteSource asComparableBytes(Version version) { - throw new UnsupportedOperationException(); + return ByteSource.of(partitionKey().getKey(), version); } @Override - public Clustering<?> clustering() + public int compareTo(PrimaryKey o) { - throw new UnsupportedOperationException(); + int cmp = super.compareTo(o); + + // If the tokens don't match then we don't need to compare any more of the key. + // Otherwise, if the other key is token only we can only compare tokens + // This is used by the ResultRetriever to skip to the current key range start position + // during result retrieval. 
+ if ((cmp != 0) || o.kind() == Kind.TOKEN) + return cmp; + + // Finally compare the partition keys + return partitionKey().compareTo(o.partitionKey()); } @Override - public ByteSource asComparableBytes(Version version) + public int hashCode() { - throw new UnsupportedOperationException(); + return Objects.hash(token(), partitionKey(), Clustering.EMPTY, clusteringComparator); + } + + @Override + public String toString() + { + return String.format("PrimaryKey: { token: %s, partition: %s }", token(), partitionKey()); } } - class ImmutablePrimaryKey extends AbstractPrimaryKey + class StaticPrimaryKey extends SkinnyPrimaryKey { - private final Token token; - private final DecoratedKey partitionKey; - private final Clustering<?> clustering; + StaticPrimaryKey(DecoratedKey partitionKey) + { + super(partitionKey); + } - ImmutablePrimaryKey(DecoratedKey partitionKey, Clustering<?> clustering) + @Override + public Kind kind() { - this.token = partitionKey.getToken(); - this.partitionKey = partitionKey; - this.clustering = clustering; + return Kind.STATIC; } @Override - public Token token() + public Clustering<?> clustering() { - return token; + return Clustering.STATIC_CLUSTERING; } @Override - public DecoratedKey partitionKey() + public ByteSource asComparableBytes(ByteComparable.Version version) { - return partitionKey; + ByteSource keyComparable = ByteSource.of(partitionKey().getKey(), version); + // Static clustering cannot be serialized or made to a byte comparable, so we use null as the component. + return ByteSource.withTerminator(version == ByteComparable.Version.LEGACY ? 
ByteSource.END_OF_STREAM + : ByteSource.TERMINATOR, + keyComparable, + null); } @Override - public Clustering<?> clustering() + public int compareTo(PrimaryKey o) { - return clustering; + int cmp = super.compareTo(o); + if (cmp != 0 || o.kind() == Kind.TOKEN || o.kind() == Kind.SKINNY) + return cmp; + // The static clustering comes first in the sort order, so if the other key has static clustering we + // are equal, otherwise we are less than the other + return o.kind() == Kind.STATIC ? 0 : -1; + } + + @Override + public int hashCode() + { + return Objects.hash(token(), partitionKey(), Clustering.STATIC_CLUSTERING, clusteringComparator); + } + + @Override + public String toString() + { + return String.format("PrimaryKey: { token: %s, partition: %s, clustering: STATIC } ", token(), partitionKey()); } } - class MutablePrimaryKey extends AbstractPrimaryKey + class WidePrimaryKey extends SkinnyPrimaryKey { - private final Token token; - private final Supplier<PrimaryKey> primaryKeySupplier; + private final Clustering<?> clustering; - private boolean notLoaded = true; - private DecoratedKey partitionKey; - private Clustering<?> clustering; + WidePrimaryKey(DecoratedKey partitionKey, Clustering<?> clustering) + { + super(partitionKey); + this.clustering = clustering; + } - MutablePrimaryKey(Token token, Supplier<PrimaryKey> primaryKeySupplier) + @Override + public Kind kind() { - this.token = token; - this.primaryKeySupplier = primaryKeySupplier; + return Kind.WIDE; } @Override - public Token token() + public Clustering<?> clustering() { - return token; + return clustering; } @Override - public DecoratedKey partitionKey() + public ByteSource asComparableBytes(ByteComparable.Version version) { - loadDeferred(); - return partitionKey; + ByteSource keyComparable = ByteSource.of(partitionKey().getKey(), version); + // It is important that the ClusteringComparator.asByteComparable method is used + // to maintain the correct clustering sort order. 
+ ByteSource clusteringComparable = clusteringComparator.asByteComparable(clustering()).asComparableBytes(version); + return ByteSource.withTerminator(version == ByteComparable.Version.LEGACY ? ByteSource.END_OF_STREAM + : ByteSource.TERMINATOR, + keyComparable, + clusteringComparable); } @Override - public Clustering<?> clustering() + public int compareTo(PrimaryKey o) { - loadDeferred(); - return clustering; + int cmp = super.compareTo(o); + if (cmp != 0 || o.kind() == Kind.TOKEN || o.kind() == Kind.SKINNY) + return cmp; + // At this point we will be greater than other if it is static + if (o.kind() == Kind.STATIC) + return 1; + return clusteringComparator.compare(clustering(), o.clustering()); } - private void loadDeferred() + @Override + public int hashCode() { - if (notLoaded) - { - PrimaryKey deferredPrimaryKey = primaryKeySupplier.get(); - this.partitionKey = deferredPrimaryKey.partitionKey(); - this.clustering = deferredPrimaryKey.clustering(); - notLoaded = false; - } + return Objects.hash(token(), partitionKey(), clustering(), clusteringComparator); + } + + @Override + public String toString() + { + return String.format("PrimaryKey: { token: %s, partition: %s, clustering: %s:%s } ", + token(), + partitionKey(), + clustering().kind(), + Arrays.stream(clustering().getBufferArray()) + .map(ByteBufferUtil::bytesToHex) + .collect(Collectors.joining(", "))); } } } - default boolean isTokenOnly() - { - return false; - } + /** + * Returns the {@link Kind} of the {@link PrimaryKey}. The {@link Kind} is used locally in the {@link #compareTo(Object)} + * methods to determine how far the comparison needs to go between keys. + * <p> + * The {@link Kind} values have a categorization of {@code hasClustering}. This indicates whether the key belongs to + * a table with clustering columns or not. 
+ */ + Kind kind(); + /** + * Returns the {@link Token} component of the {@link PrimaryKey} + */ Token token(); - @Nullable + /** + * Returns the {@link DecoratedKey} representing the partition key of the {@link PrimaryKey}. + * <p> + * Note: This cannot be null but some {@link PrimaryKey} implementations can throw {@link UnsupportedOperationException} + * if they do not support partition keys. + */ DecoratedKey partitionKey(); - @Nullable - Clustering<?> clustering(); - /** - * Return whether the primary key has an empty clustering or not. - * By default, the clustering is empty if the internal clustering - * is null or is empty. - * - * @return {@code true} if the clustering is empty, otherwise {@code false} + * Returns the {@link Clustering} representing the clustering component of the {@link PrimaryKey}. + * <p> + * Note: This cannot be null but some {@link PrimaryKey} implementations can throw {@link UnsupportedOperationException} + * if they do not support clustering columns. */ - @SuppressWarnings("ConstantConditions") - default boolean hasEmptyClustering() - { - return clustering() == null || clustering().isEmpty(); - } + Clustering<?> clustering(); /** * Returns the {@link PrimaryKey} as a {@link ByteSource} byte comparable representation. @@ -314,6 +433,7 @@ public interface PrimaryKey extends Comparable<PrimaryKey>, ByteComparable * * @param version the {@link ByteComparable.Version} to use for the implementation * @return the {@code ByteSource} byte comparable. 
+ * @throws UnsupportedOperationException for {@link PrimaryKey} implementations that are not byte-comparable */ ByteSource asComparableBytes(ByteComparable.Version version); } diff --git a/test/microbench/org/apache/cassandra/test/microbench/sai/KeyLookupBench.java b/test/microbench/org/apache/cassandra/test/microbench/sai/KeyLookupBench.java index 54f909efad..65bded3a45 100644 --- a/test/microbench/org/apache/cassandra/test/microbench/sai/KeyLookupBench.java +++ b/test/microbench/org/apache/cassandra/test/microbench/sai/KeyLookupBench.java @@ -117,13 +117,13 @@ public class KeyLookupBench metadata.name, Util.newUUIDGen().get()); - indexDescriptor = IndexDescriptor.create(descriptor, metadata.comparator); + indexDescriptor = IndexDescriptor.create(descriptor, metadata.partitioner, metadata.comparator); CassandraRelevantProperties.SAI_SORTED_TERMS_PARTITION_BLOCK_SHIFT.setInt(partitionBlockShift); CassandraRelevantProperties.SAI_SORTED_TERMS_CLUSTERING_BLOCK_SHIFT.setInt(clusteringBlockShift); SSTableComponentsWriter writer = new SSTableComponentsWriter(indexDescriptor); - PrimaryKey.Factory factory = new PrimaryKey.Factory(metadata.comparator); + PrimaryKey.Factory factory = new PrimaryKey.Factory(metadata.partitioner, metadata.comparator); PrimaryKey[] primaryKeys = new PrimaryKey[rows]; int partition = 0; diff --git a/test/unit/org/apache/cassandra/index/sai/SAITester.java b/test/unit/org/apache/cassandra/index/sai/SAITester.java index c1080fb528..e043d6602f 100644 --- a/test/unit/org/apache/cassandra/index/sai/SAITester.java +++ b/test/unit/org/apache/cassandra/index/sai/SAITester.java @@ -71,6 +71,7 @@ import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.index.Index; import org.apache.cassandra.index.sai.disk.SSTableIndex; import 
org.apache.cassandra.index.sai.disk.format.IndexComponent; @@ -141,7 +142,7 @@ public abstract class SAITester extends CQLTester public static final ClusteringComparator EMPTY_COMPARATOR = new ClusteringComparator(); - public static final PrimaryKey.Factory TEST_FACTORY = new PrimaryKey.Factory(EMPTY_COMPARATOR); + public static final PrimaryKey.Factory TEST_FACTORY = new PrimaryKey.Factory(Murmur3Partitioner.instance, EMPTY_COMPARATOR); @BeforeClass public static void setUpClass() @@ -254,6 +255,7 @@ public abstract class SAITester extends CQLTester return new IndexContext("test_ks", "test_cf", UTF8Type.instance, + Murmur3Partitioner.instance, new ClusteringComparator(), ColumnMetadata.regularColumn("sai", "internal", name, validator), IndexTarget.Type.SIMPLE, diff --git a/test/unit/org/apache/cassandra/index/sai/disk/format/IndexDescriptorTest.java b/test/unit/org/apache/cassandra/index/sai/disk/format/IndexDescriptorTest.java index b6546c9551..264e217e70 100644 --- a/test/unit/org/apache/cassandra/index/sai/disk/format/IndexDescriptorTest.java +++ b/test/unit/org/apache/cassandra/index/sai/disk/format/IndexDescriptorTest.java @@ -31,6 +31,7 @@ import org.junit.rules.TemporaryFolder; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.index.sai.IndexContext; import org.apache.cassandra.index.sai.SAITester; import org.apache.cassandra.io.sstable.Descriptor; @@ -68,7 +69,7 @@ public class IndexDescriptorTest { createFileOnDisk("-SAI+aa+GroupComplete.db"); - IndexDescriptor indexDescriptor = IndexDescriptor.create(descriptor, SAITester.EMPTY_COMPARATOR); + IndexDescriptor indexDescriptor = IndexDescriptor.create(descriptor, Murmur3Partitioner.instance, SAITester.EMPTY_COMPARATOR); assertEquals(Version.AA, indexDescriptor.version); assertTrue(indexDescriptor.hasComponent(IndexComponent.GROUP_COMPLETION_MARKER)); @@ -79,7 +80,7 @@ public 
class IndexDescriptorTest { createFileOnDisk("-SAI+aa+test_index+ColumnComplete.db"); - IndexDescriptor indexDescriptor = IndexDescriptor.create(descriptor, SAITester.EMPTY_COMPARATOR); + IndexDescriptor indexDescriptor = IndexDescriptor.create(descriptor, Murmur3Partitioner.instance, SAITester.EMPTY_COMPARATOR); IndexContext indexContext = SAITester.createIndexContext("test_index", UTF8Type.instance); assertEquals(Version.AA, indexDescriptor.version); diff --git a/test/unit/org/apache/cassandra/index/sai/disk/v1/InvertedIndexSearcherTest.java b/test/unit/org/apache/cassandra/index/sai/disk/v1/InvertedIndexSearcherTest.java index 485d2b095a..dd3547ed79 100644 --- a/test/unit/org/apache/cassandra/index/sai/disk/v1/InvertedIndexSearcherTest.java +++ b/test/unit/org/apache/cassandra/index/sai/disk/v1/InvertedIndexSearcherTest.java @@ -62,12 +62,12 @@ public class InvertedIndexSearcherTest extends SAIRandomizedTester { public static final PrimaryKeyMap TEST_PRIMARY_KEY_MAP = new PrimaryKeyMap() { - private final PrimaryKey.Factory primaryKeyFactory = new PrimaryKey.Factory(new ClusteringComparator()); + private final PrimaryKey.Factory primaryKeyFactory = new PrimaryKey.Factory(Murmur3Partitioner.instance, new ClusteringComparator()); @Override public PrimaryKey primaryKeyFromRowId(long sstableRowId) { - return primaryKeyFactory.createTokenOnly(new Murmur3Partitioner.LongToken(sstableRowId)); + return primaryKeyFactory.create(new Murmur3Partitioner.LongToken(sstableRowId)); } @Override @@ -127,7 +127,7 @@ public class InvertedIndexSearcherTest extends SAIRandomizedTester final int idxToSkip = numPostings - 7; // tokens are equal to their corresponding row IDs final long tokenToSkip = termsEnum.get(t).right.get(idxToSkip); - results.skipTo(SAITester.TEST_FACTORY.createTokenOnly(new Murmur3Partitioner.LongToken(tokenToSkip))); + results.skipTo(SAITester.TEST_FACTORY.create(new Murmur3Partitioner.LongToken(tokenToSkip))); for (int p = idxToSkip; p < numPostings; ++p) { @@ 
-189,8 +189,8 @@ public class InvertedIndexSearcherTest extends SAIRandomizedTester size, 0, Long.MAX_VALUE, - SAITester.TEST_FACTORY.createTokenOnly(DatabaseDescriptor.getPartitioner().getMinimumToken()), - SAITester.TEST_FACTORY.createTokenOnly(DatabaseDescriptor.getPartitioner().getMaximumToken()), + SAITester.TEST_FACTORY.create(DatabaseDescriptor.getPartitioner().getMinimumToken()), + SAITester.TEST_FACTORY.create(DatabaseDescriptor.getPartitioner().getMaximumToken()), wrap(termsEnum.get(0).left), wrap(termsEnum.get(terms - 1).left), indexMetas); diff --git a/test/unit/org/apache/cassandra/index/sai/disk/v1/SegmentFlushTest.java b/test/unit/org/apache/cassandra/index/sai/disk/v1/SegmentFlushTest.java index 96d746df29..cbd74d8bef 100644 --- a/test/unit/org/apache/cassandra/index/sai/disk/v1/SegmentFlushTest.java +++ b/test/unit/org/apache/cassandra/index/sai/disk/v1/SegmentFlushTest.java @@ -39,6 +39,7 @@ import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.rows.BTreeRow; import org.apache.cassandra.db.rows.BufferCell; import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.index.sai.IndexContext; import org.apache.cassandra.index.sai.SAITester; import org.apache.cassandra.index.sai.disk.format.IndexComponent; @@ -54,6 +55,7 @@ import org.apache.cassandra.io.util.File; import org.apache.cassandra.io.util.FileHandle; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.bytecomparable.ByteComparable; import static org.apache.cassandra.Util.dk; @@ -75,6 +77,8 @@ public class SegmentFlushTest public static void init() { DatabaseDescriptor.daemonInitialization(); + DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); + StorageService.instance.setPartitionerUnsafe(Murmur3Partitioner.instance); } @After @@ -106,6 +110,7 @@ 
public class SegmentFlushTest { Path tmpDir = Files.createTempDirectory("SegmentFlushTest"); IndexDescriptor indexDescriptor = IndexDescriptor.create(new Descriptor(new File(tmpDir.toFile()), "ks", "cf", new SequenceBasedSSTableId(1)), + Murmur3Partitioner.instance, SAITester.EMPTY_COMPARATOR); ColumnMetadata column = ColumnMetadata.regularColumn("sai", "internal", "column", UTF8Type.instance); @@ -114,6 +119,7 @@ public class SegmentFlushTest IndexContext indexContext = new IndexContext("ks", "cf", UTF8Type.instance, + Murmur3Partitioner.instance, new ClusteringComparator(), column, IndexTarget.Type.SIMPLE, @@ -127,13 +133,13 @@ public class SegmentFlushTest DecoratedKey key1 = keys.get(0); ByteBuffer term1 = UTF8Type.instance.decompose("a"); Row row1 = createRow(column, term1); - writer.addRow(SAITester.TEST_FACTORY.create(key1, Clustering.EMPTY), row1, sstableRowId1); + writer.addRow(SAITester.TEST_FACTORY.create(key1), row1, sstableRowId1); // expect a flush if exceed max rowId per segment DecoratedKey key2 = keys.get(1); ByteBuffer term2 = UTF8Type.instance.decompose("b"); Row row2 = createRow(column, term2); - writer.addRow(SAITester.TEST_FACTORY.create(key2, Clustering.EMPTY), row2, sstableRowId2); + writer.addRow(SAITester.TEST_FACTORY.create(key2), row2, sstableRowId2); writer.complete(Stopwatch.createStarted()); @@ -147,8 +153,8 @@ public class SegmentFlushTest segmentRowIdOffset = sstableRowId1; posting1 = 0; posting2 = segments == 1 ? (int) (sstableRowId2 - segmentRowIdOffset) : 0; - minKey = SAITester.TEST_FACTORY.createTokenOnly(key1.getToken()); - maxKey = segments == 1 ? SAITester.TEST_FACTORY.createTokenOnly(key2.getToken()) : minKey; + minKey = SAITester.TEST_FACTORY.create(key1.getToken()); + maxKey = segments == 1 ? SAITester.TEST_FACTORY.create(key2.getToken()) : minKey; minTerm = term1; maxTerm = segments == 1 ? term2 : term1; numRows = segments == 1 ? 
2 : 1; @@ -160,7 +166,7 @@ public class SegmentFlushTest segmentRowIdOffset = sstableRowId2; posting1 = 0; posting2 = 0; - minKey = SAITester.TEST_FACTORY.createTokenOnly(key2.getToken()); + minKey = SAITester.TEST_FACTORY.create(key2.getToken()); maxKey = minKey; minTerm = term2; maxTerm = term2; diff --git a/test/unit/org/apache/cassandra/index/sai/disk/v1/WideRowPrimaryKeyTest.java b/test/unit/org/apache/cassandra/index/sai/disk/v1/WideRowPrimaryKeyTest.java index 767ac6fa0f..9a6a4dcb15 100644 --- a/test/unit/org/apache/cassandra/index/sai/disk/v1/WideRowPrimaryKeyTest.java +++ b/test/unit/org/apache/cassandra/index/sai/disk/v1/WideRowPrimaryKeyTest.java @@ -23,6 +23,7 @@ import java.util.Arrays; import org.junit.Test; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.index.sai.disk.PrimaryKeyMap; import org.apache.cassandra.index.sai.disk.format.IndexDescriptor; import org.apache.cassandra.index.sai.utils.AbstractPrimaryKeyTester; @@ -42,7 +43,7 @@ public class WideRowPrimaryKeyTest extends AbstractPrimaryKeyTester SSTableComponentsWriter writer = new SSTableComponentsWriter(indexDescriptor); - PrimaryKey.Factory factory = new PrimaryKey.Factory(compositePartitionMultipleClusteringAsc.comparator); + PrimaryKey.Factory factory = new PrimaryKey.Factory(Murmur3Partitioner.instance, compositePartitionMultipleClusteringAsc.comparator); int rows = nextInt(1000, 10000); PrimaryKey[] keys = new PrimaryKey[rows]; diff --git a/test/unit/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeIndexBuilder.java b/test/unit/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeIndexBuilder.java index b69e5155c8..a62fe3991a 100644 --- a/test/unit/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeIndexBuilder.java +++ b/test/unit/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeIndexBuilder.java @@ -65,12 +65,12 @@ public class BlockBalancedTreeIndexBuilder { 
public static final PrimaryKeyMap TEST_PRIMARY_KEY_MAP = new PrimaryKeyMap() { - private final PrimaryKey.Factory primaryKeyFactory = new PrimaryKey.Factory(null); + private final PrimaryKey.Factory primaryKeyFactory = new PrimaryKey.Factory(Murmur3Partitioner.instance, null); @Override public PrimaryKey primaryKeyFromRowId(long sstableRowId) { - return primaryKeyFactory.createTokenOnly(new Murmur3Partitioner.LongToken(sstableRowId)); + return primaryKeyFactory.create(new Murmur3Partitioner.LongToken(sstableRowId)); } @Override @@ -122,8 +122,8 @@ public class BlockBalancedTreeIndexBuilder minSegmentRowId, maxSegmentRowId, // min/max is unused for now - SAITester.TEST_FACTORY.createTokenOnly(Murmur3Partitioner.instance.decorateKey(UTF8Type.instance.fromString("a")).getToken()), - SAITester.TEST_FACTORY.createTokenOnly(Murmur3Partitioner.instance.decorateKey(UTF8Type.instance.fromString("b")).getToken()), + SAITester.TEST_FACTORY.create(Murmur3Partitioner.instance.decorateKey(UTF8Type.instance.fromString("a")).getToken()), + SAITester.TEST_FACTORY.create(Murmur3Partitioner.instance.decorateKey(UTF8Type.instance.fromString("b")).getToken()), UTF8Type.instance.fromString("c"), UTF8Type.instance.fromString("d"), indexMetas); diff --git a/test/unit/org/apache/cassandra/index/sai/disk/v1/keystore/KeyLookupTest.java b/test/unit/org/apache/cassandra/index/sai/disk/v1/keystore/KeyLookupTest.java index bd522bde5b..92a31f87f7 100644 --- a/test/unit/org/apache/cassandra/index/sai/disk/v1/keystore/KeyLookupTest.java +++ b/test/unit/org/apache/cassandra/index/sai/disk/v1/keystore/KeyLookupTest.java @@ -29,7 +29,6 @@ import java.util.Map; import org.junit.Before; import org.junit.Test; -import org.apache.cassandra.db.Clustering; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.UTF8Type; @@ -75,7 +74,7 @@ public class KeyLookupTest extends SAIRandomizedTester { ByteBuffer buffer = 
UTF8Type.instance.decompose(Integer.toString(x)); DecoratedKey partitionKey = Murmur3Partitioner.instance.decorateKey(buffer); - PrimaryKey primaryKey = SAITester.TEST_FACTORY.create(partitionKey, Clustering.EMPTY); + PrimaryKey primaryKey = SAITester.TEST_FACTORY.create(partitionKey); primaryKeys.add(primaryKey); } @@ -316,35 +315,18 @@ public class KeyLookupTest extends SAIRandomizedTester // iterate ascending withKeyLookupCursor(cursor -> { for (int x = 0; x < keys.size(); x++) - { - ByteComparable key = cursor.seekToPointId(x); - - byte[] bytes = ByteSourceInverse.readBytes(key.asComparableBytes(ByteComparable.Version.OSS50)); - - assertArrayEquals(keys.get(x), bytes); - } + assertArrayEquals(keys.get(x), ByteSourceInverse.readBytes(cursor.seekToPointId(x))); }); // iterate ascending skipping blocks withKeyLookupCursor(cursor -> { for (int x = 0; x < keys.size(); x += 17) - { - ByteComparable key = cursor.seekToPointId(x); - - byte[] bytes = ByteSourceInverse.readBytes(key.asComparableBytes(ByteComparable.Version.OSS50)); - - assertArrayEquals(keys.get(x), bytes); - } + assertArrayEquals(keys.get(x), ByteSourceInverse.readBytes(cursor.seekToPointId(x))); }); withKeyLookupCursor(cursor -> { - ByteComparable key = cursor.seekToPointId(7); - byte[] bytes = ByteSourceInverse.readBytes(key.asComparableBytes(ByteComparable.Version.OSS50)); - assertArrayEquals(keys.get(7), bytes); - - key = cursor.seekToPointId(7); - bytes = ByteSourceInverse.readBytes(key.asComparableBytes(ByteComparable.Version.OSS50)); - assertArrayEquals(keys.get(7), bytes); + assertArrayEquals(keys.get(7), ByteSourceInverse.readBytes(cursor.seekToPointId(7))); + assertArrayEquals(keys.get(7), ByteSourceInverse.readBytes(cursor.seekToPointId(7))); }); } diff --git a/test/unit/org/apache/cassandra/index/sai/iterators/KeyRangeConcatIteratorTest.java b/test/unit/org/apache/cassandra/index/sai/iterators/KeyRangeConcatIteratorTest.java index 6f1afe7eb9..821942588e 100644 --- 
a/test/unit/org/apache/cassandra/index/sai/iterators/KeyRangeConcatIteratorTest.java +++ b/test/unit/org/apache/cassandra/index/sai/iterators/KeyRangeConcatIteratorTest.java @@ -35,7 +35,7 @@ import static org.junit.Assert.assertTrue; public class KeyRangeConcatIteratorTest extends AbstractKeyRangeIteratorTester { - PrimaryKey.Factory primaryKeyFactory = new PrimaryKey.Factory(null); + PrimaryKey.Factory primaryKeyFactory = new PrimaryKey.Factory(Murmur3Partitioner.instance, null); @Test public void testValidation() { @@ -427,7 +427,7 @@ public class KeyRangeConcatIteratorTest extends AbstractKeyRangeIteratorTester private String createErrorMessage(int max, int min) { return String.format(KeyRangeConcatIterator.MUST_BE_SORTED_ERROR, - primaryKeyFactory.createTokenOnly(new Murmur3Partitioner.LongToken(max)), - primaryKeyFactory.createTokenOnly(new Murmur3Partitioner.LongToken(min))); + primaryKeyFactory.create(new Murmur3Partitioner.LongToken(max)), + primaryKeyFactory.create(new Murmur3Partitioner.LongToken(min))); } } diff --git a/test/unit/org/apache/cassandra/index/sai/iterators/LongIterator.java b/test/unit/org/apache/cassandra/index/sai/iterators/LongIterator.java index 9a271f79d1..5c4122f99d 100644 --- a/test/unit/org/apache/cassandra/index/sai/iterators/LongIterator.java +++ b/test/unit/org/apache/cassandra/index/sai/iterators/LongIterator.java @@ -92,7 +92,7 @@ public class LongIterator extends KeyRangeIterator public static PrimaryKey fromToken(long token) { - return SAITester.TEST_FACTORY.createTokenOnly(new Murmur3Partitioner.LongToken(token)); + return SAITester.TEST_FACTORY.create(new Murmur3Partitioner.LongToken(token)); } diff --git a/test/unit/org/apache/cassandra/index/sai/memory/AbstractInMemoryKeyRangeIteratorTester.java b/test/unit/org/apache/cassandra/index/sai/memory/AbstractInMemoryKeyRangeIteratorTester.java index cbf5d05675..3c21941a2e 100644 --- a/test/unit/org/apache/cassandra/index/sai/memory/AbstractInMemoryKeyRangeIteratorTester.java 
+++ b/test/unit/org/apache/cassandra/index/sai/memory/AbstractInMemoryKeyRangeIteratorTester.java @@ -35,7 +35,7 @@ public abstract class AbstractInMemoryKeyRangeIteratorTester @Before public void setup() { - primaryKeyFactory = new PrimaryKey.Factory(SAITester.EMPTY_COMPARATOR); + primaryKeyFactory = new PrimaryKey.Factory(Murmur3Partitioner.instance, SAITester.EMPTY_COMPARATOR); } @Test @@ -99,7 +99,7 @@ public abstract class AbstractInMemoryKeyRangeIteratorTester { KeyRangeIterator iterator = makeIterator(1, 3, 1, 2, 3); - iterator.skipTo(primaryKeyFactory.createTokenOnly(new Murmur3Partitioner.LongToken(0))); + iterator.skipTo(primaryKeyFactory.create(new Murmur3Partitioner.LongToken(0))); assertIterator(iterator, 1, 2, 3); } @@ -109,7 +109,7 @@ public abstract class AbstractInMemoryKeyRangeIteratorTester { KeyRangeIterator iterator = makeIterator(1, 3, 1, 2, 3); - iterator.skipTo(primaryKeyFactory.createTokenOnly(new Murmur3Partitioner.LongToken(1))); + iterator.skipTo(primaryKeyFactory.create(new Murmur3Partitioner.LongToken(1))); assertIterator(iterator, 1, 2, 3); } @@ -119,7 +119,7 @@ public abstract class AbstractInMemoryKeyRangeIteratorTester { KeyRangeIterator iterator = makeIterator(1, 3, 1, 2, 3); - iterator.skipTo(primaryKeyFactory.createTokenOnly(new Murmur3Partitioner.LongToken(2))); + iterator.skipTo(primaryKeyFactory.create(new Murmur3Partitioner.LongToken(2))); assertIterator(iterator, 2, 3); } @@ -129,7 +129,7 @@ public abstract class AbstractInMemoryKeyRangeIteratorTester { KeyRangeIterator iterator = makeIterator(1, 3, 1, 2, 3); - iterator.skipTo(primaryKeyFactory.createTokenOnly(new Murmur3Partitioner.LongToken(3))); + iterator.skipTo(primaryKeyFactory.create(new Murmur3Partitioner.LongToken(3))); assertIterator(iterator, 3); } @@ -139,7 +139,7 @@ public abstract class AbstractInMemoryKeyRangeIteratorTester { KeyRangeIterator iterator = makeIterator(1, 3, 1, 2, 3); - iterator.skipTo(primaryKeyFactory.createTokenOnly(new 
Murmur3Partitioner.LongToken(4))); + iterator.skipTo(primaryKeyFactory.create(new Murmur3Partitioner.LongToken(4))); assertIterator(iterator); } @@ -149,7 +149,7 @@ public abstract class AbstractInMemoryKeyRangeIteratorTester { KeyRangeIterator iterator = makeIterator(1, 3, 1, 1, 2, 2, 3, 3); - iterator.skipTo(primaryKeyFactory.createTokenOnly(new Murmur3Partitioner.LongToken(2))); + iterator.skipTo(primaryKeyFactory.create(new Murmur3Partitioner.LongToken(2))); assertIterator(iterator, 2, 3); } @@ -168,6 +168,6 @@ public abstract class AbstractInMemoryKeyRangeIteratorTester protected PrimaryKey keyForToken(long token) { - return primaryKeyFactory.createTokenOnly(new Murmur3Partitioner.LongToken(token)); + return primaryKeyFactory.create(new Murmur3Partitioner.LongToken(token)); } } diff --git a/test/unit/org/apache/cassandra/index/sai/memory/PriorityInMemoryKeyRangeIteratorTest.java b/test/unit/org/apache/cassandra/index/sai/memory/PriorityInMemoryKeyRangeIteratorTest.java index 707db7f1e9..5864c79b55 100644 --- a/test/unit/org/apache/cassandra/index/sai/memory/PriorityInMemoryKeyRangeIteratorTest.java +++ b/test/unit/org/apache/cassandra/index/sai/memory/PriorityInMemoryKeyRangeIteratorTest.java @@ -33,8 +33,8 @@ public class PriorityInMemoryKeyRangeIteratorTest extends AbstractInMemoryKeyRan Arrays.stream(tokens).forEach(t -> queue.add(keyForToken(t))); - return new InMemoryKeyRangeIterator(primaryKeyFactory.createTokenOnly(new Murmur3Partitioner.LongToken(minimumTokenValue)), - primaryKeyFactory.createTokenOnly(new Murmur3Partitioner.LongToken(maximumTokenValue)), + return new InMemoryKeyRangeIterator(primaryKeyFactory.create(new Murmur3Partitioner.LongToken(minimumTokenValue)), + primaryKeyFactory.create(new Murmur3Partitioner.LongToken(maximumTokenValue)), queue); } } diff --git a/test/unit/org/apache/cassandra/index/sai/memory/TrieMemoryIndexTest.java b/test/unit/org/apache/cassandra/index/sai/memory/TrieMemoryIndexTest.java index 9cf8ff5318..3277d79dc1 
100644 --- a/test/unit/org/apache/cassandra/index/sai/memory/TrieMemoryIndexTest.java +++ b/test/unit/org/apache/cassandra/index/sai/memory/TrieMemoryIndexTest.java @@ -245,6 +245,7 @@ public class TrieMemoryIndexTest extends SAIRandomizedTester indexContext = new IndexContext(table.keyspace, table.name, table.partitionKeyType, + table.partitioner, table.comparator, target.left, target.right, diff --git a/test/unit/org/apache/cassandra/index/sai/utils/AbstractPrimaryKeyTester.java b/test/unit/org/apache/cassandra/index/sai/utils/AbstractPrimaryKeyTester.java index a67cf97d9a..be887bc64c 100644 --- a/test/unit/org/apache/cassandra/index/sai/utils/AbstractPrimaryKeyTester.java +++ b/test/unit/org/apache/cassandra/index/sai/utils/AbstractPrimaryKeyTester.java @@ -28,7 +28,6 @@ import org.apache.cassandra.db.marshal.ReversedType; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.utils.bytecomparable.ByteComparable; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -53,6 +52,13 @@ public class AbstractPrimaryKeyTester extends SAIRandomizedTester .addClusteringColumn("ck1", UTF8Type.instance) .build(); + protected static final TableMetadata simplePartitionStaticAndSingleClusteringAsc = TableMetadata.builder("test", "test") + .partitioner(Murmur3Partitioner.instance) + .addPartitionKeyColumn("pk1", Int32Type.instance) + .addStaticColumn("sk1", Int32Type.instance) + .addClusteringColumn("ck1", UTF8Type.instance) + .build(); + protected static final TableMetadata simplePartitionMultipleClusteringAsc = TableMetadata.builder("test", "test") .partitioner(Murmur3Partitioner.instance) .addPartitionKeyColumn("pk1", Int32Type.instance) @@ -118,13 +124,6 @@ public class AbstractPrimaryKeyTester extends SAIRandomizedTester .addClusteringColumn("ck2", ReversedType.getInstance(UTF8Type.instance)) .build(); - 
protected void assertByteComparison(PrimaryKey a, PrimaryKey b, int expected) - { - assertEquals(expected, ByteComparable.compare(a::asComparableBytes, - b::asComparableBytes, - ByteComparable.Version.OSS50)); - } - protected void assertCompareToAndEquals(PrimaryKey a, PrimaryKey b, int expected) { if (expected > 0) diff --git a/test/unit/org/apache/cassandra/index/sai/utils/IndexInputLeakDetector.java b/test/unit/org/apache/cassandra/index/sai/utils/IndexInputLeakDetector.java index 00518bb972..35e556f763 100644 --- a/test/unit/org/apache/cassandra/index/sai/utils/IndexInputLeakDetector.java +++ b/test/unit/org/apache/cassandra/index/sai/utils/IndexInputLeakDetector.java @@ -41,7 +41,7 @@ public class IndexInputLeakDetector extends TestRuleAdapter { TrackingIndexFileUtils trackingIndexFileUtils = new TrackingIndexFileUtils(sequentialWriterOption); trackedIndexFileUtils.add(trackingIndexFileUtils); - return IndexDescriptor.create(descriptor, tableMetadata.comparator); + return IndexDescriptor.create(descriptor, tableMetadata.partitioner, tableMetadata.comparator); } @Override diff --git a/test/unit/org/apache/cassandra/index/sai/utils/PrimaryKeyTest.java b/test/unit/org/apache/cassandra/index/sai/utils/PrimaryKeyTest.java index 9b9f738a4e..c64372feab 100644 --- a/test/unit/org/apache/cassandra/index/sai/utils/PrimaryKeyTest.java +++ b/test/unit/org/apache/cassandra/index/sai/utils/PrimaryKeyTest.java @@ -23,17 +23,18 @@ import java.util.Arrays; import org.junit.Test; import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.dht.Murmur3Partitioner; public class PrimaryKeyTest extends AbstractPrimaryKeyTester { @Test public void singlePartitionTest() { - PrimaryKey.Factory factory = new PrimaryKey.Factory(simplePartition.comparator); + PrimaryKey.Factory factory = new PrimaryKey.Factory(Murmur3Partitioner.instance, simplePartition.comparator); int rows = nextInt(10, 100); PrimaryKey[] keys = new PrimaryKey[rows]; for (int index = 0; index < rows; 
index++) - keys[index] = factory.create(makeKey(simplePartition, index), Clustering.EMPTY); + keys[index] = factory.create(makeKey(simplePartition, index)); Arrays.sort(keys); @@ -43,11 +44,11 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester @Test public void compositePartitionTest() { - PrimaryKey.Factory factory = new PrimaryKey.Factory(compositePartition.comparator); + PrimaryKey.Factory factory = new PrimaryKey.Factory(Murmur3Partitioner.instance, compositePartition.comparator); int rows = nextInt(10, 100); PrimaryKey[] keys = new PrimaryKey[rows]; for (int index = 0; index < rows; index++) - keys[index] = factory.create(makeKey(compositePartition, index, index + 1), Clustering.EMPTY); + keys[index] = factory.create(makeKey(compositePartition, index, index + 1)); Arrays.sort(keys); @@ -57,7 +58,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester @Test public void simplePartitonSingleClusteringAscTest() { - PrimaryKey.Factory factory = new PrimaryKey.Factory(simplePartitionSingleClusteringAsc.comparator); + PrimaryKey.Factory factory = new PrimaryKey.Factory(Murmur3Partitioner.instance, simplePartitionSingleClusteringAsc.comparator); int rows = nextInt(10, 100); PrimaryKey[] keys = new PrimaryKey[rows]; int partition = 0; @@ -78,10 +79,40 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester compareToAndEqualsTests(factory, keys); } + @Test + public void simplePartitonStaticAndSingleClusteringAscTest() + { + PrimaryKey.Factory factory = new PrimaryKey.Factory(Murmur3Partitioner.instance, simplePartitionStaticAndSingleClusteringAsc.comparator); + int rows = nextInt(10, 100); + PrimaryKey[] keys = new PrimaryKey[rows]; + int partition = 0; + int clustering = 0; + for (int index = 0; index < rows; index++) + { + if (clustering == 0) + { + keys[index] = factory.create(makeKey(simplePartitionSingleClusteringAsc, partition), Clustering.STATIC_CLUSTERING); + clustering++; + } + else + keys[index] = 
factory.create(makeKey(simplePartitionSingleClusteringAsc, partition), + makeClustering(simplePartitionSingleClusteringAsc, Integer.toString(clustering++))); + if (clustering == 5) + { + clustering = 0; + partition++; + } + } + + Arrays.sort(keys); + + compareToAndEqualsTests(factory, keys); + } + @Test public void simplePartitionMultipleClusteringAscTest() { - PrimaryKey.Factory factory = new PrimaryKey.Factory(simplePartitionMultipleClusteringAsc.comparator); + PrimaryKey.Factory factory = new PrimaryKey.Factory(Murmur3Partitioner.instance, simplePartitionMultipleClusteringAsc.comparator); int rows = nextInt(100, 1000); PrimaryKey[] keys = new PrimaryKey[rows]; int partition = 0; @@ -111,7 +142,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester @Test public void simplePartitonSingleClusteringDescTest() { - PrimaryKey.Factory factory = new PrimaryKey.Factory(simplePartitionSingleClusteringDesc.comparator); + PrimaryKey.Factory factory = new PrimaryKey.Factory(Murmur3Partitioner.instance, simplePartitionSingleClusteringDesc.comparator); int rows = nextInt(10, 100); PrimaryKey[] keys = new PrimaryKey[rows]; int partition = 0; @@ -135,7 +166,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester @Test public void simplePartitionMultipleClusteringDescTest() { - PrimaryKey.Factory factory = new PrimaryKey.Factory(simplePartitionMultipleClusteringDesc.comparator); + PrimaryKey.Factory factory = new PrimaryKey.Factory(Murmur3Partitioner.instance, simplePartitionMultipleClusteringDesc.comparator); int rows = nextInt(100, 1000); PrimaryKey[] keys = new PrimaryKey[rows]; int partition = 0; @@ -165,7 +196,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester @Test public void compositePartitionSingleClusteringAscTest() { - PrimaryKey.Factory factory = new PrimaryKey.Factory(compositePartitionSingleClusteringAsc.comparator); + PrimaryKey.Factory factory = new PrimaryKey.Factory(Murmur3Partitioner.instance, 
compositePartitionSingleClusteringAsc.comparator); int rows = nextInt(10, 100); PrimaryKey[] keys = new PrimaryKey[rows]; int partition = 0; @@ -189,7 +220,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester @Test public void compositePartitionMultipleClusteringAscTest() { - PrimaryKey.Factory factory = new PrimaryKey.Factory(compositePartitionMultipleClusteringAsc.comparator); + PrimaryKey.Factory factory = new PrimaryKey.Factory(Murmur3Partitioner.instance, compositePartitionMultipleClusteringAsc.comparator); int rows = nextInt(100, 1000); PrimaryKey[] keys = new PrimaryKey[rows]; int partition = 0; @@ -219,7 +250,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester @Test public void compositePartitionSingleClusteringDescTest() { - PrimaryKey.Factory factory = new PrimaryKey.Factory(compositePartitionSingleClusteringDesc.comparator); + PrimaryKey.Factory factory = new PrimaryKey.Factory(Murmur3Partitioner.instance, compositePartitionSingleClusteringDesc.comparator); int rows = nextInt(10, 100); PrimaryKey[] keys = new PrimaryKey[rows]; int partition = 0; @@ -243,7 +274,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester @Test public void compositePartitionMultipleClusteringDescTest() { - PrimaryKey.Factory factory = new PrimaryKey.Factory(compositePartitionMultipleClusteringDesc.comparator); + PrimaryKey.Factory factory = new PrimaryKey.Factory(Murmur3Partitioner.instance, compositePartitionMultipleClusteringDesc.comparator); int rows = nextInt(100, 1000); PrimaryKey[] keys = new PrimaryKey[rows]; int partition = 0; @@ -273,7 +304,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester @Test public void simplePartitionMultipleClusteringMixedTest() { - PrimaryKey.Factory factory = new PrimaryKey.Factory(simplePartitionMultipleClusteringMixed.comparator); + PrimaryKey.Factory factory = new PrimaryKey.Factory(Murmur3Partitioner.instance, simplePartitionMultipleClusteringMixed.comparator); int rows = nextInt(100, 
1000); PrimaryKey[] keys = new PrimaryKey[rows]; int partition = 0; @@ -303,7 +334,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester @Test public void compositePartitionMultipleClusteringMixedTest() { - PrimaryKey.Factory factory = new PrimaryKey.Factory(compositePartitionMultipleClusteringMixed.comparator); + PrimaryKey.Factory factory = new PrimaryKey.Factory(Murmur3Partitioner.instance, compositePartitionMultipleClusteringMixed.comparator); int rows = nextInt(100, 1000); PrimaryKey[] keys = new PrimaryKey[rows]; int partition = 0; @@ -335,7 +366,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester for (int index = 0; index < keys.length - 1; index++) { PrimaryKey key = keys[index]; - PrimaryKey tokenOnlyKey = factory.createTokenOnly(key.token()); + PrimaryKey tokenOnlyKey = factory.create(key.token()); assertCompareToAndEquals(tokenOnlyKey, key, 0); assertCompareToAndEquals(key, key, 0); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org