jsancio commented on code in PR #16848:
URL: https://github.com/apache/kafka/pull/16848#discussion_r1717066358
##########
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##########
@@ -99,6 +113,41 @@ class BrokerLifecycleManagerTest {
}
}
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testSuccessfulRegistrationDuringMigration(kraftVersionV1Supported: Boolean): Unit = {
+ val ibp = if (kraftVersionV1Supported) IBP_3_9_IV0 else IBP_3_8_IV0
+ val context = new RegistrationTestContext(migrationConfigProperties(ibp))
+ manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+ val controllerNode = new Node(3000, "localhost", 8021)
+ context.controllerNodeProvider.node.set(controllerNode)
+ val featuresRemapped = BrokerFeatures.createDefaultFeatureMap(BrokerFeatures.createDefault(true)).asJava
+
+ // Even though ZK brokers don't use "metadata.version" feature, we need to overwrite it with our IBP as part of registration
+ // so the KRaft controller can verify that all brokers are on the same IBP before starting the migration.
+ featuresRemapped.put(MetadataVersion.FEATURE_NAME,
+ VersionRange.of(ibp.featureLevel(), ibp.featureLevel()))
+
+ manager.start(() => context.highestMetadataOffset.get(),
+ context.mockChannelManager, context.clusterId, context.advertisedListeners,
+ featuresRemapped, OptionalLong.of(10L))
+ TestUtils.retry(60000) {
+ assertEquals(1, context.mockChannelManager.unsentQueue.size)
+ val sentBrokerRegistrationData = context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data()
+ assertEquals(10L, sentBrokerRegistrationData.previousBrokerEpoch())
+ assertEquals(ibp.featureLevel(), sentBrokerRegistrationData.features().find(MetadataVersion.FEATURE_NAME).maxSupportedVersion())
+ if (kraftVersionV1Supported) {
+ assertEquals(1, sentBrokerRegistrationData.features().find(KRaftVersion.FEATURE_NAME).maxSupportedVersion())
+ }
Review Comment:
Doesn't the broker always send (0, 1) for `kraft.version` in the latest code?
Also, this test will break if the max supported version ever changes. Let's make the test more generic by checking that the sent broker registration carries the current code's min supported version and max supported version.
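
A minimal sketch of the "compare against the current code's range" idea, written in plain Java so it runs on its own. `SketchVersion`, `minSupported()`, `maxSupported()` and `matchesCurrentCode()` are hypothetical stand-ins made up for illustration; they are not Kafka's `KRaftVersion` API, and the real test would read the bounds from whatever the broker code actually exposes.

```java
final class SupportedRangeCheck {
    /** Hypothetical stand-in for a versioned feature enum; NOT Kafka's KRaftVersion. */
    enum SketchVersion {
        V0((short) 0),
        V1((short) 1);

        private final short featureLevel;

        SketchVersion(short featureLevel) {
            this.featureLevel = featureLevel;
        }

        short featureLevel() {
            return featureLevel;
        }
    }

    /** Lowest level known to the current code, derived from the enum instead of a literal. */
    static short minSupported() {
        short min = Short.MAX_VALUE;
        for (SketchVersion v : SketchVersion.values()) {
            if (v.featureLevel() < min) {
                min = v.featureLevel();
            }
        }
        return min;
    }

    /** Highest level known to the current code, derived from the enum instead of a literal. */
    static short maxSupported() {
        short max = Short.MIN_VALUE;
        for (SketchVersion v : SketchVersion.values()) {
            if (v.featureLevel() > max) {
                max = v.featureLevel();
            }
        }
        return max;
    }

    /** True when the range found in the sent registration equals the current code's range. */
    static boolean matchesCurrentCode(short sentMin, short sentMax) {
        return sentMin == minSupported() && sentMax == maxSupported();
    }

    public static void main(String[] args) {
        System.out.println(matchesCurrentCode((short) 0, (short) 1));
    }
}
```

With this shape, adding a new level to the enum changes the expected range automatically, so the assertion does not need to be edited when the max supported version moves.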
##########
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java:
##########
@@ -688,6 +691,61 @@ public void testNoOpRecordWriteAfterTimeout() throws Throwable {
}
}
+ @ParameterizedTest
+ @CsvSource(value = {"0, 0", "0, 1", "1, 0", "1, 1"})
+ public void testRegisterBrokerKRaftVersions(short controllerKraftVersion, short brokerKraftVersion) throws Throwable {
Review Comment:
Looks like you are using `controllerKraftVersion` to mean the finalized kraft.version. Let's just call it that: `finalizedKraftVersion`.
##########
metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java:
##########
@@ -87,11 +89,20 @@ public Builder setSharedLogDataInitializer(Consumer<SharedLogData> sharedLogData
return this;
}
+ /**
+ * Used to mock the latest KRaft version that would be returned from RaftClient.kraftVersion()
+ */
+ public Builder setLastKRaftVersion(short kraftVersion) {
Review Comment:
I would change the signature to `setLastKRaftVersion(KRaftVersion)` and make the test do the conversion from `short` to `KRaftVersion`.
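
A rough sketch of that shape, assuming a typed setter and a caller-side conversion. Everything here is a stand-in for illustration: `SketchKRaftVersion`, its `fromFeatureLevel` helper and this `Builder` are hypothetical and do not reproduce Kafka's `KRaftVersion` or `LocalLogManagerTestEnv.Builder`.

```java
final class TypedSetterSketch {
    /** Hypothetical stand-in enum; NOT Kafka's KRaftVersion. */
    enum SketchKRaftVersion {
        KRAFT_VERSION_0((short) 0),
        KRAFT_VERSION_1((short) 1);

        private final short featureLevel;

        SketchKRaftVersion(short featureLevel) {
            this.featureLevel = featureLevel;
        }

        short featureLevel() {
            return featureLevel;
        }

        /** Converts a raw level to the enum and rejects unknown levels early. */
        static SketchKRaftVersion fromFeatureLevel(short level) {
            for (SketchKRaftVersion v : values()) {
                if (v.featureLevel == level) {
                    return v;
                }
            }
            throw new IllegalArgumentException("Unknown feature level " + level);
        }
    }

    /** Hypothetical builder whose setter accepts the typed value, not a short. */
    static final class Builder {
        private SketchKRaftVersion lastKRaftVersion = SketchKRaftVersion.KRAFT_VERSION_0;

        Builder setLastKRaftVersion(SketchKRaftVersion kraftVersion) {
            this.lastKRaftVersion = kraftVersion;
            return this;
        }

        SketchKRaftVersion lastKRaftVersion() {
            return lastKRaftVersion;
        }
    }

    public static void main(String[] args) {
        short fromTestParameter = 1; // e.g. a value supplied by a parameterized test
        Builder builder = new Builder()
            .setLastKRaftVersion(SketchKRaftVersion.fromFeatureLevel(fromTestParameter));
        System.out.println(builder.lastKRaftVersion());
    }
}
```

Doing the conversion in the test keeps invalid levels out of the builder and lets the compiler catch a wrong argument type at the call site.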
##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -409,6 +410,15 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
for (BrokerRegistrationRequestData.Feature feature : request.features()) {
record.features().add(processRegistrationFeature(brokerId, finalizedFeatures, feature));
}
+ if (request.features().find(KRaftVersion.FEATURE_NAME) == null) {
Review Comment:
I see. This solution is not going to scale as Kafka adds more features.
Right now the controller looks at all of the supported features sent by the broker and makes sure that each one supports the finalized version.
What we also need is code that looks at all of the remaining (not checked) finalized versions and makes sure that the broker supports them.
What do you think?
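
A minimal sketch of the two-pass check described above, in self-contained Java. The `Range` type and `unsupportedFeatures` method are hypothetical stand-ins, not code from `ClusterControlManager`. It also assumes that a feature the broker did not list is supported only at level 0, and that a feature with no finalized entry is at level 0; both are assumptions of this sketch and should be checked against the real controller semantics.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class FinalizedFeatureCheck {
    /** Hypothetical stand-in for a supported version range; NOT Kafka's VersionRange. */
    static final class Range {
        final short min;
        final short max;

        Range(short min, short max) {
            this.min = min;
            this.max = max;
        }

        boolean contains(short level) {
            return min <= level && level <= max;
        }
    }

    /**
     * Returns the names of features whose finalized level the broker cannot serve,
     * in sorted order. An empty list means the registration would be accepted.
     */
    static List<String> unsupportedFeatures(Map<String, Short> finalized,
                                            Map<String, Range> brokerSupported) {
        Set<String> problems = new TreeSet<>();
        Set<String> checked = new HashSet<>();

        // Pass 1: every feature the broker sent must cover the finalized level.
        // Assumption: a feature with no finalized entry is treated as level 0.
        for (Map.Entry<String, Range> entry : brokerSupported.entrySet()) {
            checked.add(entry.getKey());
            short level = finalized.getOrDefault(entry.getKey(), (short) 0);
            if (!entry.getValue().contains(level)) {
                problems.add(entry.getKey());
            }
        }

        // Pass 2: every remaining finalized feature the broker never mentioned.
        // Assumption: an unlisted feature is supported only at level 0.
        for (Map.Entry<String, Short> entry : finalized.entrySet()) {
            if (!checked.contains(entry.getKey()) && entry.getValue() > 0) {
                problems.add(entry.getKey());
            }
        }
        return new ArrayList<>(problems);
    }

    public static void main(String[] args) {
        Map<String, Short> finalized = Map.of("kraft.version", (short) 1);
        Map<String, Range> broker = Map.of();
        System.out.println(unsupportedFeatures(finalized, broker));
    }
}
```

The second loop is what removes the need for a per-feature special case such as the `KRaftVersion.FEATURE_NAME` check: any newly added feature is covered as soon as it has a finalized level.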