[ https://issues.apache.org/jira/browse/KAFKA-8374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bob Barrett reassigned KAFKA-8374:
----------------------------------

    Assignee: Bob Barrett

> KafkaApis.handleLeaderAndIsrRequest not robust to ZooKeeper exceptions
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-8374
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8374
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, offset manager
>    Affects Versions: 2.0.1
>         Environment: Linux x86_64 (Ubuntu Xenial) running on AWS EC2
>            Reporter: Mike Mintz
>            Assignee: Bob Barrett
>            Priority: Major
>
> h2. Summary of bug (theory)
> During a leader election, when a broker is transitioning from leader to 
> follower on some __consumer_offsets partitions and some __transaction_state 
> partitions, it’s possible for a ZooKeeper exception to be thrown, leaving the 
> GroupMetadataManager in an inconsistent state.
>  
> In particular, in KafkaApis.handleLeaderAndIsrRequest, in the 
> onLeadershipChange callback, it’s possible for 
> TransactionCoordinator.handleTxnEmigration to throw 
> ZooKeeperClientExpiredException, ending the updatedFollowers.foreach loop 
> early. If any __consumer_offsets partitions remain to be handled later in the 
> loop, GroupMetadataManager will be left with stale data in its 
> groupMetadataCache. Later, when this broker resumes leadership for the 
> affected __consumer_offsets partitions, it will fail to load the updated 
> groups into the cache, since loadGroup uses putIfNotExists, and it will serve 
> stale offsets to consumers.
>  
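> As a concrete illustration of this failure shape, here is a minimal, 
> self-contained Scala simulation. It uses no Kafka classes: TopicPartition, 
> handleTxnEmigration and handleGroupEmigration below are hypothetical 
> stand-ins for the real coordinator calls named above. The point it 
> demonstrates is that a single exception thrown while handling a 
> __transaction_state partition ends the foreach early, so any 
> __consumer_offsets partitions later in the iteration are never unloaded.
> {code:scala}
> // Minimal simulation of the early-exit behaviour described above. No Kafka
> // classes are used; every name here is a simplified stand-in.
> object EarlyExitSketch extends App {
>   final case class TopicPartition(topic: String, partition: Int)
> 
>   var unloadedOffsetsPartitions = Set.empty[Int]
> 
>   // Stand-in for TransactionCoordinator.handleTxnEmigration: the ZooKeeper
>   // session has expired, so the call fails.
>   def handleTxnEmigration(tp: TopicPartition): Unit =
>     throw new RuntimeException("ZooKeeperClientExpiredException (simulated)")
> 
>   // Stand-in for GroupCoordinator.handleGroupEmigration: record the unload.
>   def handleGroupEmigration(tp: TopicPartition): Unit =
>     unloadedOffsetsPartitions += tp.partition
> 
>   // A __transaction_state partition happens to be iterated before
>   // __consumer_offsets-15, as in the incident described below.
>   val updatedFollowers = Seq(
>     TopicPartition("__transaction_state", 7),
>     TopicPartition("__consumer_offsets", 15)
>   )
> 
>   // The try/catch only keeps the demo alive; what matters is that the foreach
>   // stops at the first partition and never reaches __consumer_offsets-15.
>   try {
>     updatedFollowers.foreach { tp =>
>       if (tp.topic == "__transaction_state") handleTxnEmigration(tp)
>       else if (tp.topic == "__consumer_offsets") handleGroupEmigration(tp)
>     }
>   } catch {
>     case e: RuntimeException => println(s"onLeadershipChange aborted: ${e.getMessage}")
>   }
> 
>   println(s"unloaded __consumer_offsets partitions: $unloadedOffsetsPartitions") // Set()
> }
> {code}
> 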
> h2. Details of what we experienced
> We ran into this issue running Kafka 2.0.1 in production. Several Kafka 
> consumers received stale offsets when reconnecting to their group coordinator 
> after a leadership election on their __consumer_offsets partition. This 
> caused them to reprocess many duplicate messages.
>  
> We believe we’ve tracked down the root cause:
>  * On 2019-04-01, we were experiencing memory pressure in ZooKeeper, and we 
> were seeing several ZooKeeperClientExpiredException errors in the logs.
>  * The impacted consumers were all on __consumer_offsets-15. There was a 
> leader election on this partition, and leadership transitioned from broker 
> 1088 to broker 1069. During this leader election, the former leader (1088) 
> saw a ZooKeeperClientExpiredException (stack trace below). This happened 
> inside KafkaApis.handleLeaderAndIsrRequest, specifically in 
> onLeadershipChange, while it was updating a __transaction_state partition. 
> Since there are no “Scheduling unloading” or “Finished unloading” log 
> messages in this period, we believe it threw this exception before getting to 
> __consumer_offsets-15, so it never got a chance to call 
> GroupCoordinator.handleGroupEmigration, which means this broker didn’t unload 
> offsets from its GroupMetadataManager.
>  * Four days later, on 2019-04-05, we manually restarted broker 1069, so 
> broker 1088 became the leader of __consumer_offsets-15 again. When it ran 
> GroupMetadataManager.loadGroup, it presumably failed to update 
> groupMetadataCache, since loadGroup uses putIfNotExists and the group id was 
> already in the cache (see the sketch after this list). Unfortunately we did 
> not have debug logging enabled, but I would expect to have seen a log message 
> like "Attempt to load group ${group.groupId} from log with generation 
> ${group.generationId} failed because there is already a cached group with 
> generation ${currentGroup.generationId}".
>  * After the leadership election, the impacted consumers reconnected to 
> broker 1088 and received stale offsets that correspond to the last committed 
> offsets around 2019-04-01.
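> 
> To make the second half of the failure concrete, here is another small 
> sketch, again with plain Scala collections rather than real Kafka classes. It 
> shows why a put-if-absent style load (putIfNotExists in the real code, per 
> the description above) keeps the stale entry: because the group was never 
> removed from the cache when leadership moved away, the later load of the 
> newer generation is silently dropped.
> {code:scala}
> import scala.collection.concurrent.TrieMap
> 
> // Plain-Scala stand-in for GroupMetadataManager.groupMetadataCache; the keys
> // and values are only illustrative strings, not real group metadata.
> object StaleCacheSketch extends App {
>   val groupMetadataCache = TrieMap("my-group" -> "offsets committed around 2019-04-01")
> 
>   // handleGroupEmigration was never called, so the stale entry is still cached
>   // when broker 1088 becomes leader again and reloads __consumer_offsets-15.
>   // putIfAbsent keeps the old entry and drops the newer state read from the log.
>   val previous = groupMetadataCache.putIfAbsent("my-group", "offsets committed up to 2019-04-05")
> 
>   println(previous)                       // Some(offsets committed around 2019-04-01)
>   println(groupMetadataCache("my-group")) // still the stale 2019-04-01 value
> }
> {code}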
>  
> h2. Relevant log/stacktrace
> {code:java}
> [2019-04-01 22:44:18.968617] [2019-04-01 22:44:18,963] ERROR [KafkaApi-1088] 
> Error when handling request 
> {controller_id=1096,controller_epoch=122,partition_states=[...,{topic=__consumer_offsets,partition=15,controller_epoch=122,leader=1069,leader_epoch=440,isr=[1092,1088,1069],zk_version=807,replicas=[1069,1088,1092],is_new=false},...],live_leaders=[{id=1069,host=10.68.42.121,port=9094}]}
>  (kafka.server.KafkaApis)
> [2019-04-01 22:44:18.968689] kafka.zookeeper.ZooKeeperClientExpiredException: 
> Session expired either before or while waiting for connection
> [2019-04-01 22:44:18.968712]         at 
> kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:238)
> [2019-04-01 22:44:18.968736]         at 
> kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:226)
> [2019-04-01 22:44:18.968759]         at 
> kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:226)
> [2019-04-01 22:44:18.968776]         at 
> kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
> [2019-04-01 22:44:18.968804]         at 
> kafka.zookeeper.ZooKeeperClient.kafka$zookeeper$ZooKeeperClient$$waitUntilConnected(ZooKeeperClient.scala:226)
> [2019-04-01 22:44:18.968836]         at 
> kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:220)
> [2019-04-01 22:44:18.968863]         at 
> kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply(ZooKeeperClient.scala:220)
> [2019-04-01 22:44:18.968891]         at 
> kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply(ZooKeeperClient.scala:220)
> [2019-04-01 22:44:18.968941]         at 
> kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
> [2019-04-01 22:44:18.968972]         at 
> kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:219)
> [2019-04-01 22:44:18.968997]         at 
> kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1510)
> [2019-04-01 22:44:18.969020]         at 
> kafka.zk.KafkaZkClient.getReplicaAssignmentForTopics(KafkaZkClient.scala:463)
> [2019-04-01 22:44:18.969062]         at 
> kafka.zk.KafkaZkClient.getTopicPartitionCount(KafkaZkClient.scala:514)
> [2019-04-01 22:44:18.969118]         at 
> kafka.coordinator.transaction.TransactionStateManager.getTransactionTopicPartitionCount(TransactionStateManager.scala:280)
> [2019-04-01 22:44:18.969168]         at 
> kafka.coordinator.transaction.TransactionStateManager.validateTransactionTopicPartitionCountIsStable(TransactionStateManager.scala:466)
> [2019-04-01 22:44:18.969206]         at 
> kafka.coordinator.transaction.TransactionStateManager.removeTransactionsForTxnTopicPartition(TransactionStateManager.scala:435)
> [2019-04-01 22:44:18.969239]         at 
> kafka.coordinator.transaction.TransactionCoordinator.handleTxnEmigration(TransactionCoordinator.scala:282)
> [2019-04-01 22:44:18.969266]         at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:180)
> [2019-04-01 22:44:18.969293]         at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:176)
> [2019-04-01 22:44:18.969316]         at 
> scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
> [2019-04-01 22:44:18.969341]         at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$onLeadershipChange$1(KafkaApis.scala:176)
> [2019-04-01 22:44:18.969361]         at 
> kafka.server.KafkaApis$$anonfun$4.apply(KafkaApis.scala:185)
> [2019-04-01 22:44:18.969383]         at 
> kafka.server.KafkaApis$$anonfun$4.apply(KafkaApis.scala:185)
> [2019-04-01 22:44:18.969412]         at 
> kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1117)
> [2019-04-01 22:44:18.969435]         at 
> kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:185)
> [2019-04-01 22:44:18.969454]         at 
> kafka.server.KafkaApis.handle(KafkaApis.scala:110)
> [2019-04-01 22:44:18.969476]         at 
> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
> [2019-04-01 22:44:18.969513]         at java.lang.Thread.run(Thread.java:748)
> {code}
>  



