dielhennr commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r684692820
##########
File path: core/src/test/scala/unit/kafka/KafkaConfigTest.scala
##########
@@ -79,6 +81,71 @@ class KafkaTest {
assertThrows(classOf[FatalExitError], () =>
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile,
"broker.id=1", "--override", "broker.id=2"))))
}
+ @Test
+ def testBrokerRoleNodeIdValidation(): Unit = {
+ // Ensure that validation is happening at startup to check that brokers do
not use their node.id as a voter in controller.quorum.voters
+ val propertiesFile = new Properties
+ propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+ propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+ propertiesFile.setProperty(KafkaConfig.QuorumVotersProp,
"1@localhost:9092")
+ setListenerProps(propertiesFile)
+ assertThrows(classOf[IllegalArgumentException], () =>
KafkaConfig.fromProps(propertiesFile))
+
+ // Ensure that with a valid config no exception is thrown
+ propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+ KafkaConfig.fromProps(propertiesFile)
+ }
+
+ @Test
+ def testControllerRoleNodeIdValidation(): Unit = {
+ // Ensure that validation is happening at startup to check that
controllers use their node.id as a voter in controller.quorum.voters
+ val propertiesFile = new Properties
+ propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+ propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+ propertiesFile.setProperty(KafkaConfig.QuorumVotersProp,
"2@localhost:9092")
+ setListenerProps(propertiesFile)
+ assertThrows(classOf[IllegalArgumentException], () =>
KafkaConfig.fromProps(propertiesFile))
+
+ // Ensure that with a valid config no exception is thrown
+ propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+ KafkaConfig.fromProps(propertiesFile)
+ }
+
+ @Test
+ def testColocatedRoleNodeIdValidation(): Unit = {
+ // Ensure that validation is happening at startup to check that colocated
processes use their node.id as a voter in controller.quorum.voters
+ val propertiesFile = new Properties
+ propertiesFile.setProperty(KafkaConfig.ProcessRolesProp,
"controller,broker")
+ propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+ propertiesFile.setProperty(KafkaConfig.QuorumVotersProp,
"2@localhost:9092")
+ setListenerProps(propertiesFile)
+ assertThrows(classOf[IllegalArgumentException], () =>
KafkaConfig.fromProps(propertiesFile))
+
+ // Ensure that with a valid config no exception is thrown
+ propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+ KafkaConfig.fromProps(propertiesFile)
+ }
+
+ @Test
+ def testMustContainQuorumVotersIfUsingProcessRoles(): Unit = {
+ // Ensure that validation is happening at startup to check that
controller.quorum.voters is not empty when process.roles is set
+ val propertiesFile = new Properties
+ propertiesFile.setProperty(KafkaConfig.ProcessRolesProp,
"controller,broker")
+ propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+ propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "")
+ setListenerProps(propertiesFile)
+ assertThrows(classOf[ConfigException], () =>
KafkaConfig.fromProps(propertiesFile))
+ }
Review comment:
```
// Ensure that if neither process.roles nor controller.quorum.voters is
populated, then an exception is thrown if zookeeper.connect is not defined
propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "")
assertThrows(classOf[ConfigException], () =>
KafkaConfig.fromProps(propertiesFile))
// Ensure that no exception is thrown once zookeeper.connect is defined
propertiesFile.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
KafkaConfig.fromProps(propertiesFile)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]