Author: mreutegg
Date: Tue Nov 19 09:24:32 2013
New Revision: 1543354
URL: http://svn.apache.org/r1543354
Log:
OAK-1198: Retry InMemory merge
- added test
- implemented retry logic similar to the one in SegmentNodeStoreBranch
Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MergeRetryTest.java
(with props)
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java?rev=1543354&r1=1543353&r2=1543354&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java
Tue Nov 19 09:24:32 2013
@@ -16,7 +16,11 @@
*/
package org.apache.jackrabbit.oak.spi.state;
+import java.util.Random;
+
import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.jackrabbit.oak.commons.PathUtils.elements;
import static org.apache.jackrabbit.oak.commons.PathUtils.getName;
import static org.apache.jackrabbit.oak.commons.PathUtils.getParentPath;
@@ -40,12 +44,16 @@ import org.apache.jackrabbit.oak.spi.com
public abstract class AbstractNodeStoreBranch<S extends NodeStore, N extends
NodeState>
implements NodeStoreBranch {
+ private static final Random RANDOM = new Random();
+
/** The underlying store to which this branch belongs */
protected final S store;
/** The dispatcher to report changes */
protected final ChangeDispatcher dispatcher;
+ protected final long maximumBackoff;
+
/**
* State of this branch. Either {@link Unmodified}, {@link InMemory},
{@link Persisted}
* or {@link Merged}.
@@ -56,10 +64,17 @@ public abstract class AbstractNodeStoreB
public AbstractNodeStoreBranch(S kernelNodeStore,
ChangeDispatcher dispatcher,
N base) {
+ this(kernelNodeStore, dispatcher, base, MILLISECONDS.convert(10,
SECONDS));
+ }
+ public AbstractNodeStoreBranch(S kernelNodeStore,
+ ChangeDispatcher dispatcher,
+ N base,
+ long maximumBackoff) {
this.store = checkNotNull(kernelNodeStore);
this.dispatcher = dispatcher;
branchState = new Unmodified(checkNotNull(base));
+ this.maximumBackoff = maximumBackoff;
}
/**
@@ -388,13 +403,31 @@ public abstract class AbstractNodeStoreB
NodeState merge(@Nonnull CommitHook hook, CommitInfo info)
throws CommitFailedException {
try {
- rebase();
- dispatcher.contentChanged(base, null);
- NodeState toCommit = checkNotNull(hook).processCommit(base,
head);
- NodeState newHead =
AbstractNodeStoreBranch.this.persist(toCommit, base, info);
- dispatcher.contentChanged(newHead, info);
- branchState = new Merged(base);
- return newHead;
+ CommitFailedException ex = null;
+ for (long backoff = 1; backoff < maximumBackoff; backoff *= 2)
{
+ if (ex != null) {
+ try {
+ Thread.sleep(backoff, RANDOM.nextInt(1000000));
+ } catch (InterruptedException ie) {
+ // ignore
+ Thread.interrupted();
+ }
+ }
+ rebase();
+ dispatcher.contentChanged(base, null);
+ NodeState toCommit =
checkNotNull(hook).processCommit(base, head);
+ try {
+ NodeState newHead =
AbstractNodeStoreBranch.this.persist(toCommit, base, info);
+ dispatcher.contentChanged(newHead, info);
+ branchState = new Merged(base);
+ return newHead;
+ } catch (Exception e) {
+ ex = new CommitFailedException(
+ "Kernel", 1,
+ "Failed to merge changes to the underlying
store", e);
+ }
+ }
+ throw ex;
} finally {
dispatcher.contentChanged(getRoot(), null);
}
Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MergeRetryTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MergeRetryTest.java?rev=1543354&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MergeRetryTest.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MergeRetryTest.java
Tue Nov 19 09:24:32 2013
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.mongomk;
+
+import javax.annotation.CheckForNull;
+
+import org.apache.jackrabbit.mk.blobs.BlobStore;
+import org.apache.jackrabbit.mk.blobs.MemoryBlobStore;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.kernel.KernelNodeStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitHook;
+import org.apache.jackrabbit.oak.spi.commit.DefaultEditor;
+import org.apache.jackrabbit.oak.spi.commit.Editor;
+import org.apache.jackrabbit.oak.spi.commit.EditorHook;
+import org.apache.jackrabbit.oak.spi.commit.EditorProvider;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.junit.Test;
+
+/**
+ * Test for OAK-1198
+ */
+public class MergeRetryTest {
+
+ // this hook adds a 'foo' child if it does not exist
+ private static final CommitHook HOOK = new EditorHook(new EditorProvider()
{
+ @CheckForNull
+ @Override
+ public Editor getRootEditor(NodeState before,
+ NodeState after,
+ final NodeBuilder builder)
+ throws CommitFailedException {
+ return new DefaultEditor() {
+ @Override
+ public void enter(NodeState before, NodeState after)
+ throws CommitFailedException {
+ if (!builder.hasChildNode("foo")) {
+ builder.child("foo");
+ }
+ }
+ };
+ }
+ });
+
+ @Test
+ public void retryOnConflict() throws Exception {
+ MemoryDocumentStore ds = new MemoryDocumentStore();
+ MemoryBlobStore bs = new MemoryBlobStore();
+
+ MongoMK mk1 = createMK(1, 1000, ds, bs);
+ MongoMK mk2 = createMK(2, 1000, ds, bs);
+
+ try {
+ NodeStore ns1 = new KernelNodeStore(mk1);
+ NodeStore ns2 = new KernelNodeStore(mk2);
+
+ NodeBuilder builder1 = ns1.getRoot().builder();
+ builder1.child("bar");
+
+ NodeBuilder builder2 = ns2.getRoot().builder();
+ builder2.child("qux");
+
+ ns1.merge(builder1, HOOK, null);
+ ns2.merge(builder2, HOOK, null);
+ } finally {
+ mk1.dispose();
+ mk2.dispose();
+ }
+ }
+
+ private MongoMK createMK(int clusterId, int asyncDelay,
+ DocumentStore ds, BlobStore bs) {
+ return new MongoMK.Builder().setDocumentStore(ds).setBlobStore(bs)
+ .setClusterId(clusterId).setAsyncDelay(asyncDelay).open();
+ }
+}
Propchange:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MergeRetryTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MergeRetryTest.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision Rev URL