This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
     new 9ce86e0ff8 SAI result retriever is filtering too many rows
9ce86e0ff8 is described below

commit 9ce86e0ff8b6344b528a0640f9dafa23f97dd85a
Author: Mike Adamson <madam...@datastax.com>
AuthorDate: Tue Aug 8 17:07:01 2023 +0100

    SAI result retriever is filtering too many rows
    
    This patch fixes a bug in the SegmentMetadata that
    was only storing the partition key for min and max
    primary keys for a segment. It also contains some
    refactoring of the PrimaryKey to remove the deferred
    loading of PrimaryKeys by the PrimaryKeyMaps.
    
    Patch by Mike Adamson; reviewed by Caleb Rackliffe and Andrés de la Peña for CASSANDRA-18734
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/index/sai/IndexContext.java   |   9 +-
 .../cassandra/index/sai/StorageAttachedIndex.java  |   1 +
 .../index/sai/StorageAttachedIndexGroup.java       |   2 +-
 .../cassandra/index/sai/disk/PrimaryKeyMap.java    |   3 +-
 .../index/sai/disk/StorageAttachedIndexWriter.java |   3 +-
 .../index/sai/disk/format/IndexDescriptor.java     |  11 +-
 .../index/sai/disk/v1/MemtableIndexWriter.java     |  14 +-
 .../index/sai/disk/v1/SkinnyPrimaryKeyMap.java     |  40 +--
 .../index/sai/disk/v1/WidePrimaryKeyMap.java       |  37 +--
 .../index/sai/disk/v1/keystore/KeyLookup.java      |   6 +-
 .../index/sai/disk/v1/segment/SegmentMetadata.java |  33 +-
 .../index/sai/memory/TrieMemoryIndex.java          |   3 +-
 .../cassandra/index/sai/plan/QueryController.java  |  13 +-
 .../sai/plan/StorageAttachedIndexSearcher.java     |  24 +-
 .../cassandra/index/sai/utils/PrimaryKey.java      | 368 ++++++++++++++-------
 .../test/microbench/sai/KeyLookupBench.java        |   4 +-
 .../org/apache/cassandra/index/sai/SAITester.java  |   4 +-
 .../index/sai/disk/format/IndexDescriptorTest.java |   5 +-
 .../sai/disk/v1/InvertedIndexSearcherTest.java     |  10 +-
 .../index/sai/disk/v1/SegmentFlushTest.java        |  16 +-
 .../index/sai/disk/v1/WideRowPrimaryKeyTest.java   |   3 +-
 .../v1/bbtree/BlockBalancedTreeIndexBuilder.java   |   8 +-
 .../index/sai/disk/v1/keystore/KeyLookupTest.java  |  28 +-
 .../sai/iterators/KeyRangeConcatIteratorTest.java  |   6 +-
 .../index/sai/iterators/LongIterator.java          |   2 +-
 .../AbstractInMemoryKeyRangeIteratorTester.java    |  16 +-
 .../PriorityInMemoryKeyRangeIteratorTest.java      |   4 +-
 .../index/sai/memory/TrieMemoryIndexTest.java      |   1 +
 .../index/sai/utils/AbstractPrimaryKeyTester.java  |  15 +-
 .../index/sai/utils/IndexInputLeakDetector.java    |   2 +-
 .../cassandra/index/sai/utils/PrimaryKeyTest.java  |  61 +++-
 32 files changed, 449 insertions(+), 304 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 953ca7f3dc..c65a1e6671 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.0-alpha2
+ * Fix SAI's SegmentMetadata min and max primary keys (CASSANDRA-18734)
  * Remove commons-codec dependency (CASSANDRA-18772)
 Merged from 4.1:
 Merged from 4.0:
diff --git a/src/java/org/apache/cassandra/index/sai/IndexContext.java 
b/src/java/org/apache/cassandra/index/sai/IndexContext.java
index ac33c837da..61eb844b02 100644
--- a/src/java/org/apache/cassandra/index/sai/IndexContext.java
+++ b/src/java/org/apache/cassandra/index/sai/IndexContext.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.index.sai.analyzer.AbstractAnalyzer;
 import org.apache.cassandra.index.sai.disk.SSTableIndex;
 import org.apache.cassandra.index.sai.disk.format.Version;
@@ -96,6 +97,7 @@ public class IndexContext
     public IndexContext(String keyspace,
                         String table,
                         AbstractType<?> partitionKeyType,
+                        IPartitioner partitioner,
                         ClusteringComparator clusteringComparator,
                         ColumnMetadata columnMetadata,
                         IndexTarget.Type indexType,
@@ -108,7 +110,7 @@ public class IndexContext
         this.columnMetadata = Objects.requireNonNull(columnMetadata);
         this.indexType = Objects.requireNonNull(indexType);
         this.validator = TypeUtil.cellValueType(columnMetadata, indexType);
-        this.primaryKeyFactory = new PrimaryKey.Factory(clusteringComparator);
+        this.primaryKeyFactory = new PrimaryKey.Factory(partitioner, 
clusteringComparator);
 
         this.indexMetadata = indexMetadata;
         this.memtableIndexManager = indexMetadata == null ? null : new 
MemtableIndexManager(this);
@@ -122,6 +124,11 @@ public class IndexContext
                                                      : 
AbstractAnalyzer.fromOptions(getValidator(), indexMetadata.options);
     }
 
