Author: jukka
Date: Wed Dec 4 01:02:36 2013
New Revision: 1547664
URL: http://svn.apache.org/r1547664
Log:
OAK-593: Segment-based MK
Further simplification of the SegmentMK commit/rebase logic
Removed:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentRootState.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java?rev=1547664&r1=1547663&r2=1547664&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java Wed Dec 4 01:02:36 2013
@@ -16,8 +16,10 @@
*/
package org.apache.jackrabbit.oak.plugins.segment;
+import static com.google.common.base.Objects.equal;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.System.currentTimeMillis;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -45,8 +47,6 @@ import org.apache.jackrabbit.oak.spi.sta
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
-import com.google.common.base.Objects;
-
public class SegmentNodeStore implements NodeStore, Observable {
static final String ROOT = "root";
@@ -111,26 +111,25 @@ public class SegmentNodeStore implements
commitSemaphore.release();
}
}
- return new SegmentRootState(head);
+ return head.getChildNode(ROOT);
}
@Override
public NodeState merge(
@Nonnull NodeBuilder builder, @Nonnull CommitHook commitHook,
@Nullable CommitInfo info) throws CommitFailedException {
+ checkArgument(builder instanceof SegmentNodeBuilder);
checkNotNull(commitHook);
- NodeState base = builder.getBaseState();
- checkArgument(store.isInstance(base, SegmentRootState.class));
- SegmentNodeState root = ((SegmentRootState) base).getRootState();
+ SegmentNodeBuilder snb = (SegmentNodeBuilder) builder;
+ checkArgument(store == snb.getBaseState().getStore());
try {
commitSemaphore.acquire();
try {
- Commit commit = new Commit(
- root, builder.getNodeState(), commitHook, info);
+ Commit commit = new Commit(snb, commitHook, info);
NodeState merged = commit.execute();
- ((SegmentNodeBuilder) builder).reset(merged);
+ snb.reset(merged);
return merged;
} finally {
commitSemaphore.release();
@@ -154,14 +153,6 @@ public class SegmentNodeStore implements
return builder.getNodeState();
}
- private boolean fastEquals(Object a, Object b) {
- return store.isInstance(a, Record.class)
- && store.isInstance(b, Record.class)
- && Objects.equal(
- ((Record) a).getRecordId(),
- ((Record) b).getRecordId());
- }
-
@Override @Nonnull
public NodeState reset(@Nonnull NodeBuilder builder) {
checkArgument(builder instanceof SegmentNodeBuilder);
@@ -223,30 +214,31 @@ public class SegmentNodeStore implements
private final Random random = new Random();
- private SegmentNodeState base;
+ private SegmentNodeState before;
- private SegmentNodeState head;
+ private SegmentNodeState after;
private final CommitHook hook;
private final CommitInfo info;
- Commit(@Nonnull SegmentNodeState base, @Nonnull NodeState head,
+ Commit(@Nonnull SegmentNodeBuilder builder,
@Nonnull CommitHook hook, @Nullable CommitInfo info) {
- this.base = checkNotNull(base);
- SegmentNodeBuilder builder = base.builder();
- builder.setChildNode(ROOT, checkNotNull(head));
- this.head = builder.getNodeState();
+ checkNotNull(builder);
+ this.before = builder.getBaseState();
+ this.after = builder.getNodeState();
this.hook = checkNotNull(hook);
this.info = info;
}
- private boolean setHead(
- SegmentNodeState base, SegmentNodeState head, CommitInfo info) {
+ private boolean setHead(SegmentNodeBuilder builder) {
+ SegmentNodeState base = builder.getBaseState();
+ SegmentNodeState head = builder.getNodeState();
+
refreshHead();
if (journal.setHead(base.getRecordId(), head.getRecordId())) {
- this.head = head;
+ SegmentNodeStore.this.head = head;
changeDispatcher.contentChanged(head.getChildNode(ROOT), info);
refreshHead();
return true;
@@ -255,55 +247,46 @@ public class SegmentNodeStore implements
}
}
- private void rebase() {
- SegmentNodeState newBase = SegmentNodeStore.this.head;
- if (!base.getRecordId().equals(newBase.getRecordId())) {
- NodeBuilder builder = newBase.builder();
- head.getChildNode(ROOT).compareAgainstBaseState(
- base.getChildNode(ROOT),
- new ConflictAnnotatingRebaseDiff(builder.child(ROOT)));
- base = newBase;
- head = store.getWriter().writeNode(builder.getNodeState());
+ private SegmentNodeBuilder prepare() throws CommitFailedException {
+ SegmentNodeBuilder builder = head.builder();
+ if (fastEquals(before, head.getChildNode(ROOT))) {
+ // use a shortcut when there are no external changes
+ builder.setChildNode(ROOT, hook.processCommit(before, after));
+ } else {
+ // there were some external changes, so do the full rebase
+ ConflictAnnotatingRebaseDiff diff =
+ new ConflictAnnotatingRebaseDiff(builder.child(ROOT));
+ after.compareAgainstBaseState(before, diff);
+ // apply commit hooks on the rebased changes
+ builder.setChildNode(ROOT, hook.processCommit(
+ builder.getBaseState().getChildNode(ROOT),
+ builder.getNodeState().getChildNode(ROOT)));
}
+ return builder;
}
- private long optimisticMerge(CommitHook hook, CommitInfo info)
+ private long optimisticMerge()
throws CommitFailedException, InterruptedException {
long timeout = 1;
- SegmentNodeState originalBase = base;
- SegmentNodeState originalHead = head;
-
// use exponential backoff in case of concurrent commits
for (long backoff = 1; backoff < maximumBackoff; backoff *= 2) {
- rebase(); // rebase to latest head, a no-op if already there
-
long start = System.nanoTime();
- if (base.hasProperty("token")
- && base.getLong("timeout") >= System.currentTimeMillis()) {
+ refreshHead();
+ if (head.hasProperty("token")
+ && head.getLong("timeout") >= currentTimeMillis()) {
// someone else has a pessimistic lock on the journal,
- // so we should not try to commit anything
+ // so we should not try to commit anything yet
} else {
- // apply commit hooks on the rebased changes
- NodeBuilder builder = head.builder();
- builder.setChildNode(ROOT, hook.processCommit(
- base.getChildNode(ROOT), head.getChildNode(ROOT)));
- SegmentNodeState newHead =
- store.getWriter().writeNode(builder.getNodeState());
-
+ SegmentNodeBuilder builder = prepare();
// use optimistic locking to update the journal
- if (setHead(base, newHead, info)) {
- base = newHead;
- head = newHead;
+ if (setHead(builder)) {
return -1;
}
}
- // someone else was faster, so restore state and retry later
- base = originalBase;
- head = originalHead;
-
+ // someone else was faster, so wait a while and retry later
Thread.sleep(backoff, random.nextInt(1000000));
long stop = System.nanoTime();
@@ -315,49 +298,31 @@ public class SegmentNodeStore implements
return MILLISECONDS.convert(timeout, NANOSECONDS);
}
- private void pessimisticMerge(
- CommitHook hook, long timeout, CommitInfo info)
+ private void pessimisticMerge(long timeout)
throws CommitFailedException, InterruptedException {
while (true) {
- SegmentNodeState before = head;
- long now = System.currentTimeMillis();
- if (before.hasProperty("token")
- && before.getLong("timeout") >= now) {
+ long now = currentTimeMillis();
+ if (head.hasProperty("token")
+ && head.getLong("timeout") >= now) {
// locked by someone else, wait until unlocked or expired
Thread.sleep(
- Math.min(before.getLong("timeout") - now, 1000),
+ Math.min(head.getLong("timeout") - now, 1000),
random.nextInt(1000000));
} else {
// attempt to acquire the lock
- NodeBuilder builder = before.builder();
+ SegmentNodeBuilder builder = head.builder();
builder.setProperty("token", UUID.randomUUID().toString());
builder.setProperty("timeout", now + timeout);
- SegmentNodeState after =
- store.getWriter().writeNode(builder.getNodeState());
- if (setHead(before, after, info)) {
- SegmentNodeState originalBase = base;
- SegmentNodeState originalHead = head;
-
- // lock acquired; rebase, apply commit hooks, and unlock
- rebase();
- builder.setChildNode(ROOT, hook.processCommit(
- base.getChildNode(ROOT), head.getChildNode(ROOT)));
+ if (setHead(builder)) {
+ // lock acquired; rebase, apply commit hooks, and unlock
+ builder = prepare();
builder.removeProperty("token");
builder.removeProperty("timeout");
// complete the commit
- SegmentNodeState newHead =
- store.getWriter().writeNode(builder.getNodeState());
- if (setHead(after, newHead, info)) {
- base = newHead;
- head = newHead;
+ if (setHead(builder)) {
return;
- } else {
- // something else happened, perhaps a timeout, so
- // undo the previous rebase and try again
- base = originalBase;
- head = originalHead;
}
}
}
@@ -365,17 +330,30 @@ public class SegmentNodeStore implements
}
@Nonnull
- SegmentRootState execute()
+ NodeState execute()
throws CommitFailedException, InterruptedException {
- if (base != head) {
- long timeout = optimisticMerge(hook, info);
+ // only do the merge if there are some changes to commit
+ if (!fastEquals(before, after)) {
+ long timeout = optimisticMerge();
if (timeout >= 0) {
- pessimisticMerge(hook, timeout, info);
+ pessimisticMerge(timeout);
}
}
- return new SegmentRootState(head);
+ return head.getChildNode(ROOT);
}
}
+ private static boolean fastEquals(Object a, Object b) {
+ return a instanceof Record && fastEquals((Record) a, b);
+ }
+
+ private static boolean fastEquals(Record a, Object b) {
+ return b instanceof Record && fastEquals(a, (Record) b);
+ }
+
+ private static boolean fastEquals(Record a, Record b) {
+ return equal(a.getRecordId(), b.getRecordId());
+ }
+
}