Added: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerStoreTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerStoreTest.java?rev=1753429&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerStoreTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerStoreTest.java
 Wed Jul 20 03:56:20 2016
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker.Store;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Sets.newHashSet;
+import static com.google.common.collect.Sets.symmetricDifference;
+import static java.lang.String.valueOf;
+import static java.util.UUID.randomUUID;
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.readStringsAsSet;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.writeStrings;
+import static 
org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils.getBlobStore;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeNoException;
+import static org.junit.Assume.assumeThat;
+
+/**
+ * Test for BlobIdTracker.Store to test addition, retrieval and removal of 
blob ids.
+ */
+public class BlobIdTrackerStoreTest {
+    private static final Logger log = 
LoggerFactory.getLogger(BlobIdTrackerStoreTest.class);
+
+    File root;
+    SharedDataStore dataStore;
+    BlobIdTracker tracker;
+
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+    private String repoId;
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        try {
+            assumeThat(getBlobStore(), instanceOf(SharedDataStore.class));
+        } catch (Exception e) {
+            assumeNoException(e);
+        }
+    }
+
+    @Before
+    public void setup() throws Exception {
+        this.root = folder.newFolder();
+        if (dataStore == null) {
+            dataStore = getBlobStore(root);
+        }
+        this.repoId = randomUUID().toString();
+        this.tracker = initTracker();
+    }
+
+    private BlobIdTracker initTracker() throws IOException {
+        return new BlobIdTracker(root.getAbsolutePath(),
+           repoId, 5 * 60, dataStore);
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        tracker.close();
+        folder.delete();
+    }
+
+    @Test
+    public void addSnapshot() throws Exception {
+        BlobIdTracker.Store store = tracker.store;
+
+        Set<String> initAdd = add(store, range(0, 10000));
+        store.snapshot();
+        Set<String> retrieved = retrieve(store);
+
+        assertEquals("Incorrect elements after add snapshot", initAdd, 
retrieved);
+    }
+
+    @Test
+    public void addSnapshotRetrieve() throws Exception {
+        BlobIdTracker.Store store = tracker.store;
+
+        Set<String> initAdd = add(store, range(0, 10000));
+        store.snapshot();
+        Set<String> retrieved = retrieveFile(store, folder);
+
+        assertEquals("Incorrect elements after add snapshot reading file", 
initAdd, retrieved);
+    }
+
+    @Test
+    public void addSnapshotAdd() throws Exception {
+        BlobIdTracker.Store store = tracker.store;
+
+        Set<String> initAdd = add(store, range(0, 10000));
+        store.snapshot();
+        initAdd.addAll(add(store, range(10001, 10003)));
+        Set<String> retrieved = retrieve(store);
+
+        assertTrue("Incorrect elements with add before snapshot",
+            symmetricDifference(initAdd, retrieved)
+                .containsAll(newHashSet("10001", "10002", "10003")));
+    }
+
+    @Test
+    public void addSnapshotAddSnapshot() throws Exception {
+        BlobIdTracker.Store store = tracker.store;
+
+        Set<String> initAdd = add(store, range(0, 10000));
+        store.snapshot();
+        initAdd.addAll(add(store, range(10001, 10003)));
+        store.snapshot();
+        Set<String> retrieved = retrieve(store);
+
+        assertEquals("Incorrect elements with snapshot after add", initAdd, 
retrieved);
+    }
+
+    @Test
+    public void addSnapshotRemove() throws Exception {
+        BlobIdTracker.Store store = tracker.store;
+
+        Set<String> initAdd = add(store, range(0, 10000));
+        store.snapshot();
+        remove(store, folder.newFile(), initAdd, range(2, 3));
+
+        Set<String> retrieved = retrieve(store);
+        assertEquals("Incorrect elements after remove", initAdd, retrieved);
+    }
+
+    @Test
+    public void addRestart() throws IOException {
+        BlobIdTracker.Store store = tracker.store;
+
+        Set<String> initAdd = add(store, range(0, 100000));
+        this.tracker = initTracker();
+        Set<String> retrieved = retrieve(store);
+        assertTrue("Extra elements retrieved", retrieved.isEmpty());
+        store = tracker.store;
+        store.snapshot();
+        retrieved = retrieve(store);
+        assertEquals("Incorrect elements after dirty restart", initAdd, 
retrieved);
+    }
+
+    @Test
+    public void addCloseRestart() throws IOException {
+        BlobIdTracker.Store store = tracker.store;
+
+        Set<String> initAdd = add(store, range(0, 10000));
+        store.close();
+        this.tracker = initTracker();
+        store = tracker.store;
+        store.snapshot();
+        Set<String> retrieved = retrieve(store);
+        assertEquals("Incorrect elements after safe restart", initAdd, 
retrieved);
+    }
+
+    @Test
+    public void addConcurrentSnapshot() throws IOException, 
InterruptedException {
+        final BlobIdTracker.Store store = tracker.store;
+        final CountDownLatch start = new CountDownLatch(1);
+        final CountDownLatch done = new CountDownLatch(2);
+
+        Thread addThread = addThread(store, start, done);
+        Thread snapshotThread = snapshotThread(store, start, done);
+        snapshotThread.start();
+        addThread.start();
+
+        start.countDown();
+        done.await();
+
+        // Do a snapshot to ensure that all the adds if snapshot finished 
first are collected
+        store.snapshot();
+        Set<String> retrieved = retrieve(store);
+        assertEquals("Incorrect elements after concurrent snapshot",
+            newHashSet(range(0, 100000)), retrieved);
+    }
+
+    @Test
+    public void addSnapshotConcurrentRetrieve() throws IOException, 
InterruptedException {
+        final BlobIdTracker.Store store = tracker.store;
+        final CountDownLatch start = new CountDownLatch(1);
+        final CountDownLatch done = new CountDownLatch(2);
+        Set<String> initAdd = add(store, range(0, 100000));
+        final Set<String> retrieves = newHashSet();
+
+        Thread retrieveThread = retrieveThread(store, retrieves, start, done);
+        Thread snapshotThread = snapshotThread(store, start, done);
+        snapshotThread.start();
+        retrieveThread.start();
+        start.countDown();
+        done.await();
+
+        if (retrieves.isEmpty()) {
+            // take a snapshot to ensure that all adds accounted if snapshot 
finished last
+            store.snapshot();
+            retrieves.addAll(retrieve(store));
+        }
+        assertEquals("Incorrect elements after concurrent snapshot/retrieve",
+            initAdd, retrieves);
+    }
+
+    @Test
+    public void snapshotConcurrentRemove() throws IOException, 
InterruptedException {
+        final BlobIdTracker.Store store = tracker.store;
+        final CountDownLatch start = new CountDownLatch(1);
+        final CountDownLatch done = new CountDownLatch(2);
+        final Set<String> initAdd = add(store, range(0, 100000));
+        store.snapshot();
+
+        Thread removeThread = removeThread(store, folder.newFile(), initAdd, 
start, done);
+        Thread snapshotThread = snapshotThread(store, start, done);
+        removeThread.start();
+        snapshotThread.start();
+
+        // add some more to check that snapshot is successfull
+        initAdd.addAll(add(store, range(10001, 10003)));
+        start.countDown();
+        done.await();
+
+        Set<String> retrieves = retrieve(store);
+        assertEquals("Incorrect elements after concurrent snapshot/remove",
+            initAdd, retrieves);
+    }
+
+    @Test
+    public void addBulkAdd() throws IOException {
+        final BlobIdTracker.Store store = tracker.store;
+        final Set<String> initAdd = add(store, range(0, 4));
+
+        // Add new ids from a file
+        File temp = folder.newFile();
+        List<String> newAdd = range(5, 9);
+        initAdd.addAll(newAdd);
+        writeStrings(newAdd.iterator(), temp, false);
+
+        store.addRecords(temp);
+        store.snapshot();
+
+        Set<String> retrieved = retrieve(store);
+        assertEquals("Incorrect elements after bulk add from file",
+            initAdd, retrieved);
+
+        newAdd = range(10, 14);
+        initAdd.addAll(newAdd);
+
+        store.addRecords(newAdd.iterator());
+        store.snapshot();
+
+        retrieved = retrieve(store);
+        assertEquals("Incorrect elements after bulk add from iterator",
+            initAdd, retrieved);
+    }
+
+    @Test
+    public void bulkAddConcurrentCompact() throws IOException, 
InterruptedException {
+        final BlobIdTracker.Store store = tracker.store;
+        final CountDownLatch start = new CountDownLatch(1);
+        final CountDownLatch done = new CountDownLatch(2);
+
+        Thread addThread = addThread(store, true, start, done);
+        Thread snapshotThread = snapshotThread(store, start, done);
+        snapshotThread.start();
+        addThread.start();
+
+        start.countDown();
+        done.await();
+
+        // Do a snapshot to ensure that all the adds if snapshot finished 
first are collected
+        store.snapshot();
+        Set<String> retrieved = retrieve(store);
+        assertEquals("Incorrect elements after concurrent snapshot",
+            newHashSet(range(0, 100000)), retrieved);
+
+    }
+
+    private static Thread addThread(
+        final Store store, final CountDownLatch start, final CountDownLatch 
done) {
+        return addThread(store, false, start, done);
+    }
+
+    private static Thread addThread(
+        final Store store, final boolean bulk, final CountDownLatch start, 
final CountDownLatch done) {
+        return new Thread("AddThread") {
+            @Override
+            public void run() {
+                try {
+                    List<String> adds = range(0, 100000);
+                    start.await();
+                    if (!bulk) {
+                        add(store, adds);
+                    } else {
+                        store.addRecords(adds.iterator());
+                    }
+                    done.countDown();
+                } catch (IOException e) {
+                    log.info("Exception in add", e);
+                } catch (InterruptedException e) {
+                    log.info("Interrupted in add", e);
+                }
+            }
+        };
+    }
+
+    private static Thread retrieveThread(
+        final Store store, final Set<String> retrieves, final CountDownLatch 
start,
+        final CountDownLatch done) {
+        return new Thread("RetrieveThread") {
+            @Override
+            public void run() {
+                try {
+                    start.await();
+                    retrieves.addAll(retrieve(store));
+                    done.countDown();
+                } catch (IOException e) {
+                    log.info("Exception in retrieve", e);
+                } catch (InterruptedException e) {
+                    log.info("Interrupted in retrieve", e);
+                }
+            }
+        };
+    }
+
+    private static Thread removeThread(final Store store, final File temp,
+        final Set<String> adds, final CountDownLatch start, final 
CountDownLatch done) {
+        return new Thread("RemoveThread") {
+            @Override
+            public void run() {
+                try {
+                    start.await();
+                    remove(store, temp, adds, range(1, 3));
+                    done.countDown();
+                } catch (IOException e) {
+                    log.info("Exception in remove", e);
+                } catch (InterruptedException e) {
+                    log.info("Interrupted in remove", e);
+                }
+            }
+        };
+    }
+
+    private static Thread snapshotThread(
+        final Store store, final CountDownLatch start, final CountDownLatch 
done) {
+        return new Thread("SnapshotThread") {
+            @Override
+            public void run() {
+                try {
+                    start.await();
+                    store.snapshot();
+                    done.countDown();
+                } catch (IOException e) {
+                    log.info("Exception in snapshot", e);
+                } catch (InterruptedException e) {
+                    log.info("Interrupted in snapshot", e);
+                }
+            }
+        };
+    }
+
+    private static Set<String> add(Store store, List<String> ints) throws 
IOException {
+        Set<String> s = newHashSet();
+        for (String rec : ints) {
+            store.addRecord(rec);
+            s.add(rec);
+        }
+        return s;
+    }
+
+    private static Set<String> retrieve(Store store) throws IOException {
+        Set<String> retrieved = newHashSet();
+        Iterator<String> iter = store.getRecords();
+        while(iter.hasNext()) {
+            retrieved.add(iter.next());
+        }
+        closeQuietly((Closeable)iter);
+        return retrieved;
+    }
+    private static Set<String> retrieveFile(Store store, TemporaryFolder 
folder) throws IOException {
+        File f = folder.newFile();
+        Set<String> retrieved = readStringsAsSet(
+            new FileInputStream(store.getRecords(f.getAbsolutePath())), false);
+        return retrieved;
+    }
+
+    private static void remove(Store store, File temp, Set<String> initAdd,
+            List<String> ints) throws IOException {
+        writeStrings(ints.iterator(), temp, false);
+        initAdd.removeAll(ints);
+        store.removeRecords(temp);
+    }
+
+    private static List<String> range(int min, int max) {
+        List<String> list = newArrayList();
+        for (int i = min; i <= max; i++) {
+            list.add(valueOf(i));
+        }
+        return list;
+    }
+}
+

