Author: mduerig
Date: Fri Mar 28 13:30:06 2014
New Revision: 1582748

URL: http://svn.apache.org/r1582748
Log:
OAK-1582: ClassCastException in MarkSweepGarbageCollector#init()
Slightly edited patch from Amit Jain

Added:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobReferenceRetriever.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/ReferenceCollector.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBlobReferenceRetriever.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlobReferenceRetriever.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
Removed:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/ReferenceCollector.java
Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java

Added: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobReferenceRetriever.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobReferenceRetriever.java?rev=1582748&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobReferenceRetriever.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobReferenceRetriever.java
 Fri Mar 28 13:30:06 2014
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
+/**
+ * Interface to abstract out the low-level details of retrieving blob references
+ * from different {@link NodeStore} implementations.
+ */
+public interface BlobReferenceRetriever {
+
+    /**
+     * Collect references.
+     * 
+     * @param collector the collector that receives each blob reference found
+     *            (the method itself returns nothing)
+     * @throws Exception if retrieving the references fails
+     */
+    void getReferences(ReferenceCollector collector) throws Exception;
+}
+

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java?rev=1582748&r1=1582747&r2=1582748&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
 Fri Mar 28 13:30:06 2014
@@ -21,7 +21,6 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayDeque;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -43,14 +42,10 @@ import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.io.Files;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.LineIterator;
-import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
 import org.apache.jackrabbit.oak.commons.IOUtils;
-import org.apache.jackrabbit.oak.api.Blob;
-import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
-import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,8 +71,8 @@ public class MarkSweepGarbageCollector i
 
     public static final String SWEEPING = "Running-Sweeping";
 
-    /** The max last modified time of blobs to consider for garbage 
collection. */
-    private long maxLastModifiedTime;
+    /** Minimum time (in ms) since a blob was last modified for it to be considered for garbage collection. */
+    private long maxLastModifiedInterval = TimeUnit.HOURS.toMillis(24);
 
     /** Run concurrently when possible. */
     private boolean runConcurrently = true;
@@ -85,8 +80,11 @@ public class MarkSweepGarbageCollector i
     /** The number of sweeper threads to use. */
     private int numSweepers = 1;
 
-    /** The node store. */
-    private DocumentNodeStore nodeStore;
+    /** The blob store to be garbage collected. */
+    private GarbageCollectableBlobStore blobStore;
+
+    /** Helper that retrieves the blob references to be marked. **/
+    private BlobReferenceRetriever marker;
     
     /** The garbage collector file state */
     private GarbageCollectorFileState fs;
@@ -98,27 +96,27 @@ public class MarkSweepGarbageCollector i
     private int batchCount = DEFAULT_BATCH_COUNT;
 
     /** Flag to indicate whether to run in a debug mode **/
-    private boolean debugMode = Boolean.getBoolean("debugModeGC") | 
LOG.isDebugEnabled();
+    private final boolean debugMode = Boolean.getBoolean("debugModeGC") || 
LOG.isDebugEnabled();
 
     /** Flag to indicate the state of the gc **/
     private String state;
 
     /**
-     * Gets the max last modified time considered for garbage collection.
+     * Gets the max last modified interval considered for garbage collection.
      * 
-     * @return the max last modified time
+     * @return the max last modified interval
      */
