Author: mreutegg
Date: Wed Jun 10 12:55:09 2015
New Revision: 1684666

URL: http://svn.apache.org/r1684666
Log:
OAK-2868: Bypass CommitQueue for branch commits

Merged revision 1679232 from trunk

Modified:
    jackrabbit/oak/branches/1.2/   (props changed)
    jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java
    jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
    jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java

Propchange: jackrabbit/oak/branches/1.2/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jun 10 12:55:09 2015
@@ -1,3 +1,3 @@
 /jackrabbit/oak/branches/1.0:1665962
-/jackrabbit/oak/trunk:1672350,1672468,1672537,1672603,1672642,1672644,1672834-1672835,1673351,1673410,1673414-1673415,1673436,1673644,1673662-1673664,1673669,1673695,1673738,1673787,1673791,1674046,1674065,1674075,1674107,1674228,1674780,1674880,1675054-1675055,1675319,1675332,1675354,1675357,1675382,1675555,1675566,1675593,1676198,1676237,1676407,1676458,1676539,1676670,1676693,1676703,1676725,1677579,1677581,1677609,1677611,1677774,1677788,1677797,1677804,1677806,1677939,1677991,1678173,1678323,1678758,1678938,1678954,1679144,1679165,1679191,1679235,1679958,1679961,1680182,1680222,1680232,1680236,1680461,1680633,1680643,1680805-1680806,1680903,1681282,1681767,1681918,1682218,1682235,1682437,1682494,1682555,1682855,1682904,1683089,1683213,1683249,1683278,1683323,1683687,1684174-1684175,1684186,1684376,1684442,1684561,1684570,1684601,1684618
+/jackrabbit/oak/trunk:1672350,1672468,1672537,1672603,1672642,1672644,1672834-1672835,1673351,1673410,1673414-1673415,1673436,1673644,1673662-1673664,1673669,1673695,1673738,1673787,1673791,1674046,1674065,1674075,1674107,1674228,1674780,1674880,1675054-1675055,1675319,1675332,1675354,1675357,1675382,1675555,1675566,1675593,1676198,1676237,1676407,1676458,1676539,1676670,1676693,1676703,1676725,1677579,1677581,1677609,1677611,1677774,1677788,1677797,1677804,1677806,1677939,1677991,1678173,1678323,1678758,1678938,1678954,1679144,1679165,1679191,1679232,1679235,1679958,1679961,1680182,1680222,1680232,1680236,1680461,1680633,1680643,1680805-1680806,1680903,1681282,1681767,1681918,1682218,1682235,1682437,1682494,1682555,1682855,1682904,1683089,1683213,1683249,1683278,1683323,1683687,1684174-1684175,1684186,1684376,1684442,1684561,1684570,1684601,1684618
 /jackrabbit/trunk:1345480

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java?rev=1684666&r1=1684665&r2=1684666&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java Wed Jun 10 12:55:09 2015
@@ -26,11 +26,7 @@ import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
-import org.apache.jackrabbit.oak.spi.commit.ChangeDispatcher;
-import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
-import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,20 +34,13 @@ import org.slf4j.LoggerFactory;
  * <code>CommitQueue</code> ensures a sequence of commits consistent with the
  * commit revision even if commits did not complete in this sequence.
  */
