Author: reschke
Date: Mon Feb 24 11:06:07 2014
New Revision: 1571235

URL: http://svn.apache.org/r1571235
Log:
OAK-1463 - memory cache for RDB persistence (work in process)

Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java?rev=1571235&r1=1571234&r2=1571235&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
 Mon Feb 24 11:06:07 2014
@@ -487,7 +487,7 @@ public class DocumentMK implements Micro
         public Builder setRDBConnection(String jdbcurl, String username, 
String password) {
             // TODO maybe we need different connections for document store and
             // node store
-            this.documentStore = new RDBDocumentStore(jdbcurl, username, 
password);
+            this.documentStore = new RDBDocumentStore(jdbcurl, username, 
password, this);
             this.blobStore = new RDBBlobStore(jdbcurl, username, password);
             return this;
         }
@@ -500,7 +500,7 @@ public class DocumentMK implements Micro
          */
         public Builder setRDBConnection(String dsjdbcurl, String dsusername, 
String dspassword, String bsjdbcurl,
                 String bsusername, String bspassword) {
-            this.documentStore = new RDBDocumentStore(dsjdbcurl, dsusername, 
dspassword);
+            this.documentStore = new RDBDocumentStore(dsjdbcurl, dsusername, 
dspassword, this);
             this.blobStore = new RDBBlobStore(bsjdbcurl, bsusername, 
bspassword);
             return this;
         }
@@ -512,7 +512,7 @@ public class DocumentMK implements Micro
          * @return this
          */
         public Builder setRDBConnection(DataSource ds) {
-            this.documentStore = new RDBDocumentStore(ds);
+            this.documentStore = new RDBDocumentStore(ds, this);
             this.blobStore = new RDBBlobStore(ds);
             return this;
         }
@@ -524,7 +524,7 @@ public class DocumentMK implements Micro
          * @return this
          */
         public Builder setRDBConnection(DataSource documentStoreDataSource, 
DataSource blobStoreDataSource) {
-            this.documentStore = new RDBDocumentStore(documentStoreDataSource);
+            this.documentStore = new RDBDocumentStore(documentStoreDataSource, 
this);
             this.blobStore = new RDBBlobStore(blobStoreDataSource);
             return this;
         }

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1571235&r1=1571234&r2=1571235&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
 Mon Feb 24 11:06:07 2014
@@ -29,37 +29,46 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.Lock;
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.sql.DataSource;
 
 import org.apache.jackrabbit.mk.api.MicroKernelException;
+import org.apache.jackrabbit.oak.cache.CacheValue;
 import org.apache.jackrabbit.oak.plugins.document.Collection;
 import org.apache.jackrabbit.oak.plugins.document.Document;
+import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
 import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
 import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
 import org.apache.jackrabbit.oak.plugins.document.Revision;
 import org.apache.jackrabbit.oak.plugins.document.StableRevisionComparator;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
 import org.apache.jackrabbit.oak.plugins.document.UpdateUtils;
