This is an automated email from the ASF dual-hosted git repository.
stefanegli pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2128504a2f OAK-10254 : added a test to reproduce late-write
inconsistencies (#950)
2128504a2f is described below
commit 2128504a2f05e9fed1c596dcb99f146ac7ad78b5
Author: stefan-egli <[email protected]>
AuthorDate: Tue May 23 18:13:40 2023 +0200
OAK-10254 : added a test to reproduce late-write inconsistencies (#950)
* OAK-10254 : added a test to reproduce late-write inconsistencies
* OAK-10254 : fix non-existing getDocument
* OAK-10254 : replace IllegalStateException with
UnsupportedOperationException
* OAK-10254 : inline a parameter
* OAK-10254 : mark a method unused
---
.../plugins/document/UnrecoveredRevisionTest.java | 816 +++++++++++++++++++++
1 file changed, 816 insertions(+)
diff --git
a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/UnrecoveredRevisionTest.java
b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/UnrecoveredRevisionTest.java
new file mode 100644
index 0000000000..9e38aa0888
--- /dev/null
+++
b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/UnrecoveredRevisionTest.java
@@ -0,0 +1,816 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import
org.apache.jackrabbit.oak.plugins.document.util.LeaseCheckDocumentStoreWrapper;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import junitx.util.PrivateAccessor;
+
+/**
+ * Tests unrecovered revisions on a document
+ */
+public class UnrecoveredRevisionTest {
+
+ interface BreakpointCallback {
+
+ void breakpointCallback(String contextMsg);
+
+ }
+
+ class ControllableMemoryDocumentStore extends MemoryDocumentStore {
+
+ private final AtomicInteger remainingOps;
+ private BreakpointCallback callback;
+ private Set<Collection<?>> restrictedCollections = new HashSet<>();
+ private List<UpdateOp> writtenUpdateOps = new LinkedList<>();
+
+ ControllableMemoryDocumentStore(int remainingOps, BreakpointCallback
callback) {
+ this.remainingOps = new AtomicInteger(remainingOps);
+ this.callback = callback;
+ // restrict only NODES by default
+ this.restrictedCollections.add(Collection.NODES);
+ }
+
+ public void clearOps() {
+ writtenUpdateOps.clear();
+ }
+
+ public void printOps() {
+ for (UpdateOp updateOp : writtenUpdateOps) {
+ System.out.println("written updateOp: " + updateOp.toString());
+ }
+ }
+
+ private void registerWrittenUpdateOp(Collection<?> collection,
UpdateOp updateOp) {
+ if (restrictedCollections.contains(collection)) {
+ writtenUpdateOps.add(updateOp);
+ }
+ }
+
+ private void registerWrittenUpdateOp(Collection<?> collection,
List<UpdateOp> trimmedOps) {
+ if (restrictedCollections.contains(collection)) {
+ writtenUpdateOps.addAll(trimmedOps);
+ }
+ }
+
+ private void setBreakpointCallback(BreakpointCallback callback) {
+ this.callback = callback;
+ }
+
+ @SuppressWarnings("unused")
+ private void setRestrictedCollections(Collection<?>... collections) {
+ restrictedCollections.clear();
+ restrictedCollections.addAll(Arrays.asList(collections));
+ }
+
+ private void setRestrictedCollections(Set<Collection<?>> collections) {
+ restrictedCollections.clear();
+ restrictedCollections.addAll(collections);
+ }
+
+ public Set<Collection<?>> getRestrictedCollections() {
+ return new HashSet<>(restrictedCollections);
+ }
+
+ public AtomicInteger getRemainingOps() {
+ return remainingOps;
+ }
+
+ private void breakpoint(String msg) {
+ callback.breakpointCallback(msg);
+ }
+
+ private void allowOp(Collection<?> collection) {
+ if (!restrictedCollections.isEmpty() &&
!restrictedCollections.contains(collection)) {
+ return;
+ }
+ int remaining = remainingOps.decrementAndGet();
+ if (remaining < 0) {
+ breakpoint("remaining is " + remaining);
+ }
+ }
+
+ private int allowedOps(Collection<?> collection, int desiredOps) {
+ if (!restrictedCollections.isEmpty() &&
!restrictedCollections.contains(collection)) {
+ return desiredOps;
+ }
+ int current = remainingOps.get();
+ int remaining = current - desiredOps;
+ if (remaining >= 0) {
+ remainingOps.compareAndSet(current, remaining);
+ return desiredOps;
+ } else {
+ remainingOps.compareAndSet(current, 0);
+ return current;
+ }
+ }
+
+ @Override
+ public <T extends Document> boolean create(Collection<T> collection,
+ List<UpdateOp> updateOps) {
+ Lock lock = rwLock().writeLock();
+ lock.lock();
+ try {
+ ConcurrentSkipListMap<String, T> map = getMap(collection);
+ for (UpdateOp op : updateOps) {
+ if (map.containsKey(op.getId())) {
+ return false;
+ }
+ }
+
+ final List<UpdateOp> trimmedOps = new LinkedList<>();
+ final List<UpdateOp> lateOps = new LinkedList<>();
+ trimOps(collection, updateOps, trimmedOps, lateOps);
+ boolean result = super.create(collection, trimmedOps);
+ registerWrittenUpdateOp(collection, trimmedOps);
+ if (trimmedOps.size() != updateOps.size()) {
+ breakpoint("wanted " + updateOps.size() + " ops, allowed
only " + trimmedOps.size());
+ super.create(collection, lateOps);
+ }
+ return result;
+
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private <T extends Document> void trimOps(Collection<T> collection,
+ List<UpdateOp> updateOps, List<UpdateOp> immediateOps,
List<UpdateOp> lateOps) {
+ int max = allowedOps(collection, updateOps.size());
+ ArrayList<UpdateOp> al = new ArrayList<>(updateOps);
+ immediateOps.addAll(al.subList(0, max));
+ lateOps.addAll(al.subList(max, updateOps.size()));
+ }
+
+ @SuppressWarnings("unused")
+ private <T extends Document> List<String> trimString(Collection<T>
collection, List<String> updateOps) {
+ int max = allowedOps(collection, updateOps.size());
+ ArrayList<String> al = new ArrayList<>(updateOps);
+ List<String> trimmedUpdateOps = al.subList(0, max);
+ return trimmedUpdateOps;
+ }
+
+ private ReadWriteLock rwLock() {
+ try {
+ return (ReadWriteLock) PrivateAccessor.getField(this,
"rwLock");
+ } catch (NoSuchFieldException e) {
+ fail(e.getMessage());
+ // never reached:
+ return null;
+ }
+ }
+
+ @Override
+ public <T extends Document> @Nullable T createOrUpdate(Collection<T>
collection,
+ UpdateOp update) {
+ allowOp(collection);
+ registerWrittenUpdateOp(collection, update);
+ return super.createOrUpdate(collection, update);
+ }
+
+ private <T extends Document> List<T> superCreateOrUpdate(Collection<T>
collection, List<UpdateOp> updateOps) {
+ List<T> result = new ArrayList<T>(updateOps.size());
+ for (UpdateOp update : updateOps) {
+ result.add(super.createOrUpdate(collection, update));
+ }
+ return result;
+ }
+
+ @Override
+ public <T extends Document> List<T> createOrUpdate(Collection<T>
collection,
+ List<UpdateOp> updateOps) {
+ final List<UpdateOp> trimmedOps = new LinkedList<>();
+ final List<UpdateOp> lateOps = new LinkedList<>();
+ trimOps(collection, updateOps, trimmedOps, lateOps);
+ List<T> result = superCreateOrUpdate(collection, trimmedOps);
+ registerWrittenUpdateOp(collection, trimmedOps);
+ if (trimmedOps.size() != updateOps.size()) {
+ breakpoint("wanted " + updateOps.size() + " ops, allowed only
" + trimmedOps.size());
+ superCreateOrUpdate(collection, lateOps);
+ }
+ return result;
+ }
+
+ @Override
+ public <T extends Document> T findAndUpdate(Collection<T> collection,
+ UpdateOp update) {
+ allowOp(collection);
+ registerWrittenUpdateOp(collection, update);
+ return super.findAndUpdate(collection, update);
+ }
+
+ @Override
+ public <T extends Document> void remove(Collection<T> collection,
+ List<String> keys) {
+ throw new UnsupportedOperationException("not yet implemented");
+// super.remove(collection, trimString(collection, keys));
+ }
+
+ @Override
+ public <T extends Document> int remove(Collection<T> collection,
+ Map<String, Long> toRemove) {
+ throw new UnsupportedOperationException("not yet implemented");
+ }
+
+ @Override
+ public <T extends Document> void remove(Collection<T> collection,
String key) {
+ allowOp(collection);
+ super.remove(collection, key);
+ }
+
+ @Override
+ public <T extends Document> int remove(Collection<T> collection,
+ String indexedProperty, long startValue, long endValue)
+ throws DocumentStoreException {
+ throw new UnsupportedOperationException("not yet implemented");
+ }
+ }
+
+ @Rule
+ public DocumentMKBuilderProvider builderProvider = new
DocumentMKBuilderProvider();
+
+ private Clock virtualClock;
+ private DocumentNodeStore ns1;
+ private DocumentNodeStore ns2;
+
+ private ControllableMemoryDocumentStore store;
+
+ @Before
+ public void setup() throws Exception {
+ setup(1);
+ }
+
+ private void setup(int createOrUpdateBatchSize) throws
InterruptedException {
+ System.setProperty("oak.documentMK.createOrUpdateBatchSize",
String.valueOf(createOrUpdateBatchSize));
+ virtualClock = new Clock.Virtual();
+ virtualClock.waitUntil(System.currentTimeMillis());
+ ClusterNodeInfo.setClock(virtualClock);
+ store = new ControllableMemoryDocumentStore(Integer.MAX_VALUE, new
BreakpointCallback() {
+
+ @Override
+ public void breakpointCallback(String contextMsg) {
+ throw new Error("Crashing Now : " + contextMsg);
+ }
+ });
+ ns1 = createDNS(store, 1);
+ ns2 = createDNS(store, 2);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ store.getRemainingOps().set(Integer.MAX_VALUE);
+ System.clearProperty("oak.documentMK.createOrUpdateBatchSize");
+ }
+
+ private void assertChildren(DocumentNodeStore ns, String... childPaths) {
+ final DocumentNodeState root = ns.getRoot();
+ for (String aChildPath : childPaths) {
+ boolean inverted = false;
+ if (aChildPath.startsWith("!")) {
+ aChildPath = aChildPath.substring(1);
+ inverted = true;
+ }
+ Path p = Path.fromString(aChildPath);
+ NodeState node = root;
+ for (String name : p.elements()) {
+ node = node.getChildNode(name);
+ }
+ assertEquals(aChildPath + " exists :" + node.exists(), !inverted,
node.exists());
+ }
+ }
+
+ private void crashSafely(DocumentNodeStore ns) throws NoSuchFieldException
{
+ crashSafely(ns, true);
+ }
+
+ private void crashSafely(DocumentNodeStore ns, boolean disposeClusterInfo)
throws NoSuchFieldException {
+ if (disposeClusterInfo) {
+ ns.getClusterInfo().dispose();
+ }
+ try {
+ ns.dispose();
+ fail("should have errored");
+ } catch(Error e) {
+ // expected
+ }
+ AtomicBoolean stopLeaseUpdateThread = (AtomicBoolean)
PrivateAccessor.getField(ns, "stopLeaseUpdateThread");
+ stopLeaseUpdateThread.set(true);
+ synchronized (stopLeaseUpdateThread) {
+ stopLeaseUpdateThread.notifyAll();
+ }
+ // trigger one last lease update in exactly this moment - otherwise
it'll do it in an unpredictable future moment
+ // but if no lease update was necessary, never mind
+ ns.renewClusterIdLease();
+ }
+
+ /**
+ * Illustrates creating an orphaned node (that can then cause the usual
OakMerge0004)
+ * Steps:
+ * 1. create a subtree partially, eg create /a, /a/b, /a/b/c
+ * 2. then crash
+ * 3. then lastRevRecovery
+ * 4. then continue on the partial subtree adding further children, eg
/a/b/c/d, /a/b/c/d/e
+ * 5. then do all sorts of asserts on the resulting status
+ */
+ @Test
+ public void testOrphanedNodes() throws Exception {
+ System.setProperty("oak.documentMK.createOrUpdateBatchSize", "42");
+ ns2.dispose();
+ final DocumentNodeStore ns3 = createDNS(store, 3);
+ store.setRestrictedCollections(new HashSet<>());
+ final Semaphore breakpointReached = new Semaphore(0);
+ final Semaphore breakpointContinue = new Semaphore(0);
+ final Semaphore successSignal = new Semaphore(0);
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ createLeaves(ns3, "/a/b/c", "/a/b/c/d/e",
"/a/b/c/d/e/f/g");
+ fail("should fail with a lease update failure");
+ } catch(CommitFailedException ce) {
+ // it should actually fail due to sending the commitRoot
update
+ successSignal.release(1);
+ }
+ }
+ };
+ final Thread lateWriterThread = new Thread(r);
+ store.setBreakpointCallback(new BreakpointCallback() {
+
+ @Override
+ public void breakpointCallback(String contextMsg) {
+ if (Thread.currentThread() != lateWriterThread) {
+ // only block Thread lateWriterThread
+ // other threads treat with an Error instead
+ throw new Error("crashing with msg : " + contextMsg);
+ }
+ breakpointReached.release(1);
+ try {
+ while (!breakpointContinue.tryAcquire(1, 1000,
TimeUnit.MILLISECONDS)) {
+ System.out.println("DocumentStore breakpoint still
blocked");
+ }
+ System.out.println("done");
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ assertTrue(ns3.getClusterInfo().toString().contains("state: ACTIVE"));
+ advanceClockOneSecond();
+ store.getRemainingOps().set(3);
+ assertTrue(ns3.getClusterInfo().toString().contains("state: ACTIVE"));
+ lateWriterThread.setDaemon(true);
+ lateWriterThread.start();
+ assertTrue("worker thread failed to reach breakpointReached in time",
+ breakpointReached.tryAcquire(1, 5000, TimeUnit.MILLISECONDS));
+ assertTrue(ns3.getClusterInfo().toString().contains("state: ACTIVE"));
+ advanceClock(11000);
+// crashSafely(ns2, false);
+ assertTrue(ns3.getClusterInfo().toString().contains("state: ACTIVE"));
+ store.getRemainingOps().set(Integer.MAX_VALUE);
+ advanceClock(85000);
+ ns1.runBackgroundOperations();
+ ns1.renewClusterIdLease();
+ advanceClock(65000);
+ ns1.runBackgroundOperations();
+ ns1.renewClusterIdLease();
+
+ try {
+ ns1.getLastRevRecoveryAgent().performRecoveryIfNeeded();
+ } catch(Throwable th) {
+ th.printStackTrace();
+ fail("errr : " + th);
+ }
+ LeaseCheckDocumentStoreWrapper s = (LeaseCheckDocumentStoreWrapper)
ns1.getDocumentStore();
+ System.out.println(PrivateAccessor.getField(s, "delegate"));
+
+ // now release the original thread from the lateWriterThread
+ breakpointContinue.release(1);
+
+ assertTrue("worker thread did not finish", successSignal.tryAcquire(1,
5000, TimeUnit.MILLISECONDS));
+
+ advanceClockOneSecond();
+ ns1.runBackgroundOperations();
+
+ // now do those asserts, since now the data should be inconsistent
+ {
+ DocumentNodeState root = ns1.getRoot();
+ assertFalse(root.hasChildNode("a"));
+ }
+ createLeaves(ns1, "/a");
+ {
+ DocumentNodeState root = ns1.getRoot();
+ assertFalse(root.getChildNode("a").hasChildNode("b"));
+ }
+ createLeaves(ns1, "/a/b");
+ {
+ DocumentNodeState root = ns1.getRoot();
+
assertFalse(root.getChildNode("a").getChildNode("b").hasChildNode("c"));
+ }
+ createLeaves(ns1, "/a/b/c");
+ {
+ DocumentNodeState root = ns1.getRoot();
+
assertFalse(root.getChildNode("a").getChildNode("b").getChildNode("c").hasChildNode("d"));
+
assertFalse(root.getChildNode("a").getChildNode("b").getChildNode("c").getChildNode("d").hasChildNode("e"));
+ }
+ try {
+// createLeaves(ns1, "/a/b/c/d");
+ createLeaves(ns1, "/a/b/c/d/e");
+ fail("should have failed");
+ } catch(CommitFailedException cfe) {
+ assertTrue(cfe.toString().contains("OakMerge0004"));
+ }
+// ns1.getMBean().recover("/a/b/c/d", 3);
+// createLeaves(ns1, "/a/b/c/d/e");
+ }
+
+ private void advanceClock(long i) throws InterruptedException {
+ virtualClock.waitUntil(virtualClock.getTime() + i);
+ }
+
+ private class LeavesBuilder {
+
+ private List<List<String>> transactions = new LinkedList<>();
+
+ LeavesBuilder() {
+
+ }
+
+ LeavesBuilder add(String... leaves) {
+ transactions.add(Arrays.asList(leaves));
+ return this;
+ }
+
+ public void assertLeavesExist(DocumentNodeStore ns1) {
+ List<String> allLeaves = new LinkedList<>();
+ for (List<String> leaves : transactions) {
+ allLeaves.addAll(leaves);
+ }
+ assertChildren(ns1, allLeaves.toArray(new String[0]));
+ }
+
+ }
+
+ private void createContent(LeavesBuilder cb, DocumentNodeStore writingDns,
DocumentNodeStore... dnss) throws CommitFailedException, InterruptedException {
+ List<String> allLeaves = new LinkedList<>();
+ for (List<String> leaves : cb.transactions) {
+ createLeaves(writingDns, leaves.toArray(new String[0]));
+ advanceClockOneSecond();
+ allLeaves.addAll(leaves);
+ assertChildren(writingDns, allLeaves.toArray(new String[0]));
+ }
+ advanceClockOneSecond();
+ assertChildren(writingDns, allLeaves.toArray(new String[0]));
+ advanceClockOneSecond();
+ // now sync within cluster a couple times
+ List<DocumentNodeStore> allDnss = new
LinkedList<>(Arrays.asList(dnss));
+ assertFalse(allDnss.contains(writingDns));
+ allDnss.add(writingDns);
+ runBackgroundOps(3, allDnss.toArray(new DocumentNodeStore[0]));
+ advanceClockOneSecond();
+ }
+
+ interface ChangeBuilder {
+ void modify(NodeBuilder nb);
+ }
+
+ void produceLateWriteSituation(int allowedUpdateOps, DocumentNodeStore
dns, ChangeBuilder cb) throws NoSuchFieldException {
+ store.getRemainingOps().set(allowedUpdateOps);
+ store.clearOps();
+ store.printOps();
+ try {
+ @NotNull
+ NodeBuilder rb = dns.getRoot().builder();
+ cb.modify(rb);
+ dns.merge(rb, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ fail("should have failed");
+ } catch(CommitFailedException ce) {
+ // expected
+ }
+ crashSafely(dns);
+ store.getRemainingOps().set(Integer.MAX_VALUE);
+ System.out.println("These UpdateOps were still sent");
+ store.printOps();
+ System.out.println("These UpdateOps were still sent : DONE");
+ }
+
+ private void removeNodes(NodeBuilder rb, String... paths) {
+ for (String aPath : paths) {
+ Path p = Path.fromString(aPath);
+ NodeBuilder b = rb;
+ for (String name : p.elements()) {
+ b = b.child(name);
+ }
+ b.remove();
+ }
+ }
+
+ private void doTestAddNode(int createOrUpdateBatchSize, String
targetTestNode)
+ throws InterruptedException, CommitFailedException,
NoSuchFieldException {
+ setup(createOrUpdateBatchSize);
+
+ // test setup, using ns1
+ LeavesBuilder leaves =
+ new LeavesBuilder().add("/foo").add("/foo/a");
+ createContent(leaves, ns1, ns2);
+
+ // now produce JVM-pause/lateWrite/crash situation, using ns2
+ produceLateWriteSituation(2, ns2, (rb) -> createLeaves(rb,
"/foo/a/b/c/d", "/foo/e/f/g/h"));
+
+ // ensure the above didn't manage to add nodes
+ advanceClockOneSecond();
+ leaves.assertLeavesExist(ns1);
+
+ // now cause a conflict due to the late write
+ try {
+ setProperty(2, targetTestNode, "p2", "v2");
+ fail("should fail");
+ } catch(CommitFailedException cfe) {
+ assertTrue(cfe.toString().contains("OakMerge0004"));
+ }
+ }
+
+ /**
+ * Illustrates a late-write add-node can later on cause an OakMerge0004
conflict
+ */
+ @Test
+ public void testAddNode_variantA() throws Exception {
+ doTestAddNode(1, "/foo/a/b/c/d");
+ }
+
+ /**
+ * Illustrates a late-write add-node can later on cause an OakMerge0004
conflict
+ */
+ @Test
+ public void testAddNode_variantB() throws Exception {
+ doTestAddNode(1, "/foo/a/b");
+ }
+
+ private void doTestRemoveNode(int createOrUpdateBatchSize, String
targetTestNode) throws Exception {
+ setup(createOrUpdateBatchSize);
+
+ // test setup, using ns1
+ LeavesBuilder leaves =
+ new LeavesBuilder().add("/foo").add("/foo/a/b/c/d",
"/foo/e/f/g/h", "/foo/e/f/i/j");
+ createContent(leaves, ns1, ns2);
+
+ // now produce JVM-pause/lateWrite/crash situation, using ns2
+ produceLateWriteSituation(2, ns2, (rb) -> removeNodes(rb,
"/foo/a/b/c/d", "/foo/e/f/g/h"));
+
+ // ensure the above didn't manage to delete properly
+ advanceClockOneSecond();
+ leaves.assertLeavesExist(ns1);
+
+ // now cause a conflict due to the late write
+ try {
+ setProperty(2, targetTestNode, "p2", "v2");
+ fail("should fail");
+ } catch(CommitFailedException cfe) {
+ assertTrue(cfe.toString().contains("OakMerge0004"));
+ }
+ }
+
+ /**
+ * Illustrates a late-write remove-node can later on cause an OakMerge0004
conflict
+ */
+ @Test
+ public void testRemoveNode_variantA() throws Exception {
+ doTestRemoveNode(1, "/foo/a/b/c/d");
+ }
+
+ /**
+ * Illustrates a late-write remove-node can later on cause an OakMerge0004
conflict
+ */
+ @Test
+ public void testRemoveNode_variantB() throws Exception {
+ doTestRemoveNode(1, "/foo/e/f/g/h");
+ }
+
+ /**
+ * Illustrates a late-write remove-node can later on cause an OakMerge0004
conflict
+ */
+ @Test
+ public void testRemoveNode_variantC() throws Exception {
+ doTestRemoveNode(10, "/foo/e/f/g/h");
+ }
+
+ private void runBackgroundOps(int times, DocumentNodeStore... dnss) {
+ for( int i = 0; i < times; i++) {
+ for (DocumentNodeStore dns : dnss) {
+ dns.runBackgroundOperations();
+ }
+ }
+ }
+
+ private void advanceClock(long duration, TimeUnit unit) throws
InterruptedException {
+ advanceClock(unit.toMillis(duration));
+ }
+
+ private void advanceClockOneSecond() throws InterruptedException {
+ advanceClock(1, TimeUnit.SECONDS);
+ }
+
+ @SuppressWarnings("unused")
+ private void printNodes(DocumentNodeStore ns, String... paths) {
+ for (String aPath : paths) {
+ Path p = Path.fromString(aPath);
+ NodeState n = ns.getRoot();
+ for (String name : p.elements()) {
+ n = n.getChildNode(name);
+ }
+ System.out.println(aPath + " :");
+ if (!n.exists()) {
+ System.out.println("DOES NOT EXIST");
+ } else {
+ DocumentNodeState dns = (DocumentNodeState) n;
+ Path path = dns.getPath();
+ NodeDocument doc =
ns.getDocumentStore().find(Collection.NODES, Utils.getIdFromPath(path));
+ System.out.println(doc.format());
+ }
+ System.out.println();
+ }
+ }
+
+ /**
+ * Illustrates a late-write change-property does *NOT* cause a later
conflict
+ */
+ @Test
+ public void changeUncommittedProperty() throws Exception {
+ assertFalse(ns1.getRoot().hasChildNode("a"));
+
+ @NotNull
+ NodeBuilder rb = ns1.getRoot().builder();
+ rb.child("a");
+ ns1.merge(rb, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ assertTrue(ns1.getRoot().hasChildNode("a"));
+
+ createLeaves(ns1, "/foo/bar", "/foo/baz");
+ setStringProperty(ns1, "/foo/bar", "a", "1");
+ assertEquals("1", getStringProperty(ns1, "/foo/bar", "a"));
+ assertEquals("true", getStringProperty(ns1, "/foo/bar", "leaf"));
+ assertNull(getStringProperty(ns1, "/foo/bar", "crashleaf"));
+ ns1.runBackgroundOperations();
+ createChangeAfterRecovery(3, 2, "/foo/bar", "/foo/baz");
+ store.getRemainingOps().set(Integer.MAX_VALUE);
+ assertNull(getStringProperty(ns1, "/foo/bar", "crashleaf"));
+ setStringProperty(ns1, "/foo/bar", "crashleaf", "false");
+ }
+
+ private String getStringProperty(DocumentNodeStore ns, String path, String
key) {
+ DocumentNodeState r = ns.getRoot();
+ Path p = Path.fromString(path);
+ NodeState s = r;
+ for (String name : p.elements()) {
+ s = s.getChildNode(name);
+ }
+ return s.getString(key);
+ }
+
+ private void setStringProperty(DocumentNodeStore ns, String path, String
key,
+ String value) throws CommitFailedException {
+ NodeBuilder rb = ns.getRoot().builder();
+ Path p = Path.fromString(path);
+ NodeBuilder b = rb;
+ for (String name : p.elements()) {
+ b = b.child(name);
+ }
+ b.setProperty(key, value);
+ ns.merge(rb, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ }
+
+ private void createLeaves(DocumentNodeStore ns, String... paths) throws
CommitFailedException {
+ @NotNull
+ NodeBuilder rb = ns.getRoot().builder();
+ createLeaves(rb, paths);
+
+ ns.merge(rb, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ }
+
+ private void createLeaves(NodeBuilder rb, String... paths) {
+ for (String aPath : paths) {
+ Path p = Path.fromString(aPath);
+ NodeBuilder b = rb;
+ for (String name : p.elements()) {
+ b = b.child(name);
+ }
+ b.setProperty("leaf", "true");
+ }
+ }
+
+ private void setProperty(int clusterId, String path, String key, String
value) throws CommitFailedException {
+ DocumentNodeStore ns = createDNS(store, clusterId);
+ ns.runBackgroundOperations();
+
+ setProperty(ns, path, key, value);
+ ns.runBackgroundOperations();
+ ns.dispose();
+ }
+
+ private void setProperty(DocumentNodeStore ns, String path, String key,
+ String value) throws CommitFailedException {
+ NodeBuilder rb = ns.getRoot().builder();
+ Path p = Path.fromString(path);
+ NodeBuilder b = rb;
+ for (String name : p.elements()) {
+ b = b.child(name);
+ }
+ b.setProperty(key, value);
+
+ ns.merge(rb, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ }
+
+ /**
+ * This creates a change (revisions) after an instance has
+ * crashed and has been recovered by another instance.
+ * The change is done in a few different subtrees and has
+ * not been fully committed yet (the commit value on the
+ * commit root has not been set yet)
+ * @throws CommitFailedException
+ */
+ private void createChangeAfterRecovery(int clusterId, int allowedOps,
+ String... paths) throws CommitFailedException {
+ DocumentNodeStore ns = createDNS(store, clusterId);
+
+ setStringProperty(ns, "/", "dummyPropForSweeper", "true");
+ ns.runBackgroundOperations();
+
+ @NotNull
+ NodeBuilder rb = ns.getRoot().builder();
+ for (String aPath : paths) {
+ Path p = Path.fromString(aPath);
+ NodeBuilder b = rb;
+ for (String name : p.elements()) {
+ b = b.child(name);
+ }
+ b.setProperty("crashleaf", "true");
+ }
+
+ store.getRemainingOps().set(allowedOps);
+ try {
+ ns.merge(rb, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ fail("did not fail");
+ } catch(CommitFailedException ce) {
+ // expected
+ }
+ }
+
+ private DocumentNodeStore createDNS(DocumentStore ds, int clusterId){
+ return create(ds, clusterId).getNodeStore();
+ }
+
+ private DocumentMK create(DocumentStore ds, int clusterId){
+ return
builderProvider.newBuilder().clock(virtualClock).setAsyncDelay(0)
+ .setDocumentStore(ds).setClusterId(clusterId).open();
+ }
+}