This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 416d0ce9582ddc4d59bfceb88bd9fde432a66b04 Author: Gautier DI FOLCO <[email protected]> AuthorDate: Wed Jul 29 10:45:48 2020 +0200 JAMES-3028 Use a pool for AWS S3 Client --- server/blob/blob-s3/pom.xml | 8 +++ .../blob/objectstorage/aws/S3DumbBlobStore.java | 71 ++++++++++++++-------- .../modules/objectstorage/S3BlobStoreModule.java | 10 +++ 3 files changed, 64 insertions(+), 25 deletions(-) diff --git a/server/blob/blob-s3/pom.xml b/server/blob/blob-s3/pom.xml index 02c7f61..b87695e 100644 --- a/server/blob/blob-s3/pom.xml +++ b/server/blob/blob-s3/pom.xml @@ -58,6 +58,10 @@ </dependency> <dependency> <groupId>${james.groupId}</groupId> + <artifactId>james-server-lifecycle-api</artifactId> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> <artifactId>james-server-util</artifactId> </dependency> <dependency> @@ -88,6 +92,10 @@ <artifactId>reactor-extra</artifactId> </dependency> <dependency> + <groupId>io.projectreactor.addons</groupId> + <artifactId>reactor-pool</artifactId> + </dependency> + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3DumbBlobStore.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3DumbBlobStore.java index 476f98a..7978ef0 100644 --- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3DumbBlobStore.java +++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3DumbBlobStore.java @@ -25,6 +25,7 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.time.Duration; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import javax.annotation.PreDestroy; @@ -36,6 +37,7 @@ import org.apache.james.blob.api.BucketName; import org.apache.james.blob.api.DumbBlobStore; import org.apache.james.blob.api.ObjectNotFoundException; import 
org.apache.james.blob.api.ObjectStoreIOException; +import org.apache.james.lifecycle.api.Startable; import org.apache.james.util.DataChunker; import org.apache.james.util.ReactorUtils; @@ -49,6 +51,8 @@ import com.google.common.io.FileBackedOutputStream; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +import reactor.pool.InstrumentedPool; +import reactor.pool.PoolBuilder; import reactor.retry.Retry; import reactor.retry.RetryWithAsyncCallback; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; @@ -72,7 +76,7 @@ import software.amazon.awssdk.services.s3.model.ObjectIdentifier; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.S3Object; -public class S3DumbBlobStore implements DumbBlobStore, Closeable { +public class S3DumbBlobStore implements DumbBlobStore, Startable, Closeable { private static final int CHUNK_SIZE = 1024 * 1024; private static final int EMPTY_BUCKET_BATCH_SIZE = 1000; @@ -82,14 +86,15 @@ public class S3DumbBlobStore implements DumbBlobStore, Closeable { private static final boolean LAZY = false; private static final int MAX_RETRIES = 5; - private final S3AsyncClient client; + private final InstrumentedPool<S3AsyncClient> clientPool; @Inject S3DumbBlobStore(AwsS3AuthConfiguration configuration, Region region) { S3Configuration pathStyleAccess = S3Configuration.builder() .pathStyleAccessEnabled(true) .build(); - client = S3AsyncClient.builder() + + Callable<S3AsyncClient> clientCreator = () -> S3AsyncClient.builder() .credentialsProvider(StaticCredentialsProvider.create( AwsBasicCredentials.create(configuration.getAccessKeyId(), configuration.getSecretKey()))) .httpClientBuilder(NettyNioAsyncHttpClient.builder() @@ -99,12 +104,23 @@ public class S3DumbBlobStore implements DumbBlobStore, Closeable { .region(region.asAws()) .serviceConfiguration(pathStyleAccess) .build(); + + clientPool = 
PoolBuilder.from(Mono.fromCallable(clientCreator)) + .acquisitionScheduler(Schedulers.elastic()) + .destroyHandler(client -> Mono.fromRunnable(client::close)) + .maxPendingAcquireUnbounded() + .sizeUnbounded() + .fifo(); + } + + public void start() { + clientPool.warmup().block(); } @Override @PreDestroy public void close() { - client.close(); + clientPool.dispose(); } @Override @@ -123,7 +139,7 @@ public class S3DumbBlobStore implements DumbBlobStore, Closeable { } private Mono<FluxResponse> getObject(BucketName bucketName, BlobId blobId) { - return Mono.fromFuture(() -> + return clientPool.withPoolable(client -> Mono.fromFuture(() -> client.getObject( builder -> builder.bucket(bucketName.asString()).key(blobId.asString()), new AsyncResponseTransformer<GetObjectResponse, FluxResponse>() { @@ -151,16 +167,18 @@ public class S3DumbBlobStore implements DumbBlobStore, Closeable { response.flux = Flux.from(publisher); response.supportingCompletableFuture.complete(response); } - })); + }))) + .next(); } @Override public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) { - return Mono.fromFuture(() -> + return clientPool.withPoolable(client -> Mono.fromFuture(() -> client.getObject( builder -> builder.bucket(bucketName.asString()).key(blobId.asString()), - AsyncResponseTransformer.toBytes())) + AsyncResponseTransformer.toBytes()))) + .next() .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + bucketName, e)) .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + bucketName, e)) .map(BytesWrapper::asByteArray); @@ -168,10 +186,11 @@ public class S3DumbBlobStore implements DumbBlobStore, Closeable { @Override public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) { - return Mono.fromFuture(() -> + return clientPool.withPoolable(client -> Mono.fromFuture(() -> client.putObject( builder -> 
builder.bucket(bucketName.asString()).key(blobId.asString()).contentLength((long) data.length), - AsyncRequestBody.fromBytes(data))) + AsyncRequestBody.fromBytes(data)))) + .next() .retryWhen(createBucketOnRetry(bucketName)) .then(); } @@ -198,13 +217,13 @@ public class S3DumbBlobStore implements DumbBlobStore, Closeable { public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content) { return Mono.using(content::openStream, stream -> - Mono.fromFuture(() -> + clientPool.withPoolable(client -> Mono.fromFuture(() -> client.putObject( Throwing.<PutObjectRequest.Builder>consumer( builder -> builder.bucket(bucketName.asString()).contentLength(content.size()).key(blobId.asString())) .sneakyThrow(), AsyncRequestBody.fromPublisher( - DataChunker.chunkStream(stream, CHUNK_SIZE)))), + DataChunker.chunkStream(stream, CHUNK_SIZE))))).next(), Throwing.consumer(InputStream::close), LAZY) .retryWhen(createBucketOnRetry(bucketName)) @@ -218,15 +237,16 @@ public class S3DumbBlobStore implements DumbBlobStore, Closeable { .exponentialBackoff(FIRST_BACK_OFF, FOREVER) .withBackoffScheduler(Schedulers.elastic()) .retryMax(MAX_RETRIES) - .onRetryWithMono(retryContext -> Mono + .onRetryWithMono(retryContext -> clientPool.withPoolable(client -> Mono .fromFuture(client.createBucket(builder -> builder.bucket(bucketName.asString()))) - .onErrorResume(BucketAlreadyOwnedByYouException.class, e -> Mono.empty())); + .onErrorResume(BucketAlreadyOwnedByYouException.class, e -> Mono.empty())).next()); } @Override public Mono<Void> delete(BucketName bucketName, BlobId blobId) { - return Mono.fromFuture(() -> - client.deleteObject(delete -> delete.bucket(bucketName.asString()).key(blobId.asString()))) + return clientPool.withPoolable(client -> Mono.fromFuture(() -> + client.deleteObject(delete -> delete.bucket(bucketName.asString()).key(blobId.asString())))) + .next() .then() .onErrorResume(NoSuchBucketException.class, e -> Mono.empty()); } @@ -235,14 +255,16 @@ public class 
S3DumbBlobStore implements DumbBlobStore, Closeable { public Mono<Void> deleteBucket(BucketName bucketName) { return emptyBucket(bucketName) .onErrorResume(t -> Mono.just(bucketName)) - .flatMap(ignore -> Mono.fromFuture(() -> client.deleteBucket(builder -> builder.bucket(bucketName.asString())))) + .flatMap(ignore -> clientPool.withPoolable(client -> Mono.fromFuture(() -> + client.deleteBucket(builder -> builder.bucket(bucketName.asString())))) + .next()) .onErrorResume(t -> Mono.empty()) .then(); } private Mono<BucketName> emptyBucket(BucketName bucketName) { - return Mono.fromFuture(() -> client.listObjects(builder -> builder.bucket(bucketName.asString()))) - .flatMapIterable(ListObjectsResponse::contents) + return clientPool.withPoolable(client -> Mono.fromFuture(() -> client.listObjects(builder -> builder.bucket(bucketName.asString()))) + .flatMapIterable(ListObjectsResponse::contents)) .window(EMPTY_BUCKET_BATCH_SIZE) .flatMap(this::buildListForBatch) .flatMap(identifiers -> deleteObjects(bucketName, identifiers)) @@ -256,17 +278,16 @@ public class S3DumbBlobStore implements DumbBlobStore, Closeable { } private Mono<DeleteObjectsResponse> deleteObjects(BucketName bucketName, List<ObjectIdentifier> identifiers) { - return Mono.fromFuture(() -> client.deleteObjects(builder -> - builder.bucket(bucketName.asString()).delete(delete -> delete.objects(identifiers)))); + return clientPool.withPoolable(client -> Mono.fromFuture(() -> client.deleteObjects(builder -> + builder.bucket(bucketName.asString()).delete(delete -> delete.objects(identifiers))))) + .next(); } @VisibleForTesting public Mono<Void> deleteAllBuckets() { - return Mono.fromFuture(client::listBuckets) + return clientPool.withPoolable(client -> Mono.fromFuture(client::listBuckets) .flatMapIterable(ListBucketsResponse::buckets) - .flatMap(bucket -> deleteBucket(BucketName.of(bucket.name()))) + .flatMap(bucket -> deleteBucket(BucketName.of(bucket.name())))) .then(); } - - } diff --git 
a/server/container/guice/blob-s3-guice/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreModule.java b/server/container/guice/blob-s3-guice/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreModule.java index 7c1092c..f56892c 100644 --- a/server/container/guice/blob-s3-guice/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreModule.java +++ b/server/container/guice/blob-s3-guice/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreModule.java @@ -29,11 +29,15 @@ import org.apache.james.blob.api.BucketName; import org.apache.james.blob.objectstorage.aws.AwsS3AuthConfiguration; import org.apache.james.blob.objectstorage.aws.Region; import org.apache.james.blob.objectstorage.aws.S3BlobStoreConfiguration; +import org.apache.james.blob.objectstorage.aws.S3DumbBlobStore; import org.apache.james.modules.mailbox.ConfigurationComponent; +import org.apache.james.utils.InitializationOperation; +import org.apache.james.utils.InitilizationOperationBuilder; import org.apache.james.utils.PropertiesProvider; import com.google.inject.AbstractModule; import com.google.inject.Provides; +import com.google.inject.multibindings.ProvidesIntoSet; public class S3BlobStoreModule extends AbstractModule { @@ -66,4 +70,10 @@ public class S3BlobStoreModule extends AbstractModule { return s3BlobStoreConfiguration.getRegion(); } + @ProvidesIntoSet + InitializationOperation startS3DumbBlobStore(S3DumbBlobStore s3DumbBlobStore) { + return InitilizationOperationBuilder + .forClass(S3DumbBlobStore.class) + .init(s3DumbBlobStore::start); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
