[jira] [Commented] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable

2020-01-10 Thread Alper Kanat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17012625#comment-17012625
 ] 

Alper Kanat commented on KAFKA-7519:


I just had the exact error in Kafka 2.2.0 – any ideas? is this PR really merged 
into 2.0.1, 2.1.0 releases?

!image-2020-01-10-12-37-28-804.png!

> Transactional Ids Left in Pending State by TransactionStateManager During 
> Transactional Id Expiration Are Unusable
> --
>
> Key: KAFKA-7519
> URL: https://issues.apache.org/jira/browse/KAFKA-7519
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.0.0
>Reporter: Bridger Howell
>Assignee: Bridger Howell
>Priority: Blocker
> Fix For: 2.0.1, 2.1.0
>
> Attachments: KAFKA-7519.patch, image-2018-10-18-13-02-22-371.png, 
> image-2020-01-10-12-37-28-804.png
>
>
>  
> After digging into a case where an exactly-once streams process was bizarrely 
> unable to process incoming data, we observed the following:
>  * StreamThreads stalling while creating a producer, eventually resulting in 
> no consumption by that streams process. Looking into those threads, we found 
> they were stuck in a loop, sending InitProducerIdRequests and always 
> receiving back the retriable error CONCURRENT_TRANSACTIONS and trying again. 
> These requests always had the same transactional id.
>  * After changing the streams process to not use exactly-once, it was able to 
> process messages with no problems.
>  * Alternatively, changing the applicationId for that streams process, it was 
> able to process with no problems.
>  * Every hour,  every broker would fail the task `transactionalId-expiration` 
> with the following error:
>  ** 
> {code:java}
> {"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing 
> transaction state transition to Dead while it already a pending sta
> te Dead
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
>     at kafka.coordinator
> .transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
>     at kafka.coordinator.transaction.TransactionStateManager$$a
> nonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scal
> a:151)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$ano
> nfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at
>  
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>     at kafka.coordinator.transaction.TransactionSt
> ateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.sc
> ala:150)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$a
> nonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
> Like.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.foreach(List.scala:392)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.map(List.scala:296)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$app
> ly$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enabl
> eTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
>     at scala.collection.Traversabl
> eLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.
> scala:241)
>     at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>     at scala.collection.mutable.HashMap$$anon
> fun$foreach$1.apply(HashMap.scala:130)
>     at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>     at scala.collec
> tion.mutable.HashMap.foreachEntry(HashMap.scala:40)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
>     at scala.collecti
> on.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>     a
> t 
> 

[jira] [Commented] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable

2018-10-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659332#comment-16659332
 ] 

ASF GitHub Bot commented on KAFKA-7519:
---

ijuma closed pull request #5820: KAFKA-7519 Clear pending transaction state 
when expiration fails
URL: https://github.com/apache/kafka/pull/5820
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 574c64e2c65..589407c2a2c 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -166,37 +166,32 @@ class TransactionStateManager(brokerId: Int,
 (topicPartition, records)
   }
 
