This is an automated email from the ASF dual-hosted git repository.

stefanegli pushed a commit to branch OAK-10659
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git

commit 39c4fe1d7c41afe16a659b47b8569fc59fa9ddf2
Author: Stefan Egli <[email protected]>
AuthorDate: Thu Feb 22 15:12:47 2024 +0100

    OAK-10659 : remove orphaned nodes/documents
---
 .../plugins/document/VersionGarbageCollector.java  | 110 ++++--
 .../oak/plugins/document/FailingDocumentStore.java | 137 +++++++-
 .../plugins/document/PausableDocumentStore.java    | 160 +++++++++
 .../document/VersionGarbageCollectorIT.java        | 375 +++++++++++++++++++++
 4 files changed, 740 insertions(+), 42 deletions(-)

diff --git 
a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java
 
b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java
index 73565b29fc..9e00bb2d35 100644
--- 
a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java
+++ 
b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java
@@ -53,6 +53,7 @@ import 
org.apache.jackrabbit.oak.plugins.document.util.TimeInterval;
 import org.apache.jackrabbit.oak.plugins.document.util.Utils;
 import org.apache.jackrabbit.oak.spi.gc.DelegatingGCMonitor;
 import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.stats.Clock;
 import org.apache.jackrabbit.oak.commons.TimeDurationFormatter;
 import org.apache.jackrabbit.oak.stats.StatisticsProvider;
