junrao commented on code in PR #16893:
URL: https://github.com/apache/kafka/pull/16893#discussion_r1720079897
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala:
##########
@@ -261,7 +257,9 @@ class TransactionMarkerChannelManager(
}.filter { case (_, entries) => !entries.isEmpty }.map { case (node,
entries) =>
val markersToSend = entries.asScala.map(_.txnMarkerEntry).asJava
val requestCompletionHandler = new
TransactionMarkerRequestCompletionHandler(node.id, txnStateManager, this,
entries)
- val request = new
WriteTxnMarkersRequest.Builder(writeTxnMarkersRequestVersion, markersToSend)
+ val request = new WriteTxnMarkersRequest.Builder(
+ config.interBrokerProtocolVersion.writeTxnMarkersRequestVersion(),
markersToSend
Review Comment:
It seems that we should get the MV from `metadataCache.metadataVersion()
`instead of `config.interBrokerProtocolVersion` since MV can be set
dynamically. @jolshan : What do you think?
##########
server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java:
##########
@@ -466,6 +467,15 @@ public void assertLatestProductionIsLessThanLatest() {
" to be less than the latest of " +
MetadataVersion.latestTesting());
}
+ @Test
+ public void testLastMetadataProductionDontReturnUnstableVersion() {
Review Comment:
Perhaps add a comment like the following.
`The broker picks the version for a few inter broker RPCs based on the
metadata version, instead of the supported version from ApiResponse. We need to
make sure that the latest production MV doesn't accidentally depend on an
unstable request version.`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]