This is an automated email from the ASF dual-hosted git repository.

fortino pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 12f787fcc4 OAK-11042: bump elastic 8.15.0 / lucene 9.11.1 (#1658)
12f787fcc4 is described below

commit 12f787fcc4606907182f3cbd186876fc554bb312
Author: Fabrizio Fortino <[email protected]>
AuthorDate: Thu Aug 22 15:39:59 2024 +0200

    OAK-11042: bump elastic 8.15.0 / lucene 9.11.1 (#1658)
    
    * OAK-11042: bump elastic 8.15.0 / lucene 9.11.1
    
    * OAK-11029: add workaround for elastic/elasticsearch-java#867
    
    * OAK-11029: better doc for workaround
    
    * OAK-11029: improve bulkIngester#close
    
    * OAK-11029: missing static modifier for ElasticBulkProcessorHandler#FAILED_DOC_COUNT_FOR_STATUS_NODE
---
 oak-search-elastic/pom.xml                         |  4 +-
 .../elastic/index/ElasticBulkProcessorHandler.java | 68 ++++++++++++++--------
 .../elastic/util/TermQueryBuilderFactory.java      | 18 +++---
 .../plugins/index/elastic/ElasticTestServer.java   |  4 +-
 4 files changed, 58 insertions(+), 36 deletions(-)

diff --git a/oak-search-elastic/pom.xml b/oak-search-elastic/pom.xml
index 3c6b1d3489..293faf6ce1 100644
--- a/oak-search-elastic/pom.xml
+++ b/oak-search-elastic/pom.xml
@@ -33,8 +33,8 @@
   <description>Oak Elasticsearch integration subproject</description>
 
   <properties>
-    <elasticsearch.java.client.version>8.13.2</elasticsearch.java.client.version>
-    <lucene.version>9.10.0</lucene.version>
+    <elasticsearch.java.client.version>8.15.0</elasticsearch.java.client.version>
+    <lucene.version>9.11.1</lucene.version>
   </properties>
 
   <build>
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
index 5352340304..02897b558c 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
@@ -25,6 +25,7 @@ import 
co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
 import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
 import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
@@ -42,7 +43,9 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Phaser;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
@@ -50,7 +53,7 @@ import java.util.stream.Collectors;
 class ElasticBulkProcessorHandler {
 
     private static final Logger LOG = LoggerFactory.getLogger(ElasticBulkProcessorHandler.class);
-    private final int FAILED_DOC_COUNT_FOR_STATUS_NODE = Integer.getInteger("oak.failedDocStatusLimit", 10000);
+    private static final int FAILED_DOC_COUNT_FOR_STATUS_NODE = Integer.getInteger("oak.failedDocStatusLimit", 10000);
 
     private static final int BULK_PROCESSOR_CONCURRENCY =
         Integer.getInteger("oak.indexer.elastic.bulkProcessorConcurrency", 1);
@@ -85,6 +88,9 @@ class ElasticBulkProcessorHandler {
 
     protected long totalOperations;
 
+    // TODO: workaround for https://github.com/elastic/elasticsearch-java/pull/867 remove when fixed
+    private final ScheduledExecutorService scheduler;
+
     private ElasticBulkProcessorHandler(@NotNull ElasticConnection 
elasticConnection,
                                         @NotNull String indexName,
                                         @NotNull ElasticIndexDefinition 
indexDefinition,
@@ -95,6 +101,13 @@ class ElasticBulkProcessorHandler {
         this.indexDefinition = indexDefinition;
         this.definitionBuilder = definitionBuilder;
         this.waitForESAcknowledgement = waitForESAcknowledgement;
+        // TODO: workaround for 
https://github.com/elastic/elasticsearch-java/pull/867 remove when fixed
+        this.scheduler = 
Executors.newScheduledThreadPool(BULK_PROCESSOR_CONCURRENCY + 1, (r) -> {
+            Thread t = Executors.defaultThreadFactory().newThread(r);
+            t.setName("oak-bulk-ingester#");
+            t.setDaemon(true);
+            return t;
+        });
         this.bulkIngester = initBulkIngester();
     }
 
@@ -150,6 +163,10 @@ class ElasticBulkProcessorHandler {
             if (indexDefinition.bulkFlushIntervalMs > 0) {
                 b = b.flushInterval(indexDefinition.bulkFlushIntervalMs, 
TimeUnit.MILLISECONDS);
             }
+
+            // TODO: workaround for 
https://github.com/elastic/elasticsearch-java/pull/867 remove when fixed
+            b = b.scheduler(scheduler);
+
             return b.maxConcurrentRequests(BULK_PROCESSOR_CONCURRENCY);
         });
     }
@@ -183,35 +200,40 @@ class ElasticBulkProcessorHandler {
      * @throws IOException if an error happened while processing the bulk 
requests
      */
     public boolean close() throws IOException {
-        LOG.trace("Calling close on bulk ingester {}", bulkIngester);
-        bulkIngester.close();
-        LOG.trace("Bulk Ingester {} closed", bulkIngester);
+        try {
+            LOG.trace("Calling close on bulk ingester {}", bulkIngester);
+            bulkIngester.close();
+            LOG.trace("Bulk Ingester {} closed", bulkIngester);
 
-        // de-register main controller
-        int phase = phaser.arriveAndDeregister();
+            // de-register main controller
+            int phase = phaser.arriveAndDeregister();
 
-        if (totalOperations == 0) { // no need to invoke phaser await if we 
already know there were no operations
-            LOG.debug("No operations executed in this processor. Close 
immediately");
-            return false;
-        }
+            if (totalOperations == 0) { // no need to invoke phaser await if 
we already know there were no operations
+                LOG.debug("No operations executed in this processor. Close 
immediately");
+                return false;
+            }
 
-        if (waitForESAcknowledgement) {
-            try {
-                phaser.awaitAdvanceInterruptibly(phase, 
indexDefinition.bulkFlushIntervalMs * 5, TimeUnit.MILLISECONDS);
-            } catch (TimeoutException e) {
-                LOG.error("Error waiting for bulk requests to return", e);
-            } catch (InterruptedException e) {
-                LOG.warn("Interrupted while waiting for bulk processor to 
close", e);
-                Thread.currentThread().interrupt();  // restore interrupt 
status
+            if (waitForESAcknowledgement) {
+                try {
+                    phaser.awaitAdvanceInterruptibly(phase, 
indexDefinition.bulkFlushIntervalMs * 5, TimeUnit.MILLISECONDS);
+                } catch (TimeoutException e) {
+                    LOG.error("Error waiting for bulk requests to return", e);
+                } catch (InterruptedException e) {
+                    LOG.warn("Interrupted while waiting for bulk processor to 
close", e);
+                    Thread.currentThread().interrupt();  // restore interrupt 
status
+                }
             }
-        }
 
-        checkFailures();
+            checkFailures();
 
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Bulk identifier -> update status = {}", updatesMap);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Bulk identifier -> update status = {}", updatesMap);
+            }
+            return updatesMap.containsValue(Boolean.TRUE);
+        } finally {
+            // TODO: workaround for https://github.com/elastic/elasticsearch-java/pull/867 remove when fixed
+            new ExecutorCloser(scheduler).close();
         }
-        return updatesMap.containsValue(Boolean.TRUE);
     }
 
     private class OakBulkListener implements BulkListener<String> {
diff --git 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/util/TermQueryBuilderFactory.java
 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/util/TermQueryBuilderFactory.java
index ef93868b40..33743639a4 100644
--- 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/util/TermQueryBuilderFactory.java
+++ 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/util/TermQueryBuilderFactory.java
@@ -70,27 +70,27 @@ public class TermQueryBuilderFactory {
         int depth = PathUtils.getDepth(path) + planResult.getParentDepth() + 1;
         return Query.of(q -> q.term(t -> 
t.field(PATH_DEPTH).value(v->v.longValue(depth))));
     }
-    
+
     private static <R> Query newRangeQuery(String field, R first, R last, 
boolean firstIncluding,
-            boolean lastIncluding) {
+                                           boolean lastIncluding) {
 
-        return Query.of(fn -> fn.range(fnr -> {
+        return Query.of(fn -> fn.range(fnr -> fnr.date(date -> {
             if (first != null) {
                 if (firstIncluding) {
-                    fnr.gte(JsonData.of(first));
+                    date.gte(first.toString());
                 } else {
-                    fnr.gt(JsonData.of(first));
+                    date.gt(first.toString());
                 }
             }
             if (last != null) {
                 if (lastIncluding) {
-                    fnr.lte(JsonData.of(last));
+                    date.lte(last.toString());
                 } else {
-                    fnr.lt(JsonData.of(last));
+                    date.lt(last.toString());
                 }
             }
-            return fnr.field(field);
-        }));
+            return date.field(field);
+        })));
     }
 
     private static <R> FieldValue toFieldValue(R value) {
diff --git 
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticTestServer.java
 
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticTestServer.java
index 43d2157d83..1d42f8cfa9 100644
--- 
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticTestServer.java
+++ 
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticTestServer.java
@@ -49,9 +49,9 @@ public class ElasticTestServer implements AutoCloseable {
             "8.7.1.0", 
"80c8d34334b0cf4def79835ea6dab78b59ba9ee54c8f5f3cba0bde53123d7820",
             "8.10.4.0", 
"b2ae8faf1e272319594b4d47a72580fa4f61a5c11cbc8d3f13453fd34b153441",
             "8.11.0.0", 
"8d4d80b850c4da4da6dfe2d675b2e2355d2014307f8bdc54cc1b34323c81c7ae",
-            "8.11.1.0", 
"a00a920d4bc29f0deacde7c2ef3d3f70692b00b62bf7fb82b0fe18eeb1dafee9",
             "8.11.3.0", 
"1f14b496baf973fb5c64e77fc458d9814dd6905170d7b15350f9f1a009824f41",
-            "8.13.2.0", 
"586f553b109266d7996265f3f34a20914b569d494b49da2c0534428770e551f0");
+            "8.13.2.0", 
"586f553b109266d7996265f3f34a20914b569d494b49da2c0534428770e551f0",
+            "8.15.0.0", 
"6cbb54d68d654a3476df0b730856cfa3194bce5c6e1050a35e7a86ffec8a3e20");
 
     private static final ElasticTestServer SERVER = new ElasticTestServer();
     private static volatile ElasticsearchContainer CONTAINER;

Reply via email to