Author: fortino
Date: Fri Jul 17 08:28:19 2020
New Revision: 1879989
URL: http://svn.apache.org/viewvc?rev=1879989&view=rev
Log:
OAK-9133: sync updates with RT support
Added:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandlerTest.java
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexWriterFactory.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactory.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/DefaultIndexWriterFactory.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor2Test.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/MultiplexingLucenePropertyIndexTest.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/reader/DefaultIndexReaderFactoryTest.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/MultiplexingIndexWriterTest.java
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LuceneIndexerProvider.java
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterFactory.java
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticFacetTest.java
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticOrderByTest.java
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java
jackrabbit/oak/trunk/oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/spi/editor/FulltextIndexEditorContext.java
jackrabbit/oak/trunk/oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/spi/editor/FulltextIndexWriterFactory.java
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexWriterFactory.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexWriterFactory.java?rev=1879989&r1=1879988&r2=1879989&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexWriterFactory.java
(original)
+++
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexWriterFactory.java
Fri Jul 17 08:28:19 2020
@@ -22,9 +22,11 @@ package org.apache.jackrabbit.oak.plugin
import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriter;
import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
import
org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexWriterFactory;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.lucene.index.IndexableField;
-public interface LuceneIndexWriterFactory extends FulltextIndexWriterFactory {
+public interface LuceneIndexWriterFactory extends
FulltextIndexWriterFactory<Iterable<? extends IndexableField>> {
@Override
- LuceneIndexWriter newInstance(IndexDefinition definition, NodeBuilder
definitionBuilder, boolean reindex);
+ LuceneIndexWriter newInstance(IndexDefinition definition, NodeBuilder
definitionBuilder, CommitInfo commitInfo, boolean reindex);
}
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactory.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactory.java?rev=1879989&r1=1879988&r2=1879989&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactory.java
(original)
+++
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexWriterFactory.java
Fri Jul 17 08:28:19 2020
@@ -24,10 +24,11 @@ import java.io.IOException;
import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriter;
import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
import
org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexWriterFactory;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.lucene.index.IndexableField;
-public class LocalIndexWriterFactory implements FulltextIndexWriterFactory {
+public class LocalIndexWriterFactory implements
FulltextIndexWriterFactory<Iterable<? extends IndexableField>> {
private final LuceneDocumentHolder documentHolder;
private final String indexPath;
@@ -37,7 +38,8 @@ public class LocalIndexWriterFactory imp
}
@Override
- public LuceneIndexWriter newInstance(IndexDefinition definition,
NodeBuilder definitionBuilder, boolean reindex) {
+ public LuceneIndexWriter newInstance(IndexDefinition definition,
NodeBuilder definitionBuilder,
+ CommitInfo commitInfo, boolean
reindex) {
return new LocalIndexWriter(definition);
}
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/DefaultIndexWriterFactory.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/DefaultIndexWriterFactory.java?rev=1879989&r1=1879988&r2=1879989&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/DefaultIndexWriterFactory.java
(original)
+++
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/DefaultIndexWriterFactory.java
Fri Jul 17 08:28:19 2020
@@ -26,6 +26,7 @@ import org.apache.jackrabbit.oak.plugins
import
org.apache.jackrabbit.oak.plugins.index.lucene.directory.DirectoryFactory;
import org.apache.jackrabbit.oak.plugins.index.search.FulltextIndexConstants;
import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.mount.MountInfoProvider;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
@@ -44,8 +45,8 @@ public class DefaultIndexWriterFactory i
}
@Override
- public LuceneIndexWriter newInstance(IndexDefinition def,
- NodeBuilder definitionBuilder,
boolean reindex) {
+ public LuceneIndexWriter newInstance(IndexDefinition def, NodeBuilder
definitionBuilder,
+ CommitInfo commitInfo, boolean
reindex) {
Preconditions.checkArgument(def instanceof LuceneIndexDefinition,
"Expected {} but found {} for index definition",
LuceneIndexDefinition.class, def.getClass());
@@ -54,7 +55,7 @@ public class DefaultIndexWriterFactory i
if (mountInfoProvider.hasNonDefaultMounts()){
return new MultiplexingIndexWriter(directoryFactory,
mountInfoProvider, definition,
- definitionBuilder, reindex, writerConfig);
+ definitionBuilder, reindex, writerConfig);
}
return new DefaultIndexWriter(definition, definitionBuilder,
directoryFactory,
FulltextIndexConstants.INDEX_DATA_CHILD_NAME,
Modified:
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor2Test.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor2Test.java?rev=1879989&r1=1879988&r2=1879989&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor2Test.java
(original)
+++
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor2Test.java
Fri Jul 17 08:28:19 2020
@@ -281,10 +281,10 @@ public class LuceneIndexEditor2Test {
}
- private class TestWriterFactory implements FulltextIndexWriterFactory {
+ private class TestWriterFactory implements
FulltextIndexWriterFactory<Iterable<? extends IndexableField>> {
@Override
- public LuceneIndexWriter newInstance(IndexDefinition definition,
- NodeBuilder definitionBuilder,
boolean reindex) {
+ public LuceneIndexWriter newInstance(IndexDefinition definition,
NodeBuilder definitionBuilder,
+ CommitInfo commitInfo, boolean
reindex) {
return writer;
}
}
Modified:
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/MultiplexingLucenePropertyIndexTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/MultiplexingLucenePropertyIndexTest.java?rev=1879989&r1=1879988&r2=1879989&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/MultiplexingLucenePropertyIndexTest.java
(original)
+++
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/MultiplexingLucenePropertyIndexTest.java
Fri Jul 17 08:28:19 2020
@@ -95,11 +95,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import static com.google.common.collect.ImmutableList.of;
-import static
org.apache.jackrabbit.oak.plugins.memory.PropertyStates.createProperty;
import static org.apache.jackrabbit.oak.InitialContentHelper.INITIAL_CONTENT;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
public class MultiplexingLucenePropertyIndexTest extends AbstractQueryTest {
private ExecutorService executorService = Executors.newFixedThreadPool(2);
@@ -110,7 +106,7 @@ public class MultiplexingLucenePropertyI
private NodeState initialContent = INITIAL_CONTENT;
private NodeBuilder builder = EMPTY_NODE.builder();
private MountInfoProvider mip = Mounts.newBuilder()
- .mount("foo", "/libs", "/apps").build();
+ .mount("foo", "/libs", "/apps").build();
private NodeStore nodeStore;
@Override
@@ -122,20 +118,20 @@ public class MultiplexingLucenePropertyI
throw new RuntimeException(e);
}
LuceneIndexEditorProvider editorProvider = new
LuceneIndexEditorProvider(copier,
- new ExtractedTextCache(10*FileUtils.ONE_MB, 100),
- null,
- mip);
+ new ExtractedTextCache(10*FileUtils.ONE_MB, 100),
+ null,
+ mip);
LuceneIndexProvider provider = new LuceneIndexProvider(new
IndexTracker(new DefaultIndexReaderFactory(mip, copier)));
nodeStore = new MemoryNodeStore();
return new Oak(nodeStore)
- .with(new InitialContent())
- .with(new OpenSecurityProvider())
- .with((QueryIndexProvider) provider)
- .with((Observer) provider)
- .with(editorProvider)
- .with(new PropertyIndexEditorProvider())
- .with(new NodeTypeIndexProvider())
- .createContentRepository();
+ .with(new InitialContent())
+ .with(new OpenSecurityProvider())
+ .with((QueryIndexProvider) provider)
+ .with((Observer) provider)
+ .with(editorProvider)
+ .with(new PropertyIndexEditorProvider())
+ .with(new NodeTypeIndexProvider())
+ .createContentRepository();
}
// OAK-8001
@@ -153,7 +149,7 @@ public class MultiplexingLucenePropertyI
DefaultIndexWriterFactory factory = new DefaultIndexWriterFactory(mip,
new DefaultDirectoryFactory(null, null),
new LuceneIndexWriterConfig());
- LuceneIndexWriter writer = factory.newInstance(defn, builder, true);
+ LuceneIndexWriter writer = factory.newInstance(defn, builder, null,
true);
writer.close(0);
//2. Construct the readers
@@ -174,7 +170,7 @@ public class MultiplexingLucenePropertyI
//1. Have 2 reader created by writes in 2 diff mounts
DirectoryFactory directoryFactory = new DefaultDirectoryFactory(null,
null);
DefaultIndexWriterFactory factory = new DefaultIndexWriterFactory(mip,
directoryFactory, new LuceneIndexWriterConfig());
- LuceneIndexWriter writer = factory.newInstance(defn, builder, true);
+ LuceneIndexWriter writer = factory.newInstance(defn, builder, null,
true);
Document doc = newDoc("/content/en");
doc.add(new StringField("foo", "bar", Field.Store.NO));
@@ -270,7 +266,7 @@ public class MultiplexingLucenePropertyI
root.commit();
assertOrderedQuery("select [jcr:path] from [nt:base] where [bar] =
'baz' order by [foo] asc, [baz] desc",
- LucenePropertyIndexTest.getSortedPaths(tuples));
+ LucenePropertyIndexTest.getSortedPaths(tuples));
}
private List<String> getIndexDirNames(String indexName){
Modified:
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/reader/DefaultIndexReaderFactoryTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/reader/DefaultIndexReaderFactoryTest.java?rev=1879989&r1=1879988&r2=1879989&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/reader/DefaultIndexReaderFactoryTest.java
(original)
+++
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/reader/DefaultIndexReaderFactoryTest.java
Fri Jul 17 08:28:19 2020
@@ -35,14 +35,12 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriter;
import
org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriterConfig;
import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
-import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
import org.apache.jackrabbit.oak.spi.mount.MountInfoProvider;
import org.apache.jackrabbit.oak.spi.mount.Mounts;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.StringField;
-import org.apache.lucene.index.IndexReader;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -73,7 +71,7 @@ public class DefaultIndexReaderFactoryTe
@Test
public void indexDir() throws Exception{
LuceneIndexWriterFactory factory = newDirectoryFactory();
- LuceneIndexWriter writer = factory.newInstance(defn, builder, true);
+ LuceneIndexWriter writer = factory.newInstance(defn, builder, null,
true);
writer.updateDocument("/content/en", newDoc("/content/en"));
writer.close(0);
@@ -90,12 +88,7 @@ public class DefaultIndexReaderFactoryTe
assertEquals(1, reader.getReader().numDocs());
final AtomicBoolean closed = new AtomicBoolean();
- reader.getReader().addReaderClosedListener(new
IndexReader.ReaderClosedListener() {
- @Override
- public void onClose(IndexReader reader) {
- closed.set(true);
- }
- });
+ reader.getReader().addReaderClosedListener(reader1 ->
closed.set(true));
reader.close();
@@ -106,12 +99,12 @@ public class DefaultIndexReaderFactoryTe
public void indexDirWithBlobStore() throws Exception {
/* Register a blob store */
CachingFileDataStore ds = DataStoreUtils
- .createCachingFDS(folder.newFolder().getAbsolutePath(),
- folder.newFolder().getAbsolutePath());
+ .createCachingFDS(folder.newFolder().getAbsolutePath(),
+ folder.newFolder().getAbsolutePath());
DirectoryFactory directoryFactory = new DefaultDirectoryFactory(null,
new DataStoreBlobStore(ds));
LuceneIndexWriterFactory factory = new DefaultIndexWriterFactory(mip,
directoryFactory, writerConfig);
- LuceneIndexWriter writer = factory.newInstance(defn, builder, true);
+ LuceneIndexWriter writer = factory.newInstance(defn, builder, null,
true);
writer.updateDocument("/content/en", newDoc("/content/en"));
writer.close(0);
@@ -128,12 +121,7 @@ public class DefaultIndexReaderFactoryTe
assertEquals(1, reader.getReader().numDocs());
final AtomicBoolean closed = new AtomicBoolean();
- reader.getReader().addReaderClosedListener(new
IndexReader.ReaderClosedListener() {
- @Override
- public void onClose(IndexReader reader) {
- closed.set(true);
- }
- });
+ reader.getReader().addReaderClosedListener(reader1 ->
closed.set(true));
reader.close();
@@ -145,7 +133,7 @@ public class DefaultIndexReaderFactoryTe
LuceneIndexWriterFactory factory = newDirectoryFactory();
enabledSuggestorForSomeProp();
defn = new LuceneIndexDefinition(root, builder.getNodeState(), "/foo");
- LuceneIndexWriter writer = factory.newInstance(defn, builder, true);
+ LuceneIndexWriter writer = factory.newInstance(defn, builder, null,
true);
Document doc = newDoc("/content/en");
doc.add(new StringField(FieldNames.SUGGEST, "test", null));
@@ -163,7 +151,7 @@ public class DefaultIndexReaderFactoryTe
@Test
public void multipleReaders() throws Exception{
LuceneIndexWriterFactory factory = newDirectoryFactory();
- LuceneIndexWriter writer = factory.newInstance(defn, builder, true);
+ LuceneIndexWriter writer = factory.newInstance(defn, builder, null,
true);
writer.updateDocument("/content/en", newDoc("/content/en"));
writer.updateDocument("/libs/config", newDoc("/libs/config"));
@@ -179,7 +167,7 @@ public class DefaultIndexReaderFactoryTe
LuceneIndexWriterFactory factory = newDirectoryFactory();
enabledSuggestorForSomeProp();
defn = new LuceneIndexDefinition(root, builder.getNodeState(), "/foo");
- LuceneIndexWriter writer = factory.newInstance(defn, builder, true);
+ LuceneIndexWriter writer = factory.newInstance(defn, builder, null,
true);
//Suggester field is only present for document in default mount
Document doc = newDoc("/content/en");
Modified:
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/MultiplexingIndexWriterTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/MultiplexingIndexWriterTest.java?rev=1879989&r1=1879988&r2=1879989&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/MultiplexingIndexWriterTest.java
(original)
+++
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/MultiplexingIndexWriterTest.java
Fri Jul 17 08:28:19 2020
@@ -80,14 +80,14 @@ public class MultiplexingIndexWriterTest
@Test
public void defaultWriterWithNoMounts() throws Exception{
LuceneIndexWriterFactory factory =
newDirectoryFactory(Mounts.defaultMountInfoProvider());
- LuceneIndexWriter writer = factory.newInstance(defn, builder, true);
+ LuceneIndexWriter writer = factory.newInstance(defn, builder, null,
true);
assertThat(writer, instanceOf(DefaultIndexWriter.class));
}
@Test
public void closeWithoutChange() throws Exception{
LuceneIndexWriterFactory factory = newDirectoryFactory();
- LuceneIndexWriter writer = factory.newInstance(defn, builder, true);
+ LuceneIndexWriter writer = factory.newInstance(defn, builder, null,
true);
assertTrue(writer.close(0));
assertEquals(2, Iterables.size(getIndexDirNodes()));
assertFalse(builder.hasChildNode(indexDirName(roMount)));
@@ -95,10 +95,10 @@ public class MultiplexingIndexWriterTest
assertEquals(0, numDocs(defaultMount));
// delete all dir nodes first
- getIndexDirNodes().stream().forEach(dirName ->
builder.getChildNode(dirName).remove());
+ getIndexDirNodes().forEach(dirName ->
builder.getChildNode(dirName).remove());
// empty index dir doesn't get created during normal indexing
- writer = factory.newInstance(defn, builder, false);
+ writer = factory.newInstance(defn, builder, null, false);
assertFalse(writer.close(0));
assertEquals(0, Iterables.size(getIndexDirNodes()));
}
@@ -106,7 +106,7 @@ public class MultiplexingIndexWriterTest
@Test
public void writesInDefaultMount() throws Exception{
LuceneIndexWriterFactory factory = newDirectoryFactory();
- LuceneIndexWriter writer = factory.newInstance(defn, builder, true);
+ LuceneIndexWriter writer = factory.newInstance(defn, builder, null,
true);
//1. Add entry in foo mount
@@ -118,7 +118,7 @@ public class MultiplexingIndexWriterTest
assertEquals(0, numDocs(defaultMount));
//2. Add entry in default mount
- writer = factory.newInstance(defn, builder, true);
+ writer = factory.newInstance(defn, builder, null, true);
writer.updateDocument("/content", newDoc("/content"));
writer.close(0);
@@ -128,7 +128,7 @@ public class MultiplexingIndexWriterTest
assertEquals(1, numDocs(defaultMount));
//3. Add another entry in foo mount without reindexing
- writer = factory.newInstance(defn, builder, false);
+ writer = factory.newInstance(defn, builder, null, false);
writer.updateDocument("/libs/config1", newDoc("/libs/config1"));
writer.close(0);
@@ -137,7 +137,7 @@ public class MultiplexingIndexWriterTest
assertEquals(1, numDocs(defaultMount));
//4. Add another entry in default mount without reindexing
- writer = factory.newInstance(defn, builder, false);
+ writer = factory.newInstance(defn, builder, null, false);
writer.updateDocument("/content1", newDoc("/content1"));
writer.close(0);
@@ -150,12 +150,12 @@ public class MultiplexingIndexWriterTest
@Test
public void writesInDefaultMountBlobStore() throws Exception {
CachingFileDataStore ds = DataStoreUtils
- .createCachingFDS(folder.newFolder().getAbsolutePath(),
- folder.newFolder().getAbsolutePath());
+ .createCachingFDS(folder.newFolder().getAbsolutePath(),
+ folder.newFolder().getAbsolutePath());
DirectoryFactory directoryFactory = new DefaultDirectoryFactory(null,
new DataStoreBlobStore(ds));
LuceneIndexWriterFactory factory = new DefaultIndexWriterFactory(mip,
directoryFactory, writerConfig);
- LuceneIndexWriter writer = factory.newInstance(defn, builder, true);
+ LuceneIndexWriter writer = factory.newInstance(defn, builder, null,
true);
//1. Add entry in foo mount
writer.updateDocument("/libs/config", newDoc("/libs/config"));
@@ -166,7 +166,7 @@ public class MultiplexingIndexWriterTest
assertEquals(0, numDocs(defaultMount));
//2. Add entry in default mount
- writer = factory.newInstance(defn, builder, true);
+ writer = factory.newInstance(defn, builder, null, true);
writer.updateDocument("/content", newDoc("/content"));
writer.close(0);
@@ -176,7 +176,7 @@ public class MultiplexingIndexWriterTest
assertEquals(1, numDocs(defaultMount));
//3. Add another entry in foo mount without reindexing
- writer = factory.newInstance(defn, builder, false);
+ writer = factory.newInstance(defn, builder, null, false);
writer.updateDocument("/libs/config1", newDoc("/libs/config1"));
writer.close(0);
@@ -185,7 +185,7 @@ public class MultiplexingIndexWriterTest
assertEquals(1, numDocs(defaultMount));
//4. Add another entry in default mount without reindexing
- writer = factory.newInstance(defn, builder, false);
+ writer = factory.newInstance(defn, builder, null, false);
writer.updateDocument("/content1", newDoc("/content1"));
writer.close(0);
@@ -198,7 +198,7 @@ public class MultiplexingIndexWriterTest
@Test
public void deletes() throws Exception{
LuceneIndexWriterFactory factory = newDirectoryFactory();
- LuceneIndexWriter writer = factory.newInstance(defn, builder, true);
+ LuceneIndexWriter writer = factory.newInstance(defn, builder, null,
true);
writer.updateDocument("/libs/config", newDoc("/libs/config"));
writer.updateDocument("/libs/install", newDoc("/libs/install"));
@@ -209,14 +209,14 @@ public class MultiplexingIndexWriterTest
assertEquals(2, numDocs(fooMount));
assertEquals(2, numDocs(defaultMount));
- writer = factory.newInstance(defn, builder, true);
+ writer = factory.newInstance(defn, builder, null, true);
writer.deleteDocuments("/libs/config");
writer.close(0);
assertEquals(1, numDocs(fooMount));
assertEquals(2, numDocs(defaultMount));
- writer = factory.newInstance(defn, builder, true);
+ writer = factory.newInstance(defn, builder, null, true);
writer.deleteDocuments("/content");
writer.close(0);
@@ -230,7 +230,7 @@ public class MultiplexingIndexWriterTest
.mount("foo", "/content/remote").build();
initializeMounts();
LuceneIndexWriterFactory factory = newDirectoryFactory();
- LuceneIndexWriter writer = factory.newInstance(defn, builder, true);
+ LuceneIndexWriter writer = factory.newInstance(defn, builder, null,
true);
writer.updateDocument("/content/remote/a",
newDoc("/content/remote/a"));
writer.updateDocument("/etc", newDoc("/etc"));
@@ -240,7 +240,7 @@ public class MultiplexingIndexWriterTest
assertEquals(1, numDocs(fooMount));
assertEquals(2, numDocs(defaultMount));
- writer = factory.newInstance(defn, builder, true);
+ writer = factory.newInstance(defn, builder, null, true);
writer.deleteDocuments("/content");
writer.close(0);
Modified:
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LuceneIndexerProvider.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LuceneIndexerProvider.java?rev=1879989&r1=1879988&r2=1879989&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LuceneIndexerProvider.java
(original)
+++
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LuceneIndexerProvider.java
Fri Jul 17 08:28:19 2020
@@ -33,7 +33,6 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriter;
import
org.apache.jackrabbit.oak.plugins.index.progress.IndexingProgressReporter;
import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache;
-import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
import
org.apache.jackrabbit.oak.plugins.index.search.spi.binary.FulltextBinaryTextExtractor;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -66,7 +65,7 @@ public class LuceneIndexerProvider imple
LuceneIndexDefinition idxDefinition =
LuceneIndexDefinition.newBuilder(root, definition.getNodeState(),
indexPath).reindex().build();
- LuceneIndexWriter indexWriter =
indexWriterFactory.newInstance(idxDefinition, definition, true);
+ LuceneIndexWriter indexWriter =
indexWriterFactory.newInstance(idxDefinition, definition, null, true);
FulltextBinaryTextExtractor textExtractor = new
FulltextBinaryTextExtractor(textCache, idxDefinition, true);
return new LuceneIndexer(
idxDefinition,
Added:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java?rev=1879989&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
(added)
+++
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
Fri Jul 17 08:28:19 2020
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic.index;
+
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
+class ElasticBulkProcessorHandler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ElasticBulkProcessorHandler.class);
+ private final int FAILED_DOC_COUNT_FOR_STATUS_NODE =
Integer.getInteger("oak.failedDocStatusLimit", 10000);
+
+ private static final String SYNC_MODE_PROPERTY = "sync-mode";
+ private static final String SYNC_RT_MODE = "rt";
+
+ protected final ElasticConnection elasticConnection;
+ protected final ElasticIndexDefinition indexDefinition;
+ private final NodeBuilder definitionBuilder;
+ protected final BulkProcessor bulkProcessor;
+
+ /**
+ * Coordinates communication between bulk processes. It has a main
controller registered at creation time and
+ * de-registered on {@link ElasticBulkProcessorHandler#close()}. Each bulk
request registers a new party in
+ * this Phaser in {@link OakBulkProcessorListener#beforeBulk(long,
BulkRequest)} and de-registers itself when
+ * the request returns.
+ */
+ private final Phaser phaser = new Phaser(1); // register main controller
+
+ /**
+ * Key-value structure to keep the history of bulk requests. Keys are the
bulk execution ids, the boolean
+ * value is {@code true} when at least one update is performed, otherwise
{@code false}.
+ */
+ private final ConcurrentHashMap<Long, Boolean> updatesMap = new
ConcurrentHashMap<>();
+
+ protected long totalOperations;
+
+ private ElasticBulkProcessorHandler(@NotNull ElasticConnection
elasticConnection,
+ @NotNull ElasticIndexDefinition
indexDefinition,
+ @NotNull NodeBuilder
definitionBuilder) {
+ this.elasticConnection = elasticConnection;
+ this.indexDefinition = indexDefinition;
+ this.definitionBuilder = definitionBuilder;
+ this.bulkProcessor = initBulkProcessor();
+ }
+
+ /**
+ * Returns an ElasticBulkProcessorHandler instance based on the index
definition configuration.
+ * <p>
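+ * The {@code sync-mode} property (read from the commit info first, then
from the index definition) can be set to {@code rt} (real-time). In this case
the returned handler will be real-time.
+ * This option is available for sync index definitions only: definitions
with an {@code async} property always get the default handler.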
+ */
+ public static ElasticBulkProcessorHandler getBulkProcessorHandler(@NotNull
ElasticConnection elasticConnection,
+ @NotNull
ElasticIndexDefinition indexDefinition,
+ @NotNull
NodeBuilder definitionBuilder, CommitInfo commitInfo) {
+ PropertyState async =
indexDefinition.getDefinitionNodeState().getProperty("async");
+
+ if (async != null) {
+ return new ElasticBulkProcessorHandler(elasticConnection,
indexDefinition, definitionBuilder);
+ }
+
+ // commit-info has priority over configuration in index definition
+ String syncMode = null;
+ if (commitInfo != null) {
+ syncMode = (String) commitInfo.getInfo().get(SYNC_MODE_PROPERTY);
+ }
+
+ if (syncMode == null) {
+ PropertyState syncModeProp =
indexDefinition.getDefinitionNodeState().getProperty("sync-mode");
+ if (syncModeProp != null) {
+ syncMode = syncModeProp.getValue(Type.STRING);
+ }
+ }
+
+ if (SYNC_RT_MODE.equals(syncMode)) {
+ return new RealTimeBulkProcessorHandler(elasticConnection,
indexDefinition, definitionBuilder);
+ }
+
+ return new ElasticBulkProcessorHandler(elasticConnection,
indexDefinition, definitionBuilder);
+ }
+
+ private BulkProcessor initBulkProcessor() {
+ return BulkProcessor.builder(requestConsumer(),
+ new OakBulkProcessorListener())
+ .setBulkActions(indexDefinition.bulkActions)
+ .setBulkSize(new ByteSizeValue(indexDefinition.bulkSizeBytes))
+
.setFlushInterval(TimeValue.timeValueMillis(indexDefinition.bulkFlushIntervalMs))
+ .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
+
TimeValue.timeValueMillis(indexDefinition.bulkRetriesBackoff),
indexDefinition.bulkRetries)
+ )
+ .build();
+ }
+
+ protected BiConsumer<BulkRequest, ActionListener<BulkResponse>>
requestConsumer() {
+ return (request, bulkListener) ->
elasticConnection.getClient().bulkAsync(request, RequestOptions.DEFAULT,
bulkListener);
+ }
+
+ public void add(DocWriteRequest<?> request) {
+ bulkProcessor.add(request);
+ totalOperations++;
+ }
+
+ public boolean close() {
+ LOG.trace("Calling close on bulk processor {}", bulkProcessor);
+ bulkProcessor.close();
+ LOG.trace("Bulk Processor {} closed", bulkProcessor);
+
+ // de-register main controller
+ int phase = phaser.arriveAndDeregister();
+
+ if (totalOperations == 0) { // no need to invoke phaser await if we
already know there were no operations
+ LOG.debug("No operations executed in this processor. Close
immediately");
+ return false;
+ }
+
+ try {
+ phaser.awaitAdvanceInterruptibly(phase,
indexDefinition.bulkFlushIntervalMs * 5, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | TimeoutException e) {
+ LOG.error("Error waiting for bulk requests to return", e);
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Bulk identifier -> update status = {}", updatesMap);
+ }
+ return updatesMap.containsValue(Boolean.TRUE);
+ }
+
+ private class OakBulkProcessorListener implements BulkProcessor.Listener {
+
+ @Override
+ public void beforeBulk(long executionId, BulkRequest bulkRequest) {
+ // register new bulk party
+ phaser.register();
+
+ // init update status
+ updatesMap.put(executionId, Boolean.FALSE);
+
+ LOG.debug("Sending bulk with id {} -> {}", executionId,
bulkRequest.getDescription());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Bulk Requests: \n{}", bulkRequest.requests()
+ .stream()
+ .map(DocWriteRequest::toString)
+ .collect(Collectors.joining("\n"))
+ );
+ }
+ }
+
+ @Override
+ public void afterBulk(long executionId, BulkRequest bulkRequest,
BulkResponse bulkResponse) {
+ LOG.debug("Bulk with id {} processed with status {} in {}",
executionId, bulkResponse.status(), bulkResponse.getTook());
+ if (LOG.isTraceEnabled()) {
+ try {
+
LOG.trace(Strings.toString(bulkResponse.toXContent(jsonBuilder(),
EMPTY_PARAMS)));
+ } catch (IOException e) {
+ LOG.error("Error decoding bulk response", e);
+ }
+ }
+ if (bulkResponse.hasFailures()) { // check if some operations
failed to execute
+ Set<String> failedDocSet = new LinkedHashSet<>();
+ NodeBuilder status =
definitionBuilder.child(IndexDefinition.STATUS_NODE);
+ // Read the current failed paths (if any) on the :status node
into failedDocSet
+ if (status.hasProperty(IndexDefinition.FAILED_DOC_PATHS)) {
+ for (String str :
status.getProperty(IndexDefinition.FAILED_DOC_PATHS).getValue(Type.STRINGS)) {
+ failedDocSet.add(str);
+ }
+ }
+
+ int initialSize = failedDocSet.size();
+ boolean isFailedDocSetFull = false;
+
+ boolean hasSuccesses = false;
+ for (BulkItemResponse bulkItemResponse : bulkResponse) {
+ if (bulkItemResponse.isFailed()) {
+ BulkItemResponse.Failure failure =
bulkItemResponse.getFailure();
+ if (!isFailedDocSetFull && failedDocSet.size() <
FAILED_DOC_COUNT_FOR_STATUS_NODE) {
+ failedDocSet.add(bulkItemResponse.getId());
+ } else {
+ isFailedDocSetFull = true;
+ }
+ // Log entry to be used to parse logs to get the
failed doc id/path if needed
+ LOG.error("ElasticIndex Update Doc Failure: Error
while adding/updating doc with id : [{}]", bulkItemResponse.getId());
+ LOG.error("Failure Details: BulkItem ID: " +
failure.getId() + ", Failure Cause: {}", failure.getCause());
+ } else if (!hasSuccesses) {
+ // Mark this bulk as updated as soon as one item was updated
successfully
+ updatesMap.put(executionId, Boolean.TRUE);
+ hasSuccesses = true;
+ }
+ }
+
+ if (isFailedDocSetFull) {
+ LOG.info("Cannot store all new Failed Docs because {} has
been filled up. " +
+ "See previous log entries to find out the details
of failed paths", IndexDefinition.FAILED_DOC_PATHS);
+ } else if (failedDocSet.size() != initialSize) {
+ status.setProperty(IndexDefinition.FAILED_DOC_PATHS,
failedDocSet, Type.STRINGS);
+ }
+ } else {
+ updatesMap.put(executionId, Boolean.TRUE);
+ }
+ phaser.arriveAndDeregister();
+ }
+
+ @Override
+ public void afterBulk(long executionId, BulkRequest bulkRequest,
Throwable throwable) {
+ LOG.error("ElasticIndex Update Bulk Failure : Bulk with id {}
threw an error", executionId, throwable);
+ phaser.arriveAndDeregister();
+ }
+ }
+
+ /**
+ * {@link ElasticBulkProcessorHandler} extension with real time behaviour.
+ * It uses the same async bulk processor as the parent, except that the
last flush waits until the
+ * indexed documents are searchable.
+ */
+ protected static class RealTimeBulkProcessorHandler extends
ElasticBulkProcessorHandler {
+
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
+ private final AtomicBoolean isDataSearchable = new
AtomicBoolean(false);
+
+ private RealTimeBulkProcessorHandler(@NotNull ElasticConnection
elasticConnection,
+ @NotNull ElasticIndexDefinition
indexDefinition,
+ @NotNull NodeBuilder
definitionBuilder) {
+ super(elasticConnection, indexDefinition, definitionBuilder);
+ }
+
+ @Override
+ protected BiConsumer<BulkRequest, ActionListener<BulkResponse>>
requestConsumer() {
+ return (request, bulkListener) -> {
+ if (isClosed.get()) {
+ LOG.debug("Processor is closing. Next request with {}
actions will block until the data is searchable",
+ request.requests().size());
+
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
+ isDataSearchable.set(true);
+ }
+ elasticConnection.getClient().bulkAsync(request,
RequestOptions.DEFAULT, bulkListener);
+ };
+ }
+
+ @Override
+ public boolean close() {
+ isClosed.set(true);
+ // calling super closes the bulk processor. If not empty it calls
#requestConsumer for the last time
+ boolean closed = super.close();
+ // it could happen that close gets called when the bulk has
already been flushed. In these cases we trigger
+ // an actual refresh to make sure the docs are searchable before
returning from the method
+ if (totalOperations > 0 && !isDataSearchable.get()) {
+ LOG.debug("Forcing refresh");
+ try {
+ this.elasticConnection.getClient()
+ .indices()
+ .refresh(new
RefreshRequest(indexDefinition.getRemoteIndexAlias()), RequestOptions.DEFAULT);
+ } catch (IOException e) {
+ LOG.warn("Error refreshing index " +
indexDefinition.getRemoteIndexAlias(), e);
+ }
+ }
+ return closed;
+ }
+ }
+}
Modified:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java?rev=1879989&r1=1879988&r2=1879989&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java
(original)
+++
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriter.java
Fri Jul 17 08:28:19 2020
@@ -16,23 +16,16 @@
*/
package org.apache.jackrabbit.oak.plugins.index.elastic.index;
-import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.elastic.util.ElasticIndexUtils;
-import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
import
org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexWriter;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.elasticsearch.ElasticsearchStatusException;
-import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.bulk.BackoffPolicy;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -44,8 +37,6 @@ import org.elasticsearch.client.indices.
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.TestOnly;
@@ -53,14 +44,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.HashSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Phaser;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -70,55 +55,26 @@ class ElasticIndexWriter implements Full
private final ElasticConnection elasticConnection;
private final ElasticIndexDefinition indexDefinition;
- private final NodeBuilder definitionBuilder;
- private final int FAILED_DOC_COUNT_FOR_STATUS_NODE =
Integer.getInteger("oak.failedDocStatusLimit", 10000);
- /**
- * Coordinates communication between bulk processes. It has a main
controller registered at creation time and
- * de-registered on {@link ElasticIndexWriter#close(long)}. Each bulk
request register a new party in
- * this Phaser in {@link OakBulkProcessorListener#beforeBulk(long,
BulkRequest)} and de-register itself when
- * the request returns.
- */
- private final Phaser phaser = new Phaser(1); // register main controller
- /**
- * Key-value structure to keep the history of bulk requests. Keys are the
bulk execution ids, the boolean
- * value is {@code true} when at least an update is performed, otherwise
{@code false}.
- */
- private final ConcurrentHashMap<Long, Boolean> updatesMap = new
ConcurrentHashMap<>();
- private final Set<String> failedDocSet = new HashSet<>();
- private final BulkProcessor bulkProcessor;
+ private final ElasticBulkProcessorHandler bulkProcessorHandler;
ElasticIndexWriter(@NotNull ElasticConnection elasticConnection,
@NotNull ElasticIndexDefinition indexDefinition,
- @NotNull NodeBuilder definitionBuilder) {
+ @NotNull NodeBuilder definitionBuilder,
+ CommitInfo commitInfo) {
this.elasticConnection = elasticConnection;
this.indexDefinition = indexDefinition;
- this.definitionBuilder = definitionBuilder;
- bulkProcessor = initBulkProcessor();
+ this.bulkProcessorHandler = ElasticBulkProcessorHandler
+ .getBulkProcessorHandler(elasticConnection, indexDefinition,
definitionBuilder, commitInfo);
}
@TestOnly
ElasticIndexWriter(@NotNull ElasticConnection elasticConnection,
@NotNull ElasticIndexDefinition indexDefinition,
- @NotNull BulkProcessor bulkProcessor,
- @NotNull NodeBuilder definitionBuilder) {
+ @NotNull ElasticBulkProcessorHandler
bulkProcessorHandler) {
this.elasticConnection = elasticConnection;
this.indexDefinition = indexDefinition;
- this.bulkProcessor = bulkProcessor;
- this.definitionBuilder = definitionBuilder;
- }
-
- private BulkProcessor initBulkProcessor() {
- return BulkProcessor.builder((request, bulkListener) ->
- elasticConnection.getClient().bulkAsync(request,
RequestOptions.DEFAULT, bulkListener),
- new OakBulkProcessorListener())
- .setBulkActions(indexDefinition.bulkActions)
- .setBulkSize(new ByteSizeValue(indexDefinition.bulkSizeBytes))
-
.setFlushInterval(TimeValue.timeValueMillis(indexDefinition.bulkFlushIntervalMs))
- .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
-
TimeValue.timeValueMillis(indexDefinition.bulkRetriesBackoff),
indexDefinition.bulkRetries)
- )
- .build();
+ this.bulkProcessorHandler = bulkProcessorHandler;
}
@Override
@@ -126,35 +82,19 @@ class ElasticIndexWriter implements Full
IndexRequest request = new
IndexRequest(indexDefinition.getRemoteIndexAlias())
.id(ElasticIndexUtils.idFromPath(path))
.source(doc.build(), XContentType.JSON);
- bulkProcessor.add(request);
+ bulkProcessorHandler.add(request);
}
@Override
public void deleteDocuments(String path) {
DeleteRequest request = new
DeleteRequest(indexDefinition.getRemoteIndexAlias())
.id(ElasticIndexUtils.idFromPath(path));
- bulkProcessor.add(request);
+ bulkProcessorHandler.add(request);
}
@Override
public boolean close(long timestamp) {
- LOG.trace("Calling close on bulk processor {}", bulkProcessor);
- bulkProcessor.close();
- LOG.trace("Bulk Processor {} closed", bulkProcessor);
-
- // de-register main controller
- final int phase = phaser.arriveAndDeregister();
-
- try {
- phaser.awaitAdvanceInterruptibly(phase,
indexDefinition.bulkFlushIntervalMs * 5, TimeUnit.MILLISECONDS);
- } catch (InterruptedException | TimeoutException e) {
- LOG.error("Error waiting for bulk requests to return", e);
- }
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("Bulk identifier -> update status = {}", updatesMap);
- }
- return updatesMap.containsValue(Boolean.TRUE);
+ return bulkProcessorHandler.close();
}
protected void provisionIndex() throws IOException {
@@ -231,77 +171,4 @@ class ElasticIndexWriter implements Full
checkResponseAcknowledgement(deleteIndexResponse, "Delete index call
not acknowledged for indices " + indices);
LOG.info("Deleted indices {}. Response acknowledged: {}",
indices.toString(), deleteIndexResponse.isAcknowledged());
}
-
- private class OakBulkProcessorListener implements BulkProcessor.Listener {
-
- @Override
- public void beforeBulk(long executionId, BulkRequest bulkRequest) {
- // register new bulk party
- phaser.register();
- // init update status
- updatesMap.put(executionId, Boolean.FALSE);
-
- LOG.debug("Sending bulk with id {} -> {}", executionId,
bulkRequest.getDescription());
- if (LOG.isTraceEnabled()) {
- LOG.trace("Bulk Requests: \n{}", bulkRequest.requests()
- .stream()
- .map(DocWriteRequest::toString)
- .collect(Collectors.joining("\n"))
- );
- }
- }
-
- @Override
- public void afterBulk(long executionId, BulkRequest bulkRequest,
BulkResponse bulkResponse) {
- LOG.debug("Bulk with id {} processed with status {} in {}",
executionId, bulkResponse.status(), bulkResponse.getTook());
- if (LOG.isTraceEnabled()) {
- try {
-
LOG.trace(Strings.toString(bulkResponse.toXContent(jsonBuilder(),
EMPTY_PARAMS)));
- } catch (IOException e) {
- LOG.error("Error decoding bulk response", e);
- }
- }
- if (bulkResponse.hasFailures()) { // check if some operations
failed to execute
-
- NodeBuilder status =
definitionBuilder.child(IndexDefinition.STATUS_NODE);
- // Read the current failed paths (if any) on the :status node
into failedDocList
- if (status.hasProperty(IndexDefinition.FAILED_DOC_PATHS)) {
- for (String str :
status.getProperty(IndexDefinition.FAILED_DOC_PATHS).getValue(Type.STRINGS)) {
- failedDocSet.add(str);
- }
- }
-
- for (BulkItemResponse bulkItemResponse : bulkResponse) {
- if (bulkItemResponse.isFailed()) {
- BulkItemResponse.Failure failure =
bulkItemResponse.getFailure();
- failedDocSet.add(bulkItemResponse.getId());
- // Log entry to be used to parse logs to get the
failed doc id/path if needed
- LOG.error("ElasticIndex Update Doc Failure: Error
while adding/updating doc with id : [{}]", bulkItemResponse.getId());
- LOG.error("Failure Details: BulkItem ID: " +
failure.getId() + ", Failure Cause: {}", failure.getCause());
- } else {
- // Set indexUpdated to true even if 1 item was updated
successfully
- updatesMap.put(executionId, Boolean.TRUE);
- }
- }
-
- if (failedDocSet.size() == FAILED_DOC_COUNT_FOR_STATUS_NODE) {
- LOG.info("Failed Docs count exceeds the persistence limit.
Will skip persisting paths of more failed docs." +
- "Failing docs should be mentioned above under
ElasticIndex Update Doc Failure");
- } else {
- status.setProperty(IndexDefinition.FAILED_DOC_PATHS,
failedDocSet, Type.STRINGS);
- }
-
- } else {
- updatesMap.put(executionId, Boolean.TRUE);
- }
- phaser.arriveAndDeregister();
- }
-
- @Override
- public void afterBulk(long executionId, BulkRequest bulkRequest,
Throwable throwable) {
- LOG.error("ElasticIndex Update Bulk Failure : Bulk with id {}
threw an error", executionId, throwable);
- phaser.arriveAndDeregister();
- }
- }
-
}
Modified:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterFactory.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterFactory.java?rev=1879989&r1=1879988&r2=1879989&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterFactory.java
(original)
+++
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterFactory.java
Fri Jul 17 08:28:19 2020
@@ -20,6 +20,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
import
org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexWriterFactory;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.jetbrains.annotations.NotNull;
@@ -31,11 +32,12 @@ class ElasticIndexWriterFactory implemen
}
@Override
- public ElasticIndexWriter newInstance(IndexDefinition definition,
NodeBuilder definitionBuilder, boolean reindex) {
+ public ElasticIndexWriter newInstance(IndexDefinition definition,
NodeBuilder definitionBuilder,
+ CommitInfo commitInfo, boolean
reindex) {
if (!(definition instanceof ElasticIndexDefinition)) {
throw new IllegalArgumentException("IndexDefinition must be of
type ElasticsearchIndexDefinition " +
"instead of " + definition.getClass().getName());
}
- return new ElasticIndexWriter(elasticConnection,
(ElasticIndexDefinition) definition, definitionBuilder);
+ return new ElasticIndexWriter(elasticConnection,
(ElasticIndexDefinition) definition, definitionBuilder, commitInfo);
}
}
Modified:
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticFacetTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticFacetTest.java?rev=1879989&r1=1879988&r2=1879989&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticFacetTest.java
(original)
+++
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticFacetTest.java
Fri Jul 17 08:28:19 2020
@@ -59,7 +59,6 @@ import java.util.stream.Collectors;
import static org.apache.jackrabbit.commons.JcrUtils.getOrCreateByPath;
import static org.apache.jackrabbit.oak.InitialContentHelper.INITIAL_CONTENT;
import static
org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
-import static
org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition.BULK_FLUSH_INTERVAL_MS_DEFAULT;
import static
org.apache.jackrabbit.oak.plugins.index.search.FulltextIndexConstants.FACETS;
import static
org.apache.jackrabbit.oak.plugins.index.search.FulltextIndexConstants.PROP_REFRESH_DEFN;
import static
org.apache.jackrabbit.oak.plugins.index.search.FulltextIndexConstants.PROP_SECURE_FACETS;
@@ -164,6 +163,7 @@ public class ElasticFacetTest {
IndexSkeleton indexSkeleton = new IndexSkeleton();
indexSkeleton.initialize();
indexSkeleton.indexDefinitionBuilder.noAsync();
+
indexSkeleton.indexDefinitionBuilder.getBuilderTree().setProperty("sync-mode",
"rt");
indexSkeleton.indexRule.property("cons").propertyIndex();
indexSkeleton.indexRule.property("foo").propertyIndex().getBuilderTree().setProperty(FACET_PROP,
true, Type.BOOLEAN);
indexSkeleton.indexRule.property("bar").propertyIndex().getBuilderTree().setProperty(FACET_PROP,
true, Type.BOOLEAN);
@@ -226,7 +226,7 @@ public class ElasticFacetTest {
@Test
public void secureFacets() throws Exception {
createDataset(NUM_LEAF_NODES_FOR_LARGE_DATASET);
- assertEventually(() -> assertEquals(actualAclLabelCount, getFacets()));
+ assertEquals(actualAclLabelCount, getFacets());
}
@Test
@@ -236,7 +236,7 @@ public class ElasticFacetTest {
inaccessibleChild.setProperty("cons", "val");
inaccessibleChild.setProperty("foo", "l4");
adminSession.save();
- assertEventually(() -> assertEquals(actualAclLabelCount, getFacets()));
+ assertEquals(actualAclLabelCount, getFacets());
}
@Test
@@ -246,7 +246,7 @@ public class ElasticFacetTest {
adminSession.save();
createDataset(NUM_LEAF_NODES_FOR_LARGE_DATASET);
- assertEventually(() -> assertEquals(actualLabelCount, getFacets()));
+ assertEquals(actualLabelCount, getFacets());
}
@Test
@@ -258,17 +258,15 @@ public class ElasticFacetTest {
createDataset(NUM_LEAF_NODES_FOR_LARGE_DATASET);
- assertEventually(() -> assertEquals("Unexpected number of facets",
actualAclLabelCount.size(), getFacets().size()));
+ assertEquals("Unexpected number of facets",
actualAclLabelCount.size(), getFacets().size());
for (Map.Entry<String, Integer> facet :
actualAclLabelCount.entrySet()) {
String facetLabel = facet.getKey();
- assertEventually(() -> {
- int facetCount = getFacets().get(facetLabel);
- float ratio = ((float) facetCount) / facet.getValue();
- assertTrue("Facet count for label: " + facetLabel + " is
outside of 10% margin of error. " +
- "Expected: " + facet.getValue() + "; Got: " +
facetCount + "; Ratio: " + ratio,
- Math.abs(ratio - 1) < 0.1);
- });
+ int facetCount = getFacets().get(facetLabel);
+ float ratio = ((float) facetCount) / facet.getValue();
+ assertTrue("Facet count for label: " + facetLabel + " is outside
of 10% margin of error. " +
+ "Expected: " + facet.getValue() + "; Got: " +
facetCount + "; Ratio: " + ratio,
+ Math.abs(ratio - 1) < 0.1);
}
}
@@ -281,11 +279,11 @@ public class ElasticFacetTest {
createDataset(NUM_LEAF_NODES_FOR_SMALL_DATASET);
- assertEventually(() -> assertEquals("Unexpected number of facets",
actualAclLabelCount.size(), getFacets().size()));
+ assertEquals("Unexpected number of facets",
actualAclLabelCount.size(), getFacets().size());
// Since the hit count is less than sample size -> flow should have
switched to secure facet count instead of statistical
// and thus the count should be exactly equal
- assertEventually(() -> assertEquals(actualAclLabelCount, getFacets()));
+ assertEquals(actualAclLabelCount, getFacets());
}
@Test
@@ -297,19 +295,17 @@ public class ElasticFacetTest {
createDataset(NUM_LEAF_NODES_FOR_LARGE_DATASET);
- assertEventually(() -> {
- Map<String, Integer> facets = getFacets("/parent/par1");
- assertEquals("Unexpected number of facets",
actualAclPar1LabelCount.size(), facets.size());
+ Map<String, Integer> facets = getFacets("/parent/par1");
+ assertEquals("Unexpected number of facets",
actualAclPar1LabelCount.size(), facets.size());
for (Map.Entry<String, Integer> facet :
actualAclPar1LabelCount.entrySet()) {
String facetLabel = facet.getKey();
int facetCount = facets.get(facetLabel);
float ratio = ((float) facetCount) / facet.getValue();
assertTrue("Facet count for label: " + facetLabel + " is outside
of 10% margin of error. " +
- "Expected: " + facet.getValue() + "; Got: " +
facetCount + "; Ratio: " + ratio,
- Math.abs(ratio - 1) < 0.1);
- }
- });
+ "Expected: " + facet.getValue() + "; Got: " +
facetCount + "; Ratio: " + ratio,
+ Math.abs(ratio - 1) < 0.1);
+ }
}
@Test
@@ -324,21 +320,19 @@ public class ElasticFacetTest {
inaccessibleChild.setProperty("cons", "val");
inaccessibleChild.setProperty("foo", "l4");
adminSession.save();
- assertEventually(() -> {
- Map<String, Integer> facets = getFacets();
- assertEquals("Unexpected number of facets",
actualAclLabelCount.size(), facets.size());
- });
+
+ Map<String, Integer> facets = getFacets();
+ assertEquals("Unexpected number of facets",
actualAclLabelCount.size(), facets.size());
for (Map.Entry<String, Integer> facet :
actualAclLabelCount.entrySet()) {
- assertEventually(() -> {
- String facetLabel = facet.getKey();
- int facetCount = getFacets().get(facetLabel);
- float ratio = ((float) facetCount) / facet.getValue();
- assertTrue("Facet count for label: " + facetLabel + " is
outside of 10% margin of error. " +
- "Expected: " + facet.getValue() + "; Got: " +
facetCount + "; Ratio: " + ratio,
- Math.abs(ratio - 1) < 0.1);
- });
+
+ String facetLabel = facet.getKey();
+ int facetCount = getFacets().get(facetLabel);
+ float ratio = ((float) facetCount) / facet.getValue();
+ assertTrue("Facet count for label: " + facetLabel + " is outside
of 10% margin of error. " +
+ "Expected: " + facet.getValue() + "; Got: " +
facetCount + "; Ratio: " + ratio,
+ Math.abs(ratio - 1) < 0.1);
}
}
@@ -350,7 +344,7 @@ public class ElasticFacetTest {
adminSession.save();
createDataset(NUM_LEAF_NODES_FOR_LARGE_DATASET);
qe = adminSession.getWorkspace().getQueryManager();
- assertEventually(() -> assertEquals(actualLabelCount, getFacets()));
+ assertEquals(actualLabelCount, getFacets());
}
@Test
@@ -361,20 +355,19 @@ public class ElasticFacetTest {
adminSession.save();
createDataset(NUM_LEAF_NODES_FOR_LARGE_DATASET);
qe = adminSession.getWorkspace().getQueryManager();
- assertEventually(() -> {
- Map<String, Integer> facets = getFacets();
- assertEquals("Unexpected number of facets", actualLabelCount.size(), facets.size());
- });
+
+ Map<String, Integer> facets = getFacets();
+ assertEquals("Unexpected number of facets", actualLabelCount.size(), facets.size());
for (Map.Entry<String, Integer> facet : actualLabelCount.entrySet()) {
- assertEventually(() -> {
- String facetLabel = facet.getKey();
- int facetCount = getFacets().get(facetLabel);
- float ratio = ((float) facetCount) / facet.getValue();
- assertTrue("Facet count for label: " + facetLabel + " is outside of 5% margin of error. " +
- "Expected: " + facet.getValue() + "; Got: " + facetCount + "; Ratio: " + ratio,
- Math.abs(ratio - 1) < 0.05);
- });
+
+ String facetLabel = facet.getKey();
+ int facetCount = getFacets().get(facetLabel);
+ float ratio = ((float) facetCount) / facet.getValue();
+ assertTrue("Facet count for label: " + facetLabel + " is outside of 5% margin of error. " +
+ "Expected: " + facet.getValue() + "; Got: " + facetCount + "; Ratio: " + ratio,
+ Math.abs(ratio - 1) < 0.05);
+
}
}
@@ -416,8 +409,4 @@ public class ElasticFacetTest {
.collect(Collectors.toMap(FacetResult.Facet::getLabel, FacetResult.Facet::getCount));
}
- private static void assertEventually(Runnable r) {
- ElasticTestUtils.assertEventually(r, BULK_FLUSH_INTERVAL_MS_DEFAULT * 3);
- }
-
}
Modified: jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticOrderByTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticOrderByTest.java?rev=1879989&r1=1879988&r2=1879989&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticOrderByTest.java (original)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticOrderByTest.java Fri Jul 17 08:28:19 2020
@@ -22,6 +22,7 @@ import org.apache.jackrabbit.oak.query.A
import org.junit.Test;
import javax.jcr.PropertyType;
+import java.util.Collections;
import java.util.List;
import java.util.UUID;
@@ -38,15 +39,15 @@ public class ElasticOrderByTest extends
.analyzed();
setIndex(UUID.randomUUID().toString(), builder);
+ root.commit();
Tree test = root.getTree("/").addChild("test");
test.addChild("a").setProperty("foo", "hello");
test.addChild("b").setProperty("foo", "hello hello");
- root.commit();
+ root.commit(Collections.singletonMap("sync-mode", "rt"));
// results are sorted by score desc, node `b` returns first because it has a higher score from a tf/idf perspective
- assertEventually(() -> assertOrderedQuery("select [jcr:path] from [nt:base] where contains(foo, 'hello')",
- asList("/test/b", "/test/a")));
+ assertOrderedQuery("select [jcr:path] from [nt:base] where contains(foo, 'hello')", asList("/test/b", "/test/a"));
}
@Test
@@ -61,15 +62,13 @@ public class ElasticOrderByTest extends
Tree test = root.getTree("/").addChild("test");
test.addChild("a").setProperty("foo", "hello");
test.addChild("b").setProperty("foo", "hello hello");
- root.commit();
+ root.commit(Collections.singletonMap("sync-mode", "rt"));
+
+ assertOrderedQuery("select [jcr:path] from [nt:base] where contains(foo, 'hello') order by [jcr:score]",
+ asList("/test/a", "/test/b"));
- assertEventually(() -> {
- assertOrderedQuery("select [jcr:path] from [nt:base] where contains(foo, 'hello') order by [jcr:score]",
- asList("/test/a", "/test/b"));
-
- assertOrderedQuery("select [jcr:path] from [nt:base] where contains(foo, 'hello') order by [jcr:score] DESC",
- asList("/test/b", "/test/a"));
- });
+ assertOrderedQuery("select [jcr:path] from [nt:base] where contains(foo, 'hello') order by [jcr:score] DESC",
+ asList("/test/b", "/test/a"));
}
@Test
@@ -82,13 +81,10 @@ public class ElasticOrderByTest extends
Tree test = root.getTree("/").addChild("test");
test.addChild("a").setProperty("foo", "aaaaaa");
test.addChild("b").setProperty("foo", "bbbbbb");
- root.commit();
-
- assertEventually(() -> {
- assertOrderedQuery("select [jcr:path] from [nt:base] order by [jcr:path]", asList("/test/a", "/test/b"));
+ root.commit(Collections.singletonMap("sync-mode", "rt"));
- assertOrderedQuery("select [jcr:path] from [nt:base] order by [jcr:path] DESC", asList("/test/b", "/test/a"));
- });
+ assertOrderedQuery("select [jcr:path] from [nt:base] order by [jcr:path]", asList("/test/a", "/test/b"));
+ assertOrderedQuery("select [jcr:path] from [nt:base] order by [jcr:path] DESC", asList("/test/b", "/test/a"));
}
@Test
@@ -101,13 +97,10 @@ public class ElasticOrderByTest extends
Tree test = root.getTree("/").addChild("test");
test.addChild("a").setProperty("foo", "zzzzzz");
test.addChild("b").setProperty("foo", "aaaaaa");
- root.commit();
-
- assertEventually(() -> {
- assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo", asList("/test/b", "/test/a"));
+ root.commit(Collections.singletonMap("sync-mode", "rt"));
- assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo DESC", asList("/test/a", "/test/b"));
- });
+ assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo", asList("/test/b", "/test/a"));
+ assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo DESC", asList("/test/a", "/test/b"));
}
@Test
@@ -120,16 +113,13 @@ public class ElasticOrderByTest extends
Tree test = root.getTree("/").addChild("test");
test.addChild("a").setProperty("foo", "bbbbb");
test.addChild("b").setProperty("foo", "hello aaaa");
- root.commit();
+ root.commit(Collections.singletonMap("sync-mode", "rt"));
// this test verifies we use the keyword multi field when an analyzed properties is specified in order by
// http://www.technocratsid.com/string-sorting-in-elasticsearch/
- assertEventually(() -> {
- assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo", asList("/test/a", "/test/b"));
-
- assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo DESC", asList("/test/b", "/test/a"));
- });
+ assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo", asList("/test/a", "/test/b"));
+ assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo DESC", asList("/test/b", "/test/a"));
}
@Test
@@ -142,13 +132,10 @@ public class ElasticOrderByTest extends
Tree test = root.getTree("/").addChild("test");
test.addChild("a").setProperty("foo", "10");
test.addChild("b").setProperty("foo", "5");
- root.commit();
-
- assertEventually(() -> {
- assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo", asList("/test/b", "/test/a"));
+ root.commit(Collections.singletonMap("sync-mode", "rt"));
- assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo DESC", asList("/test/a", "/test/b"));
- });
+ assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo", asList("/test/b", "/test/a"));
+ assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo DESC", asList("/test/a", "/test/b"));
}
@Test
@@ -169,18 +156,16 @@ public class ElasticOrderByTest extends
Tree b = test.addChild("b");
b.setProperty("foo", "b");
b.setProperty("bar", "100");
- root.commit();
+ root.commit(Collections.singletonMap("sync-mode", "rt"));
+
+ assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo, @bar",
+ asList("/test/a1", "/test/a2", "/test/b"));
+
+ assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo, @bar DESC",
+ asList("/test/a2", "/test/a1", "/test/b"));
- assertEventually(() -> {
- assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo, @bar",
- asList("/test/a1", "/test/a2", "/test/b"));
-
- assertOrderedQuery("select [jcr:path] from [nt:base] order by @foo, @bar DESC",
- asList("/test/a2", "/test/a1", "/test/b"));
-
- assertOrderedQuery("select [jcr:path] from [nt:base] order by @bar DESC, @foo DESC",
- asList("/test/b", "/test/a2", "/test/a1"));
- });
+ assertOrderedQuery("select [jcr:path] from [nt:base] order by @bar DESC, @foo DESC",
+ asList("/test/b", "/test/a2", "/test/a1"));
}
private void assertOrderedQuery(String sql, List<String> paths) {
Added: jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandlerTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandlerTest.java?rev=1879989&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandlerTest.java (added)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandlerTest.java Fri Jul 17 08:28:19 2020
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic.index;
+
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
+import org.apache.jackrabbit.oak.plugins.memory.MultiStringPropertyState;
+import org.apache.jackrabbit.oak.plugins.memory.StringPropertyState;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.when;
+
+public class ElasticBulkProcessorHandlerTest {
+
+ @Mock
+ private ElasticIndexDefinition indexDefinitionMock;
+
+ @Mock
+ private NodeState definitionNodeStateMock;
+
+ @Mock
+ private ElasticConnection elasticConnectionMock;
+
+ @Mock
+ private NodeBuilder definitionBuilder;
+
+ @Mock
+ private CommitInfo commitInfo;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ when(indexDefinitionMock.getDefinitionNodeState()).thenReturn(definitionNodeStateMock);
+ when(commitInfo.getInfo()).thenReturn(Collections.emptyMap());
+ }
+
+ @Test
+ public void defaultMode() {
+ when(definitionNodeStateMock.getProperty(eq("async"))).thenReturn(null);
+
+ ElasticBulkProcessorHandler bulkProcessorHandler = ElasticBulkProcessorHandler
+ .getBulkProcessorHandler(elasticConnectionMock, indexDefinitionMock, definitionBuilder, commitInfo);
+
+ assertThat(bulkProcessorHandler, instanceOf(ElasticBulkProcessorHandler.class));
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void multiSyncModes() {
+ when(definitionNodeStateMock.getProperty(eq("async"))).thenReturn(null);
+ when(definitionNodeStateMock.getProperty(eq("sync-mode")))
+ .thenReturn(new MultiStringPropertyState("sync-mode", Arrays.asList("nrt", "rt")));
+
+ ElasticBulkProcessorHandler
+ .getBulkProcessorHandler(elasticConnectionMock, indexDefinitionMock, definitionBuilder, commitInfo);
+ }
+
+ @Test
+ public void rtMode() {
+ when(definitionNodeStateMock.getProperty(eq("async"))).thenReturn(null);
+ when(definitionNodeStateMock.getProperty(eq("sync-mode")))
+ .thenReturn(new StringPropertyState("sync-mode", "rt"));
+
+ ElasticBulkProcessorHandler bulkProcessorHandler = ElasticBulkProcessorHandler
+ .getBulkProcessorHandler(elasticConnectionMock, indexDefinitionMock, definitionBuilder, commitInfo);
+
+ assertThat(bulkProcessorHandler, instanceOf(ElasticBulkProcessorHandler.RealTimeBulkProcessorHandler.class));
+ }
+
+ @Test
+ public void defaultModeWithCommitInfoOverride() {
+ when(definitionNodeStateMock.getProperty(eq("async"))).thenReturn(null);
+ when(commitInfo.getInfo()).thenReturn(Collections.singletonMap("sync-mode", "rt"));
+
+ ElasticBulkProcessorHandler bulkProcessorHandler = ElasticBulkProcessorHandler
+ .getBulkProcessorHandler(elasticConnectionMock, indexDefinitionMock, definitionBuilder, commitInfo);
+
+ assertThat(bulkProcessorHandler, instanceOf(ElasticBulkProcessorHandler.RealTimeBulkProcessorHandler.class));
+ }
+}
Modified: jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java?rev=1879989&r1=1879988&r2=1879989&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java (original)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexWriterTest.java Fri Jul 17 08:28:19 2020
@@ -18,8 +18,7 @@ package org.apache.jackrabbit.oak.plugin
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
-import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
-import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.junit.Before;
@@ -46,10 +45,7 @@ public class ElasticIndexWriterTest {
private ElasticIndexDefinition indexDefinitionMock;
@Mock
- private BulkProcessor bulkProcessorMock;
-
- @Mock
- private NodeBuilder definitionBuilder;
+ private ElasticBulkProcessorHandler bulkProcessorHandlerMock;
private ElasticIndexWriter indexWriter;
@@ -57,7 +53,7 @@ public class ElasticIndexWriterTest {
public void setUp() {
MockitoAnnotations.initMocks(this);
when(indexDefinitionMock.getRemoteIndexAlias()).thenReturn("test-index");
- indexWriter = new ElasticIndexWriter(elasticConnectionMock, indexDefinitionMock, bulkProcessorMock, definitionBuilder);
+ indexWriter = new ElasticIndexWriter(elasticConnectionMock, indexDefinitionMock, bulkProcessorHandlerMock);
}
@Test
@@ -65,7 +61,7 @@ public class ElasticIndexWriterTest {
indexWriter.updateDocument("/foo", new ElasticDocument("/foo"));
ArgumentCaptor<IndexRequest> acIndexRequest = ArgumentCaptor.forClass(IndexRequest.class);
- verify(bulkProcessorMock).add(acIndexRequest.capture());
+ verify(bulkProcessorHandlerMock).add(acIndexRequest.capture());
IndexRequest request = acIndexRequest.getValue();
assertEquals("test-index", request.index());
@@ -77,7 +73,7 @@ public class ElasticIndexWriterTest {
indexWriter.deleteDocuments("/bar");
ArgumentCaptor<DeleteRequest> acDeleteRequest = ArgumentCaptor.forClass(DeleteRequest.class);
- verify(bulkProcessorMock).add(acDeleteRequest.capture());
+ verify(bulkProcessorHandlerMock).add(acDeleteRequest.capture());
DeleteRequest request = acDeleteRequest.getValue();
assertEquals("test-index", request.index());
@@ -91,10 +87,8 @@ public class ElasticIndexWriterTest {
indexWriter.deleteDocuments("/foo");
indexWriter.deleteDocuments("/bar");
- ArgumentCaptor<IndexRequest> acIndexRequest = ArgumentCaptor.forClass(IndexRequest.class);
- verify(bulkProcessorMock, times(2)).add(acIndexRequest.capture());
- ArgumentCaptor<DeleteRequest> acDeleteRequest = ArgumentCaptor.forClass(DeleteRequest.class);
- verify(bulkProcessorMock, times(2)).add(acDeleteRequest.capture());
+ ArgumentCaptor<DocWriteRequest<?>> request = ArgumentCaptor.forClass(DocWriteRequest.class);
+ verify(bulkProcessorHandlerMock, times(4)).add(request.capture());
}
@Test
@@ -104,7 +98,7 @@ public class ElasticIndexWriterTest {
indexWriter.updateDocument(generatedPath, new ElasticDocument(generatedPath));
ArgumentCaptor<IndexRequest> acIndexRequest = ArgumentCaptor.forClass(IndexRequest.class);
- verify(bulkProcessorMock).add(acIndexRequest.capture());
+ verify(bulkProcessorHandlerMock).add(acIndexRequest.capture());
IndexRequest request = acIndexRequest.getValue();
assertThat(request.id(), not(generatedPath));
@@ -114,7 +108,7 @@ public class ElasticIndexWriterTest {
@Test
public void closeBulkProcessor() {
indexWriter.close(System.currentTimeMillis());
- verify(bulkProcessorMock).close();
+ verify(bulkProcessorHandlerMock).close();
}
}