[ https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16659332#comment-16659332 ]

ASF GitHub Bot commented on KAFKA-7519:
---------------------------------------

ijuma closed pull request #5820: KAFKA-7519 Clear pending transaction state 
when expiration fails
URL: https://github.com/apache/kafka/pull/5820
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 574c64e2c65..589407c2a2c 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -166,37 +166,32 @@ class TransactionStateManager(brokerId: Int,
             (topicPartition, records)
           }
 
-
         def removeFromCacheCallback(responses: collection.Map[TopicPartition, PartitionResponse]): Unit = {
           responses.foreach { case (topicPartition, response) =>
-            response.error match {
-              case Errors.NONE =>
-                inReadLock(stateLock) {
-                  val toRemove = transactionalIdByPartition(topicPartition.partition())
-                  transactionMetadataCache.get(topicPartition.partition)
-                    .foreach { txnMetadataCacheEntry =>
-                      toRemove.foreach { idCoordinatorEpochAndMetadata =>
-                        val txnMetadata = txnMetadataCacheEntry.metadataPerTransactionalId.get(idCoordinatorEpochAndMetadata.transactionalId)
-                        txnMetadata.inLock {
-                          if (txnMetadataCacheEntry.coordinatorEpoch == idCoordinatorEpochAndMetadata.coordinatorEpoch
-                            && txnMetadata.pendingState.contains(Dead)
-                            && txnMetadata.producerEpoch == idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch
-                          )
-                            txnMetadataCacheEntry.metadataPerTransactionalId.remove(idCoordinatorEpochAndMetadata.transactionalId)
-                          else {
-                             debug(s"failed to remove expired transactionalId: ${idCoordinatorEpochAndMetadata.transactionalId}" +
-                               s" from cache. pendingState: ${txnMetadata.pendingState} producerEpoch: ${txnMetadata.producerEpoch}" +
-                               s" expected producerEpoch: ${idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch}" +
-                               s" coordinatorEpoch: ${txnMetadataCacheEntry.coordinatorEpoch} expected coordinatorEpoch: " +
-                               s"${idCoordinatorEpochAndMetadata.coordinatorEpoch}")
-                            txnMetadata.pendingState = None
-                          }
-                        }
-                      }
+            inReadLock(stateLock) {
+              val toRemove = transactionalIdByPartition(topicPartition.partition)
+              transactionMetadataCache.get(topicPartition.partition).foreach { txnMetadataCacheEntry =>
+                toRemove.foreach { idCoordinatorEpochAndMetadata =>
+                  val transactionalId = idCoordinatorEpochAndMetadata.transactionalId
+                  val txnMetadata = txnMetadataCacheEntry.metadataPerTransactionalId.get(transactionalId)
+                  txnMetadata.inLock {
+                    if (txnMetadataCacheEntry.coordinatorEpoch == idCoordinatorEpochAndMetadata.coordinatorEpoch
+                      && txnMetadata.pendingState.contains(Dead)
+                      && txnMetadata.producerEpoch == idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch
+                      && response.error == Errors.NONE) {
+                      txnMetadataCacheEntry.metadataPerTransactionalId.remove(transactionalId)
+                    } else {
+                      warn(s"Failed to remove expired transactionalId: $transactionalId" +
+                        s" from cache. Tombstone append error code: ${response.error}," +
+                        s" pendingState: ${txnMetadata.pendingState}, producerEpoch: ${txnMetadata.producerEpoch}," +
+                        s" expected producerEpoch: ${idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch}," +
+                        s" coordinatorEpoch: ${txnMetadataCacheEntry.coordinatorEpoch}, expected coordinatorEpoch: " +
+                        s"${idCoordinatorEpochAndMetadata.coordinatorEpoch}")
+                      txnMetadata.pendingState = None
                     }
+                  }
                 }
-              case _ =>
-                debug(s"writing transactionalId tombstones for partition: ${topicPartition.partition} failed with error: ${response.error.message()}")
+              }
             }
           }
         }
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index b395d00a98e..d2fe7eacb23 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -419,56 +419,57 @@ class TransactionStateManagerTest {
   def shouldRemoveCompleteCommmitExpiredTransactionalIds(): Unit = {
     setupAndRunTransactionalIdExpiration(Errors.NONE, CompleteCommit)
     verifyMetadataDoesntExist(transactionalId1)
-    verifyMetadataDoesExist(transactionalId2)
+    verifyMetadataDoesExistAndIsUsable(transactionalId2)
   }
 
   @Test
   def shouldRemoveCompleteAbortExpiredTransactionalIds(): Unit = {
     setupAndRunTransactionalIdExpiration(Errors.NONE, CompleteAbort)
     verifyMetadataDoesntExist(transactionalId1)
-    verifyMetadataDoesExist(transactionalId2)
+    verifyMetadataDoesExistAndIsUsable(transactionalId2)
   }
 
   @Test
   def shouldRemoveEmptyExpiredTransactionalIds(): Unit = {
     setupAndRunTransactionalIdExpiration(Errors.NONE, Empty)
     verifyMetadataDoesntExist(transactionalId1)
-    verifyMetadataDoesExist(transactionalId2)
+    verifyMetadataDoesExistAndIsUsable(transactionalId2)
   }
 
   @Test
   def shouldNotRemoveExpiredTransactionalIdsIfLogAppendFails(): Unit = {
     setupAndRunTransactionalIdExpiration(Errors.NOT_ENOUGH_REPLICAS, CompleteAbort)
-    verifyMetadataDoesExist(transactionalId1)
-    verifyMetadataDoesExist(transactionalId2)
+    verifyMetadataDoesExistAndIsUsable(transactionalId1)
+    verifyMetadataDoesExistAndIsUsable(transactionalId2)
   }
 
   @Test
   def shouldNotRemoveOngoingTransactionalIds(): Unit = {
     setupAndRunTransactionalIdExpiration(Errors.NONE, Ongoing)
-    verifyMetadataDoesExist(transactionalId1)
-    verifyMetadataDoesExist(transactionalId2)
+    verifyMetadataDoesExistAndIsUsable(transactionalId1)
+    verifyMetadataDoesExistAndIsUsable(transactionalId2)
  }
 
   @Test
   def shouldNotRemovePrepareAbortTransactionalIds(): Unit = {
     setupAndRunTransactionalIdExpiration(Errors.NONE, PrepareAbort)
-    verifyMetadataDoesExist(transactionalId1)
-    verifyMetadataDoesExist(transactionalId2)
+    verifyMetadataDoesExistAndIsUsable(transactionalId1)
+    verifyMetadataDoesExistAndIsUsable(transactionalId2)
   }
 
   @Test
   def shouldNotRemovePrepareCommitTransactionalIds(): Unit = {
     setupAndRunTransactionalIdExpiration(Errors.NONE, PrepareCommit)
-    verifyMetadataDoesExist(transactionalId1)
-    verifyMetadataDoesExist(transactionalId2)
+    verifyMetadataDoesExistAndIsUsable(transactionalId1)
+    verifyMetadataDoesExistAndIsUsable(transactionalId2)
   }
 
-  private def verifyMetadataDoesExist(transactionalId: String) = {
+  private def verifyMetadataDoesExistAndIsUsable(transactionalId: String) = {
     transactionManager.getTransactionState(transactionalId) match {
       case Left(errors) => fail("shouldn't have been any errors")
       case Right(None) => fail("metadata should have been removed")
-      case Right(Some(metadata)) => // ok
+      case Right(Some(metadata)) =>
+        assertTrue("metadata shouldn't be in a pending state", metadata.transactionMetadata.pendingState.isEmpty)
     }
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Transactional Ids Left in Pending State by TransactionStateManager During 
> Transactional Id Expiration Are Unusable
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7519
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7519
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, producer 
>    Affects Versions: 2.0.0
>            Reporter: Bridger Howell
>            Priority: Blocker
>             Fix For: 2.0.1, 2.1.0
>
>         Attachments: KAFKA-7519.patch, image-2018-10-18-13-02-22-371.png
>
>
>  
> After digging into a case where an exactly-once streams process was bizarrely 
> unable to process incoming data, we observed the following:
>  * StreamThreads stalling while creating a producer, eventually resulting in 
> no consumption by that streams process. Looking into those threads, we found 
> they were stuck in a loop, sending InitProducerIdRequests and always 
> receiving back the retriable error CONCURRENT_TRANSACTIONS and trying again. 
> These requests always had the same transactional id.
>  * After changing the streams process to not use exactly-once, it was able to 
> process messages with no problems.
>  * Alternatively, after changing the applicationId for that streams process, 
> it was able to process with no problems.
>  * Every hour, every broker would fail the task `transactionalId-expiration` 
> with the following error:
>  ** 
> {code:java}
> {"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing 
> transaction state transition to Dead while it already a pending sta
> te Dead
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
>     at kafka.coordinator
> .transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
>     at kafka.coordinator.transaction.TransactionStateManager$$a
> nonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scal
> a:151)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$ano
> nfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at
>  
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>     at kafka.coordinator.transaction.TransactionSt
> ateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.sc
> ala:150)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$a
> nonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
> Like.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.foreach(List.scala:392)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.map(List.scala:296)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$app
> ly$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enabl
> eTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
>     at scala.collection.Traversabl
> eLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.
> scala:241)
>     at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>     at scala.collection.mutable.HashMap$$anon
> fun$foreach$1.apply(HashMap.scala:130)
>     at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>     at scala.collec
> tion.mutable.HashMap.foreachEntry(HashMap.scala:40)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
>     at scala.collecti
> on.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>     a
> t 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Tr
> ansactionStateManager.scala:142)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$a
> nonfun$apply$mcV$sp$1.apply(TransactionStateManager.scala:140)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enable
> TransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply(TransactionStateManager.scala:140)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils
> .scala:251)
>     at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
>     at kafka.coordinator.transaction.TransactionStateManager$$anon
> fun$enableTransactionalIdExpiration$1.apply$mcV$sp(TransactionStateManager.scala:140)
>     at kafka.utils.KafkaScheduler$$anonfun$1.apply$mc
> V$sp(KafkaScheduler.scala:114)
>     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>     at java.util.concurrent.Executors$RunnableAd
> apter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.Scheduled
> ThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExec
> utor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecu
> tor.java:1149)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.jav
> a:748)","exception_class":"java.lang.IllegalStateException","exception_message":"Preparing
>  transaction state transition to Dead while it a
> lready a pending state 
> Dead"},"source_host":"kafka-broker-4.kafka-broker.default.svc.cluster.local","method":"error","level":"ERROR","message":"Uncaught
>  exception in scheduled task 
> transactionalId-expiration","mdc":{},"file":"Logging.scala","line_number":"76","thread_name":"transaction-log-manager-0","logger_name":"kafka.utils.KafkaScheduler","class<span
>  class="code-quote">":"kafka.utils.Logging$class"}{code}
> Based on these problems and having read a bit of the server source, I guessed 
> that this would all be explained by TransactionMetadata instances that were 
> stuck in a pendingState.
> After doing a heap dump of the broker that was returning the error for our 
> particular group, we found this:
> !image-2018-10-18-13-02-22-371.png|width=723,height=224!
> There were indeed a bunch of live TransactionMetadata instances that had a 
> pending state of "Dead" but should have already been cleaned up, confirming 
> my guess.
> Finally, after reading carefully through the TransactionStateManager callback 
> that produces tombstones for expired transactional ids, I noticed that if 
> the ReplicaManager returns any error, those transactions will _not_ have 
> their pending state cleared.
> —
> h3. Short summary:
> If the ReplicaManager fails to append the tombstone records for expiring a 
> transactional id (in my case, this likely happened during a rebalance that 
> wasn't properly rate limited), the broker fails to clear its pending state 
> for that transactional id, blocking any future actions on that transactional 
> id (including cleanup) until the broker is restarted or another broker 
> without that problem becomes the coordinator for that transactional id.
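> To make the failure mode concrete, here is a minimal, self-contained Scala 
> sketch. These are not the real Kafka classes; `TxnMetadata`, `prepareDead` and 
> `onTombstoneAppend` are simplified stand-ins invented for illustration. It 
> shows why a pending state that survives a failed tombstone append wedges the 
> transactional id, and why clearing it on failure, as the fix above does, lets 
> a later expiration pass retry:
> {code:scala}
> // Hypothetical sketch, not Kafka code: shows why pendingState must be cleared
> // when the tombstone append fails.
> object PendingStateSketch {
>   sealed trait TxnState
>   case object Dead extends TxnState
> 
>   final class TxnMetadata(val transactionalId: String) {
>     var pendingState: Option[TxnState] = None
> 
>     // Mirrors the spirit of TransactionMetadata.prepareTransitionTo: it refuses
>     // to start a new transition while another one is still pending.
>     def prepareDead(): Unit = {
>       if (pendingState.isDefined)
>         throw new IllegalStateException(
>           s"Preparing transition to Dead while $transactionalId already has pending state ${pendingState.get}")
>       pendingState = Some(Dead)
>     }
> 
>     // The essence of the fix: the append callback clears the pending state on
>     // failure instead of only removing the cache entry on success.
>     def onTombstoneAppend(succeeded: Boolean,
>                           cache: scala.collection.mutable.Map[String, TxnMetadata]): Unit = {
>       if (succeeded) cache.remove(transactionalId)
>       else pendingState = None // without this, every later expiration run throws
>     }
>   }
> 
>   def main(args: Array[String]): Unit = {
>     val cache = scala.collection.mutable.Map.empty[String, TxnMetadata]
>     val txn = new TxnMetadata("my-app-txn-id")
>     cache(txn.transactionalId) = txn
> 
>     // First attempt: the tombstone append fails (e.g. NOT_ENOUGH_REPLICAS).
>     txn.prepareDead()
>     txn.onTombstoneAppend(succeeded = false, cache = cache)
> 
>     // Because pendingState was cleared, the next expiration pass can retry.
>     txn.prepareDead()
>     txn.onTombstoneAppend(succeeded = true, cache = cache)
>     assert(!cache.contains(txn.transactionalId))
>     println("transactional id expired cleanly after a failed first attempt")
>   }
> }
> {code}
> Without the `pendingState = None` branch in the failure path, the second 
> `prepareDead()` in the sketch throws exactly the IllegalStateException seen in 
> the log above, every hour, forever.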
> —
>  Related:
>  There was a very similar case in KAFKA-5351 where not clearing a 
> TransactionMetadata's pendingState caused similar issues.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
