Hello,

We are experiencing "Connection pool shut down" errors for PutDynamoDBRecord, 
DeleteDynamoDB, PutSQS when running in a NiFi cluster on version 2.0.0-M1

This is the stacktrace from the logs:

org.apache.nifi.serialization.SplitRecordSetHandlerException: 
java.lang.IllegalStateException: Connection pool shut down
        at 
org.apache.nifi.processors.aws.dynamodb.PutDynamoDBRecord$DynamoDbSplitRecordSetHandler.handleChunk(PutDynamoDBRecord.java:298)
        at 
org.apache.nifi.serialization.SplitRecordSetHandler.handle(SplitRecordSetHandler.java:62)
        at 
org.apache.nifi.processors.aws.dynamodb.PutDynamoDBRecord.onTrigger(PutDynamoDBRecord.java:211)
        at 
org.apache.nifi.processors.aws.v2.AbstractAwsProcessor.onTrigger(AbstractAwsProcessor.java:178)
        at 
org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1274)
        at 
org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:244)
        at 
org.apache.nifi.controller.scheduling.AbstractTimeBasedSchedulingAgent.lambda$doScheduleOnce$0(AbstractTimeBasedSchedulingAgent.java:59)
        at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
        at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
        at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.IllegalStateException: Connection pool shut down
        at org.apache.http.util.Asserts.check(Asserts.java:34)
        at 
org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:269)
        at 
software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$DelegatingHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:75)
        at 
software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$InstrumentedHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:57)
        at 
org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176)
        at 
org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
        at 
org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
        at 
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
        at 
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
        at 
software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient.execute(ApacheSdkHttpClient.java:72)
        at 
software.amazon.awssdk.http.apache.ApacheHttpClient.execute(ApacheHttpClient.java:254)
        at 
software.amazon.awssdk.http.apache.ApacheHttpClient.access$500(ApacheHttpClient.java:104)
        at 
software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:231)
        at 
software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:228)
        at 
software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:67)
        at 
software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:77)
        at 
software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:56)
        at 
software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:39)
        at 
software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at 
software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at 
software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at 
software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at 
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:72)
        at 
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
        at 
software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
        at 
software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
        at 
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:52)
        at 
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:37)
        at 
software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)
        at 
software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
        at 
software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at 
software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
        at 
software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
        at 
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
        at 
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
        at 
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
        at 
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50)
        at 
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:32)
        at 
software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at 
software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at 
software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
        at 
software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
        at 
software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:198)
        at 
software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
        at 
software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:171)
        at 
software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82)
        at 
software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:179)
        at 
software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76)
        at 
software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
        at 
software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56)
        at 
software.amazon.awssdk.services.sts.DefaultStsClient.assumeRoleWithWebIdentity(DefaultStsClient.java:757)
        at 
software.amazon.awssdk.services.sts.auth.StsAssumeRoleWithWebIdentityCredentialsProvider.getUpdatedCredentials(StsAssumeRoleWithWebIdentityCredentialsProvider.java:74)
        at 
software.amazon.awssdk.services.sts.auth.StsCredentialsProvider.updateSessionCredentials(StsCredentialsProvider.java:92)
        at 
software.amazon.awssdk.utils.cache.CachedSupplier.lambda$jitteredPrefetchValueSupplier$8(CachedSupplier.java:300)
        at 
software.amazon.awssdk.utils.cache.CachedSupplier$PrefetchStrategy.fetch(CachedSupplier.java:448)
        at 
software.amazon.awssdk.utils.cache.CachedSupplier.refreshCache(CachedSupplier.java:208)
        at 
software.amazon.awssdk.utils.cache.CachedSupplier.get(CachedSupplier.java:135)
        at 
software.amazon.awssdk.services.sts.auth.StsCredentialsProvider.resolveCredentials(StsCredentialsProvider.java:105)
        at 
