This is an automated email from the ASF dual-hosted git repository.

fortino pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2d51b15ee0 OAK-10358 - Filter by path on MongoDB query when 
downloading repository in Indexing job (#1042)
2d51b15ee0 is described below

commit 2d51b15ee0255d3b2b72464b4ef43edaa786367c
Author: Nuno Santos <[email protected]>
AuthorDate: Wed Sep 6 17:39:34 2023 +0200

    OAK-10358 - Filter by path on MongoDB query when downloading repository in 
Indexing job (#1042)
    
    * Initial implementation of regex path filtering on MongoDB download.
    
    * Fix regex filter to also download documents with large paths (_id is a 
hash, entry path is in the _path field).
    
    * Revert changes to logback-test.xml.
    
    * Use Pattern.quote instead of manually quoting.
    
    * Minor refactoring.
    
    * Apply regex filtering also in the cases where there are multiple indexes 
all with the same includedPaths.
    
    * Change default value of regex path filtering to false.
    
    * Improve documentation.
    
    * Fix documentation
    
    * Add logging of metrics to reindex, merge in node store and total job time.
    
    * Add handling of long paths to method that generates ancestors MongoDB 
query
    
    * Do not suppress InterruptedExceptions raised while waiting to retry 
connecting to MongoDB.
---
 .../indexer/document/DocumentStoreIndexerBase.java |  10 +-
 .../flatfile/FlatFileNodeStoreBuilder.java         |  13 +-
 .../pipelined/PipelinedMongoDownloadTask.java      | 273 ++++++++++++++++-----
 .../flatfile/pipelined/PipelinedStrategy.java      |  16 +-
 .../document/flatfile/pipelined/PipelinedIT.java   | 174 ++++++++++---
 .../pipelined/PipelinedMongoDownloadTaskTest.java  |  33 ++-
 .../jackrabbit/oak/spi/filter/PathFilter.java      |  19 +-
 7 files changed, 424 insertions(+), 114 deletions(-)

diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
index df936687ee..7a31811de8 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
@@ -82,7 +82,6 @@ public abstract class DocumentStoreIndexerBase implements 
Closeable {
     protected final IndexHelper indexHelper;
     protected List<NodeStateIndexerProvider> indexerProviders;
     protected final IndexerSupport indexerSupport;
-    private final Set<String> indexerPaths = new HashSet<>();
     private static final int MAX_DOWNLOAD_ATTEMPTS = 
Integer.parseInt(System.getProperty("oak.indexer.maxDownloadRetries", "5")) + 1;
 
     public DocumentStoreIndexerBase(IndexHelper indexHelper, IndexerSupport 
indexerSupport) {
@@ -109,15 +108,13 @@ public abstract class DocumentStoreIndexerBase implements 
Closeable {
         private final DocumentNodeStore documentNodeStore;
         private final MongoDocumentStore documentStore;
         private final Logger traversalLogger;
-        private final CompositeIndexer indexer;
 
         private MongoNodeStateEntryTraverserFactory(RevisionVector 
rootRevision, DocumentNodeStore documentNodeStore,
-                                                    MongoDocumentStore 
documentStore, Logger traversalLogger, CompositeIndexer indexer) {
+                                                    MongoDocumentStore 
documentStore, Logger traversalLogger) {
             this.rootRevision = rootRevision;
             this.documentNodeStore = documentNodeStore;
             this.documentStore = documentStore;
             this.traversalLogger = traversalLogger;
-            this.indexer = indexer;
         }
 
         @Override
@@ -165,11 +162,12 @@ public abstract class DocumentStoreIndexerBase implements 
Closeable {
                         .withPreferredPathElements((preferredPathElements != 
null) ? preferredPathElements : indexer.getRelativeIndexedNodeNames())
                         
.addExistingDataDumpDir(indexerSupport.getExistingDataDumpDir())
                         .withPathPredicate(pathPredicate)
+                        .withIndexDefinitions(indexDefinitions)
                         .withRootRevision(rootDocumentState.getRootRevision())
                         .withNodeStore(nodeStore)
                         .withMongoDocumentStore(getMongoDocumentStore())
                         .withNodeStateEntryTraverserFactory(new 
MongoNodeStateEntryTraverserFactory(rootDocumentState.getRootRevision(),
-                                nodeStore, getMongoDocumentStore(), 
traversalLog, indexer));
+                                nodeStore, getMongoDocumentStore(), 
traversalLog));
                 for (File dir : previousDownloadDirs) {
                     builder.addExistingDataDumpDir(dir);
                 }
@@ -219,6 +217,7 @@ public abstract class DocumentStoreIndexerBase implements 
Closeable {
         for (IndexDefinition indexDf : indexDefinitions) {
             preferredPathElements.addAll(indexDf.getRelativeNodeNames());
         }
+
         Predicate<String> predicate = s -> 
indexDefinitions.stream().anyMatch(indexDef -> 
indexDef.getPathFilter().filter(s) != PathFilter.Result.EXCLUDE);
         FlatFileStore flatFileStore = 
buildFlatFileStoreList(checkpointedState, null, predicate,
             preferredPathElements, 
IndexerConfiguration.parallelIndexEnabled(), indexDefinitions).get(0);
@@ -386,7 +385,6 @@ public abstract class DocumentStoreIndexerBase implements 
Closeable {
                     indexers.add(indexer);
                     closer.register(indexer);
                     progressReporter.registerIndex(indexPath, true, -1);
-                    indexerPaths.add(indexPath);
                 }
             }
         }
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
index 7309d4ff99..ca4bd45822 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
@@ -35,6 +35,7 @@ import 
org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
 import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
 import org.apache.jackrabbit.oak.query.NodeStateNodeTypeInfoProvider;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.filter.PathFilter;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
@@ -49,6 +50,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import static java.util.Collections.unmodifiableSet;
 
@@ -137,6 +139,8 @@ public class FlatFileNodeStoreBuilder {
     private RevisionVector rootRevision = null;
     private DocumentNodeStore nodeStore = null;
     private MongoDocumentStore mongoDocumentStore = null;
+    private Set<IndexDefinition> indexDefinitions = null;
+
 
     public enum SortStrategyType {
         /**
@@ -204,6 +208,12 @@ public class FlatFileNodeStoreBuilder {
         return this;
     }
 
+    public FlatFileNodeStoreBuilder withIndexDefinitions(Set<IndexDefinition> 
indexDefinitions) {
+        this.indexDefinitions = indexDefinitions;
+        return this;
+    }
+
+
     public FlatFileNodeStoreBuilder withRootRevision(RevisionVector 
rootRevision) {
         this.rootRevision = rootRevision;
         return this;
@@ -323,8 +333,9 @@ public class FlatFileNodeStoreBuilder {
                         blobStore, dir, existingDataDumpDirs, algorithm, 
memoryManager, dumpThreshold, pathPredicate);
             case PIPELINED:
                 log.info("Using PipelinedStrategy");
+                List<PathFilter> pathFilters = 
indexDefinitions.stream().map(IndexDefinition::getPathFilter).collect(Collectors.toList());
                 return new PipelinedStrategy(mongoDocumentStore, nodeStore, 
rootRevision,
-                        preferredPathElements, blobStore, dir, algorithm, 
pathPredicate);
+                        preferredPathElements, blobStore, dir, algorithm, 
pathPredicate, pathFilters);
         }
         throw new IllegalStateException("Not a valid sort strategy value " + 
sortStrategyType);
     }
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
index f6c627b4e9..bb53f40a35 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
@@ -26,24 +26,33 @@ import com.mongodb.ReadPreference;
 import com.mongodb.client.FindIterable;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoCursor;
+import com.mongodb.client.model.Filters;
 import org.apache.jackrabbit.guava.common.base.Preconditions;
 import org.apache.jackrabbit.guava.common.base.Stopwatch;
-import org.apache.jackrabbit.oak.plugins.document.Document;
+import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
+import org.apache.jackrabbit.oak.spi.filter.PathFilter;
 import org.bson.BsonDocument;
+import org.bson.conversions.Bson;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
+import static com.mongodb.client.model.Filters.regex;
 import static com.mongodb.client.model.Sorts.ascending;
 
 public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownloadTask.Result> {
@@ -72,6 +81,17 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
     public static final boolean 
DEFAULT_OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS = true;
     public static final String 
OAK_INDEXER_PIPELINED_MONGO_CONNECTION_RETRY_SECONDS = 
"oak.indexer.pipelined.mongoConnectionRetrySeconds";
     public static final int 
DEFAULT_OAK_INDEXER_PIPELINED_MONGO_CONNECTION_RETRY_SECONDS = 300;
+    /**
+     * Whether to do path filtering in the Mongo query instead of doing a full 
traversal of the document store and
+     * filtering in the indexing job. This feature may significantly reduce 
the number of documents downloaded from
+     * Mongo.
+     * The performance gains may not be proportional to the reduction in the 
number of documents downloaded because Mongo
+     * still has to traverse all the documents. This is the case because the 
regex expression used for path filtering
+     * starts with a wildcard (because the _id starts with the depth of the 
path, so the regex expression must ignore
+     * this part). Because of the wildcard at the start, Mongo cannot make use of 
the index on _id.
+     */
+    public static final String 
OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING = 
"oak.indexer.pipelined.mongoRegexPathFiltering";
+    public static final boolean 
DEFAULT_OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING = false;
     // Use a short initial retry interval. In most cases if the connection to 
a replica fails, there will be other
     // replicas available so a reconnection attempt will succeed immediately.
     private final static long retryInitialIntervalMillis = 100;
@@ -80,11 +100,14 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
     // TODO: Revise this timeout. It is used to prevent the indexer from 
blocking forever if the queue is full.
     private static final Duration MONGO_QUEUE_OFFER_TIMEOUT = 
Duration.ofMinutes(30);
     private static final int MIN_INTERVAL_BETWEEN_DELAYED_ENQUEUING_MESSAGES = 
10;
+    private final static BsonDocument NATURAL_HINT = BsonDocument.parse("{ 
$natural:1}");
 
     private final int batchSize;
     private final BlockingQueue<BasicDBObject[]> mongoDocQueue;
+    private final List<PathFilter> pathFilters;
     private final int retryDuringSeconds;
     private final boolean retryOnConnectionErrors;
+    private final boolean regexPathFiltering;
     private final Logger traversalLog = 
LoggerFactory.getLogger(PipelinedMongoDownloadTask.class.getName() + 
".traversal");
     private final MongoCollection<BasicDBObject> dbCollection;
     private final ReadPreference readPreference;
@@ -98,10 +121,12 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
 
     public PipelinedMongoDownloadTask(MongoCollection<BasicDBObject> 
dbCollection,
                                       int batchSize,
-                                      BlockingQueue<BasicDBObject[]> queue) {
+                                      BlockingQueue<BasicDBObject[]> queue,
+                                      List<PathFilter> pathFilters) {
         this.dbCollection = dbCollection;
         this.batchSize = batchSize;
         this.mongoDocQueue = queue;
+        this.pathFilters = pathFilters;
         // Default retries for 5 minutes.
         this.retryDuringSeconds = ConfigHelper.getSystemPropertyAsInt(
                 OAK_INDEXER_PIPELINED_MONGO_CONNECTION_RETRY_SECONDS,
@@ -111,6 +136,9 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
         this.retryOnConnectionErrors = ConfigHelper.getSystemPropertyAsBoolean(
                 OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS,
                 DEFAULT_OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS);
+        this.regexPathFiltering = ConfigHelper.getSystemPropertyAsBoolean(
+                OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING,
+                DEFAULT_OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING);
 
         //TODO This may lead to reads being routed to secondary depending on 
MongoURI
         //So caller must ensure that its safe to read from secondary
@@ -127,67 +155,14 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
             LOG.info("Starting to download from MongoDB.");
             //TODO This may lead to reads being routed to secondary depending 
on MongoURI
             //So caller must ensure that its safe to read from secondary
-
             this.nextLastModified = 0;
             this.lastIdDownloaded = null;
 
-
             downloadStartWatch.start();
-            if (!retryOnConnectionErrors) {
-                downloadAll();
+            if (retryOnConnectionErrors) {
+                downloadWithRetryOnConnectionErrors();
             } else {
-                Instant failuresStartTimestamp = null; // When the last series 
of failures started
-                long retryIntervalMs = retryInitialIntervalMillis;
-                int numberOfFailures = 0;
-                boolean downloadCompleted = false;
-                Map<String, Integer> exceptions = new HashMap<>();
-                while (!downloadCompleted) {
-                    try {
-                        if (lastIdDownloaded != null) {
-                            LOG.info("Recovering from broken connection, 
finishing downloading documents with _modified={}", nextLastModified);
-                            downloadRange(new DownloadRange(nextLastModified, 
nextLastModified + 1, lastIdDownloaded));
-                            // We have managed to reconnect, reset the failure 
timestamp
-                            failuresStartTimestamp = null;
-                            numberOfFailures = 0;
-                            // Continue downloading everything starting from 
the next _lastmodified value
-                            downloadRange(new DownloadRange(nextLastModified + 
1, Long.MAX_VALUE, null));
-                        } else {
-                            downloadRange(new DownloadRange(nextLastModified, 
Long.MAX_VALUE, null));
-                        }
-                        downloadCompleted = true;
-                    } catch (MongoException e) {
-                        if (e instanceof MongoInterruptedException || e 
instanceof MongoIncompatibleDriverException) {
-                            // Non-recoverable exceptions
-                            throw e;
-                        }
-                        if (failuresStartTimestamp == null) {
-                            failuresStartTimestamp = 
Instant.now().truncatedTo(ChronoUnit.SECONDS);
-                        }
-                        LOG.warn("Connection error downloading from MongoDB.", 
e);
-                        long secondsSinceStartOfFailures = 
Duration.between(failuresStartTimestamp, Instant.now()).toSeconds();
-                        if (secondsSinceStartOfFailures > retryDuringSeconds) {
-                            // Give up. Get a string of all exceptions that 
were thrown
-                            StringBuilder summary = new StringBuilder();
-                            for (Map.Entry<String, Integer> entry : 
exceptions.entrySet()) {
-                                
summary.append("\n\t").append(entry.getValue()).append("x: 
").append(entry.getKey());
-                            }
-                            throw new RetryException(retryDuringSeconds, 
summary.toString(), e);
-                        } else {
-                            numberOfFailures++;
-                            LOG.warn("Retrying download in {} ms; number of 
times failed: {}; current series of failures started at: {} ({} seconds ago)",
-                                    retryIntervalMs, numberOfFailures, 
failuresStartTimestamp, secondsSinceStartOfFailures);
-                            exceptions.compute(e.getClass().getSimpleName() + 
" - " + e.getMessage(),
-                                    (key, val) -> val == null ? 1 : val + 1
-                            );
-                            try {
-                                Thread.sleep(retryIntervalMs);
-                            } catch (InterruptedException ignore) {
-                            }
-                            // simple exponential backoff mechanism
-                            retryIntervalMs = Math.min(retryMaxIntervalMillis, 
retryIntervalMs * 2);
-                        }
-                    }
-                }
+                downloadWithNaturalOrdering();
             }
             LOG.info("Terminating task. Downloaded {} Mongo documents in {}. 
Total enqueuing delay: {} ms ({}%)",
                     documentsRead, downloadStartWatch,
@@ -215,8 +190,87 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
         traversalLog.trace(id);
     }
 
-    private void downloadRange(DownloadRange range) throws 
InterruptedException, TimeoutException {
-        BsonDocument findQuery = range.getFindQuery();
+
+    private void downloadWithRetryOnConnectionErrors() throws 
InterruptedException, TimeoutException {
+        // If regex filtering is enabled, start by downloading the ancestors 
of the path used for filtering.
+        // That is, download "/", "/content", "/content/dam" for a base path 
of "/content/dam". These nodes will not be
+        // matched by the regex used in the Mongo query, which assumes a 
prefix of "???:/content/dam"
+        Bson childrenFilter = null;
+        String regexBasePath = getPathForRegexFiltering();
+        if (regexBasePath != null) {
+            // Regex path filtering is enabled
+            // Download the ancestors in a separate query. No retrials done on 
this query, as it will take only a few
+            // seconds and is done at the start of the job, so if it fails, 
the job can be retried without losing much work
+            LOG.info("Using regex filtering with path {}", regexBasePath);
+            Bson ancestorsQuery = ancestorsFilter(regexBasePath);
+            LOG.info("Downloading ancestors of {}", regexBasePath);
+            // Let Mongo decide which index to use for this query, it will 
return very few documents
+            FindIterable<BasicDBObject> mongoIterable = dbCollection
+                    .withReadPreference(readPreference)
+                    .find(ancestorsQuery);
+            download(mongoIterable);
+            // Filter to apply to the main query
+            childrenFilter = childrenFilter(regexBasePath);
+        }
+
+        Instant failuresStartTimestamp = null; // When the last series of 
failures started
+        long retryIntervalMs = retryInitialIntervalMillis;
+        int numberOfFailures = 0;
+        boolean downloadCompleted = false;
+        Map<String, Integer> exceptions = new HashMap<>();
+        this.nextLastModified = 0;
+        this.lastIdDownloaded = null;
+        while (!downloadCompleted) {
+            try {
+                if (lastIdDownloaded != null) {
+                    LOG.info("Recovering from broken connection, finishing 
downloading documents with _modified={}", nextLastModified);
+                    downloadRange(new DownloadRange(nextLastModified, 
nextLastModified + 1, lastIdDownloaded), childrenFilter);
+                    // We have managed to reconnect, reset the failure 
timestamp
+                    failuresStartTimestamp = null;
+                    numberOfFailures = 0;
+                    // Continue downloading everything starting from the next 
_lastmodified value
+                    downloadRange(new DownloadRange(nextLastModified + 1, 
Long.MAX_VALUE, null), childrenFilter);
+                } else {
+                    downloadRange(new DownloadRange(nextLastModified, 
Long.MAX_VALUE, null), childrenFilter);
+                }
+                downloadCompleted = true;
+            } catch (MongoException e) {
+                if (e instanceof MongoInterruptedException || e instanceof 
MongoIncompatibleDriverException) {
+                    // Non-recoverable exceptions
+                    throw e;
+                }
+                if (failuresStartTimestamp == null) {
+                    failuresStartTimestamp = 
Instant.now().truncatedTo(ChronoUnit.SECONDS);
+                }
+                LOG.warn("Connection error downloading from MongoDB.", e);
+                long secondsSinceStartOfFailures = 
Duration.between(failuresStartTimestamp, Instant.now()).toSeconds();
+                if (secondsSinceStartOfFailures > retryDuringSeconds) {
+                    // Give up. Get a string of all exceptions that were thrown
+                    StringBuilder summary = new StringBuilder();
+                    for (Map.Entry<String, Integer> entry : 
exceptions.entrySet()) {
+                        
summary.append("\n\t").append(entry.getValue()).append("x: 
").append(entry.getKey());
+                    }
+                    throw new RetryException(retryDuringSeconds, 
summary.toString(), e);
+                } else {
+                    numberOfFailures++;
+                    LOG.warn("Retrying download in {} ms; number of times 
failed: {}; current series of failures started at: {} ({} seconds ago)",
+                            retryIntervalMs, numberOfFailures, 
failuresStartTimestamp, secondsSinceStartOfFailures);
+                    exceptions.compute(e.getClass().getSimpleName() + " - " + 
e.getMessage(),
+                            (key, val) -> val == null ? 1 : val + 1
+                    );
+                    Thread.sleep(retryIntervalMs);
+                    // simple exponential backoff mechanism
+                    retryIntervalMs = Math.min(retryMaxIntervalMillis, 
retryIntervalMs * 2);
+                }
+            }
+        }
+    }
+
+    private void downloadRange(DownloadRange range, Bson filter) throws 
InterruptedException, TimeoutException {
+        Bson findQuery = range.getFindQuery();
+        if (filter != null) {
+            findQuery = Filters.and(findQuery, filter);
+        }
         LOG.info("Traversing: {}. Query: {}", range, findQuery);
         FindIterable<BasicDBObject> mongoIterable = dbCollection
                 .withReadPreference(readPreference)
@@ -225,12 +279,97 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
         download(mongoIterable);
     }
 
-    private void downloadAll() throws InterruptedException, TimeoutException {
-        LOG.info("Traversing all documents");
-        FindIterable<BasicDBObject> mongoIterable = dbCollection
-                .withReadPreference(readPreference)
-                .find();
-        download(mongoIterable);
+    private void downloadWithNaturalOrdering() throws InterruptedException, 
TimeoutException {
+        // We are downloading potentially a large fraction of the repository, 
so using an index scan will be
+        // inefficient. So we pass the natural hint to force MongoDB to use 
natural ordering, that is, a collection scan
+        String regexBasePath = getPathForRegexFiltering();
+        if (regexBasePath == null) {
+            LOG.info("Downloading full repository using natural order");
+            FindIterable<BasicDBObject> mongoIterable = dbCollection
+                    .withReadPreference(readPreference)
+                    .find()
+                    .hint(NATURAL_HINT);
+            download(mongoIterable);
+
+        } else {
+            Bson ancestorQuery = ancestorsFilter(regexBasePath);
+            // Do not use natural order to download ancestors. The number of 
ancestors will always be small, likely less
+            // than 10, so let MongoDB use an index to find them.
+            LOG.info("Downloading using regex path filtering. Downloading 
ancestors: {}.", ancestorQuery);
+            FindIterable<BasicDBObject> ancestorsIterable = dbCollection
+                    .withReadPreference(readPreference)
+                    .find(ancestorQuery);
+            download(ancestorsIterable);
+
+            Bson childrenQuery = childrenFilter(regexBasePath);
+            LOG.info("Downloading using regex path filtering. Downloading 
children: {}.", childrenQuery);
+            FindIterable<BasicDBObject> childrenIterable = dbCollection
+                    .withReadPreference(readPreference)
+                    .find(childrenQuery)
+                    .hint(NATURAL_HINT);
+            download(childrenIterable);
+        }
+    }
+
+    private String getPathForRegexFiltering() {
+        if (!regexPathFiltering) {
+            LOG.info("Regex path filtering disabled.");
+            return null;
+        }
+        return getSingleIncludedPath(pathFilters);
+    }
+
+    // Package private for testing
+    static String getSingleIncludedPath(List<PathFilter> pathFilters) {
+        // For the time being, we only enable path filtering if there is a 
single include path across all indexes and no
+        // exclude paths. This is the case for most of the larger indexes. We 
can consider generalizing this in the future.
+        LOG.info("Creating regex filter from pathFilters: " + pathFilters);
+        if (pathFilters == null) {
+            return null;
+        }
+        Set<String> includedPaths = pathFilters.stream()
+                .flatMap(pathFilter -> pathFilter.getIncludedPaths().stream())
+                .collect(Collectors.toSet());
+
+        Set<String> excludedPaths = pathFilters.stream()
+                .flatMap(pathFilter -> pathFilter.getExcludedPaths().stream())
+                .collect(Collectors.toSet());
+
+        if (excludedPaths.isEmpty() && includedPaths.size() == 1) {
+            return includedPaths.stream().iterator().next();
+        } else {
+            return null;
+        }
+    }
+
+    private static Bson childrenFilter(String path) {
+        if (!path.endsWith("/")) {
+            path = path + "/";
+        }
+        String quotedPath = Pattern.quote(path);
+        // For entries with path sizes above a certain threshold, the _id 
field contains a hash instead of the path of
+        // the entry. The path is stored instead in the _path field. 
Therefore, we have to include in the filter also
+        // the documents with matching _path.
+        return Filters.or(
+                regex(NodeDocument.ID, Pattern.compile("^[0-9]{1,3}:" + 
quotedPath + ".*$")),
+                regex(NodeDocument.PATH, Pattern.compile(quotedPath + ".*$"))
+        );
+    }
+
+    private static Bson ancestorsFilter(String path) {
+        ArrayList<Bson> parentFilters = new ArrayList<>();
+        int depth = PathUtils.getDepth(path);
+        // Explicitly list all ancestors in an OR query.
+        while (true) {
+            parentFilters.add(Filters.eq(NodeDocument.ID, depth + ":" + path));
+            parentFilters.add(Filters.eq(NodeDocument.PATH, path));
+            if (depth == 0) {
+                break;
+            }
+            path = PathUtils.getParentPath(path);
+            depth--;
+        }
+        return Filters.or(parentFilters);
     }
 
     private void download(FindIterable<BasicDBObject> mongoIterable) throws 
InterruptedException, TimeoutException {
@@ -240,7 +379,7 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
             try {
                 while (cursor.hasNext()) {
                     BasicDBObject next = cursor.next();
-                    String id = next.getString(Document.ID);
+                    String id = next.getString(NodeDocument.ID);
                     // If we are retrying on connection errors, we need to 
keep track of the last _modified value
                     if (retryOnConnectionErrors) {
                         this.nextLastModified = 
next.getLong(NodeDocument.MODIFIED_IN_SECS);
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
index bdd94d1f9e..4379010aee 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
@@ -33,6 +33,7 @@ import 
org.apache.jackrabbit.oak.plugins.document.RevisionVector;
 import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
 import 
org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStoreHelper;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.filter.PathFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,6 +42,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutionException;
@@ -193,9 +195,16 @@ public class PipelinedStrategy implements SortStrategy {
 
     private final PathElementComparator pathComparator;
     private final Compression algorithm;
+    private final List<PathFilter> pathFilters;
     private long entryCount;
     private final Predicate<String> pathPredicate;
 
+    /**
+     *
+     * @param pathPredicate Used by the transform stage to test if a node should be kept or discarded.
+     * @param pathFilters If non-empty, the download stage will use these filters to try to create a query that downloads
+     *                    only the matching MongoDB documents.
+     */
     public PipelinedStrategy(MongoDocumentStore documentStore,
                              DocumentNodeStore documentNodeStore,
                              RevisionVector rootRevision,
@@ -203,7 +212,8 @@ public class PipelinedStrategy implements SortStrategy {
                              BlobStore blobStore,
                              File storeDir,
                              Compression algorithm,
-                             Predicate<String> pathPredicate) {
+                             Predicate<String> pathPredicate,
+                             List<PathFilter> pathFilters) {
         this.docStore = documentStore;
         this.documentNodeStore = documentNodeStore;
         this.rootRevision = rootRevision;
@@ -212,11 +222,11 @@ public class PipelinedStrategy implements SortStrategy {
         this.pathComparator = new PathElementComparator(preferredPathElements);
         this.pathPredicate = pathPredicate;
         this.algorithm = algorithm;
+        this.pathFilters = pathFilters;
 
         Preconditions.checkState(documentStore.isReadOnly(), "Traverser can only be used with readOnly store");
     }
 
-
     private int autodetectWorkingMemoryMB() {
         int maxHeapSizeMB = (int) (Runtime.getRuntime().maxMemory() / 
FileUtils.ONE_MB);
         int workingMemoryMB = maxHeapSizeMB - 2048;
@@ -311,7 +321,7 @@ public class PipelinedStrategy implements SortStrategy {
 
             Stopwatch start = Stopwatch.createStarted();
             MongoCollection<BasicDBObject> dbCollection = 
MongoDocumentStoreHelper.getDBCollection(docStore, Collection.NODES);
-            PipelinedMongoDownloadTask downloadTask = new 
PipelinedMongoDownloadTask(dbCollection, mongoBatchSize, mongoDocQueue);
+            PipelinedMongoDownloadTask downloadTask = new 
PipelinedMongoDownloadTask(dbCollection, mongoBatchSize, mongoDocQueue, 
pathFilters);
             ecs.submit(downloadTask);
 
             File flatFileStore = null;
diff --git a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedIT.java b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedIT.java
index 452518513c..4785025e30 100644
--- a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedIT.java
+++ b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedIT.java
@@ -21,6 +21,7 @@ package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.commons.Compression;
+import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
 import org.apache.jackrabbit.oak.plugins.document.DocumentMKBuilderProvider;
 import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
@@ -32,12 +33,12 @@ import 
org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
 import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.filter.PathFilter;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.jetbrains.annotations.NotNull;
 import org.junit.After;
 import org.junit.Assume;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Rule;
@@ -48,10 +49,15 @@ import org.junit.rules.TemporaryFolder;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
 import java.util.Set;
 import java.util.function.Predicate;
 
+import static 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedMongoDownloadTask.OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING;
+import static 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedMongoDownloadTask.OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
@@ -66,52 +72,129 @@ public class PipelinedIT {
     @Rule
     public final TemporaryFolder sortFolder = new TemporaryFolder();
 
+    private static final PathFilter contentDamPathFilter = new 
PathFilter(List.of("/content/dam"), List.of());
+
+    private static final int LONG_PATH_TEST_LEVELS = 30;
+    private static final String LONG_PATH_LEVEL_STRING = 
"Z12345678901234567890-Level_";
+
     @BeforeClass
-    public static void checkMongoDbAvailable() {
+    public static void setup() throws IOException {
         Assume.assumeTrue(MongoUtils.isAvailable());
-    }
-
-    @Before
-    public void setup() throws IOException {
+        // Generate dynamically the entries expected for the long path tests
+        StringBuilder path = new StringBuilder("/content/dam");
+        for (int i = 0; i < LONG_PATH_TEST_LEVELS; i++) {
+            path.append("/").append(LONG_PATH_LEVEL_STRING).append(i);
+            EXPECTED_FFS.add(path + "|{}");
+        }
     }
 
     @After
     public void tear() {
         MongoConnection c = connectionFactory.getConnection();
-        c.getDatabase().drop();
+        if (c != null) {
+            c.getDatabase().drop();
+        }
+    }
+
+    @Test
+    public void createFFS_retryOnMongoFailures_noMongoFiltering() throws 
Exception {
+        System.setProperty(OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS, 
"true");
+        System.setProperty(OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING, 
"false");
+
+        Predicate<String> pathPredicate = s -> contentDamPathFilter.filter(s) 
!= PathFilter.Result.EXCLUDE;
+        List<PathFilter> pathFilters = null;
+
+        testSuccessfulDownload(pathPredicate, pathFilters);
+    }
+
+    @Test
+    public void createFFS_retryOnMongoFailures_mongoFiltering() throws 
Exception {
+        System.setProperty(OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS, 
"true");
+        System.setProperty(OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING, 
"true");
+
+        Predicate<String> pathPredicate = s -> true;
+        List<PathFilter> pathFilters = List.of(contentDamPathFilter);
+
+        testSuccessfulDownload(pathPredicate, pathFilters);
+    }
+
+    @Test
+    public void createFFS_noRetryOnMongoFailures_mongoFiltering() throws 
Exception {
+        System.setProperty(OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS, 
"false");
+        System.setProperty(OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING, 
"true");
+
+        Predicate<String> pathPredicate = s -> true;
+        List<PathFilter> pathFilters = List.of(contentDamPathFilter);
+
+        testSuccessfulDownload(pathPredicate, pathFilters);
     }
 
     @Test
-    public void createFFSWithPipelinedStrategy() throws Exception {
+    public void createFFS_noRetryOnMongoFailures_noMongoFiltering() throws 
Exception {
+        System.setProperty(OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS, 
"false");
+        System.setProperty(OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING, 
"false");
+
+        Predicate<String> pathPredicate = s -> contentDamPathFilter.filter(s) 
!= PathFilter.Result.EXCLUDE;
+        List<PathFilter> pathFilters = List.of(new 
PathFilter(List.of("/content/dam"), List.of()));
+
+        testSuccessfulDownload(pathPredicate, pathFilters);
+    }
+
+    @Test
+    public void createFFS_filter_long_paths() throws Exception {
+        System.setProperty(OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS, 
"false");
+        System.setProperty(OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING, 
"true");
+
+        // Create a filter on the node with the longest path
+        String longestLine = 
EXPECTED_FFS.stream().max(Comparator.comparingInt(String::length)).get();
+        String longestPath = longestLine.substring(0, 
longestLine.lastIndexOf("|"));
+        String parent = PathUtils.getParentPath(longestPath);
+        Predicate<String> pathPredicate = s -> true;
+        List<PathFilter> pathFilters = List.of(new PathFilter(List.of(parent), 
List.of()));
+
+        // The results should contain all the parents of the node with the longest path
+        ArrayList<String> expected = new ArrayList<>();
+        expected.add(longestPath + "|{}");
+        while (true) {
+            expected.add(parent + "|{}");
+            if (parent.equals("/")) {
+                break;
+            }
+            parent = PathUtils.getParentPath(parent);
+        }
+        // The list above has the longest paths first, reverse it to match the order in the FFS
+        Collections.reverse(expected);
+
+        testSuccessfulDownload(pathPredicate, pathFilters, expected);
+    }
+
+    private void testSuccessfulDownload(Predicate<String> pathPredicate, 
List<PathFilter> pathFilters)
+            throws CommitFailedException, IOException {
+        testSuccessfulDownload(pathPredicate, pathFilters, EXPECTED_FFS);
+    }
+
+    private void testSuccessfulDownload(Predicate<String> pathPredicate, 
List<PathFilter> pathFilters, List<String> expected)
+            throws CommitFailedException, IOException {
         ImmutablePair<MongoDocumentStore, DocumentNodeStore> rwStore = 
createNodeStore(false);
         createContent(rwStore.right);
 
         ImmutablePair<MongoDocumentStore, DocumentNodeStore> roStore = 
createNodeStore(true);
-        Predicate<String> pathPredicate = s -> s.startsWith("/content/dam");
-        PipelinedStrategy pipelinedStrategy = createStrategy(roStore, 
pathPredicate);
+
+        PipelinedStrategy pipelinedStrategy = createStrategy(roStore, 
pathPredicate, pathFilters);
 
         File file = pipelinedStrategy.createSortedStoreFile();
         assertTrue(file.exists());
-        assertEquals(Arrays.asList(new String[] {"/content/dam|{}",
-                        "/content/dam/1000|{}",
-                        "/content/dam/1000/12|{\"p1\":\"v100012\"}",
-                        "/content/dam/2022|{}",
-                        "/content/dam/2022/02|{\"p1\":\"v202202\"}",
-                        "/content/dam/2023|{\"p2\":\"v2023\"}",
-                        "/content/dam/2023/01|{\"p1\":\"v202301\"}",
-                        "/content/dam/2023/02|{}",
-                        "/content/dam/2023/02/28|{\"p1\":\"v20230228\"}"}),
-                Files.readAllLines(file.toPath()));
+        assertEquals(expected, Files.readAllLines(file.toPath()));
     }
 
     @Test
-    public void createFFSWithPipelinedStrategy_pathPredicateDoesNotMatch() 
throws Exception {
+    public void createFFS_pathPredicateDoesNotMatch() throws Exception {
         ImmutablePair<MongoDocumentStore, DocumentNodeStore> rwStore = 
createNodeStore(false);
         createContent(rwStore.right);
 
         ImmutablePair<MongoDocumentStore, DocumentNodeStore> roStore = 
createNodeStore(true);
         Predicate<String> pathPredicate = s -> 
s.startsWith("/content/dam/does-not-exist");
-        PipelinedStrategy pipelinedStrategy = createStrategy(roStore, 
pathPredicate);
+        PipelinedStrategy pipelinedStrategy = createStrategy(roStore, 
pathPredicate, null);
 
         File file = pipelinedStrategy.createSortedStoreFile();
 
@@ -120,7 +203,7 @@ public class PipelinedIT {
     }
 
     @Test
-    public void createFFSWithPipelinedStrategy_badNumberOfTransformThreads() 
throws CommitFailedException {
+    public void createFFS_badNumberOfTransformThreads() throws 
CommitFailedException {
         
System.setProperty(PipelinedStrategy.OAK_INDEXER_PIPELINED_TRANSFORM_THREADS, 
"0");
 
         ImmutablePair<MongoDocumentStore, DocumentNodeStore> rwStore = 
createNodeStore(false);
@@ -136,7 +219,7 @@ public class PipelinedIT {
     }
 
     @Test
-    public void createFFSWithPipelinedStrategy_badWorkingMemorySetting() 
throws CommitFailedException {
+    public void createFFS_badWorkingMemorySetting() throws 
CommitFailedException {
         
System.setProperty(PipelinedStrategy.OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB, 
"-1");
 
         ImmutablePair<MongoDocumentStore, DocumentNodeStore> rwStore = 
createNodeStore(false);
@@ -161,16 +244,16 @@ public class PipelinedIT {
 
         ImmutablePair<MongoDocumentStore, DocumentNodeStore> roStore = 
createNodeStore(true);
         Predicate<String> pathPredicate = s -> s.startsWith("/content/dam");
-        PipelinedStrategy pipelinedStrategy = createStrategy(roStore, 
pathPredicate);
+        PipelinedStrategy pipelinedStrategy = createStrategy(roStore, 
pathPredicate, null);
 
         pipelinedStrategy.createSortedStoreFile();
     }
 
     private PipelinedStrategy createStrategy(ImmutablePair<MongoDocumentStore, 
DocumentNodeStore> roStore) {
-        return createStrategy(roStore, s -> true);
+        return createStrategy(roStore, s -> true, null);
     }
 
-    private PipelinedStrategy createStrategy(ImmutablePair<MongoDocumentStore, 
DocumentNodeStore> roStore, Predicate<String> pathPredicate) {
+    private PipelinedStrategy createStrategy(ImmutablePair<MongoDocumentStore, 
DocumentNodeStore> roStore, Predicate<String> pathPredicate, List<PathFilter> 
pathFilters) {
         DocumentNodeStore readOnlyNodeStore = roStore.right;
         MongoDocumentStore readOnlyMongoDocStore = roStore.left;
 
@@ -184,8 +267,8 @@ public class PipelinedIT {
                 new MemoryBlobStore(),
                 sortFolder.getRoot(),
                 Compression.NONE,
-                pathPredicate
-        );
+                pathPredicate,
+                pathFilters);
     }
 
     private void createContent(NodeStore rwNodeStore) throws 
CommitFailedException {
@@ -197,9 +280,40 @@ public class PipelinedIT {
         contentDamBuilder.child("1000").child("12").setProperty("p1", 
"v100012");
         contentDamBuilder.child("2023").setProperty("p2", "v2023");
         
contentDamBuilder.child("2023").child("02").child("28").setProperty("p1", 
"v20230228");
+
+        // Node with very long name
+        @NotNull NodeBuilder node = contentDamBuilder;
+        for (int i = 0; i < LONG_PATH_TEST_LEVELS; i++) {
+            node = node.child(LONG_PATH_LEVEL_STRING + i);
+        }
+
+        // Other subtrees, to exercise filtering
+        rootBuilder.child("jcr:system").child("jcr:versionStorage")
+                
.child("42").child("41").child("1.0").child("jcr:frozenNode").child("nodes").child("node0");
+        
rootBuilder.child("home").child("users").child("system").child("cq:services").child("internal")
+                
.child("dam").child("foobar").child("rep:principalPolicy").child("entry2")
+                .child("rep:restrictions");
+        
rootBuilder.child("etc").child("scaffolding").child("jcr:content").child("cq:dialog")
+                
.child("content").child("items").child("tabs").child("items").child("basic")
+                .child("items");
+
         rwNodeStore.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
     }
 
+    private static final List<String> EXPECTED_FFS = new ArrayList<>(List.of(
+            "/|{}",
+            "/content|{}",
+            "/content/dam|{}",
+            "/content/dam/1000|{}",
+            "/content/dam/1000/12|{\"p1\":\"v100012\"}",
+            "/content/dam/2022|{}",
+            "/content/dam/2022/02|{\"p1\":\"v202202\"}",
+            "/content/dam/2023|{\"p2\":\"v2023\"}",
+            "/content/dam/2023/01|{\"p1\":\"v202301\"}",
+            "/content/dam/2023/02|{}",
+            "/content/dam/2023/02/28|{\"p1\":\"v20230228\"}"
+    ));
+
     private ImmutablePair<MongoDocumentStore, DocumentNodeStore> 
createNodeStore(boolean readOnly) {
         MongoConnection c = connectionFactory.getConnection();
         DocumentMK.Builder builder = builderProvider.newBuilder();
diff --git a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTaskTest.java b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTaskTest.java
index cbbb644ba4..a9b2d002bd 100644
--- a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTaskTest.java
+++ b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTaskTest.java
@@ -25,6 +25,7 @@ import com.mongodb.client.FindIterable;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoCursor;
 import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
+import org.apache.jackrabbit.oak.spi.filter.PathFilter;
 import org.bson.BsonDocument;
 import org.bson.conversions.Bson;
 import org.junit.Test;
@@ -37,6 +38,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -85,7 +87,7 @@ public class PipelinedMongoDownloadTaskTest {
 
         int batchSize = 100;
         BlockingQueue<BasicDBObject[]> queue = new ArrayBlockingQueue<>(100);
-        PipelinedMongoDownloadTask task = new 
PipelinedMongoDownloadTask(dbCollection, batchSize, queue);
+        PipelinedMongoDownloadTask task = new 
PipelinedMongoDownloadTask(dbCollection, batchSize, queue, null);
 
         // Execute
         PipelinedMongoDownloadTask.Result result = task.call();
@@ -101,4 +103,33 @@ public class PipelinedMongoDownloadTaskTest {
         verify(dbCollection).find(BsonDocument.parse("{\"_modified\": {\"$gte\": 123000, \"$lt\": 123001}, \"_id\": {\"$gt\": \"3:/content/dam/asset1\"}}"));
         verify(dbCollection).find(BsonDocument.parse("{\"_modified\": {\"$gte\": 123001}}"));
     }
+
+    @Test
+    public void regexFilters() {
+        assertNull(PipelinedMongoDownloadTask.getSingleIncludedPath(null));
+
+        
assertNull(PipelinedMongoDownloadTask.getSingleIncludedPath(List.of()));
+
+        List<PathFilter> singlePathFilter = List.of(
+                new PathFilter(List.of("/content/dam"), List.of())
+        );
+        assertEquals("/content/dam", 
PipelinedMongoDownloadTask.getSingleIncludedPath(singlePathFilter));
+
+        List<PathFilter> multipleIncludeFilters = List.of(
+                new PathFilter(List.of("/content/dam"), List.of()),
+                new PathFilter(List.of("/content/dam"), List.of())
+        );
+        assertEquals("/content/dam", 
PipelinedMongoDownloadTask.getSingleIncludedPath(multipleIncludeFilters));
+
+        List<PathFilter> multipleIncludeFiltersDifferent = List.of(
+                new PathFilter(List.of("/content/dam"), List.of()),
+                new PathFilter(List.of("/content/dam/collections"), List.of())
+        );
+        
assertNull(PipelinedMongoDownloadTask.getSingleIncludedPath(multipleIncludeFiltersDifferent));
+
+        List<PathFilter> withExcludeFilter = List.of(
+                new PathFilter(List.of("/"), List.of("/var"))
+        );
+        
assertNull(PipelinedMongoDownloadTask.getSingleIncludedPath(withExcludeFilter));
+    }
 }
\ No newline at end of file
diff --git a/oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/filter/PathFilter.java b/oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/filter/PathFilter.java
index 439b71382a..71be083b24 100644
--- a/oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/filter/PathFilter.java
+++ b/oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/filter/PathFilter.java
@@ -23,6 +23,7 @@ package org.apache.jackrabbit.oak.spi.filter;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.jackrabbit.oak.api.PropertyState;
@@ -72,13 +73,12 @@ public class PathFilter {
         TRAVERSE
     }
 
-    private static final PathFilter ALL = new PathFilter(INCLUDE_ROOT, 
Collections.<String>emptyList()) {
+    private static final PathFilter ALL = new PathFilter(INCLUDE_ROOT, 
Collections.emptyList()) {
         @Override
         public Result filter(@NotNull String path) {
             return Result.INCLUDE;
         }
     };
-
     private final String[] includedPaths;
     private final String[] excludedPaths;
 
@@ -99,7 +99,7 @@ public class PathFilter {
         }
         return new PathFilter(getStrings(defn, PROP_INCLUDED_PATHS,
                 INCLUDE_ROOT), getStrings(defn, PROP_EXCLUDED_PATHS,
-                Collections.<String> emptyList()));
+                Collections.emptyList()));
     }
 
     /**
@@ -107,9 +107,8 @@ public class PathFilter {
      *
      * If both are empty then all paths would be considered to be included
      *
-     * @param includes list of paths which should not be included
-     * @param excludes list of p
-     *                 aths which should be included
+     * @param includes list of paths which should be included
+     * @param excludes list of paths which should not be included
      */
     public PathFilter(Iterable<String> includes, Iterable<String> excludes) {
         Set<String> includeCopy = newHashSet(includes);
@@ -149,6 +148,14 @@ public class PathFilter {
         return Result.EXCLUDE;
     }
 
+    public List<String> getIncludedPaths() {
+        return List.of(includedPaths);
+    }
+
+    public List<String> getExcludedPaths() {
+        return List.of(excludedPaths);
+    }
+
     @Override
     public String toString() {
         return "PathFilter{" +

Reply via email to