Author: mreutegg
Date: Mon Mar 24 09:20:55 2014
New Revision: 1580792
URL: http://svn.apache.org/r1580792
Log:
OAK-1578: Configurable size of capped collection used by MongoDiffCache
Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoDiffCacheTest.java
(with props)
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MongoDiffCache.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java?rev=1580792&r1=1580791&r2=1580792&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java Mon Mar 24 09:20:55 2014
@@ -468,12 +468,14 @@ public class DocumentMK implements Micro
}
/**
- * Set the MongoDB connection to use. By default an in-memory store is used.
+ * Use the given MongoDB as backend storage for the DocumentNodeStore.
*
* @param db the MongoDB connection
+ * @param changesSizeMB the size in MB of the capped collection backing
+ * the MongoDiffCache.
* @return this
*/
- public Builder setMongoDB(DB db) {
+ public Builder setMongoDB(DB db, int changesSizeMB) {
if (db != null) {
if (this.documentStore == null) {
this.documentStore = new MongoDocumentStore(db, this);
@@ -484,13 +486,23 @@ public class DocumentMK implements Micro
}
if (this.diffCache == null) {
- this.diffCache = new MongoDiffCache(db, this);
+ this.diffCache = new MongoDiffCache(db, changesSizeMB, this);
}
}
return this;
}
/**
+ * Set the MongoDB connection to use. By default an in-memory store is used.
+ *
+ * @param db the MongoDB connection
+ * @return this
+ */
+ public Builder setMongoDB(DB db) {
+ return setMongoDB(db, 8);
+ }
+
+ /**
* Sets a JDBC connection URL to use for the RDB document and blob
* stores.
*
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java?rev=1580792&r1=1580791&r2=1580792&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java Mon Mar 24 09:20:55 2014
@@ -70,6 +70,7 @@ public class DocumentNodeStoreService {
private static final String DEFAULT_URI = "mongodb://localhost:27017/oak";
private static final int DEFAULT_CACHE = 256;
private static final int DEFAULT_OFF_HEAP_CACHE = 0;
+ private static final int DEFAULT_CHANGES_SIZE = 256;
private static final String DEFAULT_DB = "oak";
private static final String PREFIX = "oak.documentstore.";
@@ -101,6 +102,9 @@ public class DocumentNodeStoreService {
@Property(intValue = DEFAULT_OFF_HEAP_CACHE)
private static final String PROP_OFF_HEAP_CACHE = "offHeapCache";
+ @Property(intValue = DEFAULT_CHANGES_SIZE)
+ private static final String PROP_CHANGES_SIZE = "changesSize";
+
/**
* Boolean value indicating a blobStore is to be used
*/
@@ -145,6 +149,7 @@ public class DocumentNodeStoreService {
int offHeapCache = PropertiesUtil.toInteger(prop(PROP_OFF_HEAP_CACHE), DEFAULT_OFF_HEAP_CACHE);
int cacheSize = PropertiesUtil.toInteger(prop(PROP_CACHE), DEFAULT_CACHE);
+ int changesSize = PropertiesUtil.toInteger(prop(PROP_CHANGES_SIZE), DEFAULT_CHANGES_SIZE);
boolean useMK = PropertiesUtil.toBoolean(context.getProperties().get(PROP_USE_MK), false);
@@ -155,8 +160,8 @@ public class DocumentNodeStoreService {
// Take care around not logging the uri directly as it
// might contain passwords
String type = useMK ? "MK" : "NodeStore";
- log.info("Starting Document{} with host={}, db={}, cache size (MB)={}, Off Heap Cache size (MB)={}",
- type, mongoURI.getHosts(), db, cacheSize, offHeapCache);
+ log.info("Starting Document{} with host={}, db={}, cache size (MB)={}, Off Heap Cache size (MB)={}, 'changes' collection size (MB)={}",
+ type, mongoURI.getHosts(), db, cacheSize, offHeapCache, changesSize);
log.info("Mongo Connection details {}",
MongoConnection.toString(mongoURI.getOptions()));
}
@@ -173,7 +178,7 @@ public class DocumentNodeStoreService {
mkBuilder.setBlobStore(blobStore);
}
- mkBuilder.setMongoDB(mongoDB);
+ mkBuilder.setMongoDB(mongoDB, changesSize);
mk = mkBuilder.open();
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MongoDiffCache.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MongoDiffCache.java?rev=1580792&r1=1580791&r2=1580792&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MongoDiffCache.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MongoDiffCache.java Mon Mar 24 09:20:55 2014
@@ -16,14 +16,25 @@
*/
package org.apache.jackrabbit.oak.plugins.document;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.commons.json.JsopReader;
+import org.apache.jackrabbit.oak.commons.json.JsopStream;
+import org.apache.jackrabbit.oak.commons.json.JsopTokenizer;
+import org.apache.jackrabbit.oak.commons.json.JsopWriter;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Striped;
import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DB;
@@ -43,20 +54,22 @@ public class MongoDiffCache extends Memo
private static final long MB = 1024 * 1024;
- private static final String COLLECTION_NAME = "changeLog";
-
- // TODO: make configurable
- private static final DBObject COLLECTION_OPTIONS = BasicDBObjectBuilder.start()
- .add("capped", true).add("size", 256 * MB).get();
+ private static final String COLLECTION_NAME = "changes";
private final DBCollection changes;
- public MongoDiffCache(DB db, DocumentMK.Builder builder) {
+ private final Cache<String, String> blacklist = CacheBuilder.newBuilder().maximumSize(1024).build();
+
+ private final Striped<Lock> locks = Striped.lock(16);
+
+ public MongoDiffCache(DB db, int sizeMB, DocumentMK.Builder builder) {
super(builder);
if (db.collectionExists(COLLECTION_NAME)) {
changes = db.getCollection(COLLECTION_NAME);
} else {
- changes = db.createCollection(COLLECTION_NAME, COLLECTION_OPTIONS);
+ changes = db.createCollection(COLLECTION_NAME,
+ BasicDBObjectBuilder.start().add("capped", true)
+ .add("size", sizeMB * MB).get());
}
}
@@ -65,40 +78,61 @@ public class MongoDiffCache extends Memo
public String getChanges(@Nonnull Revision from,
@Nonnull Revision to,
@Nonnull String path) {
- // first try to serve from cache
- String diff = super.getChanges(from, to, path);
- if (diff != null) {
- return diff;
- }
- // grab from mongo
- DBObject obj = changes.findOne(new BasicDBObject("_id", to.toString()));
- if (obj == null) {
- return null;
- }
- if (obj.get("_b").equals(from.toString())) {
- // apply to diff cache and serve later requests from cache
- Entry entry = super.newEntry(from, to);
- applyToDiffCache(obj, "/", entry);
- entry.done();
+ Lock lock = locks.get(from);
+ lock.lock();
+ try {
+ // first try to serve from cache
+ String diff = super.getChanges(from, to, path);
+ if (diff != null) {
+ return diff;
+ }
+ if (from.getClusterId() != to.getClusterId()) {
+ return null;
+ }
+ // check blacklist
+ if (blacklist.getIfPresent(from + "/" + to) != null) {
+ return null;
+ }
+ Revision id = to;
+ Diff d = null;
+ int numCommits = 0;
+ for (;;) {
+ // grab from mongo
+ DBObject obj = changes.findOne(new BasicDBObject("_id", id.toString()));
+ if (obj == null) {
+ return null;
+ }
+ numCommits++;
+ if (numCommits > 32) {
+ // do not merge more than 32 commits
+ blacklist.put(from + "/" + to, "");
+ return null;
+ }
+ if (d == null) {
+ d = new Diff(obj);
+ } else {
+ d.mergeBeforeDiff(new Diff(obj));
+ }
- DBObject current = obj;
- for (String name : PathUtils.elements(path)) {
- String n = Utils.unescapePropertyName(name);
- current = (DBObject) obj.get(n);
- if (current == null) {
+ // the from revision of the current diff
+ id = Revision.fromString((String) obj.get("_b"));
+ if (from.equals(id)) {
+ // diff is complete
+ LOG.debug("Built diff from {} commits", numCommits);
+ // apply to diff cache and serve later requests from cache
+ d.applyToEntry(super.newEntry(from, to)).done();
+ // return changes
+ return d.getChanges(path);
+ }
+
+ if (StableRevisionComparator.INSTANCE.compare(id, from) < 0) {
break;
}
}
- if (current == null || !current.containsField("_c")) {
- // no changes here
- return "";
- } else {
- return current.get("_c").toString();
- }
+ return null;
+ } finally {
+ lock.unlock();
}
- // diff request goes across multiple commits
- // TODO: implement
- return null;
}
@Nonnull
@@ -107,35 +141,19 @@ public class MongoDiffCache extends Memo
@Nonnull final Revision to) {
return new MemoryEntry(from, to) {
- private BasicDBObject commit = new BasicDBObject();
-
- {
- commit.put("_id", to.toString());
- commit.put("_b", from.toString());
- }
+ private Diff commit = new Diff(from, to);
@Override
public void append(@Nonnull String path, @Nonnull String changes) {
// super.append() will apply to diff cache in base class
super.append(path, changes);
- BasicDBObject current = commit;
- for (String name : PathUtils.elements(path)) {
- String escName = Utils.escapePropertyName(name);
- if (current.containsField(escName)) {
- current = (BasicDBObject) current.get(escName);
- } else {
- BasicDBObject child = new BasicDBObject();
- current.append(escName, child);
- current = child;
- }
- }
- current.append("_c", checkNotNull(changes));
+ commit.append(path, changes);
}
@Override
public void done() {
try {
- changes.insert(commit, WriteConcern.UNACKNOWLEDGED);
+ changes.insert(commit.doc, WriteConcern.UNACKNOWLEDGED);
} catch (MongoException e) {
LOG.warn("Write back of diff cache entry failed", e);
}
@@ -143,19 +161,202 @@ public class MongoDiffCache extends Memo
};
}
- private void applyToDiffCache(DBObject obj,
- String path,
- Entry entry) {
- String diff = (String) obj.get("_c");
- if (diff != null) {
- entry.append(path, diff);
- }
- for (String k : obj.keySet()) {
- if (Utils.isPropertyName(k)) {
- String name = Utils.unescapePropertyName(k);
- applyToDiffCache((DBObject) obj.get(k),
- PathUtils.concat(path, name), entry);
+ static class Diff {
+
+ private final DBObject doc;
+
+ Diff(Revision from, Revision to) {
+ this.doc = new BasicDBObject();
+ this.doc.put("_id", to.toString());
+ this.doc.put("_b", from.toString());
+ }
+
+ Diff(DBObject doc) {
+ this.doc = doc;
+ }
+
+ void append(String path, String changes) {
+ DBObject current = doc;
+ for (String name : PathUtils.elements(path)) {
+ String escName = Utils.escapePropertyName(name);
+ if (current.containsField(escName)) {
+ current = (DBObject) current.get(escName);
+ } else {
+ BasicDBObject child = new BasicDBObject();
+ current.put(escName, child);
+ current = child;
+ }
+ }
+ current.put("_c", checkNotNull(changes));
+ }
+
+ String getChanges(String path) {
+ DBObject current = doc;
+ for (String name : PathUtils.elements(path)) {
+ String n = Utils.unescapePropertyName(name);
+ current = (DBObject) current.get(n);
+ if (current == null) {
+ break;
+ }
}
+ if (current == null || !current.containsField("_c")) {
+ // no changes here
+ return "";
+ } else {
+ return current.get("_c").toString();
+ }
+ }
+
+ Entry applyToEntry(Entry entry) {
+ applyInternal(doc, "/", entry);
+ return entry;
+ }
+
+ void mergeBeforeDiff(Diff before) {
+ mergeInternal(doc, before.doc, Sets.<String>newHashSet(),
+ Sets.<String>newHashSet(), Sets.<String>newHashSet());
+ doc.put("_b", before.doc.get("_b"));
+ }
+
+ private static void mergeInternal(DBObject doc, DBObject before,
+ final Set<String> added,
+ final Set<String> removed,
+ final Set<String> modified) {
+ added.clear();
+ removed.clear();
+ modified.clear();
+ String changes = (String) doc.get("_c");
+ if (changes != null) {
+ parse(changes, new ParserCallback() {
+ @Override
+ public void added(String name) {
+ added.add(name);
+ }
+
+ @Override
+ public void removed(String name) {
+ removed.add(name);
+ }
+
+ @Override
+ public void modified(String name) {
+ modified.add(name);
+ }
+ });
+ }
+
+ changes = (String) before.get("_c");
+ if (changes != null) {
+ parse(changes, new ParserCallback() {
+ @Override
+ public void added(String name) {
+ if (modified.remove(name) || !removed.remove(name)) {
+ added.add(name);
+ }
+ }
+
+ @Override
+ public void removed(String name) {
+ if (added.remove(name)) {
+ modified.add(name);
+ } else {
+ removed.add(name);
+ }
+ }
+
+ @Override
+ public void modified(String name) {
+ if (added.remove(name) || !removed.contains(name)) {
+ modified.add(name);
+ }
+ }
+ });
+ doc.put("_c", serialize(added, removed, modified));
+ }
+
+ // merge recursively
+ for (String k : before.keySet()) {
+ if (Utils.isPropertyName(k)) {
+ DBObject beforeChild = (DBObject) before.get(k);
+ DBObject thisChild = (DBObject) doc.get(k);
+ if (thisChild == null) {
+ thisChild = new BasicDBObject();
+ doc.put(k, thisChild);
+ }
+ mergeInternal(thisChild, beforeChild, added, removed, modified);
+ }
+ }
+ }
+
+ private static String serialize(final Set<String> added,
+ final Set<String> removed,
+ final Set<String> modified) {
+ JsopWriter w = new JsopStream();
+ for (String p : added) {
+ w.tag('+').key(PathUtils.getName(p)).object().endObject().newline();
+ }
+ for (String p : removed) {
+ w.tag('-').value(PathUtils.getName(p)).newline();
+ }
+ for (String p : modified) {
+ w.tag('^').key(PathUtils.getName(p)).object().endObject().newline();
+ }
+ return w.toString();
+ }
+
+ private static void parse(String changes, ParserCallback callback) {
+ JsopTokenizer t = new JsopTokenizer(changes);
+ for (;;) {
+ int r = t.read();
+ if (r == JsopReader.END) {
+ break;
+ }
+ switch (r) {
+ case '+': {
+ callback.added(t.readString());
+ t.read(':');
+ t.read('{');
+ t.read('}');
+ break;
+ }
+ case '-': {
+ callback.removed(t.readString());
+ break;
+ }
+ case '^': {
+ callback.modified(t.readString());
+ t.read(':');
+ t.read('{');
+ t.read('}');
+ break;
+ }
+ default:
+ throw new IllegalArgumentException("jsonDiff: illegal token '"
+ + t.getToken() + "' at pos: " + t.getLastPos() + ' ' + changes);
+ }
+ }
+ }
+
+ private void applyInternal(DBObject obj,
+ String path,
+ Entry entry) {
+ String diff = (String) obj.get("_c");
+ if (diff != null) {
+ entry.append(path, diff);
+ }
+ for (String k : obj.keySet()) {
+ if (Utils.isPropertyName(k)) {
+ String name = Utils.unescapePropertyName(k);
+ applyInternal((DBObject) obj.get(k), PathUtils.concat(path, name), entry);
+ }
+ }
+ }
+
+ private interface ParserCallback {
+
+ void added(String name);
+ void removed(String name);
+ void modified(String name);
}
}
}
Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoDiffCacheTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoDiffCacheTest.java?rev=1580792&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoDiffCacheTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoDiffCacheTest.java Mon Mar 24 09:20:55 2014
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+
+import static org.apache.jackrabbit.oak.plugins.document.MongoDiffCache.Diff;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the MongoDiffCache.
+ */
+public class MongoDiffCacheTest {
+
+ @Test
+ public void diff() {
+ Revision from = Revision.fromString("r1-0-1");
+ Revision to = Revision.fromString("r2-0-1");
+ Diff diff = new Diff(from, to);
+ diff.append("/", "^\"foo\":{}");
+ diff.append("/foo", "^\"bar\":{}");
+ diff.append("/foo/bar", "-\"qux\"");
+
+ assertEquals("^\"foo\":{}", diff.getChanges("/"));
+ assertEquals("^\"bar\":{}", diff.getChanges("/foo"));
+ assertEquals("-\"qux\"", diff.getChanges("/foo/bar"));
+ assertEquals("", diff.getChanges("/baz"));
+ }
+
+ @Test
+ public void merge() {
+ assertEquals("+", doMerge("+", ""));
+ assertEquals("-", doMerge("-", ""));
+ assertEquals("^", doMerge("^", ""));
+
+ assertEquals("+", doMerge("+"));
+ assertEquals("^", doMerge("-", "+"));
+ assertEquals("^", doMerge("^", "-", "+"));
+ assertEquals("+", doMerge("+", "^", "-", "+"));
+
+ assertEquals("-", doMerge("-"));
+ assertEquals("-", doMerge("^", "-"));
+ assertEquals("", doMerge("+", "^", "-"));
+ assertEquals("-", doMerge("-", "+", "^", "-"));
+
+ assertEquals("^", doMerge("^"));
+ assertEquals("+", doMerge("+", "^"));
+ assertEquals("^", doMerge("-", "+", "^"));
+ assertEquals("^", doMerge("^", "-", "+", "^"));
+ }
+
+ private String doMerge(String... ops) {
+ List<String> opsList = Arrays.asList(ops);
+ Diff diff = null;
+ for (int i = opsList.size() - 1; i >= 0; i--) {
+ String op = opsList.get(i);
+ if (diff == null) {
+ diff = diffFromOp(op);
+ } else {
+ diff.mergeBeforeDiff(diffFromOp(op));
+ }
+ }
+ if (diff == null) {
+ return null;
+ }
+ String changes = diff.getChanges("/test");
+ if (changes == null) {
+ return null;
+ } else if (changes.length() == 0) {
+ return "";
+ } else {
+ return changes.substring(0, 1);
+ }
+ }
+
+ private static String changeFromOp(String op) {
+ if (op.length() == 0) {
+ return "";
+ }
+ String changes = op + "\"child\"";
+ if (!op.equals("-")) {
+ changes += ":{}";
+ }
+ return changes;
+ }
+
+ private static Diff diffFromOp(String op) {
+ Revision from = Revision.fromString("r1-0-1");
+ Revision to = Revision.fromString("r2-0-1");
+ Diff d = new Diff(from, to);
+ d.append("/test", changeFromOp(op));
+ return d;
+ }
+}
Propchange:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoDiffCacheTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoDiffCacheTest.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision Rev URL