This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 3eaef14ce3313005fdd0e2ecba6184da4e663a1b
Author: TungTV <vtt...@linagora.com>
AuthorDate: Tue Nov 19 10:17:14 2024 +0700

    JAMES-4085 [S3 SSEC] - S3BlobStoreDAO - extract build Get/Put ObjectRequestBuilder
---
 .../blob/objectstorage/aws/S3BlobStoreDAO.java     | 113 +++++++++++----------
 1 file changed, 62 insertions(+), 51 deletions(-)

diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
index 74ac19a483..9b4b993de3 100644
--- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
+++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
@@ -63,12 +63,14 @@ import software.amazon.awssdk.services.s3.S3AsyncClient;
 import software.amazon.awssdk.services.s3.model.Bucket;
 import 
software.amazon.awssdk.services.s3.model.BucketAlreadyOwnedByYouException;
 import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
 import software.amazon.awssdk.services.s3.model.GetObjectResponse;
 import software.amazon.awssdk.services.s3.model.ListBucketsResponse;
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
 import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
 import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
 import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import software.amazon.awssdk.services.s3.model.PutObjectResponse;
 import software.amazon.awssdk.services.s3.model.S3Object;
 
@@ -166,36 +168,36 @@ public class S3BlobStoreDAO implements BlobStoreDAO {
     }
 
     private Mono<FluxResponse> getObject(BucketName bucketName, BlobId blobId) 
{
-        return Mono.fromFuture(() ->
-            client.getObject(
-                builder -> 
builder.bucket(bucketName.asString()).key(blobId.asString()),
-                new AsyncResponseTransformer<GetObjectResponse, 
FluxResponse>() {
-
-                    FluxResponse response;
-
-                    @Override
-                    public CompletableFuture<FluxResponse> prepare() {
-                        response = new FluxResponse();
-                        return response.supportingCompletableFuture;
-                    }
-
-                    @Override
-                    public void onResponse(GetObjectResponse response) {
-                        this.response.sdkResponse = response;
-                    }
-
-                    @Override
-                    public void exceptionOccurred(Throwable error) {
-                        
this.response.supportingCompletableFuture.completeExceptionally(error);
-                    }
-
-                    @Override
-                    public void onStream(SdkPublisher<ByteBuffer> publisher) {
-                        response.flux = Flux.from(publisher);
-                        
response.supportingCompletableFuture.complete(response);
-                    }
-                }))
-            .switchIfEmpty(Mono.error(() -> new 
ObjectStoreIOException("Request was unexpectedly canceled, no 
GetObjectResponse")));
+        return buildGetObjectRequestBuilder(bucketName, blobId)
+            .flatMap(getObjectRequestBuilder -> Mono.fromFuture(() ->
+                    client.getObject(getObjectRequestBuilder.build(),
+                        new AsyncResponseTransformer<GetObjectResponse, 
FluxResponse>() {
+
+                            FluxResponse response;
+
+                            @Override
+                            public CompletableFuture<FluxResponse> prepare() {
+                                response = new FluxResponse();
+                                return response.supportingCompletableFuture;
+                            }
+
+                            @Override
+                            public void onResponse(GetObjectResponse response) 
{
+                                this.response.sdkResponse = response;
+                            }
+
+                            @Override
+                            public void exceptionOccurred(Throwable error) {
+                                
this.response.supportingCompletableFuture.completeExceptionally(error);
+                            }
+
+                            @Override
+                            public void onStream(SdkPublisher<ByteBuffer> 
publisher) {
+                                response.flux = Flux.from(publisher);
+                                
response.supportingCompletableFuture.complete(response);
+                            }
+                        }))
+                .switchIfEmpty(Mono.error(() -> new 
ObjectStoreIOException("Request was unexpectedly canceled, no 
GetObjectResponse"))));
     }
 
 
@@ -203,27 +205,31 @@ public class S3BlobStoreDAO implements BlobStoreDAO {
     public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
         BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
 
-        return Mono.fromFuture(() ->
-                client.getObject(
-                    builder -> 
builder.bucket(resolvedBucketName.asString()).key(blobId.asString()),
-                    new MinimalCopyBytesResponseTransformer(configuration, 
blobId)))
-            .onErrorMap(NoSuchBucketException.class, e -> new 
ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e))
-            .onErrorMap(NoSuchKeyException.class, e -> new 
ObjectNotFoundException("Blob not found " + blobId.asString() + " in bucket " + 
resolvedBucketName.asString(), e))
-            .publishOn(Schedulers.parallel())
-            .map(BytesWrapper::asByteArrayUnsafe)
-            .onErrorMap(e -> e.getCause() instanceof OutOfMemoryError, 
Throwable::getCause);
+        return buildGetObjectRequestBuilder(resolvedBucketName, blobId)
+            .flatMap(putObjectRequest -> Mono.fromFuture(() ->
+                    client.getObject(putObjectRequest.build(), new 
MinimalCopyBytesResponseTransformer(configuration, blobId)))
+                .onErrorMap(NoSuchBucketException.class, e -> new 
ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e))
+                .onErrorMap(NoSuchKeyException.class, e -> new 
ObjectNotFoundException("Blob not found " + blobId.asString() + " in bucket " + 
resolvedBucketName.asString(), e))
+                .publishOn(Schedulers.parallel())
+                .map(BytesWrapper::asByteArrayUnsafe)
+                .onErrorMap(e -> e.getCause() instanceof OutOfMemoryError, 
Throwable::getCause));
+    }
+
+    private Mono<GetObjectRequest.Builder> 
buildGetObjectRequestBuilder(BucketName bucketName, BlobId blobId) {
+        return Mono.just(GetObjectRequest.builder()
+            .bucket(bucketName.asString())
+            .key(blobId.asString()));
     }
 
     @Override
     public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) {
         BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
 
-        return Mono.fromFuture(() ->
-                client.putObject(
-                    builder -> 
builder.bucket(resolvedBucketName.asString()).key(blobId.asString()).contentLength((long)
 data.length),
-                    AsyncRequestBody.fromBytes(data)))
-            .retryWhen(createBucketOnRetry(resolvedBucketName))
-            .publishOn(Schedulers.parallel())
+        return buildPutObjectRequestBuilder(resolvedBucketName, data.length, 
blobId)
+            .flatMap(putObjectRequest -> Mono.fromFuture(() ->
+                    client.putObject(putObjectRequest.build(), 
AsyncRequestBody.fromBytes(data)))
+                .retryWhen(createBucketOnRetry(resolvedBucketName))
+                .publishOn(Schedulers.parallel()))
             .then();
     }
 
@@ -266,12 +272,17 @@ public class S3BlobStoreDAO implements BlobStoreDAO {
     private Mono<PutObjectResponse> save(BucketName resolvedBucketName, BlobId 
blobId, InputStream stream, long contentLength) {
         int chunkSize = Math.min((int) contentLength, CHUNK_SIZE);
 
-        return Mono.fromFuture(() -> client.putObject(builder -> builder
-                .bucket(resolvedBucketName.asString())
-                .contentLength(contentLength)
-                .key(blobId.asString()),
-            AsyncRequestBody.fromPublisher(chunkStream(chunkSize, stream)
-                .subscribeOn(Schedulers.boundedElastic()))));
+        return buildPutObjectRequestBuilder(resolvedBucketName, contentLength, 
blobId)
+            .flatMap(putObjectRequest -> Mono.fromFuture(() -> 
client.putObject(putObjectRequest.build(),
+                AsyncRequestBody.fromPublisher(chunkStream(chunkSize, stream)
+                    .subscribeOn(Schedulers.boundedElastic())))));
+    }
+
+    private Mono<PutObjectRequest.Builder> 
buildPutObjectRequestBuilder(BucketName bucketName, long contentLength, BlobId 
blobId) {
+        return Mono.just(PutObjectRequest.builder()
+            .bucket(bucketName.asString())
+            .contentLength(contentLength)
+            .key(blobId.asString()));
     }
 
     private Flux<ByteBuffer> chunkStream(int chunkSize, InputStream stream) {


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to