[ https://issues.apache.org/jira/browse/KAFKA-9307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dhruvil Shah updated KAFKA-9307:
--------------------------------
Description:
We observed a case where the transaction coordinator could not load transaction state from a __transaction_state topic partition. Clients would continue to see COORDINATOR_LOAD_IN_PROGRESS errors until the broker hosting the coordinator was restarted.

This is the sequence of events that leads to the issue:
* The broker is the leader of one (or more) transaction state topic partitions.
* The broker loses its ZK session due to a network issue.
* The broker reestablishes its session with ZK, though there are still transient network issues.
* The broker is made a follower of the transaction state topic partition it was leading earlier.
* During the become-follower transition, the broker loses its ZK session again.
* The become-follower transition for this broker fails partway through, leaving the broker in a partial leader / partial follower state for the transaction topic. This meant that we could not unload the transaction metadata. However, the broker successfully caches the leader epoch associated with the LeaderAndIsrRequest.
* Later, when the ZK session is finally reestablished, the broker ignores the become-follower transition because the leader epoch is the same as the one it had cached. This prevented the transaction metadata from being unloaded.
* Because this partition was a partial follower, we had set up replica fetchers. The partition continued to fetch from the leader until it was added to the ISR.
* Once it was part of the ISR, preferred leader election kicked in and elected this broker as the leader.
* When processing the become-leader transition, the operation failed because we already had transaction metadata loaded at a previous epoch.
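The core of the failure sequence above is a stale-epoch check interacting badly with a partially completed transition: the epoch is cached even though the unload step never ran, so the retried become-follower request is silently ignored. The following is a simplified, hypothetical model of that interaction (class and field names are invented for illustration; this is not the actual broker code):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the buggy interaction between the leader-epoch cache
// and the transaction-metadata unload step (names are hypothetical).
class EpochCheckModel {
    // partition -> last leader epoch seen in a LeaderAndIsrRequest
    final Map<Integer, Integer> cachedEpochs = new HashMap<>();
    // partition -> whether transaction metadata is currently loaded here
    final Map<Integer, Boolean> txnMetadataLoaded = new HashMap<>();

    /** Normal path: returns true if the become-follower transition is processed. */
    boolean becomeFollower(int partition, int leaderEpoch) {
        Integer cached = cachedEpochs.get(partition);
        if (cached != null && leaderEpoch <= cached) {
            return false; // epoch not newer than cached: transition ignored
        }
        cachedEpochs.put(partition, leaderEpoch);
        txnMetadataLoaded.put(partition, false); // unload transaction metadata
        return true;
    }

    /** Models the partial failure: the epoch is cached, but the transition
     *  dies (e.g. ZK session expiry) before the metadata unload runs. */
    void becomeFollowerPartialFailure(int partition, int leaderEpoch) {
        cachedEpochs.put(partition, leaderEpoch);
        // ...unload step never executes, so txnMetadataLoaded stays true
    }
}
```

With this model, a partial failure at epoch 112 followed by a retry at the same epoch leaves the metadata loaded, which is exactly the state that later makes the become-leader load blow up.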
```
[2019-12-12 03:11:56,320] ERROR [Transaction State Manager 3]: The metadata cache for txn partition 41 has already exist with epoch 111 and 6 entries while trying to add to it; this should not happen (kafka.coordinator.transaction.TransactionStateManager)
[2019-12-12 03:11:56,320] ERROR Uncaught exception in scheduled task 'load-txns-for-partition-__transaction_state-41' (kafka.utils.KafkaScheduler)
java.lang.IllegalStateException: The metadata cache for txn partition 41 has already exist with epoch 111 and 6 entries while trying to add to it; this should not happen
    at kafka.coordinator.transaction.TransactionStateManager.addLoadedTransactionsToCache(TransactionStateManager.scala:369)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply$mcV$sp(TransactionStateManager.scala:394)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259)
    at kafka.coordinator.transaction.TransactionStateManager.kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1(TransactionStateManager.scala:392)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$loadTransactionsForTxnTopicPartition$2.apply$mcV$sp(TransactionStateManager.scala:426)
    at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:114)
    at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java)
    at java.lang.Thread.run(Thread.java:748)
```
* This meant that this partition was left in the "loading" state and we thus returned COORDINATOR_LOAD_IN_PROGRESS errors.
* A broker restart cleared this partial in-memory state and we were able to resume processing transactions.

was:
We observed a case where the transaction coordinator could not load transaction state from a __transaction_state topic partition. Clients would continue to see COORDINATOR_LOAD_IN_PROGRESS errors until the broker hosting the coordinator was restarted.

This is the sequence of events that leads to the issue:
* The broker is the leader of one (or more) transaction state topic partitions.
* The broker loses its ZK session due to a network issue.
* The broker reestablishes its session with ZK, though there are still transient network issues.
* The broker is made a follower of the transaction state topic partition it was leading earlier.
* During the become-follower transition, the broker loses its ZK session again.
* The become-follower transition for this broker fails partway through, leaving the broker in a partial leader / partial follower state for the transaction topic. This meant that we could not unload the transaction metadata. However, the broker successfully caches the leader epoch associated with the LeaderAndIsrRequest.
```
[2019-12-12 03:08:17,864] ERROR [KafkaApi-3] Error when handling request: clientId=2, correlationId=1, api=LEADER_AND_ISR, ... {topic=__transaction_state,partition_states=[{... {partition=41,controller_epoch=16,leader=4,leader_epoch=112,isr=[2,4,1],zk_version=208,replicas=[3,4,2,1],is_new=false} ...
org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired for /brokers/topics/__transaction_state
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:130)
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
    at kafka.zookeeper.AsyncResponse.resultException(ZooKeeperClient.scala:537)
    at kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:579)
    at kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:574)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at kafka.zk.KafkaZkClient.getReplicaAssignmentForTopics(KafkaZkClient.scala:574)
    at kafka.zk.KafkaZkClient.getTopicPartitionCount(KafkaZkClient.scala:624)
    at kafka.coordinator.transaction.TransactionStateManager.getTransactionTopicPartitionCount(TransactionStateManager.scala:279)
    at kafka.coordinator.transaction.TransactionStateManager.validateTransactionTopicPartitionCountIsStable(TransactionStateManager.scala:465)
    at kafka.coordinator.transaction.TransactionStateManager.removeTransactionsForTxnTopicPartition(TransactionStateManager.scala:434)
    at kafka.coordinator.transaction.TransactionCoordinator.handleTxnEmigration(TransactionCoordinator.scala:282)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:190)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:186)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$onLeadershipChange$1(KafkaApis.scala:186)
    at kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202)
    at kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202)
    at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1153)
    at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:202)
```
* Later, when the ZK session is finally reestablished, the broker ignores the become-follower transition because the leader epoch is the same as the one it had cached. This prevented the transaction metadata from being unloaded.
* Because this partition was a partial follower, we had set up replica fetchers. The partition continued to fetch from the leader until it was added to the ISR.
* Once it was part of the ISR, preferred leader election kicked in and elected this broker as the leader.
* When processing the become-leader transition, the operation failed because we already had transaction metadata loaded at a previous epoch.
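The IllegalStateException in the trace that follows comes from a guard in the load path that refuses to overwrite already-cached partition metadata. A hypothetical sketch of such a guard (names and structure are invented for illustration, not the actual TransactionStateManager source):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a load-path guard that rejects a become-leader
// load when metadata for the partition is already cached from an earlier
// epoch (the state this bug leaves behind).
class TxnPartitionCache {
    static class CacheEntry {
        final int coordinatorEpoch;
        final int entries;
        CacheEntry(int coordinatorEpoch, int entries) {
            this.coordinatorEpoch = coordinatorEpoch;
            this.entries = entries;
        }
    }

    final Map<Integer, CacheEntry> loadedPartitions = new HashMap<>();

    void addLoadedTransactionsToCache(int partition, int coordinatorEpoch, int entries) {
        // putIfAbsent returns the existing entry (and inserts nothing) if
        // the partition was never unloaded -- which triggers the failure.
        CacheEntry previous = loadedPartitions.putIfAbsent(
                partition, new CacheEntry(coordinatorEpoch, entries));
        if (previous != null) {
            throw new IllegalStateException(
                "The metadata cache for txn partition " + partition
                + " has already exist with epoch " + previous.coordinatorEpoch
                + " and " + previous.entries
                + " entries while trying to add to it; this should not happen");
        }
    }
}
```

Under this sketch, a load for partition 41 at epoch 112 fails because the entry from epoch 111 was never removed, matching the error message in the log.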
```
[2019-12-12 03:11:56,320] ERROR [Transaction State Manager 3]: The metadata cache for txn partition 41 has already exist with epoch 111 and 6 entries while trying to add to it; this should not happen (kafka.coordinator.transaction.TransactionStateManager)
[2019-12-12 03:11:56,320] ERROR Uncaught exception in scheduled task 'load-txns-for-partition-__transaction_state-41' (kafka.utils.KafkaScheduler)
java.lang.IllegalStateException: The metadata cache for txn partition 41 has already exist with epoch 111 and 6 entries while trying to add to it; this should not happen
    at kafka.coordinator.transaction.TransactionStateManager.addLoadedTransactionsToCache(TransactionStateManager.scala:369)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply$mcV$sp(TransactionStateManager.scala:394)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259)
    at kafka.coordinator.transaction.TransactionStateManager.kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1(TransactionStateManager.scala:392)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$loadTransactionsForTxnTopicPartition$2.apply$mcV$sp(TransactionStateManager.scala:426)
    at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:114)
    at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java)
    at java.lang.Thread.run(Thread.java:748)
```
* This meant that this partition was left in the "loading" state and we thus returned COORDINATOR_LOAD_IN_PROGRESS errors.
* A broker restart cleared this partial in-memory state and we were able to resume processing transactions.

> Transaction coordinator could be left in unknown state after ZK session timeout
> -------------------------------------------------------------------------------
>
>         Key: KAFKA-9307
>         URL: https://issues.apache.org/jira/browse/KAFKA-9307
>     Project: Kafka
>  Issue Type: Bug
>  Components: core
>    Reporter: Dhruvil Shah
>    Assignee: Dhruvil Shah
>    Priority: Major
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)