chibenwa commented on a change in pull request #618:
URL: https://github.com/apache/james-project/pull/618#discussion_r694753471



##########
File path: server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCService.java
##########
@@ -0,0 +1,109 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import static org.apache.james.task.Task.Result;
+
+import java.time.Clock;
+
+import javax.inject.Inject;
+
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.mailbox.cassandra.mail.AttachmentBlobReferenceSource;
+import org.apache.james.mailbox.cassandra.mail.MessageBlobReferenceSource;
+import 
org.apache.james.mailrepository.cassandra.MailRepositoryBlobReferenceSource;
+import 
org.apache.james.queue.rabbitmq.view.cassandra.MailQueueViewBlobReferenceSource;
+import 
org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context;
+import org.apache.james.task.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class BlobGCService {

Review comment:
       Can it match the Task API?

##########
File path: server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCService.java
##########
@@ -0,0 +1,109 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import static org.apache.james.task.Task.Result;
+
+import java.time.Clock;
+
+import javax.inject.Inject;
+
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.mailbox.cassandra.mail.AttachmentBlobReferenceSource;
+import org.apache.james.mailbox.cassandra.mail.MessageBlobReferenceSource;
+import 
org.apache.james.mailrepository.cassandra.MailRepositoryBlobReferenceSource;
+import 
org.apache.james.queue.rabbitmq.view.cassandra.MailQueueViewBlobReferenceSource;
+import 
org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context;
+import org.apache.james.task.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class BlobGCService {
+
+    public static final int EXPECTED_BLOB_COUNT = 1_000_000;
+    public static final double ASSOCIATED_PROBABILITY = 0.8;
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(BlobGCService.class);
+
+    private final BlobStoreDAO blobStoreDAO;
+    private final GenerationAwareBlobId.Factory generationAwareBlobIdFactory;
+    private final GenerationAwareBlobId.Configuration 
generationAwareBlobIdConfiguration;
+
+    private final MessageBlobReferenceSource messageBlobReferenceSource;
+    private final MailRepositoryBlobReferenceSource 
mailRepositoryBlobReferenceSource;
+    private final MailQueueViewBlobReferenceSource 
mailQueueViewBlobReferenceSource;
+    private final AttachmentBlobReferenceSource attachmentBlobReferenceSource;
+    private final Clock clock;
+
+    @Inject
+    public BlobGCService(BlobStoreDAO blobStoreDAO,
+                         GenerationAwareBlobId.Factory 
generationAwareBlobIdFactory,
+                         GenerationAwareBlobId.Configuration 
generationAwareBlobIdConfiguration,
+                         MessageBlobReferenceSource messageBlobReferenceSource,
+                         MailRepositoryBlobReferenceSource 
mailRepositoryBlobReferenceSource,
+                         MailQueueViewBlobReferenceSource 
mailQueueViewBlobReferenceSource,
+                         AttachmentBlobReferenceSource 
attachmentBlobReferenceSource,
+                         Clock clock) {
+
+        this.blobStoreDAO = blobStoreDAO;
+        this.generationAwareBlobIdFactory = generationAwareBlobIdFactory;
+        this.generationAwareBlobIdConfiguration = 
generationAwareBlobIdConfiguration;
+        this.messageBlobReferenceSource = messageBlobReferenceSource;
+        this.mailRepositoryBlobReferenceSource = 
mailRepositoryBlobReferenceSource;
+        this.mailQueueViewBlobReferenceSource = 
mailQueueViewBlobReferenceSource;
+        this.attachmentBlobReferenceSource = attachmentBlobReferenceSource;
+        this.clock = clock;
+    }
+
+    public Mono<Result> gc() {
+        BloomFilterGCAlgorithm gcAlgorithm = new BloomFilterGCAlgorithm(
+            getBlobReferenceSource(),
+            blobStoreDAO,
+            generationAwareBlobIdFactory,
+            generationAwareBlobIdConfiguration,
+            clock);
+
+        Context context = new Context(EXPECTED_BLOB_COUNT, 
ASSOCIATED_PROBABILITY);
+
+        return gcAlgorithm.populatedBloomFilter(EXPECTED_BLOB_COUNT, 
ASSOCIATED_PROBABILITY, context)
+            .flatMapMany(bloomFilter ->
+                Flux.from(blobStoreDAO.listBuckets())
+                    .flatMap(bucketName -> gcAlgorithm.gc(bloomFilter, 
bucketName, context)))
+            .reduce(Task::combine)
+            .onErrorResume(error -> {
+                LOGGER.error("Error when running the blob garbage collection 
", error);
+                return Mono.just(Result.PARTIAL);
+            });

Review comment:
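       IMO, move the context handling into the algorithm itself: this service should not have to create the `Context` and pass it through `populatedBloomFilter` and `gc`.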

##########
File path: server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithm.java
##########
@@ -0,0 +1,212 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.task.Task;
+import org.apache.james.task.Task.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnel;
+import com.google.common.hash.Funnels;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class BloomFilterGCAlgorithm {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(BloomFilterGCAlgorithm.class);
+    private static final Funnel<CharSequence> BLOOM_FILTER_FUNNEL = 
Funnels.stringFunnel(StandardCharsets.US_ASCII);
+
+    public static class Context {
+
+        public static class Snapshot {
+            private final long referenceSourceCount;
+            private final long blobCount;
+            private final long gcedBlobCount;
+            private final long errorCount;
+            private final long bloomFilterExpectedBlobCount;
+            private final double bloomFilterAssociatedProbability;
+
+            public Snapshot(long referenceSourceCount,
+                            long blobCount,
+                            long gcedBlobCount,
+                            long errorCount,
+                            long bloomFilterExpectedBlobCount,
+                            double bloomFilterAssociatedProbability) {
+                this.referenceSourceCount = referenceSourceCount;
+                this.blobCount = blobCount;
+                this.gcedBlobCount = gcedBlobCount;
+                this.errorCount = errorCount;
+                this.bloomFilterExpectedBlobCount = 
bloomFilterExpectedBlobCount;
+                this.bloomFilterAssociatedProbability = 
bloomFilterAssociatedProbability;
+            }
+
+            @Override
+            public final boolean equals(Object o) {
+                if (o instanceof Snapshot) {
+                    Snapshot that = (Snapshot) o;
+
+                    return Objects.equals(this.referenceSourceCount, 
that.referenceSourceCount)
+                        && Objects.equals(this.blobCount, that.blobCount)
+                        && Objects.equals(this.gcedBlobCount, 
that.gcedBlobCount)
+                        && Objects.equals(this.errorCount, that.errorCount)
+                        && Objects.equals(this.bloomFilterExpectedBlobCount, 
that.bloomFilterExpectedBlobCount)
+                        && 
Objects.equals(this.bloomFilterAssociatedProbability, 
that.bloomFilterAssociatedProbability);
+                }
+                return false;
+            }
+
+            @Override
+            public final int hashCode() {
+                return Objects.hash(referenceSourceCount, blobCount, 
gcedBlobCount, errorCount, bloomFilterExpectedBlobCount, 
bloomFilterAssociatedProbability);
+            }
+
+            @Override
+            public String toString() {
+                return MoreObjects.toStringHelper(this)
+                    .add("referenceSourceCount", referenceSourceCount)
+                    .add("blobCount", blobCount)
+                    .add("gcedBlobCount", gcedBlobCount)
+                    .add("errorCount", errorCount)
+                    .add("bloomFilterExpectedBlobCount", 
bloomFilterExpectedBlobCount)
+                    .add("bloomFilterAssociatedProbability", 
bloomFilterAssociatedProbability)
+                    .toString();
+            }
+        }
+
+        private final AtomicLong referenceSourceCount;
+        private final AtomicLong blobCount;
+        private final AtomicLong gcedBlobCount;
+        private final AtomicLong errorCount;
+        private final Long bloomFilterExpectedBlobCount;
+        private final Double bloomFilterAssociatedProbability;
+
+        public Context(long bloomFilterExpectedBlobCount, double 
bloomFilterAssociatedProbability) {
+            this.referenceSourceCount = new AtomicLong();
+            this.blobCount = new AtomicLong();
+            this.gcedBlobCount = new AtomicLong();
+            this.errorCount = new AtomicLong();
+            this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount;
+            this.bloomFilterAssociatedProbability = 
bloomFilterAssociatedProbability;
+        }
+
+        public void incrementBlobCount() {
+            blobCount.incrementAndGet();
+        }
+
+        public void incrementReferenceSourceCount() {
+            referenceSourceCount.incrementAndGet();
+        }
+
+        public void incrementGCedBlobCount() {
+            gcedBlobCount.incrementAndGet();
+        }
+
+        public void incrementErrorCount() {
+            errorCount.incrementAndGet();
+        }
+
+        Snapshot snapshot() {
+            return new Snapshot(
+                referenceSourceCount.get(),
+                blobCount.get(),
+                gcedBlobCount.get(),
+                errorCount.get(),
+                bloomFilterExpectedBlobCount,
+                bloomFilterAssociatedProbability);
+        }
+    }
+
+    private final BlobReferenceSource referenceSource;
+    private final BlobStoreDAO blobStoreDAO;
+    private final GenerationAwareBlobId.Factory generationAwareBlobIdFactory;
+    private final GenerationAwareBlobId.Configuration 
generationAwareBlobIdConfiguration;
+    private final Instant now;
+
+    // Avoids two subsequent run to have the same false positives.
+    private final String salt;
+
+    public BloomFilterGCAlgorithm(BlobReferenceSource referenceSource,
+                                  BlobStoreDAO blobStoreDAO,
+                                  GenerationAwareBlobId.Factory 
generationAwareBlobIdFactory,
+                                  GenerationAwareBlobId.Configuration 
generationAwareBlobIdConfiguration,
+                                  Clock clock) {
+        this.referenceSource = referenceSource;
+        this.blobStoreDAO = blobStoreDAO;
+        this.generationAwareBlobIdFactory = generationAwareBlobIdFactory;
+        this.generationAwareBlobIdConfiguration = 
generationAwareBlobIdConfiguration;
+        this.salt = UUID.randomUUID().toString();
+        this.now = clock.instant();
+    }
+
+    public Mono<Result> gc(int expectedBlobCount, double 
associatedProbability, BucketName bucketName, Context context) {
+        return populatedBloomFilter(expectedBlobCount, associatedProbability, 
context)
+            .flatMap(bloomFilter -> gc(bloomFilter, bucketName, context));
+    }
+
+    public Mono<Result> gc(BloomFilter<CharSequence> bloomFilter, BucketName 
bucketName, Context context) {

Review comment:
       This method should be `private`.

##########
File path: server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithm.java
##########
@@ -0,0 +1,212 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.task.Task;
+import org.apache.james.task.Task.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnel;
+import com.google.common.hash.Funnels;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class BloomFilterGCAlgorithm {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(BloomFilterGCAlgorithm.class);
+    private static final Funnel<CharSequence> BLOOM_FILTER_FUNNEL = 
Funnels.stringFunnel(StandardCharsets.US_ASCII);
+
+    public static class Context {
+
+        public static class Snapshot {
+            private final long referenceSourceCount;
+            private final long blobCount;
+            private final long gcedBlobCount;
+            private final long errorCount;
+            private final long bloomFilterExpectedBlobCount;
+            private final double bloomFilterAssociatedProbability;
+
+            public Snapshot(long referenceSourceCount,
+                            long blobCount,
+                            long gcedBlobCount,
+                            long errorCount,
+                            long bloomFilterExpectedBlobCount,
+                            double bloomFilterAssociatedProbability) {
+                this.referenceSourceCount = referenceSourceCount;
+                this.blobCount = blobCount;
+                this.gcedBlobCount = gcedBlobCount;
+                this.errorCount = errorCount;
+                this.bloomFilterExpectedBlobCount = 
bloomFilterExpectedBlobCount;
+                this.bloomFilterAssociatedProbability = 
bloomFilterAssociatedProbability;
+            }
+
+            @Override
+            public final boolean equals(Object o) {
+                if (o instanceof Snapshot) {
+                    Snapshot that = (Snapshot) o;
+
+                    return Objects.equals(this.referenceSourceCount, 
that.referenceSourceCount)
+                        && Objects.equals(this.blobCount, that.blobCount)
+                        && Objects.equals(this.gcedBlobCount, 
that.gcedBlobCount)
+                        && Objects.equals(this.errorCount, that.errorCount)
+                        && Objects.equals(this.bloomFilterExpectedBlobCount, 
that.bloomFilterExpectedBlobCount)
+                        && 
Objects.equals(this.bloomFilterAssociatedProbability, 
that.bloomFilterAssociatedProbability);
+                }
+                return false;
+            }
+
+            @Override
+            public final int hashCode() {
+                return Objects.hash(referenceSourceCount, blobCount, 
gcedBlobCount, errorCount, bloomFilterExpectedBlobCount, 
bloomFilterAssociatedProbability);
+            }
+
+            @Override
+            public String toString() {
+                return MoreObjects.toStringHelper(this)
+                    .add("referenceSourceCount", referenceSourceCount)
+                    .add("blobCount", blobCount)
+                    .add("gcedBlobCount", gcedBlobCount)
+                    .add("errorCount", errorCount)
+                    .add("bloomFilterExpectedBlobCount", 
bloomFilterExpectedBlobCount)
+                    .add("bloomFilterAssociatedProbability", 
bloomFilterAssociatedProbability)
+                    .toString();
+            }
+        }
+
+        private final AtomicLong referenceSourceCount;
+        private final AtomicLong blobCount;
+        private final AtomicLong gcedBlobCount;
+        private final AtomicLong errorCount;
+        private final Long bloomFilterExpectedBlobCount;
+        private final Double bloomFilterAssociatedProbability;
+
+        public Context(long bloomFilterExpectedBlobCount, double 
bloomFilterAssociatedProbability) {
+            this.referenceSourceCount = new AtomicLong();
+            this.blobCount = new AtomicLong();
+            this.gcedBlobCount = new AtomicLong();
+            this.errorCount = new AtomicLong();
+            this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount;
+            this.bloomFilterAssociatedProbability = 
bloomFilterAssociatedProbability;
+        }
+
+        public void incrementBlobCount() {
+            blobCount.incrementAndGet();
+        }
+
+        public void incrementReferenceSourceCount() {
+            referenceSourceCount.incrementAndGet();
+        }
+
+        public void incrementGCedBlobCount() {
+            gcedBlobCount.incrementAndGet();
+        }
+
+        public void incrementErrorCount() {
+            errorCount.incrementAndGet();
+        }
+
+        Snapshot snapshot() {
+            return new Snapshot(
+                referenceSourceCount.get(),
+                blobCount.get(),
+                gcedBlobCount.get(),
+                errorCount.get(),
+                bloomFilterExpectedBlobCount,
+                bloomFilterAssociatedProbability);
+        }
+    }
+
+    private final BlobReferenceSource referenceSource;
+    private final BlobStoreDAO blobStoreDAO;
+    private final GenerationAwareBlobId.Factory generationAwareBlobIdFactory;
+    private final GenerationAwareBlobId.Configuration 
generationAwareBlobIdConfiguration;
+    private final Instant now;
+
+    // Avoids two subsequent run to have the same false positives.
+    private final String salt;
+
+    public BloomFilterGCAlgorithm(BlobReferenceSource referenceSource,
+                                  BlobStoreDAO blobStoreDAO,
+                                  GenerationAwareBlobId.Factory 
generationAwareBlobIdFactory,
+                                  GenerationAwareBlobId.Configuration 
generationAwareBlobIdConfiguration,
+                                  Clock clock) {
+        this.referenceSource = referenceSource;
+        this.blobStoreDAO = blobStoreDAO;
+        this.generationAwareBlobIdFactory = generationAwareBlobIdFactory;
+        this.generationAwareBlobIdConfiguration = 
generationAwareBlobIdConfiguration;
+        this.salt = UUID.randomUUID().toString();
+        this.now = clock.instant();
+    }
+
+    public Mono<Result> gc(int expectedBlobCount, double 
associatedProbability, BucketName bucketName, Context context) {
+        return populatedBloomFilter(expectedBlobCount, associatedProbability, 
context)
+            .flatMap(bloomFilter -> gc(bloomFilter, bucketName, context));
+    }
+
+    public Mono<Result> gc(BloomFilter<CharSequence> bloomFilter, BucketName 
bucketName, Context context) {
+        return Flux.from(blobStoreDAO.listBlobs(bucketName))
+            .doOnNext(blobId -> context.incrementBlobCount())
+            .flatMap(blobId -> gcBlob(bloomFilter, blobId, bucketName, 
context))
+            .reduce(Task::combine);
+    }
+
+    public Mono<BloomFilter<CharSequence>> populatedBloomFilter(int 
expectedBlobCount, double associatedProbability, Context context) {

Review comment:
       This method should be `private`.

##########
File path: server/blob/blob-storage-strategy/src/test/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithmTest.java
##########
@@ -0,0 +1,109 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS;
+import static org.awaitility.Durations.TEN_SECONDS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.time.Clock;
+import java.time.ZonedDateTime;
+
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.api.ObjectNotFoundException;
+import org.apache.james.blob.memory.MemoryBlobStoreDAO;
+import 
org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context;
+import 
org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context.Snapshot;
+import org.apache.james.task.Task;
+import org.apache.james.utils.UpdatableTickingClock;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class BloomFilterGCAlgorithmTest {
+    private static final HashBlobId.Factory BLOB_ID_FACTORY = new 
HashBlobId.Factory();
+    private static final ZonedDateTime NOW = 
ZonedDateTime.parse("2015-10-30T16:12:00Z");
+    private static final int EXPECTED_BLOB_COUNT = 100;
+    private static final double ASSOCIATED_PROBABILITY = 0.8;
+    private static final BucketName DEFAULT_BUCKET = BucketName.of("default");
+    private static final GenerationAwareBlobId.Configuration 
GENERATION_AWARE_BLOB_ID_CONFIGURATION = 
GenerationAwareBlobId.Configuration.DEFAULT;
+
+    private final ConditionFactory CALMLY_AWAIT = Awaitility
+        .with().pollInterval(ONE_HUNDRED_MILLISECONDS)
+        .and().pollDelay(ONE_HUNDRED_MILLISECONDS)
+        .await()
+        .atMost(TEN_SECONDS);
+
+    private BlobReferenceSource blobReferenceSource;
+    private Clock clock;
+    private BlobStore blobStore;
+    private BlobStoreDAO blobStoreDAO;
+    private GenerationAwareBlobId.Factory generationAwareBlobIdFactory;
+
+    @BeforeEach
+    void setUp() {
+        clock = new UpdatableTickingClock(NOW.toInstant());
+        generationAwareBlobIdFactory = new 
GenerationAwareBlobId.Factory(clock, BLOB_ID_FACTORY, 
GENERATION_AWARE_BLOB_ID_CONFIGURATION);
+
+        blobReferenceSource = mock(BlobReferenceSource.class);
+
+        blobStoreDAO = new MemoryBlobStoreDAO();
+        blobStore = new DeDuplicationBlobStore(blobStoreDAO, DEFAULT_BUCKET, 
generationAwareBlobIdFactory);
+    }
+
+    private BloomFilterGCAlgorithm bloomFilterGCAlgorithm() {
+        return new BloomFilterGCAlgorithm(
+            blobReferenceSource,
+            blobStoreDAO,
+            generationAwareBlobIdFactory,
+            GENERATION_AWARE_BLOB_ID_CONFIGURATION,
+            clock);
+    }
+
+    @Test

Review comment:
       IMO this should be `@RepeatedTest(100)` rather than `@Test`.

##########
File path: server/blob/blob-storage-strategy/src/test/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithmTest.java
##########
@@ -0,0 +1,109 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS;
+import static org.awaitility.Durations.TEN_SECONDS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.time.Clock;
+import java.time.ZonedDateTime;
+
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.api.ObjectNotFoundException;
+import org.apache.james.blob.memory.MemoryBlobStoreDAO;
+import 
org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context;
+import 
org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context.Snapshot;
+import org.apache.james.task.Task;
+import org.apache.james.utils.UpdatableTickingClock;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class BloomFilterGCAlgorithmTest {
+    private static final HashBlobId.Factory BLOB_ID_FACTORY = new 
HashBlobId.Factory();
+    private static final ZonedDateTime NOW = 
ZonedDateTime.parse("2015-10-30T16:12:00Z");
+    private static final int EXPECTED_BLOB_COUNT = 100;
+    private static final double ASSOCIATED_PROBABILITY = 0.8;
+    private static final BucketName DEFAULT_BUCKET = BucketName.of("default");
+    private static final GenerationAwareBlobId.Configuration 
GENERATION_AWARE_BLOB_ID_CONFIGURATION = 
GenerationAwareBlobId.Configuration.DEFAULT;
+
+    private final ConditionFactory CALMLY_AWAIT = Awaitility
+        .with().pollInterval(ONE_HUNDRED_MILLISECONDS)
+        .and().pollDelay(ONE_HUNDRED_MILLISECONDS)
+        .await()
+        .atMost(TEN_SECONDS);
+
+    private BlobReferenceSource blobReferenceSource;
+    private Clock clock;
+    private BlobStore blobStore;
+    private BlobStoreDAO blobStoreDAO;
+    private GenerationAwareBlobId.Factory generationAwareBlobIdFactory;
+
+    @BeforeEach
+    void setUp() {
+        clock = new UpdatableTickingClock(NOW.toInstant());
+        generationAwareBlobIdFactory = new 
GenerationAwareBlobId.Factory(clock, BLOB_ID_FACTORY, 
GENERATION_AWARE_BLOB_ID_CONFIGURATION);
+
+        blobReferenceSource = mock(BlobReferenceSource.class);
+
+        blobStoreDAO = new MemoryBlobStoreDAO();
+        blobStore = new DeDuplicationBlobStore(blobStoreDAO, DEFAULT_BUCKET, 
generationAwareBlobIdFactory);
+    }
+
+    private BloomFilterGCAlgorithm bloomFilterGCAlgorithm() {
+        return new BloomFilterGCAlgorithm(
+            blobReferenceSource,
+            blobStoreDAO,
+            generationAwareBlobIdFactory,
+            GENERATION_AWARE_BLOB_ID_CONFIGURATION,
+            clock);
+    }
+
+    @Test
+    void gcShouldRemoveOrphanBlobId() {

Review comment:
       We are missing a `gcShouldKeepReferencedBlobId` test (the counterpart of `gcShouldRemoveOrphanBlobId`).

##########
File path: 
server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCService.java
##########
@@ -0,0 +1,109 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import static org.apache.james.task.Task.Result;
+
+import java.time.Clock;
+
+import javax.inject.Inject;
+
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.mailbox.cassandra.mail.AttachmentBlobReferenceSource;
+import org.apache.james.mailbox.cassandra.mail.MessageBlobReferenceSource;
+import 
org.apache.james.mailrepository.cassandra.MailRepositoryBlobReferenceSource;
+import 
org.apache.james.queue.rabbitmq.view.cassandra.MailQueueViewBlobReferenceSource;
+import 
org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context;
+import org.apache.james.task.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class BlobGCService {
+
+    public static final int EXPECTED_BLOB_COUNT = 1_000_000;
+    public static final double ASSOCIATED_PROBABILITY = 0.8;
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(BlobGCService.class);
+
+    private final BlobStoreDAO blobStoreDAO;
+    private final GenerationAwareBlobId.Factory generationAwareBlobIdFactory;
+    private final GenerationAwareBlobId.Configuration 
generationAwareBlobIdConfiguration;
+
+    private final MessageBlobReferenceSource messageBlobReferenceSource;
+    private final MailRepositoryBlobReferenceSource 
mailRepositoryBlobReferenceSource;
+    private final MailQueueViewBlobReferenceSource 
mailQueueViewBlobReferenceSource;
+    private final AttachmentBlobReferenceSource attachmentBlobReferenceSource;
+    private final Clock clock;
+
+    @Inject
+    public BlobGCService(BlobStoreDAO blobStoreDAO,
+                         GenerationAwareBlobId.Factory 
generationAwareBlobIdFactory,
+                         GenerationAwareBlobId.Configuration 
generationAwareBlobIdConfiguration,
+                         MessageBlobReferenceSource messageBlobReferenceSource,
+                         MailRepositoryBlobReferenceSource 
mailRepositoryBlobReferenceSource,
+                         MailQueueViewBlobReferenceSource 
mailQueueViewBlobReferenceSource,
+                         AttachmentBlobReferenceSource 
attachmentBlobReferenceSource,
+                         Clock clock) {
+
+        this.blobStoreDAO = blobStoreDAO;
+        this.generationAwareBlobIdFactory = generationAwareBlobIdFactory;
+        this.generationAwareBlobIdConfiguration = 
generationAwareBlobIdConfiguration;
+        this.messageBlobReferenceSource = messageBlobReferenceSource;
+        this.mailRepositoryBlobReferenceSource = 
mailRepositoryBlobReferenceSource;
+        this.mailQueueViewBlobReferenceSource = 
mailQueueViewBlobReferenceSource;
+        this.attachmentBlobReferenceSource = attachmentBlobReferenceSource;
+        this.clock = clock;
+    }
+
+    public Mono<Result> gc() {
+        BloomFilterGCAlgorithm gcAlgorithm = new BloomFilterGCAlgorithm(
+            getBlobReferenceSource(),
+            blobStoreDAO,
+            generationAwareBlobIdFactory,
+            generationAwareBlobIdConfiguration,
+            clock);
+
+        Context context = new Context(EXPECTED_BLOB_COUNT, 
ASSOCIATED_PROBABILITY);

Review comment:
       TODO: EXPECTED_BLOB_COUNT and ASSOCIATED_PROBABILITY should not be 
hardcoded; they should be configurable.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to