artemlivshits commented on code in PR #15524:
URL: https://github.com/apache/kafka/pull/15524#discussion_r1524022035


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala:
##########
@@ -109,23 +109,30 @@ object TransactionMarkerChannelManager {
 
 }
 
-class TxnMarkerQueue(@volatile var destination: Node) {
+class TxnMarkerQueue(@volatile var destination: Node) extends Logging {
 
   // keep track of the requests per txn topic partition so we can easily clear the queue
   // during partition emigration
-  private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[TxnIdAndMarkerEntry]]().asScala
+  private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[PendingCompleteTxnAndMarkerEntry]]().asScala
 
-  def removeMarkersForTxnTopicPartition(partition: Int): Option[BlockingQueue[TxnIdAndMarkerEntry]] = {
+  def removeMarkersForTxnTopicPartition(partition: Int): Option[BlockingQueue[PendingCompleteTxnAndMarkerEntry]] = {
     markersPerTxnTopicPartition.remove(partition)
   }
 
-  def addMarkers(txnTopicPartition: Int, txnIdAndMarker: TxnIdAndMarkerEntry): Unit = {
-    val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, txnTopicPartition,
-        new LinkedBlockingQueue[TxnIdAndMarkerEntry]())
-    queue.add(txnIdAndMarker)
+  def addMarkers(txnTopicPartition: Int, pendingCompleteTxnAndMarker: PendingCompleteTxnAndMarkerEntry): Unit = {
+    val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, txnTopicPartition, {
+      info(s"Creating new marker queue for txn partition $txnTopicPartition to destination broker ${destination.id}")
+      new LinkedBlockingQueue[PendingCompleteTxnAndMarkerEntry]()
+    })
+    queue.add(pendingCompleteTxnAndMarker)
+
+    if (markersPerTxnTopicPartition.get(txnTopicPartition).orNull != queue) {
+      // This could happen if the queue got removed concurrently.

Review Comment:
   As far as I can see, it shouldn't affect user-visible behavior.  It does create an interesting state when the queue is removed in removeMarkersForTxnTopicPartition -- we could have:
   1. [addMarkers] Retrieve the queue.
   2. [removeMarkersForTxnTopicPartition] Remove the queue from the map.
   3. [removeMarkersForTxnTopicPartition] Iterate over the queue, but never call removeMarkersForTxn because the queue is still empty.
   4. [addMarkers] Add the marker to the now-orphaned queue.
   
   Now we've effectively removed the marker while transactionsWithPendingMarkers still has an entry.
   
   This state could last for a while if the removal happened on unload (and technically the txn id could expire, etc., so the state may persist until a broker restart), but as soon as real workload on this txn id sends out markers again, the proper entry will be created and the actual functionality will work as expected.
   
   In other words, this race can leave an orphan entry in transactionsWithPendingMarkers, but it doesn't affect anything (other than leaking a small amount of memory) until markers are sent for that txn id again, and sending those markers will fix it.
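   
   To make the interleaving concrete, here is a minimal, self-contained sketch of the race (illustrative only -- the map and queue stand in for markersPerTxnTopicPartition, and transactionsWithPendingMarkers is not modeled):
   
   ```scala
   import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
   
   object OrphanQueueRace extends App {
     // Per-partition marker queues, standing in for markersPerTxnTopicPartition.
     val markersPerPartition = new ConcurrentHashMap[Int, LinkedBlockingQueue[String]]()
     val partition = 0
   
     // Step 1 [addMarkers]: get or create the queue for the partition.
     val newQueue = new LinkedBlockingQueue[String]()
     val queue = Option(markersPerPartition.putIfAbsent(partition, newQueue)).getOrElse(newQueue)
   
     // Step 2 [removeMarkersForTxnTopicPartition]: the queue is removed from the map.
     val removed = Option(markersPerPartition.remove(partition))
   
     // Step 3: iterate over the removed queue -- it is still empty at this point,
     // so no per-txn cleanup (the removeMarkersForTxn analogue) ever runs.
     removed.foreach { q =>
       while (!q.isEmpty) println(s"cleaning up ${q.poll()}")
     }
   
     // Step 4 [addMarkers]: the marker lands in a queue the map no longer references.
     queue.add("marker for txn-1")
   
     // Result: the marker can never be drained via the map, while the (not modeled)
     // transactionsWithPendingMarkers set would still hold an entry for txn-1.
     println(s"partition still in map: ${markersPerPartition.containsKey(partition)}") // false
     println(s"orphaned queue size: ${queue.size}")                                    // 1
   }
   ```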



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala:
##########
@@ -90,9 +90,10 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
       val writeTxnMarkerResponse = response.responseBody.asInstanceOf[WriteTxnMarkersResponse]
 
       val responseErrors = writeTxnMarkerResponse.errorsByProducerId
-      for (txnIdAndMarker <- txnIdAndMarkerEntries.asScala) {

Review Comment:
   I agree this code could benefit from some refactoring; we should probably restructure it so that, instead of branching on wasDisconnected at the top, it just iterates over the pending entries and checks wasDisconnected in the specific cases.  But I think that should be done separately -- keeping this change fairly mechanical simplifies the review of what it does.
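   
   For concreteness, a rough sketch of the shape such a refactor could take (illustrative only, not the PR's code; PendingEntry, retry and complete are hypothetical stand-ins for the handler's real types and callbacks):
   
   ```scala
   object MarkerResponseHandlingSketch {
     case class PendingEntry(txnId: String, producerId: Long)
   
     // Walk the pending entries once and decide per entry, instead of branching
     // on wasDisconnected at the top of the handler.
     def handleMarkersResponse(wasDisconnected: Boolean,
                               errorsByProducerId: Map[Long, Throwable],
                               pendingEntries: Seq[PendingEntry],
                               retry: PendingEntry => Unit,
                               complete: PendingEntry => Unit): Unit = {
       for (entry <- pendingEntries) {
         if (wasDisconnected) {
           retry(entry) // the whole request failed; requeue this marker
         } else {
           errorsByProducerId.get(entry.producerId) match {
             case Some(_) => retry(entry)    // per-producer error: retry this marker
             case None    => complete(entry) // marker written successfully
           }
         }
       }
     }
   }
   ```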



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala:
##########
@@ -109,23 +109,30 @@ object TransactionMarkerChannelManager {
 
 }
 
-class TxnMarkerQueue(@volatile var destination: Node) {
+class TxnMarkerQueue(@volatile var destination: Node) extends Logging {
 
   // keep track of the requests per txn topic partition so we can easily clear the queue
   // during partition emigration
-  private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[TxnIdAndMarkerEntry]]().asScala
+  private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[PendingCompleteTxnAndMarkerEntry]]().asScala
 
-  def removeMarkersForTxnTopicPartition(partition: Int): Option[BlockingQueue[TxnIdAndMarkerEntry]] = {
+  def removeMarkersForTxnTopicPartition(partition: Int): Option[BlockingQueue[PendingCompleteTxnAndMarkerEntry]] = {
     markersPerTxnTopicPartition.remove(partition)
   }
 
-  def addMarkers(txnTopicPartition: Int, txnIdAndMarker: TxnIdAndMarkerEntry): Unit = {
-    val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, txnTopicPartition,
-        new LinkedBlockingQueue[TxnIdAndMarkerEntry]())
-    queue.add(txnIdAndMarker)
+  def addMarkers(txnTopicPartition: Int, pendingCompleteTxnAndMarker: PendingCompleteTxnAndMarkerEntry): Unit = {
+    val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, txnTopicPartition, {
+      info(s"Creating new marker queue for txn partition $txnTopicPartition to destination broker ${destination.id}")

Review Comment:
   That's correct, this is logged when the value is created.  A more precise approach would be to make atomicGetOrUpdate return an indication of whether the value was actually created (then we'd log it exactly once), but I didn't think it would be worth the complexity -- at worst we'd get a couple of log lines at the same time, and one of them would be the call that actually created the queue.  I'll add a comment.



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala:
##########
@@ -419,25 +432,34 @@ class TransactionMarkerChannelManager(
 
   def removeMarkersForTxnTopicPartition(txnTopicPartitionId: Int): Unit = {
     markersQueueForUnknownBroker.removeMarkersForTxnTopicPartition(txnTopicPartitionId).foreach { queue =>
-      for (entry: TxnIdAndMarkerEntry <- queue.asScala)
-        removeMarkersForTxnId(entry.txnId)
+      for (entry <- queue.asScala) {

Review Comment:
   In the cases I investigated it was a couple dozen or so, but I don't have precise stats from a large sample of cases.  We already log a message on every retry of a failed marker send, and I think this message would be much less spammy than that (these would only happen when partitions change).
   The disadvantage of having just a log is that it will eventually get truncated, but these logs can help to see interesting transitions that could help investigate race conditions related to load / unload.


