jsancio commented on code in PR #16848:
URL: https://github.com/apache/kafka/pull/16848#discussion_r1717066358


##########
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##########
@@ -99,6 +113,41 @@ class BrokerLifecycleManagerTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testSuccessfulRegistrationDuringMigration(kraftVersionV1Supported: Boolean): Unit = {
+    val ibp = if (kraftVersionV1Supported) IBP_3_9_IV0 else IBP_3_8_IV0
+    val context = new RegistrationTestContext(migrationConfigProperties(ibp))
+    manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+    val controllerNode = new Node(3000, "localhost", 8021)
+    context.controllerNodeProvider.node.set(controllerNode)
+    val featuresRemapped = BrokerFeatures.createDefaultFeatureMap(BrokerFeatures.createDefault(true)).asJava
+
+    // Even though ZK brokers don't use "metadata.version" feature, we need to overwrite it with our IBP as part of registration
+    // so the KRaft controller can verify that all brokers are on the same IBP before starting the migration.
+    featuresRemapped.put(MetadataVersion.FEATURE_NAME,
+      VersionRange.of(ibp.featureLevel(), ibp.featureLevel()))
+
+    manager.start(() => context.highestMetadataOffset.get(),
+      context.mockChannelManager, context.clusterId, context.advertisedListeners,
+      featuresRemapped, OptionalLong.of(10L))
+    TestUtils.retry(60000) {
+      assertEquals(1, context.mockChannelManager.unsentQueue.size)
+      val sentBrokerRegistrationData = context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data()
+      assertEquals(10L, sentBrokerRegistrationData.previousBrokerEpoch())
+      assertEquals(ibp.featureLevel(), sentBrokerRegistrationData.features().find(MetadataVersion.FEATURE_NAME).maxSupportedVersion())
+      if (kraftVersionV1Supported) {
+        assertEquals(1, sentBrokerRegistrationData.features().find(KRaftVersion.FEATURE_NAME).maxSupportedVersion())
+      }

Review Comment:
   Doesn't the broker always send (0, 1) for the kraft.version in the latest code?
   
   Also, if the max supported version ever changes, this test will break. Let's make the test more generic by checking that the sent broker registration carries the current code's min supported version and max supported version.
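   
   A rough sketch of what that could look like (reusing names from the hunk above, and assuming `featuresRemapped` already carries a `kraft.version` entry with the code's supported range, and that the range type exposes `min()`/`max()`):
   
   ```scala
   // Compare against the range the current code advertises instead of hardcoding the max version.
   val expectedKRaftRange = featuresRemapped.get(KRaftVersion.FEATURE_NAME)
   val sentKRaftFeature = sentBrokerRegistrationData.features().find(KRaftVersion.FEATURE_NAME)
   assertEquals(expectedKRaftRange.min(), sentKRaftFeature.minSupportedVersion())
   assertEquals(expectedKRaftRange.max(), sentKRaftFeature.maxSupportedVersion())
   ```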



##########
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java:
##########
@@ -688,6 +691,61 @@ public void testNoOpRecordWriteAfterTimeout() throws Throwable {
         }
     }
 
+    @ParameterizedTest
+    @CsvSource(value = {"0, 0", "0, 1", "1, 0", "1, 1"})
+    public void testRegisterBrokerKRaftVersions(short controllerKraftVersion, short brokerKraftVersion) throws Throwable {

Review Comment:
   Looks like you are using this to mean the finalized kraft.version. Let's 
just call it that: `finalizedKraftVersion`.



##########
metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java:
##########
@@ -87,11 +89,20 @@ public Builder setSharedLogDataInitializer(Consumer<SharedLogData> sharedLogData
             return this;
         }
 
+        /**
+         * Used to mock the latest KRaft version that would be returned from RaftClient.kraftVersion()
+         */
+        public Builder setLastKRaftVersion(short kraftVersion) {

Review Comment:
   I would change the signature to `setLastKRaftVersion(KRaftVersion)` and make the test do the conversion from `short` to `KRaftVersion`.
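   
   Something along these lines, as a rough sketch (the field name is a placeholder, not the actual `LocalLogManagerTestEnv` code):
   
   ```java
   // Builder keeps the KRaftVersion enum directly instead of a raw short.
   private KRaftVersion lastKRaftVersion = KRaftVersion.KRAFT_VERSION_0;
   
   /**
    * Used to mock the latest KRaft version that would be returned from RaftClient.kraftVersion()
    */
   public Builder setLastKRaftVersion(KRaftVersion kraftVersion) {
       this.lastKRaftVersion = kraftVersion;
       return this;
   }
   ```
   
   The test side would then do the `short` -> `KRaftVersion` conversion before calling the builder, e.g. with `KRaftVersion.fromFeatureLevel(...)` if that helper is available, or a small switch otherwise.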



##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -409,6 +410,15 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
         for (BrokerRegistrationRequestData.Feature feature : request.features()) {
             record.features().add(processRegistrationFeature(brokerId, finalizedFeatures, feature));
         }
+        if (request.features().find(KRaftVersion.FEATURE_NAME) == null) {

Review Comment:
   I see. This solution is not going to scale as Kafka adds more features. Right now the controller looks at all of the supported features sent by the broker and makes sure each one supports its finalized version.
   
   What we also need is code that looks at all of the remaining (unchecked) finalized versions and makes sure that the broker supports them.
   
   What do you think?
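   
   A rough illustrative sketch of that extra check (plain maps and a set as stand-in types, not the actual `ClusterControlManager`/`FinalizedControllerFeatures` signatures; assumes `org.apache.kafka.common.errors.UnsupportedVersionException` and the metadata `VersionRange` with `min()`/`max()`):
   
   ```java
   // After validating every feature the broker listed, walk the finalized features that
   // were not covered by the request and reject the registration if the broker does not
   // support one of them.
   static void checkRemainingFinalizedFeatures(
       int brokerId,
       Map<String, Short> finalizedVersions,      // feature name -> finalized level
       Map<String, VersionRange> brokerSupported, // feature name -> range the broker sent
       Set<String> alreadyChecked                 // feature names present in the request
   ) {
       for (Map.Entry<String, Short> finalized : finalizedVersions.entrySet()) {
           String feature = finalized.getKey();
           short level = finalized.getValue();
           if (alreadyChecked.contains(feature) || level == 0) {
               // Already verified above, or finalized at 0 (treated here as "not enabled").
               continue;
           }
           VersionRange supported = brokerSupported.get(feature);
           if (supported == null || level < supported.min() || level > supported.max()) {
               throw new UnsupportedVersionException("Unable to register broker " + brokerId +
                   " because it does not support finalized version " + level + " of " + feature);
           }
       }
   }
   ```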


