jolshan commented on code in PR #15524: URL: https://github.com/apache/kafka/pull/15524#discussion_r1523907297
########## core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala: ########## @@ -109,23 +109,30 @@ object TransactionMarkerChannelManager { } -class TxnMarkerQueue(@volatile var destination: Node) { +class TxnMarkerQueue(@volatile var destination: Node) extends Logging { // keep track of the requests per txn topic partition so we can easily clear the queue // during partition emigration - private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[TxnIdAndMarkerEntry]]().asScala + private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[PendingCompleteTxnAndMarkerEntry]]().asScala - def removeMarkersForTxnTopicPartition(partition: Int): Option[BlockingQueue[TxnIdAndMarkerEntry]] = { + def removeMarkersForTxnTopicPartition(partition: Int): Option[BlockingQueue[PendingCompleteTxnAndMarkerEntry]] = { markersPerTxnTopicPartition.remove(partition) } - def addMarkers(txnTopicPartition: Int, txnIdAndMarker: TxnIdAndMarkerEntry): Unit = { - val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, txnTopicPartition, - new LinkedBlockingQueue[TxnIdAndMarkerEntry]()) - queue.add(txnIdAndMarker) + def addMarkers(txnTopicPartition: Int, pendingCompleteTxnAndMarker: PendingCompleteTxnAndMarkerEntry): Unit = { + val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, txnTopicPartition, { + info(s"Creating new marker queue for txn partition $txnTopicPartition to destination broker ${destination.id}") Review Comment: I noticed in the doc: ``` `createValue` may be invoked more than once if multiple threads attempt to insert a key at the same time ``` This seems ok, but just wanted to call it out -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org