Author: ngupta
Date: Wed Jul 15 11:44:38 2020
New Revision: 1879884

URL: http://svn.apache.org/viewvc?rev=1879884&view=rev
Log:
OAK-9138 | Have a mechanism to track failed docs in ES

Modified:
    jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java
    jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterFactory.java
    jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java
    jackrabbit/oak/trunk/oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/IndexDefinition.java

Modified: jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java?rev=1879884&r1=1879883&r2=1879884&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java (original)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java Wed Jul 15 11:44:38 2020
@@ -16,10 +16,13 @@
  */
 package org.apache.jackrabbit.oak.plugins.index.elastic.index;
 
+import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
 import org.apache.jackrabbit.oak.plugins.index.elastic.util.ElasticIndexUtils;
+import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
 import 
org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexWriter;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
@@ -50,9 +53,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
+import java.util.HashSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
@@ -69,6 +70,8 @@ class ElasticIndexWriter implements Full
 
     private final ElasticConnection elasticConnection;
     private final ElasticIndexDefinition indexDefinition;
+    private final NodeBuilder definitionBuilder;
+    private final int FAILED_DOC_COUNT_FOR_STATUS_NODE = 
Integer.getInteger("oak.failedDocStatusLimit", 10000);
 
     /**
      * Coordinates communication between bulk processes. It has a main 
controller registered at creation time and
@@ -82,22 +85,27 @@ class ElasticIndexWriter implements Full
      * value is {@code true} when at least an update is performed, otherwise 
{@code false}.
      */
     private final ConcurrentHashMap<Long, Boolean> updatesMap = new 
ConcurrentHashMap<>();
+    private final Set<String> failedDocSet = new HashSet<>();
     private final BulkProcessor bulkProcessor;
 
     ElasticIndexWriter(@NotNull ElasticConnection elasticConnection,
-                       @NotNull ElasticIndexDefinition indexDefinition) {
+                       @NotNull ElasticIndexDefinition indexDefinition,
+                       @NotNull NodeBuilder definitionBuilder) {
         this.elasticConnection = elasticConnection;
         this.indexDefinition = indexDefinition;
+        this.definitionBuilder = definitionBuilder;
         bulkProcessor = initBulkProcessor();
     }
 
     @TestOnly
     ElasticIndexWriter(@NotNull ElasticConnection elasticConnection,
                        @NotNull ElasticIndexDefinition indexDefinition,
-                       @NotNull BulkProcessor bulkProcessor) {
+                       @NotNull BulkProcessor bulkProcessor,
+                       @NotNull NodeBuilder definitionBuilder) {
         this.elasticConnection = elasticConnection;
         this.indexDefinition = indexDefinition;
         this.bulkProcessor = bulkProcessor;
+        this.definitionBuilder = definitionBuilder;
     }
 
     private BulkProcessor initBulkProcessor() {
@@ -254,15 +262,35 @@ class ElasticIndexWriter implements Full
                 }
             }
             if (bulkResponse.hasFailures()) { // check if some operations 
failed to execute
+
+                NodeBuilder status = 
definitionBuilder.child(IndexDefinition.STATUS_NODE);
+                // Read the current failed paths (if any) on the :status node 
into failedDocList
+                if (status.hasProperty(IndexDefinition.FAILED_DOC_PATHS)) {
+                    for (String str : 
status.getProperty(IndexDefinition.FAILED_DOC_PATHS).getValue(Type.STRINGS)) {
+                        failedDocSet.add(str);
+                    }
+                }
+
                 for (BulkItemResponse bulkItemResponse : bulkResponse) {
                     if (bulkItemResponse.isFailed()) {
                         BulkItemResponse.Failure failure = 
bulkItemResponse.getFailure();
-                        LOG.error("Bulk item with id {} failed", 
failure.getId(), failure.getCause());
+                        failedDocSet.add(bulkItemResponse.getId());
+                        // Log entry to be used to parse logs to get the 
failed doc id/path if needed
+                        LOG.error("ElasticIndex Update Doc Failure: Error 
while adding/updating doc with id : [{}]", bulkItemResponse.getId());
+                        LOG.error("Failure Details: BulkItem ID: " + 
failure.getId() + ", Failure Cause: {}", failure.getCause());
                     } else {
                         // Set indexUpdated to true even if 1 item was updated 
successfully
                         updatesMap.put(executionId, Boolean.TRUE);
                     }
                 }
+
+                if (failedDocSet.size() == FAILED_DOC_COUNT_FOR_STATUS_NODE) {
+                    LOG.info("Failed Docs count exceeds the persistence limit. 
Will skip persisting paths of more failed docs." +
+                            "Failing docs should be mentioned above under 
ElasticIndex Update Doc Failure");
+                } else {
+                    status.setProperty(IndexDefinition.FAILED_DOC_PATHS, 
failedDocSet, Type.STRINGS);
+                }
+
             } else {
                 updatesMap.put(executionId, Boolean.TRUE);
             }
@@ -271,7 +299,7 @@ class ElasticIndexWriter implements Full
 
         @Override
         public void afterBulk(long executionId, BulkRequest bulkRequest, 
Throwable throwable) {
-            LOG.error("Bulk with id {} threw an error", executionId, 
throwable);
+            LOG.error("ElasticIndex Update Bulk Failure : Bulk with id {} 
threw an error", executionId, throwable);
             phaser.arriveAndDeregister();
         }
     }

Modified: jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterFactory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterFactory.java?rev=1879884&r1=1879883&r2=1879884&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterFactory.java (original)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterFactory.java Wed Jul 15 11:44:38 2020
@@ -36,6 +36,6 @@ class ElasticIndexWriterFactory implemen
             throw new IllegalArgumentException("IndexDefinition must be of 
type ElasticsearchIndexDefinition " +
                     "instead of " + definition.getClass().getName());
         }
-        return new ElasticIndexWriter(elasticConnection, 
(ElasticIndexDefinition) definition);
+        return new ElasticIndexWriter(elasticConnection, 
(ElasticIndexDefinition) definition, definitionBuilder);
     }
 }

