This is an automated email from the ASF dual-hosted git repository. daim pushed a commit to branch DetailedGC/OAK-10199 in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
commit 297f10704e1de3a30ad540ca2e91f69401ef93b8 Author: Nuno Santos <[email protected]> AuthorDate: Fri Dec 15 09:58:29 2023 +0100 OAK-10580 - Support multiple include paths in regex path filtering (#1249) --- .../pipelined/PipelinedMongoDownloadTask.java | 126 ++++++++++++++------- .../document/flatfile/pipelined/PipelinedIT.java | 21 ++++ .../pipelined/PipelinedMongoDownloadTaskTest.java | 43 ++++--- 3 files changed, 133 insertions(+), 57 deletions(-) diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java index 6c16faf703..caa8a1ee75 100644 --- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java +++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java @@ -51,9 +51,9 @@ import org.slf4j.LoggerFactory; import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -64,6 +64,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.Stream; import static com.mongodb.client.model.Filters.regex; import static com.mongodb.client.model.Sorts.ascending; @@ -200,7 +201,7 @@ public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownlo String enqueueingDelayPercentage = PipelinedUtils.formatAsPercentage(totalEnqueueWaitTimeMillis, durationMillis); String metrics = MetricsFormatter.newBuilder() .add("duration", FormattingUtils.formatToSeconds(downloadStartWatch)) - .add("durationSeconds", durationMillis/1000) + .add("durationSeconds", durationMillis / 1000) .add("documentsDownloaded", documentsRead) .add("enqueueingDelayMillis", totalEnqueueWaitTimeMillis) .add("enqueueingDelayPercentage", enqueueingDelayPercentage) @@ -237,18 +238,18 @@ public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownlo // If regex filtering is enabled, start by downloading the ancestors of the path used for filtering. // That is, download "/", "/content", "/content/dam" for a base path of "/content/dam". These nodes will not be // matched by the regex used in the Mongo query, which assumes a prefix of "???:/content/dam" - String regexBasePath = getPathForRegexFiltering(); + Set<String> regexBasePaths = getPathsForRegexFiltering(); Bson childrenFilter; - if (regexBasePath == null) { + if (regexBasePaths.isEmpty()) { childrenFilter = null; } else { // Regex path filtering is enabled // Download the ancestors in a separate query. No retrials done on this query, as it will take only a few // seconds and is done at the start of the job, so if it fails, the job can be retried without losing much work - downloadAncestors(regexBasePath); + downloadAncestors(regexBasePaths); // Filter to apply to the main query - childrenFilter = descendantsFilter(regexBasePath); + childrenFilter = descendantsFilter(regexBasePaths); } Instant failuresStartTimestamp = null; // When the last series of failures started @@ -317,9 +318,9 @@ public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownlo download(mongoIterable); } - private void downloadAncestors(String basePath) throws InterruptedException, TimeoutException { + private void downloadAncestors(Set<String> basePath) throws InterruptedException, TimeoutException { Bson ancestorQuery = ancestorsFilter(basePath); - LOG.info("Downloading using regex path filtering. Base path: {}, Ancestors query: {}.", basePath, ancestorQuery); + LOG.info("Downloading ancestors of: {}, Query: {}.", basePath, ancestorQuery); FindIterable<NodeDocument> ancestorsIterable = dbCollection .withReadPreference(readPreference) .find(ancestorQuery) @@ -331,8 +332,8 @@ public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownlo private void downloadWithNaturalOrdering() throws InterruptedException, TimeoutException { // We are downloading potentially a large fraction of the repository, so using an index scan will be // inefficient. So we pass the natural hint to force MongoDB to use natural ordering, that is, column scan - String regexBasePath = getPathForRegexFiltering(); - if (regexBasePath == null) { + Set<String> regexBasePath = getPathsForRegexFiltering(); + if (regexBasePath.isEmpty()) { LOG.info("Downloading full repository using natural order"); FindIterable<NodeDocument> mongoIterable = dbCollection .withReadPreference(readPreference) @@ -353,38 +354,67 @@ public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownlo } } - private String getPathForRegexFiltering() { + private Set<String> getPathsForRegexFiltering() { if (!regexPathFiltering) { LOG.info("Regex path filtering disabled."); - return null; + return Set.of(); } - return getSingleIncludedPath(pathFilters); + return extractIncludedPaths(pathFilters); } + /** + * Aggregates the included paths from the path filters. The final list will not contain duplicates or overlapping + * paths (i.e., /a and /a/b). + * + * @param pathFilters Empty set if path filtering should be disabled, otherwise the paths that should be included + * in the Mongo query filters + */ // Package private for testing - static String getSingleIncludedPath(List<PathFilter> pathFilters) { - // For the time being, we only enable path filtering if there is a single include path across all indexes and no - // exclude paths. This is the case for most of the larger indexes. We can consider generalizing this in the future. - LOG.info("Creating regex filter from pathFilters: " + pathFilters); + static Set<String> extractIncludedPaths(List<PathFilter> pathFilters) { + // Path filtering is enabled only if there are no excludedPaths. if (pathFilters == null) { - return null; + return Set.of(); + } + Set<String> includedPaths = new HashSet<>(); + Set<String> excludedPaths = new HashSet<>(); + for (PathFilter pathFilter : pathFilters) { + includedPaths.addAll(pathFilter.getIncludedPaths()); + excludedPaths.addAll(pathFilter.getExcludedPaths()); + } + // Sort by natural order, so that parent paths appear before any children. This makes it easier to compute the + // common ancestors of included paths + List<String> sortedIncludedPaths = includedPaths.stream() + .sorted() + .collect(Collectors.toList()); + + LOG.info("Paths considered for regex filtering. IncludedPaths: {}, excludedPaths: {}", sortedIncludedPaths, excludedPaths); + if (sortedIncludedPaths.contains("/")) { + LOG.info("Disabling regex path filtering because root path is in the includedPaths: {}", sortedIncludedPaths); + return Set.of(); + } + if (!excludedPaths.isEmpty()) { + LOG.info("Disabling regex path filtering because there are excluded paths: {}", excludedPaths); + return Set.of(); } - Set<String> includedPaths = pathFilters.stream() - .flatMap(pathFilter -> pathFilter.getIncludedPaths().stream()) - .collect(Collectors.toSet()); - - Set<String> excludedPaths = pathFilters.stream() - .flatMap(pathFilter -> pathFilter.getExcludedPaths().stream()) - .collect(Collectors.toSet()); - if (excludedPaths.isEmpty() && includedPaths.size() == 1) { - return includedPaths.stream().iterator().next(); - } else { - return null; + // Keep only unique include paths. That is, if paths "/a/b" and "/a/b/c" are both in the list, keep only "/a/b" + HashSet<String> includedPathsRoots = new HashSet<>(); + for (String path : sortedIncludedPaths) { + if (includedPathsRoots.stream().noneMatch(ancestor -> PathUtils.isAncestor(ancestor, path))) { + includedPathsRoots.add(path); + } } + return includedPathsRoots; } - private static Bson descendantsFilter(String path) { + private static Bson descendantsFilter(Set<String> path) { + List<Bson> filters = path.stream() + .flatMap(PipelinedMongoDownloadTask::descendantsFilter) + .collect(Collectors.toList()); + return Filters.or(filters); + } + + private static Stream<Bson> descendantsFilter(String path) { if (!path.endsWith("/")) { path = path + "/"; } @@ -392,24 +422,34 @@ public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownlo // For entries with path sizes above a certain threshold, the _id field contains a hash instead of the path of // the entry. The path is stored instead in the _path field. Therefore, we have to include in the filter also // the documents with matching _path. - return Filters.or( + return Stream.of( regex(NodeDocument.ID, Pattern.compile("^[0-9]{1,3}:" + quotedPath + ".*$")), - regex(NodeDocument.PATH, Pattern.compile(quotedPath + ".*$")) - ); + regex(NodeDocument.PATH, Pattern.compile(quotedPath + ".*$") + )); } - private static Bson ancestorsFilter(String path) { - ArrayList<Bson> parentFilters = new ArrayList<>(); - String currentPath = path; - while (true) { - String currentId = Utils.getIdFromPath(currentPath); - parentFilters.add(Filters.eq(NodeDocument.ID, currentId)); - if (PathUtils.denotesRoot(currentPath)) { - break; + + static Set<String> getAncestors(Set<String> paths) { + Set<String> ancestors = new HashSet<>(); + for (String child : paths) { + String parent = child; + while (true) { + ancestors.add(parent); + if (PathUtils.denotesRoot(parent)) { + break; + } + parent = PathUtils.getParentPath(parent); } - currentPath = PathUtils.getParentPath(currentPath); } - return Filters.or(parentFilters); + return ancestors; + } + + + static Bson ancestorsFilter(Set<String> paths) { + List<String> parentFilters = getAncestors(paths).stream() + .map(Utils::getIdFromPath) + .collect(Collectors.toList()); + return Filters.in(NodeDocument.ID, parentFilters); } private void download(FindIterable<NodeDocument> mongoIterable) throws InterruptedException, TimeoutException { diff --git a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedIT.java b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedIT.java index c771dc4b2d..1da9b7f86e 100644 --- a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedIT.java +++ b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedIT.java @@ -164,6 +164,27 @@ public class PipelinedIT { testSuccessfulDownload(pathPredicate, pathFilters); } + @Test + public void createFFS_mongoFiltering_multipleIndexes() throws Exception { + System.setProperty(OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING, "true"); + + Predicate<String> pathPredicate = s -> true; + PathFilter pathFilter = new PathFilter(List.of("/content/dam/1000", "/content/dam/2023", "/content/dam/2023/01"), List.of()); + List<PathFilter> pathFilters = List.of(pathFilter); + + testSuccessfulDownload(pathPredicate, pathFilters, List.of( + "/|{}", + "/content|{}", + "/content/dam|{}", + "/content/dam/1000|{}", + "/content/dam/1000/12|{\"p1\":\"v100012\"}", + "/content/dam/2023|{\"p2\":\"v2023\"}", + "/content/dam/2023/01|{\"p1\":\"v202301\"}", + "/content/dam/2023/02|{}", + "/content/dam/2023/02/28|{\"p1\":\"v20230228\"}" + )); + } + @Test public void createFFS_noRetryOnMongoFailures_noMongoFiltering() throws Exception { System.setProperty(OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS, "false"); diff --git a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTaskTest.java b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTaskTest.java index 82e9f6d98d..fe1ca75945 100644 --- a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTaskTest.java +++ b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTaskTest.java @@ -45,7 +45,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -133,32 +132,48 @@ public class PipelinedMongoDownloadTaskTest { verify(dbCollection).find(BsonDocument.parse("{\"_modified\": {\"$gte\": 123001}}")); } + private List<PathFilter> createIncludedPathFilters(String... paths) { + return Arrays.stream(paths).map(path -> new PathFilter(List.of(path), List.of())).collect(Collectors.toList()); + } + + @Test + public void ancestorsFilters() { + assertEquals(Set.of(), PipelinedMongoDownloadTask.getAncestors(Set.of())); + assertEquals(Set.of("/"), PipelinedMongoDownloadTask.getAncestors(Set.of("/"))); + assertEquals(Set.of("/", "/a"), PipelinedMongoDownloadTask.getAncestors(Set.of("/a"))); + assertEquals(Set.of("/", "/a", "/b", "/c", "/c/c1", "/c/c1/c2", "/c/c1/c2/c3", "/c/c1/c2/c4"), + PipelinedMongoDownloadTask.getAncestors(Set.of("/a", "/b", "/c/c1/c2/c3", "/c/c1/c2/c4")) + ); + } + @Test public void regexFilters() { - assertNull(PipelinedMongoDownloadTask.getSingleIncludedPath(null)); + assertEquals(Set.of(), PipelinedMongoDownloadTask.extractIncludedPaths(null)); - assertNull(PipelinedMongoDownloadTask.getSingleIncludedPath(List.of())); + assertEquals(Set.of(), PipelinedMongoDownloadTask.extractIncludedPaths(List.of())); List<PathFilter> singlePathFilter = List.of( new PathFilter(List.of("/content/dam"), List.of()) ); - assertEquals("/content/dam", PipelinedMongoDownloadTask.getSingleIncludedPath(singlePathFilter)); + assertEquals(Set.of("/content/dam"), PipelinedMongoDownloadTask.extractIncludedPaths(singlePathFilter)); - List<PathFilter> multipleIncludeFilters = List.of( - new PathFilter(List.of("/content/dam"), List.of()), - new PathFilter(List.of("/content/dam"), List.of()) - ); - assertEquals("/content/dam", PipelinedMongoDownloadTask.getSingleIncludedPath(multipleIncludeFilters)); + List<PathFilter> multipleIncludeFilters = createIncludedPathFilters("/content/dam", "/content/dam"); + assertEquals(Set.of("/content/dam"), PipelinedMongoDownloadTask.extractIncludedPaths(multipleIncludeFilters)); - List<PathFilter> multipleIncludeFiltersDifferent = List.of( - new PathFilter(List.of("/content/dam"), List.of()), - new PathFilter(List.of("/content/dam/collections"), List.of()) + List<PathFilter> includesRoot = List.of( + new PathFilter(List.of("/"), List.of()) ); - assertNull(PipelinedMongoDownloadTask.getSingleIncludedPath(multipleIncludeFiltersDifferent)); + assertEquals(Set.of(), PipelinedMongoDownloadTask.extractIncludedPaths(includesRoot)); + + List<PathFilter> multipleIncludeFiltersDifferent = createIncludedPathFilters("/a/a1", "/a/a1/a2"); + assertEquals(Set.of("/a/a1"), PipelinedMongoDownloadTask.extractIncludedPaths(multipleIncludeFiltersDifferent)); + + List<PathFilter> multipleIncludeFiltersDifferent2 = createIncludedPathFilters("/a/a1/a2", "/a/a1", "/b", "/c", "/cc"); + assertEquals(Set.of("/a/a1", "/b", "/c", "/cc"), PipelinedMongoDownloadTask.extractIncludedPaths(multipleIncludeFiltersDifferent2)); List<PathFilter> withExcludeFilter = List.of( new PathFilter(List.of("/"), List.of("/var")) ); - assertNull(PipelinedMongoDownloadTask.getSingleIncludedPath(withExcludeFilter)); + assertEquals(Set.of(), PipelinedMongoDownloadTask.extractIncludedPaths(withExcludeFilter)); } } \ No newline at end of file
