Author: mreutegg
Date: Tue Nov 19 09:24:32 2013
New Revision: 1543354

URL: http://svn.apache.org/r1543354
Log:
OAK-1198: Retry InMemory merge
- added test
- implemented a retry logic similar to the one in SegmentNodeStoreBranch

Added:
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MergeRetryTest.java
   (with props)
Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java?rev=1543354&r1=1543353&r2=1543354&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java
 Tue Nov 19 09:24:32 2013
@@ -16,7 +16,11 @@
  */
 package org.apache.jackrabbit.oak.spi.state;
 
+import java.util.Random;
+
 import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.jackrabbit.oak.commons.PathUtils.elements;
 import static org.apache.jackrabbit.oak.commons.PathUtils.getName;
 import static org.apache.jackrabbit.oak.commons.PathUtils.getParentPath;
@@ -40,12 +44,16 @@ import org.apache.jackrabbit.oak.spi.com
 public abstract class AbstractNodeStoreBranch<S extends NodeStore, N extends 
NodeState>
         implements NodeStoreBranch {
 
+    private static final Random RANDOM = new Random();
+
     /** The underlying store to which this branch belongs */
     protected final S store;
 
     /** The dispatcher to report changes */
     protected final ChangeDispatcher dispatcher;
 
+    protected final long maximumBackoff;
+
     /**
      * State of this branch. Either {@link Unmodified}, {@link InMemory}, 
{@link Persisted}
      * or {@link Merged}.
@@ -56,10 +64,17 @@ public abstract class AbstractNodeStoreB
     public AbstractNodeStoreBranch(S kernelNodeStore,
                                    ChangeDispatcher dispatcher,
                                    N base) {
+        this(kernelNodeStore, dispatcher, base, MILLISECONDS.convert(10, 
SECONDS)); // delegates with a default maximumBackoff of 10 seconds, expressed in milliseconds
+    }
 
+    public AbstractNodeStoreBranch(S kernelNodeStore,
+                                   ChangeDispatcher dispatcher,
+                                   N base,
+                                   long maximumBackoff) { // maximumBackoff bounds the retry backoff in merge; the backoff is passed to Thread.sleep as milliseconds
         this.store = checkNotNull(kernelNodeStore); // the store is mandatory
         this.dispatcher = dispatcher; // stored as given, without a null check
         branchState = new Unmodified(checkNotNull(base)); // a new branch starts out Unmodified on the mandatory base state
+        this.maximumBackoff = maximumBackoff; // NOTE(review): the retry loop in merge starts at 1 and runs while backoff < maximumBackoff, so a value <= 1 means no merge attempt is made - confirm callers never pass that
     }
 
     /**
@@ -388,13 +403,31 @@ public abstract class AbstractNodeStoreB
         NodeState merge(@Nonnull CommitHook hook, CommitInfo info)
                 throws CommitFailedException {
             try {
-                rebase();
-                dispatcher.contentChanged(base, null);
-                NodeState toCommit = checkNotNull(hook).processCommit(base, 
head);
-                NodeState newHead = 
AbstractNodeStoreBranch.this.persist(toCommit, base, info);
-                dispatcher.contentChanged(newHead, info);
-                branchState = new Merged(base);
-                return newHead;
+                CommitFailedException ex = null;
+                for (long backoff = 1; backoff < maximumBackoff; backoff *= 2) 
{
+                    if (ex != null) {
+                        try {
+                            Thread.sleep(backoff, RANDOM.nextInt(1000000));
+                        } catch (InterruptedException ie) {
+                            // ignore
+                            Thread.interrupted();
+                        }
+                    }
+                    rebase();
+                    dispatcher.contentChanged(base, null);
+                    NodeState toCommit = 
checkNotNull(hook).processCommit(base, head);
+                    try {
+                        NodeState newHead = 
AbstractNodeStoreBranch.this.persist(toCommit, base, info);
+                        dispatcher.contentChanged(newHead, info);
+                        branchState = new Merged(base);
+                        return newHead;
+                    } catch (Exception e) {
+                        ex = new CommitFailedException(
+                                "Kernel", 1,
+                                "Failed to merge changes to the underlying 
store", e);
+                    }
+                }
+                throw ex;
             } finally {
                 dispatcher.contentChanged(getRoot(), null);
             }

Added: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MergeRetryTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MergeRetryTest.java?rev=1543354&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MergeRetryTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MergeRetryTest.java
 Tue Nov 19 09:24:32 2013
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.mongomk;
+
+import javax.annotation.CheckForNull;
+
+import org.apache.jackrabbit.mk.blobs.BlobStore;
+import org.apache.jackrabbit.mk.blobs.MemoryBlobStore;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.kernel.KernelNodeStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitHook;
+import org.apache.jackrabbit.oak.spi.commit.DefaultEditor;
+import org.apache.jackrabbit.oak.spi.commit.Editor;
+import org.apache.jackrabbit.oak.spi.commit.EditorHook;
+import org.apache.jackrabbit.oak.spi.commit.EditorProvider;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.junit.Test;
+
+/**
+ * Test for OAK-1198: two MongoMK instances share one document store and both merges run a hook that adds a 'foo' child; neither merge may throw.
+ */
+public class MergeRetryTest {
+
+    // this hook adds a 'foo' child to the builder handed to getRootEditor if it does not exist yet; the before/after states are ignored
+    private static final CommitHook HOOK = new EditorHook(new EditorProvider() 
{
+        @CheckForNull
+        @Override
+        public Editor getRootEditor(NodeState before,
+                                    NodeState after,
+                                    final NodeBuilder builder)
+                throws CommitFailedException {
+            return new DefaultEditor() {
+                @Override
+                public void enter(NodeState before, NodeState after)
+                        throws CommitFailedException {
+                    if (!builder.hasChildNode("foo")) {
+                        builder.child("foo");
+                    }
+                }
+            };
+        }
+    });
+
+    @Test
+    public void retryOnConflict() throws Exception { // passes when both merges complete without throwing; there are no explicit assertions
+        MemoryDocumentStore ds = new MemoryDocumentStore();
+        MemoryBlobStore bs = new MemoryBlobStore();
+
+        MongoMK mk1 = createMK(1, 1000, ds, bs); // cluster id 1
+        MongoMK mk2 = createMK(2, 1000, ds, bs); // cluster id 2, same document store and blob store as mk1
+
+        try {
+            NodeStore ns1 = new KernelNodeStore(mk1);
+            NodeStore ns2 = new KernelNodeStore(mk2);
+
+            NodeBuilder builder1 = ns1.getRoot().builder();
+            builder1.child("bar");
+
+            NodeBuilder builder2 = ns2.getRoot().builder();
+            builder2.child("qux");
+
+            ns1.merge(builder1, HOOK, null);
+            ns2.merge(builder2, HOOK, null); // NOTE(review): presumably this merge conflicts on 'foo' added by the first one and only succeeds through the retry - confirm
+        } finally {
+            mk1.dispose();
+            mk2.dispose(); // NOTE(review): not reached if mk1.dispose() throws
+        }
+    }
+
+    private MongoMK createMK(int clusterId, int asyncDelay, // builds a MongoMK on the given stores with the given cluster id and async delay
+                             DocumentStore ds, BlobStore bs) {
+        return new MongoMK.Builder().setDocumentStore(ds).setBlobStore(bs)
+                .setClusterId(clusterId).setAsyncDelay(asyncDelay).open();
+    }
+}

Propchange: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MergeRetryTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MergeRetryTest.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL


Reply via email to