[
https://issues.apache.org/jira/browse/KAFKA-10827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
lqjacklee updated KAFKA-10827:
------------------------------
Attachment: KAFKA-10827.patch
> Consumer group coordinator node never gets updated for manual partition
> assignment with infrequent requests
> -----------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-10827
> URL: https://issues.apache.org/jira/browse/KAFKA-10827
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 2.2.0
> Reporter: Jaebin Yoon
> Assignee: lqjacklee
> Priority: Major
> Attachments: KAFKA-10827.patch, KAFKA-10827.patch, KAFKA-10827.patch
>
>
> We've run into a situation where the coordinator node in the consumer never
> gets updated with the new coordinator when the coordinator broker gets
> replaced with a new instance. Once the consumer gets into this mode, the
> consumer keeps trying to connect to the old coordinator and never recovers
> unless restarted.
> This happens when the consumer uses manual partition assignment and commits
> offsets very infrequently (every 5 minutes) and the coordinator broker is not
> reachable (its IP address and hostname are gone, as can happen in a cloud
> environment).
> The exception the consumer keeps getting is:
> {code:java}
> Offset commit failed with a retriable exception. You should retry committing
> the latest consumed offsets. Caused by:
> org.apache.kafka.common.errors.TimeoutException: Failed to send request after
> 120000 ms.
> {code}
> We could see many connections in the *SYN_SENT* TCP state from the consumer
> app to the old hostname in this error condition.
> In the current manual partition assignment scenario, the only way for the
> coordinator to get updated is through checkAndGetCoordinator in
> AbstractCoordinator, but this gets called only when committing offsets, which
> is every 5 minutes in our case.
> The current logic of checkAndGetCoordinator uses
> ConsumerNetworkClient.isUnavailable, but it returns false unless the network
> client is in its reconnect backoff period, which is currently configured with
> default values (reconnect.backoff.ms (50), reconnect.backoff.max.ms (1000))
> while request.timeout.ms is 120000. In this scenario,
> ConsumerNetworkClient.isUnavailable for the old coordinator node always
> returns false, so checkAndGetCoordinator keeps the old coordinator
> node forever.
> What the consumer essentially does in this case is send one offset commit
> request to the coordinator every 5 min. That request times out, and when the
> consumer calls checkAndGetCoordinator 5 min later, it returns the old
> coordinator again, since the last attempt was more than 3 min ago (with 2 min
> request.timeout.ms), and this repeats forever.
> The current implementation assumes that there are many requests to the
> coordinator (normally through the heartbeat thread, etc.) to detect a new
> coordinator, but with requests this infrequent, it never moves off the old
> coordinator.
> We had to restart the consumer to recover from this condition.
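
The timing argument in the description above can be sketched as a small, self-contained model. This is not Kafka code: the class CoordinatorBackoffModel and its methods are hypothetical, and the availability check is a simplified stand-in for the behaviour the report attributes to ConsumerNetworkClient.isUnavailable (a node counts as unavailable only while its reconnect backoff is running). The constants are the values quoted in the report. The failure time is assumed to be the commit time plus request.timeout.ms.

```java
public class CoordinatorBackoffModel {
    // Values quoted in the report.
    static final long RECONNECT_BACKOFF_MAX_MS = 1_000L;
    static final long REQUEST_TIMEOUT_MS = 120_000L;
    static final long COMMIT_INTERVAL_MS = 5 * 60 * 1_000L;

    // Hypothetical stand-in for the check described in the report:
    // unavailable only while the reconnect backoff after a failure is running.
    static boolean isUnavailable(long nowMs, long lastFailureMs, long backoffMs) {
        return lastFailureMs >= 0 && nowMs - lastFailureMs < backoffMs;
    }

    // Returns the index of the first commit whose check sees the dead
    // coordinator as unavailable, or -1 if no commit ever does.
    static int firstDetectingCommit(int commits, long commitIntervalMs) {
        long lastFailureMs = -1L; // no failed attempt yet
        for (int i = 0; i < commits; i++) {
            long nowMs = i * commitIntervalMs;
            if (isUnavailable(nowMs, lastFailureMs, RECONNECT_BACKOFF_MAX_MS)) {
                return i; // the stale coordinator would be dropped here
            }
            // Assumption: the commit goes to the dead broker and fails
            // once the request timeout has elapsed.
            lastFailureMs = nowMs + REQUEST_TIMEOUT_MS;
        }
        return -1;
    }

    public static void main(String[] args) {
        // Commit every 5 min: each check happens long after the backoff ended.
        System.out.println(firstDetectingCommit(100, COMMIT_INTERVAL_MS)); // prints -1
        // Hypothetical interval that lands 500 ms after the previous failure.
        System.out.println(firstDetectingCommit(100, REQUEST_TIMEOUT_MS + 500)); // prints 1
    }
}
```

With a 5 minute commit interval, every check in this model falls about 3 minutes after the previous failure, far outside the 1 second maximum backoff, so the stale coordinator is never dropped. Only a check that happens to land inside the backoff window would notice the failure.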
--
This message was sent by Atlassian Jira
(v8.3.4#803005)