software.amazon.awssdk.services.sts.internal.StsWebIdentityCredentialsProviderFactory$StsWebIdentityCredentialsProvider.resolveCredentials(StsWebIdentityCredentialsProviderFactory.java:109)
        at 
software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider.resolveCredentials(WebIdentityTokenFileCredentialsProvider.java:143)
        at 
software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain.resolveCredentials(AwsCredentialsProviderChain.java:90)
        at 
software.amazon.awssdk.auth.credentials.internal.LazyAwsCredentialsProvider.resolveCredentials(LazyAwsCredentialsProvider.java:45)
        at 
software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.resolveCredentials(DefaultCredentialsProvider.java:128)
        at 
software.amazon.awssdk.core.internal.util.MetricUtils.measureDuration(MetricUtils.java:54)
        at 
software.amazon.awssdk.awscore.internal.authcontext.AwsCredentialsAuthorizationStrategy.resolveCredentials(AwsCredentialsAuthorizationStrategy.java:100)
        at 
software.amazon.awssdk.awscore.internal.authcontext.AwsCredentialsAuthorizationStrategy.addCredentialsToExecutionAttributes(AwsCredentialsAuthorizationStrategy.java:77)
        at 
software.amazon.awssdk.awscore.internal.AwsExecutionContextBuilder.invokeInterceptorsAndCreateExecutionContext(AwsExecutionContextBuilder.java:125)
        at 
software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.invokeInterceptorsAndCreateExecutionContext(AwsSyncClientHandler.java:69)
        at 
software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:78)
        at 
software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:179)
        at 
software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76)
        at 
software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
        at 
software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56)
        at 
software.amazon.awssdk.services.dynamodb.DefaultDynamoDbClient.(DefaultDynamoDbClient.java:644)
        at 
org.apache.nifi.processors.aws.dynamodb.PutDynamoDBRecord$DynamoDbSplitRecordSetHandler.handleChunk(PutDynamoDBRecord.java:287)
        ... 13 common frames omitted


The exception is coming from here:
https://github.com/apache/httpcomponents-client/blob/4.5.14-RC1/httpclient/src/main/java/org/apache/http/impl/conn/PoolingHttpClientConnectionManager.java#L269

This can only happen after the shutdown() function gets called
https://github.com/apache/httpcomponents-client/blob/4.5.14-RC1/httpclient/src/main/java/org/apache/http/impl/conn/PoolingHttpClientConnectionManager.java#L410

The PoolingHttpClientConnectionManager will be shutdown() when close() is 
called, this will happen when the AWS SDK DynamoDbClient 
(DefaultDynamoDbClient) close() is called.
https://github.com/aws/aws-sdk-java/issues/2788

The affected NiFi processors extend from AbstractAwsProcessor, which contains a 
client cache.

https://github.com/apache/nifi/blob/rel/nifi-2.0.0-M1/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java#L159

It seems likely that there is a bug in NiFi that causes the SdkClients 
(including DynamoDbClient) to be closed prematurely, it will then stay in the 
cache causing all requests afterwards to fail.

One thing that looks suspicious is that connections are closed in onStopped() 
here (code added Oct 2023):

https://github.com/apache/nifi/blob/rel/nifi-2.0.0-M1/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java#L253

Interestingly the onStopped() function in 
AbstractAWSCredentialsProviderProcessor which is used by InvokeAWSGatewayApi, 
which we do not observe this bug, does not call the SdkClient::close method:

https://github.com/apache/nifi/blob/rel/nifi-2.0.0-M1/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java#L175

Wondering if the bug may have been introduced here?
https://github.com/apache/nifi/commit/0fd4ec50adc13bb3458d7349cf73da98606c55b0#diff-43fd04b170c20dbbb14315ead02fb32b7cc2541df1d60e682eb54860b907950c

Version 1.25.0 looks quite different:

https://github.com/apache/nifi/blame/rel/nifi-1.25.0/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java#L253

Let me know if you need any more information.
Thank you for the help😊

Kind regards,
Chien

Reply via email to