maedhroz commented on code in PR #4353:
URL: https://github.com/apache/cassandra/pull/4353#discussion_r2678323966


##########
src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java:
##########
@@ -468,79 +501,321 @@ private UnfilteredRowIterator 
queryStorageAndFilter(List<PrimaryKey> keys)
             }
         }
 
-        private UnfilteredRowIterator filterPartition(List<PrimaryKey> keys, 
UnfilteredRowIterator partition, FilterTree tree)
+        @Override
+        public TableMetadata metadata()
         {
-            Row staticRow = partition.staticRow();
-            DecoratedKey partitionKey = partition.partitionKey();
-            List<Unfiltered> matches = new ArrayList<>();
-            boolean hasMatch = false;
-            Set<PrimaryKey> keysToShadow = topK ? new HashSet<>(keys) : 
Collections.emptySet();
+            return queryController.metadata();
+        }
 
-            while (partition.hasNext())
-            {
-                Unfiltered unfiltered = partition.next();
+        @Override
+        public void close()
+        {
+            FileUtils.closeQuietly(resultKeyIterator);
+            if (tableQueryMetrics != null) 
tableQueryMetrics.record(queryContext);
+        }
+    }
 
-                if (unfiltered.isRow())
-                {
-                    queryContext.rowsFiltered++;
+    private static UnfilteredRowIterator filterPartition(UnfilteredRowIterator 
partition, FilterTree tree, QueryContext context)
+    {
+        Row staticRow = partition.staticRow();
+        DecoratedKey partitionKey = partition.partitionKey();
+        List<Unfiltered> matches = new ArrayList<>();
+        boolean hasMatch = false;
 
-                    if (tree.isSatisfiedBy(partitionKey, (Row) unfiltered, 
staticRow))
-                    {
-                        matches.add(unfiltered);
-                        hasMatch = true;
+        // todo static keys??
+        while (partition.hasNext())
+        {
+            Unfiltered unfiltered = partition.next();
 
-                        if (topK)
-                        {
-                            PrimaryKey shadowed = 
keyFactory.hasClusteringColumns()
-                                                  ? 
keyFactory.create(partitionKey, ((Row) unfiltered).clustering())
-                                                  : 
keyFactory.create(partitionKey);
-                            keysToShadow.remove(shadowed);
-                        }
-                    }
+            if (unfiltered.isRow())
+            {
+                context.rowsFiltered++;
+
+                if (tree.isSatisfiedBy(partitionKey, (Row) unfiltered, 
staticRow))
+                {
+                    matches.add(unfiltered);
+                    hasMatch = true;
                 }
             }
+        }
 
-            // If any non-static rows match the filter, there should be no 
need to shadow the static primary key:
-            if (topK && hasMatch && keyFactory.hasClusteringColumns())
-                keysToShadow.remove(keyFactory.create(partitionKey, 
Clustering.STATIC_CLUSTERING));
+        // We may not have any non-static row data to filter...
+        if (!hasMatch)
+        {
+            context.rowsFiltered++;
 
-            // We may not have any non-static row data to filter...
-            if (!hasMatch)
+            if (tree.isSatisfiedBy(partitionKey, staticRow, staticRow))
             {
-                queryContext.rowsFiltered++;
+                hasMatch = true;
+            }
+        }
 
-                if (tree.isSatisfiedBy(partitionKey, staticRow, staticRow))
-                {
-                    hasMatch = true;
+        if (!hasMatch)
+        {
+            // If there are no matches, return an empty partition. If 
reconciliation is required at the
+            // coordinator, replica filtering protection may make a second 
round trip to complete its view
+            // of the partition.
+            return null;
+        }
 
-                    if (topK)
-                        keysToShadow.clear();
-                }
+        // Return all matches found, along with the static row...
+        return new SinglePartitionIterator(partition, staticRow, 
matches.iterator());
+    }
+
+    private static class SinglePartitionIterator extends 
AbstractUnfilteredRowIterator
+    {
+        private final Iterator<Unfiltered> rows;
+
+        public SinglePartitionIterator(UnfilteredRowIterator partition, Row 
staticRow, Iterator<Unfiltered> rows)
+        {
+            super(partition.metadata(),
+                  partition.partitionKey(),
+                  partition.partitionLevelDeletion(),
+                  partition.columns(),
+                  staticRow,
+                  partition.isReverseOrder(),
+                  partition.stats());
+
+            this.rows = rows;
+        }
+
+        @Override
+        protected Unfiltered computeNext()
+        {
+            return rows.hasNext() ? rows.next() : endOfData();
+        }
+    }
+
+    /**
+     * A result retriever that consumes an iterator primary keys sorted by 
some score, materializes the row for each
+     * primary key (currently, each primary key is required to be fully 
qualified and should only point to one row),
+     * apply the filter tree to the row to test that the real row satisfies 
the WHERE clause, and finally tests
+     * that the row is valid for the ORDER BY clause. The class performs some 
optimizations to avoid materializing
+     * rows unnecessarily. See the class for more details.
+     * <p>
+     * The resulting {@link UnfilteredRowIterator} objects are not guaranteed 
to be in any particular order. It is
+     * the responsibility of the caller to sort the results if necessary.
+     */
+    public static class ScoreOrderedResultRetriever extends 
AbstractIterator<UnfilteredRowIterator> implements UnfilteredPartitionIterator
+    {
+        private final ColumnFamilyStore.ViewFragment view;
+        private final List<AbstractBounds<PartitionPosition>> keyRanges;
+        private final boolean coversFullRing;
+        private final CloseableIterator<PrimaryKeyWithScore> 
scoredPrimaryKeyIterator;
+        private final FilterTree filterTree;
+        private final QueryController controller;
+        private final ReadExecutionController executionController;
+        private final QueryContext queryContext;
+
+        private final HashSet<PrimaryKey> processedKeys;
+        private final Queue<UnfilteredRowIterator> pendingRows;
+
+        // The limit requested by the query. We cannot load more than 
softLimit rows in bulk because we only want
+        // to fetch the topk rows where k is the limit. However, we allow the 
iterator to fetch more rows than the
+        // soft limit to avoid confusing behavior. When the softLimit is 
reached, the iterator will fetch one row
+        // at a time.
+        private final int softLimit;
+        private int returnedRowCount = 0;
+
+        private ScoreOrderedResultRetriever(QueryController controller,
+                                            ReadExecutionController 
executionController,
+                                            QueryContext queryContext,
+                                            QueryViewBuilder.QueryView 
queryView,
+                                            int limit)
+        {
+            assert queryView.view.size() == 1;
+            QueryViewBuilder.QueryExpressionView queryExpressionView = 
queryView.view.stream().findFirst().get();
+            this.view = queryExpressionView.viewFragment;
+            this.keyRanges = 
controller.dataRanges().stream().map(DataRange::keyRange).collect(Collectors.toList());
+            this.coversFullRing = keyRanges.size() == 1 && 
RangeUtil.coversFullRing(keyRanges.get(0));
+
+            this.scoredPrimaryKeyIterator = 
Operation.buildIteratorForOrder(controller, queryExpressionView);
+            this.filterTree = Operation.buildFilter(controller, 
controller.usesStrictFiltering());
+            this.controller = controller;
+            this.executionController = executionController;
+            this.queryContext = queryContext;
+
+            this.processedKeys = new HashSet<>(limit);
+            this.pendingRows = new ArrayDeque<>(limit);
+            this.softLimit = limit;
+        }
+
+        @Override
+        public UnfilteredRowIterator computeNext()
+        {
+            if (pendingRows.isEmpty())
+                fillPendingRows();
+            returnedRowCount++;
+            // Because we know ordered keys are fully qualified, we do not 
iterate partitions
+            return !pendingRows.isEmpty() ? pendingRows.poll() : endOfData();
+        }
+
+        /**
+         * Fills the pendingRows queue to generate a queue of row iterators 
for the supplied keys by repeatedly calling
+         * {@link #readAndValidatePartition} until it gives enough non-null 
results.
+         */
+        private void fillPendingRows()
+        {
+            // Group PKs by source sstable/memtable
+            Map<PrimaryKey, List<PrimaryKeyWithScore>> groupedKeys = new 
HashMap<>();
+            // We always want to get at least 1.
+            int rowsToRetrieve = Math.max(1, softLimit - returnedRowCount);
+            // We want to get the first unique `rowsToRetrieve` keys to 
materialize
+            // Don't pass the priority queue here because it is more efficient 
to add keys in bulk
+            fillKeys(groupedKeys, rowsToRetrieve, null);
+            // Sort the primary keys by PrK order, just in case that helps 
with cache and disk efficiency
+            PriorityQueue<PrimaryKey> primaryKeyPriorityQueue = new 
PriorityQueue<>(groupedKeys.keySet());
+
+            // drain groupedKeys into pendingRows
+            while (!groupedKeys.isEmpty())
+            {
+                PrimaryKey pk = primaryKeyPriorityQueue.poll();
+                List<PrimaryKeyWithScore> sourceKeys = groupedKeys.remove(pk);
+                UnfilteredRowIterator partitionIterator = 
readAndValidatePartition(pk, sourceKeys);
+                if (partitionIterator != null)
+                    pendingRows.add(partitionIterator);
+                else
+                    // The current primaryKey did not produce a partition 
iterator. We know the caller will need
+                    // `rowsToRetrieve` rows, so we get the next unique key 
and add it to the queue.
+                    fillKeys(groupedKeys, 1, primaryKeyPriorityQueue);
             }
+        }
 
-            if (topK && !keysToShadow.isEmpty())
+        /**
+         * Fills the `groupedKeys` Map with the next `count` unique primary 
keys that are in the keys produced by calling
+         * {@link #nextSelectedKeyInRange()}. We map PrimaryKey to a list of 
PrimaryKeyWithScore because the same
+         * primary key can be in the result set multiple times, but with 
different source tables.
+         * @param groupedKeys the map to fill
+         * @param count the number of unique PrimaryKeys to consume from the 
iterator
+         * @param primaryKeyPriorityQueue the priority queue to add new keys 
to. If the queue is null, we do not add
+         *                                keys to the queue.
+         */
+        private void fillKeys(Map<PrimaryKey, List<PrimaryKeyWithScore>> 
groupedKeys, int count, PriorityQueue<PrimaryKey> primaryKeyPriorityQueue)
+        {
+            int initialSize = groupedKeys.size();
+            while (groupedKeys.size() - initialSize < count)
             {
-                // Record primary keys shadowed by expired TTLs, row 
tombstones, or range tombstones:
-                
queryContext.vectorContext().recordShadowedPrimaryKeys(keysToShadow);
+                PrimaryKeyWithScore primaryKeyWithScore = 
nextSelectedKeyInRange();
+                if (primaryKeyWithScore == null)
+                    return;
+                PrimaryKey nextPrimaryKey = primaryKeyWithScore.primaryKey();
+                List<PrimaryKeyWithScore> accumulator = 
groupedKeys.computeIfAbsent(nextPrimaryKey, k -> new ArrayList<>());
+                if (primaryKeyPriorityQueue != null && accumulator.isEmpty())
+                    primaryKeyPriorityQueue.add(nextPrimaryKey);
+                accumulator.add(primaryKeyWithScore);
             }
+        }
+
+        /**
+         * Determine if the key is in one of the queried key ranges. We do not 
iterate through results in
+         * {@link PrimaryKey} order, so we have to check each range.
+         * @param key

Review Comment:
   nit: `key` is kind of dangling here...I'm fine with just removing it in the 
face of a pretty helpful overall description.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to