ShivsundarR opened a new pull request, #19886:
URL: https://github.com/apache/kafka/pull/19886

   https://issues.apache.org/jira/browse/KAFKA-17853
   
   - There is an issue with the console share consumer: if the broker is unavailable, the consumer does not shut down immediately, even after being force-terminated with ctrl-c. It takes around 30 seconds to close once the broker has shut down.
   
   - The console consumer, on the other hand, was supposed to shut down immediately on ctrl-c. However, when reproducing the issue against a local Kafka server, I observed the same problem in both the console consumer and the console share consumer.
   
   Issue:
   - The client debug logs suggested that the issue is related to the network thread sending repeated `FindCoordinator` requests until the request timer expires. This happens in both the console consumer and the console share consumer.
   
   - The debug logs showed that when the broker is shut down, the heartbeat fails with a `DisconnectException` (which is retriable). This triggers a `findCoordinator` request on the network thread, which is retried until the default timeout expires.
   
   - This request is sent even before close is triggered on the consumer. So when ctrl-c is pressed, `ConsumerNetworkThread::close()` is invoked, but it waits up to the default timeout for all pending requests to be sent out as part of a graceful shutdown.
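   To make the timing concrete, here is a deliberately simplified Java model of the behaviour described above. It is a sketch under stated assumptions, not Kafka's code: `GracefulCloseModel`, `PendingRequest` and the 100 ms poll step are hypothetical, and the 30 s timeout is assumed to match the roughly 30 seconds observed above. A request whose node is empty can never be sent while the broker is down, so a close that waits for the unsent queue to drain only returns when the timeout runs out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Hypothetical, simplified model of the pre-fix close behaviour; not Kafka's real classes.
public class GracefulCloseModel {

    // Hypothetical stand-in for an unsent request; node is empty while no broker is known.
    record PendingRequest(String name, Optional<String> node) { }

    private final Deque<PendingRequest> unsent = new ArrayDeque<>();

    void enqueue(PendingRequest request) {
        unsent.add(request);
    }

    // One network-thread poll: only requests with a known node can be sent (and leave the queue).
    private void poll() {
        unsent.removeIf(request -> request.node().isPresent());
    }

    // Graceful close on a simulated clock: keep polling until the queue drains or the timeout elapses.
    // Returns the simulated time spent closing, in milliseconds.
    long closeGracefully(long timeoutMs) {
        final long pollIntervalMs = 100; // assumed poll step for the simulation
        long elapsedMs = 0;
        while (elapsedMs < timeoutMs) {
            poll();
            if (unsent.isEmpty()) {
                break; // everything was sent, so close can finish
            }
            elapsedMs += pollIntervalMs; // advance the simulated clock
        }
        return elapsedMs;
    }

    public static void main(String[] args) {
        GracefulCloseModel client = new GracefulCloseModel();
        // The broker is down, so the FindCoordinator request has no node to go to.
        client.enqueue(new PendingRequest("FindCoordinator", Optional.empty()));
        System.out.println("close took " + client.closeGracefully(30_000) + " ms (simulated)");
    }
}
```

   Running `main` prints `close took 30000 ms (simulated)`: the node-less request pins the close to the full timeout, whereas a queue holding only requests with a known node would close after 0 ms.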
   
   This PR fixes the issue by adding a check in `NetworkClientDelegate` that removes any pending unsent requests (those with an empty node) during close. This avoids unnecessary retries, so the consumers shut down immediately on termination.
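   The idea behind the fix can be sketched with a similarly simplified model. This is an illustration under assumptions, not the actual `NetworkClientDelegate` change: the class `DropNodelessRequestsOnClose`, the `PendingRequest` record and the `IllegalStateException` used to fail a dropped request are all hypothetical. On close, every unsent request with an empty node is removed from the queue and its future is completed exceptionally, so nothing waits for it to time out, while requests that already have a target node are left in place.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the idea behind the fix; not the real NetworkClientDelegate code.
public class DropNodelessRequestsOnClose {

    // Hypothetical stand-in for an unsent request plus the future its caller waits on.
    record PendingRequest(String name, Optional<String> node, CompletableFuture<Void> future) { }

    private final Deque<PendingRequest> unsent = new ArrayDeque<>();

    void enqueue(PendingRequest request) {
        unsent.add(request);
    }

    int unsentCount() {
        return unsent.size();
    }

    // Called when the client starts closing: drop every unsent request that has no target node.
    void removeNodelessRequestsOnClose() {
        unsent.removeIf(request -> {
            if (request.node().isPresent()) {
                return false; // keep requests that can still be sent
            }
            // Fail the caller's future so nothing keeps waiting on a request that will never be sent.
            // The exception type here is an assumption made for the sketch.
            request.future().completeExceptionally(new IllegalStateException("client is closing"));
            return true;
        });
    }

    public static void main(String[] args) {
        DropNodelessRequestsOnClose client = new DropNodelessRequestsOnClose();
        CompletableFuture<Void> lookup = new CompletableFuture<>();
        // Broker is down: the coordinator lookup has no node to send to.
        client.enqueue(new PendingRequest("FindCoordinator", Optional.empty(), lookup));
        // A request that already has a target node is kept.
        client.enqueue(new PendingRequest("Fetch", Optional.of("broker-1"), new CompletableFuture<>()));

        client.removeNodelessRequestsOnClose();
        System.out.println("unsent after close check: " + client.unsentCount()
                + ", lookup failed: " + lookup.isCompletedExceptionally());
    }
}
```

   Running `main` prints `unsent after close check: 1, lookup failed: true`. Failing the future instead of silently discarding the request lets whoever issued the lookup observe the failure right away, which is consistent with the "FindCoordinator request failed" lines that immediately follow the removal in the logs.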
   
   Share consumers shutting down after the fix:
   ```
   [2025-06-03 16:23:42,175] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] Removing unsent request UnsentRequest{requestBuilder=FindCoordinatorRequestData(key='console-share-consumer', keyType=0, coordinatorKeys=[]), handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@2b351de8, node=Optional.empty, remainingMs=28565} because the client is closing (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate)
   [2025-06-03 16:23:42,175] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] FindCoordinator request failed due to retriable exception (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager)
   org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
   [2025-06-03 16:23:42,176] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] Closing RequestManagers (org.apache.kafka.clients.consumer.internals.RequestManagers)
   [2025-06-03 16:23:42,177] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] RequestManagers has been closed (org.apache.kafka.clients.consumer.internals.RequestManagers)
   [2025-06-03 16:23:42,179] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] Closed the consumer network thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread)
   [2025-06-03 16:23:42,181] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] Kafka share consumer has been closed (org.apache.kafka.clients.consumer.internals.ShareConsumerImpl)
   Processed a total of 0 messages
   ```
   
   Regular consumers shutting down after the fix:
   ```
   [2025-06-03 16:24:27,196] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] Removing unsent request UnsentRequest{requestBuilder=FindCoordinatorRequestData(key='console-consumer-5671', keyType=0, coordinatorKeys=[]), handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@3770591b, node=Optional.empty, remainingMs=29160} because the client is closing (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate)
   [2025-06-03 16:24:27,196] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] FindCoordinator request failed due to retriable exception (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager)
   org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
   [2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] Closing RequestManagers (org.apache.kafka.clients.consumer.internals.RequestManagers)
   [2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] Removing test-topic-23-0 from buffered fetch data as it is not in the set of partitions to retain ([]) (org.apache.kafka.clients.consumer.internals.FetchBuffer)
   [2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] RequestManagers has been closed (org.apache.kafka.clients.consumer.internals.RequestManagers)
   [2025-06-03 16:24:27,200] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] Closed the consumer network thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread)
   [2025-06-03 16:24:27,202] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] Kafka consumer has been closed (org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer)
   Processed a total of 0 messages
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.