Author: reschke
Date: Thu Nov 26 12:32:20 2015
New Revision: 1716616

URL: http://svn.apache.org/viewvc?rev=1716616&view=rev
Log:
OAK-3659: RDBDocumentStore - race condition might cause update() to put stale 
data into cache

Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBCacheConsistencyIT.java

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1716616&r1=1716615&r2=1716616&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
 Thu Nov 26 12:32:20 2015
@@ -1004,9 +1004,7 @@ public class RDBDocumentStore implements
                             return null;
                         }
                     } else {
-                        if (collection == Collection.NODES) {
-                            applyToCache((NodeDocument) oldDoc, (NodeDocument) 
doc);
-                        }
+                        updateCache(collection, oldDoc, doc);
                     }
                 }
 
@@ -1093,16 +1091,21 @@ public class RDBDocumentStore implements
                     }
                     for (Entry<String, NodeDocument> entry : 
cachedDocs.entrySet()) {
                         T oldDoc = castAsT(entry.getValue());
-                        if (oldDoc == null) {
-                            String id = entry.getKey();
-                            // make sure concurrently loaded document is
-                            // invalidated
-                            nodesCache.invalidate(new StringValue(id));
-                        } else {
-                            T newDoc = applyChanges(collection, oldDoc, 
update, true);
-                            if (newDoc != null) {
-                                applyToCache((NodeDocument) oldDoc, 
(NodeDocument) newDoc);
+                        String id = entry.getKey();
+                        Lock lock = getAndLock(id);
+                        try {
+                            if (oldDoc == null) {
+                                // make sure concurrently loaded document is
+                                // invalidated
+                                nodesCache.invalidate(new StringValue(id));
+                            } else {
+                                T newDoc = applyChanges(collection, oldDoc, 
update, true);
+                                if (newDoc != null) {
+                                    updateCache(collection, oldDoc, newDoc);
+                                }
                             }
+                        } finally {
+                            lock.unlock();
                         }
                     }
                 } else {
@@ -1569,14 +1572,65 @@ public class RDBDocumentStore implements
         return n != null ? n.longValue() : -1;
     }
 
+    private <T extends Document> void addToCache(Collection<T> collection, T 
doc) {
+        if (collection == Collection.NODES) {
+            Lock lock = getAndLock(idOf(doc));
+            try {
+                addToCache((NodeDocument) doc);
+            } finally {
+                lock.unlock();
+            }
+        }
+    }
+
+    /**
+     * Applies an update to the nodes cache. This method does not acquire
+     * a lock for the document. The caller must ensure it holds a lock for
+     * the updated document. See striped {@link #locks}.
+     *
+     * @param <T> the document type.
+     * @param collection the document collection.
+     * @param oldDoc the old document.
+     * @param newDoc the new document, cached only if the cached entry still matches oldDoc.
+     */
+    private <T extends Document> void updateCache(@Nonnull Collection<T> 
collection,
+                                                  @Nonnull T oldDoc,
+                                                  @Nonnull T newDoc) {
+        // cache the new document
+        if (collection == Collection.NODES) {
+            checkNotNull(oldDoc);
+            checkNotNull(newDoc);
+            // we can only update the cache based on the oldDoc if we
+            // still have the oldDoc in the cache, otherwise we may
+            // update the cache with an outdated document
+            CacheValue key = new StringValue(idOf(newDoc));
+            NodeDocument cached = nodesCache.getIfPresent(key);
+            if (cached == null) {
+                // cannot use oldDoc to update cache
+                return;
+            }
+
+            // check if the currently cached document matches oldDoc
+            if (Objects.equal(cached.getModCount(), oldDoc.getModCount())) {
+                nodesCache.put(key, (NodeDocument)newDoc);
+            } else {
+                // the cache entry was modified by some other thread in
+                // the meantime. the updated cache entry may or may not
+                // include this update. we cannot just apply our update
+                // on top of the cached entry.
+                // therefore we must invalidate the cache entry
+                nodesCache.invalidate(key);
+            }
+        }
+    }
+
     /**
-     * Adds a document to the {@link #nodesCache} iff there is no document in
-     * the cache with the document key. This method does not acquire a lock 
from
-     * {@link #locks}! The caller must ensure a lock is held for the given
-     * document.
-     * 
-     * @param doc
-     *            the document to add to the cache.
+     * Adds a document to the {@link #nodesCache} iff there is no document
+     * in the cache with the document key. This method does not acquire a lock
+     * from {@link #locks}! The caller must ensure a lock is held for the
+     * given document.
+     *
+     * @param doc the document to add to the cache.
      * @return either the given <code>doc</code> or the document already 
present
      *         in the cache.
      */
@@ -1612,43 +1666,6 @@ public class RDBDocumentStore implements
         }
     }
 
