Author: mduerig
Date: Fri Mar 28 13:30:06 2014
New Revision: 1582748
URL: http://svn.apache.org/r1582748
Log:
OAK-1582: ClassCastException in MarkSweepGarbageCollector#init()
Slightly edited patch from Amit Jain
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobReferenceRetriever.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/ReferenceCollector.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBlobReferenceRetriever.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlobReferenceRetriever.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
Removed:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/ReferenceCollector.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobReferenceRetriever.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobReferenceRetriever.java?rev=1582748&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobReferenceRetriever.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobReferenceRetriever.java
Fri Mar 28 13:30:06 2014
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
+/**
+ * Interface to abstract out the low-level details of retrieving blob
references from different
+ * {@link NodeStore} implementations.
+ */
+public interface BlobReferenceRetriever {
+
+ /**
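+ * Collects the blob references and passes each one to the given collector.
+ *
+ * @param collector the collector that receives every blob reference found
+ * (references are delivered through it; the method itself returns nothing)
+ * @throws Exception if the references cannot be retrieved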
+ */
+ void getReferences(ReferenceCollector collector) throws Exception;
+}
+
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java?rev=1582748&r1=1582747&r2=1582748&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
Fri Mar 28 13:30:06 2014
@@ -21,7 +21,6 @@ import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayDeque;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
@@ -43,14 +42,10 @@ import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
-
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
-import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
import org.apache.jackrabbit.oak.commons.IOUtils;
-import org.apache.jackrabbit.oak.api.Blob;
-import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
-import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,8 +71,8 @@ public class MarkSweepGarbageCollector i
public static final String SWEEPING = "Running-Sweeping";
- /** The max last modified time of blobs to consider for garbage
collection. */
- private long maxLastModifiedTime;
+ /** The interval (in milliseconds) before the current time that determines
the last modified cutoff of blobs to consider for garbage collection. */
+ private long maxLastModifiedInterval = TimeUnit.HOURS.toMillis(24);
/** Run concurrently when possible. */
private boolean runConcurrently = true;
@@ -85,8 +80,11 @@ public class MarkSweepGarbageCollector i
/** The number of sweeper threads to use. */
private int numSweepers = 1;
- /** The node store. */
- private DocumentNodeStore nodeStore;
+ /** The blob store to be garbage collected. */
+ private GarbageCollectableBlobStore blobStore;
+
+ /** Helper used to retrieve the blob references that are still in use. **/
+ private BlobReferenceRetriever marker;
/** The garbage collector file state */
private GarbageCollectorFileState fs;
@@ -98,27 +96,27 @@ public class MarkSweepGarbageCollector i
private int batchCount = DEFAULT_BATCH_COUNT;
/** Flag to indicate whether to run in a debug mode **/
- private boolean debugMode = Boolean.getBoolean("debugModeGC") |
LOG.isDebugEnabled();
+ private final boolean debugMode = Boolean.getBoolean("debugModeGC") ||
LOG.isDebugEnabled();
/** Flag to indicate the state of the gc **/
private String state;
/**
- * Gets the max last modified time considered for garbage collection.
+ * Gets the max last modified interval considered for garbage collection.
*
- * @return the max last modified time
+ * @return the max last modified interval
*/
- protected long getMaxLastModifiedTime() {
- return maxLastModifiedTime;
+ protected long getMaxLastModifiedInterval() {
+ return maxLastModifiedInterval;
}
/**
- * Sets the max last modified time considered for garbage collection.
+ * Sets the max last modified interval considered for garbage collection.
*
- * @param maxLastModifiedTime the new max last modified time
+ * @param maxLastModifiedInterval the new max last modified interval
*/
- protected void setMaxLastModifiedTime(long maxLastModifiedTime) {
- this.maxLastModifiedTime = maxLastModifiedTime;
+ protected void setMaxLastModifiedInterval(long maxLastModifiedInterval) {
+ this.maxLastModifiedInterval = maxLastModifiedInterval;
}
/**
@@ -167,41 +165,45 @@ public class MarkSweepGarbageCollector i
}
/**
- * @param nodeStore the node store
+ * @param marker the retriever used to collect the blob references
+ * @param blobStore the blob store to be garbage collected
* @param root the root
* @param batchCount the batch count
* @param runBackendConcurrently - run the backend iterate concurrently
* @param maxSweeperThreads the max sweeper threads
- * @param maxLastModifiedTime the max last modified time
+ * @param maxLastModifiedInterval the max last modified interval
* @throws IOException Signals that an I/O exception has occurred.
*/
public void init(
- NodeStore nodeStore,
+ BlobReferenceRetriever marker,
+ GarbageCollectableBlobStore blobStore,
String root,
int batchCount,
boolean runBackendConcurrently,
int maxSweeperThreads,
- long maxLastModifiedTime)
+ long maxLastModifiedInterval)
throws IOException {
this.batchCount = batchCount;
this.root = root;
this.runConcurrently = runBackendConcurrently;
this.numSweepers = maxSweeperThreads;
- this.maxLastModifiedTime = maxLastModifiedTime;
- init(nodeStore);
+ this.maxLastModifiedInterval = maxLastModifiedInterval;
+ init(marker, blobStore);
}
/**
* Instantiates a new blob garbage collector.
*
- * @param nodeStore
- * the node store
- * @throws IOException
- * Signals that an I/O exception has occurred.
+ * @param marker the retriever used to collect the blob references
+ * @param blobStore the blob store to be garbage collected
+ * @throws IOException Signals that an I/O exception has occurred.
*/
- public void init(NodeStore nodeStore) throws IOException {
+ public void init(BlobReferenceRetriever marker,
GarbageCollectableBlobStore blobStore)
+ throws IOException {
Preconditions.checkState(!Strings.isNullOrEmpty(root));
- this.nodeStore = (DocumentNodeStore) nodeStore;
+
+ this.blobStore = blobStore;
+ this.marker = marker;
fs = new GarbageCollectorFileState(root);
}
@@ -216,12 +218,11 @@ public class MarkSweepGarbageCollector i
* @throws Exception
* the exception
*/
- protected void markAndSweep() throws Exception {
+ public void markAndSweep() throws Exception {
try {
LOG.debug("Starting garbage collector");
mark();
- difference();
sweep();
LOG.debug("garbage collector finished");
@@ -237,7 +238,7 @@ public class MarkSweepGarbageCollector i
* @throws Exception
* the exception
*/
- protected void mark() throws Exception {
+ public void mark() throws Exception {
state = MARKING;
LOG.debug("Starting mark phase of the garbage collector");
@@ -260,7 +261,7 @@ public class MarkSweepGarbageCollector i
blobIdRetrieverThread.join();
}
}
-
+ difference();
LOG.debug("Ending mark phase of the garbage collector");
}
@@ -309,68 +310,74 @@ public class MarkSweepGarbageCollector i
* @throws IOException
* Signals that an I/O exception has occurred.
*/
- protected void sweep() throws IOException {
- state = SWEEPING;
- LOG.debug("Starting sweep phase of the garbage collector");
-
- ConcurrentLinkedQueue<String> exceptionQueue = new
ConcurrentLinkedQueue<String>();
- ExecutorService executorService =
- new ThreadPoolExecutor(getNumSweepers(), getNumSweepers(), 1,
- TimeUnit.MINUTES,
- new LinkedBlockingQueue<Runnable>(),
- new ThreadFactory() {
- private final AtomicInteger threadCounter = new
AtomicInteger();
-
- private String getName() {
- return "MarkSweepGarbageCollector-Sweeper-" +
threadCounter.getAndIncrement();
- }
-
- @Override
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r, getName());
- thread.setDaemon(true);
- return thread;
- }
- });
-
- LineIterator iterator = FileUtils.lineIterator(fs.getGcCandidates(),
Charsets.UTF_8.name());
- List<String> ids = Lists.newArrayList();
- int count = 0;
- while (iterator.hasNext()) {
- ids.add(iterator.next());
-
- if (ids.size() > getBatchCount()) {
+ public void sweep() throws IOException {
+ try {
+ state = SWEEPING;
+ LOG.debug("Starting sweep phase of the garbage collector");
+
+ ConcurrentLinkedQueue<String> exceptionQueue = new
ConcurrentLinkedQueue<String>();
+ ExecutorService executorService =
+ new ThreadPoolExecutor(getNumSweepers(), getNumSweepers(),
1,
+ TimeUnit.MINUTES,
+ new LinkedBlockingQueue<Runnable>(),
+ new ThreadFactory() {
+ private final AtomicInteger threadCounter =
new AtomicInteger();
+
+ private String getName() {
+ return
"MarkSweepGarbageCollector-Sweeper-" + threadCounter.getAndIncrement();
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r, getName());
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
+
+ LineIterator iterator =
+ FileUtils.lineIterator(fs.getGcCandidates(),
Charsets.UTF_8.name());
+ List<String> ids = Lists.newArrayList();
+ int count = 0;
+ while (iterator.hasNext()) {
+ ids.add(iterator.next());
+
+ if (ids.size() > getBatchCount()) {
+ count += ids.size();
+ executorService.execute(new Sweeper(ids, exceptionQueue));
+ ids = Lists.newArrayList();
+ }
+ }
+ if (!ids.isEmpty()) {
count += ids.size();
executorService.execute(new Sweeper(ids, exceptionQueue));
- ids = Lists.newArrayList();
}
- }
- if (!ids.isEmpty()) {
- count += ids.size();
- executorService.execute(new Sweeper(ids, exceptionQueue));
- }
-
- try {
- executorService.shutdown();
- executorService.awaitTermination(100, TimeUnit.MINUTES);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- count -= exceptionQueue.size();
- BufferedWriter writer = null;
- try {
- if (!exceptionQueue.isEmpty()) {
- writer = Files.newWriter(fs.getGarbage(), Charsets.UTF_8);
- saveBatchToFile(Lists.newArrayList(exceptionQueue), writer);
+
+ try {
+ executorService.shutdown();
+ executorService.awaitTermination(100, TimeUnit.MINUTES);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
}
+
+ count -= exceptionQueue.size();
+ BufferedWriter writer = null;
+ try {
+ if (!exceptionQueue.isEmpty()) {
+ writer = Files.newWriter(fs.getGarbage(), Charsets.UTF_8);
+ saveBatchToFile(Lists.newArrayList(exceptionQueue),
writer);
+ }
+ } finally {
+ LineIterator.closeQuietly(iterator);
+ IOUtils.closeQuietly(writer);
+ }
+
+ LOG.debug("Blobs deleted count - " + count);
+ LOG.debug("Ending sweep phase of the garbage collector");
} finally {
- LineIterator.closeQuietly(iterator);
- IOUtils.closeQuietly(writer);
+ fs.complete();
+ state = NOT_RUNNING;
}
-
- LOG.debug("Blobs deleted count - " + count);
- LOG.debug("Ending sweep phase of the garbage collector");
}
/**
@@ -396,10 +403,10 @@ public class MarkSweepGarbageCollector i
class Sweeper implements Runnable {
/** The exception queue. */
- private ConcurrentLinkedQueue<String> exceptionQueue;
+ private final ConcurrentLinkedQueue<String> exceptionQueue;
/** The ids to sweep. */
- private List<String> ids;
+ private final List<String> ids;
/**
* Instantiates a new sweeper.
@@ -417,8 +424,10 @@ public class MarkSweepGarbageCollector i
@Override
public void run() {
try {
- boolean deleted = ((GarbageCollectableBlobStore)
nodeStore.getBlobStore())
- .deleteChunks(ids, maxLastModifiedTime);
+ boolean deleted =
+ blobStore.deleteChunks(ids,
+ (maxLastModifiedInterval > 0 ?
System.currentTimeMillis()
+ - maxLastModifiedInterval :
0));
if (!deleted) {
exceptionQueue.addAll(ids);
}
@@ -436,57 +445,59 @@ public class MarkSweepGarbageCollector i
* @throws Exception
* the exception
*/
- private List<String> iterateNodeTree() throws Exception {
- ArrayList<String> referencedBlobs = Lists.newArrayList();
- BufferedWriter writer = null;
+ private void iterateNodeTree() throws Exception {
+ final BufferedWriter writer = Files.newWriter(fs.getMarkedRefs(),
Charsets.UTF_8);
try {
- writer = Files.newWriter(fs.getMarkedRefs(), Charsets.UTF_8);
-
- fs.sort(fs.getMarkedRefs());
-
- Iterator<Blob> blobIterator =
nodeStore.getReferencedBlobsIterator();
- referencedBlobs.ensureCapacity(getBatchCount());
-
- int referencesFound = 0;
- while (blobIterator.hasNext()) {
- Blob blob = blobIterator.next();
+ marker.getReferences(
+ new ReferenceCollector() {
+ private final List<String> idBatch = Lists
+ .newArrayListWithCapacity(getBatchCount());
+
+ private int count = 0;
+
+ @Override
+ public void addReference(String blobId) {
+ if (debugMode) {
+ LOG.debug("BlobId : " + blobId);
+ }
- if (debugMode) {
- LOG.debug("BlobId : " + blob.toString());
- }
+ try {
+ Iterator<String> idIter =
blobStore.resolveChunks(blobId);
+ while (idIter.hasNext()) {
+ String id = idIter.next();
+ idBatch.add(id);
+
+ if (idBatch.size() >= getBatchCount()) {
+ saveBatchToFile(idBatch, writer);
+ idBatch.clear();
+ }
+
+ if (debugMode) {
+ LOG.debug("chunkId : " + id);
+ }
+ count++;
+ }
+
+ if (!idBatch.isEmpty()) {
+ saveBatchToFile(idBatch, writer);
+ idBatch.clear();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Error in
retrieving references", e);
+ }
- if (blob.toString().length() != 0) {
- Iterator<String> idIter = ((GarbageCollectableBlobStore)
nodeStore
- .getBlobStore())
- .resolveChunks(blob.toString());
- while (idIter.hasNext()) {
- String id = idIter.next();
- referencedBlobs.add(id);
- if (debugMode) {
- LOG.debug("chunkId : " + id);
+ LOG.info("Marked Reference : " + count);
}
- }
- }
-
- if (referencedBlobs.size() >= getBatchCount()) {
- referencesFound += referencedBlobs.size();
- saveBatchToFile(referencedBlobs, writer);
- }
- }
+ });
- if (!referencedBlobs.isEmpty()) {
- referencesFound += referencedBlobs.size();
- saveBatchToFile(referencedBlobs, writer);
- }
+ // sort the marked references
fs.sort(fs.getMarkedRefs());
-
- LOG.debug("Blob references found (including chunk resolution) " +
referencesFound);
} finally {
IOUtils.closeQuietly(writer);
}
- return referencedBlobs;
}
+
/**
* BlobIdRetriever class to retrieve all blob ids.
*/
@@ -506,8 +517,8 @@ public class MarkSweepGarbageCollector i
try {
bufferWriter = new BufferedWriter(
new FileWriter(fs.getAvailableRefs()));
- Iterator<String> idsIter = ((GarbageCollectableBlobStore)
nodeStore.getBlobStore())
- .getAllChunkIds(maxLastModifiedTime);
+ Iterator<String> idsIter =
blobStore.getAllChunkIds(maxLastModifiedInterval);
+
List<String> ids = Lists.newArrayList();
int blobsCount = 0;
while (idsIter.hasNext()) {
@@ -525,7 +536,7 @@ public class MarkSweepGarbageCollector i
// sort the file
fs.sort(fs.getAvailableRefs());
- LOG.debug("Ending retrieve of all blobs : " + blobsCount);
+ LOG.debug("Ending retrieving all blobs : " + blobsCount);
} catch (Exception e) {
e.printStackTrace();
} finally {
@@ -543,12 +554,12 @@ public class MarkSweepGarbageCollector i
class FileLineDifferenceIterator<T> implements Iterator<String> {
/** The marked references iterator. */
- private LineIterator markedIter;
+ private final LineIterator markedIter;
/** The available references iter. */
- private LineIterator allIter;
+ private final LineIterator allIter;
- private ArrayDeque<String> queue;
+ private final ArrayDeque<String> queue;
private boolean done;
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/ReferenceCollector.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/ReferenceCollector.java?rev=1582748&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/ReferenceCollector.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/ReferenceCollector.java
Fri Mar 28 13:30:06 2014
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+/**
+ * Callback interface for collecting all blob references that are
+ * potentially accessible. Useful for marking referenced blobs as
+ * in use when collecting garbage in an external data store.
+ */
+public interface ReferenceCollector {
+
+ void addReference(String reference);
+
+}
+
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBlobReferenceRetriever.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBlobReferenceRetriever.java?rev=1582748&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBlobReferenceRetriever.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentBlobReferenceRetriever.java
Fri Mar 28 13:30:06 2014
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.Iterator;
+
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.plugins.blob.BlobReferenceRetriever;
+import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link BlobReferenceRetriever} for the DocumentNodeStore.
+ */
+public class DocumentBlobReferenceRetriever implements BlobReferenceRetriever {
+ public static final Logger LOG =
LoggerFactory.getLogger(DocumentBlobReferenceRetriever.class);
+
+ private final Iterator<Blob> blobIterator;
+
+ public DocumentBlobReferenceRetriever(Iterator<Blob> iterator) {
+ this.blobIterator = iterator;
+ }
+
+ @Override
+ public void getReferences(ReferenceCollector collector) throws Exception {
+ int referencesFound = 0;
+ while (blobIterator.hasNext()) {
+ Blob blob = blobIterator.next();
+ if (blob.length() != 0) {
+ collector.addReference(blob.toString());
+ }
+ }
+
+ LOG.debug("Blob references found (including chunk resolution) " +
referencesFound);
+ }
+}
+
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java?rev=1582748&r1=1582747&r2=1582748&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
Fri Mar 28 13:30:06 2014
@@ -309,9 +309,10 @@ public class DocumentNodeStoreService {
executor.start(wb);
if (blobStore instanceof GarbageCollectableBlobStore) {
MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector();
- gc.init(store); // FIXME OAK-1582 ClassCastException in
MarkSweepGarbageCollector#init() if using KernelNodeStore
+ gc.init(new
DocumentBlobReferenceRetriever(store.getReferencedBlobsIterator()),
+ (GarbageCollectableBlobStore) store.getBlobStore());
registrations.add(registerMBean(wb, BlobGCMBean.class, new
BlobGC(gc, executor),
- BlobGCMBean.TYPE, "Segment node store blob garbage
collection"));
+ BlobGCMBean.TYPE, "Document node store blob garbage
collection"));
}
RevisionGC revisionGC = new RevisionGC(new Runnable() {
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java?rev=1582748&r1=1582747&r2=1582748&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
Fri Mar 28 13:30:06 2014
@@ -36,6 +36,7 @@ import java.util.concurrent.ConcurrentMa
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
import org.apache.jackrabbit.oak.plugins.memory.PropertyStates;
import com.google.common.base.Charsets;
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlobReferenceRetriever.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlobReferenceRetriever.java?rev=1582748&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlobReferenceRetriever.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlobReferenceRetriever.java
Fri Mar 28 13:30:06 2014
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import org.apache.jackrabbit.oak.plugins.blob.BlobReferenceRetriever;
+import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
+
+/**
+ * Implementation of {@link BlobReferenceRetriever} to retrieve blob
references from the
+ * {@link SegmentTracker}.
+ */
+public class SegmentBlobReferenceRetriever implements BlobReferenceRetriever {
+
+ private final SegmentTracker tracker;
+
+ public SegmentBlobReferenceRetriever(SegmentTracker tracker) {
+ this.tracker = tracker;
+ }
+
+ @Override
+ public void getReferences(final ReferenceCollector collector) {
+ tracker.collectBlobReferences(collector);
+ }
+}
+
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java?rev=1582748&r1=1582747&r2=1582748&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
Fri Mar 28 13:30:06 2014
@@ -127,7 +127,7 @@ public class SegmentNodeStoreService ext
}
Dictionary<?, ?> properties = context.getProperties();
- name = "" + properties.get(NAME);
+ name = String.valueOf(properties.get(NAME));
String directory = lookup(context, DIRECTORY);
if (directory == null) {
@@ -175,7 +175,8 @@ public class SegmentNodeStoreService ext
if (blobStore instanceof GarbageCollectableBlobStore) {
MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector();
-// gc.init(delegate); FIXME OAK-1582 ClassCastException in
MarkSweepGarbageCollector#init()
+ gc.init(new SegmentBlobReferenceRetriever(store.getTracker()),
+ (GarbageCollectableBlobStore) blobStore);
blobGCRegistration = registerMBean(whiteboard, BlobGCMBean.class,
new BlobGC(gc, executor),
BlobGCMBean.TYPE, "Segment node store blob garbage
collection");
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java?rev=1582748&r1=1582747&r2=1582748&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java
Fri Mar 28 13:30:06 2014
@@ -26,6 +26,8 @@ import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
+import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
+
/**
* Tracker of references to segment identifiers and segment instances
* that are currently kept in memory.
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java?rev=1582748&r1=1582747&r2=1582748&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
Fri Mar 28 13:30:06 2014
@@ -31,9 +31,9 @@ import com.google.common.collect.Sets;
import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
-import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
+import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
@@ -92,9 +92,10 @@ public class MongoBlobGCTest extends Abs
public void gc() throws Exception {
HashSet<String> set = setUp();
- DocumentNodeStore s = mk.getNodeStore();
+ DocumentNodeStore store = mk.getNodeStore();
MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector();
- gc.init(s, "./target", 2048, true, 2, 0);
+ gc.init(new
DocumentBlobReferenceRetriever(store.getReferencedBlobsIterator()),
+ (GarbageCollectableBlobStore) store.getBlobStore(),
"./target", 2048, true, 2, 0);
gc.collectGarbage();
Set<String> existing = iterate();
@@ -109,7 +110,7 @@ public class MongoBlobGCTest extends Abs
Set<String> existing = Sets.newHashSet();
while (cur.hasNext()) {
- existing.add((String) cur.next());
+ existing.add(cur.next());
}
return existing;
}
Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java?rev=1582748&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
Fri Mar 28 13:30:06 2014
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.core.data.FileDataStore;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
+import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.junit.After;
+import org.junit.Test;
+
+/**
+ * Tests blob garbage collection for a {@link SegmentNodeStore} whose binaries
+ * are kept in a {@link DataStoreBlobStore} on top of a {@link FileDataStore}.
+ * <p>
+ * The test stores a few blobs, removes the node referencing one of them, runs
+ * the {@link MarkSweepGarbageCollector} and verifies that none of the chunks
+ * of the removed blob is still listed by the blob store.
+ */
+public class SegmentDataStoreBlobGCTest {
+    SegmentNodeStore nodeStore;
+    SegmentStore store;
+    DataStoreBlobStore blobStore;
+
+    /**
+     * Returns the node store under test, creating it and its backing
+     * {@link FileStore} in the work directory on first use.
+     *
+     * @param blobStore blob store handed to the {@link FileStore}; only
+     *                  consulted by the call that actually creates the store
+     * @return the (lazily created) node store
+     * @throws IOException if creating the {@link FileStore} fails
+     */
+    protected SegmentNodeStore getNodeStore(BlobStore blobStore) throws IOException {
+        if (nodeStore == null) {
+            store = new FileStore(blobStore, getWorkDir(), 256, false);
+            nodeStore = new SegmentNodeStore(store);
+        }
+        return nodeStore;
+    }
+
+    /**
+     * @return the directory shared by the data store and the segment store
+     */
+    private static File getWorkDir() {
+        return new File("target", "DataStoreBlobGCTest");
+    }
+
+    /**
+     * Populates the repository and deletes part of it again.
+     * <p>
+     * Creates the stores, saves one blob per child node {@code c0..c(n-1)},
+     * then removes the nodes of a randomly chosen subset and triggers
+     * {@code store.gc()}.
+     *
+     * @return the chunk ids of all blobs whose nodes were removed, i.e. the
+     *         ids that blob garbage collection is expected to delete
+     * @throws Exception if setting up the stores or committing fails
+     */
+    public HashSet<String> setUp() throws Exception {
+        FileDataStore fds = new FileDataStore();
+        fds.setMinRecordLength(4092);
+        fds.init(getWorkDir().getAbsolutePath());
+        blobStore = new DataStoreBlobStore(fds);
+        nodeStore = getNodeStore(blobStore);
+
+        HashSet<String> deletedChunkIds = new HashSet<String>();
+
+        NodeBuilder builder = nodeStore.getRoot().builder();
+
+        int blobCount = 2;
+        // indexes of the blobs whose nodes will be deleted again
+        List<Integer> toDelete = Lists.newArrayList();
+        Random rand = new Random();
+        // a single draw: exactly one of the blobs is selected for deletion
+        for (int i = 0; i < 1; i++) {
+            int n = rand.nextInt(blobCount);
+            if (!toDelete.contains(n)) {
+                toDelete.add(n);
+            }
+        }
+        for (int i = 0; i < blobCount; i++) {
+            // 16516 bytes is larger than the min record length set above
+            Blob b = nodeStore.createBlob(randomStream(i, 16516));
+            if (toDelete.contains(i)) {
+                // remember every chunk of a blob that is about to become garbage
+                Iterator<String> idIter = blobStore.resolveChunks(b.toString());
+                while (idIter.hasNext()) {
+                    deletedChunkIds.add(idIter.next());
+                }
+            }
+            builder.child("c" + i).setProperty("x", b);
+        }
+        nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        for (int id : toDelete) {
+            delete("c" + id);
+        }
+        // run the segment store's own gc before blob gc is exercised
+        store.gc();
+
+        return deletedChunkIds;
+    }
+
+    /**
+     * Removes the child node with the given name from the root and commits.
+     *
+     * @param nodeId name of the root child to remove
+     * @throws CommitFailedException if the merge fails
+     */
+    private void delete(String nodeId) throws CommitFailedException {
+        NodeBuilder builder = nodeStore.getRoot().builder();
+        builder.child(nodeId).remove();
+
+        nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+    }
+
+    /**
+     * Runs blob garbage collection and checks that no chunk of a deleted
+     * blob is left in the blob store.
+     * <p>
+     * NOTE(review): only the absence of deleted chunks is asserted; that the
+     * still referenced blob survives is not checked here.
+     */
+    @Test
+    public void gc() throws Exception {
+        HashSet<String> deletedChunkIds = setUp();
+
+        MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector();
+        gc.init(new SegmentBlobReferenceRetriever(store.getTracker()),
+                (GarbageCollectableBlobStore) store.getBlobStore(),
+                "./target", 2048, true, 2, 0);
+        gc.collectGarbage();
+
+        Set<String> existing = iterate();
+        Set<String> survivors = Sets.intersection(deletedChunkIds, existing);
+        assertTrue("chunks of deleted blobs survived GC: " + survivors,
+                survivors.isEmpty());
+    }
+
+    /**
+     * @return the ids of all chunks currently listed by the blob store
+     */
+    protected Set<String> iterate() throws Exception {
+        Iterator<String> cur = blobStore.getAllChunkIds(0);
+
+        Set<String> existing = Sets.newHashSet();
+        while (cur.hasNext()) {
+            existing.add(cur.next());
+        }
+        return existing;
+    }
+
+    /**
+     * Closes the segment store and empties the work directory.
+     * <p>
+     * The directory is cleaned even if closing the store fails, and cleaning
+     * is skipped when the directory was never created, so that an early
+     * set-up failure is not masked by an exception from the clean-up.
+     */
+    @After
+    public void close() throws IOException {
+        try {
+            if (store != null) {
+                store.close();
+            }
+        } finally {
+            File workDir = getWorkDir();
+            // cleanDirectory rejects a missing directory
+            if (workDir.isDirectory()) {
+                FileUtils.cleanDirectory(workDir);
+            }
+        }
+    }
+
+    /**
+     * Creates a reproducible stream of pseudo random bytes.
+     *
+     * @param seed seed for the random generator; equal seeds yield equal data
+     * @param size number of bytes in the stream
+     * @return an in-memory stream over the generated bytes
+     */
+    static InputStream randomStream(int seed, int size) {
+        Random r = new Random(seed);
+        byte[] data = new byte[size];
+        r.nextBytes(data);
+        return new ByteArrayInputStream(data);
+    }
+}
+