Author: fortino
Date: Fri Jun 12 05:39:42 2020
New Revision: 1878762
URL: http://svn.apache.org/viewvc?rev=1878762&view=rev
Log:
OAK-9091: introduced async iterator
Added:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticRequestHandler.java
(with props)
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseHandler.java
(with props)
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java
(with props)
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
(with props)
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticFacetProvider.java
(with props)
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticInsecureFacetAsyncProvider.java
(with props)
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java
(with props)
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java
(with props)
Modified:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticDocumentMaker.java
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndex.java
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticResultRowIterator.java
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticFacetTest.java
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticFullTextAsyncTest.java
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticTestUtils.java
jackrabbit/oak/trunk/oak-search-elastic/src/test/resources/logback-test.xml
jackrabbit/oak/trunk/oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/spi/query/FulltextIndex.java
Modified:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticDocumentMaker.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticDocumentMaker.java?rev=1878762&r1=1878761&r2=1878762&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticDocumentMaker.java
(original)
+++
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticDocumentMaker.java
Fri Jun 12 05:39:42 2020
@@ -93,7 +93,8 @@ class ElasticDocumentMaker extends Fullt
@Override
protected void indexAnalyzedProperty(ElasticDocument doc, String pname,
String value, PropertyDefinition pd) {
- doc.addProperty(FieldNames.createAnalyzedFieldName(pname), value);
+ // Nothing to do here: analyzed properties are persisted in Elastic as multi fields,
+ // which are populated when #indexTypedProperty is called.
}
@Override
Modified:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndex.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndex.java?rev=1878762&r1=1878761&r2=1878762&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndex.java
(original)
+++
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndex.java
Fri Jun 12 05:39:42 2020
@@ -16,7 +16,9 @@
*/
package org.apache.jackrabbit.oak.plugins.index.elastic.query;
+import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
+import
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticRequestHandler;
import org.apache.jackrabbit.oak.plugins.index.search.IndexNode;
import org.apache.jackrabbit.oak.plugins.index.search.SizeEstimator;
import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
@@ -24,7 +26,6 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.index.search.util.LMSEstimator;
import org.apache.jackrabbit.oak.spi.query.Cursor;
import org.apache.jackrabbit.oak.spi.query.Filter;
-import org.apache.jackrabbit.oak.spi.query.QueryLimits;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.elasticsearch.common.Strings;
import org.jetbrains.annotations.NotNull;
@@ -40,7 +41,7 @@ import static org.apache.jackrabbit.oak.
class ElasticIndex extends FulltextIndex {
private static final Predicate<NodeState>
ELASTICSEARCH_INDEX_DEFINITION_PREDICATE =
state ->
TYPE_ELASTICSEARCH.equals(state.getString(TYPE_PROPERTY_NAME));
- private static final Map<String, LMSEstimator> estimators = new
WeakHashMap<>();
+ private static final Map<String, LMSEstimator> ESTIMATORS = new
WeakHashMap<>();
// higher than some threshold below which the query should rather be
answered by something else if possible
private static final double MIN_COST = 100.1;
@@ -90,9 +91,7 @@ class ElasticIndex extends FulltextIndex
@Override
protected String getFulltextRequestString(IndexPlan plan, IndexNode
indexNode) {
- return Strings.toString(new ElasticResultRowIterator(plan.getFilter(),
getPlanResult(plan), plan,
- acquireIndexNode(plan), FulltextIndex::shouldInclude,
getEstimator(plan.getPlanName()))
- .getElasticQuery(plan, getPlanResult(plan)));
+ return Strings.toString(new ElasticRequestHandler(plan,
getPlanResult(plan)).build());
}
@Override
@@ -102,11 +101,23 @@ class ElasticIndex extends FulltextIndex
// TODO: sorting
final FulltextIndexPlanner.PlanResult pr = getPlanResult(plan);
- QueryLimits settings = filter.getQueryLimits();
+
+ // This function is called for each extracted row. Passing FulltextIndex::shouldInclude means that for each
+ // row we evaluate getPathRestriction(plan) and plan.getFilter().getPathRestriction(). Using partial
+ // application (https://en.wikipedia.org/wiki/Partial_application) we can evaluate them once and still use
+ // a predicate as before.
+// BiFunction<String, Filter.PathRestriction, Predicate<String>>
partialShouldInclude = (path, pathRestriction) -> docPath ->
+// shouldInclude(path, pathRestriction, docPath);
+//
+// Iterator<FulltextResultRow> itr = new ElasticResultRowAsyncIterator(
+// acquireIndexNode(plan),
+// plan,
+// pr,
+// partialShouldInclude.apply(getPathRestriction(plan),
plan.getFilter().getPathRestriction()),
+// getEstimator(plan.getPlanName())
+// );
Iterator<FulltextResultRow> itr = new ElasticResultRowIterator(filter,
pr, plan,
acquireIndexNode(plan), FulltextIndex::shouldInclude,
getEstimator(plan.getPlanName()));
- SizeEstimator sizeEstimator = getSizeEstimator(plan);
/*
TODO: sync (nrt too??)
@@ -117,12 +128,29 @@ class ElasticIndex extends FulltextIndex
// no concept of rewound in ES (even if it might be doing it
internally, we can't do much about it
IteratorRewoundStateProvider rewoundStateProvider = () -> 0;
- return new FulltextPathCursor(itr, rewoundStateProvider, plan,
settings, sizeEstimator);
+ return new FulltextPathCursor(itr, rewoundStateProvider, plan,
filter.getQueryLimits(), getSizeEstimator(plan));
}
private LMSEstimator getEstimator(String path) {
- estimators.putIfAbsent(path, new LMSEstimator());
- return estimators.get(path);
+ ESTIMATORS.putIfAbsent(path, new LMSEstimator());
+ return ESTIMATORS.get(path);
+ }
+
+ /**
+  * Tells whether a document path satisfies a path restriction relative to {@code path}.
+  * Only EXACT, DIRECT_CHILDREN and ALL_CHILDREN are evaluated; any other restriction
+  * includes the document.
+  *
+  * @param path the path the restriction refers to
+  * @param pathRestriction the kind of restriction to evaluate
+  * @param docPath the path of the candidate document
+  * @return {@code true} when the document should be included
+  */
+ private static boolean shouldInclude(String path, Filter.PathRestriction pathRestriction, String docPath) {
+     switch (pathRestriction) {
+         case EXACT:
+             return path.equals(docPath);
+         case DIRECT_CHILDREN:
+             return PathUtils.getParentPath(docPath).equals(path);
+         case ALL_CHILDREN:
+             return PathUtils.isAncestor(path, docPath);
+         default:
+             return true;
+     }
}
@Override
Modified:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticResultRowIterator.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticResultRowIterator.java?rev=1878762&r1=1878761&r2=1878762&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticResultRowIterator.java
(original)
+++
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticResultRowIterator.java
Fri Jun 12 05:39:42 2020
@@ -20,8 +20,8 @@ import org.apache.jackrabbit.oak.api.Typ
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.commons.PerfLogger;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
-import
org.apache.jackrabbit.oak.plugins.index.elastic.query.facets.ElasticFacetHelper;
import
org.apache.jackrabbit.oak.plugins.index.elastic.query.facets.ElasticAggregationData;
+import
org.apache.jackrabbit.oak.plugins.index.elastic.query.facets.ElasticFacetHelper;
import
org.apache.jackrabbit.oak.plugins.index.elastic.query.facets.ElasticFacets;
import
org.apache.jackrabbit.oak.plugins.index.elastic.util.ElasticAggregationBuilderUtil;
import org.apache.jackrabbit.oak.plugins.index.elastic.util.ElasticConstants;
@@ -29,10 +29,12 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.search.PropertyDefinition;
import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
+import
org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndexPlanner;
import
org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndexPlanner.PlanResult;
import org.apache.jackrabbit.oak.plugins.index.search.util.LMSEstimator;
import org.apache.jackrabbit.oak.spi.query.Filter;
import org.apache.jackrabbit.oak.spi.query.QueryConstants;
+import org.apache.jackrabbit.oak.spi.query.QueryIndex;
import org.apache.jackrabbit.oak.spi.query.QueryIndex.IndexPlan;
import org.apache.jackrabbit.oak.spi.query.fulltext.FullTextAnd;
import org.apache.jackrabbit.oak.spi.query.fulltext.FullTextContains;
@@ -80,7 +82,6 @@ import static org.apache.jackrabbit.oak.
import static
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newPropertyRestrictionQuery;
import static
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newWildcardPathQuery;
import static
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newWildcardQuery;
-import static
org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex.isNodePath;
import static org.apache.jackrabbit.oak.spi.query.QueryConstants.JCR_PATH;
import static org.apache.jackrabbit.util.ISO8601.parse;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
@@ -113,8 +114,8 @@ class ElasticResultRowIterator implement
private final LMSEstimator estimator;
ElasticResultRowIterator(@NotNull Filter filter,
- @NotNull PlanResult planResult,
- @NotNull IndexPlan plan,
+ @NotNull FulltextIndexPlanner.PlanResult
planResult,
+ @NotNull QueryIndex.IndexPlan plan,
ElasticIndexNode indexNode,
RowInclusionPredicate rowInclusionPredicate,
LMSEstimator estimator) {
@@ -261,9 +262,9 @@ class ElasticResultRowIterator implement
}
public interface RowInclusionPredicate {
- boolean shouldInclude(@NotNull String path, @NotNull IndexPlan plan);
+ boolean shouldInclude(@NotNull String path, @NotNull
QueryIndex.IndexPlan plan);
- RowInclusionPredicate NOOP = (@NotNull String path, @NotNull IndexPlan
plan) -> true;
+ RowInclusionPredicate NOOP = (@NotNull String path, @NotNull
QueryIndex.IndexPlan plan) -> true;
}
/**
@@ -426,19 +427,8 @@ class ElasticResultRowIterator implement
return FieldNames.FULLTEXT;
}
- if (isNodePath(p)) {
- if (pr.isPathTransformed()) {
- p = PathUtils.getName(p);
- } else {
- //Get rid of /* as aggregated fulltext field name is the
- //node relative path
- p =
FieldNames.createFulltextFieldName(PathUtils.getParentPath(p));
- }
- } else {
- if (pr.isPathTransformed()) {
- p = PathUtils.getName(p);
- }
- p = FieldNames.createAnalyzedFieldName(p);
+ if (pr.isPathTransformed()) {
+ p = PathUtils.getName(p);
}
if ("*".equals(p)) {
@@ -502,7 +492,7 @@ class ElasticResultRowIterator implement
}
private void addNonFullTextConstraints(List<QueryBuilder> qs,
- IndexPlan plan, PlanResult
planResult) {
+ IndexPlan plan, PlanResult
planResult) {
final BiPredicate<Iterable<String>, String> any = (iterable, value) ->
StreamSupport.stream(iterable.spliterator(),
false).anyMatch(value::equals);
Added:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticRequestHandler.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticRequestHandler.java?rev=1878762&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticRequestHandler.java
(added)
+++
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticRequestHandler.java
Fri Jun 12 05:39:42 2020
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic.query.async;
+
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
+import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.search.PropertyDefinition;
+import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
+import
org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndexPlanner;
+import
org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndexPlanner.PlanResult;
+import org.apache.jackrabbit.oak.spi.query.Filter;
+import org.apache.jackrabbit.oak.spi.query.QueryConstants;
+import org.apache.jackrabbit.oak.spi.query.QueryIndex;
+import org.apache.jackrabbit.oak.spi.query.QueryIndex.IndexPlan;
+import org.apache.jackrabbit.oak.spi.query.fulltext.FullTextAnd;
+import org.apache.jackrabbit.oak.spi.query.fulltext.FullTextContains;
+import org.apache.jackrabbit.oak.spi.query.fulltext.FullTextExpression;
+import org.apache.jackrabbit.oak.spi.query.fulltext.FullTextOr;
+import org.apache.jackrabbit.oak.spi.query.fulltext.FullTextTerm;
+import org.apache.jackrabbit.oak.spi.query.fulltext.FullTextVisitor;
+import org.apache.lucene.search.WildcardQuery;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import
org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import javax.jcr.PropertyType;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiPredicate;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.jackrabbit.JcrConstants.JCR_MIXINTYPES;
+import static org.apache.jackrabbit.JcrConstants.JCR_PRIMARYTYPE;
+import static org.apache.jackrabbit.oak.commons.PathUtils.denotesRoot;
+import static org.apache.jackrabbit.oak.commons.PathUtils.getParentPath;
+import static
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newAncestorQuery;
+import static
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newDepthQuery;
+import static
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newMixinTypeQuery;
+import static
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newNodeTypeQuery;
+import static
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newNotNullPropQuery;
+import static
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newNullPropQuery;
+import static
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newPathQuery;
+import static
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newPrefixPathQuery;
+import static
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newPrefixQuery;
+import static
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newPropertyRestrictionQuery;
+import static
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newWildcardPathQuery;
+import static
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newWildcardQuery;
+import static org.apache.jackrabbit.oak.spi.query.QueryConstants.JCR_PATH;
+import static org.apache.jackrabbit.util.ISO8601.parse;
+import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
+import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
+import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
+import static org.elasticsearch.index.query.QueryBuilders.queryStringQuery;
+import static org.elasticsearch.index.query.QueryBuilders.termQuery;
+
+/**
+ * Class to map query plans into Elastic request objects.
+ */
+public class ElasticRequestHandler {
+
+ private final IndexPlan indexPlan;
+ private final PlanResult planResult;
+ private final ElasticIndexDefinition elasticIndexDefinition;
+
+ /**
+  * Creates a request handler for the given plan.
+  *
+  * @param indexPlan the index plan to translate
+  * @param planResult the planner result; its {@code indexDefinition} is cast to
+  *                   {@link ElasticIndexDefinition} and kept alongside it
+  */
+ public ElasticRequestHandler(@NotNull QueryIndex.IndexPlan indexPlan,
@NotNull FulltextIndexPlanner.PlanResult planResult) {
+ this.indexPlan = indexPlan;
+ this.planResult = planResult;
+ this.elasticIndexDefinition = (ElasticIndexDefinition)
planResult.indexDefinition;
+ }
+
+ /**
+  * Translates the index plan into an Elastic bool query.
+  * <p>
+  * The full-text constraint (if any) is added as a {@code must} clause. When the index definition
+  * declares a native function and the filter restricts it, the restriction value is added as a
+  * query-string {@code must} clause; otherwise the non-full-text constraints are added as
+  * {@code filter} clauses when the plan result asks for them. If nothing was added and the plan
+  * result evaluates a node type restriction, a match-all clause is used.
+  *
+  * @return the bool query built from the plan
+  */
+ public QueryBuilder build() {
+     final BoolQueryBuilder query = boolQuery();
+     final Filter filter = indexPlan.getFilter();
+
+     final FullTextExpression fullText = filter.getFullTextConstraint();
+     if (fullText != null) {
+         query.must(fullTextQuery(fullText, planResult));
+     }
+
+     // TODO: this code block should be removed? Or should we throw a NotSupported exception in this case?
+     // Check if native function is supported
+     final Filter.PropertyRestriction nativeRestriction = elasticIndexDefinition.hasFunctionDefined()
+             ? filter.getPropertyRestriction(elasticIndexDefinition.getFunctionName())
+             : null;
+
+     if (nativeRestriction != null) {
+         final Object nativeValue = nativeRestriction.first.getValue(nativeRestriction.first.getType());
+         // TODO: more like this
+
+         // TODO: spellcheck
+
+         // TODO: suggest
+
+         query.must(queryStringQuery(String.valueOf(nativeValue)));
+     } else if (planResult.evaluateNonFullTextConstraints()) {
+         for (QueryBuilder constraint : nonFullTextConstraints(indexPlan, planResult)) {
+             query.filter(constraint);
+         }
+     }
+
+     // TODO: sort with no other restriction
+
+     // TODO: what happens here in planning mode (specially, apparently for things like rep:similar)
+     // For purely nodeType based queries all the documents would have to
+     // be returned (if the index definition has a single rule)
+     if (!query.hasClauses() && planResult.evaluateNodeTypeRestriction()) {
+         query.must(matchAllQuery());
+     }
+
+     return query;
+ }
+
+ /**
+  * Tells whether the filter carries at least one facet restriction.
+  *
+  * @return {@code true} if any property restriction is named {@link QueryConstants#REP_FACET}
+  */
+ public boolean requiresFacets() {
+     for (Filter.PropertyRestriction restriction : indexPlan.getFilter().getPropertyRestrictions()) {
+         if (QueryConstants.REP_FACET.equals(restriction.propertyName)) {
+             return true;
+         }
+     }
+     return false;
+ }
+
+ /**
+  * Builds one terms aggregation per facet field returned by {@link #facetFields()}.
+  * Each aggregation is named after the facet field, targets the field name returned by the
+  * index definition's {@code getElasticKeyword} and is sized with its {@code getNumberOfTopFacets}.
+  *
+  * @return a lazily evaluated stream of terms aggregations
+  */
+ public Stream<TermsAggregationBuilder> aggregations() {
+     return facetFields().map(facetField -> {
+         final TermsAggregationBuilder aggregation = AggregationBuilders.terms(facetField);
+         aggregation.field(elasticIndexDefinition.getElasticKeyword(facetField));
+         aggregation.size(elasticIndexDefinition.getNumberOfTopFacets());
+         return aggregation;
+     });
+ }
+
+ /**
+  * Lists the facet fields requested by the filter.
+  *
+  * @return a stream with the parsed facet field of every {@link QueryConstants#REP_FACET} restriction
+  */
+ public Stream<String> facetFields() {
+     final Stream<Filter.PropertyRestriction> facetRestrictions = indexPlan.getFilter()
+             .getPropertyRestrictions()
+             .stream()
+             .filter(restriction -> QueryConstants.REP_FACET.equals(restriction.propertyName));
+     return facetRestrictions
+             .map(restriction -> FulltextIndex.parseFacetField(restriction.first.getValue(Type.STRING)));
+ }
+
+ /**
+  * Converts a full-text expression into an Elastic query by visiting it.
+  * <ul>
+  * <li>OR: a bool query with one {@code should} clause per sub-expression;</li>
+  * <li>AND: a bool query with one {@code must} clause per sub-expression, except that a
+  * sub-query made of exactly one {@code must_not} clause is unwrapped into this query's
+  * {@code must_not};</li>
+  * <li>term / contains: the query built by {@code tokenToQuery} on the resolved field name,
+  * boosted when a boost is given and wrapped in {@code must_not} when negated.</li>
+  * </ul>
+  *
+  * @param ft the expression to convert
+  * @param pr the plan result used to resolve field names and indexing rules
+  * @return the query set by the visitor (NOTE(review): stays {@code null} if none of the
+  *         visit methods below is invoked - confirm this cannot happen)
+  */
+ private QueryBuilder fullTextQuery(FullTextExpression ft, final PlanResult
pr) {
+ // a reference to the query, so it can be set in the visitor
+ // (a "non-local return")
+ final AtomicReference<QueryBuilder> result = new AtomicReference<>();
+ ft.accept(new FullTextVisitor() {
+
+ @Override
+ public boolean visit(FullTextContains contains) {
+ // the raw text is used and no boost is applied for "contains" expressions
+ visitTerm(contains.getPropertyName(), contains.getRawText(),
null, contains.isNot());
+ return true;
+ }
+
+ @Override
+ public boolean visit(FullTextOr or) {
+ BoolQueryBuilder q = boolQuery();
+ for (FullTextExpression e : or.list) {
+ q.should(fullTextQuery(e, pr));
+ }
+ result.set(q);
+ return true;
+ }
+
+ @Override
+ public boolean visit(FullTextAnd and) {
+ BoolQueryBuilder q = boolQuery();
+ for (FullTextExpression e : and.list) {
+ QueryBuilder x = fullTextQuery(e, pr);
+ // TODO: see OAK-2434 and see if ES also can't work
without unwrapping
+ /* Only unwrap the clause if MUST_NOT(x) */
+ boolean hasMustNot = false;
+ if (x instanceof BoolQueryBuilder) {
+ BoolQueryBuilder bq = (BoolQueryBuilder) x;
+ if (bq.mustNot().size() == 1
+ // no other clauses
+ && bq.should().isEmpty() &&
bq.must().isEmpty() && bq.filter().isEmpty()) {
+ hasMustNot = true;
+ q.mustNot(bq.mustNot().get(0));
+ }
+ }
+
+ // anything that is not a lone must_not is required as is
+ if (!hasMustNot) {
+ q.must(x);
+ }
+ }
+ result.set(q);
+ return true;
+ }
+
+ @Override
+ public boolean visit(FullTextTerm term) {
+ return visitTerm(term.getPropertyName(), term.getText(),
term.getBoost(), term.isNot());
+ }
+
+ // shared by the term and contains visitors: builds the query for a single token
+ private boolean visitTerm(String propertyName, String text, String
boost, boolean not) {
+ String p = getLuceneFieldName(propertyName, pr);
+ QueryBuilder q = tokenToQuery(text, p, pr);
+ if (boost != null) {
+ q.boost(Float.parseFloat(boost));
+ }
+ if (not) {
+ BoolQueryBuilder bq = boolQuery().mustNot(q);
+ result.set(bq);
+ } else {
+ result.set(q);
+ }
+ return true;
+ }
+ });
+ return result.get();
+ }
+
+ /**
+  * Builds the queries for the non-full-text part of the filter: node type restriction,
+  * path restriction and property restrictions, in that order.
+  *
+  * @param plan the index plan providing the filter and the path restriction
+  * @param planResult the plan result used for indexing rules, path transformation and
+  *                   property definitions
+  * @return the list of queries; property restrictions without a property definition are skipped
+  */
+ private List<QueryBuilder> nonFullTextConstraints(IndexPlan plan,
PlanResult planResult) {
+ // true when any element of the iterable equals the given value
+ final BiPredicate<Iterable<String>, String> any = (iterable, value) ->
+ StreamSupport.stream(iterable.spliterator(),
false).anyMatch(value::equals);
+
+ final List<QueryBuilder> queries = new ArrayList<>();
+
+ Filter filter = plan.getFilter();
+ if (!filter.matchesAllTypes()) {
+ queries.add(nodeTypeConstraints(planResult.indexingRule, filter));
+ }
+
+ String path = FulltextIndex.getPathRestriction(plan);
+ switch (filter.getPathRestriction()) {
+ case ALL_CHILDREN:
+ // no ancestor query is added for the root path
+ if (!"/".equals(path)) {
+ queries.add(newAncestorQuery(path));
+ }
+ break;
+ case DIRECT_CHILDREN:
+ BoolQueryBuilder bq = boolQuery()
+ .must(newAncestorQuery(path))
+ .must(newDepthQuery(path, planResult));
+ queries.add(bq);
+ break;
+ case EXACT:
+ // For transformed paths, we can only add path restriction if
absolute path to property can be
+ // deduced
+ if (planResult.isPathTransformed()) {
+ String parentPathSegment =
planResult.getParentPathSegment();
+ if (!any.test(PathUtils.elements(parentPathSegment), "*"))
{
+ queries.add(newPathQuery(path + parentPathSegment));
+ }
+ } else {
+ queries.add(newPathQuery(path));
+ }
+ break;
+ case PARENT:
+ if (denotesRoot(path)) {
+ // there's no parent of the root node
+ // we add a path that can not possibly occur because there
+ // is no way to say "match no documents" in Lucene
+ queries.add(newPathQuery("///"));
+ } else {
+ // For transformed paths, we can only add path restriction
if absolute path to property can be
+ // deduced
+ if (planResult.isPathTransformed()) {
+ String parentPathSegment =
planResult.getParentPathSegment();
+ if (!any.test(PathUtils.elements(parentPathSegment),
"*")) {
+ queries.add(newPathQuery(getParentPath(path) +
parentPathSegment));
+ }
+ } else {
+ queries.add(newPathQuery(getParentPath(path)));
+ }
+ }
+ break;
+ case NO_RESTRICTION:
+ break;
+ }
+
+ for (Filter.PropertyRestriction pr : filter.getPropertyRestrictions())
{
+ String name = pr.propertyName;
+
+ // excerpt, score explanation and facet pseudo-properties produce no query here
+ if (QueryConstants.REP_EXCERPT.equals(name) ||
QueryConstants.OAK_SCORE_EXPLANATION.equals(name)
+ || QueryConstants.REP_FACET.equals(name)) {
+ continue;
+ }
+
+ if (QueryConstants.RESTRICTION_LOCAL_NAME.equals(name)) {
+ if (planResult.evaluateNodeNameRestriction()) {
+ QueryBuilder q = nodeName(pr);
+ if (q != null) {
+ queries.add(q);
+ }
+ }
+ continue;
+ }
+
+ // equality restriction: first == last with both bounds included
+ if (pr.first != null && pr.first.equals(pr.last) &&
pr.firstIncluding && pr.lastIncluding) {
+ String first = pr.first.getValue(Type.STRING);
+ // backslashes are stripped from the value before it is used
+ first = first.replace("\\", "");
+ if (JCR_PATH.equals(name)) {
+ queries.add(newPathQuery(first));
+ continue;
+ } else if ("*".equals(name)) {
+ //TODO Revisit reference constraint. For performant impl
+ //references need to be indexed in a different manner
+ queries.add(referenceConstraint(first));
+ continue;
+ }
+ }
+
+ PropertyDefinition pd = planResult.getPropDefn(pr);
+ if (pd == null) {
+ continue;
+ }
+
+ QueryBuilder q = createQuery(planResult.getPropertyName(pr), pr,
pd);
+ if (q != null) {
+ queries.add(q);
+ }
+ }
+ return queries;
+ }
+
+ /**
+  * Builds a bool query with one {@code should} clause per primary and mixin type of the filter.
+  * Types are only added when the indexing rule has a config for the matching property
+  * ({@code jcr:primaryType} / {@code jcr:mixinTypes}) with {@code propertyIndex} set.
+  *
+  * @param defn the indexing rule providing the property configs
+  * @param filter the filter providing the primary and mixin types
+  * @return the (possibly empty) bool query
+  */
+ private static QueryBuilder nodeTypeConstraints(IndexDefinition.IndexingRule defn, Filter filter) {
+     final BoolQueryBuilder typeQuery = boolQuery();
+     //TODO OAK-2198 Add proper nodeType query support
+
+     final PropertyDefinition primaryTypeConfig = defn.getConfig(JCR_PRIMARYTYPE);
+     final boolean primaryTypeIndexed = primaryTypeConfig != null && primaryTypeConfig.propertyIndex;
+     if (primaryTypeIndexed) {
+         filter.getPrimaryTypes().forEach(primary -> typeQuery.should(newNodeTypeQuery(primary)));
+     }
+
+     final PropertyDefinition mixinTypeConfig = defn.getConfig(JCR_MIXINTYPES);
+     final boolean mixinTypeIndexed = mixinTypeConfig != null && mixinTypeConfig.propertyIndex;
+     if (mixinTypeIndexed) {
+         filter.getMixinTypes().forEach(mixin -> typeQuery.should(newMixinTypeQuery(mixin)));
+     }
+
+     return typeQuery;
+ }
+
+ private static QueryBuilder nodeName(Filter.PropertyRestriction pr) {
+ String first = pr.first != null ? pr.first.getValue(Type.STRING) :
null;
+ if (pr.first != null && pr.first.equals(pr.last) && pr.firstIncluding
+ && pr.lastIncluding) {
+ // [property]=[value]
+ return termQuery(FieldNames.NODE_NAME, first);
+ }
+
+ if (pr.isLike) {
+ return like(FieldNames.NODE_NAME, first);
+ }
+
+ throw new IllegalStateException("For nodeName queries only EQUALS and
LIKE are supported " + pr);
+ }
+
+ private static QueryBuilder like(String name, String first) {
+ first = first.replace('%', WildcardQuery.WILDCARD_STRING);
+ first = first.replace('_', WildcardQuery.WILDCARD_CHAR);
+
+ int indexOfWS = first.indexOf(WildcardQuery.WILDCARD_STRING);
+ int indexOfWC = first.indexOf(WildcardQuery.WILDCARD_CHAR);
+ int len = first.length();
+
+ if (indexOfWS == len || indexOfWC == len) {
+ // remove trailing "*" for prefix query
+ first = first.substring(0, first.length() - 1);
+ if (JCR_PATH.equals(name)) {
+ return newPrefixPathQuery(first);
+ } else {
+ return newPrefixQuery(name, first);
+ }
+ } else {
+ if (JCR_PATH.equals(name)) {
+ return newWildcardPathQuery(first);
+ } else {
+ return newWildcardQuery(name, first);
+ }
+ }
+ }
+
+ /**
+  * Builds the query used for a reference restriction: a multi-match query on the given value
+  * with no explicit field list.
+  *
+  * @param uuid the referenced identifier to search for
+  * @return the multi-match query
+  */
+ private static QueryBuilder referenceConstraint(String uuid) {
+ // TODO: this seems very bad as a query - do we really want to support
it. In fact, is it even used?
+ // reference query
+ return QueryBuilders.multiMatchQuery(uuid);
+ }
+
+ private static QueryBuilder tokenToQuery(String text, String fieldName,
PlanResult pr) {
+ QueryBuilder ret;
+ IndexDefinition.IndexingRule indexingRule = pr.indexingRule;
+ //Expand the query on fulltext field
+ if (FieldNames.FULLTEXT.equals(fieldName) &&
+ !indexingRule.getNodeScopeAnalyzedProps().isEmpty()) {
+ BoolQueryBuilder in = boolQuery();
+ for (PropertyDefinition pd :
indexingRule.getNodeScopeAnalyzedProps()) {
+ QueryBuilder q = matchQuery(pd.name, text).boost(pd.boost);
+ in.should(q);
+ }
+
+ //Add the query for actual fulltext field also. That query would
not be boosted
+ // TODO: do we need this if all the analyzed fields are queried?
+ ret = in.should(matchQuery(fieldName, text));
+ } else {
+ ret = matchQuery(fieldName, text);
+ }
+
+ return ret;
+ }
+
+ /**
+  * Builds the query for a single property restriction.
+  * <p>
+  * Null restrictions, and not-null restrictions on definitions with {@code notNullCheckEnabled},
+  * are answered with dedicated queries on {@code defn.name}. Everything else becomes a property
+  * restriction query on the field returned by the index definition's {@code getElasticKeyword},
+  * with values converted according to the determined property type; LIKE restrictions on
+  * non date/double/long types are delegated to {@code like}.
+  *
+  * @param propertyName the name of the restricted property
+  * @param pr the restriction to translate
+  * @param defn the definition of the restricted property
+  * @return the query for the restriction
+  * @throws IllegalStateException if no query could be built for the restriction
+  */
+ private QueryBuilder createQuery(String propertyName, Filter.PropertyRestriction pr,
+                                  PropertyDefinition defn) {
+     final int propertyType = FulltextIndex.determinePropertyType(defn, pr);
+
+     if (pr.isNullRestriction()) {
+         return newNullPropQuery(defn.name);
+     }
+
+     // If notNullCheckEnabled explicitly enabled use the simple TermQuery
+     // otherwise later fallback to range query
+     if (pr.isNotNullRestriction() && defn.notNullCheckEnabled) {
+         return newNotNullPropQuery(defn.name);
+     }
+
+     final String keywordField = elasticIndexDefinition.getElasticKeyword(propertyName);
+
+     final QueryBuilder restrictionQuery;
+     if (propertyType == PropertyType.DATE) {
+         restrictionQuery = newPropertyRestrictionQuery(keywordField, pr,
+                 value -> parse(value.getValue(Type.DATE)).getTime());
+     } else if (propertyType == PropertyType.DOUBLE) {
+         restrictionQuery = newPropertyRestrictionQuery(keywordField, pr,
+                 value -> value.getValue(Type.DOUBLE));
+     } else if (propertyType == PropertyType.LONG) {
+         restrictionQuery = newPropertyRestrictionQuery(keywordField, pr,
+                 value -> value.getValue(Type.LONG));
+     } else {
+         if (pr.isLike) {
+             return like(propertyName, pr.first.getValue(Type.STRING));
+         }
+
+         //TODO Confirm that all other types can be treated as string
+         restrictionQuery = newPropertyRestrictionQuery(keywordField, pr,
+                 value -> value.getValue(Type.STRING));
+     }
+
+     if (restrictionQuery == null) {
+         throw new IllegalStateException("PropertyRestriction not handled " + pr + " for index " + defn);
+     }
+     return restrictionQuery;
+ }
+
+ private static String getLuceneFieldName(@Nullable String p, PlanResult
pr) {
+ if (p == null) {
+ return FieldNames.FULLTEXT;
+ }
+
+ if (pr.isPathTransformed()) {
+ p = PathUtils.getName(p);
+ }
+
+ if ("*".equals(p)) {
+ p = FieldNames.FULLTEXT;
+ }
+ return p;
+ }
+}
Propchange:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticRequestHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseHandler.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseHandler.java?rev=1878762&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseHandler.java
(added)
+++
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseHandler.java
Fri Jun 12 05:39:42 2020
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic.query.async;
+
+import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
+import
org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndexPlanner;
+import
org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndexPlanner.PlanResult;
+import org.elasticsearch.search.SearchHit;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Helper that decodes Elastic response objects according to a given {@link PlanResult}.
+ */
+public class ElasticResponseHandler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticResponseHandler.class);
+
+    private final PlanResult planResult;
+
+    ElasticResponseHandler(@NotNull FulltextIndexPlanner.PlanResult planResult) {
+        this.planResult = planResult;
+    }
+
+    /**
+     * Extracts the path of the node a {@link SearchHit} refers to.
+     * <p>
+     * The path is read from the {@link FieldNames#PATH} source field; an empty value is mapped to
+     * {@code "/"}. When the plan uses a transformed path, the value is passed through
+     * {@code planResult.transformPath}.
+     *
+     * @param hit the search hit to read the path from
+     * @return the (possibly transformed) path, or {@code null} when the transformation returns {@code null}
+     */
+    public String getPath(SearchHit hit) {
+        final String storedPath = (String) hit.getSourceAsMap().get(FieldNames.PATH);
+        final String path = "".equals(storedPath) ? "/" : storedPath;
+
+        if (!planResult.isPathTransformed()) {
+            return path;
+        }
+
+        final String transformedPath = planResult.transformPath(path);
+        if (transformedPath == null) {
+            LOG.trace("Ignoring path {} : Transformation returned null", path);
+        }
+        return transformedPath;
+    }
+}
Propchange:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java?rev=1878762&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java
(added)
+++
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java
Fri Jun 12 05:39:42 2020
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic.query.async;
+
+import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.aggregations.Aggregations;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Generic listener of Elastic responses. Implementations declare the source fields they need and
+ * are told when no more data will be delivered; the nested interfaces add the callbacks for
+ * search hits and for aggregations.
+ */
+public interface ElasticResponseListener {
+
+ Set<String> DEFAULT_SOURCE_FIELDS = Collections.singleton(FieldNames.PATH);
+
+ /**
+ * Returns the source fields this listener is interested in.
+ *
+ * @return the set of source fields to fetch ({@link #DEFAULT_SOURCE_FIELDS}, i.e. only PATH, by default)
+ */
+ default Set<String> sourceFields() {
+ return DEFAULT_SOURCE_FIELDS;
+ }
+
+ /**
+ * This method is invoked when there is no more data to process.
+ */
+ void endData();
+
+ /**
+ * {@link ElasticResponseListener} extension to subscribe to {@link SearchHit} events.
+ */
+ interface SearchHitListener extends ElasticResponseListener {
+
+ /**
+ * Returns {@code true} if the listener is interested in the entire result set.
+ *
+ * @return {@code false} by default
+ */
+ default boolean isFullScan() {
+ return false;
+ }
+
+ /**
+ * This method is invoked at the beginning of the listener lifecycle to notify the number of hits
+ * this listener could receive. The default implementation does nothing.
+ *
+ * @param totalHits the total number of hits
+ */
+ default void startData(long totalHits) { /*empty*/ }
+
+ /**
+ * This method is called for each {@link SearchHit} retrieved.
+ *
+ * @param searchHit the retrieved hit
+ */
+ void on(SearchHit searchHit);
+ }
+
+ /**
+ * {@link ElasticResponseListener} extension to subscribe to {@link Aggregations} events.
+ */
+ interface AggregationListener extends ElasticResponseListener {
+
+ /**
+ * This method is called once when the {@link Aggregations} are retrieved.
+ *
+ * @param aggregations the {@link Aggregations} or {@code null} if there are no results
+ */
+ void on(Aggregations aggregations);
+ }
+}
Propchange:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java?rev=1878762&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
(added)
+++
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
Fri Jun 12 05:39:42 2020
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic.query.async;
+
+import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticIndexNode;
+import
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.facets.ElasticFacetProvider;
+import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
+import
org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex.FulltextResultRow;
+import
org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndexPlanner;
+import
org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndexPlanner.PlanResult;
+import org.apache.jackrabbit.oak.plugins.index.search.util.LMSEstimator;
+import org.apache.jackrabbit.oak.spi.query.QueryIndex;
+import org.apache.jackrabbit.oak.spi.query.QueryIndex.IndexPlan;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.aggregations.Aggregations;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortBuilders;
+import org.elasticsearch.search.sort.SortOrder;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * Class to iterate over Elastic results of a given {@link IndexPlan}.
+ * The results are produced asynchronously into an internal unbounded {@link
BlockingQueue}. To avoid too many calls to
+ * Elastic the results are loaded in chunks (using search_after strategy) and
loaded only when needed.
+ */
+public class ElasticResultRowAsyncIterator implements
Iterator<FulltextResultRow>, ElasticResponseListener.SearchHitListener {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ElasticResultRowAsyncIterator.class);
+ // this is an internal special message to notify the consumer the result
set has been completely returned
+ private static final FulltextResultRow POISON_PILL =
+ new FulltextResultRow("___OAK_POISON_PILL___", 0d,
Collections.emptyMap(), null, null);
+
+ private final BlockingQueue<FulltextResultRow> queue = new
LinkedBlockingQueue<>();
+
+ private final ElasticIndexNode indexNode;
+ private final IndexPlan indexPlan;
+ private final PlanResult planResult;
+ private final Predicate<String> rowInclusionPredicate;
+ private final LMSEstimator estimator;
+
+ private final ElasticQueryScanner elasticQueryScanner;
+ private final ElasticRequestHandler elasticRequestHandler;
+ private final ElasticResponseHandler elasticResponseHandler;
+ private final ElasticFacetProvider elasticFacetProvider;
+
+ private FulltextResultRow nextRow;
+
+ /**
+ * Creates the iterator and its collaborators: a request handler and a response handler for the
+ * given plan, the facet provider and finally the scanner. Creating the scanner already issues the
+ * initial asynchronous search request, so results may start flowing into the queue before this
+ * constructor returns.
+ *
+ * @param indexNode the index node used to reach the Elastic connection and the index definition
+ * @param indexPlan the plan to execute
+ * @param planResult the plan result the request and response handlers are built from
+ * @param rowInclusionPredicate optional filter on paths; when {@code null} every path is accepted
+ * @param estimator the estimator updated with the total number of hits of each processed response
+ */
+ public ElasticResultRowAsyncIterator(@NotNull ElasticIndexNode indexNode,
+ @NotNull QueryIndex.IndexPlan
indexPlan,
+ @NotNull
FulltextIndexPlanner.PlanResult planResult,
+ Predicate<String>
rowInclusionPredicate,
+ LMSEstimator estimator) {
+ this.indexNode = indexNode;
+ this.indexPlan = indexPlan;
+ this.planResult = planResult;
+ this.rowInclusionPredicate = rowInclusionPredicate;
+ this.estimator = estimator;
+
+ this.elasticRequestHandler = new ElasticRequestHandler(indexPlan,
planResult);
+ this.elasticResponseHandler = new ElasticResponseHandler(planResult);
+ // the facet provider must exist before the scanner, which registers it as a listener
+ this.elasticFacetProvider = initFacetProvider();
+ this.elasticQueryScanner = initScanner();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (queue.isEmpty()) {
+ // this triggers, when needed, the scan of the next results chunk
+ elasticQueryScanner.scan();
+ }
+ try {
+ nextRow = queue.take();
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Error reading next result from
Elastic", e);
+ }
+ if (POISON_PILL.path.equals(nextRow.path)) {
+ nextRow = null;
+ }
+ return nextRow != null;
+ }
+
+ @Override
+ public FulltextResultRow next() {
+ return nextRow;
+ }
+
+ @Override
+ public void on(SearchHit searchHit) {
+ final String path = elasticResponseHandler.getPath(searchHit);
+ if (path != null) {
+ if (rowInclusionPredicate != null &&
!rowInclusionPredicate.test(path)) {
+ LOG.trace("Path {} not included because of hierarchy inclusion
rules", path);
+ return;
+ }
+ try {
+ queue.put(new FulltextResultRow(path, searchHit.getScore(),
null, elasticFacetProvider, null));
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Error producing results into
the iterator queue", e);
+ }
+ }
+ }
+
+ @Override
+ public void endData() {
+ try {
+ queue.put(POISON_PILL);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Error inserting poison pill into
the iterator queue", e);
+ }
+ }
+
+    /**
+     * Creates the facet provider for this query.
+     *
+     * @return the provider matching the secure facet configuration of the index definition, or
+     *         {@code null} when the request does not require facets
+     */
+    private ElasticFacetProvider initFacetProvider() {
+        if (!elasticRequestHandler.requiresFacets()) {
+            return null;
+        }
+        return ElasticFacetProvider.getProvider(
+                planResult.indexDefinition.getSecureFacetConfiguration(),
+                elasticRequestHandler,
+                elasticResponseHandler,
+                indexPlan.getFilter()::isAccessible
+        );
+    }
+
+ private ElasticQueryScanner initScanner() {
+ List<ElasticResponseListener> listeners = new ArrayList<>();
+ // TODO: we could avoid to register this listener when the client is
interested in facets only. It would save space and time
+ listeners.add(this);
+ if (elasticFacetProvider != null) {
+ listeners.add(elasticFacetProvider);
+ }
+
+ return new ElasticQueryScanner(elasticRequestHandler, listeners);
+ }
+
+ /**
+ * Scans Elastic results asynchronously and notify listeners.
+ */
+ class ElasticQueryScanner implements ActionListener<SearchResponse> {
+
+ private static final int SMALL_RESULT_SET_SIZE = 10;
+ private static final int MEDIUM_RESULT_SET_SIZE = 100;
+ private static final int LARGE_RESULT_SET_SIZE = 1000;
+
+ private final Set<ElasticResponseListener> allListeners = new
HashSet<>();
+ private final List<SearchHitListener> searchHitListeners = new
ArrayList<>();
+ private final List<AggregationListener> aggregationListeners = new
ArrayList<>();
+
+ private final QueryBuilder query;
+ private final String[] sourceFields;
+
+ // concurrent data structures to coordinate chunks loading
+ private final AtomicBoolean anyDataLeft = new AtomicBoolean(false);
+
+ private int scannedRows = 0;
+ private boolean firstRequest = true;
+ private boolean fullScan = false;
+
+ // reference to the last document sort values for search_after queries
+ private Object[] lastHitSortValues;
+
+ // Semaphore to guarantee only one in-flight request to Elastic
+ private final Semaphore semaphore = new Semaphore(1);
+
+        /**
+         * Registers the listeners, builds the query and kicks off the initial asynchronous search.
+         *
+         * @param requestHandler the handler used to build the query and, when needed, the aggregations
+         * @param listeners      the listeners to notify; each one contributes its source fields to the request
+         */
+        ElasticQueryScanner(ElasticRequestHandler requestHandler, List<ElasticResponseListener> listeners) {
+            this.query = requestHandler.build();
+
+            // register every listener under the roles it implements and collect the fields to fetch
+            final Set<String> requestedFields = new HashSet<>();
+            for (ElasticResponseListener listener : listeners) {
+                allListeners.add(listener);
+                requestedFields.addAll(listener.sourceFields());
+                if (listener instanceof SearchHitListener) {
+                    final SearchHitListener hitListener = (SearchHitListener) listener;
+                    searchHitListeners.add(hitListener);
+                    if (hitListener.isFullScan()) {
+                        fullScan = true;
+                    }
+                }
+                if (listener instanceof AggregationListener) {
+                    aggregationListeners.add((AggregationListener) listener);
+                }
+            }
+            this.sourceFields = requestedFields.toArray(new String[0]);
+
+            // aggregations are requested as soon as at least one aggregation listener is registered
+            final boolean needsAggregations = !aggregationListeners.isEmpty();
+
+            // use a smaller size when the client asks for facets. This improves performance
+            // when the client is only interested in insecure facets
+            final int firstChunkSize = needsAggregations ? SMALL_RESULT_SET_SIZE : MEDIUM_RESULT_SET_SIZE;
+
+            final SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.searchSource()
+                    .query(query)
+                    .size(firstChunkSize)
+                    .fetchSource(sourceFields, null)
+                    // TODO: this needs to be moved in the requestHandler when sorting will be properly supported
+                    .sort(SortBuilders.fieldSort("_score").order(SortOrder.DESC))
+                    .sort(SortBuilders.fieldSort(FieldNames.PATH).order(SortOrder.ASC)); // tie-breaker
+
+            if (needsAggregations) {
+                requestHandler.aggregations().forEach(searchSourceBuilder::aggregation);
+            }
+
+            final SearchRequest searchRequest =
+                    new SearchRequest(indexNode.getDefinition().getRemoteIndexAlias()).source(searchSourceBuilder);
+
+            LOG.trace("Kicking initial search for query {}", searchSourceBuilder);
+            // take the only permit: the initial request is now the one in flight
+            semaphore.tryAcquire();
+            indexNode.getConnection().getClient().searchAsync(searchRequest, RequestOptions.DEFAULT, this);
+        }
+
+        /**
+         * Handle the response action notifying the registered listeners. Depending on the listeners'
+         * configuration it could keep loading chunks or wait for a {@code #scan} call to resume scanning.
+         * <p>
+         * The request permit is released exactly once per response: right after the paging state has
+         * been updated when more data is left (so that the next chunk can be requested while the hits
+         * are still being emitted), or through {@code close()} when this response is the last one.
+         *
+         * @param searchResponse the response of the search request
+         */
+        @Override
+        public void onResponse(SearchResponse searchResponse) {
+            final SearchHit[] searchHits = searchResponse.getHits().getHits();
+            if (searchHits == null || searchHits.length == 0) {
+                LOG.trace("No results: closing scanner, notifying listeners");
+                close();
+                return;
+            }
+
+            // NOTE(review): presumably the exact total; Elasticsearch may report only a lower bound
+            // depending on the track_total_hits setting - confirm
+            final long totalHits = searchResponse.getHits().getTotalHits().value;
+            LOG.debug("Processing search response that took {} to read {}/{} docs",
+                    searchResponse.getTook(), searchHits.length, totalHits);
+            lastHitSortValues = searchHits[searchHits.length - 1].getSortValues();
+            scannedRows += searchHits.length;
+            final boolean moreData = totalHits > scannedRows;
+            anyDataLeft.set(moreData);
+            estimator.update(indexPlan.getFilter(), totalHits);
+
+            if (moreData) {
+                // now that we got the last hit we can release the semaphore to potentially unlock other
+                // requests. When no data is left the permit is released by close() instead, so that it
+                // is never released twice for the same request
+                semaphore.release();
+            }
+
+            if (firstRequest) {
+                for (SearchHitListener l : searchHitListeners) {
+                    l.startData(totalHits);
+                }
+
+                if (!aggregationListeners.isEmpty()) {
+                    Aggregations aggregations = searchResponse.getAggregations();
+                    LOG.trace("Emitting aggregations {}", aggregations);
+                    for (AggregationListener l : aggregationListeners) {
+                        l.on(aggregations);
+                    }
+                }
+
+                firstRequest = false;
+            }
+
+            LOG.trace("Emitting {} search hits, for a total of {} scanned results", searchHits.length, scannedRows);
+            for (SearchHit hit : searchHits) {
+                for (SearchHitListener l : searchHitListeners) {
+                    l.on(hit);
+                }
+            }
+
+            // decide on what this response found, not on the shared flag that a later response
+            // may have changed in the meantime
+            if (!moreData) {
+                LOG.trace("No data left: closing scanner, notifying listeners");
+                close();
+            } else if (fullScan) {
+                scan();
+            }
+        }
+
+ /**
+ * Handles a failed search request: the error is logged and the scanner is closed, which notifies
+ * every registered listener through {@code endData()}.
+ *
+ * @param e the failure reported for the search request
+ */
+ @Override
+ public void onFailure(Exception e) {
+ LOG.error("Error retrieving data from Elastic: closing scanner,
notifying listeners", e);
+ // close right away so that the listeners still get their end-of-data notification after a
+ // failure; otherwise whoever waits on them could (potentially) hang forever
+ close();
+ }
+
+        /**
+         * Triggers a scan of a new chunk of the result set, if needed.
+         * <p>
+         * A new {@code search_after} request is sent only when there is data left and no other request
+         * is in flight; otherwise the call only logs a trace message.
+         */
+        public void scan() {
+            // check for remaining data first: taking the permit when there is nothing left to load
+            // would leave it acquired without any request that could give it back
+            if (anyDataLeft.get() && semaphore.tryAcquire()) {
+                final SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.searchSource()
+                        .query(query)
+                        .size(LARGE_RESULT_SET_SIZE)
+                        .fetchSource(sourceFields, null)
+                        // resume right after the last hit of the previous chunk
+                        .searchAfter(lastHitSortValues)
+                        // TODO: this needs to be moved in the requestHandler when sorting will be properly supported
+                        .sort(SortBuilders.fieldSort("_score").order(SortOrder.DESC))
+                        .sort(SortBuilders.fieldSort(FieldNames.PATH).order(SortOrder.ASC)); // tie-breaker
+
+                final SearchRequest searchRequest =
+                        new SearchRequest(indexNode.getDefinition().getRemoteIndexAlias()).source(searchSourceBuilder);
+                LOG.trace("Kicking new search after query {}", searchRequest.source());
+
+                indexNode.getConnection().getClient().searchAsync(searchRequest, RequestOptions.DEFAULT, this);
+            } else {
+                LOG.trace("Scanner is closing or still processing data from the previous scan");
+            }
+        }
+
+ /**
+ * Closes the scanner: releases one permit of the request semaphore and then calls
+ * {@code endData()} on every registered listener.
+ */
+ private void close() {
+ semaphore.release();
+ for (ElasticResponseListener l : allListeners) {
+ l.endData();
+ }
+ }
+ }
+}
Propchange:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticFacetProvider.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticFacetProvider.java?rev=1878762&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticFacetProvider.java
(added)
+++
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticFacetProvider.java
Fri Jun 12 05:39:42 2020
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic.query.async.facets;
+
+import
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticRequestHandler;
+import
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseHandler;
+import
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseListener;
+import
org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition.SecureFacetConfiguration;
+import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
+
+import java.util.function.Predicate;
+
+/**
+ * A {@link FulltextIndex.FacetProvider} that receives its data through {@link ElasticResponseListener} callbacks.
+ */
+public interface ElasticFacetProvider extends FulltextIndex.FacetProvider, ElasticResponseListener {
+
+    /**
+     * Creates the provider matching the mode of the given {@link SecureFacetConfiguration}.
+     *
+     * @param facetConfiguration the {@link SecureFacetConfiguration} to extract facet options
+     * @param requestHandler     the {@link ElasticRequestHandler} to perform actions at request time
+     * @param responseHandler    the {@link ElasticResponseHandler} to decode responses
+     * @param isAccessible       a {@link Predicate} to check if a node is accessible
+     * @return an {@link ElasticFacetProvider} selected by {@link SecureFacetConfiguration#getMode()};
+     *         {@link SecureFacetConfiguration.MODE#SECURE} is the fallback for any other mode
+     */
+    static ElasticFacetProvider getProvider(
+            SecureFacetConfiguration facetConfiguration,
+            ElasticRequestHandler requestHandler,
+            ElasticResponseHandler responseHandler,
+            Predicate<String> isAccessible
+    ) {
+        switch (facetConfiguration.getMode()) {
+            case INSECURE:
+                return new ElasticInsecureFacetAsyncProvider();
+            case STATISTICAL:
+                return new ElasticStatisticalFacetAsyncProvider(
+                        requestHandler, responseHandler, isAccessible,
+                        facetConfiguration.getRandomSeed(), facetConfiguration.getStatisticalFacetSampleSize()
+                );
+            case SECURE:
+            default:
+                return new ElasticSecureFacetAsyncProvider(requestHandler, responseHandler, isAccessible);
+        }
+    }
+
+}
Propchange:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticFacetProvider.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticInsecureFacetAsyncProvider.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticInsecureFacetAsyncProvider.java?rev=1878762&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticInsecureFacetAsyncProvider.java
(added)
+++
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticInsecureFacetAsyncProvider.java
Fri Jun 12 05:39:42 2020
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic.query.async.facets;
+
+import
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseListener;
+import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
+import org.elasticsearch.search.aggregations.Aggregations;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An {@link ElasticFacetProvider} that subscribes to Elastic Aggregation
events.
+ * The provider is async: {@code getFacets} waits until the aggregation is
read or for a max of 15 seconds. In the latter
+ * case, an {@link IllegalStateException} is thrown.
+ */
+class ElasticInsecureFacetAsyncProvider implements ElasticFacetProvider,
ElasticResponseListener.AggregationListener {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ElasticInsecureFacetAsyncProvider.class);
+
+ private Aggregations aggregations;
+
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+ @Override
+ public List<FulltextIndex.Facet> getFacets(int numberOfFacets, String
columnName) {
+ LOG.trace("Requested facets for {} - Latch count: {}", columnName,
latch.getCount());
+ try {
+ latch.await(15, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Error while waiting for facets",
e);
+ }
+ LOG.trace("Reading facets for {} from aggregations {}", columnName,
aggregations.asMap());
+ if (aggregations != null) {
+ final String facetProp = FulltextIndex.parseFacetField(columnName);
+ Terms terms = aggregations.get(facetProp);
+ List<FulltextIndex.Facet> facets = new
ArrayList<>(terms.getBuckets().size());
+ for (Terms.Bucket bucket : terms.getBuckets()) {
+ facets.add(new FulltextIndex.Facet(bucket.getKeyAsString(),
(int) bucket.getDocCount()));
+ }
+ return facets;
+ } else return null;
+ }
+
+ @Override
+ public void on(Aggregations aggregations) {
+ this.aggregations = aggregations;
+ this.endData();
+ }
+
+ @Override
+ public void endData() {
+ latch.countDown();
+ }
+}
Propchange:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticInsecureFacetAsyncProvider.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java?rev=1878762&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java
(added)
+++
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java
Fri Jun 12 05:39:42 2020
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic.query.async.facets;
+
+import
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticRequestHandler;
+import
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseHandler;
+import
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseListener;
+import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * An {@link ElasticFacetProvider} that subscribes to Elastic SearchHit events
to return only accessible facets.
+ */
+class ElasticSecureFacetAsyncProvider implements ElasticFacetProvider,
ElasticResponseListener.SearchHitListener {
+
+ protected static final Logger LOG =
LoggerFactory.getLogger(ElasticSecureFacetAsyncProvider.class);
+
+ protected final Set<String> facetFields;
+ private final Map<String, Map<String, Integer>> facetsMap = new
ConcurrentHashMap<>();
+ private Map<String, List<FulltextIndex.Facet>> facets;
+ protected final ElasticResponseHandler elasticResponseHandler;
+ protected final Predicate<String> isAccessible;
+
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+ ElasticSecureFacetAsyncProvider(
+ ElasticRequestHandler elasticRequestHandler,
+ ElasticResponseHandler elasticResponseHandler,
+ Predicate<String> isAccessible
+ ) {
+ this.elasticResponseHandler = elasticResponseHandler;
+ this.isAccessible = isAccessible;
+ this.facetFields =
elasticRequestHandler.facetFields().collect(Collectors.toSet());
+ }
+
+ @Override
+ public Set<String> sourceFields() {
+ return facetFields;
+ }
+
+ @Override
+ public boolean isFullScan() {
+ return true;
+ }
+
+ @Override
+ public void on(SearchHit searchHit) {
+ final String path = elasticResponseHandler.getPath(searchHit);
+ if (path != null && isAccessible.test(path)) {
+ Map<String, Object> sourceMap = searchHit.getSourceAsMap();
+ for (String field: facetFields) {
+ Object value = sourceMap.get(field);
+ if (value != null) {
+ facetsMap.compute(field, (column, facetValues) -> {
+ if (facetValues == null) {
+ Map<String, Integer> values = new HashMap<>();
+ values.put(value.toString(), 1);
+ return values;
+ } else {
+ facetValues.merge(value.toString(), 1,
Integer::sum);
+ return facetValues;
+ }
+ });
+ }
+ }
+ }
+ }
+
+ @Override
+ public void endData() {
+ // create Facet objects, order by count (desc) and then by label (asc)
+ facets = facetsMap.entrySet()
+ .stream()
+ .collect(Collectors.toMap
+ (Map.Entry::getKey, x -> x.getValue().entrySet()
+ .stream()
+ .map(e -> new FulltextIndex.Facet(e.getKey(),
e.getValue()))
+ .sorted((f1, f2) -> {
+ int f1Count = f1.getCount();
+ int f2Count = f2.getCount();
+ if (f1Count == f2Count) {
+ return
f1.getLabel().compareTo(f2.getLabel());
+ } else return f2Count - f1Count;
+ })
+ .collect(Collectors.toList())
+ )
+ );
+ LOG.trace("End data {}", facetsMap);
+ latch.countDown();
+ }
+
+ @Override
+ public List<FulltextIndex.Facet> getFacets(int numberOfFacets, String
columnName) {
+ LOG.trace("Requested facets for {} - Latch count: {}", columnName,
latch.getCount());
+ try {
+ latch.await(15, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Error while waiting for facets",
e);
+ }
+ LOG.trace("Reading facets for {} from {}", columnName, facetsMap);
+ return facets.get(FulltextIndex.parseFacetField(columnName));
+ }
+}
Propchange:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java?rev=1878762&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java
(added)
+++
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java
Fri Jun 12 05:39:42 2020
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic.query.async.facets;
+
+import
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticRequestHandler;
+import
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseHandler;
+import
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseListener;
+import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.aggregations.Aggregations;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+/**
+ * An {@link ElasticSecureFacetAsyncProvider} extension that also subscribes to Elastic Aggregation events.
+ * SearchHit events are sampled and the sample is then used to adjust the facet counts coming from Aggregations,
+ * in order to minimize access checks. This provider could improve facets performance, but only when the result
+ * set is quite big.
+ * <p>
+ * When the total number of hits is smaller than the sample size, every callback is delegated to the secure
+ * provider. Otherwise the aggregation counts are scaled by {@code accessibleCount / sampleSize}.
+ * <p>
+ * NOTE(review): {@link #getFacets(int, String)} reads {@code totalHits} before waiting on any latch; this assumes
+ * {@link #startData(long)} has already been invoked when facets are requested - confirm against the caller.
+ */
+public class ElasticStatisticalFacetAsyncProvider
+        extends ElasticSecureFacetAsyncProvider
+        implements ElasticResponseListener.AggregationListener {
+
+    /** Maximum number of search hits whose accessibility is checked. */
+    private final int sampleSize;
+    /** Total number of hits, as announced through {@link #startData(long)}. */
+    private long totalHits;
+
+    private final Random rGen;
+    /** Number of hits picked for the sample so far. */
+    private int sampled = 0;
+    /** Number of hits considered for sampling so far. */
+    private int seen = 0;
+    /** Number of sampled hits that passed the accessibility check. */
+    private long accessibleCount = 0;
+
+    /** Facets per field, filled from aggregations and adjusted in {@link #endData()}. */
+    private final Map<String, List<FulltextIndex.Facet>> facetMap = new HashMap<>();
+
+    /** Released by {@link #endData()} once the statistical facets are ready. */
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    /**
+     * @param elasticRequestHandler  passed to the parent provider
+     * @param elasticResponseHandler passed to the parent provider
+     * @param isAccessible           passed to the parent provider
+     * @param randomSeed             seed of the random generator used to pick the sample
+     * @param sampleSize             number of hits to sample
+     */
+    ElasticStatisticalFacetAsyncProvider(ElasticRequestHandler elasticRequestHandler,
+                                         ElasticResponseHandler elasticResponseHandler,
+                                         Predicate<String> isAccessible,
+                                         long randomSeed, int sampleSize) {
+        super(elasticRequestHandler, elasticResponseHandler, isAccessible);
+        this.sampleSize = sampleSize;
+        this.rGen = new Random(randomSeed);
+    }
+
+    /**
+     * Records the total number of hits; it decides between the secure and the statistical mode.
+     */
+    @Override
+    public void startData(long totalHits) {
+        this.totalHits = totalHits;
+    }
+
+    /**
+     * Delegates to the parent when the result set is smaller than the sample size. Otherwise the hit is picked
+     * with probability {@code (sampleSize - sampled) / (totalHits - seen)} and, when picked, checked for
+     * accessibility.
+     */
+    @Override
+    public void on(SearchHit searchHit) {
+        if (totalHits < sampleSize) {
+            super.on(searchHit);
+            return;
+        }
+        if (sampleSize == sampled) {
+            // sample complete: nothing else to check
+            return;
+        }
+        // clamp instead of a plain (int) cast: a remaining count above Integer.MAX_VALUE would otherwise wrap
+        // around and hand Random.nextInt a zero or negative bound
+        int remaining = (int) Math.min(totalHits - seen, Integer.MAX_VALUE);
+        int r = rGen.nextInt(remaining) + 1;
+        seen++;
+
+        if (r <= sampleSize - sampled) {
+            sampled++;
+            final String path = elasticResponseHandler.getPath(searchHit);
+            if (path != null && isAccessible.test(path)) {
+                accessibleCount++;
+            }
+        }
+    }
+
+    /**
+     * Stores, for every facet field, the label and document count of each bucket of the matching terms
+     * aggregation.
+     */
+    @Override
+    public void on(Aggregations aggregations) {
+        for (String field : facetFields) {
+            Terms terms = aggregations.get(field);
+            final List<FulltextIndex.Facet> facetList = new ArrayList<>();
+            for (Terms.Bucket bucket : terms.getBuckets()) {
+                // facet counts are ints: saturate rather than silently truncating a large long
+                int docCount = (int) Math.min(bucket.getDocCount(), Integer.MAX_VALUE);
+                facetList.add(new FulltextIndex.Facet(bucket.getKeyAsString(), docCount));
+            }
+            facetMap.put(field, facetList);
+        }
+    }
+
+    /**
+     * Finalizes the facets: delegates to the parent in secure mode, otherwise adjusts the aggregation counts
+     * and releases the latch awaited by {@link #getFacets(int, String)}.
+     */
+    @Override
+    public void endData() {
+        if (totalHits < sampleSize) {
+            super.endData();
+        } else {
+            facetMap.replaceAll((field, fieldFacets) -> updateLabelAndValueIfRequired(fieldFacets));
+            latch.countDown();
+        }
+    }
+
+    /**
+     * Returns the facets for the given column. In statistical mode it waits up to 15 seconds for
+     * {@link #endData()}.
+     *
+     * @param numberOfFacets forwarded to the parent in secure mode, otherwise not used
+     * @param columnName     the facet column; the field is extracted with
+     *                       {@link FulltextIndex#parseFacetField(String)}
+     * @return the facets stored for the field, or {@code null} when none have been stored
+     * @throws IllegalStateException if the calling thread is interrupted while waiting
+     */
+    @Override
+    public List<FulltextIndex.Facet> getFacets(int numberOfFacets, String columnName) {
+        if (totalHits < sampleSize) {
+            return super.getFacets(numberOfFacets, columnName);
+        }
+        LOG.trace("Requested facets for {} - Latch count: {}", columnName, latch.getCount());
+        try {
+            // await() returns false on timeout: report it instead of silently continuing
+            if (!latch.await(15, TimeUnit.SECONDS)) {
+                LOG.warn("Timed out while waiting for facets for {}", columnName);
+            }
+        } catch (InterruptedException e) {
+            // restore the interrupt flag so that callers up the stack can still observe the interruption
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException("Error while waiting for facets", e);
+        }
+        LOG.trace("Reading facets for {} from {}", columnName, facetMap);
+        return facetMap.get(FulltextIndex.parseFacetField(columnName));
+    }
+
+    /**
+     * Scales the given facet counts by {@code accessibleCount / sampleSize} and drops the facets whose scaled
+     * count is not positive. The input list is returned as is when every sampled hit was accessible.
+     */
+    private List<FulltextIndex.Facet> updateLabelAndValueIfRequired(List<FulltextIndex.Facet> labelAndValues) {
+        if (accessibleCount >= sampleSize) {
+            return labelAndValues;
+        }
+        List<FulltextIndex.Facet> proportioned = new ArrayList<>(labelAndValues.size());
+        for (FulltextIndex.Facet labelAndValue : labelAndValues) {
+            // int * long is evaluated as long, so the product cannot overflow before the division
+            long count = labelAndValue.getCount() * accessibleCount / sampleSize;
+            if (count > 0) {
+                proportioned.add(new FulltextIndex.Facet(labelAndValue.getLabel(), Math.toIntExact(count)));
+            }
+        }
+        return proportioned;
+    }
+}
Propchange:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java
------------------------------------------------------------------------------
svn:eol-style = native