Hi,

Thanks for bringing this to our attention and for the investigation.

I have identified a potential root cause and created a JIRA ticket for it.

https://issues.apache.org/jira/browse/NIFI-12836

The behavior you describe, which I confirmed while debugging, is consistent 
with a concurrency issue: AbstractAwsProcessor::onStopped may run concurrently 
with SplitRecordSetHandler::handle, which is invoked from 
PutDynamoDBRecord::onTrigger. When that happens, the connection pools are shut 
down prematurely, which triggers the reported errors.
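
To make the suspected interleaving concrete, here is a minimal, self-contained 
Java sketch. FakeSdkClient and RaceDemo are hypothetical stand-ins, not NiFi or 
AWS SDK classes: close() plays the role of onStopped closing the cached client, 
and call() mimics the "Connection pool shut down" check made before a 
connection is leased. Two latches force the problematic ordering, so the 
failure reproduces deterministically.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for an SDK client whose connection pool is shut down by close().
class FakeSdkClient implements AutoCloseable {
    private volatile boolean poolShutDown = false;

    String call(String request) {
        // Mirrors the check that rejects new connection requests after shutdown.
        if (poolShutDown) {
            throw new IllegalStateException("Connection pool shut down");
        }
        return "ok:" + request;
    }

    @Override
    public void close() {
        poolShutDown = true;
    }
}

class RaceDemo {
    // Runs an "onTrigger" worker and an "onStopped" closer against one shared client,
    // forcing close() to land between the client lookup and its use.
    static String run() {
        FakeSdkClient shared = new FakeSdkClient();
        CountDownLatch clientObtained = new CountDownLatch(1);
        CountDownLatch clientClosed = new CountDownLatch(1);
        AtomicReference<String> outcome = new AtomicReference<>();

        Thread onTrigger = new Thread(() -> {
            FakeSdkClient client = shared;           // client taken from the cache
            clientObtained.countDown();
            try {
                clientClosed.await();                // simulated onStopped runs here
                outcome.set(client.call("BatchWriteItem"));
            } catch (IllegalStateException e) {
                outcome.set("failed: " + e.getMessage());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        onTrigger.start();

        try {
            clientObtained.await();
            shared.close();                          // simulated onStopped closing the client
            clientClosed.countDown();
            onTrigger.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return outcome.get();
    }
}
```

RaceDemo.run() returns "failed: Connection pool shut down". Note that NiFi's 
developer documentation describes @OnStopped methods as being invoked only 
after all threads have returned from onTrigger, so this ordering should not 
normally be possible, which is exactly the open question below.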

I'm curious why this concurrency issue is occurring.

Best Regards,
Lehel
________________________________
From: chien.lu.bt.com via users <[email protected]>
Sent: Thursday, February 22, 2024 6:37
To: [email protected] <[email protected]>
Subject: java.lang.IllegalStateException: Connection pool shut down for many 
AWS processors

Hello,

We are experiencing "Connection pool shut down" errors for PutDynamoDBRecord, 
DeleteDynamoDB and PutSQS when running in a NiFi cluster on version 2.0.0-M1.

This is the stack trace from the logs:

org.apache.nifi.serialization.SplitRecordSetHandlerException: java.lang.IllegalStateException: Connection pool shut down
        at org.apache.nifi.processors.aws.dynamodb.PutDynamoDBRecord$DynamoDbSplitRecordSetHandler.handleChunk(PutDynamoDBRecord.java:298)
        at org.apache.nifi.serialization.SplitRecordSetHandler.handle(SplitRecordSetHandler.java:62)
        at org.apache.nifi.processors.aws.dynamodb.PutDynamoDBRecord.onTrigger(PutDynamoDBRecord.java:211)
        at org.apache.nifi.processors.aws.v2.AbstractAwsProcessor.onTrigger(AbstractAwsProcessor.java:178)
        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1274)
        at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:244)
        at org.apache.nifi.controller.scheduling.AbstractTimeBasedSchedulingAgent.lambda$doScheduleOnce$0(AbstractTimeBasedSchedulingAgent.java:59)
        at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.IllegalStateException: Connection pool shut down
        at org.apache.http.util.Asserts.check(Asserts.java:34)
        at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:269)
        at software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$DelegatingHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:75)
        at software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$InstrumentedHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:57)
        at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176)
        at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
        at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
        at software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient.execute(ApacheSdkHttpClient.java:72)
        at software.amazon.awssdk.http.apache.ApacheHttpClient.execute(ApacheHttpClient.java:254)
        at software.amazon.awssdk.http.apache.ApacheHttpClient.access$500(ApacheHttpClient.java:104)
        at software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:231)
        at software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:228)
        at software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:67)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:77)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:56)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:39)
        at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:72)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:52)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:37)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
        at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
        at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:32)
        at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
        at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:198)
        at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
        at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:171)
        at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82)
        at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:179)
        at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76)
        at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
        at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56)
        at software.amazon.awssdk.services.sts.DefaultStsClient.assumeRoleWithWebIdentity(DefaultStsClient.java:757)
        at software.amazon.awssdk.services.sts.auth.StsAssumeRoleWithWebIdentityCredentialsProvider.getUpdatedCredentials(StsAssumeRoleWithWebIdentityCredentialsProvider.java:74)
        at software.amazon.awssdk.services.sts.auth.StsCredentialsProvider.updateSessionCredentials(StsCredentialsProvider.java:92)
        at software.amazon.awssdk.utils.cache.CachedSupplier.lambda$jitteredPrefetchValueSupplier$8(CachedSupplier.java:300)
        at software.amazon.awssdk.utils.cache.CachedSupplier$PrefetchStrategy.fetch(CachedSupplier.java:448)
        at software.amazon.awssdk.utils.cache.CachedSupplier.refreshCache(CachedSupplier.java:208)
        at software.amazon.awssdk.utils.cache.CachedSupplier.get(CachedSupplier.java:135)
        at software.amazon.awssdk.services.sts.auth.StsCredentialsProvider.resolveCredentials(StsCredentialsProvider.java:105)
        at software.amazon.awssdk.services.sts.internal.StsWebIdentityCredentialsProviderFactory$StsWebIdentityCredentialsProvider.resolveCredentials(StsWebIdentityCredentialsProviderFactory.java:109)
        at software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider.resolveCredentials(WebIdentityTokenFileCredentialsProvider.java:143)
        at software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain.resolveCredentials(AwsCredentialsProviderChain.java:90)
        at software.amazon.awssdk.auth.credentials.internal.LazyAwsCredentialsProvider.resolveCredentials(LazyAwsCredentialsProvider.java:45)
        at software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.resolveCredentials(DefaultCredentialsProvider.java:128)
        at software.amazon.awssdk.core.internal.util.MetricUtils.measureDuration(MetricUtils.java:54)
        at software.amazon.awssdk.awscore.internal.authcontext.AwsCredentialsAuthorizationStrategy.resolveCredentials(AwsCredentialsAuthorizationStrategy.java:100)
        at software.amazon.awssdk.awscore.internal.authcontext.AwsCredentialsAuthorizationStrategy.addCredentialsToExecutionAttributes(AwsCredentialsAuthorizationStrategy.java:77)
        at software.amazon.awssdk.awscore.internal.AwsExecutionContextBuilder.invokeInterceptorsAndCreateExecutionContext(AwsExecutionContextBuilder.java:125)
        at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.invokeInterceptorsAndCreateExecutionContext(AwsSyncClientHandler.java:69)
        at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:78)
        at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:179)
        at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76)
        at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
        at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56)
        at software.amazon.awssdk.services.dynamodb.DefaultDynamoDbClient.(DefaultDynamoDbClient.java:644)
        at org.apache.nifi.processors.aws.dynamodb.PutDynamoDBRecord$DynamoDbSplitRecordSetHandler.handleChunk(PutDynamoDBRecord.java:287)
        ... 13 common frames omitted

The exception comes from here:

https://github.com/apache/httpcomponents-client/blob/4.5.14-RC1/httpclient/src/main/java/org/apache/http/impl/conn/PoolingHttpClientConnectionManager.java#L269

This can only happen after shutdown() has been called:

https://github.com/apache/httpcomponents-client/blob/4.5.14-RC1/httpclient/src/main/java/org/apache/http/impl/conn/PoolingHttpClientConnectionManager.java#L410

The PoolingHttpClientConnectionManager is shut down when its close() method is 
called, which happens when close() is called on the AWS SDK DynamoDbClient 
(DefaultDynamoDbClient):

https://github.com/aws/aws-sdk-java/issues/2788

The affected NiFi processors extend AbstractAwsProcessor, which contains a 
client cache:

https://github.com/apache/nifi/blob/rel/nifi-2.0.0-M1/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java#L159

It seems likely that a bug in NiFi causes the SdkClients (including 
DynamoDbClient) to be closed prematurely. A closed client then stays in the 
cache, causing all subsequent requests to fail.
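
The following sketch illustrates that failure mode: once a cached client has 
been closed, every later lookup hands back the same dead instance. ClientCache, 
CachedClient and CacheDemo are hypothetical; the ConcurrentHashMap is a 
stand-in, not the cache AbstractAwsProcessor actually uses, and the 
"us-east-1" key is an arbitrary example. The replaceClosed flag shows one 
possible defensive variant that swaps a closed client for a fresh one on lookup.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical client: close() permanently disables it, like a shut-down connection pool.
class CachedClient implements AutoCloseable {
    private volatile boolean closed;

    boolean isClosed() {
        return closed;
    }

    String call() {
        if (closed) {
            throw new IllegalStateException("Connection pool shut down");
        }
        return "ok";
    }

    @Override
    public void close() {
        closed = true;
    }
}

// Hypothetical per-key client cache (stand-in for a processor-level client cache).
class ClientCache {
    private final Map<String, CachedClient> clients = new ConcurrentHashMap<>();
    private final boolean replaceClosed;

    ClientCache(boolean replaceClosed) {
        this.replaceClosed = replaceClosed;
    }

    CachedClient get(String key) {
        if (replaceClosed) {
            // compute() atomically swaps out a closed client instead of returning it again.
            return clients.compute(key, (k, existing) ->
                    (existing == null || existing.isClosed()) ? new CachedClient() : existing);
        }
        // Plain lookup: a closed client stays cached and keeps being returned.
        return clients.computeIfAbsent(key, k -> new CachedClient());
    }

    // Closes every cached client but leaves the entries in the map.
    void closeAll() {
        clients.values().forEach(CachedClient::close);
    }
}

class CacheDemo {
    // Closes the cached client once (as a premature close would) and then
    // counts how many of the following requests fail.
    static int failuresAfterPrematureClose(boolean replaceClosed, int requests) {
        ClientCache cache = new ClientCache(replaceClosed);
        cache.get("us-east-1").call();   // warm the cache
        cache.closeAll();                // premature close
        int failures = 0;
        for (int i = 0; i < requests; i++) {
            try {
                cache.get("us-east-1").call();
            } catch (IllegalStateException e) {
                failures++;
            }
        }
        return failures;
    }
}
```

CacheDemo.failuresAfterPrematureClose(false, 5) returns 5, while 
CacheDemo.failuresAfterPrematureClose(true, 5) returns 0. Replacing closed 
clients only stops one premature close from poisoning every later request; it 
does not protect a request that already holds a client reference when close() 
runs.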

One suspicious detail is that connections are closed in onStopped() (code 
added in October 2023):

https://github.com/apache/nifi/blob/rel/nifi-2.0.0-M1/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java#L253

Interestingly, the onStopped() method in AbstractAWSCredentialsProviderProcessor, 
which is used by InvokeAWSGatewayApi (where we do not observe this bug), does 
not call SdkClient::close:

https://github.com/apache/nifi/blob/rel/nifi-2.0.0-M1/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java#L175

Could the bug have been introduced in this commit?

https://github.com/apache/nifi/commit/0fd4ec50adc13bb3458d7349cf73da98606c55b0#diff-43fd04b170c20dbbb14315ead02fb32b7cc2541df1d60e682eb54860b907950c

Version 1.25.0 looks quite different:

https://github.com/apache/nifi/blame/rel/nifi-1.25.0/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java#L253

Let me know if you need any more information.

Thank you for the help😊

Kind regards,
Chien