Author: mreutegg
Date: Wed Jun 10 10:17:32 2015
New Revision: 1684631
URL: http://svn.apache.org/r1684631
Log:
OAK-2822: Release merge lock in retry loop
Re-implemented improvement for 1.0 branch. Trunk commit for this issue cannot
be merged because 1.0 branch diverged from trunk.
Modified:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentRootBuilder.java
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
Modified:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java?rev=1684631&r1=1684630&r2=1684631&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java Wed Jun 10 10:17:32 2015
@@ -59,11 +59,15 @@ public class KernelNodeStoreBranch exten
}
};
+ /** Lock for coordinating concurrent merge operations */
+ private final Lock mergeLock;
+
public KernelNodeStoreBranch(KernelNodeStore kernelNodeStore,
ChangeDispatcher dispatcher,
Lock mergeLock,
KernelNodeState base) {
- super(kernelNodeStore, dispatcher, mergeLock, base);
+ super(kernelNodeStore, dispatcher, base);
+ this.mergeLock = mergeLock;
}
//----------------------< AbstractNodeStoreBranch >-------------------------
@@ -145,7 +149,7 @@ public class KernelNodeStoreBranch exten
public NodeState merge(@Nonnull CommitHook hook, @Nonnull CommitInfo info)
throws CommitFailedException {
try {
- return super.merge(hook, info);
+ return merge0(hook, info, mergeLock);
} catch (MicroKernelException e) {
throw new CommitFailedException(MERGE, 1,
"Failed to merge changes to the underlying MicroKernel",
e);
Modified:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java?rev=1684631&r1=1684630&r2=1684631&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java Wed Jun 10 10:17:32 2015
@@ -30,7 +30,6 @@ import org.apache.jackrabbit.oak.spi.sta
import org.apache.jackrabbit.oak.spi.state.NodeState;
import static com.google.common.base.Preconditions.checkState;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.jackrabbit.oak.api.CommitFailedException.MERGE;
import static org.apache.jackrabbit.oak.api.CommitFailedException.OAK;
@@ -46,7 +45,7 @@ class DocumentNodeStoreBranch
DocumentNodeStoreBranch(DocumentNodeStore store,
DocumentNodeState base,
ReadWriteLock mergeLock) {
- super(store, new ChangeDispatcher(store.getRoot()), mergeLock.readLock(),
+ super(store, new ChangeDispatcher(store.getRoot()),
base, null, store.getMaxBackOffMillis(),
store.getMaxBackOffMillis() * 3);
this.mergeLock = mergeLock;
@@ -145,7 +144,7 @@ class DocumentNodeStoreBranch
public NodeState merge(@Nonnull CommitHook hook, @Nonnull CommitInfo info)
throws CommitFailedException {
try {
- return super.merge(hook, info);
+ return merge0(hook, info, mergeLock.readLock());
} catch (CommitFailedException e) {
if (!e.isOfType(MERGE)) {
throw e;
@@ -153,26 +152,20 @@ class DocumentNodeStoreBranch
}
// retry with exclusive lock, blocking other
// concurrent writes
- // do not wait forever
- boolean acquired = false;
- try {
- acquired = mergeLock.writeLock()
- .tryLock(maxLockTryTimeMS, MILLISECONDS);
- } catch (InterruptedException e) {
- // ignore and proceed with shared lock used in base class
- }
- try {
- return super.merge(hook, info);
- } finally {
- if (acquired) {
- mergeLock.writeLock().unlock();
- }
- }
+ return merge0(hook, info, mergeLock.writeLock());
}
//------------------------------< internal >--------------------------------
/**
+ * For test purposes only!
+ */
+ @Nonnull
+ ReadWriteLock getMergeLock() {
+ return mergeLock;
+ }
+
+ /**
* Persist some changes on top of the given base state.
*
* @param op the changes to persist.
Modified:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentRootBuilder.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentRootBuilder.java?rev=1684631&r1=1684630&r2=1684631&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentRootBuilder.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentRootBuilder.java Wed Jun 10 10:17:32 2015
@@ -96,7 +96,7 @@ class DocumentRootBuilder extends Abstra
@Override
protected void updated() {
- if (updates++ > UPDATE_LIMIT) {
+ if (++updates > UPDATE_LIMIT) {
purge();
}
}
Modified:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java?rev=1684631&r1=1684630&r2=1684631&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java Wed Jun 10 10:17:32 2015
@@ -30,6 +30,7 @@ import static org.apache.jackrabbit.oak.
import static org.apache.jackrabbit.oak.commons.PathUtils.getName;
import static org.apache.jackrabbit.oak.commons.PathUtils.getParentPath;
+import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import org.apache.jackrabbit.oak.api.CommitFailedException;
@@ -76,9 +77,6 @@ public abstract class AbstractNodeStoreB
*/
protected final long maxLockTryTimeMS;
- /** Lock for coordinating concurrent merge operations */
- private final Lock mergeLock;
-
/**
* State of the this branch. Either {@link Unmodified}, {@link InMemory}, {@link Persisted}
* or {@link Merged}.
@@ -88,23 +86,20 @@ public abstract class AbstractNodeStoreB
public AbstractNodeStoreBranch(S kernelNodeStore,
ChangeDispatcher dispatcher,
- Lock mergeLock,
N base) {
- this(kernelNodeStore, dispatcher, mergeLock, base, null,
+ this(kernelNodeStore, dispatcher, base, null,
MILLISECONDS.convert(10, SECONDS),
Integer.MAX_VALUE); // default: wait 'forever'
}
public AbstractNodeStoreBranch(S kernelNodeStore,
ChangeDispatcher dispatcher,
- Lock mergeLock,
N base,
N head,
long maximumBackoff,
long maxLockTryTimeMS) {
this.store = checkNotNull(kernelNodeStore);
this.dispatcher = dispatcher;
- this.mergeLock = checkNotNull(mergeLock);
if (head == null) {
this.branchState = new Unmodified(checkNotNull(base));
} else {
@@ -303,9 +298,17 @@ public abstract class AbstractNodeStoreB
return true;
}
- @Nonnull
@Override
- public NodeState merge(@Nonnull CommitHook hook, @Nonnull CommitInfo info)
+ public void rebase() {
+ branchState.rebase();
+ }
+
+ //----------------------------< internal >----------------------------------
+
+ @Nonnull
+ protected NodeState merge0(@Nonnull CommitHook hook,
+ @Nonnull CommitInfo info,
+ @Nonnull Lock lock)
throws CommitFailedException {
CommitFailedException ex = null;
long time = System.currentTimeMillis();
@@ -323,28 +326,17 @@ public abstract class AbstractNodeStoreB
}
}
try {
- final long start = perfLogger.start();
- boolean acquired = mergeLock.tryLock(maxLockTryTimeMS, MILLISECONDS);
- perfLogger.end(start, 1, "Merge - Acquired lock");
- try {
- return branchState.merge(checkNotNull(hook), checkNotNull(info));
- } catch (CommitFailedException e) {
- log.trace("Merge Error", e);
- ex = e;
- // only retry on merge failures. these may be caused by
- // changes introduce by a commit hook and may be resolved
- // by a rebase and running the hook again
- if (!e.isOfType(MERGE)) {
- throw e;
- }
- } finally {
- if (acquired) {
- mergeLock.unlock();
- }
+ return branchState.merge(checkNotNull(hook),
+ checkNotNull(info), lock);
+ } catch (CommitFailedException e) {
+ log.trace("Merge Error", e);
+ ex = e;
+ // only retry on merge failures. these may be caused by
+ // changes introduce by a commit hook and may be resolved
+ // by a rebase and running the hook again
+ if (!e.isOfType(MERGE)) {
+ throw e;
}
- } catch (InterruptedException e) {
- throw new CommitFailedException(OAK, 1,
- "Unable to acquire merge lock", e);
}
}
// if we get here retrying failed
@@ -354,13 +346,35 @@ public abstract class AbstractNodeStoreB
ex.getCode(), msg, ex.getCause());
}
- @Override
- public void rebase() {
- branchState.rebase();
+ /**
+ * Acquires the merge lock.
+ *
+ * @param lock the lock to acquire.
+ * @return the acquired merge lock or {@code null} if the operation timed
+ * out.
+ * @throws CommitFailedException if the current thread is interrupted while
+ * acquiring the lock
+ */
+ @CheckForNull
+ private Lock acquireMergeLock(@Nonnull Lock lock)
+ throws CommitFailedException {
+ final long start = perfLogger.start();
+ boolean acquired;
+ try {
+ acquired = lock.tryLock(maxLockTryTimeMS, MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw new CommitFailedException(OAK, 1,
+ "Unable to acquire merge lock", e);
+ }
+ if (acquired) {
+ perfLogger.end(start, 1, "Merge - Acquired lock");
+ return lock;
+ } else {
+ log.info("Time out while acquiring merge lock");
+ return null;
+ }
}
- //----------------------------< internal >----------------------------------
-
private NodeState getNode(String path) {
NodeState node = getHead();
for (String name : elements(path)) {
@@ -414,6 +428,7 @@ public abstract class AbstractNodeStoreB
*
* @param hook the commit hook to run.
* @param info the associated commit info.
+ * @param lock the lock to acquire while performing the merge.
* @return the result of the merge.
* @throws CommitFailedException if a commit hook rejected the changes
* or the actual merge operation failed. An implementation must
@@ -421,8 +436,9 @@ public abstract class AbstractNodeStoreB
* indicate the cause of the exception.
*/
@Nonnull
- abstract NodeState merge(
- @Nonnull CommitHook hook, @Nonnull CommitInfo info)
+ abstract NodeState merge(@Nonnull CommitHook hook,
+ @Nonnull CommitInfo info,
+ @Nonnull Lock lock)
throws CommitFailedException;
}
@@ -433,7 +449,7 @@ public abstract class AbstractNodeStoreB
* <ul>
* <li>{@link InMemory} on {@link #setRoot(NodeState)} if the new root differs
* from the current base</li>.
- * <li>{@link Merged} on {@link #merge(CommitHook, CommitInfo)}</li>
+ * <li>{@link Merged} on {@link BranchState#merge(CommitHook, CommitInfo, Lock)}</li>
* </ul>
*/
private class Unmodified extends BranchState {
@@ -466,7 +482,9 @@ public abstract class AbstractNodeStoreB
@Override
@Nonnull
- NodeState merge(@Nonnull CommitHook hook, @Nonnull CommitInfo info) {
+ NodeState merge(@Nonnull CommitHook hook,
+ @Nonnull CommitInfo info,
+ @Nonnull Lock lock) {
branchState = new Merged(base);
return base;
}
@@ -481,7 +499,7 @@ public abstract class AbstractNodeStoreB
* <li>{@link Unmodified} on {@link #setRoot(NodeState)} if the new root is the same
* as the base of this branch or
* <li>{@link Persisted} otherwise.
- * <li>{@link Merged} on {@link #merge(CommitHook, CommitInfo)}</li>
+ * <li>{@link Merged} on {@link BranchState#merge(CommitHook, CommitInfo, Lock)}</li>
* </ul>
*/
private class InMemory extends BranchState {
@@ -525,25 +543,34 @@ public abstract class AbstractNodeStoreB
@Override
@Nonnull
- NodeState merge(@Nonnull CommitHook hook, @Nonnull CommitInfo info)
+ NodeState merge(@Nonnull CommitHook hook,
+ @Nonnull CommitInfo info,
+ @Nonnull Lock lock)
throws CommitFailedException {
checkNotNull(hook);
checkNotNull(info);
+ Lock mergeLock = acquireMergeLock(lock);
try {
- rebase();
- dispatcher.contentChanged(base, null);
- NodeState toCommit = hook.processCommit(base, head, info);
try {
- NodeState newHead = AbstractNodeStoreBranch.this.persist(toCommit, base, info);
- dispatcher.contentChanged(newHead, info);
- branchState = new Merged(base);
- return newHead;
- } catch (Exception e) {
- throw convertUnchecked(e,
- "Failed to merge changes to the underlying store");
+ rebase();
+ dispatcher.contentChanged(base, null);
+ NodeState toCommit = hook.processCommit(base, head, info);
+ try {
+ NodeState newHead = AbstractNodeStoreBranch.this.persist(toCommit, base, info);
+ dispatcher.contentChanged(newHead, info);
+ branchState = new Merged(base);
+ return newHead;
+ } catch (Exception e) {
+ throw convertUnchecked(e,
+ "Failed to merge changes to the underlying store");
+ }
+ } finally {
+ dispatcher.contentChanged(getRoot(), null);
}
} finally {
- dispatcher.contentChanged(getRoot(), null);
+ if (mergeLock != null) {
+ mergeLock.unlock();
+ }
}
}
}
@@ -554,8 +581,8 @@ public abstract class AbstractNodeStoreB
* <p>
* Transitions to:
* <ul>
- * <li>{@link ResetFailed} on failed reset in {@link #merge(CommitHook, CommitInfo)}</li>
- * <li>{@link Merged} on successful {@link #merge(CommitHook, CommitInfo)}</li>
+ * <li>{@link ResetFailed} on failed reset in {@link BranchState#merge(CommitHook, CommitInfo, Lock)}</li>
+ * <li>{@link Merged} on successful {@link BranchState#merge(CommitHook, CommitInfo, Lock)}</li>
* </ul>
*/
private class Persisted extends BranchState {
@@ -610,10 +637,12 @@ public abstract class AbstractNodeStoreB
@Override
@Nonnull
NodeState merge(@Nonnull final CommitHook hook,
- @Nonnull final CommitInfo info)
+ @Nonnull final CommitInfo info,
+ @Nonnull Lock lock)
throws CommitFailedException {
boolean success = false;
N previousHead = head;
+ Lock mergeLock = acquireMergeLock(lock);
try {
rebase();
previousHead = head;
@@ -638,10 +667,12 @@ public abstract class AbstractNodeStoreB
"Failed to merge changes to the underlying store", e);
}
} finally {
+ if (mergeLock != null) {
+ mergeLock.unlock();
+ }
if (!success) {
resetBranch(head, previousHead);
}
- dispatcher.contentChanged(getRoot(), null);
}
}
@@ -694,7 +725,9 @@ public abstract class AbstractNodeStoreB
@Override
@Nonnull
- NodeState merge(@Nonnull CommitHook hook, @Nonnull CommitInfo info) {
+ NodeState merge(@Nonnull CommitHook hook,
+ @Nonnull CommitInfo info,
+ @Nonnull Lock lock) {
throw new IllegalStateException("Branch has already been merged");
}
}
@@ -741,7 +774,9 @@ public abstract class AbstractNodeStoreB
*/
@Nonnull
@Override
- NodeState merge(@Nonnull CommitHook hook, @Nonnull CommitInfo info)
+ NodeState merge(@Nonnull CommitHook hook,
+ @Nonnull CommitInfo info,
+ @Nonnull Lock lock)
throws CommitFailedException {
throw ex;
}
Modified:
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java?rev=1684631&r1=1684630&r2=1684631&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java Wed Jun 10 10:17:32 2015
@@ -50,6 +50,8 @@ import java.util.concurrent.CountDownLat
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
@@ -1747,6 +1749,75 @@ public class DocumentNodeStoreTest {
ns.dispose();
}
+ // OAK-2620
+ @Test
+ public void nonBlockingReset() throws Exception {
+ final List<String> failure = Lists.newArrayList();
+ final AtomicReference<ReentrantReadWriteLock> mergeLock
+ = new AtomicReference<ReentrantReadWriteLock>();
+ MemoryDocumentStore store = new MemoryDocumentStore() {
+ @Override
+ public <T extends Document> T findAndUpdate(Collection<T> collection,
+ UpdateOp update) {
+ for (Map.Entry<UpdateOp.Key, UpdateOp.Operation> entry : update.getChanges().entrySet()) {
+ if (entry.getKey().getName().equals(NodeDocument.COLLISIONS)) {
+ ReentrantReadWriteLock rwLock = mergeLock.get();
+ if (rwLock.getReadHoldCount() > 0
+ || rwLock.getWriteHoldCount() > 0) {
+ failure.add("Branch reset still holds merge lock");
+ break;
+ }
+ }
+ }
+ return super.findAndUpdate(collection, update);
+ }
+ };
+ DocumentNodeStore ds = new DocumentMK.Builder()
+ .setDocumentStore(store)
+ .setAsyncDelay(0).getNodeStore();
+ ds.setMaxBackOffMillis(0); // do not retry merges
+
+ DocumentNodeState root = ds.getRoot();
+ final DocumentNodeStoreBranch b = ds.createBranch(root);
+ // branch state is now Unmodified
+
+ assertTrue(b.getMergeLock() instanceof ReentrantReadWriteLock);
+ mergeLock.set((ReentrantReadWriteLock) b.getMergeLock());
+
+ NodeBuilder builder = root.builder();
+ builder.child("foo");
+ b.setRoot(builder.getNodeState());
+ // branch state is now InMemory
+ builder.child("bar");
+ b.setRoot(builder.getNodeState());
+ // branch state is now Persisted
+
+ try {
+ b.merge(new CommitHook() {
+ @Nonnull
+ @Override
+ public NodeState processCommit(NodeState before,
+ NodeState after,
+ CommitInfo info)
+ throws CommitFailedException {
+ NodeBuilder foo = after.builder().child("foo");
+ for (int i = 0; i <= DocumentRootBuilder.UPDATE_LIMIT; i++) {
+ foo.setProperty("prop", i);
+ }
+ throw new CommitFailedException("Fail", 0, "");
+ }
+ }, CommitInfo.EMPTY);
+ } catch (CommitFailedException e) {
+ // expected
+ }
+
+ ds.dispose();
+
+ for (String s : failure) {
+ fail(s);
+ }
+ }
+
private static DocumentNodeState asDocumentNodeState(NodeState state) {
if (!(state instanceof DocumentNodeState)) {
throw new IllegalArgumentException("Not a DocumentNodeState");