Author: fortino
Date: Fri Jun 12 05:39:42 2020
New Revision: 1878762

URL: http://svn.apache.org/viewvc?rev=1878762&view=rev
Log:
OAK-9091: introduced async iterator

Added:
    
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/
    
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticRequestHandler.java
   (with props)
    
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseHandler.java
   (with props)
    
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java
   (with props)
    
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
   (with props)
    
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/
    
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticFacetProvider.java
   (with props)
    
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticInsecureFacetAsyncProvider.java
   (with props)
    
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java
   (with props)
    
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java
   (with props)
Modified:
    
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticDocumentMaker.java
    
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndex.java
    
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticResultRowIterator.java
    
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticFacetTest.java
    
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticFullTextAsyncTest.java
    
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticTestUtils.java
    jackrabbit/oak/trunk/oak-search-elastic/src/test/resources/logback-test.xml
    
jackrabbit/oak/trunk/oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/spi/query/FulltextIndex.java

Modified: 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticDocumentMaker.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticDocumentMaker.java?rev=1878762&r1=1878761&r2=1878762&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticDocumentMaker.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticDocumentMaker.java
 Fri Jun 12 05:39:42 2020
@@ -93,7 +93,8 @@ class ElasticDocumentMaker extends Fullt
 
     @Override
     protected void indexAnalyzedProperty(ElasticDocument doc, String pname, 
String value, PropertyDefinition pd) {
-        doc.addProperty(FieldNames.createAnalyzedFieldName(pname), value);
+        // no need to do anything here. Analyzed properties are persisted in 
Elastic
+        // using multi fields. The analyzed properties are set calling 
#indexTypedProperty
     }
 
     @Override

Modified: 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndex.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndex.java?rev=1878762&r1=1878761&r2=1878762&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndex.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndex.java
 Fri Jun 12 05:39:42 2020
@@ -16,7 +16,9 @@
  */
 package org.apache.jackrabbit.oak.plugins.index.elastic.query;
 
+import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
+import 
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticRequestHandler;
 import org.apache.jackrabbit.oak.plugins.index.search.IndexNode;
 import org.apache.jackrabbit.oak.plugins.index.search.SizeEstimator;
 import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
@@ -24,7 +26,6 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.index.search.util.LMSEstimator;
 import org.apache.jackrabbit.oak.spi.query.Cursor;
 import org.apache.jackrabbit.oak.spi.query.Filter;
-import org.apache.jackrabbit.oak.spi.query.QueryLimits;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.elasticsearch.common.Strings;
 import org.jetbrains.annotations.NotNull;
