This is an automated email from the ASF dual-hosted git repository.

stefanegli pushed a commit to branch OAK-10595-fix
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git

commit 9e8319a3fdcbdd8a17380378458c0af30677754a
Author: Stefan Egli <[email protected]>
AuthorDate: Wed Jan 24 13:10:34 2024 +0100

    OAK-10595 : explicit invalidation journal entry after collision rollback
---
 .../oak/plugins/document/DocumentNodeStore.java    | 61 +++++++++++++++--
 .../oak/plugins/document/JournalEntry.java         | 25 ++++++-
 .../document/DocumentNodeStoreSweepTest.java       | 78 +++++++++++++++++++---
 3 files changed, 150 insertions(+), 14 deletions(-)

diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
index 9c4cee90d0..6f1941ca2d 100644
--- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
+++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
@@ -556,6 +556,15 @@ public final class DocumentNodeStore
      */
     private final Set<Revision> inDoubtTrunkCommits = Sets.newConcurrentHashSet();
 
+    /**
+     * Ids of the invalidation journal entries (branch commit style) that were
+     * created as a result of a rollback and are meant to trigger a cache
+     * invalidation in peer cluster nodes. Typically empty or small; it is
+     * emptied upon each backgroundWrite. Used to avoid duplicate journal
+     * entries otherwise created by merge (normal plus exclusive) retries.
+     */
+    private final Set<String> pendingRollbackInvalidations = Sets.newConcurrentHashSet();
+
     private final Predicate<Path> nodeCachePredicate;
 
     private final Feature prefetchFeature;
@@ -1135,6 +1144,7 @@ public final class DocumentNodeStore
     }
 
     void canceled(Commit c) {
+        invalidatePathsOnCancel(c);
         if (commitQueue.contains(c.getRevision())) {
             try {
                 commitQueue.canceled(c.getRevision());
@@ -1160,6 +1170,58 @@ public final class DocumentNodeStore
         }
     }
 
+    /**
+     * Creates an invalidation journal entry (branch commit style) for the
+     * paths modified by the given canceled commit and pushes it right away,
+     * so that peer cluster nodes invalidate cached state that may have been
+     * read from the rolled back changes. Creation is skipped when an entry
+     * listed in pendingRollbackInvalidations already covers all those paths.
+     * A {@link DocumentStoreException} is logged and not propagated.
+     *
+     * @param c the canceled commit.
+     */
+    void invalidatePathsOnCancel(Commit c) {
+        Iterable<Path> pathsToInvalidate = c.getModifiedPaths();
+        if (!pathsToInvalidate.iterator().hasNext()) {
+            // nothing to do
+            return;
+        }
+        if (changes != null) {
+            // first check if a new journal entry is needed at all
+            for (String pri : pendingRollbackInvalidations) {
+                JournalEntry je = store.find(JOURNAL, pri);
+                if (je == null) {
+                    // quite unexpected
+                    continue;
+                }
+                if (je.containsModified(pathsToInvalidate)) {
+                    return;
+                }
+            }
+        }
+        try {
+            // create the invalidation journal entry (branch commit style)
+            // and remember its id: this is what allows the check above to
+            // skip duplicate entries on merge retries
+            String journalId = invalidatePaths0(pathsToInvalidate,
+                    "rollback invalidation", "");
+            pendingRollbackInvalidations.add(journalId);
+            // but also, already push it now, not waiting for backgroundWrite
+            // reason being that otherwise it might get lost on a crash,
+            // in which case the peers still would have uncommitted revisions
+            // in their nodesCache which might become "committed" on a
+            // subsequent fresh parent lastRev change of a future
+            // incarnation of this clusterId.
+            // thanks to pendingRollbackInvalidations however, this expensive
+            // journal entry push is only done once per collision (within
+            // a backgroundWrite cycle)
+            pushJournalEntry(Revision.newRevision(clusterId));
+        } catch (DocumentStoreException e) {
+            LOG.error("invalidatePathsOnCancel: unable to create or push the "
+                    + "rollback invalidation journal entry", e);
+        }
+    }
+
     public void setAsyncDelay(int delay) {
         this.asyncDelay = delay;
     }
@@ -2640,6 +2702,22 @@ public final class DocumentNodeStore
             // nothing to do
             return;
         }