-
 def removeFromCacheCallback(responses: collection.Map[TopicPartition, 
PartitionResponse]): Unit = {
   responses.foreach { case (topicPartition, response) =>
-response.error match {
-  case Errors.NONE =>
-inReadLock(stateLock) {
-  val toRemove = 
transactionalIdByPartition(topicPartition.partition())
-  transactionMetadataCache.get(topicPartition.partition)
-.foreach { txnMetadataCacheEntry =>
-  toRemove.foreach { idCoordinatorEpochAndMetadata =>
-val txnMetadata = 
txnMetadataCacheEntry.metadataPerTransactionalId.get(idCoordinatorEpochAndMetadata.transactionalId)
-txnMetadata.inLock {
-  if (txnMetadataCacheEntry.coordinatorEpoch == 
idCoordinatorEpochAndMetadata.coordinatorEpoch
-&& txnMetadata.pendingState.contains(Dead)
-&& txnMetadata.producerEpoch == 
idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch
-  )
-
txnMetadataCacheEntry.metadataPerTransactionalId.remove(idCoordinatorEpochAndMetadata.transactionalId)
-  else {
- debug(s"failed to remove expired transactionalId: 
${idCoordinatorEpochAndMetadata.transactionalId}" +
-   s" from cache. pendingState: 
${txnMetadata.pendingState} producerEpoch: ${txnMetadata.producerEpoch}" +
-   s" expected producerEpoch: 
${idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch}" +
-   s" coordinatorEpoch: 
${txnMetadataCacheEntry.coordinatorEpoch} expected coordinatorEpoch: " +
-   
s"${idCoordinatorEpochAndMetadata.coordinatorEpoch}")
-txnMetadata.pendingState = None
-  }
-}
-  }
+inReadLock(stateLock) {
+  val toRemove = 
transactionalIdByPartition(topicPartition.partition)
+  transactionMetadataCache.get(topicPartition.partition).foreach { 
txnMetadataCacheEntry =>
+toRemove.foreach { idCoordinatorEpochAndMetadata =>
+  val transactionalId = 
idCoordinatorEpochAndMetadata.transactionalId
+  val txnMetadata = 
txnMetadataCacheEntry.metadataPerTransactionalId.get(transactionalId)
+  txnMetadata.inLock {
+if (txnMetadataCacheEntry.coordinatorEpoch == 
idCoordinatorEpochAndMetadata.coordinatorEpoch
+  && txnMetadata.pendingState.contains(Dead)
+  && txnMetadata.producerEpoch == 
idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch
+  && response.error == Errors.NONE) {
+  
txnMetadataCacheEntry.metadataPerTransactionalId.remove(transactionalId)
+} else {
+  warn(s"Failed to remove expired transactionalId: 
$transactionalId" +
+s" from cache. Tombstone append error code: 
${response.error}," +
+s" pendingState: ${txnMetadata.pendingState}, 
producerEpoch: ${txnMetadata.producerEpoch}," +
+s" expected producerEpoch: 
${idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch}," +
+s" coordinatorEpoch: 
${txnMetadataCacheEntry.coordinatorEpoch}, expected coordinatorEpoch: " +
+s"${idCoordinatorEpochAndMetadata.coordinatorEpoch}")
+  txnMetadata.pendingState = None
 }
+  }
 }
-  case _ =>
-  

[jira] [Commented] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable

2018-10-20 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657983#comment-16657983
 ] 

Dong Lin commented on KAFKA-7519:
-

[~ijuma] Thanks for updating the state. I would like to help review it. But it 
seems more related to the stream processing and transaction semantics. So it 
may be safer if someone with more expertise in these two areas can take a look 
:)

> Transactional Ids Left in Pending State by TransactionStateManager During 
> Transactional Id Expiration Are Unusable
> --
>
> Key: KAFKA-7519
> URL: https://issues.apache.org/jira/browse/KAFKA-7519
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.0.0
>Reporter: Bridger Howell
>Priority: Blocker
> Fix For: 2.1.0
>
> Attachments: KAFKA-7519.patch, image-2018-10-18-13-02-22-371.png
>
>
>  
> After digging into a case where an exactly-once streams process was bizarrely 
> unable to process incoming data, we observed the following:
>  * StreamThreads stalling while creating a producer, eventually resulting in 
> no consumption by that streams process. Looking into those threads, we found 
> they were stuck in a loop, sending InitProducerIdRequests and always 
> receiving back the retriable error CONCURRENT_TRANSACTIONS and trying again. 
> These requests always had the same transactional id.
>  * After changing the streams process to not use exactly-once, it was able to 
> process messages with no problems.
>  * Alternatively, changing the applicationId for that streams process, it was 
> able to process with no problems.
>  * Every hour,  every broker would fail the task `transactionalId-expiration` 
> with the following error:
>  ** 
> {code:java}
> {"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing 
> transaction state transition to Dead while it already a pending sta
> te Dead
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
>     at kafka.coordinator
> .transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
>     at kafka.coordinator.transaction.TransactionStateManager$$a
> nonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scal
> a:151)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$ano
> nfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at
>  
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>     at kafka.coordinator.transaction.TransactionSt
> ateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.sc
> ala:150)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$a
> nonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
> Like.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.foreach(List.scala:392)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.map(List.scala:296)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$app
> ly$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enabl
> eTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
>     at scala.collection.Traversabl
> eLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.
> scala:241)
>     at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>     at scala.collection.mutable.HashMap$$anon
> fun$foreach$1.apply(HashMap.scala:130)
>     at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>     at scala.collec
> tion.mutable.HashMap.foreachEntry(HashMap.scala:40)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
>     at scala.collecti
> on.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>     a
> t 
> 

[jira] [Commented] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable

2018-10-20 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657899#comment-16657899
 ] 

Ismael Juma commented on KAFKA-7519:


[~lindong], I marked this as a blocker since the fix seems simple and the 
impact is severe.

> Transactional Ids Left in Pending State by TransactionStateManager During 
> Transactional Id Expiration Are Unusable
> --
>
> Key: KAFKA-7519
> URL: https://issues.apache.org/jira/browse/KAFKA-7519
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.0.0
>Reporter: Bridger Howell
>Priority: Blocker
> Fix For: 2.1.0
>
> Attachments: KAFKA-7519.patch, image-2018-10-18-13-02-22-371.png
>
>
>  
> After digging into a case where an exactly-once streams process was bizarrely 
> unable to process incoming data, we observed the following:
>  * StreamThreads stalling while creating a producer, eventually resulting in 
> no consumption by that streams process. Looking into those threads, we found 
> they were stuck in a loop, sending InitProducerIdRequests and always 
> receiving back the retriable error CONCURRENT_TRANSACTIONS and trying again. 
> These requests always had the same transactional id.
>  * After changing the streams process to not use exactly-once, it was able to 
> process messages with no problems.
>  * Alternatively, changing the applicationId for that streams process, it was 
> able to process with no problems.
>  * Every hour,  every broker would fail the task `transactionalId-expiration` 
> with the following error:
>  ** 
> {code:java}
> {"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing 
> transaction state transition to Dead while it already a pending sta
> te Dead
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
>     at kafka.coordinator
> .transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
>     at kafka.coordinator.transaction.TransactionStateManager$$a
> nonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scal
> a:151)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$ano
> nfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at
>  
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>     at kafka.coordinator.transaction.TransactionSt
> ateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.sc
> ala:150)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$a
> nonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
> Like.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.foreach(List.scala:392)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.map(List.scala:296)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$app
> ly$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enabl
> eTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
>     at scala.collection.Traversabl
> eLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.
> scala:241)
>     at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>     at scala.collection.mutable.HashMap$$anon
> fun$foreach$1.apply(HashMap.scala:130)
>     at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>     at scala.collec
> tion.mutable.HashMap.foreachEntry(HashMap.scala:40)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
>     at scala.collecti
> on.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>     a
> t 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Tr
> ansactionStateManager.scala:142)
>     at 
> 

