Author: ngupta
Date: Wed Jul 29 10:32:04 2020
New Revision: 1880391
URL: http://svn.apache.org/viewvc?rev=1880391&view=rev
Log:
OAK-9152 | Implement 2 factor writes
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java?rev=1880391&r1=1880390&r2=1880391&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java
Wed Jul 29 10:32:04 2020
@@ -71,6 +71,7 @@ import org.slf4j.LoggerFactory;
public class IndexUpdate implements Editor, PathSource {
private static final Logger log =
LoggerFactory.getLogger(IndexUpdate.class);
+ private static final String TYPE_ELASTICSEARCH = "elasticsearch";
/**
* <p>
@@ -212,13 +213,17 @@ public class IndexUpdate implements Edit
private boolean shouldReindex(NodeBuilder definition, NodeState before,
String name) {
+ PropertyState type = definition.getProperty(TYPE_PROPERTY_NAME);
+
//Async indexes are not considered for reindexing for sync indexing
- if (!isMatchingIndexMode(definition)){
+ // Skip this check for elastic index
+ // TODO : See if the check to skip elastic can be handled in a better
way - maybe move isMatchingIndexMode to IndexDefinition ?
+ if (!TYPE_ELASTICSEARCH.equals(type.getValue(Type.STRING)) &&
!isMatchingIndexMode(definition)){
return false;
}
//Do not attempt reindex of disabled indexes
- PropertyState type = definition.getProperty(TYPE_PROPERTY_NAME);
+
if (type != null && TYPE_DISABLED.equals(type.getValue(Type.STRING))) {
return false;
}
Modified:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java?rev=1880391&r1=1880390&r2=1880391&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
(original)
+++
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
Wed Jul 29 10:32:04 2020
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.plugin
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
@@ -61,6 +62,7 @@ class ElasticBulkProcessorHandler {
private static final String SYNC_MODE_PROPERTY = "sync-mode";
private static final String SYNC_RT_MODE = "rt";
+ private static boolean waitForESAcknowledgement = true;
protected final ElasticConnection elasticConnection;
protected final ElasticIndexDefinition indexDefinition;
@@ -109,6 +111,14 @@ class ElasticBulkProcessorHandler {
PropertyState async =
indexDefinition.getDefinitionNodeState().getProperty("async");
if (async != null) {
+ Iterable<String> opt = async.getValue(Type.STRINGS);
+
+ // Check if this indexing call is a part of async cycle or a
commit hook
+ // In case it's from async cycle - commit info will have an
indexingCheckpointTime key.
+ // Otherwise it's part of commit hook based indexing due to async
property having a value nrt
+ if
(!commitInfo.getInfo().containsKey(IndexConstants.CHECKPOINT_CREATION_TIME)) {
+ waitForESAcknowledgement = false;
+ }
return new ElasticBulkProcessorHandler(elasticConnection,
indexDefinition, definitionBuilder);
}
@@ -166,10 +176,12 @@ class ElasticBulkProcessorHandler {
return false;
}
- try {
- phaser.awaitAdvanceInterruptibly(phase,
indexDefinition.bulkFlushIntervalMs * 5, TimeUnit.MILLISECONDS);
- } catch (InterruptedException | TimeoutException e) {
- LOG.error("Error waiting for bulk requests to return", e);
+ if (waitForESAcknowledgement) {
+ try {
+ phaser.awaitAdvanceInterruptibly(phase,
indexDefinition.bulkFlushIntervalMs * 5, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | TimeoutException e) {
+ LOG.error("Error waiting for bulk requests to return", e);
+ }
}
if (ioException != null) {