cmccabe commented on code in PR #19545:
URL: https://github.com/apache/kafka/pull/19545#discussion_r2058834500


##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java:
##########
@@ -66,48 +66,48 @@ public static BootstrapMetadata fromVersions(
                     setFeatureLevel(level), (short) 0));
             }
         }
-        return new BootstrapMetadata(records, metadataVersion, source);
+        return new BootstrapMetadata(records, metadataVersion.featureLevel(), 
source);
     }
 
     public static BootstrapMetadata fromVersion(MetadataVersion 
metadataVersion, String source) {
         List<ApiMessageAndVersion> records = List.of(
             new ApiMessageAndVersion(new FeatureLevelRecord().
                 setName(MetadataVersion.FEATURE_NAME).
                 setFeatureLevel(metadataVersion.featureLevel()), (short) 0));
-        return new BootstrapMetadata(records, metadataVersion, source);
+        return new BootstrapMetadata(records, metadataVersion.featureLevel(), 
source);
     }
 
     public static BootstrapMetadata fromRecords(List<ApiMessageAndVersion> 
records, String source) {
-        MetadataVersion metadataVersion = null;
+        Optional<Short> metadataVersionLevel = Optional.empty();
         for (ApiMessageAndVersion record : records) {
-            Optional<MetadataVersion> version = 
recordToMetadataVersion(record.message());
-            if (version.isPresent()) {
-                metadataVersion = version.get();
+            Optional<Short> level = 
recordToMetadataVersionLevel(record.message());
+            if (level.isPresent()) {
+                metadataVersionLevel = level;
             }
         }
-        if (metadataVersion == null) {
+        if (!metadataVersionLevel.isPresent()) {
             throw new RuntimeException("No FeatureLevelRecord for " + 
MetadataVersion.FEATURE_NAME +
                     " was found in the bootstrap metadata from " + source);
         }
-        return new BootstrapMetadata(records, metadataVersion, source);
+        return new BootstrapMetadata(records, metadataVersionLevel.get(), 
source);
     }
 
-    public static Optional<MetadataVersion> recordToMetadataVersion(ApiMessage 
record) {
+    public static Optional<Short> recordToMetadataVersionLevel(ApiMessage 
record) {
         if (record instanceof FeatureLevelRecord featureLevel) {
             if (featureLevel.name().equals(MetadataVersion.FEATURE_NAME)) {
-                return 
Optional.of(MetadataVersion.fromFeatureLevel(featureLevel.featureLevel()));
+                return Optional.of(featureLevel.featureLevel());

Review Comment:
   It's true that we might have to revisit this if we drop support for records previously used in bootstrapping.



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -1619,6 +1619,55 @@ class KRaftClusterTest {
     }
   }
 
+  /**
+   * Test that once a cluster is formatted, a bootstrap.metadata file that 
contains an unsupported
+   * MetadataVersion is not a problem. This is a regression test for 
KAFKA-19192.
+   */
+  @Test
+  def testOldBootstrapMetadataFile(): Unit = {

Review Comment:
   Yes, it does.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to