This is an automated email from the ASF dual-hosted git repository.
fortino pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new 31a3341ac3 OAK-10487: replaced lsm estimator with cacheable count
(#1149)
31a3341ac3 is described below
commit 31a3341ac3cb6081a98e990b8323f5522c88c303
Author: Fabrizio Fortino <[email protected]>
AuthorDate: Wed Oct 18 14:54:36 2023 +0200
OAK-10487: replaced lsm estimator with cacheable count (#1149)
* OAK-10487: replaced lsm estimator with cacheable count
* OAK-10487: do not return null when stats are empty + minor improvements
* OAK-10487: fix StatsRequestDescriptor equality, add tests
* OAK-10487: (minor) fixed indentation
* OAK-10487: include query string into hashcode()
* OAK-10487: (minor) introduce private internalQuery() to avoid duplications
---
.../index/elastic/ElasticIndexStatistics.java | 44 ++++++++++++++++------
.../plugins/index/elastic/query/ElasticIndex.java | 16 +++-----
.../index/elastic/query/ElasticRequestHandler.java | 2 +-
.../query/async/ElasticResultRowAsyncIterator.java | 6 +--
.../index/elastic/ElasticIndexStatisticsTest.java | 16 ++++++++
5 files changed, 56 insertions(+), 28 deletions(-)
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexStatistics.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexStatistics.java
index 419a59e46b..07fdc54c70 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexStatistics.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexStatistics.java
@@ -24,6 +24,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import org.apache.jackrabbit.oak.plugins.index.search.IndexStatistics;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -107,7 +108,17 @@ public class ElasticIndexStatistics implements
IndexStatistics {
@Override
public int getDocCountFor(String field) {
return countCache.getUnchecked(
- new StatsRequestDescriptor(elasticConnection,
indexDefinition.getIndexAlias(), field)
+ new StatsRequestDescriptor(elasticConnection,
indexDefinition.getIndexAlias(), field, null)
+ );
+ }
+
+ /**
+ * Returns the approximate number of documents matching the {@code query} in
the remote index bound to the
+ * {@code ElasticIndexDefinition}.
+ */
+ public int getDocCountFor(Query query) {
+ return countCache.getUnchecked(
+ new StatsRequestDescriptor(elasticConnection,
indexDefinition.getIndexAlias(), null, query)
);
}
@@ -180,12 +191,12 @@ public class ElasticIndexStatistics implements
IndexStatistics {
static class CountCacheLoader extends CacheLoader<StatsRequestDescriptor,
Integer> {
@Override
- public Integer load(@NotNull StatsRequestDescriptor
countRequestDescriptor) throws IOException {
+ public @NotNull Integer load(@NotNull StatsRequestDescriptor
countRequestDescriptor) throws IOException {
return count(countRequestDescriptor);
}
@Override
- public ListenableFuture<Integer> reload(@NotNull
StatsRequestDescriptor crd, @NotNull Integer oldValue) {
+ public @NotNull ListenableFuture<Integer> reload(@NotNull
StatsRequestDescriptor crd, @NotNull Integer oldValue) {
ListenableFutureTask<Integer> task =
ListenableFutureTask.create(() -> count(crd));
REFRESH_EXECUTOR.execute(task);
return task;
@@ -194,7 +205,9 @@ public class ElasticIndexStatistics implements
IndexStatistics {
private int count(StatsRequestDescriptor crd) throws IOException {
CountRequest.Builder cBuilder = new CountRequest.Builder();
cBuilder.index(crd.index);
- if (crd.field != null) {
+ if (crd.query != null) {
+ cBuilder.query(crd.query);
+ } else if (crd.field != null) {
cBuilder.query(q -> q.exists(e -> e.field(crd.field)));
} else {
cBuilder.query(q -> q.matchAll(m -> m));
@@ -206,12 +219,12 @@ public class ElasticIndexStatistics implements
IndexStatistics {
static class StatsCacheLoader extends CacheLoader<StatsRequestDescriptor,
StatsResponse> {
@Override
- public StatsResponse load(@NotNull StatsRequestDescriptor
countRequestDescriptor) throws IOException {
+ public @NotNull StatsResponse load(@NotNull StatsRequestDescriptor
countRequestDescriptor) throws IOException {
return stats(countRequestDescriptor);
}
@Override
- public ListenableFuture<StatsResponse> reload(@NotNull
StatsRequestDescriptor crd, @NotNull StatsResponse oldValue) {
+ public @NotNull ListenableFuture<StatsResponse> reload(@NotNull
StatsRequestDescriptor crd, @NotNull StatsResponse oldValue) {
ListenableFutureTask<StatsResponse> task =
ListenableFutureTask.create(() -> stats(crd));
REFRESH_EXECUTOR.execute(task);
return task;
@@ -223,7 +236,7 @@ public class ElasticIndexStatistics implements
IndexStatistics {
.bytes(Bytes.Bytes))
.valueBody();
if (records.isEmpty()) {
- return null;
+ throw new IllegalStateException("Cannot retrieve stats for
index " + crd.index + " as it does not exist");
}
// Assuming a single index matches crd.index
IndicesRecord record = records.get(0);
@@ -251,17 +264,20 @@ public class ElasticIndexStatistics implements
IndexStatistics {
final String index;
@Nullable
final String field;
+ @Nullable
+ final Query query;
StatsRequestDescriptor(@NotNull ElasticConnection connection,
@NotNull String index) {
- this(connection, index, null);
+ this(connection, index, null, null);
}
StatsRequestDescriptor(@NotNull ElasticConnection connection,
- @NotNull String index, @Nullable String field) {
+ @NotNull String index, @Nullable String field,
@Nullable Query query) {
this.connection = connection;
this.index = index;
this.field = field;
+ this.query = query;
}
@Override
@@ -270,12 +286,18 @@ public class ElasticIndexStatistics implements
IndexStatistics {
if (o == null || getClass() != o.getClass()) return false;
StatsRequestDescriptor that = (StatsRequestDescriptor) o;
return index.equals(that.index) &&
- Objects.equals(field, that.field);
+ Objects.equals(field, that.field) &&
+ // ES Query objects are not comparable, so we need to
compare their string representations
+ Objects.equals(internalQuery(), that.internalQuery());
}
@Override
public int hashCode() {
- return Objects.hash(index, field);
+ return Objects.hash(index, field, internalQuery());
+ }
+
+ private String internalQuery() {
+ return query != null ? query.toString() : null;
}
}
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndex.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndex.java
index 38b1715c0c..5e517214b4 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndex.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndex.java
@@ -19,21 +19,19 @@ package
org.apache.jackrabbit.oak.plugins.index.elastic.query;
import co.elastic.clients.json.JsonpUtils;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexNode;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexStatistics;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexTracker;
import
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResultRowAsyncIterator;
import org.apache.jackrabbit.oak.plugins.index.search.IndexNode;
import org.apache.jackrabbit.oak.plugins.index.search.SizeEstimator;
import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
import
org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndexPlanner;
-import org.apache.jackrabbit.oak.plugins.index.search.util.LMSEstimator;
import org.apache.jackrabbit.oak.spi.query.Cursor;
import org.apache.jackrabbit.oak.spi.query.Filter;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.WeakHashMap;
import java.util.function.BiFunction;
import java.util.function.Predicate;
@@ -43,7 +41,6 @@ import static
org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefini
class ElasticIndex extends FulltextIndex {
private static final Predicate<NodeState>
ELASTICSEARCH_INDEX_DEFINITION_PREDICATE =
state ->
TYPE_ELASTICSEARCH.equals(state.getString(TYPE_PROPERTY_NAME));
- private static final Map<String, LMSEstimator> ESTIMATORS = new
WeakHashMap<>();
// no concept of rewound in ES (even if it might be doing it internally,
we can't do much about it
private static final IteratorRewoundStateProvider
REWOUND_STATE_PROVIDER_NOOP = () -> 0;
@@ -66,7 +63,10 @@ class ElasticIndex extends FulltextIndex {
@Override
protected SizeEstimator getSizeEstimator(IndexPlan plan) {
- return () ->
getEstimator(plan.getPlanName()).estimate(plan.getFilter());
+ return () -> {
+ ElasticIndexStatistics indexStatistics =
acquireIndexNode(plan).getIndexStatistics();
+ return indexStatistics.getDocCountFor(new
ElasticRequestHandler(plan, getPlanResult(plan), null).baseQuery());
+ };
}
@Override
@@ -122,7 +122,6 @@ class ElasticIndex extends FulltextIndex {
responseHandler,
plan,
partialShouldInclude.apply(getPathRestriction(plan),
filter.getPathRestriction()),
- getEstimator(plan.getPlanName()),
elasticIndexTracker.getElasticMetricHandler()
);
@@ -133,11 +132,6 @@ class ElasticIndex extends FulltextIndex {
return new FulltextPathCursor(itr, REWOUND_STATE_PROVIDER_NOOP, plan,
filter.getQueryLimits(), getSizeEstimator(plan));
}
- private LMSEstimator getEstimator(String path) {
- ESTIMATORS.putIfAbsent(path, new LMSEstimator());
- return ESTIMATORS.get(path);
- }
-
private static boolean shouldInclude(String path, Filter.PathRestriction
pathRestriction, String docPath) {
boolean include = true;
switch (pathRestriction) {
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticRequestHandler.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticRequestHandler.java
index 9c6c777345..3ec9e51a92 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticRequestHandler.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticRequestHandler.java
@@ -730,7 +730,7 @@ public class ElasticRequestHandler {
int length = QueryConstants.REP_EXCERPT.length();
if (name.length() > length) {
String field = name.substring(length + 1, name.length() - 1);
- if (field.length() > 0 && !field.equals(".")) {
+ if (!field.isEmpty() && !field.equals(".")) {
return field;
}
}
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
index 39c4f55de5..3c50e44dda 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
@@ -32,7 +32,6 @@ import
org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticRequestHandl
import
org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticResponseHandler;
import
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.facets.ElasticFacetProvider;
import
org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex.FulltextResultRow;
-import org.apache.jackrabbit.oak.plugins.index.search.util.LMSEstimator;
import org.apache.jackrabbit.oak.spi.query.QueryIndex;
import org.apache.jackrabbit.oak.spi.query.QueryIndex.IndexPlan;
import org.jetbrains.annotations.NotNull;
@@ -71,7 +70,6 @@ public class ElasticResultRowAsyncIterator implements
Iterator<FulltextResultRow
private final IndexPlan indexPlan;
private final Predicate<String> rowInclusionPredicate;
private final ElasticMetricHandler metricHandler;
- private final LMSEstimator estimator;
private final ElasticQueryScanner elasticQueryScanner;
private final ElasticRequestHandler elasticRequestHandler;
private final ElasticResponseHandler elasticResponseHandler;
@@ -85,13 +83,12 @@ public class ElasticResultRowAsyncIterator implements
Iterator<FulltextResultRow
@NotNull ElasticResponseHandler
elasticResponseHandler,
@NotNull QueryIndex.IndexPlan
indexPlan,
Predicate<String>
rowInclusionPredicate,
- LMSEstimator estimator,
ElasticMetricHandler metricHandler) {
+ ElasticMetricHandler metricHandler) {
this.indexNode = indexNode;
this.elasticRequestHandler = elasticRequestHandler;
this.elasticResponseHandler = elasticResponseHandler;
this.indexPlan = indexPlan;
this.rowInclusionPredicate = rowInclusionPredicate;
- this.estimator = estimator;
this.metricHandler = metricHandler;
this.elasticFacetProvider =
elasticRequestHandler.getAsyncFacetProvider(elasticResponseHandler);
this.elasticQueryScanner = initScanner();
@@ -293,7 +290,6 @@ public class ElasticResultRowAsyncIterator implements
Iterator<FulltextResultRow
} else {
anyDataLeft.set(true);
}
- estimator.update(indexPlan.getFilter(), totalHits);
// now that we got the last hit we can release the semaphore
to potentially unlock other requests
semaphore.release();
diff --git
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexStatisticsTest.java
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexStatisticsTest.java
index 207f4e54b9..b6af378467 100644
---
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexStatisticsTest.java
+++
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexStatisticsTest.java
@@ -17,6 +17,7 @@
package org.apache.jackrabbit.oak.plugins.index.elastic;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.core.CountRequest;
import co.elastic.clients.elasticsearch.core.CountResponse;
import org.apache.jackrabbit.guava.common.base.Ticker;
@@ -117,6 +118,21 @@ public class ElasticIndexStatisticsTest {
// cache miss, read data from elastic
assertEquals(5000, indexStatistics.numDocs());
verify(elasticClientMock, times(3)).count(any(CountRequest.class));
+
+ // move cache time ahead by 30 minutes, cache value expired
+ ticker.tick(Duration.ofMinutes(30));
+
+ // cache miss, read data using an elastic query
+ assertEquals(5000, indexStatistics.getDocCountFor(Query.of(qf ->
qf.matchAll(mf -> mf))));
+ verify(elasticClientMock, times(4)).count(any(CountRequest.class));
+
+ // call again with the same query but a different instance
+ assertEquals(5000, indexStatistics.getDocCountFor(Query.of(qf ->
qf.matchAll(mf -> mf))));
+ verifyNoMoreInteractions(elasticClientMock);
+
+ // call again with a different query
+ assertEquals(5000, indexStatistics.getDocCountFor(Query.of(qf ->
qf.matchAll(mf -> mf.boost(100F)))));
+ verify(elasticClientMock, times(5)).count(any(CountRequest.class));
}
private static class MutableTicker extends Ticker {