Author: ngupta
Date: Wed Jul 29 10:32:04 2020
New Revision: 1880391

URL: http://svn.apache.org/viewvc?rev=1880391&view=rev
Log:
OAK-9152 | Implement 2 factor writes

Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java
    
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java?rev=1880391&r1=1880390&r2=1880391&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java
 Wed Jul 29 10:32:04 2020
@@ -71,6 +71,7 @@ import org.slf4j.LoggerFactory;
 public class IndexUpdate implements Editor, PathSource {
 
     private static final Logger log = 
LoggerFactory.getLogger(IndexUpdate.class);
+    private static final String TYPE_ELASTICSEARCH = "elasticsearch";
 
     /**
      * <p>
@@ -212,13 +213,17 @@ public class IndexUpdate implements Edit
 
     private boolean shouldReindex(NodeBuilder definition, NodeState before,
             String name) {
+        PropertyState type = definition.getProperty(TYPE_PROPERTY_NAME);
+
         //Async indexes are not considered for reindexing for sync indexing
-        if (!isMatchingIndexMode(definition)){
+        // Skip this check for elastic index
+        // TODO : See if the check to skip elastic can be handled in a better 
way - maybe move isMatchingIndexMode to IndexDefinition ? 
+        if (!TYPE_ELASTICSEARCH.equals(type.getValue(Type.STRING)) && 
!isMatchingIndexMode(definition)){
             return false;
         }
 
         //Do not attempt reindex of disabled indexes
-        PropertyState type = definition.getProperty(TYPE_PROPERTY_NAME);
+
         if (type != null && TYPE_DISABLED.equals(type.getValue(Type.STRING))) {
             return false;
         }

Modified: 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java?rev=1880391&r1=1880390&r2=1880391&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
 Wed Jul 29 10:32:04 2020
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.plugin
 
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
 import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
@@ -61,6 +62,7 @@ class ElasticBulkProcessorHandler {
 
     private static final String SYNC_MODE_PROPERTY = "sync-mode";
     private static final String SYNC_RT_MODE = "rt";
+    private static boolean waitForESAcknowledgement = true;
 
     protected final ElasticConnection elasticConnection;
     protected final ElasticIndexDefinition indexDefinition;
@@ -109,6 +111,14 @@ class ElasticBulkProcessorHandler {
         PropertyState async = 
indexDefinition.getDefinitionNodeState().getProperty("async");
 
         if (async != null) {
+            Iterable<String> opt = async.getValue(Type.STRINGS);
+
+            // Check if this indexing call is a part of async cycle or a 
commit hook
+            // In case it's from async cycle - commit info will have an 
indexingCheckpointTime key.
+            // Otherwise it's part of commit hook based indexing due to async 
property having a value nrt
+            if 
(!commitInfo.getInfo().containsKey(IndexConstants.CHECKPOINT_CREATION_TIME)) {
+                waitForESAcknowledgement = false;
+            }
             return new ElasticBulkProcessorHandler(elasticConnection, 
indexDefinition, definitionBuilder);
         }
 
@@ -166,10 +176,12 @@ class ElasticBulkProcessorHandler {
             return false;
         }
 
-        try {
-            phaser.awaitAdvanceInterruptibly(phase, 
indexDefinition.bulkFlushIntervalMs * 5, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException | TimeoutException e) {
-            LOG.error("Error waiting for bulk requests to return", e);
+        if (waitForESAcknowledgement) {
+            try {
+                phaser.awaitAdvanceInterruptibly(phase, 
indexDefinition.bulkFlushIntervalMs * 5, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException | TimeoutException e) {
+                LOG.error("Error waiting for bulk requests to return", e);
+            }
         }
 
         if (ioException != null) {


Reply via email to