thomasmueller commented on code in PR #1716:
URL: https://github.com/apache/jackrabbit-oak/pull/1716#discussion_r1760717094
##########
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/ConfigHelper.java:
##########
@@ -48,4 +52,11 @@ public static boolean getSystemPropertyAsBoolean(String name, boolean defaultVal
LOG.info("Config {}={}", name, value);
return value;
}
+
+ public static List<String> getSystemPropertyAsStringList(String name, String defaultValue, String separator) {
+ String result = System.getProperty(name, defaultValue);
+ List<String> parts = result.isBlank() ? List.of() : Arrays.stream(result.split(separator)).map(String::trim).collect(Collectors.toList());
Review Comment:
What about two methods: `getSystemProperty` and `splitIgnoreSpace`? I see the
code is quite lenient in what it accepts (isBlank vs isEmpty, trim). For
user-facing configuration, I think it's the right thing to do; we ran into
problems because people added spaces, e.g. "type" = "lucene " (trailing space).
(I think it is not necessary in this case because this is not end-user
facing but only for internal use. For internal use, I think non-lenient
behavior is better because it results in shorter, easier-to-read code. But it's
not that bad here, so I think it's fine.)
But more importantly, what if spaces have meaning? The method name
`getSystemPropertyAsStringList` doesn't convey what it does, and there is no
Javadoc.
What about renaming "separator" -> "separatorRegex", or else quoting the
separator (using Pattern.quote, I think)?
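A minimal sketch of the split suggested above, for illustration only. The class name `ConfigSplitSketch` and the exact behavior (blank input yields an empty list, empty entries are kept) are assumptions, not code from the PR. The separator is quoted with `Pattern.quote`, so a value such as `|` is treated literally rather than as a regex:

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper class, not part of the PR.
class ConfigSplitSketch {

    /** Reads the raw system property, without any trimming or splitting. */
    public static String getSystemProperty(String name, String defaultValue) {
        return System.getProperty(name, defaultValue);
    }

    /**
     * Splits the value on a literal separator and trims each part.
     * A null or blank value yields an empty list.
     */
    public static List<String> splitIgnoreSpace(String value, String separator) {
        if (value == null || value.isBlank()) {
            return List.of();
        }
        // Pattern.quote makes the separator a literal, so "|" or "." are safe.
        return Arrays.stream(value.split(Pattern.quote(separator)))
                .map(String::trim)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(splitIgnoreSpace(" a ; b;c ", ";")); // prints [a, b, c]
        System.out.println(splitIgnoreSpace("a|b", "|"));       // prints [a, b]
    }
}
```

Without the quoting, `"a|b".split("|")` would treat `|` as regex alternation and split between every character, which is the kind of surprise the "separatorRegex" rename would at least make visible.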
##########
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeDocumentFilter.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
+
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Implements a filter to decide if a given Mongo document should be processed or ignored based on its path. The filter has
+ * two configuration parameters:
+ *
+ * <ul>
+ * <li> filteredPath - The path where the filter is applied. Only the documents inside this path will be considered for filtering.
+ * Documents in other paths will all be accepted.
+ * <li> suffixesToSkip - A list of suffixes to filter. That is, any document whose path ends in one of these suffixes will
+ * be filtered.
+ * </ul>
+ * <p>
+ * The intent of this filter is to be applied as close as possible to the download/decoding of the documents from Mongo,
+ * in order to filter unnecessary documents early and avoid spending resources processing them.
+ */
+public class NodeDocumentFilter {
+ private static final Logger LOG = LoggerFactory.getLogger(NodeDocumentFilter.class);
+
+ private final String filteredPath;
+ private final List<String> suffixesToSkip;
+
+ private final boolean filteringDisabled;
+
+ // Statistics
+ private final AtomicLong skippedFields = new AtomicLong(0);
+ private final AtomicLong longPathSkipped = new AtomicLong(0);
+ private final ConcurrentHashMap<String, MutableLong> filteredSuffixesCounts = new ConcurrentHashMap<>();
+
+ public NodeDocumentFilter(String filteredPath, List<String> suffixesToSkip) {
+ this.filteredPath = filteredPath;
+ this.suffixesToSkip = suffixesToSkip;
+ this.filteringDisabled = filteredPath.isBlank() || suffixesToSkip.isEmpty();
+ if (filteringDisabled) {
+ LOG.info("Node document filtering disabled.");
+ }
+ }
+
+ /**
+ * @param fieldName Name of the Mongo document field. Expected to be either _id or _path
+ * @param idOrPathValue The value of the field
+ * @return true if the document should be skipped, false otherwise
+ */
+ public boolean shouldSkip(String fieldName, String idOrPathValue) {
+ if (filteringDisabled) {
+ return false;
+ }
+ // Check if the NodeDocument should be considered for filtering, that is, if it starts with includePath.
+ // If the value is for an _id, then we must find the start of the path section, that is, the position of the first
+ // slash (3:/foo/bar/baz). If the value given is for a path, then it already contains only the path. In any case,
+ // we look up for the first occurrence of /
+ int idxOfFirstForwardSlash = idOrPathValue.indexOf('/');
+ if (idxOfFirstForwardSlash < 0) {
+ LOG.info("Invalid field. {} = {}", fieldName, idOrPathValue);
Review Comment:
Using info level together with the message "invalid" is somewhat contradictory... Did
you see such a case already? If not, I would suggest using "warn". If you did,
then which field was it, and what was the reason? If it can happen and we know in which
(rare) cases, then I wonder why we log a message at all...
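To make the case concrete, here is a minimal sketch of the branch in question with the level raised to a warning. It assumes the rest of `shouldSkip` is a prefix check followed by a suffix check, as the class Javadoc describes; the actual method body is not visible in this hunk. `PathFilterSketch` is a hypothetical stand-in, `java.util.logging` replaces SLF4J only to keep the example self-contained, and the plain `startsWith` prefix test is a simplification:

```java
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for NodeDocumentFilter, not the PR's implementation.
class PathFilterSketch {
    private static final Logger LOG = Logger.getLogger("PathFilterSketch");

    public static boolean shouldSkip(String filteredPath, List<String> suffixesToSkip,
                                     String fieldName, String idOrPathValue) {
        // An _id looks like "3:/foo/bar/baz"; a _path is already just the path.
        int idxOfFirstForwardSlash = idOrPathValue.indexOf('/');
        if (idxOfFirstForwardSlash < 0) {
            // Unexpected value: log at warning level and accept the document.
            LOG.log(Level.WARNING, "Invalid field. {0} = {1}",
                    new Object[]{fieldName, idOrPathValue});
            return false;
        }
        String path = idOrPathValue.substring(idxOfFirstForwardSlash);
        if (!path.startsWith(filteredPath)) {
            return false; // Outside the filtered path: always accepted.
        }
        for (String suffix : suffixesToSkip) {
            if (path.endsWith(suffix)) {
                return true;
            }
        }
        return false;
    }
}
```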
##########
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeDocumentCodec.java:
##########
@@ -49,44 +56,107 @@
* <li>Allows estimating the size of the document while reading it, which will have a negligible overhead (as compared
* with doing an additional traverse of the object structure to compute the size).</li>
* </ul>
- *
+ * <p>
* This class must be thread-safe, Mongo uses a single coded implementation across multiple threads.
- *
*/
public class NodeDocumentCodec implements Codec<NodeDocument> {
+ private final static Logger LOG = LoggerFactory.getLogger(NodeDocumentCodec.class);
+
+ public static final String OAK_INDEXER_PIPELINED_NODE_DOCUMENT_FILTER_FILTERED_PATH = "oak.indexer.pipelined.nodeDocument.filter.filteredPath";
+ public static final String OAK_INDEXER_PIPELINED_NODE_DOCUMENT_FILTER_SUFFIXES_TO_SKIP = "oak.indexer.pipelined.nodeDocument.filter.suffixesToSkip";
+ private final String filteredPath = ConfigHelper.getSystemPropertyAsString(OAK_INDEXER_PIPELINED_NODE_DOCUMENT_FILTER_FILTERED_PATH, "");
+ private final List<String> suffixesToSkip = ConfigHelper.getSystemPropertyAsStringList(OAK_INDEXER_PIPELINED_NODE_DOCUMENT_FILTER_SUFFIXES_TO_SKIP, "", ";");
+
// The estimated size is stored in the NodeDocument itself
public final static String SIZE_FIELD = "_ESTIMATED_SIZE_";
+
+ private static class NodeDocumentDecoderContext {
+ long docsDecoded = 0;
+ long dataDownloaded = 0;
+ int estimatedSizeOfCurrentObject = 0;
+ }
+
+ private final NodeDocument emptyNodeDocument;
+
private final MongoDocumentStore store;
private final Collection<NodeDocument> collection;
private final BsonTypeCodecMap bsonTypeCodecMap;
private final DecoderContext decoderContext = DecoderContext.builder().build();
-
private final Codec<String> stringCoded;
private final Codec<Long> longCoded;
private final Codec<Boolean> booleanCoded;
+ private final NodeDocumentFilter fieldFilter = new NodeDocumentFilter(filteredPath, suffixesToSkip);
+
+ // Statistics
+ private final AtomicLong totalDocsDecoded = new AtomicLong(0);
+ private final AtomicLong totalDataDownloaded = new AtomicLong(0);
+ private final ThreadLocal<NodeDocumentDecoderContext> perThreadContext = ThreadLocal.withInitial(NodeDocumentDecoderContext::new);
+
public NodeDocumentCodec(MongoDocumentStore store, Collection<NodeDocument> collection, CodecRegistry defaultRegistry) {
this.store = store;
this.collection = collection;
this.bsonTypeCodecMap = new BsonTypeCodecMap(new BsonTypeClassMap(), defaultRegistry);
+ this.emptyNodeDocument = collection.newDocument(store);
// Retrieve references to the most commonly used codecs, to avoid the map lookup in the common case
this.stringCoded = (Codec<String>) bsonTypeCodecMap.get(BsonType.STRING);
this.longCoded = (Codec<Long>) bsonTypeCodecMap.get(BsonType.INT64);
this.booleanCoded = (Codec<Boolean>) bsonTypeCodecMap.get(BsonType.BOOLEAN);
}
+ /**
+ * Skipping over values in the BSON file is faster than reading them. Skipping is done by advancing a pointer in
+ * an internal buffer, while reading requires converting them to a Java data type (typically String).
+ */
+ private void skipUntilEndOfDocument(BsonReader reader) {
Review Comment:
To avoid the duplicate line:
```suggestion
    private void skipUntilEndOfDocument(BsonReader reader) {
        while (true) {
            BsonType bsonType = reader.readBsonType();
            if (bsonType == BsonType.END_OF_DOCUMENT) {
                break;
            }
            reader.skipName();
            reader.skipValue();
        }
        reader.readEndDocument();
    }
```