This is an automated email from the ASF dual-hosted git repository.
nfsantos pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new 96a86e5420 OAK-11438 - Add support for parallelization in the Lucene index writer backend. (#2031)
96a86e5420 is described below
commit 96a86e5420fb9da57256a12ec0c4f7206bdae27b
Author: Nuno Santos <[email protected]>
AuthorDate: Tue Feb 18 08:23:34 2025 +0100
OAK-11438 - Add support for parallelization in the Lucene index writer backend. (#2031)
---
.../index/CompositeIndexEditorProvider.java | 8 +
.../oak/plugins/index/IndexEditorProvider.java | 7 +-
.../index/lucene/LuceneIndexEditorProvider.java | 39 +-
.../lucene/writer/DefaultIndexWriterFactory.java | 42 ++-
.../index/lucene/writer/IndexWriterPool.java | 407 +++++++++++++++++++++
.../lucene/writer/PooledLuceneIndexWriter.java | 78 ++++
.../reader/DefaultIndexReaderFactoryTest.java | 40 +-
.../index/lucene/writer/IndexWriterPoolTest.java | 172 +++++++++
.../jackrabbit/oak/index/OutOfBandIndexerBase.java | 56 +--
.../indexer/document/DocumentStoreIndexerBase.java | 11 +-
.../oak/index/indexer/document/LuceneIndexer.java | 17 +-
.../indexer/document/LuceneIndexerProvider.java | 38 +-
12 files changed, 852 insertions(+), 63 deletions(-)
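The feature is opt-in and configured entirely through system properties. A minimal sketch of enabling and tuning it before an indexing run follows; the property names and defaults are taken from this commit, while the wrapper class itself is only illustrative and not part of Oak:

    // Illustrative sketch: the property names/defaults below are defined in this commit.
    public class EnableParallelLuceneWriter {
        public static void main(String[] args) throws Exception {
            // Opt in to the pooled writer in the Lucene index editor (default: false).
            System.setProperty("oak.indexer.editor.parallelWriter.enabled", "true");
            // Separate toggle for the document-store (oak-run) indexer (default: false).
            System.setProperty("oak.indexer.document.parallelWriter.enabled", "true");
            // Tuning knobs read by IndexWriterPool (defaults: 256, 64, 4).
            System.setProperty("oak.indexer.parallelWriter.maxBatchSize", "256");
            System.setProperty("oak.indexer.parallelWriter.queueSize", "64");
            System.setProperty("oak.indexer.parallelWriter.numberThreads", "4");
            // ... then start the reindexing run as usual ...
        }
    }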
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/CompositeIndexEditorProvider.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/CompositeIndexEditorProvider.java
index 531540d8e6..c9127014a5 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/CompositeIndexEditorProvider.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/CompositeIndexEditorProvider.java
@@ -16,6 +16,7 @@
*/
package org.apache.jackrabbit.oak.plugins.index;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -67,4 +68,11 @@ public class CompositeIndexEditorProvider implements IndexEditorProvider {
}
return CompositeEditor.compose(indexes);
}
+
+ @Override
+ public void close() throws IOException {
+ for (IndexEditorProvider provider : providers) {
+ provider.close();
+ }
+ }
}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexEditorProvider.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexEditorProvider.java
index 942f600c46..9705f8e95a 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexEditorProvider.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexEditorProvider.java
@@ -23,12 +23,14 @@ import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import java.io.IOException;
+
/**
* Extension point for plugging in different kinds of IndexEditor providers.
*
* @see IndexEditor
*/
-public interface IndexEditorProvider {
+public interface IndexEditorProvider extends AutoCloseable {
/**
* Each provider knows how to produce a certain type of index. If the
@@ -56,4 +58,7 @@ public interface IndexEditorProvider {
@NotNull String type, @NotNull NodeBuilder definition,
@NotNull NodeState root,
@NotNull IndexUpdateCallback callback) throws CommitFailedException;
+
+ default void close() throws IOException {
+ }
}
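Since IndexEditorProvider now extends AutoCloseable with a default no-op close(), callers can scope a provider with try-with-resources, as OutOfBandIndexerBase does later in this patch. A minimal sketch, assuming a hypothetical createProvider() factory method:

    // Sketch: close() releases writer-pool resources; createProvider() is hypothetical.
    try (IndexEditorProvider provider = createProvider()) {
        // build an IndexUpdate with the provider and process the repository diff
    } catch (IOException e) {
        // close() is declared to throw IOException
    }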
diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java
index 4a2d3006a5..e1b818ce7f 100644
--- a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java
+++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java
@@ -33,6 +33,7 @@ import org.apache.jackrabbit.oak.plugins.index.lucene.property.LuceneIndexProper
import org.apache.jackrabbit.oak.plugins.index.lucene.property.PropertyIndexUpdateCallback;
import org.apache.jackrabbit.oak.plugins.index.lucene.property.PropertyQuery;
import org.apache.jackrabbit.oak.plugins.index.lucene.writer.DefaultIndexWriterFactory;
+import org.apache.jackrabbit.oak.plugins.index.lucene.writer.IndexWriterPool;
import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriterConfig;
import org.apache.jackrabbit.oak.plugins.index.search.CompositePropertyUpdateCallback;
import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache;
@@ -68,11 +69,13 @@ import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstant
*
* @see LuceneIndexEditor
* @see IndexEditorProvider
- *
*/
public class LuceneIndexEditorProvider implements IndexEditorProvider {
private final static Logger LOG = LoggerFactory.getLogger(LuceneIndexEditorProvider.class);
+ public static final String OAK_INDEXER_EDITOR_PARALLEL_WRITER_ENABLED = "oak.indexer.editor.parallelWriter.enabled";
+
+
private final IndexCopier indexCopier;
private final ExtractedTextCache extractedTextCache;
private final IndexAugmentorFactory augmentorFactory;
@@ -81,6 +84,7 @@ public class LuceneIndexEditorProvider implements IndexEditorProvider {
private final ActiveDeletedBlobCollector activeDeletedBlobCollector;
private final LuceneIndexMBean mbean;
private final StatisticsProvider statisticsProvider;
+ private final IndexWriterPool indexWriterPool;
private GarbageCollectableBlobStore blobStore;
private IndexingQueue indexingQueue;
@@ -141,6 +145,10 @@ public class LuceneIndexEditorProvider implements IndexEditorProvider {
this.activeDeletedBlobCollector = activeDeletedBlobCollector;
this.mbean = mbean;
this.statisticsProvider = statisticsProvider;
+
+ boolean parallelIndexingEnabled = ConfigHelper.getSystemPropertyAsBoolean(
+ OAK_INDEXER_EDITOR_PARALLEL_WRITER_ENABLED, false);
+ this.indexWriterPool = parallelIndexingEnabled ? new IndexWriterPool() : null;
}
public LuceneIndexEditorProvider withAsyncIndexesSizeStatsUpdate(AsyncIndexesSizeStatsUpdate asyncIndexesSizeStatsUpdate) {
@@ -150,13 +158,13 @@ public class LuceneIndexEditorProvider implements IndexEditorProvider {
@Override
public Editor getIndexEditor(
- @NotNull String type, @NotNull NodeBuilder definition, @NotNull NodeState root,
- @NotNull IndexUpdateCallback callback)
+ @NotNull String type, @NotNull NodeBuilder definition, @NotNull NodeState root,
+ @NotNull IndexUpdateCallback callback)
throws CommitFailedException {
if (TYPE_LUCENE.equals(type)) {
checkArgument(callback instanceof ContextAwareCallback,
"callback instance not of type ContextAwareCallback [%s]",
callback);
- IndexingContext indexingContext =
((ContextAwareCallback)callback).getIndexingContext();
+ IndexingContext indexingContext = ((ContextAwareCallback)
callback).getIndexingContext();
BlobDeletionCallback blobDeletionCallback =
activeDeletedBlobCollector.getBlobDeletionCallback();
indexingContext.registerIndexCommitCallback(blobDeletionCallback);
FulltextIndexWriterFactory writerFactory = null;
@@ -170,12 +178,12 @@ public class LuceneIndexEditorProvider implements IndexEditorProvider {
//Would not participate in reindexing. Only interested in
//incremental indexing
- if (indexingContext.isReindexing()){
+ if (indexingContext.isReindexing()) {
return null;
}
CommitContext commitContext = getCommitContext(indexingContext);
- if (commitContext == null){
+ if (commitContext == null) {
//Logically there should not be any commit without commit context. But
//some initializer code does the commit with out it. So ignore such calls with
//warning now
@@ -192,9 +200,9 @@ public class LuceneIndexEditorProvider implements IndexEditorProvider {
//IndexDefinition from tracker might differ from one passed here for reindexing
//case which should be fine. However reusing existing definition would avoid
//creating definition instance for each commit as this gets executed for each commit
- if (indexTracker != null){
+ if (indexTracker != null) {
indexDefinition = indexTracker.getIndexDefinition(indexPath);
- if (indexDefinition != null && !indexDefinition.hasMatchingNodeTypeReg(root)){
+ if (indexDefinition != null && !indexDefinition.hasMatchingNodeTypeReg(root)) {
LOG.debug("Detected change in NodeType registry for index {}. Would not use " +
"existing index definition", indexDefinition.getIndexPath());
indexDefinition = null;
@@ -231,7 +239,8 @@ public class LuceneIndexEditorProvider implements IndexEditorProvider {
writerFactory = new DefaultIndexWriterFactory(mountInfoProvider,
newDirectoryFactory(blobDeletionCallback, cowDirectoryCleanupCallback),
- writerConfig);
+ writerConfig,
+ indexWriterPool);
}
LuceneIndexEditorContext context = new LuceneIndexEditorContext(root, definition, indexDefinition, callback,
@@ -278,7 +287,7 @@ public class LuceneIndexEditorProvider implements IndexEditorProvider {
return new DefaultDirectoryFactory(indexCopier, blobStore,
blobDeletionCallback, cowDirectoryTracker);
}
- private LuceneDocumentHolder getDocumentHolder(CommitContext commitContext){
+ private LuceneDocumentHolder getDocumentHolder(CommitContext commitContext) {
LuceneDocumentHolder holder = (LuceneDocumentHolder) commitContext.get(LuceneDocumentHolder.NAME);
if (holder == null) {
holder = new LuceneDocumentHolder(indexingQueue, inMemoryDocsLimit);
@@ -308,6 +317,14 @@ public class LuceneIndexEditorProvider implements IndexEditorProvider {
return nrtIndexingEnabled;
}
+ @Override
+ public void close() {
+ LOG.info("Closing LuceneIndexEditorProvider");
+ if (indexWriterPool != null) {
+ indexWriterPool.close();
+ }
+ }
+
private static CommitContext getCommitContext(IndexingContext indexingContext) {
return (CommitContext) indexingContext.getCommitInfo().getInfo().get(CommitContext.NAME);
}
@@ -331,7 +348,7 @@ public class LuceneIndexEditorProvider implements IndexEditorProvider {
}
for (File f : reindexingLocalDirectories) {
- if ( ! FileUtils.deleteQuietly(f)) {
+ if (!FileUtils.deleteQuietly(f)) {
LOG.warn("Failed to delete {}", f);
}
}
diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/DefaultIndexWriterFactory.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/DefaultIndexWriterFactory.java
index ba2639244b..ac15919c9b 100644
--- a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/DefaultIndexWriterFactory.java
+++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/DefaultIndexWriterFactory.java
@@ -28,19 +28,33 @@ import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.mount.MountInfoProvider;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;
public class DefaultIndexWriterFactory implements LuceneIndexWriterFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultIndexWriterFactory.class);
+
private final MountInfoProvider mountInfoProvider;
private final DirectoryFactory directoryFactory;
private final LuceneIndexWriterConfig writerConfig;
+ private final IndexWriterPool indexWriterPool;
public DefaultIndexWriterFactory(MountInfoProvider mountInfoProvider,
- DirectoryFactory directoryFactory, LuceneIndexWriterConfig writerConfig) {
+ DirectoryFactory directoryFactory,
+ LuceneIndexWriterConfig writerConfig) {
+ this(mountInfoProvider, directoryFactory, writerConfig, null);
+ }
+
+ public DefaultIndexWriterFactory(MountInfoProvider mountInfoProvider,
+ DirectoryFactory directoryFactory,
+ LuceneIndexWriterConfig writerConfig,
+ IndexWriterPool indexWriterPool) {
this.mountInfoProvider = requireNonNull(mountInfoProvider);
this.directoryFactory = requireNonNull(directoryFactory);
this.writerConfig = requireNonNull(writerConfig);
+ this.indexWriterPool = indexWriterPool;
}
@Override
@@ -50,14 +64,30 @@ public class DefaultIndexWriterFactory implements LuceneIndexWriterFactory {
"Expected %s but found %s for index definition",
LuceneIndexDefinition.class, def.getClass());
- LuceneIndexDefinition definition = (LuceneIndexDefinition)def;
+ LuceneIndexDefinition definition = (LuceneIndexDefinition) def;
- if (mountInfoProvider.hasNonDefaultMounts()){
- return new MultiplexingIndexWriter(directoryFactory, mountInfoProvider, definition,
- definitionBuilder, reindex, writerConfig);
+ if (mountInfoProvider.hasNonDefaultMounts()) {
+ return wrapWithPipelinedIndexWriter(
+ new MultiplexingIndexWriter(directoryFactory, mountInfoProvider, definition, definitionBuilder, reindex, writerConfig),
+ definition.getIndexName());
}
- return new DefaultIndexWriter(definition, definitionBuilder, directoryFactory,
+ DefaultIndexWriter writer = new DefaultIndexWriter(definition, definitionBuilder, directoryFactory,
FulltextIndexConstants.INDEX_DATA_CHILD_NAME,
LuceneIndexConstants.SUGGEST_DATA_CHILD_NAME, reindex, writerConfig);
+
+ return wrapWithPipelinedIndexWriter(writer, definition.getIndexName());
+ }
+
+ public void close() {
+ LOG.debug("Closing LuceneIndexWriterFactory");
+ if (indexWriterPool == null) {
+ LOG.debug("Not using an Index writer pool");
+ } else {
+ indexWriterPool.close();
+ }
+ }
+
+ private LuceneIndexWriter wrapWithPipelinedIndexWriter(LuceneIndexWriter writer, String indexName) {
+ return indexWriterPool == null ? writer : new PooledLuceneIndexWriter(indexWriterPool, writer, indexName);
}
}
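The factory keeps the old three-argument constructor, which delegates with a null pool, so existing callers are unaffected; when a pool is supplied, newInstance wraps every writer in a PooledLuceneIndexWriter. A sketch, assuming mountInfoProvider, directoryFactory and writerConfig are already in scope:

    // Sketch: both constructors exist in this patch; the local variables are assumed.
    IndexWriterPool pool = new IndexWriterPool();
    LuceneIndexWriterFactory pooled =
            new DefaultIndexWriterFactory(mountInfoProvider, directoryFactory, writerConfig, pool);
    // The three-argument form (equivalent to passing a null pool) keeps writers unwrapped:
    LuceneIndexWriterFactory sequential =
            new DefaultIndexWriterFactory(mountInfoProvider, directoryFactory, writerConfig);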
diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/IndexWriterPool.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/IndexWriterPool.java
new file mode 100644
index 0000000000..a76e2ae8fc
--- /dev/null
+++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/IndexWriterPool.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.lucene.writer;
+
+import org.apache.jackrabbit.guava.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.plugins.index.ConfigHelper;
+import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
+import org.apache.lucene.index.IndexableField;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+
+public class IndexWriterPool {
+ private final static Logger LOG = LoggerFactory.getLogger(IndexWriterPool.class);
+
+ public static final String OAK_INDEXER_PARALLEL_WRITER_MAX_BATCH_SIZE = "oak.indexer.parallelWriter.maxBatchSize";
+ public static final int DEFAULT_OAK_INDEXER_PARALLEL_WRITER_MAX_BATCH_SIZE = 256;
+
+ public static final String OAK_INDEXER_PARALLEL_WRITER_QUEUE_SIZE = "oak.indexer.parallelWriter.queueSize";
+ public static final int DEFAULT_OAK_INDEXER_PARALLEL_WRITER_QUEUE_SIZE = 64;
+
+ public static final String OAK_INDEXER_PARALLEL_WRITER_NUMBER_THREADS = "oak.indexer.parallelWriter.numberThreads";
+ public static final int DEFAULT_OAK_INDEXER_PARALLEL_WRITER_NUMBER_THREADS = 4;
+
+ private final int maxBatchSize = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PARALLEL_WRITER_MAX_BATCH_SIZE, DEFAULT_OAK_INDEXER_PARALLEL_WRITER_MAX_BATCH_SIZE);
+ private final int queueSize = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PARALLEL_WRITER_QUEUE_SIZE, DEFAULT_OAK_INDEXER_PARALLEL_WRITER_QUEUE_SIZE);
+ private final int numberOfThreads = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PARALLEL_WRITER_NUMBER_THREADS, DEFAULT_OAK_INDEXER_PARALLEL_WRITER_NUMBER_THREADS);
+
+ // The batch of operations that will be sent to the workers.
+ // Batching individual operations reduces the overhead of synchronization and context switching.
+ private final ArrayList<Operation> batch = new ArrayList<>(maxBatchSize);
+ // Shared queue between producer and workers
+ private final BlockingQueue<OperationBatch> queue = new ArrayBlockingQueue<>(queueSize);
+ private final List<Worker> workers;
+ private final List<Future<?>> workerFutures;
+ private final ExecutorService writersPool;
+ // Used to schedule a task that periodically prints statistics
+ private final ScheduledExecutorService monitorTaskExecutor;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ // Used to keep track of the sequence number of the batches that are currently being processed.
+ // This is used to wait until all operations for a writer are processed before closing it.
+ private final Object pendingBatchesLock = new Object();
+ private final HashSet<Long> pendingBatches = new HashSet<>();
+ private long batchSequenceNumber = 0;
+
+ // Statistics
+ private final long startTimeNanos = System.nanoTime();
+ private long updateCount = 0;
+ private long deleteCount = 0;
+ private long totalEnqueueTimeNanos = 0;
+
+ private static class OperationBatch {
+ final long sequenceNumber;
+ final Operation[] operations;
+
+ public OperationBatch(long sequenceNumber, Operation[] operations) {
+ Objects.requireNonNull(operations);
+ this.sequenceNumber = sequenceNumber;
+ this.operations = operations;
+ }
+ }
+
+ private static abstract class Operation {
+ final LuceneIndexWriter delegate;
+
+ public Operation(LuceneIndexWriter delegate) {
+ this.delegate = delegate;
+ }
+
+ abstract void execute() throws IOException;
+ }
+
+ private static class UpdateOperation extends Operation {
+ private final String path;
+ private final Iterable<? extends IndexableField> doc;
+
+ UpdateOperation(LuceneIndexWriter delegate, String path, Iterable<? extends IndexableField> doc) {
+ super(delegate);
+ this.path = path;
+ this.doc = doc;
+ }
+
+ @Override
+ public void execute() throws IOException {
+ delegate.updateDocument(path, doc);
+ }
+ }
+
+ private static class DeleteOperation extends Operation {
+ private final String path;
+
+ DeleteOperation(LuceneIndexWriter delegate, String path) {
+ super(delegate);
+ this.path = path;
+ }
+
+ @Override
+ public void execute() throws IOException {
+ delegate.deleteDocuments(path);
+ }
+ }
+
+ private static class CloseResult {
+ // Either result or error is non-null. The two constructors enforce this invariant.
+ final Boolean result;
+ final Throwable error;
+
+ CloseResult(boolean result) {
+ this.result = result;
+ this.error = null;
+ }
+
+ CloseResult(Throwable error) {
+ this.result = null;
+ this.error = error;
+ }
+
+ @Override
+ public String toString() {
+ return "CloseResult{" +
+ "result=" + result +
+ ", error=" + error +
+ '}';
+ }
+ }
+
+ private static class CloseWriterOperation extends Operation {
+ private final long timestamp;
+ private final SynchronousQueue<CloseResult> sync;
+
+ /**
+ * The close operation should be synchronous and applied only after all the write operations for this writer
+ * are processed.
+ *
+ * @param sync A synchronous queue used to wait for the result of the close operation.
+ */
+ CloseWriterOperation(LuceneIndexWriter delegate, long timestamp, SynchronousQueue<CloseResult> sync) {
+ super(delegate);
+ this.timestamp = timestamp;
+ this.sync = sync;
+ }
+
+ @Override
+ public void execute() {
+ try {
+ try {
+ boolean closeResult = delegate.close(timestamp);
+ sync.put(new CloseResult(closeResult));
+ } catch (IOException e) {
+ sync.put(new CloseResult(e));
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ // Sentinel used to terminate worker threads
+ final static OperationBatch SHUTDOWN = new OperationBatch(-1, new Operation[0]);
+
+ private class Worker implements Runnable {
+ private final long startTime = System.nanoTime();
+ private final int id;
+ private long opCount = 0;
+ private long batchesProcessed = 0;
+ private long totalBusyTime = 0;
+
+ public Worker(int id) {
+ this.id = id;
+ }
+
+ @Override
+ public void run() {
+ String oldName = Thread.currentThread().getName();
+ Thread.currentThread().setName("index-writer-" + id);
+ try {
+ LOG.info("[{}] Worker started", id);
+ while (true) {
+ OperationBatch op = queue.take();
+ if (op == SHUTDOWN) {
+ queue.add(SHUTDOWN);
+ LOG.info("[{}] Shutting down worker", id);
+ printStatistics();
+ return;
+ }
+ long start = System.nanoTime();
+ for (Operation operation : op.operations) {
+ operation.execute();
+ opCount++;
+ }
+ batchesProcessed++;
+ long durationNanos = System.nanoTime() - start;
+ totalBusyTime += durationNanos;
+ long durationMillis = durationNanos / 1_000_000;
+ if (durationMillis > 1000) {
+ LOG.info("[{}] Processing batch {} of size {} took {}
millis.",
+ id, op.sequenceNumber, op.operations.length,
durationMillis);
+ }
+ synchronized (pendingBatchesLock) {
+ pendingBatches.remove(op.sequenceNumber);
+ pendingBatchesLock.notifyAll();
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("[{}] Interrupted while waiting for an index write
operation", id, e);
+ throw new RuntimeException(e);
+ } catch (Throwable t) {
+ LOG.error("[{}] Error while processing an index write
operation", id, t);
+ throw new RuntimeException(t);
+ } finally {
+ Thread.currentThread().setName(oldName);
+ }
+ }
+
+ void printStatistics() {
+ double busyTimePercentage = FormattingUtils.safeComputePercentage(totalBusyTime, System.nanoTime() - startTime);
+ String busyTimePercentageStr = String.format("%.2f", busyTimePercentage);
+ LOG.info("[{}] operationsProcessed: {}, batchesProcessed: {}, busyTime: {} ms ({}%)",
+ id, opCount, batchesProcessed, totalBusyTime / 1_000_000, busyTimePercentageStr);
+ }
+ }
+
+ /**
+ * Creates and starts a pool of writer threads.
+ * <p>
+ * WARN: This is not thread safe.
+ */
+ public IndexWriterPool() {
+ this.writersPool = Executors.newFixedThreadPool(numberOfThreads, new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .build());
+ this.monitorTaskExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("index-writer-monitor")
+ .build());
+ this.workers = IntStream.range(0, numberOfThreads)
+ .mapToObj(Worker::new)
+ .collect(Collectors.toList());
+ this.workerFutures = workers.stream()
+ .map(writersPool::submit)
+ .collect(Collectors.toList());
+ monitorTaskExecutor.scheduleAtFixedRate(this::printStatistics, 30, 30, TimeUnit.SECONDS);
+ LOG.info("Writing thread started");
+ }
+
+ public void updateDocument(LuceneIndexWriter writer, String path, Iterable<? extends IndexableField> doc) throws IOException {
+ checkOpen();
+ this.updateCount++;
+ enqueueOperation(new UpdateOperation(writer, path, doc));
+ }
+
+ public void deleteDocuments(LuceneIndexWriter writer, String path) throws IOException {
+ checkOpen();
+ this.deleteCount++;
+ enqueueOperation(new DeleteOperation(writer, path));
+ }
+
+ public boolean closeWriter(LuceneIndexWriter writer, long timestamp) throws IOException {
+ checkOpen();
+ try {
+ LOG.info("Closing writer: {}", writer);
+ // Before closing the writer, we must wait until all previously submitted operations for
+ // this writer are processed. For simplicity, we wait instead until ALL operations currently
+ // in the queue are processed, because otherwise it would be more complex to distinguish which
+ // operations are for which writer.
+ long seqNumber = flushBatch();
+ LOG.info("All pending operations enqueued. Waiting until all batches up to {} are processed", seqNumber);
+ synchronized (pendingBatchesLock) {
+ while (true) {
+ Long earliestPending = pendingBatches.isEmpty() ? null : pendingBatches.stream().min(Long::compareTo).get();
+ LOG.debug("Earliest pending batch: {}. Waiting until all batches up to {} are processed", earliestPending, seqNumber);
+ if (earliestPending == null || earliestPending > seqNumber) {
+ break;
+ }
+ pendingBatchesLock.wait();
+ }
+ }
+ LOG.info("All batches up to {} processed. Enqueuing close
operation for writer {}", seqNumber, writer);
+ SynchronousQueue<CloseResult> closeOpSync = new
SynchronousQueue<>();
+ batch.add(new CloseWriterOperation(writer, timestamp,
closeOpSync));
+ flushBatch();
+ CloseResult res = closeOpSync.take();
+ LOG.info("Writer {} closed. Result: {}", writer, res);
+ if (res.error == null) {
+ return res.result;
+ } else {
+ throw new IOException("Error while closing writer", res.error);
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting for the worker to finish", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void close() {
+ if (closed.compareAndSet(false, true)) {
+ flushBatch();
+ queue.add(SHUTDOWN);
+ LOG.info("Shutting down PipelinedIndexWriter. Total enqueue time:
{} ms", totalEnqueueTimeNanos / 1_000_000);
+ for (Future<?> f : workerFutures) {
+ LOG.info("Waiting for future: {}", f);
+ try {
+ f.get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.info("Error while waiting for future", e);
+ }
+ }
+ printStatistics();
+ new ExecutorCloser(writersPool, 1, TimeUnit.SECONDS).close();
+ new ExecutorCloser(monitorTaskExecutor, 1, TimeUnit.SECONDS).close();
+ } else {
+ LOG.warn("PipelinedIndexWriter already closed");
+ }
+ }
+
+ private void enqueueOperation(Operation op) {
+ batch.add(op);
+ if (batch.size() == maxBatchSize) {
+ flushBatch();
+ }
+ }
+
+ private void checkOpen() {
+ if (closed.get()) {
+ throw new IllegalStateException("PipelinedIndexWriter is closed");
+ }
+ }
+
+ private long flushBatch() {
+ // Empty batches are also enqueued. This is necessary for the close writer operation, which requires all previous
+ // operations for the writer to be processed before the writer is closed. This means that all enqueued batches
+ // must be processed and the current partially built batch must also be enqueued and processed. To ensure that,
+ // the close operation will always enqueue the current batch and wait for all the batches older or equal to the
+ // newly enqueued batch to be processed. If there are no operations in the currently pending batch, we enqueue
+ // it anyway just to generate a new sequence number.
+ try {
+ long seqNumber;
+ synchronized (pendingBatchesLock) {
+ // Shared between producer and workers
+ seqNumber = batchSequenceNumber;
+ batchSequenceNumber++;
+ pendingBatches.add(seqNumber);
+ }
+ if (seqNumber % 1000 == 0) {
+ LOG.info("Enqueuing batch {}, size: {}", seqNumber,
batch.size());
+ }
+ long start = System.nanoTime();
+ queue.put(new OperationBatch(seqNumber, batch.toArray(new Operation[0])));
+ long durationNanos = System.nanoTime() - start;
+ long durationMillis = durationNanos / 1_000_000;
+ totalEnqueueTimeNanos += durationNanos;
+ if (durationMillis > 1) {
+ LOG.info("Enqueuing batch delayed. Seq number: {}, size: {}.
Delay: {} ms",
+ seqNumber, batch.size(), durationMillis);
+ }
+ batch.clear();
+ return seqNumber;
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting to put an update operation in
the queue", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void printStatistics() {
+ double percentageEnqueueTime = FormattingUtils.safeComputePercentage(totalEnqueueTimeNanos, System.nanoTime() - startTimeNanos);
+ String percentageEnqueueTimeStr = String.format("%.2f", percentageEnqueueTime);
+ LOG.info("updateCount: {}, deleteCount: {}, batchesEnqueuedCount: {}, pendingBatchesCount: {}, enqueueTime: {} ms ({}%)",
+ updateCount, deleteCount, batchSequenceNumber, pendingBatches.size(), totalEnqueueTimeNanos / 1_000_000, percentageEnqueueTimeStr);
+ workers.forEach(Worker::printStatistics);
+ }
+}
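The pool's lifecycle in brief: update and delete calls are batched and handed to worker threads, closeWriter blocks until everything queued so far is processed before closing a single writer, and close() drains the queue and stops the workers. A sketch mirroring IndexWriterPoolTest.testSingleWriter, where writer and doc are assumed to be an existing LuceneIndexWriter and Lucene document:

    // Sketch of the API introduced in this file; writer/doc are assumed to exist.
    IndexWriterPool pool = new IndexWriterPool();
    pool.updateDocument(writer, "/content/foo", doc);  // enqueued, applied by a worker
    pool.deleteDocuments(writer, "/content/bar");      // also asynchronous
    boolean indexChanged = pool.closeWriter(writer, System.currentTimeMillis()); // blocks
    pool.close(); // flushes the remaining batch and shuts down the worker threads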
diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/PooledLuceneIndexWriter.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/PooledLuceneIndexWriter.java
new file mode 100644
index 0000000000..fc922e9225
--- /dev/null
+++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/PooledLuceneIndexWriter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.lucene.writer;
+
+import org.apache.lucene.index.IndexableField;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Index writer that delegates write operations to an IndexWriterPool.
+ */
+public class PooledLuceneIndexWriter implements LuceneIndexWriter {
+ private final static Logger LOG = LoggerFactory.getLogger(PooledLuceneIndexWriter.class);
+
+ private final String indexName;
+ private final LuceneIndexWriter delegateWriter;
+ private final IndexWriterPool writerPool;
+
+ private long updateCount = 0;
+ private long deleteCount = 0;
+
+ public PooledLuceneIndexWriter(IndexWriterPool writerPool, LuceneIndexWriter delegateWriter, String indexName) {
+ this.writerPool = writerPool;
+ this.delegateWriter = delegateWriter;
+ this.indexName = indexName;
+ }
+
+ @Override
+ public void updateDocument(String path, Iterable<? extends IndexableField> doc) throws IOException {
+ writerPool.updateDocument(delegateWriter, path, doc);
+ updateCount++;
+ }
+
+ @Override
+ public void deleteDocuments(String path) throws IOException {
+ writerPool.deleteDocuments(delegateWriter, path);
+ deleteCount++;
+ }
+
+ @Override
+ public boolean close(long timestamp) throws IOException {
+ LOG.info("[{}] Shutting down PipelinedFullTextIndexWriter", indexName);
+ return writerPool.closeWriter(delegateWriter, timestamp);
+ }
+
+ public long getUpdateCount() {
+ return updateCount;
+ }
+
+ public long getDeleteCount() {
+ return deleteCount;
+ }
+
+ public String formatStatistics() {
+ return "PooledLuceneIndexWriter(" + indexName + ")[" +
+ "updates: " + updateCount + ", " +
+ "deletes: " + deleteCount +
+ "]";
+ }
+}
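PooledLuceneIndexWriter keeps the LuceneIndexWriter contract, so calling code does not need to know whether writes go through the pool. A sketch, assuming pool and delegate already exist:

    // Sketch: the wrapper forwards each call to the pool on behalf of its delegate.
    LuceneIndexWriter writer = new PooledLuceneIndexWriter(pool, delegate, "/oak:index/lucene");
    writer.updateDocument("/content/foo", doc);  // -> pool.updateDocument(delegate, ...)
    writer.close(System.currentTimeMillis());    // -> pool.closeWriter(delegate, ...)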
diff --git a/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/reader/DefaultIndexReaderFactoryTest.java b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/reader/DefaultIndexReaderFactoryTest.java
index b617956781..6150499d48 100644
--- a/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/reader/DefaultIndexReaderFactoryTest.java
+++ b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/reader/DefaultIndexReaderFactoryTest.java
@@ -32,6 +32,7 @@ import org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexWriterFactory;
import org.apache.jackrabbit.oak.plugins.index.lucene.directory.DefaultDirectoryFactory;
import org.apache.jackrabbit.oak.plugins.index.lucene.directory.DirectoryFactory;
import org.apache.jackrabbit.oak.plugins.index.lucene.writer.DefaultIndexWriterFactory;
+import org.apache.jackrabbit.oak.plugins.index.lucene.writer.IndexWriterPool;
import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriter;
import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriterConfig;
import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
@@ -41,25 +42,54 @@ import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.StringField;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import static org.apache.jackrabbit.oak.plugins.index.lucene.TestUtil.newDoc;
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
import static org.apache.jackrabbit.oak.InitialContentHelper.INITIAL_CONTENT;
import static org.junit.Assert.*;
+@RunWith(Parameterized.class)
public class DefaultIndexReaderFactoryTest {
+ private final boolean parallelIndexing;
+ private IndexWriterPool indexWriterPool;
+
+ @Parameterized.Parameters(name = "Parallel Indexing: ({0})")
+ public static List<Boolean> parallelIndexingEnabled() {
+ return List.of(true, false);
+ }
+
@Rule
public TemporaryFolder folder = new TemporaryFolder(new File("target"));
- private NodeState root = INITIAL_CONTENT;
- private NodeBuilder builder = EMPTY_NODE.builder();
+ private final NodeState root = INITIAL_CONTENT;
+ private final NodeBuilder builder = EMPTY_NODE.builder();
private LuceneIndexDefinition defn = new LuceneIndexDefinition(root, builder.getNodeState(), "/foo");
- private MountInfoProvider mip = Mounts.newBuilder()
+ private final MountInfoProvider mip = Mounts.newBuilder()
.mount("foo", "/libs", "/apps").build();
- private LuceneIndexWriterConfig writerConfig = new LuceneIndexWriterConfig();
+ private final LuceneIndexWriterConfig writerConfig = new LuceneIndexWriterConfig();
+
+ public DefaultIndexReaderFactoryTest(boolean parallelIndexing) {
+ this.parallelIndexing = parallelIndexing;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ indexWriterPool = parallelIndexing ? new IndexWriterPool() : null;
+ }
+
+ @After
+ public void tearDown() {
+ if (indexWriterPool != null) {
+ indexWriterPool.close();
+ }
+ }
@Test
public void emptyDir() throws Exception{
@@ -196,6 +226,6 @@ public class DefaultIndexReaderFactoryTest {
private LuceneIndexWriterFactory newDirectoryFactory(){
DirectoryFactory directoryFactory = new DefaultDirectoryFactory(null, null);
- return new DefaultIndexWriterFactory(mip, directoryFactory, writerConfig);
+ return new DefaultIndexWriterFactory(mip, directoryFactory, writerConfig, indexWriterPool);
}
}
diff --git a/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/IndexWriterPoolTest.java b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/IndexWriterPoolTest.java
new file mode 100644
index 0000000000..2d94694ae7
--- /dev/null
+++ b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/IndexWriterPoolTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.lucene.writer;
+
+import org.apache.jackrabbit.oak.plugins.index.lucene.TestUtil;
+import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexableField;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.ProvideSystemProperty;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class IndexWriterPoolTest {
+ private final static Logger LOG = LoggerFactory.getLogger(IndexWriterPoolTest.class);
+
+ private static class TestWriter implements LuceneIndexWriter {
+ private final int delayMillis;
+ // The writers must be thread safe
+ final Set<String> deletedPaths = ConcurrentHashMap.newKeySet();
+ final Map<String, Document> docs = new ConcurrentHashMap<>();
+ boolean closed;
+
+ public TestWriter() {
+ this(0);
+ }
+
+ public TestWriter(int delayMillis) {
+ this.delayMillis = delayMillis;
+ }
+
+ private void delay() {
+ if (delayMillis > 0) {
+ try {
+ LOG.info("Delaying {}", delayMillis);
+ Thread.sleep(delayMillis);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public void updateDocument(String path, Iterable<? extends IndexableField> doc) {
+ delay();
+ docs.put(path, (Document) doc);
+ }
+
+ @Override
+ public void deleteDocuments(String path) {
+ delay();
+ deletedPaths.add(path);
+ }
+
+ @Override
+ public boolean close(long timestamp) {
+ delay();
+ closed = true;
+ return true;
+ }
+ }
+
+ @Rule
+ public final ProvideSystemProperty updateSystemProperties
+ = new ProvideSystemProperty(IndexWriterPool.OAK_INDEXER_PARALLEL_WRITER_MAX_BATCH_SIZE, "5")
+ .and(IndexWriterPool.OAK_INDEXER_PARALLEL_WRITER_NUMBER_THREADS, "8")
+ .and(IndexWriterPool.OAK_INDEXER_PARALLEL_WRITER_QUEUE_SIZE, "100");
+
+ @Rule
public final RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
+
+ @Test
+ public void testSingleWriter() throws IOException {
+ IndexWriterPool indexWriterPool = new IndexWriterPool();
+ TestWriter writer = new TestWriter();
+ Document doc = TestUtil.newDoc("value");
+ indexWriterPool.updateDocument(writer, "test", doc);
+ indexWriterPool.deleteDocuments(writer, "test");
+ boolean closeResult = indexWriterPool.closeWriter(writer, 30);
+ indexWriterPool.close();
+
+ assertTrue(writer.closed);
+ assertTrue(closeResult);
+ assertEquals(Set.of("test"), writer.deletedPaths);
+ assertEquals(Map.of("test", doc), writer.docs);
+ }
+
+ @Test
+ public void testMultipleWriters() throws IOException {
+ IndexWriterPool indexWriterPool = new IndexWriterPool();
+ List<TestWriter> writers = IntStream.range(0, 5).mapToObj(TestWriter::new).collect(Collectors.toList());
+ List<Document> updateDocs = IntStream.range(0, 200).mapToObj(i -> TestUtil.newDoc("test-doc-" + i)).collect(Collectors.toList());
+
+ int i = 0;
+ for (var doc : updateDocs) {
+ indexWriterPool.updateDocument(writers.get(i%writers.size()), doc.get(FieldNames.PATH), doc);
+ i++;
+ }
+ for (var w : writers) {
+ indexWriterPool.closeWriter(w, 0);
+ }
+ indexWriterPool.close();
+
+ List<Document> documentsWritten = new ArrayList<>();
+ for (var w : writers) {
+ documentsWritten.addAll(w.docs.values());
+ LOG.info("w: {}, d: {}", w.docs.size(), w.deletedPaths.size());
+ }
+
+ assertEquals(updateDocs.size(), documentsWritten.size());
+ assertEquals(Set.copyOf(updateDocs), Set.copyOf(documentsWritten));
+ assertTrue(writers.stream().allMatch(w -> w.deletedPaths.isEmpty()));
+ }
+
+ @Test
+ public void testEnqueueAfterClose() throws IOException {
+ IndexWriterPool indexWriterPool = new IndexWriterPool();
+ indexWriterPool.close();
+ // Subsequent calls to close should be ignored
+ indexWriterPool.close();
+ // the following method should throw IllegalStateException
+ try {
+ indexWriterPool.updateDocument(new TestWriter(), "test",
TestUtil.newDoc("value"));
+ fail("updateDocument did not throw expected exception");
+ } catch (IllegalStateException expected) {
+ }
+ }
+
+ @Test
+ public void testCloseWriterPoolWithoutClosingWriters() throws IOException {
+ IndexWriterPool indexWriterPool = new IndexWriterPool();
+ TestWriter writer = new TestWriter(100);
+ Document doc = TestUtil.newDoc("value");
+ indexWriterPool.updateDocument(writer, "test", doc);
+ indexWriterPool.deleteDocuments(writer, "test-deletion");
+ indexWriterPool.close();
+
+ assertEquals(Map.of("test", doc), writer.docs);
+ assertEquals(Set.of("test-deletion"), writer.deletedPaths);
+ }
+}
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/OutOfBandIndexerBase.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/OutOfBandIndexerBase.java
index 7b28f47215..63531ab4ef 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/OutOfBandIndexerBase.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/OutOfBandIndexerBase.java
@@ -136,34 +136,36 @@ public abstract class OutOfBandIndexerBase implements Closeable, IndexUpdateCall
private void preformIndexUpdate(NodeState baseState) throws IOException, CommitFailedException {
NodeBuilder builder = copyOnWriteStore.getRoot().builder();
- IndexUpdate indexUpdate = new IndexUpdate(
- createIndexEditorProvider(),
- REINDEX_LANE,
- copyOnWriteStore.getRoot(),
- builder,
- this,
- this,
- CommitInfo.EMPTY,
- CorruptIndexHandler.NOOP
- );
-
- configureEstimators(indexUpdate);
-
- //Do not use EmptyState as before otherwise the IndexUpdate would
- //unnecessary traverse the whole repo post reindexing. With use of baseState
- //It would only traverse the diff i.e. those index definitions paths
- //whose lane has been changed
- NodeState before = baseState;
- NodeState after = copyOnWriteStore.getRoot();
-
- CommitFailedException exception =
- EditorDiff.process(VisibleEditor.wrap(indexUpdate), before, after);
-
- if (exception != null) {
- throw exception;
+ try (IndexEditorProvider provider = createIndexEditorProvider()) {
+ IndexUpdate indexUpdate = new IndexUpdate(
+ provider,
+ REINDEX_LANE,
+ copyOnWriteStore.getRoot(),
+ builder,
+ this,
+ this,
+ CommitInfo.EMPTY,
+ CorruptIndexHandler.NOOP
+ );
+
+ configureEstimators(indexUpdate);
+
+ //Do not use EmptyState as before otherwise the IndexUpdate would
+ //unnecessary traverse the whole repo post reindexing. With use of baseState
+ //It would only traverse the diff i.e. those index definitions paths
+ //whose lane has been changed
+ NodeState before = baseState;
+ NodeState after = copyOnWriteStore.getRoot();
+
+ CommitFailedException exception =
+ EditorDiff.process(VisibleEditor.wrap(indexUpdate), before, after);
+
+ if (exception != null) {
+ throw exception;
+ }
+
+ copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
}
-
- copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
}
protected abstract IndexEditorProvider createIndexEditorProvider() throws IOException;
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
index 991e91526e..f2b22e34fa 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
@@ -413,12 +413,13 @@ public abstract class DocumentStoreIndexerBase implements Closeable {
}
}
log.info("Top slowest nodes to index (ms): {}",
slowestTopKElements);
+ }
- indexerProviders.forEach(indexProvider -> {
- ExtractedTextCache extractedTextCache =
indexProvider.getTextCache();
- CacheStats cacheStats = extractedTextCache == null ?
null : extractedTextCache.getCacheStats();
- log.info("Text extraction cache statistics: {}",
cacheStats == null ? "N/A" : cacheStats.cacheInfoAsString());
- });
+ for (NodeStateIndexerProvider indexerProvider :
indexerProviders) {
+ ExtractedTextCache extractedTextCache =
indexerProvider.getTextCache();
+ CacheStats cacheStats = extractedTextCache == null ? null
: extractedTextCache.getCacheStats();
+ log.info("Text extraction cache statistics: {}",
cacheStats == null ? "N/A" : cacheStats.cacheInfoAsString());
+ indexerProvider.close();
}
progressReporter.reindexingTraversalEnd();
diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LuceneIndexer.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LuceneIndexer.java
index 373d41fb83..45626c85f9 100644
--- a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LuceneIndexer.java
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LuceneIndexer.java
@@ -21,6 +21,7 @@ package org.apache.jackrabbit.oak.index.indexer.document;
import java.io.IOException;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
@@ -49,6 +50,7 @@ public class LuceneIndexer implements NodeStateIndexer, FacetsConfigProvider {
private FacetsConfig facetsConfig;
private final IndexerStatisticsTracker indexerStatisticsTracker = new IndexerStatisticsTracker(LOG);
+ private final AtomicBoolean closed = new AtomicBoolean(false);
public LuceneIndexer(IndexDefinition definition, LuceneIndexWriter indexWriter,
NodeBuilder builder, FulltextBinaryTextExtractor binaryTextExtractor,
@@ -122,9 +124,13 @@ public class LuceneIndexer implements NodeStateIndexer, FacetsConfigProvider {
@Override
public void close() throws IOException {
- LOG.info("[{}] Statistics: {}", definition.getIndexName(),
indexerStatisticsTracker.formatStats());
- binaryTextExtractor.logStats();
- indexWriter.close(System.currentTimeMillis());
+ if (closed.compareAndSet(false, true)) {
+ indexWriter.close(System.currentTimeMillis());
+ LOG.info("[{}] Statistics: {}", definition.getIndexName(),
indexerStatisticsTracker.formatStats());
+ binaryTextExtractor.logStats();
+ } else {
+ LOG.debug("Indexer already closed: {}", definition.getIndexName());
+ }
}
private void writeToIndex(Document doc, String path) throws IOException {
@@ -152,4 +158,9 @@ public class LuceneIndexer implements NodeStateIndexer, FacetsConfigProvider {
return facetsConfig;
}
+
+ @Override
+ public String toString() {
+ return definition.toString();
+ }
}
diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LuceneIndexerProvider.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LuceneIndexerProvider.java
index 2cc235b7f2..0d6a182f61 100644
--- a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LuceneIndexerProvider.java
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LuceneIndexerProvider.java
@@ -20,15 +20,19 @@
package org.apache.jackrabbit.oak.index.indexer.document;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.oak.index.ExtendedIndexHelper;
import org.apache.jackrabbit.oak.index.IndexerSupport;
+import org.apache.jackrabbit.oak.plugins.index.ConfigHelper;
import org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.lucene.directory.DirectoryFactory;
import org.apache.jackrabbit.oak.plugins.index.lucene.directory.FSDirectoryFactory;
import org.apache.jackrabbit.oak.plugins.index.lucene.writer.DefaultIndexWriterFactory;
+import org.apache.jackrabbit.oak.plugins.index.lucene.writer.IndexWriterPool;
import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriter;
import org.apache.jackrabbit.oak.plugins.index.progress.IndexingProgressReporter;
import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache;
@@ -41,14 +45,26 @@ import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPER
import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.TYPE_LUCENE;
public class LuceneIndexerProvider implements NodeStateIndexerProvider {
+
+ public static final String OAK_INDEXER_DOCUMENT_PARALLEL_WRITER_ENABLED = "oak.indexer.document.parallelWriter.enabled";
+
private final ExtractedTextCache textCache =
new ExtractedTextCache(FileUtils.ONE_MB * 5, TimeUnit.HOURS.toSeconds(5));
private final DefaultIndexWriterFactory indexWriterFactory;
+ private final ArrayList<LuceneIndexer> indexWriters = new ArrayList<>();
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final IndexWriterPool indexWriterPool;
public LuceneIndexerProvider(ExtendedIndexHelper extendedIndexHelper, IndexerSupport indexerSupport) throws IOException {
DirectoryFactory dirFactory = new FSDirectoryFactory(indexerSupport.getLocalIndexDir());
- this.indexWriterFactory = new DefaultIndexWriterFactory(extendedIndexHelper.getMountInfoProvider(),
- dirFactory, extendedIndexHelper.getLuceneIndexHelper().getWriterConfigForReindex());
+ boolean parallelIndexingEnabled = ConfigHelper.getSystemPropertyAsBoolean(
+ OAK_INDEXER_DOCUMENT_PARALLEL_WRITER_ENABLED, false);
+ this.indexWriterPool = parallelIndexingEnabled? new IndexWriterPool() : null;
+ this.indexWriterFactory = new DefaultIndexWriterFactory(
+ extendedIndexHelper.getMountInfoProvider(),
+ dirFactory,
+ extendedIndexHelper.getLuceneIndexHelper().getWriterConfigForReindex(),
+ indexWriterPool);
}
@Override
@@ -59,17 +75,21 @@ public class LuceneIndexerProvider implements NodeStateIndexerProvider {
return null;
}
- LuceneIndexDefinition idxDefinition = LuceneIndexDefinition.newLuceneBuilder(root, definition.getNodeState(), indexPath).reindex().build();
+ LuceneIndexDefinition idxDefinition = LuceneIndexDefinition.newLuceneBuilder(root, definition.getNodeState(), indexPath)
+ .reindex()
+ .build();
LuceneIndexWriter indexWriter = indexWriterFactory.newInstance(idxDefinition, definition, null, true);
FulltextBinaryTextExtractor textExtractor = new FulltextBinaryTextExtractor(textCache, idxDefinition, true);
- return new LuceneIndexer(
+ LuceneIndexer indexer = new LuceneIndexer(
idxDefinition,
indexWriter,
definition,
textExtractor,
progressReporter
);
+ indexWriters.add(indexer);
+ return indexer;
}
@Override
@@ -79,6 +99,14 @@ public class LuceneIndexerProvider implements NodeStateIndexerProvider {
@Override
public void close() throws IOException {
-
+ if (closed.compareAndSet(false, true)) {
+ for (LuceneIndexer indexer : indexWriters) {
+ indexer.close();
+ }
+ indexWriterFactory.close();
+ if (indexWriterPool != null) {
+ indexWriterPool.close();
+ }
+ }
}
}