This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 416d0ce9582ddc4d59bfceb88bd9fde432a66b04
Author: Gautier DI FOLCO <[email protected]>
AuthorDate: Wed Jul 29 10:45:48 2020 +0200

    JAMES-3028 Use a pool for AWS S3 Client
---
 server/blob/blob-s3/pom.xml                        |  8 +++
 .../blob/objectstorage/aws/S3DumbBlobStore.java    | 71 ++++++++++++++--------
 .../modules/objectstorage/S3BlobStoreModule.java   | 10 +++
 3 files changed, 64 insertions(+), 25 deletions(-)

diff --git a/server/blob/blob-s3/pom.xml b/server/blob/blob-s3/pom.xml
index 02c7f61..b87695e 100644
--- a/server/blob/blob-s3/pom.xml
+++ b/server/blob/blob-s3/pom.xml
@@ -58,6 +58,10 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-lifecycle-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>james-server-util</artifactId>
         </dependency>
         <dependency>
@@ -88,6 +92,10 @@
             <artifactId>reactor-extra</artifactId>
         </dependency>
         <dependency>
+            <groupId>io.projectreactor.addons</groupId>
+            <artifactId>reactor-pool</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </dependency>
diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3DumbBlobStore.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3DumbBlobStore.java
index 476f98a..7978ef0 100644
--- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3DumbBlobStore.java
+++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3DumbBlobStore.java
@@ -25,6 +25,7 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 
 import javax.annotation.PreDestroy;
@@ -36,6 +37,7 @@ import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.DumbBlobStore;
 import org.apache.james.blob.api.ObjectNotFoundException;
 import org.apache.james.blob.api.ObjectStoreIOException;
+import org.apache.james.lifecycle.api.Startable;
 import org.apache.james.util.DataChunker;
 import org.apache.james.util.ReactorUtils;
 
@@ -49,6 +51,8 @@ import com.google.common.io.FileBackedOutputStream;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
+import reactor.pool.InstrumentedPool;
+import reactor.pool.PoolBuilder;
 import reactor.retry.Retry;
 import reactor.retry.RetryWithAsyncCallback;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
@@ -72,7 +76,7 @@ import 
software.amazon.awssdk.services.s3.model.ObjectIdentifier;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import software.amazon.awssdk.services.s3.model.S3Object;
 
