Author: stefanegli
Date: Thu Jul 30 07:21:38 2020
New Revision: 1880432

URL: http://svn.apache.org/viewvc?rev=1880432&view=rev
Log:
OAK-9149 : Use batch calls in backgroundSplit - merges PR#243

Modified:
    
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
    
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java

Modified: 
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1880432&r1=1880431&r2=1880432&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
 Thu Jul 30 07:21:38 2020
@@ -48,6 +48,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -126,6 +127,7 @@ import org.apache.jackrabbit.oak.commons
 import org.apache.jackrabbit.oak.stats.StatisticsProvider;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -322,6 +324,8 @@ public final class DocumentNodeStore
      */
     private Thread backgroundUpdateThread;
 
+    BackgroundUpdateOperation backgroundUpdateOperation;
+
     /**
      * Monitor object to synchronize background writes.
      */
@@ -687,7 +691,7 @@ public final class DocumentNodeStore
                 "DocumentNodeStore background read thread " + 
threadNamePostfix);
         backgroundReadThread.setDaemon(true);
         backgroundUpdateThread = new Thread(
-                new BackgroundUpdateOperation(this, isDisposed),
+                backgroundUpdateOperation = new 
BackgroundUpdateOperation(this, isDisposed),
                 "DocumentNodeStore background update thread " + 
threadNamePostfix);
         backgroundUpdateThread.setDaemon(true);
         backgroundSweepThread = new Thread(
@@ -2378,7 +2382,10 @@ public final class DocumentNodeStore
 
     private void backgroundSplit() {
         Set<Path> invalidatedPaths = new HashSet<>();
+        Set<Path> pathsToinvalidate = new HashSet<>();
         RevisionVector head = getHeadRevision();
+        List<UpdateOp> splitOps = new LinkedList<>();
+        List<String> removeCandiates = new LinkedList<>();
         for (Iterator<String> it = splitCandidates.keySet().iterator(); 
it.hasNext();) {
             String id = it.next();
             NodeDocument doc = store.find(Collection.NODES, id);
@@ -2392,43 +2399,65 @@ public final class DocumentNodeStore
                 // already has a pending _lastRev update or an invalidation
                 // entry was already added in this backgroundSplit() call
                 if (unsavedLastRevisions.get(path) == null
-                        && invalidatedPaths.add(path)) {
-                    // create journal entry for cache invalidation
-                    JournalEntry entry = 
JOURNAL.newDocument(getDocumentStore());
-                    entry.modified(path);
-                    Revision r = newRevision().asBranchRevision();
-                    UpdateOp journalOp = entry.asUpdateOp(r);
-                    if (store.create(JOURNAL, singletonList(journalOp))) {
-                        changes.invalidate(singletonList(r));
-                        LOG.debug("Journal entry {} created for split of 
document {}",
-                                journalOp.getId(), path);
-                    } else {
-                        String msg = "Unable to create journal entry " +
-                                journalOp.getId() + " for document 
invalidation. " +
-                                "Will be retried with next background split " +
-                                "operation.";
-                        throw new DocumentStoreException(msg);
-                    }
-                }
-                // apply the split operations
-                NodeDocument before = null;
-                if (!op.isNew() ||
-                        !store.create(Collection.NODES, 
Collections.singletonList(op))) {
-                    before = store.createOrUpdate(Collection.NODES, op);
+                        && !invalidatedPaths.contains(path)) {
+                    pathsToinvalidate.add(path);
                 }
+                splitOps.add(op);
+            }
+            removeCandiates.add(id);
+            if (splitOps.size() >= getCreateOrUpdateBatchSize()) {
+                batchSplit(splitOps, pathsToinvalidate);
+                splitOps.clear();
+                invalidatedPaths.addAll(pathsToinvalidate);
+                pathsToinvalidate.clear();
+                splitCandidates.keySet().removeAll(removeCandiates);
+                removeCandiates.clear();
+            }
+        }
+
+        if (splitOps.size() > 0) {
+            batchSplit(splitOps, pathsToinvalidate);
+            splitCandidates.keySet().removeAll(removeCandiates);
+        }
+    }
+
+    private void batchSplit(List<UpdateOp> splitOps, Set<Path> 
pathsToinvalidate) {
+        if (!pathsToinvalidate.isEmpty()) {
+            // create journal entry for cache invalidation
+            JournalEntry entry = JOURNAL.newDocument(getDocumentStore());
+            entry.modified(pathsToinvalidate);
+            Revision r = newRevision().asBranchRevision();
+            UpdateOp journalOp = entry.asUpdateOp(r);
+            if (store.create(JOURNAL, singletonList(journalOp))) {
+                changes.invalidate(singletonList(r));
+                LOG.debug("Journal entry {} created for split of document(s) 
{}",
+                        journalOp.getId(), pathsToinvalidate);
+            } else {
+                String msg = "Unable to create journal entry " +
+                        journalOp.getId() + " for document invalidation. " +
+                        "Will be retried with next background split " +
+                        "operation.";
+                throw new DocumentStoreException(msg);
+            }
+        }
+        // apply the split operations
+        List<NodeDocument> beforeList = store.createOrUpdate(Collection.NODES, 
splitOps);
+        if (LOG.isDebugEnabled()) {
+            // this is rather expensive - but given we were doing log.debug 
before
+            // the batchSplit mechanism, so this somewhat negates the batch 
improvement indeed
+            for (int i = 0; i < splitOps.size(); i++) {
+                UpdateOp op = splitOps.get(i);
+                NodeDocument before = beforeList.size() > i ? 
beforeList.get(i) : null;
                 if (before != null) {
-                    if (LOG.isDebugEnabled()) {
-                        NodeDocument after = store.find(Collection.NODES, 
op.getId());
-                        if (after != null) {
-                            LOG.debug("Split operation on {}. Size before: {}, 
after: {}",
-                                    id, before.getMemory(), after.getMemory());
-                        }
+                    NodeDocument after = store.find(Collection.NODES, 
op.getId());
+                    if (after != null) {
+                        LOG.debug("Split operation on {}. Size before: {}, 
after: {}",
+                                op.getId(), before.getMemory(), 
after.getMemory());
                     }
                 } else {
                     LOG.debug("Split operation created {}", op.getId());
                 }
             }
-            it.remove();
         }
     }
 
@@ -3160,6 +3189,23 @@ public final class DocumentNodeStore
         }
     }
 
