Author: amitj
Date: Fri Feb 13 05:20:44 2015
New Revision: 1659457
URL: http://svn.apache.org/r1659457
Log:
OAK-2514: Shared DataStore GC framework support
Introduced a new interface for supporting heterogeneous deployments on a shared
data store
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStore.java
(with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java
(with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/identifier/ClusterRepositoryInfo.java
(with props)
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ClusterRepositoryInfoTest.java
(with props)
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java
(with props)
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java
(with props)
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGarbageCollector.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreUtils.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGarbageCollector.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGarbageCollector.java?rev=1659457&r1=1659456&r2=1659457&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGarbageCollector.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobGarbageCollector.java
Fri Feb 13 05:20:44 2015
@@ -22,9 +22,11 @@ package org.apache.jackrabbit.oak.plugin
public interface BlobGarbageCollector {
/**
- * Collect garbage blobs from the passed node store instance.
- *
- * @throws Exception
+ * Marks garbage blobs from the passed node store instance.
+ * Collects them only if markOnly is false.
+ *
+ * @param markOnly whether to only mark references and not sweep in the
mark and sweep operation.
+ * @throws Exception the exception
*/
- void collectGarbage() throws Exception;
-}
\ No newline at end of file
+ void collectGarbage(boolean markOnly) throws Exception;
+}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java?rev=1659457&r1=1659456&r2=1659457&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
Fri Feb 13 05:20:44 2015
@@ -18,12 +18,16 @@ package org.apache.jackrabbit.oak.plugin
import java.io.Closeable;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.util.Comparator;
+import java.util.List;
import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.commons.IOUtils;
import org.apache.jackrabbit.oak.commons.sort.ExternalSort;
/**
@@ -47,6 +51,13 @@ class GarbageCollectorFileState implemen
/** The garbage stores the garbage collection candidates which were not
deleted . */
private final File garbage;
+ private final static Comparator<String> lexComparator =
+ new Comparator<String>() {
+ @Override
+ public int compare(String s1, String s2) {
+ return s1.compareTo(s2);
+ }
+ };
/**
* Instantiates a new garbage collector file state.
@@ -118,21 +129,26 @@ class GarbageCollectorFileState implemen
*
* @param file file whose contents needs to be sorted
*/
- public void sort(File file) throws IOException {
+ public static void sort(File file) throws IOException {
File sorted = createTempFile();
- Comparator<String> lexComparator = new Comparator<String>() {
- @Override
- public int compare(String s1, String s2) {
- return s1.compareTo(s2);
- }
- };
- ExternalSort.mergeSortedFiles(
- ExternalSort.sortInBatch(file, lexComparator, true),
- sorted, lexComparator, true);
+ merge(ExternalSort.sortInBatch(file, lexComparator, true), sorted);
Files.move(sorted, file);
}
+
+ public static void merge(List<File> files, File output) throws IOException
{
+ ExternalSort.mergeSortedFiles(
+ files,
+ output, lexComparator, true);
+ }
+
+ public static File copy(InputStream stream) throws IOException {
+ File file = createTempFile();
+ IOUtils.copy(stream,
+ new FileOutputStream(file));
+ return file;
+ }
- private File createTempFile() throws IOException {
- return File.createTempFile("temp", null, home);
+ private static File createTempFile() throws IOException {
+ return File.createTempFile("temp", null);
}
-}
\ No newline at end of file
+}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java?rev=1659457&r1=1659456&r2=1659457&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
Fri Feb 13 05:20:44 2015
@@ -19,14 +19,16 @@ package org.apache.jackrabbit.oak.plugin
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
+import java.io.InputStream;
import java.sql.Timestamp;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -41,15 +43,20 @@ import com.google.common.collect.Lists;
import com.google.common.collect.PeekingIterator;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
-import com.google.common.util.concurrent.ListenableFutureTask;
-import com.google.common.util.concurrent.MoreExecutors;
+
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
+import
org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
/**
* Mark and sweep garbage collector.
*
@@ -72,9 +79,6 @@ public class MarkSweepGarbageCollector i
/** The last modified time before current time of blobs to consider for
garbage collection. */
private final long maxLastModifiedInterval;
- /** Run concurrently when possible. */
- private final boolean runConcurrently;
-
/** The blob store to be garbage collected. */
private final GarbageCollectableBlobStore blobStore;
@@ -89,6 +93,8 @@ public class MarkSweepGarbageCollector i
/** The batch count. */
private final int batchCount;
+ private String repoId;
+
/** Flag to indicate the state of the gc **/
private State state = State.NOT_RUNNING;
@@ -96,13 +102,15 @@ public class MarkSweepGarbageCollector i
* Creates an instance of MarkSweepGarbageCollector
*
* @param marker BlobReferenceRetriever instanced used to fetch refereed
blob entries
+ * @param blobStore the blob store instance
+ * @param executor executor
* @param root the root absolute path of directory under which temporary
* files would be created
* @param batchCount batch sized used for saving intermediate state
- * @param runBackendConcurrently - run the backend iterate concurrently
- * @param maxLastModifiedInterval - lastModifiedTime in millis. Only files
with time
+ * @param maxLastModifiedInterval lastModifiedTime in millis. Only files
with time
* less than this time would be considered
for GC
- * @throws IOException Signals that an I/O exception has occurred.
+ * @param repositoryId - unique repository id for this node
+ * @throws IOException
*/
public MarkSweepGarbageCollector(
BlobReferenceRetriever marker,
@@ -110,41 +118,43 @@ public class MarkSweepGarbageCollector i
Executor executor,
String root,
int batchCount,
- boolean runBackendConcurrently,
- long maxLastModifiedInterval)
+ long maxLastModifiedInterval,
+ @Nullable String repositoryId)
throws IOException {
this.executor = executor;
this.blobStore = blobStore;
this.marker = marker;
this.batchCount = batchCount;
- this.runConcurrently = runBackendConcurrently;
this.maxLastModifiedInterval = maxLastModifiedInterval;
- fs = new GarbageCollectorFileState(root);
+ this.repoId = repositoryId;
+ fs = new GarbageCollectorFileState(root);
}
/**
* Instantiates a new blob garbage collector.
*/
public MarkSweepGarbageCollector(
- BlobReferenceRetriever marker,
+ BlobReferenceRetriever marker,
GarbageCollectableBlobStore blobStore,
- Executor executor)
+ Executor executor,
+ @Nullable String repositoryId)
throws IOException {
- this(marker, blobStore, executor, TEMP_DIR, DEFAULT_BATCH_COUNT, true,
TimeUnit.HOURS.toMillis(24));
+ this(marker, blobStore, executor, TEMP_DIR, DEFAULT_BATCH_COUNT,
TimeUnit.HOURS
+ .toMillis(24), repositoryId);
}
public MarkSweepGarbageCollector(
BlobReferenceRetriever marker,
GarbageCollectableBlobStore blobStore,
Executor executor,
- long maxLastModifiedInterval)
- throws IOException {
- this(marker, blobStore, executor, TEMP_DIR, DEFAULT_BATCH_COUNT, true,
maxLastModifiedInterval);
+ long maxLastModifiedInterval,
+ @Nullable String repositoryId) throws IOException {
+ this(marker, blobStore, executor, TEMP_DIR, DEFAULT_BATCH_COUNT,
maxLastModifiedInterval, repositoryId);
}
@Override
- public void collectGarbage() throws Exception {
- markAndSweep();
+ public void collectGarbage(boolean markOnly) throws Exception {
+ markAndSweep(markOnly);
}
/**
@@ -157,22 +167,29 @@ public class MarkSweepGarbageCollector i
}
/**
- * Mark and sweep. Main method for GC.
+ * Mark and sweep. Main entry method for GC.
+ *
+ * @param markOnly whether to mark only
+ * @throws Exception the exception
*/
- private void markAndSweep() throws IOException, InterruptedException {
+ private void markAndSweep(boolean markOnly) throws Exception {
boolean threw = true;
try {
Stopwatch sw = Stopwatch.createStarted();
LOG.info("Starting Blob garbage collection");
mark();
- int deleteCount = sweep();
- threw = false;
+ if (!markOnly) {
+ int deleteCount = sweep();
+ threw = false;
- LOG.info("Blob garbage collection completed in {}. Number of blobs
" +
- "deleted [{}]", sw.toString(), deleteCount);
+ LOG.info("Blob garbage collection completed in {}. Number of
blobs "
+ + "deleted [{}]", sw.toString(), deleteCount);
+ }
} finally {
- Closeables.close(fs, threw);
+ if (LOG.isTraceEnabled()) {
+ Closeables.close(fs, threw);
+ }
state = State.NOT_RUNNING;
}
}
@@ -180,29 +197,16 @@ public class MarkSweepGarbageCollector i
/**
* Mark phase of the GC.
*/
- private void mark() throws IOException, InterruptedException {
+ private void mark() throws IOException, DataStoreException {
state = State.MARKING;
LOG.debug("Starting mark phase of the garbage collector");
- // Find all blobs available in the blob store
- ListenableFutureTask<Integer> blobIdRetriever =
ListenableFutureTask.create(new BlobIdRetriever());
- if (runConcurrently) {
- executor.execute(blobIdRetriever);
- } else {
- MoreExecutors.sameThreadExecutor().execute(blobIdRetriever);
- }
-
- // Find all blob references after iterating over the whole repository
+ // Mark all used references
iterateNodeTree();
- try {
- blobIdRetriever.get();
- } catch (ExecutionException e) {
- LOG.warn("Error occurred while fetching all the blobIds from the
BlobStore. GC would " +
- "continue with the blobIds retrieved so far", e.getCause());
- }
+ // Move the marked references file to the data store meta area if
applicable
+ GarbageCollectionType.get(blobStore).addMarked(blobStore, fs, repoId);
- difference();
LOG.debug("Ending mark phase of the garbage collector");
}
@@ -248,16 +252,55 @@ public class MarkSweepGarbageCollector i
/**
* Sweep phase of gc candidate deletion.
- *
- * @throws IOException
- * Signals that an I/O exception has occurred.
+ * <p>
+ * Performs the following steps depending upon the type of the blob store
refer
+ * {@link org.apache.jackrabbit.oak.plugins.blob.SharedDataStore.Type}:
+ *
+ * <ul>
+ * <li>Shared</li>
+ * <ul>
+ * <li> Merge all marked references (from the mark phase run
independently) available in the data store meta
+ * store (from all configured independent repositories).
+ * <li> Retrieve all blob ids available.
+ * <li> Diffs the 2 sets above to retrieve list of blob ids not used.
+ * <li> Deletes only blobs created after
+ * (earliest time stamp of the marked references -
#maxLastModifiedInterval) from the above set.
+ * </ul>
+ *
+ * <li>Default</li>
+ * <ul>
+ * <li> Mark phase already run.
+ * <li> Retrieve all blob ids available.
+ * <li> Diffs the 2 sets above to retrieve list of blob ids not used.
+ * <li> Deletes only blobs created after
+ * (time stamp of the marked references -
#maxLastModifiedInterval).
+ * </ul>
+ * </ul>
+ *
+ * @return the number of blobs deleted
+ * @throws Exception the exception
*/
- private int sweep() throws IOException {
+ private int sweep() throws Exception {
+ long earliestRefAvailTime;
+ // Merge all the blob references available from all the reference
files in the data store meta store
+ // Only go ahead if merge succeeded
+ try {
+ earliestRefAvailTime =
+
GarbageCollectionType.get(blobStore).mergeAllMarkedReferences(blobStore, fs);
+ } catch (Exception e) {
+ return 0;
+ }
+
+ // Find all blob references after iterating over the whole repository
+ (new BlobIdRetriever()).call();
+
+ // Calculate the references not used
+ difference();
int count = 0;
state = State.SWEEPING;
LOG.debug("Starting sweep phase of the garbage collector");
LOG.debug("Sweeping blobs with modified time > than the configured max
deleted time ({}). " +
- timestampToString(getLastMaxModifiedTime()));
+
timestampToString(getLastMaxModifiedTime(earliestRefAvailTime)));
ConcurrentLinkedQueue<String> exceptionQueue = new
ConcurrentLinkedQueue<String>();
@@ -270,13 +313,13 @@ public class MarkSweepGarbageCollector i
if (ids.size() > getBatchCount()) {
count += ids.size();
- executor.execute(new Sweeper(ids, exceptionQueue));
+ executor.execute(new Sweeper(ids, exceptionQueue,
earliestRefAvailTime));
ids = Lists.newArrayList();
}
}
if (!ids.isEmpty()) {
count += ids.size();
- executor.execute(new Sweeper(ids, exceptionQueue));
+ executor.execute(new Sweeper(ids, exceptionQueue,
earliestRefAvailTime));
}
count -= exceptionQueue.size();
@@ -291,11 +334,13 @@ public class MarkSweepGarbageCollector i
IOUtils.closeQuietly(writer);
}
if(!exceptionQueue.isEmpty()) {
- LOG.warn("Unable to delete some blobs entries from the blob store.
" +
- "This may happen if blob modified time is > than the max
deleted time ({}). " +
- "Details around such blob entries can be found in [{}]",
- timestampToString(getLastMaxModifiedTime()),
fs.getGarbage().getAbsolutePath());
+ LOG.warn(
+ "Unable to delete some blobs entries from the blob store. This
may happen if blob modified time is > "
+ + "than the max deleted time ({}). Details around such
blob entries can be found in [{}]",
+
timestampToString(getLastMaxModifiedTime(earliestRefAvailTime)),
fs.getGarbage().getAbsolutePath());
}
+ // Remove all the merged marked references
+
GarbageCollectionType.get(blobStore).removeAllMarkedReferences(blobStore);
LOG.debug("Ending sweep phase of the garbage collector");
return count;
}
@@ -304,10 +349,10 @@ public class MarkSweepGarbageCollector i
return batchCount;
}
- private long getLastMaxModifiedTime(){
+ private long getLastMaxModifiedTime(long maxModified) {
return maxLastModifiedInterval > 0 ?
- System.currentTimeMillis() - maxLastModifiedInterval : 0;
-
+ ((maxModified <= 0 ? System.currentTimeMillis() : maxModified) -
maxLastModifiedInterval) :
+ 0;
}
/**
@@ -331,16 +376,20 @@ public class MarkSweepGarbageCollector i
/** The ids to sweep. */
private final List<String> ids;
- public Sweeper(List<String> ids, ConcurrentLinkedQueue<String>
exceptionQueue) {
+ private final long maxModified;
+
+ public Sweeper(List<String> ids, ConcurrentLinkedQueue<String>
exceptionQueue,
+ long maxModified) {
this.exceptionQueue = exceptionQueue;
this.ids = ids;
+ this.maxModified = maxModified;
}
@Override
public void run() {
try {
LOG.debug("Blob ids to be deleted {}", ids);
- boolean deleted = blobStore.deleteChunks(ids,
getLastMaxModifiedTime());
+ boolean deleted = blobStore.deleteChunks(ids,
getLastMaxModifiedTime(maxModified));
if (!deleted) {
// Only log and do not add to exception queue since some
blobs may not match the
// lastMaxModifiedTime criteria.
@@ -369,7 +418,7 @@ public class MarkSweepGarbageCollector i
@Override
public void addReference(String blobId) {
if (debugMode) {
- LOG.trace("BlobId : {}",blobId);
+ LOG.trace("BlobId : {}", blobId);
}
try {
@@ -384,7 +433,7 @@ public class MarkSweepGarbageCollector i
}
if (debugMode) {
- LOG.trace("chunkId : {}",id);
+ LOG.trace("chunkId : {}", id);
}
count.getAndIncrement();
}
@@ -400,9 +449,9 @@ public class MarkSweepGarbageCollector i
}
);
LOG.info("Number of valid blob references marked under mark phase
of " +
- "Blob garbage collection [{}]",count.get());
+ "Blob garbage collection [{}]", count.get());
// sort the marked references
- fs.sort(fs.getMarkedRefs());
+ GarbageCollectorFileState.sort(fs.getMarkedRefs());
} finally {
IOUtils.closeQuietly(writer);
}
@@ -439,22 +488,20 @@ public class MarkSweepGarbageCollector i
}
// sort the file
- fs.sort(fs.getAvailableRefs());
+ GarbageCollectorFileState.sort(fs.getAvailableRefs());
LOG.debug("Number of blobs present in BlobStore : [{}] ",
blobsCount);
} finally {
IOUtils.closeQuietly(bufferWriter);
}
return blobsCount;
}
-
-
}
/**
* FileLineDifferenceIterator class which iterates over the difference of
2 files line by line.
*/
- static class FileLineDifferenceIterator extends AbstractIterator<String>
implements Closeable{
+ static class FileLineDifferenceIterator extends AbstractIterator<String>
implements Closeable {
private final PeekingIterator<String> peekMarked;
private final LineIterator marked;
private final LineIterator all;
@@ -527,4 +574,107 @@ public class MarkSweepGarbageCollector i
private static String timestampToString(long timestamp){
return (new Timestamp(timestamp) + "00").substring(0, 23);
}
+
+ /**
+ * Defines different data store types from the garbage collection
perspective and encodes the divergent behavior.
+ * <ul></ul>
+ */
+ enum GarbageCollectionType {
+ SHARED {
+ /**
+         * Remove the marked references from the blob store root. Default
NOOP.
+ *
+ * @param blobStore the blobStore instance
+ */
+ @Override
+ void removeAllMarkedReferences(GarbageCollectableBlobStore
blobStore) {
+ ((SharedDataStore)
blobStore).deleteAllMetadataRecords(SharedStoreRecordType.REFERENCES.getType());
+ }
+
+ /**
+ * Merge all marked references available from all repositories and
return the earliest time of the references.
+ *
+ * @param blobStore the blob store
+ * @param fs the fs
+ * @return the long the earliest time of the available references
+ * @throws IOException Signals that an I/O exception has occurred.
+ * @throws DataStoreException the data store exception
+ */
+ @Override
+ long mergeAllMarkedReferences(GarbageCollectableBlobStore
blobStore,
+ GarbageCollectorFileState fs)
+ throws IOException, DataStoreException {
+
+ List<DataRecord> refFiles =
+ ((SharedDataStore)
blobStore).getAllMetadataRecords(SharedStoreRecordType.REFERENCES.getType());
+
+ // Get all the repositories registered
+ List<DataRecord> repoFiles =
+ ((SharedDataStore)
blobStore).getAllMetadataRecords(SharedStoreRecordType.REPOSITORY.getType());
+
+ // Retrieve repos for which reference files have not been
created
+ Set<String> unAvailRepos =
+
SharedDataStoreUtils.refsNotAvailableFromRepos(repoFiles, refFiles);
+ if (unAvailRepos.isEmpty()) {
+ // List of files to be merged
+ List<File> files = Lists.newArrayList();
+ for (DataRecord refFile : refFiles) {
+ File file =
GarbageCollectorFileState.copy(refFile.getStream());
+ files.add(file);
+ }
+
+ GarbageCollectorFileState.merge(files, fs.getMarkedRefs());
+
+ return
SharedDataStoreUtils.getEarliestRecord(refFiles).getLastModified();
+ } else {
+ LOG.error("Not all repositories have marked references
available : {}", unAvailRepos);
+ throw new IOException("Not all repositories have marked
references available");
+ }
+ }
+
+ /**
+ * Adds the marked references to the blob store root. Default NOOP
+ *
+ * @param blobStore the blob store
+ * @param fs the fs
+ * @param repoId the repo id
+ * @throws DataStoreException the data store exception
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ @Override
+ void addMarked(GarbageCollectableBlobStore blobStore,
GarbageCollectorFileState fs,
+ String repoId) throws DataStoreException, IOException {
+ InputStream is = new FileInputStream(fs.getMarkedRefs());
+ try {
+ ((SharedDataStore) blobStore)
+ .addMetadataRecord(is,
SharedStoreRecordType.REFERENCES.getNameFromId(repoId));
+ } finally {
+ Closeables.close(is, false);
+ }
+ }
+ },
+ DEFAULT;
+
+ void removeAllMarkedReferences(GarbageCollectableBlobStore blobStore)
{}
+
+ void addMarked(GarbageCollectableBlobStore blobStore,
GarbageCollectorFileState fs,
+ String repoId) throws DataStoreException, IOException {}
+
+ long mergeAllMarkedReferences(GarbageCollectableBlobStore blobStore,
+ GarbageCollectorFileState fs)
+ throws IOException, DataStoreException {
+            // throw if the marked refs are not available.
+ if (!fs.getMarkedRefs().exists() || fs.getMarkedRefs().length() ==
0) {
+ throw new IOException("Marked references not available");
+ }
+ return fs.getMarkedRefs().lastModified();
+ }
+
+ public static GarbageCollectionType get(GarbageCollectableBlobStore
blobStore) {
+ if (SharedDataStoreUtils.isShared(blobStore)) {
+ return SHARED;
+ }
+ return DEFAULT;
+ }
+ }
}
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStore.java?rev=1659457&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStore.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStore.java
Fri Feb 13 05:20:44 2015
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.InputStream;
+import java.util.List;
+
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+
+/**
+ * Interface to be implemented by a shared data store.
+ */
+public interface SharedDataStore {
+ /**
+ * Explicitly identifies the type of the data store
+ */
+ enum Type {
+ SHARED, DEFAULT
+ }
+
+ /**
+ * Adds the root record.
+ *
+ * @param stream the stream
+ * @param name the name of the root record
+ * @return the data record
+ * @throws DataStoreException the data store exception
+ */
+ void addMetadataRecord(InputStream stream, String name)
+ throws DataStoreException;
+
+ /**
+ * Retrieves the metadata record with the given name
+ *
+ * @param name the name of the record
+ * @return
+ */
+ DataRecord getMetadataRecord(String name);
+
+ /**
+     * Gets all the root records.
+     *
+     * @return all the root records
+ */
+ List<DataRecord> getAllMetadataRecords(String prefix);
+
+ /**
+ * Deletes the root record represented by the given parameters.
+ *
+ * @param name the name of the root record
+ * @return success/failure
+ */
+ boolean deleteMetadataRecord(String name);
+
+ /**
+ * Deletes all records matching the given prefix.
+ *
+ * @param prefix metadata type identifier
+ */
+ void deleteAllMetadataRecords(String prefix);
+
+ /**
+ * Gets the type.
+ *
+ * @return the type
+ */
+ Type getType();
+}
+
Propchange:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStore.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java?rev=1659457&r1=1659456&r2=1659457&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
Fri Feb 13 05:20:44 2015
@@ -48,6 +48,7 @@ import org.apache.jackrabbit.core.data.D
import org.apache.jackrabbit.core.data.DataStoreException;
import org.apache.jackrabbit.core.data.MultiDataStoreAware;
import org.apache.jackrabbit.oak.cache.CacheLIRS;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
import org.slf4j.Logger;
@@ -62,7 +63,8 @@ import static com.google.common.collect.
* It also handles inlining binaries if there size is smaller than
* {@link org.apache.jackrabbit.core.data.DataStore#getMinRecordLength()}
*/
-public class DataStoreBlobStore implements DataStore, BlobStore,
GarbageCollectableBlobStore {
+public class DataStoreBlobStore implements DataStore, SharedDataStore,
BlobStore,
+ GarbageCollectableBlobStore {
private final Logger log = LoggerFactory.getLogger(getClass());
private final DataStore delegate;
@@ -394,6 +396,48 @@ public class DataStoreBlobStore implemen
return Iterators.singletonIterator(blobId);
}
+ @Override
+ public void addMetadataRecord(InputStream stream, String name) throws
DataStoreException {
+ if (delegate instanceof SharedDataStore) {
+ ((SharedDataStore) delegate).addMetadataRecord(stream, name);
+ }
+ }
+
+ @Override public DataRecord getMetadataRecord(String name) {
+ if (delegate instanceof SharedDataStore) {
+ return ((SharedDataStore) delegate).getMetadataRecord(name);
+ }
+ return null;
+ }
+
+ @Override
+ public List<DataRecord> getAllMetadataRecords(String prefix) {
+ if (delegate instanceof SharedDataStore) {
+ return ((SharedDataStore) delegate).getAllMetadataRecords(prefix);
+ }
+ return null;
+ }
+
+ @Override
+ public boolean deleteMetadataRecord(String name) {
+ return delegate instanceof SharedDataStore && ((SharedDataStore)
delegate).deleteMetadataRecord(name);
+ }
+
+ @Override
+ public void deleteAllMetadataRecords(String prefix) {
+ if (delegate instanceof SharedDataStore) {
+ ((SharedDataStore) delegate).deleteAllMetadataRecords(prefix);
+ }
+ }
+
+ @Override
+ public Type getType() {
+ if (delegate instanceof SharedDataStore) {
+ return Type.SHARED;
+ }
+ return Type.DEFAULT;
+ }
+
//~---------------------------------------------< Object >
@Override
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java?rev=1659457&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java
Fri Feb 13 05:20:44 2015
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import java.util.List;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+
+/**
+ * Utility class for {@link SharedDataStore}.
+ */
+public class SharedDataStoreUtils {
+ /**
+ * Checks if the blob store is shared.
+ *
+ * @param blobStore the blob store
+ * @return true if shared
+ */
+ public static boolean isShared(BlobStore blobStore) {
+ return (blobStore instanceof SharedDataStore)
+ && (((SharedDataStore) blobStore).getType() ==
SharedDataStore.Type.SHARED);
+ }
+
+ /**
+ * Gets the earliest record of the available reference records.
+ *
+ * @param recs the list of reference data records to inspect
+ * @return the earliest record
+ */
+ public static DataRecord getEarliestRecord(List<DataRecord> recs) {
+ return Ordering.natural().onResultOf(
+ new Function<DataRecord, Long>() {
+ @Override
+ @Nullable
+ public Long apply(@Nullable DataRecord input) {
+ return input.getLastModified();
+ }
+ }).min(recs);
+ }
+
+ /**
+ * Repositories from which marked references are not available.
+ *
+ * @param repos the repository records registered in the data store
+ * @param refs the reference records available in the data store
+ * @return the set of repository ids whose references are not available
+ */
+ public static Set<String> refsNotAvailableFromRepos(List<DataRecord> repos,
+ List<DataRecord> refs) {
+ return Sets.difference(FluentIterable.from(repos).uniqueIndex(
+ new Function<DataRecord, String>() {
+ @Override
+ @Nullable
+ public String apply(@Nullable DataRecord input) {
+ return
SharedStoreRecordType.REPOSITORY.getIdFromName(input.getIdentifier().toString());
+ }
+ }).keySet(),
+ FluentIterable.from(refs).uniqueIndex(
+ new Function<DataRecord, String>() {
+ @Override
+ @Nullable
+ public String apply(@Nullable DataRecord input) {
+ return
SharedStoreRecordType.REFERENCES.getIdFromName(input.getIdentifier().toString());
+ }
+ }).keySet());
+ }
+
+ /**
+ * Encapsulates the different type of records at the data store root.
+ */
+ public enum SharedStoreRecordType {
+ REFERENCES("references"), REPOSITORY("repository");
+
+ private final String type;
+
+ SharedStoreRecordType(String type) {
+ this.type = type;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public String getIdFromName(String name) {
+ return Splitter.on(DELIIM).limit(2).splitToList(name).get(1);
+ }
+
+ public String getNameFromId(String id) {
+ return Joiner.on(DELIIM).join(getType(), id);
+ }
+
+ static final String DELIIM = "-";
+ }
+}
+
Propchange:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1659457&r1=1659456&r2=1659457&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
Fri Feb 13 05:20:44 2015
@@ -1973,11 +1973,13 @@ public final class DocumentNodeStore
* Creates and returns a MarkSweepGarbageCollector if the current BlobStore
* supports garbage collection
*
- * @return garbage collector of the BlobStore supports GC otherwise null
* @param blobGcMaxAgeInSecs
+ * @param repositoryId
+ * @return garbage collector if the BlobStore supports GC, otherwise null
*/
@CheckForNull
- public MarkSweepGarbageCollector createBlobGarbageCollector(long
blobGcMaxAgeInSecs) {
+ public MarkSweepGarbageCollector createBlobGarbageCollector(long
blobGcMaxAgeInSecs,
+ String repositoryId) {
MarkSweepGarbageCollector blobGC = null;
if(blobStore instanceof GarbageCollectableBlobStore){
try {
@@ -1985,7 +1987,8 @@ public final class DocumentNodeStore
new DocumentBlobReferenceRetriever(this),
(GarbageCollectableBlobStore) blobStore,
executor,
- TimeUnit.SECONDS.toMillis(blobGcMaxAgeInSecs));
+ TimeUnit.SECONDS.toMillis(blobGcMaxAgeInSecs),
+ repositoryId);
} catch (IOException e) {
throw new RuntimeException("Error occurred while initializing
" +
"the MarkSweepGarbageCollector",e);
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java?rev=1659457&r1=1659456&r2=1659457&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
Fri Feb 13 05:20:44 2015
@@ -24,6 +24,7 @@ import static org.apache.jackrabbit.oak.
import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toLong;
import static
org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Dictionary;
@@ -55,8 +56,11 @@ import org.apache.jackrabbit.oak.osgi.Os
import org.apache.jackrabbit.oak.plugins.blob.BlobGC;
import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean;
import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
import org.apache.jackrabbit.oak.plugins.document.cache.CachingDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
+import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
@@ -291,6 +295,17 @@ public class DocumentNodeStoreService {
mkBuilder.setExecutor(executor);
mk = mkBuilder.open();
+ // If a shared data store register the repo id in the data store
+ if (SharedDataStoreUtils.isShared(blobStore)) {
+ try {
+ String repoId =
ClusterRepositoryInfo.createId(mk.getNodeStore());
+ ((SharedDataStore) blobStore).addMetadataRecord(new
ByteArrayInputStream(new byte[0]),
+
SharedDataStoreUtils.SharedStoreRecordType.REPOSITORY.getNameFromId(repoId));
+ } catch (Exception e) {
+ throw new IOException("Could not register a unique
repositoryId", e);
+ }
+ }
+
registerJMXBeans(mk.getNodeStore());
registerLastRevRecoveryJob(mk.getNodeStore());
@@ -443,8 +458,10 @@ public class DocumentNodeStoreService {
if (store.getBlobStore() instanceof GarbageCollectableBlobStore) {
BlobGarbageCollector gc = new BlobGarbageCollector() {
@Override
- public void collectGarbage() throws Exception {
-
store.createBlobGarbageCollector(blobGcMaxAgeInSecs).collectGarbage();
+ public void collectGarbage(boolean sweep) throws Exception {
+ store.createBlobGarbageCollector(blobGcMaxAgeInSecs,
+ ClusterRepositoryInfo.getId(mk.getNodeStore()))
+ .collectGarbage(sweep);
}
};
registrations.add(registerMBean(whiteboard, BlobGCMBean.class, new
BlobGC(gc, executor),
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/identifier/ClusterRepositoryInfo.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/identifier/ClusterRepositoryInfo.java?rev=1659457&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/identifier/ClusterRepositoryInfo.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/identifier/ClusterRepositoryInfo.java
Fri Feb 13 05:20:44 2015
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.identifier;
+
+import java.util.UUID;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
+/**
+ * Utility class to manage a unique cluster/repository id for the cluster.
+ */
+public class ClusterRepositoryInfo {
+ public static final String CLUSTER_CONFIG_NODE = ":clusterConfig";
+ public static final String CLUSTER_ID_PROP = ":clusterId";
+
+ /**
+ * Adds a new uuid for the repository in the property /:clusterConfig/:clusterId, if not already present.
+ *
+ * @param store the NodeStore instance
+ * @return the repository id
+ * @throws CommitFailedException
+ */
+ public static String createId(NodeStore store) throws
CommitFailedException {
+ NodeBuilder root = store.getRoot().builder();
+ if (!root.hasChildNode(CLUSTER_CONFIG_NODE)) {
+ String id = UUID.randomUUID().toString();
+ root.child(CLUSTER_CONFIG_NODE).setProperty(CLUSTER_ID_PROP, id);
+ store.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ return id;
+ } else {
+ return
root.getChildNode(CLUSTER_CONFIG_NODE).getProperty(CLUSTER_ID_PROP).getValue(Type.STRING);
+ }
+ }
+
+ /**
+ * Retrieves the {@link #CLUSTER_ID_PROP}.
+ *
+ * @param store the NodeStore instance
+ * @return the repository id
+ */
+ public static String getId(NodeStore store) {
+ return
store.getRoot().getChildNode(CLUSTER_CONFIG_NODE).getProperty(CLUSTER_ID_PROP).getValue(Type.STRING);
+ }
+}
+
Propchange:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/identifier/ClusterRepositoryInfo.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java?rev=1659457&r1=1659456&r2=1659457&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
Fri Feb 13 05:20:44 2015
@@ -26,6 +26,7 @@ import static org.apache.jackrabbit.oak.
import static
org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.TIMESTAMP_DEFAULT;
import static
org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
+import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
@@ -50,6 +51,10 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean;
import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
+import
org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
+import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
import
org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategyMBean;
import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy;
import
org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CleanupType;
@@ -262,15 +267,27 @@ public class SegmentNodeStoreService ext
revisionGCRegistration = registerMBean(whiteboard,
RevisionGCMBean.class, revisionGC,
RevisionGCMBean.TYPE, "Segment node store revision garbage
collection");
+ // If a shared data store register the repo id in the data store
+ if (SharedDataStoreUtils.isShared(blobStore)) {
+ try {
+ String repoId = ClusterRepositoryInfo.createId(delegate);
+ ((SharedDataStore) blobStore).addMetadataRecord(new
ByteArrayInputStream(new byte[0]),
+ SharedStoreRecordType.REPOSITORY.getNameFromId(repoId));
+ } catch (Exception e) {
+ throw new IOException("Could not register a unique
repositoryId", e);
+ }
+ }
+
if (store.getBlobStore() instanceof GarbageCollectableBlobStore) {
BlobGarbageCollector gc = new BlobGarbageCollector() {
@Override
- public void collectGarbage() throws Exception {
+ public void collectGarbage(boolean sweep) throws Exception {
MarkSweepGarbageCollector gc = new
MarkSweepGarbageCollector(
new
SegmentBlobReferenceRetriever(store.getTracker()),
(GarbageCollectableBlobStore) store.getBlobStore(),
- executor);
- gc.collectGarbage();
+ executor,
+ ClusterRepositoryInfo.getId(delegate));
+ gc.collectGarbage(sweep);
}
};
Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ClusterRepositoryInfoTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ClusterRepositoryInfoTest.java?rev=1659457&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ClusterRepositoryInfoTest.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ClusterRepositoryInfoTest.java
Fri Feb 13 05:20:44 2015
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
+import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Tests the ClusterRepositoryInfo unique cluster repository id.
+ */
+public class ClusterRepositoryInfoTest {
+ static BlobStore blobStore;
+
+ @BeforeClass
+ public static void setup() {
+ try {
+ blobStore = DataStoreUtils.getBlobStore();
+ Assume.assumeThat(blobStore, instanceOf(SharedDataStore.class));
+ } catch (Exception e) {
+ Assume.assumeNoException(e);
+ }
+ }
+
+ @Test
+ public void differentCluster() throws Exception {
+ DocumentNodeStore ds1 = new DocumentMK.Builder()
+ .setAsyncDelay(0)
+ .setDocumentStore(new MemoryDocumentStore())
+ .setBlobStore(blobStore)
+ .getNodeStore();
+ String repoId1 = ClusterRepositoryInfo.createId(ds1);
+
+ DocumentNodeStore ds2 = new DocumentMK.Builder()
+ .setAsyncDelay(0)
+ .setDocumentStore(new MemoryDocumentStore())
+ .setBlobStore(blobStore)
+ .getNodeStore();
+ String repoId2 = ClusterRepositoryInfo.createId(ds2);
+
+ Assert.assertNotSame(repoId1, repoId2);
+ }
+
+ @Test
+ public void sameCluster() throws Exception {
+ MemoryDocumentStore store = new MemoryDocumentStore();
+ DocumentNodeStore ds1 = new DocumentMK.Builder()
+ .setAsyncDelay(0)
+ .setDocumentStore(store)
+ .setClusterId(1)
+ .setBlobStore(blobStore)
+ .getNodeStore();
+ String repoId1 = ClusterRepositoryInfo.createId(ds1);
+ ds1.runBackgroundOperations();
+
+ DocumentNodeStore ds2 = new DocumentMK.Builder()
+ .setAsyncDelay(0)
+ .setDocumentStore(store)
+ .setClusterId(2)
+ .setBlobStore(blobStore)
+ .getNodeStore();
+ String repoId2 = ClusterRepositoryInfo.createId(ds2);
+
+ // Since both stores belong to the same cluster, the ids should be equal
+ Assert.assertEquals(repoId1, repoId2);
+ }
+
+ @After
+ public void close() throws IOException {
+ FileUtils.cleanDirectory(new File(DataStoreUtils.getHomeDir()));
+ }
+}
+
Propchange:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ClusterRepositoryInfoTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java?rev=1659457&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java
Fri Feb 13 05:20:44 2015
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
+import
org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test for SharedDataStoreUtils covering addition, retrieval and deletion of root
records.
+ */
+public class SharedDataStoreUtilsTest {
+ SharedDataStore dataStore;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ try {
+ Assume.assumeThat(DataStoreUtils.getBlobStore(),
instanceOf(SharedDataStore.class));
+ } catch (Exception e) {
+ Assume.assumeNoException(e);
+ }
+ }
+
+ @Test
+ public void test() throws Exception {
+ dataStore = DataStoreUtils.getBlobStore();
+ String repoId1 = UUID.randomUUID().toString();
+ String repoId2 = UUID.randomUUID().toString();
+
+ // Add repository records
+ dataStore.addMetadataRecord(new ByteArrayInputStream(new byte[0]),
+ SharedStoreRecordType.REPOSITORY.getNameFromId(repoId1));
+ DataRecord repo1 =
dataStore.getMetadataRecord(SharedStoreRecordType.REPOSITORY.getNameFromId(repoId1));
+ dataStore.addMetadataRecord(new ByteArrayInputStream(new byte[0]),
+ SharedStoreRecordType.REPOSITORY.getNameFromId(repoId2));
+ DataRecord repo2 =
dataStore.getMetadataRecord(SharedStoreRecordType.REPOSITORY.getNameFromId(repoId2));
+
+ // Add reference records
+ dataStore.addMetadataRecord(new ByteArrayInputStream(new byte[0]),
+ SharedStoreRecordType.REFERENCES.getNameFromId(repoId1));
+ DataRecord rec1 =
dataStore.getMetadataRecord(SharedStoreRecordType.REFERENCES.getNameFromId(repoId1));
+ dataStore.addMetadataRecord(new ByteArrayInputStream(new byte[0]),
+ SharedStoreRecordType.REFERENCES.getNameFromId(repoId2));
+ DataRecord rec2 =
dataStore.getMetadataRecord(SharedStoreRecordType.REFERENCES.getNameFromId(repoId2));
+
+ Assert.assertEquals(
+
SharedStoreRecordType.REPOSITORY.getIdFromName(repo1.getIdentifier().toString()),
+ repoId1);
+ Assert.assertEquals(
+
SharedStoreRecordType.REPOSITORY.getIdFromName(repo2.getIdentifier().toString()),
+ repoId2);
+ Assert.assertEquals(
+
SharedStoreRecordType.REFERENCES.getIdFromName(rec1.getIdentifier().toString()),
+ repoId1);
+ Assert.assertEquals(
+
SharedStoreRecordType.REFERENCES.getIdFromName(rec2.getIdentifier().toString()),
+ repoId2);
+
+ // All the references from registered repositories are available
+ Assert.assertTrue(
+ SharedDataStoreUtils.refsNotAvailableFromRepos(
+
dataStore.getAllMetadataRecords(SharedStoreRecordType.REPOSITORY.getType()),
+
dataStore.getAllMetadataRecords(SharedStoreRecordType.REFERENCES.getType())).isEmpty());
+
+ // Earliest should be the 1st reference record
+ Assert.assertEquals(
+ SharedDataStoreUtils.getEarliestRecord(
+
dataStore.getAllMetadataRecords(SharedStoreRecordType.REFERENCES.getType())).getIdentifier().toString(),
+ SharedStoreRecordType.REFERENCES.getNameFromId(repoId1));
+
+ // Delete references and check back if deleted
+
dataStore.deleteAllMetadataRecords(SharedStoreRecordType.REFERENCES.getType());
+
Assert.assertTrue(dataStore.getAllMetadataRecords(SharedStoreRecordType.REFERENCES.getType()).isEmpty());
+
+ // Repository ids should still be available
+ Assert.assertEquals(2,
+
dataStore.getAllMetadataRecords(SharedStoreRecordType.REPOSITORY.getType()).size());
+ }
+
+ @After
+ public void close() throws IOException {
+ FileUtils.cleanDirectory(new File(DataStoreUtils.getHomeDir()));
+ }
+}
+
Propchange:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreUtils.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreUtils.java?rev=1659457&r1=1659456&r2=1659457&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreUtils.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreUtils.java
Fri Feb 13 05:20:44 2015
@@ -24,6 +24,7 @@ import org.apache.jackrabbit.core.data.D
import org.apache.jackrabbit.core.data.FileDataStore;
import org.apache.jackrabbit.oak.commons.PropertiesUtil;
import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.OakFileDataStore;
import org.apache.jackrabbit.oak.plugins.document.AbstractMongoConnectionTest;
import org.junit.Test;
@@ -51,7 +52,7 @@ public class DataStoreUtils {
private static long time = -1;
public static DataStoreBlobStore getBlobStore() throws Exception {
- String className = System.getProperty(DS_CLASS_NAME,
FileDataStore.class.getName());
+ String className = System.getProperty(DS_CLASS_NAME,
OakFileDataStore.class.getName());
DataStore ds =
Class.forName(className).asSubclass(DataStore.class).newInstance();
PropertiesUtil.populate(ds, getConfig(), false);
ds.init(getHomeDir());
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java?rev=1659457&r1=1659456&r2=1659457&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
Fri Feb 13 05:20:44 2015
@@ -16,6 +16,9 @@
*/
package org.apache.jackrabbit.oak.plugins.document;
+import static
org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType.REPOSITORY;
+import static org.junit.Assert.assertTrue;
+
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.HashSet;
@@ -33,7 +36,10 @@ import com.mongodb.DBCollection;
import junit.framework.Assert;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
import
org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats;
+import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
@@ -147,15 +153,22 @@ public class MongoBlobGCTest extends Abs
}
private void gc(HashSet<String> remaining) throws Exception {
DocumentNodeStore store = mk.getNodeStore();
+ String repoId = null;
+ if (SharedDataStoreUtils.isShared(store.getBlobStore())) {
+ repoId = ClusterRepositoryInfo.createId(store);
+ ((SharedDataStore) store.getBlobStore()).addMetadataRecord(
+ new ByteArrayInputStream(new byte[0]),
+ REPOSITORY.getNameFromId(repoId));
+ }
MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector(
new DocumentBlobReferenceRetriever(store),
(GarbageCollectableBlobStore) store.getBlobStore(),
MoreExecutors.sameThreadExecutor(),
- "./target", 5, true, 0);
- gc.collectGarbage();
+ "./target", 5, 0, repoId);
+ gc.collectGarbage(false);
Set<String> existingAfterGC = iterate();
- boolean empty = Sets.symmetricDifference(remaining,
existingAfterGC).isEmpty();
+ boolean empty = Sets.symmetricDifference(remaining,
existingAfterGC).isEmpty();
assertTrue(empty);
}
Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java?rev=1659457&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java
Fri Feb 13 05:20:44 2015
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import
org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
+import
org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test for GC in a shared data store among heterogeneous Oak node stores.
+ */
+public class SharedBlobStoreGCTest {
+ private Cluster cluster1;
+ private Cluster cluster2;
+ private Clock clock;
+
+ @Before
+ public void setUp() throws Exception {
+ clock = new Clock.Virtual();
+ clock.waitUntil(Revision.getCurrentTimestamp());
+
+ BlobStore blobeStore1 = DataStoreUtils.getBlobStore();
+ DocumentNodeStore ds1 = new DocumentMK.Builder()
+ .setAsyncDelay(0)
+ .setDocumentStore(new MemoryDocumentStore())
+ .setBlobStore(blobeStore1)
+ .clock(clock)
+ .getNodeStore();
+ String repoId1 = ClusterRepositoryInfo.createId(ds1);
+ // Register the unique repository id in the data store
+ ((SharedDataStore) blobeStore1).addMetadataRecord(new
ByteArrayInputStream(new byte[0]),
+ SharedStoreRecordType.REPOSITORY.getNameFromId(repoId1));
+
+ BlobStore blobeStore2 = DataStoreUtils.getBlobStore();
+ DocumentNodeStore ds2 = new DocumentMK.Builder()
+ .setAsyncDelay(0)
+ .setDocumentStore(new MemoryDocumentStore())
+ .setBlobStore(blobeStore2)
+ .clock(clock)
+ .getNodeStore();
+ String repoId2 = ClusterRepositoryInfo.createId(ds2);
+ // Register the unique repository id in the data store
+ ((SharedDataStore) blobeStore2).addMetadataRecord(new
ByteArrayInputStream(new byte[0]),
+ SharedStoreRecordType.REPOSITORY.getNameFromId(repoId2));
+
+ cluster1 = new Cluster(ds1, repoId1, 20);
+ cluster1.init();
+ cluster2 = new Cluster(ds2, repoId2, 100);
+ cluster2.init();
+ }
+
+ static InputStream randomStream(int seed, int size) {
+ Random r = new Random(seed);
+ byte[] data = new byte[size];
+ r.nextBytes(data);
+ return new ByteArrayInputStream(data);
+ }
+
+ @Test
+ public void testGC() throws Exception {
+ // Run the mark phase on both clusters
+ cluster1.gc.collectGarbage(true);
+ cluster2.gc.collectGarbage(true);
+
+ // Execute the gc with sweep
+ cluster1.gc.collectGarbage(false);
+
+ Assert.assertEquals(true,
Sets.symmetricDifference(Sets.union(cluster1.getInitBlobs(),
cluster2.getInitBlobs()),
+ cluster1.getExistingBlobIds()).isEmpty());
+ }
+
+ @Test
+ // Sweep must not delete anything because only one cluster completed the mark phase
+ public void testOnly1ClusterMark() throws Exception {
+ // Only run the mark phase on one cluster
+ cluster1.gc.collectGarbage(true);
+
+ // Execute the gc with sweep
+ cluster1.gc.collectGarbage(false);
+
+ Assert.assertTrue(
+ (cluster1.getInitBlobs().size() + cluster2.getInitBlobs().size())
< cluster1.getExistingBlobIds().size());
+ Set<String> existing = cluster1.getExistingBlobIds();
+ Assert.assertTrue(existing.containsAll(cluster2.getInitBlobs()));
+ Assert.assertTrue(existing.containsAll(cluster1.getInitBlobs()));
+ }
+
+ @Test
+ public void testRepeatedMarkWithSweep() throws Exception {
+ // Run the mark phase on both clusters, repeating it on cluster2
+ cluster1.gc.collectGarbage(true);
+ cluster2.gc.collectGarbage(true);
+ cluster2.gc.collectGarbage(true);
+
+ // Execute the gc with sweep
+ cluster2.gc.collectGarbage(false);
+
+ Assert.assertTrue(Sets.symmetricDifference(
+ Sets.union(cluster1.getInitBlobs(), cluster2.getInitBlobs()),
+ cluster1.getExistingBlobIds()).isEmpty());
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ FileUtils.cleanDirectory(new File(DataStoreUtils.getHomeDir()));
+ }
+
+ class Cluster {
+ private DocumentNodeStore ds;
+ private int seed;
+ private BlobGarbageCollector gc;
+
+ private Set<String> initBlobs = new HashSet<String>();
+
+ protected Set<String> getInitBlobs() {
+ return initBlobs;
+ }
+
+ public Cluster(final DocumentNodeStore ds, final String repoId, int
seed)
+ throws IOException {
+ this.ds = ds;
+ this.gc = new BlobGarbageCollector() {
+ @Override
+ public void collectGarbage(boolean markOnly) throws Exception {
+ MarkSweepGarbageCollector gc = new
MarkSweepGarbageCollector(
+ new DocumentBlobReferenceRetriever(ds),
+ (GarbageCollectableBlobStore) ds.getBlobStore(),
+ MoreExecutors.sameThreadExecutor(),
+ "./target", 5, 0, repoId);
+ gc.collectGarbage(markOnly);
+ }
+ };
+
+ this.seed = seed;
+ }
+
+ /**
+ * Creates the setup load with deletions.
+ *
+ * @throws Exception
+ */
+ public void init() throws Exception {
+ NodeBuilder a = ds.getRoot().builder();
+
+ int number = 10;
+ // track the number of the assets to be deleted
+ List<Integer> deletes = Lists.newArrayList();
+ Random rand = new Random(47);
+ for (int i = 0; i < 5; i++) {
+ int n = rand.nextInt(number);
+ if (!deletes.contains(n)) {
+ deletes.add(n);
+ }
+ }
+ for (int i = 0; i < number; i++) {
+ Blob b = ds.createBlob(randomStream(i + seed, 4160));
+ if (!deletes.contains(i)) {
+ Iterator<String> idIter =
+ ((GarbageCollectableBlobStore) ds.getBlobStore())
+ .resolveChunks(b.toString());
+ while (idIter.hasNext()) {
+ initBlobs.add(idIter.next());
+ }
+ }
+ a.child("c" + i).setProperty("x", b);
+ }
+ ds.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ a = ds.getRoot().builder();
+ for (int id : deletes) {
+ a.child("c" + id).remove();
+ ds.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ }
+ long maxAge = 10; // minutes -- converted below via TimeUnit.MINUTES, not hours
+ // Advance the virtual clock past the GC max age so the deleted docs become eligible for version GC
+ clock.waitUntil(clock.getTime() +
TimeUnit.MINUTES.toMillis(maxAge));
+
+ VersionGarbageCollector vGC = ds.getVersionGarbageCollector();
+ VersionGCStats stats = vGC.gc(0, TimeUnit.MILLISECONDS);
+ Assert.assertEquals(deletes.size(), stats.deletedDocGCCount);
+ }
+
+ public Set<String> getExistingBlobIds() throws Exception {
+ GarbageCollectableBlobStore store = (GarbageCollectableBlobStore)
ds.getBlobStore();
+ Iterator<String> cur = store.getAllChunkIds(0);
+
+ Set<String> existing = Sets.newHashSet();
+ while (cur.hasNext()) {
+ existing.add(cur.next());
+ }
+ return existing;
+ }
+ }
+}
+
Propchange:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java?rev=1659457&r1=1659456&r2=1659457&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
Fri Feb 13 05:20:44 2015
@@ -18,6 +18,7 @@
*/
package org.apache.jackrabbit.oak.plugins.segment;
+import static
org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType.REPOSITORY;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
@@ -38,8 +39,11 @@ import com.google.common.util.concurrent
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
+import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy;
import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
@@ -48,7 +52,6 @@ import org.apache.jackrabbit.oak.spi.com
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.junit.After;
-import org.junit.Ignore;
import org.junit.Test;
import javax.annotation.Nonnull;
@@ -131,16 +134,22 @@ public class SegmentDataStoreBlobGCTest
}
@Test
- @Ignore("OAK-2493")
public void gc() throws Exception {
HashSet<String> remaining = setUp();
+ String repoId = null;
+ if (SharedDataStoreUtils.isShared(store.getBlobStore())) {
+ repoId = ClusterRepositoryInfo.createId(nodeStore);
+ ((SharedDataStore) store.getBlobStore()).addMetadataRecord(
+ new ByteArrayInputStream(new byte[0]),
+ REPOSITORY.getNameFromId(repoId));
+ }
MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector(
new SegmentBlobReferenceRetriever(store.getTracker()),
(GarbageCollectableBlobStore) store.getBlobStore(),
MoreExecutors.sameThreadExecutor(),
- "./target", 2048, true, 0);
- gc.collectGarbage();
+ "./target", 2048, 0, repoId);
+ gc.collectGarbage(false);
Set<String> existingAfterGC = iterate();
assertTrue(Sets.symmetricDifference(remaining,
existingAfterGC).isEmpty());