nit0906 commented on code in PR #2564:
URL: https://github.com/apache/jackrabbit-oak/pull/2564#discussion_r2465774461
##########
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java:
##########
@@ -16,144 +16,231 @@
*/
package org.apache.jackrabbit.oak.plugins.index.elastic.query.async.facets;
+import co.elastic.clients.elasticsearch._types.FieldValue;
+import co.elastic.clients.elasticsearch._types.query_dsl.Query;
+import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.search.Hit;
+import co.elastic.clients.elasticsearch.core.search.SourceConfig;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.JsonNodeType;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticRequestHandler;
import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticResponseHandler;
-import org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseListener;
import org.apache.jackrabbit.oak.plugins.index.elastic.util.ElasticIndexUtils;
+import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Comparator;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* An {@link ElasticFacetProvider} that subscribes to Elastic SearchHit events to return only accessible facets.
*/
-class ElasticSecureFacetAsyncProvider implements ElasticFacetProvider, ElasticResponseListener.SearchHitListener {
+class ElasticSecureFacetAsyncProvider implements ElasticFacetProvider {
private static final Logger LOG = LoggerFactory.getLogger(ElasticSecureFacetAsyncProvider.class);
+ private static final int MAX_PAGE_SIZE = 10_000;
- private final Set<String> facetFields;
- private final long facetsEvaluationTimeoutMs;
- private final Map<String, Map<String, MutableInt>> accessibleFacets = new ConcurrentHashMap<>();
private final ElasticResponseHandler elasticResponseHandler;
private final Predicate<String> isAccessible;
- private final CountDownLatch latch = new CountDownLatch(1);
+ private final Set<String> facetFields;
+ private final long facetsEvaluationTimeoutMs;
private Map<String, List<FulltextIndex.Facet>> facets;
+ private final SearchRequest searchRequest;
+ private final CompletableFuture<Map<String, List<FulltextIndex.Facet>>> searchFuture;
+
+ private final long queryStartTimeNanos;
+
+ ElasticSecureFacetAsyncProvider(ElasticConnection connection, ElasticIndexDefinition indexDefinition,
+                                 ElasticRequestHandler elasticRequestHandler, ElasticResponseHandler elasticResponseHandler,
+                                 Predicate<String> isAccessible, long facetsEvaluationTimeoutMs) {
- ElasticSecureFacetAsyncProvider(
- ElasticRequestHandler elasticRequestHandler,
- ElasticResponseHandler elasticResponseHandler,
- Predicate<String> isAccessible,
- long facetsEvaluationTimeoutMs) {
this.elasticResponseHandler = elasticResponseHandler;
this.isAccessible = isAccessible;
this.facetFields = elasticRequestHandler.facetFields().
map(ElasticIndexUtils::fieldName).
collect(Collectors.toUnmodifiableSet());
this.facetsEvaluationTimeoutMs = facetsEvaluationTimeoutMs;
+
+ // Base search request template (without search_after initially)
+ this.searchRequest = SearchRequest.of(srb -> srb.index(indexDefinition.getIndexAlias())
+         .source(SourceConfig.of(scf -> scf.filter(ff -> ff.includes(FieldNames.PATH).includes(new ArrayList<>(facetFields)))))
+         .query(Query.of(qb -> qb.bool(elasticRequestHandler.baseQueryBuilder().build())))
+         .size(MAX_PAGE_SIZE) // batch size for each search_after iteration
+ .sort(s -> s.field(fs -> fs.field(FieldNames.PATH)))
+ );
+
+ this.queryStartTimeNanos = System.nanoTime();
+ LOG.trace("Kicking search query with search_after pagination {}",
searchRequest);
+
+ // Start the iterative search process
+ this.searchFuture = searchAllResultsIncremental(connection);
}
- @Override
- public Set<String> sourceFields() {
- return facetFields;
+ private CompletableFuture<Map<String, List<FulltextIndex.Facet>>> searchAllResultsIncremental(ElasticConnection connection) {
+     // Initialize empty facet accumulator
+     Map<String, Map<String, Integer>> accumulatedFacets = new ConcurrentHashMap<>();
+
+     return searchWithIncrementalFacetProcessing(connection, null, accumulatedFacets)
+ .thenApplyAsync(this::buildFinalFacetResult);
}
- @Override
- public boolean isFullScan() {
- return true;
+ private CompletableFuture<Map<String, Map<String, Integer>>> searchWithIncrementalFacetProcessing(
+         ElasticConnection connection, List<FieldValue> searchAfter, Map<String, Map<String, Integer>> accumulatedFacets) {
+
+ // Build search request with search_after if provided
+ SearchRequest currentRequest = searchAfter == null ? searchRequest :
+ SearchRequest.of(srb -> srb.index(searchRequest.index())
+ .source(searchRequest.source())
+ .query(searchRequest.query())
+ .size(searchRequest.size())
+ .sort(searchRequest.sort())
+ .searchAfter(searchAfter)
+ );
+
+ return connection.getAsyncClient()
+ .search(currentRequest, ObjectNode.class)
+ .thenComposeAsync(response -> {
+ List<Hit<ObjectNode>> hits = response.hits().hits();
+
+ // Process current page facets and merge with accumulated facets
+ Map<String, Map<String, Integer>> pageFacets = extractFacetsFromPage(hits);
+ mergeFacets(accumulatedFacets, pageFacets);
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processed {} documents in current batch,
accumulated facet values: {}",
+ hits.size(),
accumulatedFacets.values().stream().mapToInt(Map::size).sum());
+ }
+
+ // If we got fewer results than requested, we've reached the end
+ if (hits.isEmpty() || hits.size() < MAX_PAGE_SIZE) {
+ LOG.trace("Reached end of results, final facet
processing complete");
+ return
CompletableFuture.completedFuture(accumulatedFacets);
+ }
+
+ // Extract sort values from the last hit for next search_after
+ Hit<ObjectNode> lastHit = hits.get(hits.size() - 1);
+ List<FieldValue> nextSearchAfter = lastHit.sort();
+
+ if (nextSearchAfter == null || nextSearchAfter.isEmpty()) {
+ LOG.warn("No sort values found for search_after,
stopping pagination");
+ return
CompletableFuture.completedFuture(accumulatedFacets);
+ }
+
+ // if we get here, it means we have processed MAX_PAGE_SIZE hits and there might be more
+ LOG.warn("Large result set detected ({} hits so far) for search request {}. Consider using statistical or insecure facets for better performance.", hits.size(), searchRequest);
+
+ // Recursively continue with next batch
+ return searchWithIncrementalFacetProcessing(connection, nextSearchAfter, accumulatedFacets);
+ })
+ .exceptionally(throwable -> {
+ LOG.error("Error during search pagination", throwable);
+ // Return accumulated facets even if there's an error
+ return accumulatedFacets;
+ });
}
- @Override
- public boolean on(Hit<ObjectNode> searchHit) {
- final String path = elasticResponseHandler.getPath(searchHit);
- if (path != null && isAccessible.test(path)) {
- ObjectNode source = searchHit.source();
- for (String field : facetFields) {
- JsonNode value;
- if (source != null) {
- value = source.get(field);
- if (value != null) {
- if (value.getNodeType() == JsonNodeType.ARRAY) {
- for (JsonNode item : value) {
- updateAccessibleFacets(field, item.asText());
+ private Map<String, Map<String, Integer>> extractFacetsFromPage(List<Hit<ObjectNode>> hits) {
+ Map<String, Map<String, Integer>> pageFacets = new HashMap<>();
+
+ for (Hit<ObjectNode> hit : hits) {
+ ObjectNode document = hit.source();
+ if (document == null) continue;
+
+ // Apply accessibility check
+ String path = elasticResponseHandler.getPath(hit);
+ if (!isAccessible.test(path)) continue;
+
+ // Extract facet field values
+ for (String facetField : facetFields) {
+ JsonNode fieldValue = document.get(facetField);
+ if (fieldValue != null && !fieldValue.isNull()) {
+ pageFacets.compute(facetField, (key, valueMap) -> {
+ if (valueMap == null) {
+ valueMap = new HashMap<>();
+ }
+ if (fieldValue.isArray()) {
+ for (JsonNode val : fieldValue) {
+ if (val.isTextual()) {
+ valueMap.merge(val.asText(), 1, Integer::sum);
+ }
}
- } else {
- updateAccessibleFacets(field, value.asText());
+ } else if (fieldValue.isTextual()) {
+ valueMap.merge(fieldValue.asText(), 1, Integer::sum);
}
- }
+ return valueMap;
+ });
}
}
}
- return true;
+
+ return pageFacets;
}
- private void updateAccessibleFacets(String field, String value) {
- accessibleFacets.compute(field, (column, facetValues) -> {
- if (facetValues == null) {
- Map<String, MutableInt> values = new HashMap<>();
- values.put(value, new MutableInt(1));
- return values;
- } else {
- facetValues.compute(value, (k, v) -> {
- if (v == null) {
- return new MutableInt(1);
- } else {
- v.increment();
- return v;
- }
- });
- return facetValues;
+ private void mergeFacets(Map<String, Map<String, Integer>> accumulated, Map<String, Map<String, Integer>> pageFacets) {
+     for (Map.Entry<String, Map<String, Integer>> entry : pageFacets.entrySet()) {
+         Map<String, Integer> valueMap = accumulated.computeIfAbsent(entry.getKey(), k -> new HashMap<>());
+         for (Map.Entry<String, Integer> valueEntry : entry.getValue().entrySet()) {
+             valueMap.merge(valueEntry.getKey(), valueEntry.getValue(), Integer::sum);
}
- });
+ }
}
- @Override
- public void endData() {
- // create Facet objects, order by count (desc) and then by label (asc)
- Comparator<FulltextIndex.Facet> comparator = Comparator
- .comparing(FulltextIndex.Facet::getCount).reversed()
- .thenComparing(FulltextIndex.Facet::getLabel);
- // create Facet objects, order by count (desc) and then by label (asc)
- facets = accessibleFacets.entrySet()
- .stream()
- .collect(Collectors.toMap
- (Map.Entry::getKey, x -> x.getValue().entrySet()
- .stream()
- .map(e -> new FulltextIndex.Facet(e.getKey(), e.getValue().intValue()))
- .sorted(comparator)
- .collect(Collectors.toList())
- )
- );
- LOG.trace("End data {}", facets);
- latch.countDown();
+ private Map<String, List<FulltextIndex.Facet>> buildFinalFacetResult(Map<String, Map<String, Integer>> accumulatedFacets) {
+ long elapsedMs = (System.nanoTime() - queryStartTimeNanos) / 1_000_000;
+ LOG.debug("Facet computation completed in {}ms", elapsedMs);
+
+ // Convert accumulated facet values to final result format
+ Map<String, List<FulltextIndex.Facet>> result = new HashMap<>();
+
+ for (Map.Entry<String, Map<String, Integer>> entry : accumulatedFacets.entrySet()) {
+     String fieldName = entry.getKey();
+     List<FulltextIndex.Facet> facets = entry.getValue().entrySet().stream()
+             .map(e -> new FulltextIndex.Facet(e.getKey(), e.getValue()))
+ .collect(Collectors.toList());
+ result.put(fieldName, facets);
+ }
+
+ return result;
}
@Override
public List<FulltextIndex.Facet> getFacets(int numberOfFacets, String columnName) {
- LOG.trace("Requested facets for {} - Latch count: {}", columnName,
latch.getCount());
- try {
- boolean completed = latch.await(facetsEvaluationTimeoutMs, TimeUnit.MILLISECONDS);
- if (!completed) {
- throw new IllegalStateException("Timed out while waiting for
facets");
+ // TODO: In case of failure, we log an exception and return null. This is likely not the ideal behavior, as the
+ // caller has no way to distinguish between a failure and empty results. But in this PR I'm leaving this
+ // behavior as is to not introduce further changes. We should revise this behavior once the queries for facets
+ // are decoupled from the query for results, as this will make it easier to better handle errors
+ if (!searchFuture.isDone()) {
+ try {
+ LOG.trace("Requested facets for {}. Waiting up to: {}",
columnName, facetsEvaluationTimeoutMs);
+ long start = System.nanoTime();
+ facets = searchFuture.get(facetsEvaluationTimeoutMs, TimeUnit.MILLISECONDS);
+ LOG.trace("Facets computed in {}.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+ } catch (ExecutionException e) {
+ LOG.error("Error evaluating facets. Ignoring. Search request:
{}", searchRequest, e);
+ } catch (TimeoutException e) {
+ searchFuture.cancel(true);
+ LOG.error("Timed out while waiting for facets. Search request:
{}", searchRequest);
Review Comment:
This one is missing the exception stack trace.
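
A minimal sketch of the change I have in mind, relying on the SLF4J convention that a Throwable passed as the last argument is logged with its stack trace even when the message uses {} placeholders:

    } catch (TimeoutException e) {
        searchFuture.cancel(true);
        // passing 'e' as the final argument makes SLF4J print the stack trace
        LOG.error("Timed out while waiting for facets. Search request: {}", searchRequest, e);
    }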
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]