+    /**
+     * FOR TESTING ONLY :
+     * stops the backgroundUpdateThread (by overwriting its
+     * isDisposed flag) and optionally waits for the thread to
+     * terminate.
+     * @param timeoutMillis optional amount of millis to wait for the thread 
to terminate at max
+     * @return true if thread is no longer running
+     */
+    @TestOnly
+    boolean stopBackgroundUpdateThread(long timeoutMillis) throws 
InterruptedException {
+        backgroundUpdateOperation.forceStop();
+        if (timeoutMillis > 0) {
+            backgroundUpdateThread.join(timeoutMillis);
+        }
+        return !backgroundUpdateThread.isAlive();
+    }
+
     public DocumentNodeStoreMBean getMBean() {
         return mbean;
     }
@@ -3176,7 +3222,7 @@ public final class DocumentNodeStore
 
     private static abstract class NodeStoreTask implements Runnable {
         final WeakReference<DocumentNodeStore> ref;
-        private final AtomicBoolean isDisposed;
+        private AtomicBoolean isDisposed;
         private final Supplier<Integer> delaySupplier;
         private boolean failing;
 
@@ -3202,6 +3248,10 @@ public final class DocumentNodeStore
             this(nodeStore, isDisposed, null);
         }
 
+        void forceStop() {
+            isDisposed = new AtomicBoolean(true);
+        }
+
         protected abstract void execute(@NotNull DocumentNodeStore nodeStore);
 
         @Override

Modified: 
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java?rev=1880432&r1=1880431&r2=1880432&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java
 Thu Jul 30 07:21:38 2020
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
@@ -35,6 +36,7 @@ import org.apache.jackrabbit.oak.api.Pro
 import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
+import 
org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats;
 import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
 import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
 import org.apache.jackrabbit.oak.plugins.document.util.Utils;
