Author: reschke
Date: Mon Feb 24 11:06:07 2014
New Revision: 1571235
URL: http://svn.apache.org/r1571235
Log:
OAK-1463 - memory cache for RDB persistence (work in process)
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java?rev=1571235&r1=1571234&r2=1571235&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
Mon Feb 24 11:06:07 2014
@@ -487,7 +487,7 @@ public class DocumentMK implements Micro
public Builder setRDBConnection(String jdbcurl, String username,
String password) {
// TODO maybe we need different connections for document store and
// node store
- this.documentStore = new RDBDocumentStore(jdbcurl, username,
password);
+ this.documentStore = new RDBDocumentStore(jdbcurl, username,
password, this);
this.blobStore = new RDBBlobStore(jdbcurl, username, password);
return this;
}
@@ -500,7 +500,7 @@ public class DocumentMK implements Micro
*/
public Builder setRDBConnection(String dsjdbcurl, String dsusername,
String dspassword, String bsjdbcurl,
String bsusername, String bspassword) {
- this.documentStore = new RDBDocumentStore(dsjdbcurl, dsusername,
dspassword);
+ this.documentStore = new RDBDocumentStore(dsjdbcurl, dsusername,
dspassword, this);
this.blobStore = new RDBBlobStore(bsjdbcurl, bsusername,
bspassword);
return this;
}
@@ -512,7 +512,7 @@ public class DocumentMK implements Micro
* @return this
*/
public Builder setRDBConnection(DataSource ds) {
- this.documentStore = new RDBDocumentStore(ds);
+ this.documentStore = new RDBDocumentStore(ds, this);
this.blobStore = new RDBBlobStore(ds);
return this;
}
@@ -524,7 +524,7 @@ public class DocumentMK implements Micro
* @return this
*/
public Builder setRDBConnection(DataSource documentStoreDataSource,
DataSource blobStoreDataSource) {
- this.documentStore = new RDBDocumentStore(documentStoreDataSource);
+ this.documentStore = new RDBDocumentStore(documentStoreDataSource,
this);
this.blobStore = new RDBBlobStore(blobStoreDataSource);
return this;
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1571235&r1=1571234&r2=1571235&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
Mon Feb 24 11:06:07 2014
@@ -29,37 +29,46 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.Lock;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.sql.DataSource;
import org.apache.jackrabbit.mk.api.MicroKernelException;
+import org.apache.jackrabbit.oak.cache.CacheValue;
import org.apache.jackrabbit.oak.plugins.document.Collection;
import org.apache.jackrabbit.oak.plugins.document.Document;
+import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
import org.apache.jackrabbit.oak.plugins.document.Revision;
import org.apache.jackrabbit.oak.plugins.document.StableRevisionComparator;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
import org.apache.jackrabbit.oak.plugins.document.UpdateUtils;
+import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.cache.Cache;
+import com.google.common.util.concurrent.Striped;
+
public class RDBDocumentStore implements DocumentStore {
/**
* Creates a {@linkplain RDBDocumentStore} instance using an embedded H2
* database in in-memory mode.
*/
- public RDBDocumentStore() {
+ public RDBDocumentStore(DocumentMK.Builder builder) {
try {
String jdbcurl = "jdbc:h2:mem:oaknodes";
Connection connection = DriverManager.getConnection(jdbcurl, "sa",
"");
- initialize(connection);
+ initialize(connection, builder);
} catch (Exception ex) {
throw new MicroKernelException("initializing RDB document store",
ex);
}
@@ -69,9 +78,9 @@ public class RDBDocumentStore implements
* Creates a {@linkplain RDBDocumentStore} instance using the provided
* {@link DataSource}.
*/
- public RDBDocumentStore(DataSource ds) {
+ public RDBDocumentStore(DataSource ds, DocumentMK.Builder builder) {
try {
- initialize(ds.getConnection());
+ initialize(ds.getConnection(), builder);
} catch (Exception ex) {
throw new MicroKernelException("initializing RDB document store",
ex);
}
@@ -81,10 +90,10 @@ public class RDBDocumentStore implements
* Creates a {@linkplain RDBDocumentStore} instance using the provided JDBC
* connection information.
*/
- public RDBDocumentStore(String jdbcurl, String username, String password) {
+ public RDBDocumentStore(String jdbcurl, String username, String password,
DocumentMK.Builder builder) {
try {
Connection connection = DriverManager.getConnection(jdbcurl,
username, password);
- initialize(connection);
+ initialize(connection, builder);
} catch (Exception ex) {
throw new MicroKernelException("initializing RDB document store",
ex);
}
@@ -96,30 +105,71 @@ public class RDBDocumentStore implements
}
@Override
- public <T extends Document> T find(Collection<T> collection, String id,
int maxCacheAge) {
- T fromCache = getFromCache(collection, id, maxCacheAge);
- if (fromCache != null) {
- return fromCache;
- }
- else {
+ public <T extends Document> T find(final Collection<T> collection, final
String id, int maxCacheAge) {
+ if (collection != Collection.NODES) {
return readDocument(collection, id);
+ } else {
+ CacheValue cacheKey = new StringValue(id);
+ NodeDocument doc;
+ if (maxCacheAge > 0) {
+ // first try without lock
+ doc = nodesCache.getIfPresent(cacheKey);
+ if (doc != null) {
+ if (maxCacheAge == Integer.MAX_VALUE ||
System.currentTimeMillis() - doc.getCreated() < maxCacheAge) {
+ return castAsT(unwrap(doc));
+ }
+ }
+ }
+ try {
+ Lock lock = getAndLock(id);
+ try {
+ if (maxCacheAge == 0) {
+ invalidateCache(collection, id);
+ }
+ while (true) {
+ doc = nodesCache.get(cacheKey, new
Callable<NodeDocument>() {
+ @Override
+ public NodeDocument call() throws Exception {
+ NodeDocument doc = (NodeDocument)
readDocument(collection, id);
+ return wrap(doc);
+ }
+ });
+ if (maxCacheAge == 0 || maxCacheAge ==
Integer.MAX_VALUE) {
+ break;
+ }
+ if (System.currentTimeMillis() - doc.getCreated() <
maxCacheAge) {
+ break;
+ }
+ // too old: invalidate, try again
+ invalidateCache(collection, id);
+ }
+ } finally {
+ lock.unlock();
+ }
+ return castAsT(unwrap(doc));
+ } catch (ExecutionException e) {
+ throw new IllegalStateException("Failed to load document with
" + id, e);
+ }
}
}
@Override
public <T extends Document> List<T> query(Collection<T> collection, String
fromKey, String toKey, int limit) {
+ // TODO cache
return query(collection, fromKey, toKey, null, 0, limit);
}
@Override
public <T extends Document> List<T> query(Collection<T> collection, String
fromKey, String toKey, String indexedProperty,
long startValue, int limit) {
+ // TODO cache
return internalQuery(collection, fromKey, toKey, indexedProperty,
startValue, limit);
}
@Override
public <T extends Document> void remove(Collection<T> collection, String
id) {
delete(collection, id);
+ invalidateCache(collection, id);
}
@Override
@@ -134,23 +184,30 @@ public class RDBDocumentStore implements
@Override
public <T extends Document> T createOrUpdate(Collection<T> collection,
UpdateOp update) throws MicroKernelException {
+ // TODO cache
return internalCreateOrUpdate(collection, update, true, false);
}
@Override
public <T extends Document> T findAndUpdate(Collection<T> collection,
UpdateOp update) throws MicroKernelException {
+ // TODO cache
return internalCreateOrUpdate(collection, update, false, true);
}
@Override
public void invalidateCache() {
- this.root = null;
+ nodesCache.invalidateAll();
}
@Override
public <T extends Document> void invalidateCache(Collection<T> collection,
String id) {
- if (collection == Collection.NODES && ROOT.equals(id)) {
- this.root = null;
+ if (collection == Collection.NODES) {
+ Lock lock = getAndLock(id);
+ try {
+ nodesCache.invalidate(new StringValue(id));
+ } finally {
+ lock.unlock();
+ }
}
}
@@ -166,10 +223,10 @@ public class RDBDocumentStore implements
@Override
public <T extends Document> T getIfCached(Collection<T> collection, String
id) {
- if (collection == Collection.NODES && ROOT.equals(id)) {
- return castAsT(this.root);
- } else {
+ if (collection != Collection.NODES) {
return null;
+ } else {
+ return castAsT(nodesCache.getIfPresent(new StringValue(id)));
}
}
@@ -186,7 +243,7 @@ public class RDBDocumentStore implements
private Connection connection;
- private void initialize(Connection con) throws Exception {
+ private void initialize(Connection con, DocumentMK.Builder builder) throws
Exception {
con.setAutoCommit(false);
Statement stmt = con.createStatement();
@@ -198,6 +255,8 @@ public class RDBDocumentStore implements
this.connection = con;
this.callStack = LOG.isDebugEnabled() ? new Exception("call stack of
RDBDocumentStore creation") : null;
+
+ this.nodesCache = builder.buildCache(builder.getDocumentCacheSize());
}
@Override
@@ -215,7 +274,7 @@ public class RDBDocumentStore implements
update.increment(MODCOUNT, 1);
UpdateUtils.applyChanges(doc, update, comparator);
insertDocument(collection, doc);
- updateCache(collection, doc);
+ addToCache(collection, doc);
}
return true;
} catch (MicroKernelException ex) {
@@ -240,7 +299,7 @@ public class RDBDocumentStore implements
UpdateUtils.applyChanges(doc, update, comparator);
doc.seal();
insertDocument(collection, doc);
- updateCache(collection, doc);
+ addToCache(collection, doc);
} else {
T doc = applyChanges(collection, oldDoc, update, checkConditions);
if (doc == null) {
@@ -260,9 +319,8 @@ public class RDBDocumentStore implements
if (doc == null) {
return null;
}
- }
- else {
- updateCache(collection, doc);
+ } else {
+ addToCache(collection, doc);
}
}
@@ -304,6 +362,7 @@ public class RDBDocumentStore implements
Long modified = (Long) doc.get(MODIFIED);
Long modcount = (Long) doc.get(MODCOUNT);
dbUpdate(connection, tableName, id, modified, modcount,
oldmodcount, data);
+ invalidateCache(collection, id); // TODO
}
connection.commit();
} catch (Exception ex) {
@@ -324,6 +383,7 @@ public class RDBDocumentStore implements
T doc = fromString(collection, data);
doc.seal();
result.add(doc);
+ addToCacheIfNotNewer(collection, doc);
}
} catch (Exception ex) {
throw new MicroKernelException(ex);
@@ -553,36 +613,110 @@ public class RDBDocumentStore implements
// ignored
}
- // very poor man's cache
- private NodeDocument root;
- private long rootWritten;
- private static final String ROOT = "0:/";
-
- private <T extends Document> void updateCache(Collection<T> collection,
Document doc) {
- if (collection == Collection.NODES && doc.getId().equals(ROOT)) {
- synchronized (this) {
- this.root = (NodeDocument) doc;
- this.rootWritten = System.currentTimeMillis();
- }
- }
+ @SuppressWarnings("unchecked")
+ private static <T extends Document> T castAsT(NodeDocument doc) {
+ return (T) doc;
}
- private <T extends Document> T getFromCache(Collection<T> collection,
String id, long maxCacheAge) {
- if (collection == Collection.NODES && id.equals(ROOT)) {
- synchronized (this) {
- if (this.root != null) {
- long age = System.currentTimeMillis() - rootWritten;
- if (age < maxCacheAge) {
- return castAsT(this.root);
+ // Memory Cache
+ private Cache<CacheValue, NodeDocument> nodesCache;
+ private final Striped<Lock> locks = Striped.lock(64);
+
+ private Lock getAndLock(String key) {
+ Lock l = locks.get(key);
+ l.lock();
+ return l;
+ }
+
+ @CheckForNull
+ private static NodeDocument unwrap(@Nonnull NodeDocument doc) {
+ return doc == NodeDocument.NULL ? null : doc;
+ }
+
+ @Nonnull
+ private static NodeDocument wrap(@CheckForNull NodeDocument doc) {
+ return doc == null ? NodeDocument.NULL : doc;
+ }
+
+ /**
+ * Adds a document to the {@link #nodesCache} iff there is no document in
+ * the cache with the document key. This method does not acquire a lock
from
+ * {@link #locks}! The caller must ensure a lock is held for the given
+ * document.
+ *
+ * @param doc
+ * the document to add to the cache.
+ * @return either the given <code>doc</code> or the document already
present
+ * in the cache.
+ */
+ @Nonnull
+ private NodeDocument addToCache(@Nonnull final NodeDocument doc) {
+ if (doc == NodeDocument.NULL) {
+ throw new IllegalArgumentException("doc must not be NULL
document");
+ }
+ doc.seal();
+ // make sure we only cache the document if it wasn't
+ // changed and cached by some other thread in the
+ // meantime. That is, use get() with a Callable,
+ // which is only used when the document isn't there
+ try {
+ CacheValue key = new StringValue(doc.getId());
+ for (;;) {
+ NodeDocument cached = nodesCache.get(key, new
Callable<NodeDocument>() {
+ @Override
+ public NodeDocument call() {
+ return doc;
}
+ });
+ if (cached != NodeDocument.NULL) {
+ return cached;
+ } else {
+ nodesCache.invalidate(key);
}
}
+ } catch (ExecutionException e) {
+ // will never happen because call() just returns
+ // the already available doc
+ throw new IllegalStateException(e);
}
- return null;
}
- @SuppressWarnings("unchecked")
- private static <T extends Document> T castAsT(NodeDocument doc) {
- return (T) doc;
+ private <T extends Document> void addToCache(Collection<T> collection, T
doc) {
+ if (collection == Collection.NODES) {
+ Lock lock = getAndLock(doc.getId());
+ try {
+ addToCache((NodeDocument) doc);
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ private <T extends Document> void addToCacheIfNotNewer(Collection<T>
collection, T doc) {
+ if (collection == Collection.NODES) {
+ String id = doc.getId();
+ Lock lock = getAndLock(id);
+ CacheValue cacheKey = new StringValue(id);
+ try {
+ // do not overwrite document in cache if the
+ // existing one in the cache is newer
+ NodeDocument cached = nodesCache.getIfPresent(cacheKey);
+ if (cached != null && cached != NodeDocument.NULL) {
+ // check mod count
+ Number cachedModCount = cached.getModCount();
+ Number modCount = doc.getModCount();
+ if (cachedModCount == null || modCount == null) {
+ throw new IllegalStateException("Missing " +
Document.MOD_COUNT);
+ }
+ if (modCount.longValue() > cachedModCount.longValue()) {
+ nodesCache.put(cacheKey, (NodeDocument) doc);
+ }
+ } else {
+ nodesCache.put(cacheKey, (NodeDocument) doc);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
}
}