Author: amitj
Date: Fri Feb 13 05:20:44 2015
New Revision: 1659457

URL: http://svn.apache.org/r1659457
Log:
OAK-2514: Shared DataStore GC framework support

Introduced a new interface for supporting heterogeneous deployments on a shared 
data store

Added:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStore.java
   (with props)
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java
   (with props)
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/identifier/ClusterRepositoryInfo.java
   (with props)
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ClusterRepositoryInfoTest.java
   (with props)
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java
   (with props)
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java
   (with props)
Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGarbageCollector.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreUtils.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGarbageCollector.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGarbageCollector.java?rev=1659457&r1=1659456&r2=1659457&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGarbageCollector.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGarbageCollector.java
 Fri Feb 13 05:20:44 2015
@@ -22,9 +22,11 @@ package org.apache.jackrabbit.oak.plugin
 public interface BlobGarbageCollector {
 
     /**
-     * Collect garbage blobs from the passed node store instance.
-     * 
-     * @throws Exception
+     * Marks garbage blobs from the passed node store instance.
+     * Collects them only if markOnly is false.
+     *
+     * @param markOnly whether to only mark references and not sweep in the 
mark and sweep operation.
+     * @throws Exception the exception
      */
-    void collectGarbage() throws Exception;
-}
\ No newline at end of file
+    void collectGarbage(boolean markOnly) throws Exception;
+}

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java?rev=1659457&r1=1659456&r2=1659457&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
 Fri Feb 13 05:20:44 2015
@@ -18,12 +18,16 @@ package org.apache.jackrabbit.oak.plugin
 
 import java.io.Closeable;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Comparator;
+import java.util.List;
 
 import com.google.common.io.Files;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.commons.IOUtils;
 import org.apache.jackrabbit.oak.commons.sort.ExternalSort;
 
 /**
@@ -47,6 +51,13 @@ class GarbageCollectorFileState implemen
 
     /** The garbage stores the garbage collection candidates which were not 
deleted . */
     private final File garbage;
+    private final static Comparator<String> lexComparator = 
+            new Comparator<String>() {
+                @Override
+                public int compare(String s1, String s2) {
+                    return s1.compareTo(s2);
+                }
+            };
 
     /**
      * Instantiates a new garbage collector file state.
@@ -118,21 +129,26 @@ class GarbageCollectorFileState implemen
      * 
      * @param file file whose contents needs to be sorted
      */
-    public void sort(File file) throws IOException {
+    public static void sort(File file) throws IOException {
         File sorted = createTempFile();
-        Comparator<String> lexComparator = new Comparator<String>() {
-            @Override
-            public int compare(String s1, String s2) {
-                return s1.compareTo(s2);
-            }
-        };
-        ExternalSort.mergeSortedFiles(
-                ExternalSort.sortInBatch(file, lexComparator, true),
-                sorted, lexComparator, true);
+        merge(ExternalSort.sortInBatch(file, lexComparator, true), sorted);
         Files.move(sorted, file);
     }
+    
+    public static void merge(List<File> files, File output) throws IOException 
{
+        ExternalSort.mergeSortedFiles(
+                files,
+                output, lexComparator, true);        
+    }
+    
+    public static File copy(InputStream stream) throws IOException {
+        File file = createTempFile();
+        IOUtils.copy(stream, 
+                new FileOutputStream(file));
+        return file;
+    }
 
-    private File createTempFile() throws IOException {
-        return File.createTempFile("temp", null, home);
+    private static File createTempFile() throws IOException {
+        return File.createTempFile("temp", null);
     }
-}
\ No newline at end of file
+}

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java?rev=1659457&r1=1659456&r2=1659457&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
 Fri Feb 13 05:20:44 2015
@@ -19,14 +19,16 @@ package org.apache.jackrabbit.oak.plugin
 import java.io.BufferedWriter;
 import java.io.Closeable;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.io.InputStream;
 import java.sql.Timestamp;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -41,15 +43,20 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.PeekingIterator;
 import com.google.common.io.Closeables;
 import com.google.common.io.Files;
-import com.google.common.util.concurrent.ListenableFutureTask;
-import com.google.common.util.concurrent.MoreExecutors;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.LineIterator;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
 import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
+import 
org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
 import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 /**
  * Mark and sweep garbage collector.
  * 
@@ -72,9 +79,6 @@ public class MarkSweepGarbageCollector i
     /** The last modified time before current time of blobs to consider for 
garbage collection. */
     private final long maxLastModifiedInterval;
 
-    /** Run concurrently when possible. */
-    private final boolean runConcurrently;
-
     /** The blob store to be garbage collected. */
     private final GarbageCollectableBlobStore blobStore;
 
@@ -89,6 +93,8 @@ public class MarkSweepGarbageCollector i
     /** The batch count. */
     private final int batchCount;
 
+    private String repoId;
+
     /** Flag to indicate the state of the gc **/
     private State state = State.NOT_RUNNING;
 
@@ -96,13 +102,15 @@ public class MarkSweepGarbageCollector i
      * Creates an instance of MarkSweepGarbageCollector
      *
+     * @param marker BlobReferenceRetriever instance used to fetch referred 
blob entries
+     * @param blobStore the blob store instance
+     * @param executor executor
      * @param root the root absolute path of directory under which temporary
      *             files would be created
      * @param batchCount batch sized used for saving intermediate state
-     * @param runBackendConcurrently - run the backend iterate concurrently
-     * @param maxLastModifiedInterval - lastModifiedTime in millis. Only files 
with time
+     * @param maxLastModifiedInterval lastModifiedTime in millis. Only files 
with time
      *                                less than this time would be considered 
for GC
-     * @throws IOException Signals that an I/O exception has occurred.
+     * @param repositoryId - unique repository id for this node
+     * @throws IOException
      */
     public MarkSweepGarbageCollector(
             BlobReferenceRetriever marker,
@@ -110,41 +118,43 @@ public class MarkSweepGarbageCollector i
             Executor executor,
             String root,
             int batchCount,
-            boolean runBackendConcurrently,
-            long maxLastModifiedInterval)
+            long maxLastModifiedInterval,
+            @Nullable String repositoryId)
             throws IOException {
         this.executor = executor;
         this.blobStore = blobStore;
         this.marker = marker;
         this.batchCount = batchCount;
-        this.runConcurrently = runBackendConcurrently;
         this.maxLastModifiedInterval = maxLastModifiedInterval;
-        fs = new GarbageCollectorFileState(root);        
+        this.repoId = repositoryId;
+        fs = new GarbageCollectorFileState(root);
     }
 
     /**
      * Instantiates a new blob garbage collector.
      */
     public MarkSweepGarbageCollector(
-            BlobReferenceRetriever marker, 
+            BlobReferenceRetriever marker,
             GarbageCollectableBlobStore blobStore,
-            Executor executor)
+            Executor executor,
+            @Nullable String repositoryId)
             throws IOException {
-        this(marker, blobStore, executor, TEMP_DIR, DEFAULT_BATCH_COUNT, true, 
TimeUnit.HOURS.toMillis(24));
+        this(marker, blobStore, executor, TEMP_DIR, DEFAULT_BATCH_COUNT, 
TimeUnit.HOURS
+                .toMillis(24), repositoryId);
     }
 
     public MarkSweepGarbageCollector(
             BlobReferenceRetriever marker,
             GarbageCollectableBlobStore blobStore,
             Executor executor,
-            long maxLastModifiedInterval)
-            throws IOException {
-        this(marker, blobStore, executor, TEMP_DIR, DEFAULT_BATCH_COUNT, true, 
maxLastModifiedInterval);
+            long maxLastModifiedInterval,
+            @Nullable String repositoryId) throws IOException {
+        this(marker, blobStore, executor, TEMP_DIR, DEFAULT_BATCH_COUNT, 
maxLastModifiedInterval, repositoryId);
     }
 
     @Override
