maedhroz commented on code in PR #4353:
URL: https://github.com/apache/cassandra/pull/4353#discussion_r2678323966
##########
src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java:
##########
@@ -468,79 +501,321 @@ private UnfilteredRowIterator
queryStorageAndFilter(List<PrimaryKey> keys)
}
}
- private UnfilteredRowIterator filterPartition(List<PrimaryKey> keys,
UnfilteredRowIterator partition, FilterTree tree)
+ @Override
+ public TableMetadata metadata()
{
- Row staticRow = partition.staticRow();
- DecoratedKey partitionKey = partition.partitionKey();
- List<Unfiltered> matches = new ArrayList<>();
- boolean hasMatch = false;
- Set<PrimaryKey> keysToShadow = topK ? new HashSet<>(keys) :
Collections.emptySet();
+ return queryController.metadata();
+ }
- while (partition.hasNext())
- {
- Unfiltered unfiltered = partition.next();
+ @Override
+ public void close()
+ {
+ FileUtils.closeQuietly(resultKeyIterator);
+ if (tableQueryMetrics != null)
tableQueryMetrics.record(queryContext);
+ }
+ }
- if (unfiltered.isRow())
- {
- queryContext.rowsFiltered++;
+ private static UnfilteredRowIterator filterPartition(UnfilteredRowIterator
partition, FilterTree tree, QueryContext context)
+ {
+ Row staticRow = partition.staticRow();
+ DecoratedKey partitionKey = partition.partitionKey();
+ List<Unfiltered> matches = new ArrayList<>();
+ boolean hasMatch = false;
- if (tree.isSatisfiedBy(partitionKey, (Row) unfiltered,
staticRow))
- {
- matches.add(unfiltered);
- hasMatch = true;
+ // todo static keys??
+ while (partition.hasNext())
+ {
+ Unfiltered unfiltered = partition.next();
- if (topK)
- {
- PrimaryKey shadowed =
keyFactory.hasClusteringColumns()
- ?
keyFactory.create(partitionKey, ((Row) unfiltered).clustering())
- :
keyFactory.create(partitionKey);
- keysToShadow.remove(shadowed);
- }
- }
+ if (unfiltered.isRow())
+ {
+ context.rowsFiltered++;
+
+ if (tree.isSatisfiedBy(partitionKey, (Row) unfiltered,
staticRow))
+ {
+ matches.add(unfiltered);
+ hasMatch = true;
}
}
+ }
- // If any non-static rows match the filter, there should be no
need to shadow the static primary key:
- if (topK && hasMatch && keyFactory.hasClusteringColumns())
- keysToShadow.remove(keyFactory.create(partitionKey,
Clustering.STATIC_CLUSTERING));
+ // We may not have any non-static row data to filter...
+ if (!hasMatch)
+ {
+ context.rowsFiltered++;
- // We may not have any non-static row data to filter...
- if (!hasMatch)
+ if (tree.isSatisfiedBy(partitionKey, staticRow, staticRow))
{
- queryContext.rowsFiltered++;
+ hasMatch = true;
+ }
+ }
- if (tree.isSatisfiedBy(partitionKey, staticRow, staticRow))
- {
- hasMatch = true;
+ if (!hasMatch)
+ {
+ // If there are no matches, return an empty partition. If
reconciliation is required at the
+ // coordinator, replica filtering protection may make a second
round trip to complete its view
+ // of the partition.
+ return null;
+ }
- if (topK)
- keysToShadow.clear();
- }
+ // Return all matches found, along with the static row...
+ return new SinglePartitionIterator(partition, staticRow,
matches.iterator());
+ }
+
+ private static class SinglePartitionIterator extends
AbstractUnfilteredRowIterator
+ {
+ private final Iterator<Unfiltered> rows;
+
+ public SinglePartitionIterator(UnfilteredRowIterator partition, Row
staticRow, Iterator<Unfiltered> rows)
+ {
+ super(partition.metadata(),
+ partition.partitionKey(),
+ partition.partitionLevelDeletion(),
+ partition.columns(),
+ staticRow,
+ partition.isReverseOrder(),
+ partition.stats());
+
+ this.rows = rows;
+ }
+
+ @Override
+ protected Unfiltered computeNext()
+ {
+ return rows.hasNext() ? rows.next() : endOfData();
+ }
+ }
+
+ /**
+ * A result retriever that consumes an iterator of primary keys sorted by
some score, materializes the row for each
+ * primary key (currently, each primary key is required to be fully
qualified and should only point to one row),
+ * apply the filter tree to the row to test that the real row satisfies
the WHERE clause, and finally tests
+ * that the row is valid for the ORDER BY clause. The class performs some
optimizations to avoid materializing
+ * rows unnecessarily. See the class for more details.
+ * <p>
+ * The resulting {@link UnfilteredRowIterator} objects are not guaranteed
to be in any particular order. It is
+ * the responsibility of the caller to sort the results if necessary.
+ */
+ public static class ScoreOrderedResultRetriever extends
AbstractIterator<UnfilteredRowIterator> implements UnfilteredPartitionIterator
+ {
+ private final ColumnFamilyStore.ViewFragment view;
+ private final List<AbstractBounds<PartitionPosition>> keyRanges;
+ private final boolean coversFullRing;
+ private final CloseableIterator<PrimaryKeyWithScore>
scoredPrimaryKeyIterator;
+ private final FilterTree filterTree;
+ private final QueryController controller;
+ private final ReadExecutionController executionController;
+ private final QueryContext queryContext;
+
+ private final HashSet<PrimaryKey> processedKeys;
+ private final Queue<UnfilteredRowIterator> pendingRows;
+
+ // The limit requested by the query. We cannot load more than
softLimit rows in bulk because we only want
+ // to fetch the topk rows where k is the limit. However, we allow the
iterator to fetch more rows than the
+ // soft limit to avoid confusing behavior. When the softLimit is
reached, the iterator will fetch one row
+ // at a time.
+ private final int softLimit;
+ private int returnedRowCount = 0;
+
+ private ScoreOrderedResultRetriever(QueryController controller,
+ ReadExecutionController
executionController,
+ QueryContext queryContext,
+ QueryViewBuilder.QueryView
queryView,
+ int limit)
+ {
+ assert queryView.view.size() == 1;
+ QueryViewBuilder.QueryExpressionView queryExpressionView =
queryView.view.stream().findFirst().get();
+ this.view = queryExpressionView.viewFragment;
+ this.keyRanges =
controller.dataRanges().stream().map(DataRange::keyRange).collect(Collectors.toList());
+ this.coversFullRing = keyRanges.size() == 1 &&
RangeUtil.coversFullRing(keyRanges.get(0));
+
+ this.scoredPrimaryKeyIterator =
Operation.buildIteratorForOrder(controller, queryExpressionView);
+ this.filterTree = Operation.buildFilter(controller,
controller.usesStrictFiltering());
+ this.controller = controller;
+ this.executionController = executionController;
+ this.queryContext = queryContext;
+
+ this.processedKeys = new HashSet<>(limit);
+ this.pendingRows = new ArrayDeque<>(limit);
+ this.softLimit = limit;
+ }
+
+ @Override
+ public UnfilteredRowIterator computeNext()
+ {
+ if (pendingRows.isEmpty())
+ fillPendingRows();
+ returnedRowCount++;
+ // Because we know ordered keys are fully qualified, we do not
iterate partitions
+ return !pendingRows.isEmpty() ? pendingRows.poll() : endOfData();
+ }
+
+ /**
+ * Fills the pendingRows queue to generate a queue of row iterators
for the supplied keys by repeatedly calling
+ * {@link #readAndValidatePartition} until it gives enough non-null
results.
+ */
+ private void fillPendingRows()
+ {
+ // Group PKs by source sstable/memtable
+ Map<PrimaryKey, List<PrimaryKeyWithScore>> groupedKeys = new
HashMap<>();
+ // We always want to get at least 1.
+ int rowsToRetrieve = Math.max(1, softLimit - returnedRowCount);
+ // We want to get the first unique `rowsToRetrieve` keys to
materialize
+ // Don't pass the priority queue here because it is more efficient
to add keys in bulk
+ fillKeys(groupedKeys, rowsToRetrieve, null);
+ // Sort the primary keys by PrK order, just in case that helps
with cache and disk efficiency
+ PriorityQueue<PrimaryKey> primaryKeyPriorityQueue = new
PriorityQueue<>(groupedKeys.keySet());
+
+ // drain groupedKeys into pendingRows
+ while (!groupedKeys.isEmpty())
+ {
+ PrimaryKey pk = primaryKeyPriorityQueue.poll();
+ List<PrimaryKeyWithScore> sourceKeys = groupedKeys.remove(pk);
+ UnfilteredRowIterator partitionIterator =
readAndValidatePartition(pk, sourceKeys);
+ if (partitionIterator != null)
+ pendingRows.add(partitionIterator);
+ else
+ // The current primaryKey did not produce a partition
iterator. We know the caller will need
+ // `rowsToRetrieve` rows, so we get the next unique key
and add it to the queue.
+ fillKeys(groupedKeys, 1, primaryKeyPriorityQueue);
}
+ }
- if (topK && !keysToShadow.isEmpty())
+ /**
+ * Fills the `groupedKeys` Map with the next `count` unique primary
keys that are in the keys produced by calling
+ * {@link #nextSelectedKeyInRange()}. We map PrimaryKey to a list of
PrimaryKeyWithScore because the same
+ * primary key can be in the result set multiple times, but with
different source tables.
+ * @param groupedKeys the map to fill
+ * @param count the number of unique PrimaryKeys to consume from the
iterator
+ * @param primaryKeyPriorityQueue the priority queue to add new keys
to. If the queue is null, we do not add
+ * keys to the queue.
+ */
+ private void fillKeys(Map<PrimaryKey, List<PrimaryKeyWithScore>>
groupedKeys, int count, PriorityQueue<PrimaryKey> primaryKeyPriorityQueue)
+ {
+ int initialSize = groupedKeys.size();
+ while (groupedKeys.size() - initialSize < count)
{
- // Record primary keys shadowed by expired TTLs, row
tombstones, or range tombstones:
-
queryContext.vectorContext().recordShadowedPrimaryKeys(keysToShadow);
+ PrimaryKeyWithScore primaryKeyWithScore =
nextSelectedKeyInRange();
+ if (primaryKeyWithScore == null)
+ return;
+ PrimaryKey nextPrimaryKey = primaryKeyWithScore.primaryKey();
+ List<PrimaryKeyWithScore> accumulator =
groupedKeys.computeIfAbsent(nextPrimaryKey, k -> new ArrayList<>());
+ if (primaryKeyPriorityQueue != null && accumulator.isEmpty())
+ primaryKeyPriorityQueue.add(nextPrimaryKey);
+ accumulator.add(primaryKeyWithScore);
}
+ }
+
+ /**
+ * Determine if the key is in one of the queried key ranges. We do not
iterate through results in
+ * {@link PrimaryKey} order, so we have to check each range.
+ * @param key
Review Comment:
nit: the `@param key` tag is left dangling here with no description. I'm fine
with just removing it, given that the overall description is already pretty helpful.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]