Author: amitj
Date: Wed Jul 20 03:56:20 2016
New Revision: 1753429

URL: http://svn.apache.org/viewvc?rev=1753429&view=rev
Log:
   OAK-4200:  [BlobGC] Improve collection times of blobs available

 Initial implementation
    * The blob ids are tracked locally on all instances.
    * There is a snapshot thread which uploads all the locally tracked files to 
the datastore.
    * When running GC these files are retrieved from all instances sharing the 
datastore and then used to return the list of blob ids.

Added:
    
jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FileLineDifferenceIteratorTest.java
   (contents, props changed)
      - copied, changed from r1753393, 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileLineDifferenceIteratorTest.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobTrackingStore.java
   (with props)
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTracker.java
   (with props)
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobTracker.java
   (with props)
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerClusterSharedTest.java
   (with props)
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerStoreTest.java
   (with props)
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerTest.java
   (with props)
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreTrackerGCTest.java
   (with props)
Removed:
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileLineDifferenceIteratorTest.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileStateTest.java
Modified:
    
jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/FileIOUtils.java
    
jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FileIOUtilsTest.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
    
jackrabbit/oak/trunk/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java

Modified: 
jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/FileIOUtils.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/FileIOUtils.java?rev=1753429&r1=1753428&r2=1753429&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/FileIOUtils.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/FileIOUtils.java
 Wed Jul 20 03:56:20 2016
@@ -20,9 +20,11 @@ import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.Closeable;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.io.OutputStream;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
@@ -37,13 +39,22 @@ import com.google.common.collect.Iterato
 import com.google.common.collect.PeekingIterator;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.LineIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import static com.google.common.base.Charsets.UTF_8;
 import static com.google.common.collect.Sets.newHashSet;
 import static com.google.common.io.Closeables.close;
+import static com.google.common.io.FileWriteMode.APPEND;
+import static com.google.common.io.Files.asByteSink;
 import static com.google.common.io.Files.move;
 import static com.google.common.io.Files.newWriter;
 import static java.io.File.createTempFile;
-import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.commons.io.FileUtils.copyInputStreamToFile;
+import static org.apache.commons.io.FileUtils.forceDelete;
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static org.apache.commons.io.IOUtils.copyLarge;
+import static org.apache.commons.io.LineIterator.closeQuietly;
 import static 
org.apache.jackrabbit.oak.commons.sort.EscapeUtils.escapeLineBreak;
 import static 
org.apache.jackrabbit.oak.commons.sort.EscapeUtils.unescapeLineBreaks;
 import static 
org.apache.jackrabbit.oak.commons.sort.ExternalSort.mergeSortedFiles;
@@ -69,7 +80,7 @@ public final class FileIOUtils {
      * @param file file whose contents needs to be sorted
      */
     public static void sort(File file) throws IOException {
-        File sorted = createTempFile("temp", null);
+        File sorted = createTempFile("fleioutilssort", null);
         merge(sortInBatch(file, lexComparator, true), sorted);
         move(sorted, file);
     }
@@ -82,7 +93,7 @@ public final class FileIOUtils {
      * @throws IOException
      */
     public static void sort(File file, Comparator<String> comparator) throws 
IOException {
-        File sorted = createTempFile("temp", null);
+        File sorted = createTempFile("fleioutilssort", null);
         merge(sortInBatch(file, comparator, true), sorted);
         move(sorted, file);
     }
@@ -101,6 +112,57 @@ public final class FileIOUtils {
     }
 
     /**
+
+     * Copies an input stream to a file.
+     *
+     * @param stream stream to copy
+     * @return
+     * @throws IOException
+     */
+    public static File copy(InputStream stream) throws IOException {
+        File file = createTempFile("fleioutilscopy", null);
+        copyInputStreamToFile(stream, file);
+        return file;
+    }
+
+    /**
+     * Appends the contents of the list of files to the given file and deletes 
the files
+     * if the delete flag is enabled.
+     *
+     * If there is a scope for lines in the files containing line break 
characters it should be
+     * ensured that the files are written with {@link 
#writeAsLine(BufferedWriter, String, boolean)}
+     * with true to escape line break characters.
+     * @param files
+     * @param appendTo
+     * @throws IOException
+     */
+    public static void append(List<File> files, File appendTo, boolean delete) 
throws IOException {
+        OutputStream appendStream = null;
+        boolean threw = true;
+
+        try {
+            appendStream = asByteSink(appendTo, APPEND).openBufferedStream();
+
+            for (File f : files) {
+                InputStream iStream = new FileInputStream(f);
+                try {
+                    copyLarge(iStream, appendStream);
+                } finally {
+                    closeQuietly(iStream);
+                }
+            }
+            if (delete) {
+                for (File f : files) {
+                    f.delete();
+                }
+            }
+            threw = false;
+        } finally {
+            close(appendStream, threw);
+        }
+    }
+
+    /**
      * Writes a string as a new line into the given buffered writer and 
optionally
      * escapes the line for line breaks.
      *
@@ -215,8 +277,8 @@ public final class FileIOUtils {
     /**
      * FileLineDifferenceIterator class which iterates over the difference of 
2 files line by line.
      *
-     * If there is a scope for lines in files containing line break characters 
it should be
-     * ensured that both the file are written with
+     * If there is a scope for lines in the files containing line break 
characters it should be
+     * ensured that both the files are written with
      * {@link #writeAsLine(BufferedWriter, String, boolean)} with true to 
escape line break
      * characters.
      */
@@ -301,4 +363,70 @@ public final class FileIOUtils {
             return diff;
         }
     }
+
+    /**
+     * Implements a {@link java.io.Closeable} wrapper over a {@link 
LineIterator}.
+     * Also has a transformer to transform the output. If the underlying file 
is
+     * provided then it deletes the file on {@link #close()}.
+     *
+     * @param <T> the type of elements in the iterator
+     */
+    public static class CloseableFileIterator<T> extends AbstractIterator<T> 
implements Closeable {
+        private final Logger log = LoggerFactory.getLogger(getClass());
+
+        private final LineIterator iterator;
+        private final Function<String, T> transformer;
+        private File backingFile;
+
+        public CloseableFileIterator(LineIterator iterator, Function<String, 
T> transformer) {
+            this.iterator = iterator;
+            this.transformer = transformer;
+        }
+
+        public CloseableFileIterator(LineIterator iterator, File backingFile,
+            Function<String, T> transformer) {
+            this.iterator = iterator;
+            this.transformer = transformer;
+            this.backingFile = backingFile;
+        }
+
+        @Override
+        protected T computeNext() {
+            if (iterator.hasNext()) {
+                return transformer.apply(iterator.next());
+            }
+
+            try {
+                close();
+            } catch (IOException e) {
+                log.warn("Error closing iterator", e);
+            }
+            return endOfData();
+        }
+
+        @Override
+        public void close() throws IOException {
+            closeQuietly(iterator);
+            if (backingFile != null && backingFile.exists()) {
+                forceDelete(backingFile);
+            }
+        }
+
+        public static CloseableFileIterator<String> wrap(LineIterator iter) {
+            return new CloseableFileIterator<String>(iter, new 
Function<String, String>() {
+                public String apply(String s) {
+                    return s;
+                }
+            });
+        }
+
+        public static CloseableFileIterator<String> wrap(LineIterator iter, 
File backingFile) {
+            return new CloseableFileIterator<String>(iter, backingFile,
+                new Function<String, String>() {
+                    public String apply(String s) {
+                        return s;
+                    }
+                });
+        }
+    }
 }

Modified: 
jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FileIOUtilsTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FileIOUtilsTest.java?rev=1753429&r1=1753428&r2=1753429&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FileIOUtilsTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FileIOUtilsTest.java
 Wed Jul 20 03:56:20 2016
@@ -19,10 +19,11 @@
 package org.apache.jackrabbit.oak.commons;
 
 import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileReader;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.nio.CharBuffer;
 import java.nio.charset.CharacterCodingException;
@@ -36,21 +37,27 @@ import java.util.Random;
 import java.util.Set;
 
 import com.google.common.base.Charsets;
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import static com.google.common.collect.Lists.newArrayList;
 import static com.google.common.collect.Sets.newHashSet;
+import static com.google.common.collect.Sets.union;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.append;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.copy;
 import static org.apache.jackrabbit.oak.commons.FileIOUtils.lexComparator;
 import static 
org.apache.jackrabbit.oak.commons.FileIOUtils.lineBreakAwareComparator;
 import static org.apache.jackrabbit.oak.commons.FileIOUtils.readStringsAsSet;
 import static org.apache.jackrabbit.oak.commons.FileIOUtils.sort;
 import static org.apache.jackrabbit.oak.commons.FileIOUtils.writeStrings;
+import static org.apache.jackrabbit.oak.commons.IOUtils.closeQuietly;
 import static 
org.apache.jackrabbit.oak.commons.sort.EscapeUtils.escapeLineBreak;
 import static 
org.apache.jackrabbit.oak.commons.sort.EscapeUtils.unescapeLineBreaks;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 
 /**
@@ -130,7 +137,7 @@ public class FileIOUtilsTest {
         while ((line = reader.readLine()) != null) {
             retrieved.add(line);
         }
-        IOUtils.closeQuietly(reader);
+        closeQuietly(reader);
         Collections.sort(list);
         assertArrayEquals(Arrays.toString(list.toArray()), list.toArray(), 
retrieved.toArray());
     }
@@ -149,11 +156,100 @@ public class FileIOUtilsTest {
         while ((line = reader.readLine()) != null) {
             retrieved.add(unescapeLineBreaks(line));
         }
-        IOUtils.closeQuietly(reader);
+        closeQuietly(reader);
         Collections.sort(list);
         assertArrayEquals(Arrays.toString(list.toArray()), list.toArray(), 
retrieved.toArray());
     }
 
+    @Test
+    public void testCopy() throws IOException{
+        File f = copy(randomStream(0, 256));
+        assertTrue("File does not exist", f.exists());
+        Assert.assertEquals("File length not equal to byte array from which 
copied",
+            256, f.length());
+        assertTrue("Could not delete file", f.delete());
+    }
+
+    @Test
+    public void appendTest() throws IOException {
+        Set<String> added1 = newHashSet("a", "z", "e", "b");
+        File f1 = folder.newFile();
+        writeStrings(added1.iterator(), f1, false);
+
+        Set<String> added2 = newHashSet("2", "3", "5", "6");
+        File f2 = folder.newFile();
+        writeStrings(added2.iterator(), f2, false);
+
+        Set<String> added3 = newHashSet("t", "y", "8", "9");
+        File f3 = folder.newFile();
+        writeStrings(added3.iterator(), f3, false);
+
+        append(newArrayList(f2, f3), f1, true);
+        assertEquals(union(union(added1, added2), added3),
+            readStringsAsSet(new FileInputStream(f1), false));
+        assertTrue(!f2.exists());
+        assertTrue(!f3.exists());
+        assertTrue(f1.exists());
+    }
+
+    @Test
+    public void appendTestNoDelete() throws IOException {
+        Set<String> added1 = newHashSet("a", "z", "e", "b");
+        File f1 = folder.newFile();
+        writeStrings(added1.iterator(), f1, false);
+
+        Set<String> added2 = newHashSet("2", "3", "5", "6");
+        File f2 = folder.newFile();
+        writeStrings(added2.iterator(), f2, false);
+
+        Set<String> added3 = newHashSet("t", "y", "8", "9");
+        File f3 = folder.newFile();
+        writeStrings(added3.iterator(), f3, false);
+
+        append(newArrayList(f2, f3), f1, false);
+        assertEquals(union(union(added1, added2), added3),
+            readStringsAsSet(new FileInputStream(f1), false));
+        assertTrue(f2.exists());
+        assertTrue(f3.exists());
+        assertTrue(f1.exists());
+    }
+
+    @Test
+    public void appendRandomizedTest() throws Exception {
+        Set<String> added1 = newHashSet();
+        File f1 = folder.newFile();
+
+        for (int i = 0; i < 100; i++) {
+            added1.add(getRandomTestString());
+        }
+        int count = writeStrings(added1.iterator(), f1, true);
+        assertEquals(added1.size(), count);
+
+        Set<String> added2 = newHashSet("2", "3", "5", "6");
+        File f2 = folder.newFile();
+        writeStrings(added2.iterator(), f2, true);
+
+        append(newArrayList(f2), f1, true);
+        assertEquals(union(added1, added2),
+            readStringsAsSet(new FileInputStream(f1), true));
+    }
+
+    @Test
+    public void appendWithLineBreaksTest() throws IOException {
+        Set<String> added1 = newHashSet(getLineBreakStrings());
+        File f1 = folder.newFile();
+        int count = writeStrings(added1.iterator(), f1, true);
+        assertEquals(added1.size(), count);
+
+        Set<String> added2 = newHashSet("2", "3", "5", "6");
+        File f2 = folder.newFile();
+        writeStrings(added2.iterator(), f2, true);
+
+        append(newArrayList(f1), f2, true);
+        assertEquals(union(added1, added2), readStringsAsSet(new 
FileInputStream(f2), true));
+    }
+
+
     private static List<String> getLineBreakStrings() {
         return newArrayList("ab\nc\r", "ab\\z", "a\\\\z\nc",
             "/a", "/a/b\nc", "/a/b\rd", "/a/b\r\ne", "/a/c");
@@ -194,4 +290,11 @@ public class FileIOUtilsTest {
         }
         return buffer.toString();
     }
+
+    static InputStream randomStream(int seed, int size) {
+        Random r = new Random(seed);
+        byte[] data = new byte[size];
+        r.nextBytes(data);
+        return new ByteArrayInputStream(data);
+    }
 }

Copied: 
jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FileLineDifferenceIteratorTest.java
 (from r1753393, 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileLineDifferenceIteratorTest.java)
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FileLineDifferenceIteratorTest.java?p2=jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FileLineDifferenceIteratorTest.java&p1=jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileLineDifferenceIteratorTest.java&r1=1753393&r2=1753429&rev=1753429&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileLineDifferenceIteratorTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FileLineDifferenceIteratorTest.java
 Wed Jul 20 03:56:20 2016
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.jackrabbit.oak.plugins.blob;
+package org.apache.jackrabbit.oak.commons;
 
 import java.io.IOException;
 import java.io.StringReader;

Propchange: 
jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FileLineDifferenceIteratorTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobTrackingStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobTrackingStore.java?rev=1753429&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobTrackingStore.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobTrackingStore.java
 Wed Jul 20 03:56:20 2016
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobTracker;
+
+/**
+ * Interface to be implemented by a data store which can support local blob id 
tracking.
+ */
+public interface BlobTrackingStore extends SharedDataStore {
+    /**
+     * Registers a tracker in the data store.
+     * @param tracker
+     */
+    void addTracker(BlobTracker tracker);
+
+    /**
+     * Gets the tracker registered in the data store.
+     *
+     * @return tracker
+     */
+    BlobTracker getTracker();
+}
+

Propchange: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobTrackingStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java?rev=1753429&r1=1753428&r2=1753429&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
 Wed Jul 20 03:56:20 2016
@@ -18,18 +18,9 @@ package org.apache.jackrabbit.oak.plugin
 
 import java.io.Closeable;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Comparator;
-import java.util.List;
-
-import com.google.common.io.Files;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.jackrabbit.oak.commons.IOUtils;
-import org.apache.jackrabbit.oak.commons.sort.ExternalSort;
 
 /**
  * Class for keeping the file system state of the garbage collection.
@@ -52,13 +43,6 @@ public class GarbageCollectorFileState i
 
     /** The garbage stores the garbage collection candidates which were not 
deleted . */
     private final File garbage;
-    private final static Comparator<String> lexComparator = 
-            new Comparator<String>() {
-                @Override
-                public int compare(String s1, String s2) {
-                    return s1.compareTo(s2);
-                }
-            };
 
     /**
      * Instantiates a new garbage collector file state.
@@ -124,51 +108,4 @@ public class GarbageCollectorFileState i
             FileUtils.deleteDirectory(home);
         }
     }
-
-    /**
-     * Sorts the given file externally.
-     * 
-     * @param file file whose contents needs to be sorted
-     */
-    public static void sort(File file) throws IOException {
-        File sorted = createTempFile();
-        merge(ExternalSort.sortInBatch(file, lexComparator, true), sorted);
-        Files.move(sorted, file);
-    }
-
-    /**
-     * Sorts the given file externally with the given comparator.
-     *
-     * @param file file whose contents needs to be sorted
-     * @param comparator to compare
-     * @throws IOException
-     */
-    public static void sort(File file, Comparator<String> comparator) throws 
IOException {
-        File sorted = createTempFile();
-        merge(ExternalSort.sortInBatch(file, comparator, true), sorted);
-        Files.move(sorted, file);
-    }
-    
-    
-    public static void merge(List<File> files, File output) throws IOException 
{
-        ExternalSort.mergeSortedFiles(
-                files,
-                output, lexComparator, true);        
-    }
-    
-    public static File copy(InputStream stream) throws IOException {
-        File file = createTempFile();
-        OutputStream out = null;
-        try {
-            out = new FileOutputStream(file);
-            IOUtils.copy(stream, out);
-        } finally {
-            IOUtils.closeQuietly(out);
-        }
-        return file;
-    }
-
-    private static File createTempFile() throws IOException {
-        return File.createTempFile("temp", null);
-    }
 }

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java?rev=1753429&r1=1753428&r2=1753429&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
 Wed Jul 20 03:56:20 2016
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.plugin
 
 import java.io.BufferedWriter;
 import java.io.ByteArrayInputStream;
+import java.io.Closeable;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileWriter;
@@ -57,6 +58,7 @@ import org.apache.jackrabbit.core.data.D
 import org.apache.jackrabbit.oak.commons.FileIOUtils;
 import 
org.apache.jackrabbit.oak.commons.FileIOUtils.FileLineDifferenceIterator;
 import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobTracker;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
 import 
org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
 import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
@@ -64,6 +66,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static com.google.common.collect.Lists.newArrayList;
+import static org.apache.commons.io.FileUtils.copyFile;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.copy;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.merge;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.sort;
+import static org.apache.jackrabbit.oak.commons.IOUtils.closeQuietly;
 
 /**
  * Mark and sweep garbage collector.
@@ -397,42 +404,41 @@ public class MarkSweepGarbageCollector i
         LOG.debug("Sweeping blobs with modified time > than the configured max 
deleted time ({}). ",
                 timestampToString(lastMaxModifiedTime));
 
-        ConcurrentLinkedQueue<String> exceptionQueue = new 
ConcurrentLinkedQueue<String>();
-
-        LineIterator iterator =
-                FileUtils.lineIterator(fs.getGcCandidates(), 
Charsets.UTF_8.name());
-        List<String> ids = newArrayList();
-
-        while (iterator.hasNext()) {
-            ids.add(iterator.next());
-
-            if (ids.size() >= getBatchCount()) {
-                count += ids.size();
-                deleted += sweepInternal(ids, exceptionQueue, 
lastMaxModifiedTime);
-                ids = newArrayList();
-            }
-        }
-        if (!ids.isEmpty()) {
-            count += ids.size();
-            deleted += sweepInternal(ids, exceptionQueue, lastMaxModifiedTime);
-        }
-
-        BufferedWriter writer = null;
+        BufferedWriter removesWriter = null;
+        LineIterator iterator = null;
         try {
-            if (!exceptionQueue.isEmpty()) {
-                writer = Files.newWriter(fs.getGarbage(), Charsets.UTF_8);
-                saveBatchToFile(newArrayList(exceptionQueue), writer);
+            removesWriter = Files.newWriter(fs.getGarbage(), Charsets.UTF_8);
+            ConcurrentLinkedQueue<String> exceptionQueue = new 
ConcurrentLinkedQueue<String>();
+            iterator =
+                    FileUtils.lineIterator(fs.getGcCandidates(), 
Charsets.UTF_8.name());
+
+            List<String> ids = newArrayList();
+            while (iterator.hasNext()) {
+                ids.add(iterator.next());
+
+                if (ids.size() >= getBatchCount()) {
+                    count += ids.size();
+                    deleted += BlobCollectionType.get(blobStore)
+                        .sweepInternal(blobStore,ids, exceptionQueue, 
lastMaxModifiedTime);
+                    ids = newArrayList();
+                    saveBatchToFile(newArrayList(exceptionQueue), 
removesWriter);
+                    exceptionQueue.clear();
+                }
+            }
+            if (!ids.isEmpty()) {
+                count += ids.size();
+                deleted += BlobCollectionType.get(blobStore)
+                    .sweepInternal(blobStore, ids, exceptionQueue, 
lastMaxModifiedTime);
+                saveBatchToFile(newArrayList(exceptionQueue), removesWriter);
+                exceptionQueue.clear();
             }
         } finally {
             LineIterator.closeQuietly(iterator);
-            IOUtils.closeQuietly(writer);
+            closeQuietly(removesWriter);
         }
 
-        if(!exceptionQueue.isEmpty()) {
-            LOG.warn("Unable to delete some blobs entries from the blob store. 
Details around such blob entries can " 
-                         + "be found in [{}]",
-                        fs.getGarbage().getAbsolutePath());
-        }
+        BlobCollectionType.get(blobStore).handleRemoves(blobStore, 
fs.getGarbage());
+
         if(count != deleted) {
             LOG.warn("Deleted only [{}] blobs entries from the [{}] candidates 
identified. This may happen if blob " 
                          + "modified time is > "
@@ -466,30 +472,6 @@ public class MarkSweepGarbageCollector i
         ids.clear();
         writer.flush();
     }
-    
-    /**
-     * Deletes a batch of blobs from blob store.
-     * 
-     * @param ids
-     * @param exceptionQueue
-     * @param maxModified
-     */
-    private long sweepInternal(List<String> ids, ConcurrentLinkedQueue<String> 
exceptionQueue, long maxModified) {
-        long deleted = 0;
-        try {
-            LOG.trace("Blob ids to be deleted {}", ids);
-            deleted = blobStore.countDeleteChunks(ids, maxModified);
-            if (deleted != ids.size()) {
-                // Only log and do not add to exception queue since some blobs 
may not match the
-                // lastMaxModifiedTime criteria.
-                LOG.debug("Some [{}] blobs were not deleted from the batch : 
[{}]", ids.size() - deleted, ids);
-            }
-        } catch (Exception e) {
-            LOG.warn("Error occurred while deleting blob with ids [{}]", ids, 
e);
-            exceptionQueue.addAll(ids);
-        }
-        return deleted;
-    }
 
     /**
      * Iterates the complete node tree and collect all blob references
@@ -552,7 +534,7 @@ public class MarkSweepGarbageCollector i
             LOG.info("Number of valid blob references marked under mark phase 
of " +
                     "Blob garbage collection [{}]", count.get());
             // sort the marked references with the first part of the key
-            GarbageCollectorFileState.sort(fs.getMarkedRefs(), 
+            sort(fs.getMarkedRefs(),
                 new Comparator<String>() {
                     @Override
                     public int compare(String s1, String s2) {
@@ -560,7 +542,7 @@ public class MarkSweepGarbageCollector i
                     }
                 });
         } finally {
-            IOUtils.closeQuietly(writer);
+            closeQuietly(writer);
         }
     }
     
@@ -626,37 +608,20 @@ public class MarkSweepGarbageCollector i
     
         @Override
         public Integer call() throws Exception {
-            LOG.debug("Starting retrieve of all blobs");
-            BufferedWriter bufferWriter = null;
-            int blobsCount = 0;
-            try {
-                bufferWriter = new BufferedWriter(
-                        new FileWriter(fs.getAvailableRefs()));
-                Iterator<String> idsIter = blobStore.getAllChunkIds(0);
-                List<String> ids = newArrayList();
-
-                while (idsIter.hasNext()) {
-                    ids.add(idsIter.next());
-                    if (ids.size() > getBatchCount()) {
-                        blobsCount += ids.size();
-                        saveBatchToFile(ids, bufferWriter);
-                        LOG.info("Retrieved ({}) blobs", blobsCount);
-                    }
-                }
-
-                if (!ids.isEmpty()) {
-                    blobsCount += ids.size();
-                    saveBatchToFile(ids, bufferWriter);
-                    LOG.info("Retrieved ({}) blobs", blobsCount);
-                }
+            BlobCollectionType.get(blobStore).retrieve(blobStore, fs, 
getBatchCount());
+            long length = fs.getAvailableRefs().length();
+            LOG.info("Length of blob ids file retrieved {}", length);
+
+            // If the length is 0 then the references are not available from the 
tracker,
+            // so retrieve them from the data store
+            if (fs.getAvailableRefs().length() <= 0) {
+                BlobCollectionType.DEFAULT.retrieve(blobStore, fs, 
getBatchCount());
+                length = fs.getAvailableRefs().length();
+                LOG.info("Length of blob ids file retrieved {}", length);
 
-                // sort the file
-                GarbageCollectorFileState.sort(fs.getAvailableRefs());
-                LOG.info("Number of blobs present in BlobStore : [{}] ", 
blobsCount);
-            } finally {
-                IOUtils.closeQuietly(bufferWriter);
+                BlobCollectionType.get(blobStore).track(blobStore, fs);
             }
-            return blobsCount;
+            return 0;
         }
     }
 
@@ -712,11 +677,11 @@ public class MarkSweepGarbageCollector i
                     // List of files to be merged
                     List<File> files = newArrayList();
                     for (DataRecord refFile : refFiles) {
-                        File file = 
GarbageCollectorFileState.copy(refFile.getStream());
+                        File file = copy(refFile.getStream());
                         files.add(file);
                     }
 
-                    GarbageCollectorFileState.merge(files, fs.getMarkedRefs());
+                    merge(files, fs.getMarkedRefs());
                     
                     // Get the timestamp to indicate the earliest mark phase 
start
                     List<DataRecord> markerFiles =
@@ -788,4 +753,169 @@ public class MarkSweepGarbageCollector i
     
         public void addMarkedStartMarker(GarbageCollectableBlobStore 
blobStore, String repoId) {}
     }
+
+    /**
+     * Defines different blob collection types and encodes the divergent 
behavior.
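+     * <ul><li>TRACKER - used when the blob store is a BlobTrackingStore with a non-null tracker</li><li>DEFAULT - used otherwise</li></ul>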
+     */
+    private enum BlobCollectionType {
+        TRACKER {
+            /**
+             * Deletes the ids in the given batch one at a time so that the 
number of actual deletes is known exactly.
+             */
+            @Override
+            long sweepInternal(GarbageCollectableBlobStore blobStore, 
List<String> ids,
+                ConcurrentLinkedQueue<String> exceptionQueue, long 
maxModified) {
+                long totalDeleted = 0;
+                LOG.trace("Blob ids to be deleted {}", ids);
+                for (String id : ids) {
+                    try {
+                        long deleted = 
blobStore.countDeleteChunks(newArrayList(id), maxModified);
+                        if (deleted != 1) {
+                            LOG.debug("Blob [{}] not deleted", id);
+                        } else {
+                            exceptionQueue.add(id);
+                        }
+                        totalDeleted += deleted;
+                    } catch (Exception e) {
+                        LOG.warn("Error occurred while deleting blob with id 
[{}]", id, e);
+                    }
+                }
+                return totalDeleted;
+            }
+
+            @Override
+            void retrieve(GarbageCollectableBlobStore blobStore,
+                    GarbageCollectorFileState fs, int batchCount) throws 
Exception {
+                ((BlobTrackingStore) blobStore).getTracker()
+                    .get(fs.getAvailableRefs().getAbsolutePath());
+            }
+
+            @Override
+            void handleRemoves(GarbageCollectableBlobStore blobStore,
+                    File removedIds) throws IOException {
+                ((BlobTrackingStore) 
blobStore).getTracker().remove(removedIds);
+            }
+
+            @Override
+            void track(GarbageCollectableBlobStore blobStore,
+                GarbageCollectorFileState fs) {
+                try {
+                    File f = File.createTempFile("blobiddownload", null);
+                    copyFile(fs.getAvailableRefs(), f);
+                    ((BlobTrackingStore) blobStore).getTracker().add(f);
+                } catch (IOException e) {
+                    LOG.warn("Unable to track blob ids locally");
+                }
+            }
+        },
+        DEFAULT;
+
+        /**
+         * Deletes a batch of blobs from blob store.
+         *
+         * @param blobStore blobStore
+         * @param ids ids to sweep
+         * @param exceptionQueue add removes to the queue
+         * @param maxModified maxModified time of blobs to be deleted
+         * @return
+         */
+        long sweepInternal(GarbageCollectableBlobStore blobStore,
+            List<String> ids, ConcurrentLinkedQueue<String> exceptionQueue, 
long maxModified) {
+            long deleted = 0;
+            try {
+                LOG.trace("Blob ids to be deleted {}", ids);
+                deleted = blobStore.countDeleteChunks(ids, maxModified);
+                if (deleted != ids.size()) {
+                    LOG.debug("Some [{}] blobs were not deleted from the batch 
: [{}]",
+                        ids.size() - deleted, ids);
+                    exceptionQueue.addAll(ids);
+                }
+            } catch (Exception e) {
+                LOG.warn("Error occurred while deleting blob with ids [{}]", 
ids, e);
+            }
+            return deleted;
+        }
+
+        /**
+         * Retrieves the list of available blobs and puts it in the file.
+         *
+         * @param blobStore
+         * @param fs
+         * @param batchCount
+         * @throws Exception
+         */
+        void retrieve(GarbageCollectableBlobStore blobStore,
+                GarbageCollectorFileState fs, int batchCount) throws Exception 
{
+            LOG.debug("Starting retrieve of all blobs");
+            BufferedWriter bufferWriter = null;
+            int blobsCount = 0;
+            Iterator<String> idsIter = null;
+            try {
+                bufferWriter = new BufferedWriter(
+                    new FileWriter(fs.getAvailableRefs()));
+
+                idsIter = blobStore.getAllChunkIds(0);
+                List<String> ids = newArrayList();
+                while (idsIter.hasNext()) {
+                    ids.add(idsIter.next());
+                    if (ids.size() > batchCount) {
+                        blobsCount += ids.size();
+                        saveBatchToFile(ids, bufferWriter);
+                        LOG.info("Retrieved ({}) blobs", blobsCount);
+                    }
+                }
+
+                if (!ids.isEmpty()) {
+                    blobsCount += ids.size();
+                    saveBatchToFile(ids, bufferWriter);
+                    LOG.info("Retrieved ({}) blobs", blobsCount);
+                }
+
+                // sort the file
+                sort(fs.getAvailableRefs());
+                LOG.info("Number of blobs present in BlobStore : [{}] ", 
blobsCount);
+            } finally {
+                closeQuietly(bufferWriter);
+                if (idsIter instanceof Closeable) {
+                    try {
+                        Closeables.close((Closeable) idsIter, false);
+                    } catch (Exception e) {
+                        LOG.debug("Error closing iterator");
+                    }
+                }
+            }
+        }
+
+        /**
+         * Hook to handle all the removed ids.
+         *
+         * @param blobStore
+         * @param removedIds
+         * @throws IOException
+         */
+        void handleRemoves(GarbageCollectableBlobStore blobStore,
+            File removedIds) throws IOException {
+            FileUtils.forceDelete(removedIds);
+        }
+
+        /**
+         * Tracker may want to track this file
+         *
+         * @param blobStore
+         * @param fs
+         */
+        void track(GarbageCollectableBlobStore blobStore, 
GarbageCollectorFileState fs) {
+        }
+
+        public static BlobCollectionType get(GarbageCollectableBlobStore 
blobStore) {
+            if (blobStore instanceof BlobTrackingStore) {
+                BlobTracker tracker = ((BlobTrackingStore) 
blobStore).getTracker();
+                if (tracker != null) {
+                    return TRACKER;
+                }
+            }
+            return DEFAULT;
+        }
+    }
 }

Added: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTracker.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTracker.java?rev=1753429&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTracker.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTracker.java
 Wed Jul 20 03:56:20 2016
@@ -0,0 +1,607 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import org.apache.jackrabbit.core.data.DataRecord;
+import 
org.apache.jackrabbit.oak.commons.FileIOUtils.FileLineDifferenceIterator;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static com.google.common.base.Predicates.alwaysTrue;
+import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.transform;
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.io.Files.asByteSource;
+import static com.google.common.io.Files.fileTreeTraverser;
+import static com.google.common.io.Files.move;
+import static com.google.common.io.Files.newWriter;
+import static java.io.File.createTempFile;
+import static java.lang.System.currentTimeMillis;
+import static java.util.Collections.synchronizedList;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.commons.io.FileUtils.copyFile;
+import static org.apache.commons.io.FileUtils.forceDelete;
+import static org.apache.commons.io.FileUtils.forceMkdir;
+import static org.apache.commons.io.FileUtils.lineIterator;
+import static org.apache.commons.io.FilenameUtils.concat;
+import static org.apache.commons.io.FilenameUtils.removeExtension;
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static 
org.apache.jackrabbit.oak.commons.FileIOUtils.CloseableFileIterator.wrap;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.append;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.copy;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.sort;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.writeStrings;
+import static 
org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker.Store.Type.GENERATION;
+import static 
org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker.Store.Type.IN_PROCESS;
+import static 
org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker.Store.Type.REFS;
+
+
+/**
+ * Tracks the blob ids available or added in the blob store.
+ *     - We can identify instance node where uploading/downloading is not required
+ *     - From other nodes we can only upload deltas since the last snapshot
+ *     - If all nodes have the specified flag then all nodes run partial gc
+ *
+ */
+public class BlobIdTracker implements Closeable, BlobTracker {
+    private static final Logger LOG = 
LoggerFactory.getLogger(BlobIdTracker.class);
+
+    private static final String datastoreMeta = "blobreferences";
+    private static final String fileNamePrefix = "blob";
+    private static final String mergedFileSuffix = ".refs";
+
+    /* Local instance identifier */
+    private final String instanceId = String.valueOf(currentTimeMillis());
+
+    private final SharedDataStore datastore;
+
+    protected Store store;
+
+    private final ScheduledExecutorService scheduler;
+
+    private String prefix;
+
+    public BlobIdTracker(String path, String repositoryId,
+        long snapshotIntervalSecs, SharedDataStore datastore) throws 
IOException {
+        this(path, repositoryId, newSingleThreadScheduledExecutor(),
+            snapshotIntervalSecs, snapshotIntervalSecs, datastore);
+    }
+
+    public BlobIdTracker(String path, String repositoryId, 
ScheduledExecutorService scheduler,
+            long snapshotDelaySecs, long snapshotIntervalSecs, SharedDataStore 
datastore)
+        throws IOException {
+        String root = concat(path, datastoreMeta);
+        File rootDir = new File(root);
+        this.datastore = datastore;
+        this.scheduler = scheduler;
+
+        try {
+            forceMkdir(rootDir);
+            prefix = fileNamePrefix + "-" + repositoryId;
+            this.store = new Store(rootDir, prefix);
+            scheduler.scheduleAtFixedRate(new SnapshotJob(),
+                SECONDS.toMillis(snapshotDelaySecs),
+                SECONDS.toMillis(snapshotIntervalSecs), MILLISECONDS);
+        } catch (IOException e) {
+            LOG.error("Error initializing blob tracker", e);
+            close();
+            throw e;
+        }
+    }
+
+    @Override
+    public void remove(File recs) throws IOException {
+        store.removeRecords(recs);
+    }
+
+    @Override
+    public void remove(Iterator<String> recs) throws IOException {
+        store.removeRecords(recs);
+    }
+
+    @Override
+    public void add(String id) throws IOException {
+        store.addRecord(id);
+    }
+
+    @Override
+    public void add(Iterator<String> recs) throws IOException {
+        store.addRecords(recs);
+    }
+
+    @Override
+    public void add(File recs) throws IOException {
+        store.addRecords(recs);
+    }
+
+    /**
+     * Retrieves all the reference files available in the DataStore and merges 
them into the local
+     * store and then returns an iterator over it. This way the ids returned are as 
recent as the
+     * snapshots taken on all instances/repositories connected to the 
DataStore.
+     *
+     * The iterator returned is a Closeable instance and should be closed by 
calling #close().
+     *
+     * @return iterator over all the blob ids available
+     * @throws IOException
+     */
+    @Override
+    public Iterator<String> get() throws IOException {
+        try {
+            globalMerge();
+            return store.getRecords();
+        } catch (IOException e) {
+            LOG.error("Error in retrieving blob records iterator", e);
+            throw e;
+        }
+    }
+
+    @Override
+    public File get(String path) throws IOException {
+        globalMerge();
+        return store.getRecords(path);
+    }
+
+    private void globalMerge() throws IOException {
+        try {
+            // Download all the blob reference files except the local one from 
the data store
+            Iterable<DataRecord> refRecords = filter(
+                datastore.getAllMetadataRecords(fileNamePrefix), new 
Predicate<DataRecord>() {
+                    @Override
+                    public boolean apply(DataRecord input) {
+                        return 
!input.getIdentifier().toString().contains(instanceId);
+                    }
+                });
+            // Download all the corresponding reference files
+            List<File> refFiles = newArrayList(
+                transform(refRecords,
+                    new Function<DataRecord, File>() {
+                        @Override
+                        public File apply(DataRecord input) {
+                            InputStream inputStream = null;
+                            try {
+                                inputStream = input.getStream();
+                                return copy(inputStream);
+                            } catch (Exception e) {
+                                LOG.warn("Error copying data store file 
locally {}",
+                                    input.getIdentifier(), e);
+                            } finally {
+                                closeQuietly(inputStream);
+                            }
+                            return null;
+                        }
+                    }));
+
+            // Merge all the downloaded files in to the local store
+            store.merge(refFiles, true);
+
+            // Remove all the data store records
+            for (DataRecord rec : refRecords) {
+                datastore.deleteMetadataRecord(rec.getIdentifier().toString());
+                LOG.info("Deleted metadata record {}", 
rec.getIdentifier().toString());
+            }
+        } catch (IOException e) {
+            LOG.error("Error in merging blob records iterator from the data 
store", e);
+            throw e;
+        }
+    }
+
+    /**
+     * Takes a snapshot on the tracker.
+     * Uploads the latest snapshot to the DataStore so that it's visible
+     * to other cluster nodes/repositories connected to the DataStore.
+     *
+     * @throws IOException
+     */
+    private void snapshot() throws IOException {
+        InputStream inputStream = null;
+        try {
+            store.snapshot();
+
+            inputStream =
+                asByteSource(store.getBlobRecordsFile()).openBufferedStream();
+            datastore.addMetadataRecord(inputStream,
+                (prefix + instanceId + mergedFileSuffix));
+        } catch (Exception e) {
+            LOG.error("Error taking snapshot", e);
+            throw new IOException("Snapshot error", e);
+        } finally {
+            closeQuietly(inputStream);
+        }
+    }
+
+    /**
+     * Closes the tracker and the underlying store.
+     *
+     * @throws IOException
+     */
+    @Override
+    public void close() throws IOException {
+        store.close();
+        new ExecutorCloser(scheduler).close();
+    }
+
+    /**
+     * Local store for managing the blob reference
+     */
+    static class Store implements Closeable {
+        /* Suffix for a snapshot generation file */
+        private static final String genFileNameSuffix = ".gen";
+
+        /* Suffix for in process file */
+        private static final String workingCopySuffix = ".process";
+
+        /* The current writer where all blob ids are being appended */
+        private BufferedWriter writer;
+
+        /* In-process file */
+        private File processFile;
+
+        /* All available generations that need to be merged */
+        private final List<File> generations;
+
+        private final File rootDir;
+
+        private final String prefix;
+
+        /* Lock for operations on references file */
+        private final ReentrantLock refLock;
+
+        Store(File rootDir, String prefix) throws IOException {
+            this.rootDir = rootDir;
+            this.prefix = prefix;
+            this.refLock = new ReentrantLock();
+
+            // Retrieve the process file if it exists
+            processFile =
+                
fileTreeTraverser().breadthFirstTraversal(rootDir).firstMatch(IN_PROCESS.filter())
+                    .orNull();
+
+            // Get the List of all generations available.
+            generations = synchronizedList(newArrayList(
+                
fileTreeTraverser().breadthFirstTraversal(rootDir).filter(GENERATION.filter())));
+
+            // Close/rename any existing in process
+            nextGeneration();
+        }
+
+        /**
+         * Add a blob id to the tracking file.
+         *
+         * @param id id to track
+         * @throws IOException
+         */
+        protected synchronized void addRecord(String id) throws IOException {
+            writer.append(id);
+            writer.newLine();
+            writer.flush();
+            LOG.debug("Added record {}", id);
+        }
+
+        /**
+         * Returns an iterator on the tracked blob ids.
+         *
+         * @return record iterator
+         * @throws IOException
+         */
+        protected Iterator<String> getRecords() throws IOException {
+            try {
+                // Get a temp file path
+                String path = createTempFile("temp", null).getAbsolutePath();
+                return wrap(lineIterator(getRecords(path)), new File(path));
+            } catch (IOException e) {
+                LOG.error("Error in retrieving blob records iterator", e);
+                throw e;
+            }
+        }
+
+        /**
+         * Returns a file with the tracked blob ids.
+         *
+         * @param path path of the file
+         * @return File containing tracked file ids
+         * @throws IOException
+         */
+        protected File getRecords(String path) throws IOException {
+            refLock.lock();
+            File copiedRecsFile = new File(path);
+            try {
+                copyFile(getBlobRecordsFile(), copiedRecsFile);
+                return copiedRecsFile;
+            } catch (IOException e) {
+                LOG.error("Error in retrieving blob records file", e);
+                throw e;
+            } finally {
+                refLock.unlock();
+            }
+        }
+
+        /**
+         * Returns the blob references file
+         *
+         * @return blob reference file
+         * @throws IOException
+         */
+        protected File getBlobRecordsFile() throws IOException {
+            File refs = new File(rootDir, prefix + REFS.getFileNameSuffix());
+            if (!refs.exists()) {
+                LOG.debug("File created {}", refs.createNewFile());
+            }
+            return refs;
+        }
+
+        /**
+         * Merges the given files with the references file and deletes the 
files.
+         *
+         * @param refFiles files to merge
+         * @param doSort whether to sort while merging
+         * @throws IOException
+         */
+        protected void merge(List<File> refFiles, boolean doSort) throws 
IOException {
+            refLock.lock();
+            try {
+                if (refFiles != null && !refFiles.isEmpty()) {
+                    File merged = new File(rootDir, prefix + 
REFS.getFileNameSuffix());
+                    append(refFiles, merged, true);
+                    LOG.debug("Merged files into references {}", refFiles);
+                    refFiles.clear();
+                }
+                if (doSort) {
+                    sort(getBlobRecordsFile());
+                }
+            } finally {
+                refLock.unlock();
+            }
+        }
+
+        /**
+         * Removes ids obtained by the given iterator from the tracked 
references.
+         * The iterator has to be closed by the caller.
+         *
+         * @param recs iterator over records to remove
+         * @throws IOException
+         */
+        protected void removeRecords(Iterator<String> recs) throws IOException 
{
+            // Spool the ids to be deleted into a file and sort
+            File deleted = createTempFile("deleted", null);
+            writeStrings(recs, deleted, false);
+            removeRecords(deleted);
+            LOG.trace("Removed records");
+        }
+
+        /**
+         * Removes ids obtained by the given file from the tracked references.
+         * File is deleted before returning.
+         *
+         * @param recs file of records to delete
+         * @throws IOException
+         */
+        protected void removeRecords(File recs) throws IOException {
+            refLock.lock();
+            try {
+                sort(getBlobRecordsFile());
+                sort(recs);
+                LOG.trace("Sorted files");
+
+                // Remove and spool the remaining ids into a temp file
+                File temp = createTempFile("sorted", null);
+                FileLineDifferenceIterator iterator = null;
+                try {
+                    iterator = new FileLineDifferenceIterator(recs, 
getBlobRecordsFile(), null);
+                    writeStrings(iterator, temp, false);
+                } finally {
+                    if (iterator != null) {
+                        iterator.close();
+                    }
+                }
+
+                File blobRecs = getBlobRecordsFile();
+                move(temp, blobRecs);
+                LOG.trace("removed records");
+            } finally {
+                refLock.unlock();
+                try {
+                    forceDelete(recs);
+                } catch (IOException e) {
+                    LOG.debug("Failed to delete file {}", recs, e);
+                }
+            }
+        }
+
+        /**
+         * Opens a new generation file and a writer over it.
+         *
+         * @throws IOException
+         */
+        private synchronized void nextGeneration() throws IOException {
+            close();
+
+            processFile = new File(rootDir, prefix + 
IN_PROCESS.getFileNameSuffix());
+            writer = newWriter(processFile, UTF_8);
+            LOG.info("Created new process file and writer over {} ", 
processFile.getAbsolutePath());
+        }
+
+        /**
+         * Adds all the ids backed by the iterator into the references.
+         * Closing the iterator is the responsibility of the caller.
+         *
+         * @param recs iterator over records to add
+         * @throws IOException
+         */
+        protected void addRecords(Iterator<String> recs) throws IOException {
+            // Spool contents into a temp file
+            File added = createTempFile("added", null);
+            writeStrings(recs, added, false);
+            // Merge the file with the references
+            merge(Lists.newArrayList(added), false);
+        }
+
+        /**
+         * Merges the contents of the file into the references file.
+         *
+         * @param recs File of records to add
+         * @throws IOException
+         */
+        protected void addRecords(File recs) throws IOException {
+            // Merge the file with the references
+            merge(Lists.newArrayList(recs), false);
+        }
+
+        /**
+         * Snapshots the local store, starts a new generation for writing
+         * and merges all generations available.
+         *
+         * @throws IOException
+         */
+        protected void snapshot() throws IOException {
+            nextGeneration();
+            merge(generations, false);
+        }
+
+        @Override
+        public synchronized void close() {
+            closeQuietly(writer);
+            LOG.info("Closed writer");
+
+            // Convert the process file to a generational file
+            if (processFile != null) {
+                File renamed = new 
File(removeExtension(processFile.getAbsolutePath()));
+                boolean success = processFile.renameTo(renamed);
+                LOG.debug("File renamed {}", success);
+                if (success) {
+                    generations.add(renamed);
+                    LOG.info("Process file renamed to {}", 
renamed.getAbsolutePath());
+                } else {
+                    LOG.trace("Trying a copy file operation");
+                    try {
+                        if (renamed.createNewFile()) {
+                            Files.copy(processFile, renamed);
+                            generations.add(renamed);
+                            LOG.info("{} File copied to {}", 
processFile.getAbsolutePath(),
+                                renamed.getAbsolutePath());
+                        }
+                    } catch (Exception e) {
+                        LOG.warn("Unable to copy process file to corresponding 
gen file. Some"
+                            + " elements may be missed", e);
+                    }
+                }
+            }
+        }
+
+        /**
+         * Different types of files
+         */
+        enum Type {
+            IN_PROCESS {
+                @Override String getFileNameSuffix() {
+                    return "." + currentTimeMillis() + genFileNameSuffix + 
workingCopySuffix;
+                }
+
+                @Override Predicate<File> filter() {
+                    return new Predicate<File>() {
+                        @Override public boolean apply(File input) {
+                            return input.getName().endsWith(workingCopySuffix) 
&& input.getName().startsWith(fileNamePrefix);
+                        }
+                    };
+                }
+            },
+            GENERATION {
+                @Override String getFileNameSuffix() {
+                    return "." + currentTimeMillis() + genFileNameSuffix;
+                }
+
+                @Override Predicate<File> filter() {
+                    return new Predicate<File>() {
+                        @Override public boolean apply(File input) {
+                            return input.getName().startsWith(fileNamePrefix)
+                                && input.getName().contains(genFileNameSuffix)
+                                && 
!input.getName().endsWith(workingCopySuffix);
+                        }
+                    };
+                }
+            },
+            REFS {
+                @Override String getFileNameSuffix() {
+                    return mergedFileSuffix;
+                }
+
+                @Override Predicate<File> filter() {
+                    return new Predicate<File>() {
+                        @Override public boolean apply(File input) {
+                            return input.getName().endsWith(mergedFileSuffix)
+                                && input.getName().startsWith(fileNamePrefix);
+                        }
+                    };
+                }
+            };
+
+            /**
+             * Returns the file name suffix according to the type.
+             *
+             * @return file name suffix
+             */
+            String getFileNameSuffix() {
+                return "";
+            }
+
+            /**
+             * Returns the predicate to filter files in the root directory 
according to the type.
+             *
+             * @return a predicate to select particular file types
+             */
+            Predicate<File> filter() {
+                return alwaysTrue();
+            }
+        }
+    }
+
+    /**
+     * Job which calls the snapshot on the tracker.
+     */
+    class SnapshotJob implements Runnable {
+        @Override
+        public void run() {
+            try {
+                snapshot();
+                LOG.info("Finished taking snapshot");
+            } catch (Exception e) {
+                LOG.warn("Failure in taking snapshot", e);
+            }
+        }
+    }
+
+}

Propchange: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTracker.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobTracker.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobTracker.java?rev=1753429&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobTracker.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobTracker.java
 Wed Jul 20 03:56:20 2016
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Track the blob ids.
+ * <p>
+ * Offers operations to add and remove ids, one at a time or in bulk from an
+ * iterator or a file, and to fetch the tracked ids either as an iterator or
+ * as a file.
+ */
+public interface BlobTracker extends Closeable {
+    /**
+     * Adds the given id.
+     *
+     * @param id the record id to be tracked
+     * @throws IOException if an I/O error occurs while adding the id
+     */
+    void add(String id) throws IOException;
+
+    /**
+     * Adds the given ids.
+     *
+     * @param recs iterator over the record ids to be tracked
+     * @throws IOException if an I/O error occurs while adding the ids
+     */
+    void add(Iterator<String> recs) throws IOException;
+
+    /**
+     * Adds the ids in the given file.
+     *
+     * @param recs file containing the record ids to be tracked
+     * @throws IOException if an I/O error occurs while adding the ids
+     */
+    void add(File recs) throws IOException;
+
+    /**
+     * Remove the given ids.
+     *
+     * @param recs iterator over the record ids to be removed
+     * @throws IOException if an I/O error occurs while removing the ids
+     */
+    void remove(Iterator<String> recs) throws IOException;
+
+    /**
+     * Remove the ids in the given file and deletes the file.
+     *
+     * @param recs file containing the record ids to be removed; it is deleted by this call
+     * @throws IOException if an I/O error occurs while removing the ids
+     */
+    void remove(File recs) throws IOException;
+
+    /**
+     * Fetches an iterator of records available.
+     * NOTE(review): the returned iterator may also implement {@link Closeable},
+     * in which case the caller should presumably close it once done - confirm
+     * against the implementation.
+     *
+     * @return an iterator over the available record ids
+     * @throws IOException if an I/O error occurs while fetching the ids
+     */
+    Iterator<String> get() throws IOException;
+
+    /**
+     * Fetches a File object which has all the sorted records.
+     * The lifecycle of the returned {@link File} handle is the responsibility of the handler.
+     *
+     * @param path NOTE(review): presumably the location under which the returned
+     *             file is created - confirm against the implementation
+     * @return a file containing all the sorted records
+     * @throws IOException if an I/O error occurs while creating the file
+     */
+    File get(String path) throws IOException;
+}

Propchange: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobTracker.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java?rev=1753429&r1=1753428&r2=1753429&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
 Wed Jul 20 03:56:20 2016
@@ -22,6 +22,7 @@ package org.apache.jackrabbit.oak.plugin
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Iterators.filter;
 import static com.google.common.collect.Iterators.transform;
+import static org.apache.commons.io.IOUtils.closeQuietly;
 
 import java.io.BufferedInputStream;
 import java.io.ByteArrayInputStream;
@@ -57,6 +58,7 @@ import org.apache.jackrabbit.core.data.M
 import org.apache.jackrabbit.oak.cache.CacheLIRS;
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.commons.StringUtils;
+import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore;
 import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.blob.stats.StatsCollectingStreams;
@@ -70,14 +72,16 @@ import org.slf4j.LoggerFactory;
  * It also handles inlining binaries if there size is smaller than
  * {@link org.apache.jackrabbit.core.data.DataStore#getMinRecordLength()}
  */
-public class DataStoreBlobStore implements DataStore, SharedDataStore, 
BlobStore,
-        GarbageCollectableBlobStore {
+public class DataStoreBlobStore implements DataStore, BlobStore,
+        GarbageCollectableBlobStore, BlobTrackingStore {
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private final DataStore delegate;
 
     private BlobStatsCollector stats = BlobStatsCollector.NOOP;
 
+    private BlobTracker tracker;
+
     /**
      * If set to true then the blob length information would be encoded as 
part of blobId
      * and thus no extra call would be made to DataStore to determine the 
length
@@ -194,6 +198,7 @@ public class DataStoreBlobStore implemen
     public void close() throws DataStoreException {
         delegate.close();
         cache.invalidateAll();
+        closeQuietly(tracker);
     }
 
     //~-------------------------------------------< BlobStore >
@@ -206,6 +211,9 @@ public class DataStoreBlobStore implemen
             checkNotNull(stream);
             DataRecord dr = writeStream(stream);
             String id = getBlobId(dr);
+            if (tracker != null) {
+                tracker.add(id);
+            }
             threw = false;
             stats.uploaded(System.nanoTime() - start, TimeUnit.NANOSECONDS, 
dr.getLength());
             stats.uploadCompleted(id);
@@ -332,7 +340,7 @@ public class DataStoreBlobStore implemen
             in = new FileInputStream(file);
             return writeBlob(in);
         } finally {
-            org.apache.commons.io.IOUtils.closeQuietly(in);
+            closeQuietly(in);
             FileUtils.forceDelete(file);
         }
     }
@@ -521,6 +529,19 @@ public class DataStoreBlobStore implemen
         this.stats = stats;
     }
 
+
+    /**
+     * Sets the tracker used by this store. Once set, the ids of newly written
+     * blobs are added to it and it is closed when this store is closed.
+     * A previously set tracker is simply replaced; it is not closed here.
+     *
+     * @param tracker the tracker to use
+     */
+    @Override
+    public void addTracker(BlobTracker tracker) {
+        this.tracker = tracker;
+    }
+
+    /**
+     * Returns the tracker currently set on this store.
+     *
+     * @return the tracker, or {@code null} if none has been set
+     */
+    @Override
+    @Nullable
+    public BlobTracker getTracker() {
+        return tracker;
+    }
+
+
     //~---------------------------------------------< Internal >
 
     private InputStream getStream(String blobId) throws IOException {

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java?rev=1753429&r1=1753428&r2=1753429&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java
 Wed Jul 20 03:56:20 2016
@@ -96,7 +96,10 @@ public class SharedDataStoreUtils {
      * Encapsulates the different type of records at the data store root.
      */
     public enum SharedStoreRecordType {
-        REFERENCES("references"), REPOSITORY("repository"), 
MARKED_START_MARKER("markedTimestamp");
+        REFERENCES("references"),
+        REPOSITORY("repository"),
+        MARKED_START_MARKER("markedTimestamp"),
+        BLOBREFERENCES("blob");
 
         private final String type;
 

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java?rev=1753429&r1=1753428&r2=1753429&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
 Wed Jul 20 03:56:20 2016
@@ -69,7 +69,9 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean;
 import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.BlobStoreStats;
+import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore;
 import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
 import org.apache.jackrabbit.oak.plugins.document.persistentCache.CacheType;
 import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCacheStats;
@@ -324,6 +326,24 @@ public class DocumentNodeStoreService {
     )
     public static final String PROP_BLOB_GC_MAX_AGE = "blobGcMaxAgeInSecs";
 
+    /**
+     * Default interval, in seconds, for taking snapshots of locally tracked blob ids
+     * (12 hours).
+     */
+    private static final long DEFAULT_BLOB_SNAPSHOT_INTERVAL = 12 * 60 * 60;
+    // The trailing space before the concatenation keeps "will" and "be" apart
+    // in the rendered description.
+    @Property (longValue = DEFAULT_BLOB_SNAPSHOT_INTERVAL,
+        label = "Blob tracking snapshot interval (in secs)",
+        description = "This is the default interval in which the snapshots of locally tracked blob ids will "
+            + "be taken and synchronized with the blob store"
+    )
+    public static final String PROP_BLOB_SNAPSHOT_INTERVAL = "blobTrackSnapshotIntervalInSecs";
+
+    private static final String DEFAULT_PROP_HOME = "./repository";
+    @Property(value = DEFAULT_PROP_HOME,
+        label = "Root directory",
+        description = "Root directory for local tracking of blob ids"
+    )
+    private static final String PROP_HOME = "repository.home";
+
     private static final long DEFAULT_MAX_REPLICATION_LAG = 6 * 60 * 60;
     @Property(longValue = DEFAULT_MAX_REPLICATION_LAG,
             label = "Max Replication Lag (in secs)",
@@ -504,15 +524,31 @@ public class DocumentNodeStoreService {
         
         // If a shared data store register the repo id in the data store
         if (SharedDataStoreUtils.isShared(blobStore)) {
+            String repoId = null;
             try {
-                String repoId = 
ClusterRepositoryInfo.getOrCreateId(mk.getNodeStore());
+                repoId = 
ClusterRepositoryInfo.getOrCreateId(mk.getNodeStore());
                 ((SharedDataStore) blobStore).addMetadataRecord(new 
ByteArrayInputStream(new byte[0]),
                     
SharedDataStoreUtils.SharedStoreRecordType.REPOSITORY.getNameFromId(repoId));
             } catch (Exception e) {
                 throw new IOException("Could not register a unique 
repositoryId", e);
             }
+
+            if (blobStore instanceof BlobTrackingStore) {
+                final long trackSnapshotInterval = 
toLong(prop(PROP_BLOB_SNAPSHOT_INTERVAL),
+                    DEFAULT_BLOB_SNAPSHOT_INTERVAL);
+                String root = PropertiesUtil.toString(prop(PROP_HOME), 
DEFAULT_PROP_HOME);
+
+                BlobTrackingStore trackingStore = (BlobTrackingStore) 
blobStore;
+                if (trackingStore.getTracker() != null) {
+                    trackingStore.getTracker().close();
+                }
+                ((BlobTrackingStore) blobStore).addTracker(
+                    new BlobIdTracker(root, repoId, trackSnapshotInterval, 
(SharedDataStore)
+                        blobStore));
+            }
         }
 
+
         registerJMXBeans(mk.getNodeStore(), mkBuilder);
         registerLastRevRecoveryJob(mk.getNodeStore());
         registerJournalGC(mk.getNodeStore());

Added: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerClusterSharedTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerClusterSharedTest.java?rev=1753429&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerClusterSharedTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerClusterSharedTest.java
 Wed Jul 20 03:56:20 2016
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Sets.newHashSet;
+import static com.google.common.io.Closeables.close;
+import static java.lang.String.valueOf;
+import static java.util.UUID.randomUUID;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.writeStrings;
+import static 
org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils.getBlobStore;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeNoException;
+import static org.junit.Assume.assumeThat;
+
+/**
+ * Test for BlobIdTracker simulating cluster and shared data store scenarios
+ * to test addition, retrieval and removal of blob ids.
+ */
+public class BlobIdTrackerClusterSharedTest {
+    private static final Logger log = LoggerFactory.getLogger(BlobIdTrackerClusterSharedTest.class);
+
+    /** Directory handed to {@code getBlobStore(File)} by every logical instance. */
+    File root;
+    Cluster cluster1;
+    Cluster cluster2;
+
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+
+    /**
+     * Skips the whole class when the configured blob store is not a
+     * {@link SharedDataStore} or cannot be created.
+     */
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        try {
+            assumeThat(getBlobStore(), instanceOf(SharedDataStore.class));
+        } catch (Exception e) {
+            assumeNoException(e);
+        }
+    }
+
+    @Before
+    public void setup() throws Exception {
+        this.root = folder.newFolder();
+    }
+
+    /**
+     * Test simulating add, remove, retrieve scenarios for blobId tracker on a 2 node cluster.
+     * Both instances use the same repository id.
+     *
+     * @throws Exception if setting up the instances or any tracker operation fails
+     */
+    @Test
+    public void addRetrieveCluster() throws Exception {
+        String clusterRepoId = randomUUID().toString();
+        cluster1 = new Cluster(clusterRepoId,
+            folder.newFolder("cluster1").getAbsolutePath(), folder);
+        cluster2 = new Cluster(clusterRepoId,
+            folder.newFolder("cluster2").getAbsolutePath(), folder);
+        Set<String> adds = newHashSet();
+
+        // Add some on cluster 2 & simulate snapshot
+        adds.addAll(cluster2.doAdd(range(5, 9)));
+        cluster2.forceSnapshot();
+        log.info("Done force snapshot for cluster2");
+
+        // Add some on cluster1 & simulate snapshot
+        adds.addAll(cluster1.doAdd(range(0, 4)));
+        cluster1.forceSnapshot();
+        log.info("Done force snapshot for cluster1");
+
+        // Get on cluster 1
+        Set<String> retrieves = cluster1.doRetrieve();
+        assertEquals("Retrieves not correct", adds, retrieves);
+        log.info("Done retrieve on cluster1");
+
+        cluster1.doRemove(adds, range(4, 5));
+        log.info("Done remove on cluster1");
+        retrieves = cluster1.doRetrieve();
+        log.info("Done retrieve on cluster1 again");
+        assertEquals("Retrieves not correct after remove", adds, retrieves);
+    }
+
+    /**
+     * Test simulating add, remove, retrieve scenarios for blobId tracker on a 2 node shared
+     * repository. Each instance uses its own repository id.
+     *
+     * @throws Exception if setting up the instances or any tracker operation fails
+     */
+    @Test
+    public void addRetrieveShared() throws Exception {
+        cluster1 = new Cluster(randomUUID().toString(),
+            folder.newFolder("cluster1").getAbsolutePath(), folder);
+        cluster2 = new Cluster(randomUUID().toString(),
+            folder.newFolder("cluster2").getAbsolutePath(), folder);
+        Set<String> adds = newHashSet();
+
+        // Add some on cluster1 & simulate snapshot
+        adds.addAll(cluster1.doAdd(range(0, 4)));
+        cluster1.forceSnapshot();
+        log.info("Done force snapshot for cluster1");
+
+        // Add some on cluster 2 & simulate snapshot
+        adds.addAll(cluster2.doAdd(range(5, 9)));
+        cluster2.forceSnapshot();
+        log.info("Done force snapshot for cluster2");
+
+        // Get on cluster 1
+        Set<String> retrieves = cluster1.doRetrieve();
+        assertEquals("Retrieves not correct",
+            adds, retrieves);
+        log.info("Done retrieve on cluster1");
+
+        cluster1.doRemove(adds, range(4, 5));
+        log.info("Done remove on cluster1");
+        retrieves = cluster1.doRetrieve();
+        log.info("Done retrieve on cluster1 again");
+        assertEquals("Retrieves not correct after remove",
+            adds, retrieves);
+    }
+
+    /**
+     * Logical instance: a tracker with its own local path and scheduler, backed by
+     * a data store created from the test's {@code root} directory.
+     */
+    class Cluster {
+        ScheduledExecutorService scheduler;
+        BlobIdTracker tracker;
+        TemporaryFolder folder;
+        SharedDataStore dataStore;
+
+        Cluster(String repoId, String path, TemporaryFolder folder) throws Exception {
+            this.dataStore = getBlobStore(root);
+            this.tracker = new BlobIdTracker(path, repoId, 100 * 60, dataStore);
+            this.scheduler = newSingleThreadScheduledExecutor();
+            this.folder = folder;
+        }
+
+        Set<String> doAdd(List<String> ints) throws IOException {
+            return add(tracker, ints);
+        }
+
+        void doRemove(Set<String> adds, List<String> removes) throws IOException {
+            remove(tracker, folder.newFile(), adds, removes);
+        }
+
+        /**
+         * Runs a snapshot job on the scheduler and waits for it to finish.
+         * Errors are logged; an interrupt additionally restores the thread's
+         * interrupt status so that it is not lost.
+         */
+        void forceSnapshot() {
+            try {
+                ScheduledFuture<?> scheduledFuture =
+                    scheduler.schedule(tracker.new SnapshotJob(), 0, MILLISECONDS);
+                scheduledFuture.get();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.error("Error in snapshot", e);
+            } catch (Exception e) {
+                log.error("Error in snapshot", e);
+            }
+        }
+
+        Set<String> doRetrieve() throws IOException {
+            return retrieve(tracker);
+        }
+
+        /**
+         * Releases the scheduler, the tracker and the data store. Each resource is
+         * released even if releasing an earlier one fails.
+         *
+         * @throws IOException if closing the scheduler or the tracker fails
+         */
+        public void close() throws IOException {
+            try {
+                new ExecutorCloser(scheduler).close();
+            } finally {
+                try {
+                    tracker.close();
+                } finally {
+                    try {
+                        ((DataStoreBlobStore) dataStore).close();
+                    } catch (DataStoreException e) {
+                        log.warn("Error closing blobstore", e);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Closes whichever instances the test managed to create. An instance is
+     * {@code null} when its construction failed; guarding against that keeps the
+     * original test failure from being masked by a NullPointerException here.
+     */
+    @After
+    public void tearDown() throws IOException {
+        try {
+            if (cluster1 != null) {
+                cluster1.close();
+            }
+        } finally {
+            try {
+                if (cluster2 != null) {
+                    cluster2.close();
+                }
+            } finally {
+                folder.delete();
+            }
+        }
+    }
+
+    /** Adds every id to the tracker and returns the ids as a set. */
+    private static Set<String> add(BlobTracker tracker, List<String> ints) throws IOException {
+        Set<String> s = newHashSet();
+        for (String rec : ints) {
+            tracker.add(rec);
+            s.add(rec);
+        }
+        return s;
+    }
+
+    /**
+     * Drains the tracker's iterator into a set, closing the iterator afterwards
+     * (also when iteration fails) if it is {@link Closeable}.
+     */
+    private static Set<String> retrieve(BlobTracker tracker) throws IOException {
+        Set<String> retrieved = newHashSet();
+        Iterator<String> iter = tracker.get();
+        log.info("retrieving blob ids");
+        try {
+            while (iter.hasNext()) {
+                retrieved.add(iter.next());
+            }
+        } finally {
+            if (iter instanceof Closeable) {
+                close((Closeable) iter, true);
+            }
+        }
+        return retrieved;
+    }
+
+    /**
+     * Writes the ids to remove into {@code temp}, drops them from the expected set
+     * and asks the tracker to remove the ids listed in that file.
+     */
+    private static void remove(BlobIdTracker tracker, File temp, Set<String> initAdd,
+        List<String> ints) throws IOException {
+        writeStrings(ints.iterator(), temp, false);
+        initAdd.removeAll(ints);
+        tracker.remove(temp);
+    }
+
+    /** Returns the decimal strings for {@code min..max}, both inclusive. */
+    private static List<String> range(int min, int max) {
+        List<String> list = newArrayList();
+        for (int i = min; i <= max; i++) {
+            list.add(valueOf(i));
+        }
+        return list;
+    }
+}
+

Propchange: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerClusterSharedTest.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to