junrao commented on code in PR #16183: URL: https://github.com/apache/kafka/pull/16183#discussion_r1684620247
########## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ########## @@ -381,10 +381,12 @@ class BrokerLifecycleManager( private def sendBrokerRegistration(): Unit = { val features = new BrokerRegistrationRequestData.FeatureCollection() _supportedFeatures.asScala.foreach { - case (name, range) => features.add(new BrokerRegistrationRequestData.Feature(). + // Do not include features with the range 0-0. + case (name, range) if range.max() > 0 => features.add(new BrokerRegistrationRequestData.Feature(). Review Comment: We should do the same for supported features in ApiVersionResponse in the following code in BrokerFeatures, right? Also, to avoid duplicating the code, perhaps it's better to do the filtering logic in BrokerFeatures? ``` def defaultSupportedFeatures(unstableFeatureVersionsEnabled: Boolean): Features[SupportedVersionRange] = { val features = new util.HashMap[String, SupportedVersionRange]() features.put(MetadataVersion.FEATURE_NAME, new SupportedVersionRange( MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), if (unstableFeatureVersionsEnabled) { MetadataVersion.latestTesting.featureLevel } else { MetadataVersion.latestProduction.featureLevel })) PRODUCTION_FEATURES.forEach { feature => features.put(feature.featureName, new SupportedVersionRange(0, if (unstableFeatureVersionsEnabled) { feature.latestTesting } else { feature.latestProduction })) } Features.supportedFeatures(features) } ``` ########## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ########## @@ -99,6 +101,27 @@ class BrokerLifecycleManagerTest { } } + @Test + def testFeatureRangeMaxZeroNotIncludedInRegistration(): Unit = { + val context = new RegistrationTestContext(configProperties) + manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ"))) + val controllerNode = new Node(3000, "localhost", 8021) + context.controllerNodeProvider.node.set(controllerNode) + + val features = new util.HashMap[String, VersionRange]() + features.put(Features.TEST_VERSION.featureName(), VersionRange.of(0, 0)) + features.put(Features.KRAFT_VERSION.featureName(), VersionRange.of(0, 1)) + + manager.start(() => context.highestMetadataOffset.get(), + context.mockChannelManager, context.clusterId, context.advertisedListeners, + features, OptionalLong.of(10L)) + TestUtils.retry(60000) { + assertEquals(1, context.mockChannelManager.unsentQueue.size) + // Only the feature with a range that is not 0-0 is included. + assertEquals(1, context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().features().size()) Review Comment: Could we further verify that the only feature is KRAFT_VERSION? ########## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ########## @@ -217,8 +217,12 @@ public enum MetadataVersion { // Add ELR related supports (KIP-966). IBP_3_9_IV1(22, "3.9", "IV1", true), + // Bootstrapping transaction version 2 Review Comment: Should we add a new MV just for bootstrapping a new feature or just piggyback on an existing one (e.g., IBP_3_9_IV1)? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org