Modified: jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java?rev=1879884&r1=1879883&r2=1879884&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java (original)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java Wed Jul 15 11:44:38 2020
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.plugin
 
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
@@ -47,13 +48,16 @@ public class ElasticIndexWriterTest {
     @Mock
     private BulkProcessor bulkProcessorMock;
 
+    @Mock
+    private NodeBuilder definitionBuilder;
+
     private ElasticIndexWriter indexWriter;
 
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
         
when(indexDefinitionMock.getRemoteIndexAlias()).thenReturn("test-index");
-        indexWriter = new ElasticIndexWriter(elasticConnectionMock, 
indexDefinitionMock, bulkProcessorMock);
+        indexWriter = new ElasticIndexWriter(elasticConnectionMock, 
indexDefinitionMock, bulkProcessorMock, definitionBuilder);
     }
 
     @Test

Modified: jackrabbit/oak/trunk/oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/IndexDefinition.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/IndexDefinition.java?rev=1879884&r1=1879883&r2=1879884&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/IndexDefinition.java (original)
+++ jackrabbit/oak/trunk/oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/IndexDefinition.java Wed Jul 15 11:44:38 2020
@@ -145,6 +145,10 @@ public class IndexDefinition implements
 
     public static final String CREATION_TIMESTAMP = "creationTimestamp";
     public static final String REINDEX_COMPLETION_TIMESTAMP = 
"reindexCompletionTimestamp";
+    /**
+     * Property to store paths for documents failed during index updates.
+     */
+    public static final String FAILED_DOC_PATHS = "failedDocPaths";
 
     /**
      * Meta property which provides the unique id


Reply via email to