HoussemNasri commented on code in PR #2488: URL: https://github.com/apache/james-project/pull/2488#discussion_r1847911827
########## server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java: ########## @@ -166,64 +177,79 @@ private static class FluxResponse { } private Mono<FluxResponse> getObject(BucketName bucketName, BlobId blobId) { - return Mono.fromFuture(() -> - client.getObject( - builder -> builder.bucket(bucketName.asString()).key(blobId.asString()), - new AsyncResponseTransformer<GetObjectResponse, FluxResponse>() { - - FluxResponse response; - - @Override - public CompletableFuture<FluxResponse> prepare() { - response = new FluxResponse(); - return response.supportingCompletableFuture; - } - - @Override - public void onResponse(GetObjectResponse response) { - this.response.sdkResponse = response; - } - - @Override - public void exceptionOccurred(Throwable error) { - this.response.supportingCompletableFuture.completeExceptionally(error); - } - - @Override - public void onStream(SdkPublisher<ByteBuffer> publisher) { - response.flux = Flux.from(publisher); - response.supportingCompletableFuture.complete(response); - } - })) - .switchIfEmpty(Mono.error(() -> new ObjectStoreIOException("Request was unexpectedly canceled, no GetObjectResponse"))); + return buildGetObjectRequestBuilder(bucketName, blobId) + .flatMap(getObjectRequestBuilder -> Mono.fromFuture(() -> + client.getObject(getObjectRequestBuilder.build(), + new AsyncResponseTransformer<GetObjectResponse, FluxResponse>() { + + FluxResponse response; + + @Override + public CompletableFuture<FluxResponse> prepare() { + response = new FluxResponse(); + return response.supportingCompletableFuture; + } + + @Override + public void onResponse(GetObjectResponse response) { + this.response.sdkResponse = response; + } + + @Override + public void exceptionOccurred(Throwable error) { + this.response.supportingCompletableFuture.completeExceptionally(error); + } + + @Override + public void onStream(SdkPublisher<ByteBuffer> publisher) { + response.flux = Flux.from(publisher); + response.supportingCompletableFuture.complete(response); + } + })) + .switchIfEmpty(Mono.error(() -> new ObjectStoreIOException("Request was unexpectedly canceled, no GetObjectResponse")))); } @Override public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) { BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); - return Mono.fromFuture(() -> - client.getObject( - builder -> builder.bucket(resolvedBucketName.asString()).key(blobId.asString()), - new MinimalCopyBytesResponseTransformer(configuration, blobId))) - .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e)) - .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + blobId.asString() + " in bucket " + resolvedBucketName.asString(), e)) - .publishOn(Schedulers.parallel()) - .map(BytesWrapper::asByteArrayUnsafe) - .onErrorMap(e -> e.getCause() instanceof OutOfMemoryError, Throwable::getCause); + return buildGetObjectRequestBuilder(resolvedBucketName, blobId) + .flatMap(putObjectRequest -> Mono.fromFuture(() -> + client.getObject(putObjectRequest.build(), new MinimalCopyBytesResponseTransformer(configuration, blobId))) + .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e)) + .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + blobId.asString() + " in bucket " + resolvedBucketName.asString(), e)) + .publishOn(Schedulers.parallel()) + .map(BytesWrapper::asByteArrayUnsafe) + .onErrorMap(e -> e.getCause() instanceof OutOfMemoryError, Throwable::getCause)); + } + + private Mono<GetObjectRequest.Builder> buildGetObjectRequestBuilder(BucketName bucketName, BlobId blobId) { + GetObjectRequest.Builder baseBuilder = GetObjectRequest.builder() + .bucket(bucketName.asString()) + .key(blobId.asString()); + + if (s3RequestOption.ssec().enable()) { + return Mono.from(s3RequestOption.ssec().sseCustomerKeyFactory().get() + .generate(bucketName, blobId)) + .map(sseCustomerKey -> baseBuilder + .sseCustomerAlgorithm(sseCustomerKey.ssecAlgorithm()) Review Comment: @quantranhong1999 @vttranlina It's clearer now, it should be good, thanks you for clarifying. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org