-    public void collectGarbage() throws Exception {
-        markAndSweep();
+    public void collectGarbage(boolean markOnly) throws Exception {
+        markAndSweep(markOnly);
     }
 
     /**
@@ -157,22 +167,29 @@ public class MarkSweepGarbageCollector i
     }
 
     /**
-     * Mark and sweep. Main method for GC.
+     * Mark and sweep. Main entry method for GC.
+     *
+     * @param markOnly whether to mark only
+     * @throws Exception the exception
      */
-    private void markAndSweep() throws IOException, InterruptedException {
+    private void markAndSweep(boolean markOnly) throws Exception {
         boolean threw = true;
         try {
             Stopwatch sw = Stopwatch.createStarted();
             LOG.info("Starting Blob garbage collection");
 
             mark();
-            int deleteCount = sweep();
-            threw = false;
+            if (!markOnly) {
+                int deleteCount = sweep();
+                threw = false;
 
-            LOG.info("Blob garbage collection completed in {}. Number of blobs 
" +
-                    "deleted [{}]", sw.toString(), deleteCount);
+                LOG.info("Blob garbage collection completed in {}. Number of 
blobs "
+                        + "deleted [{}]", sw.toString(), deleteCount);
+            }
         } finally {
-            Closeables.close(fs, threw);
+            if (LOG.isTraceEnabled()) {
+                Closeables.close(fs, threw);
+            }
             state = State.NOT_RUNNING;
         }
     }
@@ -180,29 +197,16 @@ public class MarkSweepGarbageCollector i
     /**
      * Mark phase of the GC.
      */
-    private void mark() throws IOException, InterruptedException {
+    private void mark() throws IOException, DataStoreException {
         state = State.MARKING;
         LOG.debug("Starting mark phase of the garbage collector");
 
-        // Find all blobs available in the blob store
-        ListenableFutureTask<Integer> blobIdRetriever = 
ListenableFutureTask.create(new BlobIdRetriever());
-        if (runConcurrently) {
-            executor.execute(blobIdRetriever);
-        } else {
-            MoreExecutors.sameThreadExecutor().execute(blobIdRetriever);
-        }
-
-        // Find all blob references after iterating over the whole repository
+        // Mark all used references
         iterateNodeTree();
 
-        try {
-            blobIdRetriever.get();
-        } catch (ExecutionException e) {
-           LOG.warn("Error occurred while fetching all the blobIds from the 
BlobStore. GC would " +
-                   "continue with the blobIds retrieved so far", e.getCause());
-        }
+        // Move the marked references file to the data store meta area if 
applicable
+        GarbageCollectionType.get(blobStore).addMarked(blobStore, fs, repoId);
 
-        difference();
         LOG.debug("Ending mark phase of the garbage collector");
     }
 
@@ -248,16 +252,55 @@ public class MarkSweepGarbageCollector i
 
     /**
      * Sweep phase of gc candidate deletion.
-     * 
-     * @throws IOException
-     *             Signals that an I/O exception has occurred.
+     * <p>
+     * Performs the following steps depending upon the type of the blob store 
refer
+     * {@link org.apache.jackrabbit.oak.plugins.blob.SharedDataStore.Type}:
+     *
+     * <ul>
+     *     <li>Shared</li>
+     *     <ul>
+     *      <li> Merge all marked references (from the mark phase run 
independently) available in the data store meta
+     *          store (from all configured independent repositories).
+     *      <li> Retrieve all blob ids available.
+     *      <li> Diffs the 2 sets above to retrieve list of blob ids not used.
+     *      <li> Deletes only blobs created after
+     *          (earliest time stamp of the marked references - 
#maxLastModifiedInterval) from the above set.
+     *     </ul>
+     *
+     *     <li>Default</li>
+     *     <ul>
+     *      <li> Mark phase already run.
+     *      <li> Retrieve all blob ids available.
+     *      <li> Diffs the 2 sets above to retrieve list of blob ids not used.
+     *      <li> Deletes only blobs created after
+     *          (time stamp of the marked references - 
#maxLastModifiedInterval).
+     *     </ul>
+     * </ul>
+     *
+     * @return the number of blobs deleted
+     * @throws Exception the exception
      */
-    private int sweep() throws IOException {
+    private int sweep() throws Exception {
+        long earliestRefAvailTime;
+        // Merge all the blob references available from all the reference 
files in the data store meta store
+        // Only go ahead if merge succeeded
+        try {
+            earliestRefAvailTime =
+                    
GarbageCollectionType.get(blobStore).mergeAllMarkedReferences(blobStore, fs);
+        } catch (Exception e) {
+            return 0;
+        }
+
+        // Find all blob references after iterating over the whole repository
+        (new BlobIdRetriever()).call();
+
+        // Calculate the references not used
+        difference();
         int count = 0;
         state = State.SWEEPING;
         LOG.debug("Starting sweep phase of the garbage collector");
         LOG.debug("Sweeping blobs with modified time > than the configured max 
deleted time ({}). " +
-                timestampToString(getLastMaxModifiedTime()));
+                
timestampToString(getLastMaxModifiedTime(earliestRefAvailTime)));
 
         ConcurrentLinkedQueue<String> exceptionQueue = new 
ConcurrentLinkedQueue<String>();
 
@@ -270,13 +313,13 @@ public class MarkSweepGarbageCollector i
 
             if (ids.size() > getBatchCount()) {
                 count += ids.size();
-                executor.execute(new Sweeper(ids, exceptionQueue));
+                executor.execute(new Sweeper(ids, exceptionQueue, 
earliestRefAvailTime));
                 ids = Lists.newArrayList();
             }
         }
         if (!ids.isEmpty()) {
             count += ids.size();
-            executor.execute(new Sweeper(ids, exceptionQueue));
+            executor.execute(new Sweeper(ids, exceptionQueue, 
earliestRefAvailTime));
         }
 
         count -= exceptionQueue.size();
@@ -291,11 +334,13 @@ public class MarkSweepGarbageCollector i
             IOUtils.closeQuietly(writer);
         }
         if(!exceptionQueue.isEmpty()) {
-            LOG.warn("Unable to delete some blobs entries from the blob store. 
" +
-                    "This may happen if blob modified time is > than the max 
deleted time ({}). " +
-                    "Details around such blob entries can be found in [{}]",
-                    timestampToString(getLastMaxModifiedTime()), 
fs.getGarbage().getAbsolutePath());
+            LOG.warn(
+                "Unable to delete some blobs entries from the blob store. This 
may happen if blob modified time is > "
+                    + "than the max deleted time ({}). Details around such 
blob entries can be found in [{}]",
+                
timestampToString(getLastMaxModifiedTime(earliestRefAvailTime)), 
fs.getGarbage().getAbsolutePath());
         }
+        // Remove all the merged marked references
+        
GarbageCollectionType.get(blobStore).removeAllMarkedReferences(blobStore);
         LOG.debug("Ending sweep phase of the garbage collector");
         return count;
     }
@@ -304,10 +349,10 @@ public class MarkSweepGarbageCollector i
         return batchCount;
     }
 