-class CommitQueue {
+abstract class CommitQueue {
 
     static final Logger LOG = LoggerFactory.getLogger(CommitQueue.class);
 
-    private final DocumentNodeStore store;
-
     private final SortedMap<Revision, Entry> commits = new TreeMap<Revision, Entry>(StableRevisionComparator.INSTANCE);
 
-    private final ChangeDispatcher dispatcher;
-
-    CommitQueue(DocumentNodeStore store, ChangeDispatcher dispatcher) {
-        this.store = store;
-        this.dispatcher = dispatcher;
-    }
+    protected abstract Revision newRevision();
 
     @Nonnull
     Revision createRevision() {
@@ -65,7 +54,7 @@ class CommitQueue {
         Revision rev = null;
         synchronized (this) {
             for (int i = 0; i < num; i++) {
-                rev = store.newRevision();
+                rev = newRevision();
                 revs.add(rev);
             }
             commits.put(rev, new Entry(rev));
@@ -74,23 +63,26 @@ class CommitQueue {
         return revs;
     }
 
-    void done(@Nonnull Commit commit, boolean isBranch, @Nullable CommitInfo info) {
-        checkNotNull(commit);
-        if (isBranch) {
-            try {
-                commit.applyToCache(commit.getBaseRevision(), true);
-            } finally {
-                removeCommit(commit.getRevision());
-            }
-        } else {
-            afterTrunkCommit(commit, info);
-        }
+    void done(@Nonnull Revision revision, @Nonnull Callback c) {
+        checkNotNull(revision);
+        waitUntilHeadOfQueue(revision, c);
     }
 
     void canceled(@Nonnull Revision rev) {
         removeCommit(rev);
     }
 
+    boolean contains(@Nonnull Revision revision) {
+        synchronized (this) {
+            return commits.containsKey(checkNotNull(revision));
+        }
+    }
+
+    interface Callback {
+
+        void headOfQueue(@Nonnull Revision revision);
+    }
+
     //------------------------< internal >--------------------------------------
 
     private void removeCommit(@Nonnull Revision rev) {
@@ -105,10 +97,9 @@ class CommitQueue {
         }
     }
 
-    private void afterTrunkCommit(@Nonnull Commit commit,
-                                  @Nullable CommitInfo info) {
+    private void waitUntilHeadOfQueue(@Nonnull Revision rev,
+                                      @Nonnull Callback c) {
         assert !commits.isEmpty();
-        Revision rev = commit.getRevision();
 
         boolean isHead;
         Entry commitEntry;
@@ -124,14 +115,7 @@ class CommitQueue {
             commits.remove(rev);
             try {
                 LOG.debug("removed {}, head is now {}", rev, commits.isEmpty() ? null : commits.firstKey());
-                // remember before revision
-                Revision before = store.getHeadRevision();
-                // apply changes to cache based on before revision
-                commit.applyToCache(before, false);
-                // update head revision
-                store.setHeadRevision(rev);
-                NodeState root = store.getRoot();
-                dispatcher.contentChanged(root, info);
+                c.headOfQueue(rev);
             } finally {
                 // notify next if there is any
                 notifyHead();

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1684666&r1=1684665&r2=1684666&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Wed Jun 10 12:55:09 2015
@@ -453,7 +453,12 @@ public final class DocumentNodeStore
         getRevisionComparator().add(headRevision, Revision.newRevision(0));
 
         dispatcher = new ChangeDispatcher(getRoot());
-        commitQueue = new CommitQueue(this, dispatcher);
+        commitQueue = new CommitQueue() {
+            @Override
+            protected Revision newRevision() {
+                return DocumentNodeStore.this.newRevision();
+            }
+        };
         String threadNamePostfix = "(" + clusterId + ")";
         batchCommitQueue = new BatchCommitQueue(store, revisionComparator);
         backgroundReadThread = new Thread(
@@ -590,19 +595,11 @@ public final class DocumentNodeStore
         if (base == null) {
             base = headRevision;
         }
-        backgroundOperationLock.readLock().lock();
-        boolean success = false;
-        Commit c;
-        try {
-            checkOpen();
-            c = new Commit(this, commitQueue.createRevision(), base, branch);
-            success = true;
-        } finally {
-            if (!success) {
-                backgroundOperationLock.readLock().unlock();
-            }
+        if (base.isBranch()) {
+            return newBranchCommit(base, branch);
+        } else {
+            return newTrunkCommit(base);
         }
-        return c;
     }
 
     /**
@@ -635,19 +632,37 @@ public final class DocumentNodeStore
         return c;
     }
 
-    void done(@Nonnull Commit c, boolean isBranch, @Nullable CommitInfo info) {
-        try {
-            commitQueue.done(c, isBranch, info);
-        } finally {
-            backgroundOperationLock.readLock().unlock();
+    void done(final @Nonnull Commit c, boolean isBranch, final @Nullable CommitInfo info) {
+        if (commitQueue.contains(c.getRevision())) {
+            try {
+                commitQueue.done(c.getRevision(), new CommitQueue.Callback() {
+                    @Override
+                    public void headOfQueue(@Nonnull Revision revision) {
+                        // remember before revision
+                        Revision before = getHeadRevision();
+                        // apply changes to cache based on before revision
+                        c.applyToCache(before, false);
+                        // update head revision
+                        setHeadRevision(c.getRevision());
+                        dispatcher.contentChanged(getRoot(), info);
+                    }
+                });
+            } finally {
+                backgroundOperationLock.readLock().unlock();
+            }
+        } else {
+            // branch commit
+            c.applyToCache(c.getBaseRevision(), isBranch);
         }
     }
 
     void canceled(Commit c) {
-        try {
-            commitQueue.canceled(c.getRevision());
-        } finally {
-            backgroundOperationLock.readLock().unlock();
+        if (commitQueue.contains(c.getRevision())) {
+            try {
+                commitQueue.canceled(c.getRevision());
+            } finally {
+                backgroundOperationLock.readLock().unlock();
+            }
         }
     }
 
@@ -1890,6 +1905,36 @@ public final class DocumentNodeStore
 
     //-----------------------------< internal >---------------------------------
 
+    @Nonnull
+    private Commit newTrunkCommit(@Nonnull Revision base) {
+        checkArgument(!checkNotNull(base).isBranch(),
+                "base must not be a branch revision: " + base);
+
+        backgroundOperationLock.readLock().lock();
+        boolean success = false;
+        Commit c;
+        try {
+            checkOpen();
+            c = new Commit(this, commitQueue.createRevision(), base, null);
+            success = true;
+        } finally {
+            if (!success) {
+                backgroundOperationLock.readLock().unlock();
+            }
+        }
+        return c;
+    }
+
+    @Nonnull
+    private Commit newBranchCommit(@Nonnull Revision base,
+                                   @Nullable DocumentNodeStoreBranch branch) {
+        checkArgument(checkNotNull(base).isBranch(),
+                "base must be a branch revision: " + base);
+
+        checkOpen();
+        return new Commit(this, newRevision(), base, branch);
+    }
+
     /**
      * Checks if this store is still open and throws an
      * {@link IllegalStateException} if it is already disposed (or a dispose

Modified: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java?rev=1684666&r1=1684665&r2=1684666&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java Wed Jun 10 12:55:09 2015
@@ -18,7 +18,6 @@ package org.apache.jackrabbit.oak.plugin
 
 import java.io.Closeable;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -26,30 +25,36 @@ import java.util.concurrent.atomic.Atomi
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import org.apache.jackrabbit.oak.spi.commit.ChangeDispatcher;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
 import org.apache.jackrabbit.oak.spi.commit.Observer;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import static org.junit.Assert.assertEquals;
+import static java.util.Collections.synchronizedList;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
 
 /**
  * Tests for {@link CommitQueue}.
  */
 public class CommitQueueTest {
 
+    private static final Logger LOG = LoggerFactory.getLogger(CommitQueueTest.class);
+
     private static final int NUM_WRITERS = 10;
 
     private static final int COMMITS_PER_WRITER = 100;
 
+    private List<Exception> exceptions = synchronizedList(new ArrayList<Exception>());
+
     @Test
     public void concurrentCommits() throws Exception {
         final DocumentNodeStore store = new DocumentMK.Builder().getNodeStore();
         AtomicBoolean running = new AtomicBoolean(true);
-        final List<Exception> exceptions = Collections.synchronizedList(new ArrayList<Exception>());
 
         Closeable observer = store.addObserver(new Observer() {
             private Revision before = new Revision(0, 0, store.getClusterId());
@@ -58,7 +63,7 @@ public class CommitQueueTest {
             public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) {
                 DocumentNodeState after = (DocumentNodeState) root;
                 Revision r = after.getRevision();
-                System.out.println("seen: " + r);
+                LOG.debug("seen: {}", r);
                 if (r.compareRevisionTime(before) < 0) {
                     exceptions.add(new Exception(
                             "Inconsistent revision sequence. Before: " +
@@ -106,46 +111,103 @@ public class CommitQueueTest {
         running.set(false);
         observer.close();
         store.dispose();
-        for (Exception e : exceptions) {
-            throw e;
-        }
+        assertNoExceptions();
     }
 
-    // OAK-2867
     @Test
-    public void doneFailsWithException() throws Exception {
-        final DocumentNodeStore store = new DocumentMK.Builder().getNodeStore();
-        final CommitQueue commits = new CommitQueue(store,
-                new ChangeDispatcher(store.getRoot()));
-        Revision r = commits.createRevision();
-        Commit c = new Commit(store, r, store.getHeadRevision(),
-                store.createBranch(store.getRoot())) {
+    public void concurrentCommits2() throws Exception {
+        final CommitQueue queue = new CommitQueue() {
             @Override
-            public void applyToCache(Revision before, boolean isBranchCommit) {
-                throw new RuntimeException("applyToCache");
+            protected Revision newRevision() {
+                return Revision.newRevision(1);
             }
         };
 
-        try {
-            commits.done(c, true, null);
-            fail("must fail with RuntimeException");
-        } catch (Exception e) {
-            assertEquals("applyToCache", e.getMessage());
+        final CommitQueue.Callback c = new CommitQueue.Callback() {
+            private Revision before = Revision.newRevision(1);
+
+            @Override
+            public void headOfQueue(@Nonnull Revision r) {
+                LOG.debug("seen: {}", r);
+                if (r.compareRevisionTime(before) < 0) {
+                    exceptions.add(new Exception(
+                            "Inconsistent revision sequence. Before: " +
+                                    before + ", after: " + r));
+                }
+                before = r;
+            }
+        };
+
+        // perform commits with multiple threads
+        List<Thread> writers = new ArrayList<Thread>();
+        for (int i = 0; i < NUM_WRITERS; i++) {
+            final Random random = new Random(i);
+            writers.add(new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        for (int i = 0; i < COMMITS_PER_WRITER; i++) {
+                            Revision r = queue.createRevision();
+                            try {
+                                Thread.sleep(0, random.nextInt(1000));
+                            } catch (InterruptedException e) {
+                                // ignore
+                            }
+                            if (random.nextInt(5) == 0) {
+                                // cancel 20% of the commits
+                                queue.canceled(r);
+                            } else {
+                                queue.done(r, c);
+                            }
+                        }
+                    } catch (Exception e) {
+                        exceptions.add(e);
+                    }
+                }
+            }));
         }
+        for (Thread t : writers) {
+            t.start();
+        }
+        for (Thread t : writers) {
+            t.join();
+        }
+        assertNoExceptions();
+    }
+
+    // OAK-2868
+    @Test
+    public void branchCommitMustNotBlockTrunkCommit() throws Exception {
+        final DocumentNodeStore ds = new DocumentMK.Builder().getNodeStore();
+
+        // simulate start of a branch commit
+        Commit c = ds.newCommit(ds.getHeadRevision().asBranchRevision(), null);
 
         Thread t = new Thread(new Runnable() {
             @Override
             public void run() {
-                Commit c = new Commit(store, commits.createRevision(),
-                        store.getHeadRevision(), null);
-                commits.done(c, false, null);
+                try {
+                    NodeBuilder builder = ds.getRoot().builder();
+                    builder.child("foo");
+                    ds.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+                } catch (CommitFailedException e) {
+                    exceptions.add(e);
+                }
             }
         });
         t.start();
+
         t.join(3000);
         assertFalse("Commit did not succeed within 3 seconds", t.isAlive());
 
-        store.dispose();
+        ds.canceled(c);
+        ds.dispose();
+        assertNoExceptions();
     }
 
+    private void assertNoExceptions() throws Exception {
+        if (!exceptions.isEmpty()) {
+            throw exceptions.get(0);
+        }
+    }
 }


Reply via email to