This is an automated email from the ASF dual-hosted git repository. stefanegli pushed a commit to branch OAK-10595-2 in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
commit 415a29f7d354a8fad178df48d1e9d5c38aded8a3 Author: Stefan Egli <[email protected]> AuthorDate: Wed Jan 24 13:15:24 2024 +0100 OAK-10595 : explicit invalidation journal entry after collision rollback --- .../oak/plugins/document/DocumentNodeStore.java | 61 +++++++++++++++-- .../oak/plugins/document/JournalEntry.java | 25 ++++++- .../document/DocumentNodeStoreSweepTest.java | 78 +++++++++++++++++++--- 3 files changed, 150 insertions(+), 14 deletions(-) diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java index 9c4cee90d0..6f1941ca2d 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java @@ -556,6 +556,15 @@ public final class DocumentNodeStore */ private final Set<Revision> inDoubtTrunkCommits = Sets.newConcurrentHashSet(); + /** + * Contains journal entry revisions (branch commit style) that were created + * as a result of a rollback and are meant to trigger an invalidation in + * peer cluster nodes. This set is typically empty or small. It is emptied + * upon each backgroundWrite.
It is used to avoid duplicate journal entries + * that would otherwise be created as a result of merge (normal plus exclusive) retries + */ + private final Set<String> pendingRollbackInvalidations = Sets.newConcurrentHashSet(); + private final Predicate<Path> nodeCachePredicate; private final Feature prefetchFeature; @@ -1135,6 +1144,7 @@ public final class DocumentNodeStore } void canceled(Commit c) { + invalidatePathsOnCancel(c); if (commitQueue.contains(c.getRevision())) { try { commitQueue.canceled(c.getRevision()); @@ -1160,6 +1170,43 @@ public final class DocumentNodeStore } } + void invalidatePathsOnCancel(Commit c) { + Iterable<Path> pathsToInvalidate = c.getModifiedPaths(); + if (!pathsToInvalidate.iterator().hasNext()) { + // nothing to do + return; + } + if (changes != null) { + // first check if a new journal entry is needed at all + for (String pri : pendingRollbackInvalidations) { + JournalEntry je = store.find(JOURNAL, pri); + if (je == null) { + // quite unexpected + continue; + } + if (je.containsModified(pathsToInvalidate)) { + return; + } + } + } + try { + // create the invalidation journal entry (branch commit style) + invalidatePaths0(pathsToInvalidate, "rollback invalidation", ""); + // but also, already push it now, not waiting for backgroundWrite + // reason being that otherwise it might get lost on a crash, + // in which case the peers still would have uncommitted revisions + // in their nodesCache which might become "committed" on a + // subsequent fresh parent lastRev change of a future + // incarnation of this clusterId. 
+ // thanks to pendingRollbackInvalidations however, this expensive + // journal entry push is only done once per collision (within + // a backgroundWrite cycle) + pushJournalEntry(Revision.newRevision(clusterId)); + } catch(DocumentStoreException e) { + LOG.error("invalidatePathsOnCancel: " + e.getMessage()); + } + } + public void setAsyncDelay(int delay) { this.asyncDelay = delay; } @@ -2640,6 +2687,12 @@ public final class DocumentNodeStore // nothing to do return; } + invalidatePaths0(pathsToInvalidate, "split", + " Will be retried with next background split operation."); + } + + private void invalidatePaths0(@NotNull Iterable<Path> pathsToInvalidate, String type, + String errorSuffixMsg) { // create journal entry for cache invalidation JournalEntry entry = JOURNAL.newDocument(getDocumentStore()); entry.modified(pathsToInvalidate); @@ -2647,13 +2700,12 @@ public final class DocumentNodeStore UpdateOp journalOp = entry.asUpdateOp(r); if (store.create(JOURNAL, singletonList(journalOp))) { changes.invalidate(singletonList(r)); - LOG.debug("Journal entry {} created for split of document(s) {}", - journalOp.getId(), pathsToInvalidate); + LOG.debug("Journal entry {} created for {} of document(s) {}", + journalOp.getId(), type, pathsToInvalidate); } else { String msg = "Unable to create journal entry " + journalOp.getId() + " for document invalidation. 
" + - "Will be retried with next background split " + - "operation."; + errorSuffixMsg; throw new DocumentStoreException(msg); } } @@ -2710,6 +2762,7 @@ public final class DocumentNodeStore }, new UnsavedModifications.Snapshot() { @Override public void acquiring(Revision mostRecent) { + pendingRollbackInvalidations.clear(); pushJournalEntry(mostRecent); } }, backgroundOperationLock.writeLock()); diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java index e656cfcc4e..e5f584f030 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java @@ -71,7 +71,7 @@ public final class JournalEntry extends Document { static final String BRANCH_COMMITS = "_bc"; - private static final String INVALIDATE_ONLY = "_inv"; + static final String INVALIDATE_ONLY = "_inv"; public static final String MODIFIED = "_modified"; @@ -368,6 +368,29 @@ public final class JournalEntry extends Document { return Long.parseLong(parts[1], 16); } + boolean containsModified(Iterable<Path> paths) { + for (Path p : paths) { + if (!containsModified(p)) { + return false; + } + } + return true; + } + + boolean containsModified(Path path) { + if (path.isRoot()) { + return get(CHANGES) != null; + } + TreeNode node = getChanges(); + for (String name : path.elements()) { + if (node.get(name) == null) { + return false; + } + node = node.get(name); + } + return true; + } + void modified(Path path) { TreeNode node = getChanges(); for (String name : path.elements()) { diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreSweepTest.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreSweepTest.java index c9bf8bc284..5139e9e998 
100644 --- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreSweepTest.java +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreSweepTest.java @@ -42,7 +42,6 @@ import org.jetbrains.annotations.NotNull; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -209,8 +208,33 @@ public class DocumentNodeStoreSweepTest { * </ul> */ @Test - @Ignore(value = "OAK-10595") - public void cachingUncommittedBeforeCollisionRollback() throws Exception { + public void cachingUncommittedBeforeCollisionRollback_withPause() throws Exception { + doCachingUncommittedBeforeCollisionRollback(false, false); + } + + /** + * Same as {@link #cachingUncommittedBeforeCollisionRollback_withPause()}, + * except with a crash instead of a pause (of clusterId 2). + * This variant should test whether recovery correctly causes cache + * invalidation on clusterId 4 as well. + */ + @Test + public void cachingUncommittedBeforeCollisionRollback_crashBeforeRollback() throws Exception { + doCachingUncommittedBeforeCollisionRollback(true, false); + } + + /** + * Same as {@link #cachingUncommittedBeforeCollisionRollback_crashBeforeRollback()}, + * except it crashes later, after the rollback was applied. + */ + @Test + public void cachingUncommittedBeforeCollisionRollback_crashAfterRollback() throws Exception { + doCachingUncommittedBeforeCollisionRollback(true, true); + } + + public void doCachingUncommittedBeforeCollisionRollback( + final boolean crashInsteadOfContinue, + final boolean crashAfterRollback) throws Exception { // two nodes part of the game: // 2 : the main one that starts to do a subtree deletion // 4 : a peer one that gets in between the above and causes a collision. 
@@ -234,6 +258,9 @@ public class DocumentNodeStoreSweepTest { breakpoint1.release(); try { breakpoint2.tryAcquire(1, 30, TimeUnit.MINUTES); + if (crashInsteadOfContinue && !crashAfterRollback) { + throw new RuntimeException("crashInsteadOfContinue"); + } } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -245,7 +272,7 @@ public class DocumentNodeStoreSweepTest { mf = pausableMongoDocumentStore("2:/parent/foo", breakOnceInThread); DocumentStore store1 = mf.createDocumentStore(4); - DocumentStore store2 = mf.createDocumentStore(2); + FailingDocumentStore store2 = new FailingDocumentStore(mf.createDocumentStore(2)); ns = builderProvider.newBuilder().setDocumentStore(store1) // use lenient mode because tests use a virtual clock .setLeaseCheckMode(LeaseCheckMode.LENIENT).setClusterId(4).clock(clock) @@ -267,17 +294,18 @@ public class DocumentNodeStoreSweepTest { ns2.runBackgroundOperations(); final Semaphore successOn2 = new Semaphore(0); + final DocumentNodeStore finalNs2 = ns2; Runnable codeOn2 = new Runnable() { @Override public void run() { try { // now delete but intercept the _revisions update - NodeBuilder builder = ns2.getRoot().builder(); + NodeBuilder builder = finalNs2.getRoot().builder(); assertTrue(builder.child("parent").child("foo").remove()); assertTrue(builder.child("parent").child("bar").remove()); breakInThread.set(Thread.currentThread()); - merge(ns2, builder); + merge(finalNs2, builder); fail("supposed to fail"); } catch (CommitFailedException e) { // supposed to fail @@ -311,9 +339,20 @@ public class DocumentNodeStoreSweepTest { // check at this point though, /parent/foo is still there: assertTrue(ns4.getRoot().getChildNode("parent").hasChildNode("foo")); - // release things and go ahead - breakpoint2.release(); - assertTrue(successOn2.tryAcquire(5, TimeUnit.SECONDS)); + if (crashInsteadOfContinue && !crashAfterRollback) { + store2.fail().after(0).eternally(); + breakpoint2.release(); + assertTrue(successOn2.tryAcquire(5, 
TimeUnit.SECONDS)); + } else if (crashInsteadOfContinue) { + store2.fail().on(Collection.NODES).after(3).eternally(); + breakpoint2.release(); + assertTrue(successOn2.tryAcquire(5, TimeUnit.SECONDS)); + store2.fail().after(0).eternally(); + } else { + // release things and go ahead + breakpoint2.release(); + assertTrue(successOn2.tryAcquire(5, TimeUnit.SECONDS)); + } // some bg ops... ns4.runBackgroundOperations(); @@ -323,13 +362,34 @@ public class DocumentNodeStoreSweepTest { // at this point /parent/foo still exists on 4 // (due to lastRev on 2 not yet being updated) assertTrue(ns4.getRoot().getChildNode("parent").hasChildNode("foo")); + if (crashInsteadOfContinue) { + try { + ns2.dispose(); + } catch(DocumentStoreException dse) { + // ok + } + // start up a fresh ns2 for the below change, to update lastrev + for(int seconds=0; seconds < 1800; seconds+=20) { + clock.waitUntil(clock.getTime() + TimeUnit.SECONDS.toMillis(1)); + ns4.runBackgroundOperations(); + } + assertTrue(ns4.getRoot().getChildNode("parent").hasChildNode("foo")); + store2 = new FailingDocumentStore(mf.createDocumentStore(2)); + ns2 = builderProvider.newBuilder().setDocumentStore(store2) + // use lenient mode because tests use a virtual clock + .setLeaseCheckMode(LeaseCheckMode.LENIENT).setClusterId(2).clock(clock) + .setAsyncDelay(0).getNodeStore(); + assertTrue(ns4.getRoot().getChildNode("parent").hasChildNode("foo")); + } { // but with an update on /parent/bar on 2, _lastRev updates, // hence /parent/foo's rolled-back change on 4 now becomes visible NodeBuilder b2 = ns2.getRoot().builder(); b2.child("parent").child("bar").setProperty("z", "v"); merge(ns2, b2); + assertTrue(ns4.getRoot().getChildNode("parent").hasChildNode("foo")); ns2.runBackgroundOperations(); + assertTrue(ns4.getRoot().getChildNode("parent").hasChildNode("foo")); } ns4.runBackgroundOperations();
