This is an automated email from the ASF dual-hosted git repository.

nfsantos pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8fc92a1ca7 OAK-11061 - Indexing job: during indexing phase, download 
blobs ahead of time in separate thread pool (#1669)
8fc92a1ca7 is described below

commit 8fc92a1ca712bc125f3b11b3a6e9961740cbd9ac
Author: Nuno Santos <[email protected]>
AuthorDate: Mon Sep 2 16:54:21 2024 +0200

    OAK-11061 - Indexing job: during indexing phase, download blobs ahead of 
time in separate thread pool (#1669)
---
 .../indexer/document/DocumentStoreIndexerBase.java |  38 +-
 .../flatfile/AheadOfTimeBlobDownloader.java        |  42 +++
 .../AheadOfTimeBlobDownloaderThrottler.java        | 215 ++++++++++++
 .../AheadOfTimeBlobDownloadingFlatFileStore.java   | 169 +++++++++
 .../flatfile/DefaultAheadOfTimeBlobDownloader.java | 390 +++++++++++++++++++++
 .../flatfile/FlatFileNodeStoreBuilder.java         |  35 +-
 .../indexer/document/flatfile/FlatFileStore.java   |   8 +
 .../document/flatfile/NodeStateEntryReader.java    |   2 +-
 .../AheadOfTimeBlobDownloaderThrottlerTest.java    | 185 ++++++++++
 ...headOfTimeBlobDownloadingFlatFileStoreTest.java |  68 ++++
 .../flatfile/FlatFileNodeStoreBuilderTest.java     |  11 +-
 .../document/flatfile/FlatFileStoreTest.java       |   3 +-
 .../apache/jackrabbit/oak/index/IndexCommand.java  |  28 +-
 13 files changed, 1153 insertions(+), 41 deletions(-)

diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
index 1d76d920fa..95c87c5699 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
@@ -30,7 +30,6 @@ import 
org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
 import org.apache.jackrabbit.oak.index.IndexHelper;
 import org.apache.jackrabbit.oak.index.IndexerSupport;
 import 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder;
-import org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStore;
 import 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.ConfigHelper;
 import 
org.apache.jackrabbit.oak.index.indexer.document.incrementalstore.IncrementalStoreBuilder;
 import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStore;
@@ -165,14 +164,14 @@ public abstract class DocumentStoreIndexerBase implements 
Closeable {
         }
     }
 
-    private List<FlatFileStore> buildFlatFileStoreList(NodeState 
checkpointedState,
+    private List<IndexStore> buildFlatFileStoreList(NodeState 
checkpointedState,
                                                        CompositeIndexer 
indexer,
                                                        Predicate<String> 
pathPredicate,
                                                        Set<String> 
preferredPathElements,
                                                        boolean splitFlatFile,
                                                        Set<IndexDefinition> 
indexDefinitions,
                                                        IndexingReporter 
reporter) throws IOException {
-        List<FlatFileStore> storeList = new ArrayList<>();
+        List<IndexStore> storeList = new ArrayList<>();
 
         Stopwatch flatFileStoreWatch = Stopwatch.createStarted();
         int executionCount = 1;
@@ -201,7 +200,8 @@ public abstract class DocumentStoreIndexerBase implements 
Closeable {
                                 nodeStore, getMongoDocumentStore(), 
traversalLog))
                         .withCheckpoint(indexerSupport.getCheckpoint())
                         
.withStatisticsProvider(indexHelper.getStatisticsProvider())
-                        .withIndexingReporter(reporter);
+                        .withIndexingReporter(reporter)
+                        .withAheadOfTimeBlobDownloader(true);
 
                 for (File dir : previousDownloadDirs) {
                     builder.addExistingDataDumpDir(dir);
@@ -209,9 +209,9 @@ public abstract class DocumentStoreIndexerBase implements 
Closeable {
                 if (splitFlatFile) {
                     storeList = builder.buildList(indexHelper, indexerSupport, 
indexDefinitions);
                 } else {
-                    storeList.add(builder.build());
+                    storeList.add(builder.build(indexHelper, indexer));
                 }
-                for (FlatFileStore item : storeList) {
+                for (IndexStore item : storeList) {
                     closer.register(item);
                 }
             } catch (CompositeException e) {
@@ -304,16 +304,16 @@ public abstract class DocumentStoreIndexerBase implements 
Closeable {
      * @deprecated replaced by {@link #buildStore()}
      */
     @Deprecated
-    public FlatFileStore buildFlatFileStore() throws IOException, 
CommitFailedException {
+    public IndexStore buildFlatFileStore() throws IOException, 
CommitFailedException {
         NodeState checkpointedState = 
indexerSupport.retrieveNodeStateForCheckpoint();
         Set<IndexDefinition> indexDefinitions = 
indexerSupport.getIndexDefinitions();
         Set<String> preferredPathElements = 
indexerSupport.getPreferredPathElements(indexDefinitions);
         Predicate<String> predicate = 
indexerSupport.getFilterPredicate(indexDefinitions, Function.identity());
-        FlatFileStore flatFileStore = 
buildFlatFileStoreList(checkpointedState, null, predicate,
+        IndexStore indexStore = buildFlatFileStoreList(checkpointedState, 
null, predicate,
                 preferredPathElements, 
IndexerConfiguration.parallelIndexEnabled(), indexDefinitions, 
indexingReporter).get(0);
-        log.info("FlatFileStore built at {}. To use this flatFileStore in a 
reindex step, set System Property-{} with value {}",
-                flatFileStore.getStorePath(), OAK_INDEXER_SORTED_FILE_PATH, 
flatFileStore.getStorePath());
-        return flatFileStore;
+        log.info("Store built at {}. To use this store in a reindex step, set 
the system property {} to {}",
+                indexStore.getStorePath(), OAK_INDEXER_SORTED_FILE_PATH, 
indexStore.getStorePath());
+        return indexStore;
     }
 
     public void reindex() throws CommitFailedException, IOException {
@@ -335,7 +335,7 @@ public abstract class DocumentStoreIndexerBase implements 
Closeable {
 
             closer.register(indexer);
 
-            List<FlatFileStore> flatFileStores = buildFlatFileStoreList(
+            List<IndexStore> indexStores = buildFlatFileStoreList(
                     checkpointedState,
                     indexer,
                     indexer::shouldInclude,
@@ -353,14 +353,14 @@ public abstract class DocumentStoreIndexerBase implements 
Closeable {
             INDEXING_PHASE_LOGGER.info("[TASK:INDEXING:START] Starting 
indexing");
             Stopwatch indexerWatch = Stopwatch.createStarted();
             try {
-                if (flatFileStores.size() > 1) {
-                    indexParallel(flatFileStores, indexer, progressReporter);
-                } else if (flatFileStores.size() == 1) {
-                    FlatFileStore flatFileStore = flatFileStores.get(0);
+                if (indexStores.size() > 1) {
+                    indexParallel(indexStores, indexer, progressReporter);
+                } else if (indexStores.size() == 1) {
+                    IndexStore indexStore = indexStores.get(0);
                     TopKSlowestPaths slowestTopKElements = new 
TopKSlowestPaths(TOP_SLOWEST_PATHS_TO_LOG);
                     indexer.onIndexingStarting();
                     long entryStart = System.nanoTime();
-                    for (NodeStateEntry entry : flatFileStore) {
+                    for (NodeStateEntry entry : indexStore) {
                         reportDocumentRead(entry.getPath(), progressReporter);
                         indexer.index(entry);
                         // Avoid calling System.nanoTime() twice per each 
entry, by reusing the timestamp taken at the end
@@ -419,12 +419,12 @@ public abstract class DocumentStoreIndexerBase implements 
Closeable {
         }
     }
 
-    private void indexParallel(List<FlatFileStore> storeList, CompositeIndexer 
indexer, IndexingProgressReporter progressReporter)
+    private void indexParallel(List<IndexStore> storeList, CompositeIndexer 
indexer, IndexingProgressReporter progressReporter)
             throws IOException {
         ExecutorService service = 
Executors.newFixedThreadPool(IndexerConfiguration.indexThreadPoolSize());
         List<Future> futureList = new ArrayList<>();
 
-        for (FlatFileStore item : storeList) {
+        for (IndexStore item : storeList) {
             Future future = service.submit(() -> {
                 for (NodeStateEntry entry : item) {
                     reportDocumentRead(entry.getPath(), progressReporter);
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/AheadOfTimeBlobDownloader.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/AheadOfTimeBlobDownloader.java
new file mode 100644
index 0000000000..d199506b7d
--- /dev/null
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/AheadOfTimeBlobDownloader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+import java.io.Closeable;
+
+/**
+ * Lifecycle hooks of a component that downloads blobs ahead of the indexer.
+ * <p>
+ * NOTE(review): the precise behaviour of each hook is defined by the implementations, which are not part of this
+ * file. The descriptions below only state what this interface itself guarantees.
+ */
+public interface AheadOfTimeBlobDownloader extends Closeable {
+
+    /**
+     * Implementation whose {@link #start()}, {@link #updateIndexed(long)} and {@link #close()} do nothing.
+     * Can be used wherever a downloader is required but no ahead-of-time download should take place.
+     */
+    AheadOfTimeBlobDownloader NOOP = new AheadOfTimeBlobDownloader() {
+        @Override
+        public void start() {
+            // intentionally empty
+        }
+
+        @Override
+        public void updateIndexed(long lastEntryIndexed) {
+            // intentionally empty
+        }
+
+        @Override
+        public void close() {
+            // intentionally empty, nothing to release
+        }
+    };
+
+    /**
+     * Starts the downloader. Presumably expected to be called once, before any call to
+     * {@link #updateIndexed(long)} - TODO confirm against the implementations.
+     */
+    void start();
+
+    /**
+     * Informs the downloader about the progress of the indexer.
+     *
+     * @param lastEntryIndexed position of the last entry handled by the indexer (presumably the count of entries
+     *                         read so far - verify against callers).
+     */
+    void updateIndexed(long lastEntryIndexed);
+}
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/AheadOfTimeBlobDownloaderThrottler.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/AheadOfTimeBlobDownloaderThrottler.java
new file mode 100644
index 0000000000..55d6e48bad
--- /dev/null
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/AheadOfTimeBlobDownloaderThrottler.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+
+import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Tracks a prefetch window for the AOT downloader. This class keeps a window from {startPosition, endPosition} together
+ * with an estimation of the data that was downloaded inside this window. It exposes two operations:
+ *
+ * <ul>
+ * <li> {@link #reserveSpaceForBlob(long, long)} - Reserves space to download blob at the given position and size,
+ *      blocking until enough space is available.
+ * <li> {@link #advanceIndexer(long)} - Advances the window until the given position, removing from the window any
+ *      positions lower or equal than the new index.
+ * </ul>
+ * <p>
+ * Every public method acquires the same internal lock, so instances can be shared between the thread that reserves
+ * space and the thread that advances the indexer.
+ */
+public class AheadOfTimeBlobDownloaderThrottler {
+    private static final Logger LOG = LoggerFactory.getLogger(AheadOfTimeBlobDownloaderThrottler.class);
+
+    /**
+     * A reservation held in the window: the position of the blob and the number of bytes reserved for it.
+     */
+    private static class DownloadedBlob {
+        // Position (line number) in the FFS
+        final long position;
+        // Bytes reserved for this blob (capped at the maximum window size by reserveSpaceForBlob)
+        final long size;
+
+        public DownloadedBlob(long position, long size) {
+            this.position = position;
+            this.size = size;
+        }
+
+        @Override
+        public String toString() {
+            return "DownloadedBlob{" +
+                    "position=" + position +
+                    ", size=" + size +
+                    '}';
+        }
+    }
+
+    // Maximum number of blobs that can be downloaded ahead of time
+    private final int maxWindowSizeNumberOfBlobs;
+    // Maximum number of bytes that can be downloaded ahead of the indexer
+    private final long maxWindowSizeBytes;
+
+    private final ReentrantLock lock = new ReentrantLock();
+    // Signalled by advanceIndexer whenever reservations were removed from the window
+    private final Condition condition = lock.newCondition();
+
+    // List of blobs downloaded ahead of the indexer
+    private final ArrayDeque<DownloadedBlob> aotDownloadedBlobs;
+    // How much data is currently downloaded ahead of time. This is just an optimization to avoid traversing the list of
+    // downloaded blobs every time we need the total window size.
+    private long currentWindowSizeBytes = 0;
+    // Top of the prefetch window
+    private long windowLastPosition = -1;
+    // Position of the indexer
+    private long indexerPosition = -1;
+
+    // Statistics, the maximum size ever reached by the window
+    private int highestWindowSizeNumberOfBlobs = 0;
+    private long highestWindowSizeBytes = 0;
+
+    /**
+     * @param maxWindowSizeNumberOfBlobs How many blobs can be downloaded ahead of the indexer.
+     * @param maxWindowSizeBytes         How many bytes can be downloaded ahead of the indexer.
+     * @throws IllegalArgumentException if any of the two limits is not positive.
+     */
+    public AheadOfTimeBlobDownloaderThrottler(int maxWindowSizeNumberOfBlobs, long maxWindowSizeBytes) {
+        if (maxWindowSizeNumberOfBlobs <= 0) {
+            throw new IllegalArgumentException("maxWindowSizeNumberOfBlobs must be positive: " + maxWindowSizeNumberOfBlobs);
+        }
+        if (maxWindowSizeBytes <= 0) {
+            throw new IllegalArgumentException("maxWindowSizeBytes must be positive: " + maxWindowSizeBytes);
+        }
+        this.maxWindowSizeNumberOfBlobs = maxWindowSizeNumberOfBlobs;
+        this.maxWindowSizeBytes = maxWindowSizeBytes;
+        this.aotDownloadedBlobs = new ArrayDeque<>(maxWindowSizeNumberOfBlobs);
+    }
+
+    /**
+     * Reserves space for a blob to be downloaded. This method blocks until there is enough space in the prefetch window.
+     * If the position of the reservation is lower or equal to the indexer position, the reservation is ignored.
+     * A blob longer than the maximum window size is accounted for as if it had exactly the maximum window size.
+     *
+     * @param position The position of the blob to be downloaded. Must be higher than the position of every
+     *                 previous reservation.
+     * @param length   The length of the blob to be downloaded.
+     * @return true if space was reserved, false if the indexer is already ahead of the position of the blob.
+     * @throws IllegalArgumentException if position is not higher than the last position reserved so far.
+     * @throws InterruptedException     if the thread is interrupted while waiting for space in the window.
+     */
+    public boolean reserveSpaceForBlob(long position, long length) throws InterruptedException {
+        if (length > maxWindowSizeBytes) {
+            LOG.warn("Blob length {} is higher than the maximum size of the window {}. Proceeding with a reservation for the maximumSize of the throttler.", length, maxWindowSizeBytes);
+            // Cap the reservation, otherwise the wait condition below could never become false
+            length = maxWindowSizeBytes;
+        }
+        lock.lock();
+        try {
+            if (position <= windowLastPosition) {
+                throw new IllegalArgumentException("blobPosition " + position + " is not higher than the current last position of the window " + windowLastPosition);
+            }
+            if (position <= indexerPosition) {
+                LOG.warn("Blob position {} is lower than the indexer position {}. Ignoring space reservation request", position, indexerPosition);
+                // Do not reserve space for this blob, it is already indexed
+                return false;
+            }
+
+            // Loop to guard against spurious wake-ups and against other waiters taking the freed space first
+            while (currentWindowSizeBytes + length > maxWindowSizeBytes || aotDownloadedBlobs.size() >= maxWindowSizeNumberOfBlobs) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Waiting until indexer catches up. First position in window: {}, AOT data downloaded: {}, number of aot downloaded blobs: {}",
+                            firstPosition(), IOUtils.humanReadableByteCount(currentWindowSizeBytes), aotDownloadedBlobs.size()
+                    );
+                }
+                condition.await();
+            }
+            windowLastPosition = position;
+            aotDownloadedBlobs.addLast(new DownloadedBlob(position, length));
+            currentWindowSizeBytes += length;
+            // Update maximum values
+            if (aotDownloadedBlobs.size() > highestWindowSizeNumberOfBlobs) {
+                highestWindowSizeNumberOfBlobs = aotDownloadedBlobs.size();
+            }
+            if (currentWindowSizeBytes > highestWindowSizeBytes) {
+                highestWindowSizeBytes = currentWindowSizeBytes;
+            }
+            return true;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * @return how many more blobs can be reserved before the window reaches its maximum number of blobs.
+     */
+    public long getAvailableWindowSize() {
+        lock.lock();
+        try {
+            return maxWindowSizeNumberOfBlobs - aotDownloadedBlobs.size();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * @return how many more bytes can be reserved before the window reaches its maximum size in bytes.
+     */
+    public long getAvailableWindowBytes() {
+        lock.lock();
+        try {
+            return maxWindowSizeBytes - currentWindowSizeBytes;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Advances the indexer to the given position. Every reservation at a position lower or equal to the new
+     * position is removed from the window and, if any was removed, the threads blocked in
+     * {@link #reserveSpaceForBlob(long, long)} are woken up.
+     *
+     * @param indexerPosition The new position of the indexer.
+     * @throws IllegalStateException if the accounting of reserved bytes becomes negative.
+     */
+    public void advanceIndexer(long indexerPosition) {
+        lock.lock();
+        try {
+            int oldWindowSize = aotDownloadedBlobs.size();
+            long oldWindowBytes = currentWindowSizeBytes;
+            while (true) {
+                DownloadedBlob head = aotDownloadedBlobs.peekFirst();
+                if (head != null && head.position <= indexerPosition) {
+                    aotDownloadedBlobs.pollFirst();
+                    currentWindowSizeBytes -= head.size;
+                } else {
+                    break;
+                }
+            }
+            if (oldWindowSize != aotDownloadedBlobs.size()) {
+                LOG.debug("Window size reduced. Indexer position: {}. windowSize: {} -> {}, windowSizeBytes: {} -> {}",
+                        indexerPosition, oldWindowSize, aotDownloadedBlobs.size(),
+                        IOUtils.humanReadableByteCountBin(oldWindowBytes), IOUtils.humanReadableByteCountBin(currentWindowSizeBytes));
+                if (currentWindowSizeBytes < 0) {
+                    throw new IllegalStateException("AOT downloaded bytes is negative. aotDownloaded: " + currentWindowSizeBytes);
+                }
+                condition.signalAll();
+            }
+            this.indexerPosition = indexerPosition;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    // Position of the oldest reservation in the window, or -1 if the window is empty. Callers must hold the lock.
+    private long firstPosition() {
+        return aotDownloadedBlobs.isEmpty() ? -1 : aotDownloadedBlobs.getFirst().position;
+    }
+
+    /**
+     * @return a one-line summary with the current content of the window and the highest number of blobs and bytes
+     * the window has ever held.
+     */
+    public String formatStats() {
+        lock.lock();
+        try {
+            // The last two values are the high-water marks, not the configured limits, and are labelled accordingly
+            return String.format("AOT Downloader throttler: {aotDownloadedBlobsSize: %s, aotDownloadedBlobsSizeBytes: %s, highestWindowSizeNumberOfBlobs: %s, highestWindowSizeBytes: %s}",
+                    aotDownloadedBlobs.size(), IOUtils.humanReadableByteCount(currentWindowSizeBytes), highestWindowSizeNumberOfBlobs, IOUtils.humanReadableByteCount(highestWindowSizeBytes));
+        } finally {
+            lock.unlock();
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/AheadOfTimeBlobDownloadingFlatFileStore.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/AheadOfTimeBlobDownloadingFlatFileStore.java
new file mode 100644
index 0000000000..767737dd89
--- /dev/null
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/AheadOfTimeBlobDownloadingFlatFileStore.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.jackrabbit.oak.index.IndexHelper;
+import org.apache.jackrabbit.oak.index.indexer.document.CompositeIndexer;
+import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
+import 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.ConfigHelper;
+import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStore;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link IndexStore} that delegates to a {@link FlatFileStore} and, while the store is being iterated, drives an
+ * {@link AheadOfTimeBlobDownloader}: the downloader is started when an iterator is requested, is periodically told
+ * how many entries were read, and is closed when the iterator is exhausted.
+ * <p>
+ * The downloader is configured through the system properties named by the {@code BLOB_PREFETCH_*} constants. If no
+ * binary node suffix is configured, or none of the indexes being built matches one of the configured prefixes, a
+ * no-op downloader is used.
+ */
+public class AheadOfTimeBlobDownloadingFlatFileStore implements IndexStore {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final FlatFileStore ffs;
+    private final CompositeIndexer indexer;
+    private final IndexHelper indexHelper;
+
+    /** System property: comma-separated list of index path prefixes for which prefetching is enabled. Default: empty. */
+    public static final String BLOB_PREFETCH_ENABLE_FOR_INDEXES_PREFIXES = "oak.indexer.blobPrefetch.enableForIndexesPrefixes";
+    /** System property: binary node suffix handed to the downloader. Prefetching is disabled if empty. Default: empty. */
+    public static final String BLOB_PREFETCH_BINARY_NODES_SUFFIX = "oak.indexer.blobPrefetch.binaryNodesSuffix";
+    /** System property: number of download threads handed to the downloader. Default: 4. */
+    public static final String BLOB_PREFETCH_DOWNLOAD_THREADS = "oak.indexer.blobPrefetch.downloadThreads";
+    /** System property: size of the download-ahead window in MB, handed to the downloader. Default: 32. */
+    public static final String BLOB_PREFETCH_DOWNLOAD_AHEAD_WINDOW_MB = "oak.indexer.blobPrefetch.downloadAheadWindowMB";
+    /** System property: size of the download-ahead window, handed to the downloader. Default: 4096. */
+    public static final String BLOB_PREFETCH_DOWNLOAD_AHEAD_WINDOW_SIZE = "oak.indexer.blobPrefetch.downloadAheadWindowSize";
+    private final String blobPrefetchEnableForIndexes = ConfigHelper.getSystemPropertyAsString(BLOB_PREFETCH_ENABLE_FOR_INDEXES_PREFIXES, "");
+    private final String blobPrefetchBinaryNodeSuffix = ConfigHelper.getSystemPropertyAsString(BLOB_PREFETCH_BINARY_NODES_SUFFIX, "");
+    private final int nDownloadThreads = ConfigHelper.getSystemPropertyAsInt(BLOB_PREFETCH_DOWNLOAD_THREADS, 4);
+    private final int maxPrefetchWindowMB = ConfigHelper.getSystemPropertyAsInt(BLOB_PREFETCH_DOWNLOAD_AHEAD_WINDOW_MB, 32);
+    private final int maxPrefetchWindowSize = ConfigHelper.getSystemPropertyAsInt(BLOB_PREFETCH_DOWNLOAD_AHEAD_WINDOW_SIZE, 4096);
+
+    /**
+     * Wraps the given flat file store.
+     *
+     * @param ffs         the store to delegate to.
+     * @param indexer     the indexer handed to the downloader.
+     * @param indexHelper used to obtain the index paths and the blob store.
+     * @return a store that delegates to {@code ffs}.
+     */
+    public static AheadOfTimeBlobDownloadingFlatFileStore wrap(FlatFileStore ffs, CompositeIndexer indexer, IndexHelper indexHelper) {
+        return new AheadOfTimeBlobDownloadingFlatFileStore(ffs, indexer, indexHelper);
+    }
+
+    private AheadOfTimeBlobDownloadingFlatFileStore(FlatFileStore ffs, CompositeIndexer indexer, IndexHelper indexHelper) {
+        this.ffs = ffs;
+        this.indexer = indexer;
+        this.indexHelper = indexHelper;
+    }
+
+    /**
+     * Creates the downloader to use for one iteration over the store, or returns
+     * {@link AheadOfTimeBlobDownloader#NOOP} if prefetching is disabled by configuration.
+     */
+    private @NotNull AheadOfTimeBlobDownloader createAheadOfTimeBlobDownloader(CompositeIndexer indexer, IndexHelper indexHelper) {
+        if (blobPrefetchBinaryNodeSuffix == null || blobPrefetchBinaryNodeSuffix.isEmpty()) {
+            log.info("Ahead of time blob downloader is disabled, no binary node suffix provided");
+            return AheadOfTimeBlobDownloader.NOOP;
+        } else if (!isEnabledForIndexes(blobPrefetchEnableForIndexes, indexHelper.getIndexPaths())) {
+            log.info("Ahead of time blob downloader is disabled, not enabled for indexes: {}", indexHelper.getIndexPaths());
+            return AheadOfTimeBlobDownloader.NOOP;
+        } else {
+            return new DefaultAheadOfTimeBlobDownloader(
+                    blobPrefetchBinaryNodeSuffix,
+                    ffs.getStoreFile(),
+                    ffs.getAlgorithm(),
+                    indexHelper.getGCBlobStore(),
+                    indexer,
+                    nDownloadThreads,
+                    maxPrefetchWindowSize,
+                    maxPrefetchWindowMB);
+        }
+    }
+
+    /**
+     * Checks whether at least one of the given index paths starts with one of the configured prefixes.
+     *
+     * @param indexesEnabledPrefix comma-separated list of index path prefixes. Surrounding whitespace and empty
+     *                             elements are ignored.
+     * @param indexPaths           paths of the indexes being built.
+     * @return true if any index path starts with any of the prefixes, false otherwise (in particular if no prefix
+     * is configured).
+     */
+    static boolean isEnabledForIndexes(String indexesEnabledPrefix, List<String> indexPaths) {
+        List<String> enableForIndexes = splitAndTrim(indexesEnabledPrefix);
+        for (String indexPath : indexPaths) {
+            if (enableForIndexes.stream().anyMatch(indexPath::startsWith)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Splits a comma-separated list, trimming each element and dropping the empty ones.
+     */
+    private static List<String> splitAndTrim(String str) {
+        if (str == null || str.isBlank()) {
+            return List.of();
+        } else {
+            return Arrays.stream(str.split(","))
+                    .map(String::trim)
+                    // An empty prefix would match every index path (startsWith("") is always true), so a stray
+                    // comma in the configuration must not produce one.
+                    .filter(s -> !s.isEmpty())
+                    .collect(Collectors.toList());
+        }
+    }
+
+    /**
+     * Returns an iterator over the entries of the wrapped store. A new downloader is created and started for each
+     * iterator; it is closed the first time {@code hasNext()} reports that there are no more entries.
+     * <p>
+     * NOTE(review): if the iteration is abandoned before the end, the downloader is not closed by this class.
+     */
+    @Override
+    public @NotNull Iterator<NodeStateEntry> iterator() {
+        final AheadOfTimeBlobDownloader aheadOfTimeBlobDownloader = createAheadOfTimeBlobDownloader(indexer, indexHelper);
+        aheadOfTimeBlobDownloader.start();
+        return new Iterator<>() {
+
+            final Iterator<NodeStateEntry> it = ffs.iterator();
+            long entriesRead;
+            // Set once the end of the store was reached and the downloader was shut down, so that further calls to
+            // hasNext() do not touch the closed downloader again.
+            boolean downloaderClosed;
+
+            @Override
+            public boolean hasNext() {
+                boolean result = it.hasNext();
+                if (!result && !downloaderClosed) {
+                    downloaderClosed = true;
+                    aheadOfTimeBlobDownloader.updateIndexed(entriesRead);
+                    try {
+                        aheadOfTimeBlobDownloader.close();
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+                return result;
+            }
+
+            @Override
+            public NodeStateEntry next() {
+                entriesRead++;
+                // No need to update the progress reporter for each entry. This should reduce a bit the
+                // overhead of updating the AOT downloader, which sets a volatile field internally.
+                if (entriesRead % 128 == 0) {
+                    aheadOfTimeBlobDownloader.updateIndexed(entriesRead);
+                }
+                return it.next();
+            }
+        };
+    }
+
+    @Override
+    public String getStorePath() {
+        return ffs.getStorePath();
+    }
+
+    @Override
+    public long getEntryCount() {
+        return ffs.getEntryCount();
+    }
+
+    @Override
+    public void setEntryCount(long entryCount) {
+        ffs.setEntryCount(entryCount);
+    }
+
+    /**
+     * Closes the wrapped store.
+     */
+    @Override
+    public void close() throws IOException {
+        ffs.close();
+    }
+
+    @Override
+    public String getIndexStoreType() {
+        return ffs.getIndexStoreType();
+    }
+
+    @Override
+    public boolean isIncremental() {
+        return ffs.isIncremental();
+    }
+
+}
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/DefaultAheadOfTimeBlobDownloader.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/DefaultAheadOfTimeBlobDownloader.java
new file mode 100644
index 0000000000..63eb55868c
--- /dev/null
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/DefaultAheadOfTimeBlobDownloader.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.LineIterator;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.commons.Compression;
+import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.index.indexer.document.CompositeIndexer;
+import 
org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreUtils;
+import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
+import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
+import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * Scans a FlatFileStore for non-inlined blobs in nodes matching a given 
pattern and downloads them from the blob store.
+ * The goal of this class is to populate the local data store cache with the 
non-inlined blobs that are required by the
+ * indexer, so that when the indexing thread tries to retrieve the blob, it 
will find it locally, thereby avoiding an
+ * expensive call to the blob store. When indexing repositories with many 
non-inlined renditions, pre-populating the
+ * cache can cut the indexing time by more than half.
+ * <p>
+ * This AOT download is intended to run asynchronously with the indexing 
thread. It starts the following threads:
+ * <ul>
+ * <li>[scanner] - scans the FFS, searching for blobs to download. A blob is 
selected for download if it is a binary property
+ * in a node whose name matches the suffix given as parameter to this class, 
and is non-inlined.</li>
+ * <li>[downloader-n] - a configurable number of threads that download the 
blobs that were discovered by the scanner thread.</li>
+ * </ul>
+ * The indexer should periodically call {@link #updateIndexed(long)} to inform 
the AOT downloader of the last line
+ * indexed. This is necessary to keep the AOT downloader more or less in sync 
with the indexer, that is, to prevent it
+ * from falling behind and to prevent it from going too far ahead.
+ * <p>
+ * This AOT downloader should be configured with enough threads that it is 
able to stay ahead of the indexer. Whether it
+ * can remain ahead or not will depend on the number of blobs to download and 
the speed of the connection to the blob
+ * store. As a rough guide, on a cloud environment with blobs stored in Azure 
Blob Store or Amazon S3, 4 download threads
+ * should be enough. If the AOT downloader falls behind the indexer, it will 
skip any nodes that are behind the last known
+ * indexing position, to try to catch up.
+ * <p>
+ * The AOT downloader will also try not to be too far ahead of the indexer. 
This is done to avoid filling up the local
+ * blob store cache, which would cause blobs to be evicted before the indexer 
gets around to using them. In this case, the
+ * indexer would have to download the blob again, which would negate the 
benefits of using this AOT downloader.
+ * The AOT downloader takes as parameter the maximum amount of data that it is 
allowed to prefetch (<code>maxPrefetchWindowMB</code>).
+ * It will then try to not download more than this data, pausing its progress 
whenever the prefetch window is full.
+ * For details on how this is implemented, see {@link 
AheadOfTimeBlobDownloaderThrottler}.
+ */
+public class DefaultAheadOfTimeBlobDownloader implements 
AheadOfTimeBlobDownloader {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultAheadOfTimeBlobDownloader.class);
+    // Stops the downloader threads.
+    private static final Blob SENTINEL = new BlobStoreBlob(null, null);
+    private final static AtomicInteger threadNameCounter = new 
AtomicInteger(0);
+
+    // Set with the ids of blobs that were already enqueued for download. 
Avoids creating a large number of download
+    // requests for the blobs that are referenced by many nodes. Although this 
is a HashMap it is used as a set. The
+    // value is just a placeholder, we use Boolean.TRUE for no particular 
reason.
+    private static final int DOWNLOADED_BLOB_IDS_CACHE_SIZE = 4096;
+    // Access-ordered map (third constructor argument) whose eldest entry is evicted once more than
+    // MAX_ENTRIES ids are stored, which bounds the memory used to remember enqueued blobs.
+    private final LinkedHashMap<String, Boolean> downloadedBlobs = new LinkedHashMap<>(DOWNLOADED_BLOB_IDS_CACHE_SIZE, 0.75f, true) {
+        // Avoid resizing operations: the cap (70% of the initial capacity) stays below the 0.75 load factor.
+        private final static int MAX_ENTRIES = (int) (DOWNLOADED_BLOB_IDS_CACHE_SIZE * 0.70);
+
+        @Override
+        protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
+            return size() > MAX_ENTRIES;
+        }
+    };
+
+    private final String binaryBlobsPathSuffix;
+    private final File ffsPath;
+    private final Compression algorithm;
+    private final GarbageCollectableBlobStore blobStore;
+    private final CompositeIndexer indexer;
+
+    // Statistics
+    private final LongAdder totalBytesDownloaded = new LongAdder();
+    private final LongAdder totalTimeDownloadingNanos = new LongAdder();
+    private final LongAdder totalBlobsDownloaded = new LongAdder();
+    private long blobsEnqueuedForDownload = 0;
+    private long skippedLinesDueToLaggingIndexing = 0;
+
+    // Download threads plus thread that scans the FFS
+    private ExecutorService executor;
+    private ScanTask scanTask;
+    private Future<?> scanFuture;
+    private ArrayList<Future<?>> downloadFutures;
+    private final int nDownloadThreads;
+    private final AheadOfTimeBlobDownloaderThrottler throttler;
+    private volatile long indexerLastKnownPosition;
+
+    /**
+     * Creates a downloader. No threads are started until {@link #start()} is called.
+     *
+     * @param binaryBlobsPathSuffix Suffix of nodes that are to be considered for AOT download. Any node that does not match this suffix is ignored.
+     * @param ffsPath               Flat file store path.
+     * @param algorithm             Compression algorithm of the flat file store.
+     * @param blobStore             The blob store. This should be the same blob store used by the indexer and its cache should be
+     *                              large enough to hold <code>maxPrefetchWindowMB</code> of data.
+     * @param indexer               The indexer, needed to check if a given path should be indexed.
+     * @param nDownloadThreads      Number of download threads. Must be at least 1.
+     * @param maxPrefetchWindowSize Passed unchanged to the {@link AheadOfTimeBlobDownloaderThrottler} as its first argument;
+     *                              presumably the maximum number of blobs in the prefetch window - TODO confirm against the throttler.
+     * @param maxPrefetchWindowMB   Size of the prefetch window in MB, that is, how much data the downloader will retrieve ahead of the indexer.
+     *                              Must be at least 1.
+     * @throws IllegalArgumentException if <code>nDownloadThreads</code> or <code>maxPrefetchWindowMB</code> is less than 1.
+     */
+    public DefaultAheadOfTimeBlobDownloader(@NotNull String 
binaryBlobsPathSuffix,
+                                            @NotNull File ffsPath, @NotNull 
Compression algorithm,
+                                            @NotNull 
GarbageCollectableBlobStore blobStore,
+                                            @NotNull CompositeIndexer indexer,
+                                            int nDownloadThreads, int 
maxPrefetchWindowSize, int maxPrefetchWindowMB) {
+        if (nDownloadThreads < 1) {
+            throw new IllegalArgumentException("nDownloadThreads must be 
greater than 0. Was: " + nDownloadThreads);
+        }
+        if (maxPrefetchWindowMB < 1) {
+            throw new IllegalArgumentException("maxPrefetchWindowMB must be 
greater than 0. Was: " + maxPrefetchWindowMB);
+        }
+        this.binaryBlobsPathSuffix = binaryBlobsPathSuffix;
+        this.ffsPath = ffsPath;
+        this.algorithm = algorithm;
+        this.blobStore = blobStore;
+        this.indexer = indexer;
+        this.nDownloadThreads = nDownloadThreads;
+        // The window size in MB is converted to bytes before being handed to the throttler.
+        this.throttler = new 
AheadOfTimeBlobDownloaderThrottler(maxPrefetchWindowSize, maxPrefetchWindowMB * 
FileUtils.ONE_MB);
+        LOG.info("Created AheadOfTimeBlobDownloader. downloadThreads: {}, 
prefetchMB: {}", nDownloadThreads, maxPrefetchWindowMB);
+    }
+
+    public void start() {
+        executor = Executors.newFixedThreadPool(nDownloadThreads + 1);
+        ArrayBlockingQueue<Blob> queue = new 
ArrayBlockingQueue<>(nDownloadThreads * 2);
+
+        downloadFutures = new ArrayList<>();
+        for (int i = 0; i < nDownloadThreads; i++) {
+            DownloadTask downloadTask = new DownloadTask(queue);
+            downloadFutures.add(executor.submit(downloadTask));
+        }
+        scanTask = new ScanTask(queue);
+        scanFuture = executor.submit(scanTask);
+    }
+
+    /**
+     * Records the indexer's latest position and forwards it to the throttler.
+     *
+     * @param positionIndexed position reported by the indexer; the scanner compares it against its own
+     *                        line counter to decide whether an entry was already indexed.
+     */
+    public void updateIndexed(long positionIndexed) {
+        this.indexerLastKnownPosition = positionIndexed;
+        throttler.advanceIndexer(positionIndexed);
+    }
+
+    /**
+     * Stops the downloader by delegating to {@link #stop()}.
+     */
+    public void close() {
+        stop();
+    }
+
+    /**
+     * Cancels the scan and download tasks and shuts down the thread pool. Does nothing when there is
+     * no executor, that is, when {@link #start()} was not called or the downloader was already stopped.
+     */
+    public void stop() {
+        if (executor != null) {
+            LOG.info("Stopping AheadOfTimeBlobDownloader. Statistics: {}", formatAggregateStatistics());
+            // Interrupt the scanner first, then every downloader.
+            scanFuture.cancel(true);
+            downloadFutures.forEach(future -> future.cancel(true));
+            LOG.info("Waiting for download tasks to finish");
+            new ExecutorCloser(executor).close();
+            // Clearing the reference makes any further call to stop() a no-op.
+            executor = null;
+            LOG.info("All download tasks finished");
+        }
+    }
+
+    /**
+     * Formats the totals accumulated by all download threads together with the scanner's counters.
+     * NOTE(review): this dereferences <code>scanTask</code>, which is only assigned in {@link #start()};
+     * calling this method before <code>start()</code> would fail with a NullPointerException. The scanner
+     * counters are plain fields written by the scanner thread, so values read from other threads may be
+     * slightly stale - presumably acceptable for logging, confirm.
+     *
+     * @return a single-line, human-readable summary of the statistics.
+     */
+    public String formatAggregateStatistics() {
+        long totalBytesDownloadedSum = totalBytesDownloaded.sum();
+        return String.format(
+                "Downloaded %d blobs, %d bytes (%s). aggregatedDownloadTime: 
%s, cacheHits: %d, linesScanned: %d, " +
+                        "notIncludedInIndex: %d, doesNotMatchPattern: %d, 
inlinedBlobsSkipped: %d, " +
+                        "skippedForOtherReasons: %d, 
skippedLinesDueToLaggingIndexing: %d",
+                totalBlobsDownloaded.sum(), totalBytesDownloadedSum, 
IOUtils.humanReadableByteCountBin(totalBytesDownloadedSum),
+                
FormattingUtils.formatNanosToSeconds(totalTimeDownloadingNanos.sum()),
+                scanTask.blobCacheHit, scanTask.linesScanned, 
scanTask.notIncludedInIndex, scanTask.doesNotMatchPattern,
+                scanTask.inlinedBlobsSkipped, scanTask.skippedForOtherReasons, 
skippedLinesDueToLaggingIndexing);
+    }
+
+    /**
+     * Scans the FFS, searching for binary properties that are not inlined and 
enqueues them for download.
+     */
+    private class ScanTask implements Runnable {
+        private final NodeStateEntryReader nodeStateEntryReader = new 
NodeStateEntryReader(blobStore);
+        private final ArrayBlockingQueue<Blob> queue;
+
+        long linesScanned = 0;
+        long blobCacheHit = 0;
+        long notIncludedInIndex = 0;
+        long doesNotMatchPattern = 0;
+        long inlinedBlobsSkipped = 0;
+        long skippedForOtherReasons = 0;
+
+        public ScanTask(ArrayBlockingQueue<Blob> queue) {
+            this.queue = queue;
+        }
+
+        @Override
+        public void run() {
+            try (LineIterator ffsLineIterator = new 
LineIterator(IndexStoreUtils.createReader(ffsPath, algorithm))) {
+                String oldName = Thread.currentThread().getName();
+                Thread.currentThread().setName("scanner");
+                try {
+                    while (ffsLineIterator.hasNext()) {
+                        String ffsLine = ffsLineIterator.next();
+                        // Do not parse the json with the node state yet, 
first check if the path is a possible candidate
+                        // for download. Most of the lines in the FFS will be 
of lines that do not contain blobs to download,
+                        // so it would be wasteful to parse the json for all 
of them.
+                        int pipeIndex = 
ffsLine.indexOf(NodeStateEntryWriter.DELIMITER_CHAR);
+                        String entryPath = ffsLine.substring(0, pipeIndex);
+                        if (!isCandidatePath(entryPath)) {
+                            doesNotMatchPattern++;
+                        } else if (!indexer.shouldInclude(entryPath)) {
+                            notIncludedInIndex++;
+                        } else if (isBehindIndexer(linesScanned)) {
+                            LOG.debug("Skipping blob at position {} because it 
was already indexed", linesScanned);
+                            skippedLinesDueToLaggingIndexing++;
+                        } else {
+                            // Now we need to parse the json to check if there 
are any blobs to download
+                            processEntry(entryPath, 
ffsLine.substring(pipeIndex + 1));
+                        }
+                        linesScanned++;
+                        if (linesScanned % 100_000 == 0) {
+                            LOG.info("[{}] Last path scanned: {}. Aggregated 
statistics: {}", linesScanned, entryPath, formatAggregateStatistics());
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    queue.clear();
+                    LOG.info("Scan task interrupted, exiting");
+                } finally {
+                    LOG.info("Scanner reached end of FFS, stopping download 
threads. Statistics: {}", formatAggregateStatistics());
+                    Thread.currentThread().setName(oldName);
+                    queue.put(SENTINEL);
+                }
+            } catch (InterruptedException | IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        private boolean isCandidatePath(String path) {
+            return path.endsWith(binaryBlobsPathSuffix);
+        }
+
+        private boolean isBehindIndexer(long scannerPosition) {
+            // Always try to be ahead of the indexer
+            return scannerPosition <= indexerLastKnownPosition;
+        }
+
+        private void processEntry(String entryPath, String entryStateAsJson) 
throws InterruptedException {
+            NodeState nodeState = 
nodeStateEntryReader.parseState(entryStateAsJson);
+            PropertyState ps = nodeState.getProperty("jcr:data");
+            if (ps == null || ps.isArray() || ps.getType() != Type.BINARY) {
+                skippedForOtherReasons++;
+                LOG.info("Skipping node: {}. Property \"jcr:data\": {}", 
entryPath, ps);
+                return;
+            }
+            for (Blob blob : ps.getValue(Type.BINARIES)) {
+                if (blob.isInlined()) {
+                    inlinedBlobsSkipped++;
+                    continue;
+                }
+                if (blob.getContentIdentity() == null) {
+                    LOG.info("[{}] Skipping blob with null content identity: 
{}", linesScanned, blob.getContentIdentity());
+                    continue;
+                }
+                // Check if we have recently downloaded this blob. This is 
just an optimization. Without this cache,
+                // if a blob had been downloaded recently, further attempts to 
download it would hit the blob store cache
+                // and complete quickly. But as it is common for the same blob 
to be referenced by many entries, this simple
+                // check here avoids the extra work of enqueuing the blob for 
download and reading it from the cache.
+                boolean present = 
downloadedBlobs.containsKey(blob.getContentIdentity());
+                if (present) {
+                    blobCacheHit++;
+                    LOG.debug("[{}] Blob already downloaded or enqueued for 
download: {}", linesScanned, blob.getContentIdentity());
+                    continue;
+                }
+                throttler.reserveSpaceForBlob(linesScanned, blob.length());
+                downloadedBlobs.put(blob.getContentIdentity(), Boolean.TRUE);
+                queue.put(blob);
+                blobsEnqueuedForDownload++;
+                // Log progress
+                if (blobsEnqueuedForDownload % 1000 == 0) {
+                    LOG.info("[{}] Enqueued blob for download: {}, size: {}, 
Statistics: {}, {}",
+                            linesScanned, blob.getContentIdentity(), 
blob.length(),
+                            formatAggregateStatistics(), 
throttler.formatStats());
+                }
+            }
+        }
+    }
+
+    /**
+     * Downloads blobs from the blob store.
+     */
+    private class DownloadTask implements Runnable {
+        private final ArrayBlockingQueue<Blob> queue;
+
+        private long blobsDownloaded = 0;
+        private long bytesDownloaded = 0;
+        private long timeDownloadingNanos = 0;
+
+        /**
+         * @param queue The queue from which to take blobs to download.
+         */
+        public DownloadTask(ArrayBlockingQueue<Blob> queue) {
+            this.queue = queue;
+        }
+
+        @Override
+        public void run() {
+            String oldName = Thread.currentThread().getName();
+            Thread.currentThread().setName("downloader-" + 
threadNameCounter.getAndIncrement());
+            byte[] buffer = new byte[4096];
+            try {
+                while (true) {
+                    Blob blob = queue.take();
+                    if (blob == SENTINEL) {
+                        LOG.info("Sentinel received, exiting. Statistics: {}", 
formatDownloaderStats());
+                        queue.put(SENTINEL);
+                        break;
+                    }
+
+                    long startNanos = System.nanoTime();
+                    InputStream stream = blob.getNewStream();
+                    int blobSize = 0;
+                    try {
+                        while (true) {
+                            int bytesRead = stream.read(buffer);
+                            if (bytesRead == -1) {
+                                break;
+                            }
+                            blobSize += bytesRead;
+                        }
+                        if (blobSize != blob.length()) {
+                            LOG.error("Blob size mismatch: blob.length(): {}, 
bytesRead: {}", blob.length(), blobSize);
+                        }
+                        long elapsedNanos = System.nanoTime() - startNanos;
+                        // Local stats
+                        bytesDownloaded += blobSize;
+                        blobsDownloaded++;
+                        timeDownloadingNanos += elapsedNanos;
+                        // Aggregated stats across all download threads.
+                        totalBytesDownloaded.add(blobSize);
+                        totalTimeDownloadingNanos.add(elapsedNanos);
+                        totalBlobsDownloaded.increment();
+                        // Log progress
+                        if (blobsDownloaded % 500 == 0) {
+                            LOG.info("Retrieved blob: {}, size: {}, in {} ms. 
Downloader thread statistics: {}",
+                                    blob.getContentIdentity(), blob.length(), 
elapsedNanos / 1_000_000, formatDownloaderStats());
+                        }
+                    } catch (IOException e) {
+                        LOG.error("Error downloading blob: {}", 
blob.getContentIdentity(), e);
+                    }
+                }
+            } catch (InterruptedException e) {
+                LOG.info("Download task interrupted, exiting. Statistics: {}", 
formatDownloaderStats());
+            } finally {
+                Thread.currentThread().setName(oldName);
+            }
+        }
+
+        private String formatDownloaderStats() {
+            return String.format("Downloaded %d blobs, %d bytes (%s) in %s",
+                    blobsDownloaded, bytesDownloaded, 
IOUtils.humanReadableByteCountBin(bytesDownloaded),
+                    
FormattingUtils.formatNanosToSeconds(timeDownloadingNanos));
+        }
+    }
+}
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
index 294d8f3117..949253ba46 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
@@ -27,8 +27,10 @@ import org.apache.jackrabbit.oak.commons.Compression;
 import org.apache.jackrabbit.oak.index.IndexHelper;
 import org.apache.jackrabbit.oak.index.IndexerSupport;
 import org.apache.jackrabbit.oak.index.indexer.document.CompositeException;
+import org.apache.jackrabbit.oak.index.indexer.document.CompositeIndexer;
 import 
org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverserFactory;
 import 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy;
+import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStore;
 import 
org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreSortStrategy;
 import 
org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreUtils;
 import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
@@ -106,6 +108,7 @@ public class FlatFileNodeStoreBuilder {
     private StatisticsProvider statisticsProvider = StatisticsProvider.NOOP;
     private IndexingReporter indexingReporter = IndexingReporter.NOOP;
     private MongoClientURI mongoClientURI;
+    private boolean withAheadOfTimeBlobDownloading = false;
 
     public enum SortStrategyType {
         /**
@@ -199,7 +202,24 @@ public class FlatFileNodeStoreBuilder {
         return this;
     }
 
-    public FlatFileStore build() throws IOException, CompositeException {
+    /**
+     * NOTE: This enables the support for AOT blob download, it does not activate it. To activate it, set the property
+     * AheadOfTimeBlobDownloadingFlatFileStore.OAK_INDEXER_BLOB_PREFETCH_BINARY_NODES_SUFFIX to a non-empty value AND
+     * enable the support here.
+     *
+     * @param aotSupportEnabled whether {@link #build(IndexHelper, CompositeIndexer)} may wrap the store
+     *                          with AOT blob downloading.
+     * @return this builder.
+     */
+    public FlatFileNodeStoreBuilder withAheadOfTimeBlobDownloader(boolean 
aotSupportEnabled) {
+        this.withAheadOfTimeBlobDownloading = aotSupportEnabled;
+        return this;
+    }
+
+    /**
+     * Builds the store without an index helper or indexer, by delegating to
+     * {@link #build(IndexHelper, CompositeIndexer)} with <code>null</code> arguments. With these
+     * arguments the store is returned without AOT blob downloading.
+     */
+    public IndexStore build() throws IOException, CompositeException {
+        return build(null, null);
+    }
+
+    public IndexStore build(IndexHelper indexHelper, CompositeIndexer indexer) 
throws IOException, CompositeException {
         logFlags();
         entryWriter = new NodeStateEntryWriter(blobStore);
         IndexStoreFiles indexStoreFiles = createdSortedStoreFiles();
@@ -210,10 +230,17 @@ public class FlatFileNodeStoreBuilder {
         if (entryCount > 0) {
             store.setEntryCount(entryCount);
         }
-        return store;
+        if (indexer == null || indexHelper == null) {
+            return store;
+        }
+        if (withAheadOfTimeBlobDownloading) {
+            return AheadOfTimeBlobDownloadingFlatFileStore.wrap(store, 
indexer, indexHelper);
+        } else {
+            return store;
+        }
     }
 
-    public List<FlatFileStore> buildList(IndexHelper indexHelper, 
IndexerSupport indexerSupport,
+    public List<IndexStore> buildList(IndexHelper indexHelper, IndexerSupport 
indexerSupport,
                                          Set<IndexDefinition> 
indexDefinitions) throws IOException, CompositeException {
         logFlags();
         entryWriter = new NodeStateEntryWriter(blobStore);
@@ -233,7 +260,7 @@ public class FlatFileNodeStoreBuilder {
             log.info("Split flat file to result files '{}' is done, took {} 
ms", fileList, System.currentTimeMillis() - start);
         }
 
-        List<FlatFileStore> storeList = new ArrayList<>();
+        List<IndexStore> storeList = new ArrayList<>();
         for (File flatFileItem : fileList) {
             FlatFileStore store = new FlatFileStore(blobStore, flatFileItem, 
metadataFile, new NodeStateEntryReader(blobStore),
                     unmodifiableSet(preferredPathElements), algorithm);
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStore.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStore.java
index 1e260f4c1d..9650cd1fef 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStore.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStore.java
@@ -68,6 +68,10 @@ public class FlatFileStore implements IndexStore {
         return storeFile.getParentFile().getAbsolutePath();
     }
 
+    /**
+     * Returns the file backing this store (the parent directory of this file is what
+     * {@link #getStorePath()} reports).
+     */
+    public File getStoreFile() {
+        return storeFile;
+    }
+
     /**
      *
      * @deprecated use {@link #getStorePath()} instead
@@ -132,4 +136,8 @@ public class FlatFileStore implements IndexStore {
     public boolean isIncremental() {
         return false;
     }
+
+    /**
+     * Returns the compression algorithm this store was created with.
+     */
+    public Compression getAlgorithm() {
+        return algorithm;
+    }
 }
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntryReader.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntryReader.java
index 96732d4e96..32a07ccf19 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntryReader.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntryReader.java
@@ -43,7 +43,7 @@ public class NodeStateEntryReader {
         return new NodeStateEntry(nodeState, parts[0], memUsage, 0, "");
     }
 
-    protected NodeState parseState(String part) {
+    public NodeState parseState(String part) {
         return des.deserialize(part);
     }
 }
diff --git 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/AheadOfTimeBlobDownloaderThrottlerTest.java
 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/AheadOfTimeBlobDownloaderThrottlerTest.java
new file mode 100644
index 0000000000..ad55c7205d
--- /dev/null
+++ 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/AheadOfTimeBlobDownloaderThrottlerTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document;
+
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.AheadOfTimeBlobDownloaderThrottler;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.*;
+
+public class AheadOfTimeBlobDownloaderThrottlerTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AheadOfTimeBlobDownloaderThrottlerTest.class);
+
+    @Test
+    public void blockOnWindowFullByteSize() throws ExecutionException, 
InterruptedException, TimeoutException {
+        ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
+
+        int maxWindow = 10;
+        int maxWindowSizeBytes = 500;
+        try {
+            AheadOfTimeBlobDownloaderThrottler throttler = new 
AheadOfTimeBlobDownloaderThrottler(maxWindow, maxWindowSizeBytes);
+            assertTrue(throttler.reserveSpaceForBlob(0, 100));
+            assertEquals(maxWindowSizeBytes - 100, 
throttler.getAvailableWindowBytes());
+            assertEquals(maxWindow - 1, throttler.getAvailableWindowSize());
+
+            assertTrue(throttler.reserveSpaceForBlob(4, 300));
+            assertEquals(maxWindowSizeBytes - 400, 
throttler.getAvailableWindowBytes());
+            assertEquals(maxWindow - 2, throttler.getAvailableWindowSize());
+
+            assertTrue(throttler.reserveSpaceForBlob(5, 100));
+            assertEquals(0, throttler.getAvailableWindowBytes());
+            assertEquals(maxWindow - 3, throttler.getAvailableWindowSize());
+
+            // The prefetch window is full, so the next call to reserveSpaceForBlob should block. Do the call in a separate thread.
+            AtomicBoolean spaceWasReservedAfterWait = new AtomicBoolean(false);
+            CountDownLatch latch = new CountDownLatch(1);
+            Future<?> f = executorService.submit((() -> {
+                latch.countDown();
+                try {
+                    // This should block until the prefetch window is advanced
+                    assertTrue(throttler.reserveSpaceForBlob(6, 10));
+                    spaceWasReservedAfterWait.set(true);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }));
+            latch.await();
+            // The thread above has started. Wait for a bit to make sure it's blocked
+            Thread.sleep(50);
+            // The worker thread cannot reserve space until the main thread advances the indexer
+            assertFalse(spaceWasReservedAfterWait.get());
+            // This should release the thread above
+            throttler.advanceIndexer(0);
+            f.get(100, TimeUnit.MILLISECONDS);
+            // The thread should have advanced
+            assertTrue(spaceWasReservedAfterWait.get());
+            // Advance to the end
+            throttler.advanceIndexer(6);
+            // Check that the AOT download window is empty
+            assertEquals(maxWindowSizeBytes, 
throttler.getAvailableWindowBytes());
+            assertEquals(maxWindow, throttler.getAvailableWindowSize());
+        } finally {
+            new ExecutorCloser(executorService).close();
+        }
+    }
+
+    @Test
+    public void blockOnWindowFullCapacity() throws ExecutionException, 
InterruptedException, TimeoutException {
+        ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
+
+        int maxWindow = 10;
+        int maxWindowSizeBytes = 500;
+        try {
+            AheadOfTimeBlobDownloaderThrottler throttler = new 
AheadOfTimeBlobDownloaderThrottler(maxWindow, maxWindowSizeBytes);
+            // Fill the prefetch window
+            for (int i = 0; i < maxWindow; i++) {
+                assertTrue(throttler.reserveSpaceForBlob(i, 10));
+            }
+            assertEquals(maxWindowSizeBytes - 100, 
throttler.getAvailableWindowBytes());
+            assertEquals(0, throttler.getAvailableWindowSize());
+            AtomicBoolean spaceWasReservedAfterWait = new AtomicBoolean(false);
+            CountDownLatch latch = new CountDownLatch(1);
+            Future<?> f = executorService.submit((() -> {
+                latch.countDown();
+                try {
+                    // This should block until the prefetch window is advanced
+                    assertTrue(throttler.reserveSpaceForBlob(11, 10));
+                    spaceWasReservedAfterWait.set(true);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }));
+            latch.await();
+            // The thread above has started. Wait for a bit to make sure it's blocked
+            Thread.sleep(50);
+            assertFalse(spaceWasReservedAfterWait.get());
+            // Advance one element, this should allow the worker thread to reserve space
+            throttler.advanceIndexer(1);
+            f.get(100, TimeUnit.MILLISECONDS);
+            assertTrue(spaceWasReservedAfterWait.get());
+            // Advance to the end
+            throttler.advanceIndexer(11);
+            // Check that the AOT download window is empty
+            assertEquals(maxWindowSizeBytes, 
throttler.getAvailableWindowBytes());
+            assertEquals(maxWindow, throttler.getAvailableWindowSize());
+        } finally {
+            new ExecutorCloser(executorService).close();
+        }
+    }
+
+    @Test
+    public void spaceReservationForPositionBehindIndexerIsIgnored() throws 
InterruptedException {
+        AheadOfTimeBlobDownloaderThrottler throttler = new 
AheadOfTimeBlobDownloaderThrottler(10, 100);
+        throttler.advanceIndexer(5);
+        assertFalse(throttler.reserveSpaceForBlob(0, 10));
+        assertFalse(throttler.reserveSpaceForBlob(1, 10));
+        assertFalse(throttler.reserveSpaceForBlob(5, 10));
+        assertEquals(100, throttler.getAvailableWindowBytes());
+        assertEquals(10, throttler.getAvailableWindowSize());
+        assertTrue(throttler.reserveSpaceForBlob(6, 10));
+        assertEquals(90, throttler.getAvailableWindowBytes());
+        assertEquals(9, throttler.getAvailableWindowSize());
+    }
+
+    @Test
+    public void manyReservations() throws InterruptedException, 
ExecutionException {
+        AheadOfTimeBlobDownloaderThrottler throttler = new 
AheadOfTimeBlobDownloaderThrottler(1024, 64 * 1024);
+        ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
+        try {
+            CountDownLatch latch = new CountDownLatch(1);
+            Future<?> future = executorService.submit(() -> {
+                Random random = new Random();
+                latch.countDown();
+                for (int i = 0; i < 500; i++) {
+                    try {
+                        throttler.reserveSpaceForBlob(i, 512 + 
random.nextInt(512));
+                        Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                        fail("Should not have thrown an exception");
+                    }
+                }
+            });
+
+            latch.await();
+            for (int i = 0; i < 500; i++) {
+                throttler.advanceIndexer(i);
+                Thread.sleep(1);
+            }
+
+            future.get();
+            LOG.info("Stats: {}", throttler.formatStats());
+        } finally {
+            new ExecutorCloser(executorService).close();
+        }
+    }
+}
diff --git 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/AheadOfTimeBlobDownloadingFlatFileStoreTest.java
 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/AheadOfTimeBlobDownloadingFlatFileStoreTest.java
new file mode 100644
index 0000000000..a630f05ae2
--- /dev/null
+++ 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/AheadOfTimeBlobDownloadingFlatFileStoreTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+public class AheadOfTimeBlobDownloadingFlatFileStoreTest {
+
+    @Test
+    public void isEnabledForIndexes() {
+        
assertFalse(AheadOfTimeBlobDownloadingFlatFileStore.isEnabledForIndexes(
+                "",
+                List.of("/oak:index/fooA-34")
+        ));
+
+        assertTrue(AheadOfTimeBlobDownloadingFlatFileStore.isEnabledForIndexes(
+                "/oak:index/foo",
+                List.of("/oak:index/fooA-34")
+        ));
+
+        assertTrue(AheadOfTimeBlobDownloadingFlatFileStore.isEnabledForIndexes(
+                "/oak:index/foo",
+                List.of("/oak:index/anotherIndex", "/oak:index/fooA-34")
+        ));
+
+        
assertFalse(AheadOfTimeBlobDownloadingFlatFileStore.isEnabledForIndexes(
+                "/oak:index/foo",
+                List.of("/oak:index/anotherIndex")
+        ));
+
+        assertTrue(AheadOfTimeBlobDownloadingFlatFileStore.isEnabledForIndexes(
+                "/oak:index/fooA-,/oak:index/fooB-",
+                List.of("/oak:index/fooA-34")
+        ));
+
+        assertTrue(AheadOfTimeBlobDownloadingFlatFileStore.isEnabledForIndexes(
+                "/oak:index/fooA-, /oak:index/fooB-",
+                List.of("/oak:index/anotherIndex", "/oak:index/fooA-34")
+        ));
+
+        
assertFalse(AheadOfTimeBlobDownloadingFlatFileStore.isEnabledForIndexes(
+                "/oak:index/fooA-",
+                List.of()
+        ));
+    }
+}
\ No newline at end of file
diff --git 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilderTest.java
 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilderTest.java
index c15c29003e..679fcc62db 100644
--- 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilderTest.java
+++ 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilderTest.java
@@ -36,6 +36,7 @@ import 
org.apache.jackrabbit.oak.index.indexer.document.CompositeException;
 import org.apache.jackrabbit.oak.index.indexer.document.IndexerConfiguration;
 import 
org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverserFactory;
 import 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy;
+import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStore;
 import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
 import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
 import 
org.apache.jackrabbit.oak.plugins.index.search.util.IndexDefinitionBuilder;
@@ -177,8 +178,8 @@ public class FlatFileNodeStoreBuilderTest {
     public void assertBuild(String dir) throws CompositeException, IOException 
{
         FlatFileNodeStoreBuilder builder = new 
FlatFileNodeStoreBuilder(folder.getRoot()).withNodeStateEntryTraverserFactory(
                 nodeStateEntryTraverserFactory);
-        try (FlatFileStore store = builder.build()) {
-            assertEquals(dir, store.getFlatFileStorePath());
+        try (IndexStore store = builder.build()) {
+            assertEquals(dir, store.getStorePath());
         }
     }
 
@@ -201,13 +202,13 @@ public class FlatFileNodeStoreBuilderTest {
         NodeState rootState = mock(NodeState.class);
         
when(indexerSupport.retrieveNodeStateForCheckpoint()).thenReturn(rootState);
 
-        List<FlatFileStore> storeList = builder.buildList(indexHelper, 
indexerSupport, mockIndexDefns());
+        List<IndexStore> storeList = builder.buildList(indexHelper, 
indexerSupport, mockIndexDefns());
 
         if (split) {
-            assertEquals(new File(dir, "split").getAbsolutePath(), 
storeList.get(0).getFlatFileStorePath());
+            assertEquals(new File(dir, "split").getAbsolutePath(), 
storeList.get(0).getStorePath());
             assertTrue(storeList.size() > 1);
         } else {
-            assertEquals(dir, storeList.get(0).getFlatFileStorePath());
+            assertEquals(dir, storeList.get(0).getStorePath());
             assertEquals(1, storeList.size());
         }
     }
diff --git 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java
 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java
index fabe1273a8..1688dc849c 100644
--- 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java
+++ 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java
@@ -21,6 +21,7 @@ package 
org.apache.jackrabbit.oak.index.indexer.document.flatfile;
 
 import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
 import 
org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverser;
+import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStore;
 import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
 import org.jetbrains.annotations.NotNull;
 import org.junit.Rule;
@@ -55,7 +56,7 @@ public class FlatFileStoreTest {
     private void runBasicTest() throws Exception {
         List<String> paths = createTestPaths();
         FlatFileNodeStoreBuilder spyBuilder = Mockito.spy(new 
FlatFileNodeStoreBuilder(folder.getRoot()));
-        FlatFileStore flatStore = spyBuilder.withBlobStore(new 
MemoryBlobStore())
+        IndexStore flatStore = spyBuilder.withBlobStore(new MemoryBlobStore())
                 .withPreferredPathElements(preferred)
                 .withPathPredicate(pathPredicate)
                 .withNodeStateEntryTraverserFactory(range -> new 
NodeStateEntryTraverser("NS-1", null, null,null, range) {
diff --git 
a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java 
b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java
index 8a2cbd521a..e8818bcd69 100644
--- a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java
@@ -31,6 +31,7 @@ import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.index.async.AsyncIndexerLucene;
 import org.apache.jackrabbit.oak.index.indexer.document.DocumentStoreIndexer;
 import org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStore;
+import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStore;
 import org.apache.jackrabbit.oak.plugins.index.importer.IndexDefinitionUpdater;
 import org.apache.jackrabbit.oak.run.cli.CommonOptions;
 import org.apache.jackrabbit.oak.run.cli.DocumentBuilderCustomizer;
@@ -173,7 +174,7 @@ public class IndexCommand implements Command {
         }
     }
 
-    private void execute(NodeStoreFixture fixture,  IndexOptions indexOpts, 
Closer closer)
+    private void execute(NodeStoreFixture fixture, IndexOptions indexOpts, 
Closer closer)
             throws IOException, CommitFailedException {
         ExtendedIndexHelper extendedIndexHelper = createIndexHelper(fixture, 
indexOpts, closer);
 
@@ -184,13 +185,13 @@ public class IndexCommand implements Command {
         reindexOperation(indexOpts, extendedIndexHelper);
         importIndexOperation(indexOpts, extendedIndexHelper);
 
-        log.info("[INDEXING_REPORT:INDEX_UPLOAD]\n{}" , 
extendedIndexHelper.getIndexReporter().generateReport());
+        log.info("[INDEXING_REPORT:INDEX_UPLOAD]\n{}", 
extendedIndexHelper.getIndexReporter().generateReport());
     }
 
     private ExtendedIndexHelper createIndexHelper(NodeStoreFixture fixture,
-                                          IndexOptions indexOpts, Closer 
closer) throws IOException {
+                                                  IndexOptions indexOpts, 
Closer closer) throws IOException {
         ExtendedIndexHelper extendedIndexHelper = new 
ExtendedIndexHelper(fixture.getStore(), fixture.getBlobStore(), 
fixture.getWhiteboard(),
-                indexOpts.getOutDir(),  indexOpts.getWorkDir(), 
computeIndexPaths(indexOpts));
+                indexOpts.getOutDir(), indexOpts.getWorkDir(), 
computeIndexPaths(indexOpts));
 
         configurePreExtractionSupport(indexOpts, extendedIndexHelper);
 
@@ -206,7 +207,7 @@ public class IndexCommand implements Command {
             IndexDefinitionUpdater updater = new 
IndexDefinitionUpdater(definitions);
             Set<String> indexPathsFromJson = updater.getIndexPaths();
             Set<String> diff = Sets.difference(indexPathsFromJson, indexPaths);
-            if (!diff.isEmpty()){
+            if (!diff.isEmpty()) {
                 log.info("Augmenting the indexPaths with {} which are present 
in {}", diff, definitions);
             }
             indexPaths.addAll(indexPathsFromJson);
@@ -223,7 +224,7 @@ public class IndexCommand implements Command {
     }
 
     private void reindexOperation(IndexOptions indexOpts, ExtendedIndexHelper 
extendedIndexHelper) throws IOException, CommitFailedException {
-        if (!indexOpts.isReindex()){
+        if (!indexOpts.isReindex()) {
             return;
         }
 
@@ -252,9 +253,14 @@ public class IndexCommand implements Command {
             log.info("Using Document order traversal to perform reindexing");
             try (DocumentStoreIndexer indexer = new 
DocumentStoreIndexer(extendedIndexHelper, indexerSupport)) {
                 if (idxOpts.buildFlatFileStoreSeparately()) {
-                    FlatFileStore ffs = indexer.buildFlatFileStore();
-                    String pathToFFS = ffs.getFlatFileStorePath();
-                    System.setProperty(OAK_INDEXER_SORTED_FILE_PATH, 
pathToFFS);
+                    IndexStore indexStore = indexer.buildFlatFileStore();
+                    if (indexStore instanceof FlatFileStore) {
+                        FlatFileStore ffs = (FlatFileStore) indexStore;
+                        String pathToFFS = ffs.getFlatFileStorePath();
+                        System.setProperty(OAK_INDEXER_SORTED_FILE_PATH, 
pathToFFS);
+                    } else {
+                        throw new IllegalArgumentException("Store is not 
FlatFileStore, cannot cannot use option to build flat file store separately.");
+                    }
                 }
                 indexer.reindex();
             }
@@ -305,7 +311,7 @@ public class IndexCommand implements Command {
 
     private String connectInReadWriteModeAndCreateCheckPoint(IndexOptions 
indexOpts) throws Exception {
         String checkpoint = indexOpts.getCheckpoint();
-        if (checkpoint != null){
+        if (checkpoint != null) {
             log.info("Using provided checkpoint [{}]", checkpoint);
             return checkpoint;
         }
@@ -419,7 +425,7 @@ public class IndexCommand implements Command {
     }
 
     private static void configureCustomizer(Options opts, Closer closer, 
boolean readOnlyAccess) {
-        if (opts.getCommonOpts().isDocument()){
+        if (opts.getCommonOpts().isDocument()) {
             IndexOptions indexOpts = opts.getOptionBean(IndexOptions.class);
             if (indexOpts.isReindex()) {
                 IndexDocumentBuilderCustomizer customizer = new 
IndexDocumentBuilderCustomizer(opts, readOnlyAccess);

Reply via email to