junrao commented on code in PR #18845: URL: https://github.com/apache/kafka/pull/18845#discussion_r1949804134
########## metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java: ########## @@ -81,18 +81,18 @@ public void testReadFromEmptyConfiguration() throws Exception { @Test public void testReadFromConfigurationWithAncientVersion() throws Exception { try (BootstrapTestDirectory testDirectory = new BootstrapTestDirectory().createDirectory()) { - assertEquals(BootstrapMetadata.fromVersion(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, - "the minimum version bootstrap with metadata.version 3.3-IV0"), + assertEquals(BootstrapMetadata.fromVersion(MetadataVersion.MINIMUM_VERSION, + "the minimum version bootstrap with metadata.version 3.3-IV3"), Review Comment: The usage of 3.0 in the next line and in `testReadFromConfigurationFile` needs to be adjusted. ########## metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java: ########## @@ -56,7 +53,7 @@ public FeaturesImage( public boolean isEmpty() { return finalizedVersions.isEmpty() && - metadataVersion.equals(MetadataVersion.MINIMUM_KRAFT_VERSION); + metadataVersion.equals(MetadataVersion.MINIMUM_VERSION); Review Comment: Hmm, metadata version 3.3 can be set explicitly in KRaft. Is this still correct @cmccabe ? ########## server-common/src/main/java/org/apache/kafka/server/common/Feature.java: ########## @@ -298,7 +296,7 @@ public static void validateDefaultValueAndLatestProductionValue( for (MetadataVersion metadataVersion: MetadataVersion.values()) { // Only checking the kraft metadata versions. - if (metadataVersion.compareTo(MetadataVersion.MINIMUM_KRAFT_VERSION) < 0) { + if (metadataVersion.compareTo(MetadataVersion.MINIMUM_VERSION) < 0) { Review Comment: This is no longer needed since all passed-in metadataVersion values are >= `MetadataVersion.MINIMUM_VERSION`. 
########## metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java: ########## @@ -81,18 +81,17 @@ public void testReadFromEmptyConfiguration() throws Exception { @Test public void testReadFromConfigurationWithAncientVersion() throws Exception { try (BootstrapTestDirectory testDirectory = new BootstrapTestDirectory().createDirectory()) { - assertEquals(BootstrapMetadata.fromVersion(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, - "the minimum version bootstrap with metadata.version 3.3-IV0"), + assertThrows(IllegalArgumentException.class, () -> new BootstrapDirectory(testDirectory.path(), Optional.of("3.0")).read()); } } @Test public void testReadFromConfiguration() throws Exception { try (BootstrapTestDirectory testDirectory = new BootstrapTestDirectory().createDirectory()) { - assertEquals(BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_3_IV2, - "the configured bootstrap with metadata.version 3.3-IV2"), - new BootstrapDirectory(testDirectory.path(), Optional.of("3.3-IV2")).read()); + assertEquals(BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_3_IV3, + "the configured bootstrap with metadata.version 3.3-IV3"), + new BootstrapDirectory(testDirectory.path(), Optional.of("3.3-IV3")).read()); Review Comment: Should we change `3.0-IV0` in `testReadFromConfigurationFile` below? ########## core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala: ########## @@ -55,7 +55,7 @@ import scala.jdk.CollectionConverters._ class ProducerIntegrationTest { @ClusterTests(Array( - new ClusterTest(metadataVersion = MetadataVersion.IBP_3_3_IV0) + new ClusterTest(metadataVersion = MetadataVersion.IBP_3_3_IV3) Review Comment: Should we use MINIMUM_VERSION to avoid keeping changing it in the future? 
########## server-common/src/main/java/org/apache/kafka/server/common/UnitTestFeatureVersion.java: ########## @@ -321,7 +321,7 @@ public Map<String, Short> dependencies() { * The feature is used to test the default value has MV dependency that is behind the bootstrap MV. */ public enum FV7 implements FeatureVersion { - UT_FV7_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_7_IV0.featureLevel())), + UT_FV7_0(0, MetadataVersion.MINIMUM_VERSION, Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_7_IV0.featureLevel())), Review Comment: The error message in FeatureTest with respect to 3.0 needs to be adjusted. ``` "Feature UNIT_TEST_VERSION_7 has default FeatureVersion UT_FV7_0 when MV=3.0-IV1 with " + "MV dependency 3.7-IV0 that is behind its bootstrap MV 3.0-IV1."); ``` ########## metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java: ########## @@ -126,13 +126,13 @@ public void testFeatureLevelForFeature() { static final List<ApiMessageAndVersion> RECORDS_WITH_OLD_METADATA_VERSION = Collections.singletonList( new ApiMessageAndVersion(new FeatureLevelRecord(). setName(FEATURE_NAME). - setFeatureLevel(IBP_3_0_IV1.featureLevel()), (short) 0)); + setFeatureLevel(MetadataVersionTestUtils.IBP_3_0_IV1_FEATURE_LEVEL), (short) 0)); @Test public void testFromRecordsListWithOldMetadataVersion() { RuntimeException exception = assertThrows(RuntimeException.class, () -> BootstrapMetadata.fromRecords(RECORDS_WITH_OLD_METADATA_VERSION, "quux")); - assertEquals("Bootstrap metadata.version before 3.3-IV0 are not supported. Can't load " + - "metadata from quux", exception.getMessage()); + assertEquals("No MetadataVersion with feature level 1. Valid feature levels are from 7 to 25.", Review Comment: Could we use defined constants for feature level to avoid keeping changing it in the future? 
########## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ########## @@ -380,7 +380,7 @@ public void testUpdateBrokerConfigNotAffectedByInvalidConfig() { @ClusterTest( // Must be at greater than 1MB per cleaner thread, set to 2M+2 so that we can set 2 cleaner threads. serverProperties = {@ClusterConfigProperty(key = "log.cleaner.dedupe.buffer.size", value = "2097154")}, - metadataVersion = MetadataVersion.IBP_3_3_IV0 + metadataVersion = MetadataVersion.IBP_3_9_IV0 Review Comment: Hmm, why do we choose IBP_3_9_IV0 here? ########## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ########## @@ -44,25 +44,6 @@ */ public enum MetadataVersion { - // Introduce ListOffsets V7 which supports listing offsets by max timestamp (KIP-734) - // Assume message format version is 3.0 (KIP-724) - IBP_3_0_IV1(1, "3.0", "IV1", true), - - // Adds topic IDs to Fetch requests/responses (KIP-516) - IBP_3_1_IV0(2, "3.1", "IV0", false), - - // Support for leader recovery for unclean leader election (KIP-704) - IBP_3_2_IV0(3, "3.2", "IV0", true), - - // Support for metadata.version feature flag and Removes min_version_level from the finalized version range that is written to ZooKeeper (KIP-778) - IBP_3_3_IV0(4, "3.3", "IV0", false), - - // Support NoopRecord for the cluster metadata log (KIP-835) - IBP_3_3_IV1(5, "3.3", "IV1", true), - - // In KRaft mode, use BrokerRegistrationChangeRecord instead of UnfenceBrokerRecord and FenceBrokerRecord. - IBP_3_3_IV2(6, "3.3", "IV2", true), Review Comment: The references of the old MV in AclsImage need to be adjusted. 
########## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ########## @@ -220,18 +187,6 @@ public boolean isElrSupported() { return this.isAtLeast(IBP_4_0_IV1); } - public boolean isKRaftSupported() { - return this.featureLevel > 0; - } - - public boolean isBrokerRegistrationChangeRecordSupported() { Review Comment: The comment in BrokerRegistrationTrackerTest.java with respect to 3.0 needs to be adjusted. ` // No calls are made because MetadataVersion is 3.0-IV1 initially` ########## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ########## @@ -495,7 +495,9 @@ BrokerFeature processRegistrationFeature( FinalizedControllerFeatures finalizedFeatures, BrokerRegistrationRequestData.Feature feature ) { - int defaultVersion = feature.name().equals(MetadataVersion.FEATURE_NAME) ? 1 : 0; // The default value for MetadataVersion is 1 not 0. + // The default value for MetadataVersion changes over time while other features start at `0` + int defaultVersion = feature.name().equals(MetadataVersion.FEATURE_NAME) ? + MetadataVersion.MINIMUM_VERSION.featureLevel() : 0; Review Comment: @cmccabe : Should the finalized MetadataVersion always be present in any release upgradable to 4.0? ########## core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala: ########## @@ -552,24 +535,10 @@ class AlterPartitionManagerTest { } object AlterPartitionManagerTest { - def provideMetadataVersions(): JStream[MetadataVersion] = { + def provideLeaderRecoveryState(): JStream[Arguments] = { JStream.of( - // Supports KIP-903: include broker epoch in AlterPartition request - IBP_3_5_IV1, Review Comment: Should we change to test both IBP_3_3_IV3 and IBP_3_5_IV1 here, @CalvinConfluent ? 
########## server-common/src/main/java/org/apache/kafka/server/common/Feature.java: ########## @@ -155,18 +155,16 @@ public FeatureVersion fromFeatureLevel(short level, * For example, say feature X level x relies on feature Y level y: * if feature X >= x then throw an error if feature Y < y. * - * All feature levels above 0 in kraft require metadata.version=4 (IBP_3_3_IV0) in order to write the feature records to the cluster. - * * @param feature the feature we are validating * @param features the feature versions we have (or want to set) * @throws IllegalArgumentException if the feature is not valid */ public static void validateVersion(FeatureVersion feature, Map<String, Short> features) { Short metadataVersion = features.get(MetadataVersion.FEATURE_NAME); - if (feature.featureLevel() >= 1 && (metadataVersion == null || metadataVersion < MetadataVersion.IBP_3_3_IV0.featureLevel())) + if (feature.featureLevel() >= 1 && (metadataVersion == null || metadataVersion < MetadataVersion.MINIMUM_VERSION.featureLevel())) Review Comment: It seems that we could remove this statement completely since the callers will always have metadataVersion set to >= 3.3. Is that correct @jolshan ? ########## core/src/main/scala/kafka/server/AlterPartitionManager.scala: ########## @@ -224,53 +215,37 @@ class DefaultAlterPartitionManager( private def buildRequest( inflightAlterPartitionItems: Seq[AlterPartitionItem], brokerEpoch: Long - ): (AlterPartitionRequest.Builder, mutable.Map[Uuid, String]) = { - val metadataVersion = metadataVersionSupplier() - // We build this mapping in order to map topic id back to their name when we - // receive the response. We cannot rely on the metadata cache for this because - // the metadata cache is updated after the partition state so it might not know - // yet about a topic id already used here. 
- val topicNamesByIds = mutable.HashMap[Uuid, String]() - + ): AlterPartitionRequest.Builder = { val message = new AlterPartitionRequestData() .setBrokerId(brokerId) .setBrokerEpoch(brokerEpoch) - inflightAlterPartitionItems.groupBy(_.topicIdPartition.topic).foreach { case (topicName, items) => - val topicId = items.head.topicIdPartition.topicId - topicNamesByIds(topicId) = topicName - + inflightAlterPartitionItems.groupBy(_.topicIdPartition.topicId).foreach { case (topicId, items) => // Both the topic name and the topic id are set here because at this stage // we don't know which version of the request will be used. - val topicData = new AlterPartitionRequestData.TopicData() - .setTopicName(topicName) - .setTopicId(topicId) + val topicData = new AlterPartitionRequestData.TopicData().setTopicId(topicId) Review Comment: Could we adjust the comment on topic name above? ########## metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java: ########## @@ -137,10 +137,10 @@ static ControllerResult<Void> recordsForNonEmptyLog( } } - if (curMetadataVersion.equals(MetadataVersion.MINIMUM_KRAFT_VERSION)) { + if (curMetadataVersion.equals(MetadataVersion.MINIMUM_VERSION)) { Review Comment: MetadataVersion.MINIMUM_VERSION now supports metadata.version feature level record. So, the following log message could be confusing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org