Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerStoreTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerStoreTest.java?rev=1753429&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerStoreTest.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerStoreTest.java
Wed Jul 20 03:56:20 2016
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker.Store;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Sets.newHashSet;
+import static com.google.common.collect.Sets.symmetricDifference;
+import static java.lang.String.valueOf;
+import static java.util.UUID.randomUUID;
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.readStringsAsSet;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.writeStrings;
+import static
org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils.getBlobStore;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeNoException;
+import static org.junit.Assume.assumeThat;
+
+/**
+ * Test for BlobIdTracker.Store to test addition, retrieval and removal of
blob ids.
+ */
+public class BlobIdTrackerStoreTest {
+ private static final Logger log =
LoggerFactory.getLogger(BlobIdTrackerStoreTest.class);
+
+ File root;
+ SharedDataStore dataStore;
+ BlobIdTracker tracker;
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+ private String repoId;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ try {
+ assumeThat(getBlobStore(), instanceOf(SharedDataStore.class));
+ } catch (Exception e) {
+ assumeNoException(e);
+ }
+ }
+
+ @Before
+ public void setup() throws Exception {
+ this.root = folder.newFolder();
+ if (dataStore == null) {
+ dataStore = getBlobStore(root);
+ }
+ this.repoId = randomUUID().toString();
+ this.tracker = initTracker();
+ }
+
+ private BlobIdTracker initTracker() throws IOException {
+ return new BlobIdTracker(root.getAbsolutePath(),
+ repoId, 5 * 60, dataStore);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ tracker.close();
+ folder.delete();
+ }
+
+ @Test
+ public void addSnapshot() throws Exception {
+ BlobIdTracker.Store store = tracker.store;
+
+ Set<String> initAdd = add(store, range(0, 10000));
+ store.snapshot();
+ Set<String> retrieved = retrieve(store);
+
+ assertEquals("Incorrect elements after add snapshot", initAdd,
retrieved);
+ }
+
+ @Test
+ public void addSnapshotRetrieve() throws Exception {
+ BlobIdTracker.Store store = tracker.store;
+
+ Set<String> initAdd = add(store, range(0, 10000));
+ store.snapshot();
+ Set<String> retrieved = retrieveFile(store, folder);
+
+ assertEquals("Incorrect elements after add snapshot reading file",
initAdd, retrieved);
+ }
+
+ @Test
+ public void addSnapshotAdd() throws Exception {
+ BlobIdTracker.Store store = tracker.store;
+
+ Set<String> initAdd = add(store, range(0, 10000));
+ store.snapshot();
+ initAdd.addAll(add(store, range(10001, 10003)));
+ Set<String> retrieved = retrieve(store);
+
+ assertTrue("Incorrect elements with add before snapshot",
+ symmetricDifference(initAdd, retrieved)
+ .containsAll(newHashSet("10001", "10002", "10003")));
+ }
+
+ @Test
+ public void addSnapshotAddSnapshot() throws Exception {
+ BlobIdTracker.Store store = tracker.store;
+
+ Set<String> initAdd = add(store, range(0, 10000));
+ store.snapshot();
+ initAdd.addAll(add(store, range(10001, 10003)));
+ store.snapshot();
+ Set<String> retrieved = retrieve(store);
+
+ assertEquals("Incorrect elements with snapshot after add", initAdd,
retrieved);
+ }
+
+ @Test
+ public void addSnapshotRemove() throws Exception {
+ BlobIdTracker.Store store = tracker.store;
+
+ Set<String> initAdd = add(store, range(0, 10000));
+ store.snapshot();
+ remove(store, folder.newFile(), initAdd, range(2, 3));
+
+ Set<String> retrieved = retrieve(store);
+ assertEquals("Incorrect elements after remove", initAdd, retrieved);
+ }
+
+ /**
+ * Simulates a restart without a clean shutdown: ids are added but never
+ * snapshotted, then a second tracker is created over the same root while
+ * the first one is left open.
+ *
+ * NOTE(review): the first tracker is never closed here; tearDown only
+ * closes the instance currently held in the tracker field.
+ */
+ @Test
+ public void addRestart() throws IOException {
+ BlobIdTracker.Store store = tracker.store;
+
+ Set<String> initAdd = add(store, range(0, 100000));
+ // Replace the tracker without closing or snapshotting the old store
+ this.tracker = initTracker();
+ // The old store is expected to expose no records at this point
+ Set<String> retrieved = retrieve(store);
+ assertTrue("Extra elements retrieved", retrieved.isEmpty());
+ // Only after the new store takes a snapshot must all the ids show up
+ store = tracker.store;
+ store.snapshot();
+ retrieved = retrieve(store);
+ assertEquals("Incorrect elements after dirty restart", initAdd,
retrieved);
+ }
+
+ @Test
+ public void addCloseRestart() throws IOException {
+ BlobIdTracker.Store store = tracker.store;
+
+ Set<String> initAdd = add(store, range(0, 10000));
+ store.close();
+ this.tracker = initTracker();
+ store = tracker.store;
+ store.snapshot();
+ Set<String> retrieved = retrieve(store);
+ assertEquals("Incorrect elements after safe restart", initAdd,
retrieved);
+ }
+
+ @Test
+ public void addConcurrentSnapshot() throws IOException,
InterruptedException {
+ final BlobIdTracker.Store store = tracker.store;
+ final CountDownLatch start = new CountDownLatch(1);
+ final CountDownLatch done = new CountDownLatch(2);
+
+ Thread addThread = addThread(store, start, done);
+ Thread snapshotThread = snapshotThread(store, start, done);
+ snapshotThread.start();
+ addThread.start();
+
+ start.countDown();
+ done.await();
+
+ // Do a snapshot to ensure that all the adds if snapshot finished
first are collected
+ store.snapshot();
+ Set<String> retrieved = retrieve(store);
+ assertEquals("Incorrect elements after concurrent snapshot",
+ newHashSet(range(0, 100000)), retrieved);
+ }
+
+ @Test
+ public void addSnapshotConcurrentRetrieve() throws IOException,
InterruptedException {
+ final BlobIdTracker.Store store = tracker.store;
+ final CountDownLatch start = new CountDownLatch(1);
+ final CountDownLatch done = new CountDownLatch(2);
+ Set<String> initAdd = add(store, range(0, 100000));
+ final Set<String> retrieves = newHashSet();
+
+ Thread retrieveThread = retrieveThread(store, retrieves, start, done);
+ Thread snapshotThread = snapshotThread(store, start, done);
+ snapshotThread.start();
+ retrieveThread.start();
+ start.countDown();
+ done.await();
+
+ if (retrieves.isEmpty()) {
+ // take a snapshot to ensure that all adds accounted if snapshot
finished last
+ store.snapshot();
+ retrieves.addAll(retrieve(store));
+ }
+ assertEquals("Incorrect elements after concurrent snapshot/retrieve",
+ initAdd, retrieves);
+ }
+
+    /**
+     * Runs a removal and a snapshot concurrently on a store that already holds a
+     * snapshot, and checks that both the removal and the ids added after the first
+     * snapshot are reflected in the records retrieved afterwards.
+     */
+    @Test
+    public void snapshotConcurrentRemove() throws IOException, InterruptedException {
+        final BlobIdTracker.Store store = tracker.store;
+        final CountDownLatch start = new CountDownLatch(1);
+        final CountDownLatch done = new CountDownLatch(2);
+        final Set<String> initAdd = add(store, range(0, 100000));
+        store.snapshot();
+
+        Thread removeThread = removeThread(store, folder.newFile(), initAdd, start, done);
+        Thread snapshotThread = snapshotThread(store, start, done);
+        removeThread.start();
+        snapshotThread.start();
+
+        // Add some more ids to check that the snapshot is successful. They must lie
+        // outside the initial 0..100000 range, otherwise they are already present
+        // and the assertion below could not detect a snapshot that dropped them.
+        initAdd.addAll(add(store, range(100001, 100003)));
+        start.countDown();
+        done.await();
+
+        Set<String> retrieves = retrieve(store);
+        assertEquals("Incorrect elements after concurrent snapshot/remove",
+            initAdd, retrieves);
+    }
+
+ @Test
+ public void addBulkAdd() throws IOException {
+ final BlobIdTracker.Store store = tracker.store;
+ final Set<String> initAdd = add(store, range(0, 4));
+
+ // Add new ids from a file
+ File temp = folder.newFile();
+ List<String> newAdd = range(5, 9);
+ initAdd.addAll(newAdd);
+ writeStrings(newAdd.iterator(), temp, false);
+
+ store.addRecords(temp);
+ store.snapshot();
+
+ Set<String> retrieved = retrieve(store);
+ assertEquals("Incorrect elements after bulk add from file",
+ initAdd, retrieved);
+
+ newAdd = range(10, 14);
+ initAdd.addAll(newAdd);
+
+ store.addRecords(newAdd.iterator());
+ store.snapshot();
+
+ retrieved = retrieve(store);
+ assertEquals("Incorrect elements after bulk add from iterator",
+ initAdd, retrieved);
+ }
+
+ @Test
+ public void bulkAddConcurrentCompact() throws IOException,
InterruptedException {
+ final BlobIdTracker.Store store = tracker.store;
+ final CountDownLatch start = new CountDownLatch(1);
+ final CountDownLatch done = new CountDownLatch(2);
+
+ Thread addThread = addThread(store, true, start, done);
+ Thread snapshotThread = snapshotThread(store, start, done);
+ snapshotThread.start();
+ addThread.start();
+
+ start.countDown();
+ done.await();
+
+ // Do a snapshot to ensure that all the adds if snapshot finished
first are collected
+ store.snapshot();
+ Set<String> retrieved = retrieve(store);
+ assertEquals("Incorrect elements after concurrent snapshot",
+ newHashSet(range(0, 100000)), retrieved);
+
+ }
+
+ /**
+ * Creates a thread that adds ids one record at a time; shorthand for the
+ * four-argument overload with {@code bulk} set to {@code false}.
+ */
+ private static Thread addThread(
+ final Store store, final CountDownLatch start, final CountDownLatch
done) {
+ return addThread(store, false, start, done);
+ }
+
+    /**
+     * Creates (but does not start) a thread that waits on {@code start} and then adds
+     * the ids {@code "0"} to {@code "100000"} to the store.
+     *
+     * @param store the store to add records to
+     * @param bulk {@code true} to add all ids through a single {@code addRecords} call,
+     *             {@code false} to add them one record at a time
+     * @param start latch the thread waits on before doing any work
+     * @param done latch counted down when the thread finishes, whether it succeeded or not
+     * @return the unstarted thread
+     */
+    private static Thread addThread(
+        final Store store, final boolean bulk, final CountDownLatch start,
+        final CountDownLatch done) {
+        return new Thread("AddThread") {
+            @Override
+            public void run() {
+                try {
+                    List<String> adds = range(0, 100000);
+                    start.await();
+                    if (bulk) {
+                        store.addRecords(adds.iterator());
+                    } else {
+                        add(store, adds);
+                    }
+                } catch (IOException e) {
+                    log.info("Exception in add", e);
+                } catch (InterruptedException e) {
+                    log.info("Interrupted in add", e);
+                    // Keep the interrupt visible to the rest of the thread
+                    Thread.currentThread().interrupt();
+                } finally {
+                    // Always release the waiting test; counting down only on success
+                    // would leave done.await() blocked forever after a failure.
+                    done.countDown();
+                }
+            }
+        };
+    }
+
+    /**
+     * Creates (but does not start) a thread that waits on {@code start} and then copies
+     * all records currently returned by the store into {@code retrieves}.
+     *
+     * @param store the store to read records from
+     * @param retrieves set receiving the retrieved ids
+     * @param start latch the thread waits on before doing any work
+     * @param done latch counted down when the thread finishes, whether it succeeded or not
+     * @return the unstarted thread
+     */
+    private static Thread retrieveThread(
+        final Store store, final Set<String> retrieves, final CountDownLatch start,
+        final CountDownLatch done) {
+        return new Thread("RetrieveThread") {
+            @Override
+            public void run() {
+                try {
+                    start.await();
+                    retrieves.addAll(retrieve(store));
+                } catch (IOException e) {
+                    log.info("Exception in retrieve", e);
+                } catch (InterruptedException e) {
+                    log.info("Interrupted in retrieve", e);
+                    // Keep the interrupt visible to the rest of the thread
+                    Thread.currentThread().interrupt();
+                } finally {
+                    // Always release the waiting test, even after a failure
+                    done.countDown();
+                }
+            }
+        };
+    }
+
+    /**
+     * Creates (but does not start) a thread that waits on {@code start} and then removes
+     * the ids {@code "1"} to {@code "3"} from both the store and {@code adds}.
+     *
+     * @param store the store to remove records from
+     * @param temp scratch file the ids to remove are written to
+     * @param adds expected-id set; the removed ids are dropped from it as well
+     * @param start latch the thread waits on before doing any work
+     * @param done latch counted down when the thread finishes, whether it succeeded or not
+     * @return the unstarted thread
+     */
+    private static Thread removeThread(final Store store, final File temp,
+        final Set<String> adds, final CountDownLatch start, final CountDownLatch done) {
+        return new Thread("RemoveThread") {
+            @Override
+            public void run() {
+                try {
+                    start.await();
+                    remove(store, temp, adds, range(1, 3));
+                } catch (IOException e) {
+                    log.info("Exception in remove", e);
+                } catch (InterruptedException e) {
+                    log.info("Interrupted in remove", e);
+                    // Keep the interrupt visible to the rest of the thread
+                    Thread.currentThread().interrupt();
+                } finally {
+                    // Always release the waiting test, even after a failure
+                    done.countDown();
+                }
+            }
+        };
+    }
+
+    /**
+     * Creates (but does not start) a thread that waits on {@code start} and then takes
+     * a snapshot of the store.
+     *
+     * @param store the store to snapshot
+     * @param start latch the thread waits on before doing any work
+     * @param done latch counted down when the thread finishes, whether it succeeded or not
+     * @return the unstarted thread
+     */
+    private static Thread snapshotThread(
+        final Store store, final CountDownLatch start, final CountDownLatch done) {
+        return new Thread("SnapshotThread") {
+            @Override
+            public void run() {
+                try {
+                    start.await();
+                    store.snapshot();
+                } catch (IOException e) {
+                    log.info("Exception in snapshot", e);
+                } catch (InterruptedException e) {
+                    log.info("Interrupted in snapshot", e);
+                    // Keep the interrupt visible to the rest of the thread
+                    Thread.currentThread().interrupt();
+                } finally {
+                    // Always release the waiting test, even after a failure
+                    done.countDown();
+                }
+            }
+        };
+    }
+
+    /**
+     * Adds every given id to the store, one record at a time.
+     *
+     * @param store the store to add records to
+     * @param ints the ids to add
+     * @return a new set holding the ids that were added
+     */
+    private static Set<String> add(Store store, List<String> ints) throws IOException {
+        Set<String> added = newHashSet();
+        Iterator<String> ids = ints.iterator();
+        while (ids.hasNext()) {
+            String id = ids.next();
+            store.addRecord(id);
+            added.add(id);
+        }
+        return added;
+    }
+
+    /**
+     * Collects all records currently returned by the store into a set.
+     *
+     * @param store the store to read records from
+     * @return a new set with every id the store's iterator produced
+     */
+    private static Set<String> retrieve(Store store) throws IOException {
+        Set<String> retrieved = newHashSet();
+        Iterator<String> iter = store.getRecords();
+        try {
+            while (iter.hasNext()) {
+                retrieved.add(iter.next());
+            }
+        } finally {
+            // Release the iterator even if iteration fails, and only cast when
+            // it really is Closeable instead of risking a ClassCastException.
+            if (iter instanceof Closeable) {
+                closeQuietly((Closeable) iter);
+            }
+        }
+        return retrieved;
+    }
+
+    /**
+     * Asks the store to write its records to a fresh temporary file and reads that
+     * file back into a set.
+     *
+     * @param store the store to read records from
+     * @param folder temporary folder the target file is created in
+     * @return the ids read from the file
+     */
+    private static Set<String> retrieveFile(Store store, TemporaryFolder folder)
+        throws IOException {
+        File f = folder.newFile();
+        FileInputStream in = new FileInputStream(store.getRecords(f.getAbsolutePath()));
+        try {
+            return readStringsAsSet(in, false);
+        } finally {
+            // Do not rely on the helper to close the stream; a second close is harmless.
+            closeQuietly(in);
+        }
+    }
+
+ /**
+ * Removes the given ids from the store and from the expected-id set.
+ *
+ * @param store the store to remove records from
+ * @param temp scratch file the ids are written to before being handed to the store
+ * @param initAdd expected-id set; modified in place by dropping {@code ints}
+ * @param ints the ids to remove
+ */
+ private static void remove(Store store, File temp, Set<String> initAdd,
+ List<String> ints) throws IOException {
+ // The store takes the ids to delete as a file, so spool them out first
+ writeStrings(ints.iterator(), temp, false);
+ initAdd.removeAll(ints);
+ store.removeRecords(temp);
+ }
+
+    /**
+     * Builds the decimal string form of every integer from {@code min} to {@code max},
+     * both bounds included, in ascending order.
+     */
+    private static List<String> range(int min, int max) {
+        List<String> values = newArrayList();
+        int next = min;
+        while (next <= max) {
+            values.add(valueOf(next));
+            next++;
+        }
+        return values;
+    }
+}
+
Propchange:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerStoreTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerTest.java?rev=1753429&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerTest.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerTest.java
Wed Jul 20 03:56:20 2016
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Sets.newHashSet;
+import static java.lang.String.valueOf;
+import static java.util.UUID.randomUUID;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.readStringsAsSet;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.writeStrings;
+import static
org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils.getBlobStore;
+import static
org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils
+ .SharedStoreRecordType.BLOBREFERENCES;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeNoException;
+import static org.junit.Assume.assumeThat;
+
+/**
+ * Test for BlobIdTracker to test addition, retrieval and removal of blob ids.
+ */
+public class BlobIdTrackerTest {
+ File root;
+ SharedDataStore dataStore;
+ BlobIdTracker tracker;
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+ private String repoId;
+ private ScheduledExecutorService scheduler;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ try {
+ assumeThat(getBlobStore(), instanceOf(SharedDataStore.class));
+ } catch (Exception e) {
+ assumeNoException(e);
+ }
+ }
+
+ @Before
+ public void setup() throws Exception {
+ this.root = folder.newFolder();
+ if (dataStore == null) {
+ dataStore = getBlobStore(root);
+ }
+ this.repoId = randomUUID().toString();
+ this.tracker = new BlobIdTracker(root.getAbsolutePath(), repoId, 100 *
60, dataStore);
+ this.scheduler = newSingleThreadScheduledExecutor();
+ }
+
+    /**
+     * Closes the tracker and the scheduler and deletes the temporary folder.
+     * The scheduler and folder are cleaned up even if closing the tracker fails.
+     */
+    @After
+    public void tearDown() throws IOException {
+        try {
+            // setup() may have failed before the tracker was created
+            if (tracker != null) {
+                tracker.close();
+            }
+        } finally {
+            if (scheduler != null) {
+                new ExecutorCloser(scheduler).close();
+            }
+            folder.delete();
+        }
+    }
+
+ @Test
+ public void addSnapshot() throws Exception {
+ Set<String> initAdd = add(tracker, range(0, 4));
+ ScheduledFuture<?> scheduledFuture =
+ scheduler.schedule(tracker.new SnapshotJob(), 0,
TimeUnit.MILLISECONDS);
+ scheduledFuture.get();
+
+ Set<String> retrieved = retrieve(tracker);
+
+ assertEquals("Extra elements after add", initAdd, retrieved);
+ assertEquals("Different ids from the datastore",
+ initAdd,
read(dataStore.getAllMetadataRecords(BLOBREFERENCES.getType())));
+ }
+
+    /**
+     * Adds ids, runs a snapshot job, removes some of the ids and checks that the
+     * tracker returns exactly the remaining ones.
+     */
+    @Test
+    public void addSnapshotRemove() throws Exception {
+        Set<String> initAdd = add(tracker, range(0, 4));
+        ScheduledFuture<?> scheduledFuture =
+            scheduler.schedule(tracker.new SnapshotJob(), 0, TimeUnit.MILLISECONDS);
+        scheduledFuture.get();
+        // remove() also drops the ids from initAdd, which then is the expected set
+        remove(tracker, folder.newFile(), initAdd, range(1, 2));
+
+        Set<String> retrieved = retrieve(tracker);
+
+        assertEquals("Incorrect elements after remove", initAdd, retrieved);
+    }
+
+    /**
+     * Reads the strings held in the stream of every given record and merges them
+     * into a single set.
+     *
+     * @param recs the records whose streams are read
+     * @return the union of the strings read from all records
+     */
+    private static Set<String> read(List<DataRecord> recs)
+        throws IOException, DataStoreException {
+        Set<String> collected = newHashSet();
+        Iterator<DataRecord> records = recs.iterator();
+        while (records.hasNext()) {
+            Set<String> fromRecord = readStringsAsSet(records.next().getStream(), false);
+            collected.addAll(fromRecord);
+        }
+        return collected;
+    }
+
+    /**
+     * Adds every given id to the tracker, one at a time.
+     *
+     * @param tracker the tracker to add ids to
+     * @param ints the ids to add
+     * @return a new set holding the ids that were added
+     */
+    private static Set<String> add(BlobTracker tracker, List<String> ints) throws IOException {
+        Set<String> added = newHashSet();
+        Iterator<String> ids = ints.iterator();
+        while (ids.hasNext()) {
+            String id = ids.next();
+            tracker.add(id);
+            added.add(id);
+        }
+        return added;
+    }
+
+    /**
+     * Collects all ids currently returned by the tracker into a set.
+     *
+     * @param tracker the tracker to read ids from
+     * @return a new set with every id the tracker's iterator produced
+     */
+    private static Set<String> retrieve(BlobTracker tracker) throws IOException {
+        Set<String> retrieved = newHashSet();
+        Iterator<String> iter = tracker.get();
+        try {
+            while (iter.hasNext()) {
+                retrieved.add(iter.next());
+            }
+        } finally {
+            // Release the iterator even if iteration fails part-way through
+            if (iter instanceof Closeable) {
+                closeQuietly((Closeable) iter);
+            }
+        }
+        return retrieved;
+    }
+
+ /**
+ * Removes the given ids from the tracker and from the expected-id set.
+ *
+ * @param tracker the tracker to remove ids from
+ * @param temp scratch file the ids are written to before being handed to the tracker
+ * @param initAdd expected-id set; modified in place by dropping {@code ints}
+ * @param ints the ids to remove
+ */
+ private static void remove(BlobTracker tracker, File temp, Set<String>
initAdd,
+ List<String> ints) throws IOException {
+ // The tracker takes the ids to delete as a file, so spool them out first
+ writeStrings(ints.iterator(), temp, false);
+ initAdd.removeAll(ints);
+ tracker.remove(temp);
+ }
+
+    /**
+     * Builds the decimal string form of every integer from {@code min} to {@code max},
+     * both bounds included, in ascending order.
+     */
+    private static List<String> range(int min, int max) {
+        List<String> values = newArrayList();
+        int next = min;
+        while (next <= max) {
+            values.add(valueOf(next));
+            next++;
+        }
+        return values;
+    }
+}
+
Propchange:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreTrackerGCTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreTrackerGCTest.java?rev=1753429&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreTrackerGCTest.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreTrackerGCTest.java
Wed Jul 20 03:56:20 2016
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+
+import com.google.common.collect.Lists;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore;
+import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import
org.apache.jackrabbit.oak.plugins.document.DocumentBlobReferenceRetriever;
+import org.apache.jackrabbit.oak.plugins.document.DocumentMKBuilderProvider;
+import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
+import org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static com.google.common.collect.Sets.newHashSet;
+import static com.google.common.collect.Sets.union;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.commons.io.FileUtils.forceDelete;
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static
org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils.getBlobStore;
+import static
org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils
+ .SharedStoreRecordType.REPOSITORY;
+import static
org.apache.jackrabbit.oak.plugins.document.Revision.getCurrentTimestamp;
+import static org.apache.jackrabbit.oak.spi.commit.CommitInfo.EMPTY;
+import static org.apache.jackrabbit.oak.spi.commit.EmptyHook.INSTANCE;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assume.assumeNoException;
+import static org.junit.Assume.assumeThat;
+
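+/**
+ * Tests blob garbage collection on a blob store that tracks blob ids locally through a
+ * {@link BlobIdTracker}, checking that the tracked ids match the store contents after GC.
+ */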
+public class DataStoreTrackerGCTest {
+ private Clock clock;
+ private File blobStoreRoot;
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+
+ @Rule
+ public DocumentMKBuilderProvider builderProvider = new
DocumentMKBuilderProvider();
+
+ /**
+ * Skips the whole class unless the configured blob store is a
+ * {@link BlobTrackingStore}; a failure to create the store is turned
+ * into a skipped test as well.
+ */
+ @BeforeClass
+ public static void check() {
+ File root = new File("./target/blobstore");
+ try {
+ BlobStore store = getBlobStore(root);
+ assumeThat(store, instanceOf(BlobTrackingStore.class));
+ } catch (Exception e) {
+ assumeNoException(e);
+ } finally {
+ try {
+ forceDelete(root);
+ } catch (IOException e) {
+ // Deliberately ignored: removing the probe directory is best effort
+ }
+ }
+ }
+
+ @Before
+ public void setup() throws IOException, InterruptedException {
+ this.clock = getTestClock();
+ this.blobStoreRoot = folder.newFolder("blobstore");
+ }
+
+ @Test
+ public void gc() throws Exception {
+ Cluster cluster = new Cluster("cluster1");
+ BlobStore s = cluster.blobStore;
+ BlobIdTracker tracker = (BlobIdTracker) ((BlobTrackingStore)
s).getTracker();
+ DataStoreState state = init(cluster.nodeStore, 0);
+ ScheduledFuture<?> scheduledFuture = newSingleThreadScheduledExecutor()
+ .schedule(tracker.new SnapshotJob(), 0, MILLISECONDS);
+ scheduledFuture.get();
+ // All blobs added should be tracked now
+ assertEquals(state.blobsAdded, retrieveTracked(tracker));
+
+ cluster.gc.collectGarbage(false);
+ Set<String> existingAfterGC = iterate(s);
+ // Check the state of the blob store after gc
+ assertEquals(state.blobsPresent, existingAfterGC);
+ // Tracked blobs should reflect deletions after gc
+ assertEquals(state.blobsPresent, retrieveTracked(tracker));
+ }
+
+ /**
+ * Runs blob GC without taking a tracker snapshot first and checks that the
+ * blob store and the tracker still end up with the referenced blobs only.
+ */
+ @Test
+ public void gcColdStart() throws Exception {
+ Cluster cluster = new Cluster("cluster1");
+ BlobStore s = cluster.blobStore;
+ BlobIdTracker tracker = (BlobIdTracker) ((BlobTrackingStore)
s).getTracker();
+ DataStoreState state = init(cluster.nodeStore, 0);
+
+ // No snapshot has been taken yet, so the tracked ids must differ from the
+ // ids added (this only asserts inequality, not that nothing is tracked)
+ assertNotEquals(state.blobsAdded, retrieveTracked(tracker));
+
+ cluster.gc.collectGarbage(false);
+ Set<String> existingAfterGC = iterate(s);
+ // Check the state of the blob store after gc
+ assertEquals(state.blobsPresent, existingAfterGC);
+ // Tracked blobs should reflect deletions after gc
+ assertEquals(state.blobsPresent, retrieveTracked(tracker));
+ }
+
+ @Test
+ public void differentCluster() throws Exception {
+ // Add blobs to cluster1
+ Cluster cluster1 = new Cluster("cluster1");
+ BlobStore s1 = cluster1.blobStore;
+ BlobIdTracker tracker1 = (BlobIdTracker) ((BlobTrackingStore)
s1).getTracker();
+ DataStoreState state1 = init(cluster1.nodeStore, 0);
+ ScheduledFuture<?> scheduledFuture1 =
newSingleThreadScheduledExecutor()
+ .schedule(tracker1.new SnapshotJob(), 0, MILLISECONDS);
+ scheduledFuture1.get();
+ // All blobs added should be tracked now
+ assertEquals(state1.blobsAdded, retrieveTracked(tracker1));
+
+ // Add blobs to cluster1
+ Cluster cluster2 = new Cluster("cluster2");
+ BlobStore s2 = cluster2.blobStore;
+ BlobIdTracker tracker2 = (BlobIdTracker) ((BlobTrackingStore)
s2).getTracker();
+ DataStoreState state2 = init(cluster2.nodeStore, 0);
+ ScheduledFuture<?> scheduledFuture2 =
newSingleThreadScheduledExecutor()
+ .schedule(tracker2.new SnapshotJob(), 0, MILLISECONDS);
+ scheduledFuture2.get();
+
+ // All blobs added should be tracked now
+ assertEquals(state2.blobsAdded, retrieveTracked(tracker2));
+ cluster2.gc.collectGarbage(true);
+
+ // do a gc on cluster1 with sweep
+ cluster1.gc.collectGarbage(false);
+ Set<String> existingAfterGC = iterate(s1);
+
+ // Check the state of the blob store after gc
+ assertEquals(
+ union(state1.blobsPresent, state2.blobsPresent), existingAfterGC);
+ // Tracked blobs should reflect deletions after gc
+ assertEquals(
+ union(state1.blobsPresent, state2.blobsPresent),
+ retrieveTracked(tracker1));
+
+ }
+
+    /**
+     * Collects every chunk id the given blob store reports into a set.
+     *
+     * @param blobStore the store to list; must be a {@link GarbageCollectableBlobStore}
+     * @return a new set with all chunk ids returned by the store
+     */
+    private Set<String> iterate(BlobStore blobStore) throws Exception {
+        GarbageCollectableBlobStore gcStore = (GarbageCollectableBlobStore) blobStore;
+        Set<String> chunkIds = newHashSet();
+        for (Iterator<String> it = gcStore.getAllChunkIds(0); it.hasNext();) {
+            chunkIds.add(it.next());
+        }
+        return chunkIds;
+    }
+
+    /**
+     * Collects all ids currently returned by the tracker into a set.
+     *
+     * @param tracker the tracker to read ids from
+     * @return a new set with every id the tracker's iterator produced
+     */
+    private static Set<String> retrieveTracked(BlobTracker tracker) throws IOException {
+        Set<String> retrieved = newHashSet();
+        Iterator<String> iter = tracker.get();
+        try {
+            while (iter.hasNext()) {
+                retrieved.add(iter.next());
+            }
+        } finally {
+            // Release the iterator even if iteration fails, and only cast when
+            // it really is Closeable instead of risking a ClassCastException.
+            if (iter instanceof Closeable) {
+                closeQuietly((Closeable) iter);
+            }
+        }
+        return retrieved;
+    }
+
+    /**
+     * Populates the node store with ten blob-bearing nodes {@code c<id>} (ids starting
+     * at {@code idStart}) plus one node {@code cdup} that references the first blob
+     * again, removes a pseudo-randomly chosen subset of the {@code c<id>} nodes,
+     * advances the test clock and runs version garbage collection.
+     *
+     * @param s the node store to populate
+     * @param idStart first id to use for node names and blob seeds
+     * @return the chunk ids of all blobs added and of those belonging to nodes that
+     *         were not removed
+     */
+    public DataStoreState init(DocumentNodeStore s, int idStart) throws Exception {
+        NodeBuilder a = s.getRoot().builder();
+
+        int number = 10;
+        int maxDeleted = 5;
+        // Ids of the nodes that are removed again below; the fixed seed keeps the
+        // selection repeatable across runs
+        List<Integer> processed = Lists.newArrayList();
+        Random rand = new Random(47);
+        for (int i = 0; i < maxDeleted; i++) {
+            // Offset by idStart so the chosen ids lie in [idStart, idStart + number),
+            // the same range the nodes are created in
+            int n = idStart + rand.nextInt(number);
+            if (!processed.contains(n)) {
+                processed.add(n);
+            }
+        }
+        DataStoreState state = new DataStoreState();
+        for (int i = idStart; i < idStart + number; i++) {
+            Blob b = s.createBlob(randomStream(i, 16516));
+            Iterator<String> idIter =
+                ((GarbageCollectableBlobStore) s.getBlobStore())
+                    .resolveChunks(b.toString());
+            while (idIter.hasNext()) {
+                String chunk = idIter.next();
+                state.blobsAdded.add(chunk);
+                if (!processed.contains(i)) {
+                    state.blobsPresent.add(chunk);
+                }
+            }
+            a.child("c" + i).setProperty("x", b);
+            // Add a duplicated entry
+            if (i == idStart) {
+                a.child("cdup").setProperty("x", b);
+            }
+        }
+        s.merge(a, INSTANCE, EMPTY);
+
+        a = s.getRoot().builder();
+        for (int id : processed) {
+            a.child("c" + id).remove();
+            s.merge(a, INSTANCE, EMPTY);
+        }
+        long maxAge = 10; // minutes
+        // Move the test clock forward before running version GC with a max age of 0
+        clock.waitUntil(clock.getTime() + MINUTES.toMillis(maxAge));
+
+        VersionGarbageCollector vGC = s.getVersionGarbageCollector();
+        vGC.gc(0, MILLISECONDS);
+
+        return state;
+    }
+
+ class Cluster implements Closeable {
+ DocumentNodeStore nodeStore;
+ BlobStore blobStore;
+ MarkSweepGarbageCollector gc;
+ String repoId;
+ BlobIdTracker tracker;
+
+ public Cluster(String clusterId) throws Exception {
+ blobStore = getBlobStore(blobStoreRoot);
+ nodeStore = builderProvider.newBuilder()
+ .setAsyncDelay(0)
+ .setDocumentStore(new MemoryDocumentStore())
+ .setBlobStore(blobStore)
+ .getNodeStore();
+ repoId = ClusterRepositoryInfo.getOrCreateId(nodeStore);
+ nodeStore.runBackgroundOperations();
+
+ ((SharedDataStore) blobStore).addMetadataRecord(
+ new ByteArrayInputStream(new byte[0]),
+ REPOSITORY.getNameFromId(repoId));
+
+ String trackerRoot = folder.newFolder(clusterId).getAbsolutePath();
+ tracker = new BlobIdTracker(trackerRoot,
+ repoId, 86400, (SharedDataStore) blobStore);
+ // add the tracker to the blobStore
+ ((BlobTrackingStore) blobStore).addTracker(tracker);
+
+ // initialized the GC
+ gc = new MarkSweepGarbageCollector(
+ new DocumentBlobReferenceRetriever(nodeStore),
+ (GarbageCollectableBlobStore) blobStore,
newSingleThreadExecutor(),
+ folder.newFolder("gc" + clusterId).getAbsolutePath(), 5, 0,
repoId);
+ }
+
+ public void close() throws IOException {
+ nodeStore.dispose();
+ }
+
+ }
+
+ protected Clock getTestClock() throws InterruptedException {
+ Clock clock = new Clock.Virtual();
+ clock.waitUntil(getCurrentTimestamp());
+ return clock;
+ }
+
+ static InputStream randomStream(int seed, int size) {
+ Random r = new Random(seed);
+ byte[] data = new byte[size];
+ r.nextBytes(data);
+ return new ByteArrayInputStream(data);
+ }
+
+ private class DataStoreState {
+ Set<String> blobsAdded = newHashSet();
+ Set<String> blobsPresent = newHashSet();
+ }
+}
Propchange:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreTrackerGCTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
jackrabbit/oak/trunk/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java?rev=1753429&r1=1753428&r2=1753429&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
(original)
+++
jackrabbit/oak/trunk/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
Wed Jul 20 03:56:20 2016
@@ -60,13 +60,16 @@ import org.apache.jackrabbit.oak.api.Des
import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
import org.apache.jackrabbit.oak.cache.CacheStats;
+import org.apache.jackrabbit.oak.commons.PropertiesUtil;
import org.apache.jackrabbit.oak.osgi.ObserverTracker;
import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
import org.apache.jackrabbit.oak.plugins.blob.BlobGC;
import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean;
import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore;
import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker;
import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
import
org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
@@ -305,6 +308,17 @@ public class SegmentNodeStoreService ext
)
public static final String PROP_BLOB_GC_MAX_AGE = "blobGcMaxAgeInSecs";
+    /**
+     * Default interval (in seconds) for taking snapshots of locally tracked blob ids.
+     */
+    private static final long DEFAULT_BLOB_SNAPSHOT_INTERVAL = 12 * 60 * 60;
+    @Property (longValue = DEFAULT_BLOB_SNAPSHOT_INTERVAL,
+        label = "Blob tracking snapshot interval (in secs)",
+        description = "This is the default interval in which the snapshots of locally tracked blob ids will "
+            + "be taken and synchronized with the blob store"
+    )
+    public static final String PROP_BLOB_SNAPSHOT_INTERVAL = "blobTrackSnapshotIntervalInSecs";
+
+
@Override
protected SegmentNodeStore getNodeStore() {
checkState(segmentNodeStore != null, "service must be activated when
used");
@@ -607,6 +621,20 @@ public class SegmentNodeStoreService ext
} catch (Exception e) {
throw new IOException("Could not register a unique
repositoryId", e);
}
+
+ if (blobStore instanceof BlobTrackingStore) {
+ final long trackSnapshotInterval =
toLong(property(PROP_BLOB_SNAPSHOT_INTERVAL),
+ DEFAULT_BLOB_SNAPSHOT_INTERVAL);
+ String root = PropertiesUtil.toString(property(DIRECTORY),
"./repository");
+
+ BlobTrackingStore trackingStore = (BlobTrackingStore)
blobStore;
+ if (trackingStore.getTracker() != null) {
+ trackingStore.getTracker().close();
+ }
+ ((BlobTrackingStore) blobStore).addTracker(
+ new BlobIdTracker(root, repoId, trackSnapshotInterval,
(SharedDataStore)
+ blobStore));
+ }
}
if (store.getBlobStore() instanceof GarbageCollectableBlobStore) {