This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 3eaef14ce3313005fdd0e2ecba6184da4e663a1b Author: TungTV <vtt...@linagora.com> AuthorDate: Tue Nov 19 10:17:14 2024 +0700 JAMES-4085 [S3 SSEC] - S3BlobStoreDAO - extract build Get/Put ObjectRequestBuilder --- .../blob/objectstorage/aws/S3BlobStoreDAO.java | 113 +++++++++++---------- 1 file changed, 62 insertions(+), 51 deletions(-) diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java index 74ac19a483..9b4b993de3 100644 --- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java +++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java @@ -63,12 +63,14 @@ import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.Bucket; import software.amazon.awssdk.services.s3.model.BucketAlreadyOwnedByYouException; import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.ListBucketsResponse; import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; import software.amazon.awssdk.services.s3.model.NoSuchBucketException; import software.amazon.awssdk.services.s3.model.NoSuchKeyException; import software.amazon.awssdk.services.s3.model.ObjectIdentifier; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.services.s3.model.S3Object; @@ -166,36 +168,36 @@ public class S3BlobStoreDAO implements BlobStoreDAO { } private Mono<FluxResponse> getObject(BucketName bucketName, BlobId blobId) { - return Mono.fromFuture(() -> - client.getObject( - builder -> builder.bucket(bucketName.asString()).key(blobId.asString()), - new AsyncResponseTransformer<GetObjectResponse, FluxResponse>() { - - FluxResponse response; - - @Override - public CompletableFuture<FluxResponse> prepare() { - response = new FluxResponse(); - return response.supportingCompletableFuture; - } - - @Override - public void onResponse(GetObjectResponse response) { - this.response.sdkResponse = response; - } - - @Override - public void exceptionOccurred(Throwable error) { - this.response.supportingCompletableFuture.completeExceptionally(error); - } - - @Override - public void onStream(SdkPublisher<ByteBuffer> publisher) { - response.flux = Flux.from(publisher); - response.supportingCompletableFuture.complete(response); - } - })) - .switchIfEmpty(Mono.error(() -> new ObjectStoreIOException("Request was unexpectedly canceled, no GetObjectResponse"))); + return buildGetObjectRequestBuilder(bucketName, blobId) + .flatMap(getObjectRequestBuilder -> Mono.fromFuture(() -> + client.getObject(getObjectRequestBuilder.build(), + new AsyncResponseTransformer<GetObjectResponse, FluxResponse>() { + + FluxResponse response; + + @Override + public CompletableFuture<FluxResponse> prepare() { + response = new FluxResponse(); + return response.supportingCompletableFuture; + } + + @Override + public void onResponse(GetObjectResponse response) { + this.response.sdkResponse = response; + } + + @Override + public void exceptionOccurred(Throwable error) { + this.response.supportingCompletableFuture.completeExceptionally(error); + } + + @Override + public void onStream(SdkPublisher<ByteBuffer> publisher) { + response.flux = Flux.from(publisher); + response.supportingCompletableFuture.complete(response); + } + })) + .switchIfEmpty(Mono.error(() -> new ObjectStoreIOException("Request was unexpectedly canceled, no GetObjectResponse")))); } @@ -203,27 +205,31 @@ public class S3BlobStoreDAO implements BlobStoreDAO { public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) { BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); - return Mono.fromFuture(() -> - client.getObject( - builder -> builder.bucket(resolvedBucketName.asString()).key(blobId.asString()), - new MinimalCopyBytesResponseTransformer(configuration, blobId))) - .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e)) - .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + blobId.asString() + " in bucket " + resolvedBucketName.asString(), e)) - .publishOn(Schedulers.parallel()) - .map(BytesWrapper::asByteArrayUnsafe) - .onErrorMap(e -> e.getCause() instanceof OutOfMemoryError, Throwable::getCause); + return buildGetObjectRequestBuilder(resolvedBucketName, blobId) + .flatMap(putObjectRequest -> Mono.fromFuture(() -> + client.getObject(putObjectRequest.build(), new MinimalCopyBytesResponseTransformer(configuration, blobId))) + .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e)) + .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + blobId.asString() + " in bucket " + resolvedBucketName.asString(), e)) + .publishOn(Schedulers.parallel()) + .map(BytesWrapper::asByteArrayUnsafe) + .onErrorMap(e -> e.getCause() instanceof OutOfMemoryError, Throwable::getCause)); + } + + private Mono<GetObjectRequest.Builder> buildGetObjectRequestBuilder(BucketName bucketName, BlobId blobId) { + return Mono.just(GetObjectRequest.builder() + .bucket(bucketName.asString()) + .key(blobId.asString())); } @Override public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) { BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); - return Mono.fromFuture(() -> - client.putObject( - builder -> builder.bucket(resolvedBucketName.asString()).key(blobId.asString()).contentLength((long) data.length), - AsyncRequestBody.fromBytes(data))) - .retryWhen(createBucketOnRetry(resolvedBucketName)) - .publishOn(Schedulers.parallel()) + return buildPutObjectRequestBuilder(resolvedBucketName, data.length, blobId) + .flatMap(putObjectRequest -> Mono.fromFuture(() -> + client.putObject(putObjectRequest.build(), AsyncRequestBody.fromBytes(data))) + .retryWhen(createBucketOnRetry(resolvedBucketName)) + .publishOn(Schedulers.parallel())) .then(); } @@ -266,12 +272,17 @@ public class S3BlobStoreDAO implements BlobStoreDAO { private Mono<PutObjectResponse> save(BucketName resolvedBucketName, BlobId blobId, InputStream stream, long contentLength) { int chunkSize = Math.min((int) contentLength, CHUNK_SIZE); - return Mono.fromFuture(() -> client.putObject(builder -> builder - .bucket(resolvedBucketName.asString()) - .contentLength(contentLength) - .key(blobId.asString()), - AsyncRequestBody.fromPublisher(chunkStream(chunkSize, stream) - .subscribeOn(Schedulers.boundedElastic())))); + return buildPutObjectRequestBuilder(resolvedBucketName, contentLength, blobId) + .flatMap(putObjectRequest -> Mono.fromFuture(() -> client.putObject(putObjectRequest.build(), + AsyncRequestBody.fromPublisher(chunkStream(chunkSize, stream) + .subscribeOn(Schedulers.boundedElastic()))))); + } + + private Mono<PutObjectRequest.Builder> buildPutObjectRequestBuilder(BucketName bucketName, long contentLength, BlobId blobId) { + return Mono.just(PutObjectRequest.builder() + .bucket(bucketName.asString()) + .contentLength(contentLength) + .key(blobId.asString())); } private Flux<ByteBuffer> chunkStream(int chunkSize, InputStream stream) { --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org