Author: jukka
Date: Tue Sep 24 17:10:05 2013
New Revision: 1525946
URL: http://svn.apache.org/r1525946
Log:
OAK-593: Segment-based MK
Use the same segment cache across file and MongoDB backends
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCache.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoStore.java
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCache.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCache.java?rev=1525946&r1=1525945&r2=1525946&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCache.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCache.java
Tue Sep 24 17:10:05 2013
@@ -16,77 +16,101 @@
*/
package org.apache.jackrabbit.oak.plugins.segment;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import org.apache.jackrabbit.oak.cache.CacheStats;
/**
- * Combined memory and disk cache for segments.
+ * Memory cache for segments.
*/
public class SegmentCache {
- private static final long DEFAULT_MEMORY_CACHE_SIZE = 1 << 28; // 256MB
+ private static final int DEFAULT_MEMORY_CACHE_SIZE = 1 << 28; // 256MB
- private final Cache<UUID, Segment> memoryCache;
+ private final int maximumSize;
- private final CacheStats cacheStats;
+ private int currentSize = 0;
- // private final Cache<UUID, File> diskCache;
+ private final Map<UUID, Segment> segments =
+ new LinkedHashMap<UUID, Segment>(1000, 0.75f, true) {
+ @Override
+ protected boolean removeEldestEntry(Entry<UUID, Segment> eldest) {
+ if (currentSize > maximumSize) {
+ currentSize -= eldest.getValue().size();
+ return true;
+ } else {
+ return false;
+ }
+ }
+ };
- // private final File diskCacheDirectory;
-
- public SegmentCache(long memoryCacheSize) {
-// this.diskCacheDirectory = diskCacheDirectory;
-// this.diskCache = CacheBuilder.newBuilder()
-// .maximumWeight(diskCacheSize)
-// .weigher(new Weigher<UUID, File>() {
-// @Override
-// public int weigh(UUID key, File value) {
-// return (int) value.length(); // <= max segment size
-// }
-// }).build();
- this.memoryCache = CacheBuilder.newBuilder()
- .maximumWeight(memoryCacheSize)
- .recordStats()
- .weigher(Segment.WEIGHER)
-// .removalListener(new RemovalListener<UUID, Segment>() {
-// @Override
-// public void onRemoval(
-// RemovalNotification<UUID, Segment> notification) {
-// notification.getValue();
-// }
-// })
- .build();
+ private final Set<UUID> currentlyLoading = new HashSet<UUID>();
- cacheStats = new CacheStats(memoryCache, "Segment", Segment.WEIGHER, memoryCacheSize);
+ public SegmentCache(int maximumSize) {
+ this.maximumSize = maximumSize;
}
public SegmentCache() {
this(DEFAULT_MEMORY_CACHE_SIZE);
}
- public Segment getSegment(UUID segmentId, Callable<Segment> loader) {
- try {
- return memoryCache.get(segmentId, loader);
- } catch (ExecutionException e) {
- throw new IllegalStateException(
- "Failed to load segment " + segmentId, e);
+ public Segment getSegment(UUID segmentId, Callable<Segment> loader)
+ throws Exception {
+ synchronized (this) {
+ Segment segment = segments.get(segmentId);
+
+ while (segment == null && currentlyLoading.contains(segmentId)) {
+ wait();
+ segment = segments.get(segmentId);
+ }
+
+ if (segment != null) {
+ return segment;
+ } else {
+ currentlyLoading.add(segmentId);
+ }
+ }
+
+ Segment segment = loader.call();
+ synchronized (this) {
+ segments.put(segmentId, segment);
+ currentSize += segment.size();
+ currentlyLoading.remove(segmentId);
+ notifyAll();
}
+ return segment;
}
- public void addSegment(Segment segment) {
- memoryCache.put(segment.getSegmentId(), segment);
+ public synchronized void addSegment(Segment segment) {
+ checkState(!segments.containsKey(segment.getSegmentId()));
+ checkState(!currentlyLoading.contains(segment.getSegmentId()));
+ segments.put(segment.getSegmentId(), segment);
+ currentSize += segment.size();
}
- public void removeSegment(UUID segmentId) {
- memoryCache.invalidate(segmentId);
+ public synchronized void removeSegment(UUID segmentId)
+ throws InterruptedException {
+ while (currentlyLoading.contains(segmentId)) {
+ wait();
+ }
+ Segment segment = segments.remove(segmentId);
+ if (segment != null) {
+ currentSize -= segment.size();
+ }
}
- public CacheStats getCacheStats() {
- return cacheStats;
+ public synchronized void clear() throws InterruptedException {
+ while (!currentlyLoading.isEmpty()) {
+ wait();
+ }
+ segments.clear();
+ currentSize = 0;
}
+
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java?rev=1525946&r1=1525945&r2=1525946&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
Tue Sep 24 17:10:05 2013
@@ -16,8 +16,6 @@
*/
package org.apache.jackrabbit.oak.plugins.segment;
-import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
-
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -35,7 +33,6 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.Service;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.CommitFailedException;
-import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
import org.apache.jackrabbit.oak.plugins.segment.mongo.MongoStore;
import org.apache.jackrabbit.oak.spi.commit.CommitHook;
@@ -43,8 +40,6 @@ import org.apache.jackrabbit.oak.spi.com
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
-import org.apache.jackrabbit.oak.spi.whiteboard.OsgiWhiteboard;
-import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
import org.osgi.service.component.ComponentContext;
@Component(policy = ConfigurationPolicy.REQUIRE)
@@ -75,7 +70,7 @@ public class SegmentNodeStoreService imp
@Property(description="Cache size (MB)", intValue=200)
public static final String CACHE = "cache";
- private static final long MB = 1024 * 1024;
+ private static final int MB = 1024 * 1024;
private String name;
@@ -85,8 +80,6 @@ public class SegmentNodeStoreService imp
private NodeStore delegate;
- private Registration cacheStatsReg;
-
private synchronized NodeStore getDelegate() {
assert delegate != null : "service must be activated when used";
return delegate;
@@ -128,9 +121,6 @@ public class SegmentNodeStoreService imp
mongo = new Mongo(host, port);
SegmentCache sc = new SegmentCache(cache * MB);
store = new MongoStore(mongo.getDB(db), sc);
-
- cacheStatsReg = registerMBean(new OsgiWhiteboard(context.getBundleContext()), CacheStatsMBean.class,
- sc.getCacheStats(), CacheStatsMBean.TYPE, sc.getCacheStats().getName());
}
delegate = new SegmentNodeStore(store);
@@ -150,10 +140,6 @@ public class SegmentNodeStoreService imp
public synchronized void deactivate() {
delegate = null;
- if(cacheStatsReg != null){
- cacheStatsReg.unregister();
- }
-
store.close();
if (mongo != null) {
mongo.close();
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1525946&r1=1525945&r2=1525946&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
Tue Sep 24 17:10:05 2013
@@ -31,21 +31,18 @@ import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
+import java.util.UUID;
import org.apache.jackrabbit.oak.plugins.segment.Journal;
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
import org.apache.jackrabbit.oak.plugins.segment.Segment;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentCache;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
import org.apache.jackrabbit.oak.plugins.segment.Template;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-
public class FileStore implements SegmentStore {
private static final long SEGMENT_MAGIC = 0x4f616b0a527845ddL;
@@ -66,8 +63,7 @@ public class FileStore implements Segmen
private final Map<String, Journal> journals = newHashMap();
- private final Cache<UUID, Segment> segments =
- CacheBuilder.newBuilder().maximumSize(1000).build();
+ private final SegmentCache cache = new SegmentCache();
public FileStore(File directory, int maxFileSize, boolean memoryMapping)
throws IOException {
@@ -112,17 +108,16 @@ public class FileStore implements Segmen
}
public synchronized void close() {
- for (TarFile file : files) {
- try {
+ try {
+ for (TarFile file : files) {
file.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
}
+ files.clear();
+ cache.clear();
+ System.gc(); // for any memory-mappings that are no longer used
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
- files.clear();
- segments.invalidateAll();
- segments.cleanUp();
- System.gc(); // for any memory-mappings that are no longer used
}
@Override
@@ -138,41 +133,47 @@ public class FileStore implements Segmen
@Override
public Segment readSegment(final UUID id) {
try {
- return segments.get(id, new Callable<Segment>() {
+ return cache.getSegment(id, new Callable<Segment>() {
@Override
public Segment call() throws Exception {
- for (TarFile file : files) {
- ByteBuffer buffer = file.readEntry(id);
- if (buffer != null) {
- checkState(SEGMENT_MAGIC == buffer.getLong());
- int length = buffer.getInt();
- int count = buffer.getInt();
-
- checkState(id.equals(new UUID(
- buffer.getLong(), buffer.getLong())));
-
- Collection<UUID> referencedIds =
- newArrayListWithCapacity(count);
- for (int i = 0; i < count; i++) {
- referencedIds.add(new UUID(
- buffer.getLong(), buffer.getLong()));
- }
-
- buffer.limit(buffer.position() + length);
- return new Segment(
- FileStore.this, id,
- buffer.slice(), referencedIds,
- Collections.<String, RecordId>emptyMap(),
- Collections.<Template, RecordId>emptyMap());
- }
- }
- throw new IllegalStateException(
- "Segment " + id + " not found");
+ return loadSegment(id);
}
});
- } catch (ExecutionException e) {
- throw new RuntimeException("Failed to load segment " + id, e);
+ } catch (IllegalStateException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Segment loadSegment(UUID id) throws Exception {
+ for (TarFile file : files) {
+ ByteBuffer buffer = file.readEntry(id);
+ if (buffer != null) {
+ checkState(SEGMENT_MAGIC == buffer.getLong());
+ int length = buffer.getInt();
+ int count = buffer.getInt();
+
+ checkState(id.equals(new UUID(
+ buffer.getLong(), buffer.getLong())));
+
+ Collection<UUID> referencedIds =
+ newArrayListWithCapacity(count);
+ for (int i = 0; i < count; i++) {
+ referencedIds.add(new UUID(
+ buffer.getLong(), buffer.getLong()));
+ }
+
+ buffer.limit(buffer.position() + length);
+ return new Segment(
+ FileStore.this, id,
+ buffer.slice(), referencedIds,
+ Collections.<String, RecordId>emptyMap(),
+ Collections.<Template, RecordId>emptyMap());
+ }
}
+
+ throw new IllegalStateException("Segment " + id + " not found");
}
@Override
@@ -203,7 +204,8 @@ public class FileStore implements Segmen
}
buffer.position(pos);
- segments.put(segmentId, new Segment(
+
+ cache.addSegment(new Segment(
this, segmentId, buffer.slice(),
referencedSegmentIds, strings, templates));
}
@@ -223,6 +225,11 @@ public class FileStore implements Segmen
@Override
public void deleteSegment(UUID segmentId) {
// TODO: implement
+ try {
+ cache.removeSegment(segmentId);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
synchronized void writeJournals() throws IOException {
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoStore.java?rev=1525946&r1=1525945&r2=1525946&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoStore.java
Tue Sep 24 17:10:05 2013
@@ -70,12 +70,12 @@ public class MongoStore implements Segme
concern, builder.getNodeState()));
}
- public MongoStore(DB db, long cacheSize) {
+ public MongoStore(DB db, int cacheSize) {
this(db, new SegmentCache(cacheSize));
}
- public MongoStore(Mongo mongo, long cacheSize) {
+ public MongoStore(Mongo mongo, int cacheSize) {
this(mongo.getDB("Oak"), cacheSize);
}
@@ -96,12 +96,16 @@ public class MongoStore implements Segme
@Override
public Segment readSegment(final UUID segmentId) {
- return cache.getSegment(segmentId, new Callable<Segment>() {
- @Override
- public Segment call() throws Exception {
- return findSegment(segmentId);
- }
- });
+ try {
+ return cache.getSegment(segmentId, new Callable<Segment>() {
+ @Override
+ public Segment call() throws Exception {
+ return findSegment(segmentId);
+ }
+ });
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
@Override
@@ -162,6 +166,11 @@ public class MongoStore implements Segme
@Override
public void deleteSegment(UUID segmentId) {
segments.remove(new BasicDBObject("_id", segmentId.toString()));
- cache.removeSegment(segmentId);
+ try {
+ cache.removeSegment(segmentId);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
+
}
Modified:
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java?rev=1525946&r1=1525945&r2=1525946&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java
(original)
+++
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java
Tue Sep 24 17:10:05 2013
@@ -32,7 +32,7 @@ import org.apache.jackrabbit.oak.fixture
public class BenchmarkRunner {
- private static final long MB = 1024 * 1024;
+ private static final int MB = 1024 * 1024;
public static void main(String[] args) throws Exception {
OptionParser parser = new OptionParser();
Modified:
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java?rev=1525946&r1=1525945&r2=1525946&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java
(original)
+++
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java
Tue Sep 24 17:10:05 2013
@@ -115,7 +115,7 @@ public abstract class OakRepositoryFixtu
}
public static RepositoryFixture getSegment(
- final String host, final int port, final long cacheSize) {
+ final String host, final int port, final int cacheSize) {
return new OakRepositoryFixture("Oak-Segment") {
private SegmentStore[] stores;
private Mongo mongo;