This is an automated email from the ASF dual-hosted git repository.

fortino pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d1cd2bc18e OAK-10539: oak-search-elastic: migrate ingestion from Rest 
High Level Client to the new Java API Client (#1193)
d1cd2bc18e is described below

commit d1cd2bc18ef3a41dc0c22c078e4785631856a2ac
Author: Fabrizio Fortino <[email protected]>
AuthorDate: Tue Nov 14 09:07:22 2023 +0100

    OAK-10539: oak-search-elastic: migrate ingestion from Rest High Level 
Client to the new Java API Client (#1193)
    
    * OAK-10539: bump up ES clients
    
    * OAK-10539: increase max jar size in oak-run-elastic
    
    * OAK-10539: BulkProcessor -> BulkIngester migration (wip)
    
    * OAK-10539: fix failing tests
    
    * OAK-10539: remove use of RHLC
    
    * OAK-10539: reduce max.jar.size in oak-run-elastic
    
    * OAK-10539: minor cleanup + improved docs
    
    * OAK-10539: minor cleanup
    
    * OAK-10539: fix tests in oak-run-elastic
    
    * OAK-10539: (minor) align test names
    
    * OAK-10539: improved interruptions handling in bulk processor
    
    * OAK-10539: code review improvements
    
    * OAK-10539: remove unused bulk retries properties from elastic index 
definition
    
    * OAK-10539: use constants for nested field names
    
    * OAK-10539: revise interrupted exception handling
    
    * OAK-10539: bump es to 8.11.0
---
 oak-run-elastic/pom.xml                            |   3 +-
 .../index/indexer/document/ElasticIndexerTest.java |  25 ++-
 .../ElasticPurgeOldIndexVersionTest.java           |   3 +-
 oak-search-elastic/pom.xml                         |  88 +-------
 .../plugins/index/elastic/ElasticConnection.java   |  23 +-
 .../index/elastic/ElasticIndexDefinition.java      |  12 +-
 .../elastic/index/ElasticBulkProcessorHandler.java | 242 +++++++++++----------
 .../index/elastic/index/ElasticCustomAnalyzer.java |   8 +-
 .../index/ElasticCustomAnalyzerMappings.java       |   4 +-
 .../index/elastic/index/ElasticDocument.java       | 138 +++++-------
 .../index/elastic/index/ElasticIndexHelper.java    |  41 ++--
 .../index/elastic/index/ElasticIndexWriter.java    |  47 +---
 .../index/elastic/ElasticPropertyIndexTest.java    |  10 +-
 .../plugins/index/elastic/ElasticTestServer.java   |   6 +-
 .../index/ElasticBulkProcessorHandlerTest.java     |  17 +-
 .../elastic/index/ElasticIndexWriterTest.java      |  48 ++--
 16 files changed, 303 insertions(+), 412 deletions(-)

diff --git a/oak-run-elastic/pom.xml b/oak-run-elastic/pom.xml
index e7e12b7645..316b9b48a9 100644
--- a/oak-run-elastic/pom.xml
+++ b/oak-run-elastic/pom.xml
@@ -37,8 +37,9 @@
         105 MB : Setting constraint to default oak-run jar post adding the 
build plugin to rename the fat jar with embedded dependencies as the default 
jar.
         121 MB : add Elasticsearch Java client along with RHLC: the latter can 
be removed when the code can be fully migrated to use the new client
         125 MB : shaded Guava
+        85 MB : remove Elasticsearch RHLC
         -->
-        <max.jar.size>125000000</max.jar.size>
+        <max.jar.size>85000000</max.jar.size>
                       
     </properties>
 
diff --git 
a/oak-run-elastic/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/ElasticIndexerTest.java
 
b/oak-run-elastic/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/ElasticIndexerTest.java
index 2b5270bdb2..e447aa834c 100644
--- 
a/oak-run-elastic/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/ElasticIndexerTest.java
+++ 
b/oak-run-elastic/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/ElasticIndexerTest.java
@@ -18,6 +18,10 @@
  */
 package org.apache.jackrabbit.oak.index.indexer.document;
 
+import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
+import co.elastic.clients.json.JsonpMapper;
+import jakarta.json.spi.JsonProvider;
+import jakarta.json.stream.JsonGenerator;
 import org.apache.jackrabbit.oak.index.IndexHelper;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
@@ -34,17 +38,19 @@ import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.junit.Test;
 
+import java.io.OutputStream;
+
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 
 import static 
org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 import static org.apache.jackrabbit.oak.InitialContentHelper.INITIAL_CONTENT;
+import static org.mockito.Mockito.when;
 
 public class ElasticIndexerTest {
 
-    private NodeState root = INITIAL_CONTENT;
-
     @Test
     public void nodeIndexed_WithIncludedPaths() throws Exception {
         ElasticIndexDefinitionBuilder idxb = new 
ElasticIndexDefinitionBuilder();
@@ -52,11 +58,20 @@ public class ElasticIndexerTest {
         idxb.includedPaths("/content");
 
         NodeState defn = idxb.build();
-        IndexDefinition idxDefn = new ElasticIndexDefinition(root, defn, 
"/oak:index/testIndex", "testPrefix");
+        IndexDefinition idxDefn = new ElasticIndexDefinition(INITIAL_CONTENT, 
defn, "/oak:index/testIndex", "testPrefix");
+
+        NodeBuilder builder = INITIAL_CONTENT.builder();
 
-        NodeBuilder builder = root.builder();
+        ElasticConnection elasticConnectionMock = 
mock(ElasticConnection.class);
+        ElasticsearchAsyncClient elasticsearchAsyncClientMock = 
mock(ElasticsearchAsyncClient.class);
+        JsonpMapper jsonMapperMock = mock(JsonpMapper.class);
+        JsonProvider jsonProviderMock = mock(JsonProvider.class);
+        
when(jsonProviderMock.createGenerator(any(OutputStream.class))).thenReturn(mock(JsonGenerator.class));
+        when(jsonMapperMock.jsonProvider()).thenReturn(jsonProviderMock);
+        
when(elasticsearchAsyncClientMock._jsonpMapper()).thenReturn(jsonMapperMock);
+        
when(elasticConnectionMock.getAsyncClient()).thenReturn(elasticsearchAsyncClientMock);
 
-        FulltextIndexWriter indexWriter = new 
ElasticIndexWriterFactory(mock(ElasticConnection.class),
+        FulltextIndexWriter indexWriter = new 
ElasticIndexWriterFactory(elasticConnectionMock,
                 mock(ElasticIndexTracker.class)).newInstance(idxDefn, 
defn.builder(), CommitInfo.EMPTY, false);
         ElasticIndexer indexer = new ElasticIndexer(idxDefn, 
mock(FulltextBinaryTextExtractor.class), builder,
                 mock(IndexingProgressReporter.class), indexWriter, 
mock(ElasticIndexEditorProvider.class), mock(IndexHelper.class));
diff --git 
a/oak-run-elastic/src/test/java/org/apache/jackrabbit/oak/indexversion/ElasticPurgeOldIndexVersionTest.java
 
b/oak-run-elastic/src/test/java/org/apache/jackrabbit/oak/indexversion/ElasticPurgeOldIndexVersionTest.java
index d7bd3b663f..6922d3240a 100644
--- 
a/oak-run-elastic/src/test/java/org/apache/jackrabbit/oak/indexversion/ElasticPurgeOldIndexVersionTest.java
+++ 
b/oak-run-elastic/src/test/java/org/apache/jackrabbit/oak/indexversion/ElasticPurgeOldIndexVersionTest.java
@@ -46,13 +46,12 @@ import javax.jcr.Session;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.jackrabbit.commons.JcrUtils.getOrCreateByPath;
 import static org.hamcrest.CoreMatchers.containsString;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
 
 public class ElasticPurgeOldIndexVersionTest extends 
ElasticAbstractIndexCommandTest {
 
diff --git a/oak-search-elastic/pom.xml b/oak-search-elastic/pom.xml
index fd19bfdb6a..5fd82b97fa 100644
--- a/oak-search-elastic/pom.xml
+++ b/oak-search-elastic/pom.xml
@@ -33,8 +33,8 @@
   <description>Oak Elasticsearch integration subproject</description>
 
   <properties>
-    <elasticsearch.hlrc.version>7.17.13</elasticsearch.hlrc.version>
-    
<elasticsearch.java.client.version>8.7.1</elasticsearch.java.client.version>
+    
<elasticsearch.java.client.version>8.11.0</elasticsearch.java.client.version>
+    <lucene.version>9.8.0</lucene.version>
   </properties>
 
   <build>
@@ -122,16 +122,6 @@
     </dependency>
 
     <!-- Elastic/Lucene -->
-    <dependency>
-      <groupId>org.elasticsearch.client</groupId>
-      <artifactId>elasticsearch-rest-high-level-client</artifactId>
-      <version>${elasticsearch.hlrc.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.elasticsearch.client</groupId>
-      <artifactId>elasticsearch-rest-client</artifactId>
-      <version>${elasticsearch.hlrc.version}</version>
-    </dependency>
        <dependency>
       <groupId>co.elastic.clients</groupId>
       <artifactId>elasticsearch-java</artifactId>
@@ -143,72 +133,14 @@
       <version>${jackson.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.elasticsearch</groupId>
-      <artifactId>elasticsearch</artifactId>
-      <version>${elasticsearch.hlrc.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.elasticsearch</groupId>
-          <artifactId>elasticsearch-cli</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.elasticsearch</groupId>
-          <artifactId>elasticsearch-geo</artifactId>
-        </exclusion>
-
-        <!-- 
https://github.com/elastic/elasticsearch/issues/29184#issuecomment-662480046 -->
-        <exclusion>
-          <groupId>org.apache.lucene</groupId>
-          <artifactId>lucene-backward-codecs</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.lucene</groupId>
-          <artifactId>lucene-grouping</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.lucene</groupId>
-          <artifactId>lucene-memory</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.lucene</groupId>
-          <artifactId>lucene-misc</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.lucene</groupId>
-          <artifactId>lucene-queryparser</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.lucene</groupId>
-          <artifactId>lucene-sandbox</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.lucene</groupId>
-          <artifactId>lucene-spatial</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.lucene</groupId>
-          <artifactId>lucene-spatial-extras</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.lucene</groupId>
-          <artifactId>lucene-spatial3d</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.lucene</groupId>
-          <artifactId>lucene-suggest</artifactId>
-        </exclusion>
-        <!--
-          elasticsearch-xcontent depends on snakeyaml 1.33 which is vulnerable 
to remote code execution
-          (https://nvd.nist.gov/vuln/detail/CVE-2022-1471)
-          elasticsearch-xcontent is used in this module but only with the json 
parser which does not use snakeyaml
-          so we can safely exclude it.
-          This exclusion, like the others above, can be removed once we will 
remove the elasticsearch high level rest client
-        -->
-        <exclusion>
-          <groupId>org.yaml</groupId>
-          <artifactId>snakeyaml</artifactId>
-        </exclusion>
-      </exclusions>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-core</artifactId>
+      <version>${lucene.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-analysis-common</artifactId>
+      <version>${lucene.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.tika</groupId>
diff --git 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticConnection.java
 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticConnection.java
index a677a27506..66e3544713 100644
--- 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticConnection.java
+++ 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticConnection.java
@@ -23,8 +23,6 @@ import org.apache.http.HttpHost;
 import org.apache.http.message.BasicHeader;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.RestHighLevelClientBuilder;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -123,14 +121,12 @@ public class ElasticConnection implements Closeable {
                             requestConfigBuilder -> 
requestConfigBuilder.setSocketTimeout(ES_SOCKET_TIMEOUT));
 
                     RestClient httpClient = builder.build();
-                    RestHighLevelClient hlClient = new 
RestHighLevelClientBuilder(httpClient)
-                            .setApiCompatibilityMode(true).build();
 
                     ElasticsearchTransport transport = new RestClientTransport(
                             httpClient, new JacksonJsonpMapper());
                     ElasticsearchClient esClient = new 
ElasticsearchClient(transport);
                     ElasticsearchAsyncClient esAsyncClient = new 
ElasticsearchAsyncClient(transport);
-                    clients = new Clients(esClient, esAsyncClient, hlClient);
+                    clients = new Clients(esClient, esAsyncClient);
                 }
             }
         }
@@ -153,14 +149,6 @@ public class ElasticConnection implements Closeable {
         return getClients().asyncClient;
     }
 
-    /**
-     * @deprecated
-     * @return the old Elasticsearch client
-     */
-    public RestHighLevelClient getOldClient() {
-        return getClients().rhlClient;
-    }
-
     public String getIndexPrefix() {
         return indexPrefix;
     }
@@ -187,9 +175,6 @@ public class ElasticConnection implements Closeable {
                 // standard client
                 clients.client._transport().close();
             }
-            if (clients.rhlClient != null) {
-                clients.rhlClient.close();
-            }
         }
         isClosed.set(true);
     }
@@ -217,12 +202,10 @@ public class ElasticConnection implements Closeable {
     private static class Clients {
         public final ElasticsearchClient client;
         public final ElasticsearchAsyncClient asyncClient;
-        public final RestHighLevelClient rhlClient;
 
-        Clients(ElasticsearchClient client, ElasticsearchAsyncClient 
asyncClient, RestHighLevelClient rhlClient) {
+        Clients(ElasticsearchClient client, ElasticsearchAsyncClient 
asyncClient) {
             this.client = client;
             this.asyncClient = asyncClient;
-            this.rhlClient = rhlClient;
         }
     }
 
@@ -271,7 +254,7 @@ public class ElasticConnection implements Closeable {
         /**
          * This is the final step in charge of building the {@link 
ElasticConnection}.
          * Validation should be here.
-         *
+         * <p>
          * It adds support for {@link OptionalSteps}.
          */
         public interface BuildStep extends OptionalSteps {
diff --git 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexDefinition.java
 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexDefinition.java
index 70a975760b..9ad83c5191 100644
--- 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexDefinition.java
+++ 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexDefinition.java
@@ -45,17 +45,11 @@ public class ElasticIndexDefinition extends IndexDefinition 
{
     public static final int BULK_ACTIONS_DEFAULT = 250;
 
     public static final String BULK_SIZE_BYTES = "bulkSizeBytes";
-    public static final long BULK_SIZE_BYTES_DEFAULT = 1 * 1024 * 1024; // 1MB
+    public static final long BULK_SIZE_BYTES_DEFAULT = 1024 * 1024; // 1MB
 
     public static final String BULK_FLUSH_INTERVAL_MS = "bulkFlushIntervalMs";
     public static final long BULK_FLUSH_INTERVAL_MS_DEFAULT = 3000;
 
-    public static final String BULK_RETRIES = "bulkRetries";
-    public static final int BULK_RETRIES_DEFAULT = 3;
-
-    public static final String BULK_RETRIES_BACKOFF = "bulkRetriesBackoff";
-    public static final long BULK_RETRIES_BACKOFF_DEFAULT = 200;
-
     public static final String NUMBER_OF_SHARDS = "numberOfShards";
     public static final int NUMBER_OF_SHARDS_DEFAULT = 1;
 
@@ -133,8 +127,6 @@ public class ElasticIndexDefinition extends IndexDefinition 
{
     public final int bulkActions;
     public final long bulkSizeBytes;
     public final long bulkFlushIntervalMs;
-    public final int bulkRetries;
-    public final long bulkRetriesBackoff;
     private final boolean similarityTagsEnabled;
     private final float similarityTagsBoost;
     public final int numberOfShards;
@@ -157,8 +149,6 @@ public class ElasticIndexDefinition extends IndexDefinition 
{
         this.bulkActions = getOptionalValue(defn, BULK_ACTIONS, 
BULK_ACTIONS_DEFAULT);
         this.bulkSizeBytes = getOptionalValue(defn, BULK_SIZE_BYTES, 
BULK_SIZE_BYTES_DEFAULT);
         this.bulkFlushIntervalMs = getOptionalValue(defn, 
BULK_FLUSH_INTERVAL_MS, BULK_FLUSH_INTERVAL_MS_DEFAULT);
-        this.bulkRetries = getOptionalValue(defn, BULK_RETRIES, 
BULK_RETRIES_DEFAULT);
-        this.bulkRetriesBackoff = getOptionalValue(defn, BULK_RETRIES_BACKOFF, 
BULK_RETRIES_BACKOFF_DEFAULT);
         this.numberOfShards = getOptionalValue(defn, NUMBER_OF_SHARDS, 
NUMBER_OF_SHARDS_DEFAULT);
         this.numberOfReplicas = getOptionalValue(defn, NUMBER_OF_REPLICAS, 
NUMBER_OF_REPLICAS_DEFAULT);
         this.similarityTagsEnabled = getOptionalValue(defn, 
SIMILARITY_TAGS_ENABLED, SIMILARITY_TAGS_ENABLED_DEFAULT);
diff --git 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
index 035fd7b6c8..b822a7bbd1 100644
--- 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
+++ 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
@@ -16,6 +16,13 @@
  */
 package org.apache.jackrabbit.oak.plugins.index.elastic.index;
 
+import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester;
+import co.elastic.clients.elasticsearch._helpers.bulk.BulkListener;
+import co.elastic.clients.elasticsearch._types.ErrorCause;
+import co.elastic.clients.elasticsearch.core.BulkRequest;
+import co.elastic.clients.elasticsearch.core.BulkResponse;
+import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
+import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
@@ -23,37 +30,23 @@ import 
org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
 import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.DocWriteRequest;
-import org.elasticsearch.action.bulk.BackoffPolicy;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.support.WriteRequest;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.core.TimeValue;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
-import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
-import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
-
 class ElasticBulkProcessorHandler {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ElasticBulkProcessorHandler.class);
@@ -68,13 +61,13 @@ class ElasticBulkProcessorHandler {
     protected final String indexName;
     protected final ElasticIndexDefinition indexDefinition;
     private final NodeBuilder definitionBuilder;
-    protected final BulkProcessor bulkProcessor;
+    protected final BulkIngester<String> bulkIngester;
     private final boolean waitForESAcknowledgement;
 
     /**
      * Coordinates communication between bulk processes. It has a main 
controller registered at creation time and
      * de-registered on {@link ElasticIndexWriter#close(long)}. Each bulk 
request register a new party in
-     * this Phaser in {@link OakBulkProcessorListener#beforeBulk(long, 
BulkRequest)} and de-register itself when
+     * this Phaser in {@link OakBulkListener#beforeBulk(long, BulkRequest, 
List)} and de-register itself when
      * the request returns.
      */
     private final Phaser phaser = new Phaser(1); // register main controller
@@ -82,7 +75,7 @@ class ElasticBulkProcessorHandler {
     /**
      * Exceptions occurred while trying to update index in elasticsearch
      */
-    private final ConcurrentLinkedQueue<Throwable> suppressedExceptions = new 
ConcurrentLinkedQueue<>();
+    private final ConcurrentLinkedQueue<ErrorCause> suppressedErrorCauses = 
new ConcurrentLinkedQueue<>();
 
     /**
      * Key-value structure to keep the history of bulk requests. Keys are the 
bulk execution ids, the boolean
@@ -102,7 +95,7 @@ class ElasticBulkProcessorHandler {
         this.indexDefinition = indexDefinition;
         this.definitionBuilder = definitionBuilder;
         this.waitForESAcknowledgement = waitForESAcknowledgement;
-        this.bulkProcessor = initBulkProcessor();
+        this.bulkIngester = initBulkIngester();
     }
 
     /**
@@ -142,44 +135,57 @@ class ElasticBulkProcessorHandler {
         return new ElasticBulkProcessorHandler(elasticConnection, indexName, 
indexDefinition, definitionBuilder, waitForESAcknowledgement);
     }
 
-    private BulkProcessor initBulkProcessor() {
-        return BulkProcessor.builder(requestConsumer(),
-                new OakBulkProcessorListener(), this.indexName + 
"-bulk-processor")
-                .setBulkActions(indexDefinition.bulkActions)
-                .setConcurrentRequests(BULK_PROCESSOR_CONCURRENCY)
-                .setBulkSize(new ByteSizeValue(indexDefinition.bulkSizeBytes))
-                
.setFlushInterval(TimeValue.timeValueMillis(indexDefinition.bulkFlushIntervalMs))
-                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
-                        
TimeValue.timeValueMillis(indexDefinition.bulkRetriesBackoff), 
indexDefinition.bulkRetries)
-                )
-                .build();
+    private BulkIngester<String> initBulkIngester() {
+        // BulkIngester does not support retry policies. Some retries though 
are already implemented in the transport layer.
+        // More details here: 
https://github.com/elastic/elasticsearch-java/issues/478
+        return BulkIngester.of(b -> {
+            b = b.client(elasticConnection.getAsyncClient())
+                    .listener(new OakBulkListener());
+            if (indexDefinition.bulkActions > 0) {
+                b = b.maxOperations(indexDefinition.bulkActions);
+            }
+            if (indexDefinition.bulkSizeBytes > 0) {
+                b = b.maxSize(indexDefinition.bulkSizeBytes);
+            }
+            if (indexDefinition.bulkFlushIntervalMs > 0) {
+                b = b.flushInterval(indexDefinition.bulkFlushIntervalMs, 
TimeUnit.MILLISECONDS);
+            }
+            return b.maxConcurrentRequests(BULK_PROCESSOR_CONCURRENCY);
+        });
     }
 
     private void checkFailures() throws IOException {
-        if (!suppressedExceptions.isEmpty()) {
+        if (!suppressedErrorCauses.isEmpty()) {
             IOException ioe = new IOException("Exception while indexing. See 
suppressed for details");
-            suppressedExceptions.forEach(ioe::addSuppressed);
+            suppressedErrorCauses.stream().map(ec -> new 
IllegalStateException(ec.reason())).forEach(ioe::addSuppressed);
             throw ioe;
         }
     }
 
-    protected BiConsumer<BulkRequest, ActionListener<BulkResponse>> 
requestConsumer() {
-        // TODO: migrate to ES Java client 
https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/current/indexing-bulk.html
-        return (request, bulkListener) -> 
elasticConnection.getOldClient().bulkAsync(request, RequestOptions.DEFAULT, 
bulkListener);
+    public void update(String id, ElasticDocument document) throws IOException 
{
+        add(BulkOperation.of(op -> op.index(idx -> 
idx.index(indexName).id(id).document(document))), id);
     }
 
-    public void add(DocWriteRequest<?> request) throws IOException {
+    public void delete(String id) throws IOException {
+        add(BulkOperation.of(op -> op.delete(idx -> 
idx.index(indexName).id(id))), id);
+    }
+
+    private void add(BulkOperation operation, String context) throws 
IOException {
         // fail fast: we don't want to wait until the processor gets closed to 
fail
         checkFailures();
-
-        bulkProcessor.add(request);
+        bulkIngester.add(operation, context);
         totalOperations++;
     }
 
+    /**
+     * Closes the bulk ingester and waits for all the bulk requests to return.
+     * @return {@code true} if at least one update was performed, {@code 
false} otherwise
+     * @throws IOException if an error happened while processing the bulk 
requests
+     */
     public boolean close() throws IOException {
-        LOG.trace("Calling close on bulk processor {}", bulkProcessor);
-        bulkProcessor.close();
-        LOG.trace("Bulk Processor {} closed", bulkProcessor);
+        LOG.trace("Calling close on bulk ingester {}", bulkIngester);
+        bulkIngester.close();
+        LOG.trace("Bulk Ingester {} closed", bulkIngester);
 
         // de-register main controller
         int phase = phaser.arriveAndDeregister();
@@ -192,8 +198,11 @@ class ElasticBulkProcessorHandler {
         if (waitForESAcknowledgement) {
             try {
                 phaser.awaitAdvanceInterruptibly(phase, 
indexDefinition.bulkFlushIntervalMs * 5, TimeUnit.MILLISECONDS);
-            } catch (InterruptedException | TimeoutException e) {
+            } catch (TimeoutException e) {
                 LOG.error("Error waiting for bulk requests to return", e);
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted while waiting for bulk processor to 
close", e);
+                Thread.currentThread().interrupt();  // restore interrupt 
status
             }
         }
 
@@ -205,91 +214,96 @@ class ElasticBulkProcessorHandler {
         return updatesMap.containsValue(Boolean.TRUE);
     }
 
-    private class OakBulkProcessorListener implements BulkProcessor.Listener {
+    private class OakBulkListener implements BulkListener<String> {
 
         @Override
-        public void beforeBulk(long executionId, BulkRequest bulkRequest) {
+        public void beforeBulk(long executionId, BulkRequest request, 
List<String> contexts) {
             // register new bulk party
             phaser.register();
 
             // init update status
             updatesMap.put(executionId, Boolean.FALSE);
 
-            bulkRequest.timeout(TimeValue.timeValueMinutes(2));
-
-            LOG.debug("Sending bulk with id {} -> {}", executionId, 
bulkRequest.getDescription());
+            LOG.debug("Sending bulk with id {} -> {}", executionId, contexts);
             if (LOG.isTraceEnabled()) {
-                LOG.trace("Bulk Requests: \n{}", bulkRequest.requests()
+                LOG.trace("Bulk Requests: \n{}", request.operations()
                         .stream()
-                        .map(DocWriteRequest::toString)
+                        .map(BulkOperation::toString)
                         .collect(Collectors.joining("\n"))
                 );
             }
         }
 
         @Override
-        public void afterBulk(long executionId, BulkRequest bulkRequest, 
BulkResponse bulkResponse) {
-            LOG.debug("Bulk with id {} processed with status {} in {}", 
executionId, bulkResponse.status(), bulkResponse.getTook());
-            if (LOG.isTraceEnabled()) {
-                try {
-                    
LOG.trace(Strings.toString(bulkResponse.toXContent(jsonBuilder(), 
EMPTY_PARAMS)));
-                } catch (IOException e) {
-                    LOG.error("Error decoding bulk response", e);
+        public void afterBulk(long executionId, BulkRequest request, 
List<String> contexts, BulkResponse response) {
+            try {
+                LOG.debug("Bulk with id {} processed in {} ms", executionId, 
response.took());
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace(response.toString());
                 }
-            }
-            if (bulkResponse.hasFailures()) { // check if some operations 
failed to execute
-                Set<String> failedDocSet = new LinkedHashSet<>();
-                NodeBuilder status = 
definitionBuilder.child(IndexDefinition.STATUS_NODE);
-                // Read the current failed paths (if any) on the :status node 
into failedDocList
-                if (status.hasProperty(IndexDefinition.FAILED_DOC_PATHS)) {
-                    for (String str : 
status.getProperty(IndexDefinition.FAILED_DOC_PATHS).getValue(Type.STRINGS)) {
-                        failedDocSet.add(str);
+                if (response.items().stream().anyMatch(i -> i.error() != 
null)) { // check if some operations failed to execute
+                    Set<String> failedDocSet = new LinkedHashSet<>();
+                    NodeBuilder status = 
definitionBuilder.child(IndexDefinition.STATUS_NODE);
+                    // Read the current failed paths (if any) on the :status 
node into failedDocList
+                    if (status.hasProperty(IndexDefinition.FAILED_DOC_PATHS)) {
+                        for (String str : 
status.getProperty(IndexDefinition.FAILED_DOC_PATHS).getValue(Type.STRINGS)) {
+                            failedDocSet.add(str);
+                        }
                     }
-                }
 
-                int initialSize = failedDocSet.size();
-                boolean isFailedDocSetFull = false;
-
-                boolean hasSuccesses = false;
-                for (BulkItemResponse bulkItemResponse : bulkResponse) {
-                    if (bulkItemResponse.isFailed()) {
-                        BulkItemResponse.Failure failure = 
bulkItemResponse.getFailure();
-                        if (indexDefinition.failOnError && failure.getCause() 
!= null) {
-                            suppressedExceptions.add(failure.getCause());
-                        }
-                        if (!isFailedDocSetFull && failedDocSet.size() < 
FAILED_DOC_COUNT_FOR_STATUS_NODE) {
-                            failedDocSet.add(bulkItemResponse.getId());
-                        } else {
-                            isFailedDocSetFull = true;
+                    int initialSize = failedDocSet.size();
+                    boolean isFailedDocSetFull = false;
+
+                    boolean hasSuccesses = false;
+                    for (int i = 0; i < contexts.size(); i++) {
+                        BulkResponseItem item = response.items().get(i);
+                        if (item.error() != null) {
+                            if (indexDefinition.failOnError) {
+                                suppressedErrorCauses.add(item.error());
+                            }
+                            if (!isFailedDocSetFull && failedDocSet.size() < 
FAILED_DOC_COUNT_FOR_STATUS_NODE) {
+                                failedDocSet.add(contexts.get(i));
+                            } else {
+                                isFailedDocSetFull = true;
+                            }
+                            // Log entry to be used to parse logs to get the 
failed doc id/path if needed
+                            LOG.error("ElasticIndex Update Doc Failure: Error 
while adding/updating doc with id: [{}]", contexts.get(i));
+                            LOG.error("Failure Details: BulkItem ID: {}, 
Index: {}, Failure Cause: {}",
+                                    item.id(), item.index(), item.error());
+                        } else if (!hasSuccesses) {
+                            // Set indexUpdated to true even if 1 item was 
updated successfully
+                            updatesMap.put(executionId, Boolean.TRUE);
+                            hasSuccesses = true;
                         }
-                        // Log entry to be used to parse logs to get the 
failed doc id/path if needed
-                        LOG.error("ElasticIndex Update Doc Failure: Error 
while adding/updating doc with id: [{}]", bulkItemResponse.getId());
-                        LOG.error("Failure Details: BulkItem ID: {}, Index: 
{}, Failure Cause: {}",
-                                failure.getId(), failure.getIndex(), 
failure.getCause());
-                    } else if (!hasSuccesses) {
-                        // Set indexUpdated to true even if 1 item was updated 
successfully
-                        updatesMap.put(executionId, Boolean.TRUE);
-                        hasSuccesses = true;
                     }
-                }
 
-                if (isFailedDocSetFull) {
-                    LOG.info("Cannot store all new Failed Docs because {} has 
been filled up. " +
-                            "See previous log entries to find out the details 
of failed paths", IndexDefinition.FAILED_DOC_PATHS);
-                } else if (failedDocSet.size() != initialSize) {
-                    status.setProperty(IndexDefinition.FAILED_DOC_PATHS, 
failedDocSet, Type.STRINGS);
+                    if (isFailedDocSetFull) {
+                        LOG.info("Cannot store all new Failed Docs because {} 
has been filled up. " +
+                                "See previous log entries to find out the 
details of failed paths", IndexDefinition.FAILED_DOC_PATHS);
+                    } else if (failedDocSet.size() != initialSize) {
+                        status.setProperty(IndexDefinition.FAILED_DOC_PATHS, 
failedDocSet, Type.STRINGS);
+                    }
+                } else {
+                    updatesMap.put(executionId, Boolean.TRUE);
                 }
-            } else {
-                updatesMap.put(executionId, Boolean.TRUE);
+            } finally {
+                phaser.arriveAndDeregister();
             }
-            phaser.arriveAndDeregister();
         }
 
         @Override
-        public void afterBulk(long executionId, BulkRequest bulkRequest, 
Throwable throwable) {
-            LOG.error("ElasticIndex Update Bulk Failure : Bulk with id {} 
threw an error", executionId, throwable);
-            suppressedExceptions.add(throwable);
-            phaser.arriveAndDeregister();
+        public void afterBulk(long executionId, BulkRequest request, 
List<String> contexts, Throwable failure) {
+            try {
+                LOG.error("ElasticIndex Update Bulk Failure : Bulk with id {} 
threw an error", executionId, failure);
+                suppressedErrorCauses.add(ErrorCause.of(ec -> {
+                    StringWriter sw = new StringWriter();
+                    PrintWriter pw = new PrintWriter(sw);
+                    failure.printStackTrace(pw);
+                    return 
ec.reason(failure.getMessage()).stackTrace(sw.toString());
+                }));
+            } finally {
+                phaser.arriveAndDeregister();
+            }
         }
     }
 
@@ -297,12 +311,14 @@ class ElasticBulkProcessorHandler {
      * {@link ElasticBulkProcessorHandler} extension with real time behaviour.
      * It also uses the same async bulk processor as the parent except for the 
last flush that waits until the
      * indexed documents are searchable.
+     * <p>
+     * BulkIngester does not support customization of intermediate requests. 
This means we cannot intercept the last
+     * request and apply a WAIT_UNTIL refresh policy. The workaround is to 
force a refresh when the handler is closed.
+     * We can improve this when this issue gets fixed:
+     * <a 
href="https://github.com/elastic/elasticsearch-java/issues/703";>elasticsearch-java#703</a>
      */
     protected static class RealTimeBulkProcessorHandler extends 
ElasticBulkProcessorHandler {
 
-        private final AtomicBoolean isClosed = new AtomicBoolean(false);
-        private final AtomicBoolean isDataSearchable = new 
AtomicBoolean(false);
-
         private RealTimeBulkProcessorHandler(@NotNull ElasticConnection 
elasticConnection,
                                              @NotNull String indexName,
                                              @NotNull ElasticIndexDefinition 
indexDefinition,
@@ -311,27 +327,13 @@ class ElasticBulkProcessorHandler {
             super(elasticConnection, indexName, indexDefinition, 
definitionBuilder, waitForESAcknowledgement);
         }
 
-        @Override
-        protected BiConsumer<BulkRequest, ActionListener<BulkResponse>> 
requestConsumer() {
-            return (request, bulkListener) -> {
-                if (isClosed.get()) {
-                    LOG.debug("Processor is closing. Next request with {} 
actions will block until the data is searchable",
-                            request.requests().size());
-                    
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
-                    isDataSearchable.set(true);
-                }
-                elasticConnection.getOldClient().bulkAsync(request, 
RequestOptions.DEFAULT, bulkListener);
-            };
-        }
-
         @Override
         public boolean close() throws IOException {
-            isClosed.set(true);
             // calling super closes the bulk processor. If not empty it calls 
#requestConsumer for the last time
             boolean closed = super.close();
             // it could happen that close gets called when the bulk has 
already been flushed. In these cases we trigger
             // an actual refresh to make sure the docs are searchable before 
returning from the method
-            if (totalOperations > 0 && !isDataSearchable.get()) {
+            if (totalOperations > 0) {
                 LOG.debug("Forcing refresh");
                 try {
                        this.elasticConnection.getClient().indices().refresh(b 
-> b.index(indexName));
diff --git 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticCustomAnalyzer.java
 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticCustomAnalyzer.java
index 4e2c26d40e..9eb4e8f88c 100644
--- 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticCustomAnalyzer.java
+++ 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticCustomAnalyzer.java
@@ -37,11 +37,11 @@ import 
org.apache.jackrabbit.oak.plugins.tree.factories.TreeFactory;
 import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
+import org.apache.lucene.analysis.AbstractAnalysisFactory;
+import org.apache.lucene.analysis.CharFilterFactory;
+import org.apache.lucene.analysis.TokenFilterFactory;
 import org.apache.lucene.analysis.en.AbstractWordsFileFilterFactory;
-import org.apache.lucene.analysis.util.AbstractAnalysisFactory;
-import org.apache.lucene.analysis.util.CharFilterFactory;
-import org.apache.lucene.analysis.util.ResourceLoader;
-import org.apache.lucene.analysis.util.TokenFilterFactory;
+import org.apache.lucene.util.ResourceLoader;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
diff --git 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticCustomAnalyzerMappings.java
 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticCustomAnalyzerMappings.java
index ea79c50086..e8adc6f2e2 100644
--- 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticCustomAnalyzerMappings.java
+++ 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticCustomAnalyzerMappings.java
@@ -16,6 +16,7 @@
  */
 package org.apache.jackrabbit.oak.plugins.index.elastic.index;
 
+import org.apache.lucene.analysis.AbstractAnalysisFactory;
 import org.apache.lucene.analysis.charfilter.MappingCharFilterFactory;
 import org.apache.lucene.analysis.cjk.CJKBigramFilterFactory;
 import org.apache.lucene.analysis.commongrams.CommonGramsFilterFactory;
@@ -33,7 +34,6 @@ import 
org.apache.lucene.analysis.pattern.PatternCaptureGroupFilterFactory;
 import org.apache.lucene.analysis.payloads.DelimitedPayloadTokenFilterFactory;
 import org.apache.lucene.analysis.shingle.ShingleFilterFactory;
 import org.apache.lucene.analysis.synonym.SynonymFilterFactory;
-import org.apache.lucene.analysis.util.AbstractAnalysisFactory;
 import org.apache.lucene.analysis.util.ElisionFilterFactory;
 import org.jetbrains.annotations.Nullable;
 
@@ -76,7 +76,7 @@ public class ElasticCustomAnalyzerMappings {
     static {
         CONTENT_TRANSFORMERS = new LinkedHashMap<>();
         CONTENT_TRANSFORMERS.put("mapping", line -> {
-            if (line.length() == 0 || line.startsWith("#")) {
+            if (line.isEmpty() || line.startsWith("#")) {
                 return null;
             } else {
                 return line.replaceAll("\"", "");
diff --git 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticDocument.java
 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticDocument.java
index c81d2c28d7..06aa0739a8 100644
--- 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticDocument.java
+++ 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticDocument.java
@@ -16,16 +16,14 @@
  */
 package org.apache.jackrabbit.oak.plugins.index.elastic.index;
 
+import com.fasterxml.jackson.annotation.JsonAnyGetter;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
 import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
 import 
org.apache.jackrabbit.oak.plugins.index.search.spi.binary.BlobByteSource;
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.xcontent.XContentBuilder;
-import org.elasticsearch.xcontent.XContentFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Map;
@@ -35,17 +33,23 @@ import java.util.LinkedHashSet;
 
 import static 
org.apache.jackrabbit.oak.plugins.index.elastic.util.ElasticIndexUtils.toDoubles;
 
+@JsonInclude(JsonInclude.Include.NON_EMPTY)
 public class ElasticDocument {
-    private static final Logger LOG = 
LoggerFactory.getLogger(ElasticDocument.class);
 
-    private final String path;
-    private final Set<String> fulltext;
-    private final Set<String> suggest;
-    private final Set<String> spellcheck;
-    private final Map<String, Set<Object>> properties;
-    private final Map<String, Object> similarityFields;
-    private final Map<String, Map<String, Double>> dynamicBoostFields;
-    private final Set<String> similarityTags;
+    @JsonProperty(FieldNames.PATH)
+    public final String path;
+    @JsonProperty(FieldNames.FULLTEXT)
+    public final Set<String> fulltext;
+    @JsonProperty(FieldNames.SUGGEST)
+    public final Set<Map<String, String>> suggest;
+    @JsonProperty(FieldNames.SPELLCHECK)
+    public final Set<String> spellcheck;
+    @JsonProperty(ElasticIndexDefinition.DYNAMIC_BOOST_FULLTEXT)
+    public final Set<String> dbFullText;
+    @JsonProperty(ElasticIndexDefinition.SIMILARITY_TAGS)
+    public final Set<String> similarityTags;
+    // these are dynamic properties that need to be added to the document 
unwrapped. See the use of @JsonAnyGetter in the getter
+    private final Map<String, Object> properties;
 
     ElasticDocument(String path) {
         this.path = path;
@@ -53,8 +57,7 @@ public class ElasticDocument {
         this.suggest = new LinkedHashSet<>();
         this.spellcheck = new LinkedHashSet<>();
         this.properties = new HashMap<>();
-        this.similarityFields = new HashMap<>();
-        this.dynamicBoostFields = new HashMap<>();
+        this.dbFullText = new LinkedHashSet<>();
         this.similarityTags = new LinkedHashSet<>();
     }
 
@@ -67,24 +70,40 @@ public class ElasticDocument {
     }
 
     void addSuggest(String value) {
-        suggest.add(value);
+        suggest.add(Map.of(ElasticIndexHelper.SUGGEST_NESTED_VALUE, value));
     }
 
     void addSpellcheck(String value) {
         spellcheck.add(value);
     }
 
-    // ES for String values (that are not interpreted as date or numbers etc) 
would analyze in the same
+    // ES for String values (that are not interpreted as date or numbers etc.) 
would analyze in the same
     // field and would index a sub-field "keyword" for non-analyzed value.
     // ref: https://www.elastic.co/blog/strings-are-dead-long-live-strings
-    // (interpretation of date etc: 
https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-field-mapping.html)
+    // (interpretation of date etc.: 
https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-field-mapping.html)
     void addProperty(String fieldName, Object value) {
-        properties.computeIfAbsent(fieldName, s -> new 
LinkedHashSet<>()).add(value);
+        Object existingValue = properties.get(fieldName);
+        Object finalValue;
+
+        if (existingValue == null) {
+            finalValue = value;
+        } else if (existingValue instanceof Set) {
+            Set<Object> existingSet = (Set<Object>) existingValue;
+            existingSet.add(value);
+            finalValue = existingSet;
+        } else {
+            Set<Object> set = new LinkedHashSet<>();
+            set.add(existingValue);
+            set.add(value);
+            finalValue = set.size() == 1 ? set.iterator().next() : set;
+        }
+
+        properties.put(fieldName, finalValue);
     }
 
     void addSimilarityField(String name, Blob value) throws IOException {
         byte[] bytes = new BlobByteSource(value).read();
-        similarityFields.put(FieldNames.createSimilarityFieldName(name), 
toDoubles(bytes));
+        addProperty(FieldNames.createSimilarityFieldName(name), 
toDoubles(bytes));
     }
 
     void indexAncestors(String path) {
@@ -96,76 +115,25 @@ public class ElasticDocument {
     }
 
     void addDynamicBoostField(String propName, String value, double boost) {
-        dynamicBoostFields.computeIfAbsent(propName, s -> new HashMap<>())
-                .putIfAbsent(value, boost);
+        addProperty(propName,
+                Map.of(
+                        ElasticIndexHelper.DYNAMIC_BOOST_NESTED_VALUE, value,
+                        ElasticIndexHelper.DYNAMIC_BOOST_NESTED_BOOST, boost
+                )
+        );
+
+        // add value into the dynamic boost specific fulltext field. We cannot 
add this in the standard
+        // field since dynamic boosted terms require lower weight compared to 
standard terms
+        dbFullText.add(value);
     }
 
     void addSimilarityTag(String value) {
         similarityTags.add(value);
     }
 
-    public String build() {
-        String ret;
-        try {
-            XContentBuilder builder = XContentFactory.jsonBuilder();
-            builder.startObject();
-            {
-                builder.field(FieldNames.PATH, path);
-                Set<String> dbFullText = new LinkedHashSet<>();
-                for (Map.Entry<String, Map<String, Double>> f : 
dynamicBoostFields.entrySet()) {
-                    builder.startArray(f.getKey());
-                    for (Map.Entry<String, Double> v : 
f.getValue().entrySet()) {
-                        builder.startObject();
-                        builder.field("value", v.getKey());
-                        builder.field("boost", v.getValue());
-                        builder.endObject();
-                        // add value into the dynamic boost specific fulltext 
field. We cannot add this in the standard
-                        // field since dynamic boosted terms require lower 
weight compared to standard terms
-                        dbFullText.add(v.getKey());
-                    }
-                    builder.endArray();
-                }
-                if (dbFullText.size() > 0) {
-                    
builder.field(ElasticIndexDefinition.DYNAMIC_BOOST_FULLTEXT, dbFullText);
-                }
-                if (fulltext.size() > 0) {
-                    builder.field(FieldNames.FULLTEXT, fulltext);
-                }
-                if (suggest.size() > 0) {
-                    builder.startArray(FieldNames.SUGGEST);
-                    for (String val : suggest) {
-                        builder.startObject().field("value", val).endObject();
-                    }
-                    builder.endArray();
-                }
-                if (spellcheck.size() > 0) {
-                    builder.field(FieldNames.SPELLCHECK, spellcheck);
-                }
-                for (Map.Entry<String, Object> simProp: 
similarityFields.entrySet()) {
-                    builder.field(simProp.getKey(), simProp.getValue());
-                }
-                for (Map.Entry<String, Set<Object>> prop : 
properties.entrySet()) {
-                    builder.field(prop.getKey(), prop.getValue().size() == 1 ? 
prop.getValue().iterator().next() : prop.getValue());
-                }
-                if (!similarityTags.isEmpty()) {
-                    builder.field(ElasticIndexDefinition.SIMILARITY_TAGS, 
similarityTags);
-                }
-            }
-            builder.endObject();
-
-            ret = Strings.toString(builder);
-        } catch (IOException e) {
-            LOG.error("Error serializing document - path: {}, properties: {}, 
fulltext: {}, suggest: {}",
-                    path, properties, fulltext, suggest, e);
-            ret = null;
-        }
-
-        return ret;
-    }
-
-    @Override
-    public String toString() {
-        return build();
+    @JsonAnyGetter
+    public Map<String, Object> getProperties() {
+        return properties;
     }
 
 }
diff --git 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexHelper.java
 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexHelper.java
index 187f49beee..254d3a0e88 100644
--- 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexHelper.java
+++ 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexHelper.java
@@ -23,6 +23,7 @@ import 
co.elastic.clients.elasticsearch._types.mapping.TypeMapping;
 import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
 import co.elastic.clients.elasticsearch.indices.IndexSettings;
 import co.elastic.clients.elasticsearch.indices.IndexSettingsAnalysis;
+import co.elastic.clients.elasticsearch.indices.PutIndicesSettingsRequest;
 import co.elastic.clients.json.JsonData;
 import co.elastic.clients.util.ObjectBuilder;
 import org.apache.jackrabbit.oak.api.Type;
@@ -30,8 +31,6 @@ import 
org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
 import 
org.apache.jackrabbit.oak.plugins.index.elastic.ElasticPropertyDefinition;
 import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
 import org.apache.jackrabbit.oak.plugins.index.search.PropertyDefinition;
-import 
org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
-import org.elasticsearch.common.settings.Settings;
 import org.jetbrains.annotations.NotNull;
 
 import java.io.Reader;
@@ -59,10 +58,17 @@ class ElasticIndexHelper {
     // Unset the refresh interval and disable replicas at index creation to 
optimize for initial loads
     // 
https://www.elastic.co/guide/en/elasticsearch/reference/current/tune-for-indexing-speed.html
     private static final Time INITIAL_REFRESH_INTERVAL = Time.of(b -> 
b.time("-1"));
+
     private static final String INITIAL_NUMBER_OF_REPLICAS = "0";
 
     private static final String OAK_WORD_DELIMITER_GRAPH_FILTER = 
"oak_word_delimiter_graph_filter";
 
+    protected static final String SUGGEST_NESTED_VALUE = "value";
+
+    protected static final String DYNAMIC_BOOST_NESTED_VALUE = "value";
+
+    protected static final String DYNAMIC_BOOST_NESTED_BOOST = "boost";
+
     /**
      * Returns a {@code CreateIndexRequest} with settings and mappings 
translated from the specified {@code ElasticIndexDefinition}.
      * The returned object can be used to create and index optimized for bulk 
loads (eg: reindexing) but not for queries.
@@ -121,29 +127,28 @@ class ElasticIndexHelper {
 
 
     /**
-     * Returns a {@code UpdateSettingsRequest} to make an index ready to be 
queried and updated in near real time.
+     * Returns a {@code PutIndicesSettingsRequest} to make an index ready to 
be queried and updated in near real time.
      *
      * @param remoteIndexName the final index name (no alias)
      * @param indexDefinition the definition used to read settings/mappings
-     * @return an {@code UpdateSettingsRequest}
-     * <p>
-     * TODO: migrate to Elasticsearch Java client when the following issue 
will be fixed
-     * <a 
href="https://github.com/elastic/elasticsearch-java/issues/283";>https://github.com/elastic/elasticsearch-java/issues/283</a>
+     * @return an {@code PutIndicesSettingsRequest}
      */
-    public static UpdateSettingsRequest enableIndexRequest(String 
remoteIndexName, ElasticIndexDefinition indexDefinition) {
-        UpdateSettingsRequest request = new 
UpdateSettingsRequest(remoteIndexName);
-
-        Settings.Builder settingsBuilder = Settings.builder()
-                .putNull("index.refresh_interval") // null=reset a setting 
back to the default value
-                .put("index.number_of_replicas", 
indexDefinition.numberOfReplicas);
+    public static PutIndicesSettingsRequest enableIndexRequest(String 
remoteIndexName, ElasticIndexDefinition indexDefinition) {
+        IndexSettings indexSettings = IndexSettings.of(is -> is
+                
.numberOfReplicas(Integer.toString(indexDefinition.numberOfReplicas))
+                // TODO: we should pass null to reset the refresh interval to 
the default value but the following bug prevents it. We need to wait for a fix
+                // <a 
href="https://github.com/elastic/elasticsearch-java/issues/283";>https://github.com/elastic/elasticsearch-java/issues/283</a>
+                .refreshInterval(Time.of(t -> t.time("1s"))));
 
-        return request.settings(settingsBuilder);
+        return PutIndicesSettingsRequest.of(pisr -> pisr
+                .index(remoteIndexName)
+                .settings(indexSettings));
     }
 
 
     private static ObjectBuilder<IndexSettings> loadSettings(@NotNull 
IndexSettings.Builder builder,
                                                              @NotNull 
ElasticIndexDefinition indexDefinition) {
-        if (indexDefinition.getSimilarityProperties().size() > 0) {
+        if (!indexDefinition.getSimilarityProperties().isEmpty()) {
             builder.otherSettings(ElasticIndexDefinition.ELASTIKNN, 
JsonData.of(true));
         }
 
@@ -246,7 +251,7 @@ class ElasticIndexHelper {
                 builder.properties(FieldNames.SUGGEST,
                         b1 -> b1.nested(
                                 // TODO: evaluate 
https://www.elastic.co/guide/en/elasticsearch/reference/current/faster-prefix-queries.html
-                                b2 -> b2.properties("value",
+                                b2 -> b2.properties(SUGGEST_NESTED_VALUE,
                                         b3 -> b3.text(
                                                 b4 -> 
b4.analyzer("oak_analyzer")
                                         )
@@ -258,10 +263,10 @@ class ElasticIndexHelper {
             for (PropertyDefinition pd : 
indexDefinition.getDynamicBoostProperties()) {
                 builder.properties(pd.nodeName,
                         b1 -> b1.nested(
-                                b2 -> b2.properties("value",
+                                b2 -> b2.properties(DYNAMIC_BOOST_NESTED_VALUE,
                                                 b3 -> b3.text(
                                                         b4 -> 
b4.analyzer("oak_analyzer")))
-                                        .properties("boost",
+                                        .properties(DYNAMIC_BOOST_NESTED_BOOST,
                                                 b3 -> b3.double_(f -> f)
                                         )
                         )
diff --git 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java
 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java
index 3ae8ec3e3d..13968326f7 100644
--- 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java
+++ 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java
@@ -17,11 +17,14 @@
 package org.apache.jackrabbit.oak.plugins.index.elastic.index;
 
 import co.elastic.clients.elasticsearch._types.AcknowledgedResponseBase;
+import co.elastic.clients.elasticsearch._types.ElasticsearchException;
 import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
 import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
 import co.elastic.clients.elasticsearch.indices.DeleteIndexResponse;
 import co.elastic.clients.elasticsearch.indices.ElasticsearchIndicesClient;
 import co.elastic.clients.elasticsearch.indices.GetAliasResponse;
+import co.elastic.clients.elasticsearch.indices.PutIndicesSettingsRequest;
+import co.elastic.clients.elasticsearch.indices.PutIndicesSettingsResponse;
 import co.elastic.clients.elasticsearch.indices.UpdateAliasesRequest;
 import co.elastic.clients.elasticsearch.indices.UpdateAliasesResponse;
 import co.elastic.clients.json.JsonpUtils;
@@ -38,15 +41,6 @@ import 
org.apache.jackrabbit.oak.plugins.index.importer.AsyncLaneSwitcher;
 import 
org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexWriter;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
-import org.elasticsearch.ElasticsearchStatusException;
-import 
org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
-import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.client.IndicesClient;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.xcontent.XContentType;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.TestOnly;
 import org.slf4j.Logger;
@@ -57,18 +51,13 @@ import java.util.ArrayList;
 import java.util.Set;
 import java.util.UUID;
 
-import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
-import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
-
 class ElasticIndexWriter implements FulltextIndexWriter<ElasticDocument> {
     private static final Logger LOG = 
LoggerFactory.getLogger(ElasticIndexWriter.class);
 
     private final ElasticIndexTracker indexTracker;
     private final ElasticConnection elasticConnection;
     private final ElasticIndexDefinition indexDefinition;
-
     private final ElasticBulkProcessorHandler bulkProcessorHandler;
-
     private final boolean reindex;
     private final String indexName;
 
@@ -134,16 +123,12 @@ class ElasticIndexWriter implements 
FulltextIndexWriter<ElasticDocument> {
 
     @Override
     public void updateDocument(String path, ElasticDocument doc) throws 
IOException {
-        IndexRequest request = new IndexRequest(indexName)
-                .id(ElasticIndexUtils.idFromPath(path))
-                .source(doc.build(), XContentType.JSON);
-        bulkProcessorHandler.add(request);
+        bulkProcessorHandler.update(ElasticIndexUtils.idFromPath(path), doc);
     }
 
     @Override
     public void deleteDocuments(String path) throws IOException {
-        DeleteRequest request = new 
DeleteRequest(indexName).id(ElasticIndexUtils.idFromPath(path));
-        bulkProcessorHandler.add(request);
+        bulkProcessorHandler.delete(ElasticIndexUtils.idFromPath(path));
     }
 
     @Override
@@ -196,12 +181,12 @@ class ElasticIndexWriter implements 
FulltextIndexWriter<ElasticDocument> {
             final CreateIndexResponse response = esClient.create(request);
             LOG.info("Created index {}. Response acknowledged: {}", indexName, 
response.acknowledged());
             checkResponseAcknowledgement(response, "Create index call not 
acknowledged for index " + indexName);
-        } catch (ElasticsearchStatusException ese) {
+        } catch (ElasticsearchException ese) {
             // We already check index existence as first thing in this method, 
if we get here it means we have got into
             // a conflict (eg: multiple cluster nodes provision concurrently).
             // Elasticsearch does not have a CREATE IF NOT EXIST, need to 
inspect exception
             // https://github.com/elastic/elasticsearch/issues/19862
-            if (ese.status().getStatus() == 400 && 
ese.getDetailedMessage().contains("resource_already_exists_exception")) {
+            if (ese.status() == 400 && 
ese.getMessage().contains("resource_already_exists_exception")) {
                 LOG.warn("Index {} already exists. Ignoring error", indexName);
             } else {
                 throw ese;
@@ -216,14 +201,12 @@ class ElasticIndexWriter implements 
FulltextIndexWriter<ElasticDocument> {
             throw new IllegalStateException("cannot enable an index that does 
not exist");
         }
 
-        UpdateSettingsRequest request = 
ElasticIndexHelper.enableIndexRequest(indexName, indexDefinition);
+        PutIndicesSettingsRequest request = 
ElasticIndexHelper.enableIndexRequest(indexName, indexDefinition);
         if (LOG.isDebugEnabled()) {
-            final String requestMsg = 
Strings.toString(request.toXContent(jsonBuilder(), EMPTY_PARAMS));
-            LOG.debug("Updating Index Settings with request {}", requestMsg);
+            LOG.debug("Updating Index Settings with request {}", request);
         }
-        IndicesClient oldClient = elasticConnection.getOldClient().indices();
-        AcknowledgedResponse response = oldClient.putSettings(request, 
RequestOptions.DEFAULT);
-        LOG.info("Updated settings for index {}. Response acknowledged: {}", 
indexName, response.isAcknowledged());
+        PutIndicesSettingsResponse response = client.putSettings(request);
+        LOG.info("Updated settings for index {}. Response acknowledged: {}", 
indexName, response.acknowledged());
         checkResponseAcknowledgement(response, "Update index settings call not 
acknowledged for index " + indexName);
 
         // update the alias
@@ -246,12 +229,6 @@ class ElasticIndexWriter implements 
FulltextIndexWriter<ElasticDocument> {
         deleteOldIndices(client, aliasResponse.result().keySet());
     }
 
-    private void checkResponseAcknowledgement(AcknowledgedResponse response, 
String exceptionMessage) {
-        if (!response.isAcknowledged()) {
-            throw new IllegalStateException(exceptionMessage);
-        }
-    }
-
     private void checkResponseAcknowledgement(AcknowledgedResponseBase 
response, String exceptionMessage) {
         if (!response.acknowledged()) {
             throw new IllegalStateException(exceptionMessage);
@@ -265,7 +242,7 @@ class ElasticIndexWriter implements 
FulltextIndexWriter<ElasticDocument> {
     }
 
     private void deleteOldIndices(ElasticsearchIndicesClient indicesClient, 
Set<String> indices) throws IOException {
-        if (indices.size() == 0)
+        if (indices.isEmpty())
             return;
         DeleteIndexResponse deleteIndexResponse = indicesClient.delete(db -> 
db.index(new ArrayList<>(indices)));
         checkResponseAcknowledgement(deleteIndexResponse, "Delete index call 
not acknowledged for indices " + indices);
diff --git 
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticPropertyIndexTest.java
 
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticPropertyIndexTest.java
index cef7ae6c43..64c21c7377 100644
--- 
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticPropertyIndexTest.java
+++ 
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticPropertyIndexTest.java
@@ -36,7 +36,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 public class ElasticPropertyIndexTest extends ElasticAbstractQueryTest {
 
     @Test
-    public void testBulkProcessorEventsFlushLimit() throws Exception {
+    public void bulkProcessorEventsFlushLimit() throws Exception {
         setIndex("test1", createIndex("propa"));
 
         Tree test = root.getTree("/").addChild("test");
@@ -46,7 +46,7 @@ public class ElasticPropertyIndexTest extends 
ElasticAbstractQueryTest {
         root.commit();
 
         // 250 is the default flush limit for bulk processor, and we added 
just less than 250 nodes
-        // So once the index writer is closed , bulk Processor would be closed 
and all the 248 entries should be flushed.
+        // So once the index writer is closed, bulk Processor would be closed 
and all the 248 entries should be flushed.
         // Make sure that the last entry is indexed correctly.
         String propaQuery = "select [jcr:path] from [nt:base] where [propa] = 
'foo248'";
         assertEventually(() -> {
@@ -68,7 +68,7 @@ public class ElasticPropertyIndexTest extends 
ElasticAbstractQueryTest {
     }
 
     @Test
-    public void testBulkProcessorSizeFlushLimit() throws Exception {
+    public void bulkProcessorSizeFlushLimit() throws Exception {
         LogCustomizer customLogger = LogCustomizer
                 .forLogger(
                         
"org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticBulkProcessorHandler")
@@ -105,8 +105,8 @@ public class ElasticPropertyIndexTest extends 
ElasticAbstractQueryTest {
                 assertQuery(propaQuery, List.of("/test/a" + 
docCountBreachingBulkSize));
             });
 
-            Assert.assertEquals(1, customLogger.getLogs().stream().filter(n -> 
n.contains("Bulk with id 2 processed with status OK in")).count());
-            Assert.assertEquals(0, customLogger.getLogs().stream().filter(n -> 
n.contains("Bulk with id 3 processed with status OK in")).count());
+            Assert.assertEquals(1, customLogger.getLogs().stream().filter(n -> 
n.contains("Bulk with id 2 processed in")).count());
+            Assert.assertEquals(0, customLogger.getLogs().stream().filter(n -> 
n.contains("Bulk with id 3 processed in")).count());
         } finally {
             customLogger.finished();
         }
diff --git 
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticTestServer.java
 
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticTestServer.java
index f0cf5da8d1..f5eb861030 100644
--- 
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticTestServer.java
+++ 
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticTestServer.java
@@ -42,8 +42,6 @@ import static org.junit.Assume.assumeNotNull;
 public class ElasticTestServer implements AutoCloseable {
     private static final Logger LOG = 
LoggerFactory.getLogger(ElasticTestServer.class);
     private static final Map<String, String> 
PLUGIN_OFFICIAL_RELEASES_DIGEST_MAP = Map.of(
-            "7.17.3.0", 
"5e3b40bb72b2813f927be9bf6ecdf88668d89d2ef20c7ebafaa51ab8407fd179",
-            "7.17.6.0", 
"326893bb98ef1a0c569d9f4c4a9a073e53361924f990b17e87077985ce8a7478",
             "7.17.7.0", 
"4252eb55cc7775f1b889d624ac335abfa2e357931c40d0accb4d520144246b8b",
             "8.3.3.0", 
"14d3223456f4b9f00f86628ec8400cb46513935e618ae0f5d0d1088739ccc233",
             "8.4.1.0", 
"56797a1bac6ceeaa36d2358f818b14633124d79c5e04630fa3544603d82eaa01",
@@ -51,7 +49,9 @@ public class ElasticTestServer implements AutoCloseable {
             "8.4.3.0", 
"5c00d43cdd56c5c5d8e9032ad507acea482fb5ca9445861c5cc12ad63af66425",
             "8.5.3.0", 
"d4c13f68650f9df5ff8c74ec83abc2e416de9c45f991d459326e0e2baf7b0e3f",
             "8.7.0.0", 
"7aeac9b7ac4dea1ded3f8e477e26bcc7fe62e313edf6352f4bdf973c43d25819",
-            "8.7.1.0", 
"80c8d34334b0cf4def79835ea6dab78b59ba9ee54c8f5f3cba0bde53123d7820");
+            "8.7.1.0", 
"80c8d34334b0cf4def79835ea6dab78b59ba9ee54c8f5f3cba0bde53123d7820",
+            "8.10.4.0", 
"b2ae8faf1e272319594b4d47a72580fa4f61a5c11cbc8d3f13453fd34b153441",
+            "8.11.0.0", 
"8d4d80b850c4da4da6dfe2d675b2e2355d2014307f8bdc54cc1b34323c81c7ae");
 
     private static final ElasticTestServer SERVER = new ElasticTestServer();
     private static volatile ElasticsearchContainer CONTAINER;
diff --git 
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandlerTest.java
 
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandlerTest.java
index 1f9575bb73..eb37ce415f 100644
--- 
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandlerTest.java
+++ 
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandlerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.jackrabbit.oak.plugins.index.elastic.index;
 
+import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
 import org.apache.jackrabbit.oak.plugins.memory.MultiStringPropertyState;
@@ -23,6 +24,7 @@ import 
org.apache.jackrabbit.oak.plugins.memory.StringPropertyState;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -32,7 +34,7 @@ import java.util.Arrays;
 import java.util.Collections;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.when;
 
@@ -47,17 +49,28 @@ public class ElasticBulkProcessorHandlerTest {
     @Mock
     private ElasticConnection elasticConnectionMock;
 
+    @Mock
+    private ElasticsearchAsyncClient esAsyncClientMock;
+
     @Mock
     private NodeBuilder definitionBuilder;
 
     @Mock
     private CommitInfo commitInfo;
 
+    private AutoCloseable closeable;
+
     @Before
     public void setUp() {
-        MockitoAnnotations.initMocks(this);
+        closeable = MockitoAnnotations.openMocks(this);
         
when(indexDefinitionMock.getDefinitionNodeState()).thenReturn(definitionNodeStateMock);
         when(commitInfo.getInfo()).thenReturn(Collections.emptyMap());
+        
when(elasticConnectionMock.getAsyncClient()).thenReturn(esAsyncClientMock);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        closeable.close();
     }
 
     @Test
diff --git 
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java
 
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java
index a51eda1994..8997b88af8 100644
--- 
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java
+++ 
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java
@@ -19,9 +19,7 @@ package org.apache.jackrabbit.oak.plugins.index.elastic.index;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexTracker;
-import org.elasticsearch.action.DocWriteRequest;
-import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.index.IndexRequest;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -35,6 +33,8 @@ import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.number.OrderingComparison.lessThan;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -55,35 +55,41 @@ public class ElasticIndexWriterTest {
 
     private ElasticIndexWriter indexWriter;
 
+    private AutoCloseable closeable;
+
     @Before
     public void setUp() {
-        MockitoAnnotations.initMocks(this);
+        closeable = MockitoAnnotations.openMocks(this);
         when(indexDefinitionMock.getIndexAlias()).thenReturn("test-index");
         indexWriter = new ElasticIndexWriter(indexTrackerMock, 
elasticConnectionMock, indexDefinitionMock, bulkProcessorHandlerMock);
     }
 
+    @After
+    public void tearDown() throws Exception {
+        closeable.close();
+    }
+
     @Test
     public void singleUpdateDocument() throws IOException {
         indexWriter.updateDocument("/foo", new ElasticDocument("/foo"));
 
-        ArgumentCaptor<IndexRequest> acIndexRequest = 
ArgumentCaptor.forClass(IndexRequest.class);
-        verify(bulkProcessorHandlerMock).add(acIndexRequest.capture());
+        ArgumentCaptor<ElasticDocument> esDocumentCaptor = 
ArgumentCaptor.forClass(ElasticDocument.class);
+        ArgumentCaptor<String> idCaptor = 
ArgumentCaptor.forClass(String.class);
+        verify(bulkProcessorHandlerMock).update(idCaptor.capture(), 
esDocumentCaptor.capture());
 
-        IndexRequest request = acIndexRequest.getValue();
-        assertEquals("test-index", request.index());
-        assertEquals("/foo", request.id());
+        assertEquals("/foo", idCaptor.getValue());
+        assertEquals("/foo", esDocumentCaptor.getValue().path);
     }
 
     @Test
     public void singleDeleteDocument() throws IOException {
         indexWriter.deleteDocuments("/bar");
 
-        ArgumentCaptor<DeleteRequest> acDeleteRequest = 
ArgumentCaptor.forClass(DeleteRequest.class);
-        verify(bulkProcessorHandlerMock).add(acDeleteRequest.capture());
+        ArgumentCaptor<String> idCaptor = 
ArgumentCaptor.forClass(String.class);
+        verify(bulkProcessorHandlerMock).delete(idCaptor.capture());
 
-        DeleteRequest request = acDeleteRequest.getValue();
-        assertEquals("test-index", request.index());
-        assertEquals("/bar", request.id());
+        String id = idCaptor.getValue();
+        assertEquals("/bar", id);
     }
 
     @Test
@@ -93,8 +99,8 @@ public class ElasticIndexWriterTest {
         indexWriter.deleteDocuments("/foo");
         indexWriter.deleteDocuments("/bar");
 
-        ArgumentCaptor<DocWriteRequest<?>> request = 
ArgumentCaptor.forClass(DocWriteRequest.class);
-        verify(bulkProcessorHandlerMock, times(4)).add(request.capture());
+        verify(bulkProcessorHandlerMock, times(2)).update(anyString(), 
any(ElasticDocument.class));
+        verify(bulkProcessorHandlerMock, times(2)).delete(anyString());
     }
 
     @Test
@@ -103,12 +109,12 @@ public class ElasticIndexWriterTest {
 
         indexWriter.updateDocument(generatedPath, new 
ElasticDocument(generatedPath));
 
-        ArgumentCaptor<IndexRequest> acIndexRequest = 
ArgumentCaptor.forClass(IndexRequest.class);
-        verify(bulkProcessorHandlerMock).add(acIndexRequest.capture());
+        ArgumentCaptor<String> idCaptor = 
ArgumentCaptor.forClass(String.class);
+        verify(bulkProcessorHandlerMock).update(idCaptor.capture(), 
any(ElasticDocument.class));
 
-        IndexRequest request = acIndexRequest.getValue();
-        assertThat(request.id(), not(generatedPath));
-        assertThat(request.id().length(), lessThan(513));
+        String id = idCaptor.getValue();
+        assertThat(id, not(generatedPath));
+        assertThat(id.length(), lessThan(513));
     }
 
     @Test

Reply via email to