This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit c589698256984780ec4f23104c251b2969b645b9 Author: Tung Van TRAN <[email protected]> AuthorDate: Thu Aug 26 16:18:07 2021 +0700 JAMES-3150 Refactor BlobGCTask - Add Builder + scope --- .../server/blob/deduplication/BlobGCTask.java | 126 ++++++++++++++++++--- 1 file changed, 112 insertions(+), 14 deletions(-) diff --git a/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTask.java b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTask.java index 660422b..7ecb0a4 100644 --- a/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTask.java +++ b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTask.java @@ -21,6 +21,7 @@ package org.apache.james.server.blob.deduplication; import java.time.Clock; import java.time.Instant; +import java.util.Arrays; import java.util.Optional; import java.util.Set; @@ -34,6 +35,8 @@ import org.apache.james.task.TaskType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + import reactor.core.scheduler.Schedulers; public class BlobGCTask implements Task { @@ -42,7 +45,7 @@ public class BlobGCTask implements Task { public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation { - private static AdditionalInformation from(Context context) { + private static AdditionalInformation from(Scope scope, Context context) { Context.Snapshot snapshot = context.snapshot(); return new AdditionalInformation( snapshot.getReferenceSourceCount(), @@ -50,7 +53,8 @@ public class BlobGCTask implements Task { snapshot.getGcedBlobCount(), snapshot.getErrorCount(), snapshot.getBloomFilterExpectedBlobCount(), - snapshot.getBloomFilterAssociatedProbability()); + snapshot.getBloomFilterAssociatedProbability(), + scope); } private final Instant timestamp; @@ -60,13 +64,15 @@ public class BlobGCTask implements Task { private final long errorCount; private final long bloomFilterExpectedBlobCount; private final double bloomFilterAssociatedProbability; + private final Scope scope; AdditionalInformation(long referenceSourceCount, long blobCount, long gcedBlobCount, long errorCount, long bloomFilterExpectedBlobCount, - double bloomFilterAssociatedProbability) { + double bloomFilterAssociatedProbability, + Scope scope) { this.referenceSourceCount = referenceSourceCount; this.blobCount = blobCount; this.gcedBlobCount = gcedBlobCount; @@ -74,6 +80,7 @@ public class BlobGCTask implements Task { this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount; this.bloomFilterAssociatedProbability = bloomFilterAssociatedProbability; this.timestamp = Clock.systemUTC().instant(); + this.scope = scope; } @Override @@ -108,8 +115,91 @@ public class BlobGCTask implements Task { public double getBloomFilterAssociatedProbability() { return bloomFilterAssociatedProbability; } + + public Scope getScope() { + return scope; + } + } + + public enum Scope { + UNREFERENCED; + + static class ScopeInvalidException extends IllegalArgumentException { + } + + public static Optional<Scope> from(String name) { + Preconditions.checkNotNull(name); + return Arrays.stream(Scope.values()) + .filter(value -> name.equalsIgnoreCase(value.name())) + .findFirst(); + } + } + + interface Builder { + + @FunctionalInterface + interface RequireScope { + BlobGCTask scope(Scope scope); + } + + @FunctionalInterface + interface RequireAssociatedProbability { + RequireScope associatedProbability(double associatedProbability); + } + + @FunctionalInterface + interface RequireExpectedBlobCount { + RequireAssociatedProbability expectedBlobCount(int expectedBlobCount); + } + + @FunctionalInterface + interface RequireClock { + RequireExpectedBlobCount clock(Clock clock); + } + + @FunctionalInterface + interface RequireBucketName { + RequireClock bucketName(BucketName bucketName); + } + + @FunctionalInterface + interface RequireBlobReferenceSources { + RequireBucketName blobReferenceSource(Set<BlobReferenceSource> blobReferenceSources); + } + + @FunctionalInterface + interface RequireGenerationAwareBlobIdConfiguration { + RequireBlobReferenceSources generationAwareBlobIdConfiguration(GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration); + } + + @FunctionalInterface + interface RequireGenerationAwareBlobIdFactory { + RequireGenerationAwareBlobIdConfiguration generationAwareBlobIdFactory(GenerationAwareBlobId.Factory generationAwareBlobIdFactory); + } + + @FunctionalInterface + interface RequireBlobStoreDAO { + RequireGenerationAwareBlobIdFactory blobStoreDAO(BlobStoreDAO blobStoreDAO); + } } + public static Builder.RequireBlobStoreDAO builder() { + return blobStoreDao -> generationAwareBlobIdFactory -> generationAwareBlobIdConfiguration + -> blobReferenceSources -> bucketName -> clock -> expectedBlobCount + -> associatedProbability -> scope + -> new BlobGCTask( + blobStoreDao, + generationAwareBlobIdFactory, + generationAwareBlobIdConfiguration, + blobReferenceSources, + bucketName, + clock, + expectedBlobCount, + associatedProbability, + scope); + } + + private final BlobStoreDAO blobStoreDAO; private final GenerationAwareBlobId.Factory generationAwareBlobIdFactory; private final GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration; @@ -119,6 +209,8 @@ public class BlobGCTask implements Task { private final int expectedBlobCount; private final double associatedProbability; private final Context context; + private final Scope scope; + public BlobGCTask(BlobStoreDAO blobStoreDAO, GenerationAwareBlobId.Factory generationAwareBlobIdFactory, @@ -127,7 +219,8 @@ public class BlobGCTask implements Task { BucketName bucketName, Clock clock, int expectedBlobCount, - double associatedProbability) { + double associatedProbability, + Scope scope) { this.blobStoreDAO = blobStoreDAO; this.generationAwareBlobIdFactory = generationAwareBlobIdFactory; this.generationAwareBlobIdConfiguration = generationAwareBlobIdConfiguration; @@ -137,20 +230,25 @@ public class BlobGCTask implements Task { this.expectedBlobCount = expectedBlobCount; this.associatedProbability = associatedProbability; this.context = new Context(expectedBlobCount, associatedProbability); + this.scope = scope; } @Override public Result run() throws InterruptedException { - BloomFilterGCAlgorithm gcAlgorithm = new BloomFilterGCAlgorithm( - BlobReferenceAggregate.aggregate(blobReferenceSources), - blobStoreDAO, - generationAwareBlobIdFactory, - generationAwareBlobIdConfiguration, - clock); + if (Scope.UNREFERENCED.equals(this.scope)) { + BloomFilterGCAlgorithm gcAlgorithm = new BloomFilterGCAlgorithm( + BlobReferenceAggregate.aggregate(blobReferenceSources), + blobStoreDAO, + generationAwareBlobIdFactory, + generationAwareBlobIdConfiguration, + clock); - return gcAlgorithm.gc(expectedBlobCount, associatedProbability, bucketName, context) - .subscribeOn(Schedulers.elastic()) - .block(); + return gcAlgorithm.gc(expectedBlobCount, associatedProbability, bucketName, context) + .subscribeOn(Schedulers.elastic()) + .block(); + } else { + return Result.COMPLETED; + } } @Override @@ -160,6 +258,6 @@ public class BlobGCTask implements Task { @Override public Optional<TaskExecutionDetails.AdditionalInformation> details() { - return Optional.of(AdditionalInformation.from(context)); + return Optional.of(AdditionalInformation.from(scope, context)); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