-    private long getLastMaxModifiedTime(){
+    private long getLastMaxModifiedTime(long maxModified) {
         return maxLastModifiedInterval > 0 ?
-                System.currentTimeMillis() - maxLastModifiedInterval : 0;
-
+            ((maxModified <= 0 ? System.currentTimeMillis() : maxModified) - 
maxLastModifiedInterval) :
+            0;
     }
 
     /**
@@ -331,16 +376,20 @@ public class MarkSweepGarbageCollector i
         /** The ids to sweep. */
         private final List<String> ids;
 
-        public Sweeper(List<String> ids, ConcurrentLinkedQueue<String> 
exceptionQueue) {
+        private final long maxModified;
+
+        public Sweeper(List<String> ids, ConcurrentLinkedQueue<String> 
exceptionQueue,
+                long maxModified) {
             this.exceptionQueue = exceptionQueue;
             this.ids = ids;
+            this.maxModified = maxModified;
         }
 
         @Override
         public void run() {
             try {
                 LOG.debug("Blob ids to be deleted {}", ids);
-                boolean deleted = blobStore.deleteChunks(ids, 
getLastMaxModifiedTime());
+                boolean deleted = blobStore.deleteChunks(ids, 
getLastMaxModifiedTime(maxModified));
                 if (!deleted) {
                     // Only log and do not add to exception queue since some 
blobs may not match the
                     // lastMaxModifiedTime criteria.
@@ -369,7 +418,7 @@ public class MarkSweepGarbageCollector i
                         @Override
                         public void addReference(String blobId) {
                             if (debugMode) {
-                                LOG.trace("BlobId : {}",blobId);
+                                LOG.trace("BlobId : {}", blobId);
                             }
 
                             try {
@@ -384,7 +433,7 @@ public class MarkSweepGarbageCollector i
                                     }
 
                                     if (debugMode) {
-                                        LOG.trace("chunkId : {}",id);
+                                        LOG.trace("chunkId : {}", id);
                                     }
                                     count.getAndIncrement();
                                 }
@@ -400,9 +449,9 @@ public class MarkSweepGarbageCollector i
                     }
             );
             LOG.info("Number of valid blob references marked under mark phase 
of " +
-                    "Blob garbage collection [{}]",count.get());
+                    "Blob garbage collection [{}]", count.get());
             // sort the marked references
-            fs.sort(fs.getMarkedRefs());
+            GarbageCollectorFileState.sort(fs.getMarkedRefs());
         } finally {
             IOUtils.closeQuietly(writer);
         }
@@ -439,22 +488,20 @@ public class MarkSweepGarbageCollector i
                 }
 
                 // sort the file
-                fs.sort(fs.getAvailableRefs());
+                GarbageCollectorFileState.sort(fs.getAvailableRefs());
                 LOG.debug("Number of blobs present in BlobStore : [{}] ", 
blobsCount);
             } finally {
                 IOUtils.closeQuietly(bufferWriter);
             }
             return blobsCount;
         }
-
-
     }
 
 
     /**
      * FileLineDifferenceIterator class which iterates over the difference of 
2 files line by line.
      */
