Author: mreutegg
Date: Mon Mar 24 09:20:55 2014
New Revision: 1580792

URL: http://svn.apache.org/r1580792
Log:
OAK-1578: Configurable size of capped collection used by MongoDiffCache

Added:
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoDiffCacheTest.java
   (with props)
Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MongoDiffCache.java

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java?rev=1580792&r1=1580791&r2=1580792&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
 Mon Mar 24 09:20:55 2014
@@ -468,12 +468,14 @@ public class DocumentMK implements Micro
         }
 
         /**
-         * Set the MongoDB connection to use. By default an in-memory store is 
used.
+         * Use the given MongoDB as backend storage for the DocumentNodeStore.
          *
          * @param db the MongoDB connection
+         * @param changesSizeMB the size in MB of the capped collection backing
+         *                      the MongoDiffCache.
          * @return this
          */
-        public Builder setMongoDB(DB db) {
+        public Builder setMongoDB(DB db, int changesSizeMB) {
             if (db != null) {
                 if (this.documentStore == null) {
                     this.documentStore = new MongoDocumentStore(db, this);
@@ -484,13 +486,23 @@ public class DocumentMK implements Micro
                 }
 
                 if (this.diffCache == null) {
-                    this.diffCache = new MongoDiffCache(db, this);
+                    this.diffCache = new MongoDiffCache(db, changesSizeMB, 
this);
                 }
             }
             return this;
         }
 
         /**
+         * Set the MongoDB connection to use. By default an in-memory store is 
used.
+         *
+         * @param db the MongoDB connection
+         * @return this
+         */
+        public Builder setMongoDB(DB db) {
+            return setMongoDB(db, 8);
+        }
+
+        /**
          * Sets a JDBC connection URL to use for the RDB document and blob
          * stores.
          *

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java?rev=1580792&r1=1580791&r2=1580792&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
 Mon Mar 24 09:20:55 2014
@@ -70,6 +70,7 @@ public class DocumentNodeStoreService {
     private static final String DEFAULT_URI = "mongodb://localhost:27017/oak";
     private static final int DEFAULT_CACHE = 256;
     private static final int DEFAULT_OFF_HEAP_CACHE = 0;
+    private static final int DEFAULT_CHANGES_SIZE = 256;
     private static final String DEFAULT_DB = "oak";
     private static final String PREFIX = "oak.documentstore.";
 
@@ -101,6 +102,9 @@ public class DocumentNodeStoreService {
     @Property(intValue = DEFAULT_OFF_HEAP_CACHE)
     private static final String PROP_OFF_HEAP_CACHE = "offHeapCache";
 
+    @Property(intValue =  DEFAULT_CHANGES_SIZE)
+    private static final String PROP_CHANGES_SIZE = "changesSize";
+
     /**
      * Boolean value indicating a blobStore is to be used
      */
@@ -145,6 +149,7 @@ public class DocumentNodeStoreService {
 
         int offHeapCache = PropertiesUtil.toInteger(prop(PROP_OFF_HEAP_CACHE), 
DEFAULT_OFF_HEAP_CACHE);
         int cacheSize = PropertiesUtil.toInteger(prop(PROP_CACHE), 
DEFAULT_CACHE);
+        int changesSize = PropertiesUtil.toInteger(prop(PROP_CHANGES_SIZE), 
DEFAULT_CHANGES_SIZE);
         boolean useMK = 
PropertiesUtil.toBoolean(context.getProperties().get(PROP_USE_MK), false);
 
 
@@ -155,8 +160,8 @@ public class DocumentNodeStoreService {
             // Take care around not logging the uri directly as it
             // might contain passwords
             String type = useMK ? "MK" : "NodeStore";
-            log.info("Starting Document{} with host={}, db={}, cache size 
(MB)={}, Off Heap Cache size (MB)={}",
-                    type, mongoURI.getHosts(), db, cacheSize, offHeapCache);
+            log.info("Starting Document{} with host={}, db={}, cache size 
(MB)={}, Off Heap Cache size (MB)={}, 'changes' collection size (MB)={}",
+                    type, mongoURI.getHosts(), db, cacheSize, offHeapCache, 
changesSize);
             log.info("Mongo Connection details {}", 
MongoConnection.toString(mongoURI.getOptions()));
         }
 
@@ -173,7 +178,7 @@ public class DocumentNodeStoreService {
             mkBuilder.setBlobStore(blobStore);
         }
 
-        mkBuilder.setMongoDB(mongoDB);
+        mkBuilder.setMongoDB(mongoDB, changesSize);
 
         mk = mkBuilder.open();
 

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MongoDiffCache.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MongoDiffCache.java?rev=1580792&r1=1580791&r2=1580792&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MongoDiffCache.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MongoDiffCache.java
 Mon Mar 24 09:20:55 2014
@@ -16,14 +16,25 @@
  */
 package org.apache.jackrabbit.oak.plugins.document;
 
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.commons.json.JsopReader;
+import org.apache.jackrabbit.oak.commons.json.JsopStream;
+import org.apache.jackrabbit.oak.commons.json.JsopTokenizer;
+import org.apache.jackrabbit.oak.commons.json.JsopWriter;
 import org.apache.jackrabbit.oak.plugins.document.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Striped;
 import com.mongodb.BasicDBObject;
 import com.mongodb.BasicDBObjectBuilder;
 import com.mongodb.DB;
@@ -43,20 +54,22 @@ public class MongoDiffCache extends Memo
 
     private static final long MB = 1024 * 1024;
 
-    private static final String COLLECTION_NAME = "changeLog";
-
-    // TODO: make configurable
-    private static final DBObject COLLECTION_OPTIONS = 
BasicDBObjectBuilder.start()
-            .add("capped", true).add("size", 256 * MB).get();
+    private static final String COLLECTION_NAME = "changes";
 
     private final DBCollection changes;
 
-    public MongoDiffCache(DB db, DocumentMK.Builder builder) {
+    private final Cache<String, String> blacklist = 
CacheBuilder.newBuilder().maximumSize(1024).build();
+
+    private final Striped<Lock> locks = Striped.lock(16);
+
+    public MongoDiffCache(DB db, int sizeMB, DocumentMK.Builder builder) {
         super(builder);
         if (db.collectionExists(COLLECTION_NAME)) {
             changes = db.getCollection(COLLECTION_NAME);
         } else {
-            changes = db.createCollection(COLLECTION_NAME, COLLECTION_OPTIONS);
+            changes = db.createCollection(COLLECTION_NAME,
+                    BasicDBObjectBuilder.start().add("capped", true)
+                            .add("size", sizeMB * MB).get());
         }
     }
 
@@ -65,40 +78,61 @@ public class MongoDiffCache extends Memo
     public String getChanges(@Nonnull Revision from,
                              @Nonnull Revision to,
                              @Nonnull String path) {
-        // first try to serve from cache
-        String diff = super.getChanges(from, to, path);
-        if (diff != null) {
-            return diff;
-        }
-        // grab from mongo
-        DBObject obj = changes.findOne(new BasicDBObject("_id", 
to.toString()));
-        if (obj == null) {
-            return null;
-        }
-        if (obj.get("_b").equals(from.toString())) {
-            // apply to diff cache and serve later requests from cache
-            Entry entry = super.newEntry(from, to);
-            applyToDiffCache(obj, "/", entry);
-            entry.done();
+        Lock lock = locks.get(from);
+        lock.lock();
+        try {
+            // first try to serve from cache
+            String diff = super.getChanges(from, to, path);
+            if (diff != null) {
+                return diff;
+            }
+            if (from.getClusterId() != to.getClusterId()) {
+                return null;
+            }
+            // check blacklist
+            if (blacklist.getIfPresent(from + "/" + to) != null) {
+                return null;
+            }
+            Revision id = to;
+            Diff d = null;
+            int numCommits = 0;
+            for (;;) {
+                // grab from mongo
+                DBObject obj = changes.findOne(new BasicDBObject("_id", 
id.toString()));
+                if (obj == null) {
+                    return null;
+                }
+                numCommits++;
+                if (numCommits > 32) {
+                    // do not merge more than 32 commits
+                    blacklist.put(from + "/" + to, "");
+                    return null;
+                }
+                if (d == null) {
+                    d = new Diff(obj);
+                } else {
+                    d.mergeBeforeDiff(new Diff(obj));
+                }
 
-            DBObject current = obj;
-            for (String name : PathUtils.elements(path)) {
-                String n = Utils.unescapePropertyName(name);
-                current = (DBObject) obj.get(n);
-                if (current == null) {
+                // the from revision of the current diff
+                id = Revision.fromString((String) obj.get("_b"));
+                if (from.equals(id)) {
+                    // diff is complete
+                    LOG.debug("Built diff from {} commits", numCommits);
+                    // apply to diff cache and serve later requests from cache
+                    d.applyToEntry(super.newEntry(from, to)).done();
+                    // return changes
+                    return d.getChanges(path);
+                }
+
+                if (StableRevisionComparator.INSTANCE.compare(id, from) < 0) {
                     break;
                 }
             }
-            if (current == null || !current.containsField("_c")) {
-                // no changes here
-                return "";
-            } else {
-                return current.get("_c").toString();
-            }
+            return null;
+        } finally {
+            lock.unlock();
         }
-        // diff request goes across multiple commits
-        // TODO: implement
-        return null;
     }
 
     @Nonnull
@@ -107,35 +141,19 @@ public class MongoDiffCache extends Memo
                           @Nonnull final Revision to) {
         return new MemoryEntry(from, to) {
 
-            private BasicDBObject commit = new BasicDBObject();
-
-            {
-                commit.put("_id", to.toString());
-                commit.put("_b", from.toString());
-            }
+            private Diff commit = new Diff(from, to);
 
             @Override
             public void append(@Nonnull String path, @Nonnull String changes) {
                 // super.append() will apply to diff cache in base class
                 super.append(path, changes);
-                BasicDBObject current = commit;
-                for (String name : PathUtils.elements(path)) {
-                    String escName = Utils.escapePropertyName(name);
-                    if (current.containsField(escName)) {
-                        current = (BasicDBObject) current.get(escName);
-                    } else {
-                        BasicDBObject child = new BasicDBObject();
-                        current.append(escName, child);
-                        current = child;
-                    }
-                }
-                current.append("_c", checkNotNull(changes));
+                commit.append(path, changes);
             }
 
             @Override
             public void done() {
                 try {
-                    changes.insert(commit, WriteConcern.UNACKNOWLEDGED);
+                    changes.insert(commit.doc, WriteConcern.UNACKNOWLEDGED);
                 } catch (MongoException e) {
                     LOG.warn("Write back of diff cache entry failed", e);
                 }
@@ -143,19 +161,202 @@ public class MongoDiffCache extends Memo
         };
     }
 
-    private void applyToDiffCache(DBObject obj,
-                                  String path,
-                                  Entry entry) {
-        String diff = (String) obj.get("_c");
-        if (diff != null) {
-            entry.append(path, diff);
-        }
-        for (String k : obj.keySet()) {
-            if (Utils.isPropertyName(k)) {
-                String name = Utils.unescapePropertyName(k);
-                applyToDiffCache((DBObject) obj.get(k),
-                        PathUtils.concat(path, name), entry);
+    static class Diff {
+
+        private final DBObject doc;
+
+        Diff(Revision from, Revision to) {
+            this.doc = new BasicDBObject();
+            this.doc.put("_id", to.toString());
+            this.doc.put("_b", from.toString());
+        }
+
+        Diff(DBObject doc) {
+            this.doc = doc;
+        }
+
+        void append(String path, String changes) {
+            DBObject current = doc;
+            for (String name : PathUtils.elements(path)) {
+                String escName = Utils.escapePropertyName(name);
+                if (current.containsField(escName)) {
+                    current = (DBObject) current.get(escName);
+                } else {
+                    BasicDBObject child = new BasicDBObject();
+                    current.put(escName, child);
+                    current = child;
+                }
+            }
+            current.put("_c", checkNotNull(changes));
+        }
+
+        String getChanges(String path) {
+            DBObject current = doc;
+            for (String name : PathUtils.elements(path)) {
+                String n = Utils.unescapePropertyName(name);
+                current = (DBObject) current.get(n);
+                if (current == null) {
+                    break;
+                }
             }
+            if (current == null || !current.containsField("_c")) {
+                // no changes here
+                return "";
+            } else {
+                return current.get("_c").toString();
+            }
+        }
+
+        Entry applyToEntry(Entry entry) {
+            applyInternal(doc, "/", entry);
+            return entry;
+        }
+
+        void mergeBeforeDiff(Diff before) {
+            mergeInternal(doc, before.doc, Sets.<String>newHashSet(),
+                    Sets.<String>newHashSet(), Sets.<String>newHashSet());
+            doc.put("_b", before.doc.get("_b"));
+        }
+
+        private static void mergeInternal(DBObject doc, DBObject before,
+                                          final Set<String> added,
+                                          final Set<String> removed,
+                                          final Set<String> modified) {
+            added.clear();
+            removed.clear();
+            modified.clear();
+            String changes = (String) doc.get("_c");
+            if (changes != null) {
+                parse(changes, new ParserCallback() {
+                    @Override
+                    public void added(String name) {
+                        added.add(name);
+                    }
+
+                    @Override
+                    public void removed(String name) {
+                        removed.add(name);
+                    }
+
+                    @Override
+                    public void modified(String name) {
+                        modified.add(name);
+                    }
+                });
+            }
+
+            changes = (String) before.get("_c");
+            if (changes != null) {
+                parse(changes, new ParserCallback() {
+                    @Override
+                    public void added(String name) {
+                        if (modified.remove(name) || !removed.remove(name)) {
+                            added.add(name);
+                        }
+                    }
+
+                    @Override
+                    public void removed(String name) {
+                        if (added.remove(name)) {
+                            modified.add(name);
+                        } else {
+                            removed.add(name);
+                        }
+                    }
+
+                    @Override
+                    public void modified(String name) {
+                        if (added.remove(name) || !removed.contains(name)) {
+                            modified.add(name);
+                        }
+                    }
+                });
+                doc.put("_c", serialize(added, removed, modified));
+            }
+
+            // merge recursively
+            for (String k : before.keySet()) {
+                if (Utils.isPropertyName(k)) {
+                    DBObject beforeChild = (DBObject) before.get(k);
+                    DBObject thisChild = (DBObject) doc.get(k);
+                    if (thisChild == null) {
+                        thisChild = new BasicDBObject();
+                        doc.put(k, thisChild);
+                    }
+                    mergeInternal(thisChild, beforeChild, added, removed, 
modified);
+                }
+            }
+        }
+
+        private static String serialize(final Set<String> added,
+                                        final Set<String> removed,
+                                        final Set<String> modified) {
+            JsopWriter w = new JsopStream();
+            for (String p : added) {
+                
w.tag('+').key(PathUtils.getName(p)).object().endObject().newline();
+            }
+            for (String p : removed) {
+                w.tag('-').value(PathUtils.getName(p)).newline();
+            }
+            for (String p : modified) {
+                
w.tag('^').key(PathUtils.getName(p)).object().endObject().newline();
+            }
+            return w.toString();
+        }
+
+        private static void parse(String changes, ParserCallback callback) {
+            JsopTokenizer t = new JsopTokenizer(changes);
+            for (;;) {
+                int r = t.read();
+                if (r == JsopReader.END) {
+                    break;
+                }
+                switch (r) {
+                    case '+': {
+                        callback.added(t.readString());
+                        t.read(':');
+                        t.read('{');
+                        t.read('}');
+                        break;
+                    }
+                    case '-': {
+                        callback.removed(t.readString());
+                        break;
+                    }
+                    case '^': {
+                        callback.modified(t.readString());
+                        t.read(':');
+                        t.read('{');
+                        t.read('}');
+                        break;
+                    }
+                    default:
+                        throw new IllegalArgumentException("jsonDiff: illegal 
token '"
+                                + t.getToken() + "' at pos: " + t.getLastPos() 
+ ' ' + changes);
+                }
+            }
+        }
+
+        private void applyInternal(DBObject obj,
+                                   String path,
+                                   Entry entry) {
+            String diff = (String) obj.get("_c");
+            if (diff != null) {
+                entry.append(path, diff);
+            }
+            for (String k : obj.keySet()) {
+                if (Utils.isPropertyName(k)) {
+                    String name = Utils.unescapePropertyName(k);
+                    applyInternal((DBObject) obj.get(k), 
PathUtils.concat(path, name), entry);
+                }
+            }
+        }
+
+        private interface ParserCallback {
+
+            void added(String name);
+            void removed(String name);
+            void modified(String name);
         }
     }
 }

Added: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoDiffCacheTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoDiffCacheTest.java?rev=1580792&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoDiffCacheTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoDiffCacheTest.java
 Mon Mar 24 09:20:55 2014
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+
+import static org.apache.jackrabbit.oak.plugins.document.MongoDiffCache.Diff;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the MongoDiffCache.
+ */
+public class MongoDiffCacheTest {
+
+    @Test
+    public void diff() {
+        Revision from = Revision.fromString("r1-0-1");
+        Revision to = Revision.fromString("r2-0-1");
+        Diff diff = new Diff(from, to);
+        diff.append("/", "^\"foo\":{}");
+        diff.append("/foo", "^\"bar\":{}");
+        diff.append("/foo/bar", "-\"qux\"");
+
+        assertEquals("^\"foo\":{}", diff.getChanges("/"));
+        assertEquals("^\"bar\":{}", diff.getChanges("/foo"));
+        assertEquals("-\"qux\"", diff.getChanges("/foo/bar"));
+        assertEquals("", diff.getChanges("/baz"));
+    }
+
+    @Test
+    public void merge() {
+        assertEquals("+", doMerge("+", ""));
+        assertEquals("-", doMerge("-", ""));
+        assertEquals("^", doMerge("^", ""));
+
+        assertEquals("+", doMerge("+"));
+        assertEquals("^", doMerge("-", "+"));
+        assertEquals("^", doMerge("^", "-", "+"));
+        assertEquals("+", doMerge("+", "^", "-", "+"));
+
+        assertEquals("-", doMerge("-"));
+        assertEquals("-", doMerge("^", "-"));
+        assertEquals("", doMerge("+", "^", "-"));
+        assertEquals("-", doMerge("-", "+", "^", "-"));
+
+        assertEquals("^", doMerge("^"));
+        assertEquals("+", doMerge("+", "^"));
+        assertEquals("^", doMerge("-", "+", "^"));
+        assertEquals("^", doMerge("^", "-", "+", "^"));
+    }
+
+    private String doMerge(String... ops) {
+        List<String> opsList = Arrays.asList(ops);
+        Diff diff = null;
+        for (int i = opsList.size() - 1; i >= 0; i--) {
+            String op = opsList.get(i);
+            if (diff == null) {
+                diff = diffFromOp(op);
+            } else {
+                diff.mergeBeforeDiff(diffFromOp(op));
+            }
+        }
+        if (diff == null) {
+            return null;
+        }
+        String changes = diff.getChanges("/test");
+        if (changes == null) {
+            return null;
+        } else if (changes.length() == 0) {
+            return "";
+        } else {
+            return changes.substring(0, 1);
+        }
+    }
+
+    private static String changeFromOp(String op) {
+        if (op.length() == 0) {
+            return "";
+        }
+        String changes = op + "\"child\"";
+        if (!op.equals("-")) {
+            changes += ":{}";
+        }
+        return changes;
+    }
+
+    private static Diff diffFromOp(String op) {
+        Revision from = Revision.fromString("r1-0-1");
+        Revision to = Revision.fromString("r2-0-1");
+        Diff d = new Diff(from, to);
+        d.append("/test", changeFromOp(op));
+        return d;
+    }
+}

Propchange: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoDiffCacheTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoDiffCacheTest.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL


Reply via email to