-public class S3DumbBlobStore implements DumbBlobStore, Closeable {
+public class S3DumbBlobStore implements DumbBlobStore, Startable, Closeable {
 
     private static final int CHUNK_SIZE = 1024 * 1024;
     private static final int EMPTY_BUCKET_BATCH_SIZE = 1000;
@@ -82,14 +86,15 @@ public class S3DumbBlobStore implements DumbBlobStore, Closeable {
     private static final boolean LAZY = false;
     private static final int MAX_RETRIES = 5;
 
-    private final S3AsyncClient client;
+    private final InstrumentedPool<S3AsyncClient> clientPool;
 
     @Inject
     S3DumbBlobStore(AwsS3AuthConfiguration configuration, Region region) {
         S3Configuration pathStyleAccess = S3Configuration.builder()
             .pathStyleAccessEnabled(true)
             .build();
-        client = S3AsyncClient.builder()
+
+        Callable<S3AsyncClient> clientCreator = () -> S3AsyncClient.builder()
             .credentialsProvider(StaticCredentialsProvider.create(
                 AwsBasicCredentials.create(configuration.getAccessKeyId(), 
configuration.getSecretKey())))
             .httpClientBuilder(NettyNioAsyncHttpClient.builder()
@@ -99,12 +104,23 @@ public class S3DumbBlobStore implements DumbBlobStore, Closeable {
             .region(region.asAws())
             .serviceConfiguration(pathStyleAccess)
             .build();
+
+        clientPool = PoolBuilder.from(Mono.fromCallable(clientCreator))
+            .acquisitionScheduler(Schedulers.elastic())
+            .destroyHandler(client -> Mono.fromRunnable(client::close))
+            .maxPendingAcquireUnbounded()
+            .sizeUnbounded()
+            .fifo();
+    }
+
+    public void start() {
+        clientPool.warmup().block();
     }
 
     @Override
     @PreDestroy
     public void close() {
-        client.close();
+        clientPool.dispose();
     }
 
     @Override
@@ -123,7 +139,7 @@ public class S3DumbBlobStore implements DumbBlobStore, Closeable {
     }
 
     private Mono<FluxResponse> getObject(BucketName bucketName, BlobId blobId) 
{
-        return Mono.fromFuture(() ->
+        return clientPool.withPoolable(client -> Mono.fromFuture(() ->
             client.getObject(
                 builder -> 
builder.bucket(bucketName.asString()).key(blobId.asString()),
                 new AsyncResponseTransformer<GetObjectResponse, 
FluxResponse>() {
@@ -151,16 +167,18 @@ public class S3DumbBlobStore implements DumbBlobStore, Closeable {
                         response.flux = Flux.from(publisher);
                         
response.supportingCompletableFuture.complete(response);
                     }
-                }));
+                })))
+            .next();
     }
 
 
     @Override
     public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
-        return Mono.fromFuture(() ->
+        return clientPool.withPoolable(client -> Mono.fromFuture(() ->
                 client.getObject(
                     builder -> 
builder.bucket(bucketName.asString()).key(blobId.asString()),
-                    AsyncResponseTransformer.toBytes()))
+                    AsyncResponseTransformer.toBytes())))
+            .next()
             .onErrorMap(NoSuchBucketException.class, e -> new 
ObjectNotFoundException("Bucket not found " + bucketName, e))
             .onErrorMap(NoSuchKeyException.class, e -> new 
ObjectNotFoundException("Blob not found " + bucketName, e))
             .map(BytesWrapper::asByteArray);
@@ -168,10 +186,11 @@ public class S3DumbBlobStore implements DumbBlobStore, Closeable {
 
     @Override
     public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) {
-        return Mono.fromFuture(() ->
+        return clientPool.withPoolable(client -> Mono.fromFuture(() ->
                 client.putObject(
                     builder -> 
builder.bucket(bucketName.asString()).key(blobId.asString()).contentLength((long)
 data.length),
-                    AsyncRequestBody.fromBytes(data)))
+                    AsyncRequestBody.fromBytes(data))))
+            .next()
             .retryWhen(createBucketOnRetry(bucketName))
             .then();
     }
@@ -198,13 +217,13 @@ public class S3DumbBlobStore implements DumbBlobStore, Closeable {
     public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource 
content) {
         return Mono.using(content::openStream,
             stream ->
-                Mono.fromFuture(() ->
+                clientPool.withPoolable(client -> Mono.fromFuture(() ->
                     client.putObject(
                         Throwing.<PutObjectRequest.Builder>consumer(
                             builder -> 
builder.bucket(bucketName.asString()).contentLength(content.size()).key(blobId.asString()))
                         .sneakyThrow(),
                         AsyncRequestBody.fromPublisher(
-                            DataChunker.chunkStream(stream, CHUNK_SIZE)))),
+                            DataChunker.chunkStream(stream, 
CHUNK_SIZE))))).next(),
             Throwing.consumer(InputStream::close),
             LAZY)
             .retryWhen(createBucketOnRetry(bucketName))
@@ -218,15 +237,16 @@ public class S3DumbBlobStore implements DumbBlobStore, Closeable {
             .exponentialBackoff(FIRST_BACK_OFF, FOREVER)
             .withBackoffScheduler(Schedulers.elastic())
             .retryMax(MAX_RETRIES)
-            .onRetryWithMono(retryContext -> Mono
+            .onRetryWithMono(retryContext -> clientPool.withPoolable(client -> 
Mono
                 .fromFuture(client.createBucket(builder -> 
builder.bucket(bucketName.asString())))
-                .onErrorResume(BucketAlreadyOwnedByYouException.class, e -> 
Mono.empty()));
+                .onErrorResume(BucketAlreadyOwnedByYouException.class, e -> 
Mono.empty())).next());
     }
 
     @Override
     public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
-        return Mono.fromFuture(() ->
-                client.deleteObject(delete -> 
delete.bucket(bucketName.asString()).key(blobId.asString())))
+        return clientPool.withPoolable(client -> Mono.fromFuture(() ->
+                client.deleteObject(delete -> 
delete.bucket(bucketName.asString()).key(blobId.asString()))))
+            .next()
             .then()
             .onErrorResume(NoSuchBucketException.class, e -> Mono.empty());
     }
