junrao commented on code in PR #18685:
URL: https://github.com/apache/kafka/pull/18685#discussion_r1985613298


##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -522,10 +522,14 @@ class KRaftMetadataCache(
     if (kraftVersionLevel > 0) {
       finalizedFeatures.put(KRaftVersion.FEATURE_NAME, kraftVersionLevel)
     }
+    var metadataVersion = MetadataVersion.MINIMUM_VERSION
+    if (!image.features().metadataVersion().isEmpty) {
+      metadataVersion = image.features().metadataVersionOrThrow()
+    }
     new FinalizedFeatures(
-      image.features().metadataVersionOrThrow(),
+      metadataVersion,
       finalizedFeatures,
-      image.highestOffsetAndEpoch().offset)
+      image.highestOffsetAndEpoch().epoch())

Review Comment:
   A client could talk to multiple brokers. Since the metadata is propagated to 
brokers asynchronously, we don't want a client to see finalized feature levels 
going backward. Here is an example of how FinalizedFeaturesEpoch is used. If 
FinalizedFeaturesEpoch increases, the client updates the corresponding 
finalized feature level. If the level doesn't actually change, we performed an 
unnecessary update, but it's cheap. We could implement a FinalizedFeaturesEpoch 
that actually reflects the feature updates, but we would need to store it in 
the metadata log somewhere. 
   
   ```
       public synchronized void maybeUpdateTransactionV2Enabled(boolean onInitiatialization) {
           if (latestFinalizedFeaturesEpoch >= apiVersions.getMaxFinalizedFeaturesEpoch()) {
               return;
           }
           ApiVersions.FinalizedFeaturesInfo info = apiVersions.getFinalizedFeaturesInfo();
           latestFinalizedFeaturesEpoch = info.finalizedFeaturesEpoch;
           Short transactionVersion = info.finalizedFeatures.get("transaction.version");
           boolean wasTransactionV2Enabled = isTransactionV2Enabled;
           isTransactionV2Enabled = transactionVersion != null && transactionVersion >= 2;
           log.debug("Updating isTV2 enabled to {} with FinalizedFeaturesEpoch {}", isTransactionV2Enabled, latestFinalizedFeaturesEpoch);
           if (!onInitiatialization && !wasTransactionV2Enabled && isTransactionV2Enabled)
               clientSideEpochBumpRequired = true;
       }
   
   ```
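
To make the "never go backward" point concrete, here is a minimal standalone sketch of the same guard. It is not Kafka code: `FinalizedFeaturesTracker`, `maybeUpdate`, and `maxEpoch` are hypothetical names invented for illustration, and the sketch only assumes that each broker response carries an epoch plus a map of finalized feature levels.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; this is not Kafka's ApiVersions or TransactionManager.
final class FinalizedFeaturesTracker {
    private long maxEpoch = -1L;                            // highest epoch seen from any broker
    private Map<String, Short> features = new HashMap<>();  // feature levels at that epoch

    // Accepts a broker's view only if its epoch is strictly newer than what we hold.
    // Returns true when the stored view was replaced.
    synchronized boolean maybeUpdate(long epoch, Map<String, Short> brokerFeatures) {
        if (epoch <= maxEpoch) {
            return false; // stale or duplicate view from a lagging broker: ignore it
        }
        maxEpoch = epoch;
        features = new HashMap<>(brokerFeatures); // defensive copy
        return true;
    }

    // Mirrors the check in the snippet above: transaction.version >= 2 means TV2.
    synchronized boolean isTransactionV2Enabled() {
        Short version = features.get("transaction.version");
        return version != null && version >= 2;
    }

    synchronized long maxEpoch() {
        return maxEpoch;
    }
}
```

With this guard, a response at epoch 10 followed by a lagging broker's response at epoch 7 leaves the client at epoch 10. An epoch bump with unchanged levels just triggers a redundant but harmless copy, which is the cheap unnecessary update mentioned above.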


