This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 9c9d5754d8fd33e0483d67397dfa5d55c9daee74 Author: Benoit Tellier <[email protected]> AuthorDate: Tue May 18 07:50:01 2021 +0700 JAMES-3028 S3Client should not be pooled Running above 900 req/s on a 3 James setup I encounter the following exception: ``` software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Channel was closed before it could be written to. at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98) at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43) at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:198) at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:194) at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:143) at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:125) at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:74) at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:104) at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:209) at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) at 
java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.io.IOException: Channel was closed before it could be written to. at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.tryConfigurePipeline(NettyRequestExecutor.java:220) at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.makeRequestListener(NettyRequestExecutor.java:168) at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552) at io.netty.util.concurrent.DefaultPromise.access$200(DefaultPromise.java:35) at io.netty.util.concurrent.DefaultPromise$1.run(DefaultPromise.java:502) at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ... 1 common frames omitted ``` Removing pooling yields massive improvements for blob appends. Sending emails with setMessages: -50% mean time, -33% p99. I could reach a throughput of 1100+ req/s without exceptions. 
--- .../blob/objectstorage/aws/S3BlobStoreDAO.java | 69 +++++++--------------- .../modules/objectstorage/S3BlobStoreModule.java | 12 ---- 2 files changed, 22 insertions(+), 59 deletions(-) diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java index 37f2810..d228aee 100644 --- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java +++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java @@ -27,7 +27,6 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.time.Duration; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import javax.annotation.PreDestroy; @@ -52,9 +51,6 @@ import com.google.common.io.FileBackedOutputStream; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; -import reactor.pool.InstrumentedPool; -import reactor.pool.PoolBuilder; import reactor.util.retry.RetryBackoffSpec; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; @@ -83,12 +79,11 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable { private static final int EMPTY_BUCKET_BATCH_SIZE = 1000; private static final int FILE_THRESHOLD = 1024 * 100; private static final Duration FIRST_BACK_OFF = Duration.ofMillis(100); - private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE); private static final boolean LAZY = false; private static final int MAX_RETRIES = 5; - private final InstrumentedPool<S3AsyncClient> clientPool; private final BucketNameResolver bucketNameResolver; + private final S3AsyncClient client; @Inject S3BlobStoreDAO(S3BlobStoreConfiguration configuration) { @@ -98,7 +93,7 @@ public class 
S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable { .pathStyleAccessEnabled(true) .build(); - Callable<S3AsyncClient> clientCreator = () -> S3AsyncClient.builder() + client = S3AsyncClient.builder() .credentialsProvider(StaticCredentialsProvider.create( AwsBasicCredentials.create(authConfiguration.getAccessKeyId(), authConfiguration.getSecretKey()))) .httpClientBuilder(NettyNioAsyncHttpClient.builder() @@ -109,27 +104,16 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable { .serviceConfiguration(pathStyleAccess) .build(); - clientPool = PoolBuilder.from(Mono.fromCallable(clientCreator)) - .acquisitionScheduler(Schedulers.elastic()) - .destroyHandler(client -> Mono.fromRunnable(client::close)) - .maxPendingAcquireUnbounded() - .sizeUnbounded() - .fifo(); - bucketNameResolver = BucketNameResolver.builder() .prefix(configuration.getBucketPrefix()) .namespace(configuration.getNamespace()) .build(); } - public void start() { - clientPool.warmup().block(); - } - @Override @PreDestroy public void close() { - clientPool.dispose(); + client.close(); } @Override @@ -150,7 +134,7 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable { } private Mono<FluxResponse> getObject(BucketName bucketName, BlobId blobId) { - return clientPool.withPoolable(client -> Mono.fromFuture(() -> + return Mono.fromFuture(() -> client.getObject( builder -> builder.bucket(bucketName.asString()).key(blobId.asString()), new AsyncResponseTransformer<GetObjectResponse, FluxResponse>() { @@ -178,8 +162,7 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable { response.flux = Flux.from(publisher); response.supportingCompletableFuture.complete(response); } - }))) - .next(); + })); } @@ -187,11 +170,10 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable { public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) { BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); - 
return clientPool.withPoolable(client -> Mono.fromFuture(() -> + return Mono.fromFuture(() -> client.getObject( builder -> builder.bucket(resolvedBucketName.asString()).key(blobId.asString()), - AsyncResponseTransformer.toBytes()))) - .next() + AsyncResponseTransformer.toBytes())) .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e)) .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + resolvedBucketName.asString(), e)) .map(BytesWrapper::asByteArray); @@ -201,11 +183,10 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable { public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) { BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); - return clientPool.withPoolable(client -> Mono.fromFuture(() -> + return Mono.fromFuture(() -> client.putObject( builder -> builder.bucket(resolvedBucketName.asString()).key(blobId.asString()).contentLength((long) data.length), - AsyncRequestBody.fromBytes(data)))) - .next() + AsyncRequestBody.fromBytes(data))) .retryWhen(createBucketOnRetry(resolvedBucketName)) .then(); } @@ -233,14 +214,13 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable { BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); return Mono.using(content::openStream, - stream -> - clientPool.withPoolable(client -> Mono.fromFuture(() -> + stream -> Mono.fromFuture(() -> client.putObject( Throwing.<PutObjectRequest.Builder>consumer( builder -> builder.bucket(resolvedBucketName.asString()).contentLength(content.size()).key(blobId.asString())) .sneakyThrow(), AsyncRequestBody.fromPublisher( - DataChunker.chunkStream(stream, CHUNK_SIZE))))).next(), + DataChunker.chunkStream(stream, CHUNK_SIZE)))), Throwing.consumer(InputStream::close), LAZY) .retryWhen(createBucketOnRetry(resolvedBucketName)) @@ -254,10 +234,8 @@ public class S3BlobStoreDAO implements 
BlobStoreDAO, Startable, Closeable { .maxAttempts(MAX_RETRIES) .doBeforeRetryAsync(retrySignal -> { if (retrySignal.failure() instanceof NoSuchBucketException) { - return clientPool.withPoolable(client -> Mono - .fromFuture(client.createBucket(builder -> builder.bucket(bucketName.asString()))) - .onErrorResume(BucketAlreadyOwnedByYouException.class, e -> Mono.empty())) - .next() + return Mono.fromFuture(client.createBucket(builder -> builder.bucket(bucketName.asString()))) + .onErrorResume(BucketAlreadyOwnedByYouException.class, e -> Mono.empty()) .then(); } else { return Mono.error(retrySignal.failure()); @@ -269,9 +247,8 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable { public Mono<Void> delete(BucketName bucketName, BlobId blobId) { BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); - return clientPool.withPoolable(client -> Mono.fromFuture(() -> - client.deleteObject(delete -> delete.bucket(resolvedBucketName.asString()).key(blobId.asString())))) - .next() + return Mono.fromFuture(() -> + client.deleteObject(delete -> delete.bucket(resolvedBucketName.asString()).key(blobId.asString()))) .then() .onErrorResume(NoSuchBucketException.class, e -> Mono.empty()); } @@ -286,16 +263,15 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable { private Mono<Void> deleteResolvedBucket(BucketName bucketName) { return emptyBucket(bucketName) .onErrorResume(t -> Mono.just(bucketName)) - .flatMap(ignore -> clientPool.withPoolable(client -> Mono.fromFuture(() -> + .flatMap(ignore -> Mono.fromFuture(() -> client.deleteBucket(builder -> builder.bucket(bucketName.asString())))) - .next()) .onErrorResume(t -> Mono.empty()) .then(); } private Mono<BucketName> emptyBucket(BucketName bucketName) { - return clientPool.withPoolable(client -> Mono.fromFuture(() -> client.listObjects(builder -> builder.bucket(bucketName.asString()))) - .flatMapIterable(ListObjectsResponse::contents)) + return Mono.fromFuture(() -> 
client.listObjects(builder -> builder.bucket(bucketName.asString()))) + .flatMapIterable(ListObjectsResponse::contents) .window(EMPTY_BUCKET_BATCH_SIZE) .flatMap(this::buildListForBatch, DEFAULT_CONCURRENCY) .flatMap(identifiers -> deleteObjects(bucketName, identifiers), DEFAULT_CONCURRENCY) @@ -309,16 +285,15 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable { } private Mono<DeleteObjectsResponse> deleteObjects(BucketName bucketName, List<ObjectIdentifier> identifiers) { - return clientPool.withPoolable(client -> Mono.fromFuture(() -> client.deleteObjects(builder -> - builder.bucket(bucketName.asString()).delete(delete -> delete.objects(identifiers))))) - .next(); + return Mono.fromFuture(() -> client.deleteObjects(builder -> + builder.bucket(bucketName.asString()).delete(delete -> delete.objects(identifiers)))); } @VisibleForTesting public Mono<Void> deleteAllBuckets() { - return clientPool.withPoolable(client -> Mono.fromFuture(client::listBuckets) + return Mono.fromFuture(client::listBuckets) .flatMapIterable(ListBucketsResponse::buckets) - .flatMap(bucket -> deleteResolvedBucket(BucketName.of(bucket.name())), DEFAULT_CONCURRENCY)) + .flatMap(bucket -> deleteResolvedBucket(BucketName.of(bucket.name())), DEFAULT_CONCURRENCY) .then(); } } diff --git a/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreModule.java b/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreModule.java index c6494e2..c97c669 100644 --- a/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreModule.java +++ b/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreModule.java @@ -26,18 +26,13 @@ import javax.inject.Singleton; import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.ex.ConfigurationException; import 
org.apache.james.blob.objectstorage.aws.S3BlobStoreConfiguration; -import org.apache.james.blob.objectstorage.aws.S3BlobStoreDAO; import org.apache.james.modules.mailbox.ConfigurationComponent; -import org.apache.james.utils.InitializationOperation; -import org.apache.james.utils.InitilizationOperationBuilder; import org.apache.james.utils.PropertiesProvider; import com.google.inject.AbstractModule; import com.google.inject.Provides; -import com.google.inject.multibindings.ProvidesIntoSet; public class S3BlobStoreModule extends AbstractModule { - @Provides @Singleton private S3BlobStoreConfiguration getObjectStorageConfiguration(PropertiesProvider propertiesProvider) throws ConfigurationException { @@ -48,11 +43,4 @@ public class S3BlobStoreModule extends AbstractModule { throw new ConfigurationException(ConfigurationComponent.NAME + " configuration was not found"); } } - - @ProvidesIntoSet - InitializationOperation startS3BlobStoreDAO(S3BlobStoreDAO s3BlobStoreDAO) { - return InitilizationOperationBuilder - .forClass(S3BlobStoreDAO.class) - .init(s3BlobStoreDAO::start); - } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