@@ -40,7 +41,7 @@ import static org.apache.jackrabbit.oak.
 class ElasticIndex extends FulltextIndex {
     private static final Predicate<NodeState> 
ELASTICSEARCH_INDEX_DEFINITION_PREDICATE =
             state -> 
TYPE_ELASTICSEARCH.equals(state.getString(TYPE_PROPERTY_NAME));
-    private static final Map<String, LMSEstimator> estimators = new 
WeakHashMap<>();
+    private static final Map<String, LMSEstimator> ESTIMATORS = new 
WeakHashMap<>();
 
     // higher than some threshold below which the query should rather be 
answered by something else if possible
     private static final double MIN_COST = 100.1;
@@ -90,9 +91,7 @@ class ElasticIndex extends FulltextIndex
 
     @Override
     protected String getFulltextRequestString(IndexPlan plan, IndexNode 
indexNode) {
-        return Strings.toString(new ElasticResultRowIterator(plan.getFilter(), 
getPlanResult(plan), plan,
-                acquireIndexNode(plan), FulltextIndex::shouldInclude, 
getEstimator(plan.getPlanName()))
-                .getElasticQuery(plan, getPlanResult(plan)));
+        return Strings.toString(new ElasticRequestHandler(plan, 
getPlanResult(plan)).build());
     }
 
     @Override
@@ -102,11 +101,23 @@ class ElasticIndex extends FulltextIndex
         // TODO: sorting
 
         final FulltextIndexPlanner.PlanResult pr = getPlanResult(plan);
-        QueryLimits settings = filter.getQueryLimits();
+
+        // this function is called for each extracted row. Passing 
FulltextIndex::shouldInclude means that for each
+        // row we evaluate getPathRestriction(plan) & 
plan.getFilter().getPathRestriction(). Providing a partial
+        // function (https://en.wikipedia.org/wiki/Partial_function) we can 
evaluate them once and still use a predicate as before
+//        BiFunction<String, Filter.PathRestriction, Predicate<String>> 
partialShouldInclude = (path, pathRestriction) -> docPath ->
+//                shouldInclude(path, pathRestriction, docPath);
+//
+//        Iterator<FulltextResultRow> itr = new ElasticResultRowAsyncIterator(
+//                acquireIndexNode(plan),
+//                plan,
+//                pr,
+//                partialShouldInclude.apply(getPathRestriction(plan), 
plan.getFilter().getPathRestriction()),
+//                getEstimator(plan.getPlanName())
+//        );
 
         Iterator<FulltextResultRow> itr = new ElasticResultRowIterator(filter, 
pr, plan,
                 acquireIndexNode(plan), FulltextIndex::shouldInclude, 
getEstimator(plan.getPlanName()));
-        SizeEstimator sizeEstimator = getSizeEstimator(plan);
 
         /*
         TODO: sync (nrt too??)
@@ -117,12 +128,29 @@ class ElasticIndex extends FulltextIndex
 
         // no concept of rewound in ES (even if it might be doing it 
internally, we can't do much about it
         IteratorRewoundStateProvider rewoundStateProvider = () -> 0;
-        return new FulltextPathCursor(itr, rewoundStateProvider, plan, 
settings, sizeEstimator);
+        return new FulltextPathCursor(itr, rewoundStateProvider, plan, 
filter.getQueryLimits(), getSizeEstimator(plan));
     }
 
     private LMSEstimator getEstimator(String path) {
-        estimators.putIfAbsent(path, new LMSEstimator());
-        return estimators.get(path);
+        ESTIMATORS.putIfAbsent(path, new LMSEstimator());
+        return ESTIMATORS.get(path);
+    }
+
+    private static boolean shouldInclude(String path, Filter.PathRestriction 
pathRestriction, String docPath) {
+        boolean include = true;
+        switch (pathRestriction) {
+            case EXACT:
+                include = path.equals(docPath);
+                break;
+            case DIRECT_CHILDREN:
+                include = PathUtils.getParentPath(docPath).equals(path);
+                break;
+            case ALL_CHILDREN:
+                include = PathUtils.isAncestor(path, docPath);
+                break;
+        }
+
+        return include;
     }
 
     @Override

Modified: 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticResultRowIterator.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticResultRowIterator.java?rev=1878762&r1=1878761&r2=1878762&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticResultRowIterator.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticResultRowIterator.java
 Fri Jun 12 05:39:42 2020
@@ -20,8 +20,8 @@ import org.apache.jackrabbit.oak.api.Typ
 import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.commons.PerfLogger;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
-import 
org.apache.jackrabbit.oak.plugins.index.elastic.query.facets.ElasticFacetHelper;
 import 
org.apache.jackrabbit.oak.plugins.index.elastic.query.facets.ElasticAggregationData;
+import 
org.apache.jackrabbit.oak.plugins.index.elastic.query.facets.ElasticFacetHelper;
 import 
org.apache.jackrabbit.oak.plugins.index.elastic.query.facets.ElasticFacets;
 import 
org.apache.jackrabbit.oak.plugins.index.elastic.util.ElasticAggregationBuilderUtil;
 import org.apache.jackrabbit.oak.plugins.index.elastic.util.ElasticConstants;
@@ -29,10 +29,12 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
 import org.apache.jackrabbit.oak.plugins.index.search.PropertyDefinition;
 import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
+import 
org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndexPlanner;
 import 
org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndexPlanner.PlanResult;
 import org.apache.jackrabbit.oak.plugins.index.search.util.LMSEstimator;
 import org.apache.jackrabbit.oak.spi.query.Filter;
 import org.apache.jackrabbit.oak.spi.query.QueryConstants;
+import org.apache.jackrabbit.oak.spi.query.QueryIndex;
 import org.apache.jackrabbit.oak.spi.query.QueryIndex.IndexPlan;
 import org.apache.jackrabbit.oak.spi.query.fulltext.FullTextAnd;
 import org.apache.jackrabbit.oak.spi.query.fulltext.FullTextContains;
@@ -80,7 +82,6 @@ import static org.apache.jackrabbit.oak.
 import static 
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newPropertyRestrictionQuery;
 import static 
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newWildcardPathQuery;
 import static 
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newWildcardQuery;
-import static 
org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex.isNodePath;
 import static org.apache.jackrabbit.oak.spi.query.QueryConstants.JCR_PATH;
 import static org.apache.jackrabbit.util.ISO8601.parse;
 import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
@@ -113,8 +114,8 @@ class ElasticResultRowIterator implement
     private final LMSEstimator estimator;
 
     ElasticResultRowIterator(@NotNull Filter filter,
-                             @NotNull PlanResult planResult,
-                             @NotNull IndexPlan plan,
+                             @NotNull FulltextIndexPlanner.PlanResult 
planResult,
+                             @NotNull QueryIndex.IndexPlan plan,
                              ElasticIndexNode indexNode,
                              RowInclusionPredicate rowInclusionPredicate,
                              LMSEstimator estimator) {
@@ -261,9 +262,9 @@ class ElasticResultRowIterator implement
     }
 
     public interface RowInclusionPredicate {
-        boolean shouldInclude(@NotNull String path, @NotNull IndexPlan plan);
+        boolean shouldInclude(@NotNull String path, @NotNull 
QueryIndex.IndexPlan plan);
 
-        RowInclusionPredicate NOOP = (@NotNull String path, @NotNull IndexPlan 
plan) -> true;
+        RowInclusionPredicate NOOP = (@NotNull String path, @NotNull 
QueryIndex.IndexPlan plan) -> true;
     }
 
     /**
@@ -426,19 +427,8 @@ class ElasticResultRowIterator implement
             return FieldNames.FULLTEXT;
         }
 
-        if (isNodePath(p)) {
-            if (pr.isPathTransformed()) {
-                p = PathUtils.getName(p);
-            } else {
-                //Get rid of /* as aggregated fulltext field name is the
-                //node relative path
-                p = 
FieldNames.createFulltextFieldName(PathUtils.getParentPath(p));
-            }
-        } else {
-            if (pr.isPathTransformed()) {
-                p = PathUtils.getName(p);
-            }
-            p = FieldNames.createAnalyzedFieldName(p);
+        if (pr.isPathTransformed()) {
+            p = PathUtils.getName(p);
         }
 
         if ("*".equals(p)) {
@@ -502,7 +492,7 @@ class ElasticResultRowIterator implement
     }
 
     private void addNonFullTextConstraints(List<QueryBuilder> qs,
-                                                  IndexPlan plan, PlanResult 
planResult) {
+                                           IndexPlan plan, PlanResult 
planResult) {
         final BiPredicate<Iterable<String>, String> any = (iterable, value) ->
                 StreamSupport.stream(iterable.spliterator(), 
false).anyMatch(value::equals);
 

Added: 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticRequestHandler.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticRequestHandler.java?rev=1878762&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticRequestHandler.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticRequestHandler.java
 Fri Jun 12 05:39:42 2020
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic.query.async;
+
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
+import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.search.PropertyDefinition;
+import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
+import 
org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndexPlanner;
+import 
org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndexPlanner.PlanResult;
+import org.apache.jackrabbit.oak.spi.query.Filter;
+import org.apache.jackrabbit.oak.spi.query.QueryConstants;
+import org.apache.jackrabbit.oak.spi.query.QueryIndex;
+import org.apache.jackrabbit.oak.spi.query.QueryIndex.IndexPlan;
+import org.apache.jackrabbit.oak.spi.query.fulltext.FullTextAnd;
+import org.apache.jackrabbit.oak.spi.query.fulltext.FullTextContains;
+import org.apache.jackrabbit.oak.spi.query.fulltext.FullTextExpression;
+import org.apache.jackrabbit.oak.spi.query.fulltext.FullTextOr;
+import org.apache.jackrabbit.oak.spi.query.fulltext.FullTextTerm;
+import org.apache.jackrabbit.oak.spi.query.fulltext.FullTextVisitor;
+import org.apache.lucene.search.WildcardQuery;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import 
org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import javax.jcr.PropertyType;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiPredicate;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.jackrabbit.JcrConstants.JCR_MIXINTYPES;
+import static org.apache.jackrabbit.JcrConstants.JCR_PRIMARYTYPE;
+import static org.apache.jackrabbit.oak.commons.PathUtils.denotesRoot;
+import static org.apache.jackrabbit.oak.commons.PathUtils.getParentPath;
+import static 
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newAncestorQuery;
+import static 
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newDepthQuery;
+import static 
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newMixinTypeQuery;
+import static 
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newNodeTypeQuery;
+import static 
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newNotNullPropQuery;
+import static 
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newNullPropQuery;
+import static 
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newPathQuery;
+import static 
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newPrefixPathQuery;
+import static 
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newPrefixQuery;
+import static 
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newPropertyRestrictionQuery;
+import static 
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newWildcardPathQuery;
+import static 
org.apache.jackrabbit.oak.plugins.index.elastic.util.TermQueryBuilderFactory.newWildcardQuery;
+import static org.apache.jackrabbit.oak.spi.query.QueryConstants.JCR_PATH;
+import static org.apache.jackrabbit.util.ISO8601.parse;
+import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
+import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
+import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
+import static org.elasticsearch.index.query.QueryBuilders.queryStringQuery;
+import static org.elasticsearch.index.query.QueryBuilders.termQuery;
+
+/**
+ * Class to map query plans into Elastic request objects.
+ */
+public class ElasticRequestHandler {
+
+    private final IndexPlan indexPlan;
+    private final PlanResult planResult;
+    private final ElasticIndexDefinition elasticIndexDefinition;
+
+    public ElasticRequestHandler(@NotNull QueryIndex.IndexPlan indexPlan, 
@NotNull FulltextIndexPlanner.PlanResult planResult) {
+        this.indexPlan = indexPlan;
+        this.planResult = planResult;
+        this.elasticIndexDefinition = (ElasticIndexDefinition) 
planResult.indexDefinition;
+    }
+
+    public QueryBuilder build() {
+        final BoolQueryBuilder boolQuery = boolQuery();
+
+        Filter filter = indexPlan.getFilter();
+        FullTextExpression ft = filter.getFullTextConstraint();
+
+        if (ft != null) {
+            boolQuery.must(fullTextQuery(ft, planResult));
+        }
+
+        // TODO: this code block should be removed? Or should we throw a 
NotSupported exception in this case?
+        //Check if native function is supported
+        Filter.PropertyRestriction pr = null;
+        if (elasticIndexDefinition.hasFunctionDefined()) {
+            pr = 
filter.getPropertyRestriction(elasticIndexDefinition.getFunctionName());
+        }
+
+        if (pr != null) {
+            String query = 
String.valueOf(pr.first.getValue(pr.first.getType()));
+            // TODO: more like this
+
+            // TODO: spellcheck
+
+            // TODO: suggest
+
+            boolQuery.must(queryStringQuery(query));
+        } else if (planResult.evaluateNonFullTextConstraints()) {
+            for (QueryBuilder constraint: nonFullTextConstraints(indexPlan, 
planResult)) {
+                boolQuery.filter(constraint);
+            }
+        }
+
+        // TODO: sort with no other restriction
+
+        if (!boolQuery.hasClauses()) {
+            // TODO: what happens here in planning mode (specially, apparently 
for things like rep:similar)
+            //For purely nodeType based queries all the documents would have to
+            //be returned (if the index definition has a single rule)
+            if (planResult.evaluateNodeTypeRestriction()) {
+                boolQuery.must(matchAllQuery());
+            }
+        }
+
+        return boolQuery;
+    }
+
+    public boolean requiresFacets() {
+        return indexPlan.getFilter().getPropertyRestrictions()
+                .stream()
+                .anyMatch(pr -> 
QueryConstants.REP_FACET.equals(pr.propertyName));
+    }
+
+    public Stream<TermsAggregationBuilder> aggregations() {
+        return facetFields()
+                .map(facetProp ->
+                        AggregationBuilders.terms(facetProp)
+                                
.field(elasticIndexDefinition.getElasticKeyword(facetProp))
+                                
.size(elasticIndexDefinition.getNumberOfTopFacets())
+                );
+    }
+
+    public Stream<String> facetFields() {
+        return indexPlan.getFilter().getPropertyRestrictions()
+                .stream()
+                .filter(pr -> QueryConstants.REP_FACET.equals(pr.propertyName))
+                .map(pr -> 
FulltextIndex.parseFacetField(pr.first.getValue(Type.STRING)));
+    }
+
+    private QueryBuilder fullTextQuery(FullTextExpression ft, final PlanResult 
pr) {
+        // a reference to the query, so it can be set in the visitor
+        // (a "non-local return")
+        final AtomicReference<QueryBuilder> result = new AtomicReference<>();
+        ft.accept(new FullTextVisitor() {
+
+            @Override
+            public boolean visit(FullTextContains contains) {
+                visitTerm(contains.getPropertyName(), contains.getRawText(), 
null, contains.isNot());
+                return true;
+            }
+
+            @Override
+            public boolean visit(FullTextOr or) {
+                BoolQueryBuilder q = boolQuery();
+                for (FullTextExpression e : or.list) {
+                    q.should(fullTextQuery(e, pr));
+                }
+                result.set(q);
+                return true;
+            }
+
+            @Override
+            public boolean visit(FullTextAnd and) {
+                BoolQueryBuilder q = boolQuery();
+                for (FullTextExpression e : and.list) {
+                    QueryBuilder x = fullTextQuery(e, pr);
+                    // TODO: see OAK-2434 and see if ES also can't work 
without unwrapping
+                    /* Only unwrap the clause if MUST_NOT(x) */
+                    boolean hasMustNot = false;
+                    if (x instanceof BoolQueryBuilder) {
+                        BoolQueryBuilder bq = (BoolQueryBuilder) x;
+                        if (bq.mustNot().size() == 1
+                                // no other clauses
+                                && bq.should().isEmpty() && 
bq.must().isEmpty() && bq.filter().isEmpty()) {
+                            hasMustNot = true;
+                            q.mustNot(bq.mustNot().get(0));
+                        }
+                    }
+
+                    if (!hasMustNot) {
+                        q.must(x);
+                    }
+                }
+                result.set(q);
+                return true;
+            }
+
+            @Override
+            public boolean visit(FullTextTerm term) {
+                return visitTerm(term.getPropertyName(), term.getText(), 
term.getBoost(), term.isNot());
+            }
+
+            private boolean visitTerm(String propertyName, String text, String 
boost, boolean not) {
+                String p = getLuceneFieldName(propertyName, pr);
+                QueryBuilder q = tokenToQuery(text, p, pr);
+                if (boost != null) {
+                    q.boost(Float.parseFloat(boost));
+                }
+                if (not) {
+                    BoolQueryBuilder bq = boolQuery().mustNot(q);
+                    result.set(bq);
+                } else {
+                    result.set(q);
+                }
+                return true;
+            }
+        });
+        return result.get();
+    }
+
+    private List<QueryBuilder> nonFullTextConstraints(IndexPlan plan, 
PlanResult planResult) {
+        final BiPredicate<Iterable<String>, String> any = (iterable, value) ->
+                StreamSupport.stream(iterable.spliterator(), 
false).anyMatch(value::equals);
+
+        final List<QueryBuilder> queries = new ArrayList<>();
+
+        Filter filter = plan.getFilter();
+        if (!filter.matchesAllTypes()) {
+            queries.add(nodeTypeConstraints(planResult.indexingRule, filter));
+        }
+
+        String path = FulltextIndex.getPathRestriction(plan);
+        switch (filter.getPathRestriction()) {
+            case ALL_CHILDREN:
+                if (!"/".equals(path)) {
+                    queries.add(newAncestorQuery(path));
+                }
+                break;
+            case DIRECT_CHILDREN:
+                BoolQueryBuilder bq = boolQuery()
+                    .must(newAncestorQuery(path))
+                    .must(newDepthQuery(path, planResult));
+                queries.add(bq);
+                break;
+            case EXACT:
+                // For transformed paths, we can only add path restriction if 
absolute path to property can be
+                // deduced
+                if (planResult.isPathTransformed()) {
+                    String parentPathSegment = 
planResult.getParentPathSegment();
+                    if (!any.test(PathUtils.elements(parentPathSegment), "*")) 
{
+                        queries.add(newPathQuery(path + parentPathSegment));
+                    }
+                } else {
+                    queries.add(newPathQuery(path));
+                }
+                break;
+            case PARENT:
+                if (denotesRoot(path)) {
+                    // there's no parent of the root node
+                    // we add a path that can not possibly occur because there
+                    // is no way to say "match no documents" in Lucene
+                    queries.add(newPathQuery("///"));
+                } else {
+                    // For transformed paths, we can only add path restriction 
if absolute path to property can be
+                    // deduced
+                    if (planResult.isPathTransformed()) {
+                        String parentPathSegment = 
planResult.getParentPathSegment();
+                        if (!any.test(PathUtils.elements(parentPathSegment), 
"*")) {
+                            queries.add(newPathQuery(getParentPath(path) + 
parentPathSegment));
+                        }
+                    } else {
+                        queries.add(newPathQuery(getParentPath(path)));
+                    }
+                }
+                break;
+            case NO_RESTRICTION:
+                break;
+        }
+
+        for (Filter.PropertyRestriction pr : filter.getPropertyRestrictions()) 
{
+            String name = pr.propertyName;
+
+            if (QueryConstants.REP_EXCERPT.equals(name) || 
QueryConstants.OAK_SCORE_EXPLANATION.equals(name)
+                    || QueryConstants.REP_FACET.equals(name)) {
+                continue;
+            }
+
+            if (QueryConstants.RESTRICTION_LOCAL_NAME.equals(name)) {
+                if (planResult.evaluateNodeNameRestriction()) {
+                    QueryBuilder q = nodeName(pr);
+                    if (q != null) {
+                        queries.add(q);
+                    }
+                }
+                continue;
+            }
+
+            if (pr.first != null && pr.first.equals(pr.last) && 
pr.firstIncluding && pr.lastIncluding) {
+                String first = pr.first.getValue(Type.STRING);
+                first = first.replace("\\", "");
+                if (JCR_PATH.equals(name)) {
+                    queries.add(newPathQuery(first));
+                    continue;
+                } else if ("*".equals(name)) {
+                    //TODO Revisit reference constraint. For performant impl
+                    //references need to be indexed in a different manner
+                    queries.add(referenceConstraint(first));
+                    continue;
+                }
+            }
+
+            PropertyDefinition pd = planResult.getPropDefn(pr);
+            if (pd == null) {
+                continue;
+            }
+
+            QueryBuilder q = createQuery(planResult.getPropertyName(pr), pr, 
pd);
+            if (q != null) {
+                queries.add(q);
+            }
+        }
+        return queries;
+    }
+
+    private static QueryBuilder 
nodeTypeConstraints(IndexDefinition.IndexingRule defn, Filter filter) {
+        final BoolQueryBuilder bq = boolQuery();
+        PropertyDefinition primaryType = defn.getConfig(JCR_PRIMARYTYPE);
+        //TODO OAK-2198 Add proper nodeType query support
+
+        if (primaryType != null && primaryType.propertyIndex) {
+            for (String type : filter.getPrimaryTypes()) {
+                bq.should(newNodeTypeQuery(type));
+            }
+        }
+
+        PropertyDefinition mixinType = defn.getConfig(JCR_MIXINTYPES);
+        if (mixinType != null && mixinType.propertyIndex) {
+            for (String type : filter.getMixinTypes()) {
+                bq.should(newMixinTypeQuery(type));
+            }
+        }
+
+        return bq;
+    }
+
+    private static QueryBuilder nodeName(Filter.PropertyRestriction pr) {
+        String first = pr.first != null ? pr.first.getValue(Type.STRING) : 
null;
+        if (pr.first != null && pr.first.equals(pr.last) && pr.firstIncluding
+                && pr.lastIncluding) {
+            // [property]=[value]
+            return termQuery(FieldNames.NODE_NAME, first);
+        }
+
+        if (pr.isLike) {
+            return like(FieldNames.NODE_NAME, first);
+        }
+
+        throw new IllegalStateException("For nodeName queries only EQUALS and 
LIKE are supported " + pr);
+    }
+
+    private static QueryBuilder like(String name, String first) {
+        first = first.replace('%', WildcardQuery.WILDCARD_STRING);
+        first = first.replace('_', WildcardQuery.WILDCARD_CHAR);
+
+        int indexOfWS = first.indexOf(WildcardQuery.WILDCARD_STRING);
+        int indexOfWC = first.indexOf(WildcardQuery.WILDCARD_CHAR);
+        int len = first.length();
+
+        if (indexOfWS == len || indexOfWC == len) {
+            // remove trailing "*" for prefix query
+            first = first.substring(0, first.length() - 1);
+            if (JCR_PATH.equals(name)) {
+                return newPrefixPathQuery(first);
+            } else {
+                return newPrefixQuery(name, first);
+            }
+        } else {
+            if (JCR_PATH.equals(name)) {
+                return newWildcardPathQuery(first);
+            } else {
+                return newWildcardQuery(name, first);
+            }
+        }
+    }
+
+    private static QueryBuilder referenceConstraint(String uuid) {
+        // TODO: this seems very bad as a query - do we really want to support 
it. In fact, is it even used?
+        // reference query
+        return QueryBuilders.multiMatchQuery(uuid);
+    }
+
+    private static QueryBuilder tokenToQuery(String text, String fieldName, 
PlanResult pr) {
+        QueryBuilder ret;
+        IndexDefinition.IndexingRule indexingRule = pr.indexingRule;
+        //Expand the query on fulltext field
+        if (FieldNames.FULLTEXT.equals(fieldName) &&
+                !indexingRule.getNodeScopeAnalyzedProps().isEmpty()) {
+            BoolQueryBuilder in = boolQuery();
+            for (PropertyDefinition pd : 
indexingRule.getNodeScopeAnalyzedProps()) {
+                QueryBuilder q = matchQuery(pd.name, text).boost(pd.boost);
+                in.should(q);
+            }
+
+            //Add the query for actual fulltext field also. That query would 
not be boosted
+            // TODO: do we need this if all the analyzed fields are queried?
+            ret = in.should(matchQuery(fieldName, text));
+        } else {
+            ret = matchQuery(fieldName, text);
+        }
+
+        return ret;
+    }
+
+    private QueryBuilder createQuery(String propertyName, 
Filter.PropertyRestriction pr,
+                                     PropertyDefinition defn) {
+        int propType = FulltextIndex.determinePropertyType(defn, pr);
+
+        if (pr.isNullRestriction()) {
+            return newNullPropQuery(defn.name);
+        }
+
+        //If notNullCheckEnabled explicitly enabled use the simple TermQuery
+        //otherwise later fallback to range query
+        if (pr.isNotNullRestriction() && defn.notNullCheckEnabled) {
+            return newNotNullPropQuery(defn.name);
+        }
+
+        final String field = 
elasticIndexDefinition.getElasticKeyword(propertyName);
+
+        QueryBuilder in;
+        switch (propType) {
+            case PropertyType.DATE: {
+                in = newPropertyRestrictionQuery(field, pr, value -> 
parse(value.getValue(Type.DATE)).getTime());
+                break;
+            }
+            case PropertyType.DOUBLE: {
+                in = newPropertyRestrictionQuery(field, pr, value -> 
value.getValue(Type.DOUBLE));
+                break;
+            }
+            case PropertyType.LONG: {
+                in = newPropertyRestrictionQuery(field, pr, value -> 
value.getValue(Type.LONG));
+                break;
+            }
+            default: {
+                if (pr.isLike) {
+                    return like(propertyName, pr.first.getValue(Type.STRING));
+                }
+
+                //TODO Confirm that all other types can be treated as string
+                in = newPropertyRestrictionQuery(field, pr, value -> 
value.getValue(Type.STRING));
+            }
+        }
+
+        if (in != null) {
+            return in;
+        }
+
+        throw new IllegalStateException("PropertyRestriction not handled " + 
pr + " for index " + defn);
+    }
+
+    private static String getLuceneFieldName(@Nullable String p, PlanResult 
pr) {
+        if (p == null) {
+            return FieldNames.FULLTEXT;
+        }
+
+        if (pr.isPathTransformed()) {
+            p = PathUtils.getName(p);
+        }
+
+        if ("*".equals(p)) {
+            p = FieldNames.FULLTEXT;
+        }
+        return p;
+    }
+}