+import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.json.simple.parser.ParseException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.cache.Cache;
+import com.google.common.util.concurrent.Striped;
+
 public class RDBDocumentStore implements DocumentStore {
 
     /**
      * Creates a {@linkplain RDBDocumentStore} instance using an embedded H2
      * database in in-memory mode.
      */
-    public RDBDocumentStore() {
+    public RDBDocumentStore(DocumentMK.Builder builder) {
         try {
             String jdbcurl = "jdbc:h2:mem:oaknodes";
             Connection connection = DriverManager.getConnection(jdbcurl, "sa", 
"");
-            initialize(connection);
+            initialize(connection, builder);
         } catch (Exception ex) {
             throw new MicroKernelException("initializing RDB document store", 
ex);
         }
@@ -69,9 +78,9 @@ public class RDBDocumentStore implements
      * Creates a {@linkplain RDBDocumentStore} instance using the provided
      * {@link DataSource}.
      */
-    public RDBDocumentStore(DataSource ds) {
+    public RDBDocumentStore(DataSource ds, DocumentMK.Builder builder) {
         try {
-            initialize(ds.getConnection());
+            initialize(ds.getConnection(), builder);
         } catch (Exception ex) {
             throw new MicroKernelException("initializing RDB document store", 
ex);
         }
@@ -81,10 +90,10 @@ public class RDBDocumentStore implements
      * Creates a {@linkplain RDBDocumentStore} instance using the provided JDBC
      * connection information.
      */
-    public RDBDocumentStore(String jdbcurl, String username, String password) {
+    public RDBDocumentStore(String jdbcurl, String username, String password, 
DocumentMK.Builder builder) {
         try {
             Connection connection = DriverManager.getConnection(jdbcurl, 
username, password);
-            initialize(connection);
+            initialize(connection, builder);
         } catch (Exception ex) {
             throw new MicroKernelException("initializing RDB document store", 
ex);
         }
@@ -96,30 +105,71 @@ public class RDBDocumentStore implements
     }
 
     @Override
-    public <T extends Document> T find(Collection<T> collection, String id, 
int maxCacheAge) {
-        T fromCache = getFromCache(collection, id, maxCacheAge);
-        if (fromCache != null) {
-            return fromCache;
-        }
-        else {
+    public <T extends Document> T find(final Collection<T> collection, final 
String id, int maxCacheAge) {
+        if (collection != Collection.NODES) {
             return readDocument(collection, id);
+        } else {
+            CacheValue cacheKey = new StringValue(id);
+            NodeDocument doc;
+            if (maxCacheAge > 0) {
+                // first try without lock
+                doc = nodesCache.getIfPresent(cacheKey);
+                if (doc != null) {
+                    if (maxCacheAge == Integer.MAX_VALUE || 
System.currentTimeMillis() - doc.getCreated() < maxCacheAge) {
+                        return castAsT(unwrap(doc));
+                    }
+                }
+            }
+            try {
+                Lock lock = getAndLock(id);
+                try {
+                    if (maxCacheAge == 0) {
+                        invalidateCache(collection, id);
+                    }
+                    while (true) {
+                        doc = nodesCache.get(cacheKey, new 
Callable<NodeDocument>() {
+                            @Override
+                            public NodeDocument call() throws Exception {
+                                NodeDocument doc = (NodeDocument) 
readDocument(collection, id);
+                                return wrap(doc);
+                            }
+                        });
+                        if (maxCacheAge == 0 || maxCacheAge == 
Integer.MAX_VALUE) {
+                            break;
+                        }
+                        if (System.currentTimeMillis() - doc.getCreated() < 
maxCacheAge) {
+                            break;
+                        }
+                        // too old: invalidate, try again
+                        invalidateCache(collection, id);
+                    }
+                } finally {
+                    lock.unlock();
+                }
+                return castAsT(unwrap(doc));
+            } catch (ExecutionException e) {
+                throw new IllegalStateException("Failed to load document with 
" + id, e);
+            }
         }
     }
 
     @Override
     public <T extends Document> List<T> query(Collection<T> collection, String 
fromKey, String toKey, int limit) {
+        // TODO cache
         return query(collection, fromKey, toKey, null, 0, limit);
     }
 
     @Override
     public <T extends Document> List<T> query(Collection<T> collection, String 
fromKey, String toKey, String indexedProperty,
             long startValue, int limit) {
+        // TODO cache
         return internalQuery(collection, fromKey, toKey, indexedProperty, 
startValue, limit);
     }
 
     @Override
     public <T extends Document> void remove(Collection<T> collection, String 
id) {
         delete(collection, id);
+        invalidateCache(collection, id);
     }
 
     @Override
@@ -134,23 +184,30 @@ public class RDBDocumentStore implements
 
     @Override
     public <T extends Document> T createOrUpdate(Collection<T> collection, 
UpdateOp update) throws MicroKernelException {
+        // TODO cache
         return internalCreateOrUpdate(collection, update, true, false);
     }
 
     @Override
     public <T extends Document> T findAndUpdate(Collection<T> collection, 
UpdateOp update) throws MicroKernelException {
+        // TODO cache
         return internalCreateOrUpdate(collection, update, false, true);
     }
 
     @Override
     public void invalidateCache() {
-        this.root = null;
+        nodesCache.invalidateAll();
     }
 
     @Override
     public <T extends Document> void invalidateCache(Collection<T> collection, 
String id) {
-        if (collection == Collection.NODES && ROOT.equals(id)) {
-            this.root = null;
+        if (collection == Collection.NODES) {
+            Lock lock = getAndLock(id);
+            try {
+                nodesCache.invalidate(new StringValue(id));
+            } finally {
+                lock.unlock();
+            }
         }
     }
 
@@ -166,10 +223,10 @@ public class RDBDocumentStore implements
 
     @Override
     public <T extends Document> T getIfCached(Collection<T> collection, String 
id) {
-        if (collection == Collection.NODES && ROOT.equals(id)) {
-            return castAsT(this.root);
-        } else {
+        if (collection != Collection.NODES) {
             return null;
+        } else {
+            return castAsT(nodesCache.getIfPresent(new StringValue(id)));
         }
     }
 
@@ -186,7 +243,7 @@ public class RDBDocumentStore implements
 
     private Connection connection;
 
-    private void initialize(Connection con) throws Exception {
+    private void initialize(Connection con, DocumentMK.Builder builder) throws 
Exception {
         con.setAutoCommit(false);
 
         Statement stmt = con.createStatement();
@@ -198,6 +255,8 @@ public class RDBDocumentStore implements
 
         this.connection = con;
         this.callStack = LOG.isDebugEnabled() ? new Exception("call stack of 
RDBDocumentStore creation") : null;
+
+        this.nodesCache = builder.buildCache(builder.getDocumentCacheSize());
     }
 
     @Override
@@ -215,7 +274,7 @@ public class RDBDocumentStore implements
                 update.increment(MODCOUNT, 1);
                 UpdateUtils.applyChanges(doc, update, comparator);
                 insertDocument(collection, doc);
-                updateCache(collection, doc);
+                addToCache(collection, doc);
             }
             return true;
         } catch (MicroKernelException ex) {
@@ -240,7 +299,7 @@ public class RDBDocumentStore implements
             UpdateUtils.applyChanges(doc, update, comparator);
             doc.seal();
             insertDocument(collection, doc);
-            updateCache(collection, doc);
+            addToCache(collection, doc);
         } else {
             T doc = applyChanges(collection, oldDoc, update, checkConditions);
             if (doc == null) {
@@ -260,9 +319,8 @@ public class RDBDocumentStore implements
                     if (doc == null) {
                         return null;
                     }
-                }
-                else {
-                    updateCache(collection, doc);
+                } else {
+                    addToCache(collection, doc);
                 }
             }
 
@@ -304,6 +362,7 @@ public class RDBDocumentStore implements
                 Long modified = (Long) doc.get(MODIFIED);
                 Long modcount = (Long) doc.get(MODCOUNT);
                 dbUpdate(connection, tableName, id, modified, modcount, 
oldmodcount, data);
+                invalidateCache(collection, id); // TODO
             }
             connection.commit();
         } catch (Exception ex) {
@@ -324,6 +383,7 @@ public class RDBDocumentStore implements
                 T doc = fromString(collection, data);
                 doc.seal();
                 result.add(doc);
+                addToCacheIfNotNewer(collection, doc);
             }
         } catch (Exception ex) {
             throw new MicroKernelException(ex);
@@ -553,36 +613,110 @@ public class RDBDocumentStore implements
         // ignored
     }
 
-    // very poor man's cache
-    private NodeDocument root;
-    private long rootWritten;
-    private static final String ROOT = "0:/";
-
-    private <T extends Document> void updateCache(Collection<T> collection, 
Document doc) {
-        if (collection == Collection.NODES && doc.getId().equals(ROOT)) {
-            synchronized (this) {
-                this.root = (NodeDocument) doc;
-                this.rootWritten = System.currentTimeMillis();
-            }
-        }
+    @SuppressWarnings("unchecked")
+    private static <T extends Document> T castAsT(NodeDocument doc) {
+        return (T) doc;
     }
 
-    private <T extends Document> T getFromCache(Collection<T> collection, 
String id, long maxCacheAge) {
-        if (collection == Collection.NODES && id.equals(ROOT)) {
-            synchronized (this) {
-                if (this.root != null) {
-                    long age = System.currentTimeMillis() - rootWritten;
-                    if (age < maxCacheAge) {
-                        return castAsT(this.root);
+    // Memory Cache
+    private Cache<CacheValue, NodeDocument> nodesCache;
+    private final Striped<Lock> locks = Striped.lock(64);
+
+    private Lock getAndLock(String key) {
+        Lock l = locks.get(key);
+        l.lock();
+        return l;
+    }
+
+    @CheckForNull
+    private static NodeDocument unwrap(@Nonnull NodeDocument doc) {
+        return doc == NodeDocument.NULL ? null : doc;
+    }
+
+    @Nonnull
+    private static NodeDocument wrap(@CheckForNull NodeDocument doc) {
+        return doc == null ? NodeDocument.NULL : doc;
+    }
+
+    /**
+     * Adds a document to the {@link #nodesCache} iff there is no document in
+     * the cache with the document key. This method does not acquire a lock 
from
+     * {@link #locks}! The caller must ensure a lock is held for the given
+     * document.
+     * 
+     * @param doc
+     *            the document to add to the cache.
+     * @return either the given <code>doc</code> or the document already 
present
+     *         in the cache.
+     */
+    @Nonnull
+    private NodeDocument addToCache(@Nonnull final NodeDocument doc) {
+        if (doc == NodeDocument.NULL) {
+            throw new IllegalArgumentException("doc must not be NULL 
document");
+        }
+        doc.seal();
+        // make sure we only cache the document if it wasn't
+        // changed and cached by some other thread in the
+        // meantime. That is, use get() with a Callable,
+        // which is only used when the document isn't there
+        try {
+            CacheValue key = new StringValue(doc.getId());
+            for (;;) {
+                NodeDocument cached = nodesCache.get(key, new 
Callable<NodeDocument>() {
+                    @Override
+                    public NodeDocument call() {
+                        return doc;
                     }
+                });
+                if (cached != NodeDocument.NULL) {
+                    return cached;
+                } else {
+                    nodesCache.invalidate(key);
                 }
             }
+        } catch (ExecutionException e) {
+            // will never happen because call() just returns
+            // the already available doc
+            throw new IllegalStateException(e);
         }
-        return null;
     }
 
-    @SuppressWarnings("unchecked")
-    private static <T extends Document> T castAsT(NodeDocument doc) {
-        return (T) doc;
+    private <T extends Document> void addToCache(Collection<T> collection, T 
doc) {
+        if (collection == Collection.NODES) {
+            Lock lock = getAndLock(doc.getId());
+            try {
+                addToCache((NodeDocument) doc);
+            } finally {
+                lock.unlock();
+            }
+        }
+    }
+
+    private <T extends Document> void addToCacheIfNotNewer(Collection<T> 
collection, T doc) {
+        if (collection == Collection.NODES) {
+            String id = doc.getId();
+            Lock lock = getAndLock(id);
+            CacheValue cacheKey = new StringValue(id);
+            try {
+                // do not overwrite document in cache if the
+                // existing one in the cache is newer
+                NodeDocument cached = nodesCache.getIfPresent(cacheKey);
+                if (cached != null && cached != NodeDocument.NULL) {
+                    // check mod count
+                    Number cachedModCount = cached.getModCount();
+                    Number modCount = doc.getModCount();
+                    if (cachedModCount == null || modCount == null) {
+                        throw new IllegalStateException("Missing " + 
Document.MOD_COUNT);
+                    }
+                    if (modCount.longValue() > cachedModCount.longValue()) {
+                        nodesCache.put(cacheKey, (NodeDocument) doc);
+                    }
+                } else {
+                    nodesCache.put(cacheKey, (NodeDocument) doc);
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
     }
 }


Reply via email to