artemlivshits commented on code in PR #15524:
URL: https://github.com/apache/kafka/pull/15524#discussion_r1524022035
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala:
##########
@@ -109,23 +109,30 @@ object TransactionMarkerChannelManager {
 }
 
-class TxnMarkerQueue(@volatile var destination: Node) {
+class TxnMarkerQueue(@volatile var destination: Node) extends Logging {
   // keep track of the requests per txn topic partition so we can easily clear the queue
   // during partition emigration
-  private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[TxnIdAndMarkerEntry]]().asScala
+  private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[PendingCompleteTxnAndMarkerEntry]]().asScala
 
-  def removeMarkersForTxnTopicPartition(partition: Int): Option[BlockingQueue[TxnIdAndMarkerEntry]] = {
+  def removeMarkersForTxnTopicPartition(partition: Int): Option[BlockingQueue[PendingCompleteTxnAndMarkerEntry]] = {
     markersPerTxnTopicPartition.remove(partition)
   }
 
-  def addMarkers(txnTopicPartition: Int, txnIdAndMarker: TxnIdAndMarkerEntry): Unit = {
-    val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, txnTopicPartition,
-      new LinkedBlockingQueue[TxnIdAndMarkerEntry]())
-    queue.add(txnIdAndMarker)
+  def addMarkers(txnTopicPartition: Int, pendingCompleteTxnAndMarker: PendingCompleteTxnAndMarkerEntry): Unit = {
+    val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, txnTopicPartition, {
+      info(s"Creating new marker queue for txn partition $txnTopicPartition to destination broker ${destination.id}")
+      new LinkedBlockingQueue[PendingCompleteTxnAndMarkerEntry]()
+    })
+    queue.add(pendingCompleteTxnAndMarker)
+
+    if (markersPerTxnTopicPartition.get(txnTopicPartition).orNull != queue) {
+      // This could happen if the queue got removed concurrently.

Review Comment:
As far as I can see, it shouldn't affect user-visible behavior. It does create an interesting state when the queue is removed in removeMarkersForTxnTopicPartition -- we could have:

1. [addMarkers] Retrieve the queue.
2. [removeMarkersForTxnTopicPartition] Remove the queue.
3. [removeMarkersForTxnTopicPartition] Iterate over the queue, but don't call removeMarkersForTxnId because the queue is empty.
4. [addMarkers] Add markers to the queue.

Now we've effectively removed the markers while transactionsWithPendingMarkers still has an entry. This state could last for a while if the removal happened on unload (and technically the txn id could expire, etc., so the state may persist until broker restart), but as soon as a real workflow on this txn id sends out markers, the proper entry will be created and the actual functionality will work as expected. In other words, this race can lead to an orphan entry in transactionsWithPendingMarkers, but it doesn't affect anything (other than leaking a small amount of memory) until the markers are sent, and sending markers will fix it.

##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala:
##########
@@ -90,9 +90,10 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
       val writeTxnMarkerResponse = response.responseBody.asInstanceOf[WriteTxnMarkersResponse]
 
       val responseErrors = writeTxnMarkerResponse.errorsByProducerId
-      for (txnIdAndMarker <- txnIdAndMarkerEntries.asScala)

Review Comment:
I agree this code could benefit from some refactoring; we should probably structure it so that, instead of branching on wasDisconnected at the top, it just iterates over the pending entries and checks wasDisconnected in the specific cases. But I think that should be done separately, as this change is fairly mechanical, which simplifies the review of what it does.
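The race above, and the check-after-add guard the patch introduces, can be sketched with the same `java.util.concurrent` types the Scala code wraps. This is a minimal hypothetical model (class and method names like `MarkerQueues` and `addMarker` are illustrative, not Kafka's API): after adding to the queue, we re-read the map, and if the queue was concurrently unlinked, the caller knows the entry may be orphaned.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical minimal model of the per-partition marker queues, showing the
// check-after-add pattern: the entry is only "safe" if the queue is still
// reachable from the map after the add.
class MarkerQueues {
    private final ConcurrentHashMap<Integer, BlockingQueue<String>> perPartition =
            new ConcurrentHashMap<>();

    /** Returns true iff the entry landed in a queue still linked into the map. */
    boolean addMarker(int partition, String entry) {
        BlockingQueue<String> queue =
                perPartition.computeIfAbsent(partition, p -> new LinkedBlockingQueue<>());
        queue.add(entry);
        // Re-check: a concurrent removeQueue() may have unlinked the queue between
        // computeIfAbsent and add, leaving our entry orphaned (the race in steps 1-4).
        return perPartition.get(partition) == queue;
    }

    /** Models removeMarkersForTxnTopicPartition: unlink and return the queue, or null. */
    BlockingQueue<String> removeQueue(int partition) {
        return perPartition.remove(partition);
    }
}
```

When `addMarker` returns false, the caller can re-enqueue or clean up, which is the mitigation the `orNull != queue` check in the patch enables.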
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala:
##########
@@ -109,23 +109,30 @@ object TransactionMarkerChannelManager {
 }
 
-class TxnMarkerQueue(@volatile var destination: Node) {
+class TxnMarkerQueue(@volatile var destination: Node) extends Logging {
   // keep track of the requests per txn topic partition so we can easily clear the queue
   // during partition emigration
-  private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[TxnIdAndMarkerEntry]]().asScala
+  private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[PendingCompleteTxnAndMarkerEntry]]().asScala
 
-  def removeMarkersForTxnTopicPartition(partition: Int): Option[BlockingQueue[TxnIdAndMarkerEntry]] = {
+  def removeMarkersForTxnTopicPartition(partition: Int): Option[BlockingQueue[PendingCompleteTxnAndMarkerEntry]] = {
     markersPerTxnTopicPartition.remove(partition)
   }
 
-  def addMarkers(txnTopicPartition: Int, txnIdAndMarker: TxnIdAndMarkerEntry): Unit = {
-    val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, txnTopicPartition,
-      new LinkedBlockingQueue[TxnIdAndMarkerEntry]())
-    queue.add(txnIdAndMarker)
+  def addMarkers(txnTopicPartition: Int, pendingCompleteTxnAndMarker: PendingCompleteTxnAndMarkerEntry): Unit = {
+    val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, txnTopicPartition, {
+      info(s"Creating new marker queue for txn partition $txnTopicPartition to destination broker ${destination.id}")

Review Comment:
That's correct -- this is logged when the value is created. A more precise approach would be to make atomicGetOrUpdate return an indication of whether the value was actually created (then we'd log exactly once), but I didn't think it was worth the complexity -- at worst we'd get a couple of log lines at the same time, only one of which actually created the queue. I'll add a comment.
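The "return an indication of whether the value was actually created" alternative mentioned above could look like the following hypothetical Java helper (this is not Kafka's CoreUtils.atomicGetOrUpdate, just a sketch of the idea). It leans on `ConcurrentHashMap.computeIfAbsent`, whose mapping function runs at most once per successful insertion, so the created flag (and hence the log line) fires exactly once per queue.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical get-or-create helper that reports whether this call inserted the
// value, allowing a "created new queue" log line to be emitted exactly once.
final class GetOrCreate {
    /**
     * Returns the existing or newly created value for {@code key}; sets
     * {@code created} to true only if this call performed the insertion.
     * computeIfAbsent invokes the factory at most once per insertion, unlike a
     * get-then-putIfAbsent loop where a losing thread may construct a value
     * that is immediately discarded.
     */
    static <K, V> V getOrCreate(ConcurrentHashMap<K, V> map, K key,
                                Supplier<V> factory, AtomicBoolean created) {
        return map.computeIfAbsent(key, k -> {
            created.set(true);
            return factory.get();
        });
    }
}
```

A caller would then log only when `created` comes back true, trading a slightly wider API for the deduplicated log line the comment weighs against the current approach.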
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala:
##########
@@ -419,25 +432,34 @@ class TransactionMarkerChannelManager(
   def removeMarkersForTxnTopicPartition(txnTopicPartitionId: Int): Unit = {
     markersQueueForUnknownBroker.removeMarkersForTxnTopicPartition(txnTopicPartitionId).foreach { queue =>
-      for (entry: TxnIdAndMarkerEntry <- queue.asScala)
-        removeMarkersForTxnId(entry.txnId)
+      for (entry <- queue.asScala) {

Review Comment:
In the cases I investigated it was a couple dozen or so, but I don't have precise stats from a large selection of cases. We log messages on every retry of a failed marker send; I think this message would be much less spammy than that (these would happen only when partitions change). The disadvantage of having a single log line is that it'll eventually get truncated, but these logs can reveal interesting transitions that help investigate race conditions related to load / unload.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org