maedhroz commented on code in PR #4353:
URL: https://github.com/apache/cassandra/pull/4353#discussion_r2678364121


##########
src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java:
##########
@@ -468,79 +501,321 @@ private UnfilteredRowIterator 
queryStorageAndFilter(List<PrimaryKey> keys)
             }
         }
 
-        private UnfilteredRowIterator filterPartition(List<PrimaryKey> keys, 
UnfilteredRowIterator partition, FilterTree tree)
+        @Override
+        public TableMetadata metadata()
         {
-            Row staticRow = partition.staticRow();
-            DecoratedKey partitionKey = partition.partitionKey();
-            List<Unfiltered> matches = new ArrayList<>();
-            boolean hasMatch = false;
-            Set<PrimaryKey> keysToShadow = topK ? new HashSet<>(keys) : 
Collections.emptySet();
+            return queryController.metadata();
+        }
 
-            while (partition.hasNext())
-            {
-                Unfiltered unfiltered = partition.next();
+        @Override
+        public void close()
+        {
+            FileUtils.closeQuietly(resultKeyIterator);
+            if (tableQueryMetrics != null) 
tableQueryMetrics.record(queryContext);
+        }
+    }
 
-                if (unfiltered.isRow())
-                {
-                    queryContext.rowsFiltered++;
+    private static UnfilteredRowIterator filterPartition(UnfilteredRowIterator 
partition, FilterTree tree, QueryContext context)
+    {
+        Row staticRow = partition.staticRow();
+        DecoratedKey partitionKey = partition.partitionKey();
+        List<Unfiltered> matches = new ArrayList<>();
+        boolean hasMatch = false;
 
-                    if (tree.isSatisfiedBy(partitionKey, (Row) unfiltered, 
staticRow))
-                    {
-                        matches.add(unfiltered);
-                        hasMatch = true;
+        // todo static keys??
+        while (partition.hasNext())
+        {
+            Unfiltered unfiltered = partition.next();
 
-                        if (topK)
-                        {
-                            PrimaryKey shadowed = 
keyFactory.hasClusteringColumns()
-                                                  ? 
keyFactory.create(partitionKey, ((Row) unfiltered).clustering())
-                                                  : 
keyFactory.create(partitionKey);
-                            keysToShadow.remove(shadowed);
-                        }
-                    }
+            if (unfiltered.isRow())
+            {
+                context.rowsFiltered++;
+
+                if (tree.isSatisfiedBy(partitionKey, (Row) unfiltered, 
staticRow))
+                {
+                    matches.add(unfiltered);
+                    hasMatch = true;
                 }
             }
+        }
 
-            // If any non-static rows match the filter, there should be no 
need to shadow the static primary key:
-            if (topK && hasMatch && keyFactory.hasClusteringColumns())
-                keysToShadow.remove(keyFactory.create(partitionKey, 
Clustering.STATIC_CLUSTERING));
+        // We may not have any non-static row data to filter...
+        if (!hasMatch)
+        {
+            context.rowsFiltered++;
 
-            // We may not have any non-static row data to filter...
-            if (!hasMatch)
+            if (tree.isSatisfiedBy(partitionKey, staticRow, staticRow))
             {
-                queryContext.rowsFiltered++;
+                hasMatch = true;
+            }
+        }
 
-                if (tree.isSatisfiedBy(partitionKey, staticRow, staticRow))
-                {
-                    hasMatch = true;
+        if (!hasMatch)
+        {
+            // If there are no matches, return an empty partition. If 
reconciliation is required at the
+            // coordinator, replica filtering protection may make a second 
round trip to complete its view
+            // of the partition.
+            return null;
+        }
 
-                    if (topK)
-                        keysToShadow.clear();
-                }
+        // Return all matches found, along with the static row...
+        return new SinglePartitionIterator(partition, staticRow, 
matches.iterator());
+    }
+
+    private static class SinglePartitionIterator extends 
AbstractUnfilteredRowIterator
+    {
+        private final Iterator<Unfiltered> rows;
+
+        public SinglePartitionIterator(UnfilteredRowIterator partition, Row 
staticRow, Iterator<Unfiltered> rows)
+        {
+            super(partition.metadata(),
+                  partition.partitionKey(),
+                  partition.partitionLevelDeletion(),
+                  partition.columns(),
+                  staticRow,
+                  partition.isReverseOrder(),
+                  partition.stats());
+
+            this.rows = rows;
+        }
+
+        @Override
+        protected Unfiltered computeNext()
+        {
+            return rows.hasNext() ? rows.next() : endOfData();
+        }
+    }
+
+    /**
+     * A result retriever that consumes an iterator primary keys sorted by 
some score, materializes the row for each
+     * primary key (currently, each primary key is required to be fully 
qualified and should only point to one row),
+     * apply the filter tree to the row to test that the real row satisfies 
the WHERE clause, and finally tests
+     * that the row is valid for the ORDER BY clause. The class performs some 
optimizations to avoid materializing
+     * rows unnecessarily. See the class for more details.
+     * <p>
+     * The resulting {@link UnfilteredRowIterator} objects are not guaranteed 
to be in any particular order. It is
+     * the responsibility of the caller to sort the results if necessary.
+     */
+    public static class ScoreOrderedResultRetriever extends 
AbstractIterator<UnfilteredRowIterator> implements UnfilteredPartitionIterator
+    {
+        private final ColumnFamilyStore.ViewFragment view;
+        private final List<AbstractBounds<PartitionPosition>> keyRanges;
+        private final boolean coversFullRing;
+        private final CloseableIterator<PrimaryKeyWithScore> 
scoredPrimaryKeyIterator;
+        private final FilterTree filterTree;
+        private final QueryController controller;
+        private final ReadExecutionController executionController;
+        private final QueryContext queryContext;
+
+        private final HashSet<PrimaryKey> processedKeys;
+        private final Queue<UnfilteredRowIterator> pendingRows;
+
+        // The limit requested by the query. We cannot load more than 
softLimit rows in bulk because we only want
+        // to fetch the topk rows where k is the limit. However, we allow the 
iterator to fetch more rows than the
+        // soft limit to avoid confusing behavior. When the softLimit is 
reached, the iterator will fetch one row
+        // at a time.
+        private final int softLimit;
+        private int returnedRowCount = 0;
+
+        private ScoreOrderedResultRetriever(QueryController controller,
+                                            ReadExecutionController 
executionController,
+                                            QueryContext queryContext,
+                                            QueryViewBuilder.QueryView 
queryView,
+                                            int limit)
+        {
+            assert queryView.view.size() == 1;
+            QueryViewBuilder.QueryExpressionView queryExpressionView = 
queryView.view.stream().findFirst().get();
+            this.view = queryExpressionView.viewFragment;
+            this.keyRanges = 
controller.dataRanges().stream().map(DataRange::keyRange).collect(Collectors.toList());
+            this.coversFullRing = keyRanges.size() == 1 && 
RangeUtil.coversFullRing(keyRanges.get(0));
+
+            this.scoredPrimaryKeyIterator = 
Operation.buildIteratorForOrder(controller, queryExpressionView);
+            this.filterTree = Operation.buildFilter(controller, 
controller.usesStrictFiltering());
+            this.controller = controller;
+            this.executionController = executionController;
+            this.queryContext = queryContext;
+
+            this.processedKeys = new HashSet<>(limit);
+            this.pendingRows = new ArrayDeque<>(limit);
+            this.softLimit = limit;
+        }
+
+        @Override
+        public UnfilteredRowIterator computeNext()
+        {
+            if (pendingRows.isEmpty())
+                fillPendingRows();
+            returnedRowCount++;
+            // Because we know ordered keys are fully qualified, we do not 
iterate partitions
+            return !pendingRows.isEmpty() ? pendingRows.poll() : endOfData();
+        }
+
+        /**
+         * Fills the pendingRows queue to generate a queue of row iterators 
for the supplied keys by repeatedly calling
+         * {@link #readAndValidatePartition} until it gives enough non-null 
results.
+         */
+        private void fillPendingRows()
+        {
+            // Group PKs by source sstable/memtable
+            Map<PrimaryKey, List<PrimaryKeyWithScore>> groupedKeys = new 
HashMap<>();
+            // We always want to get at least 1.
+            int rowsToRetrieve = Math.max(1, softLimit - returnedRowCount);
+            // We want to get the first unique `rowsToRetrieve` keys to 
materialize
+            // Don't pass the priority queue here because it is more efficient 
to add keys in bulk
+            fillKeys(groupedKeys, rowsToRetrieve, null);
+            // Sort the primary keys by PrK order, just in case that helps 
with cache and disk efficiency
+            PriorityQueue<PrimaryKey> primaryKeyPriorityQueue = new 
PriorityQueue<>(groupedKeys.keySet());
+
+            // drain groupedKeys into pendingRows
+            while (!groupedKeys.isEmpty())
+            {
+                PrimaryKey pk = primaryKeyPriorityQueue.poll();
+                List<PrimaryKeyWithScore> sourceKeys = groupedKeys.remove(pk);
+                UnfilteredRowIterator partitionIterator = 
readAndValidatePartition(pk, sourceKeys);
+                if (partitionIterator != null)
+                    pendingRows.add(partitionIterator);
+                else
+                    // The current primaryKey did not produce a partition 
iterator. We know the caller will need
+                    // `rowsToRetrieve` rows, so we get the next unique key 
and add it to the queue.
+                    fillKeys(groupedKeys, 1, primaryKeyPriorityQueue);
             }
+        }
 
-            if (topK && !keysToShadow.isEmpty())
+        /**
+         * Fills the `groupedKeys` Map with the next `count` unique primary 
keys that are in the keys produced by calling
+         * {@link #nextSelectedKeyInRange()}. We map PrimaryKey to a list of 
PrimaryKeyWithScore because the same
+         * primary key can be in the result set multiple times, but with 
different source tables.
+         * @param groupedKeys the map to fill
+         * @param count the number of unique PrimaryKeys to consume from the 
iterator
+         * @param primaryKeyPriorityQueue the priority queue to add new keys 
to. If the queue is null, we do not add
+         *                                keys to the queue.
+         */
+        private void fillKeys(Map<PrimaryKey, List<PrimaryKeyWithScore>> 
groupedKeys, int count, PriorityQueue<PrimaryKey> primaryKeyPriorityQueue)
+        {
+            int initialSize = groupedKeys.size();
+            while (groupedKeys.size() - initialSize < count)
             {
-                // Record primary keys shadowed by expired TTLs, row 
tombstones, or range tombstones:
-                
queryContext.vectorContext().recordShadowedPrimaryKeys(keysToShadow);
+                PrimaryKeyWithScore primaryKeyWithScore = 
nextSelectedKeyInRange();
+                if (primaryKeyWithScore == null)
+                    return;
+                PrimaryKey nextPrimaryKey = primaryKeyWithScore.primaryKey();
+                List<PrimaryKeyWithScore> accumulator = 
groupedKeys.computeIfAbsent(nextPrimaryKey, k -> new ArrayList<>());
+                if (primaryKeyPriorityQueue != null && accumulator.isEmpty())
+                    primaryKeyPriorityQueue.add(nextPrimaryKey);
+                accumulator.add(primaryKeyWithScore);
             }
+        }
+
+        /**
+         * Determine if the key is in one of the queried key ranges. We do not 
iterate through results in
+         * {@link PrimaryKey} order, so we have to check each range.
+         * @param key
+         * @return true if the key is in one of the queried key ranges
+         */
+        private boolean isInRange(DecoratedKey key)
+        {
+            if (coversFullRing)
+                return true;
 
-            if (!hasMatch)
+            for (AbstractBounds<PartitionPosition> range : keyRanges)
+                if (range.contains(key))
+                    return true;
+            return false;
+        }
+
+        /**
+         * Returns the next available key contained by one of the keyRanges 
and selected by the queryController.
+         * If the next key falls out of the current key range, it skips to the 
next key range, and so on.
+         * If no more keys acceptd by the controller are available, returns 
null.
+         */
+        private @Nullable PrimaryKeyWithScore nextSelectedKeyInRange()
+        {
+            while (scoredPrimaryKeyIterator.hasNext())
             {
-                // If there are no matches, return an empty partition. If 
reconciliation is required at the
-                // coordinator, replica filtering protection may make a second 
round trip to complete its view
-                // of the partition.
+                PrimaryKeyWithScore key = scoredPrimaryKeyIterator.next();
+                if (isInRange(key.primaryKey().partitionKey()) && 
!controller.doesNotSelect(key.primaryKey()))
+                    return key;
+            }
+            return null;
+        }
+
+        /**
+         * Reads and validates a partition for a given primary key against its 
sources.
+         * <p>
+         * @param pk The primary key of the partition to read and validate
+         * @param sourceKeys A list of PrimaryKeyWithScore objects associated 
with the primary key.
+         *                   Multiple sort keys can exist for the same primary 
key when data comes from different
+         *                   sstables or memtables.
+         *
+         * @return An UnfilteredRowIterator containing the validated partition 
data, or null if:
+         *         - The key has already been processed
+         *         - The partition does not pass index filters
+         *         - The partition contains no valid rows
+         *         - The row data does not match the index metadata for any of 
the provided primary keys
+         */
+        public UnfilteredRowIterator readAndValidatePartition(PrimaryKey pk, 
List<PrimaryKeyWithScore> sourceKeys)
+        {
+            // If we've already processed the key, we can skip it. Because the 
score ordered iterator does not
+            // deduplicate rows, we could see dupes if a row is in the 
ordering index multiple times. This happens
+            // in the case of dupes and of overwrites.
+            if (processedKeys.contains(pk))
                 return null;
+
+            try (UnfilteredRowIterator partition = controller.queryStorage(pk, 
view, executionController))
+            {
+                queryContext.partitionsRead++;
+                queryContext.checkpoint();
+                Row staticRow = partition.staticRow();
+                UnfilteredRowIterator clusters = filterPartition(partition, 
filterTree, queryContext);
+
+                if (clusters == null || !clusters.hasNext())
+                {
+                    processedKeys.add(pk);
+                    return null;
+                }
+
+                long now = FBUtilities.nowInSeconds();
+                boolean isRowValid = false;
+                Unfiltered row = clusters.next();
+                assert !clusters.hasNext() : "Expected only one row per 
partition";
+                if (!row.isRangeTombstoneMarker())
+                {
+                    for (PrimaryKeyWithScore sourceKey : sourceKeys)
+                    {
+                        // Each of these primary keys are equal, but they have 
different source tables. Therefore,
+                        // we check to see if the row is valid for any of 
them, and if it is, we return the row.
+                        if (sourceKey.isIndexDataValid((Row) row, now))
+                        {
+                            isRowValid = true;
+                            // We can only count the pk as processed once we 
know it was valid for one of the
+                            // scored keys.
+                            processedKeys.add(pk);
+                            break;
+                        }
+                    }
+                }
+                return isRowValid ? new PrimaryKeyIterator(partition, 
staticRow, row)
+                                  : null;
             }
+        }
 
-            // Return all matches found, along with the static row... 
-            return new PartitionIterator(partition, staticRow, 
matches.iterator());
+        @Override
+        public TableMetadata metadata()
+        {
+            return controller.metadata();
         }
 
-        private class PartitionIterator extends AbstractUnfilteredRowIterator
+        public void close()
         {
-            private final Iterator<Unfiltered> rows;
+            FileUtils.closeQuietly(scoredPrimaryKeyIterator);
+        }
+
+        public static class PrimaryKeyIterator extends 
AbstractUnfilteredRowIterator

Review Comment:
   ```suggestion
           public static class SingleRowIterator extends 
AbstractUnfilteredRowIterator
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to