Author: mreutegg
Date: Wed Oct 19 16:29:11 2016
New Revision: 1765676
URL: http://svn.apache.org/viewvc?rev=1765676&view=rev
Log:
OAK-4915: Interrupt online revision cleanup on documentmk
Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCTest.java
(with props)
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java?rev=1765676&r1=1765675&r2=1765676&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java	(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java	Wed Oct 19 16:29:11 2016
@@ -826,7 +826,7 @@ public class DocumentNodeStoreService {
Runnable cancelGC = new Runnable() {
@Override
public void run() {
- throw new UnsupportedOperationException("Cancelling revision
garbage collection is not supported");
+ store.getVersionGarbageCollector().cancel(); // NOTE(review): only effective if getVersionGarbageCollector() returns the collector whose gc() is currently running; VersionGarbageCollector.cancel() is a no-op when that collector has no running job
}
};
RevisionGC revisionGC = new RevisionGC(startGC, cancelGC, executor);
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java?rev=1765676&r1=1765675&r2=1765676&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java	(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java	Wed Oct 19 16:29:11 2016
@@ -27,6 +27,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -50,6 +52,7 @@ import static com.google.common.base.Pre
import static com.google.common.base.StandardSystemProperty.LINE_SEPARATOR;
import static com.google.common.collect.Iterables.all;
import static com.google.common.collect.Iterators.partition;
+import static com.google.common.util.concurrent.Atomics.newReference;
import static java.util.Collections.singletonMap;
import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
import static
org.apache.jackrabbit.oak.plugins.document.NodeDocument.MODIFIED_IN_SECS;
@@ -66,6 +69,7 @@ public class VersionGarbageCollector {
private final DocumentStore ds;
private final VersionGCSupport versionStore;
private int overflowToDiskThreshold = 100000;
+ private final AtomicReference<GCJob> collector = newReference(); // the GCJob of the gc() call in progress, or null when idle
private static final Logger log =
LoggerFactory.getLogger(VersionGarbageCollector.class);
@@ -84,87 +88,32 @@ public class VersionGarbageCollector {
public VersionGCStats gc(long maxRevisionAge, TimeUnit unit) throws
IOException {
long maxRevisionAgeInMillis = unit.toMillis(maxRevisionAge);
- Stopwatch sw = Stopwatch.createStarted();
- VersionGCStats stats = new VersionGCStats();
- final long oldestRevTimeStamp = nodeStore.getClock().getTime() -
maxRevisionAgeInMillis;
- final RevisionVector headRevision = nodeStore.getHeadRevision();
-
- log.info("Starting revision garbage collection. Revisions older than
[{}] will be " +
- "removed", Utils.timestampToString(oldestRevTimeStamp));
-
- //Check for any registered checkpoint which prevent the GC from running
- Revision checkpoint =
nodeStore.getCheckpoints().getOldestRevisionToKeep();
- if (checkpoint != null && checkpoint.getTimestamp() <
oldestRevTimeStamp) {
- log.info("Ignoring revision garbage collection because a valid " +
- "checkpoint [{}] was found, which is older than
[{}].",
- checkpoint.toReadableString(),
- Utils.timestampToString(oldestRevTimeStamp)
- );
- stats.ignoredGCDueToCheckPoint = true;
- return stats;
+ GCJob job = new GCJob(maxRevisionAgeInMillis); // fresh job (with its own cancel flag) for every gc() call
+ if (collector.compareAndSet(null, job)) { // only one job may be registered at a time
+ try {
+ return job.run(); // runs in the calling thread
+ } finally {
+ collector.set(null); // free the slot even when the job throws
+ }
+ } else { // another gc() call is still running on this collector
+ throw new IOException("Revision garbage collection is already
running");
}
+ }
- collectDeletedDocuments(stats, headRevision, oldestRevTimeStamp);
- collectSplitDocuments(stats, oldestRevTimeStamp);
-
- sw.stop();
- log.info("Revision garbage collection finished in {}. {}", sw, stats);
- return stats;
+ public void cancel() { // requests the running job, if any, to stop; returns without waiting for it
+ GCJob job = collector.get();
+ if (job != null) { // no-op when no gc() call is in progress
+ job.cancel();
+ }
}
public void setOverflowToDiskThreshold(int overflowToDiskThreshold) {
this.overflowToDiskThreshold = overflowToDiskThreshold;
}
- private void collectSplitDocuments(VersionGCStats stats, long
oldestRevTimeStamp) {
- stats.collectAndDeleteSplitDocs.start();
- versionStore.deleteSplitDocuments(GC_TYPES, oldestRevTimeStamp, stats);
- stats.collectAndDeleteSplitDocs.stop();
- }
-
- private void collectDeletedDocuments(VersionGCStats stats,
- RevisionVector headRevision,
- long oldestRevTimeStamp)
- throws IOException {
- int docsTraversed = 0;
- DeletedDocsGC gc = new DeletedDocsGC(headRevision);
- try {
- stats.collectDeletedDocs.start();
- Iterable<NodeDocument> itr =
versionStore.getPossiblyDeletedDocs(oldestRevTimeStamp);
- try {
- for (NodeDocument doc : itr) {
- // Check if node is actually deleted at current revision
- // As node is not modified since oldestRevTimeStamp then
- // this node has not be revived again in past
maxRevisionAge
- // So deleting it is safe
- docsTraversed++;
- if (docsTraversed % PROGRESS_BATCH_SIZE == 0){
- log.info("Iterated through {} documents so far. {}
found to be deleted",
- docsTraversed, gc.getNumDocuments());
- }
- gc.possiblyDeleted(doc);
- }
- } finally {
- Utils.closeIfCloseable(itr);
- }
- stats.collectDeletedDocs.stop();
-
- if (gc.getNumDocuments() == 0){
- return;
- }
-
- stats.deleteDeletedDocs.start();
-
- gc.removeDocuments(stats);
-
- stats.deleteDeletedDocs.stop();
- } finally {
- gc.close();
- }
- }
-
public static class VersionGCStats {
boolean ignoredGCDueToCheckPoint;
+ boolean canceled;
int deletedDocGCCount;
int splitDocGCCount;
int intermediateSplitDocGCCount;
@@ -176,6 +125,7 @@ public class VersionGarbageCollector {
public String toString() {
return "VersionGCStats{" +
"ignoredGCDueToCheckPoint=" + ignoredGCDueToCheckPoint +
+ ", canceled=" + canceled+
", deletedDocGCCount=" + deletedDocGCCount +
", splitDocGCCount=" + splitDocGCCount +
", intermediateSplitDocGCCount=" +
intermediateSplitDocGCCount +
@@ -186,19 +136,122 @@ public class VersionGarbageCollector {
}
}
+    /**
+     * A single revision garbage collection run. Cancellation is cooperative:
+     * {@link #cancel()} only sets a flag. That flag is checked between
+     * documents while collecting deleted documents, by the batch loops in
+     * {@link DeletedDocsGC}, and before the split document phase is started.
+     */
+    private class GCJob {
+
+        private final long maxRevisionAgeMillis;
+
+        /** Set by {@link #cancel()}; shared with the {@link DeletedDocsGC} of this run. */
+        private final AtomicBoolean cancel = new AtomicBoolean();
+
+        GCJob(long maxRevisionAgeMillis) {
+            this.maxRevisionAgeMillis = maxRevisionAgeMillis;
+        }
+
+        /**
+         * Runs the garbage collection in the calling thread.
+         *
+         * @return statistics of this run; {@code canceled} is {@code true}
+         *         when {@link #cancel()} was called before the run finished.
+         * @throws IOException propagated from the deleted documents phase.
+         */
+        VersionGCStats run() throws IOException {
+            return gc(maxRevisionAgeMillis);
+        }
+
+        /**
+         * Requests cancellation of this run. Returns immediately; the run
+         * stops at its next cancellation check.
+         */
+        void cancel() {
+            log.info("Canceling revision garbage collection.");
+            cancel.set(true);
+        }
+
+        private VersionGCStats gc(long maxRevisionAgeInMillis) throws IOException {
+            Stopwatch sw = Stopwatch.createStarted();
+            VersionGCStats stats = new VersionGCStats();
+            final long oldestRevTimeStamp = nodeStore.getClock().getTime() - maxRevisionAgeInMillis;
+            final RevisionVector headRevision = nodeStore.getHeadRevision();
+
+            log.info("Starting revision garbage collection. Revisions older than [{}] will be " +
+                    "removed", Utils.timestampToString(oldestRevTimeStamp));
+
+            //Check for any registered checkpoint which prevent the GC from running
+            Revision checkpoint = nodeStore.getCheckpoints().getOldestRevisionToKeep();
+            if (checkpoint != null && checkpoint.getTimestamp() < oldestRevTimeStamp) {
+                log.info("Ignoring revision garbage collection because a valid " +
+                                "checkpoint [{}] was found, which is older than [{}].",
+                        checkpoint.toReadableString(),
+                        Utils.timestampToString(oldestRevTimeStamp)
+                );
+                stats.ignoredGCDueToCheckPoint = true;
+                return stats;
+            }
+
+            collectDeletedDocuments(stats, headRevision, oldestRevTimeStamp);
+            // the cancel flag is not passed to deleteSplitDocuments(), so that
+            // phase cannot stop on its own: do not start it once cancellation
+            // has been requested
+            if (!cancel.get()) {
+                collectSplitDocuments(stats, oldestRevTimeStamp);
+            }
+
+            sw.stop();
+            stats.canceled = cancel.get();
+            log.info("Revision garbage collection finished in {}. {}", sw, stats);
+            return stats;
+        }
+
+        /**
+         * Delegates cleanup of split documents of {@code GC_TYPES} older than
+         * {@code oldestRevTimeStamp} to the {@link VersionGCSupport} and
+         * times it in {@code stats.collectAndDeleteSplitDocs}.
+         */
+        private void collectSplitDocuments(VersionGCStats stats, long oldestRevTimeStamp) {
+            stats.collectAndDeleteSplitDocs.start();
+            versionStore.deleteSplitDocuments(GC_TYPES, oldestRevTimeStamp, stats);
+            stats.collectAndDeleteSplitDocs.stop();
+        }
+
+        /**
+         * Feeds the possibly deleted documents reported by the
+         * {@link VersionGCSupport} into a {@link DeletedDocsGC} and then
+         * removes the documents it collected. Iteration stops as soon as
+         * cancellation is requested.
+         */
+        private void collectDeletedDocuments(VersionGCStats stats,
+                                             RevisionVector headRevision,
+                                             long oldestRevTimeStamp)
+                throws IOException {
+            int docsTraversed = 0;
+            DeletedDocsGC gc = new DeletedDocsGC(headRevision, cancel);
+            try {
+                stats.collectDeletedDocs.start();
+                Iterable<NodeDocument> itr = versionStore.getPossiblyDeletedDocs(oldestRevTimeStamp);
+                try {
+                    for (NodeDocument doc : itr) {
+                        // continue with GC?
+                        if (cancel.get()) {
+                            break;
+                        }
+                        // Check if node is actually deleted at current revision
+                        // As node is not modified since oldestRevTimeStamp then
+                        // this node has not been revived again in past maxRevisionAge
+                        // So deleting it is safe
+                        docsTraversed++;
+                        if (docsTraversed % PROGRESS_BATCH_SIZE == 0) {
+                            log.info("Iterated through {} documents so far. {} found to be deleted",
+                                    docsTraversed, gc.getNumDocuments());
+                        }
+                        gc.possiblyDeleted(doc);
+                    }
+                } finally {
+                    Utils.closeIfCloseable(itr);
+                }
+                stats.collectDeletedDocs.stop();
+
+                if (gc.getNumDocuments() == 0) {
+                    return;
+                }
+
+                stats.deleteDeletedDocs.start();
+
+                gc.removeDocuments(stats);
+
+                stats.deleteDeletedDocs.stop();
+            } finally {
+                gc.close();
+            }
+        }
+    }
+
/**
* A helper class to remove document for deleted nodes.
*/
private class DeletedDocsGC implements Closeable {
private final RevisionVector headRevision;
+ private final AtomicBoolean cancel;
private final StringSort docIdsToDelete = newStringSort();
private final StringSort prevDocIdsToDelete = newStringSort();
private final Set<String> exclude = Sets.newHashSet();
private boolean sorted = false;
- public DeletedDocsGC(@Nonnull RevisionVector headRevision) {
+ public DeletedDocsGC(@Nonnull RevisionVector headRevision,
+ @Nonnull AtomicBoolean cancel) {
this.headRevision = checkNotNull(headRevision);
+ this.cancel = checkNotNull(cancel);
}
/**
@@ -341,7 +394,7 @@ public class VersionGarbageCollector {
int deletedCount = 0;
int lastLoggedCount = 0;
int recreatedCount = 0;
- while (idListItr.hasNext()) {
+ while (idListItr.hasNext() && !cancel.get()) {
Map<String, Map<Key, Condition>> deletionBatch =
Maps.newLinkedHashMap();
for (String s : idListItr.next()) {
int idx = s.lastIndexOf('/');
@@ -395,7 +448,7 @@ public class VersionGarbageCollector {
int lastLoggedCount = 0;
Iterator<List<String>> idListItr =
partition(getPrevDocIdsToDelete(), DELETE_BATCH_SIZE);
- while (idListItr.hasNext()) {
+ while (idListItr.hasNext() && !cancel.get()) {
List<String> deletionBatch = idListItr.next();
deletedCount += deletionBatch.size();
Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCTest.java?rev=1765676&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCTest.java	(added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCTest.java	Wed Oct 19 16:29:11 2016
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import
org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static java.util.concurrent.TimeUnit.HOURS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class VersionGCTest {
+
+    @Rule
+    public final DocumentMKBuilderProvider builderProvider = new DocumentMKBuilderProvider();
+
+    private ExecutorService execService;
+
+    private final TestStore store = new TestStore();
+
+    private DocumentNodeStore ns;
+
+    private VersionGarbageCollector gc;
+
+    @Before
+    public void setUp() throws Exception {
+        execService = Executors.newCachedThreadPool();
+        Clock clock = new Clock.Virtual();
+        clock.waitUntil(System.currentTimeMillis());
+        Revision.setClock(clock);
+        ns = builderProvider.newBuilder()
+                .clock(clock)
+                .setLeaseCheck(false)
+                .setDocumentStore(store)
+                .setAsyncDelay(0)
+                .getNodeStore();
+        // create test content
+        createNode("foo");
+        removeNode("foo");
+
+        // wait one hour
+        clock.waitUntil(clock.getTime() + HOURS.toMillis(1));
+
+        gc = ns.getVersionGarbageCollector();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        Revision.resetClockToDefault();
+        execService.shutdown();
+        execService.awaitTermination(1, MINUTES);
+    }
+
+    @Test
+    public void failParallelGC() throws Exception {
+        // block gc call
+        store.semaphore.acquireUninterruptibly();
+        Future<VersionGCStats> stats = gc();
+        try {
+            assertTrue("GC did not block on the document store", waitUntilGCBlocked());
+            // now try to trigger another GC
+            try {
+                gc.gc(30, TimeUnit.MINUTES);
+                fail("must throw an IOException");
+            } catch (IOException e) {
+                assertTrue(e.getMessage().contains("already running"));
+            }
+        } finally {
+            // always hand the permit back, otherwise the background GC never finishes
+            store.semaphore.release();
+            stats.get();
+        }
+    }
+
+    @Test
+    public void cancel() throws Exception {
+        // block gc call
+        store.semaphore.acquireUninterruptibly();
+        Future<VersionGCStats> stats = gc();
+        try {
+            assertTrue("GC did not block on the document store", waitUntilGCBlocked());
+            // now cancel the GC while it is still blocked
+            gc.cancel();
+        } finally {
+            // always hand the permit back, otherwise the background GC never finishes
+            store.semaphore.release();
+        }
+        assertTrue(stats.get().canceled);
+    }
+
+    /**
+     * Polls up to 10 times, 100 ms apart, for a thread queued on
+     * {@code store.semaphore} (normally the GC task started by {@link #gc()},
+     * blocked in {@link TestStore#query}).
+     *
+     * @return {@code true} if a queued thread was observed.
+     */
+    private boolean waitUntilGCBlocked() throws InterruptedException {
+        for (int i = 0; i < 10; i++) {
+            if (store.semaphore.hasQueuedThreads()) {
+                return true;
+            }
+            Thread.sleep(100);
+        }
+        return false;
+    }
+
+    /** Runs {@code gc.gc(30, MINUTES)} on {@link #execService}. */
+    private Future<VersionGCStats> gc() {
+        // run gc in a separate thread
+        return execService.submit(new Callable<VersionGCStats>() {
+            @Override
+            public VersionGCStats call() throws Exception {
+                return gc.gc(30, TimeUnit.MINUTES);
+            }
+        });
+    }
+
+    private void removeNode(String name) throws CommitFailedException {
+        NodeBuilder builder = ns.getRoot().builder();
+        builder.child(name).remove();
+        merge(ns, builder);
+    }
+
+    private void createNode(String name) throws CommitFailedException {
+        NodeBuilder builder = ns.getRoot().builder();
+        builder.child(name);
+        merge(ns, builder);
+    }
+
+    private void merge(DocumentNodeStore store, NodeBuilder builder)
+            throws CommitFailedException {
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+    }
+
+    /**
+     * Memory store whose range {@code query} must acquire the single permit
+     * of {@link #semaphore}; a test holding the permit blocks such queries.
+     */
+    private class TestStore extends MemoryDocumentStore {
+
+        final Semaphore semaphore = new Semaphore(1);
+
+        @Nonnull
+        @Override
+        public <T extends Document> List<T> query(Collection<T> collection,
+                                                  String fromKey,
+                                                  String toKey,
+                                                  String indexedProperty,
+                                                  long startValue,
+                                                  int limit) {
+            semaphore.acquireUninterruptibly();
+            try {
+                return super.query(collection, fromKey, toKey, indexedProperty, startValue, limit);
+            } finally {
+                semaphore.release();
+            }
+        }
+    }
+
+}
Propchange:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCTest.java
------------------------------------------------------------------------------
svn:eol-style = native