This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch postgresql
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 3cf17baede932b3e465da9227de2d0024ea46aec
Author: TungTV <vtt...@linagora.com>
AuthorDate: Thu Nov 28 09:05:41 2024 +0700

    JAMES-2586 - PostgresBlobStoreDAO.listBlobs support batch/pagination query
---
 .../james/blob/postgres/PostgresBlobStoreDAO.java  |  25 ++++-
 .../postgres/PostgresBlobStoreGCAlgorithmTest.java | 103 +++++++++++++++++++++
 2 files changed, 126 insertions(+), 2 deletions(-)

diff --git a/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java b/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java
index 22d2cb9dc8..5003f61179 100644
--- a/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java
+++ b/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java
@@ -29,16 +29,21 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
 
 import jakarta.inject.Inject;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.james.backends.postgres.utils.PostgresExecutor;
+import org.apache.james.backends.postgres.utils.PostgresUtils;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStoreDAO;
 import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.ObjectNotFoundException;
 import org.apache.james.blob.api.ObjectStoreIOException;
+import org.jooq.impl.DSL;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -151,9 +156,25 @@ public class PostgresBlobStoreDAO implements BlobStoreDAO {
 
     @Override
     public Flux<BlobId> listBlobs(BucketName bucketName) {
+        return Flux.defer(() -> listBlobsBatch(bucketName, Optional.empty(), PostgresUtils.QUERY_BATCH_SIZE))
+            .expand(blobIds -> {
+                if (blobIds.isEmpty() || blobIds.size() < PostgresUtils.QUERY_BATCH_SIZE) {
+                    return Mono.empty();
+                }
+                return listBlobsBatch(bucketName, Optional.of(blobIds.getLast()), PostgresUtils.QUERY_BATCH_SIZE);
+            })
+            .flatMapIterable(Function.identity());
+    }
+
+    private Mono<List<BlobId>> listBlobsBatch(BucketName bucketName, Optional<BlobId> blobIdFrom, int batchSize) {
         return postgresExecutor.executeRows(dsl -> Flux.from(dsl.select(BLOB_ID)
             .from(TABLE_NAME)
-            .where(BUCKET_NAME.eq(bucketName.asString()))))
-            .map(record -> blobIdFactory.parse(record.get(BLOB_ID)));
+            .where(BUCKET_NAME.eq(bucketName.asString()))
+            .and(blobIdFrom.map(blobId -> BLOB_ID.greaterThan(blobId.asString())).orElseGet(DSL::noCondition))
+            .orderBy(BLOB_ID.asc())
+            .limit(batchSize)))
+            .map(record -> blobIdFactory.parse(record.get(BLOB_ID)))
+            .collectList()
+            .switchIfEmpty(Mono.just(ImmutableList.of()));
     }
 }
diff --git a/server/blob/blob-postgres/src/test/java/org/apache/james/blob/postgres/PostgresBlobStoreGCAlgorithmTest.java b/server/blob/blob-postgres/src/test/java/org/apache/james/blob/postgres/PostgresBlobStoreGCAlgorithmTest.java
new file mode 100644
index 0000000000..212966bf1d
--- /dev/null
+++ b/server/blob/blob-postgres/src/test/java/org/apache/james/blob/postgres/PostgresBlobStoreGCAlgorithmTest.java
@@ -0,0 +1,103 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.postgres;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.IntStream;
+
+import org.apache.james.backends.postgres.PostgresExtension;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.PlainBlobId;
+import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm;
+import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithmContract;
+import org.apache.james.task.Task;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class PostgresBlobStoreGCAlgorithmTest implements BloomFilterGCAlgorithmContract {
+
+    @RegisterExtension
+    static PostgresExtension postgresExtension = PostgresExtension.withoutRowLevelSecurity(PostgresBlobStorageModule.MODULE, PostgresExtension.PoolSize.LARGE);
+    private PostgresBlobStoreDAO blobStore;
+
+    @BeforeAll
+    static void setUpClass() {
+        // We set the batch size to 10 to test the batching
+        System.setProperty("james.postgresql.query.batch.size", "10");
+    }
+
+    @AfterAll
+    static void tearDownClass() {
+        System.clearProperty("james.postgresql.query.batch.size");
+    }
+
+    @BeforeEach
+    void beforeEach() {
+        blobStore = new PostgresBlobStoreDAO(postgresExtension.getDefaultPostgresExecutor(), new PlainBlobId.Factory());
+    }
+
+    @Override
+    public BlobStoreDAO blobStoreDAO() {
+        return blobStore;
+    }
+
+    @Test
+    void gcShouldSuccessWhenBatchSizeIsSmallerThanAllBlobEntries() {
+        BlobStore blobStore = blobStore();
+        int orphanBlobCount = 200;
+        List<BlobId> referencedBlobIds = IntStream.range(0, 100)
+            .mapToObj(index -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block())
+            .toList();
+        List<BlobId> orphanBlobIds = IntStream.range(0, orphanBlobCount)
+            .mapToObj(index -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block())
+            .toList();
+
+        when(BLOB_REFERENCE_SOURCE.listReferencedBlobs()).thenReturn(Flux.fromIterable(referencedBlobIds));
+        CLOCK.setInstant(NOW.plusMonths(2).toInstant());
+
+        BloomFilterGCAlgorithm.Context context = new BloomFilterGCAlgorithm.Context(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY);
+        BloomFilterGCAlgorithm bloomFilterGCAlgorithm = bloomFilterGCAlgorithm();
+        Task.Result result = Mono.from(bloomFilterGCAlgorithm.gc(EXPECTED_BLOB_COUNT, DELETION_WINDOW_SIZE, ASSOCIATED_PROBABILITY, DEFAULT_BUCKET, context)).block();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        BloomFilterGCAlgorithm.Context.Snapshot snapshot = context.snapshot();
+
+        assertThat(snapshot.getReferenceSourceCount())
+            .isEqualTo(referencedBlobIds.size());
+        assertThat(snapshot.getBlobCount())
+            .isEqualTo(referencedBlobIds.size() + orphanBlobIds.size());
+
+        assertThat(snapshot.getGcedBlobCount())
+            .isLessThanOrEqualTo(orphanBlobIds.size())
+            .isGreaterThan(0);
+    }
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org
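
For context, the listBlobs change above follows a generic keyset-pagination idiom in Reactor: fetch one page, then keep expanding from the last id of the previous page until a short (or empty) page signals the end. Below is a minimal, self-contained sketch of that idiom; the fetchPage helper, the ids table in the comment, and the BATCH_SIZE constant are hypothetical stand-ins for illustration and are not part of this commit.

import java.util.List;
import java.util.Optional;
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Sketch of the keyset-pagination pattern used by PostgresBlobStoreDAO.listBlobs above.
// fetchPage stands in for a query such as:
//   SELECT id FROM ids WHERE id > :from ORDER BY id ASC LIMIT :batchSize
public class KeysetPaginationSketch {
    private static final int BATCH_SIZE = 10;

    private static Mono<List<String>> fetchPage(Optional<String> from, int batchSize) {
        // Hypothetical page fetch; returns at most batchSize ids strictly greater than 'from'.
        return Mono.just(List.of());
    }

    public static Flux<String> listAll() {
        return Flux.defer(() -> fetchPage(Optional.empty(), BATCH_SIZE))
            // Request the next page keyed on the last id of the previous one;
            // a page shorter than BATCH_SIZE (which includes an empty page) ends the expansion.
            .expand(page -> page.size() < BATCH_SIZE
                ? Mono.empty()
                : fetchPage(Optional.of(page.get(page.size() - 1)), BATCH_SIZE))
            // Flatten the List pages back into a flat Flux of ids.
            .flatMapIterable(Function.identity());
    }
}

Compared with OFFSET-based paging, keying each batch on the last id seen keeps every query indexable and stable even while rows are removed between batches, which is the situation exercised by the new GC test.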