Author: fortino
Date: Fri Jul 17 08:28:19 2020
New Revision: 1879989

URL: http://svn.apache.org/viewvc?rev=1879989&view=rev
Log:
OAK-9133: sync updates with RT support

Added:
    
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
    
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandlerTest.java
Modified:
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexWriterFactory.java
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactory.java
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/DefaultIndexWriterFactory.java
    
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor2Test.java
    
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/MultiplexingLucenePropertyIndexTest.java
    
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/reader/DefaultIndexReaderFactoryTest.java
    
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/MultiplexingIndexWriterTest.java
    
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LuceneIndexerProvider.java
    
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java
    
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterFactory.java
    
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticFacetTest.java
    
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticOrderByTest.java
    
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java
    
jackrabbit/oak/trunk/oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/spi/editor/FulltextIndexEditorContext.java
    
jackrabbit/oak/trunk/oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/spi/editor/FulltextIndexWriterFactory.java

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexWriterFactory.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexWriterFactory.java?rev=1879989&r1=1879988&r2=1879989&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexWriterFactory.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexWriterFactory.java
 Fri Jul 17 08:28:19 2020
@@ -22,9 +22,11 @@ package org.apache.jackrabbit.oak.plugin
 import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriter;
 import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
 import 
org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexWriterFactory;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.lucene.index.IndexableField;
 