@@ -45,7 +47,11 @@ import org.apache.jackrabbit.oak.spi.sta
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.apache.jackrabbit.oak.stats.Clock;
 import org.jetbrains.annotations.NotNull;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -81,6 +87,118 @@ import static org.junit.Assert.fail;
  */
 public class DocumentSplitTest extends BaseDocumentMKTest {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(DocumentSplitTest.class);
+
+    private String createOrUpdateBatchSize;
+    private boolean createOrUpdateBatchSizeIsNull;
+
+    @Before
+    public void backupProperty() {
+        createOrUpdateBatchSize = 
System.getProperty("oak.documentMK.createOrUpdateBatchSize");
+        if (createOrUpdateBatchSize == null) {
+            createOrUpdateBatchSizeIsNull = true;
+        }
+    }
+
+    @After
+    public void restoreProperty() {
+        if (createOrUpdateBatchSize != null) {
+            System.setProperty("oak.documentMK.createOrUpdateBatchSize", 
createOrUpdateBatchSize);
+        } else if (createOrUpdateBatchSizeIsNull) {
+            System.clearProperty("oak.documentMK.createOrUpdateBatchSize");
+        }
+    }
+
+    @Test
+    public void largeBatchSplitTest() throws Exception {
+        for(int i=1; i<21; i+=5) {
+            batchSplitTest(1000, i * 1000);
+        }
+    }
+
+    @Test
+    public void mediumBatchSplitTest() throws Exception {
+        batchSplitTest(50, 1000);
+    }
+
+    @Test
+    public void smallBatchSplitTest() throws Exception {
+        batchSplitTest(1, 1000);
+    }
+
+    private void batchSplitTest(int batchSize, int splitDocCnt) throws 
Exception {
+        LOG.info("batchSplitTest: batchSize = " + batchSize+ ", splitDocCnt = 
" + splitDocCnt);
+        // this tests wants to use CountingDocumentStore - hence creating a 
fresh DocumentMk
+        // plus it wants to set the batchSize
+        if (mk != null) {
+            mk.dispose();
+            mk = null;
+        }
+
+        System.setProperty("oak.documentMK.createOrUpdateBatchSize", 
String.valueOf(batchSize));
+        DocumentMK.Builder mkBuilder = new DocumentMK.Builder();
+        MemoryDocumentStore delegateStore = new MemoryDocumentStore();
+        CountingDocumentStore store = new CountingDocumentStore(delegateStore);
+        mkBuilder.setDocumentStore(store);
+        mk = mkBuilder.open();
+        DocumentNodeStore ns = mk.getNodeStore();
+        assertTrue(ns.stopBackgroundUpdateThread(10000));
+        assertEquals(batchSize, ns.getCreateOrUpdateBatchSize());
+
+        NodeBuilder builder = ns.getRoot().builder();
+        for(int child = 0; child < 100; child++) {
+            builder.child("testchild-" + child);
+        }
+        ns.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        for(int i=0; i<2; i++) {
+            builder = ns.getRoot().builder();
+            for(int child = 0; child < splitDocCnt; child++) {
+                PropertyState binary = binaryProperty("prop", randomBytes(5 * 
1024));
+                builder.child("testchild-" + child).setProperty(binary);
+            }
+            ns.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        }
+        store.resetCounters();
+        ns.runBackgroundUpdateOperations();
+        int createOrUpdates = store.getNumCreateOrUpdateCalls(NODES);
+        int minBatchSplitCalls = Math.max(1, splitDocCnt / Math.max(1, 
batchSize / 2));
+        int maxBatchSplitCalls = minBatchSplitCalls + Math.max(1, splitDocCnt 
% batchSize);
+        // backgroundWrite could issue another 2 writes, so:
+        maxBatchSplitCalls += 2;
+        assertTrue("batchSize = " + batchSize
+                + ", splitDocCnt = " + splitDocCnt
+                + ", minBatchSplitCalls=" + minBatchSplitCalls
+                + ", createOrUpdates=" + createOrUpdates,
+                minBatchSplitCalls <= createOrUpdates);
+        assertTrue("batchSize = " + batchSize
+                + ", splitDocCnt = " + splitDocCnt
+                + ", minBatchSplitCalls=" + minBatchSplitCalls
+                + ", maxBatchSplitCalls=" + maxBatchSplitCalls
+                + ", createOrUpdates="+createOrUpdates,
+                maxBatchSplitCalls >= createOrUpdates);
+        VersionGarbageCollector gc = ns.getVersionGarbageCollector();
+
+        int actualSplitDocGCCount = 0;
+        long timeout = ns.getClock().getTime() + 10000;
+        while(actualSplitDocGCCount != splitDocCnt && ns.getClock().getTime() 
< timeout) {
+            VersionGCStats stats = gc.gc(1, TimeUnit.MILLISECONDS);
+            actualSplitDocGCCount += stats.splitDocGCCount;
+            if (actualSplitDocGCCount != splitDocCnt) {
+                LOG.info("batchSplitTest: Expected " + splitDocCnt + ", actual 
" + actualSplitDocGCCount);
+                // advance time a bit to ensure gc does clean up the split docs
+                ns.getClock().waitUntil(ns.getClock().getTime() + 1000);
+                ns.runBackgroundUpdateOperations();
+            }
+        }
+
+        // make sure those splitDocCnt split docs are deleted
+        assertEquals("gc not as expected: expected " + splitDocCnt
+                + ", got " + actualSplitDocGCCount, splitDocCnt, 
actualSplitDocGCCount);
+        mk.dispose();
+        mk = null;
+    }
+
     @Test
     public void splitRevisions() throws Exception {
         DocumentStore store = mk.getDocumentStore();


Reply via email to