+        invalidatePaths0(pathsToInvalidate, "split",
+                "Will be retried with next background split operation.");
+    }
+
+    /**
+     * Creates a journal entry listing the given paths as modified and
+     * registers it with the current journal changes for invalidation.
+     *
+     * @param pathsToInvalidate the paths to list as modified.
+     * @param type the cause of the invalidation, used for logging only.
+     * @param errorSuffixMsg appended to the exception message on failure.
+     * @return the id of the created journal entry.
+     * @throws DocumentStoreException if the journal entry cannot be created.
+     */
+    private String invalidatePaths0(@NotNull Iterable<Path> pathsToInvalidate, String type,
+            String errorSuffixMsg) {
         // create journal entry for cache invalidation
         JournalEntry entry = JOURNAL.newDocument(getDocumentStore());
         entry.modified(pathsToInvalidate);
@@ -2647,13 +2725,13 @@ public final class DocumentNodeStore
         UpdateOp journalOp = entry.asUpdateOp(r);
         if (store.create(JOURNAL, singletonList(journalOp))) {
             changes.invalidate(singletonList(r));
-            LOG.debug("Journal entry {} created for split of document(s) {}",
-                    journalOp.getId(), pathsToInvalidate);
+            LOG.debug("Journal entry {} created for {} of document(s) {}",
+                    journalOp.getId(), type, pathsToInvalidate);
+            return journalOp.getId();
         } else {
             String msg = "Unable to create journal entry " +
                     journalOp.getId() + " for document invalidation. " +
-                    "Will be retried with next background split " +
-                    "operation.";
+                    errorSuffixMsg;
             throw new DocumentStoreException(msg);
         }
     }
@@ -2710,6 +2788,7 @@ public final class DocumentNodeStore
         }, new UnsavedModifications.Snapshot() {
             @Override
             public void acquiring(Revision mostRecent) {
+                pendingRollbackInvalidations.clear();
                 pushJournalEntry(mostRecent);
             }
         }, backgroundOperationLock.writeLock());
diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java
index e656cfcc4e..e5f584f030 100644
--- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java
+++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java
@@ -368,6 +368,47 @@ public final class JournalEntry extends Document {
         return Long.parseLong(parts[1], 16);
     }
 