@@ -235,14 +255,16 @@ public class S3DumbBlobStore implements DumbBlobStore, Closeable {
     public Mono<Void> deleteBucket(BucketName bucketName) {
         return emptyBucket(bucketName)
             .onErrorResume(t -> Mono.just(bucketName))
-            .flatMap(ignore -> Mono.fromFuture(() -> 
client.deleteBucket(builder -> builder.bucket(bucketName.asString()))))
+            .flatMap(ignore -> clientPool.withPoolable(client -> 
Mono.fromFuture(() ->
+                client.deleteBucket(builder -> 
builder.bucket(bucketName.asString()))))
+                .next())
             .onErrorResume(t -> Mono.empty())
             .then();
     }
 
     private Mono<BucketName> emptyBucket(BucketName bucketName) {
-        return Mono.fromFuture(() -> client.listObjects(builder -> 
builder.bucket(bucketName.asString())))
-            .flatMapIterable(ListObjectsResponse::contents)
+        return clientPool.withPoolable(client -> Mono.fromFuture(() -> 
client.listObjects(builder -> builder.bucket(bucketName.asString())))
+            .flatMapIterable(ListObjectsResponse::contents))
             .window(EMPTY_BUCKET_BATCH_SIZE)
             .flatMap(this::buildListForBatch)
             .flatMap(identifiers -> deleteObjects(bucketName, identifiers))
@@ -256,17 +278,16 @@ public class S3DumbBlobStore implements DumbBlobStore, Closeable {
     }
 
     private Mono<DeleteObjectsResponse> deleteObjects(BucketName bucketName, 
List<ObjectIdentifier> identifiers) {
-        return Mono.fromFuture(() -> client.deleteObjects(builder ->
-            builder.bucket(bucketName.asString()).delete(delete -> 
delete.objects(identifiers))));
+        return clientPool.withPoolable(client -> Mono.fromFuture(() -> 
client.deleteObjects(builder ->
+            builder.bucket(bucketName.asString()).delete(delete -> 
delete.objects(identifiers)))))
+            .next();
     }
 
     @VisibleForTesting
     public Mono<Void> deleteAllBuckets() {
-        return Mono.fromFuture(client::listBuckets)
+        return clientPool.withPoolable(client -> 
Mono.fromFuture(client::listBuckets)
                 .flatMapIterable(ListBucketsResponse::buckets)
-                     .flatMap(bucket -> 
deleteBucket(BucketName.of(bucket.name())))
+                     .flatMap(bucket -> 
deleteBucket(BucketName.of(bucket.name()))))
             .then();
     }
-
-
 }
diff --git a/server/container/guice/blob-s3-guice/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreModule.java b/server/container/guice/blob-s3-guice/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreModule.java
index 7c1092c..f56892c 100644
--- a/server/container/guice/blob-s3-guice/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreModule.java
+++ b/server/container/guice/blob-s3-guice/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreModule.java
@@ -29,11 +29,15 @@ import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.objectstorage.aws.AwsS3AuthConfiguration;
 import org.apache.james.blob.objectstorage.aws.Region;
 import org.apache.james.blob.objectstorage.aws.S3BlobStoreConfiguration;
+import org.apache.james.blob.objectstorage.aws.S3DumbBlobStore;
 import org.apache.james.modules.mailbox.ConfigurationComponent;
+import org.apache.james.utils.InitializationOperation;
+import org.apache.james.utils.InitilizationOperationBuilder;
 import org.apache.james.utils.PropertiesProvider;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
+import com.google.inject.multibindings.ProvidesIntoSet;
 
 public class S3BlobStoreModule extends AbstractModule {
 
@@ -66,4 +70,10 @@ public class S3BlobStoreModule extends AbstractModule {
         return s3BlobStoreConfiguration.getRegion();
     }
 
+    @ProvidesIntoSet
+    InitializationOperation startS3DumbBlobStore(S3DumbBlobStore 
s3DumbBlobStore) {
+        return InitilizationOperationBuilder
+            .forClass(S3DumbBlobStore.class)
+            .init(s3DumbBlobStore::start);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to