Author: reschke
Date: Thu Nov 26 12:32:20 2015
New Revision: 1716616
URL: http://svn.apache.org/viewvc?rev=1716616&view=rev
Log:
OAK-3659: RDBDocumentStore - race condition might cause update() to put stale
data into cache
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBCacheConsistencyIT.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1716616&r1=1716615&r2=1716616&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
Thu Nov 26 12:32:20 2015
@@ -1004,9 +1004,7 @@ public class RDBDocumentStore implements
return null;
}
} else {
- if (collection == Collection.NODES) {
- applyToCache((NodeDocument) oldDoc, (NodeDocument)
doc);
- }
+ updateCache(collection, oldDoc, doc);
}
}
@@ -1093,16 +1091,21 @@ public class RDBDocumentStore implements
}
for (Entry<String, NodeDocument> entry :
cachedDocs.entrySet()) {
T oldDoc = castAsT(entry.getValue());
- if (oldDoc == null) {
- String id = entry.getKey();
- // make sure concurrently loaded document is
- // invalidated
- nodesCache.invalidate(new StringValue(id));
- } else {
- T newDoc = applyChanges(collection, oldDoc,
update, true);
- if (newDoc != null) {
- applyToCache((NodeDocument) oldDoc,
(NodeDocument) newDoc);
+ String id = entry.getKey();
+ Lock lock = getAndLock(id);
+ try {
+ if (oldDoc == null) {
+ // make sure concurrently loaded document is
+ // invalidated
+ nodesCache.invalidate(new StringValue(id));
+ } else {
+ T newDoc = applyChanges(collection, oldDoc,
update, true);
+ if (newDoc != null) {
+ updateCache(collection, oldDoc, newDoc);
+ }
}
+ } finally {
+ lock.unlock();
}
}
} else {
@@ -1569,14 +1572,65 @@ public class RDBDocumentStore implements
return n != null ? n.longValue() : -1;
}
+ private <T extends Document> void addToCache(Collection<T> collection, T
doc) {
+ if (collection == Collection.NODES) {
+ Lock lock = getAndLock(idOf(doc));
+ try {
+ addToCache((NodeDocument) doc);
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ /**
+ * Applies an update to the nodes cache. This method does not acquire
+ * a lock for the document. The caller must ensure it holds a lock for
+ * the updated document. See striped {@link #locks}.
+ *
+ * @param <T> the document type.
+ * @param collection the document collection.
+ * @param oldDoc the old document.
+ * @param newDoc the new document.
+ */
+ private <T extends Document> void updateCache(@Nonnull Collection<T>
collection,
+ @Nonnull T oldDoc,
+ @Nonnull T newDoc) {
+ // cache the new document
+ if (collection == Collection.NODES) {
+ checkNotNull(oldDoc);
+ checkNotNull(newDoc);
+ // we can only update the cache based on the oldDoc if we
+ // still have the oldDoc in the cache, otherwise we may
+ // update the cache with an outdated document
+ CacheValue key = new StringValue(idOf(newDoc));
+ NodeDocument cached = nodesCache.getIfPresent(key);
+ if (cached == null) {
+ // cannot use oldDoc to update cache
+ return;
+ }
+
+ // check if the currently cached document matches oldDoc
+ if (Objects.equal(cached.getModCount(), oldDoc.getModCount())) {
+ nodesCache.put(key, (NodeDocument)newDoc);
+ } else {
+ // the cache entry was modified by some other thread in
+ // the meantime. the updated cache entry may or may not
+ // include this update. we cannot just apply our update
+ // on top of the cached entry.
+ // therefore we must invalidate the cache entry
+ nodesCache.invalidate(key);
+ }
+ }
+ }
+
/**
- * Adds a document to the {@link #nodesCache} iff there is no document in
- * the cache with the document key. This method does not acquire a lock
from
- * {@link #locks}! The caller must ensure a lock is held for the given
- * document.
- *
- * @param doc
- * the document to add to the cache.
+ * Adds a document to the {@link #nodesCache} iff there is no document
+ * in the cache with the document key. This method does not acquire a lock
+ * from {@link #locks}! The caller must ensure a lock is held for the
+ * given document.
+ *
+ * @param doc the document to add to the cache.
* @return either the given <code>doc</code> or the document already
present
* in the cache.
*/
@@ -1612,43 +1666,6 @@ public class RDBDocumentStore implements
}
}
- @Nonnull
- private void applyToCache(@Nonnull final NodeDocument oldDoc, @Nonnull
final NodeDocument newDoc) {
- NodeDocument cached = addToCache(newDoc);
- if (cached == newDoc) {
- // successful
- return;
- } else if (oldDoc == null) {
- // this is an insert and some other thread was quicker
- // loading it into the cache -> return now
- return;
- } else {
- CacheValue key = new StringValue(idOf(newDoc));
- // this is an update (oldDoc != null)
- if (Objects.equal(cached.getModCount(), oldDoc.getModCount())) {
- nodesCache.put(key, newDoc);
- } else {
- // the cache entry was modified by some other thread in
- // the meantime. the updated cache entry may or may not
- // include this update. we cannot just apply our update
- // on top of the cached entry.
- // therefore we must invalidate the cache entry
- nodesCache.invalidate(key);
- }
- }
- }
-
- private <T extends Document> void addToCache(Collection<T> collection, T
doc) {
- if (collection == Collection.NODES) {
- Lock lock = getAndLock(idOf(doc));
- try {
- addToCache((NodeDocument) doc);
- } finally {
- lock.unlock();
- }
- }
- }
-
@Nonnull
protected <T extends Document> T convertFromDBObject(@Nonnull
Collection<T> collection, @Nonnull RDBRow row) {
// this method is present here in order to facilitate unit testing for
OAK-3566
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBCacheConsistencyIT.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBCacheConsistencyIT.java?rev=1716616&r1=1716615&r2=1716616&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBCacheConsistencyIT.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBCacheConsistencyIT.java
Thu Nov 26 12:32:20 2015
@@ -34,7 +34,6 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import com.google.common.cache.Cache;
@@ -50,7 +49,7 @@ public class RDBCacheConsistencyIT exten
@Before
@Override
public void setUpConnection() throws Exception {
- dataSource = RDBDataSourceFactory.forJdbcUrl(URL, USERNAME, PASSWD);
+ dataSource = RDBDataSourceFactory.forJdbcUrl(URL, USERNAME, PASSWD);
// /*"jdbc:derby:foo;create=true"*/
DocumentMK.Builder builder = new
DocumentMK.Builder().clock(getTestClock()).setAsyncDelay(0);
RDBOptions opt = new RDBOptions().tablePrefix("T" +
UUID.randomUUID().toString().replace("-", "")).dropTablesOnClose(true);
store = new RDBDocumentStore(dataSource, builder, opt);
@@ -58,7 +57,6 @@ public class RDBCacheConsistencyIT exten
}
@Test
- @Ignore("OAK-3659")
public void evictWhileUpdateLoop() throws Throwable {
for (int i = 0; i < 10; i++) {
runTest();
@@ -86,7 +84,9 @@ public class RDBCacheConsistencyIT exten
store.update(NODES, ids, op);
NodeDocument doc = store.find(NODES, id);
Object p = doc.get("p");
- assertEquals(/*"last modified by: " + doc.get("mb"),*/
v, ((Long) p).longValue());
+ assertEquals("Unexpected result after update-then-find
sequence, last modification of document by '"
+ + doc.get("mb") + "' thread @" +
doc.getModCount(), v, ((Long) p).longValue());
+ // System.out.println("u @" + doc.getModCount() + "
p=" + v + "; q=" + doc.get("q"));
} catch (Throwable e) {
exceptions.add(e);
}
@@ -100,6 +100,7 @@ public class RDBCacheConsistencyIT exten
public void run() {
String id = Utils.getIdFromPath("/test/foo");
long v = 0;
+ long lastWrittenV = 0;
while (exceptions.isEmpty()) {
try {
UpdateOp op = new UpdateOp(id, false);
@@ -108,10 +109,15 @@ public class RDBCacheConsistencyIT exten
NodeDocument old = store.findAndUpdate(NODES, op);
Object q = old.get("q");
if (q != null) {
- assertEquals(v - 1, ((Long) q).longValue());
+ assertEquals("Unexpected result after
findAndUpdate, last modification of document by '" + old.get("mb")
+ + "' thread @" + old.getModCount(),
lastWrittenV, ((Long) q).longValue());
}
+ lastWrittenV = v;
+ // System.out.println("f @" + old.getModCount() + "
p=" + old.get("p") + "; q=" + q);
} catch (DocumentStoreException e) {
- // keep going, RDBDocumentStore might have given up
due to race conditions
+ // System.err.println("f update of v to " + v + "
failed: " + e.getMessage());
+ // keep going, RDBDocumentStore might have given up due
+ // to race conditions
} catch (Throwable e) {
exceptions.add(e);
}
@@ -126,21 +132,31 @@ public class RDBCacheConsistencyIT exten
String id = Utils.getIdFromPath("/test/foo");
long p = 0;
long q = 0;
+ long mc = 0;
while (exceptions.isEmpty()) {
try {
NodeDocument doc = store.find(NODES, id);
if (doc != null) {
Object value = doc.get("p");
if (value != null) {
- assertTrue((Long) value >= p);
+ assertTrue(
+ "reader thread at @" +
doc.getModCount()
+ + ": observed property value
for 'p' (incremented by 'update' thread) decreased, last change by '"
+ + doc.get("mb") + "' thread;
previous: " + p + " (at @" + mc + "), now: " + value,
+ (Long) value >= p);
p = (Long) value;
}
value = doc.get("q");
if (value != null) {
- assertTrue("previous: " + q + ", now: " +
value, (Long) value >= q);
+ assertTrue(
+ "reader thread at @" +
doc.getModCount()
+ + ": observed property value
for 'q' (incremented by 'findAndUpdate' thread) decreased, last change by '"
+ + doc.get("mb") + "' thread;
previous: " + q + " (at @" + mc + "), now: " + value,
+ (Long) value >= q);
q = (Long) value;
}
}
+ mc = doc.getModCount().longValue();
} catch (Throwable e) {
exceptions.add(e);
}
@@ -160,6 +176,7 @@ public class RDBCacheConsistencyIT exten
if (cache.getIfPresent(key) != null) {
Thread.sleep(0, (int) (Math.random() * 100));
// simulate eviction
+ // System.out.println("EVICT");
cache.invalidate(key);
}
}