Author: amitj
Date: Thu Aug 6 03:36:20 2015
New Revision: 1694393
URL: http://svn.apache.org/r1694393
Log:
OAK-3174: [Blob GC] Make actual deletion of blobs synchronous
Replaced the concurrent requests with a synchronous request for deletion of blobs
from the blob store
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java?rev=1694393&r1=1694392&r2=1694393&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
Thu Aug 6 03:36:20 2015
@@ -316,13 +316,13 @@ public class MarkSweepGarbageCollector i
if (ids.size() > getBatchCount()) {
count += ids.size();
- executor.execute(new Sweeper(ids, exceptionQueue,
earliestRefAvailTime));
+ sweepInternal(ids, exceptionQueue, earliestRefAvailTime);
ids = Lists.newArrayList();
}
}
if (!ids.isEmpty()) {
count += ids.size();
- executor.execute(new Sweeper(ids, exceptionQueue,
earliestRefAvailTime));
+ sweepInternal(ids, exceptionQueue, earliestRefAvailTime);
}
count -= exceptionQueue.size();
@@ -367,41 +367,26 @@ public class MarkSweepGarbageCollector i
ids.clear();
writer.flush();
}
-
+
/**
- * Sweeper thread.
+ * Deletes a batch of blobs from blob store.
+ *
+ * @param ids
+ * @param exceptionQueue
+ * @param maxModified
*/
- class Sweeper implements Runnable {
-
- /** The exception queue. */
- private final ConcurrentLinkedQueue<String> exceptionQueue;
-
- /** The ids to sweep. */
- private final List<String> ids;
-
- private final long maxModified;
-
- public Sweeper(List<String> ids, ConcurrentLinkedQueue<String>
exceptionQueue,
- long maxModified) {
- this.exceptionQueue = exceptionQueue;
- this.ids = ids;
- this.maxModified = maxModified;
- }
-
- @Override
- public void run() {
- try {
- LOG.debug("Blob ids to be deleted {}", ids);
- boolean deleted = blobStore.deleteChunks(ids,
getLastMaxModifiedTime(maxModified));
- if (!deleted) {
- // Only log and do not add to exception queue since some
blobs may not match the
- // lastMaxModifiedTime criteria.
- LOG.debug("Some blobs were not deleted from the batch :
[{}]", ids);
- }
- } catch (Exception e) {
- LOG.warn("Error occurred while deleting blob with ids [{}]",
ids, e);
- exceptionQueue.addAll(ids);
+ private void sweepInternal(List<String> ids, ConcurrentLinkedQueue<String>
exceptionQueue, long maxModified) {
+ try {
+ LOG.debug("Blob ids to be deleted {}", ids);
+ boolean deleted = blobStore.deleteChunks(ids,
getLastMaxModifiedTime(maxModified));
+ if (!deleted) {
+ // Only log and do not add to exception queue since some blobs
may not match the
+ // lastMaxModifiedTime criteria.
+ LOG.debug("Some blobs were not deleted from the batch : [{}]",
ids);
}
+ } catch (Exception e) {
+ LOG.warn("Error occurred while deleting blob with ids [{}]", ids,
e);
+ exceptionQueue.addAll(ids);
}
}
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java?rev=1694393&r1=1694392&r2=1694393&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
Thu Aug 6 03:36:20 2015
@@ -17,6 +17,7 @@
package org.apache.jackrabbit.oak.plugins.document;
import static
org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType.REPOSITORY;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
@@ -26,6 +27,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Lists;
@@ -158,16 +161,16 @@ public class MongoBlobGCTest extends Abs
new ByteArrayInputStream(new byte[0]),
REPOSITORY.getNameFromId(repoId));
}
+ ThreadPoolExecutor executor = (ThreadPoolExecutor)
Executors.newFixedThreadPool(10);
MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector(
new DocumentBlobReferenceRetriever(store),
- (GarbageCollectableBlobStore) store.getBlobStore(),
- MoreExecutors.sameThreadExecutor(),
- "./target", 5, 0, repoId);
+ (GarbageCollectableBlobStore) store.getBlobStore(), executor,
"./target", 5, 0, repoId);
Thread.sleep(4000);
gc.collectGarbage(false);
-
+
+ assertEquals(0, executor.getTaskCount());
Set<String> existingAfterGC = iterate();
- boolean empty = Sets.symmetricDifference(remaining,
existingAfterGC).isEmpty();
+ boolean empty = Sets.symmetricDifference(remaining,
existingAfterGC).isEmpty();
assertTrue(empty);
}
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java?rev=1694393&r1=1694392&r2=1694393&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
Thu Aug 6 03:36:20 2015
@@ -20,6 +20,7 @@ package org.apache.jackrabbit.oak.plugin
import static org.apache.commons.io.FileUtils.byteCountToDisplaySize;
import static
org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType.REPOSITORY;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
@@ -33,6 +34,8 @@ import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
@@ -203,14 +206,15 @@ public class SegmentDataStoreBlobGCTest
new ByteArrayInputStream(new byte[0]),
REPOSITORY.getNameFromId(repoId));
}
-
+
+ ThreadPoolExecutor executor = (ThreadPoolExecutor)
Executors.newFixedThreadPool(10);
MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector(
new SegmentBlobReferenceRetriever(store.getTracker()),
- (GarbageCollectableBlobStore) store.getBlobStore(),
- MoreExecutors.sameThreadExecutor(),
+ (GarbageCollectableBlobStore) store.getBlobStore(),
executor,
"./target", 2048, 0, repoId);
gc.collectGarbage(false);
+ assertEquals(0, executor.getTaskCount());
Set<String> existingAfterGC = iterate();
log.info("{} blobs that should have remained after gc : {}",
remaining.size(), remaining);
log.info("{} blobs existing after gc : {}", existingAfterGC.size(),
existingAfterGC);