This is an automated email from the ASF dual-hosted git repository.

fortino pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5d5086d250 OAK-11898: decouple ElasticSecureFacetAsyncProvider from async iterator (#2564)
5d5086d250 is described below

commit 5d5086d250d35eba921a60b944fee58cdf2c1417
Author: Fabrizio Fortino <[email protected]>
AuthorDate: Wed Oct 29 17:33:55 2025 +0100

    OAK-11898: decouple ElasticSecureFacetAsyncProvider from async iterator (#2564)
    
    * OAK-11899: decouple ElasticSecureFacetAsyncProvider from async iterator
    
    * Update oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java
    
    Co-authored-by: Thomas Mueller <[email protected]>
    
    * OAK-11899: fix error, improved logs
    
    * OAK-11899: fix log message
    
    * OAK-11899: (minor) comment
    
    * OAK-11899: improved log message
    
    * OAK-11899: add missing exception stacktrace
    
    * OAK-11998: make size configurable and add check based on limitReads
    
    ---------
    
    Co-authored-by: Thomas Mueller <[email protected]>
---
 .../index/elastic/ElasticIndexDefinition.java      |   5 +
 .../index/elastic/query/ElasticRequestHandler.java |   4 +
 .../query/async/ElasticResponseListener.java       |  20 --
 .../query/async/ElasticResultRowAsyncIterator.java |  11 +-
 .../query/async/facets/ElasticFacetProvider.java   |   3 +-
 .../facets/ElasticSecureFacetAsyncProvider.java    | 261 ++++++++++++++-------
 6 files changed, 193 insertions(+), 111 deletions(-)

diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexDefinition.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexDefinition.java
index fbd4e01767..ad3fca090f 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexDefinition.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexDefinition.java
@@ -59,6 +59,9 @@ public class ElasticIndexDefinition extends IndexDefinition {
     public static final String TRACK_TOTAL_HITS = "trackTotalHits";
     public static final Integer TRACK_TOTAL_HITS_DEFAULT = 10000;
 
+    public static final String SECURE_FACETS_DOCS_SIZE = "secureFacetsDocsSize";
+    public static final int SECURE_FACETS_DOCS_SIZE_DEFAULT = 10000;
+
     /**
      * Hidden property for storing the index mapping version.
      */
@@ -176,6 +179,7 @@ public class ElasticIndexDefinition extends IndexDefinition {
     public final long indexNameSeed;
     public final InferenceDefinition inferenceDefinition;
     public final long limitTotalFields;
+    public final int secureFacetsDocsSize;
 
     private final Map<String, List<PropertyDefinition>> propertiesByName;
     private final List<ElasticPropertyDefinition> dynamicBoostProperties;
@@ -208,6 +212,7 @@ public class ElasticIndexDefinition extends IndexDefinition {
         this.indexNameSeed = getOptionalValue(defn, INDEX_NAME_SEED, INDEX_NAME_SEED_DEFAULT);
         this.similarityTagsFields = getOptionalValues(defn, SIMILARITY_TAGS_FIELDS, Type.STRINGS, String.class, SIMILARITY_TAGS_FIELDS_DEFAULT);
         this.limitTotalFields = getOptionalValue(defn, LIMIT_TOTAL_FIELDS, LIMIT_TOTAL_FIELDS_DEFAULT);
+        this.secureFacetsDocsSize = getOptionalValue(defn, SECURE_FACETS_DOCS_SIZE, SECURE_FACETS_DOCS_SIZE_DEFAULT);
 
         this.propertiesByName = getDefinedRules()
                 .stream()
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticRequestHandler.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticRequestHandler.java
index 055eb02271..640b48a3d5 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticRequestHandler.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticRequestHandler.java
@@ -250,6 +250,10 @@ public class ElasticRequestHandler {
         return bqb;
     }
 
+    public long getResultSizeLimit() {
+        return indexPlan.getFilter().getQueryLimits().getLimitReads();
+    }
+
     private String generateFieldsForMLT() {
         //TODO with addition of :enricher status for inference. All documents will now have :enricher for inference enabled indexes.
         // as as result mlt is now returning all documents.
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java
index f3a770f6fe..f51cf519d4 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java
@@ -19,11 +19,9 @@ package org.apache.jackrabbit.oak.plugins.index.elastic.query.async;
 import co.elastic.clients.elasticsearch._types.aggregations.Aggregate;
 import co.elastic.clients.elasticsearch.core.search.Hit;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
 import org.osgi.annotation.versioning.ProviderType;
 
 import java.util.Map;
-import java.util.Set;
 
 /**
  * Generic listener of Elastic response
@@ -31,17 +29,6 @@ import java.util.Set;
 @ProviderType
 public interface ElasticResponseListener {
 
-    Set<String> DEFAULT_SOURCE_FIELDS = Set.of(FieldNames.PATH);
-
-    /**
-     * Returns the source fields this listener is interested on
-     *
-     * @return the list of fields to listen to (only PATH as default)
-     */
-    default Set<String> sourceFields() {
-        return DEFAULT_SOURCE_FIELDS;
-    }
-
     /**
      * This method is invoked when there is no more data to process.
      */
@@ -52,13 +39,6 @@ public interface ElasticResponseListener {
      */
     interface SearchHitListener extends ElasticResponseListener {
 
-        /**
-         * Returns {@code true} if the listener is interested in the entire result set
-         */
-        default boolean isFullScan() {
-            return false;
-        }
-
         /**
         * This method is invoked at the beginning of the listener lifecycle to notify the number of hits this
          * listener could receive
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
index c9ec02fea0..ac01b54141 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
@@ -25,6 +25,7 @@ import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticRequestHandl
 import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticResponseHandler;
 import org.apache.jackrabbit.oak.plugins.index.elastic.query.async.facets.ElasticFacetProvider;
 import org.apache.jackrabbit.oak.plugins.index.elastic.util.ElasticIndexUtils;
+import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
 import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex.FulltextResultRow;
 import org.apache.jackrabbit.oak.spi.query.QueryIndex;
 import org.apache.jackrabbit.oak.spi.query.QueryIndex.IndexPlan;
@@ -280,7 +281,6 @@ public class ElasticResultRowAsyncIterator implements ElasticQueryIterator, Elas
 
         private int scannedRows;
         private int requests;
-        private boolean fullScan;
         private long searchStartTime;
 
         // reference to the last document sort values for search_after queries
@@ -296,17 +296,12 @@ public class ElasticResultRowAsyncIterator implements ElasticQueryIterator, Elas
             this.sorts = elasticRequestHandler.baseSorts();
             this.highlight = elasticRequestHandler.highlight();
 
-            Set<String> sourceFieldsSet = new HashSet<>();
             AtomicBoolean needsAggregations = new AtomicBoolean(false);
             Consumer<ElasticResponseListener> register = (listener) -> {
                 allListeners.add(listener);
-                sourceFieldsSet.addAll(listener.sourceFields());
                 if (listener instanceof SearchHitListener) {
                     SearchHitListener searchHitListener = (SearchHitListener) listener;
                     searchHitListeners.add(searchHitListener);
-                    if (searchHitListener.isFullScan()) {
-                        fullScan = true;
-                    }
                 }
                 if (listener instanceof AggregationListener) {
                     aggregationListeners.add((AggregationListener) listener);
@@ -314,7 +309,7 @@ public class ElasticResultRowAsyncIterator implements ElasticQueryIterator, Elas
                 }
             };
             listeners.forEach(register);
-            this.sourceConfig = SourceConfig.of(fn -> fn.filter(f -> f.includes(new ArrayList<>(sourceFieldsSet))));
+            this.sourceConfig = SourceConfig.of(fn -> fn.filter(f -> f.includes(FieldNames.PATH)));
 
             searchRequest = SearchRequest.of(builder -> {
                         builder
@@ -412,7 +407,7 @@ public class ElasticResultRowAsyncIterator implements ElasticQueryIterator, Elas
                 if (!anyDataLeft.get()) {
                     LOG.trace("No data left: closing scanner, notifying 
listeners");
                     close();
-                } else if (fullScan || !areAllListenersProcessed) {
+                } else if (!areAllListenersProcessed) {
                     scan();
                 }
             } else {
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticFacetProvider.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticFacetProvider.java
index 9a03dec075..c64da64966 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticFacetProvider.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticFacetProvider.java
@@ -64,7 +64,8 @@ public interface ElasticFacetProvider extends FulltextIndex.FacetProvider {
                 break;
             case SECURE:
             default:
-                facetProvider = new ElasticSecureFacetAsyncProvider(requestHandler, responseHandler, isAccessible, facetsEvaluationTimeoutMs);
+                facetProvider = new ElasticSecureFacetAsyncProvider(connection, indexDefinition,
+                        requestHandler, responseHandler, isAccessible, facetsEvaluationTimeoutMs);
         }
         return facetProvider;
     }
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java
index 6151699b1e..887fc0bb0f 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java
@@ -16,144 +16,241 @@
  */
 package org.apache.jackrabbit.oak.plugins.index.elastic.query.async.facets;
 
+import co.elastic.clients.elasticsearch._types.FieldValue;
+import co.elastic.clients.elasticsearch._types.query_dsl.Query;
+import co.elastic.clients.elasticsearch.core.SearchRequest;
 import co.elastic.clients.elasticsearch.core.search.Hit;
+import co.elastic.clients.elasticsearch.core.search.SourceConfig;
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.JsonNodeType;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
 import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticRequestHandler;
 import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticResponseHandler;
-import org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseListener;
 import org.apache.jackrabbit.oak.plugins.index.elastic.util.ElasticIndexUtils;
+import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
 import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Comparator;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 /**
  * An {@link ElasticFacetProvider} that subscribes to Elastic SearchHit events to return only accessible facets.
  */
-class ElasticSecureFacetAsyncProvider implements ElasticFacetProvider, ElasticResponseListener.SearchHitListener {
+class ElasticSecureFacetAsyncProvider implements ElasticFacetProvider {
 
     private static final Logger LOG = LoggerFactory.getLogger(ElasticSecureFacetAsyncProvider.class);
 
-    private final Set<String> facetFields;
-    private final long facetsEvaluationTimeoutMs;
-    private final Map<String, Map<String, MutableInt>> accessibleFacets = new ConcurrentHashMap<>();
     private final ElasticResponseHandler elasticResponseHandler;
+    private final long resultSizeLimit;
     private final Predicate<String> isAccessible;
-    private final CountDownLatch latch = new CountDownLatch(1);
+    private final Set<String> facetFields;
+    private final long facetsEvaluationTimeoutMs;
     private Map<String, List<FulltextIndex.Facet>> facets;
+    private final SearchRequest searchRequest;
+    private final CompletableFuture<Map<String, List<FulltextIndex.Facet>>> searchFuture;
+
+    private long processedDocs = 0L;
+
+    private final long queryStartTimeNanos;
+
+    ElasticSecureFacetAsyncProvider(ElasticConnection connection, ElasticIndexDefinition indexDefinition,
+                                         ElasticRequestHandler elasticRequestHandler, ElasticResponseHandler elasticResponseHandler,
+                                         Predicate<String> isAccessible, long facetsEvaluationTimeoutMs) {
 
-    ElasticSecureFacetAsyncProvider(
-            ElasticRequestHandler elasticRequestHandler,
-            ElasticResponseHandler elasticResponseHandler,
-            Predicate<String> isAccessible,
-            long facetsEvaluationTimeoutMs) {
         this.elasticResponseHandler = elasticResponseHandler;
+        this.resultSizeLimit = elasticRequestHandler.getResultSizeLimit();
         this.isAccessible = isAccessible;
         this.facetFields = elasticRequestHandler.facetFields().
                 map(ElasticIndexUtils::fieldName).
                 collect(Collectors.toUnmodifiableSet());
         this.facetsEvaluationTimeoutMs = facetsEvaluationTimeoutMs;
+
+        // Base search request template (without search_after initially)
+        this.searchRequest = SearchRequest.of(srb -> srb.index(indexDefinition.getIndexAlias())
+                .source(SourceConfig.of(scf -> scf.filter(ff -> ff.includes(FieldNames.PATH).includes(new ArrayList<>(facetFields)))))
+                .query(Query.of(qb -> qb.bool(elasticRequestHandler.baseQueryBuilder().build())))
+                .size(indexDefinition.secureFacetsDocsSize) // batch size for each search_after iteration
+                .sort(s -> s.field(fs -> fs.field(FieldNames.PATH)))
+        );
+
+        this.queryStartTimeNanos = System.nanoTime();
+        LOG.trace("Kicking search query with search_after pagination {}", 
searchRequest);
+
+        // Start the iterative search process
+        this.searchFuture = searchAllResultsIncremental(connection);
     }
 
-    @Override
-    public Set<String> sourceFields() {
-        return facetFields;
+    private CompletableFuture<Map<String, List<FulltextIndex.Facet>>> searchAllResultsIncremental(ElasticConnection connection) {
+        // Initialize empty facet accumulator
+        Map<String, Map<String, Integer>> accumulatedFacets = new ConcurrentHashMap<>();
+
+        return searchWithIncrementalFacetProcessing(connection, null, accumulatedFacets)
+                .thenApplyAsync(this::buildFinalFacetResult);
     }
 
-    @Override
-    public boolean isFullScan() {
-        return true;
+    private CompletableFuture<Map<String, Map<String, Integer>>> searchWithIncrementalFacetProcessing(
+            ElasticConnection connection, List<FieldValue> searchAfter, Map<String, Map<String, Integer>> accumulatedFacets) {
+
+        // Build search request with search_after if provided
+        SearchRequest currentRequest = searchAfter == null ?
+                searchRequest :
+                SearchRequest.of(srb -> srb.index(searchRequest.index())
+                        .source(searchRequest.source())
+                        .query(searchRequest.query())
+                        .size(searchRequest.size())
+                        .sort(searchRequest.sort())
+                        .searchAfter(searchAfter)
+                );
+
+        return connection.getAsyncClient()
+                .search(currentRequest, ObjectNode.class)
+                .thenComposeAsync(response -> {
+                    List<Hit<ObjectNode>> hits = response.hits().hits();
+
+                    // Process current page facets and merge with accumulated facets
+                    Map<String, Map<String, Integer>> pageFacets = extractFacetsFromPage(hits);
+                    mergeFacets(accumulatedFacets, pageFacets);
+
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Processed {} documents in current batch, 
accumulated facet values: {}",
+                                hits.size(), 
accumulatedFacets.values().stream().mapToInt(Map::size).sum());
+                    }
+
+                    // If we got fewer results than requested, we've reached the end
+                    if (hits.isEmpty() || hits.size() < searchRequest.size()) {
+                        LOG.trace("Reached end of results, final facet processing complete");
+                        return CompletableFuture.completedFuture(accumulatedFacets);
+                    }
+
+                    // Extract sort values from the last hit for next search_after
+                    Hit<ObjectNode> lastHit = hits.get(hits.size() - 1);
+                    List<FieldValue> nextSearchAfter = lastHit.sort();
+
+                    if (nextSearchAfter == null || nextSearchAfter.isEmpty()) {
+                        LOG.warn("No sort values found for search_after, 
stopping pagination");
+                        return 
CompletableFuture.completedFuture(accumulatedFacets);
+                    }
+
+                    processedDocs += hits.size();
+                    if (processedDocs >= resultSizeLimit) {
+                        LOG.warn("Result size limit of {} reached during facet 
computation. " +
+                                "Stopping further processing. Consider using 
statistical or insecure facets", resultSizeLimit);
+                        return 
CompletableFuture.completedFuture(accumulatedFacets);
+                    }
+
+                    // if we get here, it means we have processed MAX_PAGE_SIZE hits and there might be more
+                    LOG.warn("Large result set detected ({} hits so far) for search request {}. Consider using statistical or insecure facets for better performance.",
+                            hits.size(), searchRequest);
+
+                    // Recursively continue with next batch
+                    return searchWithIncrementalFacetProcessing(connection, nextSearchAfter, accumulatedFacets);
+                })
+                .exceptionally(throwable -> {
+                    LOG.error("Error during search pagination", throwable);
+                    // Return accumulated facets even if there's an error
+                    return accumulatedFacets;
+                });
     }
 
-    @Override
-    public boolean on(Hit<ObjectNode> searchHit) {
-        final String path = elasticResponseHandler.getPath(searchHit);
-        if (path != null && isAccessible.test(path)) {
-            ObjectNode source = searchHit.source();
-            for (String field : facetFields) {
-                JsonNode value;
-                if (source != null) {
-                    value = source.get(field);
-                    if (value != null) {
-                        if (value.getNodeType() == JsonNodeType.ARRAY) {
-                            for (JsonNode item : value) {
-                                updateAccessibleFacets(field, item.asText());
+    private Map<String, Map<String, Integer>> extractFacetsFromPage(List<Hit<ObjectNode>> hits) {
+        Map<String, Map<String, Integer>> pageFacets = new HashMap<>();
+
+        for (Hit<ObjectNode> hit : hits) {
+            ObjectNode document = hit.source();
+            if (document == null) continue;
+
+            // Apply accessibility check
+            String path = elasticResponseHandler.getPath(hit);
+            if (!isAccessible.test(path)) continue;
+
+            // Extract facet field values
+            for (String facetField : facetFields) {
+                JsonNode fieldValue = document.get(facetField);
+                if (fieldValue != null && !fieldValue.isNull()) {
+                    pageFacets.compute(facetField, (key, valueMap) -> {
+                        if (valueMap == null) {
+                            valueMap = new HashMap<>();
+                        }
+                        if (fieldValue.isArray()) {
+                            for (JsonNode val : fieldValue) {
+                                if (val.isTextual()) {
+                                    valueMap.merge(val.asText(), 1, Integer::sum);
+                                }
                             }
-                        } else {
-                            updateAccessibleFacets(field, value.asText());
+                        } else if (fieldValue.isTextual()) {
+                            valueMap.merge(fieldValue.asText(), 1, Integer::sum);
                         }
-                    }
+                        return valueMap;
+                    });
                 }
             }
         }
-        return true;
+
+        return pageFacets;
     }
 
-    private void updateAccessibleFacets(String field, String value) {
-        accessibleFacets.compute(field, (column, facetValues) -> {
-            if (facetValues == null) {
-                Map<String, MutableInt> values = new HashMap<>();
-                values.put(value, new MutableInt(1));
-                return values;
-            } else {
-                facetValues.compute(value, (k, v) -> {
-                    if (v == null) {
-                        return new MutableInt(1);
-                    } else {
-                        v.increment();
-                        return v;
-                    }
-                });
-                return facetValues;
+    private void mergeFacets(Map<String, Map<String, Integer>> accumulated, Map<String, Map<String, Integer>> pageFacets) {
+        for (Map.Entry<String, Map<String, Integer>> entry : pageFacets.entrySet()) {
+            Map<String, Integer> valueMap = accumulated.computeIfAbsent(entry.getKey(), k -> new HashMap<>());
+            for (Map.Entry<String, Integer> valueEntry : entry.getValue().entrySet()) {
+                valueMap.merge(valueEntry.getKey(), valueEntry.getValue(), Integer::sum);
             }
-        });
+        }
     }
 
-    @Override
-    public void endData() {
-        // create Facet objects, order by count (desc) and then by label (asc)
-        Comparator<FulltextIndex.Facet> comparator = Comparator
-                .comparing(FulltextIndex.Facet::getCount).reversed()
-                .thenComparing(FulltextIndex.Facet::getLabel);
-        // create Facet objects, order by count (desc) and then by label (asc)
-        facets = accessibleFacets.entrySet()
-                .stream()
-                .collect(Collectors.toMap
-                        (Map.Entry::getKey, x -> x.getValue().entrySet()
-                                .stream()
-                                .map(e -> new FulltextIndex.Facet(e.getKey(), e.getValue().intValue()))
-                                .sorted(comparator)
-                                .collect(Collectors.toList())
-                        )
-                );
-        LOG.trace("End data {}", facets);
-        latch.countDown();
+    private Map<String, List<FulltextIndex.Facet>> buildFinalFacetResult(Map<String, Map<String, Integer>> accumulatedFacets) {
+        long elapsedMs = (System.nanoTime() - queryStartTimeNanos) / 1_000_000;
+        LOG.debug("Facet computation completed in {}ms", elapsedMs);
+
+        // Convert accumulated facet values to final result format
+        Map<String, List<FulltextIndex.Facet>> result = new HashMap<>();
+
+        for (Map.Entry<String, Map<String, Integer>> entry : accumulatedFacets.entrySet()) {
+            String fieldName = entry.getKey();
+            List<FulltextIndex.Facet> facets = entry.getValue().entrySet().stream()
+                    .map(e -> new FulltextIndex.Facet(e.getKey(), e.getValue()))
+                    .collect(Collectors.toList());
+            result.put(fieldName, facets);
+        }
+
+        return result;
     }
 
     @Override
     public List<FulltextIndex.Facet> getFacets(int numberOfFacets, String columnName) {
-        LOG.trace("Requested facets for {} - Latch count: {}", columnName, latch.getCount());
-        try {
-            boolean completed = latch.await(facetsEvaluationTimeoutMs, TimeUnit.MILLISECONDS);
-            if (!completed) {
-                throw new IllegalStateException("Timed out while waiting for facets");
+        // TODO: In case of failure, we log an exception and return null. This is likely not the ideal behavior, as the
+        //   caller has no way to distinguish between a failure and empty results. But in this PR I'm leaving this
+        //   behavior as is to not introduce further changes. We should revise this behavior once the queries for facets
+        //   are decoupled from the query for results, as this will make it easier to better handle errors
+        if (!searchFuture.isDone()) {
+            try {
+                LOG.trace("Requested facets for {}. Waiting up to: {}", 
columnName, facetsEvaluationTimeoutMs);
+                long start = System.nanoTime();
+                facets = searchFuture.get(facetsEvaluationTimeoutMs, 
TimeUnit.MILLISECONDS);
+                LOG.trace("Facets computed in {}.", 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+            } catch (ExecutionException e) {
+                LOG.error("Error evaluating facets. Ignoring. Search request: 
{}", searchRequest, e);
+            } catch (TimeoutException e) {
+                searchFuture.cancel(true);
+                LOG.error("Timed out while waiting for facets. Search request: 
{}", searchRequest, e);
+            } catch (InterruptedException e) {
+                LOG.error("Interrupted while waiting for facets. Search 
request: {}", searchRequest, e);
+                Thread.currentThread().interrupt();  // restore interrupt 
status
+                throw new IllegalStateException("Error while waiting for 
facets", e);
             }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();  // restore interrupt status
-            throw new IllegalStateException("Error while waiting for facets", e);
         }
         LOG.trace("Reading facets for {} from {}", columnName, facets);
         String field = ElasticIndexUtils.fieldName(FulltextIndex.parseFacetField(columnName));
