Author: mreutegg
Date: Wed Oct 19 16:29:11 2016
New Revision: 1765676

URL: http://svn.apache.org/viewvc?rev=1765676&view=rev
Log:
OAK-4915: Interrupt online revision cleanup on documentmk

Added:
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCTest.java
   (with props)
Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java?rev=1765676&r1=1765675&r2=1765676&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java Wed Oct 19 16:29:11 2016
@@ -826,7 +826,7 @@ public class DocumentNodeStoreService {
         Runnable cancelGC = new Runnable() {
             @Override
             public void run() {
-                throw new UnsupportedOperationException("Cancelling revision garbage collection is not supported");
+                store.getVersionGarbageCollector().cancel();
             }
         };
         RevisionGC revisionGC = new RevisionGC(startGC, cancelGC, executor);

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java?rev=1765676&r1=1765675&r2=1765676&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java Wed Oct 19 16:29:11 2016
@@ -27,6 +27,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -50,6 +52,7 @@ import static com.google.common.base.Pre
 import static com.google.common.base.StandardSystemProperty.LINE_SEPARATOR;
 import static com.google.common.collect.Iterables.all;
 import static com.google.common.collect.Iterators.partition;
+import static com.google.common.util.concurrent.Atomics.newReference;
 import static java.util.Collections.singletonMap;
 import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
 import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.MODIFIED_IN_SECS;
@@ -66,6 +69,7 @@ public class VersionGarbageCollector {
     private final DocumentStore ds;
     private final VersionGCSupport versionStore;
     private int overflowToDiskThreshold = 100000;
+    private final AtomicReference<GCJob> collector = newReference();
 
     private static final Logger log = LoggerFactory.getLogger(VersionGarbageCollector.class);
 
@@ -84,87 +88,32 @@ public class VersionGarbageCollector {
 
     public VersionGCStats gc(long maxRevisionAge, TimeUnit unit) throws IOException {
         long maxRevisionAgeInMillis = unit.toMillis(maxRevisionAge);
-        Stopwatch sw = Stopwatch.createStarted();
-        VersionGCStats stats = new VersionGCStats();
-        final long oldestRevTimeStamp = nodeStore.getClock().getTime() - maxRevisionAgeInMillis;
-        final RevisionVector headRevision = nodeStore.getHeadRevision();
-
-        log.info("Starting revision garbage collection. Revisions older than [{}] will be " +
-                "removed", Utils.timestampToString(oldestRevTimeStamp));
-
-        //Check for any registered checkpoint which prevent the GC from running
-        Revision checkpoint = nodeStore.getCheckpoints().getOldestRevisionToKeep();
-        if (checkpoint != null && checkpoint.getTimestamp() < oldestRevTimeStamp) {
-            log.info("Ignoring revision garbage collection because a valid " +
-                            "checkpoint [{}] was found, which is older than [{}].",
-                    checkpoint.toReadableString(),
-                    Utils.timestampToString(oldestRevTimeStamp)
-            );
-            stats.ignoredGCDueToCheckPoint = true;
-            return stats;
+        GCJob job = new GCJob(maxRevisionAgeInMillis);
+        if (collector.compareAndSet(null, job)) {
+            try {
+                return job.run();
+            } finally {
+                collector.set(null);
+            }
+        } else {
+            throw new IOException("Revision garbage collection is already running");
         }
+    }
 
-        collectDeletedDocuments(stats, headRevision, oldestRevTimeStamp);
-        collectSplitDocuments(stats, oldestRevTimeStamp);
-
-        sw.stop();
-        log.info("Revision garbage collection finished in {}. {}", sw, stats);
-        return stats;
+    public void cancel() {
+        GCJob job = collector.get();
+        if (job != null) {
+            job.cancel();
+        }
     }
 
     public void setOverflowToDiskThreshold(int overflowToDiskThreshold) {
         this.overflowToDiskThreshold = overflowToDiskThreshold;
     }
 
-    private void collectSplitDocuments(VersionGCStats stats, long oldestRevTimeStamp) {
-        stats.collectAndDeleteSplitDocs.start();
-        versionStore.deleteSplitDocuments(GC_TYPES, oldestRevTimeStamp, stats);
-        stats.collectAndDeleteSplitDocs.stop();
-    }
-
-    private void collectDeletedDocuments(VersionGCStats stats,
-                                         RevisionVector headRevision,
-                                         long oldestRevTimeStamp)
-            throws IOException {
-        int docsTraversed = 0;
-        DeletedDocsGC gc = new DeletedDocsGC(headRevision);
-        try {
-            stats.collectDeletedDocs.start();
-            Iterable<NodeDocument> itr = versionStore.getPossiblyDeletedDocs(oldestRevTimeStamp);
-            try {
-                for (NodeDocument doc : itr) {
-                    // Check if node is actually deleted at current revision
-                    // As node is not modified since oldestRevTimeStamp then
-                    // this node has not be revived again in past maxRevisionAge
-                    // So deleting it is safe
-                    docsTraversed++;
-                    if (docsTraversed % PROGRESS_BATCH_SIZE == 0){
-                        log.info("Iterated through {} documents so far. {} found to be deleted",
-                                docsTraversed, gc.getNumDocuments());
-                    }
-                    gc.possiblyDeleted(doc);
-                }
-            } finally {
-                Utils.closeIfCloseable(itr);
-            }
-            stats.collectDeletedDocs.stop();
-
-            if (gc.getNumDocuments() == 0){
-                return;
-            }
-
-            stats.deleteDeletedDocs.start();
-
-            gc.removeDocuments(stats);
-
-            stats.deleteDeletedDocs.stop();
-        } finally {
-            gc.close();
-        }
-    }
-
     public static class VersionGCStats {
         boolean ignoredGCDueToCheckPoint;
+        boolean canceled;
         int deletedDocGCCount;
         int splitDocGCCount;
         int intermediateSplitDocGCCount;
@@ -176,6 +125,7 @@ public class VersionGarbageCollector {
         public String toString() {
             return "VersionGCStats{" +
                     "ignoredGCDueToCheckPoint=" + ignoredGCDueToCheckPoint +
+                    ", canceled=" + canceled+
                     ", deletedDocGCCount=" + deletedDocGCCount +
                     ", splitDocGCCount=" + splitDocGCCount +
                     ", intermediateSplitDocGCCount=" + 
intermediateSplitDocGCCount +
@@ -186,19 +136,122 @@ public class VersionGarbageCollector {
         }
     }
 
+    private class GCJob {
+
+        private final long maxRevisionAgeMillis;
+        private AtomicBoolean cancel = new AtomicBoolean();
+
+        GCJob(long maxRevisionAgeMillis) {
+            this.maxRevisionAgeMillis = maxRevisionAgeMillis;
+        }
+
+        VersionGCStats run() throws IOException {
+            return gc(maxRevisionAgeMillis);
+        }
+
+        void cancel() {
+            log.info("Canceling revision garbage collection.");
+            cancel.set(true);
+        }
+
+        private VersionGCStats gc(long maxRevisionAgeInMillis) throws IOException {
+            Stopwatch sw = Stopwatch.createStarted();
+            VersionGCStats stats = new VersionGCStats();
+            final long oldestRevTimeStamp = nodeStore.getClock().getTime() - maxRevisionAgeInMillis;
+            final RevisionVector headRevision = nodeStore.getHeadRevision();
+
+            log.info("Starting revision garbage collection. Revisions older than [{}] will be " +
+                    "removed", Utils.timestampToString(oldestRevTimeStamp));
+
+            //Check for any registered checkpoint which prevent the GC from running
+            Revision checkpoint = nodeStore.getCheckpoints().getOldestRevisionToKeep();
+            if (checkpoint != null && checkpoint.getTimestamp() < oldestRevTimeStamp) {
+                log.info("Ignoring revision garbage collection because a valid " +
+                                "checkpoint [{}] was found, which is older than [{}].",
+                        checkpoint.toReadableString(),
+                        Utils.timestampToString(oldestRevTimeStamp)
+                );
+                stats.ignoredGCDueToCheckPoint = true;
+                return stats;
+            }
+
+            collectDeletedDocuments(stats, headRevision, oldestRevTimeStamp);
+            collectSplitDocuments(stats, oldestRevTimeStamp);
+
+            sw.stop();
+            stats.canceled = cancel.get();
+            log.info("Revision garbage collection finished in {}. {}", sw, stats);
+            return stats;
+        }
+
+        private void collectSplitDocuments(VersionGCStats stats, long oldestRevTimeStamp) {
+            stats.collectAndDeleteSplitDocs.start();
+            versionStore.deleteSplitDocuments(GC_TYPES, oldestRevTimeStamp, stats);
+            stats.collectAndDeleteSplitDocs.stop();
+        }
+
+        private void collectDeletedDocuments(VersionGCStats stats,
+                                             RevisionVector headRevision,
+                                             long oldestRevTimeStamp)
+                throws IOException {
+            int docsTraversed = 0;
+            DeletedDocsGC gc = new DeletedDocsGC(headRevision, cancel);
+            try {
+                stats.collectDeletedDocs.start();
+                Iterable<NodeDocument> itr = versionStore.getPossiblyDeletedDocs(oldestRevTimeStamp);
+                try {
+                    for (NodeDocument doc : itr) {
+                        // continue with GC?
+                        if (cancel.get()) {
+                            break;
+                        }
+                        // Check if node is actually deleted at current revision
+                        // As node is not modified since oldestRevTimeStamp then
+                        // this node has not be revived again in past maxRevisionAge
+                        // So deleting it is safe
+                        docsTraversed++;
+                        if (docsTraversed % PROGRESS_BATCH_SIZE == 0){
+                            log.info("Iterated through {} documents so far. {} found to be deleted",
+                                    docsTraversed, gc.getNumDocuments());
+                        }
+                        gc.possiblyDeleted(doc);
+                    }
+                } finally {
+                    Utils.closeIfCloseable(itr);
+                }
+                stats.collectDeletedDocs.stop();
+
+                if (gc.getNumDocuments() == 0){
+                    return;
+                }
+
+                stats.deleteDeletedDocs.start();
+
+                gc.removeDocuments(stats);
+
+                stats.deleteDeletedDocs.stop();
+            } finally {
+                gc.close();
+            }
+        }
+    }
+
     /**
      * A helper class to remove document for deleted nodes.
      */
     private class DeletedDocsGC implements Closeable {
 
         private final RevisionVector headRevision;
+        private final AtomicBoolean cancel;
         private final StringSort docIdsToDelete = newStringSort();
         private final StringSort prevDocIdsToDelete = newStringSort();
         private final Set<String> exclude = Sets.newHashSet();
         private boolean sorted = false;
 
-        public DeletedDocsGC(@Nonnull RevisionVector headRevision) {
+        public DeletedDocsGC(@Nonnull RevisionVector headRevision,
+                             @Nonnull AtomicBoolean cancel) {
             this.headRevision = checkNotNull(headRevision);
+            this.cancel = checkNotNull(cancel);
         }
 
         /**
@@ -341,7 +394,7 @@ public class VersionGarbageCollector {
             int deletedCount = 0;
             int lastLoggedCount = 0;
             int recreatedCount = 0;
-            while (idListItr.hasNext()) {
+            while (idListItr.hasNext() && !cancel.get()) {
                 Map<String, Map<Key, Condition>> deletionBatch = Maps.newLinkedHashMap();
                 for (String s : idListItr.next()) {
                     int idx = s.lastIndexOf('/');
@@ -395,7 +448,7 @@ public class VersionGarbageCollector {
             int lastLoggedCount = 0;
             Iterator<List<String>> idListItr =
                     partition(getPrevDocIdsToDelete(), DELETE_BATCH_SIZE);
-            while (idListItr.hasNext()) {
+            while (idListItr.hasNext() && !cancel.get()) {
                 List<String> deletionBatch = idListItr.next();
                 deletedCount += deletionBatch.size();
 

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCTest.java?rev=1765676&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCTest.java Wed Oct 19 16:29:11 2016
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static java.util.concurrent.TimeUnit.HOURS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class VersionGCTest {
+
+    @Rule
+    public final DocumentMKBuilderProvider builderProvider = new DocumentMKBuilderProvider();
+
+    private ExecutorService execService;
+
+    private TestStore store = new TestStore();
+
+    private DocumentNodeStore ns;
+
+    private VersionGarbageCollector gc;
+
+    @Before
+    public void setUp() throws Exception {
+        execService = Executors.newCachedThreadPool();
+        Clock clock = new Clock.Virtual();
+        clock.waitUntil(System.currentTimeMillis());
+        Revision.setClock(clock);
+        ns = builderProvider.newBuilder()
+                .clock(clock)
+                .setLeaseCheck(false)
+                .setDocumentStore(store)
+                .setAsyncDelay(0)
+                .getNodeStore();
+        // create test content
+        createNode("foo");
+        removeNode("foo");
+
+        // wait one hour
+        clock.waitUntil(clock.getTime() + HOURS.toMillis(1));
+
+        gc = ns.getVersionGarbageCollector();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        Revision.resetClockToDefault();
+        execService.shutdown();
+        execService.awaitTermination(1, MINUTES);
+    }
+
+    @Test
+    public void failParallelGC() throws Exception {
+        // block gc call
+        store.semaphore.acquireUninterruptibly();
+        Future<VersionGCStats> stats = gc();
+        boolean gcBlocked = false;
+        for (int i = 0; i < 10; i ++) {
+            if (store.semaphore.hasQueuedThreads()) {
+                gcBlocked = true;
+                break;
+            }
+            Thread.sleep(100);
+        }
+        assertTrue(gcBlocked);
+        // now try to trigger another GC
+        try {
+            gc.gc(30, TimeUnit.MINUTES);
+            fail("must throw an IOException");
+        } catch (IOException e) {
+            assertTrue(e.getMessage().contains("already running"));
+        } finally {
+            store.semaphore.release();
+            stats.get();
+        }
+    }
+
+    @Test
+    public void cancel() throws Exception {
+        // block gc call
+        store.semaphore.acquireUninterruptibly();
+        Future<VersionGCStats> stats = gc();
+        boolean gcBlocked = false;
+        for (int i = 0; i < 10; i ++) {
+            if (store.semaphore.hasQueuedThreads()) {
+                gcBlocked = true;
+                break;
+            }
+            Thread.sleep(100);
+        }
+        assertTrue(gcBlocked);
+        // now cancel the GC
+        gc.cancel();
+        store.semaphore.release();
+        assertTrue(stats.get().canceled);
+    }
+
+    private Future<VersionGCStats> gc() {
+        // run gc in a separate thread
+        return execService.submit(new Callable<VersionGCStats>() {
+            @Override
+            public VersionGCStats call() throws Exception {
+                return gc.gc(30, TimeUnit.MINUTES);
+            }
+        });
+    }
+
+    private void removeNode(String name) throws CommitFailedException {
+        NodeBuilder builder = ns.getRoot().builder();
+        builder.child(name).remove();
+        merge(ns, builder);
+    }
+
+    private void createNode(String name) throws CommitFailedException {
+        NodeBuilder builder = ns.getRoot().builder();
+        builder.child(name);
+        merge(ns, builder);
+    }
+
+    private void merge(DocumentNodeStore store, NodeBuilder builder)
+            throws CommitFailedException {
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+    }
+
+    private class TestStore extends MemoryDocumentStore {
+
+        Semaphore semaphore = new Semaphore(1);
+
+        @Nonnull
+        @Override
+        public <T extends Document> List<T> query(Collection<T> collection,
+                                                  String fromKey,
+                                                  String toKey,
+                                                  String indexedProperty,
+                                                  long startValue,
+                                                  int limit) {
+            semaphore.acquireUninterruptibly();
+            try {
+                return super.query(collection, fromKey, toKey, indexedProperty, startValue, limit);
+            } finally {
+                semaphore.release();
+            }
+        }
+    }
+
+}

Propchange: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCTest.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to