Author: mreutegg
Date: Wed Jun 10 12:55:09 2015
New Revision: 1684666
URL: http://svn.apache.org/r1684666
Log:
OAK-2868: Bypass CommitQueue for branch commits
Merged revision 1679232 from trunk
Modified:
jackrabbit/oak/branches/1.2/ (props changed)
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java
Propchange: jackrabbit/oak/branches/1.2/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jun 10 12:55:09 2015
@@ -1,3 +1,3 @@
/jackrabbit/oak/branches/1.0:1665962
-/jackrabbit/oak/trunk:1672350,1672468,1672537,1672603,1672642,1672644,1672834-1672835,1673351,1673410,1673414-1673415,1673436,1673644,1673662-1673664,1673669,1673695,1673738,1673787,1673791,1674046,1674065,1674075,1674107,1674228,1674780,1674880,1675054-1675055,1675319,1675332,1675354,1675357,1675382,1675555,1675566,1675593,1676198,1676237,1676407,1676458,1676539,1676670,1676693,1676703,1676725,1677579,1677581,1677609,1677611,1677774,1677788,1677797,1677804,1677806,1677939,1677991,1678173,1678323,1678758,1678938,1678954,1679144,1679165,1679191,1679235,1679958,1679961,1680182,1680222,1680232,1680236,1680461,1680633,1680643,1680805-1680806,1680903,1681282,1681767,1681918,1682218,1682235,1682437,1682494,1682555,1682855,1682904,1683089,1683213,1683249,1683278,1683323,1683687,1684174-1684175,1684186,1684376,1684442,1684561,1684570,1684601,1684618
+/jackrabbit/oak/trunk:1672350,1672468,1672537,1672603,1672642,1672644,1672834-1672835,1673351,1673410,1673414-1673415,1673436,1673644,1673662-1673664,1673669,1673695,1673738,1673787,1673791,1674046,1674065,1674075,1674107,1674228,1674780,1674880,1675054-1675055,1675319,1675332,1675354,1675357,1675382,1675555,1675566,1675593,1676198,1676237,1676407,1676458,1676539,1676670,1676693,1676703,1676725,1677579,1677581,1677609,1677611,1677774,1677788,1677797,1677804,1677806,1677939,1677991,1678173,1678323,1678758,1678938,1678954,1679144,1679165,1679191,1679232,1679235,1679958,1679961,1680182,1680222,1680232,1680236,1680461,1680633,1680643,1680805-1680806,1680903,1681282,1681767,1681918,1682218,1682235,1682437,1682494,1682555,1682855,1682904,1683089,1683213,1683249,1683278,1683323,1683687,1684174-1684175,1684186,1684376,1684442,1684561,1684570,1684601,1684618
/jackrabbit/trunk:1345480
Modified:
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java?rev=1684666&r1=1684665&r2=1684666&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java Wed Jun 10 12:55:09 2015
@@ -26,11 +26,7 @@ import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import org.apache.jackrabbit.oak.spi.commit.ChangeDispatcher;
-import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
-import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,20 +34,13 @@ import org.slf4j.LoggerFactory;
* <code>CommitQueue</code> ensures a sequence of commits consistent with the
* commit revision even if commits did not complete in this sequence.
*/
-class CommitQueue {
+abstract class CommitQueue {
static final Logger LOG = LoggerFactory.getLogger(CommitQueue.class);
- private final DocumentNodeStore store;
-
private final SortedMap<Revision, Entry> commits = new TreeMap<Revision, Entry>(StableRevisionComparator.INSTANCE);
- private final ChangeDispatcher dispatcher;
-
- CommitQueue(DocumentNodeStore store, ChangeDispatcher dispatcher) {
- this.store = store;
- this.dispatcher = dispatcher;
- }
+ protected abstract Revision newRevision();
@Nonnull
Revision createRevision() {
@@ -65,7 +54,7 @@ class CommitQueue {
Revision rev = null;
synchronized (this) {
for (int i = 0; i < num; i++) {
- rev = store.newRevision();
+ rev = newRevision();
revs.add(rev);
}
commits.put(rev, new Entry(rev));
@@ -74,23 +63,26 @@ class CommitQueue {
return revs;
}
- void done(@Nonnull Commit commit, boolean isBranch, @Nullable CommitInfo info) {
- checkNotNull(commit);
- if (isBranch) {
- try {
- commit.applyToCache(commit.getBaseRevision(), true);
- } finally {
- removeCommit(commit.getRevision());
- }
- } else {
- afterTrunkCommit(commit, info);
- }
+ void done(@Nonnull Revision revision, @Nonnull Callback c) {
+ checkNotNull(revision);
+ waitUntilHeadOfQueue(revision, c);
}
void canceled(@Nonnull Revision rev) {
removeCommit(rev);
}
+ boolean contains(@Nonnull Revision revision) {
+ synchronized (this) {
+ return commits.containsKey(checkNotNull(revision));
+ }
+ }
+
+ interface Callback {
+
+ void headOfQueue(@Nonnull Revision revision);
+ }
+
//------------------------< internal >--------------------------------------
private void removeCommit(@Nonnull Revision rev) {
@@ -105,10 +97,9 @@ class CommitQueue {
}
}
- private void afterTrunkCommit(@Nonnull Commit commit,
- @Nullable CommitInfo info) {
+ private void waitUntilHeadOfQueue(@Nonnull Revision rev,
+ @Nonnull Callback c) {
assert !commits.isEmpty();
- Revision rev = commit.getRevision();
boolean isHead;
Entry commitEntry;
@@ -124,14 +115,7 @@ class CommitQueue {
commits.remove(rev);
try {
LOG.debug("removed {}, head is now {}", rev, commits.isEmpty() ? null : commits.firstKey());
- // remember before revision
- Revision before = store.getHeadRevision();
- // apply changes to cache based on before revision
- commit.applyToCache(before, false);
- // update head revision
- store.setHeadRevision(rev);
- NodeState root = store.getRoot();
- dispatcher.contentChanged(root, info);
+ c.headOfQueue(rev);
} finally {
// notify next if there is any
notifyHead();
Modified:
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1684666&r1=1684665&r2=1684666&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Wed Jun 10 12:55:09 2015
@@ -453,7 +453,12 @@ public final class DocumentNodeStore
getRevisionComparator().add(headRevision, Revision.newRevision(0));
dispatcher = new ChangeDispatcher(getRoot());
- commitQueue = new CommitQueue(this, dispatcher);
+ commitQueue = new CommitQueue() {
+ @Override
+ protected Revision newRevision() {
+ return DocumentNodeStore.this.newRevision();
+ }
+ };
String threadNamePostfix = "(" + clusterId + ")";
batchCommitQueue = new BatchCommitQueue(store, revisionComparator);
backgroundReadThread = new Thread(
@@ -590,19 +595,11 @@ public final class DocumentNodeStore
if (base == null) {
base = headRevision;
}
- backgroundOperationLock.readLock().lock();
- boolean success = false;
- Commit c;
- try {
- checkOpen();
- c = new Commit(this, commitQueue.createRevision(), base, branch);
- success = true;
- } finally {
- if (!success) {
- backgroundOperationLock.readLock().unlock();
- }
+ if (base.isBranch()) {
+ return newBranchCommit(base, branch);
+ } else {
+ return newTrunkCommit(base);
}
- return c;
}
/**
@@ -635,19 +632,37 @@ public final class DocumentNodeStore
return c;
}
- void done(@Nonnull Commit c, boolean isBranch, @Nullable CommitInfo info) {
- try {
- commitQueue.done(c, isBranch, info);
- } finally {
- backgroundOperationLock.readLock().unlock();
+ void done(final @Nonnull Commit c, boolean isBranch, final @Nullable CommitInfo info) {
+ if (commitQueue.contains(c.getRevision())) {
+ try {
+ commitQueue.done(c.getRevision(), new CommitQueue.Callback() {
+ @Override
+ public void headOfQueue(@Nonnull Revision revision) {
+ // remember before revision
+ Revision before = getHeadRevision();
+ // apply changes to cache based on before revision
+ c.applyToCache(before, false);
+ // update head revision
+ setHeadRevision(c.getRevision());
+ dispatcher.contentChanged(getRoot(), info);
+ }
+ });
+ } finally {
+ backgroundOperationLock.readLock().unlock();
+ }
+ } else {
+ // branch commit
+ c.applyToCache(c.getBaseRevision(), isBranch);
}
}
void canceled(Commit c) {
- try {
- commitQueue.canceled(c.getRevision());
- } finally {
- backgroundOperationLock.readLock().unlock();
+ if (commitQueue.contains(c.getRevision())) {
+ try {
+ commitQueue.canceled(c.getRevision());
+ } finally {
+ backgroundOperationLock.readLock().unlock();
+ }
}
}
@@ -1890,6 +1905,36 @@ public final class DocumentNodeStore
//-----------------------------< internal >---------------------------------
+ @Nonnull
+ private Commit newTrunkCommit(@Nonnull Revision base) {
+ checkArgument(!checkNotNull(base).isBranch(),
+ "base must not be a branch revision: " + base);
+
+ backgroundOperationLock.readLock().lock();
+ boolean success = false;
+ Commit c;
+ try {
+ checkOpen();
+ c = new Commit(this, commitQueue.createRevision(), base, null);
+ success = true;
+ } finally {
+ if (!success) {
+ backgroundOperationLock.readLock().unlock();
+ }
+ }
+ return c;
+ }
+
+ @Nonnull
+ private Commit newBranchCommit(@Nonnull Revision base,
+ @Nullable DocumentNodeStoreBranch branch) {
+ checkArgument(checkNotNull(base).isBranch(),
+ "base must be a branch revision: " + base);
+
+ checkOpen();
+ return new Commit(this, newRevision(), base, branch);
+ }
+
/**
* Checks if this store is still open and throws an
* {@link IllegalStateException} if it is already disposed (or a dispose
Modified:
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java?rev=1684666&r1=1684665&r2=1684666&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java Wed Jun 10 12:55:09 2015
@@ -18,7 +18,6 @@ package org.apache.jackrabbit.oak.plugin
import java.io.Closeable;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -26,30 +25,36 @@ import java.util.concurrent.atomic.Atomi
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import org.apache.jackrabbit.oak.spi.commit.ChangeDispatcher;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.commit.Observer;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import static org.junit.Assert.assertEquals;
+import static java.util.Collections.synchronizedList;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
/**
* Tests for {@link CommitQueue}.
*/
public class CommitQueueTest {
+ private static final Logger LOG = LoggerFactory.getLogger(CommitQueueTest.class);
+
private static final int NUM_WRITERS = 10;
private static final int COMMITS_PER_WRITER = 100;
+ private List<Exception> exceptions = synchronizedList(new ArrayList<Exception>());
+
@Test
public void concurrentCommits() throws Exception {
final DocumentNodeStore store = new DocumentMK.Builder().getNodeStore();
AtomicBoolean running = new AtomicBoolean(true);
- final List<Exception> exceptions = Collections.synchronizedList(new ArrayList<Exception>());
Closeable observer = store.addObserver(new Observer() {
private Revision before = new Revision(0, 0, store.getClusterId());
@@ -58,7 +63,7 @@ public class CommitQueueTest {
public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) {
DocumentNodeState after = (DocumentNodeState) root;
Revision r = after.getRevision();
- System.out.println("seen: " + r);
+ LOG.debug("seen: {}", r);
if (r.compareRevisionTime(before) < 0) {
exceptions.add(new Exception(
"Inconsistent revision sequence. Before: " +
@@ -106,46 +111,103 @@ public class CommitQueueTest {
running.set(false);
observer.close();
store.dispose();
- for (Exception e : exceptions) {
- throw e;
- }
+ assertNoExceptions();
}
- // OAK-2867
@Test
- public void doneFailsWithException() throws Exception {
- final DocumentNodeStore store = new DocumentMK.Builder().getNodeStore();
- final CommitQueue commits = new CommitQueue(store,
- new ChangeDispatcher(store.getRoot()));
- Revision r = commits.createRevision();
- Commit c = new Commit(store, r, store.getHeadRevision(),
- store.createBranch(store.getRoot())) {
+ public void concurrentCommits2() throws Exception {
+ final CommitQueue queue = new CommitQueue() {
@Override
- public void applyToCache(Revision before, boolean isBranchCommit) {
- throw new RuntimeException("applyToCache");
+ protected Revision newRevision() {
+ return Revision.newRevision(1);
}
};
- try {
- commits.done(c, true, null);
- fail("must fail with RuntimeException");
- } catch (Exception e) {
- assertEquals("applyToCache", e.getMessage());
+ final CommitQueue.Callback c = new CommitQueue.Callback() {
+ private Revision before = Revision.newRevision(1);
+
+ @Override
+ public void headOfQueue(@Nonnull Revision r) {
+ LOG.debug("seen: {}", r);
+ if (r.compareRevisionTime(before) < 0) {
+ exceptions.add(new Exception(
+ "Inconsistent revision sequence. Before: " +
+ before + ", after: " + r));
+ }
+ before = r;
+ }
+ };
+
+ // perform commits with multiple threads
+ List<Thread> writers = new ArrayList<Thread>();
+ for (int i = 0; i < NUM_WRITERS; i++) {
+ final Random random = new Random(i);
+ writers.add(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ for (int i = 0; i < COMMITS_PER_WRITER; i++) {
+ Revision r = queue.createRevision();
+ try {
+ Thread.sleep(0, random.nextInt(1000));
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ if (random.nextInt(5) == 0) {
+ // cancel 20% of the commits
+ queue.canceled(r);
+ } else {
+ queue.done(r, c);
+ }
+ }
+ } catch (Exception e) {
+ exceptions.add(e);
+ }
+ }
+ }));
}
+ for (Thread t : writers) {
+ t.start();
+ }
+ for (Thread t : writers) {
+ t.join();
+ }
+ assertNoExceptions();
+ }
+
+ // OAK-2868
+ @Test
+ public void branchCommitMustNotBlockTrunkCommit() throws Exception {
+ final DocumentNodeStore ds = new DocumentMK.Builder().getNodeStore();
+
+ // simulate start of a branch commit
+ Commit c = ds.newCommit(ds.getHeadRevision().asBranchRevision(), null);
Thread t = new Thread(new Runnable() {
@Override
public void run() {
- Commit c = new Commit(store, commits.createRevision(),
- store.getHeadRevision(), null);
- commits.done(c, false, null);
+ try {
+ NodeBuilder builder = ds.getRoot().builder();
+ builder.child("foo");
+ ds.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ } catch (CommitFailedException e) {
+ exceptions.add(e);
+ }
}
});
t.start();
+
t.join(3000);
assertFalse("Commit did not succeed within 3 seconds", t.isAlive());
- store.dispose();
+ ds.canceled(c);
+ ds.dispose();
+ assertNoExceptions();
}
+ private void assertNoExceptions() throws Exception {
+ if (!exceptions.isEmpty()) {
+ throw exceptions.get(0);
+ }
+ }
}