maedhroz commented on code in PR #4353:
URL: https://github.com/apache/cassandra/pull/4353#discussion_r2710947473
##########
src/java/org/apache/cassandra/index/sai/plan/QueryController.java:
##########
@@ -313,117 +360,90 @@ public boolean doesNotSelect(PrimaryKey key)
}
// This is an ANN only query
- public KeyRangeIterator getTopKRows(RowFilter.Expression expression)
+ public CloseableIterator<PrimaryKeyWithScore>
getTopKRows(QueryViewBuilder.QueryExpressionView queryExpressionView)
{
- assert expression.operator() == Operator.ANN;
- StorageAttachedIndex index = indexFor(expression);
- assert index != null;
- Expression planExpression = Expression.create(index).add(Operator.ANN,
expression.getIndexValue().duplicate());
-
- QueryViewBuilder.QueryView queryView = new
QueryViewBuilder(Collections.singleton(planExpression), mergeRange).build();
- Runnable onClose = () ->
queryView.referencedIndexes.forEach(SSTableIndex::releaseQuietly);
-
+ assert queryExpressionView.expression.operator ==
Expression.IndexOperator.ANN;
+ List<CloseableIterator<PrimaryKeyWithScore>> intermediateResults = new
ArrayList<>();
try
{
- List<KeyRangeIterator> memtableResults = queryView.view
- .stream()
- .map(v ->
v.memtableIndexes)
-
.flatMap(Collection::stream)
- .map(idx ->
idx.search(queryContext, planExpression, mergeRange))
-
.collect(Collectors.toList());
-
- List<KeyRangeIterator> sstableIntersections = queryView.view
- .stream()
-
.map(this::createRowIdIterator)
-
.collect(Collectors.toList());
-
- return IndexSearchResultIterator.build(sstableIntersections,
memtableResults, queryView.referencedIndexes, queryContext, onClose);
+ for (MemtableIndex memtableIndex :
queryExpressionView.memtableIndexes)
+ intermediateResults.add(memtableIndex.orderBy(queryContext,
queryExpressionView.expression, mergeRange));
+ for (SSTableIndex sstableIndex :
queryExpressionView.sstableIndexes)
+
intermediateResults.addAll(sstableIndex.orderBy(queryExpressionView.expression,
mergeRange, queryContext));
+ return intermediateResults.isEmpty() ? CloseableIterator.empty()
+ : new
MergePrimaryKeyWithScoreIterator(intermediateResults);
}
catch (Throwable t)
{
// all sstable indexes in view have been referenced, need to clean
up when exception is thrown
- onClose.run();
- throw t;
+
queryExpressionView.sstableIndexes.forEach(SSTableIndex::releaseQuietly);
+ intermediateResults.forEach(FileUtils::closeQuietly);
+ throw Throwables.cleaned(t);
}
}
// This is a hybrid query. We apply all other predicates before ordering
and limiting.
- public KeyRangeIterator getTopKRows(KeyRangeIterator source,
RowFilter.Expression expression)
+ public CloseableIterator<PrimaryKeyWithScore> getTopKRows(KeyRangeIterator
source, QueryViewBuilder.QueryExpressionView queryExpressionView)
{
- return new KeyRangeOrderingIterator(source, orderChunkSize, list ->
this.getTopKRows(list, expression));
+ List<PrimaryKey> primaryKeys = materializeKeysAndCloseSource(source);
+ if (primaryKeys.isEmpty())
+ return CloseableIterator.empty();
+ return getTopKRows(primaryKeys, queryExpressionView);
}
- private KeyRangeIterator getTopKRows(List<PrimaryKey> rawSourceKeys,
RowFilter.Expression expression)
+ private CloseableIterator<PrimaryKeyWithScore>
getTopKRows(List<PrimaryKey> sourceKeys, QueryViewBuilder.QueryExpressionView
queryExpressionView)
{
- VectorQueryContext vectorQueryContext = queryContext.vectorContext();
- // Filter out PKs now. Each PK is passed to every segment of the ANN
index, so filtering shadowed keys
- // eagerly can save some work when going from PK to row id for on disk
segments.
- // Since the result is shared with multiple streams, we use an
unmodifiable list.
- List<PrimaryKey> sourceKeys =
rawSourceKeys.stream().filter(vectorQueryContext::shouldInclude).collect(Collectors.toList());
- StorageAttachedIndex index = indexFor(expression);
- assert index != null : "Cannot do ANN ordering on an unindexed column";
- Expression planExpression = Expression.create(index);
- planExpression.add(Operator.ANN,
expression.getIndexValue().duplicate());
-
- QueryViewBuilder.QueryView queryView = new
QueryViewBuilder(Collections.singleton(planExpression), mergeRange).build();
- Runnable onClose = () ->
queryView.referencedIndexes.forEach(SSTableIndex::releaseQuietly);
-
+ List<CloseableIterator<PrimaryKeyWithScore>> intermediateResults = new
ArrayList<>();
try
{
- List<KeyRangeIterator> memtableResults = queryView.view
- .stream()
- .map(v ->
v.memtableIndexes)
-
.flatMap(Collection::stream)
- .map(idx ->
idx.limitToTopResults(sourceKeys, planExpression, vectorQueryContext.limit()))
-
.collect(Collectors.toList());
-
- List<KeyRangeIterator> sstableIntersections = queryView.view
- .stream()
-
.flatMap(pair -> pair.sstableIndexes.stream())
- .map(idx ->
{
- try
- {
-
return idx.limitToTopKResults(queryContext, sourceKeys, planExpression);
- }
- catch
(IOException e)
- {
-
throw new UncheckedIOException(e);
- }
- })
-
.collect(Collectors.toList());
-
- return IndexSearchResultIterator.build(sstableIntersections,
memtableResults, queryView.referencedIndexes, queryContext, onClose);
+ for (MemtableIndex memtableIndex :
queryExpressionView.memtableIndexes)
+
intermediateResults.add(memtableIndex.orderResultsBy(queryContext, sourceKeys,
queryExpressionView.expression));
+ for (SSTableIndex sstableIndex :
queryExpressionView.sstableIndexes)
+
intermediateResults.addAll(sstableIndex.orderResultsBy(queryContext,
sourceKeys, queryExpressionView.expression));
+ return intermediateResults.isEmpty() ? CloseableIterator.empty()
+ : new
MergePrimaryKeyWithScoreIterator(intermediateResults);
}
catch (Throwable t)
{
// all sstable indexes in view have been referenced, need to clean
up when exception is thrown
- onClose.run();
- throw t;
+
queryExpressionView.sstableIndexes.forEach(SSTableIndex::releaseQuietly);
+ intermediateResults.forEach(FileUtils::closeQuietly);
+ throw Throwables.cleaned(t);
}
}
/**
- * Create row id iterator from different indexes' on-disk searcher of the
same sstable
+ * Materialize the keys from the given source iterator. If there is a
meaningful {@link #mergeRange}, the keys
+ * are filtered to only include those within the range. Note: closes the
source iterator.
+ * @param source The source iterator to fully consume by materializing its
keys
+ * @return The list of materialized keys within the {@link #mergeRange}.
*/
- private KeyRangeIterator
createRowIdIterator(QueryViewBuilder.QueryExpressionView indexExpression)
+ private List<PrimaryKey> materializeKeysAndCloseSource(KeyRangeIterator
source)
{
- List<KeyRangeIterator> subIterators = indexExpression.sstableIndexes
- .stream()
- .map(index ->
- {
- try
- {
- List<KeyRangeIterator> iterators =
index.search(indexExpression.expression, mergeRange, queryContext);
- // concat the result from multiple
segments for the same index
- return
KeyRangeConcatIterator.builder(iterators.size()).add(iterators).build();
- }
- catch (Throwable ex)
- {
- throw Throwables.cleaned(ex);
- }
- }).collect(Collectors.toList());
-
- return KeyRangeUnionIterator.build(subIterators);
+ try (source)
+ {
+ // Skip to the first key (which is really just a token) in the
range if it is not the minimum token
+ if (!mergeRange.left.isMinimum())
+ source.skipTo(firstPrimaryKey);
+
+ if (!source.hasNext())
+ return List.of();
+
+ PrimaryKey maxToken =
keyFactory.create(mergeRange.right.getToken());
+ boolean hasLimitingMaxToken = !maxToken.token().isMinimum() &&
maxToken.compareTo(source.getMaximum()) < 0;
+ List<PrimaryKey> primaryKeys = new ArrayList<>();
+ int count = 0;
+ while (source.hasNext())
+ {
+ PrimaryKey next = source.next();
+ if (hasLimitingMaxToken && next.compareTo(maxToken) > 0)
+ break;
+ primaryKeys.add(next);
+ if (MAX_MATERIALIZED_KEYS < ++count)
+ throw new
QueryMaterializesTooManyPrimaryKeysException("Too many primary keys. Attempted
to load more than: " + MAX_MATERIALIZED_KEYS);
Review Comment:
I've mulled this back and forth a bit, but I think we want to consider two
things:
1.) Lowering `max_materialized_keys` to something much less than 1M.
`ReplicaFilteringProtectionOptions` is a similar place where we accumulate keys
in memory, and it defaults to warning at 2000 (and failing at 32k). I think
we should go with something more conservative, like 4096 (if we're into powers
of 2).
2.) Rather than failing the query when we reach that threshold, we should just
back out and fall back to the "orderBy" pathway. It's entirely possible that,
with an extremely non-selective filter, we'll still be able to complete the
query before the timeout, because post-filtering won't filter many results out.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]