This is an automated email from the ASF dual-hosted git repository.

nfsantos pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 96a86e5420 OAK-11438 - Add support for parallelization in the Lucene index writer backend. (#2031)
96a86e5420 is described below

commit 96a86e5420fb9da57256a12ec0c4f7206bdae27b
Author: Nuno Santos <[email protected]>
AuthorDate: Tue Feb 18 08:23:34 2025 +0100

    OAK-11438 - Add support for parallelization in the Lucene index writer backend. (#2031)
---
 .../index/CompositeIndexEditorProvider.java        |   8 +
 .../oak/plugins/index/IndexEditorProvider.java     |   7 +-
 .../index/lucene/LuceneIndexEditorProvider.java    |  39 +-
 .../lucene/writer/DefaultIndexWriterFactory.java   |  42 ++-
 .../index/lucene/writer/IndexWriterPool.java       | 407 +++++++++++++++++++++
 .../lucene/writer/PooledLuceneIndexWriter.java     |  78 ++++
 .../reader/DefaultIndexReaderFactoryTest.java      |  40 +-
 .../index/lucene/writer/IndexWriterPoolTest.java   | 172 +++++++++
 .../jackrabbit/oak/index/OutOfBandIndexerBase.java |  56 +--
 .../indexer/document/DocumentStoreIndexerBase.java |  11 +-
 .../oak/index/indexer/document/LuceneIndexer.java  |  17 +-
 .../indexer/document/LuceneIndexerProvider.java    |  38 +-
 12 files changed, 852 insertions(+), 63 deletions(-)

diff --git 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/CompositeIndexEditorProvider.java
 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/CompositeIndexEditorProvider.java
index 531540d8e6..c9127014a5 100644
--- 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/CompositeIndexEditorProvider.java
+++ 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/CompositeIndexEditorProvider.java
@@ -16,6 +16,7 @@
  */
 package org.apache.jackrabbit.oak.plugins.index;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -67,4 +68,11 @@ public class CompositeIndexEditorProvider implements 
IndexEditorProvider {
         }
         return CompositeEditor.compose(indexes);
     }
+
+    /**
+     * Closes all the wrapped providers. Every provider is closed even if closing an earlier one fails, so that
+     * a single failing provider cannot leak the resources of the others. The first failure is rethrown after
+     * all providers have been processed; later failures are attached to it as suppressed exceptions.
+     *
+     * @throws IOException if the first provider that failed to close threw an IOException
+     *                     (a RuntimeException from the first failing provider is rethrown unchanged)
+     */
+    @Override
+    public void close() throws IOException {
+        Exception firstFailure = null;
+        for (IndexEditorProvider provider : providers) {
+            try {
+                provider.close();
+            } catch (IOException | RuntimeException e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                } else {
+                    firstFailure.addSuppressed(e);
+                }
+            }
+        }
+        if (firstFailure instanceof IOException) {
+            throw (IOException) firstFailure;
+        }
+        if (firstFailure != null) {
+            throw (RuntimeException) firstFailure;
+        }
+    }
 }
diff --git 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexEditorProvider.java
 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexEditorProvider.java
index 942f600c46..9705f8e95a 100644
--- 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexEditorProvider.java
+++ 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexEditorProvider.java
@@ -23,12 +23,14 @@ import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import java.io.IOException;
+
 /**
  * Extension point for plugging in different kinds of IndexEditor providers.
  * 
  * @see IndexEditor
  */
