Author: amitj
Date: Wed Jul 20 03:56:20 2016
New Revision: 1753429
URL: http://svn.apache.org/viewvc?rev=1753429&view=rev
Log:
OAK-4200: [BlobGC] Improve collection times of blobs available
Initial implementation
* The blob ids are tracked locally on all instances.
* There is a snapshot thread which uploads all the locally tracked files to
the datastore.
* When running GC, these files are retrieved from all instances sharing the
datastore and then used to return the list of blob ids.
Added:
jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FileLineDifferenceIteratorTest.java
(contents, props changed)
- copied, changed from r1753393,
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileLineDifferenceIteratorTest.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobTrackingStore.java
(with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTracker.java
(with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobTracker.java
(with props)
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerClusterSharedTest.java
(with props)
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerStoreTest.java
(with props)
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerTest.java
(with props)
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreTrackerGCTest.java
(with props)
Removed:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileLineDifferenceIteratorTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileStateTest.java
Modified:
jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/FileIOUtils.java
jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FileIOUtilsTest.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
jackrabbit/oak/trunk/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
Modified:
jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/FileIOUtils.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/FileIOUtils.java?rev=1753429&r1=1753428&r2=1753429&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/FileIOUtils.java
(original)
+++
jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/FileIOUtils.java
Wed Jul 20 03:56:20 2016
@@ -20,9 +20,11 @@ import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.io.OutputStream;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
@@ -37,13 +39,22 @@ import com.google.common.collect.Iterato
import com.google.common.collect.PeekingIterator;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static com.google.common.base.Charsets.UTF_8;
import static com.google.common.collect.Sets.newHashSet;
import static com.google.common.io.Closeables.close;
+import static com.google.common.io.FileWriteMode.APPEND;
+import static com.google.common.io.Files.asByteSink;
import static com.google.common.io.Files.move;
import static com.google.common.io.Files.newWriter;
import static java.io.File.createTempFile;
-import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.commons.io.FileUtils.copyInputStreamToFile;
+import static org.apache.commons.io.FileUtils.forceDelete;
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static org.apache.commons.io.IOUtils.copyLarge;
+import static org.apache.commons.io.LineIterator.closeQuietly;
import static
org.apache.jackrabbit.oak.commons.sort.EscapeUtils.escapeLineBreak;
import static
org.apache.jackrabbit.oak.commons.sort.EscapeUtils.unescapeLineBreaks;
import static
org.apache.jackrabbit.oak.commons.sort.ExternalSort.mergeSortedFiles;
@@ -69,7 +80,7 @@ public final class FileIOUtils {
* @param file file whose contents needs to be sorted
*/
public static void sort(File file) throws IOException {
- File sorted = createTempFile("temp", null);
+ File sorted = createTempFile("fleioutilssort", null);
merge(sortInBatch(file, lexComparator, true), sorted);
move(sorted, file);
}
@@ -82,7 +93,7 @@ public final class FileIOUtils {
* @throws IOException
*/
public static void sort(File file, Comparator<String> comparator) throws
IOException {
- File sorted = createTempFile("temp", null);
+ File sorted = createTempFile("fleioutilssort", null);
merge(sortInBatch(file, comparator, true), sorted);
move(sorted, file);
}
@@ -101,6 +112,57 @@ public final class FileIOUtils {
}
/**
+
+ * Copies an input stream to a file.
+ *
+ * @param stream stream to copy
+ * @return the temporary file containing the copied contents
+ * @throws IOException
+ */
+ public static File copy(InputStream stream) throws IOException {
+ File file = createTempFile("fleioutilscopy", null);
+ copyInputStreamToFile(stream, file);
+ return file;
+ }
+
+ /**
+ * Appends the contents of the list of files to the given file and deletes
the files
+ * if the delete flag is enabled.
+ *
+ * If lines in the files may contain line break
characters, it should be
+ * ensured that the files are written with {@link
#writeAsLine(BufferedWriter, String, boolean)}
+ * with true to escape line break characters.
+ * @param files the files whose contents are appended
+ * @param appendTo the file to which the contents are appended
+ * @throws IOException
+ */
+ public static void append(List<File> files, File appendTo, boolean delete)
throws IOException {
+ OutputStream appendStream = null;
+ boolean threw = true;
+
+ try {
+ appendStream = asByteSink(appendTo, APPEND).openBufferedStream();
+
+ for (File f : files) {
+ InputStream iStream = new FileInputStream(f);
+ try {
+ copyLarge(iStream, appendStream);
+ } finally {
+ closeQuietly(iStream);
+ }
+ }
+ if (delete) {
+ for (File f : files) {
+ f.delete();
+ }
+ }
+ threw = false;
+ } finally {
+ close(appendStream, threw);
+ }
+ }
+
+ /**
* Writes a string as a new line into the given buffered writer and
optionally
* escapes the line for line breaks.
*
@@ -215,8 +277,8 @@ public final class FileIOUtils {
/**
* FileLineDifferenceIterator class which iterates over the difference of
2 files line by line.
*
- * If there is a scope for lines in files containing line break characters
it should be
- * ensured that both the file are written with
+ * If there is a scope for lines in the files containing line break
characters it should be
+ * ensured that both the files are written with
* {@link #writeAsLine(BufferedWriter, String, boolean)} with true to
escape line break
* characters.
*/
@@ -301,4 +363,70 @@ public final class FileIOUtils {
return diff;
}
}
+
+ /**
+ * Implements a {@link java.io.Closeable} wrapper over a {@link
LineIterator}.
+ * Also has a transformer to transform the output. If the underlying file
is
+ * provided then it is deleted on {@link #close()}.
+ *
+ * @param <T> the type of elements in the iterator
+ */
+ public static class CloseableFileIterator<T> extends AbstractIterator<T>
implements Closeable {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final LineIterator iterator;
+ private final Function<String, T> transformer;
+ private File backingFile;
+
+ public CloseableFileIterator(LineIterator iterator, Function<String,
T> transformer) {
+ this.iterator = iterator;
+ this.transformer = transformer;
+ }
+
+ public CloseableFileIterator(LineIterator iterator, File backingFile,
+ Function<String, T> transformer) {
+ this.iterator = iterator;
+ this.transformer = transformer;
+ this.backingFile = backingFile;
+ }
+
+ @Override
+ protected T computeNext() {
+ if (iterator.hasNext()) {
+ return transformer.apply(iterator.next());
+ }
+
+ try {
+ close();
+ } catch (IOException e) {
+ log.warn("Error closing iterator", e);
+ }
+ return endOfData();
+ }
+
+ @Override
+ public void close() throws IOException {
+ closeQuietly(iterator);
+ if (backingFile != null && backingFile.exists()) {
+ forceDelete(backingFile);
+ }
+ }
+
+ public static CloseableFileIterator<String> wrap(LineIterator iter) {
+ return new CloseableFileIterator<String>(iter, new
Function<String, String>() {
+ public String apply(String s) {
+ return s;
+ }
+ });
+ }
+
+ public static CloseableFileIterator<String> wrap(LineIterator iter,
File backingFile) {
+ return new CloseableFileIterator<String>(iter, backingFile,
+ new Function<String, String>() {
+ public String apply(String s) {
+ return s;
+ }
+ });
+ }
+ }
}
Modified:
jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FileIOUtilsTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FileIOUtilsTest.java?rev=1753429&r1=1753428&r2=1753429&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FileIOUtilsTest.java
(original)
+++
jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FileIOUtilsTest.java
Wed Jul 20 03:56:20 2016
@@ -19,10 +19,11 @@
package org.apache.jackrabbit.oak.commons;
import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileReader;
import java.io.IOException;
+import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
@@ -36,21 +37,27 @@ import java.util.Random;
import java.util.Set;
import com.google.common.base.Charsets;
+import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Sets.newHashSet;
+import static com.google.common.collect.Sets.union;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.append;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.copy;
import static org.apache.jackrabbit.oak.commons.FileIOUtils.lexComparator;
import static
org.apache.jackrabbit.oak.commons.FileIOUtils.lineBreakAwareComparator;
import static org.apache.jackrabbit.oak.commons.FileIOUtils.readStringsAsSet;
import static org.apache.jackrabbit.oak.commons.FileIOUtils.sort;
import static org.apache.jackrabbit.oak.commons.FileIOUtils.writeStrings;
+import static org.apache.jackrabbit.oak.commons.IOUtils.closeQuietly;
import static
org.apache.jackrabbit.oak.commons.sort.EscapeUtils.escapeLineBreak;
import static
org.apache.jackrabbit.oak.commons.sort.EscapeUtils.unescapeLineBreaks;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
/**
@@ -130,7 +137,7 @@ public class FileIOUtilsTest {
while ((line = reader.readLine()) != null) {
retrieved.add(line);
}
- IOUtils.closeQuietly(reader);
+ closeQuietly(reader);
Collections.sort(list);
assertArrayEquals(Arrays.toString(list.toArray()), list.toArray(),
retrieved.toArray());
}
@@ -149,11 +156,100 @@ public class FileIOUtilsTest {
while ((line = reader.readLine()) != null) {
retrieved.add(unescapeLineBreaks(line));
}
- IOUtils.closeQuietly(reader);
+ closeQuietly(reader);
Collections.sort(list);
assertArrayEquals(Arrays.toString(list.toArray()), list.toArray(),
retrieved.toArray());
}
+ @Test
+ public void testCopy() throws IOException{
+ File f = copy(randomStream(0, 256));
+ assertTrue("File does not exist", f.exists());
+ Assert.assertEquals("File length not equal to byte array from which
copied",
+ 256, f.length());
+ assertTrue("Could not delete file", f.delete());
+ }
+
+ @Test
+ public void appendTest() throws IOException {
+ Set<String> added1 = newHashSet("a", "z", "e", "b");
+ File f1 = folder.newFile();
+ writeStrings(added1.iterator(), f1, false);
+
+ Set<String> added2 = newHashSet("2", "3", "5", "6");
+ File f2 = folder.newFile();
+ writeStrings(added2.iterator(), f2, false);
+
+ Set<String> added3 = newHashSet("t", "y", "8", "9");
+ File f3 = folder.newFile();
+ writeStrings(added3.iterator(), f3, false);
+
+ append(newArrayList(f2, f3), f1, true);
+ assertEquals(union(union(added1, added2), added3),
+ readStringsAsSet(new FileInputStream(f1), false));
+ assertTrue(!f2.exists());
+ assertTrue(!f3.exists());
+ assertTrue(f1.exists());
+ }
+
+ @Test
+ public void appendTestNoDelete() throws IOException {
+ Set<String> added1 = newHashSet("a", "z", "e", "b");
+ File f1 = folder.newFile();
+ writeStrings(added1.iterator(), f1, false);
+
+ Set<String> added2 = newHashSet("2", "3", "5", "6");
+ File f2 = folder.newFile();
+ writeStrings(added2.iterator(), f2, false);
+
+ Set<String> added3 = newHashSet("t", "y", "8", "9");
+ File f3 = folder.newFile();
+ writeStrings(added3.iterator(), f3, false);
+
+ append(newArrayList(f2, f3), f1, false);
+ assertEquals(union(union(added1, added2), added3),
+ readStringsAsSet(new FileInputStream(f1), false));
+ assertTrue(f2.exists());
+ assertTrue(f3.exists());
+ assertTrue(f1.exists());
+ }
+
+ @Test
+ public void appendRandomizedTest() throws Exception {
+ Set<String> added1 = newHashSet();
+ File f1 = folder.newFile();
+
+ for (int i = 0; i < 100; i++) {
+ added1.add(getRandomTestString());
+ }
+ int count = writeStrings(added1.iterator(), f1, true);
+ assertEquals(added1.size(), count);
+
+ Set<String> added2 = newHashSet("2", "3", "5", "6");
+ File f2 = folder.newFile();
+ writeStrings(added2.iterator(), f2, true);
+
+ append(newArrayList(f2), f1, true);
+ assertEquals(union(added1, added2),
+ readStringsAsSet(new FileInputStream(f1), true));
+ }
+
+ @Test
+ public void appendWithLineBreaksTest() throws IOException {
+ Set<String> added1 = newHashSet(getLineBreakStrings());
+ File f1 = folder.newFile();
+ int count = writeStrings(added1.iterator(), f1, true);
+ assertEquals(added1.size(), count);
+
+ Set<String> added2 = newHashSet("2", "3", "5", "6");
+ File f2 = folder.newFile();
+ writeStrings(added2.iterator(), f2, true);
+
+ append(newArrayList(f1), f2, true);
+ assertEquals(union(added1, added2), readStringsAsSet(new
FileInputStream(f2), true));
+ }
+
+
private static List<String> getLineBreakStrings() {
return newArrayList("ab\nc\r", "ab\\z", "a\\\\z\nc",
"/a", "/a/b\nc", "/a/b\rd", "/a/b\r\ne", "/a/c");
@@ -194,4 +290,11 @@ public class FileIOUtilsTest {
}
return buffer.toString();
}
+
+ static InputStream randomStream(int seed, int size) {
+ Random r = new Random(seed);
+ byte[] data = new byte[size];
+ r.nextBytes(data);
+ return new ByteArrayInputStream(data);
+ }
}
Copied:
jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FileLineDifferenceIteratorTest.java
(from r1753393,
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileLineDifferenceIteratorTest.java)
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FileLineDifferenceIteratorTest.java?p2=jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FileLineDifferenceIteratorTest.java&p1=jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileLineDifferenceIteratorTest.java&r1=1753393&r2=1753429&rev=1753429&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileLineDifferenceIteratorTest.java
(original)
+++
jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FileLineDifferenceIteratorTest.java
Wed Jul 20 03:56:20 2016
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.jackrabbit.oak.plugins.blob;
+package org.apache.jackrabbit.oak.commons;
import java.io.IOException;
import java.io.StringReader;
Propchange:
jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FileLineDifferenceIteratorTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobTrackingStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobTrackingStore.java?rev=1753429&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobTrackingStore.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobTrackingStore.java
Wed Jul 20 03:56:20 2016
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobTracker;
+
+/**
+ * Interface to be implemented by a data store which can support local blob id
tracking.
+ */
+public interface BlobTrackingStore extends SharedDataStore {
+ /**
+ * Registers a tracker in the data store.
+ * @param tracker the tracker to register
+ */
+ void addTracker(BlobTracker tracker);
+
+ /**
+ * Gets the tracker registered in the data store.
+ *
+ * @return tracker
+ */
+ BlobTracker getTracker();
+}
+
Propchange:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobTrackingStore.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java?rev=1753429&r1=1753428&r2=1753429&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
Wed Jul 20 03:56:20 2016
@@ -18,18 +18,9 @@ package org.apache.jackrabbit.oak.plugin
import java.io.Closeable;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Comparator;
-import java.util.List;
-
-import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;
-import org.apache.jackrabbit.oak.commons.IOUtils;
-import org.apache.jackrabbit.oak.commons.sort.ExternalSort;
/**
* Class for keeping the file system state of the garbage collection.
@@ -52,13 +43,6 @@ public class GarbageCollectorFileState i
/** The garbage stores the garbage collection candidates which were not
deleted . */
private final File garbage;
- private final static Comparator<String> lexComparator =
- new Comparator<String>() {
- @Override
- public int compare(String s1, String s2) {
- return s1.compareTo(s2);
- }
- };
/**
* Instantiates a new garbage collector file state.
@@ -124,51 +108,4 @@ public class GarbageCollectorFileState i
FileUtils.deleteDirectory(home);
}
}
-
- /**
- * Sorts the given file externally.
- *
- * @param file file whose contents needs to be sorted
- */
- public static void sort(File file) throws IOException {
- File sorted = createTempFile();
- merge(ExternalSort.sortInBatch(file, lexComparator, true), sorted);
- Files.move(sorted, file);
- }
-
- /**
- * Sorts the given file externally with the given comparator.
- *
- * @param file file whose contents needs to be sorted
- * @param comparator to compare
- * @throws IOException
- */
- public static void sort(File file, Comparator<String> comparator) throws
IOException {
- File sorted = createTempFile();
- merge(ExternalSort.sortInBatch(file, comparator, true), sorted);
- Files.move(sorted, file);
- }
-
-
- public static void merge(List<File> files, File output) throws IOException
{
- ExternalSort.mergeSortedFiles(
- files,
- output, lexComparator, true);
- }
-
- public static File copy(InputStream stream) throws IOException {
- File file = createTempFile();
- OutputStream out = null;
- try {
- out = new FileOutputStream(file);
- IOUtils.copy(stream, out);
- } finally {
- IOUtils.closeQuietly(out);
- }
- return file;
- }
-
- private static File createTempFile() throws IOException {
- return File.createTempFile("temp", null);
- }
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java?rev=1753429&r1=1753428&r2=1753429&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
Wed Jul 20 03:56:20 2016
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.plugin
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
+import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
@@ -57,6 +58,7 @@ import org.apache.jackrabbit.core.data.D
import org.apache.jackrabbit.oak.commons.FileIOUtils;
import
org.apache.jackrabbit.oak.commons.FileIOUtils.FileLineDifferenceIterator;
import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobTracker;
import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
import
org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
@@ -64,6 +66,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.collect.Lists.newArrayList;
+import static org.apache.commons.io.FileUtils.copyFile;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.copy;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.merge;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.sort;
+import static org.apache.jackrabbit.oak.commons.IOUtils.closeQuietly;
/**
* Mark and sweep garbage collector.
@@ -397,42 +404,41 @@ public class MarkSweepGarbageCollector i
LOG.debug("Sweeping blobs with modified time > than the configured max
deleted time ({}). ",
timestampToString(lastMaxModifiedTime));
- ConcurrentLinkedQueue<String> exceptionQueue = new
ConcurrentLinkedQueue<String>();
-
- LineIterator iterator =
- FileUtils.lineIterator(fs.getGcCandidates(),
Charsets.UTF_8.name());
- List<String> ids = newArrayList();
-
- while (iterator.hasNext()) {
- ids.add(iterator.next());
-
- if (ids.size() >= getBatchCount()) {
- count += ids.size();
- deleted += sweepInternal(ids, exceptionQueue,
lastMaxModifiedTime);
- ids = newArrayList();
- }
- }
- if (!ids.isEmpty()) {
- count += ids.size();
- deleted += sweepInternal(ids, exceptionQueue, lastMaxModifiedTime);
- }
-
- BufferedWriter writer = null;
+ BufferedWriter removesWriter = null;
+ LineIterator iterator = null;
try {
- if (!exceptionQueue.isEmpty()) {
- writer = Files.newWriter(fs.getGarbage(), Charsets.UTF_8);
- saveBatchToFile(newArrayList(exceptionQueue), writer);
+ removesWriter = Files.newWriter(fs.getGarbage(), Charsets.UTF_8);
+ ConcurrentLinkedQueue<String> exceptionQueue = new
ConcurrentLinkedQueue<String>();
+ iterator =
+ FileUtils.lineIterator(fs.getGcCandidates(),
Charsets.UTF_8.name());
+
+ List<String> ids = newArrayList();
+ while (iterator.hasNext()) {
+ ids.add(iterator.next());
+
+ if (ids.size() >= getBatchCount()) {
+ count += ids.size();
+ deleted += BlobCollectionType.get(blobStore)
+ .sweepInternal(blobStore,ids, exceptionQueue,
lastMaxModifiedTime);
+ ids = newArrayList();
+ saveBatchToFile(newArrayList(exceptionQueue),
removesWriter);
+ exceptionQueue.clear();
+ }
+ }
+ if (!ids.isEmpty()) {
+ count += ids.size();
+ deleted += BlobCollectionType.get(blobStore)
+ .sweepInternal(blobStore, ids, exceptionQueue,
lastMaxModifiedTime);
+ saveBatchToFile(newArrayList(exceptionQueue), removesWriter);
+ exceptionQueue.clear();
}
} finally {
LineIterator.closeQuietly(iterator);
- IOUtils.closeQuietly(writer);
+ closeQuietly(removesWriter);
}
- if(!exceptionQueue.isEmpty()) {
- LOG.warn("Unable to delete some blobs entries from the blob store.
Details around such blob entries can "
- + "be found in [{}]",
- fs.getGarbage().getAbsolutePath());
- }
+ BlobCollectionType.get(blobStore).handleRemoves(blobStore,
fs.getGarbage());
+
if(count != deleted) {
LOG.warn("Deleted only [{}] blobs entries from the [{}] candidates
identified. This may happen if blob "
+ "modified time is > "
@@ -466,30 +472,6 @@ public class MarkSweepGarbageCollector i
ids.clear();
writer.flush();
}
-
- /**
- * Deletes a batch of blobs from blob store.
- *
- * @param ids
- * @param exceptionQueue
- * @param maxModified
- */
- private long sweepInternal(List<String> ids, ConcurrentLinkedQueue<String>
exceptionQueue, long maxModified) {
- long deleted = 0;
- try {
- LOG.trace("Blob ids to be deleted {}", ids);
- deleted = blobStore.countDeleteChunks(ids, maxModified);
- if (deleted != ids.size()) {
- // Only log and do not add to exception queue since some blobs
may not match the
- // lastMaxModifiedTime criteria.
- LOG.debug("Some [{}] blobs were not deleted from the batch :
[{}]", ids.size() - deleted, ids);
- }
- } catch (Exception e) {
- LOG.warn("Error occurred while deleting blob with ids [{}]", ids,
e);
- exceptionQueue.addAll(ids);
- }
- return deleted;
- }
/**
* Iterates the complete node tree and collect all blob references
@@ -552,7 +534,7 @@ public class MarkSweepGarbageCollector i
LOG.info("Number of valid blob references marked under mark phase
of " +
"Blob garbage collection [{}]", count.get());
// sort the marked references with the first part of the key
- GarbageCollectorFileState.sort(fs.getMarkedRefs(),
+ sort(fs.getMarkedRefs(),
new Comparator<String>() {
@Override
public int compare(String s1, String s2) {
@@ -560,7 +542,7 @@ public class MarkSweepGarbageCollector i
}
});
} finally {
- IOUtils.closeQuietly(writer);
+ closeQuietly(writer);
}
}
@@ -626,37 +608,20 @@ public class MarkSweepGarbageCollector i
@Override
public Integer call() throws Exception {
- LOG.debug("Starting retrieve of all blobs");
- BufferedWriter bufferWriter = null;
- int blobsCount = 0;
- try {
- bufferWriter = new BufferedWriter(
- new FileWriter(fs.getAvailableRefs()));
- Iterator<String> idsIter = blobStore.getAllChunkIds(0);
- List<String> ids = newArrayList();
-
- while (idsIter.hasNext()) {
- ids.add(idsIter.next());
- if (ids.size() > getBatchCount()) {
- blobsCount += ids.size();
- saveBatchToFile(ids, bufferWriter);
- LOG.info("Retrieved ({}) blobs", blobsCount);
- }
- }
-
- if (!ids.isEmpty()) {
- blobsCount += ids.size();
- saveBatchToFile(ids, bufferWriter);
- LOG.info("Retrieved ({}) blobs", blobsCount);
- }
+ BlobCollectionType.get(blobStore).retrieve(blobStore, fs,
getBatchCount());
+ long length = fs.getAvailableRefs().length();
+ LOG.info("Length of blob ids file retrieved {}", length);
+
+ // If the length is 0 then the references are not available from the
tracker,
+ // so retrieve them from the data store
+ if (fs.getAvailableRefs().length() <= 0) {
+ BlobCollectionType.DEFAULT.retrieve(blobStore, fs,
getBatchCount());
+ length = fs.getAvailableRefs().length();
+ LOG.info("Length of blob ids file retrieved {}", length);
- // sort the file
- GarbageCollectorFileState.sort(fs.getAvailableRefs());
- LOG.info("Number of blobs present in BlobStore : [{}] ",
blobsCount);
- } finally {
- IOUtils.closeQuietly(bufferWriter);
+ BlobCollectionType.get(blobStore).track(blobStore, fs);
}
- return blobsCount;
+ return 0;
}
}
@@ -712,11 +677,11 @@ public class MarkSweepGarbageCollector i
// List of files to be merged
List<File> files = newArrayList();
for (DataRecord refFile : refFiles) {
- File file =
GarbageCollectorFileState.copy(refFile.getStream());
+ File file = copy(refFile.getStream());
files.add(file);
}
- GarbageCollectorFileState.merge(files, fs.getMarkedRefs());
+ merge(files, fs.getMarkedRefs());
// Get the timestamp to indicate the earliest mark phase
start
List<DataRecord> markerFiles =
@@ -788,4 +753,169 @@ public class MarkSweepGarbageCollector i
public void addMarkedStartMarker(GarbageCollectableBlobStore
blobStore, String repoId) {}
}
+
+    /**
+     * Defines the different blob collection types and encodes their divergent behavior.
+     * <ul>
+     *     <li>{@code TRACKER} - selected when the blob store is a {@link BlobTrackingStore} with a
+     *     non-null tracker: the available blob ids are obtained from the tracker, blobs are
+     *     deleted one by one and removals are reported back to the tracker.</li>
+     *     <li>{@code DEFAULT} - the available blob ids are enumerated directly from the blob
+     *     store and blobs are deleted in batches.</li>
+     * </ul>
+     */
+    private enum BlobCollectionType {
+        TRACKER {
+            /**
+             * Deletes the given batch by deleting each blob individually, so that exactly the
+             * blobs actually deleted are known. The id of every blob that was deleted is added
+             * to {@code exceptionQueue}.
+             */
+            @Override
+            long sweepInternal(GarbageCollectableBlobStore blobStore, List<String> ids,
+                    ConcurrentLinkedQueue<String> exceptionQueue, long maxModified) {
+                long totalDeleted = 0;
+                LOG.trace("Blob ids to be deleted {}", ids);
+                for (String id : ids) {
+                    try {
+                        long deleted = blobStore.countDeleteChunks(newArrayList(id), maxModified);
+                        if (deleted != 1) {
+                            LOG.debug("Blob [{}] not deleted", id);
+                        } else {
+                            // Record the actual removal (see the base javadoc: "add removes to the queue")
+                            exceptionQueue.add(id);
+                        }
+                        totalDeleted += deleted;
+                    } catch (Exception e) {
+                        LOG.warn("Error occurred while deleting blob with id [{}]", id, e);
+                    }
+                }
+                return totalDeleted;
+            }
+
+            /**
+             * Asks the tracker to write all the ids it knows about into the available
+             * references file.
+             */
+            @Override
+            void retrieve(GarbageCollectableBlobStore blobStore,
+                    GarbageCollectorFileState fs, int batchCount) throws Exception {
+                ((BlobTrackingStore) blobStore).getTracker()
+                    .get(fs.getAvailableRefs().getAbsolutePath());
+            }
+
+            /**
+             * Removes the ids listed in {@code removedIds} from the tracker.
+             */
+            @Override
+            void handleRemoves(GarbageCollectableBlobStore blobStore,
+                    File removedIds) throws IOException {
+                ((BlobTrackingStore) blobStore).getTracker().remove(removedIds);
+            }
+
+            /**
+             * Hands a copy of the available references file to the tracker. Tracking is
+             * best-effort: failures are logged and the temporary copy is cleaned up.
+             */
+            @Override
+            void track(GarbageCollectableBlobStore blobStore,
+                    GarbageCollectorFileState fs) {
+                File f = null;
+                try {
+                    f = File.createTempFile("blobiddownload", null);
+                    copyFile(fs.getAvailableRefs(), f);
+                    ((BlobTrackingStore) blobStore).getTracker().add(f);
+                } catch (IOException e) {
+                    LOG.warn("Unable to track blob ids locally", e);
+                    // The copy was not consumed by the tracker; do not leave it in the temp dir
+                    FileUtils.deleteQuietly(f);
+                }
+            }
+        },
+        DEFAULT;
+
+        /**
+         * Deletes a batch of blobs from the blob store with a single call.
+         *
+         * @param blobStore blobStore
+         * @param ids ids to sweep
+         * @param exceptionQueue queue to which ids are added; this implementation adds the
+         *                       whole batch when fewer blobs than requested were deleted
+         * @param maxModified maxModified time of blobs to be deleted
+         * @return the number of blobs deleted, or 0 if the delete call failed
+         */
+        long sweepInternal(GarbageCollectableBlobStore blobStore,
+                List<String> ids, ConcurrentLinkedQueue<String> exceptionQueue, long maxModified) {
+            long deleted = 0;
+            try {
+                LOG.trace("Blob ids to be deleted {}", ids);
+                deleted = blobStore.countDeleteChunks(ids, maxModified);
+                if (deleted != ids.size()) {
+                    LOG.debug("Some [{}] blobs were not deleted from the batch : [{}]",
+                        ids.size() - deleted, ids);
+                    exceptionQueue.addAll(ids);
+                }
+            } catch (Exception e) {
+                LOG.warn("Error occurred while deleting blob with ids [{}]", ids, e);
+            }
+            return deleted;
+        }
+
+        /**
+         * Retrieves the list of available blobs and puts it into the available references file.
+         * All chunk ids of the blob store are written out, flushing each time the pending batch
+         * grows beyond {@code batchCount} ids, and the file is sorted afterwards.
+         *
+         * @param blobStore the blob store to enumerate
+         * @param fs the GC file state owning the available references file
+         * @param batchCount number of ids buffered before being flushed to the file
+         * @throws Exception if enumerating, writing or sorting fails
+         */
+        void retrieve(GarbageCollectableBlobStore blobStore,
+                GarbageCollectorFileState fs, int batchCount) throws Exception {
+            LOG.debug("Starting retrieve of all blobs");
+            BufferedWriter bufferWriter = null;
+            int blobsCount = 0;
+            Iterator<String> idsIter = null;
+            try {
+                bufferWriter = new BufferedWriter(
+                    new FileWriter(fs.getAvailableRefs()));
+
+                idsIter = blobStore.getAllChunkIds(0);
+                List<String> ids = newArrayList();
+                while (idsIter.hasNext()) {
+                    ids.add(idsIter.next());
+                    if (ids.size() > batchCount) {
+                        blobsCount += ids.size();
+                        saveBatchToFile(ids, bufferWriter);
+                        LOG.info("Retrieved ({}) blobs", blobsCount);
+                    }
+                }
+
+                if (!ids.isEmpty()) {
+                    blobsCount += ids.size();
+                    saveBatchToFile(ids, bufferWriter);
+                    LOG.info("Retrieved ({}) blobs", blobsCount);
+                }
+
+                // Close the writer before sorting: the file must be complete (write errors are
+                // surfaced here instead of being swallowed) and not held open while rewritten
+                bufferWriter.close();
+                bufferWriter = null;
+
+                // sort the file
+                sort(fs.getAvailableRefs());
+                LOG.info("Number of blobs present in BlobStore : [{}] ", blobsCount);
+            } finally {
+                // Only non-null if an exception occurred before the explicit close above
+                closeQuietly(bufferWriter);
+                if (idsIter instanceof Closeable) {
+                    try {
+                        Closeables.close((Closeable) idsIter, false);
+                    } catch (Exception e) {
+                        LOG.debug("Error closing iterator", e);
+                    }
+                }
+            }
+        }
+
+        /**
+         * Hook to handle all the removed ids. This implementation only deletes the file.
+         *
+         * @param blobStore the blob store the ids were removed from
+         * @param removedIds file listing the removed ids
+         * @throws IOException if the file cannot be deleted
+         */
+        void handleRemoves(GarbageCollectableBlobStore blobStore,
+                File removedIds) throws IOException {
+            FileUtils.forceDelete(removedIds);
+        }
+
+        /**
+         * Hook allowing a tracker to track the available references file. No-op by default.
+         *
+         * @param blobStore the blob store
+         * @param fs the GC file state owning the available references file
+         */
+        void track(GarbageCollectableBlobStore blobStore, GarbageCollectorFileState fs) {
+        }
+
+        /**
+         * Selects the collection type for the given blob store.
+         *
+         * @param blobStore the blob store
+         * @return {@code TRACKER} if the store is a {@link BlobTrackingStore} with a non-null
+         *         tracker, {@code DEFAULT} otherwise
+         */
+        public static BlobCollectionType get(GarbageCollectableBlobStore blobStore) {
+            if (blobStore instanceof BlobTrackingStore) {
+                BlobTracker tracker = ((BlobTrackingStore) blobStore).getTracker();
+                if (tracker != null) {
+                    return TRACKER;
+                }
+            }
+            return DEFAULT;
+        }
+    }
}
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTracker.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTracker.java?rev=1753429&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTracker.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTracker.java
Wed Jul 20 03:56:20 2016
@@ -0,0 +1,607 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import org.apache.jackrabbit.core.data.DataRecord;
+import
org.apache.jackrabbit.oak.commons.FileIOUtils.FileLineDifferenceIterator;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static com.google.common.base.Predicates.alwaysTrue;
+import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.transform;
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.io.Files.asByteSource;
+import static com.google.common.io.Files.fileTreeTraverser;
+import static com.google.common.io.Files.move;
+import static com.google.common.io.Files.newWriter;
+import static java.io.File.createTempFile;
+import static java.lang.System.currentTimeMillis;
+import static java.util.Collections.synchronizedList;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.commons.io.FileUtils.copyFile;
+import static org.apache.commons.io.FileUtils.forceDelete;
+import static org.apache.commons.io.FileUtils.forceMkdir;
+import static org.apache.commons.io.FileUtils.lineIterator;
+import static org.apache.commons.io.FilenameUtils.concat;
+import static org.apache.commons.io.FilenameUtils.removeExtension;
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static
org.apache.jackrabbit.oak.commons.FileIOUtils.CloseableFileIterator.wrap;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.append;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.copy;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.sort;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.writeStrings;
+import static
org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker.Store.Type.GENERATION;
+import static
org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker.Store.Type.IN_PROCESS;
+import static
org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker.Store.Type.REFS;
+
+
+/**
+ * Tracks the blob ids available or added in the blob store.
+ * <p>
+ * Ids added on this instance are appended to a local in-process file under
+ * {@code <path>/blobreferences}. A scheduled {@link SnapshotJob} periodically rolls that file
+ * over into a generation file, merges all generations into the local references file and
+ * uploads the references file to the {@link SharedDataStore} as a metadata record, making it
+ * visible to the other instances sharing the data store. {@link #get()} and
+ * {@link #get(String)} first download and merge the records uploaded by the other instances.
+ * <p>
+ * Design notes from the original author:
+ * <ul>
+ *     <li>We can identify the instance node where uploading/downloading is not required</li>
+ *     <li>From other nodes we can only upload deltas since the last snapshot</li>
+ *     <li>If all nodes have the specified flag then all nodes run partial gc</li>
+ * </ul>
+ */
+public class BlobIdTracker implements Closeable, BlobTracker {
+    private static final Logger LOG = LoggerFactory.getLogger(BlobIdTracker.class);
+
+    /* Local directory (under the configured path) holding the tracking files */
+    private static final String datastoreMeta = "blobreferences";
+    /* Prefix of all tracking files, locally and in the data store */
+    private static final String fileNamePrefix = "blob";
+    /* Suffix of the merged references file */
+    private static final String mergedFileSuffix = ".refs";
+
+    /* Local instance identifier */
+    private final String instanceId = String.valueOf(currentTimeMillis());
+
+    private final SharedDataStore datastore;
+
+    protected Store store;
+
+    private final ScheduledExecutorService scheduler;
+
+    private String prefix;
+
+    /**
+     * Creates a tracker using its own single-threaded scheduler; the first snapshot is taken
+     * after one snapshot interval.
+     *
+     * @param path local directory under which the tracking files are kept
+     * @param repositoryId id of the repository, used in the tracking file names
+     * @param snapshotIntervalSecs interval between snapshots, in seconds
+     * @param datastore the shared data store snapshots are uploaded to
+     * @throws IOException if the local store cannot be initialized
+     */
+    public BlobIdTracker(String path, String repositoryId,
+            long snapshotIntervalSecs, SharedDataStore datastore) throws IOException {
+        this(path, repositoryId, newSingleThreadScheduledExecutor(),
+            snapshotIntervalSecs, snapshotIntervalSecs, datastore);
+    }
+
+    /**
+     * Creates a tracker using the given scheduler for the periodic snapshots. The scheduler is
+     * shut down when the tracker is closed.
+     *
+     * @param path local directory under which the tracking files are kept
+     * @param repositoryId id of the repository, used in the tracking file names
+     * @param scheduler scheduler running the snapshot job
+     * @param snapshotDelaySecs delay before the first snapshot, in seconds
+     * @param snapshotIntervalSecs interval between snapshots, in seconds
+     * @param datastore the shared data store snapshots are uploaded to
+     * @throws IOException if the local store cannot be initialized
+     */
+    public BlobIdTracker(String path, String repositoryId, ScheduledExecutorService scheduler,
+            long snapshotDelaySecs, long snapshotIntervalSecs, SharedDataStore datastore)
+            throws IOException {
+        String root = concat(path, datastoreMeta);
+        File rootDir = new File(root);
+        this.datastore = datastore;
+        this.scheduler = scheduler;
+
+        try {
+            forceMkdir(rootDir);
+            prefix = fileNamePrefix + "-" + repositoryId;
+            this.store = new Store(rootDir, prefix);
+            scheduler.scheduleAtFixedRate(new SnapshotJob(),
+                SECONDS.toMillis(snapshotDelaySecs),
+                SECONDS.toMillis(snapshotIntervalSecs), MILLISECONDS);
+        } catch (IOException e) {
+            LOG.error("Error initializing blob tracker", e);
+            // Release the scheduler and, if it was already created, the store
+            close();
+            throw e;
+        }
+    }
+
+    @Override
+    public void remove(File recs) throws IOException {
+        store.removeRecords(recs);
+    }
+
+    @Override
+    public void remove(Iterator<String> recs) throws IOException {
+        store.removeRecords(recs);
+    }
+
+    @Override
+    public void add(String id) throws IOException {
+        store.addRecord(id);
+    }
+
+    @Override
+    public void add(Iterator<String> recs) throws IOException {
+        store.addRecords(recs);
+    }
+
+    @Override
+    public void add(File recs) throws IOException {
+        store.addRecords(recs);
+    }
+
+    /**
+     * Retrieves all the reference files available in the DataStore, merges them into the local
+     * store and then returns an iterator over it. This way the ids returned are as recent as
+     * the snapshots taken on all instances/repositories connected to the DataStore.
+     *
+     * The iterator returned is a Closeable instance and should be closed by calling #close().
+     *
+     * @return iterator over all the blob ids available
+     * @throws IOException if merging or reading the records fails
+     */
+    @Override
+    public Iterator<String> get() throws IOException {
+        try {
+            globalMerge();
+            return store.getRecords();
+        } catch (IOException e) {
+            LOG.error("Error in retrieving blob records iterator", e);
+            throw e;
+        }
+    }
+
+    /**
+     * Merges the reference files available in the DataStore into the local store and copies
+     * the merged records into the file at {@code path}.
+     *
+     * @param path path of the file to copy the records into
+     * @return the file at {@code path}
+     * @throws IOException if merging or copying the records fails
+     */
+    @Override
+    public File get(String path) throws IOException {
+        globalMerge();
+        return store.getRecords(path);
+    }
+
+    /**
+     * Downloads the reference records uploaded by other instances, merges them into the local
+     * store and deletes the merged records from the data store. Records that could not be
+     * downloaded are neither merged nor deleted, so a later merge can pick them up.
+     *
+     * @throws IOException if the merge fails
+     */
+    private void globalMerge() throws IOException {
+        try {
+            // All the blob reference records in the data store except the local one
+            List<DataRecord> refRecords = newArrayList(filter(
+                datastore.getAllMetadataRecords(fileNamePrefix), new Predicate<DataRecord>() {
+                    @Override
+                    public boolean apply(DataRecord input) {
+                        return !input.getIdentifier().toString().contains(instanceId);
+                    }
+                }));
+
+            // Download the records, remembering which ones were copied successfully
+            List<File> refFiles = newArrayList();
+            List<DataRecord> copiedRecords = newArrayList();
+            for (DataRecord rec : refRecords) {
+                File copied = copyLocally(rec);
+                if (copied != null) {
+                    refFiles.add(copied);
+                    copiedRecords.add(rec);
+                }
+            }
+
+            // Merge all the downloaded files in to the local store
+            store.merge(refFiles, true);
+
+            // Remove only the data store records that were actually merged
+            for (DataRecord rec : copiedRecords) {
+                datastore.deleteMetadataRecord(rec.getIdentifier().toString());
+                LOG.info("Deleted metadata record {}", rec.getIdentifier().toString());
+            }
+        } catch (IOException e) {
+            LOG.error("Error in merging blob records iterator from the data store", e);
+            throw e;
+        }
+    }
+
+    /**
+     * Copies the contents of a data store record into a local temporary file.
+     *
+     * @param rec the record to copy
+     * @return the local copy, or {@code null} if the copy failed (the failure is logged)
+     */
+    private static File copyLocally(DataRecord rec) {
+        InputStream inputStream = null;
+        try {
+            inputStream = rec.getStream();
+            return copy(inputStream);
+        } catch (Exception e) {
+            LOG.warn("Error copying data store file locally {}", rec.getIdentifier(), e);
+            return null;
+        } finally {
+            closeQuietly(inputStream);
+        }
+    }
+
+    /**
+     * Takes a snapshot on the tracker.
+     * Uploads the latest snapshot to the DataStore so, that it's visible
+     * to other cluster nodes/repositories connected to the DataStore.
+     *
+     * @throws IOException if the snapshot or the upload fails
+     */
+    private void snapshot() throws IOException {
+        InputStream inputStream = null;
+        try {
+            store.snapshot();
+
+            inputStream = asByteSource(store.getBlobRecordsFile()).openBufferedStream();
+            datastore.addMetadataRecord(inputStream,
+                (prefix + instanceId + mergedFileSuffix));
+        } catch (Exception e) {
+            LOG.error("Error taking snapshot", e);
+            throw new IOException("Snapshot error", e);
+        } finally {
+            closeQuietly(inputStream);
+        }
+    }
+
+    /**
+     * Closes the tracker and the underlying store. The snapshot scheduler is stopped first so
+     * that no snapshot runs against an already closed store.
+     *
+     * @throws IOException declared by {@link Closeable}
+     */
+    @Override
+    public void close() throws IOException {
+        new ExecutorCloser(scheduler).close();
+        // store is null when the constructor failed before the store was created
+        if (store != null) {
+            store.close();
+        }
+    }
+
+    /**
+     * Local store for managing the blob references.
+     */
+    static class Store implements Closeable {
+        /* Suffix for a snapshot generation file */
+        private static final String genFileNameSuffix = ".gen";
+
+        /* Suffix for in process file */
+        private static final String workingCopySuffix = ".process";
+
+        /* The current writer where all blob ids are being appended */
+        private BufferedWriter writer;
+
+        /* In-process file */
+        private File processFile;
+
+        /* All available generations that need to be merged */
+        private final List<File> generations;
+
+        private final File rootDir;
+
+        private final String prefix;
+
+        /* Lock for operations on references file */
+        private final ReentrantLock refLock;
+
+        Store(File rootDir, String prefix) throws IOException {
+            this.rootDir = rootDir;
+            this.prefix = prefix;
+            this.refLock = new ReentrantLock();
+
+            // Retrieve the process file if it exists
+            processFile = fileTreeTraverser().breadthFirstTraversal(rootDir)
+                .firstMatch(IN_PROCESS.filter()).orNull();
+
+            // Get the List of all generations available.
+            generations = synchronizedList(newArrayList(
+                fileTreeTraverser().breadthFirstTraversal(rootDir).filter(GENERATION.filter())));
+
+            // Close/rename any existing in process
+            nextGeneration();
+        }
+
+        /**
+         * Add a blob id to the tracking file.
+         *
+         * @param id id to track
+         * @throws IOException if the id cannot be written
+         */
+        protected synchronized void addRecord(String id) throws IOException {
+            writer.append(id);
+            writer.newLine();
+            writer.flush();
+            LOG.debug("Added record {}", id);
+        }
+
+        /**
+         * Returns an iterator on the tracked blob ids, backed by a temporary copy of the
+         * references file.
+         *
+         * @return record iterator
+         * @throws IOException if the records cannot be copied
+         */
+        protected Iterator<String> getRecords() throws IOException {
+            try {
+                // Get a temp file path
+                String path = createTempFile("temp", null).getAbsolutePath();
+                return wrap(lineIterator(getRecords(path)), new File(path));
+            } catch (IOException e) {
+                LOG.error("Error in retrieving blob records iterator", e);
+                throw e;
+            }
+        }
+
+        /**
+         * Returns a file with the tracked blob ids.
+         *
+         * @param path path of the file
+         * @return File containing tracked file ids
+         * @throws IOException if the references file cannot be copied
+         */
+        protected File getRecords(String path) throws IOException {
+            refLock.lock();
+            File copiedRecsFile = new File(path);
+            try {
+                copyFile(getBlobRecordsFile(), copiedRecsFile);
+                return copiedRecsFile;
+            } catch (IOException e) {
+                LOG.error("Error in retrieving blob records file", e);
+                throw e;
+            } finally {
+                refLock.unlock();
+            }
+        }
+
+        /**
+         * Returns the blob references file, creating it if it does not exist yet.
+         *
+         * @return blob reference file
+         * @throws IOException if the file cannot be created
+         */
+        protected File getBlobRecordsFile() throws IOException {
+            File refs = new File(rootDir, prefix + REFS.getFileNameSuffix());
+            if (!refs.exists()) {
+                LOG.debug("File created {}", refs.createNewFile());
+            }
+            return refs;
+        }
+
+        /**
+         * Merges the given files with the references file and deletes the files.
+         * The given list is cleared.
+         *
+         * @param refFiles files to merge
+         * @param doSort whether to sort while merging
+         * @throws IOException if merging or sorting fails
+         */
+        protected void merge(List<File> refFiles, boolean doSort) throws IOException {
+            refLock.lock();
+            try {
+                if (refFiles != null && !refFiles.isEmpty()) {
+                    File merged = new File(rootDir, prefix + REFS.getFileNameSuffix());
+                    append(refFiles, merged, true);
+                    LOG.debug("Merged files into references {}", refFiles);
+                    refFiles.clear();
+                }
+                if (doSort) {
+                    sort(getBlobRecordsFile());
+                }
+            } finally {
+                refLock.unlock();
+            }
+        }
+
+        /**
+         * Removes ids obtained by the given iterator from the tracked references.
+         * The iterator has to be closed by the caller.
+         *
+         * @param recs iterator over records to remove
+         * @throws IOException if the removal fails
+         */
+        protected void removeRecords(Iterator<String> recs) throws IOException {
+            // Spool the ids to be deleted into a file and sort
+            File deleted = createTempFile("deleted", null);
+            writeStrings(recs, deleted, false);
+            removeRecords(deleted);
+            LOG.trace("Removed records");
+        }
+
+        /**
+         * Removes ids obtained by the given file from the tracked references.
+         * File is deleted before returning.
+         *
+         * @param recs file of records to delete
+         * @throws IOException if the removal fails
+         */
+        protected void removeRecords(File recs) throws IOException {
+            refLock.lock();
+            try {
+                sort(getBlobRecordsFile());
+                sort(recs);
+                LOG.trace("Sorted files");
+
+                // Remove and spool the remaining ids into a temp file
+                File temp = createTempFile("sorted", null);
+                FileLineDifferenceIterator iterator = null;
+                try {
+                    iterator = new FileLineDifferenceIterator(recs, getBlobRecordsFile(), null);
+                    writeStrings(iterator, temp, false);
+                } finally {
+                    if (iterator != null) {
+                        iterator.close();
+                    }
+                }
+
+                File blobRecs = getBlobRecordsFile();
+                move(temp, blobRecs);
+                LOG.trace("removed records");
+            } finally {
+                refLock.unlock();
+                try {
+                    forceDelete(recs);
+                } catch (IOException e) {
+                    LOG.debug("Failed to delete file {}", recs, e);
+                }
+            }
+        }
+
+        /**
+         * Opens a new generation file and a writer over it.
+         *
+         * @throws IOException if the new writer cannot be opened
+         */
+        private synchronized void nextGeneration() throws IOException {
+            close();
+
+            processFile = new File(rootDir, prefix + IN_PROCESS.getFileNameSuffix());
+            writer = newWriter(processFile, UTF_8);
+            LOG.info("Created new process file and writer over {} ", processFile.getAbsolutePath());
+        }
+
+        /**
+         * Adds all the ids backed by the iterator into the references.
+         * Closing the iterator is the responsibility of the caller.
+         *
+         * @param recs iterator over records to add
+         * @throws IOException if spooling or merging fails
+         */
+        protected void addRecords(Iterator<String> recs) throws IOException {
+            // Spool contents into a temp file
+            File added = createTempFile("added", null);
+            writeStrings(recs, added, false);
+            // Merge the file with the references
+            merge(Lists.newArrayList(added), false);
+        }
+
+        /**
+         * Merges the contents of the file into the references file.
+         *
+         * @param recs File of records to add
+         * @throws IOException if merging fails
+         */
+        protected void addRecords(File recs) throws IOException {
+            // Merge the file with the references
+            merge(Lists.newArrayList(recs), false);
+        }
+
+        /**
+         * Snapshots the local store, starts a new generation for writing
+         * and merges all generations available.
+         *
+         * @throws IOException if rolling over or merging fails
+         */
+        protected void snapshot() throws IOException {
+            nextGeneration();
+            merge(generations, false);
+        }
+
+        /**
+         * Closes the current writer and converts the in-process file into a generation file
+         * (by rename, falling back to a copy).
+         */
+        @Override
+        public synchronized void close() {
+            closeQuietly(writer);
+            LOG.info("Closed writer");
+
+            // Convert the process file to a generational file
+            if (processFile != null) {
+                File renamed = new File(removeExtension(processFile.getAbsolutePath()));
+                boolean success = processFile.renameTo(renamed);
+                LOG.debug("File renamed {}", success);
+                if (success) {
+                    generations.add(renamed);
+                    LOG.info("Process file renamed to {}", renamed.getAbsolutePath());
+                } else {
+                    LOG.trace("Trying a copy file operation");
+                    try {
+                        if (renamed.createNewFile()) {
+                            Files.copy(processFile, renamed);
+                            generations.add(renamed);
+                            LOG.info("{} File copied to {}", processFile.getAbsolutePath(),
+                                renamed.getAbsolutePath());
+                        }
+                    } catch (Exception e) {
+                        LOG.warn("Unable to copy process file to corresponding gen file. Some"
+                            + " elements may be missed", e);
+                    }
+                }
+                // Handled; a repeated close must not try to convert the same file again
+                processFile = null;
+            }
+        }
+
+        /**
+         * Different types of files
+         */
+        enum Type {
+            IN_PROCESS {
+                @Override String getFileNameSuffix() {
+                    return "." + currentTimeMillis() + genFileNameSuffix + workingCopySuffix;
+                }
+
+                @Override Predicate<File> filter() {
+                    return new Predicate<File>() {
+                        @Override public boolean apply(File input) {
+                            return input.getName().endsWith(workingCopySuffix)
+                                && input.getName().startsWith(fileNamePrefix);
+                        }
+                    };
+                }
+            },
+            GENERATION {
+                @Override String getFileNameSuffix() {
+                    return "." + currentTimeMillis() + genFileNameSuffix;
+                }
+
+                @Override Predicate<File> filter() {
+                    return new Predicate<File>() {
+                        @Override public boolean apply(File input) {
+                            return input.getName().startsWith(fileNamePrefix)
+                                && input.getName().contains(genFileNameSuffix)
+                                && !input.getName().endsWith(workingCopySuffix);
+                        }
+                    };
+                }
+            },
+            REFS {
+                @Override String getFileNameSuffix() {
+                    return mergedFileSuffix;
+                }
+
+                @Override Predicate<File> filter() {
+                    return new Predicate<File>() {
+                        @Override public boolean apply(File input) {
+                            return input.getName().endsWith(mergedFileSuffix)
+                                && input.getName().startsWith(fileNamePrefix);
+                        }
+                    };
+                }
+            };
+
+            /**
+             * Returns the file name suffix according to the type.
+             *
+             * @return file name suffix
+             */
+            String getFileNameSuffix() {
+                return "";
+            }
+
+            /**
+             * Returns the predicate to filter files in the root directory according to the type.
+             *
+             * @return a predicate to select particular file types
+             */
+            Predicate<File> filter() {
+                return alwaysTrue();
+            }
+        }
+    }
+
+    /**
+     * Job which calls the snapshot on the tracker.
+     */
+    class SnapshotJob implements Runnable {
+        @Override
+        public void run() {
+            try {
+                snapshot();
+                LOG.info("Finished taking snapshot");
+            } catch (Exception e) {
+                LOG.warn("Failure in taking snapshot", e);
+            }
+        }
+    }
+
+}
Propchange:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTracker.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobTracker.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobTracker.java?rev=1753429&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobTracker.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobTracker.java
Wed Jul 20 03:56:20 2016
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Tracks the blob ids available in a blob store.
+ */
+public interface BlobTracker extends Closeable {
+    /**
+     * Adds the given id.
+     *
+     * @param id the record id to be tracked
+     * @throws IOException if the id could not be recorded
+     */
+    void add(String id) throws IOException;
+
+    /**
+     * Adds the given ids.
+     *
+     * @param recs iterator over the ids to be tracked
+     * @throws IOException if the ids could not be recorded
+     */
+    void add(Iterator<String> recs) throws IOException;
+
+    /**
+     * Adds the ids in the given file.
+     *
+     * @param recs file containing the ids to be tracked, one per line
+     * @throws IOException if the ids could not be recorded
+     */
+    void add(File recs) throws IOException;
+
+    /**
+     * Remove the given ids.
+     *
+     * @param recs iterator over the ids to be removed
+     * @throws IOException if the ids could not be removed
+     */
+    void remove(Iterator<String> recs) throws IOException;
+
+    /**
+     * Remove the ids in the given file and deletes the file.
+     *
+     * @param recs file containing the ids to be removed, one per line
+     * @throws IOException if the ids could not be removed
+     */
+    void remove(File recs) throws IOException;
+
+    /**
+     * Fetches an iterator of records available.
+     *
+     * @return an iterator over the tracked ids; implementations may return a
+     *         {@link Closeable} iterator, which the caller should then close
+     * @throws IOException if the records could not be retrieved
+     */
+    Iterator<String> get() throws IOException;
+
+    /**
+     * Fetches a File object which has all the sorted records.
+     * The lifecycle of the returned {@link File} handle is the responsibility of the caller.
+     *
+     * @param path path of the local file to write the records to
+     * @return the file containing the records
+     * @throws IOException if the records could not be retrieved
+     */
+    File get(String path) throws IOException;
+}
Propchange:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobTracker.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java?rev=1753429&r1=1753428&r2=1753429&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
Wed Jul 20 03:56:20 2016
@@ -22,6 +22,7 @@ package org.apache.jackrabbit.oak.plugin
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Iterators.filter;
import static com.google.common.collect.Iterators.transform;
+import static org.apache.commons.io.IOUtils.closeQuietly;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
@@ -57,6 +58,7 @@ import org.apache.jackrabbit.core.data.M
import org.apache.jackrabbit.oak.cache.CacheLIRS;
import org.apache.jackrabbit.oak.cache.CacheStats;
import org.apache.jackrabbit.oak.commons.StringUtils;
+import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore;
import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.blob.stats.StatsCollectingStreams;
@@ -70,14 +72,16 @@ import org.slf4j.LoggerFactory;
* It also handles inlining binaries if there size is smaller than
* {@link org.apache.jackrabbit.core.data.DataStore#getMinRecordLength()}
*/
-public class DataStoreBlobStore implements DataStore, SharedDataStore,
BlobStore,
- GarbageCollectableBlobStore {
+public class DataStoreBlobStore implements DataStore, BlobStore,
+ GarbageCollectableBlobStore, BlobTrackingStore {
private final Logger log = LoggerFactory.getLogger(getClass());
private final DataStore delegate;
private BlobStatsCollector stats = BlobStatsCollector.NOOP;
+ private BlobTracker tracker;
+
/**
* If set to true then the blob length information would be encoded as
part of blobId
* and thus no extra call would be made to DataStore to determine the
length
@@ -194,6 +198,7 @@ public class DataStoreBlobStore implemen
public void close() throws DataStoreException {
delegate.close();
cache.invalidateAll();
+ closeQuietly(tracker);
}
//~-------------------------------------------< BlobStore >
@@ -206,6 +211,9 @@ public class DataStoreBlobStore implemen
checkNotNull(stream);
DataRecord dr = writeStream(stream);
String id = getBlobId(dr);
+ if (tracker != null) {
+ tracker.add(id);
+ }
threw = false;
stats.uploaded(System.nanoTime() - start, TimeUnit.NANOSECONDS,
dr.getLength());
stats.uploadCompleted(id);
@@ -332,7 +340,7 @@ public class DataStoreBlobStore implemen
in = new FileInputStream(file);
return writeBlob(in);
} finally {
- org.apache.commons.io.IOUtils.closeQuietly(in);
+ closeQuietly(in);
FileUtils.forceDelete(file);
}
}
@@ -521,6 +529,19 @@ public class DataStoreBlobStore implemen
this.stats = stats;
}
+
+    /**
+     * Registers the tracker to which the ids of newly written blobs are added.
+     * Any previously registered tracker is replaced.
+     */
+    @Override
+    public void addTracker(BlobTracker blobTracker) {
+        tracker = blobTracker;
+    }
+
+    /**
+     * @return the registered tracker, or {@code null} when none has been added
+     */
+    @Override
+    @Nullable
+    public BlobTracker getTracker() {
+        return this.tracker;
+    }
+
+
//~---------------------------------------------< Internal >
private InputStream getStream(String blobId) throws IOException {
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java?rev=1753429&r1=1753428&r2=1753429&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java
Wed Jul 20 03:56:20 2016
@@ -96,7 +96,10 @@ public class SharedDataStoreUtils {
* Encapsulates the different type of records at the data store root.
*/
public enum SharedStoreRecordType {
- REFERENCES("references"), REPOSITORY("repository"),
MARKED_START_MARKER("markedTimestamp");
+ REFERENCES("references"),
+ REPOSITORY("repository"),
+ MARKED_START_MARKER("markedTimestamp"),
+ BLOBREFERENCES("blob");
private final String type;
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java?rev=1753429&r1=1753428&r2=1753429&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
Wed Jul 20 03:56:20 2016
@@ -69,7 +69,9 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean;
import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
import org.apache.jackrabbit.oak.plugins.blob.BlobStoreStats;
+import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore;
import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker;
import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.CacheType;
import
org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCacheStats;
@@ -324,6 +326,24 @@ public class DocumentNodeStoreService {
)
public static final String PROP_BLOB_GC_MAX_AGE = "blobGcMaxAgeInSecs";
+    /** Default interval (in secs, i.e. 12 hours) for taking snapshots of locally tracked blob ids. */
+    private static final long DEFAULT_BLOB_SNAPSHOT_INTERVAL = 12 * 60 * 60;
+    @Property (longValue = DEFAULT_BLOB_SNAPSHOT_INTERVAL,
+            label = "Blob tracking snapshot interval (in secs)",
+            description = "This is the default interval in which the snapshots of locally tracked blob ids will "
+                    + "be taken and synchronized with the blob store"
+    )
+    public static final String PROP_BLOB_SNAPSHOT_INTERVAL = "blobTrackSnapshotIntervalInSecs";
+
+    /** Default value of {@link #PROP_HOME}; note that it is a relative path. */
+    private static final String DEFAULT_PROP_HOME = "./repository";
+    /** Configuration property naming the root directory under which blob ids are tracked locally. */
+    @Property(value = DEFAULT_PROP_HOME,
+            label = "Root directory",
+            description = "Root directory for local tracking of blob ids"
+    )
+    private static final String PROP_HOME = "repository.home";
+
private static final long DEFAULT_MAX_REPLICATION_LAG = 6 * 60 * 60;
@Property(longValue = DEFAULT_MAX_REPLICATION_LAG,
label = "Max Replication Lag (in secs)",
@@ -504,15 +524,31 @@ public class DocumentNodeStoreService {
// If a shared data store register the repo id in the data store
if (SharedDataStoreUtils.isShared(blobStore)) {
+ String repoId = null;
try {
- String repoId =
ClusterRepositoryInfo.getOrCreateId(mk.getNodeStore());
+ repoId =
ClusterRepositoryInfo.getOrCreateId(mk.getNodeStore());
((SharedDataStore) blobStore).addMetadataRecord(new
ByteArrayInputStream(new byte[0]),
SharedDataStoreUtils.SharedStoreRecordType.REPOSITORY.getNameFromId(repoId));
} catch (Exception e) {
throw new IOException("Could not register a unique
repositoryId", e);
}
+
+ if (blobStore instanceof BlobTrackingStore) {
+ final long trackSnapshotInterval =
toLong(prop(PROP_BLOB_SNAPSHOT_INTERVAL),
+ DEFAULT_BLOB_SNAPSHOT_INTERVAL);
+ String root = PropertiesUtil.toString(prop(PROP_HOME),
DEFAULT_PROP_HOME);
+
+ BlobTrackingStore trackingStore = (BlobTrackingStore)
blobStore;
+ if (trackingStore.getTracker() != null) {
+ trackingStore.getTracker().close();
+ }
+ ((BlobTrackingStore) blobStore).addTracker(
+ new BlobIdTracker(root, repoId, trackSnapshotInterval,
(SharedDataStore)
+ blobStore));
+ }
}
+
registerJMXBeans(mk.getNodeStore(), mkBuilder);
registerLastRevRecoveryJob(mk.getNodeStore());
registerJournalGC(mk.getNodeStore());
Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerClusterSharedTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerClusterSharedTest.java?rev=1753429&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerClusterSharedTest.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerClusterSharedTest.java
Wed Jul 20 03:56:20 2016
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Sets.newHashSet;
+import static com.google.common.io.Closeables.close;
+import static java.lang.String.valueOf;
+import static java.util.UUID.randomUUID;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.writeStrings;
+import static org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils.getBlobStore;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeNoException;
+import static org.junit.Assume.assumeThat;
+
+/**
+ * Tests for {@link BlobIdTracker} simulating a cluster and a shared data store
+ * to exercise addition, retrieval and removal of blob ids across two instances.
+ */
+public class BlobIdTrackerClusterSharedTest {
+    private static final Logger log = LoggerFactory.getLogger(BlobIdTrackerClusterSharedTest.class);
+
+    /** Root directory of the data store shared by both logical instances of a test. */
+    File root;
+    Cluster cluster1;
+    Cluster cluster2;
+
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+
+    /**
+     * Skips the whole class when no {@link SharedDataStore} can be obtained.
+     */
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        try {
+            assumeThat(getBlobStore(), instanceOf(SharedDataStore.class));
+        } catch (Exception e) {
+            assumeNoException(e);
+        }
+    }
+
+    @Before
+    public void setup() throws Exception {
+        this.root = folder.newFolder();
+    }
+
+    /**
+     * Test simulating add, remove, retrieve scenarios for blobId tracker on a 2 node cluster,
+     * i.e. two instances sharing the same repository id.
+     *
+     * @throws Exception on any failure while setting up or exercising the instances
+     */
+    @Test
+    public void addRetrieveCluster() throws Exception {
+        String clusterRepoId = randomUUID().toString();
+        cluster1 = new Cluster(clusterRepoId,
+            folder.newFolder("cluster1").getAbsolutePath(), folder);
+        cluster2 = new Cluster(clusterRepoId,
+            folder.newFolder("cluster2").getAbsolutePath(), folder);
+        Set<String> adds = newHashSet();
+
+        // Add some on cluster 2 & simulate snapshot
+        adds.addAll(cluster2.doAdd(range(5, 9)));
+        cluster2.forceSnapshot();
+        log.info("Done force snapshot for cluster2");
+
+        // Add some on cluster1 & simulate snapshot
+        adds.addAll(cluster1.doAdd(range(0, 4)));
+        cluster1.forceSnapshot();
+        log.info("Done force snapshot for cluster1");
+
+        // Get on cluster 1: must see the ids added on both instances
+        Set<String> retrieves = cluster1.doRetrieve();
+        assertEquals("Retrieves not correct", adds, retrieves);
+        log.info("Done retrieve on cluster1");
+
+        cluster1.doRemove(adds, range(4, 5));
+        log.info("Done remove on cluster1");
+        retrieves = cluster1.doRetrieve();
+        log.info("Done retrieve on cluster1 again");
+        assertEquals("Retrieves not correct after remove", adds, retrieves);
+    }
+
+    /**
+     * Test simulating add, remove, retrieve scenarios for blobId tracker on a 2 node shared
+     * repository setup, i.e. two instances with different repository ids sharing a data store.
+     *
+     * @throws Exception on any failure while setting up or exercising the instances
+     */
+    @Test
+    public void addRetrieveShared() throws Exception {
+        cluster1 = new Cluster(randomUUID().toString(),
+            folder.newFolder("cluster1").getAbsolutePath(), folder);
+        cluster2 = new Cluster(randomUUID().toString(),
+            folder.newFolder("cluster2").getAbsolutePath(), folder);
+        Set<String> adds = newHashSet();
+
+        // Add some on cluster1 & simulate snapshot
+        adds.addAll(cluster1.doAdd(range(0, 4)));
+        cluster1.forceSnapshot();
+        log.info("Done force snapshot for cluster1");
+
+        // Add some on cluster 2 & simulate snapshot
+        adds.addAll(cluster2.doAdd(range(5, 9)));
+        cluster2.forceSnapshot();
+        log.info("Done force snapshot for cluster2");
+
+        // Get on cluster 1: must see the ids added on both instances
+        Set<String> retrieves = cluster1.doRetrieve();
+        assertEquals("Retrieves not correct", adds, retrieves);
+        log.info("Done retrieve on cluster1");
+
+        cluster1.doRemove(adds, range(4, 5));
+        log.info("Done remove on cluster1");
+        retrieves = cluster1.doRetrieve();
+        log.info("Done retrieve on cluster1 again");
+        assertEquals("Retrieves not correct after remove", adds, retrieves);
+    }
+
+    /**
+     * Logical instance: owns a data store opened on the test's shared {@code root}, a
+     * {@link BlobIdTracker} tracking ids under its own local path and a scheduler used
+     * to run snapshot jobs on demand.
+     */
+    class Cluster {
+        ScheduledExecutorService scheduler;
+        BlobIdTracker tracker;
+        TemporaryFolder folder;
+        SharedDataStore dataStore;
+
+        Cluster(String repoId, String path, TemporaryFolder folder) throws Exception {
+            this.dataStore = getBlobStore(root);
+            // NOTE(review): presumably the snapshot interval in secs, large enough that only forced snapshots run
+            this.tracker = new BlobIdTracker(path, repoId, 100 * 60, dataStore);
+            this.scheduler = newSingleThreadScheduledExecutor();
+            this.folder = folder;
+        }
+
+        Set<String> doAdd(List<String> ints) throws IOException {
+            return add(tracker, ints);
+        }
+
+        void doRemove(Set<String> adds, List<String> removes) throws IOException {
+            remove(tracker, folder.newFile(), adds, removes);
+        }
+
+        /**
+         * Runs the tracker's snapshot job immediately and blocks until it completes.
+         * Failures are propagated rather than only logged, so a broken snapshot fails the
+         * test at its cause instead of at a later, misleading assertion.
+         */
+        void forceSnapshot() throws Exception {
+            ScheduledFuture<?> scheduledFuture =
+                scheduler.schedule(tracker.new SnapshotJob(), 0, MILLISECONDS);
+            scheduledFuture.get();
+        }
+
+        Set<String> doRetrieve() throws IOException {
+            return retrieve(tracker);
+        }
+
+        /**
+         * Shuts down the scheduler, then closes the tracker and the data store; each step
+         * is attempted even if an earlier one throws.
+         */
+        public void close() throws IOException {
+            try {
+                new ExecutorCloser(scheduler).close();
+            } finally {
+                try {
+                    tracker.close();
+                } finally {
+                    try {
+                        ((DataStoreBlobStore) dataStore).close();
+                    } catch (DataStoreException e) {
+                        log.warn("Error closing blobstore", e);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Closes whichever logical instances the test created (a failure while creating them
+     * must not be masked by a NullPointerException here). Both are closed even if closing
+     * the first one throws; the {@link TemporaryFolder} rule deletes the directories.
+     */
+    @After
+    public void tearDown() throws IOException {
+        try {
+            if (cluster1 != null) {
+                cluster1.close();
+            }
+        } finally {
+            if (cluster2 != null) {
+                cluster2.close();
+            }
+        }
+    }
+
+    private static Set<String> add(BlobTracker tracker, List<String> ints) throws IOException {
+        Set<String> s = newHashSet();
+        for (String rec : ints) {
+            tracker.add(rec);
+            s.add(rec);
+        }
+        return s;
+    }
+
+    /**
+     * Drains all ids returned by the tracker into a set, closing the iterator (when it is
+     * {@link Closeable}) even if iteration fails.
+     */
+    private static Set<String> retrieve(BlobTracker tracker) throws IOException {
+        Set<String> retrieved = newHashSet();
+        Iterator<String> iter = tracker.get();
+        log.info("retrieving blob ids");
+        try {
+            while (iter.hasNext()) {
+                retrieved.add(iter.next());
+            }
+        } finally {
+            if (iter instanceof Closeable) {
+                close((Closeable) iter, true);
+            }
+        }
+        return retrieved;
+    }
+
+    /**
+     * Writes the ids to remove into {@code temp}, drops them from the expected set
+     * {@code initAdd} and asks the tracker to remove them.
+     */
+    private static void remove(BlobIdTracker tracker, File temp, Set<String> initAdd,
+        List<String> ints) throws IOException {
+        writeStrings(ints.iterator(), temp, false);
+        initAdd.removeAll(ints);
+        tracker.remove(temp);
+    }
+
+    /** Returns the decimal strings for {@code min..max}, both inclusive. */
+    private static List<String> range(int min, int max) {
+        List<String> list = newArrayList();
+        for (int i = min; i <= max; i++) {
+            list.add(valueOf(i));
+        }
+        return list;
+    }
+}
+
Propchange:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/BlobIdTrackerClusterSharedTest.java
------------------------------------------------------------------------------
svn:eol-style = native