Author: stefanegli
Date: Thu Jul 30 07:21:38 2020
New Revision: 1880432
URL: http://svn.apache.org/viewvc?rev=1880432&view=rev
Log:
OAK-9149 : Use batch calls in backgroundSplit - merges PR#243
Modified:
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java
Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1880432&r1=1880431&r2=1880432&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java	(original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java	Thu Jul 30 07:21:38 2020
@@ -48,6 +48,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -126,6 +127,7 @@ import org.apache.jackrabbit.oak.commons
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -322,6 +324,8 @@ public final class DocumentNodeStore
*/
private Thread backgroundUpdateThread;
+ /**
+  * The task executed by {@link #backgroundUpdateThread}. Kept as a field so
+  * that {@link #stopBackgroundUpdateThread(long)} can call forceStop() on it.
+  */
+ BackgroundUpdateOperation backgroundUpdateOperation;
+
/**
* Monitor object to synchronize background writes.
*/
@@ -687,7 +691,7 @@ public final class DocumentNodeStore
"DocumentNodeStore background read thread " +
threadNamePostfix);
backgroundReadThread.setDaemon(true);
backgroundUpdateThread = new Thread(
- new BackgroundUpdateOperation(this, isDisposed),
+ backgroundUpdateOperation = new
BackgroundUpdateOperation(this, isDisposed),
"DocumentNodeStore background update thread " +
threadNamePostfix);
backgroundUpdateThread.setDaemon(true);
backgroundSweepThread = new Thread(
@@ -2378,7 +2382,10 @@ public final class DocumentNodeStore
private void backgroundSplit() {
Set<Path> invalidatedPaths = new HashSet<>();
+ Set<Path> pathsToinvalidate = new HashSet<>();
RevisionVector head = getHeadRevision();
+ List<UpdateOp> splitOps = new LinkedList<>();
+ List<String> removeCandiates = new LinkedList<>();
for (Iterator<String> it = splitCandidates.keySet().iterator();
it.hasNext();) {
String id = it.next();
NodeDocument doc = store.find(Collection.NODES, id);
@@ -2392,43 +2399,65 @@ public final class DocumentNodeStore
// already has a pending _lastRev update or an invalidation
// entry was already added in this backgroundSplit() call
if (unsavedLastRevisions.get(path) == null
- && invalidatedPaths.add(path)) {
- // create journal entry for cache invalidation
- JournalEntry entry =
JOURNAL.newDocument(getDocumentStore());
- entry.modified(path);
- Revision r = newRevision().asBranchRevision();
- UpdateOp journalOp = entry.asUpdateOp(r);
- if (store.create(JOURNAL, singletonList(journalOp))) {
- changes.invalidate(singletonList(r));
- LOG.debug("Journal entry {} created for split of
document {}",
- journalOp.getId(), path);
- } else {
- String msg = "Unable to create journal entry " +
- journalOp.getId() + " for document
invalidation. " +
- "Will be retried with next background split " +
- "operation.";
- throw new DocumentStoreException(msg);
- }
- }
- // apply the split operations
- NodeDocument before = null;
- if (!op.isNew() ||
- !store.create(Collection.NODES,
Collections.singletonList(op))) {
- before = store.createOrUpdate(Collection.NODES, op);
+ && !invalidatedPaths.contains(path)) {
+ pathsToinvalidate.add(path);
}
+ splitOps.add(op);
+ }
+ removeCandiates.add(id);
+ if (splitOps.size() >= getCreateOrUpdateBatchSize()) {
+ batchSplit(splitOps, pathsToinvalidate);
+ splitOps.clear();
+ invalidatedPaths.addAll(pathsToinvalidate);
+ pathsToinvalidate.clear();
+ splitCandidates.keySet().removeAll(removeCandiates);
+ removeCandiates.clear();
+ }
+ }
+
+ if (splitOps.size() > 0) {
+ batchSplit(splitOps, pathsToinvalidate);
+ splitCandidates.keySet().removeAll(removeCandiates);
+ }
+ }
+
+ private void batchSplit(List<UpdateOp> splitOps, Set<Path>
pathsToinvalidate) {
+ if (!pathsToinvalidate.isEmpty()) {
+ // create journal entry for cache invalidation
+ JournalEntry entry = JOURNAL.newDocument(getDocumentStore());
+ entry.modified(pathsToinvalidate);
+ Revision r = newRevision().asBranchRevision();
+ UpdateOp journalOp = entry.asUpdateOp(r);
+ if (store.create(JOURNAL, singletonList(journalOp))) {
+ changes.invalidate(singletonList(r));
+ LOG.debug("Journal entry {} created for split of document(s)
{}",
+ journalOp.getId(), pathsToinvalidate);
+ } else {
+ String msg = "Unable to create journal entry " +
+ journalOp.getId() + " for document invalidation. " +
+ "Will be retried with next background split " +
+ "operation.";
+ throw new DocumentStoreException(msg);
+ }
+ }
+ // apply the split operations
+ List<NodeDocument> beforeList = store.createOrUpdate(Collection.NODES,
splitOps);
+ if (LOG.isDebugEnabled()) {
+ // this is rather expensive - but given we were doing log.debug
before
+ // the batchSplit mechanism, so this somewhat negates the batch
improvement indeed
+ for (int i = 0; i < splitOps.size(); i++) {
+ UpdateOp op = splitOps.get(i);
+ NodeDocument before = beforeList.size() > i ?
beforeList.get(i) : null;
if (before != null) {
- if (LOG.isDebugEnabled()) {
- NodeDocument after = store.find(Collection.NODES,
op.getId());
- if (after != null) {
- LOG.debug("Split operation on {}. Size before: {},
after: {}",
- id, before.getMemory(), after.getMemory());
- }
+ NodeDocument after = store.find(Collection.NODES,
op.getId());
+ if (after != null) {
+ LOG.debug("Split operation on {}. Size before: {},
after: {}",
+ op.getId(), before.getMemory(),
after.getMemory());
}
} else {
LOG.debug("Split operation created {}", op.getId());
}
}
- it.remove();
}
}
@@ -3160,6 +3189,23 @@ public final class DocumentNodeStore
}
}
+    /**
+     * FOR TESTING ONLY:
+     * stops the background update thread by replacing the {@code isDisposed}
+     * flag of its {@link BackgroundUpdateOperation} with a new, already-set
+     * flag, and optionally waits for the thread to terminate.
+     * <p>
+     * The flag instance owned by this node store is not modified, so only the
+     * background update task is told to stop.
+     *
+     * @param timeoutMillis maximum number of milliseconds to wait for the
+     *          thread to terminate; a value {@code <= 0} does not wait at all
+     *          (note that {@link Thread#join(long)} with {@code 0} would wait
+     *          forever, which is why it is not called in that case)
+     * @return {@code true} if the background update thread is no longer
+     *          alive when this method returns
+     * @throws InterruptedException if the calling thread is interrupted while
+     *          waiting for the background update thread to terminate
+     */
+    @TestOnly
+    boolean stopBackgroundUpdateThread(long timeoutMillis) throws InterruptedException {
+        backgroundUpdateOperation.forceStop();
+        if (timeoutMillis > 0) {
+            backgroundUpdateThread.join(timeoutMillis);
+        }
+        return !backgroundUpdateThread.isAlive();
+    }
+
public DocumentNodeStoreMBean getMBean() {
return mbean;
}
@@ -3176,7 +3222,7 @@ public final class DocumentNodeStore
private static abstract class NodeStoreTask implements Runnable {
final WeakReference<DocumentNodeStore> ref;
- private final AtomicBoolean isDisposed;
+        // volatile: forceStop() replaces this reference from another thread
+        // (e.g. a test calling stopBackgroundUpdateThread) than the one running
+        // this task; without volatile the running task may never see the change
+        private volatile AtomicBoolean isDisposed;
private final Supplier<Integer> delaySupplier;
private boolean failing;
@@ -3202,6 +3248,10 @@ public final class DocumentNodeStore
this(nodeStore, isDisposed, null);
}
+        /**
+         * Signals this task to stop by swapping in a fresh, already-set
+         * {@code isDisposed} flag. The flag instance passed to the constructor
+         * is left untouched, so other holders of that instance are not
+         * affected.
+         * NOTE(review): the running task only observes the new reference
+         * reliably if the {@code isDisposed} field is volatile.
+         */
+        void forceStop() {
+            this.isDisposed = new AtomicBoolean(true);
+        }
+
protected abstract void execute(@NotNull DocumentNodeStore nodeStore);
@Override
Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java?rev=1880432&r1=1880431&r2=1880432&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java	(original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java	Thu Jul 30 07:21:38 2020
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
@@ -35,6 +36,7 @@ import org.apache.jackrabbit.oak.api.Pro
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
+import
org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats;
import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
@@ -45,7 +47,11 @@ import org.apache.jackrabbit.oak.spi.sta
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.stats.Clock;
import org.jetbrains.annotations.NotNull;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -81,6 +87,118 @@ import static org.junit.Assert.fail;
*/
public class DocumentSplitTest extends BaseDocumentMKTest {
+ private static final Logger LOG =
LoggerFactory.getLogger(DocumentSplitTest.class);
+
+ // value of oak.documentMK.createOrUpdateBatchSize saved by backupProperty()
+ // before each test (null if the property was not set)
+ private String createOrUpdateBatchSize;
+ // set by backupProperty() when the property was not set before the test;
+ // restoreProperty() then clears the property again
+ private boolean createOrUpdateBatchSizeIsNull;
+
+    /**
+     * Saves the current value of the
+     * {@code oak.documentMK.createOrUpdateBatchSize} system property so that
+     * {@link #restoreProperty()} can put it back once the test is done.
+     */
+    @Before
+    public void backupProperty() {
+        String previousValue = System.getProperty("oak.documentMK.createOrUpdateBatchSize");
+        createOrUpdateBatchSize = previousValue;
+        if (previousValue == null) {
+            createOrUpdateBatchSizeIsNull = true;
+        }
+    }
+
+    /**
+     * Restores the {@code oak.documentMK.createOrUpdateBatchSize} system
+     * property to the state saved by {@link #backupProperty()}: the old value
+     * is set again, or the property is cleared if it was absent before.
+     */
+    @After
+    public void restoreProperty() {
+        if (createOrUpdateBatchSize != null) {
+            System.setProperty("oak.documentMK.createOrUpdateBatchSize", createOrUpdateBatchSize);
+            return;
+        }
+        if (createOrUpdateBatchSizeIsNull) {
+            System.clearProperty("oak.documentMK.createOrUpdateBatchSize");
+        }
+    }
+
+    /**
+     * Batch size 1000 with 1000, 6000, 11000 and 16000 split documents.
+     */
+    @Test
+    public void largeBatchSplitTest() throws Exception {
+        for (int factor = 1; factor < 21; factor += 5) {
+            int splitDocCnt = factor * 1000;
+            batchSplitTest(1000, splitDocCnt);
+        }
+    }
+
+    /**
+     * Batch size 50 with 1000 split documents.
+     */
+    @Test
+    public void mediumBatchSplitTest() throws Exception {
+        int batchSize = 50;
+        int splitDocCnt = 1000;
+        batchSplitTest(batchSize, splitDocCnt);
+    }
+
+    /**
+     * Batch size 1 (effectively no batching) with 1000 split documents.
+     */
+    @Test
+    public void smallBatchSplitTest() throws Exception {
+        int batchSize = 1;
+        int splitDocCnt = 1000;
+        batchSplitTest(batchSize, splitDocCnt);
+    }
+
+    /**
+     * Sets {@code oak.documentMK.createOrUpdateBatchSize} to {@code batchSize},
+     * opens a fresh DocumentMK on a {@link CountingDocumentStore}, modifies
+     * {@code splitDocCnt} child nodes twice with large binary properties and
+     * then runs the background update operations. It asserts that the number of
+     * createOrUpdate calls on NODES lies within the range expected for batched
+     * splits, and that version GC eventually collects {@code splitDocCnt}
+     * split documents.
+     *
+     * @param batchSize value used for the createOrUpdate batch size property
+     * @param splitDocCnt number of child nodes whose documents are modified
+     */
+    private void batchSplitTest(int batchSize, int splitDocCnt) throws Exception {
+        LOG.info("batchSplitTest: batchSize = {}, splitDocCnt = {}", batchSize, splitDocCnt);
+        // this test wants to use CountingDocumentStore - hence creating a fresh DocumentMK
+        // plus it wants to set the batchSize
+        if (mk != null) {
+            mk.dispose();
+            mk = null;
+        }
+
+        System.setProperty("oak.documentMK.createOrUpdateBatchSize", String.valueOf(batchSize));
+        DocumentMK.Builder mkBuilder = new DocumentMK.Builder();
+        MemoryDocumentStore delegateStore = new MemoryDocumentStore();
+        CountingDocumentStore store = new CountingDocumentStore(delegateStore);
+        mkBuilder.setDocumentStore(store);
+        mk = mkBuilder.open();
+        DocumentNodeStore ns = mk.getNodeStore();
+        // stop the background update thread so that background updates only
+        // run when triggered explicitly below (keeps the call counts stable)
+        assertTrue(ns.stopBackgroundUpdateThread(10000));
+        assertEquals(batchSize, ns.getCreateOrUpdateBatchSize());
+
+        NodeBuilder builder = ns.getRoot().builder();
+        for (int child = 0; child < 100; child++) {
+            builder.child("testchild-" + child);
+        }
+        ns.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        // two rounds of 5 KB binary property updates per child, intended to
+        // make each child document a split candidate
+        for (int i = 0; i < 2; i++) {
+            builder = ns.getRoot().builder();
+            for (int child = 0; child < splitDocCnt; child++) {
+                PropertyState binary = binaryProperty("prop", randomBytes(5 * 1024));
+                builder.child("testchild-" + child).setProperty(binary);
+            }
+            ns.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        }
+        store.resetCounters();
+        ns.runBackgroundUpdateOperations();
+        int createOrUpdates = store.getNumCreateOrUpdateCalls(NODES);
+        int minBatchSplitCalls = Math.max(1, splitDocCnt / Math.max(1, batchSize / 2));
+        int maxBatchSplitCalls = minBatchSplitCalls + Math.max(1, splitDocCnt % batchSize);
+        // backgroundWrite could issue another 2 writes, so:
+        maxBatchSplitCalls += 2;
+        assertTrue("batchSize = " + batchSize
+                + ", splitDocCnt = " + splitDocCnt
+                + ", minBatchSplitCalls=" + minBatchSplitCalls
+                + ", createOrUpdates=" + createOrUpdates,
+                minBatchSplitCalls <= createOrUpdates);
+        assertTrue("batchSize = " + batchSize
+                + ", splitDocCnt = " + splitDocCnt
+                + ", minBatchSplitCalls=" + minBatchSplitCalls
+                + ", maxBatchSplitCalls=" + maxBatchSplitCalls
+                + ", createOrUpdates="+createOrUpdates,
+                maxBatchSplitCalls >= createOrUpdates);
+        VersionGarbageCollector gc = ns.getVersionGarbageCollector();
+
+        int actualSplitDocGCCount = 0;
+        long timeout = ns.getClock().getTime() + 10000;
+        while (actualSplitDocGCCount != splitDocCnt && ns.getClock().getTime() < timeout) {
+            VersionGCStats stats = gc.gc(1, TimeUnit.MILLISECONDS);
+            actualSplitDocGCCount += stats.splitDocGCCount;
+            if (actualSplitDocGCCount != splitDocCnt) {
+                LOG.info("batchSplitTest: Expected {}, actual {}", splitDocCnt, actualSplitDocGCCount);
+                // advance time a bit to ensure gc does clean up the split docs
+                ns.getClock().waitUntil(ns.getClock().getTime() + 1000);
+                ns.runBackgroundUpdateOperations();
+            }
+        }
+
+        // make sure those splitDocCnt split docs are deleted
+        assertEquals("gc not as expected: expected " + splitDocCnt
+                + ", got " + actualSplitDocGCCount, splitDocCnt, actualSplitDocGCCount);
+        mk.dispose();
+        mk = null;
+    }
+
@Test
public void splitRevisions() throws Exception {
DocumentStore store = mk.getDocumentStore();