Author: amitj
Date: Fri Sep 25 11:11:01 2015
New Revision: 1705273
URL: http://svn.apache.org/viewvc?rev=1705273&view=rev
Log:
OAK-3443: Track the start time of mark in GC
Test case for SegmentMK that simulates the condition by delaying the blob
identification phase and adding blobs that must not be collected.
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCIT.java
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCIT.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCIT.java?rev=1705273&r1=1705272&r2=1705273&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCIT.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCIT.java
Fri Sep 25 11:11:01 2015
@@ -37,18 +37,24 @@ import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import com.google.common.io.Closeables;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.blob.BlobReferenceRetriever;
+import org.apache.jackrabbit.oak.plugins.blob.GarbageCollectorFileState;
import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
@@ -266,6 +272,31 @@ public class SegmentDataStoreBlobGCIT {
assertEquals(count, candidates);
}
+ @Test
+ public void gcLongRunningBlobCollection() throws Exception {
+ DataStoreState state = setUp();
+ log.info("{} Blobs added {}", state.blobsAdded.size(),
state.blobsAdded);
+ log.info("{} Blobs should be present {}", state.blobsPresent.size(),
state.blobsPresent);
+
+ ThreadPoolExecutor executor = (ThreadPoolExecutor)
Executors.newFixedThreadPool(10);
+ String repoId = null;
+ if (SharedDataStoreUtils.isShared(store.getBlobStore())) {
+ repoId = ClusterRepositoryInfo.createId(nodeStore);
+ ((SharedDataStore) store.getBlobStore()).addMetadataRecord(
+ new ByteArrayInputStream(new byte[0]),
+ REPOSITORY.getNameFromId(repoId));
+ }
+ TestGarbageCollector gc = new TestGarbageCollector(
+ new SegmentBlobReferenceRetriever(store.getTracker()),
+ (GarbageCollectableBlobStore) store.getBlobStore(), executor,
"./target", 5, 5000, repoId);
+ gc.collectGarbage(false);
+ Set<String> existingAfterGC = iterate();
+ log.info("{} Blobs existing after gc {}", existingAfterGC.size(),
existingAfterGC);
+
+ assertTrue(Sets.difference(state.blobsPresent,
existingAfterGC).isEmpty());
+ assertEquals(gc.additionalBlobs,
Sets.symmetricDifference(state.blobsPresent, existingAfterGC));
+ }
+
private Set<String> gcInternal(long maxBlobGcInSecs) throws Exception {
ThreadPoolExecutor executor = (ThreadPoolExecutor)
Executors.newFixedThreadPool(10);
MarkSweepGarbageCollector gc = init(maxBlobGcInSecs, executor);
@@ -318,5 +349,81 @@ public class SegmentDataStoreBlobGCIT {
r.nextBytes(data);
return new ByteArrayInputStream(data);
}
+
+ /**
+ * Waits for some time and adds additional blobs after blob referenced
identified to simulate
+ * long running blob id collection phase.
+ */
+ class TestGarbageCollector extends MarkSweepGarbageCollector {
+ long maxLastModifiedInterval;
+ String root;
+ GarbageCollectableBlobStore blobStore;
+ Set<String> additionalBlobs;
+
+ public TestGarbageCollector(BlobReferenceRetriever marker,
GarbageCollectableBlobStore blobStore,
+ Executor executor, String root, int
batchCount, long maxLastModifiedInterval,
+ @Nullable String repositoryId) throws
IOException {
+ super(marker, blobStore, executor, root, batchCount,
maxLastModifiedInterval, repositoryId);
+ this.root = root;
+ this.blobStore = blobStore;
+ this.maxLastModifiedInterval = maxLastModifiedInterval;
+ this.additionalBlobs = Sets.newHashSet();
+ }
+
+ @Override
+ protected void markAndSweep(boolean markOnly) throws Exception {
+ boolean threw = true;
+ GarbageCollectorFileState fs = new GarbageCollectorFileState(root);
+ try {
+ Stopwatch sw = Stopwatch.createStarted();
+ LOG.info("Starting Test Blob garbage collection");
+
+ // Sleep a little more than the max interval to get over the
interval for valid blobs
+ Thread.sleep(maxLastModifiedInterval + 100);
+ LOG.info("Slept {} to make blobs old", maxLastModifiedInterval
+ 100);
+
+ long markStart = System.currentTimeMillis();
+ mark(fs);
+ LOG.info("Mark finished");
+
+ additionalBlobs = createAdditional();
+
+ if (!markOnly) {
+ Thread.sleep(maxLastModifiedInterval + 100);
+ LOG.info("Slept {} to make additional blobs old",
maxLastModifiedInterval + 100);
+
+ long deleteCount = sweep(fs, markStart);
+ threw = false;
+
+ LOG.info("Blob garbage collection completed in {}. Number
of blobs deleted [{}]", sw.toString(),
+ deleteCount, maxLastModifiedInterval);
+ }
+ } finally {
+ if (!LOG.isTraceEnabled()) {
+ Closeables.close(fs, threw);
+ }
+ }
+ }
+
+ public HashSet<String> createAdditional() throws Exception {
+ HashSet<String> blobSet = new HashSet<String>();
+ NodeBuilder a = nodeStore.getRoot().builder();
+ int number = 5;
+ for (int i = 0; i < number; i++) {
+ SegmentBlob b = (SegmentBlob)
nodeStore.createBlob(randomStream(100 + i, 16516));
+ a.child("cafter" + i).setProperty("x", b);
+ Iterator<String> idIter =
+ ((GarbageCollectableBlobStore)
blobStore).resolveChunks(b.getBlobId());
+ while (idIter.hasNext()) {
+ String chunk = idIter.next();
+ blobSet.add(chunk);
+ }
+ }
+ log.info("{} Additional created {}", blobSet.size(), blobSet);
+
+ nodeStore.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ return blobSet;
+ }
+ }
}