Propchange: 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticRequestHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseHandler.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseHandler.java?rev=1878762&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseHandler.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseHandler.java
 Fri Jun 12 05:39:42 2020
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic.query.async;
+
+import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
+import 
org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndexPlanner;
+import 
org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndexPlanner.PlanResult;
+import org.elasticsearch.search.SearchHit;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Class to process Elastic response objects.
+ */
+public class ElasticResponseHandler {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ElasticResponseHandler.class);
+
+    private final PlanResult planResult;
+
+    ElasticResponseHandler(@NotNull FulltextIndexPlanner.PlanResult 
planResult) {
+        this.planResult = planResult;
+    }
+
+    public String getPath(SearchHit hit) {
+        final Map<String, Object> sourceMap = hit.getSourceAsMap();
+        String path = (String) sourceMap.get(FieldNames.PATH);
+
+        if ("".equals(path)) {
+            path = "/";
+        }
+        if (planResult.isPathTransformed()) {
+            String originalPath = path;
+            path = planResult.transformPath(path);
+
+            if (path == null) {
+                LOG.trace("Ignoring path {} : Transformation returned null", 
originalPath);
+                return null;
+            }
+        }
+
+        return path;
+    }
+}

Propchange: 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java?rev=1878762&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java
 Fri Jun 12 05:39:42 2020
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic.query.async;
+
+import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.aggregations.Aggregations;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Generic listener of Elastic response
+ */
+public interface ElasticResponseListener {
+
+    Set<String> DEFAULT_SOURCE_FIELDS = Collections.singleton(FieldNames.PATH);
+
+    /**
+     * Returns the source fields this listener is interested on
+     *
+     * @return the list of fields to listen to (only PATH as default)
+     */
+    default Set<String> sourceFields() {
+        return DEFAULT_SOURCE_FIELDS;
+    }
+
+    /**
+     * This method is invoked when there is no more data to process.
+     */
+    void endData();
+
+    /**
+     * {@link ElasticResponseListener} extension to subscribe on {@link 
SearchHit} events
+     */
+    interface SearchHitListener extends ElasticResponseListener {
+
+        /**
+         * Returns {@code true} if the listener is interested in the entire 
result set
+         */
+        default boolean isFullScan() {
+            return false;
+        }
+
+        /**
+         * This method is invoked at the beginning of the listener lifecycle 
to notify the number of hits this
+         * listener could receive
+         * @param totalHits the total number of hits
+         */
+        default void startData(long totalHits) { /*empty*/ }
+
+        /**
+         * This method is called for each {@link SearchHit} retrieved
+         */
+        void on(SearchHit searchHit);
+    }
+
+    /**
+     * {@link ElasticResponseListener} extension to subscribe on {@link 
Aggregations} events
+     */
+    interface AggregationListener extends ElasticResponseListener {
+
+        /**
+         * This method is called once when the {@link Aggregations} are 
retrieved
+         * @param aggregations the {@link Aggregations} or {@code null} if 
there are no results
+         */
+        void on(Aggregations aggregations);
+    }
+}