[jira] [Commented] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657556#comment-16657556
 ] 

ASF GitHub Bot commented on KAFKA-7519:
---

BirdHowl opened a new pull request #5820: KAFKA-7519 Clear pending transaction 
state when expiration fails
URL: https://github.com/apache/kafka/pull/5820
 
 
    Description:
   *Make sure that the transaction state is properly cleared when the 
`transactionalId-expiration` task fails. Operations on that transactional id 
would otherwise return a`CONCURRENT_TRANSACTIONS` error and appear 
"untouchable" to transaction state changes, preventing transactional producers 
from operating until a broker restart or transaction coordinator change.*
   
    Testing:
   *Unit tested by verifying that having the `transactionalId-expration` task 
won't leave the transaction metadata in a pending state if the replica manager 
returns an error.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Transactional Ids Left in Pending State by TransactionStateManager During 
> Transactional Id Expiration Are Unusable
> --
>
> Key: KAFKA-7519
> URL: https://issues.apache.org/jira/browse/KAFKA-7519
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.0.0
>Reporter: Bridger Howell
>Assignee: Dhruvil Shah
>Priority: Critical
> Attachments: KAFKA-7519.patch, image-2018-10-18-13-02-22-371.png
>
>
>  
> After digging into a case where an exactly-once streams process was bizarrely 
> unable to process incoming data, we observed the following:
>  * StreamThreads stalling while creating a producer, eventually resulting in 
> no consumption by that streams process. Looking into those threads, we found 
> they were stuck in a loop, sending InitProducerIdRequests and always 
> receiving back the retriable error CONCURRENT_TRANSACTIONS and trying again. 
> These requests always had the same transactional id.
>  * After changing the streams process to not use exactly-once, it was able to 
> process messages with no problems.
>  * Alternatively, changing the applicationId for that streams process, it was 
> able to process with no problems.
>  * Every hour,  every broker would fail the task `transactionalId-expiration` 
> with the following error:
>  ** 
> {code:java}
> {"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing 
> transaction state transition to Dead while it already a pending sta
> te Dead
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
>     at kafka.coordinator
> .transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
>     at kafka.coordinator.transaction.TransactionStateManager$$a
> nonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scal
> a:151)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$ano
> nfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at
>  
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>     at kafka.coordinator.transaction.TransactionSt
> ateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.sc
> ala:150)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$a
> nonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
> Like.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.foreach(List.scala:392)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.map(List.scala:296)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$app
> 