@@ -898,6 +899,9 @@ public class VersionGarbageCollector {
         private final Stopwatch timer;
         private final List<UpdateOp> updateOpList;
 
+        /** map of _id => _modified of orphaned or deleted documents to be removed (conditionally) in the current batch **/
+        private final Map<String, Long> orphanOrDeletedRemovalMap;
+
         /**
          * Map of documentId => total no. of deleted properties.
          * <p>
@@ -919,6 +923,7 @@ public class VersionGarbageCollector {
         private int totalGarbageDocsCount;
         private final Revision revisionForModified;
         private final Revision ownHeadRevision;
+        private final DocumentNodeState root; // root state at headRevision, used to check node visibility
 
         public DetailedGC(@NotNull RevisionVector headRevision, long 
toModifiedMs, @NotNull GCMonitor monitor, @NotNull AtomicBoolean cancel) {
             this.headRevision = requireNonNull(headRevision);
@@ -926,12 +931,14 @@ public class VersionGarbageCollector {
             this.monitor = monitor;
             this.cancel = cancel;
             this.updateOpList = new ArrayList<>();
+            this.orphanOrDeletedRemovalMap = new HashMap<>();
             this.deletedPropsCountMap = new HashMap<>();
             this.deletedUnmergedBCSet = new HashSet<>();
             this.timer = createUnstarted();
             // clusterId is not used
             this.revisionForModified = Revision.newRevision(0);
             this.ownHeadRevision = 
headRevision.getRevision(nodeStore.getClusterId());
+            this.root = nodeStore.getRoot(headRevision);
         }
 
         public void collectGarbage(final NodeDocument doc, final GCPhases 
phases) {
@@ -942,21 +949,63 @@ public class VersionGarbageCollector {
             final UpdateOp op = new UpdateOp(requireNonNull(doc.getId()), 
false);
             op.equals(MODIFIED_IN_SECS, doc.getModified());
 
-            collectDeletedProperties(doc, phases, op);
-            collectUnmergedBranchCommits(doc, phases, op, toModifiedMs);
-            collectOldRevisions(doc, phases, op);
-            // only add if there are changes for this doc
-            if (op.hasChanges()) {
+            // traversed state == state of the node at doc's path, walked down from the root at headRevision
+            NodeState traversedState = root;
+            for (String name : doc.getPath().elements()) {
+                traversedState = traversedState.getChildNode(name);
+            }
+
+            if (isDeletedOrOrphanedNode(doc, traversedState, phases)) {
+                // orphaned or deleted node: no UpdateOp needed, only (conditional) removal of the whole doc
                 garbageDocsCount++;
                 totalGarbageDocsCount++;
-                monitor.info("Collected [{}] garbage for doc [{}]", 
op.getChanges().size(), doc.getId());
-                updateOpList.add(op);
+                monitor.info("Collected orphaned or deleted doc [{}] for removal", doc.getId());
+                orphanOrDeletedRemovalMap.put(doc.getId(), doc.getModified()); // _modified = removal condition
+            } else {
+                collectDeletedProperties(doc, phases, op);
+                collectUnmergedBranchCommits(doc, phases, op, toModifiedMs);
+                collectOldRevisions(doc, phases, op);
+                // only add if there are changes for this doc
+                if (op.hasChanges()) {
+                    garbageDocsCount++;
+                    totalGarbageDocsCount++;
+                    monitor.info("Collected [{}] garbage for doc [{}]", 
op.getChanges().size(), doc.getId());
+                    updateOpList.add(op);
+                }
             }
             if (log.isDebugEnabled()) {
                 log.debug("UpdateOp for {} is {}", doc.getId(), op);
             }
         }
 
+        /**
+         * Check if the node represented by the given doc and traversedState is
+         * <i>orphaned</i>. A node is considered orphaned if it does not have a visible
+         * parent node. From a GC point of view this also includes regular deletions
+         * that have not otherwise been collected yet (e.g. by DeletedDocsGC).
+         *
+         * @param doc the document being examined (currently not used by the check)
+         * @param traversedState state reached by walking doc's path from the root at headRevision
+         * @param phases the GC phases (currently not used by the check)
+         * @return true if the node is not visible when traversed from the root and can
+         *         therefore be removed, false otherwise
+         */
+        private boolean isDeletedOrOrphanedNode(NodeDocument doc, NodeState 
traversedState,
+                GCPhases phases) {
+            // Several different cases lead here, but from a GC point of view they
+            // all reduce to "the node is not visible when traversed from the root
+            // at headRevision":
+            //
+            // - if the node does not exist when read directly at headRevision
+            //   (rather than traversed), this is a regular deletion. That is usually
+            //   handled by DeletedDocsGC - but if DetailedGC sees it, it seems risky
+            //   not to remove it right away.
+            // - if the node does exist when read directly at headRevision, then either
+            //   its parent was deleted (a true orphan) or the node itself was added by
+            //   a late write. In both cases it should be removed now as well.
+            return !traversedState.exists();
+        }
 
         private boolean hasGarbage() {
             return garbageDocsCount > 0;
         }
@@ -1207,9 +1256,10 @@ public class VersionGarbageCollector {
             totalGarbageDocsCount = 0;
         }
 
+
         public void removeGarbage(final VersionGCStats stats) {
 
-            if (updateOpList.isEmpty()) {
+            if (updateOpList.isEmpty() && orphanOrDeletedRemovalMap.isEmpty()) {
                 if (log.isDebugEnabled() || isDetailedGCDryRun) {
                     log.debug("Skipping removal of detailed garbage, cause no 
garbage detected");
                 }
@@ -1255,28 +1305,40 @@ public class VersionGarbageCollector {
                 }
                 if (!isDetailedGCDryRun) {
                     // only delete these in case it is not a dryRun
-                    List<NodeDocument> oldDocs = ds.findAndUpdate(NODES, 
updateOpList);
-                    int deletedProps = 
oldDocs.stream().filter(Objects::nonNull).mapToInt(d -> 
deletedPropsCountMap.getOrDefault(d.getId(), 0)).sum();
-                    int updatedDocs = (int) 
oldDocs.stream().filter(Objects::nonNull).count();
-                    stats.updatedDetailedGCDocsCount += updatedDocs;
-                    stats.deletedPropsCount += deletedProps;
-                    stats.deletedUnmergedBCCount += 
deletedUnmergedBCSet.size();
-
-                    if (log.isDebugEnabled()) {
-                        log.debug("Updated [{}] documents, deleted [{}] 
properties, deleted [{}] unmergedBranchCommits",
-                                updatedDocs, deletedProps, 
deletedUnmergedBCSet.size());
+
+                    if (!orphanOrDeletedRemovalMap.isEmpty()) {
+                        // conditional remove: only docs whose _modified is unchanged since collection are removed
+                        final int removedSize = ds.remove(NODES, orphanOrDeletedRemovalMap);
+                        stats.updatedDetailedGCDocsCount += removedSize;
+                        stats.deletedDocGCCount += removedSize;
+                        detailedGCStats.documentsUpdated(removedSize);
                     }
 
-                    // save stats
-                    detailedGCStats.propertiesDeleted(deletedProps);
-                    
detailedGCStats.unmergedBranchCommitsDeleted(deletedUnmergedBCSet.size());
-                    detailedGCStats.documentsUpdated(updatedDocs);
-                    // fix for sonar : converted to long before operation
-                    
detailedGCStats.documentsUpdateSkipped((long)oldDocs.size() - updatedDocs);
+                    if (!updateOpList.isEmpty()) {
+                        List<NodeDocument> oldDocs = ds.findAndUpdate(NODES, 
updateOpList);
+                        int deletedProps = 
oldDocs.stream().filter(Objects::nonNull).mapToInt(d -> 
deletedPropsCountMap.getOrDefault(d.getId(), 0)).sum();
+                        int updatedDocs = (int) 
oldDocs.stream().filter(Objects::nonNull).count();
+                        stats.updatedDetailedGCDocsCount += updatedDocs;
+                        stats.deletedPropsCount += deletedProps;
+                        stats.deletedUnmergedBCCount += 
deletedUnmergedBCSet.size();
+
+                        if (log.isDebugEnabled()) {
+                            log.debug("Updated [{}] documents, deleted [{}] 
properties, deleted [{}] unmergedBranchCommits",
+                                    updatedDocs, deletedProps, 
deletedUnmergedBCSet.size());
+                        }
+
+                        // save stats
+                        detailedGCStats.propertiesDeleted(deletedProps);
+                        
detailedGCStats.unmergedBranchCommitsDeleted(deletedUnmergedBCSet.size());
+                        detailedGCStats.documentsUpdated(updatedDocs);
+                        // fix for sonar : converted to long before operation
+                        
detailedGCStats.documentsUpdateSkipped((long)oldDocs.size() - updatedDocs);
+                    }
                 }
             } finally {
                 // now reset delete metadata
                 updateOpList.clear();
+                orphanOrDeletedRemovalMap.clear();
                 deletedPropsCountMap.clear();
                 deletedUnmergedBCSet.clear();
                 garbageDocsCount = 0;
diff --git 
a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/FailingDocumentStore.java
 
b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/FailingDocumentStore.java
index 9bc64f9f2e..adfccbe0ee 100644
--- 
a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/FailingDocumentStore.java
+++ 
b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/FailingDocumentStore.java
@@ -49,14 +49,30 @@ class FailingDocumentStore extends DocumentStoreWrapper {
 
     private List<Collection<? extends Document>> collectionIncludeList;
 
+    private List<String> idIncludeList; // if non-null, only ops whose first remaining UpdateOp id is listed may fail
+
     private List<FailedUpdateOpListener> listeners = new ArrayList<>();
 
+    private boolean afterOp = false; // if true, maybeFail is called after (not before) the wrapped operation
+
+    private boolean noDispose = false; // if true, dispose() does not dispose the wrapped store
+
     class Fail {
 
         private Fail() {
             never();
         }
 
+        Fail afterOp() { // inject failures after the wrapped operation has been executed
+            afterOp = true;
+            return this;
+        }
+
+        Fail beforeOp() { // inject failures before the wrapped operation (the default)
+            afterOp = false;
+            return this;
+        }
+
         Fail after(int numOps) {
             p = -1;
             failAfter.set(numOps);
@@ -74,6 +90,8 @@ class FailingDocumentStore extends DocumentStoreWrapper {
             failAfter.set(Long.MAX_VALUE);
             exceptionType = Type.GENERIC;
             collectionIncludeList = null;
+            idIncludeList = null;
+            afterOp = false;
         }
 
         void once() {
@@ -96,6 +114,14 @@ class FailingDocumentStore extends DocumentStoreWrapper {
             collectionIncludeList.add(collectionInclude);
             return this;
         }
+
+        Fail on(String idInclude) { // restrict failures to ops whose first remaining UpdateOp has this id
+            if (idIncludeList == null) {
+                idIncludeList = new LinkedList<>();
+            }
+            idIncludeList.add(idInclude);
+            return this;
+        }
     }
 
     public interface FailedUpdateOpListener {
@@ -127,8 +153,16 @@ class FailingDocumentStore extends DocumentStoreWrapper {
     @Override
     public <T extends Document> void remove(Collection<T> collection,
                                             String key) {
-        maybeFail(collection);
-        super.remove(collection, key);
+        if (!afterOp) {
+            maybeFail(collection);
+        }
+        try {
+            super.remove(collection, key);
+        } finally { // NOTE(review): in afterOp mode a failure thrown here replaces any exception from super
+            if (afterOp) {
+                maybeFail(collection);
+            }
+        }
     }
 
     @Override
@@ -146,8 +180,16 @@ class FailingDocumentStore extends DocumentStoreWrapper {
         int num = 0;
         // remove individually
         for (Map.Entry<String, Long> rm : toRemove.entrySet()) {
-            maybeFail(collection);
-            num += super.remove(collection, singletonMap(rm.getKey(), 
rm.getValue()));
+            if (!afterOp) {
+                maybeFail(collection);
+            }
+            try {
+                num += super.remove(collection, singletonMap(rm.getKey(), 
rm.getValue()));
+            } finally {
+                if (afterOp) {
+                    maybeFail(collection);
+                }
+            }
         }
         return num;
     }
@@ -158,8 +200,17 @@ class FailingDocumentStore extends DocumentStoreWrapper {
                                            long startValue,
                                            long endValue)
             throws DocumentStoreException {
-        maybeFail(collection);
-        return super.remove(collection, indexedProperty, startValue, endValue);
+        if (!afterOp) {
+            maybeFail(collection);
+        }
+        try {
+            return super.remove(collection, indexedProperty, startValue, 
endValue);
+        } finally {
+            if (afterOp) {
+                maybeFail(collection);
+            }
+
+        }
     }
 
     @Override
@@ -169,9 +220,17 @@ class FailingDocumentStore extends DocumentStoreWrapper {
         int i = 0;
         // create individually
         for (UpdateOp op : updateOps) {
-            maybeFail(collection, remaining.subList(i++, remaining.size()));
-            if (!super.create(collection, singletonList(op))) {
-                return false;
+            if (!afterOp) {
+                maybeFail(collection, remaining.subList(i++, 
remaining.size()));
+            }
+            try {
+                if (!super.create(collection, singletonList(op))) {
+                    return false;
+                }
+            } finally {
+                if (afterOp) { // i was not advanced above, so the sublist starts at the op just attempted
+                    maybeFail(collection, remaining.subList(i++, 
remaining.size()));
+                }
             }
         }
         return true;
@@ -179,9 +238,17 @@ class FailingDocumentStore extends DocumentStoreWrapper {
 
     @Override
     public <T extends Document> T createOrUpdate(Collection<T> collection,
-                                                 UpdateOp update) {
-        maybeFail(collection, singletonList(update));
-        return super.createOrUpdate(collection, update);
+            UpdateOp update) {
+        if (!afterOp) {
+            maybeFail(collection, singletonList(update));
+        }
+        try {
+            return super.createOrUpdate(collection, update);
+        } finally {
+            if (afterOp) {
+                maybeFail(collection, singletonList(update));
+            }
+        }
     }
 
     @Override
@@ -191,8 +258,16 @@ class FailingDocumentStore extends DocumentStoreWrapper {
         List<T> result = Lists.newArrayList();
         int i = 0;
         for (UpdateOp op : updateOps) {
-            maybeFail(collection, remaining.subList(i++, remaining.size()));
-            result.add(super.createOrUpdate(collection, op));
+            if (!afterOp) {
+                maybeFail(collection, remaining.subList(i++, 
remaining.size()));
+            }
+            try {
+                result.add(super.createOrUpdate(collection, op));
+            } finally {
+                if (afterOp) { // i was not advanced above, so the sublist starts at the op just attempted
+                    maybeFail(collection, remaining.subList(i++, 
remaining.size()));
+                }
+            }
         }
         return result;
     }
@@ -200,8 +275,16 @@ class FailingDocumentStore extends DocumentStoreWrapper {
     @Override
     public <T extends Document> T findAndUpdate(Collection<T> collection,
                                                 UpdateOp update) {
-        maybeFail(collection, singletonList(update));
-        return super.findAndUpdate(collection, update);
+        if (!afterOp) {
+            maybeFail(collection, singletonList(update));
+        }
+        try {
+            return super.findAndUpdate(collection, update);
+        } finally {
+            if (afterOp) {
+                maybeFail(collection, singletonList(update));
+            }
+        }
     }
 
     private <T extends Document> void maybeFail(Collection<T> collection) {
@@ -211,15 +294,33 @@ class FailingDocumentStore extends DocumentStoreWrapper {
     private <T extends Document> void maybeFail(Collection<T> collection,
                                                 List<UpdateOp> remainingOps) {
         if ((collectionIncludeList == null || 
collectionIncludeList.contains(collection)) &&
-                (random.nextFloat() < p || failAfter.getAndDecrement() <= 0)) {
+                (random.nextFloat() < p || failAfter.getAndDecrement() <= 0) && // NOTE(review): also decrements for ops the id filter below rejects
+                (idIncludeList == null || (!remainingOps.isEmpty() // ops without UpdateOps never match an id filter
+                        && 
idIncludeList.contains(remainingOps.get(0).getId())))) {
             if (numFailures.getAndDecrement() > 0) {
                 reportRemainingOps(remainingOps);
-                throw new DocumentStoreException("write operation failed", 
null, exceptionType);
+                failNow(remainingOps);
             }
         }
     }
 
+    void failNow(List<UpdateOp> remainingOps) { // overridable hook; throws a DocumentStoreException by default
+        throw new DocumentStoreException("write operation failed", null, 
exceptionType);
+    }
+
     private void reportRemainingOps(List<UpdateOp> remainingOps) {
         listeners.forEach(listener -> remainingOps.forEach(listener::failed));
     }
+
+    public void noDispose() { // keep the wrapped store alive when this store is disposed
+        noDispose = true;
+    }
+
+    @Override
+    public void dispose() {
+        if (!noDispose) {
+            super.dispose();
+        }
+    }
+
 }
diff --git 
a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/PausableDocumentStore.java
 
b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/PausableDocumentStore.java
new file mode 100644
index 0000000000..06bd6213e3
--- /dev/null
+++ 
b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/PausableDocumentStore.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.util.List;
+
+import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException.Type;
+import org.apache.jackrabbit.oak.plugins.document.FailingDocumentStore.Fail;
+
+/**
+ * Wraps a document store and can be instructed to pause (invoke a PauseCallback) on selected operations.
+ */
+public class PausableDocumentStore extends DocumentStoreWrapper {
+
+    public interface PauseCallback {
+
+        /**
+         * Invoked with the remaining ops, instead of failing, when an operation matches the Pause config.
+         * @return the PauseCallback to use going forward - null to stop doing pauses
+         */
+        PauseCallback handlePause(List<UpdateOp> remainingOps);
+    }
+
+    /**
+     * Small extension of FailingDocumentStore that doesn't throw an exception but
+     * pauses instead (pauseNow instead of failNow).
+     */
+    static class PausingFailingDocumentStore extends FailingDocumentStore {
+
+        private PausableDocumentStore pds;
+
+        PausingFailingDocumentStore(DocumentStore store) {
+            super(store);
+        }
+
+        PausingFailingDocumentStore(DocumentStore store, long seed) {
+            super(store, seed);
+        }
+
+        private void bondWith(PausableDocumentStore pds) { // back-reference used by failNow to reach pauseNow
+            this.pds = pds;
+        }
+
+        @Override
+        void failNow(List<UpdateOp> remainingOps) {
+            pds.pauseNow(remainingOps);
+        }
+    }
+
+    class Pause { // fluent pause configuration, delegating to FailingDocumentStore.Fail
+
+        Fail f = getFailingDocumentStore().fail();
+
+        private Pause() {
+            never();
+        }
+
+        Pause afterOp() {
+            f.afterOp();
+            return this;
+        }
+
+        Pause beforeOp() {
+            f.beforeOp();
+            return this;
+        }
+
+        Pause after(int numOps) {
+            f.after(numOps);
+            return this;
+        }
+
+        Pause withType(Type type) {
+            f.withType(type);
+            return this;
+        }
+
+        void never() {
+            f.never();
+        }
+
+        void once() {
+            f.once();
+        }
+
+        void eternally() {
+            f.eternally();
+        }
+
+        Pause randomly(double probability) {
+            f.randomly(probability);
+            return this;
+        }
+
+        Pause on(Collection<? extends Document> collectionInclude) {
+            f.on(collectionInclude);
+            return this;
+        }
+
+        Pause on(String idInclude) {
+            f.on(idInclude);
+            return this;
+        }
+    }
+
+    PauseCallback pauseCallback = null; // set by pauseWith(), invoked by pauseNow()
+
+    PausableDocumentStore(DocumentStore store, long seed) {
+        super(new PausingFailingDocumentStore(store, seed));
+        bond();
+    }
+
+    PausableDocumentStore(DocumentStore store) {
+        super(new PausingFailingDocumentStore(store));
+        bond();
+    }
+
+    private void bond() {
+        getFailingDocumentStore().bondWith(this);
+    }
+
+    private PausingFailingDocumentStore getFailingDocumentStore() {
+        return (PausingFailingDocumentStore) store;
+    }
+
+    Pause pauseWith(PauseCallback r) { // registers r and returns a fresh Pause config (initially never pausing)
+        assertNotNull(r);
+        pauseCallback = r;
+        return new Pause();
+    }
+
+    void pauseNow(List<UpdateOp> remainingOps) { // called by PausingFailingDocumentStore.failNow instead of throwing
+        PauseCallback nextCallback = pauseCallback.handlePause(remainingOps);
+        if (nextCallback == null) {
+            new Pause().never();
+        } else if (nextCallback != pauseCallback) {
+            pauseWith(nextCallback);
+        } // else continue using the same pauseCallback
+    }
+
+    public void noDispose() { // delegates to the wrapped FailingDocumentStore
+        getFailingDocumentStore().noDispose();
+    }
+}
diff --git 
a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollectorIT.java
 
b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollectorIT.java
index 92ce1a888d..f2c153f5f0 100644
--- 
a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollectorIT.java
+++ 
b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollectorIT.java
@@ -19,11 +19,14 @@
 package org.apache.jackrabbit.oak.plugins.document;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
@@ -49,6 +52,7 @@ import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.apache.jackrabbit.oak.api.Type.NAME;
 import static org.apache.jackrabbit.oak.api.Type.STRING;
 import static org.apache.jackrabbit.oak.api.Type.STRINGS;
+import static 
org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.DEFAULT_LEASE_DURATION_MILLIS;
 import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
 import static org.apache.jackrabbit.oak.plugins.document.Collection.SETTINGS;
 import static 
org.apache.jackrabbit.oak.plugins.document.DetailGCHelper.assertBranchRevisionNotRemovedFromAllDocuments;
@@ -61,10 +65,14 @@ import static 
org.apache.jackrabbit.oak.plugins.document.NodeDocument.MIN_ID_VAL
 import static 
org.apache.jackrabbit.oak.plugins.document.NodeDocument.NUM_REVS_THRESHOLD;
 import static 
org.apache.jackrabbit.oak.plugins.document.NodeDocument.PREV_SPLIT_FACTOR;
 import static 
org.apache.jackrabbit.oak.plugins.document.NodeDocument.SplitDocType;
+import static 
org.apache.jackrabbit.oak.plugins.document.NodeDocument.isDeletedEntry;
 import static 
org.apache.jackrabbit.oak.plugins.document.NodeDocument.setModified;
 import static 
org.apache.jackrabbit.oak.plugins.document.Revision.getCurrentTimestamp;
 import static org.apache.jackrabbit.oak.plugins.document.Revision.newRevision;
 import static org.apache.jackrabbit.oak.plugins.document.TestUtils.NO_BINARY;
+import static org.apache.jackrabbit.oak.plugins.document.TestUtils.createChild;
+import static 
org.apache.jackrabbit.oak.plugins.document.TestUtils.disposeQuietly;
+import static 
org.apache.jackrabbit.oak.plugins.document.TestUtils.childBuilder;
 import static 
org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.SETTINGS_COLLECTION_DETAILED_GC_DOCUMENT_ID_PROP;
 import static 
org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.SETTINGS_COLLECTION_DETAILED_GC_DRY_RUN_DOCUMENT_ID_PROP;
 import static 
org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.SETTINGS_COLLECTION_DETAILED_GC_DRY_RUN_TIMESTAMP_PROP;
@@ -97,6 +105,7 @@ import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
 import 
org.apache.jackrabbit.oak.plugins.document.DocumentStoreFixture.RDBFixture;
+import 
org.apache.jackrabbit.oak.plugins.document.FailingDocumentStore.FailedUpdateOpListener;
 import 
org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats;
 import 
org.apache.jackrabbit.oak.plugins.document.bundlor.BundlingConfigInitializer;
 import org.apache.jackrabbit.oak.plugins.document.mongo.MongoTestUtils;
@@ -113,6 +122,7 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -192,11 +202,20 @@ public class VersionGarbageCollectorIT {
     }
 
     private void createSecondaryStore(LeaseCheckMode leaseCheckNode) {
+        createSecondaryStore(leaseCheckNode, false); // no FailingDocumentStore wrapping
+    }
+
+    private void createSecondaryStore(LeaseCheckMode leaseCheckNode, boolean 
withFailingDS) {
         if (fixture instanceof RDBFixture) {
             ((RDBFixture) fixture).setRDBOptions(
                     new 
RDBOptions().tablePrefix(rdbTablePrefix).dropTablesOnClose(false));
         }
         ds2 = fixture.createDocumentStore();
+        if (withFailingDS) {
+            FailingDocumentStore failingDs = new FailingDocumentStore(ds2);
+            failingDs.noDispose();
+            ds2 = failingDs;
+        }
         DocumentMK.Builder documentMKBuilder2 = new 
DocumentMK.Builder().clock(clock).setClusterId(2)
                 .setLeaseCheckMode(leaseCheckNode)
                 .setDocumentStore(ds2).setAsyncDelay(0);
@@ -1182,6 +1201,362 @@ public class VersionGarbageCollectorIT {
     }
     // OAK-8646 END
 
+    /**
+     * Creates the given paths on store1, one merge per path.
+     */
+    private void createNodes(Collection<String> paths) throws Exception {
+        final String[] asArray = paths.toArray(new String[0]);
+        createNodes(asArray);
+    }
+
+    /**
+     * Creates each of the given paths on store1 (one merge per path) and then
+     * runs store1's background operations.
+     */
+    private void createNodes(String... paths) throws CommitFailedException {
+        final DocumentNodeStore target = store1;
+        createNodes(target, paths);
+    }
+
+    /**
+     * Creates each path on the given store in its own merge, then runs the
+     * store's background operations once at the end.
+     */
+    private void createNodes(DocumentNodeStore dns,
+            String... paths) throws CommitFailedException {
+        for (String p : paths) {
+            final NodeBuilder rootBuilder = dns.getRoot().builder();
+            merge(dns, createChild(rootBuilder, p));
+        }
+        dns.runBackgroundOperations();
+    }
+
+    /**
+     * Applies the changes of a single late write (for one path) to a root
+     * builder. Used as a lambda target, hence marked as functional interface.
+     */
+    @FunctionalInterface
+    interface LateWriteChangesBuilder {
+        /**
+         * @param root the root builder to apply the changes to
+         * @param path the path the changes are about
+         */
+        void apply(NodeBuilder root, String path);
+    }
+
+    /**
+     * Late-writes the creation of each of the given paths, using
+     * {@code lateWrite} with the {@code ADD_NODE_OPS} filter.
+     */
+    private void lateWriteCreateNodes(Collection<String> orphanedPaths,
+            String unrelatedPathOrNull) throws Exception {
+        final LateWriteChangesBuilder creator = (rootBuilder, p) -> {
+            createChild(rootBuilder, p);
+        };
+        lateWrite(orphanedPaths, creator, unrelatedPathOrNull, ADD_NODE_OPS);
+    }
+
+    /**
+     * Late-writes the removal of each of the given paths, using
+     * {@code lateWrite} with the {@code REMOVE_NODE_OPS} filter.
+     */
+    private void lateWriteRemoveNodes(Collection<String> orphanedPaths,
+            String unrelatedPathOrNull) throws Exception {
+        final LateWriteChangesBuilder remover = (rootBuilder, p) -> {
+            childBuilder(rootBuilder, p).remove();
+        };
+        lateWrite(orphanedPaths, remover, unrelatedPathOrNull, REMOVE_NODE_OPS);
+    }
+
+    /**
+     * Simulates late writes. The secondary store (clusterId 2) is created on
+     * top of a {@link FailingDocumentStore} that is set to fail
+     * ({@code fail().after(0).eternally()}), and for each of the given paths
+     * it attempts to merge the changes produced by
+     * {@code lateWriteChangesBuilder}. The failed update operations accepted
+     * by {@code filterPredicate} are captured. After the lease of clusterId 2
+     * has expired and store1 has run recovery for it, the captured operations
+     * are written directly to the document store.
+     * <p>
+     * Assumes the secondary store is not in use as it needs to control its
+     * creation and disposal.
+     *
+     * @param orphanedPaths           the paths to apply the late-write changes
+     *                                to, each one in a separate (failing) merge
+     * @param lateWriteChangesBuilder produces the changes for a single path
+     * @param unrelatedPath           if neither null nor empty, a path that a
+     *                                revived clusterId 2 creates after the late
+     *                                write
+     * @param filterPredicate         selects which of the failed update
+     *                                operations are written late
+     */
+    private void lateWrite(Collection<String> orphanedPaths,
+            LateWriteChangesBuilder lateWriteChangesBuilder, String unrelatedPath,
+            Predicate<UpdateOp> filterPredicate) throws Exception {
+        // this method requires store2 to be null as a prerequisite
+        assertNull(store2);
+        // as it creates store2 itself - then disposes it later too
+        createSecondaryStore(LeaseCheckMode.LENIENT, true);
+        // create the orphaned paths
+        final List<UpdateOp> failed = new ArrayList<>();
+        final FailingDocumentStore fds = (FailingDocumentStore) ds2;
+        fds.addListener(filter0(failed, filterPredicate));
+        fds.fail().after(0).eternally();
+        for (String path : orphanedPaths) {
+            try {
+                NodeBuilder rb = store2.getRoot().builder();
+                lateWriteChangesBuilder.apply(rb, path);
+                merge(store2, rb);
+                fail("merge must fail");
+            } catch (CommitFailedException e) {
+                // expected: writes of ds2 are set to fail
+                assertEquals("OakOak0001: write operation failed", e.getMessage());
+            }
+        }
+        disposeQuietly(store2);
+        fds.fail().never();
+
+        // wait until lease expires
+        clock.waitUntil(clock.getTime() + DEFAULT_LEASE_DURATION_MILLIS + 1);
+
+        // recovery for clusterId 2 must be needed, yet recover(2) is expected
+        // to return 0 - presumably as none of its writes succeeded
+        store1.renewClusterIdLease();
+        assertTrue(store1.getLastRevRecoveryAgent().isRecoveryNeeded());
+        assertEquals(0, store1.getLastRevRecoveryAgent().recover(2));
+
+        // 'late write': apply the captured failed operations after recovery
+        fds.createOrUpdate(NODES, failed);
+
+        if (unrelatedPath == null || unrelatedPath.isEmpty()) {
+            return;
+        }
+
+        // revive clusterId 2
+        createSecondaryStore(LeaseCheckMode.LENIENT);
+        merge(store2, createChild(store2.getRoot().builder(), unrelatedPath));
+        store2.runBackgroundOperations();
+        store2.dispose();
+        store1.runBackgroundOperations();
+    }
+
+    /**
+     * Creates a bunch of parents properly, then creates a bunch of orphans in
+     * late-write manner (i.e. not properly), then runs DetailedGC and asserts
+     * that everything was deleted as expected.
+     *
+     * @param parents                 the nodes that should be created properly -
+     *                                each one in a separate merge
+     * @param orphans                 the nodes that should be created improperly -
+     *                                each one in a separate late-write way
+     * @param expectedNumOrphanedDocs the expected number of orphan documents that
+     *                                DetailedGC should clean up
+     * @param unrelatedPath           an unrelated path that should be merged after
+     *                                late-write - ensures lastRev is updated on
+     *                                root to allow detecting late-writes as such
+     */
+    private void doLateWriteCreateChildrenGC(Collection<String> parents,
+            Collection<String> orphans, int expectedNumOrphanedDocs, String unrelatedPath)
+            throws Exception {
+        assumeTrue(fixture.hasSinglePersistence());
+        createNodes(parents);
+        lateWriteCreateNodes(orphans, unrelatedPath);
+
+        // the late-written documents exist in the document store ...
+        assertDocumentsExist(parents);
+        assertDocumentsExist(orphans);
+        // ... but the orphans must not be visible as nodes
+        assertNodesDontExist(parents, orphans);
+
+        enableDetailedGC(store1);
+
+        // wait two hours
+        clock.waitUntil(clock.getTime() + HOURS.toMillis(2));
+        // clean everything older than one hour
+        VersionGCStats stats = gc(store1.getVersionGarbageCollector(), 1, HOURS);
+        assertNotNull(stats);
+        assertEquals(expectedNumOrphanedDocs, stats.deletedDocGCCount);
+
+        assertDocumentsExist(parents);
+        // and the main assert being: have those lateCreated (orphans) docs been deleted
+        assertNodesDontExist(parents, orphans);
+        assertDocumentsDontExist(orphans);
+    }
+
+    /**
+     * Asserts, for each of {@code missingNodes}, that it is not visible as a
+     * child of its nearest ancestor among {@code existingNodes}.
+     */
+    private void assertNodesDontExist(Collection<String> existingNodes,
+            Collection<String> missingNodes) {
+        missingNodes.forEach(missing -> assertChildNotExists(existingNodes, missing));
+    }
+
+    /**
+     * Finds the deepest entry of {@code existingNodes} that is an ancestor of
+     * {@code aMissingNode} (failing if there is none) and asserts that, on
+     * store1, that ancestor has no child on the way down to the missing node.
+     */
+    private void assertChildNotExists(Collection<String> existingNodes, String aMissingNode) {
+        final Path missingPath = Path.fromString(aMissingNode);
+        String deepestAncestor = null;
+        Path deepestAncestorPath = null;
+        for (String candidate : existingNodes) {
+            final Path candidatePath = Path.fromString(candidate);
+            final boolean isAncestor = candidatePath.isAncestorOf(missingPath);
+            final boolean isDeeper = deepestAncestorPath == null
+                    || deepestAncestorPath.isAncestorOf(candidatePath);
+            if (isAncestor && isDeeper) {
+                deepestAncestor = candidate;
+                deepestAncestorPath = candidatePath;
+            }
+        }
+        assertNotNull(deepestAncestor);
+        // walk up from the missing path until reaching the direct child of the ancestor
+        Path directChild = missingPath;
+        for (Path up = directChild.getParent();
+                deepestAncestorPath.isAncestorOf(up);
+                up = up.getParent()) {
+            directChild = up;
+        }
+        final NodeState ancestorState = getChildeNodeState(store1, deepestAncestor, true);
+        assertFalse(ancestorState.hasChildNode(directChild.getName()));
+    }
+
+    /**
+     * Tests whether DetailedGC properly deletes a late-written addChild "/grand/parent/a".
+     */
+    @Test
+    public void lateWriteCreateChildGC() throws Exception {
+        final List<String> parents = Arrays.asList("/grand/parent");
+        final List<String> orphans = Arrays.asList("/grand/parent/a");
+        final int expectedDeletedDocs = 1;
+        doLateWriteCreateChildrenGC(parents, orphans, expectedDeletedDocs, "/d");
+    }
+
+    /**
+     * Tests whether DetailedGC can delete a whole subtree "/a/b/c/d/**" that was
+     * added via late-writes.
+     */
+    @Test
+    public void lateWriteCreateChildTreeGC() throws Exception {
+        final List<String> parents = Arrays.asList("/a", "/a/b/c");
+        final List<String> orphans = Arrays.asList("/a/b/c/d", "/a/b/c/d/e/f");
+        // presumably d, d/e and d/e/f - the intermediate d/e is late-written as well
+        final int expectedDeletedDocs = 3;
+        doLateWriteCreateChildrenGC(parents, orphans, expectedDeletedDocs, "/d");
+    }
+
+    /**
+     * Tests whether DetailedGC can delete a large amount of randomly
+     * created orphans (that were added in a late-write manner).
+     */
+    @Test
+    public void lateWriteCreateManyChildrenGC() throws Exception {
+        List<String> nonOrphans = Arrays.asList("/a", "/b", "/c");
+        // nonOrphans are created (properly) by doLateWriteCreateChildrenGC
+        // itself, after its assumeTrue check - no need to create them upfront
+        Set<String> orphans = new HashSet<>();
+        Set<String> commonOrphanParents = new HashSet<>();
+        Random r = new Random(43);
+        for (int i = 0; i < 900; i++) {
+            String orphanParent = nonOrphans.get(r.nextInt(nonOrphans.size()))
+                    + "/" + r.nextInt(42);
+            commonOrphanParents.add(orphanParent);
+            orphans.add(orphanParent + "/" + r.nextInt(24));
+        }
+        // the orphan parents (e.g. /a/7) are late-written too,
+        // hence they are expected to be deleted as well
+        doLateWriteCreateChildrenGC(nonOrphans,
+                orphans, orphans.size() + commonOrphanParents.size(), "/d");
+    }
+
+    @Test
+    @Ignore(value = "OAK-10535 : fails currently as uncommitted revisions aren't yet removed")
+    public void lateWriteRemoveChildGC_noSweep() throws Exception {
+        assumeTrue(fixture.hasSinglePersistence());
+        enableDetailedGC(store1);
+        createNodes("/a/b/c/d");
+        // late-write the removal of /a/b; with a null unrelated path,
+        // clusterId 2 is not revived afterwards
+        lateWriteRemoveNodes(Arrays.asList("/a/b"), null);
+
+        // the late-written removal must not be visible
+        assertTrue(getChildeNodeState(store1, "/a/b/c/d", true).exists());
+
+        // wait two hours
+        clock.waitUntil(clock.getTime() + HOURS.toMillis(2));
+        // clean everything older than one hour
+        VersionGCStats stats = gc(store1.getVersionGarbageCollector(), 1, HOURS);
+        assertNotNull(stats);
+
+        assertNotNull(store1.getDocumentStore().find(NODES, "2:/a/b"));
+        assertNotNull(store1.getDocumentStore().find(NODES, "4:/a/b/c/d"));
+        assertTrue(getChildeNodeState(store1, "/a/b/c/d", true).exists());
+        //TODO: below assert fails currently as uncommitted revisions aren't yet removed
+        // should be 3 as it should clean up the _deleted from /a/b, /a/b/c and /a/b/c/d
+        assertEquals(3, stats.updatedDetailedGCDocsCount);
+    }
+
+    /**
+     * This (re)produces a case where classic GC deletes nodes
+     * but they are still in the nodes cache, eg:
+     *  org.apache.jackrabbit.oak.plugins.document.ConflictException:
+     * The node 4:/a/b/c/d does not exist or is already deleted
+     * at base revision r2-0-1,r1-0-2,
+     * branch: null, commit revision: re-0-1]
+     */
+    @Test
+    @Ignore(value = "OAK-10658 : fails currently as invalidation is missing (in classic GC) after late-write-then-sweep-then-GC")
+    public void lateWriteRemoveChildGC_withSweep() throws Exception {
+        assumeTrue(fixture.hasSinglePersistence());
+        enableDetailedGC(store1);
+        createNodes("/a/b/c/d");
+        // late-write the removal of /a/b, then revive clusterId 2 to create "/foo"
+        lateWriteRemoveNodes(Arrays.asList("/a/b"), "/foo");
+
+        // asserts that /a, /a/b, /a/b/c and /a/b/c/d are still visible
+        getChildeNodeState(store1, "/a/b/c/d", true);
+
+        // wait two hours
+        clock.waitUntil(clock.getTime() + HOURS.toMillis(2));
+        getChildeNodeState(store1, "/a/b/c/d", true);
+        assertNotNull(store1.getDocumentStore().find(NODES, "4:/a/b/c/d"));
+        assertNotNull(store1.getDocumentStore().find(NODES, "3:/a/b/c"));
+        assertNotNull(store1.getDocumentStore().find(NODES, "2:/a/b"));
+
+        // clean everything older than one hour
+        VersionGCStats stats = gc(store1.getVersionGarbageCollector(), 1, HOURS);
+        assertNotNull(stats);
+
+        assertNull(store1.getDocumentStore().find(NODES, "4:/a/b/c/d"));
+        assertNull(store1.getDocumentStore().find(NODES, "3:/a/b/c"));
+        assertNull(store1.getDocumentStore().find(NODES, "2:/a/b"));
+
+        // invalidating store1's nodeCache (store1.getNodeCache().invalidateAll())
+        // would fix it, but we need that to happen in prod code, not test
+
+        // creating /a/b/c/d/e again, below late-write-removed /a/b,
+        // triggered a ConflictException
+        createNodes("/a/b/c/d/e");
+        getChildeNodeState(store1, "/a/b/c/d/e", true);
+    }
+
+    /**
+     * Turns on detailed GC for the given store by writing {@code true} into the
+     * "detailedGCEnabled" field of its VersionGarbageCollector reflectively
+     * (presumably commons-lang {@code FieldUtils.writeField}, whose last
+     * argument is forceAccess - TODO confirm).
+     */
+    private void enableDetailedGC(DocumentNodeStore dns) throws IllegalAccessException {
+        writeField(dns.getVersionGarbageCollector(), "detailedGCEnabled", true, true);
+    }
+
+    /**
+     * Tests whether DetailedGC removes documents that became orphans because
+     * their parent document was removed directly from the document store, and
+     * that the orphaned subtree can be re-created afterwards.
+     */
+    @Test
+    public void orphanedChildGC() throws Exception {
+        assumeTrue(fixture.hasSinglePersistence());
+        createSecondaryStore(LeaseCheckMode.LENIENT);
+        createNodes(store2, "/a/b/c", "/a/b/c/d/e", "/a/f/g");
+        // remove only the parent document, orphaning /a/b/c/d and /a/b/c/d/e
+        ds2.remove(NODES, "3:/a/b/c");
+        store2.dispose();
+
+        store1.runBackgroundOperations();
+
+        // wait two hours
+        clock.waitUntil(clock.getTime() + HOURS.toMillis(2));
+        enableDetailedGC(store1);
+        // clean everything older than one hour
+        VersionGCStats stats = gc(store1.getVersionGarbageCollector(), 1, HOURS);
+        assertNotNull(stats);
+        // expected 2 updated (deletions): /a/b/c/d and /a/b/c/d/e
+        assertEquals(2, stats.updatedDetailedGCDocsCount);
+        assertEquals(2, stats.deletedDocGCCount);
+
+        // re-creating the formerly orphaned subtree must work
+        createNodes("/a/b/c/d/e");
+        assertTrue(getChildeNodeState(store1, "/a/b/c/d/e", true).exists());
+    }
+
+    /**
+     * Asserts that none of the given paths is visible as a node on store1 and
+     * that store1's document store finds no document for any of them.
+     */
+    private void assertDocumentsDontExist(Collection<String> nonExistingPaths) {
+        for (String nonExistingPath : nonExistingPaths) {
+            Path p = Path.fromString(nonExistingPath);
+            assertFalse(getChildeNodeState(store1, nonExistingPath, false).exists());
+            String id = Utils.getIdFromPath(p);
+            assertNull("document still exists: " + id,
+                    store1.getDocumentStore().find(NODES, id));
+        }
+    }
+
+    /**
+     * Asserts that store1's document store has a document for each of the
+     * given paths (looked up with maxCacheAge -1, as before).
+     */
+    private void assertDocumentsExist(Collection<String> paths) {
+        for (String aPath : paths) {
+            Path p = Path.fromString(aPath);
+            String id = Utils.getIdFromPath(p);
+            assertNotNull("document missing: " + id,
+                    store1.getDocumentStore().find(NODES, id, -1));
+        }
+    }
+
+    /**
+     * Accepts update operations that contain a change to an entry recognized
+     * by {@code isDeletedEntry} whose value is the string "false".
+     */
+    private static final Predicate<UpdateOp> ADD_NODE_OPS = updateOp -> {
+        for (UpdateOp.Key key : updateOp.getChanges().keySet()) {
+            // constant-first equals stays null-safe should an operation carry no value
+            if (isDeletedEntry(key.getName())
+                    && "false".equals(updateOp.getChanges().get(key).value)) {
+                return true;
+            }
+        }
+        return false;
+    };
+
+    /**
+     * Accepts update operations that contain a change to an entry recognized
+     * by {@code isDeletedEntry} whose value is the string "true".
+     */
+    private static final Predicate<UpdateOp> REMOVE_NODE_OPS = updateOp -> {
+        for (UpdateOp.Key key : updateOp.getChanges().keySet()) {
+            // constant-first equals stays null-safe should an operation carry no value
+            if (isDeletedEntry(key.getName())
+                    && "true".equals(updateOp.getChanges().get(key).value)) {
+                return true;
+            }
+        }
+        return false;
+    };
+
+    /**
+     * Returns a listener that adds each update operation it is notified with
+     * to {@code ops}, provided {@code predicate} accepts it.
+     */
+    private static FailedUpdateOpListener filter0(List<UpdateOp> ops,
+            Predicate<UpdateOp> predicate) {
+        final FailedUpdateOpListener collector = failedOp -> {
+            if (!predicate.test(failedOp)) {
+                return;
+            }
+            ops.add(failedOp);
+        };
+        return collector;
+    }
+
+    /**
+     * Walks from the root of the given store down to {@code path} and returns
+     * the node state found there. When {@code assertIntermediatesExist} is
+     * true, every node along the way - including the final one - must exist.
+     */
+    private static NodeState getChildeNodeState(DocumentNodeStore dns, String path, boolean assertIntermediatesExist) {
+        NodeState current = dns.getRoot();
+        for (String element : Path.fromString(path).elements()) {
+            current = current.getChildNode(element);
+            if (assertIntermediatesExist) {
+                assertTrue(current.exists());
+            }
+        }
+        return current;
+    }
+
     // OAK-10370
     @Test
     public void testGCDeletedPropsWithDryRunMode() throws Exception {


Reply via email to