junrao commented on code in PR #18685: URL: https://github.com/apache/kafka/pull/18685#discussion_r1985613298
########## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ########## @@ -522,10 +522,14 @@ class KRaftMetadataCache( if (kraftVersionLevel > 0) { finalizedFeatures.put(KRaftVersion.FEATURE_NAME, kraftVersionLevel) } + var metadataVersion = MetadataVersion.MINIMUM_VERSION + if (!image.features().metadataVersion().isEmpty) { + metadataVersion = image.features().metadataVersionOrThrow() + } new FinalizedFeatures( - image.features().metadataVersionOrThrow(), + metadataVersion, finalizedFeatures, - image.highestOffsetAndEpoch().offset) + image.highestOffsetAndEpoch().epoch()) Review Comment: A client could talk to multiple brokers. Since the metadata is propagated to broker asynchronously, we don't want a client to see finalized feature levels going backward. Here is an example of on FinalizedFeaturesEpoch is used. If FinalizedFeaturesEpoch increases, it will update the corresponding finalized feature level. If the level doesn't actually change, we performed an unnecessary update, but it's cheap. We could implement FinalizedFeaturesEpoch that actually reflects the feature updates. We will need to store it in the metadata log somewhere. ``` public synchronized void maybeUpdateTransactionV2Enabled(boolean onInitiatialization) { if (latestFinalizedFeaturesEpoch >= apiVersions.getMaxFinalizedFeaturesEpoch()) { return; } ApiVersions.FinalizedFeaturesInfo info = apiVersions.getFinalizedFeaturesInfo(); latestFinalizedFeaturesEpoch = info.finalizedFeaturesEpoch; Short transactionVersion = info.finalizedFeatures.get("transaction.version"); boolean wasTransactionV2Enabled = isTransactionV2Enabled; isTransactionV2Enabled = transactionVersion != null && transactionVersion >= 2; log.debug("Updating isTV2 enabled to {} with FinalizedFeaturesEpoch {}", isTransactionV2Enabled, latestFinalizedFeaturesEpoch); if (!onInitiatialization && !wasTransactionV2Enabled && isTransactionV2Enabled) clientSideEpochBumpRequired = true; } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org