This is an automated email from the ASF dual-hosted git repository.
fortino pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5d5086d250 OAK-11898: decouple ElasticSecureFacetAsyncProvider from
async iterator (#2564)
5d5086d250 is described below
commit 5d5086d250d35eba921a60b944fee58cdf2c1417
Author: Fabrizio Fortino <[email protected]>
AuthorDate: Wed Oct 29 17:33:55 2025 +0100
OAK-11898: decouple ElasticSecureFacetAsyncProvider from async iterator
(#2564)
* OAK-11899: decouple ElasticSecureFacetAsyncProvider from async iterator
* Update
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java
Co-authored-by: Thomas Mueller <[email protected]>
* OAK-11899: fix error, improve logs
* OAK-11899: fix log message
* OAK-11899: (minor) comment
* OAK-11899: improve log message
* OAK-11899: add missing exception stacktrace
* OAK-11899: make the secure facets batch size (secureFacetsDocsSize) configurable and stop paginating once the query's limitReads is reached
---------
Co-authored-by: Thomas Mueller <[email protected]>
---
.../index/elastic/ElasticIndexDefinition.java | 5 +
.../index/elastic/query/ElasticRequestHandler.java | 4 +
.../query/async/ElasticResponseListener.java | 20 --
.../query/async/ElasticResultRowAsyncIterator.java | 11 +-
.../query/async/facets/ElasticFacetProvider.java | 3 +-
.../facets/ElasticSecureFacetAsyncProvider.java | 261 ++++++++++++++-------
6 files changed, 193 insertions(+), 111 deletions(-)
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexDefinition.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexDefinition.java
index fbd4e01767..ad3fca090f 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexDefinition.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexDefinition.java
@@ -59,6 +59,9 @@ public class ElasticIndexDefinition extends IndexDefinition {
public static final String TRACK_TOTAL_HITS = "trackTotalHits";
public static final Integer TRACK_TOTAL_HITS_DEFAULT = 10000;
+ public static final String SECURE_FACETS_DOCS_SIZE =
"secureFacetsDocsSize";
+ public static final int SECURE_FACETS_DOCS_SIZE_DEFAULT = 10000;
+
/**
* Hidden property for storing the index mapping version.
*/
@@ -176,6 +179,7 @@ public class ElasticIndexDefinition extends IndexDefinition
{
public final long indexNameSeed;
public final InferenceDefinition inferenceDefinition;
public final long limitTotalFields;
+ public final int secureFacetsDocsSize;
private final Map<String, List<PropertyDefinition>> propertiesByName;
private final List<ElasticPropertyDefinition> dynamicBoostProperties;
@@ -208,6 +212,7 @@ public class ElasticIndexDefinition extends IndexDefinition
{
this.indexNameSeed = getOptionalValue(defn, INDEX_NAME_SEED,
INDEX_NAME_SEED_DEFAULT);
this.similarityTagsFields = getOptionalValues(defn,
SIMILARITY_TAGS_FIELDS, Type.STRINGS, String.class,
SIMILARITY_TAGS_FIELDS_DEFAULT);
this.limitTotalFields = getOptionalValue(defn, LIMIT_TOTAL_FIELDS,
LIMIT_TOTAL_FIELDS_DEFAULT);
+ this.secureFacetsDocsSize = getOptionalValue(defn,
SECURE_FACETS_DOCS_SIZE, SECURE_FACETS_DOCS_SIZE_DEFAULT);
this.propertiesByName = getDefinedRules()
.stream()
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticRequestHandler.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticRequestHandler.java
index 055eb02271..640b48a3d5 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticRequestHandler.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticRequestHandler.java
@@ -250,6 +250,10 @@ public class ElasticRequestHandler {
return bqb;
}
+ public long getResultSizeLimit() {
+ return indexPlan.getFilter().getQueryLimits().getLimitReads();
+ }
+
private String generateFieldsForMLT() {
//TODO with addition of :enricher status for inference. All documents
will now have :enricher for inference enabled indexes.
// as as result mlt is now returning all documents.
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java
index f3a770f6fe..f51cf519d4 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java
@@ -19,11 +19,9 @@ package
org.apache.jackrabbit.oak.plugins.index.elastic.query.async;
import co.elastic.clients.elasticsearch._types.aggregations.Aggregate;
import co.elastic.clients.elasticsearch.core.search.Hit;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
import org.osgi.annotation.versioning.ProviderType;
import java.util.Map;
-import java.util.Set;
/**
* Generic listener of Elastic response
@@ -31,17 +29,6 @@ import java.util.Set;
@ProviderType
public interface ElasticResponseListener {
- Set<String> DEFAULT_SOURCE_FIELDS = Set.of(FieldNames.PATH);
-
- /**
- * Returns the source fields this listener is interested on
- *
- * @return the list of fields to listen to (only PATH as default)
- */
- default Set<String> sourceFields() {
- return DEFAULT_SOURCE_FIELDS;
- }
-
/**
* This method is invoked when there is no more data to process.
*/
@@ -52,13 +39,6 @@ public interface ElasticResponseListener {
*/
interface SearchHitListener extends ElasticResponseListener {
- /**
- * Returns {@code true} if the listener is interested in the entire
result set
- */
- default boolean isFullScan() {
- return false;
- }
-
/**
* This method is invoked at the beginning of the listener lifecycle
to notify the number of hits this
* listener could receive
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
index c9ec02fea0..ac01b54141 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
@@ -25,6 +25,7 @@ import
org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticRequestHandl
import
org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticResponseHandler;
import
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.facets.ElasticFacetProvider;
import org.apache.jackrabbit.oak.plugins.index.elastic.util.ElasticIndexUtils;
+import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
import
org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex.FulltextResultRow;
import org.apache.jackrabbit.oak.spi.query.QueryIndex;
import org.apache.jackrabbit.oak.spi.query.QueryIndex.IndexPlan;
@@ -280,7 +281,6 @@ public class ElasticResultRowAsyncIterator implements
ElasticQueryIterator, Elas
private int scannedRows;
private int requests;
- private boolean fullScan;
private long searchStartTime;
// reference to the last document sort values for search_after queries
@@ -296,17 +296,12 @@ public class ElasticResultRowAsyncIterator implements
ElasticQueryIterator, Elas
this.sorts = elasticRequestHandler.baseSorts();
this.highlight = elasticRequestHandler.highlight();
- Set<String> sourceFieldsSet = new HashSet<>();
AtomicBoolean needsAggregations = new AtomicBoolean(false);
Consumer<ElasticResponseListener> register = (listener) -> {
allListeners.add(listener);
- sourceFieldsSet.addAll(listener.sourceFields());
if (listener instanceof SearchHitListener) {
SearchHitListener searchHitListener = (SearchHitListener)
listener;
searchHitListeners.add(searchHitListener);
- if (searchHitListener.isFullScan()) {
- fullScan = true;
- }
}
if (listener instanceof AggregationListener) {
aggregationListeners.add((AggregationListener) listener);
@@ -314,7 +309,7 @@ public class ElasticResultRowAsyncIterator implements
ElasticQueryIterator, Elas
}
};
listeners.forEach(register);
- this.sourceConfig = SourceConfig.of(fn -> fn.filter(f ->
f.includes(new ArrayList<>(sourceFieldsSet))));
+ this.sourceConfig = SourceConfig.of(fn -> fn.filter(f ->
f.includes(FieldNames.PATH)));
searchRequest = SearchRequest.of(builder -> {
builder
@@ -412,7 +407,7 @@ public class ElasticResultRowAsyncIterator implements
ElasticQueryIterator, Elas
if (!anyDataLeft.get()) {
LOG.trace("No data left: closing scanner, notifying
listeners");
close();
- } else if (fullScan || !areAllListenersProcessed) {
+ } else if (!areAllListenersProcessed) {
scan();
}
} else {
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticFacetProvider.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticFacetProvider.java
index 9a03dec075..c64da64966 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticFacetProvider.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticFacetProvider.java
@@ -64,7 +64,8 @@ public interface ElasticFacetProvider extends
FulltextIndex.FacetProvider {
break;
case SECURE:
default:
- facetProvider = new
ElasticSecureFacetAsyncProvider(requestHandler, responseHandler, isAccessible,
facetsEvaluationTimeoutMs);
+ facetProvider = new
ElasticSecureFacetAsyncProvider(connection, indexDefinition,
+ requestHandler, responseHandler, isAccessible,
facetsEvaluationTimeoutMs);
}
return facetProvider;
}
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java
index 6151699b1e..887fc0bb0f 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java
@@ -16,144 +16,241 @@
*/
package org.apache.jackrabbit.oak.plugins.index.elastic.query.async.facets;
+import co.elastic.clients.elasticsearch._types.FieldValue;
+import co.elastic.clients.elasticsearch._types.query_dsl.Query;
+import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.search.Hit;
+import co.elastic.clients.elasticsearch.core.search.SourceConfig;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.JsonNodeType;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
import
org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticRequestHandler;
import
org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticResponseHandler;
-import
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseListener;
import org.apache.jackrabbit.oak.plugins.index.elastic.util.ElasticIndexUtils;
+import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Comparator;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* An {@link ElasticFacetProvider} that subscribes to Elastic SearchHit events
to return only accessible facets.
*/
-class ElasticSecureFacetAsyncProvider implements ElasticFacetProvider,
ElasticResponseListener.SearchHitListener {
+class ElasticSecureFacetAsyncProvider implements ElasticFacetProvider {
private static final Logger LOG =
LoggerFactory.getLogger(ElasticSecureFacetAsyncProvider.class);
- private final Set<String> facetFields;
- private final long facetsEvaluationTimeoutMs;
- private final Map<String, Map<String, MutableInt>> accessibleFacets = new
ConcurrentHashMap<>();
private final ElasticResponseHandler elasticResponseHandler;
+ private final long resultSizeLimit;
private final Predicate<String> isAccessible;
- private final CountDownLatch latch = new CountDownLatch(1);
+ private final Set<String> facetFields;
+ private final long facetsEvaluationTimeoutMs;
private Map<String, List<FulltextIndex.Facet>> facets;
+ private final SearchRequest searchRequest;
+ private final CompletableFuture<Map<String, List<FulltextIndex.Facet>>>
searchFuture;
+
+ private long processedDocs = 0L;
+
+ private final long queryStartTimeNanos;
+
+ ElasticSecureFacetAsyncProvider(ElasticConnection connection,
ElasticIndexDefinition indexDefinition,
+ ElasticRequestHandler
elasticRequestHandler, ElasticResponseHandler elasticResponseHandler,
+ Predicate<String> isAccessible, long
facetsEvaluationTimeoutMs) {
- ElasticSecureFacetAsyncProvider(
- ElasticRequestHandler elasticRequestHandler,
- ElasticResponseHandler elasticResponseHandler,
- Predicate<String> isAccessible,
- long facetsEvaluationTimeoutMs) {
this.elasticResponseHandler = elasticResponseHandler;
+ this.resultSizeLimit = elasticRequestHandler.getResultSizeLimit();
this.isAccessible = isAccessible;
this.facetFields = elasticRequestHandler.facetFields().
map(ElasticIndexUtils::fieldName).
collect(Collectors.toUnmodifiableSet());
this.facetsEvaluationTimeoutMs = facetsEvaluationTimeoutMs;
+
+ // Base search request template (without search_after initially)
+ this.searchRequest = SearchRequest.of(srb ->
srb.index(indexDefinition.getIndexAlias())
+ .source(SourceConfig.of(scf -> scf.filter(ff ->
ff.includes(FieldNames.PATH).includes(new ArrayList<>(facetFields)))))
+ .query(Query.of(qb ->
qb.bool(elasticRequestHandler.baseQueryBuilder().build())))
+ .size(indexDefinition.secureFacetsDocsSize) // batch size for
each search_after iteration
+ .sort(s -> s.field(fs -> fs.field(FieldNames.PATH)))
+ );
+
+ this.queryStartTimeNanos = System.nanoTime();
+ LOG.trace("Kicking search query with search_after pagination {}",
searchRequest);
+
+ // Start the iterative search process
+ this.searchFuture = searchAllResultsIncremental(connection);
}
- @Override
- public Set<String> sourceFields() {
- return facetFields;
+ private CompletableFuture<Map<String, List<FulltextIndex.Facet>>>
searchAllResultsIncremental(ElasticConnection connection) {
+ // Initialize empty facet accumulator
+ Map<String, Map<String, Integer>> accumulatedFacets = new
ConcurrentHashMap<>();
+
+ return searchWithIncrementalFacetProcessing(connection, null,
accumulatedFacets)
+ .thenApplyAsync(this::buildFinalFacetResult);
}
- @Override
- public boolean isFullScan() {
- return true;
+ private CompletableFuture<Map<String, Map<String, Integer>>>
searchWithIncrementalFacetProcessing(
+ ElasticConnection connection, List<FieldValue> searchAfter,
Map<String, Map<String, Integer>> accumulatedFacets) {
+
+ // Build search request with search_after if provided
+ SearchRequest currentRequest = searchAfter == null ?
+ searchRequest :
+ SearchRequest.of(srb -> srb.index(searchRequest.index())
+ .source(searchRequest.source())
+ .query(searchRequest.query())
+ .size(searchRequest.size())
+ .sort(searchRequest.sort())
+ .searchAfter(searchAfter)
+ );
+
+ return connection.getAsyncClient()
+ .search(currentRequest, ObjectNode.class)
+ .thenComposeAsync(response -> {
+ List<Hit<ObjectNode>> hits = response.hits().hits();
+
+ // Process current page facets and merge with accumulated
facets
+ Map<String, Map<String, Integer>> pageFacets =
extractFacetsFromPage(hits);
+ mergeFacets(accumulatedFacets, pageFacets);
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processed {} documents in current batch,
accumulated facet values: {}",
+ hits.size(),
accumulatedFacets.values().stream().mapToInt(Map::size).sum());
+ }
+
+ // If we got fewer results than requested, we've reached
the end
+ if (hits.isEmpty() || hits.size() < searchRequest.size()) {
+ LOG.trace("Reached end of results, final facet
processing complete");
+ return
CompletableFuture.completedFuture(accumulatedFacets);
+ }
+
+ // Extract sort values from the last hit for next
search_after
+ Hit<ObjectNode> lastHit = hits.get(hits.size() - 1);
+ List<FieldValue> nextSearchAfter = lastHit.sort();
+
+ if (nextSearchAfter == null || nextSearchAfter.isEmpty()) {
+ LOG.warn("No sort values found for search_after,
stopping pagination");
+ return
CompletableFuture.completedFuture(accumulatedFacets);
+ }
+
+ processedDocs += hits.size();
+ if (processedDocs >= resultSizeLimit) {
+ LOG.warn("Result size limit of {} reached during facet
computation. " +
+ "Stopping further processing. Consider using
statistical or insecure facets", resultSizeLimit);
+ return
CompletableFuture.completedFuture(accumulatedFacets);
+ }
+
+ // if we get here, it means we have processed
MAX_PAGE_SIZE hits and there might be more
+ LOG.warn("Large result set detected ({} hits so far) for
search request {}. Consider using statistical or insecure facets for better
performance.",
+ hits.size(), searchRequest);
+
+ // Recursively continue with next batch
+ return searchWithIncrementalFacetProcessing(connection,
nextSearchAfter, accumulatedFacets);
+ })
+ .exceptionally(throwable -> {
+ LOG.error("Error during search pagination", throwable);
+ // Return accumulated facets even if there's an error
+ return accumulatedFacets;
+ });
}
- @Override
- public boolean on(Hit<ObjectNode> searchHit) {
- final String path = elasticResponseHandler.getPath(searchHit);
- if (path != null && isAccessible.test(path)) {
- ObjectNode source = searchHit.source();
- for (String field : facetFields) {
- JsonNode value;
- if (source != null) {
- value = source.get(field);
- if (value != null) {
- if (value.getNodeType() == JsonNodeType.ARRAY) {
- for (JsonNode item : value) {
- updateAccessibleFacets(field, item.asText());
+ private Map<String, Map<String, Integer>>
extractFacetsFromPage(List<Hit<ObjectNode>> hits) {
+ Map<String, Map<String, Integer>> pageFacets = new HashMap<>();
+
+ for (Hit<ObjectNode> hit : hits) {
+ ObjectNode document = hit.source();
+ if (document == null) continue;
+
+ // Apply accessibility check
+ String path = elasticResponseHandler.getPath(hit);
+ if (!isAccessible.test(path)) continue;
+
+ // Extract facet field values
+ for (String facetField : facetFields) {
+ JsonNode fieldValue = document.get(facetField);
+ if (fieldValue != null && !fieldValue.isNull()) {
+ pageFacets.compute(facetField, (key, valueMap) -> {
+ if (valueMap == null) {
+ valueMap = new HashMap<>();
+ }
+ if (fieldValue.isArray()) {
+ for (JsonNode val : fieldValue) {
+ if (val.isTextual()) {
+ valueMap.merge(val.asText(), 1,
Integer::sum);
+ }
}
- } else {
- updateAccessibleFacets(field, value.asText());
+ } else if (fieldValue.isTextual()) {
+ valueMap.merge(fieldValue.asText(), 1,
Integer::sum);
}
- }
+ return valueMap;
+ });
}
}
}
- return true;
+
+ return pageFacets;
}
- private void updateAccessibleFacets(String field, String value) {
- accessibleFacets.compute(field, (column, facetValues) -> {
- if (facetValues == null) {
- Map<String, MutableInt> values = new HashMap<>();
- values.put(value, new MutableInt(1));
- return values;
- } else {
- facetValues.compute(value, (k, v) -> {
- if (v == null) {
- return new MutableInt(1);
- } else {
- v.increment();
- return v;
- }
- });
- return facetValues;
+ private void mergeFacets(Map<String, Map<String, Integer>> accumulated,
Map<String, Map<String, Integer>> pageFacets) {
+ for (Map.Entry<String, Map<String, Integer>> entry :
pageFacets.entrySet()) {
+ Map<String, Integer> valueMap =
accumulated.computeIfAbsent(entry.getKey(), k -> new HashMap<>());
+ for (Map.Entry<String, Integer> valueEntry :
entry.getValue().entrySet()) {
+ valueMap.merge(valueEntry.getKey(), valueEntry.getValue(),
Integer::sum);
}
- });
+ }
}
- @Override
- public void endData() {
- // create Facet objects, order by count (desc) and then by label (asc)
- Comparator<FulltextIndex.Facet> comparator = Comparator
- .comparing(FulltextIndex.Facet::getCount).reversed()
- .thenComparing(FulltextIndex.Facet::getLabel);
- // create Facet objects, order by count (desc) and then by label (asc)
- facets = accessibleFacets.entrySet()
- .stream()
- .collect(Collectors.toMap
- (Map.Entry::getKey, x -> x.getValue().entrySet()
- .stream()
- .map(e -> new FulltextIndex.Facet(e.getKey(),
e.getValue().intValue()))
- .sorted(comparator)
- .collect(Collectors.toList())
- )
- );
- LOG.trace("End data {}", facets);
- latch.countDown();
+ private Map<String, List<FulltextIndex.Facet>>
buildFinalFacetResult(Map<String, Map<String, Integer>> accumulatedFacets) {
+ long elapsedMs = (System.nanoTime() - queryStartTimeNanos) / 1_000_000;
+ LOG.debug("Facet computation completed in {}ms", elapsedMs);
+
+ // Convert accumulated facet values to final result format
+ Map<String, List<FulltextIndex.Facet>> result = new HashMap<>();
+
+ for (Map.Entry<String, Map<String, Integer>> entry :
accumulatedFacets.entrySet()) {
+ String fieldName = entry.getKey();
+ List<FulltextIndex.Facet> facets =
entry.getValue().entrySet().stream()
+ .map(e -> new FulltextIndex.Facet(e.getKey(),
e.getValue()))
+ .collect(Collectors.toList());
+ result.put(fieldName, facets);
+ }
+
+ return result;
}
@Override
public List<FulltextIndex.Facet> getFacets(int numberOfFacets, String
columnName) {
- LOG.trace("Requested facets for {} - Latch count: {}", columnName,
latch.getCount());
- try {
- boolean completed = latch.await(facetsEvaluationTimeoutMs,
TimeUnit.MILLISECONDS);
- if (!completed) {
- throw new IllegalStateException("Timed out while waiting for
facets");
+ // TODO: In case of failure, we log an exception and return null. This
is likely not the ideal behavior, as the
+ // caller has no way to distinguish between a failure and empty
results. But in this PR I'm leaving this
+ // behavior as is to not introduce further changes. We should revise
this behavior once the queries for facets
+ // are decoupled from the query for results, as this will make it
easier to better handle errors
+ if (!searchFuture.isDone()) {
+ try {
+ LOG.trace("Requested facets for {}. Waiting up to: {}",
columnName, facetsEvaluationTimeoutMs);
+ long start = System.nanoTime();
+ facets = searchFuture.get(facetsEvaluationTimeoutMs,
TimeUnit.MILLISECONDS);
+ LOG.trace("Facets computed in {}.",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+ } catch (ExecutionException e) {
+ LOG.error("Error evaluating facets. Ignoring. Search request:
{}", searchRequest, e);
+ } catch (TimeoutException e) {
+ searchFuture.cancel(true);
+ LOG.error("Timed out while waiting for facets. Search request:
{}", searchRequest, e);
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while waiting for facets. Search
request: {}", searchRequest, e);
+ Thread.currentThread().interrupt(); // restore interrupt
status
+ throw new IllegalStateException("Error while waiting for
facets", e);
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt(); // restore interrupt status
- throw new IllegalStateException("Error while waiting for facets",
e);
}
LOG.trace("Reading facets for {} from {}", columnName, facets);
String field =
ElasticIndexUtils.fieldName(FulltextIndex.parseFacetField(columnName));