+    public boolean hasClustering()
+    {
+        return clusteringComparator.size() > 0;
+    }
+
     public AbstractType<?> keyValidator()
     {
         return partitionKeyType;
diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java 
b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
index 805a6b65e1..a0cdf9c3ac 100644
--- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
+++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
@@ -178,6 +178,7 @@ public class StorageAttachedIndex implements Index
         this.indexContext = new IndexContext(tableMetadata.keyspace,
                                              tableMetadata.name,
                                              tableMetadata.partitionKeyType,
+                                             tableMetadata.partitioner,
                                              tableMetadata.comparator,
                                              target.left,
                                              target.right,
diff --git 
a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java 
b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java
index 7ce084d9db..de4c8b3570 100644
--- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java
+++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java
@@ -191,7 +191,7 @@ public class StorageAttachedIndexGroup implements 
Index.Group, INotificationCons
     @Override
     public SSTableFlushObserver getFlushObserver(Descriptor descriptor, 
LifecycleNewTracker tracker, TableMetadata tableMetadata)
     {
-        IndexDescriptor indexDescriptor = IndexDescriptor.create(descriptor, 
tableMetadata.comparator);
+        IndexDescriptor indexDescriptor = IndexDescriptor.create(descriptor, 
tableMetadata.partitioner, tableMetadata.comparator);
         try
         {
             return 
StorageAttachedIndexWriter.createFlushObserverWriter(indexDescriptor, indexes, 
tracker);
diff --git a/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyMap.java 
b/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyMap.java
index fa47b9f4a4..330acf6954 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyMap.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyMap.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
 import java.io.IOException;
 
 import javax.annotation.concurrent.NotThreadSafe;
+import javax.annotation.concurrent.ThreadSafe;
 
 import org.apache.cassandra.index.sai.utils.PrimaryKey;
 
@@ -36,7 +37,7 @@ public interface PrimaryKeyMap extends Closeable
      * A factory for creating {@link PrimaryKeyMap} instances. Implementations 
of this
      * interface are expected to be threadsafe.
      */
-    @NotThreadSafe
+    @ThreadSafe
     interface Factory extends Closeable
     {
         /**
diff --git 
a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java 
b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java
index f44a68bde1..f35ae67f93 100644
--- 
a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java
+++ 
b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java
@@ -247,7 +247,8 @@ public class StorageAttachedIndexWriter implements 
SSTableFlushObserver
 
     private void addRow(Row row) throws IOException, 
InMemoryTrie.SpaceExhaustedException
     {
-        PrimaryKey primaryKey = 
indexDescriptor.primaryKeyFactory.create(currentKey, row.clustering());
+        PrimaryKey primaryKey = indexDescriptor.hasClustering() ? 
indexDescriptor.primaryKeyFactory.create(currentKey, row.clustering())
+                                                                : 
indexDescriptor.primaryKeyFactory.create(currentKey);
         perSSTableWriter.nextRow(primaryKey);
         rowMapping.add(primaryKey, sstableRowId);
 
diff --git 
a/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java 
b/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java
index 37364f6448..eda5cb7ed3 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ClusteringComparator;
 import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.index.sai.IndexContext;
 import org.apache.cassandra.index.sai.IndexValidation;
 import org.apache.cassandra.index.sai.SSTableContext;
@@ -74,17 +75,17 @@ public class IndexDescriptor
     public final ClusteringComparator clusteringComparator;
     public final PrimaryKey.Factory primaryKeyFactory;
 
-    private IndexDescriptor(Version version, Descriptor sstableDescriptor, 
ClusteringComparator clusteringComparator)
+    private IndexDescriptor(Version version, Descriptor sstableDescriptor, 
IPartitioner partitioner, ClusteringComparator clusteringComparator)
     {
         this.version = version;
         this.sstableDescriptor = sstableDescriptor;
         this.clusteringComparator = clusteringComparator;
-        this.primaryKeyFactory = new PrimaryKey.Factory(clusteringComparator);
+        this.primaryKeyFactory = new PrimaryKey.Factory(partitioner, 
clusteringComparator);
     }
 
-    public static IndexDescriptor create(Descriptor descriptor, 
ClusteringComparator clusteringComparator)
+    public static IndexDescriptor create(Descriptor descriptor, IPartitioner 
partitioner, ClusteringComparator clusteringComparator)
     {
-        return new IndexDescriptor(Version.LATEST, descriptor, 
clusteringComparator);
+        return new IndexDescriptor(Version.LATEST, descriptor, partitioner, 
clusteringComparator);
     }
 
     public static IndexDescriptor create(SSTableReader sstable)
@@ -93,6 +94,7 @@ public class IndexDescriptor
         {
             IndexDescriptor indexDescriptor = new IndexDescriptor(version,
                                                                   
sstable.descriptor,
+                                                                  
sstable.getPartitioner(),
                                                                   
sstable.metadata().comparator);
 
             if 
(version.onDiskFormat().isPerSSTableIndexBuildComplete(indexDescriptor))
@@ -102,6 +104,7 @@ public class IndexDescriptor
         }
         return new IndexDescriptor(Version.LATEST,
                                    sstable.descriptor,
+                                   sstable.getPartitioner(),
                                    sstable.metadata().comparator);
     }
 
diff --git 
a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java 
b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java
index da57ac29e0..97e82ed805 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java
@@ -27,7 +27,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.carrotsearch.hppc.LongArrayList;
-import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.index.sai.IndexContext;
@@ -105,14 +104,11 @@ public class MemtableIndexWriter implements 
PerColumnIndexWriter
                 return;
             }
 
-            final DecoratedKey minKey = rowMapping.minKey.partitionKey();
-            final DecoratedKey maxKey = rowMapping.maxKey.partitionKey();
-
             final Iterator<Pair<ByteComparable, LongArrayList>> iterator = 
rowMapping.merge(memtable);
 
             try (MemtableTermsIterator terms = new 
MemtableTermsIterator(memtable.getMinTerm(), memtable.getMaxTerm(), iterator))
             {
-                long cellCount = flush(minKey, maxKey, 
indexContext.getValidator(), terms, rowMapping.maxSSTableRowId);
+                long cellCount = flush(rowMapping.minKey, rowMapping.maxKey, 
indexContext.getValidator(), terms, rowMapping.maxSSTableRowId);
 
                 
indexDescriptor.createComponentOnDisk(IndexComponent.COLUMN_COMPLETION_MARKER, 
indexContext);
 
@@ -135,8 +131,8 @@ public class MemtableIndexWriter implements 
PerColumnIndexWriter
         }
     }
 
-    private long flush(DecoratedKey minKey,
-                       DecoratedKey maxKey,
+    private long flush(PrimaryKey minKey,
+                       PrimaryKey maxKey,
                        AbstractType<?> termComparator,
                        MemtableTermsIterator terms,
                        long maxSSTableRowId) throws IOException
@@ -175,8 +171,8 @@ public class MemtableIndexWriter implements 
PerColumnIndexWriter
                                                        numRows,
                                                        
terms.getMinSSTableRowId(),
                                                        
terms.getMaxSSTableRowId(),
-                                                       
indexDescriptor.primaryKeyFactory.createPartitionKeyOnly(minKey),
-                                                       
indexDescriptor.primaryKeyFactory.createPartitionKeyOnly(maxKey),
+                                                       minKey,
+                                                       maxKey,
                                                        terms.getMinTerm(),
                                                        terms.getMaxTerm(),
                                                        indexMetas);
diff --git 
a/src/java/org/apache/cassandra/index/sai/disk/v1/SkinnyPrimaryKeyMap.java 
b/src/java/org/apache/cassandra/index/sai/disk/v1/SkinnyPrimaryKeyMap.java
index ad7e16751b..b535861290 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/SkinnyPrimaryKeyMap.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SkinnyPrimaryKeyMap.java
@@ -18,8 +18,11 @@
 
 package org.apache.cassandra.index.sai.disk.v1;
 
-import org.apache.cassandra.db.BufferDecoratedKey;
-import org.apache.cassandra.db.Clustering;
+import java.io.IOException;
+import java.util.Arrays;
+import javax.annotation.concurrent.NotThreadSafe;
+import javax.annotation.concurrent.ThreadSafe;
+
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.index.sai.disk.PrimaryKeyMap;
@@ -35,15 +38,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileHandle;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.Throwables;
-import org.apache.cassandra.utils.bytecomparable.ByteComparable;
-import org.apache.cassandra.utils.bytecomparable.ByteSource;
-import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse;
-
-import javax.annotation.concurrent.NotThreadSafe;
-import javax.annotation.concurrent.ThreadSafe;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
 
 /**
  * A {@link PrimaryKeyMap} for skinny tables (those with no clustering 
columns).
@@ -130,7 +124,6 @@ public class SkinnyPrimaryKeyMap implements PrimaryKeyMap
     protected final KeyLookup.Cursor partitionKeyCursor;
     protected final IPartitioner partitioner;
     protected final PrimaryKey.Factory primaryKeyFactory;
-    protected final ByteBuffer tokenBuffer = ByteBuffer.allocate(Long.BYTES);
 
     protected SkinnyPrimaryKeyMap(LongArray tokenArray,
                                   LongArray partitionArray,
@@ -148,9 +141,7 @@ public class SkinnyPrimaryKeyMap implements PrimaryKeyMap
     @Override
     public PrimaryKey primaryKeyFromRowId(long sstableRowId)
     {
-        tokenBuffer.putLong(tokenArray.get(sstableRowId));
-        tokenBuffer.rewind();
-        return 
primaryKeyFactory.createDeferred(partitioner.getTokenFactory().fromByteArray(tokenBuffer),
 () -> supplier(sstableRowId));
+        return primaryKeyFactory.create(readPartitionKey(sstableRowId));
     }
 
     @Override
@@ -159,7 +150,9 @@ public class SkinnyPrimaryKeyMap implements PrimaryKeyMap
         long rowId = tokenArray.indexOf(primaryKey.token().getLongValue());
         // If the key is token only, the token is out of range, we are at the 
end of our keys, or we have skipped a token
         // we can return straight away.
-        if (primaryKey.isTokenOnly() || rowId < 0 || rowId + 1 == 
tokenArray.length() || tokenArray.get(rowId) != 
primaryKey.token().getLongValue())
+        if (primaryKey.kind() == PrimaryKey.Kind.TOKEN ||
+            rowId < 0 ||
+            rowId + 1 == tokenArray.length() || tokenArray.get(rowId) != 
primaryKey.token().getLongValue())
             return rowId;
         // Otherwise we need to check for token collision.
         return tokenCollisionDetection(primaryKey, rowId);
@@ -188,21 +181,8 @@ public class SkinnyPrimaryKeyMap implements PrimaryKeyMap
         return rowId;
     }
 
-    protected PrimaryKey supplier(long sstableRowId)
-    {
-        return primaryKeyFactory.create(readPartitionKey(sstableRowId), 
Clustering.EMPTY);
-    }
-
     protected DecoratedKey readPartitionKey(long sstableRowId)
     {
-        long partitionId = partitionArray.get(sstableRowId);
-        ByteSource.Peekable peekable = 
ByteSource.peekable(partitionKeyCursor.seekToPointId(partitionId).asComparableBytes(ByteComparable.Version.OSS50));
-
-        byte[] keyBytes = ByteSourceInverse.getUnescapedBytes(peekable);
-
-        assert keyBytes != null : "Primary key from map did not contain a 
partition key";
-
-        ByteBuffer decoratedKey = ByteBuffer.wrap(keyBytes);
-        return new BufferDecoratedKey(partitioner.getToken(decoratedKey), 
decoratedKey);
+        return 
primaryKeyFactory.partitionKeyFromComparableBytes(partitionKeyCursor.seekToPointId(partitionArray.get(sstableRowId)));
     }
 }
diff --git 
a/src/java/org/apache/cassandra/index/sai/disk/v1/WidePrimaryKeyMap.java 
b/src/java/org/apache/cassandra/index/sai/disk/v1/WidePrimaryKeyMap.java
index 0016c49493..a93d6a7ff1 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/WidePrimaryKeyMap.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/WidePrimaryKeyMap.java
@@ -18,9 +18,13 @@
 
 package org.apache.cassandra.index.sai.disk.v1;
 
+import java.io.IOException;
+import java.util.Arrays;
+import javax.annotation.concurrent.NotThreadSafe;
+import javax.annotation.concurrent.ThreadSafe;
+
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.ClusteringComparator;
-import org.apache.cassandra.db.marshal.ByteBufferAccessor;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.index.sai.disk.PrimaryKeyMap;
 import org.apache.cassandra.index.sai.disk.format.IndexComponent;
@@ -33,13 +37,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileHandle;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.Throwables;
-import org.apache.cassandra.utils.bytecomparable.ByteComparable;
-import org.apache.cassandra.utils.bytecomparable.ByteSource;
-
-import javax.annotation.concurrent.NotThreadSafe;
-import javax.annotation.concurrent.ThreadSafe;
-import java.io.IOException;
-import java.util.Arrays;
 
 /**
  * An extension of the {@link SkinnyPrimaryKeyMap} for wide tables (those with 
clustering columns).
@@ -125,6 +122,12 @@ public class WidePrimaryKeyMap extends SkinnyPrimaryKeyMap
         this.clusteringKeyCursor = clusteringKeyCursor;
     }
 
+    @Override
+    public PrimaryKey primaryKeyFromRowId(long sstableRowId)
+    {
+        return primaryKeyFactory.create(readPartitionKey(sstableRowId), 
readClusteringKey(sstableRowId));
+    }
+
     @Override
     public long rowIdFromPrimaryKey(PrimaryKey primaryKey)
     {
@@ -132,7 +135,7 @@ public class WidePrimaryKeyMap extends SkinnyPrimaryKeyMap
 
         // If the key only has a token (initial range skip in the query), the 
token is out of range,
         // or we have skipped a token, return the rowId from the token array.
-        if (primaryKey.isTokenOnly() || rowId < 0 || tokenArray.get(rowId) != 
primaryKey.token().getLongValue())
+        if (primaryKey.kind() == PrimaryKey.Kind.TOKEN || rowId < 0 || 
tokenArray.get(rowId) != primaryKey.token().getLongValue())
             return rowId;
 
         rowId = tokenCollisionDetection(primaryKey, rowId);
@@ -148,23 +151,9 @@ public class WidePrimaryKeyMap extends SkinnyPrimaryKeyMap
         FileUtils.closeQuietly(clusteringKeyCursor);
     }
 
-    @Override
-    protected PrimaryKey supplier(long sstableRowId)
-    {
-        return primaryKeyFactory.create(readPartitionKey(sstableRowId), 
readClusteringKey(sstableRowId));
-    }
-
     private Clustering<?> readClusteringKey(long sstableRowId)
     {
-        ByteSource.Peekable peekable = 
ByteSource.peekable(clusteringKeyCursor.seekToPointId(sstableRowId)
-                                                                              
.asComparableBytes(ByteComparable.Version.OSS50));
-
-        Clustering<?> clustering = 
clusteringComparator.clusteringFromByteComparable(ByteBufferAccessor.instance, 
v -> peekable);
-
-        if (clustering == null)
-            clustering = Clustering.EMPTY;
-
-        return clustering;
+        return 
primaryKeyFactory.clusteringFromByteComparable(clusteringKeyCursor.seekToPointId(sstableRowId));
     }
 
     // Returns the rowId of the next partition or the number of rows if 
supplied rowId is in the last partition
diff --git 
a/src/java/org/apache/cassandra/index/sai/disk/v1/keystore/KeyLookup.java 
b/src/java/org/apache/cassandra/index/sai/disk/v1/keystore/KeyLookup.java
index 7f6f8f3b82..7b8808e89e 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/keystore/KeyLookup.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/keystore/KeyLookup.java
@@ -143,10 +143,10 @@ public class KeyLookup
          * in these cases the internal buffer is cleared.
          *
          * @param pointId point id to lookup
-         * @return The {@link ByteComparable} containing the key
+         * @return The {@link ByteSource} containing the key
          * @throws IndexOutOfBoundsException if the target point id is less 
than -1 or greater than the number of keys
          */
-        public @Nonnull ByteComparable seekToPointId(long pointId)
+        public @Nonnull ByteSource seekToPointId(long pointId)
         {
             if (pointId < 0 || pointId >= keyLookupMeta.keyCount)
                 throw new 
IndexOutOfBoundsException(String.format(INDEX_OUT_OF_BOUNDS, pointId, 
keyLookupMeta.keyCount));
@@ -170,7 +170,7 @@ public class KeyLookup
                 updateCurrentBlockIndex(currentPointId);
             }
 
-            return ByteComparable.fixedLength(currentKey.bytes, 
currentKey.offset, currentKey.length);
+            return ByteSource.fixedLength(currentKey.bytes, currentKey.offset, 
currentKey.length);
         }
 
         /**
diff --git 
a/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentMetadata.java 
b/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentMetadata.java
index e9510f945b..28aa9b2e4d 100644
--- 
a/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentMetadata.java
+++ 
b/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentMetadata.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.index.sai.disk.v1.segment;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -30,12 +31,14 @@ import java.util.stream.Stream;
 
 import com.google.common.collect.ImmutableMap;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.index.sai.disk.format.IndexComponent;
 import org.apache.cassandra.index.sai.disk.v1.MetadataSource;
 import org.apache.cassandra.index.sai.disk.v1.MetadataWriter;
 import org.apache.cassandra.index.sai.utils.PrimaryKey;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+import org.apache.cassandra.utils.bytecomparable.ByteSource;
+import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse;
 import org.apache.lucene.store.DataInput;
 import org.apache.lucene.store.IndexOutput;
 
@@ -54,7 +57,7 @@ public class SegmentMetadata
 
     /**
      * Min and max sstable rowId in current segment.
-     *
+     * <p>
      * For index generated by compaction, minSSTableRowId is the same as 
segmentRowIdOffset.
      * But for flush, segmentRowIdOffset is taken from previous segment's 
maxSSTableRowId.
      */
@@ -80,7 +83,7 @@ public class SegmentMetadata
 
     /**
      * Root, offset, length for each index structure in the segment.
-     *
+     * <p>
      * Note: postings block offsets are stored in terms dictionary, no need to 
worry about its root.
      */
     public final ComponentMetadataMap componentMetadatas;
@@ -118,8 +121,8 @@ public class SegmentMetadata
         this.numRows = input.readLong();
         this.minSSTableRowId = input.readLong();
         this.maxSSTableRowId = input.readLong();
-        this.minKey = 
primaryKeyFactory.createPartitionKeyOnly(DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input)));
-        this.maxKey = 
primaryKeyFactory.createPartitionKeyOnly(DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input)));
+        this.minKey = 
primaryKeyFactory.fromComparableBytes(ByteSource.fixedLength(readBytes(input)));
+        this.maxKey = 
primaryKeyFactory.fromComparableBytes(ByteSource.fixedLength(readBytes(input)));
         this.minTerm = readBytes(input);
         this.maxTerm = readBytes(input);
         this.componentMetadatas = new ComponentMetadataMap(input);