-public interface LuceneIndexWriterFactory extends FulltextIndexWriterFactory {
+public interface LuceneIndexWriterFactory extends 
FulltextIndexWriterFactory<Iterable<? extends IndexableField>> {
     @Override
-    LuceneIndexWriter newInstance(IndexDefinition definition, NodeBuilder 
definitionBuilder, boolean reindex);
+    LuceneIndexWriter newInstance(IndexDefinition definition, NodeBuilder 
definitionBuilder, CommitInfo commitInfo, boolean reindex);
 }

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactory.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactory.java?rev=1879989&r1=1879988&r2=1879989&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactory.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactory.java
 Fri Jul 17 08:28:19 2020
@@ -24,10 +24,11 @@ import java.io.IOException;
 import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriter;
 import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
 import 
org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexWriterFactory;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.lucene.index.IndexableField;
 
-public class LocalIndexWriterFactory implements FulltextIndexWriterFactory {
+public class LocalIndexWriterFactory implements 
FulltextIndexWriterFactory<Iterable<? extends IndexableField>> {
     private final LuceneDocumentHolder documentHolder;
     private final String indexPath;
 
@@ -37,7 +38,8 @@ public class LocalIndexWriterFactory imp
     }
 
     @Override
-    public LuceneIndexWriter newInstance(IndexDefinition definition, 
NodeBuilder definitionBuilder, boolean reindex) {
+    public LuceneIndexWriter newInstance(IndexDefinition definition, 
NodeBuilder definitionBuilder,
+                                         CommitInfo commitInfo, boolean 
reindex) {
         return new LocalIndexWriter(definition);
     }
 

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/DefaultIndexWriterFactory.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/DefaultIndexWriterFactory.java?rev=1879989&r1=1879988&r2=1879989&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/DefaultIndexWriterFactory.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/DefaultIndexWriterFactory.java
 Fri Jul 17 08:28:19 2020
@@ -26,6 +26,7 @@ import org.apache.jackrabbit.oak.plugins
 import 
org.apache.jackrabbit.oak.plugins.index.lucene.directory.DirectoryFactory;
 import org.apache.jackrabbit.oak.plugins.index.search.FulltextIndexConstants;
 import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.mount.MountInfoProvider;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 
@@ -44,8 +45,8 @@ public class DefaultIndexWriterFactory i
     }
 
     @Override
-    public LuceneIndexWriter newInstance(IndexDefinition def,
-                                         NodeBuilder definitionBuilder, 
boolean reindex) {
+    public LuceneIndexWriter newInstance(IndexDefinition def, NodeBuilder 
definitionBuilder,
+                                         CommitInfo commitInfo, boolean 
reindex) {
         Preconditions.checkArgument(def instanceof LuceneIndexDefinition,
                 "Expected {} but found {} for index definition",
                 LuceneIndexDefinition.class, def.getClass());
@@ -54,7 +55,7 @@ public class DefaultIndexWriterFactory i
 
         if (mountInfoProvider.hasNonDefaultMounts()){
             return new MultiplexingIndexWriter(directoryFactory, 
mountInfoProvider, definition,
-                definitionBuilder, reindex, writerConfig);
+                    definitionBuilder, reindex, writerConfig);
         }
         return new DefaultIndexWriter(definition, definitionBuilder, 
directoryFactory,
                 FulltextIndexConstants.INDEX_DATA_CHILD_NAME,

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor2Test.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor2Test.java?rev=1879989&r1=1879988&r2=1879989&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor2Test.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor2Test.java
 Fri Jul 17 08:28:19 2020
@@ -281,10 +281,10 @@ public class LuceneIndexEditor2Test {
     }
 
 
-    private class TestWriterFactory implements FulltextIndexWriterFactory {
+    private class TestWriterFactory implements 
FulltextIndexWriterFactory<Iterable<? extends IndexableField>> {
         @Override
-        public LuceneIndexWriter newInstance(IndexDefinition definition,
-                                             NodeBuilder definitionBuilder, 
boolean reindex) {
+        public LuceneIndexWriter newInstance(IndexDefinition definition, 
NodeBuilder definitionBuilder,
+                                             CommitInfo commitInfo, boolean 
reindex) {
             return writer;
         }
     }

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/MultiplexingLucenePropertyIndexTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/MultiplexingLucenePropertyIndexTest.java?rev=1879989&r1=1879988&r2=1879989&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/MultiplexingLucenePropertyIndexTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/MultiplexingLucenePropertyIndexTest.java
 Fri Jul 17 08:28:19 2020
@@ -95,11 +95,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import static com.google.common.collect.ImmutableList.of;
-import static 
org.apache.jackrabbit.oak.plugins.memory.PropertyStates.createProperty;
 import static org.apache.jackrabbit.oak.InitialContentHelper.INITIAL_CONTENT;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 public class MultiplexingLucenePropertyIndexTest extends AbstractQueryTest {
     private ExecutorService executorService = Executors.newFixedThreadPool(2);
@@ -110,7 +106,7 @@ public class MultiplexingLucenePropertyI
     private NodeState initialContent = INITIAL_CONTENT;
     private NodeBuilder builder = EMPTY_NODE.builder();
     private MountInfoProvider mip = Mounts.newBuilder()
-        .mount("foo", "/libs", "/apps").build();
+            .mount("foo", "/libs", "/apps").build();
     private NodeStore nodeStore;
 
     @Override
@@ -122,20 +118,20 @@ public class MultiplexingLucenePropertyI
             throw new RuntimeException(e);
         }
         LuceneIndexEditorProvider editorProvider = new 
LuceneIndexEditorProvider(copier,
-            new ExtractedTextCache(10*FileUtils.ONE_MB, 100),
-            null,
-            mip);
+                new ExtractedTextCache(10*FileUtils.ONE_MB, 100),
+                null,
+                mip);
         LuceneIndexProvider provider = new LuceneIndexProvider(new 
IndexTracker(new DefaultIndexReaderFactory(mip, copier)));
         nodeStore = new MemoryNodeStore();
         return new Oak(nodeStore)
-            .with(new InitialContent())
-            .with(new OpenSecurityProvider())
-            .with((QueryIndexProvider) provider)
-            .with((Observer) provider)
-            .with(editorProvider)
-            .with(new PropertyIndexEditorProvider())
-            .with(new NodeTypeIndexProvider())
-            .createContentRepository();
+                .with(new InitialContent())
+                .with(new OpenSecurityProvider())
+                .with((QueryIndexProvider) provider)
+                .with((Observer) provider)
+                .with(editorProvider)
+                .with(new PropertyIndexEditorProvider())
+                .with(new NodeTypeIndexProvider())
+                .createContentRepository();
     }
 
     // OAK-8001
@@ -153,7 +149,7 @@ public class MultiplexingLucenePropertyI
         DefaultIndexWriterFactory factory = new DefaultIndexWriterFactory(mip,
                 new DefaultDirectoryFactory(null, null),
                 new LuceneIndexWriterConfig());
-        LuceneIndexWriter writer = factory.newInstance(defn, builder, true);
+        LuceneIndexWriter writer = factory.newInstance(defn, builder, null, 
true);
         writer.close(0);
 
         //2. Construct the readers
@@ -174,7 +170,7 @@ public class MultiplexingLucenePropertyI
         //1. Have 2 reader created by writes in 2 diff mounts
         DirectoryFactory directoryFactory = new DefaultDirectoryFactory(null, 
null);
         DefaultIndexWriterFactory factory = new DefaultIndexWriterFactory(mip, 
directoryFactory, new LuceneIndexWriterConfig());
-        LuceneIndexWriter writer = factory.newInstance(defn, builder, true);
+        LuceneIndexWriter writer = factory.newInstance(defn, builder, null, 
true);
 
         Document doc = newDoc("/content/en");
         doc.add(new StringField("foo", "bar", Field.Store.NO));
@@ -270,7 +266,7 @@ public class MultiplexingLucenePropertyI
         root.commit();
 
         assertOrderedQuery("select [jcr:path] from [nt:base] where [bar] = 
'baz' order by [foo] asc, [baz] desc",
-            LucenePropertyIndexTest.getSortedPaths(tuples));
+                LucenePropertyIndexTest.getSortedPaths(tuples));
     }
 
     private List<String> getIndexDirNames(String indexName){

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/reader/DefaultIndexReaderFactoryTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/reader/DefaultIndexReaderFactoryTest.java?rev=1879989&r1=1879988&r2=1879989&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/reader/DefaultIndexReaderFactoryTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/reader/DefaultIndexReaderFactoryTest.java
 Fri Jul 17 08:28:19 2020
@@ -35,14 +35,12 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriter;
 import 
org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriterConfig;
 import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
-import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
 import org.apache.jackrabbit.oak.spi.mount.MountInfoProvider;
 import org.apache.jackrabbit.oak.spi.mount.Mounts;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.StringField;
-import org.apache.lucene.index.IndexReader;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -73,7 +71,7 @@ public class DefaultIndexReaderFactoryTe
     @Test
     public void indexDir() throws Exception{
         LuceneIndexWriterFactory factory = newDirectoryFactory();
-        LuceneIndexWriter writer = factory.newInstance(defn, builder, true);
+        LuceneIndexWriter writer = factory.newInstance(defn, builder, null, 
true);
 
         writer.updateDocument("/content/en", newDoc("/content/en"));
         writer.close(0);
@@ -90,12 +88,7 @@ public class DefaultIndexReaderFactoryTe
         assertEquals(1, reader.getReader().numDocs());
 
         final AtomicBoolean closed = new AtomicBoolean();
-        reader.getReader().addReaderClosedListener(new 
IndexReader.ReaderClosedListener() {
-            @Override
-            public void onClose(IndexReader reader) {
-                closed.set(true);
-            }
-        });
+        reader.getReader().addReaderClosedListener(reader1 -> 
closed.set(true));
 
         reader.close();
 
@@ -106,12 +99,12 @@ public class DefaultIndexReaderFactoryTe
     public void indexDirWithBlobStore() throws Exception {
         /* Register a blob store */
         CachingFileDataStore ds = DataStoreUtils
-            .createCachingFDS(folder.newFolder().getAbsolutePath(),
-                folder.newFolder().getAbsolutePath());
+                .createCachingFDS(folder.newFolder().getAbsolutePath(),
+                        folder.newFolder().getAbsolutePath());
 
         DirectoryFactory directoryFactory = new DefaultDirectoryFactory(null, 
new DataStoreBlobStore(ds));
         LuceneIndexWriterFactory factory = new DefaultIndexWriterFactory(mip, 
directoryFactory, writerConfig);
-        LuceneIndexWriter writer = factory.newInstance(defn, builder, true);
+        LuceneIndexWriter writer = factory.newInstance(defn, builder, null, 
true);
 
         writer.updateDocument("/content/en", newDoc("/content/en"));
         writer.close(0);
@@ -128,12 +121,7 @@ public class DefaultIndexReaderFactoryTe
         assertEquals(1, reader.getReader().numDocs());
 
         final AtomicBoolean closed = new AtomicBoolean();
-        reader.getReader().addReaderClosedListener(new 
IndexReader.ReaderClosedListener() {
-            @Override
-            public void onClose(IndexReader reader) {
-                closed.set(true);
-            }
-        });
+        reader.getReader().addReaderClosedListener(reader1 -> 
closed.set(true));
 
         reader.close();
 
@@ -145,7 +133,7 @@ public class DefaultIndexReaderFactoryTe
         LuceneIndexWriterFactory factory = newDirectoryFactory();
         enabledSuggestorForSomeProp();
         defn = new LuceneIndexDefinition(root, builder.getNodeState(), "/foo");
-        LuceneIndexWriter writer = factory.newInstance(defn, builder, true);
+        LuceneIndexWriter writer = factory.newInstance(defn, builder, null, 
true);
 
         Document doc = newDoc("/content/en");
         doc.add(new StringField(FieldNames.SUGGEST, "test", null));
@@ -163,7 +151,7 @@ public class DefaultIndexReaderFactoryTe
     @Test
     public void multipleReaders() throws Exception{
         LuceneIndexWriterFactory factory = newDirectoryFactory();
-        LuceneIndexWriter writer = factory.newInstance(defn, builder, true);
+        LuceneIndexWriter writer = factory.newInstance(defn, builder, null, 
true);
 
         writer.updateDocument("/content/en", newDoc("/content/en"));
         writer.updateDocument("/libs/config", newDoc("/libs/config"));
@@ -179,7 +167,7 @@ public class DefaultIndexReaderFactoryTe
         LuceneIndexWriterFactory factory = newDirectoryFactory();
         enabledSuggestorForSomeProp();
         defn = new LuceneIndexDefinition(root, builder.getNodeState(), "/foo");
-        LuceneIndexWriter writer = factory.newInstance(defn, builder, true);
+        LuceneIndexWriter writer = factory.newInstance(defn, builder, null, 
true);
 
         //Suggester field is only present for document in default mount
         Document doc = newDoc("/content/en");

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/MultiplexingIndexWriterTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/MultiplexingIndexWriterTest.java?rev=1879989&r1=1879988&r2=1879989&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/MultiplexingIndexWriterTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/MultiplexingIndexWriterTest.java
 Fri Jul 17 08:28:19 2020
@@ -80,14 +80,14 @@ public class MultiplexingIndexWriterTest
     @Test
     public void defaultWriterWithNoMounts() throws Exception{
         LuceneIndexWriterFactory factory = 
newDirectoryFactory(Mounts.defaultMountInfoProvider());
-        LuceneIndexWriter writer = factory.newInstance(defn, builder, true);
+        LuceneIndexWriter writer = factory.newInstance(defn, builder, null, 
true);
         assertThat(writer, instanceOf(DefaultIndexWriter.class));
     }
 
     @Test
     public void closeWithoutChange() throws Exception{
         LuceneIndexWriterFactory factory = newDirectoryFactory();
-        LuceneIndexWriter writer = factory.newInstance(defn, builder, true);
+        LuceneIndexWriter writer = factory.newInstance(defn, builder, null, 
true);
         assertTrue(writer.close(0));
         assertEquals(2, Iterables.size(getIndexDirNodes()));
         assertFalse(builder.hasChildNode(indexDirName(roMount)));
@@ -95,10 +95,10 @@ public class MultiplexingIndexWriterTest
         assertEquals(0, numDocs(defaultMount));
 
         // delete all dir nodes first
-        getIndexDirNodes().stream().forEach(dirName -> 
builder.getChildNode(dirName).remove());
+        getIndexDirNodes().forEach(dirName -> 
builder.getChildNode(dirName).remove());
 
         // empty index dir doesn't get created during normal indexing
-        writer = factory.newInstance(defn, builder, false);
+        writer = factory.newInstance(defn, builder, null, false);
         assertFalse(writer.close(0));
         assertEquals(0, Iterables.size(getIndexDirNodes()));
     }
@@ -106,7 +106,7 @@ public class MultiplexingIndexWriterTest
     @Test
     public void writesInDefaultMount() throws Exception{
         LuceneIndexWriterFactory factory = newDirectoryFactory();
-        LuceneIndexWriter writer = factory.newInstance(defn, builder, true);
+        LuceneIndexWriter writer = factory.newInstance(defn, builder, null, 
true);
 
 
         //1. Add entry in foo mount
@@ -118,7 +118,7 @@ public class MultiplexingIndexWriterTest
         assertEquals(0, numDocs(defaultMount));
 
         //2. Add entry in default mount
-        writer = factory.newInstance(defn, builder, true);
+        writer = factory.newInstance(defn, builder, null, true);
         writer.updateDocument("/content", newDoc("/content"));
         writer.close(0);
 
@@ -128,7 +128,7 @@ public class MultiplexingIndexWriterTest
         assertEquals(1, numDocs(defaultMount));
 
         //3. Add another entry in foo mount without reindexing
-        writer = factory.newInstance(defn, builder, false);
+        writer = factory.newInstance(defn, builder, null, false);
         writer.updateDocument("/libs/config1", newDoc("/libs/config1"));
         writer.close(0);
 
@@ -137,7 +137,7 @@ public class MultiplexingIndexWriterTest
         assertEquals(1, numDocs(defaultMount));
 
         //4. Add another entry in default mount without reindexing
-        writer = factory.newInstance(defn, builder, false);
+        writer = factory.newInstance(defn, builder, null, false);
         writer.updateDocument("/content1", newDoc("/content1"));
         writer.close(0);
 
@@ -150,12 +150,12 @@ public class MultiplexingIndexWriterTest
     @Test
     public void writesInDefaultMountBlobStore() throws Exception {
         CachingFileDataStore ds = DataStoreUtils
-            .createCachingFDS(folder.newFolder().getAbsolutePath(),
-                folder.newFolder().getAbsolutePath());
+                .createCachingFDS(folder.newFolder().getAbsolutePath(),
+                        folder.newFolder().getAbsolutePath());
 
         DirectoryFactory directoryFactory = new DefaultDirectoryFactory(null, 
new DataStoreBlobStore(ds));
         LuceneIndexWriterFactory factory = new DefaultIndexWriterFactory(mip, 
directoryFactory, writerConfig);
-        LuceneIndexWriter writer = factory.newInstance(defn, builder, true);
+        LuceneIndexWriter writer = factory.newInstance(defn, builder, null, 
true);
 
         //1. Add entry in foo mount
         writer.updateDocument("/libs/config", newDoc("/libs/config"));
@@ -166,7 +166,7 @@ public class MultiplexingIndexWriterTest
         assertEquals(0, numDocs(defaultMount));
 
         //2. Add entry in default mount
-        writer = factory.newInstance(defn, builder, true);
+        writer = factory.newInstance(defn, builder, null, true);
         writer.updateDocument("/content", newDoc("/content"));
         writer.close(0);
 
@@ -176,7 +176,7 @@ public class MultiplexingIndexWriterTest
         assertEquals(1, numDocs(defaultMount));
 
         //3. Add another entry in foo mount without reindexing
-        writer = factory.newInstance(defn, builder, false);
+        writer = factory.newInstance(defn, builder, null, false);
         writer.updateDocument("/libs/config1", newDoc("/libs/config1"));
         writer.close(0);
 
@@ -185,7 +185,7 @@ public class MultiplexingIndexWriterTest
         assertEquals(1, numDocs(defaultMount));
 
         //4. Add another entry in default mount without reindexing
-        writer = factory.newInstance(defn, builder, false);
+        writer = factory.newInstance(defn, builder, null, false);
         writer.updateDocument("/content1", newDoc("/content1"));
         writer.close(0);
 
@@ -198,7 +198,7 @@ public class MultiplexingIndexWriterTest
     @Test
     public void deletes() throws Exception{
         LuceneIndexWriterFactory factory = newDirectoryFactory();
-        LuceneIndexWriter writer = factory.newInstance(defn, builder, true);
+        LuceneIndexWriter writer = factory.newInstance(defn, builder, null, 
true);
 
         writer.updateDocument("/libs/config", newDoc("/libs/config"));
         writer.updateDocument("/libs/install", newDoc("/libs/install"));
@@ -209,14 +209,14 @@ public class MultiplexingIndexWriterTest
         assertEquals(2, numDocs(fooMount));
         assertEquals(2, numDocs(defaultMount));
 
-        writer = factory.newInstance(defn, builder, true);
+        writer = factory.newInstance(defn, builder, null, true);
         writer.deleteDocuments("/libs/config");
         writer.close(0);
 
         assertEquals(1, numDocs(fooMount));
         assertEquals(2, numDocs(defaultMount));
 
-        writer = factory.newInstance(defn, builder, true);
+        writer = factory.newInstance(defn, builder, null, true);
         writer.deleteDocuments("/content");
         writer.close(0);
 
@@ -230,7 +230,7 @@ public class MultiplexingIndexWriterTest
                 .mount("foo", "/content/remote").build();
         initializeMounts();
         LuceneIndexWriterFactory factory = newDirectoryFactory();
-        LuceneIndexWriter writer = factory.newInstance(defn, builder, true);
+        LuceneIndexWriter writer = factory.newInstance(defn, builder, null, 
true);
 
         writer.updateDocument("/content/remote/a", 
newDoc("/content/remote/a"));
         writer.updateDocument("/etc", newDoc("/etc"));
@@ -240,7 +240,7 @@ public class MultiplexingIndexWriterTest
         assertEquals(1, numDocs(fooMount));
         assertEquals(2, numDocs(defaultMount));
 
-        writer = factory.newInstance(defn, builder, true);
+        writer = factory.newInstance(defn, builder, null, true);
         writer.deleteDocuments("/content");
         writer.close(0);
 

Modified: 
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LuceneIndexerProvider.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LuceneIndexerProvider.java?rev=1879989&r1=1879988&r2=1879989&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LuceneIndexerProvider.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LuceneIndexerProvider.java
 Fri Jul 17 08:28:19 2020
@@ -33,7 +33,6 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriter;
 import 
org.apache.jackrabbit.oak.plugins.index.progress.IndexingProgressReporter;
 import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache;
-import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
 import 
org.apache.jackrabbit.oak.plugins.index.search.spi.binary.FulltextBinaryTextExtractor;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -66,7 +65,7 @@ public class LuceneIndexerProvider imple
 
         LuceneIndexDefinition idxDefinition = 
LuceneIndexDefinition.newBuilder(root, definition.getNodeState(), 
indexPath).reindex().build();
 
-        LuceneIndexWriter indexWriter = 
indexWriterFactory.newInstance(idxDefinition, definition, true);
+        LuceneIndexWriter indexWriter = 
indexWriterFactory.newInstance(idxDefinition, definition, null, true);
         FulltextBinaryTextExtractor textExtractor = new 
FulltextBinaryTextExtractor(textCache, idxDefinition, true);
         return new LuceneIndexer(
                 idxDefinition,

Added: 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java?rev=1879989&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
 Fri Jul 17 08:28:19 2020
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic.index;
+
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
+class ElasticBulkProcessorHandler {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ElasticBulkProcessorHandler.class);
+    private final int FAILED_DOC_COUNT_FOR_STATUS_NODE = 
Integer.getInteger("oak.failedDocStatusLimit", 10000);
+
+    private static final String SYNC_MODE_PROPERTY = "sync-mode";
+    private static final String SYNC_RT_MODE = "rt";
+
+    protected final ElasticConnection elasticConnection;
+    protected final ElasticIndexDefinition indexDefinition;
+    private final NodeBuilder definitionBuilder;
+    protected final BulkProcessor bulkProcessor;
+
+    /**
+     * Coordinates communication between bulk processes. It has a main 
controller registered at creation time and
+     * de-registered on {@link ElasticIndexWriter#close(long)}. Each bulk 
request register a new party in
+     * this Phaser in {@link OakBulkProcessorListener#beforeBulk(long, 
BulkRequest)} and de-register itself when
+     * the request returns.
+     */
+    private final Phaser phaser = new Phaser(1); // register main controller
+
+    /**
+     * Key-value structure to keep the history of bulk requests. Keys are the 
bulk execution ids, the boolean
+     * value is {@code true} when at least an update is performed, otherwise 
{@code false}.
+     */
+    private final ConcurrentHashMap<Long, Boolean> updatesMap = new 
ConcurrentHashMap<>();
+
+    protected long totalOperations;
+
+    private ElasticBulkProcessorHandler(@NotNull ElasticConnection 
elasticConnection,
+                                        @NotNull ElasticIndexDefinition 
indexDefinition,
+                                        @NotNull NodeBuilder 
definitionBuilder) {
+        this.elasticConnection = elasticConnection;
+        this.indexDefinition = indexDefinition;
+        this.definitionBuilder = definitionBuilder;
+        this.bulkProcessor = initBulkProcessor();
+    }
+
+    /**
+     * Returns an ElasticBulkProcessorHandler instance based on the index 
definition configuration.
+     * <p>
+     * The `sync-mode` property can be set to `rt` (real-time). In this case 
the returned handler will be real-time.
+     * This option is available for sync index definitions only.
+     */
+    public static ElasticBulkProcessorHandler getBulkProcessorHandler(@NotNull 
ElasticConnection elasticConnection,
+                                                                      @NotNull 
ElasticIndexDefinition indexDefinition,
+                                                                      @NotNull 
NodeBuilder definitionBuilder, CommitInfo commitInfo) {
+        PropertyState async = 
indexDefinition.getDefinitionNodeState().getProperty("async");
+
+        if (async != null) {
+            return new ElasticBulkProcessorHandler(elasticConnection, 
indexDefinition, definitionBuilder);
+        }
+
+        // commit-info has priority over configuration in index definition
+        String syncMode = null;
+        if (commitInfo != null) {
+            syncMode = (String) commitInfo.getInfo().get(SYNC_MODE_PROPERTY);
+        }
+
+        if (syncMode == null) {
+            PropertyState syncModeProp = 
indexDefinition.getDefinitionNodeState().getProperty("sync-mode");
+            if (syncModeProp != null) {
+                syncMode = syncModeProp.getValue(Type.STRING);
+            }
+        }
+
+        if (SYNC_RT_MODE.equals(syncMode)) {
+            return new RealTimeBulkProcessorHandler(elasticConnection, 
indexDefinition, definitionBuilder);
+        }
+
+        return new ElasticBulkProcessorHandler(elasticConnection, 
indexDefinition, definitionBuilder);
+    }
+
+    private BulkProcessor initBulkProcessor() {
+        return BulkProcessor.builder(requestConsumer(),
+                new OakBulkProcessorListener())
+                .setBulkActions(indexDefinition.bulkActions)
+                .setBulkSize(new ByteSizeValue(indexDefinition.bulkSizeBytes))
+                
.setFlushInterval(TimeValue.timeValueMillis(indexDefinition.bulkFlushIntervalMs))
+                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
+                        
TimeValue.timeValueMillis(indexDefinition.bulkRetriesBackoff), 
indexDefinition.bulkRetries)
+                )
+                .build();
+    }
+
+    protected BiConsumer<BulkRequest, ActionListener<BulkResponse>> 
requestConsumer() {
+        return (request, bulkListener) -> 
elasticConnection.getClient().bulkAsync(request, RequestOptions.DEFAULT, 
bulkListener);
+    }
+
+    public void add(DocWriteRequest<?> request) {
+        bulkProcessor.add(request);
+        totalOperations++;
+    }
+
+    public boolean close() {
+        LOG.trace("Calling close on bulk processor {}", bulkProcessor);
+        bulkProcessor.close();
+        LOG.trace("Bulk Processor {} closed", bulkProcessor);
+
+        // de-register main controller
+        int phase = phaser.arriveAndDeregister();
+
+        if (totalOperations == 0) { // no need to invoke phaser await if we 
already know there were no operations
+            LOG.debug("No operations executed in this processor. Close 
immediately");
+            return false;
+        }
+
+        try {
+            phaser.awaitAdvanceInterruptibly(phase, 
indexDefinition.bulkFlushIntervalMs * 5, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | TimeoutException e) {
+            LOG.error("Error waiting for bulk requests to return", e);
+        }
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Bulk identifier -> update status = {}", updatesMap);
+        }
+        return updatesMap.containsValue(Boolean.TRUE);
+    }
+
+    private class OakBulkProcessorListener implements BulkProcessor.Listener {
+
+        @Override
+        public void beforeBulk(long executionId, BulkRequest bulkRequest) {
+            // register new bulk party
+            phaser.register();
+
+            // init update status
+            updatesMap.put(executionId, Boolean.FALSE);
+
+            LOG.debug("Sending bulk with id {} -> {}", executionId, 
bulkRequest.getDescription());
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Bulk Requests: \n{}", bulkRequest.requests()
+                        .stream()
+                        .map(DocWriteRequest::toString)
+                        .collect(Collectors.joining("\n"))
+                );
+            }
+        }
+
+        @Override
+        public void afterBulk(long executionId, BulkRequest bulkRequest, 
BulkResponse bulkResponse) {
+            LOG.debug("Bulk with id {} processed with status {} in {}", 
executionId, bulkResponse.status(), bulkResponse.getTook());
+            if (LOG.isTraceEnabled()) {
+                try {
+                    
LOG.trace(Strings.toString(bulkResponse.toXContent(jsonBuilder(), 
EMPTY_PARAMS)));
+                } catch (IOException e) {
+                    LOG.error("Error decoding bulk response", e);
+                }
+            }
+            if (bulkResponse.hasFailures()) { // check if some operations 
failed to execute
+                Set<String> failedDocSet = new LinkedHashSet<>();
+                NodeBuilder status = 
definitionBuilder.child(IndexDefinition.STATUS_NODE);
+                // Read the current failed paths (if any) on the :status node 
into failedDocList
+                if (status.hasProperty(IndexDefinition.FAILED_DOC_PATHS)) {
+                    for (String str : 
status.getProperty(IndexDefinition.FAILED_DOC_PATHS).getValue(Type.STRINGS)) {
+                        failedDocSet.add(str);
+                    }
+                }
+
+                int initialSize = failedDocSet.size();
+                boolean isFailedDocSetFull = false;
+
+                boolean hasSuccesses = false;
+                for (BulkItemResponse bulkItemResponse : bulkResponse) {
+                    if (bulkItemResponse.isFailed()) {
+                        BulkItemResponse.Failure failure = 
bulkItemResponse.getFailure();
+                        if (!isFailedDocSetFull && failedDocSet.size() < 
FAILED_DOC_COUNT_FOR_STATUS_NODE) {
+                            failedDocSet.add(bulkItemResponse.getId());
+                        } else {
+                            isFailedDocSetFull = true;
+                        }
+                        // Log entry to be used to parse logs to get the 
failed doc id/path if needed
+                        LOG.error("ElasticIndex Update Doc Failure: Error 
while adding/updating doc with id : [{}]", bulkItemResponse.getId());
+                        LOG.error("Failure Details: BulkItem ID: " + 
failure.getId() + ", Failure Cause: {}", failure.getCause());
+                    } else if (!hasSuccesses) {
+                        // Set indexUpdated to true even if 1 item was updated 
successfully
+                        updatesMap.put(executionId, Boolean.TRUE);
+                        hasSuccesses = true;
+                    }
+                }
+
+                if (isFailedDocSetFull) {
+                    LOG.info("Cannot store all new Failed Docs because {} has 
been filled up. " +
+                            "See previous log entries to find out the details 
of failed paths", IndexDefinition.FAILED_DOC_PATHS);
+                } else if (failedDocSet.size() != initialSize) {
+                    status.setProperty(IndexDefinition.FAILED_DOC_PATHS, 
failedDocSet, Type.STRINGS);
+                }
+            } else {
+                updatesMap.put(executionId, Boolean.TRUE);
+            }
+            phaser.arriveAndDeregister();
+        }
+
+        @Override
+        public void afterBulk(long executionId, BulkRequest bulkRequest, 
Throwable throwable) {
+            LOG.error("ElasticIndex Update Bulk Failure : Bulk with id {} 
threw an error", executionId, throwable);
+            phaser.arriveAndDeregister();
+        }
+    }
+
+    /**
+     * {@link ElasticBulkProcessorHandler} extension with real time behaviour.
+     * It also uses the same async bulk processor as the parent except for the 
last flush that waits until the
+     * indexed documents are searchable.
+     */
+    protected static class RealTimeBulkProcessorHandler extends 
ElasticBulkProcessorHandler {
+
+        private final AtomicBoolean isClosed = new AtomicBoolean(false);
+        private final AtomicBoolean isDataSearchable = new 
AtomicBoolean(false);
+
+        private RealTimeBulkProcessorHandler(@NotNull ElasticConnection 
elasticConnection,
+                                             @NotNull ElasticIndexDefinition 
indexDefinition,
+                                             @NotNull NodeBuilder 
definitionBuilder) {
+            super(elasticConnection, indexDefinition, definitionBuilder);
+        }
+
+        @Override
+        protected BiConsumer<BulkRequest, ActionListener<BulkResponse>> 
requestConsumer() {
+            return (request, bulkListener) -> {
+                if (isClosed.get()) {
+                    LOG.debug("Processor is closing. Next request with {} 
actions will block until the data is searchable",
+                            request.requests().size());
+                    
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
+                    isDataSearchable.set(true);
+                }
+                elasticConnection.getClient().bulkAsync(request, 
RequestOptions.DEFAULT, bulkListener);
+            };
+        }
+
+        @Override
+        public boolean close() {
+            isClosed.set(true);
+            // calling super closes the bulk processor. If not empty it calls 
#requestConsumer for the last time
+            boolean closed = super.close();
+            // it could happen that close gets called when the bulk has 
already been flushed. In these cases we trigger
+            // an actual refresh to make sure the docs are searchable before 
returning from the method
+            if (totalOperations > 0 && !isDataSearchable.get()) {
+                LOG.debug("Forcing refresh");
+                try {
+                    this.elasticConnection.getClient()
+                            .indices()
+                            .refresh(new 
RefreshRequest(indexDefinition.getRemoteIndexAlias()), RequestOptions.DEFAULT);
+                } catch (IOException e) {
+                    LOG.warn("Error refreshing index " + 
indexDefinition.getRemoteIndexAlias(), e);
+                }
+            }
+            return closed;
+        }
+    }
+}

Modified: 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java?rev=1879989&r1=1879988&r2=1879989&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java
 Fri Jul 17 08:28:19 2020
@@ -16,23 +16,16 @@
  */
 package org.apache.jackrabbit.oak.plugins.index.elastic.index;
 
-import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
 import org.apache.jackrabbit.oak.plugins.index.elastic.util.ElasticIndexUtils;
-import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
 import 
org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexWriter;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.elasticsearch.ElasticsearchStatusException;
-import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
 import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.bulk.BackoffPolicy;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -44,8 +37,6 @@ import org.elasticsearch.client.indices.
 import org.elasticsearch.client.indices.GetIndexRequest;
 import org.elasticsearch.cluster.metadata.AliasMetadata;
 import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.TestOnly;
@@ -53,14 +44,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.HashSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Phaser;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -70,55 +55,26 @@ class ElasticIndexWriter implements Full
 
     private final ElasticConnection elasticConnection;
     private final ElasticIndexDefinition indexDefinition;
-    private final NodeBuilder definitionBuilder;
-    private final int FAILED_DOC_COUNT_FOR_STATUS_NODE = 
Integer.getInteger("oak.failedDocStatusLimit", 10000);
 
-    /**
-     * Coordinates communication between bulk processes. It has a main 
controller registered at creation time and
-     * de-registered on {@link ElasticIndexWriter#close(long)}. Each bulk 
request register a new party in
-     * this Phaser in {@link OakBulkProcessorListener#beforeBulk(long, 
BulkRequest)} and de-register itself when
-     * the request returns.
-     */
-    private final Phaser phaser = new Phaser(1); // register main controller
-    /**
-     * Key-value structure to keep the history of bulk requests. Keys are the 
bulk execution ids, the boolean
-     * value is {@code true} when at least an update is performed, otherwise 
{@code false}.
-     */
-    private final ConcurrentHashMap<Long, Boolean> updatesMap = new 
ConcurrentHashMap<>();
-    private final Set<String> failedDocSet = new HashSet<>();
-    private final BulkProcessor bulkProcessor;
+    private final ElasticBulkProcessorHandler bulkProcessorHandler;
 
     ElasticIndexWriter(@NotNull ElasticConnection elasticConnection,
                        @NotNull ElasticIndexDefinition indexDefinition,
-                       @NotNull NodeBuilder definitionBuilder) {
+                       @NotNull NodeBuilder definitionBuilder,
+                       CommitInfo commitInfo) {
         this.elasticConnection = elasticConnection;
         this.indexDefinition = indexDefinition;
-        this.definitionBuilder = definitionBuilder;
-        bulkProcessor = initBulkProcessor();
+        this.bulkProcessorHandler = ElasticBulkProcessorHandler
+                .getBulkProcessorHandler(elasticConnection, indexDefinition, 
definitionBuilder, commitInfo);
     }
 
     @TestOnly
     ElasticIndexWriter(@NotNull ElasticConnection elasticConnection,
                        @NotNull ElasticIndexDefinition indexDefinition,
-                       @NotNull BulkProcessor bulkProcessor,
-                       @NotNull NodeBuilder definitionBuilder) {
+                       @NotNull ElasticBulkProcessorHandler 
bulkProcessorHandler) {
         this.elasticConnection = elasticConnection;
         this.indexDefinition = indexDefinition;
-        this.bulkProcessor = bulkProcessor;
-        this.definitionBuilder = definitionBuilder;
-    }
-
-    private BulkProcessor initBulkProcessor() {
-        return BulkProcessor.builder((request, bulkListener) ->
-                        elasticConnection.getClient().bulkAsync(request, 
RequestOptions.DEFAULT, bulkListener),
-                new OakBulkProcessorListener())
-                .setBulkActions(indexDefinition.bulkActions)
-                .setBulkSize(new ByteSizeValue(indexDefinition.bulkSizeBytes))
-                
.setFlushInterval(TimeValue.timeValueMillis(indexDefinition.bulkFlushIntervalMs))
-                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
-                        
TimeValue.timeValueMillis(indexDefinition.bulkRetriesBackoff), 
indexDefinition.bulkRetries)
-                )
-                .build();
+        this.bulkProcessorHandler = bulkProcessorHandler;
     }
 
     @Override
@@ -126,35 +82,19 @@ class ElasticIndexWriter implements Full
         IndexRequest request = new 
IndexRequest(indexDefinition.getRemoteIndexAlias())
                 .id(ElasticIndexUtils.idFromPath(path))
                 .source(doc.build(), XContentType.JSON);
-        bulkProcessor.add(request);
+        bulkProcessorHandler.add(request);
     }
 
     @Override
     public void deleteDocuments(String path) {
         DeleteRequest request = new 
DeleteRequest(indexDefinition.getRemoteIndexAlias())
                 .id(ElasticIndexUtils.idFromPath(path));
-        bulkProcessor.add(request);
+        bulkProcessorHandler.add(request);
     }
 
     @Override
     public boolean close(long timestamp) {
-        LOG.trace("Calling close on bulk processor {}", bulkProcessor);
-        bulkProcessor.close();
-        LOG.trace("Bulk Processor {} closed", bulkProcessor);
-
-        // de-register main controller
-        final int phase = phaser.arriveAndDeregister();
-
-        try {
-            phaser.awaitAdvanceInterruptibly(phase, 
indexDefinition.bulkFlushIntervalMs * 5, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException | TimeoutException e) {
-            LOG.error("Error waiting for bulk requests to return", e);
-        }
-
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Bulk identifier -> update status = {}", updatesMap);
-        }
-        return updatesMap.containsValue(Boolean.TRUE);
+        return bulkProcessorHandler.close();
     }
 
     protected void provisionIndex() throws IOException {
@@ -231,77 +171,4 @@ class ElasticIndexWriter implements Full
         checkResponseAcknowledgement(deleteIndexResponse, "Delete index call 
not acknowledged for indices " + indices);
         LOG.info("Deleted indices {}. Response acknowledged: {}", 
indices.toString(), deleteIndexResponse.isAcknowledged());
     }
-
-    private class OakBulkProcessorListener implements BulkProcessor.Listener {
-
-        @Override
-        public void beforeBulk(long executionId, BulkRequest bulkRequest) {
-            // register new bulk party
-            phaser.register();
-            // init update status
-            updatesMap.put(executionId, Boolean.FALSE);
-
-            LOG.debug("Sending bulk with id {} -> {}", executionId, 
bulkRequest.getDescription());
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Bulk Requests: \n{}", bulkRequest.requests()
-                        .stream()
-                        .map(DocWriteRequest::toString)
-                        .collect(Collectors.joining("\n"))
-                );
-            }
-        }
-
-        @Override
-        public void afterBulk(long executionId, BulkRequest bulkRequest, 
BulkResponse bulkResponse) {
-            LOG.debug("Bulk with id {} processed with status {} in {}", 
executionId, bulkResponse.status(), bulkResponse.getTook());
-            if (LOG.isTraceEnabled()) {
-                try {
-                    
LOG.trace(Strings.toString(bulkResponse.toXContent(jsonBuilder(), 
EMPTY_PARAMS)));
-                } catch (IOException e) {
-                    LOG.error("Error decoding bulk response", e);
-                }
-            }
-            if (bulkResponse.hasFailures()) { // check if some operations 
failed to execute
-
-                NodeBuilder status = 
definitionBuilder.child(IndexDefinition.STATUS_NODE);
-                // Read the current failed paths (if any) on the :status node 
into failedDocList
-                if (status.hasProperty(IndexDefinition.FAILED_DOC_PATHS)) {
-                    for (String str : 
status.getProperty(IndexDefinition.FAILED_DOC_PATHS).getValue(Type.STRINGS)) {
-                        failedDocSet.add(str);
-                    }
-                }
-
-                for (BulkItemResponse bulkItemResponse : bulkResponse) {
-                    if (bulkItemResponse.isFailed()) {
-                        BulkItemResponse.Failure failure = 
bulkItemResponse.getFailure();
-                        failedDocSet.add(bulkItemResponse.getId());
-                        // Log entry to be used to parse logs to get the 
failed doc id/path if needed
-                        LOG.error("ElasticIndex Update Doc Failure: Error 
while adding/updating doc with id : [{}]", bulkItemResponse.getId());
-                        LOG.error("Failure Details: BulkItem ID: " + 
failure.getId() + ", Failure Cause: {}", failure.getCause());
-                    } else {
-                        // Set indexUpdated to true even if 1 item was updated 
successfully
-                        updatesMap.put(executionId, Boolean.TRUE);
-                    }
-                }
-
-                if (failedDocSet.size() == FAILED_DOC_COUNT_FOR_STATUS_NODE) {
-                    LOG.info("Failed Docs count exceeds the persistence limit. 
Will skip persisting paths of more failed docs." +
-                            "Failing docs should be mentioned above under 
ElasticIndex Update Doc Failure");
-                } else {
-                    status.setProperty(IndexDefinition.FAILED_DOC_PATHS, 
failedDocSet, Type.STRINGS);
-                }
-
-            } else {
-                updatesMap.put(executionId, Boolean.TRUE);
-            }
-            phaser.arriveAndDeregister();
-        }
-
-        @Override
-        public void afterBulk(long executionId, BulkRequest bulkRequest, 
Throwable throwable) {
-            LOG.error("ElasticIndex Update Bulk Failure : Bulk with id {} 
threw an error", executionId, throwable);
-            phaser.arriveAndDeregister();
-        }
-    }
-
 }

Modified: 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterFactory.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterFactory.java?rev=1879989&r1=1879988&r2=1879989&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterFactory.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterFactory.java
 Fri Jul 17 08:28:19 2020
@@ -20,6 +20,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
 import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
 import 
org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexWriterFactory;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.jetbrains.annotations.NotNull;
 
@@ -31,11 +32,12 @@ class ElasticIndexWriterFactory implemen
     }
 
     @Override
-    public ElasticIndexWriter newInstance(IndexDefinition definition, 
NodeBuilder definitionBuilder, boolean reindex) {
+    public ElasticIndexWriter newInstance(IndexDefinition definition, 
NodeBuilder definitionBuilder,
+                                          CommitInfo commitInfo, boolean 
reindex) {
         if (!(definition instanceof ElasticIndexDefinition)) {
             throw new IllegalArgumentException("IndexDefinition must be of 
type ElasticsearchIndexDefinition " +
                     "instead of " + definition.getClass().getName());
         }
-        return new ElasticIndexWriter(elasticConnection, 
(ElasticIndexDefinition) definition, definitionBuilder);
+        return new ElasticIndexWriter(elasticConnection, 
(ElasticIndexDefinition) definition, definitionBuilder, commitInfo);
     }
 }

Modified: 
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticFacetTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticFacetTest.java?rev=1879989&r1=1879988&r2=1879989&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticFacetTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticFacetTest.java
 Fri Jul 17 08:28:19 2020
@@ -59,7 +59,6 @@ import java.util.stream.Collectors;
 import static org.apache.jackrabbit.commons.JcrUtils.getOrCreateByPath;
 import static org.apache.jackrabbit.oak.InitialContentHelper.INITIAL_CONTENT;
 import static 
org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
-import static 
org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition.BULK_FLUSH_INTERVAL_MS_DEFAULT;
 import static 
org.apache.jackrabbit.oak.plugins.index.search.FulltextIndexConstants.FACETS;
 import static 
org.apache.jackrabbit.oak.plugins.index.search.FulltextIndexConstants.PROP_REFRESH_DEFN;
 import static 
org.apache.jackrabbit.oak.plugins.index.search.FulltextIndexConstants.PROP_SECURE_FACETS;
@@ -164,6 +163,7 @@ public class ElasticFacetTest {
         IndexSkeleton indexSkeleton = new IndexSkeleton();
         indexSkeleton.initialize();
         indexSkeleton.indexDefinitionBuilder.noAsync();
+        
indexSkeleton.indexDefinitionBuilder.getBuilderTree().setProperty("sync-mode", 
"rt");
         indexSkeleton.indexRule.property("cons").propertyIndex();
         
indexSkeleton.indexRule.property("foo").propertyIndex().getBuilderTree().setProperty(FACET_PROP,
 true, Type.BOOLEAN);
         
indexSkeleton.indexRule.property("bar").propertyIndex().getBuilderTree().setProperty(FACET_PROP,
 true, Type.BOOLEAN);
@@ -226,7 +226,7 @@ public class ElasticFacetTest {
     @Test
     public void secureFacets() throws Exception {
         createDataset(NUM_LEAF_NODES_FOR_LARGE_DATASET);
-        assertEventually(() -> assertEquals(actualAclLabelCount, getFacets()));
+        assertEquals(actualAclLabelCount, getFacets());
     }
 
     @Test
@@ -236,7 +236,7 @@ public class ElasticFacetTest {
         inaccessibleChild.setProperty("cons", "val");
         inaccessibleChild.setProperty("foo", "l4");
         adminSession.save();
-        assertEventually(() -> assertEquals(actualAclLabelCount, getFacets()));
+        assertEquals(actualAclLabelCount, getFacets());
     }
 
     @Test
@@ -246,7 +246,7 @@ public class ElasticFacetTest {
         adminSession.save();
 
         createDataset(NUM_LEAF_NODES_FOR_LARGE_DATASET);
-        assertEventually(() -> assertEquals(actualLabelCount, getFacets()));
+        assertEquals(actualLabelCount, getFacets());
     }
 
     @Test
@@ -258,17 +258,15 @@ public class ElasticFacetTest {
 
         createDataset(NUM_LEAF_NODES_FOR_LARGE_DATASET);
 
-        assertEventually(() -> assertEquals("Unexpected number of facets", 
actualAclLabelCount.size(), getFacets().size()));
+        assertEquals("Unexpected number of facets", 
actualAclLabelCount.size(), getFacets().size());
 
         for (Map.Entry<String, Integer> facet : 
actualAclLabelCount.entrySet()) {
             String facetLabel = facet.getKey();
-            assertEventually(() -> {
-                int facetCount = getFacets().get(facetLabel);
-                float ratio = ((float) facetCount) / facet.getValue();
-                assertTrue("Facet count for label: " + facetLabel + " is 
outside of 10% margin of error. " +
-                                "Expected: " + facet.getValue() + "; Got: " + 
facetCount + "; Ratio: " + ratio,
-                        Math.abs(ratio - 1) < 0.1);
-            });
+            int facetCount = getFacets().get(facetLabel);
+            float ratio = ((float) facetCount) / facet.getValue();
+            assertTrue("Facet count for label: " + facetLabel + " is outside 
of 10% margin of error. " +
+                            "Expected: " + facet.getValue() + "; Got: " + 
facetCount + "; Ratio: " + ratio,
+                    Math.abs(ratio - 1) < 0.1);
         }
     }
 
@@ -281,11 +279,11 @@ public class ElasticFacetTest {
 
         createDataset(NUM_LEAF_NODES_FOR_SMALL_DATASET);
 
-        assertEventually(() -> assertEquals("Unexpected number of facets", 
actualAclLabelCount.size(), getFacets().size()));
+        assertEquals("Unexpected number of facets", 
actualAclLabelCount.size(), getFacets().size());
 
         // Since the hit count is less than sample size -> flow should have 
switched to secure facet count instead of statistical
         // and thus the count should be exactly equal
-        assertEventually(() -> assertEquals(actualAclLabelCount, getFacets()));
+        assertEquals(actualAclLabelCount, getFacets());
     }
 
     @Test
@@ -297,19 +295,17 @@ public class ElasticFacetTest {
 
         createDataset(NUM_LEAF_NODES_FOR_LARGE_DATASET);
 
-        assertEventually(() -> {
-            Map<String, Integer> facets = getFacets("/parent/par1");
-            assertEquals("Unexpected number of facets", 
actualAclPar1LabelCount.size(), facets.size());
+        Map<String, Integer> facets = getFacets("/parent/par1");
+        assertEquals("Unexpected number of facets", 
actualAclPar1LabelCount.size(), facets.size());
 
         for (Map.Entry<String, Integer> facet : 
actualAclPar1LabelCount.entrySet()) {
             String facetLabel = facet.getKey();
             int facetCount = facets.get(facetLabel);
             float ratio = ((float) facetCount) / facet.getValue();
             assertTrue("Facet count for label: " + facetLabel + " is outside 
of 10% margin of error. " +
-                                "Expected: " + facet.getValue() + "; Got: " + 
facetCount + "; Ratio: " + ratio,
-                        Math.abs(ratio - 1) < 0.1);
-            }
-        });
+                            "Expected: " + facet.getValue() + "; Got: " + 
facetCount + "; Ratio: " + ratio,
+                    Math.abs(ratio - 1) < 0.1);
+        }
     }
 
     @Test
@@ -324,21 +320,19 @@ public class ElasticFacetTest {
         inaccessibleChild.setProperty("cons", "val");
         inaccessibleChild.setProperty("foo", "l4");
         adminSession.save();
-        assertEventually(() -> {
-            Map<String, Integer> facets = getFacets();
-            assertEquals("Unexpected number of facets", 
actualAclLabelCount.size(), facets.size());
-        });
+
+        Map<String, Integer> facets = getFacets();
+        assertEquals("Unexpected number of facets", 
actualAclLabelCount.size(), facets.size());
 
         for (Map.Entry<String, Integer> facet : 
actualAclLabelCount.entrySet()) {
 
-            assertEventually(() -> {
-                String facetLabel = facet.getKey();
-                int facetCount = getFacets().get(facetLabel);
-                float ratio = ((float) facetCount) / facet.getValue();
-                assertTrue("Facet count for label: " + facetLabel + " is 
outside of 10% margin of error. " +
-                                "Expected: " + facet.getValue() + "; Got: " + 
facetCount + "; Ratio: " + ratio,
-                        Math.abs(ratio - 1) < 0.1);
-            });
+
+            String facetLabel = facet.getKey();
+            int facetCount = getFacets().get(facetLabel);
+            float ratio = ((float) facetCount) / facet.getValue();
+            assertTrue("Facet count for label: " + facetLabel + " is outside 
of 10% margin of error. " +
+                            "Expected: " + facet.getValue() + "; Got: " + 
facetCount + "; Ratio: " + ratio,
+                    Math.abs(ratio - 1) < 0.1);
         }
     }
 
@@ -350,7 +344,7 @@ public class ElasticFacetTest {
         adminSession.save();
         createDataset(NUM_LEAF_NODES_FOR_LARGE_DATASET);
         qe = adminSession.getWorkspace().getQueryManager();
-        assertEventually(() -> assertEquals(actualLabelCount, getFacets()));
+        assertEquals(actualLabelCount, getFacets());
     }
 
     @Test
@@ -361,20 +355,19 @@ public class ElasticFacetTest {
         adminSession.save();
         createDataset(NUM_LEAF_NODES_FOR_LARGE_DATASET);
         qe = adminSession.getWorkspace().getQueryManager();
-        assertEventually(() -> {
-            Map<String, Integer> facets = getFacets();
-            assertEquals("Unexpected number of facets", 
actualLabelCount.size(), facets.size());
-        });
+
+        Map<String, Integer> facets = getFacets();
+        assertEquals("Unexpected number of facets", actualLabelCount.size(), 
facets.size());
 
         for (Map.Entry<String, Integer> facet : actualLabelCount.entrySet()) {
-            assertEventually(() -> {
-                String facetLabel = facet.getKey();
-                int facetCount = getFacets().get(facetLabel);
-                float ratio = ((float) facetCount) / facet.getValue();
-                assertTrue("Facet count for label: " + facetLabel + " is 
outside of 5% margin of error. " +
-                                "Expected: " + facet.getValue() + "; Got: " + 
facetCount + "; Ratio: " + ratio,
-                        Math.abs(ratio - 1) < 0.05);
-            });
+
+            String facetLabel = facet.getKey();
+            int facetCount = getFacets().get(facetLabel);
+            float ratio = ((float) facetCount) / facet.getValue();
+            assertTrue("Facet count for label: " + facetLabel + " is outside 
of 5% margin of error. " +
+                            "Expected: " + facet.getValue() + "; Got: " + 
facetCount + "; Ratio: " + ratio,
+                    Math.abs(ratio - 1) < 0.05);
+
         }
     }
 
@@ -416,8 +409,4 @@ public class ElasticFacetTest {
                 .collect(Collectors.toMap(FacetResult.Facet::getLabel, 
FacetResult.Facet::getCount));
     }
 
-    private static void assertEventually(Runnable r) {
-        ElasticTestUtils.assertEventually(r, BULK_FLUSH_INTERVAL_MS_DEFAULT * 
3);
-    }
-
 }

Modified: 
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticOrderByTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticOrderByTest.java?rev=1879989&r1=1879988&r2=1879989&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticOrderByTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticOrderByTest.java
 Fri Jul 17 08:28:19 2020
@@ -22,6 +22,7 @@ import org.apache.jackrabbit.oak.query.A
 import org.junit.Test;
 
 import javax.jcr.PropertyType;
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 
@@ -38,15 +39,15 @@ public class ElasticOrderByTest extends
                 .analyzed();
 
         setIndex(UUID.randomUUID().toString(), builder);
+        root.commit();
 
         Tree test = root.getTree("/").addChild("test");
         test.addChild("a").setProperty("foo", "hello");
         test.addChild("b").setProperty("foo", "hello hello");
-        root.commit();
+        root.commit(Collections.singletonMap("sync-mode", "rt"));
 
         // results are sorted by score desc, node `b` returns first because it 
has a higher score from a tf/idf perspective
-        assertEventually(() -> assertOrderedQuery("select [jcr:path] from 
[nt:base] where contains(foo, 'hello')",
-                asList("/test/b", "/test/a")));
+        assertOrderedQuery("select [jcr:path] from [nt:base] where 
contains(foo, 'hello')", asList("/test/b", "/test/a"));
     }
 
     @Test
@@ -61,15 +62,13 @@ public class ElasticOrderByTest extends
         Tree test = root.getTree("/").addChild("test");
         test.addChild("a").setProperty("foo", "hello");
         test.addChild("b").setProperty("foo", "hello hello");
-        root.commit();
+        root.commit(Collections.singletonMap("sync-mode", "rt"));
+
+        assertOrderedQuery("select [jcr:path] from [nt:base] where 
contains(foo, 'hello') order by [jcr:score]",
+                asList("/test/a", "/test/b"));
 
-        assertEventually(() -> {
-            assertOrderedQuery("select [jcr:path] from [nt:base] where 
contains(foo, 'hello') order by [jcr:score]",
-                    asList("/test/a", "/test/b"));
-
-            assertOrderedQuery("select [jcr:path] from [nt:base] where 
contains(foo, 'hello') order by [jcr:score] DESC",
-                    asList("/test/b", "/test/a"));
-        });
+        assertOrderedQuery("select [jcr:path] from [nt:base] where 
contains(foo, 'hello') order by [jcr:score] DESC",
+                asList("/test/b", "/test/a"));
     }
 
     @Test
@@ -82,13 +81,10 @@ public class ElasticOrderByTest extends
         Tree test = root.getTree("/").addChild("test");
         test.addChild("a").setProperty("foo", "aaaaaa");
         test.addChild("b").setProperty("foo", "bbbbbb");
-        root.commit();
-
-        assertEventually(() -> {
-            assertOrderedQuery("select [jcr:path] from [nt:base] order by 
[jcr:path]", asList("/test/a", "/test/b"));
+        root.commit(Collections.singletonMap("sync-mode", "rt"));
 
-            assertOrderedQuery("select [jcr:path] from [nt:base] order by 
[jcr:path] DESC", asList("/test/b", "/test/a"));
-        });
+        assertOrderedQuery("select [jcr:path] from [nt:base] order by 
[jcr:path]", asList("/test/a", "/test/b"));
+        assertOrderedQuery("select [jcr:path] from [nt:base] order by 
[jcr:path] DESC", asList("/test/b", "/test/a"));
     }
 
     @Test
@@ -101,13 +97,10 @@ public class ElasticOrderByTest extends
         Tree test = root.getTree("/").addChild("test");
         test.addChild("a").setProperty("foo", "zzzzzz");
         test.addChild("b").setProperty("foo", "aaaaaa");
-        root.commit();
-
-        assertEventually(() -> {
-            assertOrderedQuery("select [jcr:path] from [nt:base] order by 
@foo", asList("/test/b", "/test/a"));
+        root.commit(Collections.singletonMap("sync-mode", "rt"));
 
-            assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo 
DESC", asList("/test/a", "/test/b"));
-        });
+        assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo", 
asList("/test/b", "/test/a"));
+        assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo 
DESC", asList("/test/a", "/test/b"));
     }
 
     @Test
@@ -120,16 +113,13 @@ public class ElasticOrderByTest extends
         Tree test = root.getTree("/").addChild("test");
         test.addChild("a").setProperty("foo", "bbbbb");
         test.addChild("b").setProperty("foo", "hello aaaa");
-        root.commit();
+        root.commit(Collections.singletonMap("sync-mode", "rt"));
 
         // this test verifies we use the keyword multi field when an analyzed 
properties is specified in order by
         // http://www.technocratsid.com/string-sorting-in-elasticsearch/
 
-        assertEventually(() -> {
-            assertOrderedQuery("select [jcr:path] from [nt:base] order by 
@foo", asList("/test/a", "/test/b"));
-
-            assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo 
DESC", asList("/test/b", "/test/a"));
-        });
+        assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo", 
asList("/test/a", "/test/b"));
+        assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo 
DESC", asList("/test/b", "/test/a"));
     }
 
     @Test
@@ -142,13 +132,10 @@ public class ElasticOrderByTest extends
         Tree test = root.getTree("/").addChild("test");
         test.addChild("a").setProperty("foo", "10");
         test.addChild("b").setProperty("foo", "5");
-        root.commit();
-
-        assertEventually(() -> {
-            assertOrderedQuery("select [jcr:path] from [nt:base] order by 
@foo", asList("/test/b", "/test/a"));
+        root.commit(Collections.singletonMap("sync-mode", "rt"));
 
-            assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo 
DESC", asList("/test/a", "/test/b"));
-        });
+        assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo", 
asList("/test/b", "/test/a"));
+        assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo 
DESC", asList("/test/a", "/test/b"));
     }
 
     @Test
@@ -169,18 +156,16 @@ public class ElasticOrderByTest extends
         Tree b = test.addChild("b");
         b.setProperty("foo", "b");
         b.setProperty("bar", "100");
-        root.commit();
+        root.commit(Collections.singletonMap("sync-mode", "rt"));
+
+        assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo, 
@bar",
+                asList("/test/a1", "/test/a2", "/test/b"));
+
+        assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo, 
@bar DESC",
+                asList("/test/a2", "/test/a1", "/test/b"));
 
-        assertEventually(() -> {
-            assertOrderedQuery("select [jcr:path] from [nt:base] order by 
@foo, @bar",
-                    asList("/test/a1", "/test/a2", "/test/b"));
-
-            assertOrderedQuery("select [jcr:path] from [nt:base] order by 
@foo, @bar DESC",
-                    asList("/test/a2", "/test/a1", "/test/b"));
-
-            assertOrderedQuery("select [jcr:path] from [nt:base] order by @bar 
DESC, @foo DESC",
-                    asList("/test/b", "/test/a2", "/test/a1"));
-        });
+        assertOrderedQuery("select [jcr:path] from [nt:base] order by @bar 
DESC, @foo DESC",
+                asList("/test/b", "/test/a2", "/test/a1"));
     }
 
     private void assertOrderedQuery(String sql, List<String> paths) {

Added: 
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandlerTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandlerTest.java?rev=1879989&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandlerTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandlerTest.java
 Fri Jul 17 08:28:19 2020
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic.index;
+
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
+import org.apache.jackrabbit.oak.plugins.memory.MultiStringPropertyState;
+import org.apache.jackrabbit.oak.plugins.memory.StringPropertyState;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.when;
+
+public class ElasticBulkProcessorHandlerTest {
+
+    @Mock
+    private ElasticIndexDefinition indexDefinitionMock;
+
+    @Mock
+    private NodeState definitionNodeStateMock;
+
+    @Mock
+    private ElasticConnection elasticConnectionMock;
+
+    @Mock
+    private NodeBuilder definitionBuilder;
+
+    @Mock
+    private CommitInfo commitInfo;
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+        
when(indexDefinitionMock.getDefinitionNodeState()).thenReturn(definitionNodeStateMock);
+        when(commitInfo.getInfo()).thenReturn(Collections.emptyMap());
+    }
+
+    @Test
+    public void defaultMode() {
+        
when(definitionNodeStateMock.getProperty(eq("async"))).thenReturn(null);
+
+        ElasticBulkProcessorHandler bulkProcessorHandler = 
ElasticBulkProcessorHandler
+                .getBulkProcessorHandler(elasticConnectionMock, 
indexDefinitionMock, definitionBuilder, commitInfo);
+
+        assertThat(bulkProcessorHandler, 
instanceOf(ElasticBulkProcessorHandler.class));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void multiSyncModes() {
+        
when(definitionNodeStateMock.getProperty(eq("async"))).thenReturn(null);
+        when(definitionNodeStateMock.getProperty(eq("sync-mode")))
+                .thenReturn(new MultiStringPropertyState("sync-mode", 
Arrays.asList("nrt", "rt")));
+
+        ElasticBulkProcessorHandler
+                .getBulkProcessorHandler(elasticConnectionMock, 
indexDefinitionMock, definitionBuilder, commitInfo);
+    }
+
+    @Test
+    public void rtMode() {
+        
when(definitionNodeStateMock.getProperty(eq("async"))).thenReturn(null);
+        when(definitionNodeStateMock.getProperty(eq("sync-mode")))
+                .thenReturn(new StringPropertyState("sync-mode", "rt"));
+
+        ElasticBulkProcessorHandler bulkProcessorHandler = 
ElasticBulkProcessorHandler
+                .getBulkProcessorHandler(elasticConnectionMock, 
indexDefinitionMock, definitionBuilder, commitInfo);
+
+        assertThat(bulkProcessorHandler, 
instanceOf(ElasticBulkProcessorHandler.RealTimeBulkProcessorHandler.class));
+    }
+
+    @Test
+    public void defaultModeWithCommitInfoOverride() {
+        
when(definitionNodeStateMock.getProperty(eq("async"))).thenReturn(null);
+        
when(commitInfo.getInfo()).thenReturn(Collections.singletonMap("sync-mode", 
"rt"));
+
+        ElasticBulkProcessorHandler bulkProcessorHandler = 
ElasticBulkProcessorHandler
+                .getBulkProcessorHandler(elasticConnectionMock, 
indexDefinitionMock, definitionBuilder, commitInfo);
+
+        assertThat(bulkProcessorHandler, 
instanceOf(ElasticBulkProcessorHandler.RealTimeBulkProcessorHandler.class));
+    }
+}

Modified: 
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java?rev=1879989&r1=1879988&r2=1879989&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java
 Fri Jul 17 08:28:19 2020
@@ -18,8 +18,7 @@ package org.apache.jackrabbit.oak.plugin
 
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
-import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
-import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.junit.Before;
@@ -46,10 +45,7 @@ public class ElasticIndexWriterTest {
     private ElasticIndexDefinition indexDefinitionMock;
 
     @Mock
-    private BulkProcessor bulkProcessorMock;
-
-    @Mock
-    private NodeBuilder definitionBuilder;
+    private ElasticBulkProcessorHandler bulkProcessorHandlerMock;
 
     private ElasticIndexWriter indexWriter;
 
@@ -57,7 +53,7 @@ public class ElasticIndexWriterTest {
     public void setUp() {
         MockitoAnnotations.initMocks(this);
         
when(indexDefinitionMock.getRemoteIndexAlias()).thenReturn("test-index");
-        indexWriter = new ElasticIndexWriter(elasticConnectionMock, 
indexDefinitionMock, bulkProcessorMock, definitionBuilder);
+        indexWriter = new ElasticIndexWriter(elasticConnectionMock, 
indexDefinitionMock, bulkProcessorHandlerMock);
     }
 
     @Test
@@ -65,7 +61,7 @@ public class ElasticIndexWriterTest {
         indexWriter.updateDocument("/foo", new ElasticDocument("/foo"));
 
         ArgumentCaptor<IndexRequest> acIndexRequest = 
ArgumentCaptor.forClass(IndexRequest.class);
-        verify(bulkProcessorMock).add(acIndexRequest.capture());
+        verify(bulkProcessorHandlerMock).add(acIndexRequest.capture());
 
         IndexRequest request = acIndexRequest.getValue();
         assertEquals("test-index", request.index());
@@ -77,7 +73,7 @@ public class ElasticIndexWriterTest {
         indexWriter.deleteDocuments("/bar");
 
         ArgumentCaptor<DeleteRequest> acDeleteRequest = 
ArgumentCaptor.forClass(DeleteRequest.class);
-        verify(bulkProcessorMock).add(acDeleteRequest.capture());
+        verify(bulkProcessorHandlerMock).add(acDeleteRequest.capture());
 
         DeleteRequest request = acDeleteRequest.getValue();
         assertEquals("test-index", request.index());
@@ -91,10 +87,8 @@ public class ElasticIndexWriterTest {
         indexWriter.deleteDocuments("/foo");
         indexWriter.deleteDocuments("/bar");
 
-        ArgumentCaptor<IndexRequest> acIndexRequest = 
ArgumentCaptor.forClass(IndexRequest.class);
-        verify(bulkProcessorMock, times(2)).add(acIndexRequest.capture());
-        ArgumentCaptor<DeleteRequest> acDeleteRequest = 
ArgumentCaptor.forClass(DeleteRequest.class);
-        verify(bulkProcessorMock, times(2)).add(acDeleteRequest.capture());
+        ArgumentCaptor<DocWriteRequest<?>> request = 
ArgumentCaptor.forClass(DocWriteRequest.class);
+        verify(bulkProcessorHandlerMock, times(4)).add(request.capture());
     }
 
     @Test
@@ -104,7 +98,7 @@ public class ElasticIndexWriterTest {
         indexWriter.updateDocument(generatedPath, new 
ElasticDocument(generatedPath));
 
         ArgumentCaptor<IndexRequest> acIndexRequest = 
ArgumentCaptor.forClass(IndexRequest.class);
-        verify(bulkProcessorMock).add(acIndexRequest.capture());
+        verify(bulkProcessorHandlerMock).add(acIndexRequest.capture());
 
         IndexRequest request = acIndexRequest.getValue();
         assertThat(request.id(), not(generatedPath));
@@ -114,7 +108,7 @@ public class ElasticIndexWriterTest {
     @Test
     public void closeBulkProcessor() {
         indexWriter.close(System.currentTimeMillis());
-        verify(bulkProcessorMock).close();
+        verify(bulkProcessorHandlerMock).close();
     }
 
 }


Reply via email to