Author: jukka
Date: Fri Dec 6 01:22:26 2013
New Revision: 1548354
URL: http://svn.apache.org/r1548354
Log:
OAK-593: Segment-based MK
Use AtomicReferences to track the latest head state
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java?rev=1548354&r1=1548353&r2=1548354&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java Fri Dec 6 01:22:26 2013
@@ -30,6 +30,7 @@ import java.io.InputStream;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
@@ -61,7 +62,7 @@ public class SegmentNodeStore implements
/**
* Local copy of the head of the journal associated with this store.
*/
- private volatile SegmentNodeState head;
+ private final AtomicReference<SegmentNodeState> head;
/**
* Semaphore that controls access to the {@link #head} variable.
@@ -75,8 +76,8 @@ public class SegmentNodeStore implements
public SegmentNodeStore(SegmentStore store, String journal) {
this.store = store;
this.journal = store.getJournal(journal);
- this.head = new SegmentNodeState(
- store.getWriter().getDummySegment(), this.journal.getHead());
+ this.head = new AtomicReference<SegmentNodeState>(new SegmentNodeState(
+ store.getWriter().getDummySegment(), this.journal.getHead()));
this.changeDispatcher = new ChangeDispatcher(getRoot());
}
@@ -93,15 +94,16 @@ public class SegmentNodeStore implements
}
/**
- * Refreshes the head state. Does nothing if a concurrent local commit is
- * in progress, as that commit will automatically refresh the head state.
+ * Refreshes the head state. Should only be called while holding a
+ * permit from the {@link #commitSemaphore}.
*/
private void refreshHead() {
RecordId id = journal.getHead();
- if (!id.equals(head.getRecordId())) {
- head = new SegmentNodeState(
+ if (!id.equals(head.get().getRecordId())) {
+ SegmentNodeState state = new SegmentNodeState(
store.getWriter().getDummySegment(), id);
- changeDispatcher.contentChanged(head.getChildNode(ROOT), null);
+ head.set(state);
+ changeDispatcher.contentChanged(state.getChildNode(ROOT), null);
}
}
@@ -119,7 +121,7 @@ public class SegmentNodeStore implements
commitSemaphore.release();
}
}
- return head.getChildNode(ROOT);
+ return head.get().getChildNode(ROOT);
}
@Override
@@ -196,14 +198,14 @@ public class SegmentNodeStore implements
try {
refreshHead();
- SegmentNodeState ns = head;
- RecordId ri = head.getRecordId();
+ SegmentNodeState state = head.get();
+ RecordId ri = state.getRecordId();
- SegmentNodeBuilder builder = ns.builder();
+ SegmentNodeBuilder builder = state.builder();
NodeBuilder cp = builder.child(name);
cp.setProperty("timestamp", System.currentTimeMillis()
+ lifetime);
- cp.setChildNode(ROOT, ns.getChildNode(ROOT));
+ cp.setChildNode(ROOT, state.getChildNode(ROOT));
if (journal.setHead(ri, builder.getNodeState()
.getRecordId())) {
@@ -222,7 +224,7 @@ public class SegmentNodeStore implements
@Override @CheckForNull
public NodeState retrieve(@Nonnull String checkpoint) {
- NodeState cp = head.getChildNode(checkpoint).getChildNode(ROOT);
+ NodeState cp = head.get().getChildNode(checkpoint).getChildNode(ROOT);
if (cp.exists()) {
return cp;
}
@@ -252,13 +254,13 @@ public class SegmentNodeStore implements
}
private boolean setHead(SegmentNodeBuilder builder) {
- SegmentNodeState base = builder.getBaseState();
- SegmentNodeState head = builder.getNodeState();
+ SegmentNodeState before = builder.getBaseState();
+ SegmentNodeState after = builder.getNodeState();
refreshHead();
- if (journal.setHead(base.getRecordId(), head.getRecordId())) {
- SegmentNodeStore.this.head = head;
- changeDispatcher.contentChanged(head.getChildNode(ROOT), info);
+ if (journal.setHead(before.getRecordId(), after.getRecordId())) {
+ head.set(after);
+ changeDispatcher.contentChanged(after.getChildNode(ROOT), info);
refreshHead();
return true;
} else {
@@ -267,8 +269,9 @@ public class SegmentNodeStore implements
}
private SegmentNodeBuilder prepare() throws CommitFailedException {
- SegmentNodeBuilder builder = head.builder();
- if (fastEquals(before, head.getChildNode(ROOT))) {
+ SegmentNodeState state = head.get();
+ SegmentNodeBuilder builder = state.builder();
+ if (fastEquals(before, state.getChildNode(ROOT))) {
// use a shortcut when there are no external changes
builder.setChildNode(ROOT, hook.processCommit(before, after));
} else {
@@ -293,8 +296,9 @@ public class SegmentNodeStore implements
long start = System.nanoTime();
refreshHead();
- if (head.hasProperty("token")
- && head.getLong("timeout") >= currentTimeMillis()) {
+ SegmentNodeState state = head.get();
+ if (state.hasProperty("token")
+ && state.getLong("timeout") >= currentTimeMillis()) {
// someone else has a pessimistic lock on the journal,
// so we should not try to commit anything yet
} else {
@@ -321,15 +325,16 @@ public class SegmentNodeStore implements
throws CommitFailedException, InterruptedException {
while (true) {
long now = currentTimeMillis();
- if (head.hasProperty("token")
- && head.getLong("timeout") >= now) {
+ SegmentNodeState state = head.get();
+ if (state.hasProperty("token")
+ && state.getLong("timeout") >= now) {
// locked by someone else, wait until unlocked or expired
Thread.sleep(
- Math.min(head.getLong("timeout") - now, 1000),
+ Math.min(state.getLong("timeout") - now, 1000),
random.nextInt(1000000));
} else {
// attempt to acquire the lock
- SegmentNodeBuilder builder = head.builder();
+ SegmentNodeBuilder builder = state.builder();
builder.setProperty("token", UUID.randomUUID().toString());
builder.setProperty("timeout", now + timeout);
@@ -358,7 +363,7 @@ public class SegmentNodeStore implements
pessimisticMerge(timeout);
}
}
- return head.getChildNode(ROOT);
+ return head.get().getChildNode(ROOT);
}
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1548354&r1=1548353&r2=1548354&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Fri Dec 6 01:22:26 2013
@@ -33,6 +33,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
@@ -40,7 +41,6 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.segment.Journal;
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
import org.apache.jackrabbit.oak.plugins.segment.Segment;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.slf4j.Logger;
@@ -68,9 +68,16 @@ public class FileStore extends AbstractS
private final RandomAccessFile journalFile;
- private volatile RecordId head;
+ /**
+ * The latest head of the root journal.
+ */
+ private final AtomicReference<RecordId> head;
- private volatile boolean updated = false;
+ /**
+ * The persisted head of the root journal, used to determine whether the
+ * latest {@link #head} value should be written to the disk.
+ */
+ private RecordId persistedHead = null;
/**
* The background flush thread. Automatically flushes the TarMK state
@@ -94,7 +101,8 @@ public class FileStore extends AbstractS
this(directory, EMPTY_NODE, maxFileSizeMB, cacheSizeMB, memoryMapping);
}
- public FileStore(File directory, NodeState initial, int maxFileSizeMB,
+ public FileStore(
+ final File directory, NodeState initial, int maxFileSizeMB,
int cacheSizeMB, boolean memoryMapping) throws IOException {
super(cacheSizeMB);
checkNotNull(directory).mkdirs();
@@ -122,25 +130,24 @@ public class FileStore extends AbstractS
}
}
- head = null;
journalFile = new RandomAccessFile(
new File(directory, JOURNAL_FILE_NAME), "rw");
String line = journalFile.readLine();
while (line != null) {
int space = line.indexOf(' ');
if (space != -1) {
- head = RecordId.fromString(line.substring(0, space));
+ persistedHead = RecordId.fromString(line.substring(0, space));
}
line = journalFile.readLine();
}
- if (head == null) {
+ if (persistedHead != null) {
+ head = new AtomicReference<RecordId>(persistedHead);
+ } else {
NodeBuilder builder = EMPTY_NODE.builder();
builder.setChildNode("root", initial);
- SegmentNodeState root =
- getWriter().writeNode(builder.getNodeState());
- head = root.getRecordId();
- updated = true;
+ head = new AtomicReference<RecordId>(
+ getWriter().writeNode(builder.getNodeState()).getRecordId());
}
this.flushThread = new Thread(new Runnable() {
@@ -153,7 +160,7 @@ public class FileStore extends AbstractS
flush();
} catch (IOException e) {
log.warn("Failed to flush the TarMK at" +
- FileStore.this.directory, e);
+ directory, e);
}
timeToClose.await(5, SECONDS);
}
@@ -169,7 +176,8 @@ public class FileStore extends AbstractS
}
public synchronized void flush() throws IOException {
- if (updated) {
+ RecordId id = head.get();
+ if (!id.equals(persistedHead)) {
getWriter().flush();
for (TarFile file : bulkFiles) {
file.flush();
@@ -177,8 +185,9 @@ public class FileStore extends AbstractS
for (TarFile file : dataFiles) {
file.flush();
}
- journalFile.writeBytes(head + " root\n");
+ journalFile.writeBytes(id + " root\n");
journalFile.getChannel().force(false);
+ persistedHead = id;
}
}
@@ -235,19 +244,12 @@ public class FileStore extends AbstractS
return new Journal() {
@Override
public RecordId getHead() {
- return head;
+ return head.get();
}
@Override
- public boolean setHead(RecordId base, RecordId head) {
- synchronized (FileStore.this) {
- if (base.equals(FileStore.this.head)) {
- updated = !head.equals(FileStore.this.head);
- FileStore.this.head = head;
- return true;
- } else {
- return false;
- }
- }
+ public boolean setHead(RecordId before, RecordId after) {
+ RecordId id = head.get();
+ return id.equals(before) && head.compareAndSet(id, after);
}
@Override
public void merge() {