[ 
https://issues.apache.org/jira/browse/KAFKA-9307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-9307:
--------------------------------
    Description: 
We observed a case where the transaction coordinator could not load transaction 
state from a __transaction_state topic partition. Clients would continue seeing 
COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker hosting the 
coordinator was restarted.
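
For context on what clients observe: a transactional producer's initTransactions() 
call has to reach the transaction coordinator, and while the coordinator partition is 
stuck loading, every attempt receives the retriable COORDINATOR_LOAD_IN_PROGRESS error. 
Below is a minimal client-side sketch; the bootstrap address and transactional.id are 
made up for illustration, and with max.block.ms bounded the stuck coordinator should 
eventually surface as a TimeoutException rather than blocking forever.

```
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.serialization.StringSerializer

object InitTransactionsProbe {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Hypothetical broker address and transactional.id, for illustration only.
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-3:9092")
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txn-probe")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    // Bound the blocking time: initTransactions() retries retriable errors such as
    // COORDINATOR_LOAD_IN_PROGRESS internally and gives up once this budget is exhausted.
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "30000")

    val producer = new KafkaProducer[String, String](props)
    try {
      // While the coordinator partition is stuck in the "loading" state described in
      // this ticket, this call never succeeds until the broker is restarted.
      producer.initTransactions()
      println("coordinator loaded, transactions usable")
    } finally {
      producer.close()
    }
  }
}
```

In a healthy cluster this returns almost immediately; during the incident it failed on 
every attempt until the broker restart described below.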

This is the sequence of events that leads to the issue:
 * The broker is the leader of one (or more) transaction state topic partitions.
 * The broker loses its ZK session due to a network issue.

 * Broker reestablishes its session with ZK, though transient network issues 
persist.
 * Broker is made a follower of the transaction state topic partition it was 
leading earlier.

 * During the become-follower transition, the broker loses its ZK session again.
 * The become-follower transition for this broker fails partway through, leaving us 
in a partial leader / partial follower state for the transaction topic. This 
meant that we could not unload the transaction metadata. However, the broker 
successfully caches the leader epoch associated with the LeaderAndIsrRequest.

 
 * Later, when the ZK session is finally reestablished, the broker ignores the 
become-follower transition because the leader epoch was the same as the one it 
had cached. This prevented the transaction metadata from being unloaded.

 * Because this partition was a partial follower, we had set up replica 
fetchers. The partition continued to fetch from the leader until it was made 
part of the ISR.

 * Once it was part of the ISR, preferred leader election kicked in and elected 
this broker as the leader.

 * When processing the become-leader transition, the operation failed as we 
already had transaction metadata loaded at a previous epoch.

```
[2019-12-12 03:11:56,320] ERROR [Transaction State Manager 3]: The metadata cache for txn partition 41 has already exist with epoch 111 and 6 entries while trying to add to it; this should not happen (kafka.coordinator.transaction.TransactionStateManager)
[2019-12-12 03:11:56,320] ERROR Uncaught exception in scheduled task 'load-txns-for-partition-__transaction_state-41' (kafka.utils.KafkaScheduler)
java.lang.IllegalStateException: The metadata cache for txn partition 41 has already exist with epoch 111 and 6 entries while trying to add to it; this should not happen
  at kafka.coordinator.transaction.TransactionStateManager.addLoadedTransactionsToCache(TransactionStateManager.scala:369)
  at kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply$mcV$sp(TransactionStateManager.scala:394)
  at kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
  at kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
  at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259)
  at kafka.coordinator.transaction.TransactionStateManager.kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1(TransactionStateManager.scala:392)
  at kafka.coordinator.transaction.TransactionStateManager$$anonfun$loadTransactionsForTxnTopicPartition$2.apply$mcV$sp(TransactionStateManager.scala:426)
  at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:114)
  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java)
  at java.lang.Thread.run(Thread.java:748)
```
 * This meant that the partition was left in the "loading" state, so we kept 
returning COORDINATOR_LOAD_IN_PROGRESS errors (a simplified sketch of this 
failure sequence follows the list).

 * Restarting the broker cleared this partial in-memory state and we were able to 
resume processing transactions.
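
To make the end state concrete, here is a minimal, self-contained sketch of the 
sequence above. It is a toy model, not Kafka's actual TransactionStateManager: the 
fields and methods (metadataCache, loadingPartitions, becomeFollower, becomeLeader) 
are illustrative names for the behavior described in the bullets, i.e. the cached 
leader epoch causes the become-follower to be skipped, stale metadata survives, and 
the later become-leader load hits it and leaves the partition in the loading set.

```
import scala.collection.mutable

// Toy model of the coordinator-side state described above; names are illustrative,
// not the real TransactionStateManager API.
object TxnCoordinatorModel {
  final case class CachedMetadata(coordinatorEpoch: Int, entries: Int)

  private val metadataCache = mutable.Map.empty[Int, CachedMetadata] // txn partition -> loaded metadata
  private val loadingPartitions = mutable.Set.empty[Int]             // answer COORDINATOR_LOAD_IN_PROGRESS
  private val lastSeenLeaderEpoch = mutable.Map.empty[Int, Int]      // cached LeaderAndIsr epochs

  def becomeFollower(partition: Int, leaderEpoch: Int): Unit = {
    // The epoch was already cached by the partially completed become-follower, so the
    // retried transition is treated as a no-op and the metadata is never unloaded.
    if (lastSeenLeaderEpoch.get(partition).contains(leaderEpoch)) return
    lastSeenLeaderEpoch(partition) = leaderEpoch
    metadataCache.remove(partition) // normal emigration path: unload the metadata
  }

  def becomeLeader(partition: Int, coordinatorEpoch: Int): Unit = {
    loadingPartitions += partition  // start answering COORDINATOR_LOAD_IN_PROGRESS
    metadataCache.get(partition) match {
      case Some(old) =>
        // Mirrors the IllegalStateException in the log: metadata from the old epoch is
        // still cached, the load aborts, and the partition never leaves loadingPartitions.
        throw new IllegalStateException(
          s"metadata for txn partition $partition already exists at epoch ${old.coordinatorEpoch}")
      case None =>
        metadataCache(partition) = CachedMetadata(coordinatorEpoch, entries = 0)
        loadingPartitions -= partition // load finished; coordinator becomes usable
    }
  }

  def main(args: Array[String]): Unit = {
    metadataCache(41) = CachedMetadata(coordinatorEpoch = 111, entries = 6) // loaded while leading
    lastSeenLeaderEpoch(41) = 112         // cached by the become-follower that failed partway
    becomeFollower(41, leaderEpoch = 112) // ignored: same epoch, metadata not unloaded
    try becomeLeader(41, coordinatorEpoch = 112) // fails on the stale epoch-111 entry
    catch { case e: IllegalStateException => println(e.getMessage) }
    println(s"partition 41 still loading: ${loadingPartitions.contains(41)}") // true
  }
}
```

Restarting the broker throws away this in-memory state, which is why the restart in 
the last bullet cleared the condition.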

  was:
We observed a case where the transaction coordinator could not load transaction 
state from a __transaction_state topic partition. Clients would continue seeing 
COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker hosting the 
coordinator was restarted.

This is the sequence of events that leads to the issue:
 * The broker is the leader of one (or more) transaction state topic partitions.
 * The broker loses its ZK session due to a network issue.

 * Broker reestablishes its session with ZK, though transient network issues 
persist.
 * Broker is made a follower of the transaction state topic partition it was 
leading earlier.

 * During the become-follower transition, the broker loses its ZK session again.
 * The become-follower transition for this broker fails partway through, leaving us 
in a partial leader / partial follower state for the transaction topic. This 
meant that we could not unload the transaction metadata. However, the broker 
successfully caches the leader epoch associated with the LeaderAndIsrRequest.

```
[2019-12-12 03:08:17,864] ERROR [KafkaApi-3] Error when handling request: clientId=2, correlationId=1, api=LEADER_AND_ISR, ... {topic=__transaction_state,partition_states=[{...
{partition=41,controller_epoch=16,leader=4,leader_epoch=112,isr=[2,4,1],zk_version=208,replicas=[3,4,2,1],is_new=false}
...
org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired for /brokers/topics/__transaction_state
  at org.apache.zookeeper.KeeperException.create(KeeperException.java:130)
  at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
  at kafka.zookeeper.AsyncResponse.resultException(ZooKeeperClient.scala:537)
  at kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:579)
  at kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:574)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
  at kafka.zk.KafkaZkClient.getReplicaAssignmentForTopics(KafkaZkClient.scala:574)
  at kafka.zk.KafkaZkClient.getTopicPartitionCount(KafkaZkClient.scala:624)
  at kafka.coordinator.transaction.TransactionStateManager.getTransactionTopicPartitionCount(TransactionStateManager.scala:279)
  at kafka.coordinator.transaction.TransactionStateManager.validateTransactionTopicPartitionCountIsStable(TransactionStateManager.scala:465)
  at kafka.coordinator.transaction.TransactionStateManager.removeTransactionsForTxnTopicPartition(TransactionStateManager.scala:434)
  at kafka.coordinator.transaction.TransactionCoordinator.handleTxnEmigration(TransactionCoordinator.scala:282)
  at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:190)
  at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:186)
  at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
  at kafka.server.KafkaApis.kafka$server$KafkaApis$$onLeadershipChange$1(KafkaApis.scala:186)
  at kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202)
  at kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202)
  at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1153)
  at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:202)
```
 * Later, when the ZK session is finally reestablished, the broker ignores the 
become-follower transition because the leader epoch was the same as the one it 
had cached. This prevented the transaction metadata from being unloaded.

 * Because this partition was a partial follower, we had set up replica 
fetchers. The partition continued to fetch from the leader until it was made 
part of the ISR.

 * Once it was part of the ISR, preferred leader election kicked in and elected 
this broker as the leader.

 * When processing the become-leader transition, the operation failed as we 
already had transaction metadata loaded at a previous epoch.

```
[2019-12-12 03:11:56,320] ERROR [Transaction State Manager 3]: The metadata cache for txn partition 41 has already exist with epoch 111 and 6 entries while trying to add to it; this should not happen (kafka.coordinator.transaction.TransactionStateManager)
[2019-12-12 03:11:56,320] ERROR Uncaught exception in scheduled task 'load-txns-for-partition-__transaction_state-41' (kafka.utils.KafkaScheduler)
java.lang.IllegalStateException: The metadata cache for txn partition 41 has already exist with epoch 111 and 6 entries while trying to add to it; this should not happen
  at kafka.coordinator.transaction.TransactionStateManager.addLoadedTransactionsToCache(TransactionStateManager.scala:369)
  at kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply$mcV$sp(TransactionStateManager.scala:394)
  at kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
  at kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
  at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259)
  at kafka.coordinator.transaction.TransactionStateManager.kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1(TransactionStateManager.scala:392)
  at kafka.coordinator.transaction.TransactionStateManager$$anonfun$loadTransactionsForTxnTopicPartition$2.apply$mcV$sp(TransactionStateManager.scala:426)
  at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:114)
  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java)
  at java.lang.Thread.run(Thread.java:748)
```
 * This meant that the partition was left in the "loading" state, so we kept 
returning COORDINATOR_LOAD_IN_PROGRESS errors.

 * Restarting the broker cleared this partial in-memory state and we were able to 
resume processing transactions.


> Transaction coordinator could be left in unknown state after ZK session 
> timeout
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-9307
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9307
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>            Reporter: Dhruvil Shah
>            Assignee: Dhruvil Shah
>            Priority: Major
>