-    protected long getMaxLastModifiedTime() {
-        return maxLastModifiedTime;
+    protected long getMaxLastModifiedInterval() {
+        return maxLastModifiedInterval;
     }
 
     /**
-     * Sets the max last modified time considered for garbage collection.
+     * Sets the max last modified interval considered for garbage collection.
      * 
-     * @param maxLastModifiedTime the new max last modified time
+     * @param maxLastModifiedInterval the new max last modified interval
      */
-    protected void setMaxLastModifiedTime(long maxLastModifiedTime) {
-        this.maxLastModifiedTime = maxLastModifiedTime;
+    protected void setMaxLastModifiedInterval(long maxLastModifiedInterval) {
+        this.maxLastModifiedInterval = maxLastModifiedInterval;
     }
     
     /**
@@ -167,41 +165,45 @@ public class MarkSweepGarbageCollector i
     }
 
     /**
-     * @param nodeStore the node store
+     * @param marker the retriever of the blob references to mark
+     * @param blobStore the blob store to be garbage collected
      * @param root the root
      * @param batchCount the batch count
      * @param runBackendConcurrently - run the backend iterate concurrently
      * @param maxSweeperThreads the max sweeper threads
-     * @param maxLastModifiedTime the max last modified time
+     * @param maxLastModifiedInterval the max last modified interval
      * @throws IOException Signals that an I/O exception has occurred.
      */
     public void init(
-            NodeStore nodeStore,
+            BlobReferenceRetriever marker,
+            GarbageCollectableBlobStore blobStore,
             String root,
             int batchCount,
             boolean runBackendConcurrently,
             int maxSweeperThreads,
-            long maxLastModifiedTime)
+            long maxLastModifiedInterval)
             throws IOException {
         this.batchCount = batchCount;
         this.root = root;
         this.runConcurrently = runBackendConcurrently;
         this.numSweepers = maxSweeperThreads;
-        this.maxLastModifiedTime = maxLastModifiedTime;        
-        init(nodeStore);
+        this.maxLastModifiedInterval = maxLastModifiedInterval;
+        init(marker, blobStore);
     }
 
     /**
      * Instantiates a new blob garbage collector.
      * 
-     * @param nodeStore
-     *            the node store
-     * @throws IOException
-     *             Signals that an I/O exception has occurred.
+     * @param marker the retriever of the blob references to mark
+     * @param blobStore the blob store to be garbage collected
+     * @throws IOException Signals that an I/O exception has occurred.
      */
-    public void init(NodeStore nodeStore) throws IOException {
+    public void init(BlobReferenceRetriever marker, 
GarbageCollectableBlobStore blobStore)
+            throws IOException {
         Preconditions.checkState(!Strings.isNullOrEmpty(root));
-        this.nodeStore = (DocumentNodeStore) nodeStore;
+
+        this.blobStore = blobStore;
+        this.marker = marker;
         fs = new GarbageCollectorFileState(root);
     }
 
@@ -216,12 +218,11 @@ public class MarkSweepGarbageCollector i
      * @throws Exception
      *             the exception
      */
-    protected void markAndSweep() throws Exception {
+    public void markAndSweep() throws Exception {
         try {
             LOG.debug("Starting garbage collector");
 
             mark();
-            difference();
             sweep();
 
             LOG.debug("garbage collector finished");
@@ -237,7 +238,7 @@ public class MarkSweepGarbageCollector i
      * @throws Exception
      *             the exception
      */
-    protected void mark() throws Exception {
+    public void mark() throws Exception {
         state = MARKING;
         LOG.debug("Starting mark phase of the garbage collector");
 
@@ -260,7 +261,7 @@ public class MarkSweepGarbageCollector i
                 blobIdRetrieverThread.join();
             }
         }
-
+        difference();
         LOG.debug("Ending mark phase of the garbage collector");
     }
 
@@ -309,68 +310,74 @@ public class MarkSweepGarbageCollector i
      * @throws IOException
      *             Signals that an I/O exception has occurred.
      */
-    protected void sweep() throws IOException {
-        state = SWEEPING;        
-        LOG.debug("Starting sweep phase of the garbage collector");
-
-        ConcurrentLinkedQueue<String> exceptionQueue = new 
ConcurrentLinkedQueue<String>();
-        ExecutorService executorService =
-                new ThreadPoolExecutor(getNumSweepers(), getNumSweepers(), 1,
-                        TimeUnit.MINUTES,
-                        new LinkedBlockingQueue<Runnable>(),
-                        new ThreadFactory() {
-                            private final AtomicInteger threadCounter = new 
AtomicInteger();
-
-                            private String getName() {
-                                return "MarkSweepGarbageCollector-Sweeper-" + 
threadCounter.getAndIncrement();
-                            }
-
-                            @Override
-                            public Thread newThread(Runnable r) {
-                                Thread thread = new Thread(r, getName());
-                                thread.setDaemon(true);
-                                return thread;
-                            }
-                        });
-
-        LineIterator iterator = FileUtils.lineIterator(fs.getGcCandidates(), 
Charsets.UTF_8.name());
-        List<String> ids = Lists.newArrayList();
-        int count = 0;
-        while (iterator.hasNext()) {
-            ids.add(iterator.next());
-
-            if (ids.size() > getBatchCount()) {
+    public void sweep() throws IOException {
+        try {        
+            state = SWEEPING;        
+            LOG.debug("Starting sweep phase of the garbage collector");
+    
+            ConcurrentLinkedQueue<String> exceptionQueue = new 
ConcurrentLinkedQueue<String>();
+            ExecutorService executorService =
+                    new ThreadPoolExecutor(getNumSweepers(), getNumSweepers(), 
1,
+                            TimeUnit.MINUTES,
+                            new LinkedBlockingQueue<Runnable>(),
+                            new ThreadFactory() {
+                                private final AtomicInteger threadCounter = 
new AtomicInteger();
+    
+                                private String getName() {
+                                    return 
"MarkSweepGarbageCollector-Sweeper-" + threadCounter.getAndIncrement();
+                                }
+    
+                                @Override
+                                public Thread newThread(Runnable r) {
+                                    Thread thread = new Thread(r, getName());
+                                    thread.setDaemon(true);
+                                    return thread;
+                                }
+                            });
+    
+            LineIterator iterator = 
+                    FileUtils.lineIterator(fs.getGcCandidates(), 
Charsets.UTF_8.name());
+            List<String> ids = Lists.newArrayList();
+            int count = 0;
+            while (iterator.hasNext()) {
+                ids.add(iterator.next());
+    
+                if (ids.size() > getBatchCount()) {
+                    count += ids.size();
+                    executorService.execute(new Sweeper(ids, exceptionQueue));
+                    ids = Lists.newArrayList();
+                }
+            }
+            if (!ids.isEmpty()) {
                 count += ids.size();
                 executorService.execute(new Sweeper(ids, exceptionQueue));
-                ids = Lists.newArrayList();
             }
-        }
-        if (!ids.isEmpty()) {
-            count += ids.size();
-            executorService.execute(new Sweeper(ids, exceptionQueue));
-        }
-
-        try {
-            executorService.shutdown();
-            executorService.awaitTermination(100, TimeUnit.MINUTES);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-
-        count -= exceptionQueue.size();
-        BufferedWriter writer = null;
-        try {
-            if (!exceptionQueue.isEmpty()) {
-                writer = Files.newWriter(fs.getGarbage(), Charsets.UTF_8);
-                saveBatchToFile(Lists.newArrayList(exceptionQueue), writer);
+    
+            try {
+                executorService.shutdown();
+                executorService.awaitTermination(100, TimeUnit.MINUTES);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
             }
+    
+            count -= exceptionQueue.size();
+            BufferedWriter writer = null;
+            try {
+                if (!exceptionQueue.isEmpty()) {
+                    writer = Files.newWriter(fs.getGarbage(), Charsets.UTF_8);
+                    saveBatchToFile(Lists.newArrayList(exceptionQueue), 
writer);
+                }
+            } finally {
+                LineIterator.closeQuietly(iterator);
+                IOUtils.closeQuietly(writer);
+            }
+    
+            LOG.debug("Blobs deleted count - " + count);
+            LOG.debug("Ending sweep phase of the garbage collector");
         } finally {
-            LineIterator.closeQuietly(iterator);
-            IOUtils.closeQuietly(writer);
+            fs.complete();
+            state = NOT_RUNNING;
         }
-
-        LOG.debug("Blobs deleted count - " + count);
-        LOG.debug("Ending sweep phase of the garbage collector");
     }
 
     /**
@@ -396,10 +403,10 @@ public class MarkSweepGarbageCollector i
     class Sweeper implements Runnable {
 
         /** The exception queue. */
-        private ConcurrentLinkedQueue<String> exceptionQueue;
+        private final ConcurrentLinkedQueue<String> exceptionQueue;
 
         /** The ids to sweep. */
-        private List<String> ids;
+        private final List<String> ids;
 
         /**
          * Instantiates a new sweeper.
@@ -417,8 +424,10 @@ public class MarkSweepGarbageCollector i
         @Override
         public void run() {
             try {
-                boolean deleted = ((GarbageCollectableBlobStore) 
nodeStore.getBlobStore())
-                        .deleteChunks(ids, maxLastModifiedTime);
+                boolean deleted =
+                        blobStore.deleteChunks(ids,
+                                        (maxLastModifiedInterval > 0 ? 
System.currentTimeMillis()
+                                                - maxLastModifiedInterval : 
0));
                 if (!deleted) {
                     exceptionQueue.addAll(ids);
                 }
@@ -436,57 +445,59 @@ public class MarkSweepGarbageCollector i
      * @throws Exception
      *             the exception
      */
-    private List<String> iterateNodeTree() throws Exception {
-        ArrayList<String> referencedBlobs = Lists.newArrayList();
-        BufferedWriter writer = null;
+    private void iterateNodeTree() throws Exception {
+        final BufferedWriter writer = Files.newWriter(fs.getMarkedRefs(), 
Charsets.UTF_8);
         try {
-            writer = Files.newWriter(fs.getMarkedRefs(), Charsets.UTF_8);
-
-            fs.sort(fs.getMarkedRefs());
-
-            Iterator<Blob> blobIterator = 
nodeStore.getReferencedBlobsIterator();
-            referencedBlobs.ensureCapacity(getBatchCount());
-
-            int referencesFound = 0;
-            while (blobIterator.hasNext()) {
-                Blob blob = blobIterator.next();
+            marker.getReferences(
+                    new ReferenceCollector() {
+                        private final List<String> idBatch = Lists
+                                .newArrayListWithCapacity(getBatchCount());
+
+                        private int count = 0;
+
+                        @Override
+                        public void addReference(String blobId) {
+                            if (debugMode) {
+                                LOG.debug("BlobId : " + blobId);
+                            }
 
-                if (debugMode) {
-                    LOG.debug("BlobId : " + blob.toString());
-                }
+                            try {
+                                Iterator<String> idIter = 
blobStore.resolveChunks(blobId);
+                                while (idIter.hasNext()) {
+                                    String id = idIter.next();
+                                    idBatch.add(id);
+
+                                    if (idBatch.size() >= getBatchCount()) {
+                                        saveBatchToFile(idBatch, writer);
+                                        idBatch.clear();
+                                    }
+
+                                    if (debugMode) {
+                                        LOG.debug("chunkId : " + id);
+                                    }
+                                    count++;
+                                }
+
+                                if (!idBatch.isEmpty()) {
+                                    saveBatchToFile(idBatch, writer);
+                                    idBatch.clear();
+                                }
+                            } catch (Exception e) {
+                                throw new RuntimeException("Error in 
retrieving references", e);
+                            }
 
-                if (blob.toString().length() != 0) {
-                    Iterator<String> idIter = ((GarbageCollectableBlobStore) 
nodeStore
-                            .getBlobStore())
-                            .resolveChunks(blob.toString());
-                    while (idIter.hasNext()) {
-                        String id = idIter.next();
-                        referencedBlobs.add(id);
-                        if (debugMode) {
-                            LOG.debug("chunkId : " + id);
+                            LOG.info("Marked Reference : " + count);
                         }
-                    }
-                }
-
-                if (referencedBlobs.size() >= getBatchCount()) {
-                    referencesFound += referencedBlobs.size();
-                    saveBatchToFile(referencedBlobs, writer);
-                }
-            }
+                    });
 
-            if (!referencedBlobs.isEmpty()) {
-                referencesFound += referencedBlobs.size();
-                saveBatchToFile(referencedBlobs, writer);
-            }
+            // sort the marked references
             fs.sort(fs.getMarkedRefs());
-
-            LOG.debug("Blob references found (including chunk resolution) " + 
referencesFound);
         } finally {
             IOUtils.closeQuietly(writer);
         }
-        return referencedBlobs;
     }
 
+
     /**
      * BlobIdRetriever class to retrieve all blob ids.
      */
@@ -506,8 +517,8 @@ public class MarkSweepGarbageCollector i
             try {
                 bufferWriter = new BufferedWriter(
                         new FileWriter(fs.getAvailableRefs()));
-                Iterator<String> idsIter = ((GarbageCollectableBlobStore) 
nodeStore.getBlobStore())
-                        .getAllChunkIds(maxLastModifiedTime);
+                Iterator<String> idsIter = 
blobStore.getAllChunkIds(maxLastModifiedInterval);
+                
                 List<String> ids = Lists.newArrayList();
                 int blobsCount = 0;
                 while (idsIter.hasNext()) {
@@ -525,7 +536,7 @@ public class MarkSweepGarbageCollector i
 
                 // sort the file
                 fs.sort(fs.getAvailableRefs());
-                LOG.debug("Ending retrieve of all blobs : " + blobsCount);
+                LOG.debug("Ending retrieving all blobs : " + blobsCount);
             } catch (Exception e) {
                 e.printStackTrace();
             } finally {
@@ -543,12 +554,12 @@ public class MarkSweepGarbageCollector i
     class FileLineDifferenceIterator<T> implements Iterator<String> {
 
         /** The marked references iterator. */
-        private LineIterator markedIter;
+        private final LineIterator markedIter;
 
         /** The available references iter. */
-        private LineIterator allIter;
+        private final LineIterator allIter;
 
-        private ArrayDeque<String> queue;
+        private final ArrayDeque<String> queue;
 
         private boolean done;
 

Added: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/ReferenceCollector.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/ReferenceCollector.java?rev=1582748&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/ReferenceCollector.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/ReferenceCollector.java
 Fri Mar 28 13:30:06 2014
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+/**
+ * Callback interface for collecting all blob references that are
+ * potentially accessible. Useful for marking referenced blobs as
+ * in use when collecting garbage in an external data store.
+ */
+public interface ReferenceCollector {
+
+    void addReference(String reference);
+
+}
+

Added: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBlobReferenceRetriever.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBlobReferenceRetriever.java?rev=1582748&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBlobReferenceRetriever.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBlobReferenceRetriever.java
 Fri Mar 28 13:30:06 2014
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.Iterator;
+
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.plugins.blob.BlobReferenceRetriever;
+import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link BlobReferenceRetriever} for the DocumentNodeStore.
+ */
+public class DocumentBlobReferenceRetriever implements BlobReferenceRetriever {
+    public static final Logger LOG = 
LoggerFactory.getLogger(DocumentBlobReferenceRetriever.class);
+
+    private final Iterator<Blob> blobIterator;
+
+    public DocumentBlobReferenceRetriever(Iterator<Blob> iterator) {
+        this.blobIterator = iterator;
+    }
+
+    @Override
+    public void getReferences(ReferenceCollector collector) throws Exception {
+        int referencesFound = 0;
+        while (blobIterator.hasNext()) {
+            Blob blob = blobIterator.next();
+            if (blob.length() != 0) {
+                collector.addReference(blob.toString());
+            }
+        }
+
+        LOG.debug("Blob references found (including chunk resolution) " + 
referencesFound);
+    }
+}
+

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java?rev=1582748&r1=1582747&r2=1582748&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
 Fri Mar 28 13:30:06 2014
@@ -309,9 +309,10 @@ public class DocumentNodeStoreService {
         executor.start(wb);
         if (blobStore instanceof GarbageCollectableBlobStore) {
             MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector();
-            gc.init(store);  // FIXME OAK-1582 ClassCastException in 
MarkSweepGarbageCollector#init() if using KernelNodeStore
+            gc.init(new 
DocumentBlobReferenceRetriever(store.getReferencedBlobsIterator()),
+                    (GarbageCollectableBlobStore) store.getBlobStore());
             registrations.add(registerMBean(wb, BlobGCMBean.class, new 
BlobGC(gc, executor),
-                    BlobGCMBean.TYPE, "Segment node store blob garbage 
collection"));
+                    BlobGCMBean.TYPE, "Document node store blob garbage 
collection"));
         }
 
         RevisionGC revisionGC = new RevisionGC(new Runnable() {

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java?rev=1582748&r1=1582747&r2=1582748&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
 Fri Mar 28 13:30:06 2014
@@ -36,6 +36,7 @@ import java.util.concurrent.ConcurrentMa
 
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
 import org.apache.jackrabbit.oak.plugins.memory.PropertyStates;
 
 import com.google.common.base.Charsets;

Added: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlobReferenceRetriever.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlobReferenceRetriever.java?rev=1582748&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlobReferenceRetriever.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlobReferenceRetriever.java
 Fri Mar 28 13:30:06 2014
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import org.apache.jackrabbit.oak.plugins.blob.BlobReferenceRetriever;
+import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
+
+/**
+ * Implementation of {@link BlobReferenceRetriever} to retrieve blob references from the
+ * {@link SegmentTracker}.
+ */
+public class SegmentBlobReferenceRetriever implements BlobReferenceRetriever {
+
+    /** Tracker that reference collection is delegated to. */
+    private final SegmentTracker segmentTracker;
+
+    /**
+     * Creates a retriever backed by the given tracker.
+     *
+     * @param tracker the tracker whose {@code collectBlobReferences} method
+     *                is invoked by {@link #getReferences(ReferenceCollector)}
+     */
+    public SegmentBlobReferenceRetriever(SegmentTracker tracker) {
+        segmentTracker = tracker;
+    }
+
+    /**
+     * Hands the collector to the tracker's {@code collectBlobReferences}
+     * method; no additional processing is done here.
+     *
+     * @param collector the collector passed through to the tracker
+     */
+    @Override
+    public void getReferences(final ReferenceCollector collector) {
+        segmentTracker.collectBlobReferences(collector);
+    }
+}
+

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java?rev=1582748&r1=1582747&r2=1582748&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
 Fri Mar 28 13:30:06 2014
@@ -127,7 +127,7 @@ public class SegmentNodeStoreService ext
         }
 
         Dictionary<?, ?> properties = context.getProperties();
-        name = "" + properties.get(NAME);
+        name = String.valueOf(properties.get(NAME));
 
         String directory = lookup(context, DIRECTORY);
         if (directory == null) {
@@ -175,7 +175,8 @@ public class SegmentNodeStoreService ext
 
         if (blobStore instanceof GarbageCollectableBlobStore) {
             MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector();
-//            gc.init(delegate);  FIXME OAK-1582 ClassCastException in MarkSweepGarbageCollector#init()
+            gc.init(new SegmentBlobReferenceRetriever(store.getTracker()), 
+                        (GarbageCollectableBlobStore) blobStore);
             blobGCRegistration = registerMBean(whiteboard, BlobGCMBean.class, 
new BlobGC(gc, executor),
                     BlobGCMBean.TYPE, "Segment node store blob garbage 
collection");
         }

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java?rev=1582748&r1=1582747&r2=1582748&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java
 Fri Mar 28 13:30:06 2014
@@ -26,6 +26,8 @@ import java.util.LinkedList;
 import java.util.Queue;
 import java.util.Set;
 
+import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
+
 /**
  * Tracker of references to segment identifiers and segment instances
  * that are currently kept in memory.

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java?rev=1582748&r1=1582747&r2=1582748&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
 Fri Mar 28 13:30:06 2014
@@ -31,9 +31,9 @@ import com.google.common.collect.Sets;
 import com.mongodb.BasicDBObject;
 import com.mongodb.DBCollection;
 
-import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
+import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
@@ -92,9 +92,10 @@ public class MongoBlobGCTest extends Abs
     public void gc() throws Exception {
         HashSet<String> set = setUp();
 
-        DocumentNodeStore s = mk.getNodeStore();
+        DocumentNodeStore store = mk.getNodeStore();
         MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector();
-        gc.init(s, "./target", 2048, true, 2, 0);
+        gc.init(new 
DocumentBlobReferenceRetriever(store.getReferencedBlobsIterator()),
+                (GarbageCollectableBlobStore) store.getBlobStore(), 
"./target", 2048, true, 2, 0);
         gc.collectGarbage();
 
         Set<String> existing = iterate();
@@ -109,7 +110,7 @@ public class MongoBlobGCTest extends Abs
 
         Set<String> existing = Sets.newHashSet();
         while (cur.hasNext()) {
-            existing.add((String) cur.next());
+            existing.add(cur.next());
         }
         return existing;
     }

Added: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java?rev=1582748&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
 Fri Mar 28 13:30:06 2014
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.core.data.FileDataStore;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
+import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.junit.After;
+import org.junit.Test;
+
+/**
+ * Tests for SegmentNodeStore DataStore GC
+ */
+public class SegmentDataStoreBlobGCTest {
+    /** Node store under test; created lazily by {@link #getNodeStore(BlobStore)}. */
+    SegmentNodeStore nodeStore;
+    /** Segment store backing {@link #nodeStore}; closed in {@link #close()}. */
+    SegmentStore store;
+    /** Blob store wrapping the file data store created in {@link #setUp()}. */
+    DataStoreBlobStore blobStore;
+
+    /**
+     * Returns the node store, creating the backing {@link FileStore} and the
+     * {@link SegmentNodeStore} on the first call. Later calls return the
+     * already created instance and ignore the argument.
+     *
+     * @param blobStore the blob store handed to the {@link FileStore}
+     * @return the (possibly newly created) node store
+     * @throws IOException declared for failures while creating the file store
+     */
+    protected SegmentNodeStore getNodeStore(BlobStore blobStore) throws IOException {
+        if (nodeStore == null) {
+            store = new FileStore(blobStore, getWorkDir(), 256, false);
+            nodeStore = new SegmentNodeStore(store);
+        }
+        return nodeStore;
+    }
+
+    /**
+     * @return the directory {@code target/DataStoreBlobGCTest}, used both by
+     *         the file data store and by the segment file store
+     */
+    private static File getWorkDir() {
+        return new File("target", "DataStoreBlobGCTest");
+    }
+
+    /**
+     * Creates the stores, commits two nodes ({@code c0}, {@code c1}) that
+     * each reference a 16516 byte blob, deletes one randomly chosen node
+     * and then calls {@code gc()} on the segment store.
+     *
+     * @return the chunk ids resolved for the blob of the deleted node
+     * @throws Exception if any of the store operations fails
+     */
+    public HashSet<String> setUp() throws Exception {
+        FileDataStore fds = new FileDataStore();
+        // The blobs created below are larger than this threshold
+        // NOTE(review): presumably so that they end up in the data store
+        // rather than being inlined - confirm against FileDataStore.
+        fds.setMinRecordLength(4092);
+        fds.init(getWorkDir().getAbsolutePath());
+        blobStore = new DataStoreBlobStore(fds);
+        nodeStore = getNodeStore(blobStore);
+
+        HashSet<String> set = new HashSet<String>();
+
+        NodeBuilder a = nodeStore.getRoot().builder();
+
+        int number = 2;
+        // track the number of the assets to be deleted
+        List<Integer> processed = Lists.newArrayList();
+        Random rand = new Random();
+        // The loop runs once, so exactly one index in [0, number) is picked.
+        for (int i = 0; i < 1; i++) {
+            int n = rand.nextInt(number);
+            if (!processed.contains(n)) {
+                processed.add(n);
+            }
+        }
+        for (int i = 0; i < number; i++) {
+            Blob b = nodeStore.createBlob(randomStream(i, 16516));
+            if (processed.contains(i)) {
+                // Remember the chunks of blobs whose node gets deleted below.
+                // NOTE(review): relies on b.toString() being the blob id
+                // expected by resolveChunks - confirm.
+                Iterator<String> idIter = blobStore
+                        .resolveChunks(b.toString());
+                while (idIter.hasNext()) {
+                    set.add(idIter.next());
+                }
+            }
+            a.child("c" + i).setProperty("x", b);
+        }
+        nodeStore.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        for (int id : processed) {
+            delete("c" + id);
+        }
+        store.gc();
+
+        return set;
+    }
+
+    /**
+     * Removes the child node with the given name from the root and commits
+     * the change.
+     *
+     * @param nodeId name of the child node of the root to remove
+     * @throws CommitFailedException if the merge fails
+     */
+    private void delete(String nodeId) throws CommitFailedException {
+        NodeBuilder builder = nodeStore.getRoot().builder();
+        builder.child(nodeId).remove();
+
+        nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+    }
+
+    /**
+     * Runs the mark and sweep collector against the segment store and checks
+     * that none of the chunk ids returned by {@link #setUp()} is still
+     * reported by the blob store afterwards.
+     */
+    @Test
+    public void gc() throws Exception {
+        HashSet<String> set = setUp();
+
+        MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector();
+        gc.init(new SegmentBlobReferenceRetriever(store.getTracker()),
+                (GarbageCollectableBlobStore) store.getBlobStore(), "./target", 2048, true, 2, 0);
+        gc.collectGarbage();
+
+        Set<String> existing = iterate();
+        boolean empty = Sets.intersection(set, existing).isEmpty();
+        assertTrue(empty);
+    }
+
+    /**
+     * @return all chunk ids returned by {@code blobStore.getAllChunkIds(0)}
+     */
+    protected Set<String> iterate() throws Exception {
+        Iterator<String> cur = blobStore.getAllChunkIds(0);
+
+        Set<String> existing = Sets.newHashSet();
+        while (cur.hasNext()) {
+            existing.add(cur.next());
+        }
+        return existing;
+    }
+
+    /**
+     * Closes the segment store (if one was created) and empties the work
+     * directory. The directory is cleaned in a {@code finally} block so that
+     * a failure while closing the store does not leave files behind for the
+     * next run.
+     */
+    @After
+    public void close() throws IOException {
+        try {
+            if (store != null) {
+                store.close();
+            }
+        } finally {
+            FileUtils.cleanDirectory(getWorkDir());
+        }
+    }
+
+    /**
+     * Creates a stream of pseudo random bytes that is reproducible for a
+     * given seed.
+     *
+     * @param seed seed for the random number generator
+     * @param size number of bytes in the stream
+     * @return an in-memory stream over {@code size} generated bytes
+     */
+    static InputStream randomStream(int seed, int size) {
+        Random r = new Random(seed);
+        byte[] data = new byte[size];
+        r.nextBytes(data);
+        return new ByteArrayInputStream(data);
+    }
+}
+


Reply via email to