Author: reschke
Date: Thu Nov 12 12:58:28 2015
New Revision: 1714034
URL: http://svn.apache.org/viewvc?rev=1714034&view=rev
Log:
OAK-3617: RDBDocumentStore: improve retry logic in updateDocument
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStorePerformanceTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1714034&r1=1714033&r2=1714034&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
Thu Nov 12 12:58:28 2015
@@ -1363,14 +1363,20 @@ public class RDBDocumentStore implements
Long modcount = (Long) document.get(MODCOUNT);
Long cmodcount = (Long) document.get(COLLISIONSMODCOUNT);
boolean success = false;
+ boolean shouldRetry = true;
// every 16th update is a full rewrite
if (isAppendableUpdate(update) && modcount % 16 != 0) {
String appendData = SR.asString(update);
if (appendData.length() < tmd.getDataLimitInOctets() /
CHAR2OCTETRATIO) {
try {
- success = dbAppendingUpdate(connection, tmd,
document.getId(), modified, modifiedIsConditional, hasBinary, deletedOnce,
- modcount, cmodcount, oldmodcount, appendData);
+ success = dbAppendingUpdate(connection, tmd,
document.getId(), modified, modifiedIsConditional, hasBinary,
+ deletedOnce, modcount, cmodcount, oldmodcount,
appendData);
+ // if we get here, a retry is not going to help (the
SQL
+ // operation succeeded but simply did not select a row
+ // that could be updated, likely because of the check
on
+ // MODCOUNT
+ shouldRetry = false;
connection.commit();
} catch (SQLException ex) {
continueIfStringOverflow(ex);
@@ -1379,7 +1385,7 @@ public class RDBDocumentStore implements
}
}
}
- if (!success) {
+ if (!success && shouldRetry) {
data = SR.asString(document);
success = dbUpdate(connection, tmd, document.getId(),
modified, hasBinary, deletedOnce, modcount, cmodcount,
oldmodcount, data);
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java?rev=1714034&r1=1714033&r2=1714034&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java
Thu Nov 12 12:58:28 2015
@@ -36,7 +36,7 @@ public abstract class AbstractMultiDocum
ds2.dispose();
}
- @Parameterized.Parameters
+ @Parameterized.Parameters(name="{0}")
public static Collection<Object[]> fixtures() {
return fixtures(true);
}
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStorePerformanceTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStorePerformanceTest.java?rev=1714034&r1=1714033&r2=1714034&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStorePerformanceTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStorePerformanceTest.java
Thu Nov 12 12:58:28 2015
@@ -29,6 +29,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Test;
import org.slf4j.Logger;
@@ -40,7 +41,7 @@ import org.slf4j.LoggerFactory;
* These tests are disabled by default due to their long running time. On the
command line
* specify {@code -DDocumentStorePerformanceTest=true} to enable them.
*/
-public class DocumentStorePerformanceTest extends AbstractDocumentStoreTest {
+public class DocumentStorePerformanceTest extends
AbstractMultiDocumentStoreTest {
private static final Logger LOG =
LoggerFactory.getLogger(DocumentStorePerformanceTest.class);
private static final boolean ENABLED =
Boolean.getBoolean(DocumentStorePerformanceTest.class.getSimpleName());
@@ -313,4 +314,109 @@ public class DocumentStorePerformanceTes
LOG.info("document updates with property of size " + size + (growing ?
" (growing)" : "") + " for " + super.dsname + " was " + cnt + " in " + duration
+ "ms (" + (cnt / (duration / 1000f)) + "/s)");
}
+
+ @Test
+ public void testConcurrentUpdatePerf1DS() throws InterruptedException {
+ String id = this.getClass().getName() + ".testConcurrentUpdatePerf1DS";
+ concurrentUpdatePerf(id, 1);
+ }
+
+ @Test
+ public void testConcurrentUpdatePerf2DS() throws InterruptedException {
+ String id = this.getClass().getName() + ".testConcurrentUpdatePerf1DS";
+ concurrentUpdatePerf(id, 2);
+ }
+
+ private void concurrentUpdatePerf(String testName, int stores) throws
InterruptedException {
+ final String id = testName;
+ final long duration = 1000;
+
+ ds1.remove(Collection.NODES, id);
+ UpdateOp up = new UpdateOp(id, true);
+ up.set(Document.ID, id);
+ up.set(Document.MOD_COUNT, 1L);
+ up.set("c", 0L);
+ up.set("u", 0L);
+ super.ds1.create(Collection.NODES, Collections.singletonList(up));
+ removeMe.add(id);
+
+ final DocumentStore ts1 = ds1;
+ final DocumentStore ts2 = stores == 2 ? ds2 : ds1;
+
+ final AtomicBoolean threadTwoIsActive = new AtomicBoolean(false);
+ final AtomicBoolean threadOneIsDone = new AtomicBoolean(false);
+
+ Thread one = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ int failures = 0;
+ while (!threadTwoIsActive.get()) {
+ }
+ // operation that requires fetching the previous state
+ UpdateOp up = new UpdateOp(id, false);
+ up.increment("c", 1);
+ up.notEquals("qux", "qux");
+ long end = System.currentTimeMillis() + duration;
+ while (System.currentTimeMillis() < end) {
+ try {
+ ts1.update(Collection.NODES,
Collections.singletonList(id), up);
+ } catch (RuntimeException ex) {
+ failures += 1;
+ }
+ }
+ try {
+ UpdateOp up2 = new UpdateOp(id, false);
+ up2.set("cfailures", failures);
+ ts1.update(Collection.NODES,
Collections.singletonList(id), up2);
+ } catch (RuntimeException ex) {
+ }
+ threadOneIsDone.set(true);
+ }
+ }, "cond");
+
+ Thread two = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ int failures = 0;
+ // operation that does not require fetching the previous state
+ UpdateOp up = new UpdateOp(id, false);
+ up.set(Document.ID, id);
+ up.increment("u", 1);
+ while (!threadOneIsDone.get()) {
+ try {
+ ts2.update(Collection.NODES,
Collections.singletonList(id), up);
+ threadTwoIsActive.set(true);
+ } catch (RuntimeException ex) {
+ failures += 1;
+ }
+ }
+ threadTwoIsActive.set(true);
+ try {
+ UpdateOp up2 = new UpdateOp(id, false);
+ up2.set("ufailures", failures);
+ ts1.update(Collection.NODES,
Collections.singletonList(id), up2);
+ } catch (RuntimeException ex) {
+ }
+ }
+ }, "uncond");
+
+ two.start();
+ one.start();
+
+ two.join();
+ one.join();
+
+ // reading uncached because for some reason MongoDS doesn't see the
+ // changes made in ds2
+ NodeDocument nd = ds1.find(Collection.NODES, id, 0);
+ assertNotNull(nd);
+ int cc = nd.get("c") == null ? 0 :
Integer.valueOf(nd.get("c").toString());
+ int uc = nd.get("u") == null ? 0 :
Integer.valueOf(nd.get("u").toString());
+ long mc = nd.getModCount().longValue();
+ String msg = String.format(
+ "Concurrent updates %s on %s cond. updates: %d (failures: %s),
uncond. updates: %d (failures: %s), _modCount: %d, ops/sec: %d, %% of cond.
updates: %d",
+ stores == 1 ? "(one ds)" : "(two ds)", super.dsname, cc,
nd.get("cfailures"), uc, nd.get("ufailures"), mc,
+ mc * 1000 / duration, cc * 100 / mc);
+ LOG.info(msg);
+ }
}