ShivsundarR opened a new pull request, #19886: URL: https://github.com/apache/kafka/pull/19886
https://issues.apache.org/jira/browse/KAFKA-17853

- There is an issue with the console share consumer: if the broker is unavailable, the consumer does not shut down immediately, even after it is force-terminated with ctrl-c. It takes around 30 seconds to close once the broker shuts down.
- The console consumer, on the other hand, was reported to shut down immediately on ctrl-c. On reproducing the issue with a local Kafka server, I observed that it was present in both the console consumer and the console share consumer.

Issue:

- The client debug logs suggest the issue is related to the network thread sending repeated `FindCoordinator` requests until the timer expires. This happened in both the console-consumer and the console-share-consumer.
- The debug logs showed that when the broker is shut down, the heartbeat fails with a `DisconnectException` (which is retriable). This triggers a `FindCoordinator` request on the network thread, which retries until the default timeout expires.
- This request is queued even before we trigger a close on the consumer. So once we press ctrl-c, although `ConsumerNetworkThread::close()` is triggered, it waits for the default timeout until all the requests are sent out for a graceful shutdown.

This PR aims to fix the issue by adding a check in `NetworkClientDelegate` that removes any pending unsent requests (those with empty node values) during close. This avoids unnecessary retries, so the consumers shut down immediately upon termination.

Share consumers shutting down after the fix:
```
[2025-06-03 16:23:42,175] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] Removing unsent request UnsentRequest{requestBuilder=FindCoordinatorRequestData(key='console-share-consumer', keyType=0, coordinatorKeys=[]), handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@2b351de8, node=Optional.empty, remainingMs=28565} because the client is closing (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate)
[2025-06-03 16:23:42,175] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] FindCoordinator request failed due to retriable exception (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager)
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
[2025-06-03 16:23:42,176] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] Closing RequestManagers (org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:23:42,177] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] RequestManagers has been closed (org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:23:42,179] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] Closed the consumer network thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread)
[2025-06-03 16:23:42,181] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] Kafka share consumer has been closed (org.apache.kafka.clients.consumer.internals.ShareConsumerImpl)
Processed a total of 0 messages
```

Regular consumers shutting down after the fix:
```
[2025-06-03 16:24:27,196] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] Removing unsent request UnsentRequest{requestBuilder=FindCoordinatorRequestData(key='console-consumer-5671', keyType=0, coordinatorKeys=[]), handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@3770591b, node=Optional.empty, remainingMs=29160} because the client is closing (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate)
[2025-06-03 16:24:27,196] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] FindCoordinator request failed due to retriable exception (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager)
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
[2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] Closing RequestManagers (org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] Removing test-topic-23-0 from buffered fetch data as it is not in the set of partitions to retain ([]) (org.apache.kafka.clients.consumer.internals.FetchBuffer)
[2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] RequestManagers has been closed (org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:24:27,200] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] Closed the consumer network thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread)
[2025-06-03 16:24:27,202] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] Kafka consumer has been closed (org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer)
Processed a total of 0 messages
```
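
For readers who want a concrete picture of the close-time check described in this PR, here is a minimal, self-contained sketch of the idea. It is not the actual `NetworkClientDelegate` code: `UnsentRequestPruningSketch`, `PendingRequest`, `PendingQueue`, `beginClose()` and `pruneOnClose()` are all hypothetical names invented for illustration. The assumption is simply that a request with no target node cannot make progress while the broker is unreachable, so on close it is failed and dropped instead of being retried until its timer expires.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

public class UnsentRequestPruningSketch {

    /** Hypothetical stand-in for an unsent request; not Kafka's real UnsentRequest. */
    static final class PendingRequest {
        final String name;
        final Optional<String> node; // empty means no target broker has been chosen yet
        boolean failed = false;

        PendingRequest(String name, Optional<String> node) {
            this.name = name;
            this.node = node;
        }

        /** Marks the request as failed so its owner stops waiting on it. */
        void fail() {
            failed = true;
        }
    }

    /** Hypothetical stand-in for the unsent-request queue of a network client delegate. */
    static final class PendingQueue {
        private final Deque<PendingRequest> unsent = new ArrayDeque<>();
        private boolean closing = false;

        void add(PendingRequest request) {
            unsent.add(request);
        }

        void beginClose() {
            closing = true;
        }

        int size() {
            return unsent.size();
        }

        /**
         * While closing, removes and fails every request that has no target node.
         * Requests with a known node stay queued so they can still be sent.
         * Returns the removed requests; returns an empty list when not closing.
         */
        List<PendingRequest> pruneOnClose() {
            List<PendingRequest> removed = new ArrayList<>();
            if (!closing) {
                return removed;
            }
            Iterator<PendingRequest> it = unsent.iterator();
            while (it.hasNext()) {
                PendingRequest request = it.next();
                if (!request.node.isPresent()) {
                    it.remove();
                    request.fail();
                    removed.add(request);
                }
            }
            return removed;
        }
    }

    public static void main(String[] args) {
        PendingQueue queue = new PendingQueue();
        queue.add(new PendingRequest("FindCoordinator", Optional.empty()));
        queue.add(new PendingRequest("Heartbeat", Optional.of("broker-1")));

        queue.beginClose();
        for (PendingRequest request : queue.pruneOnClose()) {
            System.out.println("Removing unsent request " + request.name
                    + " because the client is closing");
        }
        System.out.println("Requests still queued: " + queue.size());
    }
}
```

In this sketch the pruning is a no-op until `beginClose()` has been called, which mirrors the intent that normal operation keeps its retry behaviour and only shutdown skips requests that have nowhere to go.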