Propchange: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerStoreTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerTest.java?rev=1753429&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerTest.java
 Wed Jul 20 03:56:20 2016
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Sets.newHashSet;
+import static java.lang.String.valueOf;
+import static java.util.UUID.randomUUID;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.readStringsAsSet;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.writeStrings;
+import static 
org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils.getBlobStore;
+import static 
org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils
+    .SharedStoreRecordType.BLOBREFERENCES;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeNoException;
+import static org.junit.Assume.assumeThat;
+
+/**
+ * Test for BlobIdTracker to test addition, retrieval and removal of blob ids.
+ */
+public class BlobIdTrackerTest {
+    File root;
+    SharedDataStore dataStore;
+    BlobIdTracker tracker;
+
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+    private String repoId;
+    private ScheduledExecutorService scheduler;
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        try {
+            assumeThat(getBlobStore(), instanceOf(SharedDataStore.class));
+        } catch (Exception e) {
+            assumeNoException(e);
+        }
+    }
+
+    @Before
+    public void setup() throws Exception {
+        this.root = folder.newFolder();
+        if (dataStore == null) {
+            dataStore = getBlobStore(root);
+        }
+        this.repoId = randomUUID().toString();
+        this.tracker = new BlobIdTracker(root.getAbsolutePath(), repoId, 100 * 
60, dataStore);
+        this.scheduler = newSingleThreadScheduledExecutor();
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        tracker.close();
+        new ExecutorCloser(scheduler).close();
+        folder.delete();
+    }
+
+    @Test
+    public void addSnapshot() throws Exception {
+        Set<String> initAdd = add(tracker, range(0, 4));
+        ScheduledFuture<?> scheduledFuture =
+            scheduler.schedule(tracker.new SnapshotJob(), 0, 
TimeUnit.MILLISECONDS);
+        scheduledFuture.get();
+
+        Set<String> retrieved = retrieve(tracker);
+
+        assertEquals("Extra elements after add", initAdd, retrieved);
+        assertEquals("Different ids from the datastore",
+            initAdd, 
read(dataStore.getAllMetadataRecords(BLOBREFERENCES.getType())));
+    }
+
+    @Test
+    public void addSnapshotRemove() throws Exception {
+        Set<String> initAdd = add(tracker, range(0, 4));
+        ScheduledFuture<?> scheduledFuture =
+            scheduler.schedule(tracker.new SnapshotJob(), 0, 
TimeUnit.MILLISECONDS);
+        scheduledFuture.get();
+        remove(tracker, folder.newFile(), initAdd, range(1, 2));
+
+        Set<String> retrieved = retrieve(tracker);
+
+        assertEquals("Extra elements after add", initAdd, retrieved);
+    }
+
+    private static Set<String> read(List<DataRecord> recs)
+        throws IOException, DataStoreException {
+        Set<String> ids = newHashSet();
+        for (DataRecord b : recs) {
+            ids.addAll(readStringsAsSet(b.getStream(), false));
+        }
+        return ids;
+    }
+
+    private static Set<String> add(BlobTracker tracker, List<String> ints) 
throws IOException {
+        Set<String> s = newHashSet();
+        for (String rec : ints) {
+            tracker.add(rec);
+            s.add(rec);
+        }
+        return s;
+    }
+
+    private static Set<String> retrieve(BlobTracker tracker) throws 
IOException {
+        Set<String> retrieved = newHashSet();
+        Iterator<String> iter = tracker.get();
+        while(iter.hasNext()) {
+            retrieved.add(iter.next());
+        }
+        if (iter instanceof Closeable) {
+            closeQuietly((Closeable)iter);
+        }
+        return retrieved;
+    }
+
+    private static void remove(BlobTracker tracker, File temp, Set<String> 
initAdd,
+            List<String> ints) throws IOException {
+        writeStrings(ints.iterator(), temp, false);
+        initAdd.removeAll(ints);
+        tracker.remove(temp);
+    }
+
+    private static List<String> range(int min, int max) {
+        List<String> list = newArrayList();
+        for (int i = min; i <= max; i++) {
+            list.add(valueOf(i));
+        }
+        return list;
+    }
+}
+

