[ https://issues.apache.org/jira/browse/KAFKA-18355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17922495#comment-17922495 ]
Lucas Brutschy edited comment on KAFKA-18355 at 1/30/25 7:00 PM: ----------------------------------------------------------------- I injected Authentication Errors into the producer and ran some tests with low {{{}max.poll.interval.ms{}}}, trying to reproduce this in 3.8.1, but no dice. I'm not sure if it will be possible to get to the bottom of this without further information. [~ravigupta] if there is indeed a thread getting stuck here (which is the most likely explanation), it would be extremely helpful to get a thread dump using {{{}jstack{}}}. was (Author: JIRAUSER302322): I injected Authentication Errors into the producer and ran some tests with low `max.poll.interval.ms`, trying to reproduce this in 3.8.1, but no dice. I'm not sure, it will be possible to get to the bottom of this without further information. [~ravigupta] if there is indeed a thread getting stuck here (which is the most likely explanation), it would be extremely helpful to get a thread dump using `jstack`. > Stream thread blocks indefinitely for acquiring state directory lock > -------------------------------------------------------------------- > > Key: KAFKA-18355 > URL: https://issues.apache.org/jira/browse/KAFKA-18355 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 3.8.1 > Reporter: Ravi Gupta > Priority: Major > > We are running Kafka streams based application in production and have noticed > couple of timesĀ {*}lag on source topic partition start increasing{*}. > Based on investigation, we found the below happening: > * Thread responsible for the partition task gets Authentication exception ( > MSK IAM authentication gives the transient exception) while producing record > in the Sink > {code:java} > { > "level":"ERROR", > "logger_name":"org.apache.kafka.clients.NetworkClient", > "message":"[Producer > clientId=xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-producer, > > transactionalId=xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-3] > Connection to node 1 > (b-1.xxxxxxx.yyyyyy.c2.kafka.xx-yyyyy.amazonaws.com/xx.xx.xxx.xxxx:yyyy) > failed authentication due to: An error: > (java.security.PrivilegedActionException: javax.security.sasl.SaslException: > Failed to find AWS IAM Credentials [Caused by > com.amazonaws.AmazonServiceException: Unauthorized (Service: null; Status > Code: 401; Error Code: null; Request ID: null; Proxy: null)]) occurred when > evaluating SASL token received from the Kafka Broker. Kafka Client will go to > AUTHENTICATION_FAILED state.", > "thread_name":"kafka-producer-network-thread | > xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-producer", > "time":"2024-12-26T07:40:45.113067247Z" > } {code} > * In some cases, the system recovers when the next record is polled and the > Sink Node ( RecordCollectorImpl) throws the exception from the last message > while processing > * However, in couple of cases the following logs appears, approximately 5 > minutes after the producer failure. ( {_}N{_}{_}o additional log statement to > understand why thread stopped polling, however it seems heartbeat thread got > the same exception as producer){_}. > {code:java} > { > "level":"WARN", > "logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator", > "message":"[Consumer > clientId=xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-consumer, > groupId=xxxxx-xxxx-lall-lio-step-executor_lio-se] consumer poll timeout has > expired. This means the time between subsequent calls to poll() was longer > than the configured max.poll.interval.ms, which typically implies that the > poll loop is spending too much time processing messages. You can address this > either by increasing max.poll.interval.ms or by reducing the maximum size of > batches returned in poll() with max.poll.records.", > "thread_name":"kafka-coordinator-heartbeat-thread | > xxxxx-xxxx-lall-lio-step-executor_lio-se", > "time":"2024-12-26T07:45:43.286428901Z" > } {code} > * In such cases, the partition gets assigned to a new thread ( Thread 5), > however the new thread keep throwing the following exception: > {code:java} > { > "level":"INFO", > "logger_name":"org.apache.kafka.streams.processor.internals.TaskManager", > "message":"stream-thread > [xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5] > Encountered lock exception. Reattempting locking the state in the next > iteration.", > "stack_trace":"org.apache.kafka.streams.errors.LockException: stream-thread > [xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5] > task [8_0] Failed to lock the state directory for task 8_0\n\tat > org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:96)\n\tat > > org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)\n\tat > > org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1010)\n\tat > > org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:997)\n\tat > > org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:911)\n\tat > > org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1188)\n\tat > > org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:996)\n\tat > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)\n\tat > > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)\n", > "thread_name":"xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5", > "time":"2024-12-26T07:50:53.904374419Z" > } {code} > * We are using exception handler, however, in these failure cases our > exception handler is not called for both producer and consumer exception. > However in some authentication exception during consume/produce we see the > handler being called. > It seems that old thread didn't clean up its state: as the producer failures > are cleaned up when processing next event ( which never happened due to > consumer exception). Neither did consumer failure tried to release the lock. -- This message was sent by Atlassian Jira (v8.20.10#820010)