This is an automated email from the ASF dual-hosted git repository.
fortino pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new d1cd2bc18e OAK-10539: oak-search-elastic: migrate ingestion from Rest
High Level Client to the new Java API Client (#1193)
d1cd2bc18e is described below
commit d1cd2bc18ef3a41dc0c22c078e4785631856a2ac
Author: Fabrizio Fortino <[email protected]>
AuthorDate: Tue Nov 14 09:07:22 2023 +0100
OAK-10539: oak-search-elastic: migrate ingestion from Rest High Level
Client to the new Java API Client (#1193)
* OAK-10539: bump up ES clients
* OAK-10539: increase max jar size in oak-run-elastic
* OAK-10539: BulkProcessor -> BulkIngester migration (wip)
* OAK-10539: fix failing tests
* OAK-10539: remove use of RHLC
* OAK-10539: reduce max.jar.size in oak-run-elastic
* OAK-10539: minor cleanup + improved docs
* OAK-10539: minor cleanup
* OAK-10539: fix tests in oak-run-elastic
* OAK-10539: (minor) align test names
* OAK-10539: improved interruptions handling in bulk processor
* OAK-10539: code review improvements
* OAK-10539: remove unused bulk retries properties from elastic index
definition
* OAK-10539: use constants for nested field names
* OAK-10539: revise interrupted exception handling
* OAK-10539: bump es to 8.11.0
---
oak-run-elastic/pom.xml | 3 +-
.../index/indexer/document/ElasticIndexerTest.java | 25 ++-
.../ElasticPurgeOldIndexVersionTest.java | 3 +-
oak-search-elastic/pom.xml | 88 +-------
.../plugins/index/elastic/ElasticConnection.java | 23 +-
.../index/elastic/ElasticIndexDefinition.java | 12 +-
.../elastic/index/ElasticBulkProcessorHandler.java | 242 +++++++++++----------
.../index/elastic/index/ElasticCustomAnalyzer.java | 8 +-
.../index/ElasticCustomAnalyzerMappings.java | 4 +-
.../index/elastic/index/ElasticDocument.java | 138 +++++-------
.../index/elastic/index/ElasticIndexHelper.java | 41 ++--
.../index/elastic/index/ElasticIndexWriter.java | 47 +---
.../index/elastic/ElasticPropertyIndexTest.java | 10 +-
.../plugins/index/elastic/ElasticTestServer.java | 6 +-
.../index/ElasticBulkProcessorHandlerTest.java | 17 +-
.../elastic/index/ElasticIndexWriterTest.java | 48 ++--
16 files changed, 303 insertions(+), 412 deletions(-)
diff --git a/oak-run-elastic/pom.xml b/oak-run-elastic/pom.xml
index e7e12b7645..316b9b48a9 100644
--- a/oak-run-elastic/pom.xml
+++ b/oak-run-elastic/pom.xml
@@ -37,8 +37,9 @@
105 MB : Setting constraint to default oak-run jar post adding the
build plugin to rename the fat jar with embedded dependencies as the default
jar.
121 MB : add Elasticsearch Java client along with RHLC: the latter can
be removed when the code can be fully migrated to use the new client
125 MB : shaded Guava
+ 85 MB : remove Elasticsearch RHLC
-->
- <max.jar.size>125000000</max.jar.size>
+ <max.jar.size>85000000</max.jar.size>
</properties>
diff --git
a/oak-run-elastic/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/ElasticIndexerTest.java
b/oak-run-elastic/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/ElasticIndexerTest.java
index 2b5270bdb2..e447aa834c 100644
---
a/oak-run-elastic/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/ElasticIndexerTest.java
+++
b/oak-run-elastic/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/ElasticIndexerTest.java
@@ -18,6 +18,10 @@
*/
package org.apache.jackrabbit.oak.index.indexer.document;
+import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
+import co.elastic.clients.json.JsonpMapper;
+import jakarta.json.spi.JsonProvider;
+import jakarta.json.stream.JsonGenerator;
import org.apache.jackrabbit.oak.index.IndexHelper;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
@@ -34,17 +38,19 @@ import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.junit.Test;
+import java.io.OutputStream;
+
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static
org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
import static org.apache.jackrabbit.oak.InitialContentHelper.INITIAL_CONTENT;
+import static org.mockito.Mockito.when;
public class ElasticIndexerTest {
- private NodeState root = INITIAL_CONTENT;
-
@Test
public void nodeIndexed_WithIncludedPaths() throws Exception {
ElasticIndexDefinitionBuilder idxb = new
ElasticIndexDefinitionBuilder();
@@ -52,11 +58,20 @@ public class ElasticIndexerTest {
idxb.includedPaths("/content");
NodeState defn = idxb.build();
- IndexDefinition idxDefn = new ElasticIndexDefinition(root, defn,
"/oak:index/testIndex", "testPrefix");
+ IndexDefinition idxDefn = new ElasticIndexDefinition(INITIAL_CONTENT,
defn, "/oak:index/testIndex", "testPrefix");
+
+ NodeBuilder builder = INITIAL_CONTENT.builder();
- NodeBuilder builder = root.builder();
+ ElasticConnection elasticConnectionMock =
mock(ElasticConnection.class);
+ ElasticsearchAsyncClient elasticsearchAsyncClientMock =
mock(ElasticsearchAsyncClient.class);
+ JsonpMapper jsonMapperMock = mock(JsonpMapper.class);
+ JsonProvider jsonProviderMock = mock(JsonProvider.class);
+
when(jsonProviderMock.createGenerator(any(OutputStream.class))).thenReturn(mock(JsonGenerator.class));
+ when(jsonMapperMock.jsonProvider()).thenReturn(jsonProviderMock);
+
when(elasticsearchAsyncClientMock._jsonpMapper()).thenReturn(jsonMapperMock);
+
when(elasticConnectionMock.getAsyncClient()).thenReturn(elasticsearchAsyncClientMock);
- FulltextIndexWriter indexWriter = new
ElasticIndexWriterFactory(mock(ElasticConnection.class),
+ FulltextIndexWriter indexWriter = new
ElasticIndexWriterFactory(elasticConnectionMock,
mock(ElasticIndexTracker.class)).newInstance(idxDefn,
defn.builder(), CommitInfo.EMPTY, false);
ElasticIndexer indexer = new ElasticIndexer(idxDefn,
mock(FulltextBinaryTextExtractor.class), builder,
mock(IndexingProgressReporter.class), indexWriter,
mock(ElasticIndexEditorProvider.class), mock(IndexHelper.class));
diff --git
a/oak-run-elastic/src/test/java/org/apache/jackrabbit/oak/indexversion/ElasticPurgeOldIndexVersionTest.java
b/oak-run-elastic/src/test/java/org/apache/jackrabbit/oak/indexversion/ElasticPurgeOldIndexVersionTest.java
index d7bd3b663f..6922d3240a 100644
---
a/oak-run-elastic/src/test/java/org/apache/jackrabbit/oak/indexversion/ElasticPurgeOldIndexVersionTest.java
+++
b/oak-run-elastic/src/test/java/org/apache/jackrabbit/oak/indexversion/ElasticPurgeOldIndexVersionTest.java
@@ -46,13 +46,12 @@ import javax.jcr.Session;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.jackrabbit.commons.JcrUtils.getOrCreateByPath;
import static org.hamcrest.CoreMatchers.containsString;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
public class ElasticPurgeOldIndexVersionTest extends
ElasticAbstractIndexCommandTest {
diff --git a/oak-search-elastic/pom.xml b/oak-search-elastic/pom.xml
index fd19bfdb6a..5fd82b97fa 100644
--- a/oak-search-elastic/pom.xml
+++ b/oak-search-elastic/pom.xml
@@ -33,8 +33,8 @@
<description>Oak Elasticsearch integration subproject</description>
<properties>
- <elasticsearch.hlrc.version>7.17.13</elasticsearch.hlrc.version>
-
<elasticsearch.java.client.version>8.7.1</elasticsearch.java.client.version>
+
<elasticsearch.java.client.version>8.11.0</elasticsearch.java.client.version>
+ <lucene.version>9.8.0</lucene.version>
</properties>
<build>
@@ -122,16 +122,6 @@
</dependency>
<!-- Elastic/Lucene -->
- <dependency>
- <groupId>org.elasticsearch.client</groupId>
- <artifactId>elasticsearch-rest-high-level-client</artifactId>
- <version>${elasticsearch.hlrc.version}</version>
- </dependency>
- <dependency>
- <groupId>org.elasticsearch.client</groupId>
- <artifactId>elasticsearch-rest-client</artifactId>
- <version>${elasticsearch.hlrc.version}</version>
- </dependency>
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
@@ -143,72 +133,14 @@
<version>${jackson.version}</version>
</dependency>
<dependency>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch</artifactId>
- <version>${elasticsearch.hlrc.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch-cli</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch-geo</artifactId>
- </exclusion>
-
- <!--
https://github.com/elastic/elasticsearch/issues/29184#issuecomment-662480046 -->
- <exclusion>
- <groupId>org.apache.lucene</groupId>
- <artifactId>lucene-backward-codecs</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.lucene</groupId>
- <artifactId>lucene-grouping</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.lucene</groupId>
- <artifactId>lucene-memory</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.lucene</groupId>
- <artifactId>lucene-misc</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.lucene</groupId>
- <artifactId>lucene-queryparser</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.lucene</groupId>
- <artifactId>lucene-sandbox</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.lucene</groupId>
- <artifactId>lucene-spatial</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.lucene</groupId>
- <artifactId>lucene-spatial-extras</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.lucene</groupId>
- <artifactId>lucene-spatial3d</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.lucene</groupId>
- <artifactId>lucene-suggest</artifactId>
- </exclusion>
- <!--
- elasticsearch-xcontent depends on snakeyaml 1.33 which is vulnerable
to remote code execution
- (https://nvd.nist.gov/vuln/detail/CVE-2022-1471)
- elasticsearch-xcontent is used in this module but only with the json
parser which does not use snakeyaml
- so we can safely exclude it.
- This exclusion, like the others above, can be removed once we will
remove the elasticsearch high level rest client
- -->
- <exclusion>
- <groupId>org.yaml</groupId>
- <artifactId>snakeyaml</artifactId>
- </exclusion>
- </exclusions>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-core</artifactId>
+ <version>${lucene.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-analysis-common</artifactId>
+ <version>${lucene.version}</version>
</dependency>
<dependency>
<groupId>org.apache.tika</groupId>
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticConnection.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticConnection.java
index a677a27506..66e3544713 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticConnection.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticConnection.java
@@ -23,8 +23,6 @@ import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.RestHighLevelClientBuilder;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -123,14 +121,12 @@ public class ElasticConnection implements Closeable {
requestConfigBuilder ->
requestConfigBuilder.setSocketTimeout(ES_SOCKET_TIMEOUT));
RestClient httpClient = builder.build();
- RestHighLevelClient hlClient = new
RestHighLevelClientBuilder(httpClient)
- .setApiCompatibilityMode(true).build();
ElasticsearchTransport transport = new RestClientTransport(
httpClient, new JacksonJsonpMapper());
ElasticsearchClient esClient = new
ElasticsearchClient(transport);
ElasticsearchAsyncClient esAsyncClient = new
ElasticsearchAsyncClient(transport);
- clients = new Clients(esClient, esAsyncClient, hlClient);
+ clients = new Clients(esClient, esAsyncClient);
}
}
}
@@ -153,14 +149,6 @@ public class ElasticConnection implements Closeable {
return getClients().asyncClient;
}
- /**
- * @deprecated
- * @return the old Elasticsearch client
- */
- public RestHighLevelClient getOldClient() {
- return getClients().rhlClient;
- }
-
public String getIndexPrefix() {
return indexPrefix;
}
@@ -187,9 +175,6 @@ public class ElasticConnection implements Closeable {
// standard client
clients.client._transport().close();
}
- if (clients.rhlClient != null) {
- clients.rhlClient.close();
- }
}
isClosed.set(true);
}
@@ -217,12 +202,10 @@ public class ElasticConnection implements Closeable {
private static class Clients {
public final ElasticsearchClient client;
public final ElasticsearchAsyncClient asyncClient;
- public final RestHighLevelClient rhlClient;
- Clients(ElasticsearchClient client, ElasticsearchAsyncClient
asyncClient, RestHighLevelClient rhlClient) {
+ Clients(ElasticsearchClient client, ElasticsearchAsyncClient
asyncClient) {
this.client = client;
this.asyncClient = asyncClient;
- this.rhlClient = rhlClient;
}
}
@@ -271,7 +254,7 @@ public class ElasticConnection implements Closeable {
/**
* This is the final step in charge of building the {@link
ElasticConnection}.
* Validation should be here.
- *
+ * <p>
* It adds support for {@link OptionalSteps}.
*/
public interface BuildStep extends OptionalSteps {
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexDefinition.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexDefinition.java
index 70a975760b..9ad83c5191 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexDefinition.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexDefinition.java
@@ -45,17 +45,11 @@ public class ElasticIndexDefinition extends IndexDefinition
{
public static final int BULK_ACTIONS_DEFAULT = 250;
public static final String BULK_SIZE_BYTES = "bulkSizeBytes";
- public static final long BULK_SIZE_BYTES_DEFAULT = 1 * 1024 * 1024; // 1MB
+ public static final long BULK_SIZE_BYTES_DEFAULT = 1024 * 1024; // 1MB
public static final String BULK_FLUSH_INTERVAL_MS = "bulkFlushIntervalMs";
public static final long BULK_FLUSH_INTERVAL_MS_DEFAULT = 3000;
- public static final String BULK_RETRIES = "bulkRetries";
- public static final int BULK_RETRIES_DEFAULT = 3;
-
- public static final String BULK_RETRIES_BACKOFF = "bulkRetriesBackoff";
- public static final long BULK_RETRIES_BACKOFF_DEFAULT = 200;
-
public static final String NUMBER_OF_SHARDS = "numberOfShards";
public static final int NUMBER_OF_SHARDS_DEFAULT = 1;
@@ -133,8 +127,6 @@ public class ElasticIndexDefinition extends IndexDefinition
{
public final int bulkActions;
public final long bulkSizeBytes;
public final long bulkFlushIntervalMs;
- public final int bulkRetries;
- public final long bulkRetriesBackoff;
private final boolean similarityTagsEnabled;
private final float similarityTagsBoost;
public final int numberOfShards;
@@ -157,8 +149,6 @@ public class ElasticIndexDefinition extends IndexDefinition
{
this.bulkActions = getOptionalValue(defn, BULK_ACTIONS,
BULK_ACTIONS_DEFAULT);
this.bulkSizeBytes = getOptionalValue(defn, BULK_SIZE_BYTES,
BULK_SIZE_BYTES_DEFAULT);
this.bulkFlushIntervalMs = getOptionalValue(defn,
BULK_FLUSH_INTERVAL_MS, BULK_FLUSH_INTERVAL_MS_DEFAULT);
- this.bulkRetries = getOptionalValue(defn, BULK_RETRIES,
BULK_RETRIES_DEFAULT);
- this.bulkRetriesBackoff = getOptionalValue(defn, BULK_RETRIES_BACKOFF,
BULK_RETRIES_BACKOFF_DEFAULT);
this.numberOfShards = getOptionalValue(defn, NUMBER_OF_SHARDS,
NUMBER_OF_SHARDS_DEFAULT);
this.numberOfReplicas = getOptionalValue(defn, NUMBER_OF_REPLICAS,
NUMBER_OF_REPLICAS_DEFAULT);
this.similarityTagsEnabled = getOptionalValue(defn,
SIMILARITY_TAGS_ENABLED, SIMILARITY_TAGS_ENABLED_DEFAULT);
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
index 035fd7b6c8..b822a7bbd1 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
@@ -16,6 +16,13 @@
*/
package org.apache.jackrabbit.oak.plugins.index.elastic.index;
+import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester;
+import co.elastic.clients.elasticsearch._helpers.bulk.BulkListener;
+import co.elastic.clients.elasticsearch._types.ErrorCause;
+import co.elastic.clients.elasticsearch.core.BulkRequest;
+import co.elastic.clients.elasticsearch.core.BulkResponse;
+import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
+import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
@@ -23,37 +30,23 @@ import
org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.DocWriteRequest;
-import org.elasticsearch.action.bulk.BackoffPolicy;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.support.WriteRequest;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.core.TimeValue;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiConsumer;
import java.util.stream.Collectors;
-import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
-import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
-
class ElasticBulkProcessorHandler {
private static final Logger LOG =
LoggerFactory.getLogger(ElasticBulkProcessorHandler.class);
@@ -68,13 +61,13 @@ class ElasticBulkProcessorHandler {
protected final String indexName;
protected final ElasticIndexDefinition indexDefinition;
private final NodeBuilder definitionBuilder;
- protected final BulkProcessor bulkProcessor;
+ protected final BulkIngester<String> bulkIngester;
private final boolean waitForESAcknowledgement;
/**
* Coordinates communication between bulk processes. It has a main
controller registered at creation time and
* de-registered on {@link ElasticIndexWriter#close(long)}. Each bulk
request register a new party in
- * this Phaser in {@link OakBulkProcessorListener#beforeBulk(long,
BulkRequest)} and de-register itself when
+ * this Phaser in {@link OakBulkListener#beforeBulk(long, BulkRequest,
List)} and de-register itself when
* the request returns.
*/
private final Phaser phaser = new Phaser(1); // register main controller
@@ -82,7 +75,7 @@ class ElasticBulkProcessorHandler {
/**
* Exceptions occurred while trying to update index in elasticsearch
*/
- private final ConcurrentLinkedQueue<Throwable> suppressedExceptions = new
ConcurrentLinkedQueue<>();
+ private final ConcurrentLinkedQueue<ErrorCause> suppressedErrorCauses =
new ConcurrentLinkedQueue<>();
/**
* Key-value structure to keep the history of bulk requests. Keys are the
bulk execution ids, the boolean
@@ -102,7 +95,7 @@ class ElasticBulkProcessorHandler {
this.indexDefinition = indexDefinition;
this.definitionBuilder = definitionBuilder;
this.waitForESAcknowledgement = waitForESAcknowledgement;
- this.bulkProcessor = initBulkProcessor();
+ this.bulkIngester = initBulkIngester();
}
/**
@@ -142,44 +135,57 @@ class ElasticBulkProcessorHandler {
return new ElasticBulkProcessorHandler(elasticConnection, indexName,
indexDefinition, definitionBuilder, waitForESAcknowledgement);
}
- private BulkProcessor initBulkProcessor() {
- return BulkProcessor.builder(requestConsumer(),
- new OakBulkProcessorListener(), this.indexName +
"-bulk-processor")
- .setBulkActions(indexDefinition.bulkActions)
- .setConcurrentRequests(BULK_PROCESSOR_CONCURRENCY)
- .setBulkSize(new ByteSizeValue(indexDefinition.bulkSizeBytes))
-
.setFlushInterval(TimeValue.timeValueMillis(indexDefinition.bulkFlushIntervalMs))
- .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
-
TimeValue.timeValueMillis(indexDefinition.bulkRetriesBackoff),
indexDefinition.bulkRetries)
- )
- .build();
+ private BulkIngester<String> initBulkIngester() {
+ // BulkIngester does not support retry policies. Some retries though
are already implemented in the transport layer.
+ // More details here:
https://github.com/elastic/elasticsearch-java/issues/478
+ return BulkIngester.of(b -> {
+ b = b.client(elasticConnection.getAsyncClient())
+ .listener(new OakBulkListener());
+ if (indexDefinition.bulkActions > 0) {
+ b = b.maxOperations(indexDefinition.bulkActions);
+ }
+ if (indexDefinition.bulkSizeBytes > 0) {
+ b = b.maxSize(indexDefinition.bulkSizeBytes);
+ }
+ if (indexDefinition.bulkFlushIntervalMs > 0) {
+ b = b.flushInterval(indexDefinition.bulkFlushIntervalMs,
TimeUnit.MILLISECONDS);
+ }
+ return b.maxConcurrentRequests(BULK_PROCESSOR_CONCURRENCY);
+ });
}
private void checkFailures() throws IOException {
- if (!suppressedExceptions.isEmpty()) {
+ if (!suppressedErrorCauses.isEmpty()) {
IOException ioe = new IOException("Exception while indexing. See
suppressed for details");
- suppressedExceptions.forEach(ioe::addSuppressed);
+ suppressedErrorCauses.stream().map(ec -> new
IllegalStateException(ec.reason())).forEach(ioe::addSuppressed);
throw ioe;
}
}
- protected BiConsumer<BulkRequest, ActionListener<BulkResponse>>
requestConsumer() {
- // TODO: migrate to ES Java client
https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/current/indexing-bulk.html
- return (request, bulkListener) ->
elasticConnection.getOldClient().bulkAsync(request, RequestOptions.DEFAULT,
bulkListener);
+ public void update(String id, ElasticDocument document) throws IOException
{
+ add(BulkOperation.of(op -> op.index(idx ->
idx.index(indexName).id(id).document(document))), id);
}
- public void add(DocWriteRequest<?> request) throws IOException {
+ public void delete(String id) throws IOException {
+ add(BulkOperation.of(op -> op.delete(idx ->
idx.index(indexName).id(id))), id);
+ }
+
+ private void add(BulkOperation operation, String context) throws
IOException {
// fail fast: we don't want to wait until the processor gets closed to
fail
checkFailures();
-
- bulkProcessor.add(request);
+ bulkIngester.add(operation, context);
totalOperations++;
}
+ /**
+ * Closes the bulk ingester and waits for all the bulk requests to return.
+ * @return {@code true} if at least one update was performed, {@code
false} otherwise
+ * @throws IOException if an error happened while processing the bulk
requests
+ */
public boolean close() throws IOException {
- LOG.trace("Calling close on bulk processor {}", bulkProcessor);
- bulkProcessor.close();
- LOG.trace("Bulk Processor {} closed", bulkProcessor);
+ LOG.trace("Calling close on bulk ingester {}", bulkIngester);
+ bulkIngester.close();
+ LOG.trace("Bulk Ingester {} closed", bulkIngester);
// de-register main controller
int phase = phaser.arriveAndDeregister();
@@ -192,8 +198,11 @@ class ElasticBulkProcessorHandler {
if (waitForESAcknowledgement) {
try {
phaser.awaitAdvanceInterruptibly(phase,
indexDefinition.bulkFlushIntervalMs * 5, TimeUnit.MILLISECONDS);
- } catch (InterruptedException | TimeoutException e) {
+ } catch (TimeoutException e) {
LOG.error("Error waiting for bulk requests to return", e);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting for bulk processor to
close", e);
+ Thread.currentThread().interrupt(); // restore interrupt
status
}
}
@@ -205,91 +214,96 @@ class ElasticBulkProcessorHandler {
return updatesMap.containsValue(Boolean.TRUE);
}
- private class OakBulkProcessorListener implements BulkProcessor.Listener {
+ private class OakBulkListener implements BulkListener<String> {
@Override
- public void beforeBulk(long executionId, BulkRequest bulkRequest) {
+ public void beforeBulk(long executionId, BulkRequest request,
List<String> contexts) {
// register new bulk party
phaser.register();
// init update status
updatesMap.put(executionId, Boolean.FALSE);
- bulkRequest.timeout(TimeValue.timeValueMinutes(2));
-
- LOG.debug("Sending bulk with id {} -> {}", executionId,
bulkRequest.getDescription());
+ LOG.debug("Sending bulk with id {} -> {}", executionId, contexts);
if (LOG.isTraceEnabled()) {
- LOG.trace("Bulk Requests: \n{}", bulkRequest.requests()
+ LOG.trace("Bulk Requests: \n{}", request.operations()
.stream()
- .map(DocWriteRequest::toString)
+ .map(BulkOperation::toString)
.collect(Collectors.joining("\n"))
);
}
}
@Override
- public void afterBulk(long executionId, BulkRequest bulkRequest,
BulkResponse bulkResponse) {
- LOG.debug("Bulk with id {} processed with status {} in {}",
executionId, bulkResponse.status(), bulkResponse.getTook());
- if (LOG.isTraceEnabled()) {
- try {
-
LOG.trace(Strings.toString(bulkResponse.toXContent(jsonBuilder(),
EMPTY_PARAMS)));
- } catch (IOException e) {
- LOG.error("Error decoding bulk response", e);
+ public void afterBulk(long executionId, BulkRequest request,
List<String> contexts, BulkResponse response) {
+ try {
+ LOG.debug("Bulk with id {} processed in {} ms", executionId,
response.took());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(response.toString());
}
- }
- if (bulkResponse.hasFailures()) { // check if some operations
failed to execute
- Set<String> failedDocSet = new LinkedHashSet<>();
- NodeBuilder status =
definitionBuilder.child(IndexDefinition.STATUS_NODE);
- // Read the current failed paths (if any) on the :status node
into failedDocList
- if (status.hasProperty(IndexDefinition.FAILED_DOC_PATHS)) {
- for (String str :
status.getProperty(IndexDefinition.FAILED_DOC_PATHS).getValue(Type.STRINGS)) {
- failedDocSet.add(str);
+ if (response.items().stream().anyMatch(i -> i.error() !=
null)) { // check if some operations failed to execute
+ Set<String> failedDocSet = new LinkedHashSet<>();
+ NodeBuilder status =
definitionBuilder.child(IndexDefinition.STATUS_NODE);
+ // Read the current failed paths (if any) on the :status
node into failedDocList
+ if (status.hasProperty(IndexDefinition.FAILED_DOC_PATHS)) {
+ for (String str :
status.getProperty(IndexDefinition.FAILED_DOC_PATHS).getValue(Type.STRINGS)) {
+ failedDocSet.add(str);
+ }
}
- }
- int initialSize = failedDocSet.size();
- boolean isFailedDocSetFull = false;
-
- boolean hasSuccesses = false;
- for (BulkItemResponse bulkItemResponse : bulkResponse) {
- if (bulkItemResponse.isFailed()) {
- BulkItemResponse.Failure failure =
bulkItemResponse.getFailure();
- if (indexDefinition.failOnError && failure.getCause()
!= null) {
- suppressedExceptions.add(failure.getCause());
- }
- if (!isFailedDocSetFull && failedDocSet.size() <
FAILED_DOC_COUNT_FOR_STATUS_NODE) {
- failedDocSet.add(bulkItemResponse.getId());
- } else {
- isFailedDocSetFull = true;
+ int initialSize = failedDocSet.size();
+ boolean isFailedDocSetFull = false;
+
+ boolean hasSuccesses = false;
+ for (int i = 0; i < contexts.size(); i++) {
+ BulkResponseItem item = response.items().get(i);
+ if (item.error() != null) {
+ if (indexDefinition.failOnError) {
+ suppressedErrorCauses.add(item.error());
+ }
+ if (!isFailedDocSetFull && failedDocSet.size() <
FAILED_DOC_COUNT_FOR_STATUS_NODE) {
+ failedDocSet.add(contexts.get(i));
+ } else {
+ isFailedDocSetFull = true;
+ }
+ // Log entry to be used to parse logs to get the
failed doc id/path if needed
+ LOG.error("ElasticIndex Update Doc Failure: Error
while adding/updating doc with id: [{}]", contexts.get(i));
+ LOG.error("Failure Details: BulkItem ID: {},
Index: {}, Failure Cause: {}",
+ item.id(), item.index(), item.error());
+ } else if (!hasSuccesses) {
+ // Set indexUpdated to true even if 1 item was
updated successfully
+ updatesMap.put(executionId, Boolean.TRUE);
+ hasSuccesses = true;
}
- // Log entry to be used to parse logs to get the
failed doc id/path if needed
- LOG.error("ElasticIndex Update Doc Failure: Error
while adding/updating doc with id: [{}]", bulkItemResponse.getId());
- LOG.error("Failure Details: BulkItem ID: {}, Index:
{}, Failure Cause: {}",
- failure.getId(), failure.getIndex(),
failure.getCause());
- } else if (!hasSuccesses) {
- // Set indexUpdated to true even if 1 item was updated
successfully
- updatesMap.put(executionId, Boolean.TRUE);
- hasSuccesses = true;
}
- }
- if (isFailedDocSetFull) {
- LOG.info("Cannot store all new Failed Docs because {} has
been filled up. " +
- "See previous log entries to find out the details
of failed paths", IndexDefinition.FAILED_DOC_PATHS);
- } else if (failedDocSet.size() != initialSize) {
- status.setProperty(IndexDefinition.FAILED_DOC_PATHS,
failedDocSet, Type.STRINGS);
+ if (isFailedDocSetFull) {
+ LOG.info("Cannot store all new Failed Docs because {}
has been filled up. " +
+ "See previous log entries to find out the
details of failed paths", IndexDefinition.FAILED_DOC_PATHS);
+ } else if (failedDocSet.size() != initialSize) {
+ status.setProperty(IndexDefinition.FAILED_DOC_PATHS,
failedDocSet, Type.STRINGS);
+ }
+ } else {
+ updatesMap.put(executionId, Boolean.TRUE);
}
- } else {
- updatesMap.put(executionId, Boolean.TRUE);
+ } finally {
+ phaser.arriveAndDeregister();
}
- phaser.arriveAndDeregister();
}
@Override
- public void afterBulk(long executionId, BulkRequest bulkRequest,
Throwable throwable) {
- LOG.error("ElasticIndex Update Bulk Failure : Bulk with id {}
threw an error", executionId, throwable);
- suppressedExceptions.add(throwable);
- phaser.arriveAndDeregister();
+ public void afterBulk(long executionId, BulkRequest request,
List<String> contexts, Throwable failure) {
+ try {
+ LOG.error("ElasticIndex Update Bulk Failure : Bulk with id {}
threw an error", executionId, failure);
+ suppressedErrorCauses.add(ErrorCause.of(ec -> {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ failure.printStackTrace(pw);
+ return
ec.reason(failure.getMessage()).stackTrace(sw.toString());
+ }));
+ } finally {
+ phaser.arriveAndDeregister();
+ }
}
}
@@ -297,12 +311,14 @@ class ElasticBulkProcessorHandler {
* {@link ElasticBulkProcessorHandler} extension with real time behaviour.
* It also uses the same async bulk processor as the parent except for the
last flush that waits until the
* indexed documents are searchable.
+ * <p>
+ * BulkIngester does not support customization of intermediate requests.
This means we cannot intercept the last
+ * request and apply a WAIT_UNTIL refresh policy. The workaround is to
force a refresh when the handler is closed.
+ * We can improve this when this issue gets fixed:
+ * <a
href="https://github.com/elastic/elasticsearch-java/issues/703">elasticsearch-java#703</a>
*/
protected static class RealTimeBulkProcessorHandler extends
ElasticBulkProcessorHandler {
- private final AtomicBoolean isClosed = new AtomicBoolean(false);
- private final AtomicBoolean isDataSearchable = new
AtomicBoolean(false);
-
private RealTimeBulkProcessorHandler(@NotNull ElasticConnection
elasticConnection,
@NotNull String indexName,
@NotNull ElasticIndexDefinition
indexDefinition,
@@ -311,27 +327,13 @@ class ElasticBulkProcessorHandler {
super(elasticConnection, indexName, indexDefinition,
definitionBuilder, waitForESAcknowledgement);
}
- @Override
- protected BiConsumer<BulkRequest, ActionListener<BulkResponse>>
requestConsumer() {
- return (request, bulkListener) -> {
- if (isClosed.get()) {
- LOG.debug("Processor is closing. Next request with {}
actions will block until the data is searchable",
- request.requests().size());
-
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
- isDataSearchable.set(true);
- }
- elasticConnection.getOldClient().bulkAsync(request,
RequestOptions.DEFAULT, bulkListener);
- };
- }
-
@Override
public boolean close() throws IOException {
- isClosed.set(true);
// calling super closes the bulk processor. If not empty it calls
#requestConsumer for the last time
boolean closed = super.close();
// it could happen that close gets called when the bulk has
already been flushed. In these cases we trigger
// an actual refresh to make sure the docs are searchable before
returning from the method
- if (totalOperations > 0 && !isDataSearchable.get()) {
+ if (totalOperations > 0) {
LOG.debug("Forcing refresh");
try {
this.elasticConnection.getClient().indices().refresh(b
-> b.index(indexName));
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticCustomAnalyzer.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticCustomAnalyzer.java
index 4e2c26d40e..9eb4e8f88c 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticCustomAnalyzer.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticCustomAnalyzer.java
@@ -37,11 +37,11 @@ import
org.apache.jackrabbit.oak.plugins.tree.factories.TreeFactory;
import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
+import org.apache.lucene.analysis.AbstractAnalysisFactory;
+import org.apache.lucene.analysis.CharFilterFactory;
+import org.apache.lucene.analysis.TokenFilterFactory;
import org.apache.lucene.analysis.en.AbstractWordsFileFilterFactory;
-import org.apache.lucene.analysis.util.AbstractAnalysisFactory;
-import org.apache.lucene.analysis.util.CharFilterFactory;
-import org.apache.lucene.analysis.util.ResourceLoader;
-import org.apache.lucene.analysis.util.TokenFilterFactory;
+import org.apache.lucene.util.ResourceLoader;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticCustomAnalyzerMappings.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticCustomAnalyzerMappings.java
index ea79c50086..e8adc6f2e2 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticCustomAnalyzerMappings.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticCustomAnalyzerMappings.java
@@ -16,6 +16,7 @@
*/
package org.apache.jackrabbit.oak.plugins.index.elastic.index;
+import org.apache.lucene.analysis.AbstractAnalysisFactory;
import org.apache.lucene.analysis.charfilter.MappingCharFilterFactory;
import org.apache.lucene.analysis.cjk.CJKBigramFilterFactory;
import org.apache.lucene.analysis.commongrams.CommonGramsFilterFactory;
@@ -33,7 +34,6 @@ import
org.apache.lucene.analysis.pattern.PatternCaptureGroupFilterFactory;
import org.apache.lucene.analysis.payloads.DelimitedPayloadTokenFilterFactory;
import org.apache.lucene.analysis.shingle.ShingleFilterFactory;
import org.apache.lucene.analysis.synonym.SynonymFilterFactory;
-import org.apache.lucene.analysis.util.AbstractAnalysisFactory;
import org.apache.lucene.analysis.util.ElisionFilterFactory;
import org.jetbrains.annotations.Nullable;
@@ -76,7 +76,7 @@ public class ElasticCustomAnalyzerMappings {
static {
CONTENT_TRANSFORMERS = new LinkedHashMap<>();
CONTENT_TRANSFORMERS.put("mapping", line -> {
- if (line.length() == 0 || line.startsWith("#")) {
+ if (line.isEmpty() || line.startsWith("#")) {
return null;
} else {
return line.replaceAll("\"", "");
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticDocument.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticDocument.java
index c81d2c28d7..06aa0739a8 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticDocument.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticDocument.java
@@ -16,16 +16,14 @@
*/
package org.apache.jackrabbit.oak.plugins.index.elastic.index;
+import com.fasterxml.jackson.annotation.JsonAnyGetter;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
import
org.apache.jackrabbit.oak.plugins.index.search.spi.binary.BlobByteSource;
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.xcontent.XContentBuilder;
-import org.elasticsearch.xcontent.XContentFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
@@ -35,17 +33,23 @@ import java.util.LinkedHashSet;
import static
org.apache.jackrabbit.oak.plugins.index.elastic.util.ElasticIndexUtils.toDoubles;
+@JsonInclude(JsonInclude.Include.NON_EMPTY)
public class ElasticDocument {
- private static final Logger LOG =
LoggerFactory.getLogger(ElasticDocument.class);
- private final String path;
- private final Set<String> fulltext;
- private final Set<String> suggest;
- private final Set<String> spellcheck;
- private final Map<String, Set<Object>> properties;
- private final Map<String, Object> similarityFields;
- private final Map<String, Map<String, Double>> dynamicBoostFields;
- private final Set<String> similarityTags;
+ @JsonProperty(FieldNames.PATH)
+ public final String path;
+ @JsonProperty(FieldNames.FULLTEXT)
+ public final Set<String> fulltext;
+ @JsonProperty(FieldNames.SUGGEST)
+ public final Set<Map<String, String>> suggest;
+ @JsonProperty(FieldNames.SPELLCHECK)
+ public final Set<String> spellcheck;
+ @JsonProperty(ElasticIndexDefinition.DYNAMIC_BOOST_FULLTEXT)
+ public final Set<String> dbFullText;
+ @JsonProperty(ElasticIndexDefinition.SIMILARITY_TAGS)
+ public final Set<String> similarityTags;
+ // these are dynamic properties that need to be added to the document
unwrapped. See the use of @JsonAnyGetter in the getter
+ private final Map<String, Object> properties;
ElasticDocument(String path) {
this.path = path;
@@ -53,8 +57,7 @@ public class ElasticDocument {
this.suggest = new LinkedHashSet<>();
this.spellcheck = new LinkedHashSet<>();
this.properties = new HashMap<>();
- this.similarityFields = new HashMap<>();
- this.dynamicBoostFields = new HashMap<>();
+ this.dbFullText = new LinkedHashSet<>();
this.similarityTags = new LinkedHashSet<>();
}
@@ -67,24 +70,40 @@ public class ElasticDocument {
}
void addSuggest(String value) {
- suggest.add(value);
+ suggest.add(Map.of(ElasticIndexHelper.SUGGEST_NESTED_VALUE, value));
}
void addSpellcheck(String value) {
spellcheck.add(value);
}
- // ES for String values (that are not interpreted as date or numbers etc)
would analyze in the same
+ // ES for String values (that are not interpreted as date or numbers etc.)
would analyze in the same
// field and would index a sub-field "keyword" for non-analyzed value.
// ref: https://www.elastic.co/blog/strings-are-dead-long-live-strings
- // (interpretation of date etc:
https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-field-mapping.html)
+ // (interpretation of date etc.:
https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-field-mapping.html)
void addProperty(String fieldName, Object value) {
- properties.computeIfAbsent(fieldName, s -> new
LinkedHashSet<>()).add(value);
+ Object existingValue = properties.get(fieldName);
+ Object finalValue;
+
+ if (existingValue == null) {
+ finalValue = value;
+ } else if (existingValue instanceof Set) {
+ Set<Object> existingSet = (Set<Object>) existingValue;
+ existingSet.add(value);
+ finalValue = existingSet;
+ } else {
+ Set<Object> set = new LinkedHashSet<>();
+ set.add(existingValue);
+ set.add(value);
+ finalValue = set.size() == 1 ? set.iterator().next() : set;
+ }
+
+ properties.put(fieldName, finalValue);
}
void addSimilarityField(String name, Blob value) throws IOException {
byte[] bytes = new BlobByteSource(value).read();
- similarityFields.put(FieldNames.createSimilarityFieldName(name),
toDoubles(bytes));
+ addProperty(FieldNames.createSimilarityFieldName(name),
toDoubles(bytes));
}
void indexAncestors(String path) {
@@ -96,76 +115,25 @@ public class ElasticDocument {
}
void addDynamicBoostField(String propName, String value, double boost) {
- dynamicBoostFields.computeIfAbsent(propName, s -> new HashMap<>())
- .putIfAbsent(value, boost);
+ addProperty(propName,
+ Map.of(
+ ElasticIndexHelper.DYNAMIC_BOOST_NESTED_VALUE, value,
+ ElasticIndexHelper.DYNAMIC_BOOST_NESTED_BOOST, boost
+ )
+ );
+
+ // add value into the dynamic boost specific fulltext field. We cannot
add this in the standard
+ // field since dynamic boosted terms require lower weight compared to
standard terms
+ dbFullText.add(value);
}
void addSimilarityTag(String value) {
similarityTags.add(value);
}
- public String build() {
- String ret;
- try {
- XContentBuilder builder = XContentFactory.jsonBuilder();
- builder.startObject();
- {
- builder.field(FieldNames.PATH, path);
- Set<String> dbFullText = new LinkedHashSet<>();
- for (Map.Entry<String, Map<String, Double>> f :
dynamicBoostFields.entrySet()) {
- builder.startArray(f.getKey());
- for (Map.Entry<String, Double> v :
f.getValue().entrySet()) {
- builder.startObject();
- builder.field("value", v.getKey());
- builder.field("boost", v.getValue());
- builder.endObject();
- // add value into the dynamic boost specific fulltext
field. We cannot add this in the standard
- // field since dynamic boosted terms require lower
weight compared to standard terms
- dbFullText.add(v.getKey());
- }
- builder.endArray();
- }
- if (dbFullText.size() > 0) {
-
builder.field(ElasticIndexDefinition.DYNAMIC_BOOST_FULLTEXT, dbFullText);
- }
- if (fulltext.size() > 0) {
- builder.field(FieldNames.FULLTEXT, fulltext);
- }
- if (suggest.size() > 0) {
- builder.startArray(FieldNames.SUGGEST);
- for (String val : suggest) {
- builder.startObject().field("value", val).endObject();
- }
- builder.endArray();
- }
- if (spellcheck.size() > 0) {
- builder.field(FieldNames.SPELLCHECK, spellcheck);
- }
- for (Map.Entry<String, Object> simProp:
similarityFields.entrySet()) {
- builder.field(simProp.getKey(), simProp.getValue());
- }
- for (Map.Entry<String, Set<Object>> prop :
properties.entrySet()) {
- builder.field(prop.getKey(), prop.getValue().size() == 1 ?
prop.getValue().iterator().next() : prop.getValue());
- }
- if (!similarityTags.isEmpty()) {
- builder.field(ElasticIndexDefinition.SIMILARITY_TAGS,
similarityTags);
- }
- }
- builder.endObject();
-
- ret = Strings.toString(builder);
- } catch (IOException e) {
- LOG.error("Error serializing document - path: {}, properties: {},
fulltext: {}, suggest: {}",
- path, properties, fulltext, suggest, e);
- ret = null;
- }
-
- return ret;
- }
-
- @Override
- public String toString() {
- return build();
+ @JsonAnyGetter
+ public Map<String, Object> getProperties() {
+ return properties;
}
}
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexHelper.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexHelper.java
index 187f49beee..254d3a0e88 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexHelper.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexHelper.java
@@ -23,6 +23,7 @@ import
co.elastic.clients.elasticsearch._types.mapping.TypeMapping;
import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
import co.elastic.clients.elasticsearch.indices.IndexSettings;
import co.elastic.clients.elasticsearch.indices.IndexSettingsAnalysis;
+import co.elastic.clients.elasticsearch.indices.PutIndicesSettingsRequest;
import co.elastic.clients.json.JsonData;
import co.elastic.clients.util.ObjectBuilder;
import org.apache.jackrabbit.oak.api.Type;
@@ -30,8 +31,6 @@ import
org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
import
org.apache.jackrabbit.oak.plugins.index.elastic.ElasticPropertyDefinition;
import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
import org.apache.jackrabbit.oak.plugins.index.search.PropertyDefinition;
-import
org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
-import org.elasticsearch.common.settings.Settings;
import org.jetbrains.annotations.NotNull;
import java.io.Reader;
@@ -59,10 +58,17 @@ class ElasticIndexHelper {
// Unset the refresh interval and disable replicas at index creation to
optimize for initial loads
//
https://www.elastic.co/guide/en/elasticsearch/reference/current/tune-for-indexing-speed.html
private static final Time INITIAL_REFRESH_INTERVAL = Time.of(b ->
b.time("-1"));
+
private static final String INITIAL_NUMBER_OF_REPLICAS = "0";
private static final String OAK_WORD_DELIMITER_GRAPH_FILTER =
"oak_word_delimiter_graph_filter";
+ protected static final String SUGGEST_NESTED_VALUE = "value";
+
+ protected static final String DYNAMIC_BOOST_NESTED_VALUE = "value";
+
+ protected static final String DYNAMIC_BOOST_NESTED_BOOST = "boost";
+
/**
* Returns a {@code CreateIndexRequest} with settings and mappings
translated from the specified {@code ElasticIndexDefinition}.
* The returned object can be used to create and index optimized for bulk
loads (eg: reindexing) but not for queries.
@@ -121,29 +127,28 @@ class ElasticIndexHelper {
/**
- * Returns a {@code UpdateSettingsRequest} to make an index ready to be
queried and updated in near real time.
+ * Returns a {@code PutIndicesSettingsRequest} to make an index ready to
be queried and updated in near real time.
*
* @param remoteIndexName the final index name (no alias)
* @param indexDefinition the definition used to read settings/mappings
- * @return an {@code UpdateSettingsRequest}
- * <p>
- * TODO: migrate to Elasticsearch Java client when the following issue
will be fixed
- * <a
href="https://github.com/elastic/elasticsearch-java/issues/283">https://github.com/elastic/elasticsearch-java/issues/283</a>
+ * @return an {@code PutIndicesSettingsRequest}
*/
- public static UpdateSettingsRequest enableIndexRequest(String
remoteIndexName, ElasticIndexDefinition indexDefinition) {
- UpdateSettingsRequest request = new
UpdateSettingsRequest(remoteIndexName);
-
- Settings.Builder settingsBuilder = Settings.builder()
- .putNull("index.refresh_interval") // null=reset a setting
back to the default value
- .put("index.number_of_replicas",
indexDefinition.numberOfReplicas);
+ public static PutIndicesSettingsRequest enableIndexRequest(String
remoteIndexName, ElasticIndexDefinition indexDefinition) {
+ IndexSettings indexSettings = IndexSettings.of(is -> is
+
.numberOfReplicas(Integer.toString(indexDefinition.numberOfReplicas))
+ // TODO: we should pass null to reset the refresh interval to
the default value but the following bug prevents it. We need to wait for a fix
+ // <a
href="https://github.com/elastic/elasticsearch-java/issues/283">https://github.com/elastic/elasticsearch-java/issues/283</a>
+ .refreshInterval(Time.of(t -> t.time("1s"))));
- return request.settings(settingsBuilder);
+ return PutIndicesSettingsRequest.of(pisr -> pisr
+ .index(remoteIndexName)
+ .settings(indexSettings));
}
private static ObjectBuilder<IndexSettings> loadSettings(@NotNull
IndexSettings.Builder builder,
@NotNull
ElasticIndexDefinition indexDefinition) {
- if (indexDefinition.getSimilarityProperties().size() > 0) {
+ if (!indexDefinition.getSimilarityProperties().isEmpty()) {
builder.otherSettings(ElasticIndexDefinition.ELASTIKNN,
JsonData.of(true));
}
@@ -246,7 +251,7 @@ class ElasticIndexHelper {
builder.properties(FieldNames.SUGGEST,
b1 -> b1.nested(
// TODO: evaluate
https://www.elastic.co/guide/en/elasticsearch/reference/current/faster-prefix-queries.html
- b2 -> b2.properties("value",
+ b2 -> b2.properties(SUGGEST_NESTED_VALUE,
b3 -> b3.text(
b4 ->
b4.analyzer("oak_analyzer")
)
@@ -258,10 +263,10 @@ class ElasticIndexHelper {
for (PropertyDefinition pd :
indexDefinition.getDynamicBoostProperties()) {
builder.properties(pd.nodeName,
b1 -> b1.nested(
- b2 -> b2.properties("value",
+ b2 -> b2.properties(DYNAMIC_BOOST_NESTED_VALUE,
b3 -> b3.text(
b4 ->
b4.analyzer("oak_analyzer")))
- .properties("boost",
+ .properties(DYNAMIC_BOOST_NESTED_BOOST,
b3 -> b3.double_(f -> f)
)
)
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java
index 3ae8ec3e3d..13968326f7 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java
@@ -17,11 +17,14 @@
package org.apache.jackrabbit.oak.plugins.index.elastic.index;
import co.elastic.clients.elasticsearch._types.AcknowledgedResponseBase;
+import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
import co.elastic.clients.elasticsearch.indices.DeleteIndexResponse;
import co.elastic.clients.elasticsearch.indices.ElasticsearchIndicesClient;
import co.elastic.clients.elasticsearch.indices.GetAliasResponse;
+import co.elastic.clients.elasticsearch.indices.PutIndicesSettingsRequest;
+import co.elastic.clients.elasticsearch.indices.PutIndicesSettingsResponse;
import co.elastic.clients.elasticsearch.indices.UpdateAliasesRequest;
import co.elastic.clients.elasticsearch.indices.UpdateAliasesResponse;
import co.elastic.clients.json.JsonpUtils;
@@ -38,15 +41,6 @@ import
org.apache.jackrabbit.oak.plugins.index.importer.AsyncLaneSwitcher;
import
org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexWriter;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
-import org.elasticsearch.ElasticsearchStatusException;
-import
org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
-import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.client.IndicesClient;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.xcontent.XContentType;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.TestOnly;
import org.slf4j.Logger;
@@ -57,18 +51,13 @@ import java.util.ArrayList;
import java.util.Set;
import java.util.UUID;
-import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
-import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
-
class ElasticIndexWriter implements FulltextIndexWriter<ElasticDocument> {
private static final Logger LOG =
LoggerFactory.getLogger(ElasticIndexWriter.class);
private final ElasticIndexTracker indexTracker;
private final ElasticConnection elasticConnection;
private final ElasticIndexDefinition indexDefinition;
-
private final ElasticBulkProcessorHandler bulkProcessorHandler;
-
private final boolean reindex;
private final String indexName;
@@ -134,16 +123,12 @@ class ElasticIndexWriter implements
FulltextIndexWriter<ElasticDocument> {
@Override
public void updateDocument(String path, ElasticDocument doc) throws
IOException {
- IndexRequest request = new IndexRequest(indexName)
- .id(ElasticIndexUtils.idFromPath(path))
- .source(doc.build(), XContentType.JSON);
- bulkProcessorHandler.add(request);
+ bulkProcessorHandler.update(ElasticIndexUtils.idFromPath(path), doc);
}
@Override
public void deleteDocuments(String path) throws IOException {
- DeleteRequest request = new
DeleteRequest(indexName).id(ElasticIndexUtils.idFromPath(path));
- bulkProcessorHandler.add(request);
+ bulkProcessorHandler.delete(ElasticIndexUtils.idFromPath(path));
}
@Override
@@ -196,12 +181,12 @@ class ElasticIndexWriter implements
FulltextIndexWriter<ElasticDocument> {
final CreateIndexResponse response = esClient.create(request);
LOG.info("Created index {}. Response acknowledged: {}", indexName,
response.acknowledged());
checkResponseAcknowledgement(response, "Create index call not
acknowledged for index " + indexName);
- } catch (ElasticsearchStatusException ese) {
+ } catch (ElasticsearchException ese) {
// We already check index existence as first thing in this method,
if we get here it means we have got into
// a conflict (eg: multiple cluster nodes provision concurrently).
// Elasticsearch does not have a CREATE IF NOT EXIST, need to
inspect exception
// https://github.com/elastic/elasticsearch/issues/19862
- if (ese.status().getStatus() == 400 &&
ese.getDetailedMessage().contains("resource_already_exists_exception")) {
+ if (ese.status() == 400 &&
ese.getMessage().contains("resource_already_exists_exception")) {
LOG.warn("Index {} already exists. Ignoring error", indexName);
} else {
throw ese;
@@ -216,14 +201,12 @@ class ElasticIndexWriter implements
FulltextIndexWriter<ElasticDocument> {
throw new IllegalStateException("cannot enable an index that does
not exist");
}
- UpdateSettingsRequest request =
ElasticIndexHelper.enableIndexRequest(indexName, indexDefinition);
+ PutIndicesSettingsRequest request =
ElasticIndexHelper.enableIndexRequest(indexName, indexDefinition);
if (LOG.isDebugEnabled()) {
- final String requestMsg =
Strings.toString(request.toXContent(jsonBuilder(), EMPTY_PARAMS));
- LOG.debug("Updating Index Settings with request {}", requestMsg);
+ LOG.debug("Updating Index Settings with request {}", request);
}
- IndicesClient oldClient = elasticConnection.getOldClient().indices();
- AcknowledgedResponse response = oldClient.putSettings(request,
RequestOptions.DEFAULT);
- LOG.info("Updated settings for index {}. Response acknowledged: {}",
indexName, response.isAcknowledged());
+ PutIndicesSettingsResponse response = client.putSettings(request);
+ LOG.info("Updated settings for index {}. Response acknowledged: {}",
indexName, response.acknowledged());
checkResponseAcknowledgement(response, "Update index settings call not
acknowledged for index " + indexName);
// update the alias
@@ -246,12 +229,6 @@ class ElasticIndexWriter implements
FulltextIndexWriter<ElasticDocument> {
deleteOldIndices(client, aliasResponse.result().keySet());
}
- private void checkResponseAcknowledgement(AcknowledgedResponse response,
String exceptionMessage) {
- if (!response.isAcknowledged()) {
- throw new IllegalStateException(exceptionMessage);
- }
- }
-
private void checkResponseAcknowledgement(AcknowledgedResponseBase
response, String exceptionMessage) {
if (!response.acknowledged()) {
throw new IllegalStateException(exceptionMessage);
@@ -265,7 +242,7 @@ class ElasticIndexWriter implements
FulltextIndexWriter<ElasticDocument> {
}
private void deleteOldIndices(ElasticsearchIndicesClient indicesClient,
Set<String> indices) throws IOException {
- if (indices.size() == 0)
+ if (indices.isEmpty())
return;
DeleteIndexResponse deleteIndexResponse = indicesClient.delete(db ->
db.index(new ArrayList<>(indices)));
checkResponseAcknowledgement(deleteIndexResponse, "Delete index call
not acknowledged for indices " + indices);
diff --git
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticPropertyIndexTest.java
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticPropertyIndexTest.java
index cef7ae6c43..64c21c7377 100644
---
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticPropertyIndexTest.java
+++
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticPropertyIndexTest.java
@@ -36,7 +36,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
public class ElasticPropertyIndexTest extends ElasticAbstractQueryTest {
@Test
- public void testBulkProcessorEventsFlushLimit() throws Exception {
+ public void bulkProcessorEventsFlushLimit() throws Exception {
setIndex("test1", createIndex("propa"));
Tree test = root.getTree("/").addChild("test");
@@ -46,7 +46,7 @@ public class ElasticPropertyIndexTest extends
ElasticAbstractQueryTest {
root.commit();
// 250 is the default flush limit for bulk processor, and we added
just less than 250 nodes
- // So once the index writer is closed , bulk Processor would be closed
and all the 248 entries should be flushed.
+ // So once the index writer is closed, bulk Processor would be closed
and all the 248 entries should be flushed.
// Make sure that the last entry is indexed correctly.
String propaQuery = "select [jcr:path] from [nt:base] where [propa] =
'foo248'";
assertEventually(() -> {
@@ -68,7 +68,7 @@ public class ElasticPropertyIndexTest extends
ElasticAbstractQueryTest {
}
@Test
- public void testBulkProcessorSizeFlushLimit() throws Exception {
+ public void bulkProcessorSizeFlushLimit() throws Exception {
LogCustomizer customLogger = LogCustomizer
.forLogger(
"org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticBulkProcessorHandler")
@@ -105,8 +105,8 @@ public class ElasticPropertyIndexTest extends
ElasticAbstractQueryTest {
assertQuery(propaQuery, List.of("/test/a" +
docCountBreachingBulkSize));
});
- Assert.assertEquals(1, customLogger.getLogs().stream().filter(n ->
n.contains("Bulk with id 2 processed with status OK in")).count());
- Assert.assertEquals(0, customLogger.getLogs().stream().filter(n ->
n.contains("Bulk with id 3 processed with status OK in")).count());
+ Assert.assertEquals(1, customLogger.getLogs().stream().filter(n ->
n.contains("Bulk with id 2 processed in")).count());
+ Assert.assertEquals(0, customLogger.getLogs().stream().filter(n ->
n.contains("Bulk with id 3 processed in")).count());
} finally {
customLogger.finished();
}
diff --git
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticTestServer.java
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticTestServer.java
index f0cf5da8d1..f5eb861030 100644
---
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticTestServer.java
+++
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticTestServer.java
@@ -42,8 +42,6 @@ import static org.junit.Assume.assumeNotNull;
public class ElasticTestServer implements AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(ElasticTestServer.class);
private static final Map<String, String>
PLUGIN_OFFICIAL_RELEASES_DIGEST_MAP = Map.of(
- "7.17.3.0",
"5e3b40bb72b2813f927be9bf6ecdf88668d89d2ef20c7ebafaa51ab8407fd179",
- "7.17.6.0",
"326893bb98ef1a0c569d9f4c4a9a073e53361924f990b17e87077985ce8a7478",
"7.17.7.0",
"4252eb55cc7775f1b889d624ac335abfa2e357931c40d0accb4d520144246b8b",
"8.3.3.0",
"14d3223456f4b9f00f86628ec8400cb46513935e618ae0f5d0d1088739ccc233",
"8.4.1.0",
"56797a1bac6ceeaa36d2358f818b14633124d79c5e04630fa3544603d82eaa01",
@@ -51,7 +49,9 @@ public class ElasticTestServer implements AutoCloseable {
"8.4.3.0",
"5c00d43cdd56c5c5d8e9032ad507acea482fb5ca9445861c5cc12ad63af66425",
"8.5.3.0",
"d4c13f68650f9df5ff8c74ec83abc2e416de9c45f991d459326e0e2baf7b0e3f",
"8.7.0.0",
"7aeac9b7ac4dea1ded3f8e477e26bcc7fe62e313edf6352f4bdf973c43d25819",
- "8.7.1.0",
"80c8d34334b0cf4def79835ea6dab78b59ba9ee54c8f5f3cba0bde53123d7820");
+ "8.7.1.0",
"80c8d34334b0cf4def79835ea6dab78b59ba9ee54c8f5f3cba0bde53123d7820",
+ "8.10.4.0",
"b2ae8faf1e272319594b4d47a72580fa4f61a5c11cbc8d3f13453fd34b153441",
+ "8.11.0.0",
"8d4d80b850c4da4da6dfe2d675b2e2355d2014307f8bdc54cc1b34323c81c7ae");
private static final ElasticTestServer SERVER = new ElasticTestServer();
private static volatile ElasticsearchContainer CONTAINER;
diff --git
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandlerTest.java
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandlerTest.java
index 1f9575bb73..eb37ce415f 100644
---
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandlerTest.java
+++
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandlerTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.jackrabbit.oak.plugins.index.elastic.index;
+import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
import org.apache.jackrabbit.oak.plugins.memory.MultiStringPropertyState;
@@ -23,6 +24,7 @@ import
org.apache.jackrabbit.oak.plugins.memory.StringPropertyState;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
@@ -32,7 +34,7 @@ import java.util.Arrays;
import java.util.Collections;
import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.when;
@@ -47,17 +49,28 @@ public class ElasticBulkProcessorHandlerTest {
@Mock
private ElasticConnection elasticConnectionMock;
+ @Mock
+ private ElasticsearchAsyncClient esAsyncClientMock;
+
@Mock
private NodeBuilder definitionBuilder;
@Mock
private CommitInfo commitInfo;
+ private AutoCloseable closeable;
+
@Before
public void setUp() {
- MockitoAnnotations.initMocks(this);
+ closeable = MockitoAnnotations.openMocks(this);
when(indexDefinitionMock.getDefinitionNodeState()).thenReturn(definitionNodeStateMock);
when(commitInfo.getInfo()).thenReturn(Collections.emptyMap());
+
when(elasticConnectionMock.getAsyncClient()).thenReturn(esAsyncClientMock);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ closeable.close();
}
@Test
diff --git
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java
index a51eda1994..8997b88af8 100644
---
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java
+++
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java
@@ -19,9 +19,7 @@ package org.apache.jackrabbit.oak.plugins.index.elastic.index;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexTracker;
-import org.elasticsearch.action.DocWriteRequest;
-import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.index.IndexRequest;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -35,6 +33,8 @@ import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.number.OrderingComparison.lessThan;
import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -55,35 +55,41 @@ public class ElasticIndexWriterTest {
private ElasticIndexWriter indexWriter;
+ private AutoCloseable closeable;
+
@Before
public void setUp() {
- MockitoAnnotations.initMocks(this);
+ closeable = MockitoAnnotations.openMocks(this);
when(indexDefinitionMock.getIndexAlias()).thenReturn("test-index");
indexWriter = new ElasticIndexWriter(indexTrackerMock,
elasticConnectionMock, indexDefinitionMock, bulkProcessorHandlerMock);
}
+ @After
+ public void tearDown() throws Exception {
+ closeable.close();
+ }
+
@Test
public void singleUpdateDocument() throws IOException {
indexWriter.updateDocument("/foo", new ElasticDocument("/foo"));
- ArgumentCaptor<IndexRequest> acIndexRequest = ArgumentCaptor.forClass(IndexRequest.class);
- verify(bulkProcessorHandlerMock).add(acIndexRequest.capture());
+ ArgumentCaptor<ElasticDocument> esDocumentCaptor = ArgumentCaptor.forClass(ElasticDocument.class);
+ ArgumentCaptor<String> idCaptor = ArgumentCaptor.forClass(String.class);
+ verify(bulkProcessorHandlerMock).update(idCaptor.capture(), esDocumentCaptor.capture());
- IndexRequest request = acIndexRequest.getValue();
- assertEquals("test-index", request.index());
- assertEquals("/foo", request.id());
+ assertEquals("/foo", idCaptor.getValue());
+ assertEquals("/foo", esDocumentCaptor.getValue().path);
}
@Test
public void singleDeleteDocument() throws IOException {
indexWriter.deleteDocuments("/bar");
- ArgumentCaptor<DeleteRequest> acDeleteRequest = ArgumentCaptor.forClass(DeleteRequest.class);
- verify(bulkProcessorHandlerMock).add(acDeleteRequest.capture());
+ ArgumentCaptor<String> idCaptor = ArgumentCaptor.forClass(String.class);
+ verify(bulkProcessorHandlerMock).delete(idCaptor.capture());
- DeleteRequest request = acDeleteRequest.getValue();
- assertEquals("test-index", request.index());
- assertEquals("/bar", request.id());
+ String id = idCaptor.getValue();
+ assertEquals("/bar", id);
}
@Test
@@ -93,8 +99,8 @@ public class ElasticIndexWriterTest {
indexWriter.deleteDocuments("/foo");
indexWriter.deleteDocuments("/bar");
- ArgumentCaptor<DocWriteRequest<?>> request = ArgumentCaptor.forClass(DocWriteRequest.class);
- verify(bulkProcessorHandlerMock, times(4)).add(request.capture());
+ verify(bulkProcessorHandlerMock, times(2)).update(anyString(), any(ElasticDocument.class));
+ verify(bulkProcessorHandlerMock, times(2)).delete(anyString());
}
@Test
@@ -103,12 +109,12 @@ public class ElasticIndexWriterTest {
indexWriter.updateDocument(generatedPath, new ElasticDocument(generatedPath));
- ArgumentCaptor<IndexRequest> acIndexRequest = ArgumentCaptor.forClass(IndexRequest.class);
- verify(bulkProcessorHandlerMock).add(acIndexRequest.capture());
+ ArgumentCaptor<String> idCaptor = ArgumentCaptor.forClass(String.class);
+ verify(bulkProcessorHandlerMock).update(idCaptor.capture(), any(ElasticDocument.class));
- IndexRequest request = acIndexRequest.getValue();
- assertThat(request.id(), not(generatedPath));
- assertThat(request.id().length(), lessThan(513));
+ String id = idCaptor.getValue();
+ assertThat(id, not(generatedPath));
+ assertThat(id.length(), lessThan(513));
}
@Test