jolshan commented on code in PR #15524:
URL: https://github.com/apache/kafka/pull/15524#discussion_r1523907297


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala:
##########
@@ -109,23 +109,30 @@ object TransactionMarkerChannelManager {
 
 }
 
-class TxnMarkerQueue(@volatile var destination: Node) {
+class TxnMarkerQueue(@volatile var destination: Node) extends Logging {
 
   // keep track of the requests per txn topic partition so we can easily clear 
the queue
   // during partition emigration
-  private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, 
BlockingQueue[TxnIdAndMarkerEntry]]().asScala
+  private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, 
BlockingQueue[PendingCompleteTxnAndMarkerEntry]]().asScala
 
-  def removeMarkersForTxnTopicPartition(partition: Int): 
Option[BlockingQueue[TxnIdAndMarkerEntry]] = {
+  def removeMarkersForTxnTopicPartition(partition: Int): 
Option[BlockingQueue[PendingCompleteTxnAndMarkerEntry]] = {
     markersPerTxnTopicPartition.remove(partition)
   }
 
-  def addMarkers(txnTopicPartition: Int, txnIdAndMarker: TxnIdAndMarkerEntry): 
Unit = {
-    val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, 
txnTopicPartition,
-        new LinkedBlockingQueue[TxnIdAndMarkerEntry]())
-    queue.add(txnIdAndMarker)
+  def addMarkers(txnTopicPartition: Int, pendingCompleteTxnAndMarker: 
PendingCompleteTxnAndMarkerEntry): Unit = {
+    val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, 
txnTopicPartition, {
+      info(s"Creating new marker queue for txn partition $txnTopicPartition to 
destination broker ${destination.id}")

Review Comment:
   I noticed in the doc:
   
   ```
   `createValue` may be invoked more than once if multiple threads attempt to 
insert a key at the same time
    ```
   
   This seems ok, but just wanted to call it out



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to