Author: reschke
Date: Thu Nov 12 12:58:28 2015
New Revision: 1714034

URL: http://svn.apache.org/viewvc?rev=1714034&view=rev
Log:
OAK-3617: RDBDocumentStore: improve retry logic in updateDocument

Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStorePerformanceTest.java

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1714034&r1=1714033&r2=1714034&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
 Thu Nov 12 12:58:28 2015
@@ -1363,14 +1363,20 @@ public class RDBDocumentStore implements
             Long modcount = (Long) document.get(MODCOUNT);
             Long cmodcount = (Long) document.get(COLLISIONSMODCOUNT);
             boolean success = false;
+            boolean shouldRetry = true;
 
             // every 16th update is a full rewrite
             if (isAppendableUpdate(update) && modcount % 16 != 0) {
                 String appendData = SR.asString(update);
                 if (appendData.length() < tmd.getDataLimitInOctets() / 
CHAR2OCTETRATIO) {
                     try {
-                        success = dbAppendingUpdate(connection, tmd, 
document.getId(), modified, modifiedIsConditional, hasBinary, deletedOnce,
-                                modcount, cmodcount, oldmodcount, appendData);
+                        success = dbAppendingUpdate(connection, tmd, 
document.getId(), modified, modifiedIsConditional, hasBinary,
+                                deletedOnce, modcount, cmodcount, oldmodcount, 
appendData);
+                        // if we get here, a retry is not going to help (the 
SQL
+                        // operation succeeded but simply did not select a row
+                        // that could be updated, likely because of the check 
on
+                        // MODCOUNT
+                        shouldRetry = false;
                         connection.commit();
                     } catch (SQLException ex) {
                         continueIfStringOverflow(ex);
@@ -1379,7 +1385,7 @@ public class RDBDocumentStore implements
                     }
                 }
             }
