[ https://issues.apache.org/jira/browse/KAFKA-9307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dhruvil Shah updated KAFKA-9307:
--------------------------------
Description:
We observed a case where the transaction coordinator could not load transaction
state from a __transaction_state topic partition. Clients kept seeing
COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker hosting the
coordinator was restarted.
This is the sequence of events that leads to the issue:
* The broker is the leader of one (or more) transaction state topic partitions.
* The broker loses its ZK session due to a network issue.
* Broker reestablishes session with ZK, though there are still transient
network issues.
* Broker is made follower of the transaction state topic partition it was
leading earlier.
* During the become-follower transition, the broker loses its ZK session again.
* The become-follower transition for this broker fails midway, leaving us
in a partial leader / partial follower state for the transaction topic. This
meant that we could not unload the transaction metadata. However, the broker
successfully caches the leader epoch associated with the LeaderAndIsrRequest.
* Later, when the ZK session is finally established successfully, the broker
ignores the become-follower transition as the leader epoch was the same as the
one it had cached. This prevented the transaction metadata from being unloaded.
* Because this partition was a partial follower, we had set up replica
fetchers. The partition continued to fetch from the leader until it was made
part of the ISR.
* Once it was part of the ISR, preferred leader election kicked in and elected
this broker as the leader.
* When processing the become-leader transition, the operation failed as we
already had transaction metadata loaded at a previous epoch.
```
[2019-12-12 03:11:56,320] ERROR [Transaction State Manager 3]: The metadata cache for txn partition 41 has already exist with epoch 111 and 6 entries while trying to add to it; this should not happen (kafka.coordinator.transaction.TransactionStateManager)
[2019-12-12 03:11:56,320] ERROR Uncaught exception in scheduled task 'load-txns-for-partition-__transaction_state-41' (kafka.utils.KafkaScheduler)
java.lang.IllegalStateException: The metadata cache for txn partition 41 has already exist with epoch 111 and 6 entries while trying to add to it; this should not happen
    at kafka.coordinator.transaction.TransactionStateManager.addLoadedTransactionsToCache(TransactionStateManager.scala:369)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply$mcV$sp(TransactionStateManager.scala:394)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259)
    at kafka.coordinator.transaction.TransactionStateManager.kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1(TransactionStateManager.scala:392)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$loadTransactionsForTxnTopicPartition$2.apply$mcV$sp(TransactionStateManager.scala:426)
    at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:114)
    at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java)
    at java.lang.Thread.run(Thread.java:748)
```
* This meant that this partition was left in the "loading" state and we thus
returned COORDINATOR_LOAD_IN_PROGRESS errors.
* Broker restart fixed this partial in-memory state and we were able to resume
processing for transactions.
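
The sequence above can be replayed with a toy model. The following is a
minimal Python sketch, not Kafka code: ToyBroker, its fields, and
replay_incident are hypothetical names, and the become-leader epoch 113 is an
assumption (the logs only show epochs 111 and 112). It assumes, as described
above, that the leader epoch is cached before the unload step runs and that a
request whose epoch does not advance the cached one is ignored.

```python
from dataclasses import dataclass
from typing import Optional


class SessionExpired(Exception):
    """Stands in for ZooKeeper's SessionExpiredException (toy model only)."""


@dataclass
class ToyBroker:
    """Hypothetical, heavily simplified view of one broker and one txn partition."""
    cached_leader_epoch: int = 111
    txn_cache_epoch: Optional[int] = 111  # epoch of loaded txn metadata; None = unloaded
    loading: bool = False
    zk_session_ok: bool = True

    def become_follower(self, leader_epoch: int) -> str:
        # A request that does not advance the cached epoch is ignored.
        if leader_epoch <= self.cached_leader_epoch:
            return "ignored"
        # Assumption: the epoch is cached before the unload step runs.
        self.cached_leader_epoch = leader_epoch
        if not self.zk_session_ok:
            # The unload step needs ZK; the epoch stays cached when it fails.
            raise SessionExpired("session expired while unloading txn metadata")
        self.txn_cache_epoch = None
        return "unloaded"

    def become_leader(self, leader_epoch: int) -> str:
        self.cached_leader_epoch = leader_epoch
        self.loading = True
        if self.txn_cache_epoch is not None:
            # Mirrors the IllegalStateException: stale metadata is still cached,
            # and the partition is left in the "loading" state.
            raise RuntimeError(
                f"metadata cache already exists with epoch {self.txn_cache_epoch}"
            )
        self.txn_cache_epoch = leader_epoch
        self.loading = False
        return "loaded"


def replay_incident() -> ToyBroker:
    """Replay the reported sequence and return the broker's final state."""
    broker = ToyBroker()
    broker.zk_session_ok = False           # ZK session lost mid-transition
    try:
        broker.become_follower(112)        # fails after caching epoch 112
    except SessionExpired:
        pass
    broker.zk_session_ok = True            # session finally reestablished
    broker.become_follower(112)            # same epoch, so this is ignored
    try:
        broker.become_leader(113)          # assumed epoch after preferred election
    except RuntimeError:
        pass
    return broker
```

Under these assumptions, replay_incident() returns a broker with loading still
set and the metadata cache still at epoch 111, which corresponds to the
partition being stuck returning COORDINATOR_LOAD_IN_PROGRESS.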
was:
We observed a case where the transaction coordinator could not load transaction
state from a __transaction_state topic partition. Clients kept seeing
COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker hosting the
coordinator was restarted.
This is the sequence of events that leads to the issue:
* The broker is the leader of one (or more) transaction state topic partitions.
* The broker loses its ZK session due to a network issue.
* Broker reestablishes session with ZK, though there are still transient
network issues.
* Broker is made follower of the transaction state topic partition it was
leading earlier.
* During the become-follower transition, the broker loses its ZK session again.
* The become-follower transition for this broker fails midway, leaving us
in a partial leader / partial follower state for the transaction topic. This
meant that we could not unload the transaction metadata. However, the broker
successfully caches the leader epoch associated with the LeaderAndIsrRequest.
```
[2019-12-12 03:08:17,864] ERROR [KafkaApi-3] Error when handling request: clientId=2, correlationId=1, api=LEADER_AND_ISR, ... {topic=__transaction_state,partition_states=[{... {partition=41,controller_epoch=16,leader=4,leader_epoch=112,isr=[2,4,1],zk_version=208,replicas=[3,4,2,1],is_new=false} ...
org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired for /brokers/topics/__transaction_state
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:130)
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
    at kafka.zookeeper.AsyncResponse.resultException(ZooKeeperClient.scala:537)
    at kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:579)
    at kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:574)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at kafka.zk.KafkaZkClient.getReplicaAssignmentForTopics(KafkaZkClient.scala:574)
    at kafka.zk.KafkaZkClient.getTopicPartitionCount(KafkaZkClient.scala:624)
    at kafka.coordinator.transaction.TransactionStateManager.getTransactionTopicPartitionCount(TransactionStateManager.scala:279)
    at kafka.coordinator.transaction.TransactionStateManager.validateTransactionTopicPartitionCountIsStable(TransactionStateManager.scala:465)
    at kafka.coordinator.transaction.TransactionStateManager.removeTransactionsForTxnTopicPartition(TransactionStateManager.scala:434)
    at kafka.coordinator.transaction.TransactionCoordinator.handleTxnEmigration(TransactionCoordinator.scala:282)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:190)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:186)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$onLeadershipChange$1(KafkaApis.scala:186)
    at kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202)
    at kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202)
    at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1153)
    at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:202)
```
* Later, when the ZK session is finally established successfully, the broker
ignores the become-follower transition as the leader epoch was the same as the
one it had cached. This prevented the transaction metadata from being unloaded.
* Because this partition was a partial follower, we had set up replica
fetchers. The partition continued to fetch from the leader until it was made
part of the ISR.
* Once it was part of the ISR, preferred leader election kicked in and elected
this broker as the leader.
* When processing the become-leader transition, the operation failed as we
already had transaction metadata loaded at a previous epoch.
```
[2019-12-12 03:11:56,320] ERROR [Transaction State Manager 3]: The metadata cache for txn partition 41 has already exist with epoch 111 and 6 entries while trying to add to it; this should not happen (kafka.coordinator.transaction.TransactionStateManager)
[2019-12-12 03:11:56,320] ERROR Uncaught exception in scheduled task 'load-txns-for-partition-__transaction_state-41' (kafka.utils.KafkaScheduler)
java.lang.IllegalStateException: The metadata cache for txn partition 41 has already exist with epoch 111 and 6 entries while trying to add to it; this should not happen
    at kafka.coordinator.transaction.TransactionStateManager.addLoadedTransactionsToCache(TransactionStateManager.scala:369)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply$mcV$sp(TransactionStateManager.scala:394)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259)
    at kafka.coordinator.transaction.TransactionStateManager.kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1(TransactionStateManager.scala:392)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$loadTransactionsForTxnTopicPartition$2.apply$mcV$sp(TransactionStateManager.scala:426)
    at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:114)
    at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java)
    at java.lang.Thread.run(Thread.java:748)
```
* This meant that this partition was left in the "loading" state and we thus
returned COORDINATOR_LOAD_IN_PROGRESS errors.
* Broker restart fixed this partial in-memory state and we were able to resume
processing for transactions.
> Transaction coordinator could be left in unknown state after ZK session
> timeout
> -------------------------------------------------------------------------------
>
> Key: KAFKA-9307
> URL: https://issues.apache.org/jira/browse/KAFKA-9307
> Project: Kafka
> Issue Type: Bug
> Components: core
> Reporter: Dhruvil Shah
> Assignee: Dhruvil Shah
> Priority: Major