artemlivshits commented on code in PR #15524: URL: https://github.com/apache/kafka/pull/15524#discussion_r1521968243
########## core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala: ########## @@ -354,41 +366,42 @@ class TransactionMarkerChannelManager( txnLogAppend.newMetadata, appendCallback, _ == Errors.COORDINATOR_NOT_AVAILABLE, RequestLocal.NoCaching) } - def addTxnMarkersToBrokerQueue(transactionalId: String, - producerId: Long, + def addTxnMarkersToBrokerQueue(producerId: Long, producerEpoch: Short, result: TransactionResult, - coordinatorEpoch: Int, + pendingCompleteTxn: PendingCompleteTxn, Review Comment: pendingCompleteTxn has transactional id and coordinator epoch, so we don't need to pass them explicitly. ########## core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala: ########## @@ -109,23 +109,30 @@ object TransactionMarkerChannelManager { } -class TxnMarkerQueue(@volatile var destination: Node) { +class TxnMarkerQueue(@volatile var destination: Node) extends Logging { // keep track of the requests per txn topic partition so we can easily clear the queue // during partition emigration - private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[TxnIdAndMarkerEntry]]().asScala + private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[PendingCompleteTxnAndMarkerEntry]]().asScala Review Comment: Now we keep track of the PendingCompleteTxn that was added in the transactionsWithPendingMarkers. 
########## core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala: ########## @@ -419,25 +432,34 @@ class TransactionMarkerChannelManager( def removeMarkersForTxnTopicPartition(txnTopicPartitionId: Int): Unit = { markersQueueForUnknownBroker.removeMarkersForTxnTopicPartition(txnTopicPartitionId).foreach { queue => - for (entry: TxnIdAndMarkerEntry <- queue.asScala) - removeMarkersForTxnId(entry.txnId) + for (entry <- queue.asScala) { + info(s"Removing $entry for txn partition $txnTopicPartitionId to destination broker -1") + removeMarkersForTxn(entry.pendingCompleteTxn) + } } - markersQueuePerBroker.foreach { case(_, brokerQueue) => + markersQueuePerBroker.foreach { case(brokerId, brokerQueue) => brokerQueue.removeMarkersForTxnTopicPartition(txnTopicPartitionId).foreach { queue => - for (entry: TxnIdAndMarkerEntry <- queue.asScala) - removeMarkersForTxnId(entry.txnId) + for (entry <- queue.asScala) { + info(s"Removing $entry for txn partition $txnTopicPartitionId to destination broker $brokerId") + removeMarkersForTxn(entry.pendingCompleteTxn) + } } } } - def removeMarkersForTxnId(transactionalId: String): Unit = { - transactionsWithPendingMarkers.remove(transactionalId) + def removeMarkersForTxn(pendingCompleteTxn: PendingCompleteTxn): Unit = { + val transactionalId = pendingCompleteTxn.transactionalId + val removed = transactionsWithPendingMarkers.remove(transactionalId, pendingCompleteTxn) Review Comment: This is the core change -- use the original pendingCompleteTxn value to remove the entry. The rest of the change is pretty much plumbing so that we can supply the correct pendingCompleteTxn. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org