Hi, Thanks for bringing this to our attention and for the investigation.
I have identified a potential root cause and created a JIRA ticket for it. https://issues.apache.org/jira/browse/NIFI-12836 The observed behavior, as described by you and confirmed through my debugging, aligns with a concurrency issue where the AbstractAWSProcessor::onStopped method may execute concurrently with the RecordHandlerResult::handle method, initiated by the PutDynamoDBRecord::onTrigger. This concurrency leads to premature shutdown of connection pools, triggering the reported errors. I'm curious why this concurrency issue is occurring. Best Regards, Lehel ________________________________ From: chien.lu.bt.com via users <[email protected]> Sent: Thursday, February 22, 2024 6:37 To: [email protected] <[email protected]> Subject: java.lang.IllegalStateException: Connection pool shut down for many AWS processors Hello, We are experiencing "Connection pool shut down" errors for PutDynamoDBRecord, DeleteDynamoDB, PutSQS when running in a NiFi cluster on version 2.0.0-M1 This is the stacktrace from the logs: org.apache.nifi.serialization.SplitRecordSetHandlerException: java.lang.IllegalStateException: Connection pool shut down at org.apache.nifi.processors.aws.dynamodb.PutDynamoDBRecord$DynamoDbSplitRecordSetHandler.handleChunk(PutDynamoDBRecord.java:298) at org.apache.nifi.serialization.SplitRecordSetHandler.handle(SplitRecordSetHandler.java:62) at org.apache.nifi.processors.aws.dynamodb.PutDynamoDBRecord.onTrigger(PutDynamoDBRecord.java:211) at org.apache.nifi.processors.aws.v2.AbstractAwsProcessor.onTrigger(AbstractAwsProcessor.java:178) at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1274) at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:244) at org.apache.nifi.controller.scheduling.AbstractTimeBasedSchedulingAgent.lambda$doScheduleOnce$0(AbstractTimeBasedSchedulingAgent.java:59) at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) at java.base/java.lang.Thread.run(Thread.java:1583) Caused by: java.lang.IllegalStateException: Connection pool shut down at org.apache.http.util.Asserts.check(Asserts.java:34) at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:269) at software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$DelegatingHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:75) at software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$InstrumentedHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:57) at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176) at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56) at software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient.execute(ApacheSdkHttpClient.java:72) at software.amazon.awssdk.http.apache.ApacheHttpClient.execute(ApacheHttpClient.java:254) at software.amazon.awssdk.http.apache.ApacheHttpClient.access$500(ApacheHttpClient.java:104) at software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:231) at software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:228) at software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:67) at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:77) at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:56) at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:39) at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:72) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42) at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78) at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:52) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:37) at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81) at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36) at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56) at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:32) at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37) at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26) at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:198) at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103) at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:171) at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82) at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:179) at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76) at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45) at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56) at software.amazon.awssdk.services.sts.DefaultStsClient.assumeRoleWithWebIdentity(DefaultStsClient.java:757) at software.amazon.awssdk.services.sts.auth.StsAssumeRoleWithWebIdentityCredentialsProvider.getUpdatedCredentials(StsAssumeRoleWithWebIdentityCredentialsProvider.java:74) at software.amazon.awssdk.services.sts.auth.StsCredentialsProvider.updateSessionCredentials(StsCredentialsProvider.java:92) at software.amazon.awssdk.utils.cache.CachedSupplier.lambda$jitteredPrefetchValueSupplier$8(CachedSupplier.java:300) at software.amazon.awssdk.utils.cache.CachedSupplier$PrefetchStrategy.fetch(CachedSupplier.java:448) at software.amazon.awssdk.utils.cache.CachedSupplier.refreshCache(CachedSupplier.java:208) at software.amazon.awssdk.utils.cache.CachedSupplier.get(CachedSupplier.java:135) at software.amazon.awssdk.services.sts.auth.StsCredentialsProvider.resolveCredentials(StsCredentialsProvider.java:105) at software.amazon.awssdk.services.sts.internal.StsWebIdentityCredentialsProviderFactory$StsWebIdentityCredentialsProvider.resolveCredentials(StsWebIdentityCredentialsProviderFactory.java:109) at software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider.resolveCredentials(WebIdentityTokenFileCredentialsProvider.java:143) at software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain.resolveCredentials(AwsCredentialsProviderChain.java:90) at software.amazon.awssdk.auth.credentials.internal.LazyAwsCredentialsProvider.resolveCredentials(LazyAwsCredentialsProvider.java:45) at software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.resolveCredentials(DefaultCredentialsProvider.java:128) at software.amazon.awssdk.core.internal.util.MetricUtils.measureDuration(MetricUtils.java:54) at software.amazon.awssdk.awscore.internal.authcontext.AwsCredentialsAuthorizationStrategy.resolveCredentials(AwsCredentialsAuthorizationStrategy.java:100) at software.amazon.awssdk.awscore.internal.authcontext.AwsCredentialsAuthorizationStrategy.addCredentialsToExecutionAttributes(AwsCredentialsAuthorizationStrategy.java:77) at software.amazon.awssdk.awscore.internal.AwsExecutionContextBuilder.invokeInterceptorsAndCreateExecutionContext(AwsExecutionContextBuilder.java:125) at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.invokeInterceptorsAndCreateExecutionContext(AwsSyncClientHandler.java:69) at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:78) at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:179) at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76) at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45) at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56) at software.amazon.awssdk.services.dynamodb.DefaultDynamoDbClient.(DefaultDynamoDbClient.java:644) at org.apache.nifi.processors.aws.dynamodb.PutDynamoDBRecord$DynamoDbSplitRecordSetHandler.handleChunk(PutDynamoDBRecord.java:287) ... 13 common frames omitted The exception is coming from here: https://github.com/apache/httpcomponents-client/blob/4.5.14-RC1/httpclient/src/main/java/org/apache/http/impl/conn/PoolingHttpClientConnectionManager.java#L269 This can only happen after the shutdown() function gets called https://github.com/apache/httpcomponents-client/blob/4.5.14-RC1/httpclient/src/main/java/org/apache/http/impl/conn/PoolingHttpClientConnectionManager.java#L410 The PoolingHttpClientConnectionManager will be shutdown() when close() is called, this will happen when the AWS SDK DynamoDbClient (DefaultDynamoDbClient) close() is called. https://github.com/aws/aws-sdk-java/issues/2788 The affected NiFi processors extend from AbstractAwsProcessor, which contains a client cache. https://github.com/apache/nifi/blob/rel/nifi-2.0.0-M1/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java#L159 It seems likely that there is a bug in NiFi that causes the SdkClients (including DynamoDbClient) to be closed prematurely, it will then stay in the cache causing all requests afterwards to fail. One thing that looks suspicious is that connections are closed in onStopped() here (code added Oct 2023): https://github.com/apache/nifi/blob/rel/nifi-2.0.0-M1/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java#L253 Interestingly the onStopped() function in AbstractAWSCredentialsProviderProcessor which is used by InvokeAWSGatewayApi, which we do not observe this bug, does not call the SdkClient::close method: https://github.com/apache/nifi/blob/rel/nifi-2.0.0-M1/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java#L175 Wondering if the bug may have been introduced here? https://github.com/apache/nifi/commit/0fd4ec50adc13bb3458d7349cf73da98606c55b0#diff-43fd04b170c20dbbb14315ead02fb32b7cc2541df1d60e682eb54860b907950c Version 1.25.0 looks quite different: https://github.com/apache/nifi/blame/rel/nifi-1.25.0/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java#L253 Let me know if you need any more information. Thank you for the help😊 Kind regards, Chien