Propchange: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreTrackerGCTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreTrackerGCTest.java?rev=1753429&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreTrackerGCTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreTrackerGCTest.java
 Wed Jul 20 03:56:20 2016
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import com.google.common.collect.Lists;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore;
+import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import 
org.apache.jackrabbit.oak.plugins.document.DocumentBlobReferenceRetriever;
+import org.apache.jackrabbit.oak.plugins.document.DocumentMKBuilderProvider;
+import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
+import org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static com.google.common.collect.Sets.newHashSet;
+import static com.google.common.collect.Sets.union;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.commons.io.FileUtils.forceDelete;
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static 
org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils.getBlobStore;
+import static 
org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils
+    .SharedStoreRecordType.REPOSITORY;
+import static 
org.apache.jackrabbit.oak.plugins.document.Revision.getCurrentTimestamp;
+import static org.apache.jackrabbit.oak.spi.commit.CommitInfo.EMPTY;
+import static org.apache.jackrabbit.oak.spi.commit.EmptyHook.INSTANCE;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assume.assumeNoException;
+import static org.junit.Assume.assumeThat;
+
+/**
+ */
+public class DataStoreTrackerGCTest {
+    private Clock clock;
+    private File blobStoreRoot;
+
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+
+    @Rule
+    public DocumentMKBuilderProvider builderProvider = new 
DocumentMKBuilderProvider();
+
+    @BeforeClass
+    public static void check() {
+        File root = new File("./target/blobstore");
+        try {
+            BlobStore store = getBlobStore(root);
+            assumeThat(store, instanceOf(BlobTrackingStore.class));
+        } catch (Exception e) {
+            assumeNoException(e);
+        } finally {
+            try {
+                forceDelete(root);
+            } catch (IOException e) {
+            }
+        }
+    }
+
+    @Before
+    public void setup() throws IOException, InterruptedException {
+        this.clock = getTestClock();
+        this.blobStoreRoot = folder.newFolder("blobstore");
+    }
+
+    @Test
+    public void gc() throws Exception {
+        Cluster cluster = new Cluster("cluster1");
+        BlobStore s = cluster.blobStore;
+        BlobIdTracker tracker = (BlobIdTracker) ((BlobTrackingStore) 
s).getTracker();
+        DataStoreState state = init(cluster.nodeStore, 0);
+        ScheduledFuture<?> scheduledFuture = newSingleThreadScheduledExecutor()
+            .schedule(tracker.new SnapshotJob(), 0, MILLISECONDS);
+        scheduledFuture.get();
+        // All blobs added should be tracked now
+        assertEquals(state.blobsAdded, retrieveTracked(tracker));
+
+        cluster.gc.collectGarbage(false);
+        Set<String> existingAfterGC = iterate(s);
+        // Check the state of the blob store after gc
+        assertEquals(state.blobsPresent, existingAfterGC);
+        // Tracked blobs should reflect deletions after gc
+        assertEquals(state.blobsPresent, retrieveTracked(tracker));
+    }
+
+    @Test
+    public void gcColdStart() throws Exception {
+        Cluster cluster = new Cluster("cluster1");
+        BlobStore s = cluster.blobStore;
+        BlobIdTracker tracker = (BlobIdTracker) ((BlobTrackingStore) 
s).getTracker();
+        DataStoreState state = init(cluster.nodeStore, 0);
+
+        // No blobs should be found now as snapshot not done
+        assertNotEquals(state.blobsAdded, retrieveTracked(tracker));
+
+        cluster.gc.collectGarbage(false);
+        Set<String> existingAfterGC = iterate(s);
+        // Check the state of the blob store after gc
+        assertEquals(state.blobsPresent, existingAfterGC);
+        // Tracked blobs should reflect deletions after gc
+        assertEquals(state.blobsPresent, retrieveTracked(tracker));
+    }
+
+    @Test
+    public void differentCluster() throws Exception {
+        // Add blobs to cluster1
+        Cluster cluster1 = new Cluster("cluster1");
+        BlobStore s1 = cluster1.blobStore;
+        BlobIdTracker tracker1 = (BlobIdTracker) ((BlobTrackingStore) 
s1).getTracker();
+        DataStoreState state1 = init(cluster1.nodeStore, 0);
+        ScheduledFuture<?> scheduledFuture1 = 
newSingleThreadScheduledExecutor()
+            .schedule(tracker1.new SnapshotJob(), 0, MILLISECONDS);
+        scheduledFuture1.get();
+        // All blobs added should be tracked now
+        assertEquals(state1.blobsAdded, retrieveTracked(tracker1));
+
+        // Add blobs to cluster1
+        Cluster cluster2 = new Cluster("cluster2");
+        BlobStore s2 = cluster2.blobStore;
+        BlobIdTracker tracker2 = (BlobIdTracker) ((BlobTrackingStore) 
s2).getTracker();
+        DataStoreState state2 = init(cluster2.nodeStore, 0);
+        ScheduledFuture<?> scheduledFuture2 = 
newSingleThreadScheduledExecutor()
+            .schedule(tracker2.new SnapshotJob(), 0, MILLISECONDS);
+        scheduledFuture2.get();
+
+        // All blobs added should be tracked now
+        assertEquals(state2.blobsAdded, retrieveTracked(tracker2));
+        cluster2.gc.collectGarbage(true);
+
+        // do a gc on cluster1 with sweep
+        cluster1.gc.collectGarbage(false);
+        Set<String> existingAfterGC = iterate(s1);
+
+        // Check the state of the blob store after gc
+        assertEquals(
+            union(state1.blobsPresent, state2.blobsPresent), existingAfterGC);
+        // Tracked blobs should reflect deletions after gc
+        assertEquals(
+            union(state1.blobsPresent, state2.blobsPresent),
+            retrieveTracked(tracker1));
+
+    }
+
+    private Set<String> iterate(BlobStore blobStore) throws Exception {
+        Iterator<String> cur = ((GarbageCollectableBlobStore) 
blobStore).getAllChunkIds(0);
+
+        Set<String> existing = newHashSet();
+        while (cur.hasNext()) {
+            existing.add(cur.next());
+        }
+        return existing;
+    }
+
+    private static Set<String> retrieveTracked(BlobTracker tracker) throws 
IOException {
+        Set<String> retrieved = newHashSet();
+        Iterator<String> iter = tracker.get();
+        while(iter.hasNext()) {
+            retrieved.add(iter.next());
+        }
+        closeQuietly((Closeable)iter);
+        return retrieved;
+    }
+
+    public DataStoreState init(DocumentNodeStore s, int idStart) throws 
Exception {
+        NodeBuilder a = s.getRoot().builder();
+
+        int number = 10;
+        int maxDeleted = 5;
+        // track the number of the assets to be deleted
+        List<Integer> processed = Lists.newArrayList();
+        Random rand = new Random(47);
+        for (int i = idStart; i < idStart + maxDeleted; i++) {
+            int n = rand.nextInt(number);
+            if (!processed.contains(n)) {
+                processed.add(n);
+            }
+        }
+        DataStoreState state = new DataStoreState();
+        for (int i = idStart; i < idStart + number; i++) {
+            Blob b = s.createBlob(randomStream(i, 16516));
+            Iterator<String> idIter =
+                ((GarbageCollectableBlobStore) s.getBlobStore())
+                    .resolveChunks(b.toString());
+            while (idIter.hasNext()) {
+                String chunk = idIter.next();
+                state.blobsAdded.add(chunk);
+                if (!processed.contains(i)) {
+                    state.blobsPresent.add(chunk);
+                }
+            }
+            a.child("c" + i).setProperty("x", b);
+            // Add a duplicated entry
+            if (i == idStart) {
+                a.child("cdup").setProperty("x", b);
+            }
+        }
+        s.merge(a, INSTANCE, EMPTY);
+
+
+        a = s.getRoot().builder();
+        for (int id : processed) {
+            a.child("c" + id).remove();
+            s.merge(a, INSTANCE, EMPTY);
+        }
+        long maxAge = 10; // hours
+        // 1. Go past GC age and check no GC done as nothing deleted
+        clock.waitUntil(clock.getTime() + MINUTES.toMillis(maxAge));
+
+        VersionGarbageCollector vGC = s.getVersionGarbageCollector();
+        VersionGarbageCollector.VersionGCStats stats = vGC.gc(0, MILLISECONDS);
+
+        return state;
+    }
+
+    class Cluster implements Closeable {
+        DocumentNodeStore nodeStore;
+        BlobStore blobStore;
+        MarkSweepGarbageCollector gc;
+        String repoId;
+        BlobIdTracker tracker;
+
+        public Cluster(String clusterId) throws Exception {
+            blobStore = getBlobStore(blobStoreRoot);
+            nodeStore = builderProvider.newBuilder()
+                .setAsyncDelay(0)
+                .setDocumentStore(new MemoryDocumentStore())
+                .setBlobStore(blobStore)
+                .getNodeStore();
+            repoId = ClusterRepositoryInfo.getOrCreateId(nodeStore);
+            nodeStore.runBackgroundOperations();
+
+            ((SharedDataStore) blobStore).addMetadataRecord(
+                new ByteArrayInputStream(new byte[0]),
+                REPOSITORY.getNameFromId(repoId));
+
+            String trackerRoot = folder.newFolder(clusterId).getAbsolutePath();
+            tracker = new BlobIdTracker(trackerRoot,
+                repoId, 86400, (SharedDataStore) blobStore);
+            // add the tracker to the blobStore
+            ((BlobTrackingStore) blobStore).addTracker(tracker);
+
+            // initialized the GC
+            gc = new MarkSweepGarbageCollector(
+                new DocumentBlobReferenceRetriever(nodeStore),
+                (GarbageCollectableBlobStore) blobStore, 
newSingleThreadExecutor(),
+                folder.newFolder("gc" + clusterId).getAbsolutePath(), 5, 0, 
repoId);
+        }
+
+        public void close() throws IOException {
+            nodeStore.dispose();
+        }
+
+    }
+
+    protected Clock getTestClock() throws InterruptedException {
+        Clock clock = new Clock.Virtual();
+        clock.waitUntil(getCurrentTimestamp());
+        return clock;
+    }
+
+    static InputStream randomStream(int seed, int size) {
+        Random r = new Random(seed);
+        byte[] data = new byte[size];
+        r.nextBytes(data);
+        return new ByteArrayInputStream(data);
+    }
+
+    private class DataStoreState {
+        Set<String> blobsAdded = newHashSet();
+        Set<String> blobsPresent = newHashSet();
+    }
+}

