This is an automated email from the ASF dual-hosted git repository.
nfsantos pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new fe6cef1166 OAK-11547 - Upgrade Elasticsearch client from 8.15.0 to
8.17.2 (#2135)
fe6cef1166 is described below
commit fe6cef1166444322f05930b50dc022b75f7fd3e5
Author: Nuno Santos <[email protected]>
AuthorDate: Tue Mar 4 22:26:03 2025 +0100
OAK-11547 - Upgrade Elasticsearch client from 8.15.0 to 8.17.2 (#2135)
---
oak-run-elastic/pom.xml | 2 +-
oak-search-elastic/pom.xml | 4 +-
.../elastic/index/ElasticBulkProcessorHandler.java | 73 ++++++++--------------
.../index/elastic/index/ElasticIndexHelper.java | 2 +-
.../elastic/index/ElasticIndexHelperTest.java | 2 +-
5 files changed, 32 insertions(+), 51 deletions(-)
diff --git a/oak-run-elastic/pom.xml b/oak-run-elastic/pom.xml
index 9fea031fe4..9d876a0790 100644
--- a/oak-run-elastic/pom.xml
+++ b/oak-run-elastic/pom.xml
@@ -42,7 +42,7 @@
105 MB: Azure updates
107 MB: RDB/Tomcat (OAK-10752)
-->
- <max.jar.size>113039632</max.jar.size>
+ <max.jar.size>115000000</max.jar.size>
</properties>
diff --git a/oak-search-elastic/pom.xml b/oak-search-elastic/pom.xml
index 44e1dcdf30..fd4ff7f8cf 100644
--- a/oak-search-elastic/pom.xml
+++ b/oak-search-elastic/pom.xml
@@ -33,8 +33,8 @@
<description>Oak Elasticsearch integration subproject</description>
<properties>
-
<elasticsearch.java.client.version>8.15.0</elasticsearch.java.client.version>
- <lucene.version>9.11.1</lucene.version>
+
<elasticsearch.java.client.version>8.17.2</elasticsearch.java.client.version>
+ <lucene.version>9.12.0</lucene.version>
</properties>
<build>
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
index 13ff29e3f2..64485b03f8 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
@@ -26,7 +26,6 @@ import
co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.json.JsonData;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
-import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
@@ -44,9 +43,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@@ -57,7 +54,7 @@ class ElasticBulkProcessorHandler {
private static final int FAILED_DOC_COUNT_FOR_STATUS_NODE =
Integer.getInteger("oak.failedDocStatusLimit", 10000);
private static final int BULK_PROCESSOR_CONCURRENCY =
- Integer.getInteger("oak.indexer.elastic.bulkProcessorConcurrency", 1);
+ Integer.getInteger("oak.indexer.elastic.bulkProcessorConcurrency",
1);
private static final String SYNC_MODE_PROPERTY = "sync-mode";
private static final String SYNC_RT_MODE = "rt";
@@ -89,9 +86,6 @@ class ElasticBulkProcessorHandler {
protected long totalOperations;
- // TODO: workaround for
https://github.com/elastic/elasticsearch-java/pull/867 remove when fixed
- private final ScheduledExecutorService scheduler;
-
private ElasticBulkProcessorHandler(@NotNull ElasticConnection
elasticConnection,
@NotNull String indexName,
@NotNull ElasticIndexDefinition
indexDefinition,
@@ -102,13 +96,6 @@ class ElasticBulkProcessorHandler {
this.indexDefinition = indexDefinition;
this.definitionBuilder = definitionBuilder;
this.waitForESAcknowledgement = waitForESAcknowledgement;
- // TODO: workaround for
https://github.com/elastic/elasticsearch-java/pull/867 remove when fixed
- this.scheduler =
Executors.newScheduledThreadPool(BULK_PROCESSOR_CONCURRENCY + 1, (r) -> {
- Thread t = Executors.defaultThreadFactory().newThread(r);
- t.setName("oak-bulk-ingester#");
- t.setDaemon(true);
- return t;
- });
this.bulkIngester = initBulkIngester();
}
@@ -165,9 +152,6 @@ class ElasticBulkProcessorHandler {
b = b.flushInterval(indexDefinition.bulkFlushIntervalMs,
TimeUnit.MILLISECONDS);
}
- // TODO: workaround for
https://github.com/elastic/elasticsearch-java/pull/867 remove when fixed
- b = b.scheduler(scheduler);
-
return b.maxConcurrentRequests(BULK_PROCESSOR_CONCURRENCY);
});
}
@@ -182,7 +166,8 @@ class ElasticBulkProcessorHandler {
/**
* Indexes a document in the bulk processor. The document is identified by
the given id. If the document already exists it will be replaced by the new one.
- * @param id the document id
+ *
+ * @param id the document id
* @param document the document to index
* @throws IOException if an error happened while processing the bulk
request
*/
@@ -226,44 +211,40 @@ class ElasticBulkProcessorHandler {
/**
* Closes the bulk ingester and waits for all the bulk requests to return.
+ *
* @return {@code true} if at least one update was performed, {@code
false} otherwise
* @throws IOException if an error happened while processing the bulk
requests
*/
public boolean close() throws IOException {
- try {
- LOG.trace("Calling close on bulk ingester {}", bulkIngester);
- bulkIngester.close();
- LOG.trace("Bulk Ingester {} closed", bulkIngester);
+ LOG.trace("Calling close on bulk ingester {}", bulkIngester);
+ bulkIngester.close();
+ LOG.trace("Bulk Ingester {} closed", bulkIngester);
- // de-register main controller
- int phase = phaser.arriveAndDeregister();
+ // de-register main controller
+ int phase = phaser.arriveAndDeregister();
- if (totalOperations == 0) { // no need to invoke phaser await if
we already know there were no operations
- LOG.debug("No operations executed in this processor. Close
immediately");
- return false;
- }
+ if (totalOperations == 0) { // no need to invoke phaser await if we
already know there were no operations
+ LOG.debug("No operations executed in this processor. Close
immediately");
+ return false;
+ }
- if (waitForESAcknowledgement) {
- try {
- phaser.awaitAdvanceInterruptibly(phase,
indexDefinition.bulkFlushIntervalMs * 5, TimeUnit.MILLISECONDS);
- } catch (TimeoutException e) {
- LOG.error("Error waiting for bulk requests to return", e);
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while waiting for bulk processor to
close", e);
- Thread.currentThread().interrupt(); // restore interrupt
status
- }
+ if (waitForESAcknowledgement) {
+ try {
+ phaser.awaitAdvanceInterruptibly(phase,
indexDefinition.bulkFlushIntervalMs * 5, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ LOG.error("Error waiting for bulk requests to return", e);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting for bulk processor to
close", e);
+ Thread.currentThread().interrupt(); // restore interrupt
status
}
+ }
- checkFailures();
+ checkFailures();
- if (LOG.isTraceEnabled()) {
- LOG.trace("Bulk identifier -> update status = {}", updatesMap);
- }
- return updatesMap.containsValue(Boolean.TRUE);
- } finally {
- // TODO: workaround for
https://github.com/elastic/elasticsearch-java/pull/867 remove when fixed
- new ExecutorCloser(scheduler).close();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Bulk identifier -> update status = {}", updatesMap);
}
+ return updatesMap.containsValue(Boolean.TRUE);
}
private class OakBulkListener implements BulkListener<String> {
@@ -388,7 +369,7 @@ class ElasticBulkProcessorHandler {
if (totalOperations > 0) {
LOG.debug("Forcing refresh");
try {
- this.elasticConnection.getClient().indices().refresh(b
-> b.index(indexName));
+ this.elasticConnection.getClient().indices().refresh(b ->
b.index(indexName));
} catch (IOException e) {
LOG.warn("Error refreshing index {}", indexName, e);
}
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexHelper.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexHelper.java
index 7ec836d132..f42432ea0c 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexHelper.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexHelper.java
@@ -222,7 +222,7 @@ class ElasticIndexHelper {
// Make the index more lenient when a field cannot be
converted to the mapped type. Without this setting
// the entire document will fail to update. Instead,
only the specific field won't be updated.
.mapping(mf -> mf.ignoreMalformed(true).
- totalFields(f ->
f.limit(indexDefinition.limitTotalFields)))
+ totalFields(f ->
f.limit(Long.toString(indexDefinition.limitTotalFields))))
// static setting: cannot be changed after the index
gets created
.numberOfShards(Integer.toString(indexDefinition.numberOfShards))
// dynamic settings: see #enableIndexRequest
diff --git
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexHelperTest.java
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexHelperTest.java
index c168237b02..4873aa6e9e 100644
---
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexHelperTest.java
+++
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexHelperTest.java
@@ -51,7 +51,7 @@ public class ElasticIndexHelperTest {
ElasticIndexDefinition definition =
new ElasticIndexDefinition(nodeState, nodeState, "path",
"prefix");
CreateIndexRequest request =
ElasticIndexHelper.createIndexRequest("prefix.path", definition);
- assertEquals(1234L,
request.settings().index().mapping().totalFields().limit().longValue());
+ assertEquals(1234L,
Long.parseLong(request.settings().index().mapping().totalFields().limit()));
assertEquals(true,
request.settings().index().mapping().ignoreMalformed());
}