Author: jukka
Date: Wed Oct 2 19:06:45 2013
New Revision: 1528593
URL: http://svn.apache.org/r1528593
Log:
OAK-1042: Segment node store caching
Replace SegmentCache with CacheLIRS
Removed:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCache.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/AbstractStore.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/AbstractStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/AbstractStore.java?rev=1528593&r1=1528592&r2=1528593&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/AbstractStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/AbstractStore.java
Wed Oct 2 19:06:45 2013
@@ -16,16 +16,25 @@
*/
package org.apache.jackrabbit.oak.plugins.segment;
+import static com.google.common.collect.Sets.newHashSet;
+
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import org.apache.jackrabbit.oak.cache.CacheLIRS;
import com.google.common.cache.Cache;
+import com.google.common.cache.Weigher;
public abstract class AbstractStore implements SegmentStore {
- private final SegmentCache cache;
+ private final Cache<UUID, Segment> segments;
+
+ /**
+ * Identifiers of the segments that are currently being loaded.
+ */
+ private final Set<UUID> currentlyLoading = newHashSet();
private final Cache<RecordId, Object> records =
CacheLIRS.newBuilder().maximumSize(1000).build();
@@ -33,7 +42,15 @@ public abstract class AbstractStore impl
private final SegmentWriter writer = new SegmentWriter(this);
protected AbstractStore(int cacheSize) {
- this.cache = SegmentCache.create(cacheSize);
+ this.segments = CacheLIRS.newBuilder()
+ .weigher(new Weigher<UUID, Segment>() {
+ @Override
+ public int weigh(UUID key, Segment value) {
+ return value.size();
+ }
+ })
+ .maximumWeight(cacheSize)
+ .build();
}
protected abstract Segment loadSegment(UUID id) throws Exception;
@@ -44,27 +61,63 @@ public abstract class AbstractStore impl
}
@Override
- public Segment readSegment(final UUID id) {
+ public Segment readSegment(UUID id) {
+ Segment segment = segments.getIfPresent(id);
+ if (segment != null) {
+ return segment;
+ }
+
+ segment = getWriter().getCurrentSegment(id);
+ if (segment != null) {
+ return segment;
+ }
+
+ synchronized (this) {
+ segment = segments.getIfPresent(id);
+ while (segment == null && currentlyLoading.contains(id)) {
+ try {
+ wait(); // for another thread to load the segment
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted", e);
+ }
+ segment = segments.getIfPresent(id);
+ }
+ if (segment != null) {
+ return segment;
+ }
+ currentlyLoading.add(id);
+ }
+
try {
- Segment segment = getWriter().getCurrentSegment(id);
+ segment = loadSegment(id);
if (segment == null) {
- segment = cache.getSegment(id, new Callable<Segment>() {
- @Override
- public Segment call() throws Exception {
- return loadSegment(id);
- }
- });
+ throw new IllegalStateException("Unable to find segment " +
id);
}
- return segment;
- } catch (IllegalStateException e) {
- throw e;
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new IllegalStateException("Failed to load segment " + id, e);
+ } finally {
+ synchronized (this) {
+ if (segment != null) {
+ segments.put(id, segment);
+ }
+ currentlyLoading.remove(id);
+ notifyAll();
+ }
}
+
+ return segment;
}
@Override
- public void deleteSegment(UUID segmentId) {
+ public synchronized void deleteSegment(UUID segmentId) {
+ while (currentlyLoading.contains(segmentId)) {
+ try {
+ wait(); // for another thread to finish loading the segment
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted", e);
+ }
+ }
+ segments.invalidate(segmentId);
}
@Override
@@ -84,8 +137,16 @@ public abstract class AbstractStore impl
}
@Override
- public void close() {
+ public synchronized void close() {
+ while (!currentlyLoading.isEmpty()) {
+ try {
+ wait(); // for other threads to finish loading segments
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted", e);
+ }
+ }
records.invalidateAll();
+ segments.invalidateAll();
}
}