@@ -158,9 +161,10 @@ public class SegmentMetadata
                 output.writeLong(metadata.minSSTableRowId);
                 output.writeLong(metadata.maxSSTableRowId);
 
-                Stream.of(metadata.minKey.partitionKey().getKey(),
-                          metadata.maxKey.partitionKey().getKey(),
-                          metadata.minTerm, metadata.maxTerm).forEach(bb -> 
writeBytes(bb, output));
+                
Stream.of(ByteSourceInverse.readBytes(metadata.minKey.asComparableBytes(ByteComparable.Version.OSS50)),
+                          
ByteSourceInverse.readBytes(metadata.maxKey.asComparableBytes(ByteComparable.Version.OSS50)))
+                      .forEach(b -> writeBytes(b, output));
+                Stream.of(metadata.minTerm, metadata.maxTerm).forEach(bb -> 
writeBytes(bb, output));
 
                 metadata.componentMetadatas.write(output);
             }
@@ -195,6 +199,19 @@ public class SegmentMetadata
             out.writeInt(bytes.length);
             out.writeBytes(bytes, 0, bytes.length);
         }
+        catch (IOException e)
+        {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private static void writeBytes(byte[] bytes, IndexOutput out)
+    {
+        try
+        {
+            out.writeInt(bytes.length);
+            out.writeBytes(bytes, 0, bytes.length);
+        }
         catch (IOException ioe)
         {
             throw new RuntimeException(ioe);
diff --git 
a/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java 
b/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java
index 0df665d930..f992301df9 100644
--- a/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java
+++ b/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java
@@ -97,7 +97,8 @@ public class TrieMemoryIndex
         {
             value = TypeUtil.asIndexBytes(value, validator);
             analyzer.reset(value);
-            final PrimaryKey primaryKey = 
indexContext.keyFactory().create(key, clustering);
+            final PrimaryKey primaryKey = indexContext.hasClustering() ? 
indexContext.keyFactory().create(key, clustering)
+                                                                       : 
indexContext.keyFactory().create(key);
             final long initialSizeOnHeap = data.sizeOnHeap();
             final long initialSizeOffHeap = data.sizeOffHeap();
             final long reducerHeapSize = primaryKeysReducer.heapAllocations();
diff --git a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java 
b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java
index bf435678ac..2d01983219 100644
--- a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java
+++ b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java
@@ -119,6 +119,7 @@ public class QueryController
         return indexes.isEmpty() ? new IndexContext(cfs.metadata().keyspace,
                                                     cfs.metadata().name,
                                                     
cfs.metadata().partitionKeyType,
+                                                    cfs.getPartitioner(),
                                                     cfs.metadata().comparator,
                                                     expression.column(),
                                                     IndexTarget.Type.VALUES,
@@ -196,7 +197,7 @@ public class QueryController
      * The query does not select the key if both of the following statements 
are false:
      *  1. The table associated with the query is not using clustering keys
      *  2. The clustering index filter for the command wants the row.
-     *
+     * <p>
      *  Item 2 is important in paged queries where the {@link 
org.apache.cassandra.db.filter.ClusteringIndexSliceFilter} for
      *  subsequent paged queries may not select rows that are returned by the 
index
      *  search because that is initially partition based.
@@ -206,7 +207,7 @@ public class QueryController
      */
     public boolean doesNotSelect(PrimaryKey key)
     {
-        return !key.hasEmptyClustering() && 
!command.clusteringIndexFilter(key.partitionKey()).selects(key.clustering());
+        return key.kind() == PrimaryKey.Kind.WIDE && 
!command.clusteringIndexFilter(key.partitionKey()).selects(key.clustering());
     }
 
     // Note: This method assumes that the selects method has already been 
called for the
@@ -215,7 +216,13 @@ public class QueryController
     {
         ClusteringIndexFilter clusteringIndexFilter = 
command.clusteringIndexFilter(key.partitionKey());
 
-        if (key.hasEmptyClustering())
+        assert cfs.metadata().comparator.size() == 0 && 
!key.kind().hasClustering ||
+               cfs.metadata().comparator.size() > 0 && 
key.kind().hasClustering :
+               "PrimaryKey " + key + " clustering does not match table. There 
should be a clustering of size " + cfs.metadata().comparator.size();
+
+        // If we have skinny partitions or the key is for a static row then we 
need to get the partition as
+        // requested by the original query.
+        if (cfs.metadata().comparator.size() == 0 || key.kind() == 
PrimaryKey.Kind.STATIC)
             return clusteringIndexFilter;
         else
             return new 
ClusteringIndexNamesFilter(FBUtilities.singleton(key.clustering(), 
cfs.metadata().comparator),
diff --git 
a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java
 
b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java
index adcf36d157..295189b023 100644
--- 
a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java
+++ 
b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java
@@ -72,7 +72,7 @@ public class StorageAttachedIndexSearcher implements 
Index.Searcher
         this.command = command;
         this.queryContext = new QueryContext(command, executionQuotaMs);
         this.queryController = new QueryController(cfs, command, 
filterOperation, queryContext, tableQueryMetrics);
-        this.keyFactory = new PrimaryKey.Factory(cfs.metadata().comparator);
+        this.keyFactory = new PrimaryKey.Factory(cfs.getPartitioner(), 
cfs.metadata().comparator);
     }
 
     @Override
@@ -138,8 +138,8 @@ public class StorageAttachedIndexSearcher implements 
Index.Searcher
             this.queryContext = queryContext;
             this.keyFactory = keyFactory;
 
-            this.firstPrimaryKey = 
keyFactory.createTokenOnly(queryController.mergeRange().left.getToken());
-            this.lastPrimaryKey = 
keyFactory.createTokenOnly(queryController.mergeRange().right.getToken());
+            this.firstPrimaryKey = 
keyFactory.create(queryController.mergeRange().left.getToken());
+            this.lastPrimaryKey = 
keyFactory.create(queryController.mergeRange().right.getToken());
         }
 
         @Override
@@ -298,7 +298,7 @@ public class StorageAttachedIndexSearcher implements 
Index.Searcher
          */
         private void skipTo(@Nonnull Token token)
         {
-            resultKeyIterator.skipTo(keyFactory.createTokenOnly(token));
+            resultKeyIterator.skipTo(keyFactory.create(token));
         }
 
         /**
@@ -380,6 +380,13 @@ public class StorageAttachedIndexSearcher implements 
Index.Searcher
         private static UnfilteredRowIterator 
applyIndexFilter(UnfilteredRowIterator partition, FilterTree tree, QueryContext 
queryContext)
         {
             Row staticRow = partition.staticRow();
+
+            // We want to short-circuit the filtering of the whole partition 
if the static row
+            // satisfies the filter. If that is the case we just need to 
return the whole partition.
+            queryContext.rowsFiltered++;
+            if (tree.isSatisfiedBy(partition.partitionKey(), staticRow, 
staticRow))
+                return partition;
+
             List<Unfiltered> clusters = new ArrayList<>();
 
             while (partition.hasNext())
@@ -393,15 +400,6 @@ public class StorageAttachedIndexSearcher implements 
Index.Searcher
                 }
             }
 
-            if (clusters.isEmpty())
-            {
-                queryContext.rowsFiltered++;
-                if (tree.isSatisfiedBy(partition.partitionKey(), staticRow, 
staticRow))
-                {
-                    clusters.add(staticRow);
-                }
-            }
-
             /*
              * If {@code clusters} is empty, which means either all clustering 
row and static row pairs failed,
              *       or static row and static row pair failed. In both cases, 
we should not return any partition.
diff --git a/src/java/org/apache/cassandra/index/sai/utils/PrimaryKey.java 
b/src/java/org/apache/cassandra/index/sai/utils/PrimaryKey.java
index fe541998e2..7918cbad0c 100644
--- a/src/java/org/apache/cassandra/index/sai/utils/PrimaryKey.java
+++ b/src/java/org/apache/cassandra/index/sai/utils/PrimaryKey.java
@@ -17,34 +17,58 @@
  */
 package org.apache.cassandra.index.sai.utils;
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Objects;
 
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-import javax.annotation.Nullable;
-
+import org.apache.cassandra.db.BufferDecoratedKey;
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.ClusteringComparator;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.bytecomparable.ByteComparable;
 import org.apache.cassandra.utils.bytecomparable.ByteSource;
+import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse;
 
 /**
  * Representation of the primary key for a row consisting of the {@link 
DecoratedKey} and
  * {@link Clustering} associated with a {@link 
org.apache.cassandra.db.rows.Row}.
+ * The {@link Factory.TokenOnlyPrimaryKey} is used by the {@link 
org.apache.cassandra.index.sai.plan.StorageAttachedIndexSearcher} to
+ * position the search within the query range.
  */
 public interface PrimaryKey extends Comparable<PrimaryKey>, ByteComparable
 {
+    /**
+     * See the javadoc for {@link #kind()} for how this enum is used.
+      */
+    enum Kind
+    {
+        TOKEN(false),
+        SKINNY(false),
+        WIDE(true),
+        STATIC(true);
+
+        public final boolean hasClustering;
+
+        Kind(boolean hasClustering)
+        {
+            this.hasClustering = hasClustering;
+        }
+    }
+
     class Factory
     {
+        private final IPartitioner partitioner;
         private final ClusteringComparator clusteringComparator;
 
-        public Factory(ClusteringComparator clusteringComparator)
+        public Factory(IPartitioner partitioner, ClusteringComparator 
clusteringComparator)
         {
+            this.partitioner = partitioner;
             this.clusteringComparator = clusteringComparator;
         }
 
@@ -54,18 +78,22 @@ public interface PrimaryKey extends Comparable<PrimaryKey>, 
ByteComparable
          * {@link Token} only primary keys are used for defining the partition 
range
          * of a query.
          */
-        public PrimaryKey createTokenOnly(Token token)
+        public PrimaryKey create(Token token)
         {
             assert token != null : "Cannot create a primary key with a null 
token";
 
             return new TokenOnlyPrimaryKey(token);
         }
 
-        public PrimaryKey createPartitionKeyOnly(DecoratedKey partitionKey)
+        /**
+         * Create a {@link PrimaryKey} for tables without clustering columns
+         */
+        public PrimaryKey create(DecoratedKey partitionKey)
         {
+            assert clusteringComparator.size() == 0 : "Cannot create a skinny 
primary key for a table with clustering columns";
             assert partitionKey != null : "Cannot create a primary key with a 
null partition key";
 
-            return new ImmutablePrimaryKey(partitionKey, null);
+            return new SkinnyPrimaryKey(partitionKey);
         }
 
         /**
@@ -74,66 +102,103 @@ public interface PrimaryKey extends 
Comparable<PrimaryKey>, ByteComparable
          */
         public PrimaryKey create(DecoratedKey partitionKey, Clustering<?> 
clustering)
         {
+            assert clusteringComparator.size() > 0 : "Cannot create a wide 
primary key for a table without clustering columns";
             assert partitionKey != null : "Cannot create a primary key with a 
null partition key";
             assert clustering != null : "Cannot create a primary key with a 
null clustering";
 
-            return new ImmutablePrimaryKey(partitionKey, clustering);
+            return clustering == Clustering.STATIC_CLUSTERING ? new 
StaticPrimaryKey(partitionKey) : new WidePrimaryKey(partitionKey, clustering);
+        }
+
+        /**
+         * Create a {@link PrimaryKey} from a {@link ByteSource}. This should 
only be used with {@link ByteSource} instances
+         * created by calls to {@link PrimaryKey#asComparableBytes(Version)}.
+         */
+        public PrimaryKey fromComparableBytes(ByteSource byteSource)
+        {
+            if (clusteringComparator.size() > 0)
+            {
+                ByteSource.Peekable peekable = ByteSource.peekable(byteSource);
+                DecoratedKey partitionKey = 
partitionKeyFromComparableBytes(ByteSourceInverse.nextComponentSource(peekable));
+                Clustering<?> clustering = 
clusteringFromByteComparable(ByteSourceInverse.nextComponentSource(peekable));
+                return create(partitionKey, clustering);
+            }
+            else
+            {
+                return create(partitionKeyFromComparableBytes(byteSource));
+            }
+        }
+
+        /**
+         * Create a {@link DecoratedKey} from a {@link ByteSource}. This is a 
separate method because of its use by
+         * the {@link org.apache.cassandra.index.sai.disk.PrimaryKeyMap} 
implementations to create partition keys.
+         */
+        public DecoratedKey partitionKeyFromComparableBytes(ByteSource 
byteSource)
+        {
+            ByteBuffer decoratedKey = 
ByteBuffer.wrap(ByteSourceInverse.getUnescapedBytes(ByteSource.peekable(byteSource)));
+            return new BufferDecoratedKey(partitioner.getToken(decoratedKey), 
decoratedKey);
         }
 
-        public PrimaryKey createDeferred(Token token, Supplier<PrimaryKey> 
primaryKeySupplier)
+        /**
+         * Create a {@link Clustering} from a {@link ByteSource}. This is a 
separate method because of its use by
+         * the {@link 
org.apache.cassandra.index.sai.disk.v1.WidePrimaryKeyMap} to create its 
clustering keys.
+         */
+        public Clustering<?> clusteringFromByteComparable(ByteSource 
byteSource)
         {
-            assert token != null : "Cannot create a deferred primary key with 
a null token";
-            assert primaryKeySupplier != null : "Cannot create a deferred 
primary key with a null key supplier";
+            Clustering<?> clustering = 
clusteringComparator.clusteringFromByteComparable(ByteBufferAccessor.instance, 
v -> byteSource);
 
-            return new MutablePrimaryKey(token, primaryKeySupplier);
+            // Clustering is null for static rows
+            return (clustering == null) ? Clustering.STATIC_CLUSTERING : 
clustering;
         }
 
-        abstract class AbstractPrimaryKey implements PrimaryKey
+        class TokenOnlyPrimaryKey implements PrimaryKey
         {
+            protected final Token token;
+
+            TokenOnlyPrimaryKey(Token token)
+            {
+                this.token = token;
+            }
+
             @Override
-            @SuppressWarnings("ConstantConditions")
-            public ByteSource asComparableBytes(ByteComparable.Version version)
+            public Kind kind()
             {
-                ByteSource keyComparable = 
ByteSource.of(partitionKey().getKey(), version);
-                if (clusteringComparator.size() == 0)
-                    return keyComparable;
-                // It is important that the 
ClusteringComparator.asBytesComparable method is used
-                // to maintain the correct clustering sort order
-                ByteSource clusteringComparable = clustering() == null ||
-                                                  clustering().isEmpty() ? null
-                                                                         : 
clusteringComparator.asByteComparable(clustering())
-                                                                               
                .asComparableBytes(version);
-                return ByteSource.withTerminator(version == 
ByteComparable.Version.LEGACY ? ByteSource.END_OF_STREAM
-                                                                               
           : ByteSource.TERMINATOR,
-                                                 keyComparable,
-                                                 clusteringComparable);
+                return Kind.TOKEN;
             }
 
             @Override
-            @SuppressWarnings("ConstantConditions")
-            public int compareTo(PrimaryKey o)
+            public Token token()
             {
-                int cmp = token().compareTo(o.token());
+                return token;
+            }
 
-                // If the tokens don't match then we don't need to compare any 
more of the key.
-                // Otherwise, if either of the keys are token only we can only 
compare tokens
-                if ((cmp != 0) || isTokenOnly() || o.isTokenOnly())
-                    return cmp;
+            @Override
+            public DecoratedKey partitionKey()
+            {
+                throw new UnsupportedOperationException();
+            }
 
-                // Next compare the partition keys. If they are not equal or
-                // this is a single row partition key or there are no
-                // clusterings then we can return the result of this without
-                // needing to compare the clusterings
-                cmp = partitionKey().compareTo(o.partitionKey());
-                if (cmp != 0 || hasEmptyClustering() || o.hasEmptyClustering())
-                    return cmp;
-                return clusteringComparator.compare(clustering(), 
o.clustering());
+            @Override
+            public Clustering<?> clustering()
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public ByteSource asComparableBytes(Version version)
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public int compareTo(PrimaryKey o)
+            {
+                return token().compareTo(o.token());
             }
 
             @Override
             public int hashCode()
             {
-                return Objects.hash(token(), partitionKey(), clustering(), 
clusteringComparator);
+                return Objects.hash(token(), clusteringComparator);
             }
 
             @Override
@@ -145,165 +210,219 @@ public interface PrimaryKey extends 
Comparable<PrimaryKey>, ByteComparable
             }
 
             @Override
-            @SuppressWarnings("ConstantConditions")
             public String toString()
             {
-                return isTokenOnly() ? String.format("PrimaryKey: { token: %s 
}", token())
-                                     : String.format("PrimaryKey: { token: %s, 
partition: %s, clustering: %s:%s } ",
-                                                     token(),
-                                                     partitionKey(),
-                                                     clustering() == null ? 
null : clustering().kind(),
-                                                     clustering() == null ? 
null : Arrays.stream(clustering().getBufferArray())
-                                                                               
          .map(ByteBufferUtil::bytesToHex)
-                                                                               
          .collect(Collectors.joining(", ")));
+                return String.format("PrimaryKey: { token: %s }", token());
             }
         }
 
-        class TokenOnlyPrimaryKey extends AbstractPrimaryKey
+        class SkinnyPrimaryKey extends TokenOnlyPrimaryKey
         {
-            private final Token token;
+            protected final DecoratedKey partitionKey;
 
-            TokenOnlyPrimaryKey(Token token)
+            SkinnyPrimaryKey(DecoratedKey partitionKey)
             {
-                this.token = token;
+                super(partitionKey.getToken());
+                this.partitionKey = partitionKey;
             }
 
             @Override
-            public boolean isTokenOnly()
+            public Kind kind()
             {
-                return true;
+                return Kind.SKINNY;
             }
 
             @Override
-            public Token token()
+            public DecoratedKey partitionKey()
             {
-                return token;
+                return partitionKey;
             }
 
             @Override
-            public DecoratedKey partitionKey()
+            public ByteSource asComparableBytes(Version version)
             {
-                throw new UnsupportedOperationException();
+                return ByteSource.of(partitionKey().getKey(), version);
             }
 
             @Override
-            public Clustering<?> clustering()
+            public int compareTo(PrimaryKey o)
             {
-                throw new UnsupportedOperationException();
+                int cmp = super.compareTo(o);
+
+                // If the tokens don't match then we don't need to compare any 
more of the key.
+                // Otherwise, if the other key is token only we can only 
compare tokens.
+                // This is used by the ResultRetriever to skip to the current 
key range start position
+                // during result retrieval.
+                if ((cmp != 0) || o.kind() == Kind.TOKEN)
+                    return cmp;
+
+                // Finally compare the partition keys
+                return partitionKey().compareTo(o.partitionKey());
             }
 
             @Override
-            public ByteSource asComparableBytes(Version version)
+            public int hashCode()
             {
-                throw new UnsupportedOperationException();
+                return Objects.hash(token(), partitionKey(), Clustering.EMPTY, 
clusteringComparator);
+            }
+
+            @Override
+            public String toString()
+            {
+                return String.format("PrimaryKey: { token: %s, partition: %s 
}", token(), partitionKey());
             }
         }
 
-        class ImmutablePrimaryKey extends AbstractPrimaryKey
+        class StaticPrimaryKey extends SkinnyPrimaryKey
         {
-            private final Token token;
-            private final DecoratedKey partitionKey;
-            private final Clustering<?> clustering;
+            StaticPrimaryKey(DecoratedKey partitionKey)
+            {
+                super(partitionKey);
+            }
 
-            ImmutablePrimaryKey(DecoratedKey partitionKey, Clustering<?> 
clustering)
+            @Override
+            public Kind kind()
             {
-                this.token = partitionKey.getToken();
-                this.partitionKey = partitionKey;
-                this.clustering = clustering;
+                return Kind.STATIC;
             }
 
             @Override
-            public Token token()
+            public Clustering<?> clustering()
             {
-                return token;
+                return Clustering.STATIC_CLUSTERING;
             }
 
             @Override
-            public DecoratedKey partitionKey()
+            public ByteSource asComparableBytes(ByteComparable.Version version)
             {
-                return partitionKey;
+                ByteSource keyComparable = 
ByteSource.of(partitionKey().getKey(), version);
+                // Static clustering cannot be serialized or made to a byte 
comparable, so we use null as the component.
+                return ByteSource.withTerminator(version == 
ByteComparable.Version.LEGACY ? ByteSource.END_OF_STREAM
+                                                                               
           : ByteSource.TERMINATOR,
+                                                 keyComparable,
+                                                 null);
             }
 
             @Override
-            public Clustering<?> clustering()
+            public int compareTo(PrimaryKey o)
             {
-                return clustering;
+                int cmp = super.compareTo(o);
+                if (cmp != 0 || o.kind() == Kind.TOKEN || o.kind() == 
Kind.SKINNY)
+                    return cmp;
+                // The static clustering comes first in the sort order, so if 
the other key has static clustering we
+                // are equal, otherwise we are less than the other
+                return o.kind() == Kind.STATIC ? 0 : -1;
+            }
+
+            @Override
+            public int hashCode()
+            {
+                return Objects.hash(token(), partitionKey(), 
Clustering.STATIC_CLUSTERING, clusteringComparator);
+            }
+
+            @Override
+            public String toString()
+            {
+                return String.format("PrimaryKey: { token: %s, partition: %s, 
clustering: STATIC } ", token(), partitionKey());
             }
         }
 
-        class MutablePrimaryKey extends AbstractPrimaryKey
+        class WidePrimaryKey extends SkinnyPrimaryKey
         {
-            private final Token token;
-            private final Supplier<PrimaryKey> primaryKeySupplier;
+            private final Clustering<?> clustering;
 
-            private boolean notLoaded = true;
-            private DecoratedKey partitionKey;
-            private Clustering<?> clustering;
+            WidePrimaryKey(DecoratedKey partitionKey, Clustering<?> clustering)
+            {
+                super(partitionKey);
+                this.clustering = clustering;
+            }
 
-            MutablePrimaryKey(Token token, Supplier<PrimaryKey> 
primaryKeySupplier)
+            @Override
+            public Kind kind()
             {
-                this.token = token;
-                this.primaryKeySupplier = primaryKeySupplier;
+                return Kind.WIDE;
             }
 
             @Override
-            public Token token()
+            public Clustering<?> clustering()
             {
-                return token;
+                return clustering;
             }
 
             @Override
-            public DecoratedKey partitionKey()
+            public ByteSource asComparableBytes(ByteComparable.Version version)
             {
-                loadDeferred();
-                return partitionKey;
+                ByteSource keyComparable = 
ByteSource.of(partitionKey().getKey(), version);
+                // It is important that the 
ClusteringComparator.asByteComparable method is used
+                // to maintain the correct clustering sort order.
+                ByteSource clusteringComparable = 
clusteringComparator.asByteComparable(clustering()).asComparableBytes(version);
+                return ByteSource.withTerminator(version == 
ByteComparable.Version.LEGACY ? ByteSource.END_OF_STREAM
+                                                                               
           : ByteSource.TERMINATOR,
+                                                 keyComparable,
+                                                 clusteringComparable);
             }
 
             @Override
-            public Clustering<?> clustering()
+            public int compareTo(PrimaryKey o)
             {
-                loadDeferred();
-                return clustering;
+                int cmp = super.compareTo(o);
+                if (cmp != 0 || o.kind() == Kind.TOKEN || o.kind() == 
Kind.SKINNY)
+                    return cmp;
+                // At this point we will be greater than other if it is static
+                if (o.kind() == Kind.STATIC)
+                    return 1;
+                return clusteringComparator.compare(clustering(), 
o.clustering());
             }
 
-            private void loadDeferred()
+            @Override
+            public int hashCode()
             {
-                if (notLoaded)
-                {
-                    PrimaryKey deferredPrimaryKey = primaryKeySupplier.get();
-                    this.partitionKey = deferredPrimaryKey.partitionKey();
-                    this.clustering = deferredPrimaryKey.clustering();
-                    notLoaded = false;
-                }
+                return Objects.hash(token(), partitionKey(), clustering(), 
clusteringComparator);
+            }
+
+            @Override
+            public String toString()
+            {
+                return String.format("PrimaryKey: { token: %s, partition: %s, 
clustering: %s:%s } ",
+                                     token(),
+                                     partitionKey(),
+                                     clustering().kind(),
+                                     
Arrays.stream(clustering().getBufferArray())
+                                           .map(ByteBufferUtil::bytesToHex)
+                                           .collect(Collectors.joining(", ")));
             }
         }
     }
 
-    default boolean isTokenOnly()
-    {
-        return false;
-    }
+    /**
+     * Returns the {@link Kind} of the {@link PrimaryKey}. The {@link Kind} is 
used locally in the {@link #compareTo(Object)}
+     * methods to determine how far the comparison needs to go between keys.
+     * <p>
+     * The {@link Kind} values have a categorization of {@code hasClustering}. 
This indicates whether the key belongs to
+     * a table with clustering columns or not.
+     */
+    Kind kind();
 
+    /**
+     * Returns the {@link Token} component of the {@link PrimaryKey}
+     */
     Token token();
 
-    @Nullable
+    /**
+     * Returns the {@link DecoratedKey} representing the partition key of the 
{@link PrimaryKey}.
+     * <p>
+     * Note: This cannot be null but some {@link PrimaryKey} implementations 
can throw {@link UnsupportedOperationException}
+     * if they do not support partition keys.
+     */
     DecoratedKey partitionKey();
 
-    @Nullable
-    Clustering<?> clustering();
-
     /**
-     * Return whether the primary key has an empty clustering or not.
-     * By default, the clustering is empty if the internal clustering
-     * is null or is empty.
-     *
-     * @return {@code true} if the clustering is empty, otherwise {@code false}
+     * Returns the {@link Clustering} representing the clustering component of 
the {@link PrimaryKey}.
+     * <p>
+     * Note: This cannot be null but some {@link PrimaryKey} implementations 
can throw {@link UnsupportedOperationException}
+     * if they do not support clustering columns.
      */
-    @SuppressWarnings("ConstantConditions")
-    default boolean hasEmptyClustering()
-    {
-        return clustering() == null || clustering().isEmpty();
-    }
+    Clustering<?> clustering();
 
     /**
      * Returns the {@link PrimaryKey} as a {@link ByteSource} byte comparable 
representation.
@@ -314,6 +433,7 @@ public interface PrimaryKey extends Comparable<PrimaryKey>, 
ByteComparable
      *
      * @param version the {@link ByteComparable.Version} to use for the 
implementation
      * @return the {@code ByteSource} byte comparable.
+     * @throws UnsupportedOperationException for {@link PrimaryKey} 
implementations that are not byte-comparable
      */
     ByteSource asComparableBytes(ByteComparable.Version version);
 }
diff --git 
a/test/microbench/org/apache/cassandra/test/microbench/sai/KeyLookupBench.java 
b/test/microbench/org/apache/cassandra/test/microbench/sai/KeyLookupBench.java
index 54f909efad..65bded3a45 100644
--- 
a/test/microbench/org/apache/cassandra/test/microbench/sai/KeyLookupBench.java
+++ 
b/test/microbench/org/apache/cassandra/test/microbench/sai/KeyLookupBench.java
@@ -117,13 +117,13 @@ public class KeyLookupBench
                                                metadata.name,
                                                Util.newUUIDGen().get());
 
-        indexDescriptor = IndexDescriptor.create(descriptor, 
metadata.comparator);
+        indexDescriptor = IndexDescriptor.create(descriptor, 
metadata.partitioner, metadata.comparator);
 
         
CassandraRelevantProperties.SAI_SORTED_TERMS_PARTITION_BLOCK_SHIFT.setInt(partitionBlockShift);
         
CassandraRelevantProperties.SAI_SORTED_TERMS_CLUSTERING_BLOCK_SHIFT.setInt(clusteringBlockShift);
         SSTableComponentsWriter writer = new 
SSTableComponentsWriter(indexDescriptor);
 
-        PrimaryKey.Factory factory = new 
PrimaryKey.Factory(metadata.comparator);
+        PrimaryKey.Factory factory = new 
PrimaryKey.Factory(metadata.partitioner, metadata.comparator);
 
         PrimaryKey[] primaryKeys = new PrimaryKey[rows];
         int partition = 0;
diff --git a/test/unit/org/apache/cassandra/index/sai/SAITester.java 
b/test/unit/org/apache/cassandra/index/sai/SAITester.java
index c1080fb528..e043d6602f 100644
--- a/test/unit/org/apache/cassandra/index/sai/SAITester.java
+++ b/test/unit/org/apache/cassandra/index/sai/SAITester.java
@@ -71,6 +71,7 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.index.sai.disk.SSTableIndex;
 import org.apache.cassandra.index.sai.disk.format.IndexComponent;
@@ -141,7 +142,7 @@ public abstract class SAITester extends CQLTester
 
     public static final ClusteringComparator EMPTY_COMPARATOR = new 
ClusteringComparator();
 
-    public static final PrimaryKey.Factory TEST_FACTORY = new 
PrimaryKey.Factory(EMPTY_COMPARATOR);
+    public static final PrimaryKey.Factory TEST_FACTORY = new 
PrimaryKey.Factory(Murmur3Partitioner.instance, EMPTY_COMPARATOR);
 
     @BeforeClass
     public static void setUpClass()
@@ -254,6 +255,7 @@ public abstract class SAITester extends CQLTester
         return new IndexContext("test_ks",
                                 "test_cf",
                                 UTF8Type.instance,
+                                Murmur3Partitioner.instance,
                                 new ClusteringComparator(),
                                 ColumnMetadata.regularColumn("sai", 
"internal", name, validator),
                                 IndexTarget.Type.SIMPLE,
diff --git 
a/test/unit/org/apache/cassandra/index/sai/disk/format/IndexDescriptorTest.java 
b/test/unit/org/apache/cassandra/index/sai/disk/format/IndexDescriptorTest.java
index b6546c9551..264e217e70 100644
--- 
a/test/unit/org/apache/cassandra/index/sai/disk/format/IndexDescriptorTest.java
+++ 
b/test/unit/org/apache/cassandra/index/sai/disk/format/IndexDescriptorTest.java
@@ -31,6 +31,7 @@ import org.junit.rules.TemporaryFolder;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.index.sai.IndexContext;
 import org.apache.cassandra.index.sai.SAITester;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -68,7 +69,7 @@ public class IndexDescriptorTest
     {
         createFileOnDisk("-SAI+aa+GroupComplete.db");
 
-        IndexDescriptor indexDescriptor = IndexDescriptor.create(descriptor, 
SAITester.EMPTY_COMPARATOR);
+        IndexDescriptor indexDescriptor = IndexDescriptor.create(descriptor, 
Murmur3Partitioner.instance, SAITester.EMPTY_COMPARATOR);
 
         assertEquals(Version.AA, indexDescriptor.version);
         
assertTrue(indexDescriptor.hasComponent(IndexComponent.GROUP_COMPLETION_MARKER));
@@ -79,7 +80,7 @@ public class IndexDescriptorTest
     {
         createFileOnDisk("-SAI+aa+test_index+ColumnComplete.db");
 
-        IndexDescriptor indexDescriptor = IndexDescriptor.create(descriptor, 
SAITester.EMPTY_COMPARATOR);
+        IndexDescriptor indexDescriptor = IndexDescriptor.create(descriptor, 
Murmur3Partitioner.instance, SAITester.EMPTY_COMPARATOR);
         IndexContext indexContext = SAITester.createIndexContext("test_index", 
UTF8Type.instance);
 
         assertEquals(Version.AA, indexDescriptor.version);
diff --git 
a/test/unit/org/apache/cassandra/index/sai/disk/v1/InvertedIndexSearcherTest.java
 
b/test/unit/org/apache/cassandra/index/sai/disk/v1/InvertedIndexSearcherTest.java
index 485d2b095a..dd3547ed79 100644
--- 
a/test/unit/org/apache/cassandra/index/sai/disk/v1/InvertedIndexSearcherTest.java
+++ 
b/test/unit/org/apache/cassandra/index/sai/disk/v1/InvertedIndexSearcherTest.java
@@ -62,12 +62,12 @@ public class InvertedIndexSearcherTest extends 
SAIRandomizedTester
 {
     public static final PrimaryKeyMap TEST_PRIMARY_KEY_MAP = new 
PrimaryKeyMap()
     {
-        private final PrimaryKey.Factory primaryKeyFactory = new 
PrimaryKey.Factory(new ClusteringComparator());
+        private final PrimaryKey.Factory primaryKeyFactory = new 
PrimaryKey.Factory(Murmur3Partitioner.instance, new ClusteringComparator());
 
         @Override
         public PrimaryKey primaryKeyFromRowId(long sstableRowId)
         {
-            return primaryKeyFactory.createTokenOnly(new 
Murmur3Partitioner.LongToken(sstableRowId));
+            return primaryKeyFactory.create(new 
Murmur3Partitioner.LongToken(sstableRowId));
         }
 
         @Override
@@ -127,7 +127,7 @@ public class InvertedIndexSearcherTest extends 
SAIRandomizedTester
                     final int idxToSkip = numPostings - 7;
                     // tokens are equal to their corresponding row IDs
                     final long tokenToSkip = 
termsEnum.get(t).right.get(idxToSkip);
-                    results.skipTo(SAITester.TEST_FACTORY.createTokenOnly(new 
Murmur3Partitioner.LongToken(tokenToSkip)));
+                    results.skipTo(SAITester.TEST_FACTORY.create(new 
Murmur3Partitioner.LongToken(tokenToSkip)));
 
                     for (int p = idxToSkip; p < numPostings; ++p)
                     {
@@ -189,8 +189,8 @@ public class InvertedIndexSearcherTest extends 
SAIRandomizedTester
                                                                     size,
                                                                     0,
                                                                     
Long.MAX_VALUE,
-                                                                    
SAITester.TEST_FACTORY.createTokenOnly(DatabaseDescriptor.getPartitioner().getMinimumToken()),
-                                                                    
SAITester.TEST_FACTORY.createTokenOnly(DatabaseDescriptor.getPartitioner().getMaximumToken()),
+                                                                    
SAITester.TEST_FACTORY.create(DatabaseDescriptor.getPartitioner().getMinimumToken()),
+                                                                    
SAITester.TEST_FACTORY.create(DatabaseDescriptor.getPartitioner().getMaximumToken()),
                                                                     
wrap(termsEnum.get(0).left),
                                                                     
wrap(termsEnum.get(terms - 1).left),
                                                                     
indexMetas);
diff --git 
a/test/unit/org/apache/cassandra/index/sai/disk/v1/SegmentFlushTest.java 
b/test/unit/org/apache/cassandra/index/sai/disk/v1/SegmentFlushTest.java
index 96d746df29..cbd74d8bef 100644
--- a/test/unit/org/apache/cassandra/index/sai/disk/v1/SegmentFlushTest.java
+++ b/test/unit/org/apache/cassandra/index/sai/disk/v1/SegmentFlushTest.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.rows.BTreeRow;
 import org.apache.cassandra.db.rows.BufferCell;
 import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.index.sai.IndexContext;
 import org.apache.cassandra.index.sai.SAITester;
 import org.apache.cassandra.index.sai.disk.format.IndexComponent;
@@ -54,6 +55,7 @@ import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.io.util.FileHandle;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.bytecomparable.ByteComparable;
 
 import static org.apache.cassandra.Util.dk;
@@ -75,6 +77,8 @@ public class SegmentFlushTest
     public static void init()
     {
         DatabaseDescriptor.daemonInitialization();
+        DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+        
StorageService.instance.setPartitionerUnsafe(Murmur3Partitioner.instance);
     }
 
     @After
@@ -106,6 +110,7 @@ public class SegmentFlushTest
     {
         Path tmpDir = Files.createTempDirectory("SegmentFlushTest");
         IndexDescriptor indexDescriptor = IndexDescriptor.create(new 
Descriptor(new File(tmpDir.toFile()), "ks", "cf", new 
SequenceBasedSSTableId(1)),
+                                                                 
Murmur3Partitioner.instance,
                                                                  
SAITester.EMPTY_COMPARATOR);
 
         ColumnMetadata column = ColumnMetadata.regularColumn("sai", 
"internal", "column", UTF8Type.instance);
@@ -114,6 +119,7 @@ public class SegmentFlushTest
         IndexContext indexContext = new IndexContext("ks",
                                                      "cf",
                                                      UTF8Type.instance,
+                                                     
Murmur3Partitioner.instance,
                                                      new 
ClusteringComparator(),
                                                      column,
                                                      IndexTarget.Type.SIMPLE,
@@ -127,13 +133,13 @@ public class SegmentFlushTest
         DecoratedKey key1 = keys.get(0);
         ByteBuffer term1 = UTF8Type.instance.decompose("a");
         Row row1 = createRow(column, term1);
-        writer.addRow(SAITester.TEST_FACTORY.create(key1, Clustering.EMPTY), 
row1, sstableRowId1);
+        writer.addRow(SAITester.TEST_FACTORY.create(key1), row1, 
sstableRowId1);
 
         // expect a flush if exceed max rowId per segment
         DecoratedKey key2 = keys.get(1);
         ByteBuffer term2 = UTF8Type.instance.decompose("b");
         Row row2 = createRow(column, term2);
-        writer.addRow(SAITester.TEST_FACTORY.create(key2, Clustering.EMPTY), 
row2, sstableRowId2);
+        writer.addRow(SAITester.TEST_FACTORY.create(key2), row2, 
sstableRowId2);
 
         writer.complete(Stopwatch.createStarted());
 
@@ -147,8 +153,8 @@ public class SegmentFlushTest
         segmentRowIdOffset = sstableRowId1;
         posting1 = 0;
         posting2 = segments == 1 ? (int) (sstableRowId2 - segmentRowIdOffset) 
: 0;
-        minKey = SAITester.TEST_FACTORY.createTokenOnly(key1.getToken());
-        maxKey = segments == 1 ? 
SAITester.TEST_FACTORY.createTokenOnly(key2.getToken()) : minKey;
+        minKey = SAITester.TEST_FACTORY.create(key1.getToken());
+        maxKey = segments == 1 ? 
SAITester.TEST_FACTORY.create(key2.getToken()) : minKey;
         minTerm = term1;
         maxTerm = segments == 1 ? term2 : term1;
         numRows = segments == 1 ? 2 : 1;
@@ -160,7 +166,7 @@ public class SegmentFlushTest
             segmentRowIdOffset = sstableRowId2;
             posting1 = 0;
             posting2 = 0;
-            minKey = SAITester.TEST_FACTORY.createTokenOnly(key2.getToken());
+            minKey = SAITester.TEST_FACTORY.create(key2.getToken());
             maxKey = minKey;
             minTerm = term2;
             maxTerm = term2;
diff --git 
a/test/unit/org/apache/cassandra/index/sai/disk/v1/WideRowPrimaryKeyTest.java 
b/test/unit/org/apache/cassandra/index/sai/disk/v1/WideRowPrimaryKeyTest.java
index 767ac6fa0f..9a6a4dcb15 100644
--- 
a/test/unit/org/apache/cassandra/index/sai/disk/v1/WideRowPrimaryKeyTest.java
+++ 
b/test/unit/org/apache/cassandra/index/sai/disk/v1/WideRowPrimaryKeyTest.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import org.junit.Test;
 
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.index.sai.disk.PrimaryKeyMap;
 import org.apache.cassandra.index.sai.disk.format.IndexDescriptor;
 import org.apache.cassandra.index.sai.utils.AbstractPrimaryKeyTester;
@@ -42,7 +43,7 @@ public class WideRowPrimaryKeyTest extends 
AbstractPrimaryKeyTester
 
         SSTableComponentsWriter writer = new 
SSTableComponentsWriter(indexDescriptor);
 
-        PrimaryKey.Factory factory = new 
PrimaryKey.Factory(compositePartitionMultipleClusteringAsc.comparator);
+        PrimaryKey.Factory factory = new 
PrimaryKey.Factory(Murmur3Partitioner.instance, 
compositePartitionMultipleClusteringAsc.comparator);
 
         int rows = nextInt(1000, 10000);
         PrimaryKey[] keys = new PrimaryKey[rows];
diff --git 
a/test/unit/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeIndexBuilder.java
 
b/test/unit/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeIndexBuilder.java
index b69e5155c8..a62fe3991a 100644
--- 
a/test/unit/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeIndexBuilder.java
+++ 
b/test/unit/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeIndexBuilder.java
@@ -65,12 +65,12 @@ public class BlockBalancedTreeIndexBuilder
 {
     public static final PrimaryKeyMap TEST_PRIMARY_KEY_MAP = new 
PrimaryKeyMap()
     {
-        private final PrimaryKey.Factory primaryKeyFactory = new 
PrimaryKey.Factory(null);
+        private final PrimaryKey.Factory primaryKeyFactory = new 
PrimaryKey.Factory(Murmur3Partitioner.instance, null);
 
         @Override
         public PrimaryKey primaryKeyFromRowId(long sstableRowId)
         {
-            return primaryKeyFactory.createTokenOnly(new 
Murmur3Partitioner.LongToken(sstableRowId));
+            return primaryKeyFactory.create(new 
Murmur3Partitioner.LongToken(sstableRowId));
         }
 
         @Override
@@ -122,8 +122,8 @@ public class BlockBalancedTreeIndexBuilder
                                        minSegmentRowId,
                                        maxSegmentRowId,
                                        // min/max is unused for now
-                                       
SAITester.TEST_FACTORY.createTokenOnly(Murmur3Partitioner.instance.decorateKey(UTF8Type.instance.fromString("a")).getToken()),
-                                       
SAITester.TEST_FACTORY.createTokenOnly(Murmur3Partitioner.instance.decorateKey(UTF8Type.instance.fromString("b")).getToken()),
+                                       
SAITester.TEST_FACTORY.create(Murmur3Partitioner.instance.decorateKey(UTF8Type.instance.fromString("a")).getToken()),
+                                       
SAITester.TEST_FACTORY.create(Murmur3Partitioner.instance.decorateKey(UTF8Type.instance.fromString("b")).getToken()),
                                        UTF8Type.instance.fromString("c"),
                                        UTF8Type.instance.fromString("d"),
                                        indexMetas);
diff --git 
a/test/unit/org/apache/cassandra/index/sai/disk/v1/keystore/KeyLookupTest.java 
b/test/unit/org/apache/cassandra/index/sai/disk/v1/keystore/KeyLookupTest.java
index bd522bde5b..92a31f87f7 100644
--- 
a/test/unit/org/apache/cassandra/index/sai/disk/v1/keystore/KeyLookupTest.java
+++ 
b/test/unit/org/apache/cassandra/index/sai/disk/v1/keystore/KeyLookupTest.java
@@ -29,7 +29,6 @@ import java.util.Map;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.UTF8Type;
@@ -75,7 +74,7 @@ public class KeyLookupTest extends SAIRandomizedTester
         {
             ByteBuffer buffer = 
UTF8Type.instance.decompose(Integer.toString(x));
             DecoratedKey partitionKey = 
Murmur3Partitioner.instance.decorateKey(buffer);
-            PrimaryKey primaryKey = 
SAITester.TEST_FACTORY.create(partitionKey, Clustering.EMPTY);
+            PrimaryKey primaryKey = 
SAITester.TEST_FACTORY.create(partitionKey);
             primaryKeys.add(primaryKey);
         }
 
@@ -316,35 +315,18 @@ public class KeyLookupTest extends SAIRandomizedTester
         // iterate ascending
         withKeyLookupCursor(cursor -> {
             for (int x = 0; x < keys.size(); x++)
-            {
-                ByteComparable key = cursor.seekToPointId(x);
-
-                byte[] bytes = 
ByteSourceInverse.readBytes(key.asComparableBytes(ByteComparable.Version.OSS50));
-
-                assertArrayEquals(keys.get(x), bytes);
-            }
+                assertArrayEquals(keys.get(x), 
ByteSourceInverse.readBytes(cursor.seekToPointId(x)));
         });
 
         // iterate ascending skipping blocks
         withKeyLookupCursor(cursor -> {
             for (int x = 0; x < keys.size(); x += 17)
-            {
-                ByteComparable key = cursor.seekToPointId(x);
-
-                byte[] bytes = 
ByteSourceInverse.readBytes(key.asComparableBytes(ByteComparable.Version.OSS50));
-
-                assertArrayEquals(keys.get(x), bytes);
-            }
+                assertArrayEquals(keys.get(x), 
ByteSourceInverse.readBytes(cursor.seekToPointId(x)));
         });
 
         withKeyLookupCursor(cursor -> {
-            ByteComparable key = cursor.seekToPointId(7);
-            byte[] bytes = 
ByteSourceInverse.readBytes(key.asComparableBytes(ByteComparable.Version.OSS50));
-            assertArrayEquals(keys.get(7), bytes);
-
-            key = cursor.seekToPointId(7);
-            bytes = 
ByteSourceInverse.readBytes(key.asComparableBytes(ByteComparable.Version.OSS50));
-            assertArrayEquals(keys.get(7), bytes);
+            assertArrayEquals(keys.get(7), 
ByteSourceInverse.readBytes(cursor.seekToPointId(7)));
+            assertArrayEquals(keys.get(7), 
ByteSourceInverse.readBytes(cursor.seekToPointId(7)));
         });
     }
 
diff --git 
a/test/unit/org/apache/cassandra/index/sai/iterators/KeyRangeConcatIteratorTest.java
 
b/test/unit/org/apache/cassandra/index/sai/iterators/KeyRangeConcatIteratorTest.java
index 6f1afe7eb9..821942588e 100644
--- 
a/test/unit/org/apache/cassandra/index/sai/iterators/KeyRangeConcatIteratorTest.java
+++ 
b/test/unit/org/apache/cassandra/index/sai/iterators/KeyRangeConcatIteratorTest.java
@@ -35,7 +35,7 @@ import static org.junit.Assert.assertTrue;
 
 public class KeyRangeConcatIteratorTest extends AbstractKeyRangeIteratorTester
 {
-    PrimaryKey.Factory primaryKeyFactory = new PrimaryKey.Factory(null);
+    PrimaryKey.Factory primaryKeyFactory = new 
PrimaryKey.Factory(Murmur3Partitioner.instance, null);
     @Test
     public void testValidation()
     {
@@ -427,7 +427,7 @@ public class KeyRangeConcatIteratorTest extends 
AbstractKeyRangeIteratorTester
     private String createErrorMessage(int max, int min)
     {
         return String.format(KeyRangeConcatIterator.MUST_BE_SORTED_ERROR,
-                             primaryKeyFactory.createTokenOnly(new 
Murmur3Partitioner.LongToken(max)),
-                             primaryKeyFactory.createTokenOnly(new 
Murmur3Partitioner.LongToken(min)));
+                             primaryKeyFactory.create(new 
Murmur3Partitioner.LongToken(max)),
+                             primaryKeyFactory.create(new 
Murmur3Partitioner.LongToken(min)));
     }
 }
diff --git 
a/test/unit/org/apache/cassandra/index/sai/iterators/LongIterator.java 
b/test/unit/org/apache/cassandra/index/sai/iterators/LongIterator.java
index 9a271f79d1..5c4122f99d 100644
--- a/test/unit/org/apache/cassandra/index/sai/iterators/LongIterator.java
+++ b/test/unit/org/apache/cassandra/index/sai/iterators/LongIterator.java
@@ -92,7 +92,7 @@ public class LongIterator extends KeyRangeIterator
 
     public static PrimaryKey fromToken(long token)
     {
-        return SAITester.TEST_FACTORY.createTokenOnly(new 
Murmur3Partitioner.LongToken(token));
+        return SAITester.TEST_FACTORY.create(new 
Murmur3Partitioner.LongToken(token));
     }
 
 
diff --git 
a/test/unit/org/apache/cassandra/index/sai/memory/AbstractInMemoryKeyRangeIteratorTester.java
 
b/test/unit/org/apache/cassandra/index/sai/memory/AbstractInMemoryKeyRangeIteratorTester.java
index cbf5d05675..3c21941a2e 100644
--- 
a/test/unit/org/apache/cassandra/index/sai/memory/AbstractInMemoryKeyRangeIteratorTester.java
+++ 
b/test/unit/org/apache/cassandra/index/sai/memory/AbstractInMemoryKeyRangeIteratorTester.java
@@ -35,7 +35,7 @@ public abstract class AbstractInMemoryKeyRangeIteratorTester
     @Before
     public void setup()
     {
-        primaryKeyFactory = new PrimaryKey.Factory(SAITester.EMPTY_COMPARATOR);
+        primaryKeyFactory = new 
PrimaryKey.Factory(Murmur3Partitioner.instance, SAITester.EMPTY_COMPARATOR);
     }
 
     @Test
@@ -99,7 +99,7 @@ public abstract class AbstractInMemoryKeyRangeIteratorTester
     {
         KeyRangeIterator iterator = makeIterator(1, 3, 1, 2, 3);
 
-        iterator.skipTo(primaryKeyFactory.createTokenOnly(new 
Murmur3Partitioner.LongToken(0)));
+        iterator.skipTo(primaryKeyFactory.create(new 
Murmur3Partitioner.LongToken(0)));
 
         assertIterator(iterator, 1, 2, 3);
     }
@@ -109,7 +109,7 @@ public abstract class AbstractInMemoryKeyRangeIteratorTester
     {
         KeyRangeIterator iterator = makeIterator(1, 3, 1, 2, 3);
 
-        iterator.skipTo(primaryKeyFactory.createTokenOnly(new 
Murmur3Partitioner.LongToken(1)));
+        iterator.skipTo(primaryKeyFactory.create(new 
Murmur3Partitioner.LongToken(1)));
 
         assertIterator(iterator, 1, 2, 3);
     }
@@ -119,7 +119,7 @@ public abstract class AbstractInMemoryKeyRangeIteratorTester
     {
         KeyRangeIterator iterator = makeIterator(1, 3, 1, 2, 3);
 
-        iterator.skipTo(primaryKeyFactory.createTokenOnly(new 
Murmur3Partitioner.LongToken(2)));
+        iterator.skipTo(primaryKeyFactory.create(new 
Murmur3Partitioner.LongToken(2)));
 
         assertIterator(iterator, 2, 3);
     }
@@ -129,7 +129,7 @@ public abstract class AbstractInMemoryKeyRangeIteratorTester
     {
         KeyRangeIterator iterator = makeIterator(1, 3, 1, 2, 3);
 
-        iterator.skipTo(primaryKeyFactory.createTokenOnly(new 
Murmur3Partitioner.LongToken(3)));
+        iterator.skipTo(primaryKeyFactory.create(new 
Murmur3Partitioner.LongToken(3)));
 
         assertIterator(iterator, 3);
     }
@@ -139,7 +139,7 @@ public abstract class AbstractInMemoryKeyRangeIteratorTester
     {
         KeyRangeIterator iterator = makeIterator(1, 3, 1, 2, 3);
 
-        iterator.skipTo(primaryKeyFactory.createTokenOnly(new 
Murmur3Partitioner.LongToken(4)));
+        iterator.skipTo(primaryKeyFactory.create(new 
Murmur3Partitioner.LongToken(4)));
 
         assertIterator(iterator);
     }
@@ -149,7 +149,7 @@ public abstract class AbstractInMemoryKeyRangeIteratorTester
     {
         KeyRangeIterator iterator = makeIterator(1, 3, 1, 1, 2, 2, 3, 3);
 
-        iterator.skipTo(primaryKeyFactory.createTokenOnly(new 
Murmur3Partitioner.LongToken(2)));
+        iterator.skipTo(primaryKeyFactory.create(new 
Murmur3Partitioner.LongToken(2)));
 
         assertIterator(iterator, 2, 3);
     }
@@ -168,6 +168,6 @@ public abstract class AbstractInMemoryKeyRangeIteratorTester
 
     protected PrimaryKey keyForToken(long token)
     {
-        return primaryKeyFactory.createTokenOnly(new 
Murmur3Partitioner.LongToken(token));
+        return primaryKeyFactory.create(new 
Murmur3Partitioner.LongToken(token));
     }
 }
diff --git 
a/test/unit/org/apache/cassandra/index/sai/memory/PriorityInMemoryKeyRangeIteratorTest.java
 
b/test/unit/org/apache/cassandra/index/sai/memory/PriorityInMemoryKeyRangeIteratorTest.java
index 707db7f1e9..5864c79b55 100644
--- 
a/test/unit/org/apache/cassandra/index/sai/memory/PriorityInMemoryKeyRangeIteratorTest.java
+++ 
b/test/unit/org/apache/cassandra/index/sai/memory/PriorityInMemoryKeyRangeIteratorTest.java
@@ -33,8 +33,8 @@ public class PriorityInMemoryKeyRangeIteratorTest extends 
AbstractInMemoryKeyRan
 
         Arrays.stream(tokens).forEach(t -> queue.add(keyForToken(t)));
 
-        return new 
InMemoryKeyRangeIterator(primaryKeyFactory.createTokenOnly(new 
Murmur3Partitioner.LongToken(minimumTokenValue)),
-                                            
primaryKeyFactory.createTokenOnly(new 
Murmur3Partitioner.LongToken(maximumTokenValue)),
+        return new InMemoryKeyRangeIterator(primaryKeyFactory.create(new 
Murmur3Partitioner.LongToken(minimumTokenValue)),
+                                            primaryKeyFactory.create(new 
Murmur3Partitioner.LongToken(maximumTokenValue)),
                                             queue);
     }
 }
diff --git 
a/test/unit/org/apache/cassandra/index/sai/memory/TrieMemoryIndexTest.java 
b/test/unit/org/apache/cassandra/index/sai/memory/TrieMemoryIndexTest.java
index 9cf8ff5318..3277d79dc1 100644
--- a/test/unit/org/apache/cassandra/index/sai/memory/TrieMemoryIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/sai/memory/TrieMemoryIndexTest.java
@@ -245,6 +245,7 @@ public class TrieMemoryIndexTest extends SAIRandomizedTester
         indexContext = new IndexContext(table.keyspace,
                                         table.name,
                                         table.partitionKeyType,
+                                        table.partitioner,
                                         table.comparator,
                                         target.left,
                                         target.right,
diff --git 
a/test/unit/org/apache/cassandra/index/sai/utils/AbstractPrimaryKeyTester.java 
b/test/unit/org/apache/cassandra/index/sai/utils/AbstractPrimaryKeyTester.java
index a67cf97d9a..be887bc64c 100644
--- 
a/test/unit/org/apache/cassandra/index/sai/utils/AbstractPrimaryKeyTester.java
+++ 
b/test/unit/org/apache/cassandra/index/sai/utils/AbstractPrimaryKeyTester.java
@@ -28,7 +28,6 @@ import org.apache.cassandra.db.marshal.ReversedType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.utils.bytecomparable.ByteComparable;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -53,6 +52,13 @@ public class AbstractPrimaryKeyTester extends 
SAIRandomizedTester
                                                                                
  .addClusteringColumn("ck1", UTF8Type.instance)
                                                                                
  .build();
 
+    protected static final TableMetadata 
simplePartitionStaticAndSingleClusteringAsc = TableMetadata.builder("test", 
"test")
+                                                                               
                     .partitioner(Murmur3Partitioner.instance)
+                                                                               
                     .addPartitionKeyColumn("pk1", Int32Type.instance)
+                                                                               
                     .addStaticColumn("sk1", Int32Type.instance)
+                                                                               
                     .addClusteringColumn("ck1", UTF8Type.instance)
+                                                                               
                     .build();
+
     protected static final TableMetadata simplePartitionMultipleClusteringAsc 
= TableMetadata.builder("test", "test")
                                                                                
    .partitioner(Murmur3Partitioner.instance)
                                                                                
    .addPartitionKeyColumn("pk1", Int32Type.instance)
@@ -118,13 +124,6 @@ public class AbstractPrimaryKeyTester extends 
SAIRandomizedTester
                                                                                
         .addClusteringColumn("ck2", 
ReversedType.getInstance(UTF8Type.instance))
                                                                                
         .build();
 
-    protected void assertByteComparison(PrimaryKey a, PrimaryKey b, int 
expected)
-    {
-        assertEquals(expected, ByteComparable.compare(a::asComparableBytes,
-                                                      b::asComparableBytes,
-                                                      
ByteComparable.Version.OSS50));
-    }
-
     protected void assertCompareToAndEquals(PrimaryKey a, PrimaryKey b, int 
expected)
     {
         if (expected > 0)
diff --git 
a/test/unit/org/apache/cassandra/index/sai/utils/IndexInputLeakDetector.java 
b/test/unit/org/apache/cassandra/index/sai/utils/IndexInputLeakDetector.java
index 00518bb972..35e556f763 100644
--- a/test/unit/org/apache/cassandra/index/sai/utils/IndexInputLeakDetector.java
+++ b/test/unit/org/apache/cassandra/index/sai/utils/IndexInputLeakDetector.java
@@ -41,7 +41,7 @@ public class IndexInputLeakDetector extends TestRuleAdapter
     {
         TrackingIndexFileUtils trackingIndexFileUtils = new 
TrackingIndexFileUtils(sequentialWriterOption);
         trackedIndexFileUtils.add(trackingIndexFileUtils);
-        return IndexDescriptor.create(descriptor, tableMetadata.comparator);
+        return IndexDescriptor.create(descriptor, tableMetadata.partitioner, 
tableMetadata.comparator);
     }
 
     @Override
diff --git a/test/unit/org/apache/cassandra/index/sai/utils/PrimaryKeyTest.java 
b/test/unit/org/apache/cassandra/index/sai/utils/PrimaryKeyTest.java
index 9b9f738a4e..c64372feab 100644
--- a/test/unit/org/apache/cassandra/index/sai/utils/PrimaryKeyTest.java
+++ b/test/unit/org/apache/cassandra/index/sai/utils/PrimaryKeyTest.java
@@ -23,17 +23,18 @@ import java.util.Arrays;
 import org.junit.Test;
 
 import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.dht.Murmur3Partitioner;
 
 public class PrimaryKeyTest extends AbstractPrimaryKeyTester
 {
     @Test
     public void singlePartitionTest()
     {
-        PrimaryKey.Factory factory = new 
PrimaryKey.Factory(simplePartition.comparator);
+        PrimaryKey.Factory factory = new 
PrimaryKey.Factory(Murmur3Partitioner.instance, simplePartition.comparator);
         int rows = nextInt(10, 100);
         PrimaryKey[] keys = new PrimaryKey[rows];
         for (int index = 0; index < rows; index++)
-            keys[index] = factory.create(makeKey(simplePartition, index), 
Clustering.EMPTY);
+            keys[index] = factory.create(makeKey(simplePartition, index));
 
         Arrays.sort(keys);
 
@@ -43,11 +44,11 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester
     @Test
     public void compositePartitionTest()
     {
-        PrimaryKey.Factory factory = new 
PrimaryKey.Factory(compositePartition.comparator);
+        PrimaryKey.Factory factory = new 
PrimaryKey.Factory(Murmur3Partitioner.instance, compositePartition.comparator);
         int rows = nextInt(10, 100);
         PrimaryKey[] keys = new PrimaryKey[rows];
         for (int index = 0; index < rows; index++)
-            keys[index] = factory.create(makeKey(compositePartition, index, 
index + 1), Clustering.EMPTY);
+            keys[index] = factory.create(makeKey(compositePartition, index, 
index + 1));
 
         Arrays.sort(keys);
 
@@ -57,7 +58,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester
     @Test
     public void simplePartitonSingleClusteringAscTest()
     {
-        PrimaryKey.Factory factory = new 
PrimaryKey.Factory(simplePartitionSingleClusteringAsc.comparator);
+        PrimaryKey.Factory factory = new 
PrimaryKey.Factory(Murmur3Partitioner.instance, 
simplePartitionSingleClusteringAsc.comparator);
         int rows = nextInt(10, 100);
         PrimaryKey[] keys = new PrimaryKey[rows];
         int partition = 0;
@@ -78,10 +79,40 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester
         compareToAndEqualsTests(factory, keys);
     }
 
+    @Test
+    public void simplePartitonStaticAndSingleClusteringAscTest()
+    {
+        PrimaryKey.Factory factory = new PrimaryKey.Factory(Murmur3Partitioner.instance, simplePartitionStaticAndSingleClusteringAsc.comparator);
+        int rows = nextInt(10, 100);
+        PrimaryKey[] keys = new PrimaryKey[rows];
+        int partition = 0;
+        int clustering = 0;
+        for (int index = 0; index < rows; index++)
+        {
+            if (clustering == 0) // each partition starts with its static-clustering key
+            {
+                keys[index] = factory.create(makeKey(simplePartitionStaticAndSingleClusteringAsc, partition), Clustering.STATIC_CLUSTERING);
+                clustering++;
+            }
+            else
+                keys[index] = factory.create(makeKey(simplePartitionStaticAndSingleClusteringAsc, partition),
+                                             makeClustering(simplePartitionStaticAndSingleClusteringAsc, Integer.toString(clustering++)));
+            if (clustering == 5) // static key plus clusterings "1".."4" emitted; start the next partition
+            {
+                clustering = 0;
+                partition++;
+            }
+        }
+
+        Arrays.sort(keys);
+
+        compareToAndEqualsTests(factory, keys);
+    }
+
     @Test
     public void simplePartitionMultipleClusteringAscTest()
     {
-        PrimaryKey.Factory factory = new 
PrimaryKey.Factory(simplePartitionMultipleClusteringAsc.comparator);
+        PrimaryKey.Factory factory = new 
PrimaryKey.Factory(Murmur3Partitioner.instance, 
simplePartitionMultipleClusteringAsc.comparator);
         int rows = nextInt(100, 1000);
         PrimaryKey[] keys = new PrimaryKey[rows];
         int partition = 0;
@@ -111,7 +142,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester
     @Test
     public void simplePartitonSingleClusteringDescTest()
     {
-        PrimaryKey.Factory factory = new 
PrimaryKey.Factory(simplePartitionSingleClusteringDesc.comparator);
+        PrimaryKey.Factory factory = new 
PrimaryKey.Factory(Murmur3Partitioner.instance, 
simplePartitionSingleClusteringDesc.comparator);
         int rows = nextInt(10, 100);
         PrimaryKey[] keys = new PrimaryKey[rows];
         int partition = 0;
@@ -135,7 +166,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester
     @Test
     public void simplePartitionMultipleClusteringDescTest()
     {
-        PrimaryKey.Factory factory = new 
PrimaryKey.Factory(simplePartitionMultipleClusteringDesc.comparator);
+        PrimaryKey.Factory factory = new 
PrimaryKey.Factory(Murmur3Partitioner.instance, 
simplePartitionMultipleClusteringDesc.comparator);
         int rows = nextInt(100, 1000);
         PrimaryKey[] keys = new PrimaryKey[rows];
         int partition = 0;
@@ -165,7 +196,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester
     @Test
     public void compositePartitionSingleClusteringAscTest()
     {
-        PrimaryKey.Factory factory = new 
PrimaryKey.Factory(compositePartitionSingleClusteringAsc.comparator);
+        PrimaryKey.Factory factory = new 
PrimaryKey.Factory(Murmur3Partitioner.instance, 
compositePartitionSingleClusteringAsc.comparator);
         int rows = nextInt(10, 100);
         PrimaryKey[] keys = new PrimaryKey[rows];
         int partition = 0;
@@ -189,7 +220,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester
     @Test
     public void compositePartitionMultipleClusteringAscTest()
     {
-        PrimaryKey.Factory factory = new PrimaryKey.Factory(compositePartitionMultipleClusteringAsc.comparator);
+        PrimaryKey.Factory factory = new PrimaryKey.Factory(Murmur3Partitioner.instance, compositePartitionMultipleClusteringAsc.comparator);
         int rows = nextInt(100, 1000);
         PrimaryKey[] keys = new PrimaryKey[rows];
         int partition = 0;
@@ -219,7 +250,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester
     @Test
     public void compositePartitionSingleClusteringDescTest()
     {
-        PrimaryKey.Factory factory = new PrimaryKey.Factory(compositePartitionSingleClusteringDesc.comparator);
+        PrimaryKey.Factory factory = new PrimaryKey.Factory(Murmur3Partitioner.instance, compositePartitionSingleClusteringDesc.comparator);
         int rows = nextInt(10, 100);
         PrimaryKey[] keys = new PrimaryKey[rows];
         int partition = 0;
@@ -243,7 +274,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester
     @Test
     public void compositePartitionMultipleClusteringDescTest()
     {
-        PrimaryKey.Factory factory = new PrimaryKey.Factory(compositePartitionMultipleClusteringDesc.comparator);
+        PrimaryKey.Factory factory = new PrimaryKey.Factory(Murmur3Partitioner.instance, compositePartitionMultipleClusteringDesc.comparator);
         int rows = nextInt(100, 1000);
         PrimaryKey[] keys = new PrimaryKey[rows];
         int partition = 0;
@@ -273,7 +304,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester
     @Test
     public void simplePartitionMultipleClusteringMixedTest()
     {
-        PrimaryKey.Factory factory = new PrimaryKey.Factory(simplePartitionMultipleClusteringMixed.comparator);
+        PrimaryKey.Factory factory = new PrimaryKey.Factory(Murmur3Partitioner.instance, simplePartitionMultipleClusteringMixed.comparator);
         int rows = nextInt(100, 1000);
         PrimaryKey[] keys = new PrimaryKey[rows];
         int partition = 0;
@@ -303,7 +334,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester
     @Test
     public void compositePartitionMultipleClusteringMixedTest()
     {
-        PrimaryKey.Factory factory = new PrimaryKey.Factory(compositePartitionMultipleClusteringMixed.comparator);
+        PrimaryKey.Factory factory = new PrimaryKey.Factory(Murmur3Partitioner.instance, compositePartitionMultipleClusteringMixed.comparator);
         int rows = nextInt(100, 1000);
         PrimaryKey[] keys = new PrimaryKey[rows];
         int partition = 0;
@@ -335,7 +366,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester
         for (int index = 0; index < keys.length - 1; index++)
         {
             PrimaryKey key = keys[index];
-            PrimaryKey tokenOnlyKey = factory.createTokenOnly(key.token());
+            PrimaryKey tokenOnlyKey = factory.create(key.token());
 
             assertCompareToAndEquals(tokenOnlyKey, key, 0);
             assertCompareToAndEquals(key, key, 0);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to