This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 59b035febe08d990e4120bb16369a2be41719bf9 Author: Tung Van TRAN <[email protected]> AuthorDate: Tue Aug 24 18:04:28 2021 +0700 JAMES-3150 BlobGCService & BloomFilterGCAlgorithm --- pom.xml | 6 + server/blob/blob-memory/pom.xml | 11 + .../memory/MemoryBlobStoreGCAlgorithmTest.java | 39 +++ server/blob/blob-storage-strategy/pom.xml | 17 ++ .../server/blob/deduplication/BlobGCTask.java | 165 +++++++++++ .../blob/deduplication/BloomFilterGCAlgorithm.java | 304 +++++++++++++++++++++ .../BloomFilterGCAlgorithmContract.java | 258 +++++++++++++++++ 7 files changed, 800 insertions(+) diff --git a/pom.xml b/pom.xml index cce10fe..aab9136 100644 --- a/pom.xml +++ b/pom.xml @@ -1143,6 +1143,12 @@ </dependency> <dependency> <groupId>${james.groupId}</groupId> + <artifactId>blob-storage-strategy</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> <artifactId>dead-letter-cassandra</artifactId> <version>${project.version}</version> </dependency> diff --git a/server/blob/blob-memory/pom.xml b/server/blob/blob-memory/pom.xml index 1938dae..a11713c 100644 --- a/server/blob/blob-memory/pom.xml +++ b/server/blob/blob-memory/pom.xml @@ -48,6 +48,17 @@ </dependency> <dependency> <groupId>${james.groupId}</groupId> + <artifactId>blob-storage-strategy</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>james-server-testing</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> <artifactId>james-server-util</artifactId> <scope>test</scope> </dependency> diff --git a/server/blob/blob-memory/src/test/java/org/apache/james/blob/memory/MemoryBlobStoreGCAlgorithmTest.java b/server/blob/blob-memory/src/test/java/org/apache/james/blob/memory/MemoryBlobStoreGCAlgorithmTest.java new file mode 100644 index 0000000..ebe1928 --- /dev/null +++ 
b/server/blob/blob-memory/src/test/java/org/apache/james/blob/memory/MemoryBlobStoreGCAlgorithmTest.java @@ -0,0 +1,39 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.blob.memory; + +import org.apache.james.blob.api.BlobStoreDAO; +import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithmContract; +import org.junit.jupiter.api.BeforeEach; + +/** + * Runs the {@link BloomFilterGCAlgorithmContract} suite against a {@link MemoryBlobStoreDAO}. + */ +public class MemoryBlobStoreGCAlgorithmTest implements BloomFilterGCAlgorithmContract { + + private BlobStoreDAO memoryBlobStoreDAO; + + /** + * Hands every test a newly created in-memory DAO. Intentionally not named setUp(): that would override + * the contract's own @BeforeEach setUp(), which resets the shared clock. + */ + @BeforeEach + public void beforeEach() { + this.memoryBlobStoreDAO = new MemoryBlobStoreDAO(); + } + + @Override + public BlobStoreDAO blobStoreDAO() { + return this.memoryBlobStoreDAO; + } +} diff --git a/server/blob/blob-storage-strategy/pom.xml b/server/blob/blob-storage-strategy/pom.xml index ad1bb7a..af1daa1 100644 --- a/server/blob/blob-storage-strategy/pom.xml +++ b/server/blob/blob-storage-strategy/pom.xml @@ -45,6 +45,10 @@ <scope>test</scope> </dependency> <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>james-server-task-api</artifactId> + </dependency> + <dependency> <!-- Added because of https://issues.apache.org/jira/browse/SUREFIRE-1266 --> <groupId>${james.groupId}</groupId> <artifactId>james-server-testing</artifactId> @@ -63,6 +67,11 @@ <artifactId>commons-configuration2</artifactId> </dependency> <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> </dependency> @@ -78,6 +87,14 @@ <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <reuseForks>true</reuseForks> + <forkCount>1C</forkCount> + </configuration> + </plugin> </plugins> </build> diff --git a/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTask.java
b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTask.java new file mode 100644 index 0000000..660422b --- /dev/null +++ b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTask.java @@ -0,0 +1,165 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.server.blob.deduplication; + +import java.time.Clock; +import java.time.Instant; +import java.util.Optional; +import java.util.Set; + +import org.apache.james.blob.api.BlobReferenceSource; +import org.apache.james.blob.api.BlobStoreDAO; +import org.apache.james.blob.api.BucketName; +import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context; +import org.apache.james.task.Task; +import org.apache.james.task.TaskExecutionDetails; +import org.apache.james.task.TaskType; + +import reactor.core.scheduler.Schedulers; + +/** + * {@link Task} running one {@link BloomFilterGCAlgorithm} pass over {@code bucketName}, using the aggregate of + * {@code blobReferenceSources} as the referenced blobs. Progress counters are exposed through {@link #details()}. + */ +public class BlobGCTask implements Task { + public static final TaskType TASK_TYPE = TaskType.of("BlobGCTask"); + + /** + * Point-in-time view of the counters of a {@link BlobGCTask}. + */ + public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation { + + private static AdditionalInformation from(Context context) { + Context.Snapshot snapshot = context.snapshot(); + return new AdditionalInformation( + snapshot.getReferenceSourceCount(), + snapshot.getBlobCount(), + snapshot.getGcedBlobCount(), + snapshot.getErrorCount(), + snapshot.getBloomFilterExpectedBlobCount(), + snapshot.getBloomFilterAssociatedProbability()); + } + + private final Instant timestamp; + private final long referenceSourceCount; + private final long blobCount; + private final long gcedBlobCount; + private final long errorCount; + private final long bloomFilterExpectedBlobCount; + private final double bloomFilterAssociatedProbability; + + AdditionalInformation(long referenceSourceCount, + long blobCount, + long gcedBlobCount, + long errorCount, + long bloomFilterExpectedBlobCount, + double bloomFilterAssociatedProbability) { + this.referenceSourceCount = referenceSourceCount; + this.blobCount = blobCount; + this.gcedBlobCount = gcedBlobCount; + this.errorCount =
errorCount; + this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount; + this.bloomFilterAssociatedProbability = bloomFilterAssociatedProbability; + // NOTE(review): stamped with the system UTC clock, not with the Clock injected into the task. + this.timestamp = Clock.systemUTC().instant(); + } + + @Override + public Instant timestamp() { + return timestamp; + } + + public Instant getTimestamp() { + return timestamp; + } + + public long getReferenceSourceCount() { + return referenceSourceCount; + } + + public long getBlobCount() { + return blobCount; + } + + public long getGcedBlobCount() { + return gcedBlobCount; + } + + public long getErrorCount() { + return errorCount; + } + + public long getBloomFilterExpectedBlobCount() { + return bloomFilterExpectedBlobCount; + } + + public double getBloomFilterAssociatedProbability() { + return bloomFilterAssociatedProbability; + } + } + + private final BlobStoreDAO blobStoreDAO; + private final GenerationAwareBlobId.Factory generationAwareBlobIdFactory; + private final GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration; + private final Set<BlobReferenceSource> blobReferenceSources; + private final Clock clock; + private final BucketName bucketName; + private final int expectedBlobCount; + private final double associatedProbability; + private final Context context; + + public BlobGCTask(BlobStoreDAO blobStoreDAO, + GenerationAwareBlobId.Factory generationAwareBlobIdFactory, + GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration, + Set<BlobReferenceSource> blobReferenceSources, + BucketName bucketName, + Clock clock, + int expectedBlobCount, + double associatedProbability) { + this.blobStoreDAO = blobStoreDAO; + this.generationAwareBlobIdFactory = generationAwareBlobIdFactory; + this.generationAwareBlobIdConfiguration = generationAwareBlobIdConfiguration; + this.blobReferenceSources = blobReferenceSources; + this.clock = clock; + this.bucketName = bucketName; + this.expectedBlobCount = expectedBlobCount; + this.associatedProbability = associatedProbability; + this.context =
new Context(expectedBlobCount, associatedProbability); + } + + /** + * Runs the GC pass and blocks until it completes. + * + * @return the combined result of the pass; COMPLETED when the pass emits no result (e.g. an empty bucket) + */ + @Override + public Result run() throws InterruptedException { + BloomFilterGCAlgorithm gcAlgorithm = new BloomFilterGCAlgorithm( + BlobReferenceAggregate.aggregate(blobReferenceSources), + blobStoreDAO, + generationAwareBlobIdFactory, + generationAwareBlobIdConfiguration, + clock); + + return gcAlgorithm.gc(expectedBlobCount, associatedProbability, bucketName, context) + // Never let run() return null when gc() completes without emitting a result. + .defaultIfEmpty(Result.COMPLETED) + // Schedulers.elastic() is deprecated in Reactor; boundedElastic() is its replacement. + .subscribeOn(Schedulers.boundedElastic()) + .block(); + } + + @Override + public TaskType type() { + return TASK_TYPE; + } + + @Override + public Optional<TaskExecutionDetails.AdditionalInformation> details() { + return Optional.of(AdditionalInformation.from(context)); + } +} diff --git a/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithm.java b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithm.java new file mode 100644 index 0000000..d9fc0aa --- /dev/null +++ b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithm.java @@ -0,0 +1,304 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied.
See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.server.blob.deduplication; + +import java.nio.charset.StandardCharsets; +import java.time.Clock; +import java.time.Instant; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobReferenceSource; +import org.apache.james.blob.api.BlobStoreDAO; +import org.apache.james.blob.api.BucketName; +import org.apache.james.task.Task; +import org.apache.james.task.Task.Result; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.MoreObjects; +import com.google.common.hash.BloomFilter; +import com.google.common.hash.Funnel; +import com.google.common.hash.Funnels; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Garbage collects the blobs of a bucket using a Bloom filter of referenced blob ids. + * + * <p>A blob is deleted only when it is outside the active generation AND the Bloom filter, populated from the + * {@link BlobReferenceSource}, answers that it was never added. Bloom filter false positives therefore only cause + * some orphans to be kept until a later run; they never cause a listed reference to be deleted. + */ +public class BloomFilterGCAlgorithm { + + private static final Logger LOGGER = LoggerFactory.getLogger(BloomFilterGCAlgorithm.class); + // UTF-8 rather than US-ASCII so that non-ASCII characters in blob ids are not encoded lossily. + private static final Funnel<CharSequence> BLOOM_FILTER_FUNNEL = Funnels.stringFunnel(StandardCharsets.UTF_8); + + /** + * Counters describing the progress of one GC run, backed by {@link AtomicLong}s. + */ + public static class Context { + + /** + * Immutable point-in-time copy of a {@link Context}'s counters. + */ + public static class Snapshot { + + public static Builder builder() { + return new Builder(); + } + + /** + * Builder for {@link Snapshot}; counters that are never set default to zero. + */ + public static class Builder { + private long referenceSourceCount; + private long blobCount; + private long gcedBlobCount; + private long errorCount; + private long bloomFilterExpectedBlobCount; + private double bloomFilterAssociatedProbability; + + Builder() { + } +
public Snapshot build() { + return new Snapshot( + referenceSourceCount, + blobCount, + gcedBlobCount, + errorCount, + bloomFilterExpectedBlobCount, + bloomFilterAssociatedProbability); + } + + public Builder referenceSourceCount(long referenceSourceCount) { + this.referenceSourceCount = referenceSourceCount; + return this; + } + + public Builder blobCount(long blobCount) { + this.blobCount = blobCount; + return this; + } + + public Builder gcedBlobCount(long gcedBlobCount) { + this.gcedBlobCount = gcedBlobCount; + return this; + } + + public Builder errorCount(long errorCount) { + this.errorCount = errorCount; + return this; + } + + public Builder bloomFilterExpectedBlobCount(long bloomFilterExpectedBlobCount) { + this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount; + return this; + } + + public Builder bloomFilterAssociatedProbability(double bloomFilterAssociatedProbability) { + this.bloomFilterAssociatedProbability = bloomFilterAssociatedProbability; + return this; + } + } + + private final long referenceSourceCount; + private final long blobCount; + private final long gcedBlobCount; + private final long errorCount; + private final long bloomFilterExpectedBlobCount; + private final double bloomFilterAssociatedProbability; + + Snapshot(long referenceSourceCount, + long blobCount, + long gcedBlobCount, + long errorCount, + long bloomFilterExpectedBlobCount, + double bloomFilterAssociatedProbability) { + this.referenceSourceCount = referenceSourceCount; + this.blobCount = blobCount; + this.gcedBlobCount = gcedBlobCount; + this.errorCount = errorCount; + this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount; + this.bloomFilterAssociatedProbability = bloomFilterAssociatedProbability; + } + + public long getReferenceSourceCount() { + return referenceSourceCount; + } + + public long getBlobCount() { +
return blobCount; + } + + public long getGcedBlobCount() { + return gcedBlobCount; + } + + public long getErrorCount() { + return errorCount; + } + + public long getBloomFilterExpectedBlobCount() { + return bloomFilterExpectedBlobCount; + } + + public double getBloomFilterAssociatedProbability() { + return bloomFilterAssociatedProbability; + } + + @Override + public final boolean equals(Object o) { + if (o instanceof Snapshot) { + Snapshot that = (Snapshot) o; + + return Objects.equals(this.referenceSourceCount, that.referenceSourceCount) + && Objects.equals(this.blobCount, that.blobCount) + && Objects.equals(this.gcedBlobCount, that.gcedBlobCount) + && Objects.equals(this.errorCount, that.errorCount) + && Objects.equals(this.bloomFilterExpectedBlobCount, that.bloomFilterExpectedBlobCount) + && Objects.equals(this.bloomFilterAssociatedProbability, that.bloomFilterAssociatedProbability); + } + return false; + } + + @Override + public final int hashCode() { + return Objects.hash(referenceSourceCount, blobCount, gcedBlobCount, errorCount, bloomFilterExpectedBlobCount, bloomFilterAssociatedProbability); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("referenceSourceCount", referenceSourceCount) + .add("blobCount", blobCount) + .add("gcedBlobCount", gcedBlobCount) + .add("errorCount", errorCount) + .add("bloomFilterExpectedBlobCount", bloomFilterExpectedBlobCount) + .add("bloomFilterAssociatedProbability", bloomFilterAssociatedProbability) + .toString(); + } + } + + private final AtomicLong referenceSourceCount; + private final AtomicLong blobCount; + private final AtomicLong gcedBlobCount; + private final AtomicLong errorCount; + private final long bloomFilterExpectedBlobCount; + private final double bloomFilterAssociatedProbability; + + public Context(long bloomFilterExpectedBlobCount, double bloomFilterAssociatedProbability) { + this.referenceSourceCount = new AtomicLong(); + this.blobCount = new AtomicLong(); +
this.gcedBlobCount = new AtomicLong(); + this.errorCount = new AtomicLong(); + this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount; + this.bloomFilterAssociatedProbability = bloomFilterAssociatedProbability; + } + + public void incrementBlobCount() { + blobCount.incrementAndGet(); + } + + public void incrementReferenceSourceCount() { + referenceSourceCount.incrementAndGet(); + } + + public void incrementGCedBlobCount() { + gcedBlobCount.incrementAndGet(); + } + + public void incrementErrorCount() { + errorCount.incrementAndGet(); + } + + public Snapshot snapshot() { + return Snapshot.builder() + .referenceSourceCount(referenceSourceCount.get()) + .blobCount(blobCount.get()) + .gcedBlobCount(gcedBlobCount.get()) + .errorCount(errorCount.get()) + .bloomFilterExpectedBlobCount(bloomFilterExpectedBlobCount) + .bloomFilterAssociatedProbability(bloomFilterAssociatedProbability) + .build(); + } + } + + private final BlobReferenceSource referenceSource; + private final BlobStoreDAO blobStoreDAO; + private final GenerationAwareBlobId.Factory generationAwareBlobIdFactory; + private final GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration; + private final Instant now; + + // Random per-instance salt, so that two successive runs do not hit the same Bloom filter false positives.
+ private final String salt; + + /** + * @param referenceSource lists the blob ids that must be kept + * @param blobStoreDAO store whose blobs are listed and deleted + * @param generationAwareBlobIdFactory parses listed blob ids to learn their generation + * @param generationAwareBlobIdConfiguration decides which generations are still active + * @param clock read once, at construction; blob generations are evaluated against that instant + */ + public BloomFilterGCAlgorithm(BlobReferenceSource referenceSource, + BlobStoreDAO blobStoreDAO, + GenerationAwareBlobId.Factory generationAwareBlobIdFactory, + GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration, + Clock clock) { + this.referenceSource = referenceSource; + this.blobStoreDAO = blobStoreDAO; + this.generationAwareBlobIdFactory = generationAwareBlobIdFactory; + this.generationAwareBlobIdConfiguration = generationAwareBlobIdConfiguration; + this.salt = UUID.randomUUID().toString(); + this.now = clock.instant(); + } + + /** + * Runs one GC pass over {@code bucketName}. + * + * <p>A Bloom filter sized for {@code expectedBlobCount} entries at false positive probability + * {@code associatedProbability} is first populated with every referenced blob id. Every listed blob that is + * outside the active generation and absent from that filter is then deleted. Counters are recorded in + * {@code context}. + * + * @return COMPLETED when every blob was handled (including when the bucket is empty), PARTIAL when at least + * one blob could not be processed + */ + public Mono<Result> gc(int expectedBlobCount, double associatedProbability, BucketName bucketName, Context context) { + return populatedBloomFilter(expectedBlobCount, associatedProbability, context) + .flatMap(bloomFilter -> gc(bloomFilter, bucketName, context)); + } + + private Mono<Result> gc(BloomFilter<CharSequence> bloomFilter, BucketName bucketName, Context context) { + return Flux.from(blobStoreDAO.listBlobs(bucketName)) + .doOnNext(blobId -> context.incrementBlobCount()) + .flatMap(blobId -> gcBlob(bloomFilter, blobId, bucketName, context)) + // Seeded with COMPLETED: unseeded, reduce() completes empty for an empty bucket and block() yields null. + .reduce(Result.COMPLETED, Task::combine); + } + + /** + * Creates the Bloom filter and adds the salted id of every referenced blob to it. + */ + private Mono<BloomFilter<CharSequence>> populatedBloomFilter(int expectedBlobCount, double associatedProbability, Context context) { + return Mono.fromCallable(() -> BloomFilter.create( + BLOOM_FILTER_FUNNEL, + expectedBlobCount, + associatedProbability)) + .flatMap(bloomFilter -> + Flux.from(referenceSource.listReferencedBlobs()) + .doOnNext(ref -> context.incrementReferenceSourceCount()) + .doOnNext(ref -> bloomFilter.put(salt + ref.asString())) + .then(Mono.just(bloomFilter))); + } + + /** + * Deletes {@code blobId} when it is expired and unreferenced. Errors are not propagated: they are logged, + * counted in {@code context} and mapped to PARTIAL. + */ + private Mono<Result> gcBlob(BloomFilter<CharSequence> bloomFilter, BlobId blobId, BucketName bucketName, Context context) { + return Mono.fromCallable(() -> generationAwareBlobIdFactory.from(blobId.asString())) + .filter(awareBlobId -> !awareBlobId.inActiveGeneration(generationAwareBlobIdConfiguration, now))
+ // A Bloom filter has no false negatives: a negative answer means the reference source never listed this blob. + .filter(expiredAwareBlobId -> !bloomFilter.mightContain(salt + blobId.asString())) + .flatMap(orphanBlobId -> + Mono.from(blobStoreDAO.delete(bucketName, orphanBlobId)) + .then(Mono.fromCallable(() -> { + context.incrementGCedBlobCount(); + return Result.COMPLETED; + }))) + .onErrorResume(error -> { + LOGGER.error("Error when gc orphan blob {}", blobId.asString(), error); + context.incrementErrorCount(); + return Mono.just(Result.PARTIAL); + }) + .switchIfEmpty(Mono.just(Result.COMPLETED)); + } +} diff --git a/server/blob/blob-storage-strategy/src/test/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithmContract.java b/server/blob/blob-storage-strategy/src/test/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithmContract.java new file mode 100644 index 0000000..d08ae74 --- /dev/null +++ b/server/blob/blob-storage-strategy/src/test/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithmContract.java @@ -0,0 +1,258 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License.
* + ****************************************************************/ + +package org.apache.james.server.blob.deduplication; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS; +import static org.awaitility.Durations.TEN_SECONDS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.time.ZonedDateTime; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobReferenceSource; +import org.apache.james.blob.api.BlobStore; +import org.apache.james.blob.api.BlobStoreDAO; +import org.apache.james.blob.api.BucketName; +import org.apache.james.blob.api.HashBlobId; +import org.apache.james.blob.api.ObjectNotFoundException; +import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context; +import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context.Snapshot; +import org.apache.james.task.Task; +import org.apache.james.utils.UpdatableTickingClock; +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionFactory; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Test; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Contract tests for {@link BloomFilterGCAlgorithm}; implementations only provide the {@link BlobStoreDAO} under test. + * + * <p>{@link #BLOB_REFERENCE_SOURCE} and {@link #CLOCK} are shared static state: every test re-stubs the former, + * and {@link #setUp()} resets the latter. + */ +public interface BloomFilterGCAlgorithmContract { + + HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); + ZonedDateTime NOW = ZonedDateTime.parse("2015-10-30T16:12:00Z"); + BucketName DEFAULT_BUCKET = BucketName.of("default"); + GenerationAwareBlobId.Configuration GENERATION_AWARE_BLOB_ID_CONFIGURATION = GenerationAwareBlobId.Configuration.DEFAULT; + int EXPECTED_BLOB_COUNT = 100; + // NOTE(review): presumably chosen high so that false positives show up in tests (see the multiple-runs test). + double ASSOCIATED_PROBABILITY = 0.8; + + ConditionFactory CALMLY_AWAIT =
Awaitility + .with().pollInterval(ONE_HUNDRED_MILLISECONDS) + .and().pollDelay(ONE_HUNDRED_MILLISECONDS) + .await() + .atMost(TEN_SECONDS); + + BlobReferenceSource BLOB_REFERENCE_SOURCE = mock(BlobReferenceSource.class); + UpdatableTickingClock CLOCK = new UpdatableTickingClock(NOW.toInstant()); + GenerationAwareBlobId.Factory GENERATION_AWARE_BLOB_ID_FACTORY = new GenerationAwareBlobId.Factory(CLOCK, BLOB_ID_FACTORY, GENERATION_AWARE_BLOB_ID_CONFIGURATION); + + BlobStoreDAO blobStoreDAO(); + + @BeforeEach + default void setUp() { + CLOCK.setInstant(NOW.toInstant()); + } + + default BlobStore blobStore() { + return new DeDuplicationBlobStore(blobStoreDAO(), DEFAULT_BUCKET, GENERATION_AWARE_BLOB_ID_FACTORY); + } + + default BloomFilterGCAlgorithm bloomFilterGCAlgorithm() { + return new BloomFilterGCAlgorithm(BLOB_REFERENCE_SOURCE, + blobStoreDAO(), + GENERATION_AWARE_BLOB_ID_FACTORY, + GENERATION_AWARE_BLOB_ID_CONFIGURATION, + CLOCK); + } + + @RepeatedTest(10) + default void gcShouldRemoveOrphanBlob() { + BlobStore blobStore = blobStore(); + BlobId blobId = Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block(); + when(BLOB_REFERENCE_SOURCE.listReferencedBlobs()).thenReturn(Flux.empty()); + CLOCK.setInstant(NOW.plusMonths(2).toInstant()); + + Context context = new Context(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY); + BloomFilterGCAlgorithm bloomFilterGCAlgorithm = bloomFilterGCAlgorithm(); + Task.Result result = bloomFilterGCAlgorithm.gc(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY, DEFAULT_BUCKET, context).block(); + + assertThat(result).isEqualTo(Task.Result.COMPLETED); + assertThat(context.snapshot()) + .isEqualTo(Snapshot.builder() + .referenceSourceCount(0) + .blobCount(1) + .gcedBlobCount(1) + .errorCount(0) + .bloomFilterExpectedBlobCount(EXPECTED_BLOB_COUNT) + .bloomFilterAssociatedProbability(ASSOCIATED_PROBABILITY) + .build()); + assertThatThrownBy(() -> blobStore.read(DEFAULT_BUCKET, blobId))
.isInstanceOf(ObjectNotFoundException.class); + } + + @Test + default void gcShouldNotRemoveUnExpireBlob() { + BlobStore blobStore = blobStore(); + BlobId blobId = Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block(); + when(BLOB_REFERENCE_SOURCE.listReferencedBlobs()).thenReturn(Flux.empty()); + + Context context = new Context(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY); + BloomFilterGCAlgorithm bloomFilterGCAlgorithm = bloomFilterGCAlgorithm(); + Task.Result result = bloomFilterGCAlgorithm.gc(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY, DEFAULT_BUCKET, context).block(); + + assertThat(result).isEqualTo(Task.Result.COMPLETED); + assertThat(context.snapshot()) + .isEqualTo(Snapshot.builder() + .referenceSourceCount(0) + .blobCount(1) + .gcedBlobCount(0) + .errorCount(0) + .bloomFilterExpectedBlobCount(EXPECTED_BLOB_COUNT) + .bloomFilterAssociatedProbability(ASSOCIATED_PROBABILITY) + .build()); + assertThat(blobStore.read(DEFAULT_BUCKET, blobId)) + .isNotNull(); + } + + @RepeatedTest(10) + default void gcShouldNotRemoveReferencedBlob() { + BlobStore blobStore = blobStore(); + BlobId blobId = Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block(); + + when(BLOB_REFERENCE_SOURCE.listReferencedBlobs()).thenReturn(Flux.just(blobId)); + // Expire the blob's generation: otherwise it would be kept anyway and the reference path would not be exercised. + CLOCK.setInstant(NOW.plusMonths(2).toInstant()); + + Context context = new Context(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY); + BloomFilterGCAlgorithm bloomFilterGCAlgorithm = bloomFilterGCAlgorithm(); + Task.Result result = bloomFilterGCAlgorithm.gc(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY, DEFAULT_BUCKET, context).block(); + + assertThat(result).isEqualTo(Task.Result.COMPLETED); + assertThat(context.snapshot()) + .isEqualTo(Snapshot.builder() + .referenceSourceCount(1) + .blobCount(1) + .gcedBlobCount(0) + .errorCount(0) + .bloomFilterExpectedBlobCount(EXPECTED_BLOB_COUNT) + .bloomFilterAssociatedProbability(ASSOCIATED_PROBABILITY) + .build()); + assertThat(blobStore.read(DEFAULT_BUCKET,
blobId)) + .isNotNull(); + } + + @Test + default void gcShouldSuccessWhenMixCase() { + BlobStore blobStore = blobStore(); + List<BlobId> referencedBlobIds = IntStream.range(0, 100) + .mapToObj(index -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block()) + .collect(Collectors.toList()); + List<BlobId> orphanBlobIds = IntStream.range(0, 50) + .mapToObj(index -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block()) + .collect(Collectors.toList()); + + when(BLOB_REFERENCE_SOURCE.listReferencedBlobs()).thenReturn(Flux.fromIterable(referencedBlobIds)); + CLOCK.setInstant(NOW.plusMonths(2).toInstant()); + + Context context = new Context(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY); + BloomFilterGCAlgorithm bloomFilterGCAlgorithm = bloomFilterGCAlgorithm(); + Task.Result result = bloomFilterGCAlgorithm.gc(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY, DEFAULT_BUCKET, context).block(); + + assertThat(result).isEqualTo(Task.Result.COMPLETED); + Context.Snapshot snapshot = context.snapshot(); + + assertThat(snapshot.getReferenceSourceCount()) + .isEqualTo(referencedBlobIds.size()); + assertThat(snapshot.getBlobCount()) + .isEqualTo(referencedBlobIds.size() + orphanBlobIds.size()); + + assertThat(snapshot.getGcedBlobCount()) + .isLessThanOrEqualTo(orphanBlobIds.size()) + .isGreaterThan(0); + + referencedBlobIds.forEach(blobId -> + assertThat(blobStore.read(DEFAULT_BUCKET, blobId)) + .isNotNull()); + } + + @Test + default void allOrphanBlobIdsShouldBeRemovedAfterMultipleRunningTimesGC() { + BlobStore blobStore = blobStore(); + List<BlobId> referencedBlobIds = IntStream.range(0, 100) + .mapToObj(index -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block()) + .collect(Collectors.toList()); + List<BlobId> orphanBlobIds = IntStream.range(0, 50) + .mapToObj(index ->
Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block()) + .collect(Collectors.toList()); + + when(BLOB_REFERENCE_SOURCE.listReferencedBlobs()).thenReturn(Flux.fromIterable(referencedBlobIds)); + CLOCK.setInstant(NOW.plusMonths(2).toInstant()); + + // Each attempt builds a new algorithm instance, hence a new salt and different false positives. + CALMLY_AWAIT.untilAsserted(() -> { + bloomFilterGCAlgorithm().gc( + EXPECTED_BLOB_COUNT, + ASSOCIATED_PROBABILITY, + DEFAULT_BUCKET, + new Context(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY)) + .block(); + + orphanBlobIds.forEach(blobId -> + assertThatThrownBy(() -> blobStore.read(DEFAULT_BUCKET, blobId)) + .isInstanceOf(ObjectNotFoundException.class)); + }); + } + + @Test + default void gcShouldHandleErrorWhenException() { + when(BLOB_REFERENCE_SOURCE.listReferencedBlobs()).thenReturn(Flux.empty()); + BlobStoreDAO blobStoreDAO = mock(BlobStoreDAO.class); + BlobId blobId = GENERATION_AWARE_BLOB_ID_FACTORY.randomId(); + when(blobStoreDAO.listBlobs(DEFAULT_BUCKET)).thenReturn(Flux.just(blobId)); + when(blobStoreDAO.delete(DEFAULT_BUCKET, blobId)).thenThrow(new RuntimeException("test")); + + CLOCK.setInstant(NOW.plusMonths(2).toInstant()); + + Context context = new Context(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY); + BloomFilterGCAlgorithm bloomFilterGCAlgorithm = new BloomFilterGCAlgorithm( + BLOB_REFERENCE_SOURCE, + blobStoreDAO, + GENERATION_AWARE_BLOB_ID_FACTORY, + GENERATION_AWARE_BLOB_ID_CONFIGURATION, + CLOCK); + Task.Result result = bloomFilterGCAlgorithm.gc(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY, DEFAULT_BUCKET, context).block(); + + assertThat(result).isEqualTo(Task.Result.PARTIAL); + assertThat(context.snapshot()) + .isEqualTo(Snapshot.builder() + .referenceSourceCount(0) + .blobCount(1) + .gcedBlobCount(0) + .errorCount(1) + .bloomFilterExpectedBlobCount(EXPECTED_BLOB_COUNT) + .bloomFilterAssociatedProbability(ASSOCIATED_PROBABILITY) + .build()); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail:
[email protected] For additional commands, e-mail: [email protected]
