[ 
https://issues.apache.org/jira/browse/KAFKA-9307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah reassigned KAFKA-9307:
-----------------------------------

    Assignee: Dhruvil Shah

> Transaction coordinator could be left in unknown state after ZK session 
> timeout
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-9307
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9307
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>            Reporter: Dhruvil Shah
>            Assignee: Dhruvil Shah
>            Priority: Major
>
> We observed a case where the transaction coordinator could not load 
> transaction state from a __transaction_state topic partition. Clients 
> continued to see COORDINATOR_LOAD_IN_PROGRESS errors until the broker 
> hosting the coordinator was restarted.
> This is the sequence of events that leads to the issue:
>  * The broker is the leader of one (or more) transaction state topic 
> partitions.
>  * The broker loses its ZK session due to a network issue.
>  * Broker reestablishes session with ZK, though there are still transient 
> network issues.
>  * Broker is made follower of the transaction state topic partition it was 
> leading earlier.
>  * During the become-follower transition, the broker loses its ZK session 
> again.
>  * The become-follower transition for this broker fails midway, leaving 
> us in a partial leader / partial follower state for the transaction topic. 
> This meant that we could not unload the transaction metadata. However, the 
> broker successfully caches the leader epoch associated with the 
> LeaderAndIsrRequest.
> ```
> [2019-12-12 03:08:17,864] ERROR [KafkaApi-3] Error when handling request: 
> clientId=2, correlationId=1, api=LEADER_AND_ISR, ... 
> {topic=__transaction_state,partition_states=[{... 
> {partition=41,controller_epoch=16,leader=4,leader_epoch=112,isr=[2,4,1],zk_version=208,replicas=[3,4,2,1],is_new=false}
>  ...
> org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired for /brokers/topics/__transaction_state
>     at org.apache.zookeeper.KeeperException.create(KeeperException.java:130)
>     at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
>     at kafka.zookeeper.AsyncResponse.resultException(ZooKeeperClient.scala:537)
>     at kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:579)
>     at kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:574)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>     at kafka.zk.KafkaZkClient.getReplicaAssignmentForTopics(KafkaZkClient.scala:574)
>     at kafka.zk.KafkaZkClient.getTopicPartitionCount(KafkaZkClient.scala:624)
>     at kafka.coordinator.transaction.TransactionStateManager.getTransactionTopicPartitionCount(TransactionStateManager.scala:279)
>     at kafka.coordinator.transaction.TransactionStateManager.validateTransactionTopicPartitionCountIsStable(TransactionStateManager.scala:465)
>     at kafka.coordinator.transaction.TransactionStateManager.removeTransactionsForTxnTopicPartition(TransactionStateManager.scala:434)
>     at kafka.coordinator.transaction.TransactionCoordinator.handleTxnEmigration(TransactionCoordinator.scala:282)
>     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:190)
>     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:186)
>     at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>     at kafka.server.KafkaApis.kafka$server$KafkaApis$$onLeadershipChange$1(KafkaApis.scala:186)
>     at kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202)
>     at kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202)
>     at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1153)
>     at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:202)
> ```
>  * Later, when the ZK session is finally established successfully, the broker 
> ignores the become-follower transition because the leader epoch is the same 
> as the one it had cached. This prevented the transaction metadata from being 
> unloaded.
>  * Because this partition was a partial follower, we had set up replica 
> fetchers. The partition continued to fetch from the leader until it was made 
> part of the ISR.
>  * Once it was part of the ISR, preferred leader election kicked in and 
> elected this broker as the leader.
>  * When processing the become-leader transition, the operation failed as we 
> already had transaction metadata loaded at a previous epoch. This meant that 
> this partition was left in the "loading" state and we thus returned 
> COORDINATOR_LOAD_IN_PROGRESS errors.
>  * Broker restart fixed this partial in-memory state and we were able to 
> resume processing for transactions.
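
The sequence above can be replayed with a small in-memory model. This is only
an illustrative sketch, not Kafka's code: the class PartialTransitionSketch,
its fields and its methods are hypothetical, and the leader epochs 111 and 113
are assumed for the example (only epoch 112 and partition 41 appear in the log
above). It assumes the leader epoch is cached before the ZK-dependent unload
step runs, which is what the description implies.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the failure sequence; these are not Kafka's classes.
public class PartialTransitionSketch {
    // Leader epoch the broker last accepted, per partition.
    final Map<Integer, Integer> cachedLeaderEpoch = new HashMap<>();
    // Partitions whose transaction metadata is held in memory.
    final Set<Integer> loadedMetadata = new HashSet<>();
    // Partitions marked as loading; clients see COORDINATOR_LOAD_IN_PROGRESS.
    final Set<Integer> loading = new HashSet<>();
    boolean zkSessionAlive = true;

    // Returns true if the transition completed, false if ignored or failed.
    boolean becomeFollower(int partition, int leaderEpoch) {
        Integer cached = cachedLeaderEpoch.get(partition);
        if (cached != null && leaderEpoch <= cached) {
            return false; // same or older epoch: request is ignored
        }
        cachedLeaderEpoch.put(partition, leaderEpoch); // epoch is cached first
        if (!zkSessionAlive) {
            return false; // unload needs ZK, so the transition stops midway
        }
        loadedMetadata.remove(partition); // unload transaction metadata
        return true;
    }

    boolean becomeLeader(int partition, int leaderEpoch) {
        cachedLeaderEpoch.put(partition, leaderEpoch);
        loading.add(partition);
        if (loadedMetadata.contains(partition)) {
            return false; // metadata from an older epoch is still present
        }
        loadedMetadata.add(partition);
        loading.remove(partition);
        return true;
    }

    // Replays the reported sequence for partition 41.
    static PartialTransitionSketch replay() {
        PartialTransitionSketch broker = new PartialTransitionSketch();
        int partition = 41;
        broker.becomeLeader(partition, 111);   // healthy leader (assumed epoch)
        broker.zkSessionAlive = false;         // ZK session lost
        broker.becomeFollower(partition, 112); // fails midway, 112 is cached
        broker.zkSessionAlive = true;          // session re-established
        broker.becomeFollower(partition, 112); // ignored: same epoch
        broker.becomeLeader(partition, 113);   // preferred leader election
        return broker;
    }

    public static void main(String[] args) {
        PartialTransitionSketch broker = replay();
        System.out.println("partition 41 stuck loading: "
                + broker.loading.contains(41)); // prints true
    }
}
```

In this model the partition leaves the loading set only if the earlier
become-follower transition actually unloaded the metadata, so a restart
(a fresh object here) is the only way out, matching the behaviour reported.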



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