+    /**
+     * Checks whether all of the given paths are recorded as modified in
+     * this journal entry.
+     *
+     * @param paths the paths to check.
+     * @return {@code true} if every path is contained in the changes of
+     *         this entry (also when {@code paths} is empty), {@code false}
+     *         otherwise.
+     */
+    boolean containsModified(Iterable<Path> paths) {
+        for (Path p : paths) {
+            if (!containsModified(p)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Checks whether the given path is recorded as modified in this
+     * journal entry.
+     *
+     * @param path the path to check.
+     * @return {@code true} if the path is contained in the changes of this
+     *         entry, {@code false} otherwise. The root path counts as
+     *         contained whenever this entry has a changes value at all.
+     */
+    boolean containsModified(Path path) {
+        if (path.isRoot()) {
+            return get(CHANGES) != null;
+        }
+        TreeNode node = getChanges();
+        for (String name : path.elements()) {
+            node = node.get(name);
+            if (node == null) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     void modified(Path path) {
         TreeNode node = getChanges();
         for (String name : path.elements()) {
diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreSweepTest.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreSweepTest.java
index c9bf8bc284..5139e9e998 100644
--- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreSweepTest.java
+++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreSweepTest.java
@@ -42,7 +42,6 @@ import org.jetbrains.annotations.NotNull;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -209,8 +208,41 @@ public class DocumentNodeStoreSweepTest {
      * </ul>
      */
     @Test
-    @Ignore(value = "OAK-10595")
-    public void cachingUncommittedBeforeCollisionRollback() throws Exception {
+    public void cachingUncommittedBeforeCollisionRollback_withPause() throws Exception {
+        doCachingUncommittedBeforeCollisionRollback(false, false);
+    }
+
+    /**
+     * Same as {@link #cachingUncommittedBeforeCollisionRollback_withPause()},
+     * except with a crash instead of a pause (of clusterId 2).
+     * This variant should test whether recovery correctly causes cache
+     * invalidation on clusterId 4 as well.
+     */
+    @Test
+    public void cachingUncommittedBeforeCollisionRollback_crashBeforeRollback() throws Exception {
+        doCachingUncommittedBeforeCollisionRollback(true, false);
+    }
+
+    /**
+     * Same as {@link #cachingUncommittedBeforeCollisionRollback_crashBeforeRollback()},
+     * except it crashes later, after the rollback was applied.
+     */
+    @Test
+    public void cachingUncommittedBeforeCollisionRollback_crashAfterRollback() throws Exception {
+        doCachingUncommittedBeforeCollisionRollback(true, true);
+    }
+
+    /**
+     * Runs the collision rollback scenario shared by the tests above.
+     *
+     * @param crashInsteadOfContinue whether clusterId 2 crashes instead of
+     *            continuing once released
+     * @param crashAfterRollback whether that crash only happens after the
+     *            rollback was applied (only used with crashInsteadOfContinue)
+     */
+    private void doCachingUncommittedBeforeCollisionRollback(
+            final boolean crashInsteadOfContinue,
+            final boolean crashAfterRollback) throws Exception {
         // two nodes part of the game:
         // 2 : the main one that starts to do a subtree deletion
         // 4 : a peer one that gets in between the above and causes a collision.
@@ -234,6 +266,9 @@ public class DocumentNodeStoreSweepTest {
                 breakpoint1.release();
                 try {
                     breakpoint2.tryAcquire(1, 30, TimeUnit.MINUTES);
+                    if (crashInsteadOfContinue && !crashAfterRollback) {
+                        throw new RuntimeException("crashInsteadOfContinue");
+                    }
                 } catch (InterruptedException e) {
                     throw new RuntimeException(e);
                 }
@@ -245,7 +280,7 @@ public class DocumentNodeStoreSweepTest {
         mf = pausableMongoDocumentStore("2:/parent/foo",
                 breakOnceInThread);
         DocumentStore store1 = mf.createDocumentStore(4);
-        DocumentStore store2 = mf.createDocumentStore(2);
+        FailingDocumentStore store2 = new FailingDocumentStore(mf.createDocumentStore(2));
         ns = builderProvider.newBuilder().setDocumentStore(store1)
                 // use lenient mode because tests use a virtual clock
                 .setLeaseCheckMode(LeaseCheckMode.LENIENT).setClusterId(4).clock(clock)
@@ -267,17 +302,18 @@ public class DocumentNodeStoreSweepTest {
         ns2.runBackgroundOperations();
 
         final Semaphore successOn2 = new Semaphore(0);
+        final DocumentNodeStore finalNs2 = ns2;
         Runnable codeOn2 = new Runnable() {
 
             @Override
             public void run() {
                 try {
                     // now delete but intercept the _revisions update
-                    NodeBuilder builder = ns2.getRoot().builder();
+                    NodeBuilder builder = finalNs2.getRoot().builder();
                     assertTrue(builder.child("parent").child("foo").remove());
                     assertTrue(builder.child("parent").child("bar").remove());
                     breakInThread.set(Thread.currentThread());
-                    merge(ns2, builder);
+                    merge(finalNs2, builder);
                     fail("supposed to fail");
                 } catch (CommitFailedException e) {
                     // supposed to fail
@@ -311,9 +347,20 @@ public class DocumentNodeStoreSweepTest {
         // check at this point though, /parent/foo is still there:
         assertTrue(ns4.getRoot().getChildNode("parent").hasChildNode("foo"));
 
-        // release things and go ahead
-        breakpoint2.release();
-        assertTrue(successOn2.tryAcquire(5, TimeUnit.SECONDS));
+        if (crashInsteadOfContinue && !crashAfterRollback) {
+            store2.fail().after(0).eternally();
+            breakpoint2.release();
+            assertTrue(successOn2.tryAcquire(5, TimeUnit.SECONDS));
+        } else if (crashInsteadOfContinue) {
+            store2.fail().on(Collection.NODES).after(3).eternally();
+            breakpoint2.release();
+            assertTrue(successOn2.tryAcquire(5, TimeUnit.SECONDS));
+            store2.fail().after(0).eternally();
+        } else {
+            // release things and go ahead
+            breakpoint2.release();
+            assertTrue(successOn2.tryAcquire(5, TimeUnit.SECONDS));
+        }
 
         // some bg ops...
         ns4.runBackgroundOperations();
@@ -323,13 +370,36 @@ public class DocumentNodeStoreSweepTest {
         // at this point /parent/foo still exists on 4
         // (due to lastRev on 2 not yet being updated)
         assertTrue(ns4.getRoot().getChildNode("parent").hasChildNode("foo"));
+        if (crashInsteadOfContinue) {
+            try {
+                ns2.dispose();
+            } catch (DocumentStoreException ignored) {
+                // may be thrown: store2 was set to fail eternally above
+            }
+            // let 30 minutes of virtual time pass in 20 second steps,
+            // running background operations on 4 meanwhile
+            for (int seconds = 0; seconds < 1800; seconds += 20) {
+                clock.waitUntil(clock.getTime() + TimeUnit.SECONDS.toMillis(20));
+                ns4.runBackgroundOperations();
+            }
+            assertTrue(ns4.getRoot().getChildNode("parent").hasChildNode("foo"));
+            // start up a fresh ns2 for the below change, to update lastrev
+            store2 = new FailingDocumentStore(mf.createDocumentStore(2));
+            ns2 = builderProvider.newBuilder().setDocumentStore(store2)
+                    // use lenient mode because tests use a virtual clock
+                    .setLeaseCheckMode(LeaseCheckMode.LENIENT).setClusterId(2).clock(clock)
+                    .setAsyncDelay(0).getNodeStore();
+            assertTrue(ns4.getRoot().getChildNode("parent").hasChildNode("foo"));
+        }
         {
             // but with an update on /parent/bar on 2, _lastRev updates,
             // hence /parent/foo's rolled-back change on 4 now becomes visible
             NodeBuilder b2 = ns2.getRoot().builder();
             b2.child("parent").child("bar").setProperty("z", "v");
             merge(ns2, b2);
+            assertTrue(ns4.getRoot().getChildNode("parent").hasChildNode("foo"));
             ns2.runBackgroundOperations();
+            assertTrue(ns4.getRoot().getChildNode("parent").hasChildNode("foo"));
         }
         ns4.runBackgroundOperations();
 

Reply via email to