Author: jukka
Date: Fri Mar 7 18:17:15 2014
New Revision: 1575352
URL: http://svn.apache.org/r1575352
Log:
OAK-593: Segment-based MK
Implement HttpStore without AbstractStore to make the caching code easier to
customize
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/http/HttpStore.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java?rev=1575352&r1=1575351&r2=1575352&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
Fri Mar 7 18:17:15 2014
@@ -97,7 +97,7 @@ public class Segment {
*/
static final int MEDIUM_LIMIT = (1 << (16 - 2)) + SMALL_LIMIT;
- static final Weigher<UUID, Segment> WEIGHER =
+ public static final Weigher<UUID, Segment> WEIGHER =
new Weigher<UUID, Segment>() {
@Override
public int weigh(UUID key, Segment value) {
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/http/HttpStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/http/HttpStore.java?rev=1575352&r1=1575351&r2=1575352&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/http/HttpStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/http/HttpStore.java
Fri Mar 7 18:17:15 2014
@@ -17,6 +17,8 @@
package org.apache.jackrabbit.oak.plugins.segment.http;
import static com.google.common.base.Charsets.UTF_8;
+import static com.google.common.collect.Sets.newHashSet;
+import static
org.apache.jackrabbit.oak.plugins.segment.SegmentIdFactory.isBulkSegmentId;
import java.io.BufferedReader;
import java.io.IOException;
@@ -27,24 +29,46 @@ import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.nio.ByteBuffer;
+import java.util.Set;
import java.util.UUID;
import javax.annotation.CheckForNull;
-import org.apache.jackrabbit.oak.plugins.segment.AbstractStore;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.cache.CacheLIRS;
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
import org.apache.jackrabbit.oak.plugins.segment.Segment;
import org.apache.jackrabbit.oak.plugins.segment.SegmentIdFactory;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter;
+import com.google.common.cache.Cache;
import com.google.common.io.ByteStreams;
-public class HttpStore extends AbstractStore {
+public class HttpStore implements SegmentStore {
+
+ protected static final int MB = 1024 * 1024;
private final SegmentIdFactory factory = new SegmentIdFactory();
+ private final SegmentWriter writer = new SegmentWriter(this, factory);
+
private final URL base;
+ private final Cache<UUID, Segment> segments;
+
+ /**
+ * Identifiers of the segments that are currently being loaded.
+ */
+ private final Set<UUID> currentlyLoading = newHashSet();
+
+ /**
+ * Number of threads that are currently waiting for segments to be loaded.
+ * Used to avoid extra {@link #notifyAll()} calls when nobody is waiting.
+ */
+ private int currentlyWaiting = 0;
+
/**
* @param base
* make sure the url ends with a slash "/", otherwise the
@@ -52,13 +76,16 @@ public class HttpStore extends AbstractS
* @param cacheSizeMB
*/
public HttpStore(URL base, int cacheSizeMB) {
- super(cacheSizeMB);
this.base = base;
+ this.segments = CacheLIRS.newBuilder()
+ .weigher(Segment.WEIGHER)
+ .maximumWeight(cacheSizeMB * MB)
+ .build();
}
- protected URLConnection get(String fragment) throws MalformedURLException,
- IOException {
- return new URL(base, fragment).openConnection();
+ @Override
+ public SegmentWriter getWriter() {
+ return writer;
}
@Override
@@ -91,14 +118,64 @@ public class HttpStore extends AbstractS
}
@Override
- @CheckForNull
- protected Segment loadSegment(UUID id) {
+ public Segment readSegment(UUID id) {
+ if (isBulkSegmentId(id)) {
+ return loadSegment(id);
+ }
+
+ Segment segment = getWriter().getCurrentSegment(id);
+ if (segment != null) {
+ return segment;
+ }
+
+ synchronized (segments) {
+ // check if the segment is already cached
+ segment = segments.getIfPresent(id);
+ // ... or currently being loaded
+ while (segment == null && currentlyLoading.contains(id)) {
+ currentlyWaiting++;
+ try {
+ segments.wait(); // for another thread to load the segment
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted", e);
+ } finally {
+ currentlyWaiting--;
+ }
+ segment = segments.getIfPresent(id);
+ }
+ if (segment != null) {
+ // found the segment in the cache
+ return segment;
+ }
+ // not yet cached, so start let others know that we're loading it
+ currentlyLoading.add(id);
+ }
+
+ try {
+ segment = loadSegment(id);
+ } finally {
+ synchronized (segments) {
+ if (segment != null) {
+ segments.put(id, segment);
+ }
+ currentlyLoading.remove(id);
+ if (currentlyWaiting > 0) {
+ segments.notifyAll();
+ }
+ }
+ }
+
+ return segment;
+ }
+
+ private Segment loadSegment(UUID uuid) {
try {
- final URLConnection connection = get(id.toString());
+ URLConnection connection =
+ new URL(base, uuid.toString()).openConnection();
InputStream stream = connection.getInputStream();
try {
byte[] data = ByteStreams.toByteArray(stream);
- return new Segment(this, factory, id, ByteBuffer.wrap(data));
+ return new Segment(this, factory, uuid, ByteBuffer.wrap(data));
} finally {
stream.close();
}
@@ -113,7 +190,8 @@ public class HttpStore extends AbstractS
public void writeSegment(UUID segmentId, byte[] bytes, int offset,
int length) {
try {
- URLConnection connection = get(segmentId.toString());
+ URLConnection connection =
+ new URL(base, segmentId.toString()).openConnection();
connection.setDoInput(false);
connection.setDoOutput(true);
OutputStream stream = connection.getOutputStream();
@@ -129,4 +207,23 @@ public class HttpStore extends AbstractS
}
}
+ @Override
+ public void close() {
+ synchronized (segments) {
+ while (!currentlyLoading.isEmpty()) {
+ try {
+ segments.wait(); // for concurrent loads to finish
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted", e);
+ }
+ }
+ segments.invalidateAll();
+ }
+ }
+
+ @Override @CheckForNull
+ public Blob readBlob(String reference) {
+ return null;
+ }
+
}