rondagostino commented on a change in pull request #11503:
URL: https://github.com/apache/kafka/pull/11503#discussion_r759505035



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -2018,12 +2014,72 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
       "offsets.commit.required.acks must be greater or equal -1 and less or 
equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
       " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-    require(!processRoles.contains(ControllerRole) || 
controllerListeners.nonEmpty,
-      s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the 
server has the controller role")
-
     val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
+    if (usesSelfManagedQuorum) {
+      // validations for all 3 KRaft setups (co-located, controller-only, 
broker-only)
+      val addressSpecsByNodeId = RaftConfig.parseVoterConnections(quorumVoters)
+      if (addressSpecsByNodeId.isEmpty) {
+        throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, 
${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
+      }
+      require(controlPlaneListenerName.isEmpty,
+        s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in 
KRaft mode.")
+      val sourceOfAdvertisedListeners = if 
(getString(KafkaConfig.AdvertisedListenersProp) != null)
+        s"${KafkaConfig.AdvertisedListenersProp}"
+      else
+        s"${KafkaConfig.ListenersProp}"
+      if (!processRoles.contains(BrokerRole)) {
+        // validations for KRaft controller-only setup
+        // advertised listeners must be empty when not also running the broker 
role
+        require(advertisedListeners.isEmpty,
+          sourceOfAdvertisedListeners +
+            s" must only contain KRaft controller listeners from 
${KafkaConfig.ControllerListenerNamesProp} when 
${KafkaConfig.ProcessRolesProp}=controller")
+      } else {
+        // validations for both KRaft broker setup (i.e. broker-only and 
co-located)
+        // when running broker role advertised listeners cannot contain 
controller listeners
+        require(!advertisedListenerNames.exists(aln => 
controllerListenerNames.contains(aln.value())),
+          sourceOfAdvertisedListeners +
+            s" must not contain KRaft controller listeners from 
${KafkaConfig.ControllerListenerNamesProp} when ${KafkaConfig.ProcessRolesProp} 
contains the broker role")
+      }
+      if (processRoles.contains(ControllerRole)) {
+        // validations for both KRaft controller setups (i.e. controller-only 
and co-located)
+        // nodeId must appear in controller.quorum.voters
+        // controller.listener.names must be non-empty
+        // every one must appear in listeners
+        require(addressSpecsByNodeId.get(nodeId) != null,
+          s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, 
the node id $nodeId must be included in the set of voters 
${KafkaConfig.QuorumVotersProp}=${addressSpecsByNodeId.asScala.keySet.toSet}")
+        require(controllerListeners.nonEmpty,
+          s"${KafkaConfig.ControllerListenerNamesProp} must contain at least 
one value appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running the KRaft controller role")
+        val listenerNameValues = listeners.map(_.listenerName.value).toSet
+        require(controllerListenerNames.forall(cln => 
listenerNameValues.contains(cln)),
+          s"${KafkaConfig.ControllerListenerNamesProp} must only contain 
values appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running the KRaft controller role")
+      } else {

Review comment:
       > I find it a bit difficult to follow these validations because of the 
way they are organized
   
   I reorganized them as per your suggestion, and I think it is better... 
thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to