Author: jukka
Date: Fri Mar 7 18:56:50 2014
New Revision: 1575372
URL: http://svn.apache.org/r1575372
Log:
OAK-593: Segment-based MK
Inline AbstractStore into FileStore to make the caching code easier to push
down to RandomAccess
Removed:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/AbstractStore.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1575372&r1=1575371&r2=1575372&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
Fri Mar 7 18:56:50 2014
@@ -20,6 +20,7 @@ import static com.google.common.base.Pre
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Lists.newCopyOnWriteArrayList;
+import static com.google.common.collect.Sets.newHashSet;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
import static org.apache.jackrabbit.oak.plugins.segment.SegmentIdFactory.isBulkSegmentId;
@@ -29,32 +30,55 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
-import javax.annotation.Nonnull;
-
import org.apache.jackrabbit.oak.api.Blob;
-import org.apache.jackrabbit.oak.plugins.segment.AbstractStore;
+import org.apache.jackrabbit.oak.cache.CacheLIRS;
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
import org.apache.jackrabbit.oak.plugins.segment.Segment;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentIdFactory;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class FileStore extends AbstractStore {
+import com.google.common.cache.Cache;
+
+public class FileStore implements SegmentStore {
private static final Logger log = LoggerFactory.getLogger(FileStore.class);
+ private static final int MB = 1024 * 1024;
+
private static final int DEFAULT_MEMORY_CACHE_SIZE = 256;
private static final String FILE_NAME_FORMAT = "%s%05d.tar";
private static final String JOURNAL_FILE_NAME = "journal.log";
+ private final SegmentIdFactory factory = new SegmentIdFactory();
+
+ private final SegmentWriter writer = new SegmentWriter(this, factory);
+
+ protected final Cache<UUID, Segment> segments;
+
+ /**
+ * Identifiers of the segments that are currently being loaded.
+ */
+ private final Set<UUID> currentlyLoading = newHashSet();
+
+ /**
+ * Number of threads that are currently waiting for segments to be loaded.
+ * Used to avoid extra {@link #notifyAll()} calls when nobody is waiting.
+ */
+ private int currentlyWaiting = 0;
+
private final File directory;
private final int maxFileSize;
@@ -103,12 +127,21 @@ public class FileStore extends AbstractS
public FileStore(
final File directory, NodeState initial, int maxFileSizeMB,
int cacheSizeMB, boolean memoryMapping) throws IOException {
- super(cacheSizeMB);
checkNotNull(directory).mkdirs();
this.directory = directory;
this.maxFileSize = maxFileSizeMB * MB;
this.memoryMapping = memoryMapping;
+ if (memoryMapping) {
+ // let the OS take care of caching
+ this.segments = null;
+ } else {
+ this.segments = CacheLIRS.newBuilder()
+ .weigher(Segment.WEIGHER)
+ .maximumWeight(cacheSizeMB * MB)
+ .build();
+ }
+
for (int i = 0; true; i++) {
String name = String.format(FILE_NAME_FORMAT, "bulk", i);
File file = new File(directory, name);
@@ -218,6 +251,23 @@ public class FileStore extends AbstractS
}
@Override
+ public SegmentWriter getWriter() {
+ return writer;
+ }
+
+ @Override
+ public SegmentNodeState getHead() {
+ return new SegmentNodeState(getWriter().getDummySegment(), head.get());
+ }
+
+ @Override
+ public boolean setHead(SegmentNodeState base, SegmentNodeState head) {
+ RecordId id = this.head.get();
+ return id.equals(base.getRecordId())
+ && this.head.compareAndSet(id, head.getRecordId());
+ }
+
+ @Override
public void close() {
try {
// avoid deadlocks while joining the flush thread
@@ -230,7 +280,18 @@ public class FileStore extends AbstractS
}
synchronized (this) {
- super.close();
+ if (segments != null) {
+ synchronized (segments) {
+ while (!currentlyLoading.isEmpty()) {
+ try {
+                    segments.wait(); // for concurrent loads to finish
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted", e);
+ }
+ }
+ segments.invalidateAll();
+ }
+ }
flush();
@@ -254,29 +315,67 @@ public class FileStore extends AbstractS
}
@Override
- public SegmentNodeState getHead() {
- return new SegmentNodeState(getWriter().getDummySegment(), head.get());
- }
+ public Segment readSegment(UUID id) {
+ if (isBulkSegmentId(id)) {
+ return loadSegment(id, bulkFiles);
+ }
- @Override
- public boolean setHead(SegmentNodeState base, SegmentNodeState head) {
- RecordId id = this.head.get();
- return id.equals(base.getRecordId())
- && this.head.compareAndSet(id, head.getRecordId());
- }
+ Segment segment = getWriter().getCurrentSegment(id);
+ if (segment != null) {
+ return segment;
+ }
- @Override @Nonnull
- protected Segment loadSegment(UUID id) {
- List<TarFile> files = dataFiles;
- if (isBulkSegmentId(id)) {
- files = bulkFiles;
+        if (segments == null) {
+ // no in-memory cache, load the segment directly
+ return loadSegment(id, dataFiles);
}
+ synchronized (segments) {
+ // check if the segment is already cached
+ segment = segments.getIfPresent(id);
+ // ... or currently being loaded
+ while (segment == null && currentlyLoading.contains(id)) {
+ currentlyWaiting++;
+ try {
+ segments.wait(); // for another thread to load the segment
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted", e);
+ } finally {
+ currentlyWaiting--;
+ }
+ segment = segments.getIfPresent(id);
+ }
+ if (segment != null) {
+ // found the segment in the cache
+ return segment;
+ }
+            // not yet cached, so let others know that we're loading it
+ currentlyLoading.add(id);
+ }
+
+ try {
+ segment = loadSegment(id, dataFiles);
+ } finally {
+ synchronized (segments) {
+ if (segment != null) {
+ segments.put(id, segment);
+ }
+ currentlyLoading.remove(id);
+ if (currentlyWaiting > 0) {
+ segments.notifyAll();
+ }
+ }
+ }
+
+ return segment;
+ }
+
+ protected Segment loadSegment(UUID id, List<TarFile> files) {
for (TarFile file : files) {
try {
ByteBuffer buffer = file.readEntry(id);
if (buffer != null) {
- return createSegment(id, buffer);
+ return new Segment(this, factory, id, buffer);
}
} catch (IOException e) {
throw new RuntimeException(
@@ -317,4 +416,4 @@ public class FileStore extends AbstractS
return new FileBlob(reference); // FIXME: proper reference lookup
}
-}
+}
\ No newline at end of file