Author: mreutegg
Date: Tue Oct 15 15:35:16 2013
New Revision: 1532391
URL: http://svn.apache.org/r1532391
Log:
OAK-1088: Thread safe MongoDocumentStore
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Node.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentSplitTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMKDocumentStoreIT.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java?rev=1532391&r1=1532390&r2=1532391&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java
Tue Oct 15 15:35:16 2013
@@ -26,6 +26,7 @@ import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.Lock;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
@@ -37,7 +38,10 @@ import org.apache.jackrabbit.oak.cache.C
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Objects;
import com.google.common.cache.Cache;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Striped;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
@@ -71,6 +75,11 @@ public class MongoDocumentStore implemen
private final CacheStats cacheStats;
/**
+ * Locks to ensure cache consistency on reads, writes and invalidation.
+ */
+ private final Striped<Lock> locks = Striped.lock(64);
+
+ /**
* Comparator for maps with {@link Revision} keys. The maps are ordered
* descending, newest revisions first!
*/
@@ -121,13 +130,25 @@ public class MongoDocumentStore implemen
@Override
public void invalidateCache() {
- nodesCache.invalidateAll();
+ for (String key : nodesCache.asMap().keySet()) {
+ Lock lock = getAndLock(key);
+ try {
+ nodesCache.invalidate(key);
+ } finally {
+ lock.unlock();
+ }
+ }
}
@Override
public <T extends Document> void invalidateCache(Collection<T> collection,
String key) {
if (collection == Collection.NODES) {
- nodesCache.invalidate(key);
+ Lock lock = getAndLock(key);
+ try {
+ nodesCache.invalidate(key);
+ } finally {
+ lock.unlock();
+ }
}
}
@@ -145,28 +166,33 @@ public class MongoDocumentStore implemen
}
try {
NodeDocument doc;
- if (maxCacheAge == 0) {
- nodesCache.invalidate(key);
- }
- while (true) {
- doc = nodesCache.get(key, new Callable<NodeDocument>() {
- @Override
- public NodeDocument call() throws Exception {
- NodeDocument doc = (NodeDocument)
findUncached(collection, key);
- if (doc == null) {
- doc = NodeDocument.NULL;
+ Lock lock = getAndLock(key);
+ try {
+ if (maxCacheAge == 0) {
+ invalidateCache(collection, key);
+ }
+ while (true) {
+ doc = nodesCache.get(key, new Callable<NodeDocument>() {
+ @Override
+ public NodeDocument call() throws Exception {
+ NodeDocument doc = (NodeDocument)
findUncached(collection, key);
+ if (doc == null) {
+ doc = NodeDocument.NULL;
+ }
+ return doc;
}
- return doc;
+ });
+ if (maxCacheAge == 0 || maxCacheAge == Integer.MAX_VALUE) {
+ break;
}
- });
- if (maxCacheAge == 0 || maxCacheAge == Integer.MAX_VALUE) {
- break;
- }
- if (System.currentTimeMillis() - doc.getCreated() <
maxCacheAge) {
- break;
+ if (System.currentTimeMillis() - doc.getCreated() <
maxCacheAge) {
+ break;
+ }
+ // too old: invalidate, try again
+ invalidateCache(collection, key);
}
- // too old: invalidate, try again
- nodesCache.invalidate(key);
+ } finally {
+ lock.unlock();
}
if (doc == NodeDocument.NULL) {
return null;
@@ -235,7 +261,29 @@ public class MongoDocumentStore implemen
T doc = convertFromDBObject(collection, o);
if (collection == Collection.NODES && doc != null) {
doc.seal();
- nodesCache.put(doc.getId(), (NodeDocument) doc);
+ String id = doc.getId();
+ Lock lock = getAndLock(id);
+ try {
+ // do not overwrite document in cache if the
+ // existing one in the cache is newer
+ NodeDocument cached = nodesCache.getIfPresent(id);
+ if (cached != null && cached != NodeDocument.NULL) {
+ // check mod count
+ Number cachedModCount = cached.getModCount();
+ Number modCount = doc.getModCount();
+ if (cachedModCount == null || modCount == null) {
+ throw new IllegalStateException(
+ "Missing " + NodeDocument.MOD_COUNT);
+ }
+ if (modCount.longValue() >
cachedModCount.longValue()) {
+ nodesCache.put(id, (NodeDocument) doc);
+ }
+ } else {
+ nodesCache.put(id, (NodeDocument) doc);
+ }
+ } finally {
+ lock.unlock();
+ }
}
list.add(doc);
}
@@ -251,10 +299,8 @@ public class MongoDocumentStore implemen
DBCollection dbCollection = getDBCollection(collection);
long start = start();
try {
- if (collection == Collection.NODES) {
- nodesCache.invalidate(key);
- }
WriteResult writeResult =
dbCollection.remove(getByKeyQuery(key).get(), WriteConcern.SAFE);
+ invalidateCache(collection, key);
if (writeResult.getError() != null) {
throw new MicroKernelException("Remove failed: " +
writeResult.getError());
}
@@ -271,19 +317,20 @@ public class MongoDocumentStore implemen
DBCollection dbCollection = getDBCollection(collection);
DBObject update = createUpdate(updateOp);
- // get modCount of cached document
- Number modCount = null;
- T cachedDoc = null;
- if (collection == Collection.NODES) {
- //noinspection unchecked
- cachedDoc = (T) nodesCache.getIfPresent(updateOp.getId());
- if (cachedDoc != null) {
- modCount = cachedDoc.getModCount();
- }
- }
-
+ Lock lock = getAndLock(updateOp.getId());
long start = start();
try {
+ // get modCount of cached document
+ Number modCount = null;
+ T cachedDoc = null;
+ if (collection == Collection.NODES) {
+ //noinspection unchecked
+ cachedDoc = (T) nodesCache.getIfPresent(updateOp.getId());
+ if (cachedDoc != null) {
+ modCount = cachedDoc.getModCount();
+ }
+ }
+
// perform a conditional update with limited result
// if we have a matching modCount
if (modCount != null) {
@@ -319,6 +366,7 @@ public class MongoDocumentStore implemen
} catch (Exception e) {
throw new MicroKernelException(e);
} finally {
+ lock.unlock();
end("findAndModify", start);
}
}
@@ -395,8 +443,12 @@ public class MongoDocumentStore implemen
}
if (collection == Collection.NODES) {
for (T doc : docs) {
- doc.seal();
- nodesCache.put(doc.getId(), (NodeDocument) doc);
+ Lock lock = getAndLock(doc.getId());
+ try {
+ addToCache((NodeDocument) doc);
+ } finally {
+ lock.unlock();
+ }
}
}
return true;
@@ -417,20 +469,32 @@ public class MongoDocumentStore implemen
DBObject update = createUpdate(updateOp);
long start = start();
try {
+ Map<String, NodeDocument> cachedDocs = Collections.emptyMap();
+ if (collection == Collection.NODES) {
+ cachedDocs = Maps.newHashMap();
+ for (String key : keys) {
+ cachedDocs.put(key, nodesCache.getIfPresent(key));
+ }
+ }
try {
WriteResult writeResult =
dbCollection.updateMulti(query.get(), update);
if (writeResult.getError() != null) {
throw new MicroKernelException("Update failed: " +
writeResult.getError());
}
- if (collection == Collection.NODES) {
- // update cache
- for (String key : keys) {
- @SuppressWarnings("unchecked")
- T doc = (T) nodesCache.getIfPresent(key);
- if (doc != null) {
- applyToCache(collection, doc, new UpdateOp(key,
updateOp));
+ // update cache
+ for (Entry<String, NodeDocument> entry :
cachedDocs.entrySet()) {
+ Lock lock = getAndLock(entry.getKey());
+ try {
+ if (entry.getValue() == null) {
+ // make sure concurrently loaded document is
invalidated
+ nodesCache.invalidate(entry.getKey());
+ } else {
+ applyToCache(Collection.NODES, entry.getValue(),
+ new UpdateOp(entry.getKey(), updateOp));
}
+ } finally {
+ lock.unlock();
}
}
} catch (MongoException e) {
@@ -517,9 +581,9 @@ public class MongoDocumentStore implemen
/**
- * Applies an update to the nodes cache.
- * <p>
- * FIXME: ensure consistent cache update.
+ * Applies an update to the nodes cache. This method does not acquire
+ * a lock for the document. The caller must ensure it holds a lock for
+ * the updated document. See striped {@link #locks}.
*
* @param <T> the document type.
* @param collection the document collection.
@@ -532,7 +596,7 @@ public class MongoDocumentStore implemen
@Nonnull UpdateOp updateOp)
{
// cache the new document
if (collection == Collection.NODES) {
- T newDoc = collection.newDocument(this);
+ NodeDocument newDoc = (NodeDocument) collection.newDocument(this);
if (oldDoc != null) {
oldDoc.deepCopy(newDoc);
oldDoc.seal();
@@ -540,7 +604,70 @@ public class MongoDocumentStore implemen
String key = updateOp.getId();
MemoryDocumentStore.applyChanges(newDoc, updateOp, comparator);
newDoc.seal();
- nodesCache.put(key, (NodeDocument) newDoc);
+
+ NodeDocument cached = addToCache(newDoc);
+ if (cached == newDoc) {
+ // successful
+ return;
+ }
+ if (oldDoc == null) {
+ // this is an insert and some other thread was quicker
+ // loading it into the cache -> return now
+ return;
+ }
+ // this is an update (oldDoc != null)
+ if (Objects.equal(cached.getModCount(), oldDoc.getModCount())) {
+ nodesCache.put(key, newDoc);
+ } else {
+ // the cache entry was modified by some other thread in
+ // the meantime. the updated cache entry may or may not
+ // include this update. we cannot just apply our update
+ // on top of the cached entry.
+ // therefore we must invalidate the cache entry
+ nodesCache.invalidate(key);
+ }
+ }
+ }
+
+ /**
+ * Adds a document to the {@link #nodesCache} iff there is no document
+ * in the cache with the document key. This method does not acquire a lock
+ * from {@link #locks}! The caller must ensure a lock is held for the
+ * given document.
+ *
+ * @param doc the document to add to the cache.
+ * @return either the given <code>doc</code> or the document already
present
+ * in the cache.
+ */
+ @Nonnull
+ private NodeDocument addToCache(final @Nonnull NodeDocument doc) {
+ if (doc == NodeDocument.NULL) {
+ throw new IllegalArgumentException("doc must not be NULL
document");
+ }
+ doc.seal();
+ // make sure we only cache the document if it wasn't
+ // changed and cached by some other thread in the
+ // meantime. That is, use get() with a Callable,
+ // which is only used when the document isn't there
+ try {
+ for (;;) {
+ NodeDocument cached = nodesCache.get(doc.getId(),
+ new Callable<NodeDocument>() {
+ @Override
+ public NodeDocument call() {
+ return doc;
+ }
+ });
+ if (cached != NodeDocument.NULL) {
+ return cached;
+ } else {
+ nodesCache.invalidate(doc.getId());
+ }
+ }
+ } catch (ExecutionException e) {
+ // will never happen because call() just returns
+ // the already available doc
+ throw new IllegalStateException(e);
}
}
@@ -620,4 +747,10 @@ public class MongoDocumentStore implemen
return update;
}
+
+ private Lock getAndLock(String key) {
+ Lock l = locks.get(key);
+ l.lock();
+ return l;
+ }
}
\ No newline at end of file
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java?rev=1532391&r1=1532390&r2=1532391&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
Tue Oct 15 15:35:16 2013
@@ -519,21 +519,30 @@ public class MongoMK implements MicroKer
* Get the node for the given path and revision. The returned object might
* not be modified directly.
*
- * @param path
- * @param rev
- * @return the node
+ * @param path the path of the node.
+ * @param rev the read revision.
+ * @return the node or <code>null</code> if the node does not exist at the
+ * given revision.
*/
- Node getNode(@Nonnull String path, @Nonnull Revision rev) {
+ @CheckForNull
+ Node getNode(final @Nonnull String path, final @Nonnull Revision rev) {
checkRevisionAge(checkNotNull(rev), checkNotNull(path));
- String key = path + "@" + rev;
- Node node = nodeCache.getIfPresent(key);
- if (node == null) {
- node = readNode(path, rev);
- if (node != null) {
- nodeCache.put(key, node);
- }
+ try {
+ String key = path + "@" + rev;
+ Node node = nodeCache.get(key, new Callable<Node>() {
+ @Override
+ public Node call() throws Exception {
+ Node n = readNode(path, rev);
+ if (n == null) {
+ n = Node.MISSING;
+ }
+ return n;
+ }
+ });
+ return node == Node.MISSING ? null : node;
+ } catch (ExecutionException e) {
+ throw new MicroKernelException(e);
}
- return node;
}
/**
@@ -682,11 +691,12 @@ public class MongoMK implements MicroKer
@CheckForNull
private Node readNode(String path, Revision readRevision) {
String id = Utils.getIdFromPath(path);
+ Revision lastRevision = getPendingModifications().get(path);
NodeDocument doc = store.find(Collection.NODES, id);
if (doc == null) {
return null;
}
- return doc.getNodeAtRevision(this, readRevision);
+ return doc.getNodeAtRevision(this, readRevision, lastRevision);
}
@Override
@@ -1154,7 +1164,6 @@ public class MongoMK implements MicroKer
for (String childPath : c.children) {
markAsDeleted(childPath, commit, true);
}
- nodeChildrenCache.invalidate(n.getId());
}
}
}
@@ -1330,9 +1339,9 @@ public class MongoMK implements MicroKer
// we no longer need to update it in a background process
unsaved.remove(path);
}
- Children c = nodeChildrenCache.getIfPresent(path + "@" + rev);
+ String key = path + "@" + rev;
+ Children c = nodeChildrenCache.getIfPresent(key);
if (isNew || (!isDelete && c != null)) {
- String key = path + "@" + rev;
Children c2 = new Children();
TreeSet<String> set = new TreeSet<String>();
if (c != null) {
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Node.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Node.java?rev=1532391&r1=1532390&r2=1532391&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Node.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Node.java
Tue Oct 15 15:35:16 2013
@@ -30,6 +30,16 @@ import org.apache.jackrabbit.oak.plugins
*/
public class Node implements CacheValue {
+ /**
+ * A node, which does not exist at a given revision.
+ */
+ static final Node MISSING = new Node(null, null) {
+ @Override
+ public int getMemory() {
+ return 8;
+ }
+ };
+
final String path;
final Revision rev;
final Map<String, String> properties = Utils.newMap();
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java?rev=1532391&r1=1532390&r2=1532391&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
Tue Oct 15 15:35:16 2013
@@ -373,11 +373,17 @@ public class NodeDocument extends Docume
*
* @param context the revision context.
* @param readRevision the read revision.
+ * @param lastModified the revision when this node was last modified, but
+ * the value is potentially not yet reflected in this
+ * document.
+ * See {@link
RevisionContext#getPendingModifications()}.
* @return the node or <code>null</code> if the node doesn't exist at the
* given read revision.
*/
@CheckForNull
- public Node getNodeAtRevision(RevisionContext context, Revision
readRevision) {
+ public Node getNodeAtRevision(@Nonnull RevisionContext context,
+ @Nonnull Revision readRevision,
+ @Nullable Revision lastModified) {
Set<Revision> validRevisions = new HashSet<Revision>();
Revision min = getLiveRevision(context, readRevision, validRevisions);
if (min == null) {
@@ -408,7 +414,6 @@ public class NodeDocument extends Docume
Revision lastRevision = null;
Map<Integer, Revision> lastRevs = Maps.newHashMap(getLastRev());
// overlay with unsaved last modified from this instance
- Revision lastModified = context.getPendingModifications().get(path);
if (lastModified != null) {
lastRevs.put(context.getClusterId(), lastModified);
}
@@ -717,8 +722,7 @@ public class NodeDocument extends Docume
}
/**
- * Returns the local value map for the given key. Returns <code>null</code>
- * if no such value map exists.
+ * Returns the local value map for the given key.
*
* @param key the key.
* @return local value map.
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentSplitTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentSplitTest.java?rev=1532391&r1=1532390&r2=1532391&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentSplitTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentSplitTest.java
Tue Oct 15 15:35:16 2013
@@ -60,7 +60,7 @@ public class DocumentSplitTest extends B
assertTrue(doc.isCommitted(rev));
}
// check if document is still there
- assertNotNull(doc.getNodeAtRevision(mk, Revision.fromString(head)));
+ assertNotNull(mk.getNode("/", Revision.fromString(head)));
mk.commit("/", "+\"baz\":{}", null, null);
mk.setAsyncDelay(0);
mk.backgroundWrite();
@@ -94,7 +94,7 @@ public class DocumentSplitTest extends B
assertTrue(doc.containsRevision(rev));
assertTrue(doc.isCommitted(rev));
}
- Node node = doc.getNodeAtRevision(mk, Revision.fromString(head));
+ Node node = mk.getNode("/foo", Revision.fromString(head));
// check status of node
if (create) {
assertNull(node);
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMKDocumentStoreIT.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMKDocumentStoreIT.java?rev=1532391&r1=1532390&r2=1532391&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMKDocumentStoreIT.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMKDocumentStoreIT.java
Tue Oct 15 15:35:16 2013
@@ -20,18 +20,20 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.jackrabbit.oak.plugins.mongomk.util.Utils;
-import org.junit.Ignore;
import org.junit.Test;
+import com.google.common.collect.Maps;
+
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertNull;
/**
* Tests {@code MongoDocumentStore} with concurrent updates.
*/
-@Ignore
public class MongoMKDocumentStoreIT extends AbstractMongoConnectionTest {
private static final int NUM_THREADS = 3;
@@ -58,12 +60,51 @@ public class MongoMKDocumentStoreIT exte
}
}));
}
+ final List<Exception> exceptions = new ArrayList<Exception>();
+ final AtomicBoolean running = new AtomicBoolean(true);
+ Thread reader = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Map<Revision, Integer> previous = Maps.newHashMap();
+ while (running.get()) {
+ NodeDocument doc = docStore.find(Collection.NODES, id);
+ if (doc == null) {
+ throw new Exception("document is null");
+ }
+ Map<Revision, String> values = doc.getValueMap("prop");
+ for (Map.Entry<Revision, String> entry :
values.entrySet()) {
+ Revision r = entry.getKey();
+ Integer previousValue = previous.get(r);
+ Integer currentValue =
Integer.parseInt(entry.getValue());
+ if (previousValue != null &&
+ previousValue > currentValue) {
+ throw new Exception("inconsistent read for " +
+ r + ". previous value: " +
previousValue +
+ ", now: " + entry.getValue());
+ }
+ // remember for next round
+ previous.put(r, currentValue);
+ }
+ Thread.yield();
+ }
+ } catch (Exception e) {
+ exceptions.add(e);
+ }
+ }
+ });
+ reader.start();
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
t.join();
}
+ running.set(false);
+ reader.join();
+ for (Exception e : exceptions) {
+ throw e;
+ }
NodeDocument doc = docStore.find(Collection.NODES, id);
assertNotNull(doc);
Map<Revision, String> values = doc.getLocalMap("prop");
@@ -83,4 +124,13 @@ public class MongoMKDocumentStoreIT exte
setUpConnection();
}
}
+
+ @Test
+ public void negativeCache() throws Exception {
+ String id = Utils.getIdFromPath("/test");
+ DocumentStore docStore = mk.getDocumentStore();
+ assertNull(docStore.find(Collection.NODES, id));
+ mk.commit("/", "+\"test\":{}", null, null);
+ assertNotNull(docStore.find(Collection.NODES, id));
+ }
}