adelapena commented on code in PR #2673:
URL: https://github.com/apache/cassandra/pull/2673#discussion_r1348558477


##########
src/java/org/apache/cassandra/service/reads/range/ScanAllRangesCommandIterator.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.range;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.EmptyIterators;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.reads.DataResolver;
+import org.apache.cassandra.service.reads.ReadCallback;
+import org.apache.cassandra.service.reads.repair.NoopReadRepair;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.CloseableIterator;
+
+/**
+ * A custom {@link RangeCommandIterator} that queries all replicas required by the consistency level at once, with the
+ * data range specified in {@link PartitionRangeReadCommand}.
+ *
+ * This is to speed up {@link Index.QueryPlan#isTopK()} queries that need to find the global top-k rows in the cluster,
+ * because the existing {@link RangeCommandIterator} has to execute a top-k search per vnode range, which wastes resources.
+ */
+public class ScanAllRangesCommandIterator extends RangeCommandIterator

Review Comment:
   I think I would extend the class JavaDoc a bit to mention that the replica plans for each data range are combined into a single shared replica plan, and perhaps mention the consequences for reconciliation. In particular, queries with CL=ONE/LOCAL_ONE will still use reconciliation.
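
   Something along these lines, as a rough untested sketch (the exact wording is just a suggestion):

   ```java
   /**
    * ...
    * <p>
    * Unlike the generic {@link RangeCommandIterator}, the replica plans of the individual data ranges are combined
    * into a single shared replica plan covering all contacted replicas. As a consequence, results are reconciled
    * across that shared plan, so even queries with CL=ONE/LOCAL_ONE will use reconciliation whenever the plan
    * contains multiple replicas.
    */
   ```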



##########
src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java:
##########
@@ -274,10 +292,51 @@ else if (relation.isLIKE())
                 throw invalidRequest("Non PRIMARY KEY columns found in where clause: %s ",
                                      Joiner.on(", ").join(nonPrimaryKeyColumns));
             }
+
+            var annRestriction = Streams.stream(nonPrimaryKeyRestrictions).filter(SingleRestriction::isANN).findFirst();
+            if (annRestriction.isPresent())
+            {
+                // If there is an ANN restriction then it must be for a vector<float, n> column, and it must have an index
+                var annColumn = annRestriction.get().getFirstColumn();
+
+                if (!annColumn.type.isVector() || !(((VectorType<?>)annColumn.type).elementType instanceof FloatType))
+                    throw invalidRequest(StatementRestrictions.ANN_ONLY_SUPPORTED_ON_VECTOR_MESSAGE);
+                if (indexRegistry == null || indexRegistry.listIndexes().stream().noneMatch(i -> i.dependsOn(annColumn)))
+                    throw invalidRequest(StatementRestrictions.ANN_REQUIRES_INDEX_MESSAGE);
+                // We do not allow ANN query filtering using non-indexed columns
+                var nonAnnColumns = Streams.stream(nonPrimaryKeyRestrictions).filter(r -> !r.isANN()).map(r -> r.getFirstColumn()).collect(Collectors.toList());
+                var clusteringColumns = clusteringColumnsRestrictions.getColumnDefinitions();
+                if (!nonAnnColumns.isEmpty() || !clusteringColumns.isEmpty())
+                {
+                    var nonIndexedColumns = Stream.concat(nonAnnColumns.stream(), clusteringColumns.stream())

Review Comment:
   I think we also need to consider `partitionKeyRestrictions` to exclude restrictions on columns that are part of a multi-column partition key. Those also require `ALLOW FILTERING` and post-filtering.
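
   Untested sketch of what I mean; it assumes `partitionKeyRestrictions` exposes its restricted columns through a `getColumnDefinitions()` accessor like `clusteringColumnsRestrictions` does (adjust to the real accessor if it differs):

   ```java
   // also include multi-column partition key restrictions in the non-indexed check
   var partitionKeyColumns = partitionKeyRestrictions.getColumnDefinitions(); // hypothetical accessor
   var nonIndexedColumns = Stream.of(nonAnnColumns.stream(), clusteringColumns.stream(), partitionKeyColumns.stream())
                                 .flatMap(s -> s)
                                 .filter(column -> indexRegistry.listIndexes().stream().noneMatch(i -> i.dependsOn(column)))
                                 .collect(Collectors.toList());
   ```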



##########
test/unit/org/apache/cassandra/index/sai/cql/VectorSegmentationTest.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.sai.cql;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.marshal.FloatType;
+import org.apache.cassandra.db.marshal.VectorType;
+import org.apache.cassandra.index.sai.disk.v1.segment.SegmentBuilder;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class VectorSegmentationTest extends VectorTester
+{
+    private static final int dimension = 100;
+
+    @Test
+    public void testMultipleSegmentsForCreatingIndex() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, val vector<float, " + dimension + ">, PRIMARY KEY(pk))");
+
+        int vectorCount = 100;
+        List<float[]> vectors = new ArrayList<>();
+        for (int row = 0; row < vectorCount; row++)
+        {
+            float[] vector = nextVector();
+            vectors.add(vector);
+            execute("INSERT INTO %s (pk, val) VALUES (?, ?)", row, vector(vector));
+        }
+
+        flush();
+
+        SegmentBuilder.updateLastValidSegmentRowId(17); // 17 rows per segment
+        createIndex("CREATE CUSTOM INDEX ON %s(val) USING 'StorageAttachedIndex'");
+
+        int limit = 35;
+        float[] queryVector = nextVector();
+        UntypedResultSet resultSet = execute("SELECT * FROM %s ORDER BY val ANN OF ? LIMIT " + limit, vector(queryVector));
+        assertThat(resultSet.size()).isEqualTo(limit);
+
+        List<float[]> resultVectors = getVectorsFromResult(resultSet);
+        double recall = rawIndexedRecall(vectors, queryVector, resultVectors, limit);
+        assertThat(recall).isGreaterThanOrEqualTo(0.99);
+    }
+
+    @Test
+    public void testMultipleSegmentsForCompaction() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, val vector<float, " + dimension + ">, PRIMARY KEY(pk))");
+        createIndex("CREATE CUSTOM INDEX ON %s(val) USING 'StorageAttachedIndex'");
+
+        List<float[]> vectors = new ArrayList<>();
+        int rowsPerSSTable = 10;
+        int sstables = 5;
+        int pk = 0;
+        for (int i = 0; i < sstables; i++)
+        {
+            for (int row = 0; row < rowsPerSSTable; row++)
+            {
+                float[] vector = nextVector();
+                execute("INSERT INTO %s (pk, val) VALUES (?, ?)", pk++, vector(vector));
+                vectors.add(vector);
+            }
+
+            flush();
+        }
+
+        int limit = 30;
+        float[] queryVector = nextVector();
+        UntypedResultSet resultSet = execute("SELECT * FROM %s ORDER BY val ANN OF ? LIMIT " + limit, vector(queryVector));
+        assertThat(resultSet.size()).isEqualTo(limit);
+
+        List<float[]> resultVectors = getVectorsFromResult(resultSet);
+        double recall = rawIndexedRecall(vectors, queryVector, resultVectors, limit);
+        assertThat(recall).isGreaterThanOrEqualTo(0.99);
+
+
+        SegmentBuilder.updateLastValidSegmentRowId(11); // 11 rows per segment
+        compact();
+
+        queryVector = nextVector();
+        resultSet = execute("SELECT * FROM %s ORDER BY val ANN OF ? LIMIT " + limit, vector(queryVector));
+        assertThat(resultSet.size()).isEqualTo(limit);
+
+        resultVectors = getVectorsFromResult(resultSet);
+        recall = rawIndexedRecall(vectors, queryVector, resultVectors, limit);
+        assertThat(recall).isGreaterThanOrEqualTo(0.99);
+    }
+
+    protected Vector<Float> vector(float[] values)
+    {
+        Float[] floats = new Float[values.length];
+        for (int i = 0; i < values.length; i++)
+            floats[i] = values[i];
+
+        return new Vector<>(floats);
+    }

Review Comment:
   I think this method is overriding `CQLTester#vector(float[])` with a mostly identical implementation, so it can probably just be removed.



##########
src/java/org/apache/cassandra/service/reads/DataResolver.java:
##########
@@ -154,15 +155,28 @@ private ResolveContext(E replicas)
                                                                    true,
                                                                    command.selectsFullPartition(),
                                                                    enforceStrictLiveness);
+
+            // In case of top-k query, do not trim reconciled rows here because QueryPlan#postProcessor()
+            // needs to compare all rows
+            if (command.isTopK())
+                this.mergedResultCounter.onlyCount();
         }
 
         private boolean needsReadRepair()
         {
+            // each replica may return different estimated top-K rows; it doesn't mean data is not replicated.
+            if (command.isTopK())
+                return false;

Review Comment:
   We don't need this change, since the check right below makes sure RR is off when the number of replicas is greater than one. Indeed, the distributed tests pass without it.
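
   For reference, the check just below is based only on the replica count, something like this (paraphrasing from memory, the exact code may differ):

   ```java
   private boolean needsReadRepair()
   {
       // with a single replica there is nothing to reconcile, so read repair is already skipped
       return replicas.size() > 1;
   }
   ```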



##########
src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java:
##########
@@ -90,17 +99,28 @@
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
+import static org.apache.cassandra.index.sai.disk.v1.IndexWriterConfig.MAX_TOP_K;
+
 public class StorageAttachedIndex implements Index
 {
     public static final String NAME = "sai";
 
+    public static final String VECTOR_USAGE_WARNING = "SAI ANN indexes on vector columns are experimental and are not recommended for production use.\n" +

Review Comment:
   We probably should add a line about the inability to combine ANN with filtering.
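
   For example, something like this (exact wording up for debate):

   ```java
   public static final String VECTOR_USAGE_WARNING = "SAI ANN indexes on vector columns are experimental and are not recommended for production use.\n" +
                                                     // suggested addition:
                                                     "ANN queries cannot yet be combined with filtering on non-indexed columns.\n" +
                                                     // ... (rest of the existing message)
   ```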



##########
src/java/org/apache/cassandra/service/reads/DataResolver.java:
##########
@@ -154,15 +155,28 @@ private ResolveContext(E replicas)
                                                                    true,
                                                                    command.selectsFullPartition(),
                                                                    enforceStrictLiveness);
+
+            // In case of top-k query, do not trim reconciled rows here because QueryPlan#postProcessor()
+            // needs to compare all rows
+            if (command.isTopK())
+                this.mergedResultCounter.onlyCount();
         }
 
         private boolean needsReadRepair()
         {
+            // each replica may return different estimated top-K rows; it doesn't mean data is not replicated.
+            if (command.isTopK())
+                return false;

Review Comment:
   Ah, I see what's going on here. The methods to skip SRP and RR are based on the number of replicas in the replica plan, which is identical to the number of responses, etc. Queries with CL=ONE/LOCAL_ONE should use a single replica, so the code skips SRP and RR if there is a single replica and therefore nothing to reconcile.

   However, this patch adds a new `ScanAllRangesCommandIterator` that combines the separate replica plans of each data range into a single shared replica plan. That's why the `DataResolver` can see multiple replicas in top-k queries with CL=ONE/LOCAL_ONE.

   I guess we can keep the `command.isTopK` checks, but add some comments about how the replica plan in top-k queries can have multiple replicas independently of the CL.

   A thing that can be revisited in the future is how the new `ScanAllRangesCommandIterator` forces the reconciliation of disjoint ranges, which seems wasteful.
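
   Something like this above the `isTopK` checks, perhaps:

   ```java
   // Top-k (ANN) queries run through ScanAllRangesCommandIterator, which merges the per-range
   // replica plans into a single shared plan. That plan can contain multiple replicas even at
   // CL=ONE/LOCAL_ONE, and each replica returns its own approximate top-k rows, so differing
   // responses don't indicate missing data and shouldn't trigger SRP or read repair.
   if (command.isTopK())
       return false;
   ```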


