This is an automated email from the ASF dual-hosted git repository.

daim pushed a commit to branch DetailedGC/OAK-10199
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git

commit 297f10704e1de3a30ad540ca2e91f69401ef93b8
Author: Nuno Santos <[email protected]>
AuthorDate: Fri Dec 15 09:58:29 2023 +0100

    OAK-10580 - Support multiple include paths in regex path filtering (#1249)
---
 .../pipelined/PipelinedMongoDownloadTask.java      | 126 ++++++++++++++-------
 .../document/flatfile/pipelined/PipelinedIT.java   |  21 ++++
 .../pipelined/PipelinedMongoDownloadTaskTest.java  |  43 ++++---
 3 files changed, 133 insertions(+), 57 deletions(-)

diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
index 6c16faf703..caa8a1ee75 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
@@ -51,9 +51,9 @@ import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -64,6 +64,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static com.mongodb.client.model.Filters.regex;
 import static com.mongodb.client.model.Sorts.ascending;
@@ -200,7 +201,7 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
             String enqueueingDelayPercentage = 
PipelinedUtils.formatAsPercentage(totalEnqueueWaitTimeMillis, durationMillis);
             String metrics = MetricsFormatter.newBuilder()
                     .add("duration", 
FormattingUtils.formatToSeconds(downloadStartWatch))
-                    .add("durationSeconds", durationMillis/1000)
+                    .add("durationSeconds", durationMillis / 1000)
                     .add("documentsDownloaded", documentsRead)
                     .add("enqueueingDelayMillis", totalEnqueueWaitTimeMillis)
                     .add("enqueueingDelayPercentage", 
enqueueingDelayPercentage)
@@ -237,18 +238,18 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
         // If regex filtering is enabled, start by downloading the ancestors 
of the path used for filtering.
         // That is, download "/", "/content", "/content/dam" for a base path 
of "/content/dam". These nodes will not be
         // matched by the regex used in the Mongo query, which assumes a 
prefix of "???:/content/dam"
-        String regexBasePath = getPathForRegexFiltering();
+        Set<String> regexBasePaths = getPathsForRegexFiltering();
         Bson childrenFilter;
-        if (regexBasePath == null) {
+        if (regexBasePaths.isEmpty()) {
             childrenFilter = null;
         } else {
             // Regex path filtering is enabled
             // Download the ancestors in a separate query. No retrials done on 
this query, as it will take only a few
             // seconds and is done at the start of the job, so if it fails, 
the job can be retried without losing much work
-            downloadAncestors(regexBasePath);
+            downloadAncestors(regexBasePaths);
 
             // Filter to apply to the main query
-            childrenFilter = descendantsFilter(regexBasePath);
+            childrenFilter = descendantsFilter(regexBasePaths);
         }
 
         Instant failuresStartTimestamp = null; // When the last series of 
failures started
@@ -317,9 +318,9 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
         download(mongoIterable);
     }
 
-    private void downloadAncestors(String basePath) throws 
InterruptedException, TimeoutException {
+    private void downloadAncestors(Set<String> basePath) throws 
InterruptedException, TimeoutException {
         Bson ancestorQuery = ancestorsFilter(basePath);
-        LOG.info("Downloading using regex path filtering. Base path: {}, 
Ancestors query: {}.", basePath, ancestorQuery);
+        LOG.info("Downloading ancestors of: {}, Query: {}.", basePath, 
ancestorQuery);
         FindIterable<NodeDocument> ancestorsIterable = dbCollection
                 .withReadPreference(readPreference)
                 .find(ancestorQuery)
@@ -331,8 +332,8 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
     private void downloadWithNaturalOrdering() throws InterruptedException, 
TimeoutException {
         // We are downloading potentially a large fraction of the repository, 
so using an index scan will be
         // inefficient. So we pass the natural hint to force MongoDB to use 
natural ordering, that is, column scan
-        String regexBasePath = getPathForRegexFiltering();
-        if (regexBasePath == null) {
+        Set<String> regexBasePath = getPathsForRegexFiltering();
+        if (regexBasePath.isEmpty()) {
             LOG.info("Downloading full repository using natural order");
             FindIterable<NodeDocument> mongoIterable = dbCollection
                     .withReadPreference(readPreference)
@@ -353,38 +354,67 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
         }
     }
 
-    private String getPathForRegexFiltering() {
+    private Set<String> getPathsForRegexFiltering() {
         if (!regexPathFiltering) {
             LOG.info("Regex path filtering disabled.");
-            return null;
+            return Set.of();
         }
-        return getSingleIncludedPath(pathFilters);
+        return extractIncludedPaths(pathFilters);
     }
 
+    /**
+     * Aggregates the included paths from the path filters. The final list 
will not contain duplicates or overlapping
+     * paths (i.e., /a and /a/b).
+     *
+     * @param pathFilters Empty set if path filtering should be disabled, 
otherwise the paths that should be included
+     *                    in the Mongo query filters
+     */
     // Package private for testing
-    static String getSingleIncludedPath(List<PathFilter> pathFilters) {
-        // For the time being, we only enable path filtering if there is a 
single include path across all indexes and no
-        // exclude paths. This is the case for most of the larger indexes. We 
can consider generalizing this in the future.
-        LOG.info("Creating regex filter from pathFilters: " + pathFilters);
+    static Set<String> extractIncludedPaths(List<PathFilter> pathFilters) {
+        // Path filtering is enabled only if there are no excludedPaths.
         if (pathFilters == null) {
-            return null;
+            return Set.of();
+        }
+        Set<String> includedPaths = new HashSet<>();
+        Set<String> excludedPaths = new HashSet<>();
+        for (PathFilter pathFilter : pathFilters) {
+            includedPaths.addAll(pathFilter.getIncludedPaths());
+            excludedPaths.addAll(pathFilter.getExcludedPaths());
+        }
+        // Sort by natural order, so that parent paths appear before any 
children. This makes it easier to compute the
+        // common ancestors of included paths
+        List<String> sortedIncludedPaths = includedPaths.stream()
+                .sorted()
+                .collect(Collectors.toList());
+
+        LOG.info("Paths considered for regex filtering. IncludedPaths: {}, 
excludedPaths: {}", sortedIncludedPaths, excludedPaths);
+        if (sortedIncludedPaths.contains("/")) {
+            LOG.info("Disabling regex path filtering because root path is in 
the includedPaths: {}", sortedIncludedPaths);
+            return Set.of();
+        }
+        if (!excludedPaths.isEmpty()) {
+            LOG.info("Disabling regex path filtering because there are 
excluded paths: {}", excludedPaths);
+            return Set.of();
         }
-        Set<String> includedPaths = pathFilters.stream()
-                .flatMap(pathFilter -> pathFilter.getIncludedPaths().stream())
-                .collect(Collectors.toSet());
-
-        Set<String> excludedPaths = pathFilters.stream()
-                .flatMap(pathFilter -> pathFilter.getExcludedPaths().stream())
-                .collect(Collectors.toSet());
 
-        if (excludedPaths.isEmpty() && includedPaths.size() == 1) {
-            return includedPaths.stream().iterator().next();
-        } else {
-            return null;
+        // Keep only unique include paths. That is, if paths "/a/b" and 
"/a/b/c" are both in the list, keep only "/a/b"
+        HashSet<String> includedPathsRoots = new HashSet<>();
+        for (String path : sortedIncludedPaths) {
+            if (includedPathsRoots.stream().noneMatch(ancestor -> 
PathUtils.isAncestor(ancestor, path))) {
+                includedPathsRoots.add(path);
+            }
         }
+        return includedPathsRoots;
     }
 
-    private static Bson descendantsFilter(String path) {
+    private static Bson descendantsFilter(Set<String> path) {
+        List<Bson> filters = path.stream()
+                .flatMap(PipelinedMongoDownloadTask::descendantsFilter)
+                .collect(Collectors.toList());
+        return Filters.or(filters);
+    }
+
+    private static Stream<Bson> descendantsFilter(String path) {
         if (!path.endsWith("/")) {
             path = path + "/";
         }
@@ -392,24 +422,34 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
         // For entries with path sizes above a certain threshold, the _id 
field contains a hash instead of the path of
         // the entry. The path is stored instead in the _path field. 
Therefore, we have to include in the filter also
         // the documents with matching _path.
-        return Filters.or(
+        return Stream.of(
                 regex(NodeDocument.ID, Pattern.compile("^[0-9]{1,3}:" + 
quotedPath + ".*$")),
-                regex(NodeDocument.PATH, Pattern.compile(quotedPath + ".*$"))
-        );
+                regex(NodeDocument.PATH, Pattern.compile(quotedPath + ".*$")
+                ));
     }
 
-    private static Bson ancestorsFilter(String path) {
-        ArrayList<Bson> parentFilters = new ArrayList<>();
-        String currentPath = path;
-        while (true) {
-            String currentId = Utils.getIdFromPath(currentPath);
-            parentFilters.add(Filters.eq(NodeDocument.ID, currentId));
-            if (PathUtils.denotesRoot(currentPath)) {
-                break;
+
+    static Set<String> getAncestors(Set<String> paths) {
+        Set<String> ancestors = new HashSet<>();
+        for (String child : paths) {
+            String parent = child;
+            while (true) {
+                ancestors.add(parent);
+                if (PathUtils.denotesRoot(parent)) {
+                    break;
+                }
+                parent = PathUtils.getParentPath(parent);
             }
-            currentPath = PathUtils.getParentPath(currentPath);
         }
-        return Filters.or(parentFilters);
+        return ancestors;
+    }
+
+
+    static Bson ancestorsFilter(Set<String> paths) {
+        List<String> parentFilters = getAncestors(paths).stream()
+                .map(Utils::getIdFromPath)
+                .collect(Collectors.toList());
+        return Filters.in(NodeDocument.ID, parentFilters);
     }
 
     private void download(FindIterable<NodeDocument> mongoIterable) throws 
InterruptedException, TimeoutException {
diff --git 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedIT.java
 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedIT.java
index c771dc4b2d..1da9b7f86e 100644
--- 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedIT.java
+++ 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedIT.java
@@ -164,6 +164,27 @@ public class PipelinedIT {
         testSuccessfulDownload(pathPredicate, pathFilters);
     }
 
+    @Test
+    public void createFFS_mongoFiltering_multipleIndexes() throws Exception {
+        System.setProperty(OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING, 
"true");
+
+        Predicate<String> pathPredicate = s -> true;
+        PathFilter pathFilter = new PathFilter(List.of("/content/dam/1000", 
"/content/dam/2023", "/content/dam/2023/01"), List.of());
+        List<PathFilter> pathFilters = List.of(pathFilter);
+
+        testSuccessfulDownload(pathPredicate, pathFilters, List.of(
+                "/|{}",
+                "/content|{}",
+                "/content/dam|{}",
+                "/content/dam/1000|{}",
+                "/content/dam/1000/12|{\"p1\":\"v100012\"}",
+                "/content/dam/2023|{\"p2\":\"v2023\"}",
+                "/content/dam/2023/01|{\"p1\":\"v202301\"}",
+                "/content/dam/2023/02|{}",
+                "/content/dam/2023/02/28|{\"p1\":\"v20230228\"}"
+        ));
+    }
+
     @Test
     public void createFFS_noRetryOnMongoFailures_noMongoFiltering() throws 
Exception {
         System.setProperty(OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS, 
"false");
diff --git 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTaskTest.java
 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTaskTest.java
index 82e9f6d98d..fe1ca75945 100644
--- 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTaskTest.java
+++ 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTaskTest.java
@@ -45,7 +45,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
@@ -133,32 +132,48 @@ public class PipelinedMongoDownloadTaskTest {
         verify(dbCollection).find(BsonDocument.parse("{\"_modified\": 
{\"$gte\": 123001}}"));
     }
 
+    private List<PathFilter> createIncludedPathFilters(String... paths) {
+        return Arrays.stream(paths).map(path -> new PathFilter(List.of(path), 
List.of())).collect(Collectors.toList());
+    }
+
+    @Test
+    public void ancestorsFilters() {
+        assertEquals(Set.of(), 
PipelinedMongoDownloadTask.getAncestors(Set.of()));
+        assertEquals(Set.of("/"), 
PipelinedMongoDownloadTask.getAncestors(Set.of("/")));
+        assertEquals(Set.of("/", "/a"), 
PipelinedMongoDownloadTask.getAncestors(Set.of("/a")));
+        assertEquals(Set.of("/", "/a", "/b", "/c", "/c/c1", "/c/c1/c2", 
"/c/c1/c2/c3", "/c/c1/c2/c4"),
+                PipelinedMongoDownloadTask.getAncestors(Set.of("/a", "/b", 
"/c/c1/c2/c3", "/c/c1/c2/c4"))
+        );
+    }
+
     @Test
     public void regexFilters() {
-        assertNull(PipelinedMongoDownloadTask.getSingleIncludedPath(null));
+        assertEquals(Set.of(), 
PipelinedMongoDownloadTask.extractIncludedPaths(null));
 
-        
assertNull(PipelinedMongoDownloadTask.getSingleIncludedPath(List.of()));
+        assertEquals(Set.of(), 
PipelinedMongoDownloadTask.extractIncludedPaths(List.of()));
 
         List<PathFilter> singlePathFilter = List.of(
                 new PathFilter(List.of("/content/dam"), List.of())
         );
-        assertEquals("/content/dam", 
PipelinedMongoDownloadTask.getSingleIncludedPath(singlePathFilter));
+        assertEquals(Set.of("/content/dam"), 
PipelinedMongoDownloadTask.extractIncludedPaths(singlePathFilter));
 
-        List<PathFilter> multipleIncludeFilters = List.of(
-                new PathFilter(List.of("/content/dam"), List.of()),
-                new PathFilter(List.of("/content/dam"), List.of())
-        );
-        assertEquals("/content/dam", 
PipelinedMongoDownloadTask.getSingleIncludedPath(multipleIncludeFilters));
+        List<PathFilter> multipleIncludeFilters = 
createIncludedPathFilters("/content/dam", "/content/dam");
+        assertEquals(Set.of("/content/dam"), 
PipelinedMongoDownloadTask.extractIncludedPaths(multipleIncludeFilters));
 
-        List<PathFilter> multipleIncludeFiltersDifferent = List.of(
-                new PathFilter(List.of("/content/dam"), List.of()),
-                new PathFilter(List.of("/content/dam/collections"), List.of())
+        List<PathFilter> includesRoot = List.of(
+                new PathFilter(List.of("/"), List.of())
         );
-        
assertNull(PipelinedMongoDownloadTask.getSingleIncludedPath(multipleIncludeFiltersDifferent));
+        assertEquals(Set.of(), 
PipelinedMongoDownloadTask.extractIncludedPaths(includesRoot));
+
+        List<PathFilter> multipleIncludeFiltersDifferent = 
createIncludedPathFilters("/a/a1", "/a/a1/a2");
+        assertEquals(Set.of("/a/a1"), 
PipelinedMongoDownloadTask.extractIncludedPaths(multipleIncludeFiltersDifferent));
+
+        List<PathFilter> multipleIncludeFiltersDifferent2 = 
createIncludedPathFilters("/a/a1/a2", "/a/a1", "/b", "/c", "/cc");
+        assertEquals(Set.of("/a/a1", "/b", "/c", "/cc"), 
PipelinedMongoDownloadTask.extractIncludedPaths(multipleIncludeFiltersDifferent2));
 
         List<PathFilter> withExcludeFilter = List.of(
                 new PathFilter(List.of("/"), List.of("/var"))
         );
-        
assertNull(PipelinedMongoDownloadTask.getSingleIncludedPath(withExcludeFilter));
+        assertEquals(Set.of(), 
PipelinedMongoDownloadTask.extractIncludedPaths(withExcludeFilter));
     }
 }
\ No newline at end of file

Reply via email to