-public interface IndexEditorProvider {
+public interface IndexEditorProvider extends AutoCloseable {
 
     /**
      * Each provider knows how to produce a certain type of index. If the
@@ -56,4 +58,7 @@ public interface IndexEditorProvider {
             @NotNull String type, @NotNull NodeBuilder definition,
             @NotNull NodeState root, 
             @NotNull IndexUpdateCallback callback) throws 
CommitFailedException;
+
+    /**
+     * Releases any resources held by this provider. The default implementation does nothing; providers that
+     * hold resources override it.
+     *
+     * @throws IOException if releasing the resources fails
+     */
+    default void close() throws IOException {
+    }
 }
diff --git 
a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java
 
b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java
index 4a2d3006a5..e1b818ce7f 100644
--- 
a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java
+++ 
b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java
@@ -33,6 +33,7 @@ import 
org.apache.jackrabbit.oak.plugins.index.lucene.property.LuceneIndexProper
 import 
org.apache.jackrabbit.oak.plugins.index.lucene.property.PropertyIndexUpdateCallback;
 import org.apache.jackrabbit.oak.plugins.index.lucene.property.PropertyQuery;
 import 
org.apache.jackrabbit.oak.plugins.index.lucene.writer.DefaultIndexWriterFactory;
+import org.apache.jackrabbit.oak.plugins.index.lucene.writer.IndexWriterPool;
 import 
org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriterConfig;
 import 
org.apache.jackrabbit.oak.plugins.index.search.CompositePropertyUpdateCallback;
 import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache;
@@ -68,11 +69,13 @@ import static 
org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstant
  *
  * @see LuceneIndexEditor
  * @see IndexEditorProvider
- *
  */
 public class LuceneIndexEditorProvider implements IndexEditorProvider {
     private final static Logger LOG = 
LoggerFactory.getLogger(LuceneIndexEditorProvider.class);
 
+    /**
+     * System property that, when {@code true}, makes this provider route index writes through an
+     * {@link IndexWriterPool} of worker threads. Defaults to {@code false}. It is read once, when the provider is
+     * constructed.
+     */
+    public static final String OAK_INDEXER_EDITOR_PARALLEL_WRITER_ENABLED = "oak.indexer.editor.parallelWriter.enabled";
+
+
     private final IndexCopier indexCopier;
     private final ExtractedTextCache extractedTextCache;
     private final IndexAugmentorFactory augmentorFactory;
@@ -81,6 +84,7 @@ public class LuceneIndexEditorProvider implements 
IndexEditorProvider {
     private final ActiveDeletedBlobCollector activeDeletedBlobCollector;
     private final LuceneIndexMBean mbean;
     private final StatisticsProvider statisticsProvider;
+    // Passed to every DefaultIndexWriterFactory created by this provider; null when
+    // OAK_INDEXER_EDITOR_PARALLEL_WRITER_ENABLED was not true at construction time.
+    private final IndexWriterPool indexWriterPool;
 
     private GarbageCollectableBlobStore blobStore;
     private IndexingQueue indexingQueue;
@@ -141,6 +145,10 @@ public class LuceneIndexEditorProvider implements 
IndexEditorProvider {
         this.activeDeletedBlobCollector = activeDeletedBlobCollector;
         this.mbean = mbean;
         this.statisticsProvider = statisticsProvider;
+
+        boolean parallelIndexingEnabled = 
ConfigHelper.getSystemPropertyAsBoolean(
+                OAK_INDEXER_EDITOR_PARALLEL_WRITER_ENABLED, false);
+        this.indexWriterPool = parallelIndexingEnabled ? new IndexWriterPool() 
: null;
     }
 
     public LuceneIndexEditorProvider 
withAsyncIndexesSizeStatsUpdate(AsyncIndexesSizeStatsUpdate 
asyncIndexesSizeStatsUpdate) {
@@ -150,13 +158,13 @@ public class LuceneIndexEditorProvider implements 
IndexEditorProvider {
 
     @Override
     public Editor getIndexEditor(
-        @NotNull String type, @NotNull NodeBuilder definition, @NotNull 
NodeState root,
-        @NotNull IndexUpdateCallback callback)
+            @NotNull String type, @NotNull NodeBuilder definition, @NotNull 
NodeState root,
+            @NotNull IndexUpdateCallback callback)
             throws CommitFailedException {
         if (TYPE_LUCENE.equals(type)) {
             checkArgument(callback instanceof ContextAwareCallback,
                     "callback instance not of type ContextAwareCallback [%s]", 
callback);
-            IndexingContext indexingContext = 
((ContextAwareCallback)callback).getIndexingContext();
+            IndexingContext indexingContext = ((ContextAwareCallback) 
callback).getIndexingContext();
             BlobDeletionCallback blobDeletionCallback = 
activeDeletedBlobCollector.getBlobDeletionCallback();
             indexingContext.registerIndexCommitCallback(blobDeletionCallback);
             FulltextIndexWriterFactory writerFactory = null;
@@ -170,12 +178,12 @@ public class LuceneIndexEditorProvider implements 
IndexEditorProvider {
 
                 //Would not participate in reindexing. Only interested in
                 //incremental indexing
-                if (indexingContext.isReindexing()){
+                if (indexingContext.isReindexing()) {
                     return null;
                 }
 
                 CommitContext commitContext = 
getCommitContext(indexingContext);
-                if (commitContext == null){
+                if (commitContext == null) {
                     //Logically there should not be any commit without commit 
context. But
                     //some initializer code does the commit with out it. So 
ignore such calls with
                     //warning now
@@ -192,9 +200,9 @@ public class LuceneIndexEditorProvider implements 
IndexEditorProvider {
                 //IndexDefinition from tracker might differ from one passed 
here for reindexing
                 //case which should be fine. However reusing existing 
definition would avoid
                 //creating definition instance for each commit as this gets 
executed for each commit
-                if (indexTracker != null){
+                if (indexTracker != null) {
                     indexDefinition = 
indexTracker.getIndexDefinition(indexPath);
-                    if (indexDefinition != null && 
!indexDefinition.hasMatchingNodeTypeReg(root)){
+                    if (indexDefinition != null && 
!indexDefinition.hasMatchingNodeTypeReg(root)) {
                         LOG.debug("Detected change in NodeType registry for 
index {}. Would not use " +
                                 "existing index definition", 
indexDefinition.getIndexPath());
                         indexDefinition = null;
@@ -231,7 +239,8 @@ public class LuceneIndexEditorProvider implements 
IndexEditorProvider {
 
                 writerFactory = new 
DefaultIndexWriterFactory(mountInfoProvider,
                         newDirectoryFactory(blobDeletionCallback, 
cowDirectoryCleanupCallback),
-                        writerConfig);
+                        writerConfig,
+                        indexWriterPool);
             }
 
             LuceneIndexEditorContext context = new 
LuceneIndexEditorContext(root, definition, indexDefinition, callback,
@@ -278,7 +287,7 @@ public class LuceneIndexEditorProvider implements 
IndexEditorProvider {
         return new DefaultDirectoryFactory(indexCopier, blobStore, 
blobDeletionCallback, cowDirectoryTracker);
     }
 
-    private LuceneDocumentHolder getDocumentHolder(CommitContext 
commitContext){
+    private LuceneDocumentHolder getDocumentHolder(CommitContext 
commitContext) {
         LuceneDocumentHolder holder = (LuceneDocumentHolder) 
commitContext.get(LuceneDocumentHolder.NAME);
         if (holder == null) {
             holder = new LuceneDocumentHolder(indexingQueue, 
inMemoryDocsLimit);
@@ -308,6 +317,14 @@ public class LuceneIndexEditorProvider implements 
IndexEditorProvider {
         return nrtIndexingEnabled;
     }
 
+    /**
+     * Releases the resources held by this provider. If parallel writing was enabled when the provider was
+     * created, this shuts down its index writer pool; otherwise only the log entry is written.
+     */
+    @Override
+    public void close() {
+        LOG.info("Closing LuceneIndexEditorProvider");
+        if (indexWriterPool == null) {
+            // Parallel writing disabled: there is no pool to shut down.
+            return;
+        }
+        indexWriterPool.close();
+    }
+
     private static CommitContext getCommitContext(IndexingContext 
indexingContext) {
         return (CommitContext) 
indexingContext.getCommitInfo().getInfo().get(CommitContext.NAME);
     }
@@ -331,7 +348,7 @@ public class LuceneIndexEditorProvider implements 
IndexEditorProvider {
                 }
 
                 for (File f : reindexingLocalDirectories) {
-                    if ( ! FileUtils.deleteQuietly(f)) {
+                    if (!FileUtils.deleteQuietly(f)) {
                         LOG.warn("Failed to delete {}", f);
                     }
                 }
diff --git 
a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/DefaultIndexWriterFactory.java
 
b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/DefaultIndexWriterFactory.java
index ba2639244b..ac15919c9b 100644
--- 
a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/DefaultIndexWriterFactory.java
+++ 
b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/DefaultIndexWriterFactory.java
@@ -28,19 +28,33 @@ import 
org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.mount.MountInfoProvider;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static java.util.Objects.requireNonNull;
 
 public class DefaultIndexWriterFactory implements LuceneIndexWriterFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultIndexWriterFactory.class);
+
     private final MountInfoProvider mountInfoProvider;
     private final DirectoryFactory directoryFactory;
     private final LuceneIndexWriterConfig writerConfig;
+    // Optional. When non-null, the writers created by this factory are wrapped in a PooledLuceneIndexWriter so
+    // that their operations are executed by this pool; when null, writers apply operations directly.
+    private final IndexWriterPool indexWriterPool;
 
+    /**
+     * Creates a factory whose writers apply their operations directly, without an {@link IndexWriterPool}.
+     */
     public DefaultIndexWriterFactory(MountInfoProvider mountInfoProvider,
-                                     DirectoryFactory directoryFactory, LuceneIndexWriterConfig writerConfig) {
+                                     DirectoryFactory directoryFactory,
+                                     LuceneIndexWriterConfig writerConfig) {
+        this(mountInfoProvider, directoryFactory, writerConfig, null);
+    }
+
+    /**
+     * Creates a factory that optionally routes the operations of its writers through a pool.
+     *
+     * @param indexWriterPool pool that executes the write operations on worker threads, or {@code null} to let
+     *                        the writers apply their operations directly
+     */
+    public DefaultIndexWriterFactory(MountInfoProvider mountInfoProvider,
+                                     DirectoryFactory directoryFactory,
+                                     LuceneIndexWriterConfig writerConfig,
+                                     IndexWriterPool indexWriterPool) {
         this.mountInfoProvider = requireNonNull(mountInfoProvider);
         this.directoryFactory = requireNonNull(directoryFactory);
         this.writerConfig = requireNonNull(writerConfig);
+        this.indexWriterPool = indexWriterPool;
     }
 
     @Override
@@ -50,14 +64,30 @@ public class DefaultIndexWriterFactory implements 
LuceneIndexWriterFactory {
                 "Expected %s but found %s for index definition",
                 LuceneIndexDefinition.class, def.getClass());
 
-        LuceneIndexDefinition definition = (LuceneIndexDefinition)def;
+        LuceneIndexDefinition definition = (LuceneIndexDefinition) def;
 
-        if (mountInfoProvider.hasNonDefaultMounts()){
-            return new MultiplexingIndexWriter(directoryFactory, 
mountInfoProvider, definition,
-                    definitionBuilder, reindex, writerConfig);
+        if (mountInfoProvider.hasNonDefaultMounts()) {
+            return wrapWithPipelinedIndexWriter(
+                    new MultiplexingIndexWriter(directoryFactory, 
mountInfoProvider, definition, definitionBuilder, reindex, writerConfig),
+                    definition.getIndexName());
         }
-        return new DefaultIndexWriter(definition, definitionBuilder, 
directoryFactory,
+        DefaultIndexWriter writer = new DefaultIndexWriter(definition, 
definitionBuilder, directoryFactory,
                 FulltextIndexConstants.INDEX_DATA_CHILD_NAME,
                 LuceneIndexConstants.SUGGEST_DATA_CHILD_NAME, reindex, 
writerConfig);
+
+        return wrapWithPipelinedIndexWriter(writer, definition.getIndexName());
+    }
+
+    /**
+     * Closes the {@link IndexWriterPool} supplied at construction time, if any. Without a pool this only logs.
+     * <p>
+     * NOTE(review): this factory does not create the pool. LuceneIndexEditorProvider passes the same pool to every
+     * factory it creates, so closing it here makes later writes through any of those factories fail with
+     * IllegalStateException. Confirm that only the owner of the pool calls this method.
+     */
+    public void close() {
+        LOG.debug("Closing LuceneIndexWriterFactory");
+        if (indexWriterPool == null) {
+            LOG.debug("Not using an Index writer pool");
+        } else {
+            indexWriterPool.close();
+        }
+    }
+
+    /**
+     * Wraps {@code writer} in a {@link PooledLuceneIndexWriter} that hands its operations to the pool, or returns
+     * {@code writer} unchanged when this factory has no pool.
+     * NOTE(review): "Pipelined" in the method name refers to the pooled writer; consider renaming for consistency.
+     */
+    private LuceneIndexWriter wrapWithPipelinedIndexWriter(LuceneIndexWriter writer, String indexName) {
+        return indexWriterPool == null ? writer : new PooledLuceneIndexWriter(indexWriterPool, writer, indexName);
     }
 }
diff --git 
a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/IndexWriterPool.java
 
b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/IndexWriterPool.java
new file mode 100644
index 0000000000..a76e2ae8fc
--- /dev/null
+++ 
b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/IndexWriterPool.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.lucene.writer;
+
+import org.apache.jackrabbit.guava.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.plugins.index.ConfigHelper;
+import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
+import org.apache.lucene.index.IndexableField;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+
+/**
+ * Executes index write operations ({@code updateDocument}, {@code deleteDocuments} and writer close) on a pool of
+ * worker threads, on behalf of a single producer.
+ * <p>
+ * Operations are grouped into batches of at most {@code oak.indexer.parallelWriter.maxBatchSize} operations. The
+ * batches are put on a bounded queue of {@code oak.indexer.parallelWriter.queueSize} batches, which are consumed by
+ * {@code oak.indexer.parallelWriter.numberThreads} workers. Different batches are processed concurrently, so the
+ * same delegate writer may be called from several worker threads at once. Operations in different batches have no
+ * ordering guarantee between them. NOTE(review): callers must therefore not depend on the relative order of an
+ * update and a delete of the same path; confirm this holds for all users of the pool.
+ * <p>
+ * If an operation fails on a worker, the first failure is recorded and reported to the producer as an
+ * {@link IOException} by the next call to {@link #updateDocument}, {@link #deleteDocuments} or
+ * {@link #closeWriter}. Workers keep consuming batches after a failure, so the producer never blocks forever on a
+ * full queue or while waiting for pending batches.
+ * <p>
+ * WARN: This class is not thread safe. Its public methods must not be called concurrently.
+ */
+public class IndexWriterPool {
+    private final static Logger LOG = LoggerFactory.getLogger(IndexWriterPool.class);
+
+    public static final String OAK_INDEXER_PARALLEL_WRITER_MAX_BATCH_SIZE = "oak.indexer.parallelWriter.maxBatchSize";
+    public static final int DEFAULT_OAK_INDEXER_PARALLEL_WRITER_MAX_BATCH_SIZE = 256;
+
+    public static final String OAK_INDEXER_PARALLEL_WRITER_QUEUE_SIZE = "oak.indexer.parallelWriter.queueSize";
+    public static final int DEFAULT_OAK_INDEXER_PARALLEL_WRITER_QUEUE_SIZE = 64;
+
+    public static final String OAK_INDEXER_PARALLEL_WRITER_NUMBER_THREADS = "oak.indexer.parallelWriter.numberThreads";
+    public static final int DEFAULT_OAK_INDEXER_PARALLEL_WRITER_NUMBER_THREADS = 4;
+
+    private final int maxBatchSize = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PARALLEL_WRITER_MAX_BATCH_SIZE, DEFAULT_OAK_INDEXER_PARALLEL_WRITER_MAX_BATCH_SIZE);
+    private final int queueSize = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PARALLEL_WRITER_QUEUE_SIZE, DEFAULT_OAK_INDEXER_PARALLEL_WRITER_QUEUE_SIZE);
+    private final int numberOfThreads = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PARALLEL_WRITER_NUMBER_THREADS, DEFAULT_OAK_INDEXER_PARALLEL_WRITER_NUMBER_THREADS);
+
+    // The batch of operations being built by the producer; handed to the workers by flushBatch().
+    // Batching individual operations reduces the overhead of synchronization and context switching.
+    private final ArrayList<Operation> batch = new ArrayList<>(maxBatchSize);
+    // Bounded queue shared between the producer and the workers. Its capacity provides back-pressure.
+    private final BlockingQueue<OperationBatch> queue = new ArrayBlockingQueue<>(queueSize);
+    private final List<Worker> workers;
+    private final List<Future<?>> workerFutures;
+    private final ExecutorService writersPool;
+    // Used to schedule a task that periodically prints statistics
+    private final ScheduledExecutorService monitorTaskExecutor;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    // Sequence numbers of the batches that were enqueued but are not yet fully processed. closeWriter uses this to
+    // wait until all operations submitted before it have been applied. pendingBatches and batchSequenceNumber are
+    // guarded by pendingBatchesLock.
+    private final Object pendingBatchesLock = new Object();
+    private final HashSet<Long> pendingBatches = new HashSet<>();
+    private long batchSequenceNumber = 0;
+    // First error raised by a worker while applying an operation, or null if none. Written while holding
+    // pendingBatchesLock; volatile so that the producer can check it on every call without locking.
+    private volatile Throwable workerFailure = null;
+
+    // Statistics. Written only by the producer; the periodic monitor task reads them without synchronization,
+    // so the values it prints may be slightly stale.
+    private final long startTimeNanos = System.nanoTime();
+    private long updateCount = 0;
+    private long deleteCount = 0;
+    private long totalEnqueueTimeNanos = 0;
+
+    private static class OperationBatch {
+        final long sequenceNumber;
+        final Operation[] operations;
+
+        public OperationBatch(long sequenceNumber, Operation[] operations) {
+            Objects.requireNonNull(operations);
+            this.sequenceNumber = sequenceNumber;
+            this.operations = operations;
+        }
+    }
+
+    private static abstract class Operation {
+        final LuceneIndexWriter delegate;
+
+        public Operation(LuceneIndexWriter delegate) {
+            this.delegate = delegate;
+        }
+
+        abstract void execute() throws IOException;
+    }
+
+    private static class UpdateOperation extends Operation {
+        private final String path;
+        private final Iterable<? extends IndexableField> doc;
+
+        UpdateOperation(LuceneIndexWriter delegate, String path, Iterable<? extends IndexableField> doc) {
+            super(delegate);
+            this.path = path;
+            this.doc = doc;
+        }
+
+        @Override
+        public void execute() throws IOException {
+            delegate.updateDocument(path, doc);
+        }
+    }
+
+    private static class DeleteOperation extends Operation {
+        private final String path;
+
+        DeleteOperation(LuceneIndexWriter delegate, String path) {
+            super(delegate);
+            this.path = path;
+        }
+
+        @Override
+        public void execute() throws IOException {
+            delegate.deleteDocuments(path);
+        }
+    }
+
+    private static class CloseResult {
+        // Either result or error is non-null. The two constructors enforce this invariant.
+        final Boolean result;
+        final Throwable error;
+
+        CloseResult(boolean result) {
+            this.result = result;
+            this.error = null;
+        }
+
+        CloseResult(Throwable error) {
+            this.result = null;
+            this.error = error;
+        }
+
+        @Override
+        public String toString() {
+            return "CloseResult{" +
+                    "result=" + result +
+                    ", error=" + error +
+                    '}';
+        }
+    }
+
+    private static class CloseWriterOperation extends Operation {
+        private final long timestamp;
+        private final SynchronousQueue<CloseResult> sync;
+
+        /**
+         * The close operation should be synchronous and applied only after all the write operations for this writer
+         * are processed.
+         *
+         * @param sync A synchronous queue used to wait for the result of the close operation.
+         */
+        CloseWriterOperation(LuceneIndexWriter delegate, long timestamp, SynchronousQueue<CloseResult> sync) {
+            super(delegate);
+            this.timestamp = timestamp;
+            this.sync = sync;
+        }
+
+        @Override
+        public void execute() {
+            CloseResult closeResult;
+            try {
+                closeResult = new CloseResult(delegate.close(timestamp));
+            } catch (Throwable t) {
+                // Hand back every kind of failure, not only IOException: the producer is blocked waiting on sync
+                // and would otherwise wait forever.
+                closeResult = new CloseResult(t);
+            }
+            try {
+                sync.put(closeResult);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    // Sentinel used to terminate worker threads
+    final static OperationBatch SHUTDOWN = new OperationBatch(-1, new Operation[0]);
+
+    private class Worker implements Runnable {
+        private final long startTime = System.nanoTime();
+        private final int id;
+        // Written by this worker only; read by the monitor thread for statistics, so possibly stale there.
+        private long opCount = 0;
+        private long batchesProcessed = 0;
+        private long totalBusyTime = 0;
+
+        public Worker(int id) {
+            this.id = id;
+        }
+
+        @Override
+        public void run() {
+            String oldName = Thread.currentThread().getName();
+            Thread.currentThread().setName("index-writer-" + id);
+            try {
+                LOG.info("[{}] Worker started", id);
+                while (true) {
+                    OperationBatch op = queue.take();
+                    if (op == SHUTDOWN) {
+                        // Put the sentinel back so that the other workers also terminate. This worker has just
+                        // removed an element and nothing else is enqueued after SHUTDOWN, so this does not block.
+                        queue.put(SHUTDOWN);
+                        LOG.info("[{}] Shutting down worker", id);
+                        printStatistics();
+                        return;
+                    }
+                    processBatch(op);
+                }
+            } catch (InterruptedException e) {
+                LOG.warn("[{}] Interrupted while waiting for an index write operation", id, e);
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            } catch (Throwable t) {
+                LOG.error("[{}] Error while processing an index write operation", id, t);
+                throw new RuntimeException(t);
+            } finally {
+                Thread.currentThread().setName(oldName);
+            }
+        }
+
+        // Applies the operations of one batch. A failure is recorded for the producer instead of terminating the
+        // worker, and the batch is always removed from pendingBatches so that closeWriter cannot wait forever.
+        private void processBatch(OperationBatch op) {
+            long start = System.nanoTime();
+            try {
+                for (Operation operation : op.operations) {
+                    operation.execute();
+                    opCount++;
+                }
+            } catch (Throwable t) {
+                LOG.error("[{}] Error while processing an index write operation", id, t);
+                recordWorkerFailure(t);
+            } finally {
+                batchesProcessed++;
+                long durationNanos = System.nanoTime() - start;
+                totalBusyTime += durationNanos;
+                long durationMillis = durationNanos / 1_000_000;
+                if (durationMillis > 1000) {
+                    LOG.info("[{}] Processing batch {} of size {} took {} millis.",
+                            id, op.sequenceNumber, op.operations.length, durationMillis);
+                }
+                synchronized (pendingBatchesLock) {
+                    pendingBatches.remove(op.sequenceNumber);
+                    pendingBatchesLock.notifyAll();
+                }
+            }
+        }
+
+        void printStatistics() {
+            double busyTimePercentage = FormattingUtils.safeComputePercentage(totalBusyTime, System.nanoTime() - startTime);
+            String busyTimePercentageStr = String.format("%.2f", busyTimePercentage);
+            LOG.info("[{}] operationsProcessed: {}, batchesProcessed: {}, busyTime: {} ms ({}%)",
+                    id, opCount, batchesProcessed, totalBusyTime / 1_000_000, busyTimePercentageStr);
+        }
+    }
+
+    /**
+     * Creates and starts a pool of writer threads, plus a monitor thread that logs statistics every 30 seconds.
+     */
+    public IndexWriterPool() {
+        this.writersPool = Executors.newFixedThreadPool(numberOfThreads, new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .build());
+        this.monitorTaskExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("index-writer-monitor")
+                .build());
+        this.workers = IntStream.range(0, numberOfThreads)
+                .mapToObj(Worker::new)
+                .collect(Collectors.toList());
+        this.workerFutures = workers.stream()
+                .map(writersPool::submit)
+                .collect(Collectors.toList());
+        monitorTaskExecutor.scheduleAtFixedRate(this::printStatistics, 30, 30, TimeUnit.SECONDS);
+        LOG.info("Writing thread started");
+    }
+
+    /**
+     * Queues an update of the document at {@code path} in {@code writer}. The update is applied asynchronously by a
+     * worker thread.
+     *
+     * @throws IOException           if an earlier operation failed on a worker thread
+     * @throws IllegalStateException if this pool has been closed
+     */
+    public void updateDocument(LuceneIndexWriter writer, String path, Iterable<? extends IndexableField> doc) throws IOException {
+        checkOpen();
+        checkNoWorkerFailure();
+        this.updateCount++;
+        enqueueOperation(new UpdateOperation(writer, path, doc));
+    }
+
+    /**
+     * Queues a deletion of the documents at {@code path} in {@code writer}. The deletion is applied asynchronously
+     * by a worker thread.
+     *
+     * @throws IOException           if an earlier operation failed on a worker thread
+     * @throws IllegalStateException if this pool has been closed
+     */
+    public void deleteDocuments(LuceneIndexWriter writer, String path) throws IOException {
+        checkOpen();
+        checkNoWorkerFailure();
+        this.deleteCount++;
+        enqueueOperation(new DeleteOperation(writer, path));
+    }
+
+    /**
+     * Closes {@code writer} on a worker thread once every operation submitted to this pool before this call has been
+     * applied, and waits for the close to complete.
+     *
+     * @return the value returned by {@code writer.close(timestamp)}
+     * @throws IOException           if closing the writer failed, or if an earlier operation failed on a worker
+     *                               thread (in which case the writer is not closed)
+     * @throws IllegalStateException if this pool has been closed
+     */
+    public boolean closeWriter(LuceneIndexWriter writer, long timestamp) throws IOException {
+        checkOpen();
+        checkNoWorkerFailure();
+        try {
+            LOG.info("Closing writer: {}", writer);
+            // Before closing the writer, we must wait until all previously submitted operations for
+            // this writer are processed. For simplicity, we wait instead until ALL operations currently
+            // in the queue are processed, because otherwise it would be more complex to distinguish which
+            // operations are for which writer.
+            long seqNumber = flushBatch();
+            LOG.info("All pending operations enqueued. Waiting until all batches up to {} are processed", seqNumber);
+            awaitBatchesProcessed(seqNumber);
+            // Report a failed write instead of closing the writer on top of an incomplete set of operations.
+            checkNoWorkerFailure();
+            LOG.info("All batches up to {} processed. Enqueuing close operation for writer {}", seqNumber, writer);
+            SynchronousQueue<CloseResult> closeOpSync = new SynchronousQueue<>();
+            batch.add(new CloseWriterOperation(writer, timestamp, closeOpSync));
+            flushBatch();
+            CloseResult res = closeOpSync.take();
+            LOG.info("Writer {} closed. Result: {}", writer, res);
+            if (res.error == null) {
+                return res.result;
+            } else {
+                throw new IOException("Error while closing writer", res.error);
+            }
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted while waiting for the worker to finish", e);
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Flushes the pending operations, waits for the workers to apply them and terminates the worker and monitor
+     * threads. Calling this more than once only logs a warning.
+     */
+    public void close() {
+        if (!closed.compareAndSet(false, true)) {
+            LOG.warn("IndexWriterPool already closed");
+            return;
+        }
+        try {
+            flushBatch();
+            // put, not add: the queue may be full at this point, in which case add would throw
+            // IllegalStateException and the workers would never be told to stop.
+            queue.put(SHUTDOWN);
+            LOG.info("Shutting down IndexWriterPool. Total enqueue time: {} ms", totalEnqueueTimeNanos / 1_000_000);
+            for (Future<?> f : workerFutures) {
+                LOG.info("Waiting for future: {}", f);
+                try {
+                    f.get();
+                } catch (ExecutionException e) {
+                    LOG.warn("Error while waiting for future", e);
+                }
+            }
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted while shutting down the index writer workers", e);
+            Thread.currentThread().interrupt();
+        } finally {
+            // Always release the threads, even if flushing or waiting failed.
+            printStatistics();
+            new ExecutorCloser(writersPool, 1, TimeUnit.SECONDS).close();
+            new ExecutorCloser(monitorTaskExecutor, 1, TimeUnit.SECONDS).close();
+        }
+    }
+
+    private void enqueueOperation(Operation op) {
+        batch.add(op);
+        // >= rather than ==, so that a non-positive configured batch size still flushes after every operation.
+        if (batch.size() >= maxBatchSize) {
+            flushBatch();
+        }
+    }
+
+    private void checkOpen() {
+        if (closed.get()) {
+            throw new IllegalStateException("IndexWriterPool is closed");
+        }
+    }
+
+    // Surfaces the first failure recorded by a worker to the producer.
+    private void checkNoWorkerFailure() throws IOException {
+        Throwable failure = workerFailure;
+        if (failure != null) {
+            throw new IOException("An index write operation failed in a worker thread", failure);
+        }
+    }
+
+    // Called by workers; keeps only the first failure.
+    private void recordWorkerFailure(Throwable t) {
+        synchronized (pendingBatchesLock) {
+            if (workerFailure == null) {
+                workerFailure = t;
+            }
+        }
+    }
+
+    // Blocks until every batch with a sequence number <= seqNumber has been processed by a worker.
+    private void awaitBatchesProcessed(long seqNumber) throws InterruptedException {
+        synchronized (pendingBatchesLock) {
+            while (true) {
+                Long earliestPending = pendingBatches.stream().min(Long::compareTo).orElse(null);
+                LOG.debug("Earliest pending batch: {}. Waiting until all batches up to {} are processed", earliestPending, seqNumber);
+                if (earliestPending == null || earliestPending > seqNumber) {
+                    return;
+                }
+                pendingBatchesLock.wait();
+            }
+        }
+    }
+
+    private long flushBatch() {
+        // Empty batches are also enqueued. This is necessary for the close writer operation, which requires all previous
+        // operations for the writer to be processed before the writer is closed. This means that all enqueued batches
+        // must be processed and the current partially built batch must also be enqueued and processed. To ensure that,
+        // the close operation will always enqueue the current batch and wait for all the batches older or equal to the
+        // newly enqueued batch to be processed. If there are no operations in the currently pending batch, we enqueue
+        // it anyway just to generate a new sequence number.
+        try {
+            long seqNumber;
+            synchronized (pendingBatchesLock) {
+                // Shared between producer and workers
+                seqNumber = batchSequenceNumber;
+                batchSequenceNumber++;
+                pendingBatches.add(seqNumber);
+            }
+            if (seqNumber % 1000 == 0) {
+                LOG.info("Enqueuing batch {}, size: {}", seqNumber, batch.size());
+            }
+            long start = System.nanoTime();
+            queue.put(new OperationBatch(seqNumber, batch.toArray(new Operation[0])));
+            long durationNanos = System.nanoTime() - start;
+            long durationMillis = durationNanos / 1_000_000;
+            totalEnqueueTimeNanos += durationNanos;
+            if (durationMillis > 1) {
+                LOG.info("Enqueuing batch delayed. Seq number: {}, size: {}. Delay: {} ms",
+                        seqNumber, batch.size(), durationMillis);
+            }
+            batch.clear();
+            return seqNumber;
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted while waiting to put an update operation in the queue", e);
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void printStatistics() {
+        // Read the lock-guarded fields consistently; this also runs on the monitor thread.
+        long batchesEnqueued;
+        int pendingBatchesCount;
+        synchronized (pendingBatchesLock) {
+            batchesEnqueued = batchSequenceNumber;
+            pendingBatchesCount = pendingBatches.size();
+        }
+        double percentageEnqueueTime = FormattingUtils.safeComputePercentage(totalEnqueueTimeNanos, System.nanoTime() - startTimeNanos);
+        String percentageEnqueueTimeStr = String.format("%.2f", percentageEnqueueTime);
+        LOG.info("updateCount: {}, deleteCount: {}, batchesEnqueuedCount: {}, pendingBatchesCount: {},  enqueueTime: {} ms ({}%)",
+                updateCount, deleteCount, batchesEnqueued, pendingBatchesCount, totalEnqueueTimeNanos / 1_000_000, percentageEnqueueTimeStr);
+        workers.forEach(Worker::printStatistics);
+    }
+}
diff --git 
a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/PooledLuceneIndexWriter.java
 
b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/PooledLuceneIndexWriter.java
new file mode 100644
index 0000000000..fc922e9225
--- /dev/null
+++ 
b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/PooledLuceneIndexWriter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.lucene.writer;
+
+import org.apache.lucene.index.IndexableField;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * {@link LuceneIndexWriter} that does not write to its delegate directly: every update, delete
+ * and close operation is handed over to an {@link IndexWriterPool}, together with the delegate
+ * writer on which the pool performs it.
+ * <p>
+ * The update/delete counters are plain {@code long} fields that are not updated atomically, so
+ * they are only accurate when this writer is used from a single thread.
+ */
+public class PooledLuceneIndexWriter implements LuceneIndexWriter {
+    private static final Logger LOG = LoggerFactory.getLogger(PooledLuceneIndexWriter.class);
+
+    /** Name of the index, used in log messages and statistics. */
+    private final String indexName;
+    /** Writer that actually executes the operations, invoked through the pool. */
+    private final LuceneIndexWriter delegateWriter;
+    private final IndexWriterPool writerPool;
+
+    /** Number of updates handed over to the pool so far. */
+    private long updateCount = 0;
+    /** Number of deletions handed over to the pool so far. */
+    private long deleteCount = 0;
+
+    /**
+     * @param writerPool     pool that executes the write operations
+     * @param delegateWriter writer on which the pool performs the operations
+     * @param indexName      name of the index, used in log messages and statistics
+     */
+    public PooledLuceneIndexWriter(IndexWriterPool writerPool, LuceneIndexWriter delegateWriter, String indexName) {
+        this.writerPool = writerPool;
+        this.delegateWriter = delegateWriter;
+        this.indexName = indexName;
+    }
+
+    @Override
+    public void updateDocument(String path, Iterable<? extends IndexableField> doc) throws IOException {
+        writerPool.updateDocument(delegateWriter, path, doc);
+        // Counted only once the pool call returned without throwing
+        updateCount++;
+    }
+
+    @Override
+    public void deleteDocuments(String path) throws IOException {
+        writerPool.deleteDocuments(delegateWriter, path);
+        deleteCount++;
+    }
+
+    /**
+     * Asks the pool to close the delegate writer and returns the result reported by the pool.
+     */
+    @Override
+    public boolean close(long timestamp) throws IOException {
+        LOG.info("[{}] Shutting down PooledLuceneIndexWriter", indexName);
+        return writerPool.closeWriter(delegateWriter, timestamp);
+    }
+
+    /** @return the number of document updates handed over to the pool */
+    public long getUpdateCount() {
+        return updateCount;
+    }
+
+    /** @return the number of document deletions handed over to the pool */
+    public long getDeleteCount() {
+        return deleteCount;
+    }
+
+    /** @return a one-line summary with the index name and the update/delete counters */
+    public String formatStatistics() {
+        return "PooledLuceneIndexWriter(" + indexName + ")[" +
+                "updates: " + updateCount + ", " +
+                "deletes: " + deleteCount +
+                "]";
+    }
+}
diff --git 
a/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/reader/DefaultIndexReaderFactoryTest.java
 
b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/reader/DefaultIndexReaderFactoryTest.java
index b617956781..6150499d48 100644
--- 
a/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/reader/DefaultIndexReaderFactoryTest.java
+++ 
b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/reader/DefaultIndexReaderFactoryTest.java
@@ -32,6 +32,7 @@ import 
org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexWriterFactory;
 import 
org.apache.jackrabbit.oak.plugins.index.lucene.directory.DefaultDirectoryFactory;
 import 
org.apache.jackrabbit.oak.plugins.index.lucene.directory.DirectoryFactory;
 import 
org.apache.jackrabbit.oak.plugins.index.lucene.writer.DefaultIndexWriterFactory;
+import org.apache.jackrabbit.oak.plugins.index.lucene.writer.IndexWriterPool;
 import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriter;
 import 
org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriterConfig;
 import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
@@ -41,25 +42,66 @@ import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.StringField;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import static org.apache.jackrabbit.oak.plugins.index.lucene.TestUtil.newDoc;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 import static org.apache.jackrabbit.oak.InitialContentHelper.INITIAL_CONTENT;
 import static org.junit.Assert.*;
 
+@RunWith(Parameterized.class)
 public class DefaultIndexReaderFactoryTest {
+    /** Whether the writer factory under test is given an {@link IndexWriterPool}. */
+    private final boolean parallelIndexing;
+    /** Pool handed to the writer factory; {@code null} when {@link #parallelIndexing} is false. */
+    private IndexWriterPool indexWriterPool;
+
+    /** Every test runs twice: once with and once without an index writer pool. */
+    @Parameterized.Parameters(name = "Parallel Indexing: ({0})")
+    public static List<Boolean> parallelIndexingEnabled() {
+        return List.of(Boolean.TRUE, Boolean.FALSE);
+    }
+
     @Rule
     public TemporaryFolder folder = new TemporaryFolder(new File("target"));
 
-    private NodeState root = INITIAL_CONTENT;
-    private NodeBuilder builder = EMPTY_NODE.builder();
+    private final NodeState root = INITIAL_CONTENT;
+    private final NodeBuilder builder = EMPTY_NODE.builder();
     private LuceneIndexDefinition defn = new LuceneIndexDefinition(root, builder.getNodeState(), "/foo");
-    private MountInfoProvider mip = Mounts.newBuilder()
+    private final MountInfoProvider mip = Mounts.newBuilder()
             .mount("foo", "/libs", "/apps").build();
-    private LuceneIndexWriterConfig writerConfig = new LuceneIndexWriterConfig();
+    private final LuceneIndexWriterConfig writerConfig = new LuceneIndexWriterConfig();
+
+    /**
+     * @param parallelIndexing one of the values returned by {@link #parallelIndexingEnabled()}
+     */
+    public DefaultIndexReaderFactoryTest(boolean parallelIndexing) {
+        this.parallelIndexing = parallelIndexing;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        // Each test gets its own pool, which tearDown() closes again
+        if (parallelIndexing) {
+            indexWriterPool = new IndexWriterPool();
+        } else {
+            indexWriterPool = null;
+        }
+    }
+
+    @After
+    public void tearDown() {
+        if (indexWriterPool == null) {
+            return;
+        }
+        indexWriterPool.close();
+    }
 
     @Test
     public void emptyDir() throws Exception{
@@ -196,6 +238,7 @@ public class DefaultIndexReaderFactoryTest {
 
     private LuceneIndexWriterFactory newDirectoryFactory(){
         DirectoryFactory directoryFactory = new DefaultDirectoryFactory(null, null);
-        return new DefaultIndexWriterFactory(mip, directoryFactory, writerConfig);
+        // indexWriterPool is null unless this run uses parallel indexing
+        return new DefaultIndexWriterFactory(mip, directoryFactory, writerConfig, indexWriterPool);
     }
 }
diff --git 
a/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/IndexWriterPoolTest.java
 
b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/IndexWriterPoolTest.java
new file mode 100644
index 0000000000..2d94694ae7
--- /dev/null
+++ 
b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/IndexWriterPoolTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.lucene.writer;
+
+import org.apache.jackrabbit.oak.plugins.index.lucene.TestUtil;
+import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexableField;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.ProvideSystemProperty;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class IndexWriterPoolTest {
+    private static final Logger LOG = LoggerFactory.getLogger(IndexWriterPoolTest.class);
+
+    /**
+     * In-memory {@link LuceneIndexWriter} that records the operations it receives. If a delay is
+     * given, it sleeps for that many milliseconds before each operation to simulate a slow writer.
+     */
+    private static class TestWriter implements LuceneIndexWriter {
+        private final int delayMillis;
+        // The writers must be thread safe: the pool may invoke them from several worker threads
+        final Set<String> deletedPaths = ConcurrentHashMap.newKeySet();
+        final Map<String, Document> docs = new ConcurrentHashMap<>();
+        // volatile: may be written by a pool thread and is read by the test thread
+        volatile boolean closed;
+
+        public TestWriter() {
+            this(0);
+        }
+
+        public TestWriter(int delayMillis) {
+            this.delayMillis = delayMillis;
+        }
+
+        private void delay() {
+            if (delayMillis > 0) {
+                try {
+                    LOG.info("Delaying {}", delayMillis);
+                    Thread.sleep(delayMillis);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        @Override
+        public void updateDocument(String path, Iterable<? extends IndexableField> doc) {
+            delay();
+            docs.put(path, (Document) doc);
+        }
+
+        @Override
+        public void deleteDocuments(String path) {
+            delay();
+            deletedPaths.add(path);
+        }
+
+        @Override
+        public boolean close(long timestamp) {
+            delay();
+            closed = true;
+            return true;
+        }
+    }
+
+    @Rule
+    public final ProvideSystemProperty updateSystemProperties
+            = new ProvideSystemProperty(IndexWriterPool.OAK_INDEXER_PARALLEL_WRITER_MAX_BATCH_SIZE, "5")
+            .and(IndexWriterPool.OAK_INDEXER_PARALLEL_WRITER_NUMBER_THREADS, "8")
+            .and(IndexWriterPool.OAK_INDEXER_PARALLEL_WRITER_QUEUE_SIZE, "100");
+
+    @Rule
+    public final RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
+
+    @Test
+    public void testSingleWriter() throws IOException {
+        IndexWriterPool indexWriterPool = new IndexWriterPool();
+        TestWriter writer = new TestWriter();
+        Document doc = TestUtil.newDoc("value");
+        indexWriterPool.updateDocument(writer, "test", doc);
+        indexWriterPool.deleteDocuments(writer, "test");
+        boolean closeResult = indexWriterPool.closeWriter(writer, 30);
+        indexWriterPool.close();
+
+        assertTrue(writer.closed);
+        assertTrue(closeResult);
+        assertEquals(Set.of("test"), writer.deletedPaths);
+        assertEquals(Map.of("test", doc), writer.docs);
+    }
+
+    @Test
+    public void testMultipleWriters() throws IOException {
+        IndexWriterPool indexWriterPool = new IndexWriterPool();
+        // TestWriter::new is called with the index, so writer i sleeps i milliseconds before each operation
+        List<TestWriter> writers = IntStream.range(0, 5).mapToObj(TestWriter::new).collect(Collectors.toList());
+        List<Document> updateDocs = IntStream.range(0, 200).mapToObj(i -> TestUtil.newDoc("test-doc-" + i)).collect(Collectors.toList());
+
+        int i = 0;
+        for (var doc : updateDocs) {
+            indexWriterPool.updateDocument(writers.get(i % writers.size()), doc.get(FieldNames.PATH), doc);
+            i++;
+        }
+        for (var w : writers) {
+            assertTrue(indexWriterPool.closeWriter(w, 0));
+        }
+        indexWriterPool.close();
+
+        List<Document> documentsWritten = new ArrayList<>();
+        for (var w : writers) {
+            documentsWritten.addAll(w.docs.values());
+            LOG.info("docs: {}, deletes: {}", w.docs.size(), w.deletedPaths.size());
+        }
+
+        assertEquals(updateDocs.size(), documentsWritten.size());
+        assertEquals(Set.copyOf(updateDocs), Set.copyOf(documentsWritten));
+        assertTrue(writers.stream().allMatch(w -> w.deletedPaths.isEmpty()));
+        assertTrue(writers.stream().allMatch(w -> w.closed));
+    }
+
+    @Test
+    public void testEnqueueAfterClose() throws IOException {
+        IndexWriterPool indexWriterPool = new IndexWriterPool();
+        indexWriterPool.close();
+        // Subsequent calls to close should be ignored
+        indexWriterPool.close();
+        // the following method should throw IllegalStateException
+        try {
+            indexWriterPool.updateDocument(new TestWriter(), "test", TestUtil.newDoc("value"));
+            fail("updateDocument did not throw expected exception");
+        } catch (IllegalStateException expected) {
+            // expected: a closed pool rejects new operations
+        }
+    }
+
+    @Test
+    public void testCloseWriterPoolWithoutClosingWriters() throws IOException {
+        IndexWriterPool indexWriterPool = new IndexWriterPool();
+        TestWriter writer = new TestWriter(100);
+        Document doc = TestUtil.newDoc("value");
+        indexWriterPool.updateDocument(writer, "test", doc);
+        indexWriterPool.deleteDocuments(writer, "test-deletion");
+        // Closing the pool must still apply the operations that were already submitted
+        indexWriterPool.close();
+
+        assertEquals(Map.of("test", doc), writer.docs);
+        assertEquals(Set.of("test-deletion"), writer.deletedPaths);
+    }
+}
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/OutOfBandIndexerBase.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/OutOfBandIndexerBase.java
index 7b28f47215..63531ab4ef 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/OutOfBandIndexerBase.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/OutOfBandIndexerBase.java
@@ -136,34 +136,36 @@ public abstract class OutOfBandIndexerBase implements 
Closeable, IndexUpdateCall
     private void preformIndexUpdate(NodeState baseState) throws IOException, CommitFailedException {
         NodeBuilder builder = copyOnWriteStore.getRoot().builder();
 
-        IndexUpdate indexUpdate = new IndexUpdate(
-                createIndexEditorProvider(),
-                REINDEX_LANE,
-                copyOnWriteStore.getRoot(),
-                builder,
-                this,
-                this,
-                CommitInfo.EMPTY,
-                CorruptIndexHandler.NOOP
-        );
-
-        configureEstimators(indexUpdate);
-
-        //Do not use EmptyState as before otherwise the IndexUpdate would
-        //unnecessary traverse the whole repo post reindexing. With use of baseState
-        //It would only traverse the diff i.e. those index definitions paths
-        //whose lane has been changed
-        NodeState before = baseState;
-        NodeState after = copyOnWriteStore.getRoot();
-
-        CommitFailedException exception =
-                EditorDiff.process(VisibleEditor.wrap(indexUpdate), before, after);
-
-        if (exception != null) {
-            throw exception;
+        try (IndexEditorProvider provider = createIndexEditorProvider()) { // closed on exit: after the merge below, or on failure
+            IndexUpdate indexUpdate = new IndexUpdate(
+                    provider,
+                    REINDEX_LANE,
+                    copyOnWriteStore.getRoot(),
+                    builder,
+                    this,
+                    this,
+                    CommitInfo.EMPTY,
+                    CorruptIndexHandler.NOOP
+            );
+
+            configureEstimators(indexUpdate);
+
+            // Do not use EmptyState as the "before" state, otherwise the IndexUpdate would
+            // unnecessarily traverse the whole repository after reindexing. With baseState it
+            // only traverses the diff, i.e. the paths of the index definitions whose lane
+            // has been changed.
+            NodeState before = baseState;
+            NodeState after = copyOnWriteStore.getRoot();
+
+            CommitFailedException exception =
+                    EditorDiff.process(VisibleEditor.wrap(indexUpdate), before, after);
+
+            if (exception != null) {
+                throw exception;
+            }
+
+            copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
         }
-
-        copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
     }
 
     protected abstract IndexEditorProvider createIndexEditorProvider() throws 
IOException;
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
index 991e91526e..f2b22e34fa 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
@@ -413,12 +413,13 @@ public abstract class DocumentStoreIndexerBase implements 
Closeable {
                         }
                     }
                     log.info("Top slowest nodes to index (ms): {}", 
slowestTopKElements);
+                }
 
-                    indexerProviders.forEach(indexProvider -> {
-                        ExtractedTextCache extractedTextCache = 
indexProvider.getTextCache();
-                        CacheStats cacheStats = extractedTextCache == null ? 
null : extractedTextCache.getCacheStats();
-                        log.info("Text extraction cache statistics: {}", 
cacheStats == null ? "N/A" : cacheStats.cacheInfoAsString());
-                    });
+                for (NodeStateIndexerProvider indexerProvider : 
indexerProviders) {
+                    ExtractedTextCache extractedTextCache = 
indexerProvider.getTextCache();
+                    CacheStats cacheStats = extractedTextCache == null ? null 
: extractedTextCache.getCacheStats();
+                    log.info("Text extraction cache statistics: {}", 
cacheStats == null ? "N/A" : cacheStats.cacheInfoAsString());
+                    indexerProvider.close();
                 }
 
                 progressReporter.reindexingTraversalEnd();
diff --git 
a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LuceneIndexer.java
 
b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LuceneIndexer.java
index 373d41fb83..45626c85f9 100644
--- 
a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LuceneIndexer.java
+++ 
b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LuceneIndexer.java
@@ -21,6 +21,7 @@ package org.apache.jackrabbit.oak.index.indexer.document;
 
 import java.io.IOException;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
@@ -49,6 +50,7 @@ public class LuceneIndexer implements NodeStateIndexer, 
FacetsConfigProvider {
     private FacetsConfig facetsConfig;
 
     private final IndexerStatisticsTracker indexerStatisticsTracker = new 
IndexerStatisticsTracker(LOG);
+    private final AtomicBoolean closed = new AtomicBoolean(false);
 
     public LuceneIndexer(IndexDefinition definition, LuceneIndexWriter 
indexWriter,
                          NodeBuilder builder, FulltextBinaryTextExtractor 
binaryTextExtractor,
@@ -122,9 +124,22 @@ public class LuceneIndexer implements NodeStateIndexer, FacetsConfigProvider {
 
+    /**
+     * Closes the index writer, then logs the indexer statistics and the binary text extractor
+     * statistics. Only the first invocation does this; later calls only log a debug message.
+     */
     @Override
     public void close() throws IOException {
-        LOG.info("[{}] Statistics: {}", definition.getIndexName(), indexerStatisticsTracker.formatStats());
-        binaryTextExtractor.logStats();
-        indexWriter.close(System.currentTimeMillis());
+        if (!closed.compareAndSet(false, true)) {
+            // close() can be reached more than once, e.g. LuceneIndexerProvider.close() also closes the indexers it created
+            LOG.debug("Indexer already closed: {}", definition.getIndexName());
+            return;
+        }
+        try {
+            indexWriter.close(System.currentTimeMillis());
+        } finally {
+            // Log the statistics even if closing the writer fails, as they help to diagnose the failure
+            LOG.info("[{}] Statistics: {}", definition.getIndexName(), indexerStatisticsTracker.formatStats());
+            binaryTextExtractor.logStats();
+        }
     }
 
     private void writeToIndex(Document doc, String path) throws IOException {
@@ -152,4 +167,11 @@ public class LuceneIndexer implements NodeStateIndexer, FacetsConfigProvider {
         return facetsConfig;
     }
 
+    /**
+     * Delegates to the index definition, so that this indexer is identified by its definition.
+     */
+    @Override
+    public String toString() {
+        return definition.toString();
+    }
 }
diff --git 
a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LuceneIndexerProvider.java
 
b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LuceneIndexerProvider.java
index 2cc235b7f2..0d6a182f61 100644
--- 
a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LuceneIndexerProvider.java
+++ 
b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LuceneIndexerProvider.java
@@ -20,15 +20,19 @@
 package org.apache.jackrabbit.oak.index.indexer.document;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.jackrabbit.oak.index.ExtendedIndexHelper;
 import org.apache.jackrabbit.oak.index.IndexerSupport;
+import org.apache.jackrabbit.oak.plugins.index.ConfigHelper;
 import org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexDefinition;
 import 
org.apache.jackrabbit.oak.plugins.index.lucene.directory.DirectoryFactory;
 import 
org.apache.jackrabbit.oak.plugins.index.lucene.directory.FSDirectoryFactory;
 import 
org.apache.jackrabbit.oak.plugins.index.lucene.writer.DefaultIndexWriterFactory;
+import org.apache.jackrabbit.oak.plugins.index.lucene.writer.IndexWriterPool;
 import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriter;
 import 
org.apache.jackrabbit.oak.plugins.index.progress.IndexingProgressReporter;
 import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache;
@@ -41,14 +45,26 @@ import static 
org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPER
 import static 
org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.TYPE_LUCENE;
 
 public class LuceneIndexerProvider implements NodeStateIndexerProvider {
+    /** Boolean system property that enables the {@link IndexWriterPool}; defaults to {@code false}. */
+    public static final String OAK_INDEXER_DOCUMENT_PARALLEL_WRITER_ENABLED = "oak.indexer.document.parallelWriter.enabled";
+
     private final ExtractedTextCache textCache =
             new ExtractedTextCache(FileUtils.ONE_MB * 5, TimeUnit.HOURS.toSeconds(5));
     private final DefaultIndexWriterFactory indexWriterFactory;
+    private final ArrayList<LuceneIndexer> indexWriters = new ArrayList<>(); // indexers created by this provider; closed by close()
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final IndexWriterPool indexWriterPool;
 
     public LuceneIndexerProvider(ExtendedIndexHelper extendedIndexHelper, IndexerSupport indexerSupport) throws IOException {
         DirectoryFactory dirFactory = new FSDirectoryFactory(indexerSupport.getLocalIndexDir());
-        this.indexWriterFactory = new DefaultIndexWriterFactory(extendedIndexHelper.getMountInfoProvider(),
-                dirFactory, extendedIndexHelper.getLuceneIndexHelper().getWriterConfigForReindex());
+        boolean parallelIndexingEnabled = ConfigHelper.getSystemPropertyAsBoolean(OAK_INDEXER_DOCUMENT_PARALLEL_WRITER_ENABLED, false);
+        // The pool is only created when enabled; otherwise the writer factory receives null
+        this.indexWriterPool = parallelIndexingEnabled ? new IndexWriterPool() : null;
+        this.indexWriterFactory = new DefaultIndexWriterFactory(
+                extendedIndexHelper.getMountInfoProvider(),
+                dirFactory,
+                extendedIndexHelper.getLuceneIndexHelper().getWriterConfigForReindex(),
+                indexWriterPool);
     }
 
     @Override
@@ -59,17 +75,21 @@ public class LuceneIndexerProvider implements 
NodeStateIndexerProvider {
             return null;
         }
 
-        LuceneIndexDefinition idxDefinition = 
LuceneIndexDefinition.newLuceneBuilder(root, definition.getNodeState(), 
indexPath).reindex().build();
+        LuceneIndexDefinition idxDefinition = 
LuceneIndexDefinition.newLuceneBuilder(root, definition.getNodeState(), 
indexPath)
+                .reindex()
+                .build();
 
         LuceneIndexWriter indexWriter = 
indexWriterFactory.newInstance(idxDefinition, definition, null, true);
         FulltextBinaryTextExtractor textExtractor = new 
FulltextBinaryTextExtractor(textCache, idxDefinition, true);
-        return new LuceneIndexer(
+        LuceneIndexer indexer = new LuceneIndexer(
                 idxDefinition,
                 indexWriter,
                 definition,
                 textExtractor,
                 progressReporter
         );
+        indexWriters.add(indexer);
+        return indexer;
     }
 
     @Override
@@ -79,6 +99,42 @@ public class LuceneIndexerProvider implements NodeStateIndexerProvider {
 
+    /**
+     * Closes all indexers created by this provider, then the index writer factory and, when
+     * parallel writing is enabled, the index writer pool. Only the first invocation has an effect.
+     * <p>
+     * The factory and the pool are closed even if closing an indexer fails. An {@link IOException}
+     * thrown by one indexer does not prevent the remaining indexers from being closed: the first
+     * one is rethrown after the factory and pool were closed, with the others attached as suppressed.
+     */
     @Override
     public void close() throws IOException {
-
+        if (!closed.compareAndSet(false, true)) {
+            return;
+        }
+        IOException failure = null;
+        try {
+            for (LuceneIndexer indexer : indexWriters) {
+                try {
+                    indexer.close();
+                } catch (IOException e) {
+                    if (failure == null) {
+                        failure = e;
+                    } else {
+                        failure.addSuppressed(e);
+                    }
+                }
+            }
+        } finally {
+            // Release the shared resources even if an indexer could not be closed
+            try {
+                indexWriterFactory.close();
+            } finally {
+                if (indexWriterPool != null) {
+                    indexWriterPool.close();
+                }
+            }
+        }
+        if (failure != null) {
+            throw failure;
+        }
     }
 }

Reply via email to