rreddy-22 commented on code in PR #20868:
URL: https://github.com/apache/kafka/pull/20868#discussion_r2520500063
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala:
##########
@@ -260,7 +260,19 @@ class TransactionMarkerChannelManager(
}.filter { case (_, entries) => !entries.isEmpty }.map { case (node,
entries) =>
val markersToSend = entries.asScala.map(_.txnMarkerEntry).asJava
val requestCompletionHandler = new
TransactionMarkerRequestCompletionHandler(node.id, txnStateManager, this,
entries)
- val request = new WriteTxnMarkersRequest.Builder(markersToSend)
+
+ // Always extract transaction versions from entries (one per marker,
preserving order)
+ val transactionVersions = new util.ArrayList[java.lang.Byte]()
+ entries.asScala.foreach { entry =>
+ val transactionVersion =
entry.pendingCompleteTxn.txnMetadata.clientTransactionVersion()
+ if (transactionVersion != null) {
Review Comment:
Not really, but I just wanted to be safe, just in case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]