-    @Nonnull
-    private void applyToCache(@Nonnull final NodeDocument oldDoc, @Nonnull 
final NodeDocument newDoc) {
-        NodeDocument cached = addToCache(newDoc);
-        if (cached == newDoc) {
-            // successful
-            return;
-        } else if (oldDoc == null) {
-            // this is an insert and some other thread was quicker
-            // loading it into the cache -> return now
-            return;
-        } else {
-            CacheValue key = new StringValue(idOf(newDoc));
-            // this is an update (oldDoc != null)
-            if (Objects.equal(cached.getModCount(), oldDoc.getModCount())) {
-                nodesCache.put(key, newDoc);
-            } else {
-                // the cache entry was modified by some other thread in
-                // the meantime. the updated cache entry may or may not
-                // include this update. we cannot just apply our update
-                // on top of the cached entry.
-                // therefore we must invalidate the cache entry
-                nodesCache.invalidate(key);
-            }
-        }
-    }
-
-    private <T extends Document> void addToCache(Collection<T> collection, T 
doc) {
-        if (collection == Collection.NODES) {
-            Lock lock = getAndLock(idOf(doc));
-            try {
-                addToCache((NodeDocument) doc);
-            } finally {
-                lock.unlock();
-            }
-        }
-    }
-
     @Nonnull
     protected <T extends Document> T convertFromDBObject(@Nonnull 
Collection<T> collection, @Nonnull RDBRow row) {
         // this method is present here in order to facilitate unit testing for 
OAK-3566

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBCacheConsistencyIT.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBCacheConsistencyIT.java?rev=1716616&r1=1716615&r2=1716616&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBCacheConsistencyIT.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBCacheConsistencyIT.java
 Thu Nov 26 12:32:20 2015
@@ -34,7 +34,6 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
 import org.apache.jackrabbit.oak.plugins.document.util.Utils;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.cache.Cache;
@@ -50,7 +49,7 @@ public class RDBCacheConsistencyIT exten
     @Before
     @Override
     public void setUpConnection() throws Exception {
-        dataSource = RDBDataSourceFactory.forJdbcUrl(URL, USERNAME, PASSWD);
+        dataSource = RDBDataSourceFactory.forJdbcUrl(URL, USERNAME, PASSWD); 
// /*"jdbc:derby:foo;create=true"*/
         DocumentMK.Builder builder = new 
DocumentMK.Builder().clock(getTestClock()).setAsyncDelay(0);
         RDBOptions opt = new RDBOptions().tablePrefix("T" + 
UUID.randomUUID().toString().replace("-", "")).dropTablesOnClose(true);
         store = new RDBDocumentStore(dataSource, builder, opt);
@@ -58,7 +57,6 @@ public class RDBCacheConsistencyIT exten
     }
 
     @Test
-    @Ignore("OAK-3659")
     public void evictWhileUpdateLoop() throws Throwable {
         for (int i = 0; i < 10; i++) {
             runTest();
@@ -86,7 +84,9 @@ public class RDBCacheConsistencyIT exten
                         store.update(NODES, ids, op);
                         NodeDocument doc = store.find(NODES, id);
                         Object p = doc.get("p");
-                        assertEquals(/*"last modified by: " + doc.get("mb"),*/ 
v, ((Long) p).longValue());
+                        assertEquals("Unexpected result after update-then-find 
sequence, last modification of document by '"
+                                + doc.get("mb") + "' thread @" + 
doc.getModCount(), v, ((Long) p).longValue());
+                        // System.out.println("u @" + doc.getModCount() + " 
p=" + v + "; q=" + doc.get("q"));
                     } catch (Throwable e) {
                         exceptions.add(e);
                     }
@@ -100,6 +100,7 @@ public class RDBCacheConsistencyIT exten
             public void run() {
                 String id = Utils.getIdFromPath("/test/foo");
                 long v = 0;
+                long lastWrittenV = 0;
                 while (exceptions.isEmpty()) {
                     try {
                         UpdateOp op = new UpdateOp(id, false);
@@ -108,10 +109,15 @@ public class RDBCacheConsistencyIT exten
                         NodeDocument old = store.findAndUpdate(NODES, op);
                         Object q = old.get("q");
                         if (q != null) {
-                            assertEquals(v - 1, ((Long) q).longValue());
+                            assertEquals("Unexpected result after 
findAndUpdate, last modification of document by '" + old.get("mb")
+                                    + "' thread @" + old.getModCount(), 
lastWrittenV, ((Long) q).longValue());
                         }
+                        lastWrittenV = v;
+                        // System.out.println("f @" + old.getModCount() + " 
p=" + old.get("p") + "; q=" + q);
                     } catch (DocumentStoreException e) {
-                        // keep going, RDBDocumentStore might have given up 
due to race conditions
+                        // System.err.println("f update of v to " + v + " 
failed: " + e.getMessage());
+                        // keep going, RDBDocumentStore might have given up due
+                        // to race conditions
                     } catch (Throwable e) {
                         exceptions.add(e);
                     }
@@ -126,21 +132,31 @@ public class RDBCacheConsistencyIT exten
                 String id = Utils.getIdFromPath("/test/foo");
                 long p = 0;
                 long q = 0;
+                long mc = 0;
                 while (exceptions.isEmpty()) {
                     try {
                         NodeDocument doc = store.find(NODES, id);
                         if (doc != null) {
                             Object value = doc.get("p");
                             if (value != null) {
-                                assertTrue((Long) value >= p);
+                                assertTrue(
+                                        "reader thread at @" + 
doc.getModCount()
+                                                + ": observed property value 
for 'p' (incremented by 'update' thread) decreased, last change by '"
+                                                + doc.get("mb") + "' thread; 
previous: " + p + " (at @" + mc + "), now: " + value,
+                                        (Long) value >= p);
                                 p = (Long) value;
                             }
                             value = doc.get("q");
                             if (value != null) {
-                                assertTrue("previous: " + q + ", now: " + 
value, (Long) value >= q);
+                                assertTrue(
+                                        "reader thread at @" + 
doc.getModCount()
+                                                + ": observed property value 
for 'q' (incremented by 'findAndUpdate' thread) decreased, last change by '"
+                                                + doc.get("mb") + "' thread; 
previous: " + q + " (at @" + mc + "), now: " + value,
+                                        (Long) value >= q);
                                 q = (Long) value;
                             }
                         }
+                        mc = doc.getModCount().longValue();
                     } catch (Throwable e) {
                         exceptions.add(e);
                     }
@@ -160,6 +176,7 @@ public class RDBCacheConsistencyIT exten
             if (cache.getIfPresent(key) != null) {
                 Thread.sleep(0, (int) (Math.random() * 100));
                 // simulate eviction
+                // System.out.println("EVICT");
                 cache.invalidate(key);
             }
         }


Reply via email to