This is an automated email from the ASF dual-hosted git repository.

nfsantos pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fe6cef1166 OAK-11547 - Upgrade Elasticsearch client from 8.15.0 to 
8.17.2 (#2135)
fe6cef1166 is described below

commit fe6cef1166444322f05930b50dc022b75f7fd3e5
Author: Nuno Santos <[email protected]>
AuthorDate: Tue Mar 4 22:26:03 2025 +0100

    OAK-11547 - Upgrade Elasticsearch client from 8.15.0 to 8.17.2 (#2135)
---
 oak-run-elastic/pom.xml                            |  2 +-
 oak-search-elastic/pom.xml                         |  4 +-
 .../elastic/index/ElasticBulkProcessorHandler.java | 73 ++++++++--------------
 .../index/elastic/index/ElasticIndexHelper.java    |  2 +-
 .../elastic/index/ElasticIndexHelperTest.java      |  2 +-
 5 files changed, 32 insertions(+), 51 deletions(-)

diff --git a/oak-run-elastic/pom.xml b/oak-run-elastic/pom.xml
index 9fea031fe4..9d876a0790 100644
--- a/oak-run-elastic/pom.xml
+++ b/oak-run-elastic/pom.xml
@@ -42,7 +42,7 @@
         105 MB: Azure updates
         107 MB: RDB/Tomcat (OAK-10752)
         -->
-        <max.jar.size>113039632</max.jar.size>
+        <max.jar.size>115000000</max.jar.size>
                       
     </properties>
 
diff --git a/oak-search-elastic/pom.xml b/oak-search-elastic/pom.xml
index 44e1dcdf30..fd4ff7f8cf 100644
--- a/oak-search-elastic/pom.xml
+++ b/oak-search-elastic/pom.xml
@@ -33,8 +33,8 @@
   <description>Oak Elasticsearch integration subproject</description>
 
   <properties>
-    
<elasticsearch.java.client.version>8.15.0</elasticsearch.java.client.version>
-    <lucene.version>9.11.1</lucene.version>
+    
<elasticsearch.java.client.version>8.17.2</elasticsearch.java.client.version>
+    <lucene.version>9.12.0</lucene.version>
   </properties>
 
   <build>
diff --git 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
index 13ff29e3f2..64485b03f8 100644
--- 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
+++ 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
@@ -26,7 +26,6 @@ import 
co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
 import co.elastic.clients.json.JsonData;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
-import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
 import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
@@ -44,9 +43,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Phaser;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
@@ -57,7 +54,7 @@ class ElasticBulkProcessorHandler {
     private static final int FAILED_DOC_COUNT_FOR_STATUS_NODE = 
Integer.getInteger("oak.failedDocStatusLimit", 10000);
 
     private static final int BULK_PROCESSOR_CONCURRENCY =
-        Integer.getInteger("oak.indexer.elastic.bulkProcessorConcurrency", 1);
+            Integer.getInteger("oak.indexer.elastic.bulkProcessorConcurrency", 
1);
     private static final String SYNC_MODE_PROPERTY = "sync-mode";
     private static final String SYNC_RT_MODE = "rt";
 
@@ -89,9 +86,6 @@ class ElasticBulkProcessorHandler {
 
     protected long totalOperations;
 
-    // TODO: workaround for 
https://github.com/elastic/elasticsearch-java/pull/867 remove when fixed
-    private final ScheduledExecutorService scheduler;
-
     private ElasticBulkProcessorHandler(@NotNull ElasticConnection 
elasticConnection,
                                         @NotNull String indexName,
                                         @NotNull ElasticIndexDefinition 
indexDefinition,
@@ -102,13 +96,6 @@ class ElasticBulkProcessorHandler {
         this.indexDefinition = indexDefinition;
         this.definitionBuilder = definitionBuilder;
         this.waitForESAcknowledgement = waitForESAcknowledgement;
-        // TODO: workaround for 
https://github.com/elastic/elasticsearch-java/pull/867 remove when fixed
-        this.scheduler = 
Executors.newScheduledThreadPool(BULK_PROCESSOR_CONCURRENCY + 1, (r) -> {
-            Thread t = Executors.defaultThreadFactory().newThread(r);
-            t.setName("oak-bulk-ingester#");
-            t.setDaemon(true);
-            return t;
-        });
         this.bulkIngester = initBulkIngester();
     }
 
@@ -165,9 +152,6 @@ class ElasticBulkProcessorHandler {
                 b = b.flushInterval(indexDefinition.bulkFlushIntervalMs, 
TimeUnit.MILLISECONDS);
             }
 
-            // TODO: workaround for 
https://github.com/elastic/elasticsearch-java/pull/867 remove when fixed
-            b = b.scheduler(scheduler);
-
             return b.maxConcurrentRequests(BULK_PROCESSOR_CONCURRENCY);
         });
     }
@@ -182,7 +166,8 @@ class ElasticBulkProcessorHandler {
 
     /**
      * Indexes a document in the bulk processor. The document is identified by 
the given id. If the document already exists it will be replaced by the new one.
-     * @param id the document id
+     *
+     * @param id       the document id
      * @param document the document to index
      * @throws IOException if an error happened while processing the bulk 
request
      */
@@ -226,44 +211,40 @@ class ElasticBulkProcessorHandler {
 
     /**
      * Closes the bulk ingester and waits for all the bulk requests to return.
+     *
      * @return {@code true} if at least one update was performed, {@code 
false} otherwise
      * @throws IOException if an error happened while processing the bulk 
requests
      */
     public boolean close() throws IOException {
-        try {
-            LOG.trace("Calling close on bulk ingester {}", bulkIngester);
-            bulkIngester.close();
-            LOG.trace("Bulk Ingester {} closed", bulkIngester);
+        LOG.trace("Calling close on bulk ingester {}", bulkIngester);
+        bulkIngester.close();
+        LOG.trace("Bulk Ingester {} closed", bulkIngester);
 
-            // de-register main controller
-            int phase = phaser.arriveAndDeregister();
+        // de-register main controller
+        int phase = phaser.arriveAndDeregister();
 
-            if (totalOperations == 0) { // no need to invoke phaser await if 
we already know there were no operations
-                LOG.debug("No operations executed in this processor. Close 
immediately");
-                return false;
-            }
+        if (totalOperations == 0) { // no need to invoke phaser await if we 
already know there were no operations
+            LOG.debug("No operations executed in this processor. Close 
immediately");
+            return false;
+        }
 
-            if (waitForESAcknowledgement) {
-                try {
-                    phaser.awaitAdvanceInterruptibly(phase, 
indexDefinition.bulkFlushIntervalMs * 5, TimeUnit.MILLISECONDS);
-                } catch (TimeoutException e) {
-                    LOG.error("Error waiting for bulk requests to return", e);
-                } catch (InterruptedException e) {
-                    LOG.warn("Interrupted while waiting for bulk processor to 
close", e);
-                    Thread.currentThread().interrupt();  // restore interrupt 
status
-                }
+        if (waitForESAcknowledgement) {
+            try {
+                phaser.awaitAdvanceInterruptibly(phase, 
indexDefinition.bulkFlushIntervalMs * 5, TimeUnit.MILLISECONDS);
+            } catch (TimeoutException e) {
+                LOG.error("Error waiting for bulk requests to return", e);
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted while waiting for bulk processor to 
close", e);
+                Thread.currentThread().interrupt();  // restore interrupt 
status
             }
+        }
 
-            checkFailures();
+        checkFailures();
 
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Bulk identifier -> update status = {}", updatesMap);
-            }
-            return updatesMap.containsValue(Boolean.TRUE);
-        } finally {
-            // TODO: workaround for 
https://github.com/elastic/elasticsearch-java/pull/867 remove when fixed
-            new ExecutorCloser(scheduler).close();
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Bulk identifier -> update status = {}", updatesMap);
         }
+        return updatesMap.containsValue(Boolean.TRUE);
     }
 
     private class OakBulkListener implements BulkListener<String> {
@@ -388,7 +369,7 @@ class ElasticBulkProcessorHandler {
             if (totalOperations > 0) {
                 LOG.debug("Forcing refresh");
                 try {
-                       this.elasticConnection.getClient().indices().refresh(b 
-> b.index(indexName));
+                    this.elasticConnection.getClient().indices().refresh(b -> 
b.index(indexName));
                 } catch (IOException e) {
                     LOG.warn("Error refreshing index {}", indexName, e);
                 }
diff --git 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexHelper.java
 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexHelper.java
index 7ec836d132..f42432ea0c 100644
--- 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexHelper.java
+++ 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexHelper.java
@@ -222,7 +222,7 @@ class ElasticIndexHelper {
                         // Make the index more lenient when a field cannot be 
converted to the mapped type. Without this setting
                         // the entire document will fail to update. Instead, 
only the specific field won't be updated.
                         .mapping(mf -> mf.ignoreMalformed(true).
-                                totalFields(f -> 
f.limit(indexDefinition.limitTotalFields)))
+                                totalFields(f -> 
f.limit(Long.toString(indexDefinition.limitTotalFields))))
                         // static setting: cannot be changed after the index 
gets created
                         
.numberOfShards(Integer.toString(indexDefinition.numberOfShards))
                         // dynamic settings: see #enableIndexRequest
diff --git 
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexHelperTest.java
 
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexHelperTest.java
index c168237b02..4873aa6e9e 100644
--- 
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexHelperTest.java
+++ 
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexHelperTest.java
@@ -51,7 +51,7 @@ public class ElasticIndexHelperTest {
         ElasticIndexDefinition definition =
                 new ElasticIndexDefinition(nodeState, nodeState, "path", 
"prefix");
         CreateIndexRequest request = 
ElasticIndexHelper.createIndexRequest("prefix.path", definition);
-        assertEquals(1234L, 
request.settings().index().mapping().totalFields().limit().longValue());
+        assertEquals(1234L, 
Long.parseLong(request.settings().index().mapping().totalFields().limit()));
         assertEquals(true, 
request.settings().index().mapping().ignoreMalformed());
     }
 

Reply via email to