[jira] [Commented] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable

2018-10-19 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657056#comment-16657056
 ] 

Dhruvil Shah commented on KAFKA-7519:
-

[~howellbridger] thanks for the patch! Do you want to open a PR? I could help 
reviewing the code and look into any failures if needed.

> Transactional Ids Left in Pending State by TransactionStateManager During 
> Transactional Id Expiration Are Unusable
> --
>
> Key: KAFKA-7519
> URL: https://issues.apache.org/jira/browse/KAFKA-7519
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.0.0
>Reporter: Bridger Howell
>Assignee: Dhruvil Shah
>Priority: Critical
> Attachments: KAFKA-7519.patch, image-2018-10-18-13-02-22-371.png
>
>
>  
> After digging into a case where an exactly-once streams process was bizarrely 
> unable to process incoming data, we observed the following:
>  * StreamThreads stalling while creating a producer, eventually resulting in 
> no consumption by that streams process. Looking into those threads, we found 
> they were stuck in a loop, sending InitProducerIdRequests and always 
> receiving back the retriable error CONCURRENT_TRANSACTIONS and trying again. 
> These requests always had the same transactional id.
>  * After changing the streams process to not use exactly-once, it was able to 
> process messages with no problems.
>  * Alternatively, changing the applicationId for that streams process, it was 
> able to process with no problems.
>  * Every hour,  every broker would fail the task `transactionalId-expiration` 
> with the following error:
>  ** 
> {code:java}
> {"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing 
> transaction state transition to Dead while it already a pending sta
> te Dead
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
>     at kafka.coordinator
> .transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
>     at kafka.coordinator.transaction.TransactionStateManager$$a
> nonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scal
> a:151)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$ano
> nfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at
>  
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>     at kafka.coordinator.transaction.TransactionSt
> ateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.sc
> ala:150)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$a
> nonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
> Like.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.foreach(List.scala:392)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.map(List.scala:296)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$app
> ly$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enabl
> eTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
>     at scala.collection.Traversabl
> eLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.
> scala:241)
>     at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>     at scala.collection.mutable.HashMap$$anon
> fun$foreach$1.apply(HashMap.scala:130)
>     at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>     at scala.collec
> tion.mutable.HashMap.foreachEntry(HashMap.scala:40)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
>     at scala.collecti
> on.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>     a
> t 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Tr
> 

[jira] [Commented] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable

2018-10-18 Thread Bridger Howell (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655973#comment-16655973
 ] 

Bridger Howell commented on KAFKA-7519:
---

[~dhruvilshah], [~hachikuji] thanks for looking at this!

I'd prepared a patch, since it seemed pretty straight forward to fix, but I 
wasn't able to get it to pass all of the tests running locally. I've uploaded 
it here in case it's useful or there's a problem with the tests.

