This is an automated email from the ASF dual-hosted git repository.
fortino pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 2d51b15ee0 OAK-10358 - Filter by path on MongoDB query when downloading repository in Indexing job (#1042)
2d51b15ee0 is described below
commit 2d51b15ee0255d3b2b72464b4ef43edaa786367c
Author: Nuno Santos <[email protected]>
AuthorDate: Wed Sep 6 17:39:34 2023 +0200
    OAK-10358 - Filter by path on MongoDB query when downloading repository in Indexing job (#1042)
    * Initial implementation of regex path filtering on MongoDB download.
    * Fix regex filter to also download documents with long paths (_id is a hash; the entry path is in the _path field).
    * Revert changes to logback-test.xml.
    * Use Pattern.quote instead of manually quoting.
    * Minor refactoring.
    * Apply regex filtering also when there are multiple indexes, all with the same includedPaths.
    * Change default value of regex path filtering to false.
    * Improve documentation.
    * Fix documentation.
    * Add logging of metrics for reindexing, merging in the node store, and total job time.
    * Add handling of long paths to the method that generates the MongoDB ancestors query.
    * Do not suppress InterruptedExceptions raised while waiting to retry connecting to MongoDB.
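
For reference, the heart of the change is the filter that childrenFilter (in the diff below) builds for the Mongo query. The following self-contained sketch reproduces that logic with the literal field names "_id" and "_path" in place of the NodeDocument constants; the wrapper class exists only so the snippet compiles on its own:

    import com.mongodb.client.model.Filters;
    import org.bson.conversions.Bson;
    import java.util.regex.Pattern;

    public class ChildrenFilterSketch {
        // Matches all descendants of the given base path. Short paths are matched
        // on _id, which has the form "<depth>:<path>"; long paths store a hash in
        // _id and the real path in _path, so both fields must be covered.
        static Bson childrenFilter(String path) {
            if (!path.endsWith("/")) {
                path = path + "/";
            }
            String quotedPath = Pattern.quote(path); // escape regex metacharacters in the path
            return Filters.or(
                    Filters.regex("_id", Pattern.compile("^[0-9]{1,3}:" + quotedPath + ".*$")),
                    Filters.regex("_path", Pattern.compile(quotedPath + ".*$"))
            );
        }
    }

Because the _id regex starts with the variable depth prefix, MongoDB cannot use the _id index for it; the query still scans the collection, but far fewer documents cross the network.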
---
.../indexer/document/DocumentStoreIndexerBase.java | 10 +-
.../flatfile/FlatFileNodeStoreBuilder.java | 13 +-
.../pipelined/PipelinedMongoDownloadTask.java | 273 ++++++++++++++++-----
.../flatfile/pipelined/PipelinedStrategy.java | 16 +-
.../document/flatfile/pipelined/PipelinedIT.java | 174 ++++++++++---
.../pipelined/PipelinedMongoDownloadTaskTest.java | 33 ++-
.../jackrabbit/oak/spi/filter/PathFilter.java | 19 +-
7 files changed, 424 insertions(+), 114 deletions(-)
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
index df936687ee..7a31811de8 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
@@ -82,7 +82,6 @@ public abstract class DocumentStoreIndexerBase implements Closeable {
     protected final IndexHelper indexHelper;
     protected List<NodeStateIndexerProvider> indexerProviders;
     protected final IndexerSupport indexerSupport;
-    private final Set<String> indexerPaths = new HashSet<>();
     private static final int MAX_DOWNLOAD_ATTEMPTS = Integer.parseInt(System.getProperty("oak.indexer.maxDownloadRetries", "5")) + 1;
 
     public DocumentStoreIndexerBase(IndexHelper indexHelper, IndexerSupport indexerSupport) {
@@ -109,15 +108,13 @@ public abstract class DocumentStoreIndexerBase implements Closeable {
         private final DocumentNodeStore documentNodeStore;
         private final MongoDocumentStore documentStore;
         private final Logger traversalLogger;
-        private final CompositeIndexer indexer;
 
         private MongoNodeStateEntryTraverserFactory(RevisionVector rootRevision, DocumentNodeStore documentNodeStore,
-                                                    MongoDocumentStore documentStore, Logger traversalLogger, CompositeIndexer indexer) {
+                                                    MongoDocumentStore documentStore, Logger traversalLogger) {
             this.rootRevision = rootRevision;
             this.documentNodeStore = documentNodeStore;
             this.documentStore = documentStore;
             this.traversalLogger = traversalLogger;
-            this.indexer = indexer;
         }
 
         @Override
@@ -165,11 +162,12 @@ public abstract class DocumentStoreIndexerBase implements Closeable {
                 .withPreferredPathElements((preferredPathElements != null) ? preferredPathElements : indexer.getRelativeIndexedNodeNames())
                 .addExistingDataDumpDir(indexerSupport.getExistingDataDumpDir())
                 .withPathPredicate(pathPredicate)
+                .withIndexDefinitions(indexDefinitions)
                 .withRootRevision(rootDocumentState.getRootRevision())
                 .withNodeStore(nodeStore)
                 .withMongoDocumentStore(getMongoDocumentStore())
                 .withNodeStateEntryTraverserFactory(new MongoNodeStateEntryTraverserFactory(rootDocumentState.getRootRevision(),
-                        nodeStore, getMongoDocumentStore(), traversalLog, indexer));
+                        nodeStore, getMongoDocumentStore(), traversalLog));
         for (File dir : previousDownloadDirs) {
             builder.addExistingDataDumpDir(dir);
         }
@@ -219,6 +217,7 @@ public abstract class DocumentStoreIndexerBase implements Closeable {
         for (IndexDefinition indexDf : indexDefinitions) {
             preferredPathElements.addAll(indexDf.getRelativeNodeNames());
         }
+
         Predicate<String> predicate = s -> indexDefinitions.stream().anyMatch(indexDef ->
                 indexDef.getPathFilter().filter(s) != PathFilter.Result.EXCLUDE);
         FlatFileStore flatFileStore = buildFlatFileStoreList(checkpointedState, null, predicate, preferredPathElements,
                 IndexerConfiguration.parallelIndexEnabled(), indexDefinitions).get(0);
@@ -386,7 +385,6 @@ public abstract class DocumentStoreIndexerBase implements Closeable {
                     indexers.add(indexer);
                     closer.register(indexer);
                     progressReporter.registerIndex(indexPath, true, -1);
-                    indexerPaths.add(indexPath);
                 }
             }
         }
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
index 7309d4ff99..ca4bd45822 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
@@ -35,6 +35,7 @@ import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
 import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
 import org.apache.jackrabbit.oak.query.NodeStateNodeTypeInfoProvider;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.filter.PathFilter;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
@@ -49,6 +50,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import static java.util.Collections.unmodifiableSet;
@@ -137,6 +139,8 @@ public class FlatFileNodeStoreBuilder {
     private RevisionVector rootRevision = null;
     private DocumentNodeStore nodeStore = null;
     private MongoDocumentStore mongoDocumentStore = null;
+    private Set<IndexDefinition> indexDefinitions = null;
+
 
     public enum SortStrategyType {
         /**
@@ -204,6 +208,12 @@ public class FlatFileNodeStoreBuilder {
         return this;
     }
 
+    public FlatFileNodeStoreBuilder withIndexDefinitions(Set<IndexDefinition> indexDefinitions) {
+        this.indexDefinitions = indexDefinitions;
+        return this;
+    }
+
+
     public FlatFileNodeStoreBuilder withRootRevision(RevisionVector rootRevision) {
         this.rootRevision = rootRevision;
         return this;
@@ -323,8 +333,9 @@ public class FlatFileNodeStoreBuilder {
                         blobStore, dir, existingDataDumpDirs, algorithm, memoryManager, dumpThreshold, pathPredicate);
             case PIPELINED:
                 log.info("Using PipelinedStrategy");
+                List<PathFilter> pathFilters = indexDefinitions.stream().map(IndexDefinition::getPathFilter).collect(Collectors.toList());
                 return new PipelinedStrategy(mongoDocumentStore, nodeStore, rootRevision,
-                        preferredPathElements, blobStore, dir, algorithm, pathPredicate);
+                        preferredPathElements, blobStore, dir, algorithm, pathPredicate, pathFilters);
         }
         throw new IllegalStateException("Not a valid sort strategy value " + sortStrategyType);
     }
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
index f6c627b4e9..bb53f40a35 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
@@ -26,24 +26,33 @@ import com.mongodb.ReadPreference;
 import com.mongodb.client.FindIterable;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoCursor;
+import com.mongodb.client.model.Filters;
 import org.apache.jackrabbit.guava.common.base.Preconditions;
 import org.apache.jackrabbit.guava.common.base.Stopwatch;
-import org.apache.jackrabbit.oak.plugins.document.Document;
+import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
+import org.apache.jackrabbit.oak.spi.filter.PathFilter;
 import org.bson.BsonDocument;
+import org.bson.conversions.Bson;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
+import static com.mongodb.client.model.Filters.regex;
 import static com.mongodb.client.model.Sorts.ascending;
 
 public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownloadTask.Result> {
@@ -72,6 +81,17 @@ public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownlo
     public static final boolean DEFAULT_OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS = true;
     public static final String OAK_INDEXER_PIPELINED_MONGO_CONNECTION_RETRY_SECONDS = "oak.indexer.pipelined.mongoConnectionRetrySeconds";
     public static final int DEFAULT_OAK_INDEXER_PIPELINED_MONGO_CONNECTION_RETRY_SECONDS = 300;
+    /**
+     * Whether to do path filtering in the Mongo query instead of doing a full traversal of the document store and
+     * filtering in the indexing job. This feature may significantly reduce the number of documents downloaded from
+     * Mongo.
+     * The performance gains may not be proportional to the reduction in the number of documents downloaded because Mongo
+     * still has to traverse all the documents. This is the case because the regex expression used for path filtering
+     * starts with a wildcard (because the _id starts with the depth of the path, so the regex expression must ignore
+     * this part). Because of the wildcard at the start, Mongo cannot use the index on _id.
+     */
+    public static final String OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING = "oak.indexer.pipelined.mongoRegexPathFiltering";
+    public static final boolean DEFAULT_OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING = false;
     // Use a short initial retry interval. In most cases if the connection to a replica fails, there will be other
     // replicas available so a reconnection attempt will succeed immediately.
     private final static long retryInitialIntervalMillis = 100;
@@ -80,11 +100,14 @@ public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownlo
     // TODO: Revise this timeout. It is used to prevent the indexer from blocking forever if the queue is full.
     private static final Duration MONGO_QUEUE_OFFER_TIMEOUT = Duration.ofMinutes(30);
     private static final int MIN_INTERVAL_BETWEEN_DELAYED_ENQUEUING_MESSAGES = 10;
+    private final static BsonDocument NATURAL_HINT = BsonDocument.parse("{ $natural:1}");
 
     private final int batchSize;
     private final BlockingQueue<BasicDBObject[]> mongoDocQueue;
+    private final List<PathFilter> pathFilters;
     private final int retryDuringSeconds;
     private final boolean retryOnConnectionErrors;
+    private final boolean regexPathFiltering;
     private final Logger traversalLog = LoggerFactory.getLogger(PipelinedMongoDownloadTask.class.getName() + ".traversal");
     private final MongoCollection<BasicDBObject> dbCollection;
     private final ReadPreference readPreference;
@@ -98,10 +121,12 @@ public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownlo
 
     public PipelinedMongoDownloadTask(MongoCollection<BasicDBObject> dbCollection,
                                       int batchSize,
-                                      BlockingQueue<BasicDBObject[]> queue) {
+                                      BlockingQueue<BasicDBObject[]> queue,
+                                      List<PathFilter> pathFilters) {
         this.dbCollection = dbCollection;
         this.batchSize = batchSize;
         this.mongoDocQueue = queue;
+        this.pathFilters = pathFilters;
         // Default retries for 5 minutes.
         this.retryDuringSeconds = ConfigHelper.getSystemPropertyAsInt(
                 OAK_INDEXER_PIPELINED_MONGO_CONNECTION_RETRY_SECONDS,
@@ -111,6 +136,9 @@ public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownlo
         this.retryOnConnectionErrors = ConfigHelper.getSystemPropertyAsBoolean(
                 OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS,
                 DEFAULT_OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS);
+        this.regexPathFiltering = ConfigHelper.getSystemPropertyAsBoolean(
+                OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING,
+                DEFAULT_OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING);
 
         //TODO This may lead to reads being routed to secondary depending on MongoURI
         //So caller must ensure that its safe to read from secondary
@@ -127,67 +155,14 @@ public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownlo
             LOG.info("Starting to download from MongoDB.");
             //TODO This may lead to reads being routed to secondary depending on MongoURI
             //So caller must ensure that its safe to read from secondary
-
             this.nextLastModified = 0;
             this.lastIdDownloaded = null;
-
             downloadStartWatch.start();
-            if (!retryOnConnectionErrors) {
-                downloadAll();
+            if (retryOnConnectionErrors) {
+                downloadWithRetryOnConnectionErrors();
             } else {
-                Instant failuresStartTimestamp = null; // When the last series of failures started
-                long retryIntervalMs = retryInitialIntervalMillis;
-                int numberOfFailures = 0;
-                boolean downloadCompleted = false;
-                Map<String, Integer> exceptions = new HashMap<>();
-                while (!downloadCompleted) {
-                    try {
-                        if (lastIdDownloaded != null) {
-                            LOG.info("Recovering from broken connection, finishing downloading documents with _modified={}", nextLastModified);
-                            downloadRange(new DownloadRange(nextLastModified, nextLastModified + 1, lastIdDownloaded));
-                            // We have managed to reconnect, reset the failure timestamp
-                            failuresStartTimestamp = null;
-                            numberOfFailures = 0;
-                            // Continue downloading everything starting from the next _lastmodified value
-                            downloadRange(new DownloadRange(nextLastModified + 1, Long.MAX_VALUE, null));
-                        } else {
-                            downloadRange(new DownloadRange(nextLastModified, Long.MAX_VALUE, null));
-                        }
-                        downloadCompleted = true;
-                    } catch (MongoException e) {
-                        if (e instanceof MongoInterruptedException || e instanceof MongoIncompatibleDriverException) {
-                            // Non-recoverable exceptions
-                            throw e;
-                        }
-                        if (failuresStartTimestamp == null) {
-                            failuresStartTimestamp = Instant.now().truncatedTo(ChronoUnit.SECONDS);
-                        }
-                        LOG.warn("Connection error downloading from MongoDB.", e);
-                        long secondsSinceStartOfFailures = Duration.between(failuresStartTimestamp, Instant.now()).toSeconds();
-                        if (secondsSinceStartOfFailures > retryDuringSeconds) {
-                            // Give up. Get a string of all exceptions that were thrown
-                            StringBuilder summary = new StringBuilder();
-                            for (Map.Entry<String, Integer> entry : exceptions.entrySet()) {
-                                summary.append("\n\t").append(entry.getValue()).append("x: ").append(entry.getKey());
-                            }
-                            throw new RetryException(retryDuringSeconds, summary.toString(), e);
-                        } else {
-                            numberOfFailures++;
-                            LOG.warn("Retrying download in {} ms; number of times failed: {}; current series of failures started at: {} ({} seconds ago)",
-                                    retryIntervalMs, numberOfFailures, failuresStartTimestamp, secondsSinceStartOfFailures);
-                            exceptions.compute(e.getClass().getSimpleName() + " - " + e.getMessage(),
-                                    (key, val) -> val == null ? 1 : val + 1
-                            );
-                            try {
-                                Thread.sleep(retryIntervalMs);
-                            } catch (InterruptedException ignore) {
-                            }
-                            // simple exponential backoff mechanism
-                            retryIntervalMs = Math.min(retryMaxIntervalMillis, retryIntervalMs * 2);
-                        }
-                    }
-                }
+                downloadWithNaturalOrdering();
             }
             LOG.info("Terminating task. Downloaded {} Mongo documents in {}. Total enqueuing delay: {} ms ({}%)",
                     documentsRead, downloadStartWatch,
@@ -215,8 +190,87 @@ public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownlo
         traversalLog.trace(id);
     }
 
-    private void downloadRange(DownloadRange range) throws InterruptedException, TimeoutException {
-        BsonDocument findQuery = range.getFindQuery();
+
+    private void downloadWithRetryOnConnectionErrors() throws InterruptedException, TimeoutException {
+        // If regex filtering is enabled, start by downloading the ancestors of the path used for filtering.
+        // That is, download "/", "/content", "/content/dam" for a base path of "/content/dam". These nodes will not be
+        // matched by the regex used in the Mongo query, which assumes a prefix of "???:/content/dam"
+        Bson childrenFilter = null;
+        String regexBasePath = getPathForRegexFiltering();
+        if (regexBasePath != null) {
+            // Regex path filtering is enabled
+            // Download the ancestors in a separate query. No retries are done on this query, as it will take only a few
+            // seconds and is done at the start of the job, so if it fails, the job can be retried without losing much work
+            LOG.info("Using regex filtering with path {}", regexBasePath);
+            Bson ancestorsQuery = ancestorsFilter(regexBasePath);
+            LOG.info("Downloading ancestors of {}", regexBasePath);
+            // Let Mongo decide which index to use for this query, it will return very few documents
+            FindIterable<BasicDBObject> mongoIterable = dbCollection
+                    .withReadPreference(readPreference)
+                    .find(ancestorsQuery);
+            download(mongoIterable);
+            // Filter to apply to the main query
+            childrenFilter = childrenFilter(regexBasePath);
+        }
+
+        Instant failuresStartTimestamp = null; // When the last series of failures started
+        long retryIntervalMs = retryInitialIntervalMillis;
+        int numberOfFailures = 0;
+        boolean downloadCompleted = false;
+        Map<String, Integer> exceptions = new HashMap<>();
+        this.nextLastModified = 0;
+        this.lastIdDownloaded = null;
+        while (!downloadCompleted) {
+            try {
+                if (lastIdDownloaded != null) {
+                    LOG.info("Recovering from broken connection, finishing downloading documents with _modified={}", nextLastModified);
+                    downloadRange(new DownloadRange(nextLastModified, nextLastModified + 1, lastIdDownloaded), childrenFilter);
+                    // We have managed to reconnect, reset the failure timestamp
+                    failuresStartTimestamp = null;
+                    numberOfFailures = 0;
+                    // Continue downloading everything starting from the next _lastmodified value
+                    downloadRange(new DownloadRange(nextLastModified + 1, Long.MAX_VALUE, null), childrenFilter);
+                } else {
+                    downloadRange(new DownloadRange(nextLastModified, Long.MAX_VALUE, null), childrenFilter);
+                }
+                downloadCompleted = true;
+            } catch (MongoException e) {
+                if (e instanceof MongoInterruptedException || e instanceof MongoIncompatibleDriverException) {
+                    // Non-recoverable exceptions
+                    throw e;
+                }
+                if (failuresStartTimestamp == null) {
+                    failuresStartTimestamp = Instant.now().truncatedTo(ChronoUnit.SECONDS);
+                }
+                LOG.warn("Connection error downloading from MongoDB.", e);
+                long secondsSinceStartOfFailures = Duration.between(failuresStartTimestamp, Instant.now()).toSeconds();
+                if (secondsSinceStartOfFailures > retryDuringSeconds) {
+                    // Give up. Get a string of all exceptions that were thrown
+                    StringBuilder summary = new StringBuilder();
+                    for (Map.Entry<String, Integer> entry : exceptions.entrySet()) {
+                        summary.append("\n\t").append(entry.getValue()).append("x: ").append(entry.getKey());
+                    }
+                    throw new RetryException(retryDuringSeconds, summary.toString(), e);
+                } else {
+                    numberOfFailures++;
+                    LOG.warn("Retrying download in {} ms; number of times failed: {}; current series of failures started at: {} ({} seconds ago)",
+                            retryIntervalMs, numberOfFailures, failuresStartTimestamp, secondsSinceStartOfFailures);
+                    exceptions.compute(e.getClass().getSimpleName() + " - " + e.getMessage(),
+                            (key, val) -> val == null ? 1 : val + 1
+                    );
+                    Thread.sleep(retryIntervalMs);
+                    // simple exponential backoff mechanism
+                    retryIntervalMs = Math.min(retryMaxIntervalMillis, retryIntervalMs * 2);
+                }
+            }
+        }
+    }
+
+    private void downloadRange(DownloadRange range, Bson filter) throws InterruptedException, TimeoutException {
+        Bson findQuery = range.getFindQuery();
+        if (filter != null) {
+            findQuery = Filters.and(findQuery, filter);
+        }
         LOG.info("Traversing: {}. Query: {}", range, findQuery);
         FindIterable<BasicDBObject> mongoIterable = dbCollection
                 .withReadPreference(readPreference)
@@ -225,12 +279,97 @@ public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownlo
         download(mongoIterable);
     }
 
-    private void downloadAll() throws InterruptedException, TimeoutException {
-        LOG.info("Traversing all documents");
-        FindIterable<BasicDBObject> mongoIterable = dbCollection
-                .withReadPreference(readPreference)
-                .find();
-        download(mongoIterable);
+    private void downloadWithNaturalOrdering() throws InterruptedException, TimeoutException {
+        // We are downloading potentially a large fraction of the repository, so using an index scan will be
+        // inefficient. So we pass the natural hint to force MongoDB to use natural ordering, that is, a collection scan
+        String regexBasePath = getPathForRegexFiltering();
+        if (regexBasePath == null) {
+            LOG.info("Downloading full repository using natural order");
+            FindIterable<BasicDBObject> mongoIterable = dbCollection
+                    .withReadPreference(readPreference)
+                    .find()
+                    .hint(NATURAL_HINT);
+            download(mongoIterable);
+
+        } else {
+            Bson ancestorQuery = ancestorsFilter(regexBasePath);
+            // Do not use natural order to download ancestors. The number of ancestors will always be small, likely less
+            // than 10, so let MongoDB use an index to find them.
+            LOG.info("Downloading using regex path filtering. Downloading ancestors: {}.", ancestorQuery);
+            FindIterable<BasicDBObject> ancestorsIterable = dbCollection
+                    .withReadPreference(readPreference)
+                    .find(ancestorQuery);
+            download(ancestorsIterable);
+
+            Bson childrenQuery = childrenFilter(regexBasePath);
+            LOG.info("Downloading using regex path filtering. Downloading children: {}.", childrenQuery);
+            FindIterable<BasicDBObject> childrenIterable = dbCollection
+                    .withReadPreference(readPreference)
+                    .find(childrenQuery)
+                    .hint(NATURAL_HINT);
+            download(childrenIterable);
+        }
+    }
+
+    private String getPathForRegexFiltering() {
+        if (!regexPathFiltering) {
+            LOG.info("Regex path filtering disabled.");
+            return null;
+        }
+        return getSingleIncludedPath(pathFilters);
+    }
+
+    // Package private for testing
+    static String getSingleIncludedPath(List<PathFilter> pathFilters) {
+        // For the time being, we only enable path filtering if there is a single include path across all indexes and no
+        // exclude paths. This is the case for most of the larger indexes. We can consider generalizing this in the future.
+        LOG.info("Creating regex filter from pathFilters: " + pathFilters);
+        if (pathFilters == null) {
+            return null;
+        }
+        Set<String> includedPaths = pathFilters.stream()
+                .flatMap(pathFilter -> pathFilter.getIncludedPaths().stream())
+                .collect(Collectors.toSet());
+
+        Set<String> excludedPaths = pathFilters.stream()
+                .flatMap(pathFilter -> pathFilter.getExcludedPaths().stream())
+                .collect(Collectors.toSet());
+
+        if (excludedPaths.isEmpty() && includedPaths.size() == 1) {
+            return includedPaths.stream().iterator().next();
+        } else {
+            return null;
+        }
+    }
+
+    private static Bson childrenFilter(String path) {
+        if (!path.endsWith("/")) {
+            path = path + "/";
+        }
+        String quotedPath = Pattern.quote(path);
+        // For entries with path sizes above a certain threshold, the _id field contains a hash instead of the path of
+        // the entry. The path is stored instead in the _path field. Therefore, we have to include in the filter also
+        // the documents with matching _path.
+        return Filters.or(
+                regex(NodeDocument.ID, Pattern.compile("^[0-9]{1,3}:" + quotedPath + ".*$")),
+                regex(NodeDocument.PATH, Pattern.compile(quotedPath + ".*$"))
+        );
+    }
+
+    private static Bson ancestorsFilter(String path) {
+        ArrayList<Bson> parentFilters = new ArrayList<>();
+        int depth = PathUtils.getDepth(path);
+        // Explicitly list all ancestors in an or query.
+        while (true) {
+            parentFilters.add(Filters.eq(NodeDocument.ID, depth + ":" + path));
+            parentFilters.add(Filters.eq(NodeDocument.PATH, path));
+            if (depth == 0) {
+                break;
+            }
+            path = PathUtils.getParentPath(path);
+            depth--;
+        }
+        return Filters.or(parentFilters);
     }
 
     private void download(FindIterable<BasicDBObject> mongoIterable) throws InterruptedException, TimeoutException {
@@ -240,7 +379,7 @@ public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownlo
             try {
                 while (cursor.hasNext()) {
                     BasicDBObject next = cursor.next();
-                    String id = next.getString(Document.ID);
+                    String id = next.getString(NodeDocument.ID);
                     // If we are retrying on connection errors, we need to keep track of the last _modified value
                     if (retryOnConnectionErrors) {
                         this.nextLastModified = next.getLong(NodeDocument.MODIFIED_IN_SECS);
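
The retry loop above follows a standard exponential-backoff pattern. A minimal, generic sketch of that pattern follows; it omits the failure-window bookkeeping (retryDuringSeconds, RetryException) of the real code, and the helper name and parameters are illustrative only. The point to note is the InterruptedException handling fixed by this commit: the sleep is no longer wrapped in a catch block that swallows the interrupt.

    import java.util.concurrent.Callable;

    public class BackoffSketch {
        static <T> T retryWithBackoff(Callable<T> attempt, long initialIntervalMs, long maxIntervalMs)
                throws Exception {
            long intervalMs = initialIntervalMs;
            while (true) {
                try {
                    return attempt.call();
                } catch (Exception e) {
                    // An InterruptedException thrown by sleep() is deliberately not
                    // caught here: it propagates to the caller, so a cancelled
                    // indexing job stops promptly instead of looping on.
                    Thread.sleep(intervalMs);
                    intervalMs = Math.min(maxIntervalMs, intervalMs * 2); // exponential backoff
                }
            }
        }
    }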
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
index bdd94d1f9e..4379010aee 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
@@ -33,6 +33,7 @@ import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
 import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
 import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStoreHelper;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.filter.PathFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,6 +42,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutionException;
@@ -193,9 +195,16 @@ public class PipelinedStrategy implements SortStrategy {
     private final PathElementComparator pathComparator;
     private final Compression algorithm;
+    private final List<PathFilter> pathFilters;
     private long entryCount;
     private final Predicate<String> pathPredicate;
 
+    /**
+     *
+     * @param pathPredicate Used by the transform stage to test if a node should be kept or discarded.
+     * @param pathFilters If non-empty, the download stage will use these filters to try to create a query that downloads
+     *                    only the matching MongoDB documents.
+     */
     public PipelinedStrategy(MongoDocumentStore documentStore,
                              DocumentNodeStore documentNodeStore,
                              RevisionVector rootRevision,
@@ -203,7 +212,8 @@ public class PipelinedStrategy implements SortStrategy {
                              BlobStore blobStore,
                              File storeDir,
                              Compression algorithm,
-                             Predicate<String> pathPredicate) {
+                             Predicate<String> pathPredicate,
+                             List<PathFilter> pathFilters) {
         this.docStore = documentStore;
         this.documentNodeStore = documentNodeStore;
         this.rootRevision = rootRevision;
@@ -212,11 +222,11 @@ public class PipelinedStrategy implements SortStrategy {
         this.pathComparator = new PathElementComparator(preferredPathElements);
         this.pathPredicate = pathPredicate;
         this.algorithm = algorithm;
+        this.pathFilters = pathFilters;
 
         Preconditions.checkState(documentStore.isReadOnly(), "Traverser can only be used with readOnly store");
     }
-
     private int autodetectWorkingMemoryMB() {
         int maxHeapSizeMB = (int) (Runtime.getRuntime().maxMemory() / FileUtils.ONE_MB);
         int workingMemoryMB = maxHeapSizeMB - 2048;
@@ -311,7 +321,7 @@ public class PipelinedStrategy implements SortStrategy {
         Stopwatch start = Stopwatch.createStarted();
         MongoCollection<BasicDBObject> dbCollection = MongoDocumentStoreHelper.getDBCollection(docStore, Collection.NODES);
-        PipelinedMongoDownloadTask downloadTask = new PipelinedMongoDownloadTask(dbCollection, mongoBatchSize, mongoDocQueue);
+        PipelinedMongoDownloadTask downloadTask = new PipelinedMongoDownloadTask(dbCollection, mongoBatchSize, mongoDocQueue, pathFilters);
         ecs.submit(downloadTask);
 
         File flatFileStore = null;
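
The new javadoc above distinguishes two filtering mechanisms that are easy to conflate. A small sketch of how a caller might derive both from the same PathFilter (the values are illustrative):

    import java.util.List;
    import java.util.function.Predicate;
    import org.apache.jackrabbit.oak.spi.filter.PathFilter;

    public class StageFilteringSketch {
        public static void main(String[] args) {
            // Download stage: pathFilters may narrow the MongoDB query itself.
            List<PathFilter> pathFilters = List.of(new PathFilter(List.of("/content/dam"), List.of()));
            // Transform stage: pathPredicate is applied to every downloaded node.
            Predicate<String> pathPredicate =
                    s -> pathFilters.get(0).filter(s) != PathFilter.Result.EXCLUDE;
            System.out.println(pathPredicate.test("/content/dam/a")); // true
            System.out.println(pathPredicate.test("/var/foo"));       // false
        }
    }

The predicate remains necessary even with Mongo-side filtering, since the download filter is best-effort (for example, it also returns all ancestors of the included path).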
diff --git a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedIT.java b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedIT.java
index 452518513c..4785025e30 100644
--- a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedIT.java
+++ b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedIT.java
@@ -21,6 +21,7 @@ package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.commons.Compression;
+import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
 import org.apache.jackrabbit.oak.plugins.document.DocumentMKBuilderProvider;
 import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
@@ -32,12 +33,12 @@ import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
 import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.filter.PathFilter;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.jetbrains.annotations.NotNull;
 import org.junit.After;
 import org.junit.Assume;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Rule;
@@ -48,10 +49,15 @@ import org.junit.rules.TemporaryFolder;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
 import java.util.Set;
 import java.util.function.Predicate;
 
+import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedMongoDownloadTask.OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING;
+import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedMongoDownloadTask.OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
@@ -66,52 +72,129 @@ public class PipelinedIT {
     @Rule
     public final TemporaryFolder sortFolder = new TemporaryFolder();
 
+    private static final PathFilter contentDamPathFilter = new PathFilter(List.of("/content/dam"), List.of());
+
+    private static final int LONG_PATH_TEST_LEVELS = 30;
+    private static final String LONG_PATH_LEVEL_STRING = "Z12345678901234567890-Level_";
+
     @BeforeClass
-    public static void checkMongoDbAvailable() {
+    public static void setup() throws IOException {
         Assume.assumeTrue(MongoUtils.isAvailable());
-    }
-
-    @Before
-    public void setup() throws IOException {
+        // Generate dynamically the entries expected for the long path tests
+        StringBuilder path = new StringBuilder("/content/dam");
+        for (int i = 0; i < LONG_PATH_TEST_LEVELS; i++) {
+            path.append("/").append(LONG_PATH_LEVEL_STRING).append(i);
+            EXPECTED_FFS.add(path + "|{}");
+        }
     }
 
     @After
     public void tear() {
         MongoConnection c = connectionFactory.getConnection();
-        c.getDatabase().drop();
+        if (c != null) {
+            c.getDatabase().drop();
+        }
+    }
+
+    @Test
+    public void createFFS_retryOnMongoFailures_noMongoFiltering() throws Exception {
+        System.setProperty(OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS, "true");
+        System.setProperty(OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING, "false");
+
+        Predicate<String> pathPredicate = s -> contentDamPathFilter.filter(s) != PathFilter.Result.EXCLUDE;
+        List<PathFilter> pathFilters = null;
+
+        testSuccessfulDownload(pathPredicate, pathFilters);
+    }
+
+    @Test
+    public void createFFS_retryOnMongoFailures_mongoFiltering() throws Exception {
+        System.setProperty(OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS, "true");
+        System.setProperty(OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING, "true");
+
+        Predicate<String> pathPredicate = s -> true;
+        List<PathFilter> pathFilters = List.of(contentDamPathFilter);
+
+        testSuccessfulDownload(pathPredicate, pathFilters);
+    }
+
+    @Test
+    public void createFFS_noRetryOnMongoFailures_mongoFiltering() throws Exception {
+        System.setProperty(OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS, "false");
+        System.setProperty(OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING, "true");
+
+        Predicate<String> pathPredicate = s -> true;
+        List<PathFilter> pathFilters = List.of(contentDamPathFilter);
+
+        testSuccessfulDownload(pathPredicate, pathFilters);
     }
     @Test
-    public void createFFSWithPipelinedStrategy() throws Exception {
+    public void createFFS_noRetryOnMongoFailures_noMongoFiltering() throws Exception {
+        System.setProperty(OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS, "false");
+        System.setProperty(OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING, "false");
+
+        Predicate<String> pathPredicate = s -> contentDamPathFilter.filter(s) != PathFilter.Result.EXCLUDE;
+        List<PathFilter> pathFilters = List.of(new PathFilter(List.of("/content/dam"), List.of()));
+
+        testSuccessfulDownload(pathPredicate, pathFilters);
+    }
+
+    @Test
+    public void createFFS_filter_long_paths() throws Exception {
+        System.setProperty(OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS, "false");
+        System.setProperty(OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING, "true");
+
+        // Create a filter on the node with the longest path
+        String longestLine = EXPECTED_FFS.stream().max(Comparator.comparingInt(String::length)).get();
+        String longestPath = longestLine.substring(0, longestLine.lastIndexOf("|"));
+        String parent = PathUtils.getParentPath(longestPath);
+        Predicate<String> pathPredicate = s -> true;
+        List<PathFilter> pathFilters = List.of(new PathFilter(List.of(parent), List.of()));
+
+        // The results should contain all the parents of the node with the longest path
+        ArrayList<String> expected = new ArrayList<>();
+        expected.add(longestPath + "|{}");
+        while (true) {
+            expected.add(parent + "|{}");
+            if (parent.equals("/")) {
+                break;
+            }
+            parent = PathUtils.getParentPath(parent);
+        }
+        // The list above has the longest paths first, reverse it to match the order in the FFS
+        Collections.reverse(expected);
+
+        testSuccessfulDownload(pathPredicate, pathFilters, expected);
+    }
+
+    private void testSuccessfulDownload(Predicate<String> pathPredicate, List<PathFilter> pathFilters)
+            throws CommitFailedException, IOException {
+        testSuccessfulDownload(pathPredicate, pathFilters, EXPECTED_FFS);
+    }
+
+    private void testSuccessfulDownload(Predicate<String> pathPredicate, List<PathFilter> pathFilters, List<String> expected)
+            throws CommitFailedException, IOException {
         ImmutablePair<MongoDocumentStore, DocumentNodeStore> rwStore = createNodeStore(false);
         createContent(rwStore.right);
 
         ImmutablePair<MongoDocumentStore, DocumentNodeStore> roStore = createNodeStore(true);
-        Predicate<String> pathPredicate = s -> s.startsWith("/content/dam");
-        PipelinedStrategy pipelinedStrategy = createStrategy(roStore, pathPredicate);
+
+        PipelinedStrategy pipelinedStrategy = createStrategy(roStore, pathPredicate, pathFilters);
 
         File file = pipelinedStrategy.createSortedStoreFile();
         assertTrue(file.exists());
-        assertEquals(Arrays.asList(new String[] {"/content/dam|{}",
-                "/content/dam/1000|{}",
-                "/content/dam/1000/12|{\"p1\":\"v100012\"}",
-                "/content/dam/2022|{}",
-                "/content/dam/2022/02|{\"p1\":\"v202202\"}",
-                "/content/dam/2023|{\"p2\":\"v2023\"}",
-                "/content/dam/2023/01|{\"p1\":\"v202301\"}",
-                "/content/dam/2023/02|{}",
-                "/content/dam/2023/02/28|{\"p1\":\"v20230228\"}"}),
-                Files.readAllLines(file.toPath()));
+        assertEquals(expected, Files.readAllLines(file.toPath()));
     }
     @Test
-    public void createFFSWithPipelinedStrategy_pathPredicateDoesNotMatch() throws Exception {
+    public void createFFS_pathPredicateDoesNotMatch() throws Exception {
         ImmutablePair<MongoDocumentStore, DocumentNodeStore> rwStore = createNodeStore(false);
         createContent(rwStore.right);
 
         ImmutablePair<MongoDocumentStore, DocumentNodeStore> roStore = createNodeStore(true);
         Predicate<String> pathPredicate = s -> s.startsWith("/content/dam/does-not-exist");
-        PipelinedStrategy pipelinedStrategy = createStrategy(roStore, pathPredicate);
+        PipelinedStrategy pipelinedStrategy = createStrategy(roStore, pathPredicate, null);
 
         File file = pipelinedStrategy.createSortedStoreFile();
@@ -120,7 +203,7 @@ public class PipelinedIT {
     }
     @Test
-    public void createFFSWithPipelinedStrategy_badNumberOfTransformThreads() throws CommitFailedException {
+    public void createFFS_badNumberOfTransformThreads() throws CommitFailedException {
         System.setProperty(PipelinedStrategy.OAK_INDEXER_PIPELINED_TRANSFORM_THREADS, "0");
 
         ImmutablePair<MongoDocumentStore, DocumentNodeStore> rwStore = createNodeStore(false);
@@ -136,7 +219,7 @@ public class PipelinedIT {
     }
 
     @Test
-    public void createFFSWithPipelinedStrategy_badWorkingMemorySetting() throws CommitFailedException {
+    public void createFFS_badWorkingMemorySetting() throws CommitFailedException {
         System.setProperty(PipelinedStrategy.OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB, "-1");
 
         ImmutablePair<MongoDocumentStore, DocumentNodeStore> rwStore = createNodeStore(false);
@@ -161,16 +244,16 @@ public class PipelinedIT {
         ImmutablePair<MongoDocumentStore, DocumentNodeStore> roStore = createNodeStore(true);
 
         Predicate<String> pathPredicate = s -> s.startsWith("/content/dam");
-        PipelinedStrategy pipelinedStrategy = createStrategy(roStore, pathPredicate);
+        PipelinedStrategy pipelinedStrategy = createStrategy(roStore, pathPredicate, null);
 
         pipelinedStrategy.createSortedStoreFile();
     }
 
     private PipelinedStrategy createStrategy(ImmutablePair<MongoDocumentStore, DocumentNodeStore> roStore) {
-        return createStrategy(roStore, s -> true);
+        return createStrategy(roStore, s -> true, null);
     }
 
-    private PipelinedStrategy createStrategy(ImmutablePair<MongoDocumentStore, DocumentNodeStore> roStore, Predicate<String> pathPredicate) {
+    private PipelinedStrategy createStrategy(ImmutablePair<MongoDocumentStore, DocumentNodeStore> roStore, Predicate<String> pathPredicate, List<PathFilter> pathFilters) {
         DocumentNodeStore readOnlyNodeStore = roStore.right;
         MongoDocumentStore readOnlyMongoDocStore = roStore.left;
 
@@ -184,8 +267,8 @@ public class PipelinedIT {
                 new MemoryBlobStore(),
                 sortFolder.getRoot(),
                 Compression.NONE,
-                pathPredicate
-        );
+                pathPredicate,
+                pathFilters);
     }
 
     private void createContent(NodeStore rwNodeStore) throws CommitFailedException {
@@ -197,9 +280,40 @@ public class PipelinedIT {
         contentDamBuilder.child("1000").child("12").setProperty("p1", "v100012");
         contentDamBuilder.child("2023").setProperty("p2", "v2023");
         contentDamBuilder.child("2023").child("02").child("28").setProperty("p1", "v20230228");
+
+        // Node with very long name
+        @NotNull NodeBuilder node = contentDamBuilder;
+        for (int i = 0; i < LONG_PATH_TEST_LEVELS; i++) {
+            node = node.child(LONG_PATH_LEVEL_STRING + i);
+        }
+
+        // Other subtrees, to exercise filtering
+        rootBuilder.child("jcr:system").child("jcr:versionStorage")
+                .child("42").child("41").child("1.0").child("jcr:frozenNode").child("nodes").child("node0");
+        rootBuilder.child("home").child("users").child("system").child("cq:services").child("internal")
+                .child("dam").child("foobar").child("rep:principalPolicy").child("entry2")
+                .child("rep:restrictions");
+        rootBuilder.child("etc").child("scaffolding").child("jcr:content").child("cq:dialog")
+                .child("content").child("items").child("tabs").child("items").child("basic")
+                .child("items");
+
         rwNodeStore.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
     }
 
+    private static final List<String> EXPECTED_FFS = new ArrayList<>(List.of(
+            "/|{}",
+            "/content|{}",
+            "/content/dam|{}",
+            "/content/dam/1000|{}",
+            "/content/dam/1000/12|{\"p1\":\"v100012\"}",
+            "/content/dam/2022|{}",
+            "/content/dam/2022/02|{\"p1\":\"v202202\"}",
+            "/content/dam/2023|{\"p2\":\"v2023\"}",
+            "/content/dam/2023/01|{\"p1\":\"v202301\"}",
+            "/content/dam/2023/02|{}",
+            "/content/dam/2023/02/28|{\"p1\":\"v20230228\"}"
+    ));
+
     private ImmutablePair<MongoDocumentStore, DocumentNodeStore> createNodeStore(boolean readOnly) {
         MongoConnection c = connectionFactory.getConnection();
         DocumentMK.Builder builder = builderProvider.newBuilder();
diff --git a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTaskTest.java b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTaskTest.java
index cbbb644ba4..a9b2d002bd 100644
--- a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTaskTest.java
+++ b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTaskTest.java
@@ -25,6 +25,7 @@ import com.mongodb.client.FindIterable;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoCursor;
 import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
+import org.apache.jackrabbit.oak.spi.filter.PathFilter;
 import org.bson.BsonDocument;
 import org.bson.conversions.Bson;
 import org.junit.Test;
@@ -37,6 +38,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -85,7 +87,7 @@ public class PipelinedMongoDownloadTaskTest {
         int batchSize = 100;
         BlockingQueue<BasicDBObject[]> queue = new ArrayBlockingQueue<>(100);
 
-        PipelinedMongoDownloadTask task = new PipelinedMongoDownloadTask(dbCollection, batchSize, queue);
+        PipelinedMongoDownloadTask task = new PipelinedMongoDownloadTask(dbCollection, batchSize, queue, null);
 
         // Execute
         PipelinedMongoDownloadTask.Result result = task.call();
@@ -101,4 +103,33 @@ public class PipelinedMongoDownloadTaskTest {
         verify(dbCollection).find(BsonDocument.parse("{\"_modified\": {\"$gte\": 123000, \"$lt\": 123001}, \"_id\": {\"$gt\": \"3:/content/dam/asset1\"}}"));
         verify(dbCollection).find(BsonDocument.parse("{\"_modified\": {\"$gte\": 123001}}"));
     }
+
+    @Test
+    public void regexFilters() {
+        assertNull(PipelinedMongoDownloadTask.getSingleIncludedPath(null));
+
+        assertNull(PipelinedMongoDownloadTask.getSingleIncludedPath(List.of()));
+
+        List<PathFilter> singlePathFilter = List.of(
+                new PathFilter(List.of("/content/dam"), List.of())
+        );
+        assertEquals("/content/dam", PipelinedMongoDownloadTask.getSingleIncludedPath(singlePathFilter));
+
+        List<PathFilter> multipleIncludeFilters = List.of(
+                new PathFilter(List.of("/content/dam"), List.of()),
+                new PathFilter(List.of("/content/dam"), List.of())
+        );
+        assertEquals("/content/dam", PipelinedMongoDownloadTask.getSingleIncludedPath(multipleIncludeFilters));
+
+        List<PathFilter> multipleIncludeFiltersDifferent = List.of(
+                new PathFilter(List.of("/content/dam"), List.of()),
+                new PathFilter(List.of("/content/dam/collections"), List.of())
+        );
+        assertNull(PipelinedMongoDownloadTask.getSingleIncludedPath(multipleIncludeFiltersDifferent));
+
+        List<PathFilter> withExcludeFilter = List.of(
+                new PathFilter(List.of("/"), List.of("/var"))
+        );
+        assertNull(PipelinedMongoDownloadTask.getSingleIncludedPath(withExcludeFilter));
+    }
 }
\ No newline at end of file
diff --git a/oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/filter/PathFilter.java b/oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/filter/PathFilter.java
index 439b71382a..71be083b24 100644
--- a/oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/filter/PathFilter.java
+++ b/oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/filter/PathFilter.java
@@ -23,6 +23,7 @@ package org.apache.jackrabbit.oak.spi.filter;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.jackrabbit.oak.api.PropertyState;
@@ -72,13 +73,12 @@ public class PathFilter {
         TRAVERSE
     }
 
-    private static final PathFilter ALL = new PathFilter(INCLUDE_ROOT, Collections.<String>emptyList()) {
+    private static final PathFilter ALL = new PathFilter(INCLUDE_ROOT, Collections.emptyList()) {
         @Override
         public Result filter(@NotNull String path) {
             return Result.INCLUDE;
         }
     };
-
     private final String[] includedPaths;
     private final String[] excludedPaths;
 
@@ -99,7 +99,7 @@ public class PathFilter {
         }
 
         return new PathFilter(getStrings(defn, PROP_INCLUDED_PATHS, INCLUDE_ROOT), getStrings(defn, PROP_EXCLUDED_PATHS,
-                Collections.<String> emptyList()));
+                Collections.emptyList()));
     }
 
     /**
/**
@@ -107,9 +107,8 @@ public class PathFilter {
*
* If both are empty then all paths would be considered to be included
*
- * @param includes list of paths which should not be included
- * @param excludes list of p
- * aths which should be included
+ * @param includes list of paths which should be included
+ * @param excludes list of paths which should not be included
*/
public PathFilter(Iterable<String> includes, Iterable<String> excludes) {
Set<String> includeCopy = newHashSet(includes);
@@ -149,6 +148,14 @@ public class PathFilter {
return Result.EXCLUDE;
}
+ public List<String> getIncludedPaths() {
+ return List.of(includedPaths);
+ }
+
+ public List<String> getExcludedPaths() {
+ return List.of(excludedPaths);
+ }
+
@Override
public String toString() {
return "PathFilter{" +