maedhroz commented on code in PR #4353:
URL: https://github.com/apache/cassandra/pull/4353#discussion_r2710947473


##########
src/java/org/apache/cassandra/index/sai/plan/QueryController.java:
##########
@@ -313,117 +360,90 @@ public boolean doesNotSelect(PrimaryKey key)
     }
 
     // This is an ANN only query
-    public KeyRangeIterator getTopKRows(RowFilter.Expression expression)
+    public CloseableIterator<PrimaryKeyWithScore> 
getTopKRows(QueryViewBuilder.QueryExpressionView queryExpressionView)
     {
-        assert expression.operator() == Operator.ANN;
-        StorageAttachedIndex index = indexFor(expression);
-        assert index != null;
-        Expression planExpression = Expression.create(index).add(Operator.ANN, 
expression.getIndexValue().duplicate());
-
-        QueryViewBuilder.QueryView queryView = new 
QueryViewBuilder(Collections.singleton(planExpression), mergeRange).build();
-        Runnable onClose = () -> 
queryView.referencedIndexes.forEach(SSTableIndex::releaseQuietly);
-
+        assert queryExpressionView.expression.operator == 
Expression.IndexOperator.ANN;
+        List<CloseableIterator<PrimaryKeyWithScore>> intermediateResults = new 
ArrayList<>();
         try
         {
-            List<KeyRangeIterator> memtableResults = queryView.view
-                                                              .stream()
-                                                              .map(v -> 
v.memtableIndexes)
-                                                              
.flatMap(Collection::stream)
-                                                              .map(idx -> 
idx.search(queryContext, planExpression, mergeRange))
-                                                              
.collect(Collectors.toList());
-
-            List<KeyRangeIterator> sstableIntersections = queryView.view
-                                                                   .stream()
-                                                                   
.map(this::createRowIdIterator)
-                                                                   
.collect(Collectors.toList());
-
-            return IndexSearchResultIterator.build(sstableIntersections, 
memtableResults, queryView.referencedIndexes, queryContext, onClose);
+            for (MemtableIndex memtableIndex : 
queryExpressionView.memtableIndexes)
+                intermediateResults.add(memtableIndex.orderBy(queryContext, 
queryExpressionView.expression, mergeRange));
+            for (SSTableIndex sstableIndex : 
queryExpressionView.sstableIndexes)
+                
intermediateResults.addAll(sstableIndex.orderBy(queryExpressionView.expression, 
mergeRange, queryContext));
+            return intermediateResults.isEmpty() ? CloseableIterator.empty()
+                                                 : new 
MergePrimaryKeyWithScoreIterator(intermediateResults);
         }
         catch (Throwable t)
         {
             // all sstable indexes in view have been referenced, need to clean 
up when exception is thrown
-            onClose.run();
-            throw t;
+            
queryExpressionView.sstableIndexes.forEach(SSTableIndex::releaseQuietly);
+            intermediateResults.forEach(FileUtils::closeQuietly);
+            throw Throwables.cleaned(t);
         }
     }
 
     // This is a hybrid query. We apply all other predicates before ordering 
and limiting.
-    public KeyRangeIterator getTopKRows(KeyRangeIterator source, 
RowFilter.Expression expression)
+    public CloseableIterator<PrimaryKeyWithScore> getTopKRows(KeyRangeIterator 
source, QueryViewBuilder.QueryExpressionView queryExpressionView)
     {
-        return new KeyRangeOrderingIterator(source, orderChunkSize, list -> 
this.getTopKRows(list, expression));
+        List<PrimaryKey> primaryKeys = materializeKeysAndCloseSource(source);
+        if (primaryKeys.isEmpty())
+            return CloseableIterator.empty();
+        return getTopKRows(primaryKeys, queryExpressionView);
     }
 
