Author: mreutegg
Date: Wed Jun 10 10:17:32 2015
New Revision: 1684631

URL: http://svn.apache.org/r1684631
Log:
OAK-2822: Release merge lock in retry loop

Re-implemented the improvement for the 1.0 branch. The trunk commit for this
issue cannot be merged because the 1.0 branch has diverged from trunk.

Modified:
    
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java
    
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java
    
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentRootBuilder.java
    
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java
    
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java

Modified: 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java?rev=1684631&r1=1684630&r2=1684631&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java
 Wed Jun 10 10:17:32 2015
@@ -59,11 +59,15 @@ public class KernelNodeStoreBranch exten
         }
     };
 
+    /** Lock for coordinating concurrent merge operations */
+    private final Lock mergeLock;
+
     public KernelNodeStoreBranch(KernelNodeStore kernelNodeStore,
                                  ChangeDispatcher dispatcher,
                                  Lock mergeLock,
                                  KernelNodeState base) {
-        super(kernelNodeStore, dispatcher, mergeLock, base);
+        super(kernelNodeStore, dispatcher, base);
+        this.mergeLock = mergeLock;
     }
 
     //----------------------< AbstractNodeStoreBranch 
>-------------------------
@@ -145,7 +149,7 @@ public class KernelNodeStoreBranch exten
     public NodeState merge(@Nonnull CommitHook hook, @Nonnull CommitInfo info)
             throws CommitFailedException {
         try {
-            return super.merge(hook, info);
+            return merge0(hook, info, mergeLock);
         } catch (MicroKernelException e) {
             throw new CommitFailedException(MERGE, 1,
                     "Failed to merge changes to the underlying MicroKernel", 
e);

Modified: 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java?rev=1684631&r1=1684630&r2=1684631&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java
 Wed Jun 10 10:17:32 2015
@@ -30,7 +30,6 @@ import org.apache.jackrabbit.oak.spi.sta
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 
 import static com.google.common.base.Preconditions.checkState;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.jackrabbit.oak.api.CommitFailedException.MERGE;
 import static org.apache.jackrabbit.oak.api.CommitFailedException.OAK;
 
@@ -46,7 +45,7 @@ class DocumentNodeStoreBranch
     DocumentNodeStoreBranch(DocumentNodeStore store,
                             DocumentNodeState base,
                             ReadWriteLock mergeLock) {
-        super(store, new ChangeDispatcher(store.getRoot()), 
mergeLock.readLock(),
+        super(store, new ChangeDispatcher(store.getRoot()),
                 base, null, store.getMaxBackOffMillis(),
                 store.getMaxBackOffMillis() * 3);
         this.mergeLock = mergeLock;
@@ -145,7 +144,7 @@ class DocumentNodeStoreBranch
     public NodeState merge(@Nonnull CommitHook hook, @Nonnull CommitInfo info)
             throws CommitFailedException {
         try {
-            return super.merge(hook, info);
+            return merge0(hook, info, mergeLock.readLock());
         } catch (CommitFailedException e) {
             if (!e.isOfType(MERGE)) {
                 throw e;
@@ -153,26 +152,20 @@ class DocumentNodeStoreBranch
         }
         // retry with exclusive lock, blocking other
         // concurrent writes
-        // do not wait forever
-        boolean acquired = false;
-        try {
-            acquired = mergeLock.writeLock()
-                    .tryLock(maxLockTryTimeMS, MILLISECONDS);
-        } catch (InterruptedException e) {
-            // ignore and proceed with shared lock used in base class
-        }
-        try {
-            return super.merge(hook, info);
-        } finally {
-            if (acquired) {
-                mergeLock.writeLock().unlock();
-            }
-        }
+        return merge0(hook, info, mergeLock.writeLock());
     }
 
     //------------------------------< internal 
>--------------------------------
 
     /**
+     * For test purposes only!
+     */
+    @Nonnull
+    ReadWriteLock getMergeLock() {
+        return mergeLock;
+    }
+
+    /**
      * Persist some changes on top of the given base state.
      *
      * @param op the changes to persist.

Modified: 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentRootBuilder.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentRootBuilder.java?rev=1684631&r1=1684630&r2=1684631&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentRootBuilder.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentRootBuilder.java
 Wed Jun 10 10:17:32 2015
@@ -96,7 +96,7 @@ class DocumentRootBuilder extends Abstra
 
     @Override
     protected void updated() {
-        if (updates++ > UPDATE_LIMIT) {
+        if (++updates > UPDATE_LIMIT) {
             purge();
         }
     }

Modified: 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java?rev=1684631&r1=1684630&r2=1684631&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java
 Wed Jun 10 10:17:32 2015
@@ -30,6 +30,7 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.commons.PathUtils.getName;
 import static org.apache.jackrabbit.oak.commons.PathUtils.getParentPath;
 
+import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 
 import org.apache.jackrabbit.oak.api.CommitFailedException;
@@ -76,9 +77,6 @@ public abstract class AbstractNodeStoreB
      */
     protected final long maxLockTryTimeMS;
 
-    /** Lock for coordinating concurrent merge operations */
-    private final Lock mergeLock;
-
     /**
      * State of the this branch. Either {@link Unmodified}, {@link InMemory}, 
{@link Persisted}
      * or {@link Merged}.
@@ -88,23 +86,20 @@ public abstract class AbstractNodeStoreB
 
     public AbstractNodeStoreBranch(S kernelNodeStore,
                                    ChangeDispatcher dispatcher,
-                                   Lock mergeLock,
                                    N base) {
-        this(kernelNodeStore, dispatcher, mergeLock, base, null,
+        this(kernelNodeStore, dispatcher, base, null,
                 MILLISECONDS.convert(10, SECONDS),
                 Integer.MAX_VALUE); // default: wait 'forever'
     }
 
     public AbstractNodeStoreBranch(S kernelNodeStore,
                                    ChangeDispatcher dispatcher,
-                                   Lock mergeLock,
                                    N base,
                                    N head,
                                    long maximumBackoff,
                                    long maxLockTryTimeMS) {
         this.store = checkNotNull(kernelNodeStore);
         this.dispatcher = dispatcher;
-        this.mergeLock = checkNotNull(mergeLock);
         if (head == null) {
             this.branchState = new Unmodified(checkNotNull(base));
         } else {
@@ -303,9 +298,17 @@ public abstract class AbstractNodeStoreB
         return true;
     }
 
-    @Nonnull
     @Override
-    public NodeState merge(@Nonnull CommitHook hook, @Nonnull CommitInfo info)
+    public void rebase() {
+        branchState.rebase();
+    }
+
+    //----------------------------< internal 
>----------------------------------
+
+    @Nonnull
+    protected NodeState merge0(@Nonnull CommitHook hook,
+                               @Nonnull CommitInfo info,
+                               @Nonnull Lock lock)
             throws CommitFailedException {
         CommitFailedException ex = null;
         long time = System.currentTimeMillis();
@@ -323,28 +326,17 @@ public abstract class AbstractNodeStoreB
                 }
             }
             try {
-                final long start = perfLogger.start();
-                boolean acquired = mergeLock.tryLock(maxLockTryTimeMS, 
MILLISECONDS);
-                perfLogger.end(start, 1, "Merge - Acquired lock");
-                try {
-                    return branchState.merge(checkNotNull(hook), 
checkNotNull(info));
-                } catch (CommitFailedException e) {
-                    log.trace("Merge Error", e);
-                    ex = e;
-                    // only retry on merge failures. these may be caused by
-                    // changes introduce by a commit hook and may be resolved
-                    // by a rebase and running the hook again
-                    if (!e.isOfType(MERGE)) {
-                        throw e;
-                    }
-                } finally {
-                    if (acquired) {
-                        mergeLock.unlock();
-                    }
+                return branchState.merge(checkNotNull(hook),
+                        checkNotNull(info), lock);
+            } catch (CommitFailedException e) {
+                log.trace("Merge Error", e);
+                ex = e;
+                // only retry on merge failures. these may be caused by
+                // changes introduce by a commit hook and may be resolved
+                // by a rebase and running the hook again
+                if (!e.isOfType(MERGE)) {
+                    throw e;
                 }
-            } catch (InterruptedException e) {
-                throw new CommitFailedException(OAK, 1,
-                        "Unable to acquire merge lock", e);
             }
         }
         // if we get here retrying failed
@@ -354,13 +346,35 @@ public abstract class AbstractNodeStoreB
                 ex.getCode(), msg, ex.getCause());
     }
 
-    @Override
-    public void rebase() {
-        branchState.rebase();
+    /**
+     * Acquires the merge lock.
+     *
+     * @param lock the lock to acquire.
+     * @return the acquired merge lock or {@code null} if the operation timed
+     * out.
+     * @throws CommitFailedException if the current thread is interrupted while
+     *                               acquiring the lock
+     */
+    @CheckForNull
+    private Lock acquireMergeLock(@Nonnull Lock lock)
+            throws CommitFailedException {
+        final long start = perfLogger.start();
+        boolean acquired;
+        try {
+            acquired = lock.tryLock(maxLockTryTimeMS, MILLISECONDS);
+        } catch (InterruptedException e) {
+            throw new CommitFailedException(OAK, 1,
+                    "Unable to acquire merge lock", e);
+        }
+        if (acquired) {
+            perfLogger.end(start, 1, "Merge - Acquired lock");
+            return lock;
+        } else {
+            log.info("Time out while acquiring merge lock");
+            return null;
+        }
     }
 
-    //----------------------------< internal 
>----------------------------------
-
     private NodeState getNode(String path) {
         NodeState node = getHead();
         for (String name : elements(path)) {
@@ -414,6 +428,7 @@ public abstract class AbstractNodeStoreB
          *
          * @param hook the commit hook to run.
          * @param info the associated commit info.
+         * @param lock the lock to acquire while performing the merge.
          * @return the result of the merge.
          * @throws CommitFailedException if a commit hook rejected the changes
          *          or the actual merge operation failed. An implementation 
must
@@ -421,8 +436,9 @@ public abstract class AbstractNodeStoreB
          *          indicate the cause of the exception.
          */
         @Nonnull
-        abstract NodeState merge(
-                @Nonnull CommitHook hook, @Nonnull CommitInfo info)
+        abstract NodeState merge(@Nonnull CommitHook hook,
+                                 @Nonnull CommitInfo info,
+                                 @Nonnull Lock lock)
                 throws CommitFailedException;
     }
 
@@ -433,7 +449,7 @@ public abstract class AbstractNodeStoreB
      * <ul>
      *     <li>{@link InMemory} on {@link #setRoot(NodeState)} if the new root 
differs
      *         from the current base</li>.
-     *     <li>{@link Merged} on {@link #merge(CommitHook, CommitInfo)}</li>
+     *     <li>{@link Merged} on {@link BranchState#merge(CommitHook, 
CommitInfo, Lock)}</li>
      * </ul>
      */
     private class Unmodified extends BranchState {
@@ -466,7 +482,9 @@ public abstract class AbstractNodeStoreB
 
         @Override
         @Nonnull
-        NodeState merge(@Nonnull CommitHook hook, @Nonnull CommitInfo info) {
+        NodeState merge(@Nonnull CommitHook hook,
+                        @Nonnull CommitInfo info,
+                        @Nonnull Lock lock) {
             branchState = new Merged(base);
             return base;
         }
@@ -481,7 +499,7 @@ public abstract class AbstractNodeStoreB
      *     <li>{@link Unmodified} on {@link #setRoot(NodeState)} if the new 
root is the same
      *         as the base of this branch or
      *     <li>{@link Persisted} otherwise.
-     *     <li>{@link Merged} on {@link #merge(CommitHook, CommitInfo)}</li>
+     *     <li>{@link Merged} on {@link BranchState#merge(CommitHook, 
CommitInfo, Lock)}</li>
      * </ul>
      */
     private class InMemory extends BranchState {
@@ -525,25 +543,34 @@ public abstract class AbstractNodeStoreB
 
         @Override
         @Nonnull
-        NodeState merge(@Nonnull CommitHook hook, @Nonnull CommitInfo info)
+        NodeState merge(@Nonnull CommitHook hook,
+                        @Nonnull CommitInfo info,
+                        @Nonnull Lock lock)
                 throws CommitFailedException {
             checkNotNull(hook);
             checkNotNull(info);
+            Lock mergeLock = acquireMergeLock(lock);
             try {
-                rebase();
-                dispatcher.contentChanged(base, null);
-                NodeState toCommit = hook.processCommit(base, head, info);
                 try {
-                    NodeState newHead = 
AbstractNodeStoreBranch.this.persist(toCommit, base, info);
-                    dispatcher.contentChanged(newHead, info);
-                    branchState = new Merged(base);
-                    return newHead;
-                } catch (Exception e) {
-                    throw convertUnchecked(e,
-                            "Failed to merge changes to the underlying store");
+                    rebase();
+                    dispatcher.contentChanged(base, null);
+                    NodeState toCommit = hook.processCommit(base, head, info);
+                    try {
+                        NodeState newHead = 
AbstractNodeStoreBranch.this.persist(toCommit, base, info);
+                        dispatcher.contentChanged(newHead, info);
+                        branchState = new Merged(base);
+                        return newHead;
+                    } catch (Exception e) {
+                        throw convertUnchecked(e,
+                                "Failed to merge changes to the underlying 
store");
+                    }
+                } finally {
+                    dispatcher.contentChanged(getRoot(), null);
                 }
             } finally {
-                dispatcher.contentChanged(getRoot(), null);
+                if (mergeLock != null) {
+                    mergeLock.unlock();
+                }
             }
         }
     }
@@ -554,8 +581,8 @@ public abstract class AbstractNodeStoreB
      * <p>
      * Transitions to:
      * <ul>
-     *     <li>{@link ResetFailed} on failed reset in {@link 
#merge(CommitHook, CommitInfo)}</li>
-     *     <li>{@link Merged} on successful {@link #merge(CommitHook, 
CommitInfo)}</li>
+     *     <li>{@link ResetFailed} on failed reset in {@link 
BranchState#merge(CommitHook, CommitInfo, Lock)}</li>
+     *     <li>{@link Merged} on successful {@link 
BranchState#merge(CommitHook, CommitInfo, Lock)}</li>
      * </ul>
      */
     private class Persisted extends BranchState {
@@ -610,10 +637,12 @@ public abstract class AbstractNodeStoreB
         @Override
         @Nonnull
         NodeState merge(@Nonnull final CommitHook hook,
-                        @Nonnull final CommitInfo info)
+                        @Nonnull final CommitInfo info,
+                        @Nonnull Lock lock)
                 throws CommitFailedException {
             boolean success = false;
             N previousHead = head;
+            Lock mergeLock = acquireMergeLock(lock);
             try {
                 rebase();
                 previousHead = head;
@@ -638,10 +667,12 @@ public abstract class AbstractNodeStoreB
                             "Failed to merge changes to the underlying store", 
e);
                 }
             } finally {
+                if (mergeLock != null) {
+                    mergeLock.unlock();
+                }
                 if (!success) {
                     resetBranch(head, previousHead);
                 }
-                dispatcher.contentChanged(getRoot(), null);
             }
         }
 
@@ -694,7 +725,9 @@ public abstract class AbstractNodeStoreB
 
         @Override
         @Nonnull
-        NodeState merge(@Nonnull CommitHook hook, @Nonnull CommitInfo info) {
+        NodeState merge(@Nonnull CommitHook hook,
+                        @Nonnull CommitInfo info,
+                        @Nonnull Lock lock) {
             throw new IllegalStateException("Branch has already been merged");
         }
     }
@@ -741,7 +774,9 @@ public abstract class AbstractNodeStoreB
          */
         @Nonnull
         @Override
-        NodeState merge(@Nonnull CommitHook hook, @Nonnull CommitInfo info)
+        NodeState merge(@Nonnull CommitHook hook,
+                        @Nonnull CommitInfo info,
+                        @Nonnull Lock lock)
                 throws CommitFailedException {
             throw ex;
         }

Modified: 
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java?rev=1684631&r1=1684630&r2=1684631&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
 Wed Jun 10 10:17:32 2015
@@ -50,6 +50,8 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
@@ -1747,6 +1749,75 @@ public class DocumentNodeStoreTest {
         ns.dispose();
     }
 
+    // OAK-2620
+    @Test
+    public void nonBlockingReset() throws Exception {
+        final List<String> failure = Lists.newArrayList();
+        final AtomicReference<ReentrantReadWriteLock> mergeLock
+                = new AtomicReference<ReentrantReadWriteLock>();
+        MemoryDocumentStore store = new MemoryDocumentStore() {
+            @Override
+            public <T extends Document> T findAndUpdate(Collection<T> 
collection,
+                                                        UpdateOp update) {
+                for (Map.Entry<UpdateOp.Key, UpdateOp.Operation> entry : 
update.getChanges().entrySet()) {
+                    if 
(entry.getKey().getName().equals(NodeDocument.COLLISIONS)) {
+                        ReentrantReadWriteLock rwLock = mergeLock.get();
+                        if (rwLock.getReadHoldCount() > 0
+                                || rwLock.getWriteHoldCount() > 0) {
+                            failure.add("Branch reset still holds merge lock");
+                            break;
+                        }
+                    }
+                }
+                return super.findAndUpdate(collection, update);
+            }
+        };
+        DocumentNodeStore ds = new DocumentMK.Builder()
+                .setDocumentStore(store)
+                .setAsyncDelay(0).getNodeStore();
+        ds.setMaxBackOffMillis(0); // do not retry merges
+
+        DocumentNodeState root = ds.getRoot();
+        final DocumentNodeStoreBranch b = ds.createBranch(root);
+        // branch state is now Unmodified
+
+        assertTrue(b.getMergeLock() instanceof ReentrantReadWriteLock);
+        mergeLock.set((ReentrantReadWriteLock) b.getMergeLock());
+
+        NodeBuilder builder = root.builder();
+        builder.child("foo");
+        b.setRoot(builder.getNodeState());
+        // branch state is now InMemory
+        builder.child("bar");
+        b.setRoot(builder.getNodeState());
+        // branch state is now Persisted
+
+        try {
+            b.merge(new CommitHook() {
+                @Nonnull
+                @Override
+                public NodeState processCommit(NodeState before,
+                                               NodeState after,
+                                               CommitInfo info)
+                        throws CommitFailedException {
+                    NodeBuilder foo = after.builder().child("foo");
+                    for (int i = 0; i <= DocumentRootBuilder.UPDATE_LIMIT; 
i++) {
+                        foo.setProperty("prop", i);
+                    }
+                    throw new CommitFailedException("Fail", 0, "");
+                }
+            }, CommitInfo.EMPTY);
+        } catch (CommitFailedException e) {
+            // expected
+        }
+
+        ds.dispose();
+
+        for (String s : failure) {
+            fail(s);
+        }
+    }
+
     private static DocumentNodeState asDocumentNodeState(NodeState state) {
         if (!(state instanceof DocumentNodeState)) {
             throw new IllegalArgumentException("Not a DocumentNodeState");


Reply via email to