junrao commented on code in PR #16183:
URL: https://github.com/apache/kafka/pull/16183#discussion_r1684620247


##########
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##########
@@ -381,10 +381,12 @@ class BrokerLifecycleManager(
   private def sendBrokerRegistration(): Unit = {
     val features = new BrokerRegistrationRequestData.FeatureCollection()
     _supportedFeatures.asScala.foreach {
-      case (name, range) => features.add(new 
BrokerRegistrationRequestData.Feature().
+      // Do not include features with the range 0-0.
+      case (name, range) if range.max() > 0 => features.add(new 
BrokerRegistrationRequestData.Feature().

Review Comment:
   We should do the same for the supported features in ApiVersionsResponse, 
i.e., in the following code in BrokerFeatures, right? Also, to avoid 
duplicating the code, perhaps it's better to do the filtering logic in 
BrokerFeatures?
    
   ```
     def defaultSupportedFeatures(unstableFeatureVersionsEnabled: Boolean): 
Features[SupportedVersionRange] = {
       val features = new util.HashMap[String, SupportedVersionRange]()
         features.put(MetadataVersion.FEATURE_NAME,
           new SupportedVersionRange(
             MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
             if (unstableFeatureVersionsEnabled) {
               MetadataVersion.latestTesting.featureLevel
             } else {
               MetadataVersion.latestProduction.featureLevel
             }))
       PRODUCTION_FEATURES.forEach { feature => 
features.put(feature.featureName,
             new SupportedVersionRange(0,
               if (unstableFeatureVersionsEnabled) {
                 feature.latestTesting
               } else {
                 feature.latestProduction
               }))
       }
       Features.supportedFeatures(features)
     }
   ```



##########
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##########
@@ -99,6 +101,27 @@ class BrokerLifecycleManagerTest {
     }
   }
 
+  @Test
+  def testFeatureRangeMaxZeroNotIncludedInRegistration(): Unit = {
+    val context = new RegistrationTestContext(configProperties)
+    manager = new BrokerLifecycleManager(context.config, context.time, 
"successful-registration-", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+    val controllerNode = new Node(3000, "localhost", 8021)
+    context.controllerNodeProvider.node.set(controllerNode)
+
+    val features = new util.HashMap[String, VersionRange]()
+    features.put(Features.TEST_VERSION.featureName(), VersionRange.of(0, 0))
+    features.put(Features.KRAFT_VERSION.featureName(), VersionRange.of(0, 1))
+
+    manager.start(() => context.highestMetadataOffset.get(),
+      context.mockChannelManager, context.clusterId, 
context.advertisedListeners,
+      features, OptionalLong.of(10L))
+    TestUtils.retry(60000) {
+      assertEquals(1, context.mockChannelManager.unsentQueue.size)
+      // Only the feature with a range that is not 0-0 is included.
+      assertEquals(1, 
context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().features().size())

Review Comment:
   Could we further verify that the only feature is KRAFT_VERSION?



##########
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##########
@@ -217,8 +217,12 @@ public enum MetadataVersion {
     // Add ELR related supports (KIP-966).
     IBP_3_9_IV1(22, "3.9", "IV1", true),
 
+    // Bootstrapping transaction version 2

Review Comment:
   Should we add a new MetadataVersion (MV) just for bootstrapping a new 
feature, or just piggyback on an existing one (e.g., IBP_3_9_IV1)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to