Author: stefanegli
Date: Wed Nov 18 08:50:52 2020
New Revision: 1883591

URL: http://svn.apache.org/viewvc?rev=1883591&view=rev
Log:
OAK-9149 : Phased, batched backgroundSplit - merges PR#260

Added:
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentBatchSplitTest.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java

Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1883591&r1=1883590&r2=1883591&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Wed Nov 18 08:50:52 2020
@@ -2451,58 +2451,143 @@ public final class DocumentNodeStore
     }
 
     private void backgroundSplit() {
-        Set<Path> invalidatedPaths = new HashSet<>();
+        final int initialCapacity = getCreateOrUpdateBatchSize() + 4;
+        Set<Path> invalidatedPaths = new HashSet<>(initialCapacity);
+        Set<Path> pathsToInvalidate = new HashSet<>(initialCapacity);
         RevisionVector head = getHeadRevision();
-        for (Iterator<String> it = splitCandidates.keySet().iterator(); it.hasNext();) {
-            String id = it.next();
+        // OAK-9149 : With backgroundSplit being done in batches, the
+        // updateOps must be executed in "phases".
+        // Reason being that the (DocumentStore) batch calls
+        // are not atomic. That means they could potentially
+        // be partially executed only - without any guarantees on
+        // which part is executed and which not.
+        // The split algorithm, however, requires that
+        // a part of the operations, namely intermediate/garbage/split ops,
+        // are executed *before* the main document is updated.
+        // In order to reflect this necessity in the batch variant,
+        // all those intermediate/garbage/split updateOps are grouped
+        // into a first phase - and the main document updateOps in a second phase.
+        // That way, if the first phase fails, partially, the main documents
+        // are not yet touched.
+        // TODO but if the split fails, we create actual garbage that cannot
+        // be cleaned up later, since there is no "pointer" to it. That's
+        // something to look at/consider at some point.
+
+        // phase1 therefore only contains intermediate/garbage/split updateOps
+        List<UpdateOp> splitOpsPhase1 = new ArrayList<>(initialCapacity);
+        // phase2 contains main document updateOps.
+        List<UpdateOp> splitOpsPhase2 = new ArrayList<>(initialCapacity);
+        List<String> removeCandidates = new ArrayList<>(initialCapacity);
+        for (String id : splitCandidates.keySet()) {
             NodeDocument doc = store.find(Collection.NODES, id);
             if (doc == null) {
                 continue;
             }
             cleanCollisions(doc, collisionGarbageBatchSize);
-            for (UpdateOp op : doc.split(this, head, binarySize)) {
+            Iterator<UpdateOp> it = doc.split(this, head, binarySize).iterator();
+            while (it.hasNext()) {
+                UpdateOp op = it.next();
                 Path path = doc.getPath();
                 // add an invalidation journal entry, unless the path
                 // already has a pending _lastRev update or an invalidation
                 // entry was already added in this backgroundSplit() call
-                if (unsavedLastRevisions.get(path) == null
-                        && invalidatedPaths.add(path)) {
-                    // create journal entry for cache invalidation
-                    JournalEntry entry = JOURNAL.newDocument(getDocumentStore());
-                    entry.modified(path);
-                    Revision r = newRevision().asBranchRevision();
-                    UpdateOp journalOp = entry.asUpdateOp(r);
-                    if (store.create(JOURNAL, singletonList(journalOp))) {
-                        changes.invalidate(singletonList(r));
-                        LOG.debug("Journal entry {} created for split of document {}",
-                                journalOp.getId(), path);
-                    } else {
-                        String msg = "Unable to create journal entry " +
-                                journalOp.getId() + " for document invalidation. " +
-                                "Will be retried with next background split " +
-                                "operation.";
-                        throw new DocumentStoreException(msg);
-                    }
+                if (unsavedLastRevisions.get(path) == null && !invalidatedPaths.contains(path)) {
+                    pathsToInvalidate.add(path);
                 }
-                // apply the split operations
-                NodeDocument before = null;
-                if (!op.isNew() ||
-                        !store.create(Collection.NODES, Collections.singletonList(op))) {
-                    before = store.createOrUpdate(Collection.NODES, op);
+                // the last entry is the main document update
+                // (as per updated NodeDocument.split documentation).
+                if (it.hasNext()) {
+                    splitOpsPhase1.add(op);
+                } else {
+                    splitOpsPhase2.add(op);
                 }
+            }
+            removeCandidates.add(id);
+            if (splitOpsPhase1.size() >= getCreateOrUpdateBatchSize()
+                    || splitOpsPhase2.size() >= getCreateOrUpdateBatchSize()) {
+                invalidatePaths(pathsToInvalidate);
+                batchSplit(splitOpsPhase1);
+                batchSplit(splitOpsPhase2);
+                invalidatedPaths.addAll(pathsToInvalidate);
+                pathsToInvalidate.clear();
+                splitOpsPhase1.clear();
+                splitOpsPhase2.clear();
+                splitCandidates.keySet().removeAll(removeCandidates);
+                removeCandidates.clear();
+            }
+        }
+
+        // flush the remaining (partial) batch. removeCandidates is drained even
+        // if no split ops are pending, so candidates that needed no split are
+        // removed as well; invalidatePaths()/batchSplit() ignore empty input.
+        invalidatePaths(pathsToInvalidate);
+        batchSplit(splitOpsPhase1);
+        batchSplit(splitOpsPhase2);
+        splitCandidates.keySet().removeAll(removeCandidates);
+    }
+
+    /**
+     * Creates one journal entry (with a new branch revision) that lists all
+     * given paths as modified and, once it is stored, invalidates that
+     * revision in {@code changes}. Does nothing if the set is empty.
+     *
+     * @param pathsToInvalidate the document paths to record in the journal
+     * @throws DocumentStoreException if the journal entry cannot be created
+     */
+    private void invalidatePaths(@NotNull Set<Path> pathsToInvalidate) {
+        if (pathsToInvalidate.isEmpty()) {
+            // nothing to do
+            return;
+        }
+        // create journal entry for cache invalidation
+        JournalEntry entry = JOURNAL.newDocument(getDocumentStore());
+        entry.modified(pathsToInvalidate);
+        Revision r = newRevision().asBranchRevision();
+        UpdateOp journalOp = entry.asUpdateOp(r);
+        if (store.create(JOURNAL, singletonList(journalOp))) {
+            changes.invalidate(singletonList(r));
+            LOG.debug("Journal entry {} created for split of document(s) {}",
+                    journalOp.getId(), pathsToInvalidate);
+        } else {
+            String msg = "Unable to create journal entry " +
+                    journalOp.getId() + " for document invalidation. " +
+                    "Will be retried with next background split " +
+                    "operation.";
+            throw new DocumentStoreException(msg);
+        }
+    }
+
+    /**
+     * Applies the given split operations with a single batch
+     * {@code createOrUpdate} call on the NODES collection and, at DEBUG
+     * level, logs the size change (or creation) of each affected document.
+     * Does nothing if the list is empty.
+     *
+     * @param splitOps the split operations to apply
+     */
+    private void batchSplit(@NotNull List<UpdateOp> splitOps) {
+        if (splitOps.isEmpty()) {
+            // nothing to do
+            return;
+        }
+        // apply the split operations
+        List<NodeDocument> beforeList = store.createOrUpdate(Collection.NODES, splitOps);
+        if (LOG.isDebugEnabled()) {
+            // rather expensive: one extra find() per op somewhat negates the batch
+            // improvement, but it preserves the debug output of the pre-batch code
+            for (int i = 0; i < splitOps.size(); i++) {
+                UpdateOp op = splitOps.get(i);
+                NodeDocument before = beforeList.size() > i ? beforeList.get(i) : null;
                 if (before != null) {
-                    if (LOG.isDebugEnabled()) {
-                        NodeDocument after = store.find(Collection.NODES, op.getId());
-                        if (after != null) {
-                            LOG.debug("Split operation on {}. Size before: {}, after: {}",
-                                    id, before.getMemory(), after.getMemory());
-                        }
+                    NodeDocument after = store.find(Collection.NODES, op.getId());
+                    if (after != null) {
+                        LOG.debug("Split operation on {}. Size before: {}, after: {}",
+                                op.getId(), before.getMemory(), after.getMemory());
                     }
                 } else {
                     LOG.debug("Split operation created {}", op.getId());
                 }
             }
-            it.remove();
         }
     }
 

Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java?rev=1883591&r1=1883590&r2=1883591&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java Wed Nov 18 08:50:52 2020
@@ -1170,7 +1170,8 @@ public final class NodeDocument extends
      *                the document store.
      * @param binarySize a function that returns the binary size of the given
      *                   JSON property value String.
-     * @return the split operations.
+     * @return the split operations. If any operations are returned, the last
+     *         one is guaranteed to be the update of the main document.
      */
     @NotNull
     public Iterable<UpdateOp> split(@NotNull RevisionContext context,

Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java?rev=1883591&r1=1883590&r2=1883591&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java Wed Nov 18 08:50:52 2020
@@ -81,6 +81,44 @@ public class TimingDocumentStoreWrapper
         lastLogTime = now();
     }
 
+    /**
+     * Discards all statistics collected so far: clears {@code counts} and
+     * {@code slowCalls} and zeroes the start time and log-time bookkeeping.
+     */
+    public void reset() {
+        startTime = 0;
+        counts.clear();
+        lastLogTime = 0;
+        totalLogTime = 0;
+        slowCalls.clear();
+    }
+
+    /**
+     * Returns the sum of the accumulated {@code total} time over all
+     * recorded call types.
+     *
+     * @return the overall time spent in the recorded calls
+     */
+    public long getOverallTime() {
+        long overallTime = 0;
+        for (Count count : counts.values()) {
+            overallTime += count.total;
+        }
+        return overallTime;
+    }
+
+    /**
+     * Returns {@link #getOverallTime()} and then {@link #reset() resets}
+     * all collected statistics.
+     *
+     * @return the overall time spent in the recorded calls before the reset
+     */
+    public long getAndResetOverallTime() {
+        final long result = getOverallTime();
+        reset();
+        return result;
+    }
+
     private boolean logCommonCall() {
         return callCount % 10 == 0;
     }

Added: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentBatchSplitTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentBatchSplitTest.java?rev=1883591&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentBatchSplitTest.java (added)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentBatchSplitTest.java Wed Nov 18 08:50:52 2020
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
+import static org.apache.jackrabbit.oak.plugins.memory.BinaryPropertyState.binaryProperty;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats;
+import org.apache.jackrabbit.oak.plugins.document.util.TimingDocumentStoreWrapper;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.LoggerContext;
+
+/**
+ * Checks the phased, batched backgroundSplit (OAK-9149) with various createOrUpdate batch sizes.
+ */
+@RunWith(Parameterized.class)
+public class DocumentBatchSplitTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DocumentBatchSplitTest.class);
+
+    private String createOrUpdateBatchSize;
+    private boolean createOrUpdateBatchSizeIsNull;
+
+    private DocumentStoreFixture fixture;
+    protected DocumentMK mk;
+
+    public DocumentBatchSplitTest(DocumentStoreFixture fixture) {
+        this.fixture = fixture;
+    }
+
+    @Parameterized.Parameters(name="{0}")
+    public static java.util.Collection<Object[]> fixtures() throws IOException {
+        List<Object[]> fixtures = Lists.newArrayList();
+        fixtures.add(new Object[] {new DocumentStoreFixture.MemoryFixture()});
+
+        DocumentStoreFixture mongo = new DocumentStoreFixture.MongoFixture();
+        if(mongo.isAvailable()){
+            fixtures.add(new Object[] {mongo});
+        }
+        return fixtures;
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (mk != null) {
+            mk.dispose();
+            mk = null;
+        }
+        fixture.dispose();
+        // reset log level to default
+        enableLevel("org", null);
+    }
+
+    @Before
+    public void backupProperty() {
+        createOrUpdateBatchSize = System.getProperty("oak.documentMK.createOrUpdateBatchSize");
+        if (createOrUpdateBatchSize == null) {
+            createOrUpdateBatchSizeIsNull = true;
+        }
+    }
+
+    @After
+    public void restoreProperty() {
+        if (createOrUpdateBatchSize != null) {
+            System.setProperty("oak.documentMK.createOrUpdateBatchSize", createOrUpdateBatchSize);
+        } else if (createOrUpdateBatchSizeIsNull) {
+            System.clearProperty("oak.documentMK.createOrUpdateBatchSize");
+        }
+    }
+
+    @Test
+    @Ignore(value = "useful for benchmarking only, long execution duration")
+    public void batchSplitBenchmark() throws Exception {
+        int[] batchSizes = new int[] {1,2,10,30,50,75,100,200,300,400,500,1000,2000,5000,10000};
+        for (int batchSize : batchSizes) {
+            batchSplitTest(batchSize, 10000);
+            batchSplitTest(batchSize, 10000);
+        }
+    }
+
+    @Test
+    public void largeBatchSplit() throws Exception {
+        batchSplitTest(200, 1000);
+    }
+
+    @Test
+    public void mediumBatchSplit() throws Exception {
+        batchSplitTest(50, 1000);
+    }
+
+    @Test
+    public void smallBatchSplit() throws Exception {
+        batchSplitTest(2, 1000);
+    }
+
+    @Test
+    public void noBatchSplit() throws Exception {
+        batchSplitTest(1, 1000);
+    }
+
+    /** Make sure we have a test that has log level set to DEBUG */
+    @Test
+    public void debugLogLevelBatchSplit() throws Exception {
+        enableLevel("org", Level.DEBUG);
+        batchSplitTest(50, 1000);
+    }
+
+    private void batchSplitTest(int batchSize, int splitDocCnt) throws Exception {
+        LOG.info("batchSplitTest: batchSize = {}, splitDocCnt = {}, fixture = {}",
+                batchSize, splitDocCnt, fixture);
+        // this test wants to use CountingDocumentStore
+        // plus it wants to set the batchSize
+        if (mk != null) {
+            mk.dispose();
+            mk = null;
+        }
+        if (fixture.getName().equals("MongoDB")) {
+            MongoUtils.dropCollections(MongoUtils.DB);
+        }
+
+        System.setProperty("oak.documentMK.createOrUpdateBatchSize", String.valueOf(batchSize));
+
+        DocumentMK.Builder mkBuilder = new DocumentMK.Builder();
+        DocumentStore delegateStore = fixture.createDocumentStore();
+        TimingDocumentStoreWrapper timingStore = new TimingDocumentStoreWrapper(delegateStore);
+        CountingDocumentStore store = new CountingDocumentStore(timingStore);
+        mkBuilder.setDocumentStore(store);
+        // disable automatic background operations
+        mkBuilder.setAsyncDelay(0);
+        mk = mkBuilder.open();
+        DocumentNodeStore ns = mk.getNodeStore();
+        assertEquals(batchSize, ns.getCreateOrUpdateBatchSize());
+
+        NodeBuilder builder = ns.getRoot().builder();
+        for(int child = 0; child < 100; child++) {
+            builder.child("testchild-" + child);
+        }
+        ns.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        for(int i=0; i<2; i++) {
+            builder = ns.getRoot().builder();
+            for(int child = 0; child < splitDocCnt; child++) {
+                PropertyState binary = binaryProperty("prop", randomBytes(5 * 1024));
+                builder.child("testchild-" + child).setProperty(binary);
+            }
+            ns.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        }
+        timingStore.reset();
+        store.resetCounters();
+        final long start = System.currentTimeMillis();
+        ns.runBackgroundUpdateOperations();
+        int createOrUpdateCalls = store.getNumCreateOrUpdateCalls(NODES);
+        final long remoteCallMeasurement = timingStore.getAndResetOverallTime();
+        final long totalSplitDuration = (System.currentTimeMillis() - start);
+        final long localSplitPart = totalSplitDuration - remoteCallMeasurement;
+        LOG.info("batchSplitTest: batchSize = " + batchSize +
+                ", splitDocCnt = " + splitDocCnt +
+                ", createOrUpdateCalls = " + createOrUpdateCalls +
+                ", fixture = " + fixture.getName() +
+                ", split total ms = " + totalSplitDuration +
+                " (thereof local = " + localSplitPart +
+                ", remote = " + remoteCallMeasurement + ")");
+        int expected = 2 * (splitDocCnt / batchSize)       /* 2 calls per batch */
+                + 2 * Math.min(1, splitDocCnt % batchSize) /* 1 additional pair for the last batch */
+                + 1;                                       /* 1 more for backgroundWrite's update to root */
+        assertTrue("batchSize = " + batchSize
+                + ", splitDocCnt = " + splitDocCnt
+                + ", expected=" + expected
+                + ", createOrUpdates=" + createOrUpdateCalls,
+                createOrUpdateCalls >= expected && createOrUpdateCalls <= expected + 2);
+        VersionGarbageCollector gc = ns.getVersionGarbageCollector();
+
+        int actualSplitDocGCCount = 0;
+        long timeout = ns.getClock().getTime() + 20000;
+        while(actualSplitDocGCCount < splitDocCnt && ns.getClock().getTime() < timeout) {
+            VersionGCStats stats = gc.gc(1, TimeUnit.MILLISECONDS);
+            actualSplitDocGCCount += stats.splitDocGCCount;
+            if (actualSplitDocGCCount != splitDocCnt) {
+                LOG.info("batchSplitTest: Expected {}, actual {}", splitDocCnt, actualSplitDocGCCount);
+                // advance time a bit to ensure gc does clean up the split docs
+                ns.getClock().waitUntil(ns.getClock().getTime() + 1000);
+                ns.runBackgroundUpdateOperations();
+            }
+        }
+
+        // make sure those splitDocCnt split docs are deleted
+        assertTrue("gc not as expected: expected " + splitDocCnt
+                + ", got " + actualSplitDocGCCount, splitDocCnt <= actualSplitDocGCCount);
+
+        mk.dispose();
+        mk = null;
+    }
+
+    private byte[] randomBytes(int num) {
+        Random random = new Random(42);
+        byte[] data = new byte[num];
+        random.nextBytes(data);
+        return data;
+    }
+
+    // TODO: from DocumentStoreStatsTest
+    // but there are various places such as RevisionsCommand, BroadcastTest that have similar code.
+    // we might want to move this to a new common util/helper
+    private static void enableLevel(String logName, Level level){
+        ((LoggerContext)LoggerFactory.getILoggerFactory())
+                .getLogger(logName).setLevel(level);
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentBatchSplitTest.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to