thomasmueller commented on code in PR #1716:
URL: https://github.com/apache/jackrabbit-oak/pull/1716#discussion_r1760717094


##########
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/ConfigHelper.java:
##########
@@ -48,4 +52,11 @@ public static boolean getSystemPropertyAsBoolean(String name, boolean defaultVal
         LOG.info("Config {}={}", name, value);
         return value;
     }
+
+    public static List<String> getSystemPropertyAsStringList(String name, String defaultValue, String separator) {
+        String result = System.getProperty(name, defaultValue);
+        List<String> parts = result.isBlank() ? List.of() : Arrays.stream(result.split(separator)).map(String::trim).collect(Collectors.toList());

Review Comment:
   What about two methods: `getSystemProperty` and `splitIgnoreSpace`? I see the code is quite lenient in what it accepts (isBlank vs isEmpty, trim). For user-facing configuration, I think it's the right thing to do; we ran into problems because people added spaces, e.g. "type" = "lucene " (trailing space).
   
   (I think it is not necessary in this case because this is not end-user facing but only for internal use. For internal use, I think non-lenient behavior is better because it results in shorter, easier-to-read code. But it's not that bad here, so I think it's fine.)
   
   But more importantly, what if spaces have meaning? The method name `getSystemPropertyAsStringList` doesn't convey what it does, and there is no Javadoc.
   
   What about renaming "separator" -> "separatorRegex", or else quoting the separator (using `Pattern.quote`, I think)?
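
   A minimal sketch of the proposed split (class and method names are placeholders for illustration; `Pattern.quote` makes the separator a literal string rather than a regex):

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.regex.Pattern;
   import java.util.stream.Collectors;

   public class ConfigHelperSketch {

       public static String getSystemProperty(String name, String defaultValue) {
           return System.getProperty(name, defaultValue);
       }

       // Pattern.quote disables regex semantics, so separators such as "." or "|"
       // are matched literally; each part is trimmed to tolerate stray spaces.
       public static List<String> splitIgnoreSpace(String value, String separator) {
           return value.isBlank()
                   ? List.of()
                   : Arrays.stream(value.split(Pattern.quote(separator)))
                           .map(String::trim)
                           .collect(Collectors.toList());
       }

       public static void main(String[] args) {
           System.out.println(splitIgnoreSpace("lucene ; elastic", ";")); // prints [lucene, elastic]
       }
   }
   ```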



##########
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeDocumentFilter.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
+
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Implements a filter to decide if a given Mongo document should be processed or ignored based on its path. The filter has
+ * two configuration parameters:
+ *
+ * <ul>
+ * <li> filteredPath - The path where the filter is applied. Only the documents inside this path will be considered for filtering.
+ *   Documents in other paths will all be accepted.
+ * <li> suffixesToSkip - A list of suffixes to filter. That is, any document whose path ends in one of these suffixes will
+ *   be filtered.
+ * </ul>
+ * <p>
+ * The intent of this filter is to be applied as close as possible to the download/decoding of the documents from Mongo,
+ * in order to filter unnecessary documents early and avoid spending resources processing them.
+ */
+public class NodeDocumentFilter {
+    private static final Logger LOG = LoggerFactory.getLogger(NodeDocumentFilter.class);
+
+    private final String filteredPath;
+    private final List<String> suffixesToSkip;
+
+    private final boolean filteringDisabled;
+
+    // Statistics
+    private final AtomicLong skippedFields = new AtomicLong(0);
+    private final AtomicLong longPathSkipped = new AtomicLong(0);
+    private final ConcurrentHashMap<String, MutableLong> filteredSuffixesCounts = new ConcurrentHashMap<>();
+
+    public NodeDocumentFilter(String filteredPath, List<String> suffixesToSkip) {
+        this.filteredPath = filteredPath;
+        this.suffixesToSkip = suffixesToSkip;
+        this.filteringDisabled = filteredPath.isBlank() || suffixesToSkip.isEmpty();
+        if (filteringDisabled) {
+            LOG.info("Node document filtering disabled.");
+        }
+    }
+
+    /**
+     * @param fieldName     Name of the Mongo document field. Expected to be either _id or _path
+     * @param idOrPathValue The value of the field
+     * @return true if the document should be skipped, false otherwise
+     */
+    public boolean shouldSkip(String fieldName, String idOrPathValue) {
+        if (filteringDisabled) {
+            return false;
+        }
+        // Check if the NodeDocument should be considered for filtering, that is, if it starts with filteredPath.
+        // If the value is for an _id, then we must find the start of the path section, that is, the position of the first
+        // slash (3:/foo/bar/baz). If the value given is for a path, then it already contains only the path. In any case,
+        // we look for the first occurrence of /
+        int idxOfFirstForwardSlash = idOrPathValue.indexOf('/');
+        if (idxOfFirstForwardSlash < 0) {
+            LOG.info("Invalid field. {} = {}", fieldName, idOrPathValue);

Review Comment:
   Using info level and the message "invalid" are somewhat conflicting... Did you see such a case already? If not, I would suggest using "warn". If you did, then what field / what was the reason? If it can happen and we know in which (rare) cases, then I wonder why log a message at all...
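
   For illustration, the decision described in the Javadoc above can be sketched as a standalone method (hypothetical names; the real class also updates statistics counters and logs). An _id such as "3:/content/dam/x" carries the path after the first '/', while a _path value already starts with '/':

   ```java
   import java.util.List;

   public class NodeDocumentFilterSketch {

       // Simplified, hypothetical version of the shouldSkip decision:
       // extract the path section, check the filteredPath prefix, then
       // skip if the path ends with any of the configured suffixes.
       public static boolean shouldSkip(String idOrPathValue, String filteredPath, List<String> suffixesToSkip) {
           int idx = idOrPathValue.indexOf('/');
           if (idx < 0) {
               return false; // no path section: cannot decide, accept the document
           }
           String path = idOrPathValue.substring(idx);
           if (!path.startsWith(filteredPath)) {
               return false; // outside the filtered subtree: always accepted
           }
           return suffixesToSkip.stream().anyMatch(path::endsWith);
       }

       public static void main(String[] args) {
           System.out.println(shouldSkip("3:/content/dam/a/jcr:content/metadata",
                   "/content/dam", List.of("/metadata"))); // prints true
       }
   }
   ```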



##########
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeDocumentCodec.java:
##########
@@ -49,44 +56,107 @@
  *   <li>Allows estimating the size of the document while reading it, which will have a negligible overhead (as compared
  *   with doing an additional traversal of the object structure to compute the size).</li>
  * </ul>
- *
+ * <p>
  * This class must be thread-safe; Mongo uses a single codec implementation across multiple threads.
- *
  */
 public class NodeDocumentCodec implements Codec<NodeDocument> {
+    private final static Logger LOG = LoggerFactory.getLogger(NodeDocumentCodec.class);
+
+    public static final String OAK_INDEXER_PIPELINED_NODE_DOCUMENT_FILTER_FILTERED_PATH = "oak.indexer.pipelined.nodeDocument.filter.filteredPath";
+    public static final String OAK_INDEXER_PIPELINED_NODE_DOCUMENT_FILTER_SUFFIXES_TO_SKIP = "oak.indexer.pipelined.nodeDocument.filter.suffixesToSkip";
+    private final String filteredPath = ConfigHelper.getSystemPropertyAsString(OAK_INDEXER_PIPELINED_NODE_DOCUMENT_FILTER_FILTERED_PATH, "");
+    private final List<String> suffixesToSkip = ConfigHelper.getSystemPropertyAsStringList(OAK_INDEXER_PIPELINED_NODE_DOCUMENT_FILTER_SUFFIXES_TO_SKIP, "", ";");
+
     // The estimated size is stored in the NodeDocument itself
     public final static String SIZE_FIELD = "_ESTIMATED_SIZE_";
+
+    private static class NodeDocumentDecoderContext {
+        long docsDecoded = 0;
+        long dataDownloaded = 0;
+        int estimatedSizeOfCurrentObject = 0;
+    }
+
+    private final NodeDocument emptyNodeDocument;
+
     private final MongoDocumentStore store;
     private final Collection<NodeDocument> collection;
     private final BsonTypeCodecMap bsonTypeCodecMap;
     private final DecoderContext decoderContext = DecoderContext.builder().build();
-
     private final Codec<String> stringCoded;
     private final Codec<Long> longCoded;
     private final Codec<Boolean> booleanCoded;
 
+    private final NodeDocumentFilter fieldFilter = new NodeDocumentFilter(filteredPath, suffixesToSkip);
+
+    // Statistics
+    private final AtomicLong totalDocsDecoded = new AtomicLong(0);
+    private final AtomicLong totalDataDownloaded = new AtomicLong(0);
+    private final ThreadLocal<NodeDocumentDecoderContext> perThreadContext = ThreadLocal.withInitial(NodeDocumentDecoderContext::new);
+
     public NodeDocumentCodec(MongoDocumentStore store, Collection<NodeDocument> collection, CodecRegistry defaultRegistry) {
         this.store = store;
         this.collection = collection;
         this.bsonTypeCodecMap = new BsonTypeCodecMap(new BsonTypeClassMap(), defaultRegistry);
+        this.emptyNodeDocument = collection.newDocument(store);
         // Retrieve references to the most commonly used codecs, to avoid the map lookup in the common case
         this.stringCoded = (Codec<String>) bsonTypeCodecMap.get(BsonType.STRING);
         this.longCoded = (Codec<Long>) bsonTypeCodecMap.get(BsonType.INT64);
         this.booleanCoded = (Codec<Boolean>) bsonTypeCodecMap.get(BsonType.BOOLEAN);
     }
 
+    /**
+     * Skipping over values in the BSON file is faster than reading them. Skipping is done by advancing a pointer in
+     * an internal buffer, while reading requires converting them to a Java data type (typically String).
+     */
+    private void skipUntilEndOfDocument(BsonReader reader) {

Review Comment:
   To avoid the duplicate line:
   ```suggestion
       private void skipUntilEndOfDocument(BsonReader reader) {
           while (true) {
               BsonType bsonType = reader.readBsonType();
               if (bsonType == BsonType.END_OF_DOCUMENT) {
                   break;
               }
               reader.skipName();
               reader.skipValue();
           }
           reader.readEndDocument();
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