-    private KeyRangeIterator getTopKRows(List<PrimaryKey> rawSourceKeys, 
RowFilter.Expression expression)
+    private CloseableIterator<PrimaryKeyWithScore> 
getTopKRows(List<PrimaryKey> sourceKeys, QueryViewBuilder.QueryExpressionView 
queryExpressionView)
     {
-        VectorQueryContext vectorQueryContext = queryContext.vectorContext();
-        // Filter out PKs now. Each PK is passed to every segment of the ANN 
index, so filtering shadowed keys
-        // eagerly can save some work when going from PK to row id for on disk 
segments.
-        // Since the result is shared with multiple streams, we use an 
unmodifiable list.
-        List<PrimaryKey> sourceKeys = 
rawSourceKeys.stream().filter(vectorQueryContext::shouldInclude).collect(Collectors.toList());
-        StorageAttachedIndex index = indexFor(expression);
-        assert index != null : "Cannot do ANN ordering on an unindexed column";
-        Expression planExpression = Expression.create(index);
-        planExpression.add(Operator.ANN, 
expression.getIndexValue().duplicate());
-
-        QueryViewBuilder.QueryView queryView = new 
QueryViewBuilder(Collections.singleton(planExpression), mergeRange).build();
-        Runnable onClose = () -> 
queryView.referencedIndexes.forEach(SSTableIndex::releaseQuietly);
-
+        List<CloseableIterator<PrimaryKeyWithScore>> intermediateResults = new 
ArrayList<>();
         try
         {
-            List<KeyRangeIterator> memtableResults = queryView.view
-                                                              .stream()
-                                                              .map(v -> 
v.memtableIndexes)
-                                                              
.flatMap(Collection::stream)
-                                                              .map(idx -> 
idx.limitToTopResults(sourceKeys, planExpression, vectorQueryContext.limit()))
-                                                              
.collect(Collectors.toList());
-
-            List<KeyRangeIterator> sstableIntersections = queryView.view
-                                                                   .stream()
-                                                                   
.flatMap(pair -> pair.sstableIndexes.stream())
-                                                                   .map(idx -> 
{
-                                                                       try
-                                                                       {
-                                                                           
return idx.limitToTopKResults(queryContext, sourceKeys, planExpression);
-                                                                       }
-                                                                       catch 
(IOException e)
-                                                                       {
-                                                                           
throw new UncheckedIOException(e);
-                                                                       }
-                                                                   })
-                                                                   
.collect(Collectors.toList());
-
-            return IndexSearchResultIterator.build(sstableIntersections, 
memtableResults, queryView.referencedIndexes, queryContext, onClose);
+            for (MemtableIndex memtableIndex : 
queryExpressionView.memtableIndexes)
+                
intermediateResults.add(memtableIndex.orderResultsBy(queryContext, sourceKeys, 
queryExpressionView.expression));
+            for (SSTableIndex sstableIndex : 
queryExpressionView.sstableIndexes)
+                
intermediateResults.addAll(sstableIndex.orderResultsBy(queryContext, 
sourceKeys, queryExpressionView.expression));
+            return intermediateResults.isEmpty() ? CloseableIterator.empty()
+                                                 : new 
MergePrimaryKeyWithScoreIterator(intermediateResults);
         }
         catch (Throwable t)
         {
             // all sstable indexes in view have been referenced, need to clean 
up when exception is thrown
-            onClose.run();
-            throw t;
+            
queryExpressionView.sstableIndexes.forEach(SSTableIndex::releaseQuietly);
+            intermediateResults.forEach(FileUtils::closeQuietly);
+            throw Throwables.cleaned(t);
         }
     }
 
     /**
-     * Create row id iterator from different indexes' on-disk searcher of the 
same sstable
+     * Materialize the keys from the given source iterator. If there is a 
meaningful {@link #mergeRange}, the keys
+     * are filtered to only include those within the range. Note: closes the 
source iterator.
+     * @param source The source iterator to fully consume by materializing its 
keys
+     * @return The list of materialized keys within the {@link #mergeRange}.
      */
-    private KeyRangeIterator 
createRowIdIterator(QueryViewBuilder.QueryExpressionView indexExpression)
+    private List<PrimaryKey> materializeKeysAndCloseSource(KeyRangeIterator 
source)
     {
-        List<KeyRangeIterator> subIterators = indexExpression.sstableIndexes
-                           .stream()
-                           .map(index ->
-                                {
-                                    try
-                                    {
-                                        List<KeyRangeIterator> iterators = 
index.search(indexExpression.expression, mergeRange, queryContext);
-                                        // concat the result from multiple 
segments for the same index
-                                        return 
KeyRangeConcatIterator.builder(iterators.size()).add(iterators).build();
-                                    }
-                                    catch (Throwable ex)
-                                    {
-                                        throw Throwables.cleaned(ex);
-                                    }
-                                }).collect(Collectors.toList());
-
-        return KeyRangeUnionIterator.build(subIterators);
+        try (source)
+        {
+            // Skip to the first key (which is really just a token) in the 
range if it is not the minimum token
+            if (!mergeRange.left.isMinimum())
+                source.skipTo(firstPrimaryKey);
+
+            if (!source.hasNext())
+                return List.of();
+
+            PrimaryKey maxToken = 
keyFactory.create(mergeRange.right.getToken());
+            boolean hasLimitingMaxToken = !maxToken.token().isMinimum() && 
maxToken.compareTo(source.getMaximum()) < 0;
+            List<PrimaryKey> primaryKeys = new ArrayList<>();
+            int count = 0;
+            while (source.hasNext())
+            {
+                PrimaryKey next = source.next();
+                if (hasLimitingMaxToken && next.compareTo(maxToken) > 0)
+                    break;
+                primaryKeys.add(next);
+                if (MAX_MATERIALIZED_KEYS < ++count)
+                    throw new 
QueryMaterializesTooManyPrimaryKeysException("Too many primary keys. Attempted 
to load more than: " + MAX_MATERIALIZED_KEYS);

Review Comment:
   I've gone back and forth on this a bit, but I think we want to consider two 
things:
   
   1.) Lowering `max_materialized_keys` to something much less than 1M. 
`ReplicaFilteringProtectionOptions` is a similar place where we accumulate keys 
in memory, and it defaults to warning at 2000 (and failing at 32k). I think 
we should go with something more conservative, like 4096 (if we're into powers 
of 2).
   
   2.) Rather than failing the query, we should just back out and fall back to 
the "orderBy" pathway when we reach the max/threshold. It's entirely possible 
that, with an extremely non-selective filter, we'll still be able to complete 
the query before the timeout, because post-filtering won't filter many things out.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to