[ 
https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7519:
-------------------------------
    Fix Version/s: 2.0.1

> Transactional Ids Left in Pending State by TransactionStateManager During 
> Transactional Id Expiration Are Unusable
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7519
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7519
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, producer 
>    Affects Versions: 2.0.0
>            Reporter: Bridger Howell
>            Priority: Blocker
>             Fix For: 2.0.1, 2.1.0
>
>         Attachments: KAFKA-7519.patch, image-2018-10-18-13-02-22-371.png
>
>
>  
> After digging into a case where an exactly-once streams process was bizarrely 
> unable to process incoming data, we observed the following:
>  * StreamThreads stalling while creating a producer, eventually resulting in 
> no consumption by that streams process. Looking into those threads, we found 
> they were stuck in a loop, sending InitProducerIdRequests and always 
> receiving back the retriable error CONCURRENT_TRANSACTIONS and trying again. 
> These requests always had the same transactional id.
>  * After changing the streams process to not use exactly-once, it was able to 
> process messages with no problems.
>  * Alternatively, changing the applicationId for that streams process, it was 
> able to process with no problems.
>  * Every hour,  every broker would fail the task `transactionalId-expiration` 
> with the following error:
>  ** 
> {code:java}
> {"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing 
> transaction state transition to Dead while it already a pending sta
> te Dead
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
>     at kafka.coordinator
> .transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
>     at kafka.coordinator.transaction.TransactionStateManager$$a
> nonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scal
> a:151)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$ano
> nfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at
>  
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>     at kafka.coordinator.transaction.TransactionSt
> ateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.sc
> ala:150)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$a
> nonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
> Like.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.foreach(List.scala:392)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.map(List.scala:296)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$app
> ly$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enabl
> eTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
>     at scala.collection.Traversabl
> eLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.
> scala:241)
>     at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>     at scala.collection.mutable.HashMap$$anon
> fun$foreach$1.apply(HashMap.scala:130)
>     at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>     at scala.collec
> tion.mutable.HashMap.foreachEntry(HashMap.scala:40)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
>     at scala.collecti
> on.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>     a
> t 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Tr
> ansactionStateManager.scala:142)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$a
> nonfun$apply$mcV$sp$1.apply(TransactionStateManager.scala:140)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enable
> TransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply(TransactionStateManager.scala:140)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils
> .scala:251)
>     at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
>     at kafka.coordinator.transaction.TransactionStateManager$$anon
> fun$enableTransactionalIdExpiration$1.apply$mcV$sp(TransactionStateManager.scala:140)
>     at kafka.utils.KafkaScheduler$$anonfun$1.apply$mc
> V$sp(KafkaScheduler.scala:114)
>     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>     at java.util.concurrent.Executors$RunnableAd
> apter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.Scheduled
> ThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExec
> utor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecu
> tor.java:1149)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.jav
> a:748)","exception_class":"java.lang.IllegalStateException","exception_message":"Preparing
>  transaction state transition to Dead while it a
> lready a pending state 
> Dead"},"source_host":"kafka-broker-4.kafka-broker.default.svc.cluster.local","method":"error","level":"ERROR","message":"Uncaught
>  exception in scheduled task 
> transactionalId-expiration","mdc":{},"file":"Logging.scala","line_number":"76","thread_name":"transaction-log-manager-0","logger_name":"kafka.utils.KafkaScheduler","class<span
>  class="code-quote">":"kafka.utils.Logging$class"}{code}
> Based on these problems and having read a bit of the server source, I guessed 
> that this would all be explained by there being TransactionMetadata instances 
> that are stuck in a pendingState.
> After doing a heap dump of the broker that was returning the error for our 
> particular group, we found this:
> !image-2018-10-18-13-02-22-371.png|width=723,height=224!
> There were indeed a bunch of live TransactionMetadata instances that had a 
> pending state of  "Dead" but should have already been cleaned up, confirming 
> my guess.
> Finally, after reading carefully through the TransactionStateManager callback 
> for producing tombstones for expired transactional ids I noticed that if 
> there is any error returned by the ReplicaManager, those transactions will 
> _not_ have their pending state cleared.
> —
> h3. Short summary:
> If the ReplicaManager fails to append the tombstone records for expiring a 
> transactional id (in my case, this likely happened during a rebalance that 
> wasn't properly rate limited), the broker fails to clear it's pending state 
> for that transactional id, blocking any future actions on that transactional 
> id (including cleanup), until the broker is restarted or another broker 
> without that problem becomes the coordinator for that transactional id.
> —
>  Related:
>  There was a very similar case in KAFKA-5351 where not clearing a 
> TransactionMetadata's pendingState caused similar issues.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to