Propchange: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreTrackerGCTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
jackrabbit/oak/trunk/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java?rev=1753429&r1=1753428&r2=1753429&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
 Wed Jul 20 03:56:20 2016
@@ -60,13 +60,16 @@ import org.apache.jackrabbit.oak.api.Des
 import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
 import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
 import org.apache.jackrabbit.oak.cache.CacheStats;
+import org.apache.jackrabbit.oak.commons.PropertiesUtil;
 import org.apache.jackrabbit.oak.osgi.ObserverTracker;
 import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
 import org.apache.jackrabbit.oak.plugins.blob.BlobGC;
 import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean;
 import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore;
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
 import 
org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
 import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
@@ -305,6 +308,17 @@ public class SegmentNodeStoreService ext
     )
     public static final String PROP_BLOB_GC_MAX_AGE = "blobGcMaxAgeInSecs";
 
+    /**
+     * Default interval for taking snapshots of locally tracked blob ids.
+     */
+    private static final long DEFAULT_BLOB_SNAPSHOT_INTERVAL = 12 * 60 * 60;
+    @Property (longValue = DEFAULT_BLOB_SNAPSHOT_INTERVAL,
+        label = "Blob tracking snapshot interval (in secs)",
+        description = "This is the default interval in which the snapshots of 
locally tracked blob ids will"
+            + "be taken and synchronized with the blob store"
+    )
+    public static final String PROP_BLOB_SNAPSHOT_INTERVAL = 
"blobTrackSnapshotIntervalInSecs";
+
     @Override
     protected SegmentNodeStore getNodeStore() {
         checkState(segmentNodeStore != null, "service must be activated when 
used");
@@ -607,6 +621,20 @@ public class SegmentNodeStoreService ext
             } catch (Exception e) {
                 throw new IOException("Could not register a unique 
repositoryId", e);
             }
+
+            if (blobStore instanceof BlobTrackingStore) {
+                final long trackSnapshotInterval = 
toLong(property(PROP_BLOB_SNAPSHOT_INTERVAL),
+                    DEFAULT_BLOB_SNAPSHOT_INTERVAL);
+                String root = PropertiesUtil.toString(property(DIRECTORY), 
"./repository");
+
+                BlobTrackingStore trackingStore = (BlobTrackingStore) 
blobStore;
+                if (trackingStore.getTracker() != null) {
+                    trackingStore.getTracker().close();
+                }
+                ((BlobTrackingStore) blobStore).addTracker(
+                    new BlobIdTracker(root, repoId, trackSnapshotInterval, 
(SharedDataStore)
+                        blobStore));
+            }
         }
 
         if (store.getBlobStore() instanceof GarbageCollectableBlobStore) {



Reply via email to