This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch postgresql
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 3cf17baede932b3e465da9227de2d0024ea46aec
Author: TungTV <vtt...@linagora.com>
AuthorDate: Thu Nov 28 09:05:41 2024 +0700

    JAMES-2586 - PostgresBlobStoreDAO.listBlobs support batch/pagination query
---
 .../james/blob/postgres/PostgresBlobStoreDAO.java  |  25 ++++-
 .../postgres/PostgresBlobStoreGCAlgorithmTest.java | 103 +++++++++++++++++++++
 2 files changed, 126 insertions(+), 2 deletions(-)

diff --git a/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java b/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java
index 22d2cb9dc8..5003f61179 100644
--- a/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java
+++ b/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java
@@ -29,16 +29,21 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
 
 import jakarta.inject.Inject;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.james.backends.postgres.utils.PostgresExecutor;
+import org.apache.james.backends.postgres.utils.PostgresUtils;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStoreDAO;
 import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.ObjectNotFoundException;
 import org.apache.james.blob.api.ObjectStoreIOException;
+import org.jooq.impl.DSL;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -151,9 +156,25 @@ public class PostgresBlobStoreDAO implements BlobStoreDAO {
 
     @Override
     public Flux<BlobId> listBlobs(BucketName bucketName) {
+        return Flux.defer(() -> listBlobsBatch(bucketName, Optional.empty(), PostgresUtils.QUERY_BATCH_SIZE))
+            .expand(blobIds -> {
+                if (blobIds.isEmpty() || blobIds.size() < PostgresUtils.QUERY_BATCH_SIZE) {
+                    return Mono.empty();
+                }
+                return listBlobsBatch(bucketName, Optional.of(blobIds.getLast()), PostgresUtils.QUERY_BATCH_SIZE);
+            })
+            .flatMapIterable(Function.identity());
+    }
+
+    private Mono<List<BlobId>> listBlobsBatch(BucketName bucketName, Optional<BlobId> blobIdFrom, int batchSize) {
         return postgresExecutor.executeRows(dsl -> Flux.from(dsl.select(BLOB_ID)
                 .from(TABLE_NAME)
-                .where(BUCKET_NAME.eq(bucketName.asString()))))
-            .map(record -> blobIdFactory.parse(record.get(BLOB_ID)));
+                .where(BUCKET_NAME.eq(bucketName.asString()))
+                .and(blobIdFrom.map(blobId -> BLOB_ID.greaterThan(blobId.asString())).orElseGet(DSL::noCondition))
+                .orderBy(BLOB_ID.asc())
+                .limit(batchSize)))
+            .map(record -> blobIdFactory.parse(record.get(BLOB_ID)))
+            .collectList()
+            .switchIfEmpty(Mono.just(ImmutableList.of()));
     }
 }
diff --git a/server/blob/blob-postgres/src/test/java/org/apache/james/blob/postgres/PostgresBlobStoreGCAlgorithmTest.java b/server/blob/blob-postgres/src/test/java/org/apache/james/blob/postgres/PostgresBlobStoreGCAlgorithmTest.java
new file mode 100644
index 0000000000..212966bf1d
--- /dev/null
+++ b/server/blob/blob-postgres/src/test/java/org/apache/james/blob/postgres/PostgresBlobStoreGCAlgorithmTest.java
@@ -0,0 +1,103 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.postgres;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.IntStream;
+
+import org.apache.james.backends.postgres.PostgresExtension;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.PlainBlobId;
+import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm;
+import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithmContract;
+import org.apache.james.task.Task;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class PostgresBlobStoreGCAlgorithmTest implements BloomFilterGCAlgorithmContract {
+
+    @RegisterExtension
+    static PostgresExtension postgresExtension = PostgresExtension.withoutRowLevelSecurity(PostgresBlobStorageModule.MODULE, PostgresExtension.PoolSize.LARGE);
+    private PostgresBlobStoreDAO blobStore;
+
+    @BeforeAll
+    static void setUpClass() {
+        // We set the batch size to 10 to test the batching
+        System.setProperty("james.postgresql.query.batch.size", "10");
+    }
+
+    @AfterAll
+    static void tearDownClass() {
+        System.clearProperty("james.postgresql.query.batch.size");
+    }
+
+    @BeforeEach
+    void beforeEach() {
+        blobStore = new PostgresBlobStoreDAO(postgresExtension.getDefaultPostgresExecutor(), new PlainBlobId.Factory());
+    }
+
+    @Override
+    public BlobStoreDAO blobStoreDAO() {
+        return blobStore;
+    }
+
+    @Test
+    void gcShouldSuccessWhenBatchSizeIsSmallerThanAllBlobEntries() {
+        BlobStore blobStore = blobStore();
+        int orphanBlobCount = 200;
+        List<BlobId> referencedBlobIds = IntStream.range(0, 100)
+            .mapToObj(index -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block())
+            .toList();
+        List<BlobId> orphanBlobIds = IntStream.range(0, orphanBlobCount)
+            .mapToObj(index -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block())
+            .toList();
+
+        when(BLOB_REFERENCE_SOURCE.listReferencedBlobs()).thenReturn(Flux.fromIterable(referencedBlobIds));
+        CLOCK.setInstant(NOW.plusMonths(2).toInstant());
+
+        BloomFilterGCAlgorithm.Context context = new BloomFilterGCAlgorithm.Context(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY);
+        BloomFilterGCAlgorithm bloomFilterGCAlgorithm = bloomFilterGCAlgorithm();
+        Task.Result result = Mono.from(bloomFilterGCAlgorithm.gc(EXPECTED_BLOB_COUNT, DELETION_WINDOW_SIZE, ASSOCIATED_PROBABILITY, DEFAULT_BUCKET, context)).block();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        BloomFilterGCAlgorithm.Context.Snapshot snapshot = context.snapshot();
+
+        assertThat(snapshot.getReferenceSourceCount())
+            .isEqualTo(referencedBlobIds.size());
+        assertThat(snapshot.getBlobCount())
+            .isEqualTo(referencedBlobIds.size() + orphanBlobIds.size());
+
+        assertThat(snapshot.getGcedBlobCount())
+            .isLessThanOrEqualTo(orphanBlobIds.size())
+            .isGreaterThan(0);
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to