artemlivshits commented on code in PR #15524:
URL: https://github.com/apache/kafka/pull/15524#discussion_r1521968243


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala:
##########
@@ -354,41 +366,42 @@ class TransactionMarkerChannelManager(
       txnLogAppend.newMetadata, appendCallback, _ == 
Errors.COORDINATOR_NOT_AVAILABLE, RequestLocal.NoCaching)
   }
 
-  def addTxnMarkersToBrokerQueue(transactionalId: String,
-                                 producerId: Long,
+  def addTxnMarkersToBrokerQueue(producerId: Long,
                                  producerEpoch: Short,
                                  result: TransactionResult,
-                                 coordinatorEpoch: Int,
+                                 pendingCompleteTxn: PendingCompleteTxn,

Review Comment:
   pendingCompleteTxn has transactional id and coordinator epoch, so we don't 
need to pass them explicitly.



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala:
##########
@@ -109,23 +109,30 @@ object TransactionMarkerChannelManager {
 
 }
 
-class TxnMarkerQueue(@volatile var destination: Node) {
+class TxnMarkerQueue(@volatile var destination: Node) extends Logging {
 
   // keep track of the requests per txn topic partition so we can easily clear 
the queue
   // during partition emigration
-  private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, 
BlockingQueue[TxnIdAndMarkerEntry]]().asScala
+  private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, 
BlockingQueue[PendingCompleteTxnAndMarkerEntry]]().asScala

Review Comment:
   Now we keep track of the PendingCompleteTxn that was added in the 
transactionsWithPendingMarkers.



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala:
##########
@@ -419,25 +432,34 @@ class TransactionMarkerChannelManager(
 
   def removeMarkersForTxnTopicPartition(txnTopicPartitionId: Int): Unit = {
     
markersQueueForUnknownBroker.removeMarkersForTxnTopicPartition(txnTopicPartitionId).foreach
 { queue =>
-      for (entry: TxnIdAndMarkerEntry <- queue.asScala)
-        removeMarkersForTxnId(entry.txnId)
+      for (entry <- queue.asScala) {
+        info(s"Removing $entry for txn partition $txnTopicPartitionId to 
destination broker -1")
+        removeMarkersForTxn(entry.pendingCompleteTxn)
+      }
     }
 
-    markersQueuePerBroker.foreach { case(_, brokerQueue) =>
+    markersQueuePerBroker.foreach { case(brokerId, brokerQueue) =>
       
brokerQueue.removeMarkersForTxnTopicPartition(txnTopicPartitionId).foreach { 
queue =>
-        for (entry: TxnIdAndMarkerEntry <- queue.asScala)
-          removeMarkersForTxnId(entry.txnId)
+        for (entry <- queue.asScala) {
+          info(s"Removing $entry for txn partition $txnTopicPartitionId to 
destination broker $brokerId")
+          removeMarkersForTxn(entry.pendingCompleteTxn)
+        }
       }
     }
   }
 
-  def removeMarkersForTxnId(transactionalId: String): Unit = {
-    transactionsWithPendingMarkers.remove(transactionalId)
+  def removeMarkersForTxn(pendingCompleteTxn: PendingCompleteTxn): Unit = {
+    val transactionalId = pendingCompleteTxn.transactionalId
+    val removed = transactionsWithPendingMarkers.remove(transactionalId, 
pendingCompleteTxn)

Review Comment:
   This the the core change -- use the original pendingCompleteTxn value to 
remove the entry.  The rest of the change is pretty much plumbing so that we 
can supply the correct pendingCompleteTxn.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to