Author: ngupta
Date: Wed Jul 15 11:44:38 2020
New Revision: 1879884
URL: http://svn.apache.org/viewvc?rev=1879884&view=rev
Log:
OAK-9138 | Have a mechanism to track failed docs in ES
Modified:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterFactory.java
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java
jackrabbit/oak/trunk/oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/IndexDefinition.java
Modified:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java?rev=1879884&r1=1879883&r2=1879884&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java
(original)
+++
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java
Wed Jul 15 11:44:38 2020
@@ -16,10 +16,13 @@
*/
package org.apache.jackrabbit.oak.plugins.index.elastic.index;
+import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.elastic.util.ElasticIndexUtils;
+import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
import
org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexWriter;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
@@ -50,9 +53,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
+import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
@@ -69,6 +70,8 @@ class ElasticIndexWriter implements Full
private final ElasticConnection elasticConnection;
private final ElasticIndexDefinition indexDefinition;
+ private final NodeBuilder definitionBuilder;
+ private static final int FAILED_DOC_COUNT_FOR_STATUS_NODE = Integer.getInteger("oak.failedDocStatusLimit", 10000); // intended cap on failed doc ids persisted on the :status node; override with -Doak.failedDocStatusLimit
/**
* Coordinates communication between bulk processes. It has a main controller registered at creation time and
@@ -82,22 +85,27 @@ class ElasticIndexWriter implements Full
* value is {@code true} when at least an update is performed, otherwise {@code false}.
*/
private final ConcurrentHashMap<Long, Boolean> updatesMap = new ConcurrentHashMap<>();
+ private final Set<String> failedDocSet = ConcurrentHashMap.newKeySet(); // concurrent like updatesMap: both are updated from the bulk listener callbacks
private final BulkProcessor bulkProcessor;
ElasticIndexWriter(@NotNull ElasticConnection elasticConnection,
- @NotNull ElasticIndexDefinition indexDefinition) {
+ @NotNull ElasticIndexDefinition indexDefinition,
+ @NotNull NodeBuilder definitionBuilder) {
this.elasticConnection = elasticConnection;
this.indexDefinition = indexDefinition;
+ this.definitionBuilder = definitionBuilder;
bulkProcessor = initBulkProcessor();
}
@TestOnly
ElasticIndexWriter(@NotNull ElasticConnection elasticConnection,
@NotNull ElasticIndexDefinition indexDefinition,
- @NotNull BulkProcessor bulkProcessor) {
+ @NotNull BulkProcessor bulkProcessor,
+ @NotNull NodeBuilder definitionBuilder) {
this.elasticConnection = elasticConnection;
this.indexDefinition = indexDefinition;
this.bulkProcessor = bulkProcessor;
+ this.definitionBuilder = definitionBuilder;
}
private BulkProcessor initBulkProcessor() {
@@ -254,15 +262,35 @@ class ElasticIndexWriter implements Full
}
}
if (bulkResponse.hasFailures()) { // check if some operations failed to execute
+ Set<String> bulkFailedDocs = new HashSet<>(); // ids that failed in this bulk; merged into failedDocSet after the loop
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
- LOG.error("Bulk item with id {} failed", failure.getId(), failure.getCause());
+ bulkFailedDocs.add(bulkItemResponse.getId());
+ // Log entry to be used to parse logs to get the failed doc id/path if needed
+ LOG.error("ElasticIndex Update Doc Failure: Error while adding/updating doc with id : [{}]", bulkItemResponse.getId());
+ // The cause has no placeholder of its own, so SLF4J logs it as the exception, including its stack trace
+ LOG.error("Failure Details: BulkItem ID: {}, Failure Cause:", failure.getId(), failure.getCause());
} else {
// Set indexUpdated to true even if 1 item was updated successfully
updatesMap.put(executionId, Boolean.TRUE);
}
}
+ // Read-merge-write under one lock since bulk callbacks may run concurrently. NOTE(review): definitionBuilder is still written from this callback thread - confirm nothing else mutates it at the same time
+ synchronized (failedDocSet) {
+ NodeBuilder status = definitionBuilder.child(IndexDefinition.STATUS_NODE);
+ if (status.hasProperty(IndexDefinition.FAILED_DOC_PATHS)) { // keep the failed paths already recorded on the :status node
+ status.getProperty(IndexDefinition.FAILED_DOC_PATHS).getValue(Type.STRINGS).forEach(failedDocSet::add);
+ }
+ for (String docId : bulkFailedDocs) {
+ if (failedDocSet.size() >= FAILED_DOC_COUNT_FOR_STATUS_NODE) { // '>=' so the cap holds even when one bulk crosses it
+ LOG.warn("Failed Docs count exceeds the persistence limit. Will skip persisting paths of more failed docs. " + "Failing docs should be mentioned above under ElasticIndex Update Doc Failure");
+ break;
+ }
+ failedDocSet.add(docId);
+ }
+ status.setProperty(IndexDefinition.FAILED_DOC_PATHS, failedDocSet, Type.STRINGS);
+ }
} else {
updatesMap.put(executionId, Boolean.TRUE);
}
@@ -271,7 +299,7 @@ class ElasticIndexWriter implements Full
@Override
public void afterBulk(long executionId, BulkRequest bulkRequest,
Throwable throwable) {
- LOG.error("Bulk with id {} threw an error", executionId,
throwable);
+ LOG.error("ElasticIndex Update Bulk Failure : Bulk with id {}
threw an error", executionId, throwable); // NOTE(review): docs of a bulk that fails as a whole are only logged here, not added to failedDocSet / the :status node
phaser.arriveAndDeregister();
}
}
Modified:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterFactory.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterFactory.java?rev=1879884&r1=1879883&r2=1879884&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterFactory.java
(original)
+++
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterFactory.java
Wed Jul 15 11:44:38 2020
@@ -36,6 +36,6 @@ class ElasticIndexWriterFactory implemen
throw new IllegalArgumentException("IndexDefinition must be of
type ElasticsearchIndexDefinition " +
"instead of " + definition.getClass().getName());
}
- return new ElasticIndexWriter(elasticConnection,
(ElasticIndexDefinition) definition);
+ return new ElasticIndexWriter(elasticConnection,
(ElasticIndexDefinition) definition, definitionBuilder); // the writer records ids of failed docs on this builder's :status child
}
}
Modified:
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java?rev=1879884&r1=1879883&r2=1879884&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java
(original)
+++
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java
Wed Jul 15 11:44:38 2020
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.plugin
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
@@ -47,13 +48,16 @@ public class ElasticIndexWriterTest {
@Mock
private BulkProcessor bulkProcessorMock;
+ @Mock
+ private NodeBuilder definitionBuilder; // NOTE(review): unstubbed mock - child(...) presumably returns null (Mockito default), so a test driving the bulk-failure path must stub it
+
private ElasticIndexWriter indexWriter;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
when(indexDefinitionMock.getRemoteIndexAlias()).thenReturn("test-index");
- indexWriter = new ElasticIndexWriter(elasticConnectionMock,
indexDefinitionMock, bulkProcessorMock);
+ indexWriter = new ElasticIndexWriter(elasticConnectionMock,
indexDefinitionMock, bulkProcessorMock, definitionBuilder);
}
@Test
Modified:
jackrabbit/oak/trunk/oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/IndexDefinition.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/IndexDefinition.java?rev=1879884&r1=1879883&r2=1879884&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/IndexDefinition.java
(original)
+++
jackrabbit/oak/trunk/oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/IndexDefinition.java
Wed Jul 15 11:44:38 2020
@@ -145,6 +145,10 @@ public class IndexDefinition implements
public static final String CREATION_TIMESTAMP = "creationTimestamp";
public static final String REINDEX_COMPLETION_TIMESTAMP =
"reindexCompletionTimestamp";
+ /**
+ * Name of the property that records the paths (document ids) of documents that failed to be written during index updates; the Elasticsearch index writer stores it as a multi-valued string on the index status node.
+ */
+ public static final String FAILED_DOC_PATHS = "failedDocPaths";
/**
* Meta property which provides the unique id