-    static class FileLineDifferenceIterator extends AbstractIterator<String> 
implements Closeable{
+    static class FileLineDifferenceIterator extends AbstractIterator<String> 
implements Closeable {
         private final PeekingIterator<String> peekMarked;
         private final LineIterator marked;
         private final LineIterator all;
@@ -527,4 +574,107 @@ public class MarkSweepGarbageCollector i
     private static String timestampToString(long timestamp){
         return (new Timestamp(timestamp) + "00").substring(0, 23);
     }
+
+    /**
+     * Defines different data store types from the garbage collection 
perspective and encodes the divergent behavior.
+     * <ul></ul>
+     */
+    enum GarbageCollectionType {
+        SHARED {
+            /**
+             * Remove the marked references from the blob store root. Default 
NOOP.
+             * 
+             * @param blobStore the blobStore instance
+             */
+            @Override
+            void removeAllMarkedReferences(GarbageCollectableBlobStore 
blobStore) {
+                ((SharedDataStore) 
blobStore).deleteAllMetadataRecords(SharedStoreRecordType.REFERENCES.getType());
+            }
+
+            /**
+             * Merge all marked references available from all repositories and 
return the earliest time of the references.
+             * 
+             * @param blobStore the blob store
+             * @param fs the fs
+             * @return the earliest time of the available references
+             * @throws IOException Signals that an I/O exception has occurred.
+             * @throws DataStoreException the data store exception
+             */
+            @Override
+            long mergeAllMarkedReferences(GarbageCollectableBlobStore 
blobStore,
+                    GarbageCollectorFileState fs)
+                    throws IOException, DataStoreException {
+
+                List<DataRecord> refFiles =
+                    ((SharedDataStore) 
blobStore).getAllMetadataRecords(SharedStoreRecordType.REFERENCES.getType());
+
+                // Get all the repositories registered
+                List<DataRecord> repoFiles =
+                    ((SharedDataStore) 
blobStore).getAllMetadataRecords(SharedStoreRecordType.REPOSITORY.getType());
+
+                // Retrieve repos for which reference files have not been 
created
+                Set<String> unAvailRepos =
+                        
SharedDataStoreUtils.refsNotAvailableFromRepos(repoFiles, refFiles);
+                if (unAvailRepos.isEmpty()) {
+                    // List of files to be merged
+                    List<File> files = Lists.newArrayList();
+                    for (DataRecord refFile : refFiles) {
+                        File file = 
GarbageCollectorFileState.copy(refFile.getStream());
+                        files.add(file);
+                    }
+
+                    GarbageCollectorFileState.merge(files, fs.getMarkedRefs());
+
+                    return 
SharedDataStoreUtils.getEarliestRecord(refFiles).getLastModified();
+                } else {
+                    LOG.error("Not all repositories have marked references 
available : {}", unAvailRepos);
+                    throw new IOException("Not all repositories have marked 
references available");
+                }
+            }
+
+            /**
+             * Adds the marked references to the blob store root. Default NOOP
+             * 
+             * @param blobStore the blob store
+             * @param fs the fs
+             * @param repoId the repo id
+             * @throws DataStoreException the data store exception
+             * @throws IOException Signals that an I/O exception has occurred.
+             */
+            @Override
+            void addMarked(GarbageCollectableBlobStore blobStore, 
GarbageCollectorFileState fs,
+                    String repoId) throws DataStoreException, IOException {
+                InputStream is = new FileInputStream(fs.getMarkedRefs());
+                try {
+                    ((SharedDataStore) blobStore)
+                        .addMetadataRecord(is, 
SharedStoreRecordType.REFERENCES.getNameFromId(repoId));
+                } finally {
+                    Closeables.close(is, false);
+                }
+            }
+        },
+        DEFAULT;
+
+        void removeAllMarkedReferences(GarbageCollectableBlobStore blobStore) 
{}
+
+        void addMarked(GarbageCollectableBlobStore blobStore, 
GarbageCollectorFileState fs,
+                String repoId) throws DataStoreException, IOException {}
+
+        long mergeAllMarkedReferences(GarbageCollectableBlobStore blobStore,
+                GarbageCollectorFileState fs)
+                throws IOException, DataStoreException {
+            // throw if the marked refs are not available.
+            if (!fs.getMarkedRefs().exists() || fs.getMarkedRefs().length() == 
0) {
+                throw new IOException("Marked references not available");
+            }
+            return fs.getMarkedRefs().lastModified();
+        }
+
+        public static GarbageCollectionType get(GarbageCollectableBlobStore 
blobStore) {
+            if (SharedDataStoreUtils.isShared(blobStore)) {
+                return SHARED;
+            }
+            return DEFAULT;
+        }
+    }
 }

Added: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStore.java?rev=1659457&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStore.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStore.java
 Fri Feb 13 05:20:44 2015
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.InputStream;
+import java.util.List;
+
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+
+/**
+ * Interface to be implemented by a shared data store.
+ */
+public interface SharedDataStore {
+    /**
+     * Explicitly identifies the type of the data store
+     */
+    enum Type {
+        SHARED, DEFAULT
+    }
+
+    /**
+     * Adds the root record.
+     * 
+     * @param stream the stream
+     * @param name the name of the root record
+     * @throws DataStoreException the data store exception
+     */
+    void addMetadataRecord(InputStream stream, String name)
+            throws DataStoreException;
+
+    /**
+     * Retrieves the metadata record with the given name
+     *
+     * @param name the name of the record
+     * @return the metadata record with the given name
+     */
+    DataRecord getMetadataRecord(String name);
+
+    /**
+     * Gets all root records.
+     * 
+     * @return the all root records
+     */
+    List<DataRecord> getAllMetadataRecords(String prefix);
+
+    /**
+     * Deletes the root record represented by the given parameters.
+     * 
+     * @param name the name of the root record
+     * @return success/failure
+     */
+    boolean deleteMetadataRecord(String name);
+
+    /**
+     * Deletes all records matching the given prefix.
+     * 
+     * @param prefix metadata type identifier
+     */
+    void deleteAllMetadataRecords(String prefix);
+
+    /**
+     * Gets the type.
+     * 
+     * @return the type
+     */
+    Type getType();
+}
+

Propchange: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java?rev=1659457&r1=1659456&r2=1659457&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
 Fri Feb 13 05:20:44 2015
@@ -48,6 +48,7 @@ import org.apache.jackrabbit.core.data.D
 import org.apache.jackrabbit.core.data.DataStoreException;
 import org.apache.jackrabbit.core.data.MultiDataStoreAware;
 import org.apache.jackrabbit.oak.cache.CacheLIRS;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
 import org.slf4j.Logger;
@@ -62,7 +63,8 @@ import static com.google.common.collect.
  * It also handles inlining binaries if there size is smaller than
  * {@link org.apache.jackrabbit.core.data.DataStore#getMinRecordLength()}
  */
-public class DataStoreBlobStore implements DataStore, BlobStore, 
GarbageCollectableBlobStore {
+public class DataStoreBlobStore implements DataStore, SharedDataStore, 
BlobStore,
+        GarbageCollectableBlobStore {
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private final DataStore delegate;
@@ -394,6 +396,48 @@ public class DataStoreBlobStore implemen
         return Iterators.singletonIterator(blobId);
     }
 
+    @Override
+    public void addMetadataRecord(InputStream stream, String name) throws 
DataStoreException {
+        if (delegate instanceof SharedDataStore) {
+            ((SharedDataStore) delegate).addMetadataRecord(stream, name);
+        }
+    }
+
+    @Override public DataRecord getMetadataRecord(String name) {
+        if (delegate instanceof SharedDataStore) {
+            return ((SharedDataStore) delegate).getMetadataRecord(name);
+        }
+        return null;
+    }
+
+    @Override
+    public List<DataRecord> getAllMetadataRecords(String prefix) {
+        if (delegate instanceof SharedDataStore) {
+            return ((SharedDataStore) delegate).getAllMetadataRecords(prefix);
+        }
+        return null;
+    }
+
+    @Override
+    public boolean deleteMetadataRecord(String name) {
+        return delegate instanceof SharedDataStore && ((SharedDataStore) 
delegate).deleteMetadataRecord(name);
+    }
+
+    @Override
+    public void deleteAllMetadataRecords(String prefix) {
+        if (delegate instanceof SharedDataStore) {
+            ((SharedDataStore) delegate).deleteAllMetadataRecords(prefix);
+        }
+    }
+
+    @Override
+    public Type getType() {
+        if (delegate instanceof SharedDataStore) {
+            return Type.SHARED;
+        }
+        return Type.DEFAULT;
+    }
+
     //~---------------------------------------------< Object >
 
     @Override

Added: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java?rev=1659457&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java
 Fri Feb 13 05:20:44 2015
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import java.util.List;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+
+/**
+ * Utility class for {@link SharedDataStore}.
+ */
+public class SharedDataStoreUtils {
+    /**
+     * Checks if the blob store is shared.
+     *
+     * @param blobStore the blob store
+     * @return true if shared
+     */
+    public static boolean isShared(BlobStore blobStore) {
+        return (blobStore instanceof SharedDataStore)
+            && (((SharedDataStore) blobStore).getType() == 
SharedDataStore.Type.SHARED);
+    }
+
+    /**
+     * Gets the earliest record of the available reference records.
+     * 
+     * @param recs the list of reference records to compare
+     * @return the earliest record
+     */
+    public static DataRecord getEarliestRecord(List<DataRecord> recs) {
+        return Ordering.natural().onResultOf(
+                new Function<DataRecord, Long>() {
+                    @Override
+                    @Nullable
+                    public Long apply(@Nullable DataRecord input) {
+                        return input.getLastModified();
+                    }
+                }).min(recs);
+    }
+
+    /**
+     * Determines the repositories for which no marked references are available.
+     * 
+     * @param repos the registered repository records
+     * @param refs the available reference records
+     * @return the set of repository ids whose references are not available
+     */
+    public static Set<String> refsNotAvailableFromRepos(List<DataRecord> repos,
+            List<DataRecord> refs) {
+        return Sets.difference(FluentIterable.from(repos).uniqueIndex(
+                new Function<DataRecord, String>() {
+                    @Override
+                    @Nullable
+                    public String apply(@Nullable DataRecord input) {
+                        return 
SharedStoreRecordType.REPOSITORY.getIdFromName(input.getIdentifier().toString());
+                    }
+                }).keySet(),
+                FluentIterable.from(refs).uniqueIndex(
+                        new Function<DataRecord, String>() {
+                            @Override
+                            @Nullable
+                            public String apply(@Nullable DataRecord input) {
+                                return 
SharedStoreRecordType.REFERENCES.getIdFromName(input.getIdentifier().toString());
+                            }
+                        }).keySet());
+    }
+
+    /**
+     * Encapsulates the different type of records at the data store root.
+     */
+    public enum SharedStoreRecordType {
+        REFERENCES("references"), REPOSITORY("repository");
+
+        private final String type;
+
+        SharedStoreRecordType(String type) {
+            this.type = type;
+        }
+
+        public String getType() {
+            return type;
+        }
+
+        public String getIdFromName(String name) {
+            return Splitter.on(DELIIM).limit(2).splitToList(name).get(1);
+        }
+
+        public String getNameFromId(String id) {
+            return Joiner.on(DELIIM).join(getType(), id);
+        }
+
+        static final String DELIIM = "-";
+    }
+}
+

Propchange: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1659457&r1=1659456&r2=1659457&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
 Fri Feb 13 05:20:44 2015
@@ -1973,11 +1973,13 @@ public final class DocumentNodeStore
      * Creates and returns a MarkSweepGarbageCollector if the current BlobStore
      * supports garbage collection
      *
-     * @return garbage collector of the BlobStore supports GC otherwise null
      * @param blobGcMaxAgeInSecs
+     * @param repositoryId
+     * @return garbage collector of the BlobStore supports GC otherwise null
      */
     @CheckForNull
-    public MarkSweepGarbageCollector createBlobGarbageCollector(long 
blobGcMaxAgeInSecs) {
+    public MarkSweepGarbageCollector createBlobGarbageCollector(long 
blobGcMaxAgeInSecs,
+            String repositoryId) {
         MarkSweepGarbageCollector blobGC = null;
         if(blobStore instanceof GarbageCollectableBlobStore){
             try {
@@ -1985,7 +1987,8 @@ public final class DocumentNodeStore
                         new DocumentBlobReferenceRetriever(this),
                             (GarbageCollectableBlobStore) blobStore,
                         executor,
-                        TimeUnit.SECONDS.toMillis(blobGcMaxAgeInSecs));
+                        TimeUnit.SECONDS.toMillis(blobGcMaxAgeInSecs),
+                        repositoryId);
             } catch (IOException e) {
                 throw new RuntimeException("Error occurred while initializing 
" +
                         "the MarkSweepGarbageCollector",e);

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java?rev=1659457&r1=1659456&r2=1659457&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
 Fri Feb 13 05:20:44 2015
@@ -24,6 +24,7 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toLong;
 import static 
org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Dictionary;
@@ -55,8 +56,11 @@ import org.apache.jackrabbit.oak.osgi.Os
 import org.apache.jackrabbit.oak.plugins.blob.BlobGC;
 import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean;
 import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
 import org.apache.jackrabbit.oak.plugins.document.cache.CachingDocumentStore;
 import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
+import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
@@ -291,6 +295,17 @@ public class DocumentNodeStoreService {
         mkBuilder.setExecutor(executor);
         mk = mkBuilder.open();
 
+        // If a shared data store register the repo id in the data store
+        if (SharedDataStoreUtils.isShared(blobStore)) {
+            try {
+                String repoId = 
ClusterRepositoryInfo.createId(mk.getNodeStore());
+                ((SharedDataStore) blobStore).addMetadataRecord(new 
ByteArrayInputStream(new byte[0]),
+                    
SharedDataStoreUtils.SharedStoreRecordType.REPOSITORY.getNameFromId(repoId));
+            } catch (Exception e) {
+                throw new IOException("Could not register a unique 
repositoryId", e);
+            }
+        }
+
         registerJMXBeans(mk.getNodeStore());
         registerLastRevRecoveryJob(mk.getNodeStore());
 
@@ -443,8 +458,10 @@ public class DocumentNodeStoreService {
         if (store.getBlobStore() instanceof GarbageCollectableBlobStore) {
             BlobGarbageCollector gc = new BlobGarbageCollector() {
                 @Override
-                public void collectGarbage() throws Exception {
-                    
store.createBlobGarbageCollector(blobGcMaxAgeInSecs).collectGarbage();
+                public void collectGarbage(boolean sweep) throws Exception {
+                    store.createBlobGarbageCollector(blobGcMaxAgeInSecs,
+                            ClusterRepositoryInfo.getId(mk.getNodeStore()))
+                            .collectGarbage(sweep);
                 }
             };
             registrations.add(registerMBean(whiteboard, BlobGCMBean.class, new 
BlobGC(gc, executor),

Added: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/identifier/ClusterRepositoryInfo.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/identifier/ClusterRepositoryInfo.java?rev=1659457&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/identifier/ClusterRepositoryInfo.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/identifier/ClusterRepositoryInfo.java
 Fri Feb 13 05:20:44 2015
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.identifier;
+
+import java.util.UUID;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
+/**
+ * Utility class to manage a unique cluster/repository id for the cluster.
+ */
+public class ClusterRepositoryInfo {
+    public static final String CLUSTER_CONFIG_NODE = ":clusterConfig";
+    public static final String CLUSTER_ID_PROP = ":clusterId";
+
+    /**
+     * Adds a new uuid for the repository in the property 
/:clusterConfig/:clusterId if not already present.
+     *
+     * @param store the NodeStore instance
+     * @return the repository id
+     * @throws CommitFailedException
+     */
+    public static String createId(NodeStore store) throws 
CommitFailedException {
+        NodeBuilder root = store.getRoot().builder();
+        if (!root.hasChildNode(CLUSTER_CONFIG_NODE)) {
+            String id = UUID.randomUUID().toString();
+            root.child(CLUSTER_CONFIG_NODE).setProperty(CLUSTER_ID_PROP, id);
+            store.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+            return id;
+        } else {
+            return 
root.getChildNode(CLUSTER_CONFIG_NODE).getProperty(CLUSTER_ID_PROP).getValue(Type.STRING);
+        }
+    }
+
+    /**
+     * Retrieves the {@link #CLUSTER_ID_PROP} value.
+     *
+     * @param store the NodeStore instance
+     * @return the repository id
+     */
+    public static String getId(NodeStore store) {
+        return 
store.getRoot().getChildNode(CLUSTER_CONFIG_NODE).getProperty(CLUSTER_ID_PROP).getValue(Type.STRING);
+    }
+}
+

Propchange: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/identifier/ClusterRepositoryInfo.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java?rev=1659457&r1=1659456&r2=1659457&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
 Fri Feb 13 05:20:44 2015
@@ -26,6 +26,7 @@ import static org.apache.jackrabbit.oak.
 import static 
org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.TIMESTAMP_DEFAULT;
 import static 
org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
 
+import java.io.ByteArrayInputStream;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
@@ -50,6 +51,10 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean;
 import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
+import 
org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
+import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
 import 
org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategyMBean;
 import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy;
 import 
org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CleanupType;
@@ -262,15 +267,27 @@ public class SegmentNodeStoreService ext
         revisionGCRegistration = registerMBean(whiteboard, 
RevisionGCMBean.class, revisionGC,
                 RevisionGCMBean.TYPE, "Segment node store revision garbage 
collection");
 
+        // If a shared data store register the repo id in the data store
+        if (SharedDataStoreUtils.isShared(blobStore)) {
+            try {
+                String repoId = ClusterRepositoryInfo.createId(delegate);
+                ((SharedDataStore) blobStore).addMetadataRecord(new 
ByteArrayInputStream(new byte[0]),
+                    SharedStoreRecordType.REPOSITORY.getNameFromId(repoId));
+            } catch (Exception e) {
+                throw new IOException("Could not register a unique 
repositoryId", e);
+            }
+        }
+
         if (store.getBlobStore() instanceof GarbageCollectableBlobStore) {
             BlobGarbageCollector gc = new BlobGarbageCollector() {
                 @Override
-                public void collectGarbage() throws Exception {
+                public void collectGarbage(boolean sweep) throws Exception {
                     MarkSweepGarbageCollector gc = new 
MarkSweepGarbageCollector(
                             new 
SegmentBlobReferenceRetriever(store.getTracker()),
                             (GarbageCollectableBlobStore) store.getBlobStore(),
-                            executor);
-                    gc.collectGarbage();
+                            executor,
+                            ClusterRepositoryInfo.getId(delegate));
+                    gc.collectGarbage(sweep);
                 }
             };
 

Added: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ClusterRepositoryInfoTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ClusterRepositoryInfoTest.java?rev=1659457&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ClusterRepositoryInfoTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ClusterRepositoryInfoTest.java
 Fri Feb 13 05:20:44 2015
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
+import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Tests the ClusterRepositoryInfo unique cluster repository id.
+ */
+public class ClusterRepositoryInfoTest {
+    static BlobStore blobStore;
+
+    @BeforeClass
+    public static void setup() {
+        try {
+            blobStore = DataStoreUtils.getBlobStore();
+            Assume.assumeThat(blobStore, instanceOf(SharedDataStore.class));
+        } catch (Exception e) {
+            Assume.assumeNoException(e);
+        }
+    }
+
+    @Test
+    public void differentCluster() throws Exception {
+        DocumentNodeStore ds1 = new DocumentMK.Builder()
+                .setAsyncDelay(0)
+                .setDocumentStore(new MemoryDocumentStore())
+                .setBlobStore(blobStore)
+                .getNodeStore();
+        String repoId1 = ClusterRepositoryInfo.createId(ds1);
+
+        DocumentNodeStore ds2 = new DocumentMK.Builder()
+                .setAsyncDelay(0)
+                .setDocumentStore(new MemoryDocumentStore())
+                .setBlobStore(blobStore)
+                .getNodeStore();
+        String repoId2 = ClusterRepositoryInfo.createId(ds2);
+
+        Assert.assertNotSame(repoId1, repoId2);
+    }
+
+    @Test
+    public void sameCluster() throws Exception {
+        MemoryDocumentStore store = new MemoryDocumentStore();
+        DocumentNodeStore ds1 = new DocumentMK.Builder()
+                .setAsyncDelay(0)
+                .setDocumentStore(store)
+                .setClusterId(1)
+                .setBlobStore(blobStore)
+                .getNodeStore();
+        String repoId1 = ClusterRepositoryInfo.createId(ds1);
+        ds1.runBackgroundOperations();
+
+        DocumentNodeStore ds2 = new DocumentMK.Builder()
+                .setAsyncDelay(0)
+                .setDocumentStore(store)
+                .setClusterId(2)
+                .setBlobStore(blobStore)
+                .getNodeStore();
+        String repoId2 = ClusterRepositoryInfo.createId(ds2);
+
+        // Since the same cluster the ids should be equal
+        Assert.assertEquals(repoId1, repoId2);
+    }
+
+    @After
+    public void close() throws IOException {
+        FileUtils.cleanDirectory(new File(DataStoreUtils.getHomeDir()));
+    }
+}
+

Propchange: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ClusterRepositoryInfoTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java?rev=1659457&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java
 Fri Feb 13 05:20:44 2015
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
+import 
org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test for SharedDataStoreUtils to test addition, retrieval and deletion of root 
records.
+ */
+public class SharedDataStoreUtilsTest {
+    SharedDataStore dataStore;
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        try {
+            Assume.assumeThat(DataStoreUtils.getBlobStore(), 
instanceOf(SharedDataStore.class));
+        } catch (Exception e) {
+            Assume.assumeNoException(e);
+        }
+    }
+
+    @Test
+    public void test() throws Exception {
+        dataStore = DataStoreUtils.getBlobStore();
+        String repoId1 = UUID.randomUUID().toString();
+        String repoId2 = UUID.randomUUID().toString();
+
+        // Add repository records
+        dataStore.addMetadataRecord(new ByteArrayInputStream(new byte[0]),
+            SharedStoreRecordType.REPOSITORY.getNameFromId(repoId1));
+        DataRecord repo1 = 
dataStore.getMetadataRecord(SharedStoreRecordType.REPOSITORY.getNameFromId(repoId1));
+        dataStore.addMetadataRecord(new ByteArrayInputStream(new byte[0]),
+            SharedStoreRecordType.REPOSITORY.getNameFromId(repoId2));
+        DataRecord repo2 = 
dataStore.getMetadataRecord(SharedStoreRecordType.REPOSITORY.getNameFromId(repoId2));
+
+            // Add reference records
+        dataStore.addMetadataRecord(new ByteArrayInputStream(new byte[0]),
+            SharedStoreRecordType.REFERENCES.getNameFromId(repoId1));
+        DataRecord rec1 = 
dataStore.getMetadataRecord(SharedStoreRecordType.REFERENCES.getNameFromId(repoId1));
+        dataStore.addMetadataRecord(new ByteArrayInputStream(new byte[0]),
+            SharedStoreRecordType.REFERENCES.getNameFromId(repoId2));
+        DataRecord rec2 = 
dataStore.getMetadataRecord(SharedStoreRecordType.REFERENCES.getNameFromId(repoId2));
+
+        Assert.assertEquals(
+                
SharedStoreRecordType.REPOSITORY.getIdFromName(repo1.getIdentifier().toString()),
+                repoId1);
+        Assert.assertEquals(
+                
SharedStoreRecordType.REPOSITORY.getIdFromName(repo2.getIdentifier().toString()),
+                repoId2);
+        Assert.assertEquals(
+                
SharedStoreRecordType.REFERENCES.getIdFromName(rec1.getIdentifier().toString()),
+                repoId1);
+        Assert.assertEquals(
+                
SharedStoreRecordType.REFERENCES.getIdFromName(rec2.getIdentifier().toString()),
+                repoId2);
+
+        // All the references from registered repositories are available
+        Assert.assertTrue(
+                SharedDataStoreUtils.refsNotAvailableFromRepos(
+                    
dataStore.getAllMetadataRecords(SharedStoreRecordType.REPOSITORY.getType()),
+                    
dataStore.getAllMetadataRecords(SharedStoreRecordType.REFERENCES.getType())).isEmpty());
+
+        // Earliest should be the 1st reference record
+        Assert.assertEquals(
+            SharedDataStoreUtils.getEarliestRecord(
+                
dataStore.getAllMetadataRecords(SharedStoreRecordType.REFERENCES.getType())).getIdentifier().toString(),
+            SharedStoreRecordType.REFERENCES.getNameFromId(repoId1));
+
+        // Delete references and check back if deleted
+        
dataStore.deleteAllMetadataRecords(SharedStoreRecordType.REFERENCES.getType());
+        
Assert.assertTrue(dataStore.getAllMetadataRecords(SharedStoreRecordType.REFERENCES.getType()).isEmpty());
+
+        // Repository ids should still be available
+        Assert.assertEquals(2,
+            
dataStore.getAllMetadataRecords(SharedStoreRecordType.REPOSITORY.getType()).size());
+    }
+
+    @After
+    public void close() throws IOException {
+        FileUtils.cleanDirectory(new File(DataStoreUtils.getHomeDir()));
+    }
+}
+

Propchange: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreUtils.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreUtils.java?rev=1659457&r1=1659456&r2=1659457&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreUtils.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreUtils.java
 Fri Feb 13 05:20:44 2015
@@ -24,6 +24,7 @@ import org.apache.jackrabbit.core.data.D
 import org.apache.jackrabbit.core.data.FileDataStore;
 import org.apache.jackrabbit.oak.commons.PropertiesUtil;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.OakFileDataStore;
 import org.apache.jackrabbit.oak.plugins.document.AbstractMongoConnectionTest;
 import org.junit.Test;
 
@@ -51,7 +52,7 @@ public class DataStoreUtils {
     private static long time = -1;
 
     public static DataStoreBlobStore getBlobStore() throws Exception {
-        String className = System.getProperty(DS_CLASS_NAME, 
FileDataStore.class.getName());
+        String className = System.getProperty(DS_CLASS_NAME, 
OakFileDataStore.class.getName());
         DataStore ds = 
Class.forName(className).asSubclass(DataStore.class).newInstance();
         PropertiesUtil.populate(ds, getConfig(), false);
         ds.init(getHomeDir());

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java?rev=1659457&r1=1659456&r2=1659457&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
 Fri Feb 13 05:20:44 2015
@@ -16,6 +16,9 @@
  */
 package org.apache.jackrabbit.oak.plugins.document;
 
+import static 
org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType.REPOSITORY;
+import static org.junit.Assert.assertTrue;
+
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.util.HashSet;
@@ -33,7 +36,10 @@ import com.mongodb.DBCollection;
 import junit.framework.Assert;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
 import 
org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats;
+import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
 import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
@@ -147,15 +153,22 @@ public class MongoBlobGCTest extends Abs
     }
     private void gc(HashSet<String> remaining) throws Exception {
         DocumentNodeStore store = mk.getNodeStore();
+        String repoId = null;
+        if (SharedDataStoreUtils.isShared(store.getBlobStore())) {
+            repoId = ClusterRepositoryInfo.createId(store);
+            ((SharedDataStore) store.getBlobStore()).addMetadataRecord(
+                new ByteArrayInputStream(new byte[0]),
+                REPOSITORY.getNameFromId(repoId));
+        }
         MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector(
                 new DocumentBlobReferenceRetriever(store),
                 (GarbageCollectableBlobStore) store.getBlobStore(),
                 MoreExecutors.sameThreadExecutor(),
-                "./target", 5, true, 0);
-        gc.collectGarbage();
+                "./target", 5, 0, repoId);
+        gc.collectGarbage(false);
 
         Set<String> existingAfterGC = iterate();
-        boolean empty = Sets.symmetricDifference(remaining, 
existingAfterGC).isEmpty();
+    boolean empty = Sets.symmetricDifference(remaining, 
existingAfterGC).isEmpty();
         assertTrue(empty);
     }
 

Added: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java?rev=1659457&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java
 Fri Feb 13 05:20:44 2015
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import 
org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
+import 
org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test for GC in a shared data store among heterogeneous Oak node stores.
+ */
+public class SharedBlobStoreGCTest {
+    private Cluster cluster1;
+    private Cluster cluster2;
+    private Clock clock;
+
+    @Before
+    public void setUp() throws Exception {
+        clock = new Clock.Virtual();
+        clock.waitUntil(Revision.getCurrentTimestamp());
+
+        BlobStore blobeStore1 = DataStoreUtils.getBlobStore();
+        DocumentNodeStore ds1 = new DocumentMK.Builder()
+                .setAsyncDelay(0)
+                .setDocumentStore(new MemoryDocumentStore())
+                .setBlobStore(blobeStore1)
+                .clock(clock)
+                .getNodeStore();
+        String repoId1 = ClusterRepositoryInfo.createId(ds1);
+        // Register the unique repository id in the data store
+        ((SharedDataStore) blobeStore1).addMetadataRecord(new 
ByteArrayInputStream(new byte[0]),
+            SharedStoreRecordType.REPOSITORY.getNameFromId(repoId1));
+
+        BlobStore blobeStore2 = DataStoreUtils.getBlobStore();
+        DocumentNodeStore ds2 = new DocumentMK.Builder()
+                .setAsyncDelay(0)
+                .setDocumentStore(new MemoryDocumentStore())
+                .setBlobStore(blobeStore2)
+                .clock(clock)
+                .getNodeStore();
+        String repoId2 = ClusterRepositoryInfo.createId(ds2);
+        // Register the unique repository id in the data store
+        ((SharedDataStore) blobeStore2).addMetadataRecord(new 
ByteArrayInputStream(new byte[0]),
+            SharedStoreRecordType.REPOSITORY.getNameFromId(repoId2));
+
+        cluster1 = new Cluster(ds1, repoId1, 20);
+        cluster1.init();
+        cluster2 = new Cluster(ds2, repoId2, 100);
+        cluster2.init();
+    }
+
+    static InputStream randomStream(int seed, int size) {
+        Random r = new Random(seed);
+        byte[] data = new byte[size];
+        r.nextBytes(data);
+        return new ByteArrayInputStream(data);
+    }
+
+    @Test
+    public void testGC() throws Exception {
+        // Only run the mark phase on both the clusters
+        cluster1.gc.collectGarbage(true);
+        cluster2.gc.collectGarbage(true);
+
+        // Execute the gc with sweep
+        cluster1.gc.collectGarbage(false);
+
+        Assert.assertEquals(true, 
Sets.symmetricDifference(Sets.union(cluster1.getInitBlobs(), 
cluster2.getInitBlobs()),
+            cluster1.getExistingBlobIds()).isEmpty());
+    }
+
+    @Test
+    // GC must not reclaim any blobs: only cluster1 ran the mark phase, so the shared sweep is skipped
+    public void testOnly1ClusterMark() throws Exception {
+        // Only run the mark phase on one cluster
+        cluster1.gc.collectGarbage(true);
+
+        // Execute the gc with sweep
+        cluster1.gc.collectGarbage(false);
+
+        Assert.assertTrue(
+            (cluster1.getInitBlobs().size() + cluster2.getInitBlobs().size()) 
< cluster1.getExistingBlobIds().size());
+        Set<String> existing = cluster1.getExistingBlobIds();
+        Assert.assertTrue(existing.containsAll(cluster2.getInitBlobs()));
+        Assert.assertTrue(existing.containsAll(cluster1.getInitBlobs()));
+    }
+
+    @Test
+    public void testRepeatedMarkWithSweep() throws Exception {
+        // Only run the mark phase on one cluster
+        cluster1.gc.collectGarbage(true);
+        cluster2.gc.collectGarbage(true);
+        cluster2.gc.collectGarbage(true);
+
+        // Execute the gc with sweep
+        cluster2.gc.collectGarbage(false);
+
+        Assert.assertTrue(Sets.symmetricDifference(
+            Sets.union(cluster1.getInitBlobs(), cluster2.getInitBlobs()),
+            cluster1.getExistingBlobIds()).isEmpty());
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        FileUtils.cleanDirectory(new File(DataStoreUtils.getHomeDir()));
+    }
+
+    class Cluster {
+        private DocumentNodeStore ds;
+        private int seed;
+        private BlobGarbageCollector gc;
+
+        private Set<String> initBlobs = new HashSet<String>();
+
+        protected Set<String> getInitBlobs() {
+            return initBlobs;
+        }
+
+        public Cluster(final DocumentNodeStore ds, final String repoId, int 
seed)
+                throws IOException {
+            this.ds = ds;
+            this.gc = new BlobGarbageCollector() {
+                @Override
+                public void collectGarbage(boolean markOnly) throws Exception {
+                    MarkSweepGarbageCollector gc = new 
MarkSweepGarbageCollector(
+                            new DocumentBlobReferenceRetriever(ds),
+                            (GarbageCollectableBlobStore) ds.getBlobStore(),
+                            MoreExecutors.sameThreadExecutor(),
+                            "./target", 5, 0, repoId);
+                    gc.collectGarbage(markOnly);
+                }
+            };
+
+            this.seed = seed;
+        }
+
+        /**
+         * Creates the setup load with deletions.
+         * 
+         * @throws Exception
+         */
+        public void init() throws Exception {
+            NodeBuilder a = ds.getRoot().builder();
+
+            int number = 10;
+            // track the number of the assets to be deleted
+            List<Integer> deletes = Lists.newArrayList();
+            Random rand = new Random(47);
+            for (int i = 0; i < 5; i++) {
+                int n = rand.nextInt(number);
+                if (!deletes.contains(n)) {
+                    deletes.add(n);
+                }
+            }
+            for (int i = 0; i < number; i++) {
+                Blob b = ds.createBlob(randomStream(i + seed, 4160));
+                if (!deletes.contains(i)) {
+                    Iterator<String> idIter =
+                            ((GarbageCollectableBlobStore) ds.getBlobStore())
+                                    .resolveChunks(b.toString());
+                    while (idIter.hasNext()) {
+                        initBlobs.add(idIter.next());
+                    }
+                }
+                a.child("c" + i).setProperty("x", b);
+            }
+            ds.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+            a = ds.getRoot().builder();
+            for (int id : deletes) {
+                a.child("c" + id).remove();
+                ds.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+            }
+            long maxAge = 10; // NOTE(review): comment said "hours" but TimeUnit.MINUTES is applied below — confirm intended unit
+            // Advance the virtual clock past the GC max age so the version GC can purge the deleted documents
+            clock.waitUntil(clock.getTime() + 
TimeUnit.MINUTES.toMillis(maxAge));
+
+            VersionGarbageCollector vGC = ds.getVersionGarbageCollector();
+            VersionGCStats stats = vGC.gc(0, TimeUnit.MILLISECONDS);
+            Assert.assertEquals(deletes.size(), stats.deletedDocGCCount);
+        }
+
+        public Set<String> getExistingBlobIds() throws Exception {
+            GarbageCollectableBlobStore store = (GarbageCollectableBlobStore) 
ds.getBlobStore();
+            Iterator<String> cur = store.getAllChunkIds(0);
+
+            Set<String> existing = Sets.newHashSet();
+            while (cur.hasNext()) {
+                existing.add(cur.next());
+            }
+            return existing;
+        }
+    }
+}
+

Propchange: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java?rev=1659457&r1=1659456&r2=1659457&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
 Fri Feb 13 05:20:44 2015
@@ -18,6 +18,7 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment;
 
+import static 
org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType.REPOSITORY;
 import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayInputStream;
@@ -38,8 +39,11 @@ import com.google.common.util.concurrent
 import org.apache.commons.io.FileUtils;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
+import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
 import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy;
 import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
@@ -48,7 +52,6 @@ import org.apache.jackrabbit.oak.spi.com
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.junit.After;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import javax.annotation.Nonnull;
@@ -131,16 +134,22 @@ public class SegmentDataStoreBlobGCTest
     }
 
     @Test
-    @Ignore("OAK-2493")
     public void gc() throws Exception {
         HashSet<String> remaining = setUp();
+        String repoId = null;
+        if (SharedDataStoreUtils.isShared(store.getBlobStore())) {
+            repoId = ClusterRepositoryInfo.createId(nodeStore);
+            ((SharedDataStore) store.getBlobStore()).addMetadataRecord(
+                new ByteArrayInputStream(new byte[0]),
+                REPOSITORY.getNameFromId(repoId));
+        }
 
         MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector(
                 new SegmentBlobReferenceRetriever(store.getTracker()),
                     (GarbageCollectableBlobStore) store.getBlobStore(),
                     MoreExecutors.sameThreadExecutor(),
-                    "./target", 2048, true,  0);
-        gc.collectGarbage();
+                    "./target", 2048, 0, repoId);
+        gc.collectGarbage(false);
 
         Set<String> existingAfterGC = iterate();
         assertTrue(Sets.symmetricDifference(remaining, 
existingAfterGC).isEmpty());


Reply via email to