-            if (!success) {
+            if (!success && shouldRetry) {
                 data = SR.asString(document);
                 success = dbUpdate(connection, tmd, document.getId(), 
modified, hasBinary, deletedOnce, modcount, cmodcount,
                         oldmodcount, data);

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java?rev=1714034&r1=1714033&r2=1714034&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java
 Thu Nov 12 12:58:28 2015
@@ -36,7 +36,7 @@ public abstract class AbstractMultiDocum
         ds2.dispose();
     }
 
-    @Parameterized.Parameters
+    @Parameterized.Parameters(name="{0}")
     public static Collection<Object[]> fixtures() {
         return fixtures(true);
     }

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStorePerformanceTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStorePerformanceTest.java?rev=1714034&r1=1714033&r2=1714034&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStorePerformanceTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStorePerformanceTest.java
 Thu Nov 12 12:58:28 2015
@@ -29,6 +29,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -40,7 +41,7 @@ import org.slf4j.LoggerFactory;
  * These tests are disabled by default due to their long running time. On the 
command line
  * specify {@code -DDocumentStorePerformanceTest=true} to enable them.
  */
-public class DocumentStorePerformanceTest extends AbstractDocumentStoreTest {
+public class DocumentStorePerformanceTest extends 
AbstractMultiDocumentStoreTest {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DocumentStorePerformanceTest.class);
     private static final boolean ENABLED = 
Boolean.getBoolean(DocumentStorePerformanceTest.class.getSimpleName());
@@ -313,4 +314,109 @@ public class DocumentStorePerformanceTes
 
         LOG.info("document updates with property of size " + size + (growing ? 
" (growing)" : "") + " for " + super.dsname + " was " + cnt + " in " + duration 
+ "ms (" + (cnt / (duration / 1000f)) + "/s)");
     }
+
+    @Test
+    public void testConcurrentUpdatePerf1DS() throws InterruptedException {
+        String id = this.getClass().getName() + ".testConcurrentUpdatePerf1DS";
+        concurrentUpdatePerf(id, 1);
+    }
+
+    @Test
+    public void testConcurrentUpdatePerf2DS() throws InterruptedException {
+        String id = this.getClass().getName() + ".testConcurrentUpdatePerf1DS";
+        concurrentUpdatePerf(id, 2);
+    }
+
+    private void concurrentUpdatePerf(String testName, int stores) throws 
InterruptedException {
+        final String id = testName;
+        final long duration = 1000;
+
+        ds1.remove(Collection.NODES, id);
+        UpdateOp up = new UpdateOp(id, true);
+        up.set(Document.ID, id);
+        up.set(Document.MOD_COUNT, 1L);
+        up.set("c", 0L);
+        up.set("u", 0L);
+        super.ds1.create(Collection.NODES, Collections.singletonList(up));
+        removeMe.add(id);
+
+        final DocumentStore ts1 = ds1;
+        final DocumentStore ts2 = stores == 2 ? ds2 : ds1;
+
+        final AtomicBoolean threadTwoIsActive = new AtomicBoolean(false);
+        final AtomicBoolean threadOneIsDone = new AtomicBoolean(false);
+
+        Thread one = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                int failures = 0;
+                while (!threadTwoIsActive.get()) {
+                }
+                // operation that requires fetching the previous state
+                UpdateOp up = new UpdateOp(id, false);
+                up.increment("c", 1);
+                up.notEquals("qux", "qux");
+                long end = System.currentTimeMillis() + duration;
+                while (System.currentTimeMillis() < end) {
+                    try {
+                        ts1.update(Collection.NODES, 
Collections.singletonList(id), up);
+                    } catch (RuntimeException ex) {
+                        failures += 1;
+                    }
+                }
+                try {
+                    UpdateOp up2 = new UpdateOp(id, false);
+                    up2.set("cfailures", failures);
+                    ts1.update(Collection.NODES, 
Collections.singletonList(id), up2);
+                } catch (RuntimeException ex) {
+                }
+                threadOneIsDone.set(true);
+            }
+        }, "cond");
+
+        Thread two = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                int failures = 0;
+                // operation that does not require fetching the previous state
+                UpdateOp up = new UpdateOp(id, false);
+                up.set(Document.ID, id);
+                up.increment("u", 1);
+                while (!threadOneIsDone.get()) {
+                    try {
+                        ts2.update(Collection.NODES, 
Collections.singletonList(id), up);
+                        threadTwoIsActive.set(true);
+                    } catch (RuntimeException ex) {
+                        failures += 1;
+                    }
+                }
+                threadTwoIsActive.set(true);
+                try {
+                    UpdateOp up2 = new UpdateOp(id, false);
+                    up2.set("ufailures", failures);
+                    ts1.update(Collection.NODES, 
Collections.singletonList(id), up2);
+                } catch (RuntimeException ex) {
+                }
+            }
+        }, "uncond");
+
+        two.start();
+        one.start();
+
+        two.join();
+        one.join();
+
+        // reading uncached because for some reason MongoDS doesn't see the
+        // changes made in ds2
+        NodeDocument nd = ds1.find(Collection.NODES, id, 0);
+        assertNotNull(nd);
+        int cc = nd.get("c") == null ? 0 : 
Integer.valueOf(nd.get("c").toString());
+        int uc = nd.get("u") == null ? 0 : 
Integer.valueOf(nd.get("u").toString());
+        long mc = nd.getModCount().longValue();
+        String msg = String.format(
+                "Concurrent updates %s on %s cond. updates: %d (failures: %s), 
uncond. updates: %d (failures: %s), _modCount: %d, ops/sec: %d, %% of cond. 
updates: %d",
+                stores == 1 ? "(one ds)" : "(two ds)", super.dsname, cc, 
nd.get("cfailures"), uc, nd.get("ufailures"), mc,
+                mc * 1000 / duration, cc * 100 / mc);
+        LOG.info(msg);
+    }
 }


Reply via email to