> Transactional Ids Left in Pending State by TransactionStateManager During 
> Transactional Id Expiration Are Unusable
> --
>
> Key: KAFKA-7519
> URL: https://issues.apache.org/jira/browse/KAFKA-7519
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.0.0
>Reporter: Bridger Howell
>Assignee: Dhruvil Shah
>Priority: Critical
> Attachments: KAFKA-7519.patch, image-2018-10-18-13-02-22-371.png
>
>
>  
> After digging into a case where an exactly-once streams process was bizarrely 
> unable to process incoming data, we observed the following:
>  * StreamThreads stalling while creating a producer, eventually resulting in 
> no consumption by that streams process. Looking into those threads, we found 
> they were stuck in a loop, sending InitProducerIdRequests and always 
> receiving back the retriable error CONCURRENT_TRANSACTIONS and trying again. 
> These requests always had the same transactional id.
>  * After changing the streams process to not use exactly-once, it was able to 
> process messages with no problems.
>  * Alternatively, changing the applicationId for that streams process, it was 
> able to process with no problems.
>  * Every hour,  every broker would fail the task `transactionalId-expiration` 
> with the following error:
>  ** 
> {code:java}
> {"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing 
> transaction state transition to Dead while it already a pending sta
> te Dead
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
>     at kafka.coordinator
> .transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
>     at kafka.coordinator.transaction.TransactionStateManager$$a
> nonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scal
> a:151)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$ano
> nfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at
>  
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>     at kafka.coordinator.transaction.TransactionSt
> ateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.sc
> ala:150)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$a
> nonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
> Like.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.foreach(List.scala:392)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.map(List.scala:296)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$app
> ly$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enabl
> eTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
>     at scala.collection.Traversabl
> eLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.
> scala:241)
>     at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>     at scala.collection.mutable.HashMap$$anon
> fun$foreach$1.apply(HashMap.scala:130)
>     at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>     at scala.collec
> tion.mutable.HashMap.foreachEntry(HashMap.scala:40)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
>     at scala.collecti
> on.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>     a
> t 
> 

[jira] [Commented] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable

2018-10-18 Thread Jason Gustafson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655934#comment-16655934
 ] 

Jason Gustafson commented on KAFKA-7519:


Good find and thanks for the investigation.

> Transactional Ids Left in Pending State by TransactionStateManager During 
> Transactional Id Expiration Are Unusable
> --
>
> Key: KAFKA-7519
> URL: https://issues.apache.org/jira/browse/KAFKA-7519
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Reporter: Bridger Howell
>Assignee: Dhruvil Shah
>Priority: Critical
> Attachments: image-2018-10-18-13-02-22-371.png
>
>
>  
> After digging into a case where an exactly-once streams process was bizarrely 
> unable to process incoming data, we observed the following:
>  * StreamThreads stalling while creating a producer, eventually resulting in 
> no consumption by that streams process. Looking into those threads, we found 
> they were stuck in a loop, sending InitProducerIdRequests and always 
> receiving back the retriable error CONCURRENT_TRANSACTIONS and trying again. 
> These requests always had the same transactional id.
>  * After changing the streams process to not use exactly-once, it was able to 
> process messages with no problems.
>  * Alternatively, changing the applicationId for that streams process, it was 
> able to process with no problems.
>  * Every hour,  every broker would fail the task `transactionalId-expiration` 
> with the following error:
>  ** 
> {code:java}
> {"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing 
> transaction state transition to Dead while it already a pending sta
> te Dead
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
>     at kafka.coordinator
> .transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
>     at kafka.coordinator.transaction.TransactionStateManager$$a
> nonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scal
> a:151)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$ano
> nfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at
>  
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>     at kafka.coordinator.transaction.TransactionSt
> ateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.sc
> ala:150)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$a
> nonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
> Like.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.foreach(List.scala:392)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.map(List.scala:296)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$app
> ly$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enabl
> eTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
>     at scala.collection.Traversabl
> eLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.
> scala:241)
>     at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>     at scala.collection.mutable.HashMap$$anon
> fun$foreach$1.apply(HashMap.scala:130)
>     at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>     at scala.collec
> tion.mutable.HashMap.foreachEntry(HashMap.scala:40)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
>     at scala.collecti
> on.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>     a
> t 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Tr
> ansactionStateManager.scala:142)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$a
>