junrao commented on code in PR #18845:
URL: https://github.com/apache/kafka/pull/18845#discussion_r1949804134


##########
metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java:
##########
@@ -81,18 +81,18 @@ public void testReadFromEmptyConfiguration() throws 
Exception {
     @Test
     public void testReadFromConfigurationWithAncientVersion() throws Exception 
{
         try (BootstrapTestDirectory testDirectory = new 
BootstrapTestDirectory().createDirectory()) {
-            
assertEquals(BootstrapMetadata.fromVersion(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION,
-                    "the minimum version bootstrap with metadata.version 
3.3-IV0"),
+            
assertEquals(BootstrapMetadata.fromVersion(MetadataVersion.MINIMUM_VERSION,
+                    "the minimum version bootstrap with metadata.version 
3.3-IV3"),

Review Comment:
   The usage of 3.0 in the next line and in `testReadFromConfigurationFile` 
need to be adjusted.



##########
metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java:
##########
@@ -56,7 +53,7 @@ public FeaturesImage(
 
     public boolean isEmpty() {
         return finalizedVersions.isEmpty() &&
-            metadataVersion.equals(MetadataVersion.MINIMUM_KRAFT_VERSION);
+            metadataVersion.equals(MetadataVersion.MINIMUM_VERSION);

Review Comment:
   Hmm, metadata version 3.3 can be set explicitly in KRaft. Is this still 
correct @cmccabe ?



##########
server-common/src/main/java/org/apache/kafka/server/common/Feature.java:
##########
@@ -298,7 +296,7 @@ public static void 
validateDefaultValueAndLatestProductionValue(
 
         for (MetadataVersion metadataVersion: MetadataVersion.values()) {
             // Only checking the kraft metadata versions.
-            if 
(metadataVersion.compareTo(MetadataVersion.MINIMUM_KRAFT_VERSION) < 0) {
+            if (metadataVersion.compareTo(MetadataVersion.MINIMUM_VERSION) < 
0) {

Review Comment:
   This is no longer needed since all passed in metadataVersion are >= 
`MetadataVersion.MINIMUM_VERSION`.



##########
metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java:
##########
@@ -81,18 +81,17 @@ public void testReadFromEmptyConfiguration() throws 
Exception {
     @Test
     public void testReadFromConfigurationWithAncientVersion() throws Exception 
{
         try (BootstrapTestDirectory testDirectory = new 
BootstrapTestDirectory().createDirectory()) {
-            
assertEquals(BootstrapMetadata.fromVersion(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION,
-                    "the minimum version bootstrap with metadata.version 
3.3-IV0"),
+            assertThrows(IllegalArgumentException.class, () ->
                 new BootstrapDirectory(testDirectory.path(), 
Optional.of("3.0")).read());
         }
     }
 
     @Test
     public void testReadFromConfiguration() throws Exception {
         try (BootstrapTestDirectory testDirectory = new 
BootstrapTestDirectory().createDirectory()) {
-            
assertEquals(BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_3_IV2,
-                    "the configured bootstrap with metadata.version 3.3-IV2"),
-                new BootstrapDirectory(testDirectory.path(), 
Optional.of("3.3-IV2")).read());
+            
assertEquals(BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_3_IV3,
+                    "the configured bootstrap with metadata.version 3.3-IV3"),
+                new BootstrapDirectory(testDirectory.path(), 
Optional.of("3.3-IV3")).read());

Review Comment:
   Should we change `3.0-IV0` in `testReadFromConfigurationFile` below?



##########
core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala:
##########
@@ -55,7 +55,7 @@ import scala.jdk.CollectionConverters._
 class ProducerIntegrationTest {
 
   @ClusterTests(Array(
-    new ClusterTest(metadataVersion = MetadataVersion.IBP_3_3_IV0)
+    new ClusterTest(metadataVersion = MetadataVersion.IBP_3_3_IV3)

Review Comment:
   Should we use MINIMUM_VERSION to avoid keeping changing it in the future?



##########
server-common/src/main/java/org/apache/kafka/server/common/UnitTestFeatureVersion.java:
##########
@@ -321,7 +321,7 @@ public Map<String, Short> dependencies() {
      * The feature is used to test the default value has MV dependency that is 
behind the bootstrap MV.
      */
     public enum FV7 implements FeatureVersion {
-        UT_FV7_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, 
Collections.singletonMap(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_7_IV0.featureLevel())),
+        UT_FV7_0(0, MetadataVersion.MINIMUM_VERSION, 
Collections.singletonMap(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_7_IV0.featureLevel())),

Review Comment:
   The error message in FeatureTest with respect to 3.0 needs to be adjusted. 
   ```
               "Feature UNIT_TEST_VERSION_7 has default FeatureVersion UT_FV7_0 
when MV=3.0-IV1 with " +
                   "MV dependency 3.7-IV0 that is behind its bootstrap MV 
3.0-IV1.");
   ```



##########
metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java:
##########
@@ -126,13 +126,13 @@ public void testFeatureLevelForFeature() {
     static final List<ApiMessageAndVersion> RECORDS_WITH_OLD_METADATA_VERSION 
= Collections.singletonList(
             new ApiMessageAndVersion(new FeatureLevelRecord().
                 setName(FEATURE_NAME).
-                setFeatureLevel(IBP_3_0_IV1.featureLevel()), (short) 0));
+                
setFeatureLevel(MetadataVersionTestUtils.IBP_3_0_IV1_FEATURE_LEVEL), (short) 
0));
 
     @Test
     public void testFromRecordsListWithOldMetadataVersion() {
         RuntimeException exception = assertThrows(RuntimeException.class,
             () -> 
BootstrapMetadata.fromRecords(RECORDS_WITH_OLD_METADATA_VERSION, "quux"));
-        assertEquals("Bootstrap metadata.version before 3.3-IV0 are not 
supported. Can't load " +
-            "metadata from quux", exception.getMessage());
+        assertEquals("No MetadataVersion with feature level 1. Valid feature 
levels are from 7 to 25.",

Review Comment:
   Could we use defined constants for feature level to avoid keeping changing 
it in the future?



##########
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##########
@@ -380,7 +380,7 @@ public void 
testUpdateBrokerConfigNotAffectedByInvalidConfig() {
     @ClusterTest(
          // Must be at greater than 1MB per cleaner thread, set to 2M+2 so 
that we can set 2 cleaner threads.
          serverProperties = {@ClusterConfigProperty(key = 
"log.cleaner.dedupe.buffer.size", value = "2097154")},
-         metadataVersion = MetadataVersion.IBP_3_3_IV0
+         metadataVersion = MetadataVersion.IBP_3_9_IV0

Review Comment:
   Hmm, why do we choose IBP_3_9_IV0 here?



##########
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##########
@@ -44,25 +44,6 @@
  */
 public enum MetadataVersion {
 
-    // Introduce ListOffsets V7 which supports listing offsets by max 
timestamp (KIP-734)
-    // Assume message format version is 3.0 (KIP-724)
-    IBP_3_0_IV1(1, "3.0", "IV1", true),
-
-    // Adds topic IDs to Fetch requests/responses (KIP-516)
-    IBP_3_1_IV0(2, "3.1", "IV0", false),
-
-    // Support for leader recovery for unclean leader election (KIP-704)
-    IBP_3_2_IV0(3, "3.2", "IV0", true),
-
-    // Support for metadata.version feature flag and Removes min_version_level 
from the finalized version range that is written to ZooKeeper (KIP-778)
-    IBP_3_3_IV0(4, "3.3", "IV0", false),
-
-    // Support NoopRecord for the cluster metadata log (KIP-835)
-    IBP_3_3_IV1(5, "3.3", "IV1", true),
-
-    // In KRaft mode, use BrokerRegistrationChangeRecord instead of 
UnfenceBrokerRecord and FenceBrokerRecord.
-    IBP_3_3_IV2(6, "3.3", "IV2", true),

Review Comment:
   The references of the old MV in AclsImage  need to be adjusted.



##########
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##########
@@ -220,18 +187,6 @@ public boolean isElrSupported() {
         return this.isAtLeast(IBP_4_0_IV1);
     }
 
-    public boolean isKRaftSupported() {
-        return this.featureLevel > 0;
-    }
-
-    public boolean isBrokerRegistrationChangeRecordSupported() {

Review Comment:
   The comment in BrokerRegistrationTrackerTest.java with respect to 3.0 needs 
to be adjusted.
   `        // No calls are made because MetadataVersion is 3.0-IV1 initially`



##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -495,7 +495,9 @@ BrokerFeature processRegistrationFeature(
         FinalizedControllerFeatures finalizedFeatures,
         BrokerRegistrationRequestData.Feature feature
     ) {
-        int defaultVersion = 
feature.name().equals(MetadataVersion.FEATURE_NAME) ? 1 : 0; // The default 
value for MetadataVersion is 1 not 0.
+        // The default value for MetadataVersion changes over time while other 
features start at `0`
+        int defaultVersion = 
feature.name().equals(MetadataVersion.FEATURE_NAME) ?
+            MetadataVersion.MINIMUM_VERSION.featureLevel() : 0;

Review Comment:
   @cmccabe : Should the finalized MetadataVersion always be present in any 
release upgradable to 4.0?



##########
core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala:
##########
@@ -552,24 +535,10 @@ class AlterPartitionManagerTest {
 }
 
 object AlterPartitionManagerTest {
-  def provideMetadataVersions(): JStream[MetadataVersion] = {
+  def provideLeaderRecoveryState(): JStream[Arguments] = {
     JStream.of(
-      // Supports KIP-903: include broker epoch in AlterPartition request
-      IBP_3_5_IV1,

Review Comment:
   Should we change to test both IBP_3_3_IV3 and IBP_3_5_IV1 here, 
@CalvinConfluent ?



##########
server-common/src/main/java/org/apache/kafka/server/common/Feature.java:
##########
@@ -155,18 +155,16 @@ public FeatureVersion fromFeatureLevel(short level,
      * For example, say feature X level x relies on feature Y level y:
      * if feature X >= x then throw an error if feature Y < y.
      *
-     * All feature levels above 0 in kraft require metadata.version=4 
(IBP_3_3_IV0) in order to write the feature records to the cluster.
-     *
      * @param feature                   the feature we are validating
      * @param features                  the feature versions we have (or want 
to set)
      * @throws IllegalArgumentException if the feature is not valid
      */
     public static void validateVersion(FeatureVersion feature, Map<String, 
Short> features) {
         Short metadataVersion = features.get(MetadataVersion.FEATURE_NAME);
 
-        if (feature.featureLevel() >= 1 && (metadataVersion == null || 
metadataVersion < MetadataVersion.IBP_3_3_IV0.featureLevel()))
+        if (feature.featureLevel() >= 1 && (metadataVersion == null || 
metadataVersion < MetadataVersion.MINIMUM_VERSION.featureLevel()))

Review Comment:
   It seems that we could remove this statement completely since the callers 
will always have metadataVersion set to >= 3.3. Is that correct @jolshan ?



##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -224,53 +215,37 @@ class DefaultAlterPartitionManager(
   private def buildRequest(
     inflightAlterPartitionItems: Seq[AlterPartitionItem],
     brokerEpoch: Long
-  ): (AlterPartitionRequest.Builder, mutable.Map[Uuid, String]) = {
-    val metadataVersion = metadataVersionSupplier()
-    // We build this mapping in order to map topic id back to their name when 
we
-    // receive the response. We cannot rely on the metadata cache for this 
because
-    // the metadata cache is updated after the partition state so it might not 
know
-    // yet about a topic id already used here.
-    val topicNamesByIds = mutable.HashMap[Uuid, String]()
-
+  ): AlterPartitionRequest.Builder = {
     val message = new AlterPartitionRequestData()
       .setBrokerId(brokerId)
       .setBrokerEpoch(brokerEpoch)
 
-    inflightAlterPartitionItems.groupBy(_.topicIdPartition.topic).foreach { 
case (topicName, items) =>
-      val topicId = items.head.topicIdPartition.topicId
-      topicNamesByIds(topicId) = topicName
-
+    inflightAlterPartitionItems.groupBy(_.topicIdPartition.topicId).foreach { 
case (topicId, items) =>
       // Both the topic name and the topic id are set here because at this 
stage
       // we don't know which version of the request will be used.
-      val topicData = new AlterPartitionRequestData.TopicData()
-        .setTopicName(topicName)
-        .setTopicId(topicId)
+      val topicData = new 
AlterPartitionRequestData.TopicData().setTopicId(topicId)

Review Comment:
   Could we adjust the comment on topic name above?



##########
metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java:
##########
@@ -137,10 +137,10 @@ static ControllerResult<Void> recordsForNonEmptyLog(
             }
         }
 
-        if (curMetadataVersion.equals(MetadataVersion.MINIMUM_KRAFT_VERSION)) {
+        if (curMetadataVersion.equals(MetadataVersion.MINIMUM_VERSION)) {

Review Comment:
   MetadataVersion.MINIMUM_VERSION now supports metadata.version feature level 
record. So, the following log message could be confusing. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to