Author: mreutegg
Date: Tue Oct 15 15:35:16 2013
New Revision: 1532391

URL: http://svn.apache.org/r1532391
Log:
OAK-1088: Thread safe MongoDocumentStore

Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Node.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentSplitTest.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMKDocumentStoreIT.java

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java?rev=1532391&r1=1532390&r2=1532391&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java
 Tue Oct 15 15:35:16 2013
@@ -26,6 +26,7 @@ import java.util.Map.Entry;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.Lock;
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
@@ -37,7 +38,10 @@ import org.apache.jackrabbit.oak.cache.C
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Objects;
 import com.google.common.cache.Cache;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Striped;
 import com.mongodb.BasicDBObject;
 import com.mongodb.DB;
 import com.mongodb.DBCollection;
@@ -71,6 +75,11 @@ public class MongoDocumentStore implemen
     private final CacheStats cacheStats;
 
     /**
+     * Locks to ensure cache consistency on reads, writes and invalidation.
+     */
+    private final Striped<Lock> locks = Striped.lock(64);
+
+    /**
      * Comparator for maps with {@link Revision} keys. The maps are ordered
      * descending, newest revisions first!
      */
@@ -121,13 +130,25 @@ public class MongoDocumentStore implemen
     
     @Override
     public void invalidateCache() {
-        nodesCache.invalidateAll();
+        for (String key : nodesCache.asMap().keySet()) {
+            Lock lock = getAndLock(key);
+            try {
+                nodesCache.invalidate(key);
+            } finally {
+                lock.unlock();
+            }
+        }
     }
     
     @Override
     public <T extends Document> void invalidateCache(Collection<T> collection, 
String key) {
         if (collection == Collection.NODES) {
-            nodesCache.invalidate(key);
+            Lock lock = getAndLock(key);
+            try {
+                nodesCache.invalidate(key);
+            } finally {
+                lock.unlock();
+            }
         }
     }
 
