Author: adulceanu
Date: Thu Apr 27 09:55:46 2017
New Revision: 1792857
URL: http://svn.apache.org/viewvc?rev=1792857&view=rev
Log:
OAK-6074 - Simplify merge logic in LockBasedScheduler
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/MergeTest.java
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java?rev=1792857&r1=1792856&r2=1792857&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java
(original)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java
Thu Apr 27 09:55:46 2017
@@ -18,17 +18,12 @@ package org.apache.jackrabbit.oak.segmen
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.currentThread;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.jackrabbit.oak.api.Type.LONG;
import java.io.Closeable;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
@@ -73,19 +68,18 @@ public class LockBasedScheduler implemen
@Nonnull
private StatisticsProvider statsProvider = StatisticsProvider.NOOP;
-
+
private boolean dispatchChanges = true;
- private long maximumBackoff = MILLISECONDS.convert(10, SECONDS);
-
- private LockBasedSchedulerBuilder(@Nonnull Revisions revisions,
- @Nonnull SegmentReader reader) {
+ private LockBasedSchedulerBuilder(@Nonnull Revisions revisions,
@Nonnull SegmentReader reader) {
this.revisions = revisions;
this.reader = reader;
}
-
+
/**
- * {@link StatisticsProvider} for collecting statistics related to SegmentStore
+ * {@link StatisticsProvider} for collecting statistics related to
+ * SegmentStore
+ *
* @param statisticsProvider
* @return this instance
*/
@@ -94,26 +88,20 @@ public class LockBasedScheduler implemen
this.statsProvider = checkNotNull(statisticsProvider);
return this;
}
-
+
@Nonnull
public LockBasedSchedulerBuilder dispatchChanges(boolean
dispatchChanges) {
this.dispatchChanges = dispatchChanges;
return this;
}
-
- @Nonnull
- public LockBasedSchedulerBuilder withMaximumBackoff(long maximumBackoff) {
- this.maximumBackoff = maximumBackoff;
- return this;
- }
-
+
@Nonnull
public LockBasedScheduler build() {
return new LockBasedScheduler(this);
}
-
+
}
-
+
public static LockBasedSchedulerBuilder builder(@Nonnull Revisions
revisions, @Nonnull SegmentReader reader) {
return new LockBasedSchedulerBuilder(checkNotNull(revisions),
checkNotNull(reader));
}
@@ -150,12 +138,8 @@ public class LockBasedScheduler implemen
private final AtomicReference<SegmentNodeState> head;
private final ChangeDispatcher changeDispatcher;
-
- private final Random random = new Random();
-
+
private final SegmentNodeStoreStats stats;
-
- private final long maximumBackoff;
public LockBasedScheduler(LockBasedSchedulerBuilder builder) {
if (COMMIT_FAIR_LOCK) {
@@ -170,9 +154,8 @@ public class LockBasedScheduler implemen
} else {
this.changeDispatcher = null;
}
-
+
this.stats = new SegmentNodeStoreStats(builder.statsProvider);
- this.maximumBackoff = builder.maximumBackoff;
}
@Override
@@ -182,7 +165,7 @@ public class LockBasedScheduler implemen
}
return NOOP;
}
-
+
@Override
public NodeState getHeadNodeState() {
if (commitSemaphore.tryAcquire()) {
@@ -194,7 +177,7 @@ public class LockBasedScheduler implemen
}
return head.get();
}
-
+
/**
* Refreshes the head state. Should only be called while holding a permit
* from the {@link #commitSemaphore}.
@@ -217,9 +200,10 @@ public class LockBasedScheduler implemen
changeDispatcher.contentChanged(root, info);
}
}
-
+
@Override
- public NodeState schedule(@Nonnull Commit commit, SchedulerOption... schedulingOptions) throws CommitFailedException {
+ public NodeState schedule(@Nonnull Commit commit, SchedulerOption... schedulingOptions)
+ throws CommitFailedException {
boolean queued = false;
try {
@@ -260,95 +244,22 @@ public class LockBasedScheduler implemen
}
}
- private NodeState execute(Commit commit)
- throws CommitFailedException, InterruptedException {
+ private NodeState execute(Commit commit) throws CommitFailedException, InterruptedException {
// only do the merge if there are some changes to commit
if (commit.hasChanges()) {
- long timeout = optimisticMerge(commit);
- if (timeout >= 0) {
- pessimisticMerge(commit, timeout);
- }
- }
- return head.get().getChildNode(ROOT);
- }
-
- private long optimisticMerge(Commit commit)
- throws CommitFailedException, InterruptedException {
- long timeout = 1;
-
- // use exponential backoff in case of concurrent commits
- for (long backoff = 1; backoff < maximumBackoff; backoff *= 2) {
- long start = System.nanoTime();
-
refreshHead(true);
- SegmentNodeState state = head.get();
- if (state.hasProperty("token")
- && state.getLong("timeout") >= currentTimeMillis()) {
- // someone else has a pessimistic lock on the journal,
- // so we should not try to commit anything yet
- } else {
- // use optimistic locking to update the journal
- if (setHead(state, commit.apply(state), commit.info())) {
- return -1;
- }
- }
-
- // someone else was faster, so wait a while and retry later
- Thread.sleep(backoff, random.nextInt(1000000));
-
- long stop = System.nanoTime();
- if (stop - start > timeout) {
- timeout = stop - start;
+ SegmentNodeState before = head.get();
+ SegmentNodeState after = commit.apply(before);
+ if (revisions.setHead(before.getRecordId(), after.getRecordId())) {
+ head.set(after);
+ contentChanged(after.getChildNode(ROOT), commit.info());
+ refreshHead(true);
}
}
- return MILLISECONDS.convert(timeout, NANOSECONDS);
+ return head.get().getChildNode(ROOT);
}
- private void pessimisticMerge(Commit commit, long timeout)
- throws CommitFailedException, InterruptedException {
- while (true) {
- long now = currentTimeMillis();
- SegmentNodeState state = head.get();
- if (state.hasProperty("token")
- && state.getLong("timeout") >= now) {
- // locked by someone else, wait until unlocked or expired
- Thread.sleep(
- Math.min(state.getLong("timeout") - now, 1000),
- random.nextInt(1000000));
- } else {
- // attempt to acquire the lock
- SegmentNodeBuilder builder = state.builder();
- builder.setProperty("token", UUID.randomUUID().toString());
- builder.setProperty("timeout", now + timeout);
-
- if (setHead(state, builder.getNodeState(), commit.info())) {
- // lock acquired; rebase, apply commit hooks, and unlock
- builder = commit.apply(state).builder();
- builder.removeProperty("token");
- builder.removeProperty("timeout");
-
- // complete the commit
- if (setHead(state, builder.getNodeState(), commit.info())) {
- return;
- }
- }
- }
- }
- }
-
- private boolean setHead(SegmentNodeState before, SegmentNodeState after, CommitInfo info) {
- refreshHead(true);
- if (revisions.setHead(before.getRecordId(), after.getRecordId())) {
- head.set(after);
- contentChanged(after.getChildNode(ROOT), info);
- refreshHead(true);
- return true;
- } else {
- return false;
- }
- }
-
@Override
public String checkpoint(long lifetime, @Nonnull Map<String, String>
properties) {
checkArgument(lifetime > 0);
@@ -391,8 +302,7 @@ public class LockBasedScheduler implemen
SegmentNodeState state = head.get();
SegmentNodeBuilder builder = state.builder();
- NodeBuilder cp = builder.child("checkpoints").child(
- name);
+ NodeBuilder cp = builder.child("checkpoints").child(name);
if (cp.exists()) {
cp.remove();
SegmentNodeState newState = builder.getNodeState();
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/MergeTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/MergeTest.java?rev=1792857&r1=1792856&r2=1792857&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/MergeTest.java
(original)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/MergeTest.java
Thu Apr 27 09:55:46 2017
@@ -20,23 +20,15 @@ package org.apache.jackrabbit.oak.segmen
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import java.io.IOException;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.annotation.Nonnull;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
-import org.apache.jackrabbit.oak.spi.commit.CommitHook;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
-import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
-import org.junit.Ignore;
import org.junit.Test;
public class MergeTest {
@@ -86,59 +78,4 @@ public class MergeTest {
assertTrue(store.getRoot().hasProperty("foo"));
assertTrue(store.getRoot().hasProperty("bar"));
}
-
- @Test
- @Ignore("OAK-4122")
- public void testPessimisticMerge() throws Exception {
- final SegmentNodeStore store = SegmentNodeStoreBuilders.builder(new MemoryStore()).build();
- final Semaphore semaphore = new Semaphore(0);
- final AtomicBoolean running = new AtomicBoolean(true);
-
- Thread background = new Thread() {
- @Override
- public void run() {
- for (int i = 0; running.get(); i++) {
- try {
- NodeBuilder a = store.getRoot().builder();
- a.setProperty("foo", "abc" + i);
- store.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
- semaphore.release();
- } catch (CommitFailedException e) {
- fail();
- }
- }
- }
- };
- background.start();
-
- // wait for the first commit
- semaphore.acquire();
-
- assertTrue(store.getRoot().hasProperty("foo"));
- assertFalse(store.getRoot().hasProperty("bar"));
-
- NodeBuilder b = store.getRoot().builder();
- b.setProperty("bar", "xyz");
-
- // FIXME OAK-4122
- // store.setMaximumBackoff(100);
- store.merge(b, new CommitHook() {
- @Override @Nonnull
- public NodeState processCommit(
- NodeState before, NodeState after, CommitInfo info) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- fail();
- }
- return after;
- }
- }, CommitInfo.EMPTY);
-
- assertTrue(store.getRoot().hasProperty("foo"));
- assertTrue(store.getRoot().hasProperty("bar"));
-
- running.set(false);
- background.join();
- }
}