Author: reschke
Date: Fri Feb 28 14:31:03 2014
New Revision: 1572957
URL: http://svn.apache.org/r1572957
Log:
OAK-1463 - memory cache for RDB persistence (fix update handling) (work in progress)
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1572957&r1=1572956&r2=1572957&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
Fri Feb 28 14:31:03 2014
@@ -43,7 +43,6 @@ import org.apache.jackrabbit.oak.cache.C
import org.apache.jackrabbit.oak.plugins.document.Collection;
import org.apache.jackrabbit.oak.plugins.document.Document;
import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
-import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
import org.apache.jackrabbit.oak.plugins.document.Revision;
import org.apache.jackrabbit.oak.plugins.document.StableRevisionComparator;
@@ -57,6 +56,7 @@ import org.json.simple.parser.ParseExcep
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Objects;
import com.google.common.cache.Cache;
import com.google.common.util.concurrent.Striped;
@@ -103,7 +103,7 @@ public class RDBDocumentStore implements
@Override
public <T extends Document> T find(Collection<T> collection, String id) {
- return find(collection, id, 0);
+ return find(collection, id, Integer.MAX_VALUE);
}
@Override
@@ -133,6 +133,9 @@ public class RDBDocumentStore implements
@Override
public NodeDocument call() throws Exception {
NodeDocument doc = (NodeDocument)
readDocument(collection, id);
+ if (doc != null) {
+ doc.seal();
+ }
return wrap(doc);
}
});
@@ -151,7 +154,7 @@ public class RDBDocumentStore implements
return castAsT(unwrap(doc));
} catch (ExecutionException e) {
throw new IllegalStateException("Failed to load document with
" + id, e);
- }
+ }
}
}
@@ -186,13 +189,11 @@ public class RDBDocumentStore implements
@Override
public <T extends Document> T createOrUpdate(Collection<T> collection,
UpdateOp update) throws MicroKernelException {
- // TODO cache
return internalCreateOrUpdate(collection, update, true, false);
}
@Override
public <T extends Document> T findAndUpdate(Collection<T> collection,
UpdateOp update) throws MicroKernelException {
- // TODO cache
return internalCreateOrUpdate(collection, update, false, true);
}
@@ -228,7 +229,8 @@ public class RDBDocumentStore implements
if (collection != Collection.NODES) {
return null;
} else {
- return castAsT(nodesCache.getIfPresent(new StringValue(id)));
+ NodeDocument doc = nodesCache.getIfPresent(new StringValue(id));
+ return castAsT(doc);
}
}
@@ -305,7 +307,6 @@ public class RDBDocumentStore implements
}
update.increment(MODCOUNT, 1);
UpdateUtils.applyChanges(doc, update, comparator);
- doc.seal();
insertDocument(collection, doc);
addToCache(collection, doc);
} else {
@@ -328,7 +329,7 @@ public class RDBDocumentStore implements
return null;
}
} else {
- addToCache(collection, doc);
+ applyToCache(collection, oldDoc, doc);
}
}
@@ -690,6 +691,34 @@ public class RDBDocumentStore implements
}
}
+ /**
+  * Brings the nodes cache in line with a document that was just written.
+  * <p>
+  * The new document is first offered to the cache through
+  * {@code addToCache(NodeDocument)}. If that call hands back a different
+  * instance, the cached entry is compared with {@code oldDoc} by mod count
+  * to decide whether the entry can be replaced or has to be dropped.
+  * The collection-level {@code applyToCache} overload invokes this method
+  * while holding the lock obtained for the document id.
+  *
+  * @param oldDoc the document state the update was based on, or
+  *            {@code null} when the write was an insert
+  * @param newDoc the document state after the write
+  */
+ private void applyToCache(final NodeDocument oldDoc, @Nonnull final NodeDocument newDoc) {
+     NodeDocument cached = addToCache(newDoc);
+     if (cached == newDoc) {
+         // successful: the cache now holds exactly our new document
+         return;
+     }
+     if (oldDoc == null) {
+         // this is an insert and some other thread was quicker
+         // loading it into the cache -> return now
+         return;
+     }
+
+     // this is an update (oldDoc != null)
+     CacheValue key = new StringValue(newDoc.getId());
+     if (Objects.equal(cached.getModCount(), oldDoc.getModCount())) {
+         // the cached entry is still the state we updated from,
+         // so it can be replaced with the new state
+         nodesCache.put(key, newDoc);
+     } else {
+         // the cache entry was modified by some other thread in
+         // the meantime. the updated cache entry may or may not
+         // include this update. we cannot just apply our update
+         // on top of the cached entry.
+         // therefore we must invalidate the cache entry
+         nodesCache.invalidate(key);
+     }
+ }
+
private <T extends Document> void addToCache(Collection<T> collection, T
doc) {
if (collection == Collection.NODES) {
Lock lock = getAndLock(doc.getId());
@@ -701,6 +730,17 @@ public class RDBDocumentStore implements
}
}
+ private <T extends Document> void applyToCache(Collection<T> collection, T
oldDoc, T newDoc) {
+ if (collection == Collection.NODES) {
+ Lock lock = getAndLock(newDoc.getId());
+ try {
+ applyToCache((NodeDocument) oldDoc, (NodeDocument) newDoc);
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
private <T extends Document> void addToCacheIfNotNewer(Collection<T>
collection, T doc) {
if (collection == Collection.NODES) {
String id = doc.getId();