Propchange: 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResponseListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java?rev=1878762&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
 Fri Jun 12 05:39:42 2020
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic.query.async;
+
+import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticIndexNode;
+import 
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.facets.ElasticFacetProvider;
+import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
+import 
org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex.FulltextResultRow;
+import 
org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndexPlanner;
+import 
org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndexPlanner.PlanResult;
+import org.apache.jackrabbit.oak.plugins.index.search.util.LMSEstimator;
+import org.apache.jackrabbit.oak.spi.query.QueryIndex;
+import org.apache.jackrabbit.oak.spi.query.QueryIndex.IndexPlan;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.aggregations.Aggregations;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortBuilders;
+import org.elasticsearch.search.sort.SortOrder;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * Class to iterate over Elastic results of a given {@link IndexPlan}.
+ * The results are produced asynchronously into an internal unbounded {@link 
BlockingQueue}. To avoid too many calls to
+ * Elastic the results are loaded in chunks (using search_after strategy) and 
loaded only when needed.
+ */
+public class ElasticResultRowAsyncIterator implements 
Iterator<FulltextResultRow>, ElasticResponseListener.SearchHitListener {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ElasticResultRowAsyncIterator.class);
+    // this is an internal special message to notify the consumer the result 
set has been completely returned
+    private static final FulltextResultRow POISON_PILL =
+            new FulltextResultRow("___OAK_POISON_PILL___", 0d, 
Collections.emptyMap(), null, null);
+
+    private final BlockingQueue<FulltextResultRow> queue = new 
LinkedBlockingQueue<>();
+
+    private final ElasticIndexNode indexNode;
+    private final IndexPlan indexPlan;
+    private final PlanResult planResult;
+    private final Predicate<String> rowInclusionPredicate;
+    private final LMSEstimator estimator;
+
+    private final ElasticQueryScanner elasticQueryScanner;
+    private final ElasticRequestHandler elasticRequestHandler;
+    private final ElasticResponseHandler elasticResponseHandler;
+    private final ElasticFacetProvider elasticFacetProvider;
+
+    private FulltextResultRow nextRow;
+
+    public ElasticResultRowAsyncIterator(@NotNull ElasticIndexNode indexNode,
+                                         @NotNull QueryIndex.IndexPlan 
indexPlan,
+                                         @NotNull 
FulltextIndexPlanner.PlanResult planResult,
+                                         Predicate<String> 
rowInclusionPredicate,
+                                         LMSEstimator estimator) {
+        this.indexNode = indexNode;
+        this.indexPlan = indexPlan;
+        this.planResult = planResult;
+        this.rowInclusionPredicate = rowInclusionPredicate;
+        this.estimator = estimator;
+
+        this.elasticRequestHandler = new ElasticRequestHandler(indexPlan, 
planResult);
+        this.elasticResponseHandler = new ElasticResponseHandler(planResult);
+        this.elasticFacetProvider = initFacetProvider();
+        this.elasticQueryScanner = initScanner();
+    }
+
+    @Override
+    public boolean hasNext() {
+        if (queue.isEmpty()) {
+            // this triggers, when needed, the scan of the next results chunk
+            elasticQueryScanner.scan();
+        }
+        try {
+            nextRow = queue.take();
+        } catch (InterruptedException e) {
+            throw new IllegalStateException("Error reading next result from 
Elastic", e);
+        }
+        if (POISON_PILL.path.equals(nextRow.path)) {
+            nextRow = null;
+        }
+        return nextRow != null;
+    }
+
+    @Override
+    public FulltextResultRow next() {
+        return nextRow;
+    }
+
+    @Override
+    public void on(SearchHit searchHit) {
+        final String path = elasticResponseHandler.getPath(searchHit);
+        if (path != null) {
+            if (rowInclusionPredicate != null && 
!rowInclusionPredicate.test(path)) {
+                LOG.trace("Path {} not included because of hierarchy inclusion 
rules", path);
+                return;
+            }
+            try {
+                queue.put(new FulltextResultRow(path, searchHit.getScore(), 
null, elasticFacetProvider, null));
+            } catch (InterruptedException e) {
+                throw new IllegalStateException("Error producing results into 
the iterator queue", e);
+            }
+        }
+    }
+
+    @Override
+    public void endData() {
+        try {
+            queue.put(POISON_PILL);
+        } catch (InterruptedException e) {
+            throw new IllegalStateException("Error inserting poison pill into 
the iterator queue", e);
+        }
+    }
+
+    private ElasticFacetProvider initFacetProvider() {
+        return elasticRequestHandler.requiresFacets() ?
+                ElasticFacetProvider.getProvider(
+                        
planResult.indexDefinition.getSecureFacetConfiguration(),
+                        elasticRequestHandler, elasticResponseHandler,
+                        indexPlan.getFilter()::isAccessible
+                ) : null;
+    }
+
+    private ElasticQueryScanner initScanner() {
+        List<ElasticResponseListener> listeners = new ArrayList<>();
+        // TODO: we could avoid to register this listener when the client is 
interested in facets only. It would save space and time
+        listeners.add(this);
+        if (elasticFacetProvider != null) {
+            listeners.add(elasticFacetProvider);
+        }
+
+        return new ElasticQueryScanner(elasticRequestHandler, listeners);
+    }
+
+    /**
+     * Scans Elastic results asynchronously and notify listeners.
+     */
+    class ElasticQueryScanner implements ActionListener<SearchResponse> {
+
+        private static final int SMALL_RESULT_SET_SIZE = 10;
+        private static final int MEDIUM_RESULT_SET_SIZE = 100;
+        private static final int LARGE_RESULT_SET_SIZE = 1000;
+
+        private final Set<ElasticResponseListener> allListeners = new 
HashSet<>();
+        private final List<SearchHitListener> searchHitListeners = new 
ArrayList<>();
+        private final List<AggregationListener> aggregationListeners = new 
ArrayList<>();
+
+        private final QueryBuilder query;
+        private final String[] sourceFields;
+
+        // concurrent data structures to coordinate chunks loading
+        private final AtomicBoolean anyDataLeft = new AtomicBoolean(false);
+
+        private int scannedRows = 0;
+        private boolean firstRequest = true;
+        private boolean fullScan = false;
+
+        // reference to the last document sort values for search_after queries
+        private Object[] lastHitSortValues;
+
+        // Semaphore to guarantee only one in-flight request to Elastic
+        private final Semaphore semaphore = new Semaphore(1);
+
+        ElasticQueryScanner(ElasticRequestHandler requestHandler,
+                            List<ElasticResponseListener> listeners) {
+            this.query = requestHandler.build();
+
+            final Set<String> sourceFieldsSet = new HashSet<>();
+            final AtomicBoolean needsAggregations = new AtomicBoolean(false);
+            final Consumer<ElasticResponseListener> register = (listener) -> {
+                allListeners.add(listener);
+                sourceFieldsSet.addAll(listener.sourceFields());
+                if (listener instanceof SearchHitListener) {
+                    SearchHitListener searchHitListener = (SearchHitListener) 
listener;
+                    searchHitListeners.add(searchHitListener);
+                    if (searchHitListener.isFullScan()) {
+                        fullScan = true;
+                    }
+                }
+                if (listener instanceof AggregationListener) {
+                    aggregationListeners.add((AggregationListener) listener);
+                    needsAggregations.set(true);
+                }
+            };
+
+            listeners.forEach(register);
+            this.sourceFields = sourceFieldsSet.toArray(new String[0]);
+
+            final SearchSourceBuilder searchSourceBuilder = 
SearchSourceBuilder.searchSource()
+                    .query(query)
+                    // use a smaller size when the client asks for facets. 
This improves performance
+                    // when the client is only interested in insecure facets
+                    .size(needsAggregations.get() ? SMALL_RESULT_SET_SIZE : 
MEDIUM_RESULT_SET_SIZE)
+                    .fetchSource(sourceFields, null)
+                    // TODO: this needs to be moved in the requestHandler when 
sorting will be properly supported
+                    
.sort(SortBuilders.fieldSort("_score").order(SortOrder.DESC))
+                    
.sort(SortBuilders.fieldSort(FieldNames.PATH).order(SortOrder.ASC)); // 
tie-breaker
+
+            if (needsAggregations.get()) {
+                
requestHandler.aggregations().forEach(searchSourceBuilder::aggregation);
+            }
+
+            final SearchRequest searchRequest = new 
SearchRequest(indexNode.getDefinition().getRemoteIndexAlias())
+                    .source(searchSourceBuilder);
+
+            LOG.trace("Kicking initial search for query {}", 
searchSourceBuilder);
+            semaphore.tryAcquire();
+            indexNode.getConnection().getClient().searchAsync(searchRequest, 
RequestOptions.DEFAULT, this);
+        }
+
+        /**
+         * Handle the response action notifying the registered listeners. 
Depending on the listeners' configuration
+         * it could keep loading chunks or wait for a {@code #scan} call to 
resume scanning.
+         */
+        @Override
+        public void onResponse(SearchResponse searchResponse) {
+            final SearchHit[] searchHits = searchResponse.getHits().getHits();
+            if (searchHits != null && searchHits.length > 0) {
+                long totalHits = searchResponse.getHits().getTotalHits().value;
+                LOG.debug("Processing search response that took {} to read 
{}/{} docs",
+                        searchResponse.getTook(), searchHits.length, 
totalHits);
+                lastHitSortValues = searchHits[searchHits.length - 
1].getSortValues();
+                scannedRows += searchHits.length;
+                anyDataLeft.set(totalHits > scannedRows);
+                estimator.update(indexPlan.getFilter(), totalHits);
+
+                // now that we got the last hit we can release the semaphore 
to potentially unlock other requests
+                semaphore.release();
+
+                if (firstRequest) {
+                    for (SearchHitListener l : searchHitListeners) {
+                        l.startData(totalHits);
+                    }
+
+                    if (!aggregationListeners.isEmpty()) {
+                        Aggregations aggregations = 
searchResponse.getAggregations();
+                        LOG.trace("Emitting aggregations {}", aggregations);
+                        for (AggregationListener l : aggregationListeners) {
+                            l.on(aggregations);
+                        }
+                    }
+
+                    firstRequest = false;
+                }
+
+                LOG.trace("Emitting {} search hits, for a total of {} scanned 
results", searchHits.length, scannedRows);
+                for (SearchHit hit : searchHits) {
+                    for (SearchHitListener l : searchHitListeners) {
+                        l.on(hit);
+                    }
+                }
+
+                if (!anyDataLeft.get()) {
+                    LOG.trace("No data left: closing scanner, notifying 
listeners");
+                    close();
+                } else if (fullScan) {
+                    scan();
+                }
+            } else {
+                LOG.trace("No results: closing scanner, notifying listeners");
+                close();
+            }
+        }
+
+        @Override
+        public void onFailure(Exception e) {
+            LOG.error("Error retrieving data from Elastic: closing scanner, 
notifying listeners", e);
+            // closing scanner immediately after a failure avoiding them to 
hang (potentially) forever
+            close();
+        }
+
+        /**
+         * Triggers a scan of a new chunk of the result set, if needed.
+         */
+        public void scan() {
+            if (semaphore.tryAcquire() && anyDataLeft.get()) {
+                final SearchSourceBuilder searchSourceBuilder = 
SearchSourceBuilder.searchSource()
+                        .query(query)
+                        .size(LARGE_RESULT_SET_SIZE)
+                        .fetchSource(sourceFields, null)
+                        .searchAfter(lastHitSortValues)
+                        // TODO: this needs to be moved in the requestHandler 
when sorting will be properly supported
+                        
.sort(SortBuilders.fieldSort("_score").order(SortOrder.DESC))
+                        
.sort(SortBuilders.fieldSort(FieldNames.PATH).order(SortOrder.ASC)); // 
tie-breaker
+
+                final SearchRequest searchRequest = new 
SearchRequest(indexNode.getDefinition().getRemoteIndexAlias())
+                        .source(searchSourceBuilder);
+                LOG.trace("Kicking new search after query {}", 
searchRequest.source());
+
+                
indexNode.getConnection().getClient().searchAsync(searchRequest, 
RequestOptions.DEFAULT, this);
+            } else {
+                LOG.trace("Scanner is closing or still processing data from 
the previous scan");
+            }
+        }
+
+        // close all listeners
+        private void close() {
+            semaphore.release();
+            for (ElasticResponseListener l : allListeners) {
+                l.endData();
+            }
+        }
+    }
+}

Propchange: 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticFacetProvider.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticFacetProvider.java?rev=1878762&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticFacetProvider.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticFacetProvider.java
 Fri Jun 12 05:39:42 2020
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic.query.async.facets;
+
+import 
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticRequestHandler;
+import 
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseHandler;
+import 
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseListener;
+import 
org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition.SecureFacetConfiguration;
+import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
+
+import java.util.function.Predicate;
+
+/**
+ * Provider of facets through an {@link ElasticResponseListener}
+ */
+public interface ElasticFacetProvider extends FulltextIndex.FacetProvider, 
ElasticResponseListener {
+
+    /**
+     * Returns the appropriate provider based on the {@link 
SecureFacetConfiguration}
+     * @param facetConfiguration the {@link SecureFacetConfiguration} to 
extract facet options
+     * @param requestHandler the {@link ElasticRequestHandler} to perform 
actions at request time
+     * @param responseHandler the {@link ElasticResponseHandler} to decode 
responses
+     * @param isAccessible a {@link Predicate} to check if a node is accessible
+     *
+     * @return an {@link ElasticFacetProvider} based on {@link 
SecureFacetConfiguration#getMode()} with
+     * {@link SecureFacetConfiguration.MODE#SECURE} as default
+     */
+    static ElasticFacetProvider getProvider(
+            SecureFacetConfiguration facetConfiguration,
+            ElasticRequestHandler requestHandler,
+            ElasticResponseHandler responseHandler,
+            Predicate<String> isAccessible
+    ) {
+        final ElasticFacetProvider facetProvider;
+        switch (facetConfiguration.getMode()) {
+            case INSECURE:
+                facetProvider = new ElasticInsecureFacetAsyncProvider();
+                break;
+            case STATISTICAL:
+                facetProvider = new ElasticStatisticalFacetAsyncProvider(
+                        requestHandler, responseHandler, isAccessible,
+                        facetConfiguration.getRandomSeed(), 
facetConfiguration.getStatisticalFacetSampleSize()
+                );
+                break;
+            case SECURE:
+            default:
+                facetProvider = new 
ElasticSecureFacetAsyncProvider(requestHandler, responseHandler, isAccessible);
+
+        }
+        return facetProvider;
+    }
+
+}

Propchange: 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticFacetProvider.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticInsecureFacetAsyncProvider.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticInsecureFacetAsyncProvider.java?rev=1878762&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticInsecureFacetAsyncProvider.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticInsecureFacetAsyncProvider.java
 Fri Jun 12 05:39:42 2020
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic.query.async.facets;
+
+import 
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseListener;
+import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
+import org.elasticsearch.search.aggregations.Aggregations;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An {@link ElasticFacetProvider} that subscribes to Elastic Aggregation 
events.
+ * The provider is async: {@code getFacets} waits until the aggregation is 
read or for a max of 15 seconds. In the latter
+ * case, an {@link IllegalStateException} is thrown.
+ */
+class ElasticInsecureFacetAsyncProvider implements ElasticFacetProvider, 
ElasticResponseListener.AggregationListener {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ElasticInsecureFacetAsyncProvider.class);
+
+    private Aggregations aggregations;
+
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    @Override
+    public List<FulltextIndex.Facet> getFacets(int numberOfFacets, String 
columnName) {
+        LOG.trace("Requested facets for {} - Latch count: {}", columnName, 
latch.getCount());
+        try {
+            latch.await(15, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            throw new IllegalStateException("Error while waiting for facets", 
e);
+        }
+        LOG.trace("Reading facets for {} from aggregations {}", columnName, 
aggregations.asMap());
+        if (aggregations != null) {
+            final String facetProp = FulltextIndex.parseFacetField(columnName);
+            Terms terms = aggregations.get(facetProp);
+            List<FulltextIndex.Facet> facets = new 
ArrayList<>(terms.getBuckets().size());
+            for (Terms.Bucket bucket : terms.getBuckets()) {
+                facets.add(new FulltextIndex.Facet(bucket.getKeyAsString(), 
(int) bucket.getDocCount()));
+            }
+            return facets;
+        } else return null;
+    }
+
+    @Override
+    public void on(Aggregations aggregations) {
+        this.aggregations = aggregations;
+        this.endData();
+    }
+
+    @Override
+    public void endData() {
+        latch.countDown();
+    }
+}

Propchange: 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticInsecureFacetAsyncProvider.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java?rev=1878762&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java
 Fri Jun 12 05:39:42 2020
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic.query.async.facets;
+
+import 
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticRequestHandler;
+import 
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseHandler;
+import 
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseListener;
+import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * An {@link ElasticFacetProvider} that subscribes to Elastic SearchHit events 
to return only accessible facets.
+ */
+class ElasticSecureFacetAsyncProvider implements ElasticFacetProvider, 
ElasticResponseListener.SearchHitListener {
+
+    protected static final Logger LOG = 
LoggerFactory.getLogger(ElasticSecureFacetAsyncProvider.class);
+
+    protected final Set<String> facetFields;
+    private final Map<String, Map<String, Integer>> facetsMap = new 
ConcurrentHashMap<>();
+    private Map<String, List<FulltextIndex.Facet>> facets;
+    protected final ElasticResponseHandler elasticResponseHandler;
+    protected final Predicate<String> isAccessible;
+
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    ElasticSecureFacetAsyncProvider(
+            ElasticRequestHandler elasticRequestHandler,
+            ElasticResponseHandler elasticResponseHandler,
+            Predicate<String> isAccessible
+    ) {
+        this.elasticResponseHandler = elasticResponseHandler;
+        this.isAccessible = isAccessible;
+        this.facetFields = 
elasticRequestHandler.facetFields().collect(Collectors.toSet());
+    }
+
+    @Override
+    public Set<String> sourceFields() {
+        return facetFields;
+    }
+
+    @Override
+    public boolean isFullScan() {
+        return true;
+    }
+
+    @Override
+    public void on(SearchHit searchHit) {
+        final String path = elasticResponseHandler.getPath(searchHit);
+        if (path != null && isAccessible.test(path)) {
+            Map<String, Object> sourceMap = searchHit.getSourceAsMap();
+            for (String field: facetFields) {
+                Object value = sourceMap.get(field);
+                if (value != null) {
+                    facetsMap.compute(field, (column, facetValues) -> {
+                        if (facetValues == null) {
+                            Map<String, Integer> values = new HashMap<>();
+                            values.put(value.toString(), 1);
+                            return values;
+                        } else {
+                            facetValues.merge(value.toString(), 1, 
Integer::sum);
+                            return facetValues;
+                        }
+                    });
+                }
+            }
+        }
+    }
+
+    @Override
+    public void endData() {
+        // create Facet objects, order by count (desc) and then by label (asc)
+        facets = facetsMap.entrySet()
+                .stream()
+                .collect(Collectors.toMap
+                        (Map.Entry::getKey, x -> x.getValue().entrySet()
+                                .stream()
+                                .map(e -> new FulltextIndex.Facet(e.getKey(), 
e.getValue()))
+                                .sorted((f1, f2) -> {
+                                    int f1Count = f1.getCount();
+                                    int f2Count = f2.getCount();
+                                    if (f1Count == f2Count) {
+                                        return 
f1.getLabel().compareTo(f2.getLabel());
+                                    } else return f2Count - f1Count;
+                                })
+                                .collect(Collectors.toList())
+                        )
+                );
+        LOG.trace("End data {}", facetsMap);
+        latch.countDown();
+    }
+
+    @Override
+    public List<FulltextIndex.Facet> getFacets(int numberOfFacets, String 
columnName) {
+        LOG.trace("Requested facets for {} - Latch count: {}", columnName, 
latch.getCount());
+        try {
+            latch.await(15, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            throw new IllegalStateException("Error while waiting for facets", 
e);
+        }
+        LOG.trace("Reading facets for {} from {}", columnName, facetsMap);
+        return facets.get(FulltextIndex.parseFacetField(columnName));
+    }
+}

Propchange: 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticSecureFacetAsyncProvider.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java?rev=1878762&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java
 Fri Jun 12 05:39:42 2020
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic.query.async.facets;
+
+import 
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticRequestHandler;
+import 
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseHandler;
+import 
org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseListener;
+import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.aggregations.Aggregations;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+/**
+ * An {@link ElasticSecureFacetAsyncProvider} extension that subscribes also 
on Elastic Aggregation events.
+ * SearchHit events are sampled and then used to adjust facets coming from 
Aggregations in order to minimize
+ * access checks. This provider could improve facets performance but only when 
the result set is quite big.
+ */
+public class ElasticStatisticalFacetAsyncProvider
+        extends ElasticSecureFacetAsyncProvider
+        implements ElasticResponseListener.AggregationListener {
+
+    private final int sampleSize;
+    private long totalHits;
+
+    private final Random rGen;
+    private int sampled = 0;
+    private int seen = 0;
+    private long accessibleCount = 0;
+
+    private final Map<String, List<FulltextIndex.Facet>> facetMap = new 
HashMap<>();
+
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    ElasticStatisticalFacetAsyncProvider(ElasticRequestHandler 
elasticRequestHandler,
+                                         ElasticResponseHandler 
elasticResponseHandler,
+                                         Predicate<String> isAccessible,
+                                         long randomSeed, int sampleSize) {
+        super(elasticRequestHandler, elasticResponseHandler, isAccessible);
+        this.sampleSize = sampleSize;
+        this.rGen = new Random(randomSeed);
+    }
+
+    @Override
+    public void startData(long totalHits) {
+        this.totalHits = totalHits;
+    }
+
+    @Override
+    public void on(SearchHit searchHit) {
+        if (totalHits < sampleSize) {
+            super.on(searchHit);
+        } else {
+            if (sampleSize == sampled) {
+                return;
+            }
+            int r = rGen.nextInt((int) (totalHits - seen)) + 1;
+            seen++;
+
+            if (r <= sampleSize - sampled) {
+                sampled++;
+                final String path = elasticResponseHandler.getPath(searchHit);
+                if (path != null && isAccessible.test(path)) {
+                    accessibleCount++;
+                }
+            }
+        }
+    }
+
+    @Override
+    public void on(Aggregations aggregations) {
+        for (String field: facetFields) {
+            Terms terms = aggregations.get(field);
+            List<? extends Terms.Bucket> buckets = terms.getBuckets();
+            final List<FulltextIndex.Facet> facetList = new ArrayList<>();
+            for (Terms.Bucket bucket : buckets) {
+                facetList.add(new FulltextIndex.Facet(bucket.getKeyAsString(), 
(int) bucket.getDocCount()));
+            }
+            facetMap.put(field, facetList);
+        }
+    }
+
+    @Override
+    public void endData() {
+        if (totalHits < sampleSize) {
+            super.endData();
+        } else {
+            for (String facet: facetMap.keySet()) {
+                facetMap.compute(facet, (s, facets1) -> 
updateLabelAndValueIfRequired(facets1));
+            }
+            latch.countDown();
+        }
+    }
+
+    @Override
+    public List<FulltextIndex.Facet> getFacets(int numberOfFacets, String 
columnName) {
+        if (totalHits < sampleSize) {
+            return super.getFacets(numberOfFacets, columnName);
+        } else {
+            LOG.trace("Requested facets for {} - Latch count: {}", columnName, 
latch.getCount());
+            try {
+                latch.await(15, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                throw new IllegalStateException("Error while waiting for 
facets", e);
+            }
+            LOG.trace("Reading facets for {} from {}", columnName, facetMap);
+            return facetMap.get(FulltextIndex.parseFacetField(columnName));
+        }
+    }
+
+    private List<FulltextIndex.Facet> 
updateLabelAndValueIfRequired(List<FulltextIndex.Facet> labelAndValues) {
+        if (accessibleCount < sampleSize) {
+            int numZeros = 0;
+            List<FulltextIndex.Facet> newValues;
+            {
+                List<FulltextIndex.Facet> proportionedLVs = new LinkedList<>();
+                for (FulltextIndex.Facet labelAndValue : labelAndValues) {
+                    long count = labelAndValue.getCount() * accessibleCount / 
sampleSize;
+                    if (count == 0) {
+                        numZeros++;
+                    }
+                    proportionedLVs.add(new 
FulltextIndex.Facet(labelAndValue.getLabel(), Math.toIntExact(count)));
+                }
+                labelAndValues = proportionedLVs;
+            }
+            if (numZeros > 0) {
+                newValues = new LinkedList<>();
+                for (FulltextIndex.Facet lv : labelAndValues) {
+                    if (lv.getCount() > 0) {
+                        newValues.add(lv);
+                    }
+                }
+            } else {
+                newValues = labelAndValues;
+            }
+            return newValues;
+        } else {
+            return labelAndValues;
+        }
+    }
+}

Propchange: 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to