This is an automated email from the ASF dual-hosted git repository.
fortino pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new 12f787fcc4 OAK-11042: bump elastic 8.15.0 / lucene 9.11.1 (#1658)
12f787fcc4 is described below
commit 12f787fcc4606907182f3cbd186876fc554bb312
Author: Fabrizio Fortino <[email protected]>
AuthorDate: Thu Aug 22 15:39:59 2024 +0200
OAK-11042: bump elastic 8.15.0 / lucene 9.11.1 (#1658)
* OAK-11042: bump elastic 8.15.0 / lucene 9.11.1
* OAK-11029: add workaround for elastic/elasticsearch-java#867
* OAK-11029: better doc for workaround
* OAK-11029: improve bulkIngester#close
* OAK-11029: missing static modifier for
ElasticBulkProcessorHandler#FAILED_DOC_COUNT_FOR_STATUS_NODE
---
oak-search-elastic/pom.xml | 4 +-
.../elastic/index/ElasticBulkProcessorHandler.java | 68 ++++++++++++++--------
.../elastic/util/TermQueryBuilderFactory.java | 18 +++---
.../plugins/index/elastic/ElasticTestServer.java | 4 +-
4 files changed, 58 insertions(+), 36 deletions(-)
diff --git a/oak-search-elastic/pom.xml b/oak-search-elastic/pom.xml
index 3c6b1d3489..293faf6ce1 100644
--- a/oak-search-elastic/pom.xml
+++ b/oak-search-elastic/pom.xml
@@ -33,8 +33,8 @@
<description>Oak Elasticsearch integration subproject</description>
<properties>
- <elasticsearch.java.client.version>8.13.2</elasticsearch.java.client.version>
- <lucene.version>9.10.0</lucene.version>
+ <elasticsearch.java.client.version>8.15.0</elasticsearch.java.client.version>
+ <lucene.version>9.11.1</lucene.version>
</properties>
<build>
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
index 5352340304..02897b558c 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
@@ -25,6 +25,7 @@ import
co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
@@ -42,7 +43,9 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@@ -50,7 +53,7 @@ import java.util.stream.Collectors;
class ElasticBulkProcessorHandler {
private static final Logger LOG =
LoggerFactory.getLogger(ElasticBulkProcessorHandler.class);
- private final int FAILED_DOC_COUNT_FOR_STATUS_NODE =
Integer.getInteger("oak.failedDocStatusLimit", 10000);
+ private static final int FAILED_DOC_COUNT_FOR_STATUS_NODE =
Integer.getInteger("oak.failedDocStatusLimit", 10000);
private static final int BULK_PROCESSOR_CONCURRENCY =
Integer.getInteger("oak.indexer.elastic.bulkProcessorConcurrency", 1);
@@ -85,6 +88,9 @@ class ElasticBulkProcessorHandler {
protected long totalOperations;
+ // TODO: workaround for
https://github.com/elastic/elasticsearch-java/pull/867 remove when fixed
+ private final ScheduledExecutorService scheduler;
+
private ElasticBulkProcessorHandler(@NotNull ElasticConnection
elasticConnection,
@NotNull String indexName,
@NotNull ElasticIndexDefinition
indexDefinition,
@@ -95,6 +101,13 @@ class ElasticBulkProcessorHandler {
this.indexDefinition = indexDefinition;
this.definitionBuilder = definitionBuilder;
this.waitForESAcknowledgement = waitForESAcknowledgement;
+ // TODO: workaround for
https://github.com/elastic/elasticsearch-java/pull/867 remove when fixed
+ this.scheduler =
Executors.newScheduledThreadPool(BULK_PROCESSOR_CONCURRENCY + 1, (r) -> {
+ Thread t = Executors.defaultThreadFactory().newThread(r);
+ t.setName("oak-bulk-ingester#");
+ t.setDaemon(true);
+ return t;
+ });
this.bulkIngester = initBulkIngester();
}
@@ -150,6 +163,10 @@ class ElasticBulkProcessorHandler {
if (indexDefinition.bulkFlushIntervalMs > 0) {
b = b.flushInterval(indexDefinition.bulkFlushIntervalMs,
TimeUnit.MILLISECONDS);
}
+
+ // TODO: workaround for
https://github.com/elastic/elasticsearch-java/pull/867 remove when fixed
+ b = b.scheduler(scheduler);
+
return b.maxConcurrentRequests(BULK_PROCESSOR_CONCURRENCY);
});
}
@@ -183,35 +200,40 @@ class ElasticBulkProcessorHandler {
* @throws IOException if an error happened while processing the bulk
requests
*/
public boolean close() throws IOException {
- LOG.trace("Calling close on bulk ingester {}", bulkIngester);
- bulkIngester.close();
- LOG.trace("Bulk Ingester {} closed", bulkIngester);
+ try {
+ LOG.trace("Calling close on bulk ingester {}", bulkIngester);
+ bulkIngester.close();
+ LOG.trace("Bulk Ingester {} closed", bulkIngester);
- // de-register main controller
- int phase = phaser.arriveAndDeregister();
+ // de-register main controller
+ int phase = phaser.arriveAndDeregister();
- if (totalOperations == 0) { // no need to invoke phaser await if we
already know there were no operations
- LOG.debug("No operations executed in this processor. Close
immediately");
- return false;
- }
+ if (totalOperations == 0) { // no need to invoke phaser await if
we already know there were no operations
+ LOG.debug("No operations executed in this processor. Close
immediately");
+ return false;
+ }
- if (waitForESAcknowledgement) {
- try {
- phaser.awaitAdvanceInterruptibly(phase,
indexDefinition.bulkFlushIntervalMs * 5, TimeUnit.MILLISECONDS);
- } catch (TimeoutException e) {
- LOG.error("Error waiting for bulk requests to return", e);
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while waiting for bulk processor to
close", e);
- Thread.currentThread().interrupt(); // restore interrupt
status
+ if (waitForESAcknowledgement) {
+ try {
+ phaser.awaitAdvanceInterruptibly(phase,
indexDefinition.bulkFlushIntervalMs * 5, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ LOG.error("Error waiting for bulk requests to return", e);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting for bulk processor to
close", e);
+ Thread.currentThread().interrupt(); // restore interrupt
status
+ }
}
- }
- checkFailures();
+ checkFailures();
- if (LOG.isTraceEnabled()) {
- LOG.trace("Bulk identifier -> update status = {}", updatesMap);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Bulk identifier -> update status = {}", updatesMap);
+ }
+ return updatesMap.containsValue(Boolean.TRUE);
+ } finally {
+ // TODO: workaround for
https://github.com/elastic/elasticsearch-java/pull/867 remove when fixed
+ new ExecutorCloser(scheduler).close();
}
- return updatesMap.containsValue(Boolean.TRUE);
}
private class OakBulkListener implements BulkListener<String> {
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/util/TermQueryBuilderFactory.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/util/TermQueryBuilderFactory.java
index ef93868b40..33743639a4 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/util/TermQueryBuilderFactory.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/util/TermQueryBuilderFactory.java
@@ -70,27 +70,27 @@ public class TermQueryBuilderFactory {
int depth = PathUtils.getDepth(path) + planResult.getParentDepth() + 1;
return Query.of(q -> q.term(t ->
t.field(PATH_DEPTH).value(v->v.longValue(depth))));
}
-
+
private static <R> Query newRangeQuery(String field, R first, R last,
boolean firstIncluding,
- boolean lastIncluding) {
+ boolean lastIncluding) {
- return Query.of(fn -> fn.range(fnr -> {
+ return Query.of(fn -> fn.range(fnr -> fnr.date(date -> {
if (first != null) {
if (firstIncluding) {
- fnr.gte(JsonData.of(first));
+ date.gte(first.toString());
} else {
- fnr.gt(JsonData.of(first));
+ date.gt(first.toString());
}
}
if (last != null) {
if (lastIncluding) {
- fnr.lte(JsonData.of(last));
+ date.lte(last.toString());
} else {
- fnr.lt(JsonData.of(last));
+ date.lt(last.toString());
}
}
- return fnr.field(field);
- }));
+ return date.field(field);
+ })));
}
private static <R> FieldValue toFieldValue(R value) {
diff --git a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticTestServer.java b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticTestServer.java
index 43d2157d83..1d42f8cfa9 100644
--- a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticTestServer.java
+++ b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticTestServer.java
@@ -49,9 +49,9 @@ public class ElasticTestServer implements AutoCloseable {
"8.7.1.0",
"80c8d34334b0cf4def79835ea6dab78b59ba9ee54c8f5f3cba0bde53123d7820",
"8.10.4.0",
"b2ae8faf1e272319594b4d47a72580fa4f61a5c11cbc8d3f13453fd34b153441",
"8.11.0.0",
"8d4d80b850c4da4da6dfe2d675b2e2355d2014307f8bdc54cc1b34323c81c7ae",
- "8.11.1.0",
"a00a920d4bc29f0deacde7c2ef3d3f70692b00b62bf7fb82b0fe18eeb1dafee9",
"8.11.3.0",
"1f14b496baf973fb5c64e77fc458d9814dd6905170d7b15350f9f1a009824f41",
- "8.13.2.0",
"586f553b109266d7996265f3f34a20914b569d494b49da2c0534428770e551f0");
+ "8.13.2.0",
"586f553b109266d7996265f3f34a20914b569d494b49da2c0534428770e551f0",
+ "8.15.0.0",
"6cbb54d68d654a3476df0b730856cfa3194bce5c6e1050a35e7a86ffec8a3e20");
private static final ElasticTestServer SERVER = new ElasticTestServer();
private static volatile ElasticsearchContainer CONTAINER;