@@ -145,28 +166,33 @@ public class MongoDocumentStore implemen
         }
         try {
             NodeDocument doc;
-            if (maxCacheAge == 0) {
-                nodesCache.invalidate(key);
-            }
-            while (true) {
-                doc = nodesCache.get(key, new Callable<NodeDocument>() {
-                    @Override
-                    public NodeDocument call() throws Exception {
-                        NodeDocument doc = (NodeDocument) 
findUncached(collection, key);
-                        if (doc == null) {
-                            doc = NodeDocument.NULL;
+            Lock lock = getAndLock(key);
+            try {
+                if (maxCacheAge == 0) {
+                    invalidateCache(collection, key);
+                }
+                while (true) {
+                    doc = nodesCache.get(key, new Callable<NodeDocument>() {
+                        @Override
+                        public NodeDocument call() throws Exception {
+                            NodeDocument doc = (NodeDocument) 
findUncached(collection, key);
+                            if (doc == null) {
+                                doc = NodeDocument.NULL;
+                            }
+                            return doc;
                         }
-                        return doc;
+                    });
+                    if (maxCacheAge == 0 || maxCacheAge == Integer.MAX_VALUE) {
+                        break;
                     }
-                });
-                if (maxCacheAge == 0 || maxCacheAge == Integer.MAX_VALUE) {
-                    break;
-                }
-                if (System.currentTimeMillis() - doc.getCreated() < 
maxCacheAge) {
-                    break;
+                    if (System.currentTimeMillis() - doc.getCreated() < 
maxCacheAge) {
+                        break;
+                    }
+                    // too old: invalidate, try again
+                    invalidateCache(collection, key);
                 }
-                // too old: invalidate, try again
-                nodesCache.invalidate(key);
+            } finally {
+                lock.unlock();
             }
             if (doc == NodeDocument.NULL) {
                 return null;
@@ -235,7 +261,29 @@ public class MongoDocumentStore implemen
                 T doc = convertFromDBObject(collection, o);
                 if (collection == Collection.NODES && doc != null) {
                     doc.seal();
-                    nodesCache.put(doc.getId(), (NodeDocument) doc);
+                    String id = doc.getId();
+                    Lock lock = getAndLock(id);
+                    try {
+                        // do not overwrite document in cache if the
+                        // existing one in the cache is newer
+                        NodeDocument cached = nodesCache.getIfPresent(id);
+                        if (cached != null && cached != NodeDocument.NULL) {
+                            // check mod count
+                            Number cachedModCount = cached.getModCount();
+                            Number modCount = doc.getModCount();
+                            if (cachedModCount == null || modCount == null) {
+                                throw new IllegalStateException(
+                                        "Missing " + NodeDocument.MOD_COUNT);
+                            }
+                            if (modCount.longValue() > 
cachedModCount.longValue()) {
+                                nodesCache.put(id, (NodeDocument) doc);
+                            }
+                        } else {
+                            nodesCache.put(id, (NodeDocument) doc);
+                        }
+                    } finally {
+                        lock.unlock();
+                    }
                 }
                 list.add(doc);
             }
@@ -251,10 +299,8 @@ public class MongoDocumentStore implemen
         DBCollection dbCollection = getDBCollection(collection);
         long start = start();
         try {
-            if (collection == Collection.NODES) {
-                nodesCache.invalidate(key);
-            }
             WriteResult writeResult = 
dbCollection.remove(getByKeyQuery(key).get(), WriteConcern.SAFE);
+            invalidateCache(collection, key);
             if (writeResult.getError() != null) {
                 throw new MicroKernelException("Remove failed: " + 
writeResult.getError());
             }
@@ -271,19 +317,20 @@ public class MongoDocumentStore implemen
         DBCollection dbCollection = getDBCollection(collection);
         DBObject update = createUpdate(updateOp);
 
-        // get modCount of cached document
-        Number modCount = null;
-        T cachedDoc = null;
-        if (collection == Collection.NODES) {
-            //noinspection unchecked
-            cachedDoc = (T) nodesCache.getIfPresent(updateOp.getId());
-            if (cachedDoc != null) {
-                modCount = cachedDoc.getModCount();
-            }
-        }
-
+        Lock lock = getAndLock(updateOp.getId());
         long start = start();
         try {
+            // get modCount of cached document
+            Number modCount = null;
+            T cachedDoc = null;
+            if (collection == Collection.NODES) {
+                //noinspection unchecked
+                cachedDoc = (T) nodesCache.getIfPresent(updateOp.getId());
+                if (cachedDoc != null) {
+                    modCount = cachedDoc.getModCount();
+                }
+            }
+
             // perform a conditional update with limited result
             // if we have a matching modCount
             if (modCount != null) {
@@ -319,6 +366,7 @@ public class MongoDocumentStore implemen
         } catch (Exception e) {
             throw new MicroKernelException(e);
         } finally {
+            lock.unlock();
             end("findAndModify", start);
         }
     }
@@ -395,8 +443,12 @@ public class MongoDocumentStore implemen
                 }
                 if (collection == Collection.NODES) {
                     for (T doc : docs) {
-                        doc.seal();
-                        nodesCache.put(doc.getId(), (NodeDocument) doc);
+                        Lock lock = getAndLock(doc.getId());
+                        try {
+                            addToCache((NodeDocument) doc);
+                        } finally {
+                            lock.unlock();
+                        }
                     }
                 }
                 return true;
@@ -417,20 +469,32 @@ public class MongoDocumentStore implemen
         DBObject update = createUpdate(updateOp);
         long start = start();
         try {
+            Map<String, NodeDocument> cachedDocs = Collections.emptyMap();
+            if (collection == Collection.NODES) {
+                cachedDocs = Maps.newHashMap();
+                for (String key : keys) {
+                    cachedDocs.put(key, nodesCache.getIfPresent(key));
+                }
+            }
             try {
 
                 WriteResult writeResult = 
dbCollection.updateMulti(query.get(), update);
                 if (writeResult.getError() != null) {
                     throw new MicroKernelException("Update failed: " + 
writeResult.getError());
                 }
-                if (collection == Collection.NODES) {
-                    // update cache
-                    for (String key : keys) {
-                        @SuppressWarnings("unchecked")
-                        T doc = (T) nodesCache.getIfPresent(key);
-                        if (doc != null) {
-                            applyToCache(collection, doc, new UpdateOp(key, 
updateOp));
+                // update cache
+                for (Entry<String, NodeDocument> entry : 
cachedDocs.entrySet()) {
+                    Lock lock = getAndLock(entry.getKey());
+                    try {
+                        if (entry.getValue() == null) {
+                            // make sure concurrently loaded document is 
invalidated
+                            nodesCache.invalidate(entry.getKey());
+                        } else {
+                            applyToCache(Collection.NODES, entry.getValue(),
+                                    new UpdateOp(entry.getKey(), updateOp));
                         }
+                    } finally {
+                        lock.unlock();
                     }
                 }
             } catch (MongoException e) {
@@ -517,9 +581,9 @@ public class MongoDocumentStore implemen
 
 
     /**
-     * Applies an update to the nodes cache.
-     * <p>
-     * FIXME: ensure consistent cache update.
+     * Applies an update to the nodes cache. This method does not acquire
+     * a lock for the document. The caller must ensure it holds a lock for
+     * the updated document. See striped {@link #locks}.
      *
      * @param <T> the document type.
      * @param collection the document collection.
@@ -532,7 +596,7 @@ public class MongoDocumentStore implemen
                                                    @Nonnull UpdateOp updateOp) 
{
         // cache the new document
         if (collection == Collection.NODES) {
-            T newDoc = collection.newDocument(this);
+            NodeDocument newDoc = (NodeDocument) collection.newDocument(this);
             if (oldDoc != null) {
                 oldDoc.deepCopy(newDoc);
                 oldDoc.seal();
@@ -540,7 +604,70 @@ public class MongoDocumentStore implemen
             String key = updateOp.getId();
             MemoryDocumentStore.applyChanges(newDoc, updateOp, comparator);
             newDoc.seal();
-            nodesCache.put(key, (NodeDocument) newDoc);
+
+            NodeDocument cached = addToCache(newDoc);
+            if (cached == newDoc) {
+                // successful
+                return;
+            }
+            if (oldDoc == null) {
+                // this is an insert and some other thread was quicker
+                // loading it into the cache -> return now
+                return;
+            }
+            // this is an update (oldDoc != null)
+            if (Objects.equal(cached.getModCount(), oldDoc.getModCount())) {
+                nodesCache.put(key, newDoc);
+            } else {
+                // the cache entry was modified by some other thread in
+                // the meantime. the updated cache entry may or may not
+                // include this update. we cannot just apply our update
+                // on top of the cached entry.
+                // therefore we must invalidate the cache entry
+                nodesCache.invalidate(key);
+            }
+        }
+    }
+
+    /**
+     * Adds a document to the {@link #nodesCache} iff there is no document
+     * in the cache with the document key. This method does not acquire a lock
+     * from {@link #locks}! The caller must ensure a lock is held for the
+     * given document.
+     *
+     * @param doc the document to add to the cache.
+     * @return either the given <code>doc</code> or the document already 
present
+     *          in the cache.
+     */
+    @Nonnull
+    private NodeDocument addToCache(final @Nonnull NodeDocument doc) {
+        if (doc == NodeDocument.NULL) {
+            throw new IllegalArgumentException("doc must not be NULL 
document");
+        }
+        doc.seal();
+        // make sure we only cache the document if it wasn't
+        // changed and cached by some other thread in the
+        // meantime. That is, use get() with a Callable,
+        // which is only used when the document isn't there
+        try {
+            for (;;) {
+                NodeDocument cached = nodesCache.get(doc.getId(),
+                        new Callable<NodeDocument>() {
+                    @Override
+                    public NodeDocument call() {
+                        return doc;
+                    }
+                });
+                if (cached != NodeDocument.NULL) {
+                    return cached;
+                } else {
+                    nodesCache.invalidate(doc.getId());
+                }
+            }
+        } catch (ExecutionException e) {
+            // will never happen because call() just returns
+            // the already available doc
+            throw new IllegalStateException(e);
         }
     }
 
@@ -620,4 +747,10 @@ public class MongoDocumentStore implemen
 
         return update;
     }
+
+    private Lock getAndLock(String key) {
+        Lock l = locks.get(key);
+        l.lock();
+        return l;
+    }
 }
\ No newline at end of file

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java?rev=1532391&r1=1532390&r2=1532391&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
 Tue Oct 15 15:35:16 2013
@@ -519,21 +519,30 @@ public class MongoMK implements MicroKer
      * Get the node for the given path and revision. The returned object might
      * not be modified directly.
      *
-     * @param path
-     * @param rev
-     * @return the node
+     * @param path the path of the node.
+     * @param rev the read revision.
+     * @return the node or <code>null</code> if the node does not exist at the
+     *          given revision.
      */
-    Node getNode(@Nonnull String path, @Nonnull Revision rev) {
+    @CheckForNull
+    Node getNode(final @Nonnull String path, final @Nonnull Revision rev) {
         checkRevisionAge(checkNotNull(rev), checkNotNull(path));
-        String key = path + "@" + rev;
-        Node node = nodeCache.getIfPresent(key);
-        if (node == null) {
-            node = readNode(path, rev);
-            if (node != null) {
-                nodeCache.put(key, node);
-            }
+        try {
+            String key = path + "@" + rev;
+            Node node = nodeCache.get(key, new Callable<Node>() {
+                @Override
+                public Node call() throws Exception {
+                    Node n = readNode(path, rev);
+                    if (n == null) {
+                        n = Node.MISSING;
+                    }
+                    return n;
+                }
+            });
+            return node == Node.MISSING ? null : node;
+        } catch (ExecutionException e) {
+            throw new MicroKernelException(e);
         }
-        return node;
     }
 
     /**
@@ -682,11 +691,12 @@ public class MongoMK implements MicroKer
     @CheckForNull
     private Node readNode(String path, Revision readRevision) {
         String id = Utils.getIdFromPath(path);
+        Revision lastRevision = getPendingModifications().get(path);
         NodeDocument doc = store.find(Collection.NODES, id);
         if (doc == null) {
             return null;
         }
-        return doc.getNodeAtRevision(this, readRevision);
+        return doc.getNodeAtRevision(this, readRevision, lastRevision);
     }
     
     @Override
@@ -1154,7 +1164,6 @@ public class MongoMK implements MicroKer
                 for (String childPath : c.children) {
                     markAsDeleted(childPath, commit, true);
                 }
-                nodeChildrenCache.invalidate(n.getId());
             }
         }
     }
@@ -1330,9 +1339,9 @@ public class MongoMK implements MicroKer
             // we no longer need to update it in a background process
             unsaved.remove(path);
         }
-        Children c = nodeChildrenCache.getIfPresent(path + "@" + rev);
+        String key = path + "@" + rev;
+        Children c = nodeChildrenCache.getIfPresent(key);
         if (isNew || (!isDelete && c != null)) {
-            String key = path + "@" + rev;
             Children c2 = new Children();
             TreeSet<String> set = new TreeSet<String>();
             if (c != null) {

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Node.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Node.java?rev=1532391&r1=1532390&r2=1532391&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Node.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Node.java
 Tue Oct 15 15:35:16 2013
@@ -30,6 +30,16 @@ import org.apache.jackrabbit.oak.plugins
  */
 public class Node implements CacheValue {
 
+    /**
+     * A node, which does not exist at a given revision.
+     */
+    static final Node MISSING = new Node(null, null) {
+        @Override
+        public int getMemory() {
+            return 8;
+        }
+    };
+
     final String path;
     final Revision rev;
     final Map<String, String> properties = Utils.newMap();

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java?rev=1532391&r1=1532390&r2=1532391&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
 Tue Oct 15 15:35:16 2013
@@ -373,11 +373,17 @@ public class NodeDocument extends Docume
      *
      * @param context      the revision context.
      * @param readRevision the read revision.
+     * @param lastModified the revision when this node was last modified, but
+     *                     the value is potentially not yet reflected in this
+     *                     document.
+     *                     See {@link 
RevisionContext#getPendingModifications()}.
      * @return the node or <code>null</code> if the node doesn't exist at the
      *         given read revision.
      */
     @CheckForNull
-    public Node getNodeAtRevision(RevisionContext context, Revision 
readRevision) {
+    public Node getNodeAtRevision(@Nonnull RevisionContext context,
+                                  @Nonnull Revision readRevision,
+                                  @Nullable Revision lastModified) {
         Set<Revision> validRevisions = new HashSet<Revision>();
         Revision min = getLiveRevision(context, readRevision, validRevisions);
         if (min == null) {
@@ -408,7 +414,6 @@ public class NodeDocument extends Docume
         Revision lastRevision = null;
         Map<Integer, Revision> lastRevs = Maps.newHashMap(getLastRev());
         // overlay with unsaved last modified from this instance
-        Revision lastModified = context.getPendingModifications().get(path);
         if (lastModified != null) {
             lastRevs.put(context.getClusterId(), lastModified);
         }
@@ -717,8 +722,7 @@ public class NodeDocument extends Docume
     }
 
     /**
-     * Returns the local value map for the given key. Returns <code>null</code>
-     * if no such value map exists.
+     * Returns the local value map for the given key.
      *
      * @param key the key.
      * @return local value map.

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentSplitTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentSplitTest.java?rev=1532391&r1=1532390&r2=1532391&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentSplitTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentSplitTest.java
 Tue Oct 15 15:35:16 2013
@@ -60,7 +60,7 @@ public class DocumentSplitTest extends B
             assertTrue(doc.isCommitted(rev));
         }
         // check if document is still there
-        assertNotNull(doc.getNodeAtRevision(mk, Revision.fromString(head)));
+        assertNotNull(mk.getNode("/", Revision.fromString(head)));
         mk.commit("/", "+\"baz\":{}", null, null);
         mk.setAsyncDelay(0);
         mk.backgroundWrite();
@@ -94,7 +94,7 @@ public class DocumentSplitTest extends B
             assertTrue(doc.containsRevision(rev));
             assertTrue(doc.isCommitted(rev));
         }
-        Node node = doc.getNodeAtRevision(mk, Revision.fromString(head));
+        Node node = mk.getNode("/foo", Revision.fromString(head));
         // check status of node
         if (create) {
             assertNull(node);

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMKDocumentStoreIT.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMKDocumentStoreIT.java?rev=1532391&r1=1532390&r2=1532391&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMKDocumentStoreIT.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMKDocumentStoreIT.java
 Tue Oct 15 15:35:16 2013
@@ -20,18 +20,20 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.jackrabbit.oak.plugins.mongomk.util.Utils;
-import org.junit.Ignore;
 import org.junit.Test;
 
+import com.google.common.collect.Maps;
+
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertNull;
 
 /**
  * Tests {@code MongoDocumentStore} with concurrent updates.
  */
-@Ignore
 public class MongoMKDocumentStoreIT extends AbstractMongoConnectionTest {
 
     private static final int NUM_THREADS = 3;
@@ -58,12 +60,51 @@ public class MongoMKDocumentStoreIT exte
                 }
             }));
         }
+        final List<Exception> exceptions = new ArrayList<Exception>();
+        final AtomicBoolean running = new AtomicBoolean(true);
+        Thread reader = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Map<Revision, Integer> previous = Maps.newHashMap();
+                    while (running.get()) {
+                        NodeDocument doc = docStore.find(Collection.NODES, id);
+                        if (doc == null) {
+                            throw new Exception("document is null");
+                        }
+                        Map<Revision, String> values = doc.getValueMap("prop");
+                        for (Map.Entry<Revision, String> entry : 
values.entrySet()) {
+                            Revision r = entry.getKey();
+                            Integer previousValue = previous.get(r);
+                            Integer currentValue = 
Integer.parseInt(entry.getValue());
+                            if (previousValue != null &&
+                                    previousValue > currentValue) {
+                                throw new Exception("inconsistent read for " +
+                                        r + ". previous value: " + 
previousValue +
+                                        ", now: " + entry.getValue());
+                            }
+                            // remember for next round
+                            previous.put(r, currentValue);
+                        }
+                        Thread.yield();
+                    }
+                } catch (Exception e) {
+                    exceptions.add(e);
+                }
+            }
+        });
+        reader.start();
         for (Thread t : threads) {
             t.start();
         }
         for (Thread t : threads) {
             t.join();
         }
+        running.set(false);
+        reader.join();
+        for (Exception e : exceptions) {
+            throw e;
+        }
         NodeDocument doc = docStore.find(Collection.NODES, id);
         assertNotNull(doc);
         Map<Revision, String> values = doc.getLocalMap("prop");
@@ -83,4 +124,13 @@ public class MongoMKDocumentStoreIT exte
             setUpConnection();
         }
     }
+
+    @Test
+    public void negativeCache() throws Exception {
+        String id = Utils.getIdFromPath("/test");
+        DocumentStore docStore = mk.getDocumentStore();
+        assertNull(docStore.find(Collection.NODES, id));
+        mk.commit("/", "+\"test\":{}", null, null);
+        assertNotNull(docStore.find(Collection.NODES, id));
+    }
 }


Reply via email to