Author: reschke Date: Thu Sep 27 12:25:22 2018 New Revision: 1842089 URL: http://svn.apache.org/viewvc?rev=1842089&view=rev Log: OAK-7748: DocumentStore: test (and optionally optimize) bulk update fallback logic
Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1842089&r1=1842088&r2=1842089&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java (original) +++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java Thu Sep 27 12:25:22 2018 @@ -423,6 +423,12 @@ public class RDBDocumentStore implements for (UpdateOp updateOp : updateOps) { UpdateOp conflictedOp = operationsToCover.remove(updateOp.getId()); if (conflictedOp != null) { + if (collection == Collection.NODES) { + LOG.debug("update conflict on {}, invalidating cache and retrying...", updateOp.getId()); + nodesCache.invalidate(updateOp.getId()); + } else { + LOG.debug("update conflict on {}, retrying...", updateOp.getId()); + } results.put(conflictedOp, createOrUpdate(collection, updateOp)); } else if (duplicates.contains(updateOp)) { results.put(updateOp, createOrUpdate(collection, updateOp)); @@ -1621,10 +1627,17 @@ public class RDBDocumentStore implements if (lastmodcount == newmodcount) { // cached copy did not change so it probably was // updated by a different instance, get a fresh one + LOG.debug("suspect update from different instance (current modcount: {}), refetching: {}...", + newmodcount, update.getId()); if (collection == Collection.NODES) { nodesCache.invalidate(update.getId()); } oldDoc = readDocumentUncached(collection, update.getId(), null); + if (oldDoc == null) { + LOG.debug("after refetch: {} is gone", update.getId()); + } else { + LOG.debug("after refetch: modcount for {} is {}", update.getId(), modcountOf(oldDoc)); + } } } Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java?rev=1842089&r1=1842088&r2=1842089&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java (original) +++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java Thu Sep 27 12:25:22 2018 @@ -31,12 +31,18 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.jackrabbit.oak.commons.junit.LogCustomizer; +import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; + +import ch.qos.logback.classic.Level; + public class BulkCreateOrUpdateClusterTest extends AbstractMultiDocumentStoreTest { - + final Logger logger = LoggerFactory.getLogger(getClass()); public BulkCreateOrUpdateClusterTest(DocumentStoreFixture dsf) { @@ -202,4 +208,64 @@ public class BulkCreateOrUpdateClusterTe } } + @Test + public void testSimpleConflictHandling() { + LogCustomizer logCustomizer = LogCustomizer.forLogger(RDBDocumentStore.class.getName()).enable(Level.DEBUG) + .contains("invalidating cache and retrying").create(); + logCustomizer.starting(); + + try { + String id1 = this.getClass().getName() + ".testSimpleConflictHandling1"; + String id2 = this.getClass().getName() + ".testSimpleConflictHandling2"; + String id3 = this.getClass().getName() + ".testSimpleConflictHandling3"; + + removeMe.add(id1); + removeMe.add(id2); + removeMe.add(id3); + + { + UpdateOp op1a = new UpdateOp(id1, true); + op1a.set("foo", 1); + UpdateOp op2a = new UpdateOp(id2, true); + op2a.set("foo", 1); + UpdateOp op3a = new UpdateOp(id3, true); + op3a.set("foo", 1); + + List<NodeDocument> resulta = ds1.createOrUpdate(Collection.NODES, Lists.newArrayList(op1a, op2a, op3a)); + assertEquals(3, resulta.size()); + } + + { + UpdateOp op2b = new UpdateOp(id2, false); + op2b.increment("foo", 1); + NodeDocument prev2 = ds2.createOrUpdate(Collection.NODES, op2b); + assertNotNull(prev2); + assertEquals(1L, ((Long)prev2.get("foo")).longValue()); + } + + { + UpdateOp op1c = new UpdateOp(id1, true); + op1c.increment("foo", 1); + UpdateOp op2c = new UpdateOp(id2, true); + op2c.increment("foo", 1); + UpdateOp op3c = new UpdateOp(id3, true); + op3c.increment("foo", 1); + + List<NodeDocument> resultc = ds1.createOrUpdate(Collection.NODES, Lists.newArrayList(op1c, op2c, op3c)); + assertEquals(3, resultc.size()); + for (NodeDocument d : resultc) { + Long fooval = (Long) d.get("foo"); + assertEquals((d.getId().equals(id2)) ? 2L : 1L, fooval.longValue()); + } + } + + if (ds1 instanceof RDBDocumentStore) { + // for RDB, verify that the cache invalidation was reached + assertEquals(1, logCustomizer.getLogs().size()); + } + } + finally { + logCustomizer.finished(); + } + } }