Author: amitj
Date: Fri Sep 25 11:11:01 2015
New Revision: 1705273

URL: http://svn.apache.org/viewvc?rev=1705273&view=rev
Log:
OAK-3443: Track the start time of mark in GC

Test case for SegmentMK which simulates the condition by delaying the blob 
identification phase and adds additional blobs which should not be collected

Modified:
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCIT.java

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCIT.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCIT.java?rev=1705273&r1=1705272&r2=1705273&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCIT.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCIT.java
 Fri Sep 25 11:11:01 2015
@@ -37,18 +37,24 @@ import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import com.google.common.io.Closeables;
 import org.apache.commons.io.FileUtils;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.blob.BlobReferenceRetriever;
+import org.apache.jackrabbit.oak.plugins.blob.GarbageCollectorFileState;
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
@@ -266,6 +272,31 @@ public class SegmentDataStoreBlobGCIT {
         assertEquals(count, candidates);
     }
     
+    /**
+     * Simulates a long running blob id collection phase and verifies that
+     * blobs added after the mark phase has finished are not collected.
+     * <p>
+     * The collector is configured with a max last modified interval of 5000 ms
+     * and sleeps slightly longer than that interval twice, so this test takes
+     * a little over ten seconds.
+     *
+     * @throws Exception if set up, garbage collection or blob iteration fails
+     */
+    @Test
+    public void gcLongRunningBlobCollection() throws Exception {
+        DataStoreState state = setUp();
+        log.info("{} Blobs added {}", state.blobsAdded.size(), state.blobsAdded);
+        log.info("{} Blobs should be present {}", state.blobsPresent.size(), state.blobsPresent);
+
+        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
+        try {
+            String repoId = null;
+            if (SharedDataStoreUtils.isShared(store.getBlobStore())) {
+                // A shared data store needs a repository marker record before GC can run.
+                repoId = ClusterRepositoryInfo.createId(nodeStore);
+                ((SharedDataStore) store.getBlobStore()).addMetadataRecord(
+                    new ByteArrayInputStream(new byte[0]),
+                    REPOSITORY.getNameFromId(repoId));
+            }
+            TestGarbageCollector gc = new TestGarbageCollector(
+                new SegmentBlobReferenceRetriever(store.getTracker()),
+                (GarbageCollectableBlobStore) store.getBlobStore(), executor, "./target", 5, 5000, repoId);
+            gc.collectGarbage(false);
+            Set<String> existingAfterGC = iterate();
+            log.info("{} Blobs existing after gc {}", existingAfterGC.size(), existingAfterGC);
+
+            // Every blob that was expected to survive must still exist ...
+            assertTrue(Sets.difference(state.blobsPresent, existingAfterGC).isEmpty());
+            // ... and the only extra blobs are exactly those created after the mark phase.
+            assertEquals(gc.additionalBlobs, Sets.symmetricDifference(state.blobsPresent, existingAfterGC));
+        } finally {
+            // Release the pool's worker threads; without this they outlive the test.
+            executor.shutdown();
+        }
+    }
+    
     private Set<String> gcInternal(long maxBlobGcInSecs) throws Exception {
         ThreadPoolExecutor executor = (ThreadPoolExecutor) 
Executors.newFixedThreadPool(10);
         MarkSweepGarbageCollector gc = init(maxBlobGcInSecs, executor);
@@ -318,5 +349,81 @@ public class SegmentDataStoreBlobGCIT {
         r.nextBytes(data);
         return new ByteArrayInputStream(data);
     }
+    
+    /**
+     * Garbage collector used to simulate a long running blob id collection
+     * phase: it waits before marking, adds additional blobs once the
+     * referenced blobs have been identified, and waits again before sweeping.
+     * The chunk ids of the additional blobs are exposed through
+     * {@link #additionalBlobs} so that a test can assert they were not deleted.
+     */
+    class TestGarbageCollector extends MarkSweepGarbageCollector {
+        /** Interval passed to the super class; also the base for the sleep times (milliseconds). */
+        final long maxLastModifiedInterval;
+        /** Root path handed to {@link GarbageCollectorFileState}. */
+        final String root;
+        /** Blob store used to resolve the chunk ids of the additional blobs. */
+        final GarbageCollectableBlobStore blobStore;
+        /** Chunk ids of the blobs created after the mark phase; empty until then. */
+        Set<String> additionalBlobs;
+
+        /**
+         * Creates the collector; all arguments are forwarded unchanged to the
+         * super class constructor and the ones needed locally are retained.
+         *
+         * @param marker retriever of blob references
+         * @param blobStore the blob store to collect garbage from
+         * @param executor executor handed to the super class
+         * @param root root path for the garbage collection working files
+         * @param batchCount batch count handed to the super class
+         * @param maxLastModifiedInterval max last modified interval in milliseconds
+         * @param repositoryId repository id, may be {@code null}
+         * @throws IOException if the super class constructor fails
+         */
+        public TestGarbageCollector(BlobReferenceRetriever marker, GarbageCollectableBlobStore blobStore,
+                                    Executor executor, String root, int batchCount, long maxLastModifiedInterval,
+                                    @Nullable String repositoryId) throws IOException {
+            super(marker, blobStore, executor, root, batchCount, maxLastModifiedInterval, repositoryId);
+            this.root = root;
+            this.blobStore = blobStore;
+            this.maxLastModifiedInterval = maxLastModifiedInterval;
+            this.additionalBlobs = Sets.newHashSet();
+        }
+
+        /**
+         * Runs mark and (optionally) sweep with artificial delays and with
+         * additional blobs created between the two phases.
+         *
+         * @param markOnly if {@code true} only the mark phase is executed
+         * @throws Exception if marking, blob creation, sweeping or closing the file state fails
+         */
+        @Override
+        protected void markAndSweep(boolean markOnly) throws Exception {
+            // Stays true until everything succeeded, so that a failure while closing
+            // the file state cannot mask the exception that is already propagating.
+            boolean threw = true;
+            GarbageCollectorFileState fs = new GarbageCollectorFileState(root);
+            try {
+                Stopwatch sw = Stopwatch.createStarted();
+                LOG.info("Starting Test Blob garbage collection");
+
+                // Sleep a little more than the max interval to get over the interval for valid blobs
+                long agingDelay = maxLastModifiedInterval + 100;
+                Thread.sleep(agingDelay);
+                LOG.info("Slept {} to make blobs old", agingDelay);
+
+                long markStart = System.currentTimeMillis();
+                mark(fs);
+                LOG.info("Mark finished");
+
+                // Created after the references were collected; must survive the sweep.
+                additionalBlobs = createAdditional();
+
+                if (!markOnly) {
+                    Thread.sleep(agingDelay);
+                    LOG.info("Slept {} to make additional blobs old", agingDelay);
+
+                    long deleteCount = sweep(fs, markStart);
+
+                    LOG.info("Blob garbage collection completed in {}. Number of blobs deleted [{}]", sw.toString(),
+                        deleteCount);
+                }
+                // Reached for both the mark-only and the full run.
+                threw = false;
+            } finally {
+                // The file state is left open when trace logging is enabled
+                // (presumably to keep the working files for inspection - TODO confirm).
+                if (!LOG.isTraceEnabled()) {
+                    Closeables.close(fs, threw);
+                }
+            }
+        }
+
+        /**
+         * Creates five additional blobs, attaches each to a new child node
+         * {@code cafter<i>} under the root and merges the change.
+         *
+         * @return the chunk ids of all blobs created
+         * @throws Exception if blob creation, chunk resolution or the merge fails
+         */
+        public HashSet<String> createAdditional() throws Exception {
+            HashSet<String> blobSet = new HashSet<String>();
+            NodeBuilder a = nodeStore.getRoot().builder();
+            int number = 5;
+            for (int i = 0; i < number; i++) {
+                SegmentBlob b = (SegmentBlob) nodeStore.createBlob(randomStream(100 + i, 16516));
+                a.child("cafter" + i).setProperty("x", b);
+                Iterator<String> idIter = blobStore.resolveChunks(b.getBlobId());
+                while (idIter.hasNext()) {
+                    blobSet.add(idIter.next());
+                }
+            }
+            log.info("{} Additional created {}", blobSet.size(), blobSet);
+
+            nodeStore.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+            return blobSet;
+        }
+    }
 }
 


Reply via email to