maedhroz commented on code in PR #4353:
URL: https://github.com/apache/cassandra/pull/4353#discussion_r2678362626
##########
src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java:
##########
@@ -468,79 +501,321 @@ private UnfilteredRowIterator queryStorageAndFilter(List<PrimaryKey> keys)
}
}
- private UnfilteredRowIterator filterPartition(List<PrimaryKey> keys, UnfilteredRowIterator partition, FilterTree tree)
+ @Override
+ public TableMetadata metadata()
{
- Row staticRow = partition.staticRow();
- DecoratedKey partitionKey = partition.partitionKey();
- List<Unfiltered> matches = new ArrayList<>();
- boolean hasMatch = false;
- Set<PrimaryKey> keysToShadow = topK ? new HashSet<>(keys) : Collections.emptySet();
+ return queryController.metadata();
+ }
- while (partition.hasNext())
- {
- Unfiltered unfiltered = partition.next();
+ @Override
+ public void close()
+ {
+ FileUtils.closeQuietly(resultKeyIterator);
+ if (tableQueryMetrics != null) tableQueryMetrics.record(queryContext);
+ }
+ }
- if (unfiltered.isRow())
- {
- queryContext.rowsFiltered++;
+ private static UnfilteredRowIterator filterPartition(UnfilteredRowIterator partition, FilterTree tree, QueryContext context)
+ {
+ Row staticRow = partition.staticRow();
+ DecoratedKey partitionKey = partition.partitionKey();
+ List<Unfiltered> matches = new ArrayList<>();
+ boolean hasMatch = false;
- if (tree.isSatisfiedBy(partitionKey, (Row) unfiltered, staticRow))
- {
- matches.add(unfiltered);
- hasMatch = true;
+ // todo static keys??
+ while (partition.hasNext())
+ {
+ Unfiltered unfiltered = partition.next();
- if (topK)
- {
- PrimaryKey shadowed = keyFactory.hasClusteringColumns()
-                       ? keyFactory.create(partitionKey, ((Row) unfiltered).clustering())
-                       : keyFactory.create(partitionKey);
- keysToShadow.remove(shadowed);
- }
- }
+ if (unfiltered.isRow())
+ {
+ context.rowsFiltered++;
+
+ if (tree.isSatisfiedBy(partitionKey, (Row) unfiltered, staticRow))
+ {
+ matches.add(unfiltered);
+ hasMatch = true;
}
}
+ }
- // If any non-static rows match the filter, there should be no need to shadow the static primary key:
- if (topK && hasMatch && keyFactory.hasClusteringColumns())
- keysToShadow.remove(keyFactory.create(partitionKey, Clustering.STATIC_CLUSTERING));
+ // We may not have any non-static row data to filter...
+ if (!hasMatch)
+ {
+ context.rowsFiltered++;
- // We may not have any non-static row data to filter...
- if (!hasMatch)
+ if (tree.isSatisfiedBy(partitionKey, staticRow, staticRow))
{
- queryContext.rowsFiltered++;
+ hasMatch = true;
+ }
+ }
- if (tree.isSatisfiedBy(partitionKey, staticRow, staticRow))
- {
- hasMatch = true;
+ if (!hasMatch)
+ {
+ // If there are no matches, return an empty partition. If reconciliation is required at the
+ // coordinator, replica filtering protection may make a second round trip to complete its view
+ // of the partition.
+ return null;
+ }
- if (topK)
- keysToShadow.clear();
- }
+ // Return all matches found, along with the static row...
+ return new SinglePartitionIterator(partition, staticRow, matches.iterator());
+ }
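
// Editor's note: an illustrative sketch, not part of the patch. filterPartition() above collects
// matching rows, falls back to testing the static row when nothing else matched, and returns null
// when nothing matches at all. The same shape with plain Java types (all names here hypothetical):
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class FilterPartitionSketch
{
    static List<String> filterPartition(List<String> rows, String staticRow, Predicate<String> filter)
    {
        List<String> matches = new ArrayList<>();
        for (String row : rows)
            if (filter.test(row))
                matches.add(row);

        // No regular row matched; the static row alone may still satisfy the filter.
        if (matches.isEmpty() && !filter.test(staticRow))
            return null; // nothing matches: the caller treats this as an empty partition

        return matches; // may be empty, meaning only the static row matched
    }

    public static void main(String[] args)
    {
        System.out.println(filterPartition(List.of("a", "bb"), "s", r -> r.length() > 1)); // [bb]
        System.out.println(filterPartition(List.of("a"), "s", r -> r.length() > 1));       // null
    }
}
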
+
+ private static class SinglePartitionIterator extends AbstractUnfilteredRowIterator
+ {
+ private final Iterator<Unfiltered> rows;
+
+ public SinglePartitionIterator(UnfilteredRowIterator partition, Row staticRow, Iterator<Unfiltered> rows)
+ {
+ super(partition.metadata(),
+ partition.partitionKey(),
+ partition.partitionLevelDeletion(),
+ partition.columns(),
+ staticRow,
+ partition.isReverseOrder(),
+ partition.stats());
+
+ this.rows = rows;
+ }
+
+ @Override
+ protected Unfiltered computeNext()
+ {
+ return rows.hasNext() ? rows.next() : endOfData();
+ }
+ }
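
// Editor's note: an illustrative sketch, not part of the patch. SinglePartitionIterator follows the
// usual computeNext()/endOfData() contract; Guava's AbstractIterator shares that contract with the
// AbstractUnfilteredRowIterator base class extended above. All names below are hypothetical.
import com.google.common.collect.AbstractIterator;
import java.util.Iterator;
import java.util.List;

class BufferedIteratorSketch<T> extends AbstractIterator<T>
{
    private final Iterator<T> buffered;

    BufferedIteratorSketch(Iterator<T> buffered)
    {
        this.buffered = buffered;
    }

    @Override
    protected T computeNext()
    {
        // Drain the pre-filtered buffer; signal exhaustion once it is empty.
        return buffered.hasNext() ? buffered.next() : endOfData();
    }

    public static void main(String[] args)
    {
        new BufferedIteratorSketch<>(List.of("row1", "row2").iterator())
            .forEachRemaining(System.out::println); // prints row1, row2
    }
}
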
+
+ /**
+ * A result retriever that consumes an iterator of primary keys sorted by some score, materializes the row
+ * for each primary key (currently, each primary key is required to be fully qualified and should only point
+ * to one row), applies the filter tree to the row to test that the materialized row satisfies the WHERE
+ * clause, and finally tests that the row is valid for the ORDER BY clause. The class performs some
+ * optimizations to avoid materializing rows unnecessarily; see the implementation for details.
+ * <p>
+ * The resulting {@link UnfilteredRowIterator} objects are not guaranteed to be in any particular order. It
+ * is the responsibility of the caller to sort the results if necessary.
+ */
+ public static class ScoreOrderedResultRetriever extends AbstractIterator<UnfilteredRowIterator> implements UnfilteredPartitionIterator
+ {
+ private final ColumnFamilyStore.ViewFragment view;
+ private final List<AbstractBounds<PartitionPosition>> keyRanges;
+ private final boolean coversFullRing;
+ private final CloseableIterator<PrimaryKeyWithScore> scoredPrimaryKeyIterator;
+ private final FilterTree filterTree;
+ private final QueryController controller;
+ private final ReadExecutionController executionController;
+ private final QueryContext queryContext;
+
+ private final HashSet<PrimaryKey> processedKeys;
+ private final Queue<UnfilteredRowIterator> pendingRows;
+
+ // The limit requested by the query. We cannot load more than softLimit rows in bulk because we only want
+ // to fetch the top k rows, where k is the limit. However, we allow the iterator to fetch more rows than
+ // the soft limit to avoid confusing behavior. Once the softLimit is reached, the iterator fetches one row
+ // at a time.
+ private final int softLimit;
+ private int returnedRowCount = 0;
+
+ private ScoreOrderedResultRetriever(QueryController controller,
+                                     ReadExecutionController executionController,
+                                     QueryContext queryContext,
+                                     QueryViewBuilder.QueryView queryView,
+                                     int limit)
+ {
+ assert queryView.view.size() == 1;
+ QueryViewBuilder.QueryExpressionView queryExpressionView = queryView.view.stream().findFirst().get();
+ this.view = queryExpressionView.viewFragment;
+ this.keyRanges = controller.dataRanges().stream().map(DataRange::keyRange).collect(Collectors.toList());
+ this.coversFullRing = keyRanges.size() == 1 && RangeUtil.coversFullRing(keyRanges.get(0));
+
+ this.scoredPrimaryKeyIterator = Operation.buildIteratorForOrder(controller, queryExpressionView);
+ this.filterTree = Operation.buildFilter(controller, controller.usesStrictFiltering());
+ this.controller = controller;
+ this.executionController = executionController;
+ this.queryContext = queryContext;
+
+ this.processedKeys = new HashSet<>(limit);
+ this.pendingRows = new ArrayDeque<>(limit);
+ this.softLimit = limit;
+ }
+
+ @Override
+ public UnfilteredRowIterator computeNext()
+ {
+ if (pendingRows.isEmpty())
+ fillPendingRows();
+ returnedRowCount++;
+ // Because we know ordered keys are fully qualified, we do not iterate partitions
+ return !pendingRows.isEmpty() ? pendingRows.poll() : endOfData();
+ }
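
// Editor's note: an illustrative sketch, not part of the patch. computeNext() above is a lazily
// refilled queue: refill in batches when empty, then poll. The same pattern with plain Java types
// (all names hypothetical):
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

class RefillingIteratorSketch implements Iterator<Integer>
{
    private final Iterator<Integer> source = List.of(1, 2, 3, 4, 5).iterator();
    private final Queue<Integer> pending = new ArrayDeque<>();

    private void fillPending()
    {
        // Pull a small batch from the source, skipping elements that fail validation
        // (odd numbers here), the way fillPendingRows() skips non-matching partitions.
        while (pending.size() < 2 && source.hasNext())
        {
            int next = source.next();
            if (next % 2 == 0)
                pending.add(next);
        }
    }

    @Override
    public boolean hasNext()
    {
        if (pending.isEmpty())
            fillPending();
        return !pending.isEmpty();
    }

    @Override
    public Integer next()
    {
        return pending.poll();
    }

    public static void main(String[] args)
    {
        new RefillingIteratorSketch().forEachRemaining(System.out::println); // prints 2, 4
    }
}
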
+
+ /**
+ * Fills the pendingRows queue with row iterators for the supplied keys by repeatedly calling
+ * {@link #readAndValidatePartition} until it gives enough non-null results.
+ */
+ private void fillPendingRows()
+ {
+ // Group PKs by source sstable/memtable
+ Map<PrimaryKey, List<PrimaryKeyWithScore>> groupedKeys = new HashMap<>();
+ // We always want to get at least 1.
+ int rowsToRetrieve = Math.max(1, softLimit - returnedRowCount);
+ // We want to get the first unique `rowsToRetrieve` keys to materialize.
+ // Don't pass the priority queue here because it is more efficient to add keys in bulk.
+ fillKeys(groupedKeys, rowsToRetrieve, null);
+ // Sort the primary keys in PK order, just in case that helps with cache and disk efficiency.
+ PriorityQueue<PrimaryKey> primaryKeyPriorityQueue = new PriorityQueue<>(groupedKeys.keySet());
+
+ // drain groupedKeys into pendingRows
+ while (!groupedKeys.isEmpty())
+ {
+ PrimaryKey pk = primaryKeyPriorityQueue.poll();
+ List<PrimaryKeyWithScore> sourceKeys = groupedKeys.remove(pk);
+ UnfilteredRowIterator partitionIterator = readAndValidatePartition(pk, sourceKeys);
+ if (partitionIterator != null)
+ pendingRows.add(partitionIterator);
+ else
+ // The current primaryKey did not produce a partition iterator. We know the caller will need
+ // `rowsToRetrieve` rows, so we get the next unique key and add it to the queue.
+ fillKeys(groupedKeys, 1, primaryKeyPriorityQueue);
}
+ }
- if (topK && !keysToShadow.isEmpty())
+ /**
+ * Fills the `groupedKeys` map with the next `count` unique primary keys produced by calling
+ * {@link #nextSelectedKeyInRange()}. We map PrimaryKey to a list of PrimaryKeyWithScore because the same
+ * primary key can be in the result set multiple times, but with different source tables.
+ * @param groupedKeys the map to fill
+ * @param count the number of unique PrimaryKeys to consume from the iterator
+ * @param primaryKeyPriorityQueue the priority queue to add new keys to. If the queue is null, we do not
+ *                                add keys to the queue.
+ */
+ private void fillKeys(Map<PrimaryKey, List<PrimaryKeyWithScore>> groupedKeys, int count, PriorityQueue<PrimaryKey> primaryKeyPriorityQueue)
+ {
+ int initialSize = groupedKeys.size();
+ while (groupedKeys.size() - initialSize < count)
{
- // Record primary keys shadowed by expired TTLs, row tombstones, or range tombstones:
- queryContext.vectorContext().recordShadowedPrimaryKeys(keysToShadow);
+ PrimaryKeyWithScore primaryKeyWithScore = nextSelectedKeyInRange();
+ if (primaryKeyWithScore == null)
+ return;
+ PrimaryKey nextPrimaryKey = primaryKeyWithScore.primaryKey();
+ List<PrimaryKeyWithScore> accumulator = groupedKeys.computeIfAbsent(nextPrimaryKey, k -> new ArrayList<>());
+ if (primaryKeyPriorityQueue != null && accumulator.isEmpty())
+ primaryKeyPriorityQueue.add(nextPrimaryKey);
+ accumulator.add(primaryKeyWithScore);
}
+ }
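
// Editor's note: an illustrative sketch, not part of the patch. fillKeys() above groups equal keys
// via computeIfAbsent and enqueues each key only on first sight. The same pattern with plain Java
// types (all names hypothetical):
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

class GroupingSketch
{
    public static void main(String[] args)
    {
        Map<String, List<String>> grouped = new HashMap<>();
        PriorityQueue<String> order = new PriorityQueue<>();

        for (String scored : List.of("b#0.9", "a#0.8", "b#0.7"))
        {
            String key = scored.substring(0, scored.indexOf('#'));
            List<String> accumulator = grouped.computeIfAbsent(key, k -> new ArrayList<>());
            // Enqueue a key only the first time we see it, mirroring the accumulator.isEmpty() check.
            if (accumulator.isEmpty())
                order.add(key);
            accumulator.add(scored);
        }

        // Drain in sorted key order, as fillPendingRows() does with its priority queue.
        while (!order.isEmpty())
        {
            String key = order.poll();
            System.out.println(key + " -> " + grouped.remove(key)); // a -> [a#0.8], then b -> [b#0.9, b#0.7]
        }
    }
}
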
+
+ /**
+ * Determines whether the key is in one of the queried key ranges. We do not iterate through results in
+ * {@link PrimaryKey} order, so we have to check each range.
+ * @param key the decorated partition key to check
+ * @return true if the key is in one of the queried key ranges
+ */
+ private boolean isInRange(DecoratedKey key)
+ {
+ if (coversFullRing)
+ return true;
- if (!hasMatch)
+ for (AbstractBounds<PartitionPosition> range : keyRanges)
+ if (range.contains(key))
+ return true;
+ return false;
+ }
+
+ /**
+ * Returns the next available key contained by one of the keyRanges and selected by the queryController.
+ * If the next key falls out of the current key range, it skips to the next key range, and so on.
+ * If no more keys accepted by the controller are available, returns null.
+ */
+ private @Nullable PrimaryKeyWithScore nextSelectedKeyInRange()
+ {
+ while (scoredPrimaryKeyIterator.hasNext())
{
- // If there are no matches, return an empty partition. If reconciliation is required at the
- // coordinator, replica filtering protection may make a second round trip to complete its view
- // of the partition.
+ PrimaryKeyWithScore key = scoredPrimaryKeyIterator.next();
+ if (isInRange(key.primaryKey().partitionKey()) && !controller.doesNotSelect(key.primaryKey()))
+ return key;
+ }
+ return null;
+ }
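
// Editor's note: an illustrative sketch, not part of the patch. nextSelectedKeyInRange() above is a
// skip-until-match scan over a score-ordered stream, checking every range because keys do not arrive
// in token order. The same shape with plain Java types (all names hypothetical):
import java.util.Iterator;
import java.util.List;

class SkipScanSketch
{
    // Stand-in for AbstractBounds.contains(key): a simple half-open interval.
    record Range(int left, int right)
    {
        boolean contains(int key) { return key >= left && key < right; }
    }

    static Integer nextInRange(Iterator<Integer> keys, List<Range> ranges)
    {
        while (keys.hasNext())
        {
            int key = keys.next();
            // Keys arrive in score order, not token order, so every range must be checked.
            for (Range range : ranges)
                if (range.contains(key))
                    return key;
        }
        return null; // no more selectable keys
    }

    public static void main(String[] args)
    {
        List<Range> ranges = List.of(new Range(0, 10), new Range(50, 60));
        Iterator<Integer> keys = List.of(42, 55, 7).iterator();
        System.out.println(nextInRange(keys, ranges)); // 55 (42 is skipped)
        System.out.println(nextInRange(keys, ranges)); // 7
    }
}
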
+
+ /**
+ * Reads and validates a partition for a given primary key against its sources.
+ * <p>
+ * @param pk The primary key of the partition to read and validate
+ * @param sourceKeys A list of PrimaryKeyWithScore objects associated with the primary key. Multiple sort
+ *                   keys can exist for the same primary key when data comes from different sstables or
+ *                   memtables.
+ *
+ * @return An UnfilteredRowIterator containing the validated partition data, or null if:
+ *         - The key has already been processed
+ *         - The partition does not pass index filters
+ *         - The partition contains no valid rows
+ *         - The row data does not match the index metadata for any of the provided primary keys
+ */
+ public UnfilteredRowIterator readAndValidatePartition(PrimaryKey pk, List<PrimaryKeyWithScore> sourceKeys)
+ {
+ // If we've already processed the key, we can skip it. Because the score-ordered iterator does not
+ // deduplicate rows, we could see duplicates if a row is in the ordering index multiple times. This
+ // happens in the case of duplicates and overwrites.
+ if (processedKeys.contains(pk))
+ return null;
+
+ try (UnfilteredRowIterator partition = controller.queryStorage(pk, view, executionController))
+ {
+ queryContext.partitionsRead++;
+ queryContext.checkpoint();
+ Row staticRow = partition.staticRow();
+ UnfilteredRowIterator clusters = filterPartition(partition, filterTree, queryContext);
+
+ if (clusters == null || !clusters.hasNext())
+ {
+ processedKeys.add(pk);
+ return null;
+ }
+
+ long now = FBUtilities.nowInSeconds();
+ boolean isRowValid = false;
+ Unfiltered row = clusters.next();
+ assert !clusters.hasNext() : "Expected only one row per partition";
+ if (!row.isRangeTombstoneMarker())
+ {
+ for (PrimaryKeyWithScore sourceKey : sourceKeys)
+ {
+ // These primary keys are all equal, but they come from different source tables. Therefore,
+ // we check whether the row is valid for any of them, and if it is, we return the row.
+ if (sourceKey.isIndexDataValid((Row) row, now))
+ {
+ isRowValid = true;
+ // We can only count the pk as processed once we know it was valid for one of the scored keys.
+ processedKeys.add(pk);
+ break;
+ }
+ }
+ }
+ return isRowValid ? new PrimaryKeyIterator(partition, staticRow, row)
Review Comment:
So to ground myself a bit, is it correct to say that we never attempt to
batch rows in the same partition? In data models without clusterings, I guess
this doesn't matter...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]