Dhruvil Shah created KAFKA-9307:
-----------------------------------

             Summary: Transaction coordinator could be left in unknown state after ZK session timeout
                 Key: KAFKA-9307
                 URL: https://issues.apache.org/jira/browse/KAFKA-9307
             Project: Kafka
          Issue Type: Bug
          Components: core
            Reporter: Dhruvil Shah


We observed a case where the transaction coordinator could not load transaction 
state from a __transaction_state topic partition. Clients would continue seeing 
COORDINATOR_LOAD_IN_PROGRESS errors until the broker hosting the coordinator 
was restarted.
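
For illustration, here is a minimal sketch (purely illustrative names, not Kafka's actual code) of why a coordinator partition stuck in a "loading" state yields an endless stream of this retriable error: clients keep retrying, but the partition's state never changes.

```
// Minimal sketch, not Kafka's implementation: a coordinator that tracks
// per-partition load state and answers with a retriable error while a
// partition is loading. All names here are illustrative.
object CoordinatorLoadSketch {
  sealed trait PartitionState
  case object Loading extends PartitionState
  case object Active extends PartitionState

  private val partitionStates =
    scala.collection.concurrent.TrieMap.empty[Int, PartitionState]

  // Clients treat COORDINATOR_LOAD_IN_PROGRESS as retriable, so they keep
  // retrying; if the partition never leaves Loading, they retry forever.
  def handleTxnRequest(partition: Int): String =
    partitionStates.get(partition) match {
      case Some(Loading) => "COORDINATOR_LOAD_IN_PROGRESS"
      case Some(Active)  => "OK"
      case None          => "NOT_COORDINATOR"
    }

  def main(args: Array[String]): Unit = {
    partitionStates.put(41, Loading) // stuck: the load never completes
    println(handleTxnRequest(41))    // prints COORDINATOR_LOAD_IN_PROGRESS
  }
}
```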

This is the sequence of events that leads to the issue:

 * The broker is the leader of one (or more) transaction state topic partitions.

 * The broker loses its ZK session due to a network issue.

 * The broker reestablishes its session with ZK, though there are still transient 
network issues.

 * The broker is made a follower of the transaction state topic partition it was 
leading earlier.

 * During the become-follower transition, the broker loses its ZK session again.

 * The become-follower transition fails partway through, leaving us in a partial 
leader / partial follower state for the transaction topic. This meant that we 
could not unload the transaction metadata, as the stack trace below shows. 
However, the broker successfully caches the leader epoch associated with the 
LeaderAndIsrRequest.

```
[2019-12-12 03:08:17,864] ERROR [KafkaApi-3] Error when handling request: clientId=2, correlationId=1, api=LEADER_AND_ISR, ... {topic=__transaction_state,partition_states=[{... {partition=41,controller_epoch=16,leader=4,leader_epoch=112,isr=[2,4,1],zk_version=208,replicas=[3,4,2,1],is_new=false} ...
org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired for /brokers/topics/__transaction_state
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:130)
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
    at kafka.zookeeper.AsyncResponse.resultException(ZooKeeperClient.scala:537)
    at kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:579)
    at kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:574)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at kafka.zk.KafkaZkClient.getReplicaAssignmentForTopics(KafkaZkClient.scala:574)
    at kafka.zk.KafkaZkClient.getTopicPartitionCount(KafkaZkClient.scala:624)
    at kafka.coordinator.transaction.TransactionStateManager.getTransactionTopicPartitionCount(TransactionStateManager.scala:279)
    at kafka.coordinator.transaction.TransactionStateManager.validateTransactionTopicPartitionCountIsStable(TransactionStateManager.scala:465)
    at kafka.coordinator.transaction.TransactionStateManager.removeTransactionsForTxnTopicPartition(TransactionStateManager.scala:434)
    at kafka.coordinator.transaction.TransactionCoordinator.handleTxnEmigration(TransactionCoordinator.scala:282)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:190)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:186)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$onLeadershipChange$1(KafkaApis.scala:186)
    at kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202)
    at kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202)
    at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1153)
    at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:202)
```

 * Later, when the ZK session is finally established successfully, the broker 
ignores the become-follower transition because the leader epoch was the same as 
the one it had cached (see the sketch below). This prevented the transaction 
metadata from being unloaded.
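
A minimal sketch of this epoch check (illustrative names and structure, not Kafka's actual code): because the failed become-follower transition already cached the new epoch, the retried request carrying the same epoch is treated as a duplicate, and its side effects, including the metadata unload, never run.

```
// Hedged sketch, not Kafka's implementation: a broker that caches the last
// seen leader epoch per partition and ignores transitions whose epoch is
// not strictly newer than the cached one.
object EpochGateSketch {
  private var cachedLeaderEpoch = Map.empty[Int, Int]

  def maybeBecomeFollower(partition: Int, leaderEpoch: Int)(
      onTransition: () => Unit): Boolean = {
    if (cachedLeaderEpoch.get(partition).exists(_ >= leaderEpoch)) {
      false // stale or duplicate epoch: transition ignored
    } else {
      cachedLeaderEpoch += partition -> leaderEpoch
      // If this throws after the cache update (as in this incident),
      // the epoch is remembered even though the transition failed.
      onTransition()
      true
    }
  }

  def main(args: Array[String]): Unit = {
    // First attempt caches epoch 112, but the transition body fails.
    try maybeBecomeFollower(41, 112)(() => throw new RuntimeException("ZK session expired"))
    catch { case _: RuntimeException => () }
    // The retry with the same epoch is silently ignored, so the
    // transaction metadata is never unloaded.
    println(maybeBecomeFollower(41, 112)(() => println("unloading txn metadata"))) // false
  }
}
```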

 * Because this partition was a partial follower, we had set up replica 
fetchers. The partition continued to fetch from the leader until it was made 
part of the ISR.

 * Once it was part of the ISR, preferred leader election kicked in and elected 
this broker as the leader.

 * When processing the become-leader transition, the operation failed because we 
already had transaction metadata loaded at a previous epoch. This meant that 
the partition was left in the "loading" state, so we returned 
COORDINATOR_LOAD_IN_PROGRESS errors (see the sketch below).
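
A minimal sketch of this failure mode (illustrative names only, not Kafka's actual API): the load aborts because stale metadata from the earlier epoch was never unloaded, and the partition is never removed from the loading set, so every subsequent request gets the retriable error.

```
// Illustrative sketch, not Kafka's code: become-leader loads transaction
// state, but aborts if metadata is already cached at a different (older)
// epoch, leaving the partition stuck in "loading".
object BecomeLeaderSketch {
  final case class TxnMetadata(coordinatorEpoch: Int)

  private val loadedMetadata    = scala.collection.mutable.Map.empty[Int, TxnMetadata]
  private val loadingPartitions = scala.collection.mutable.Set.empty[Int]

  def loadTransactionsForPartition(partition: Int, coordinatorEpoch: Int): Unit = {
    loadingPartitions += partition
    loadedMetadata.get(partition) match {
      case Some(existing) if existing.coordinatorEpoch != coordinatorEpoch =>
        // Stale state from the failed become-follower was never unloaded;
        // the load aborts and the partition stays in loadingPartitions.
        throw new IllegalStateException(
          s"metadata already loaded at epoch ${existing.coordinatorEpoch}")
      case _ =>
        loadedMetadata(partition) = TxnMetadata(coordinatorEpoch)
        loadingPartitions -= partition // load completed; partition is active
    }
  }

  def main(args: Array[String]): Unit = {
    loadedMetadata(41) = TxnMetadata(111) // leftover from the failed transition
    try loadTransactionsForPartition(41, 112)
    catch { case e: IllegalStateException => println(e.getMessage) }
    println(loadingPartitions.contains(41)) // true: stuck in "loading"
  }
}
```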

 * Restarting the broker cleared this partial in-memory state, and we were able 
to resume processing transactions.


