adelapena commented on code in PR #2673:
URL: https://github.com/apache/cassandra/pull/2673#discussion_r1348558477
##########
src/java/org/apache/cassandra/service/reads/range/ScanAllRangesCommandIterator.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.range;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.EmptyIterators;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.reads.DataResolver;
+import org.apache.cassandra.service.reads.ReadCallback;
+import org.apache.cassandra.service.reads.repair.NoopReadRepair;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.CloseableIterator;
+
+/**
+ * A custom {@link RangeCommandIterator} that queries all replicas required by the consistency level at once, with the
+ * data range specified in {@link PartitionRangeReadCommand}.
+ *
+ * This is to speed up {@link Index.QueryPlan#isTopK()} queries that need to find the global top-k rows in the cluster,
+ * because the existing {@link RangeCommandIterator} has to execute a top-k search per vnode range, which wastes resources.
+ */
+public class ScanAllRangesCommandIterator extends RangeCommandIterator

Review Comment:
   I think I would extend the class JavaDoc a bit to mention that the replica plans for each data range are combined into a single shared replica plan, and perhaps mention the consequences for reconciliation. In particular, that queries with CL=ONE/LOCAL_ONE will use reconciliation.
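   For what it's worth, the extended JavaDoc could read along these lines (illustrative wording only, based on the discussion in this thread, not taken from the patch):

       /**
        * A custom {@link RangeCommandIterator} that queries all replicas required by the consistency level at once,
        * with the data range specified in {@link PartitionRangeReadCommand}.
        *
        * Unlike {@link RangeCommandIterator}, which builds a replica plan per vnode range, this iterator combines
        * the replica plans for each data range into a single shared replica plan. As a consequence, the coordinator
        * may have to reconcile responses from multiple replicas even at CL=ONE/LOCAL_ONE, where a regular range
        * query would read from a single replica and skip reconciliation.
        *
        * This is to speed up {@link Index.QueryPlan#isTopK()} queries that need to find the global top-k rows in
        * the cluster, because executing a top-k search per vnode range wastes resources.
        */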
##########
src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java:
##########
@@ -274,10 +292,51 @@ else if (relation.isLIKE())
             throw invalidRequest("Non PRIMARY KEY columns found in where clause: %s ",
                                  Joiner.on(", ").join(nonPrimaryKeyColumns));
         }
+
+        var annRestriction = Streams.stream(nonPrimaryKeyRestrictions).filter(SingleRestriction::isANN).findFirst();
+        if (annRestriction.isPresent())
+        {
+            // If there is an ANN restriction then it must be for a vector<float, n> column, and it must have an index
+            var annColumn = annRestriction.get().getFirstColumn();
+
+            if (!annColumn.type.isVector() || !(((VectorType<?>) annColumn.type).elementType instanceof FloatType))
+                throw invalidRequest(StatementRestrictions.ANN_ONLY_SUPPORTED_ON_VECTOR_MESSAGE);
+            if (indexRegistry == null || indexRegistry.listIndexes().stream().noneMatch(i -> i.dependsOn(annColumn)))
+                throw invalidRequest(StatementRestrictions.ANN_REQUIRES_INDEX_MESSAGE);
+
+            // We do not allow ANN query filtering using non-indexed columns
+            var nonAnnColumns = Streams.stream(nonPrimaryKeyRestrictions)
+                                       .filter(r -> !r.isANN())
+                                       .map(r -> r.getFirstColumn())
+                                       .collect(Collectors.toList());
+            var clusteringColumns = clusteringColumnsRestrictions.getColumnDefinitions();
+            if (!nonAnnColumns.isEmpty() || !clusteringColumns.isEmpty())
+            {
+                var nonIndexedColumns = Stream.concat(nonAnnColumns.stream(), clusteringColumns.stream())

Review Comment:
   I think we also need to consider `partitionKeyRestrictions` to exclude restrictions on columns that are part of a multi-column partition key. Those also require `ALLOW FILTERING` and post-filtering. (A sketch of this follows after the test file below.)

##########
test/unit/org/apache/cassandra/index/sai/cql/VectorSegmentationTest.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.sai.cql;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.marshal.FloatType;
+import org.apache.cassandra.db.marshal.VectorType;
+import org.apache.cassandra.index.sai.disk.v1.segment.SegmentBuilder;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class VectorSegmentationTest extends VectorTester
+{
+    private static final int dimension = 100;
+
+    @Test
+    public void testMultipleSegmentsForCreatingIndex() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, val vector<float, " + dimension + ">, PRIMARY KEY(pk))");
+
+        int vectorCount = 100;
+        List<float[]> vectors = new ArrayList<>();
+        for (int row = 0; row < vectorCount; row++)
+        {
+            float[] vector = nextVector();
+            vectors.add(vector);
+            execute("INSERT INTO %s (pk, val) VALUES (?, ?)", row, vector(vector));
+        }
+
+        flush();
+
+        SegmentBuilder.updateLastValidSegmentRowId(17); // 17 rows per segment
+        createIndex("CREATE CUSTOM INDEX ON %s(val) USING 'StorageAttachedIndex'");
+
+        int limit = 35;
+        float[] queryVector = nextVector();
+        UntypedResultSet resultSet = execute("SELECT * FROM %s ORDER BY val ANN OF ? LIMIT " + limit, vector(queryVector));
+        assertThat(resultSet.size()).isEqualTo(limit);
+
+        List<float[]> resultVectors = getVectorsFromResult(resultSet);
+        double recall = rawIndexedRecall(vectors, queryVector, resultVectors, limit);
+        assertThat(recall).isGreaterThanOrEqualTo(0.99);
+    }
+
+    @Test
+    public void testMultipleSegmentsForCompaction() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, val vector<float, " + dimension + ">, PRIMARY KEY(pk))");
+        createIndex("CREATE CUSTOM INDEX ON %s(val) USING 'StorageAttachedIndex'");
+
+        List<float[]> vectors = new ArrayList<>();
+        int rowsPerSSTable = 10;
+        int sstables = 5;
+        int pk = 0;
+        for (int i = 0; i < sstables; i++)
+        {
+            for (int row = 0; row < rowsPerSSTable; row++)
+            {
+                float[] vector = nextVector();
+                execute("INSERT INTO %s (pk, val) VALUES (?, ?)", pk++, vector(vector));
+                vectors.add(vector);
+            }
+
+            flush();
+        }
+
+        int limit = 30;
+        float[] queryVector = nextVector();
+        UntypedResultSet resultSet = execute("SELECT * FROM %s ORDER BY val ANN OF ? LIMIT " + limit, vector(queryVector));
+        assertThat(resultSet.size()).isEqualTo(limit);
+
+        List<float[]> resultVectors = getVectorsFromResult(resultSet);
+        double recall = rawIndexedRecall(vectors, queryVector, resultVectors, limit);
+        assertThat(recall).isGreaterThanOrEqualTo(0.99);
+
+        SegmentBuilder.updateLastValidSegmentRowId(11); // 11 rows per segment
+        compact();
+
+        queryVector = nextVector();
+        resultSet = execute("SELECT * FROM %s ORDER BY val ANN OF ? LIMIT " + limit, vector(queryVector));
+        assertThat(resultSet.size()).isEqualTo(limit);
+
+        resultVectors = getVectorsFromResult(resultSet);
+        recall = rawIndexedRecall(vectors, queryVector, resultVectors, limit);
+        assertThat(recall).isGreaterThanOrEqualTo(0.99);
+    }
+
+    protected Vector<Float> vector(float[] values)
+    {
+        Float[] floats = new Float[values.length];
+        for (int i = 0; i < values.length; i++)
+            floats[i] = values[i];
+
+        return new Vector<>(floats);
+    }

Review Comment:
   I think this method is overriding `CQLTester#vector(float[])` with a mostly identical implementation.
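   Regarding the `StatementRestrictions` comment above, here is a minimal sketch of how the restricted partition key columns could be folded into the non-indexed column check. The `partitionKeyRestrictions.getColumnDefinitions()` accessor is an assumption mirroring the clustering-columns call in the quoted hunk, and the sketch ignores, for brevity, the case where the whole partition key is restricted:

       var nonAnnColumns = Streams.stream(nonPrimaryKeyRestrictions)
                                  .filter(r -> !r.isANN())
                                  .map(r -> r.getFirstColumn())
                                  .collect(Collectors.toList());
       var clusteringColumns = clusteringColumnsRestrictions.getColumnDefinitions();
       // Restrictions on columns of a multi-column partition key also require ALLOW FILTERING
       // and post-filtering, so treat them like the clustering columns above.
       var partitionKeyColumns = partitionKeyRestrictions.getColumnDefinitions(); // assumed accessor
       if (!nonAnnColumns.isEmpty() || !clusteringColumns.isEmpty() || !partitionKeyColumns.isEmpty())
       {
           var nonIndexedColumns = Stream.concat(Stream.concat(nonAnnColumns.stream(), clusteringColumns.stream()),
                                                 partitionKeyColumns.stream())
           // ... continue as in the original hunk (reject any column without a supporting index) ...
       }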
##########
src/java/org/apache/cassandra/service/reads/DataResolver.java:
##########
@@ -154,15 +155,28 @@ private ResolveContext(E replicas)
                                                        true,
                                                        command.selectsFullPartition(),
                                                        enforceStrictLiveness);
+
+            // In case of a top-k query, do not trim reconciled rows here because QueryPlan#postProcessor()
+            // needs to compare all rows
+            if (command.isTopK())
+                this.mergedResultCounter.onlyCount();
         }

         private boolean needsReadRepair()
         {
+            // Each replica may return different estimated top-k rows; that doesn't mean the data is not replicated.
+            if (command.isTopK())
+                return false;

Review Comment:
   We don't need this change, since the check right below makes sure RR is off when there is no more than one replica. Indeed, the distributed tests pass without it.

##########
src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java:
##########
@@ -90,17 +99,28 @@
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.OpOrder;

+import static org.apache.cassandra.index.sai.disk.v1.IndexWriterConfig.MAX_TOP_K;
+
 public class StorageAttachedIndex implements Index
 {
     public static final String NAME = "sai";

+    public static final String VECTOR_USAGE_WARNING = "SAI ANN indexes on vector columns are experimental and are not recommended for production use.\n" +

Review Comment:
   We probably should add a line about the inability to combine ANN with filtering. (A possible wording is sketched near the end of this message.)

##########
src/java/org/apache/cassandra/service/reads/DataResolver.java:
##########
@@ -154,15 +155,28 @@ private ResolveContext(E replicas)
                                                        true,
                                                        command.selectsFullPartition(),
                                                        enforceStrictLiveness);
+
+            // In case of a top-k query, do not trim reconciled rows here because QueryPlan#postProcessor()
+            // needs to compare all rows
+            if (command.isTopK())
+                this.mergedResultCounter.onlyCount();
         }

         private boolean needsReadRepair()
         {
+            // Each replica may return different estimated top-k rows; that doesn't mean the data is not replicated.
+            if (command.isTopK())
+                return false;

Review Comment:
   Ah, I see what's going on here. The methods to skip SRP and RR are based on the number of replicas in the replica plan, which is identical to the number of responses, etc. Queries with CL=ONE/LOCAL_ONE should use a single replica, so the code skips SRP and RR if there is a single replica and therefore there is nothing to reconcile.

   However, this patch adds a new `ScanAllRangesCommandIterator` that combines the separate replica plans of each data range into a single shared replica plan. That's why the `DataResolver` can see multiple replicas in top-k queries with CL=ONE/LOCAL_ONE.

   I guess we can keep the `command.isTopK` checks, but add some comments about how the replica plan in top-k queries can have multiple replicas independently of the CL. A thing that can be revisited in the future is how the new `ScanAllRangesCommandIterator` forces the reconciliation of disjoint ranges, which seems wasteful.
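   To make that last suggestion concrete, the commented check could look something like this. The method shape and the `replicas.size() > 1` line are assumptions based on the quoted hunk, not copied from the patch:

       private boolean needsReadRepair()
       {
           // Top-k queries may be executed by ScanAllRangesCommandIterator, which combines the
           // replica plans of all data ranges into a single shared plan, so this resolver can see
           // multiple replicas even at CL=ONE/LOCAL_ONE. Each replica may legitimately return a
           // different estimated top-k set, so mismatching responses are not a sign of
           // unreplicated data and must not trigger read repair.
           if (command.isTopK())
               return false;

           // Assumed existing check: read repair only makes sense with more than one response.
           return replicas.size() > 1;
       }

   And, for the `StorageAttachedIndex` comment above, a possible placeholder for the extra warning line (the wording is hypothetical, and the real constant may continue with further text):

       public static final String VECTOR_USAGE_WARNING =
           "SAI ANN indexes on vector columns are experimental and are not recommended for production use.\n" +
           // Hypothetical additional sentence, as suggested above:
           "ANN queries cannot be combined with filtering on non-indexed columns.";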
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]

