rondagostino commented on a change in pull request #11503:
URL: https://github.com/apache/kafka/pull/11503#discussion_r760626984



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -2018,12 +2015,104 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
       "offsets.commit.required.acks must be greater or equal -1 and less or 
equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
       " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-    require(!processRoles.contains(ControllerRole) || 
controllerListeners.nonEmpty,
-      s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the 
server has the controller role")
-
     val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
+
+    // validate KRaft-related configs
+    val voterAddressSpecsByNodeId = 
RaftConfig.parseVoterConnections(quorumVoters)
+    def validateCanParseControllerQuorumVotersForKRaft(): Unit = {
+      if (voterAddressSpecsByNodeId.isEmpty) {
+        throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, 
${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
+      }
+    }
+    def validateControlPlaneListenerEmptyForKRaft(): Unit = {
+      require(controlPlaneListenerName.isEmpty,
+        s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in 
KRaft mode. KRaft uses ${KafkaConfig.ControllerListenerNamesProp} instead.")
+    }
+    def sourceOfAdvertisedListeners: String = {
+      if (getString(KafkaConfig.AdvertisedListenersProp) != null)
+        s"${KafkaConfig.AdvertisedListenersProp}"
+      else
+        s"${KafkaConfig.ListenersProp}"
+    }
+    def 
validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker(): 
Unit = {
+      require(!advertisedListenerNames.exists(aln => 
controllerListenerNames.contains(aln.value())),
+        s"$sourceOfAdvertisedListeners must not contain KRaft controller 
listeners from ${KafkaConfig.ControllerListenerNamesProp} when 
${KafkaConfig.ProcessRolesProp} contains the broker role because Kafka clients 
that send requests via advertised listeners do not send requests to KRaft 
controllers -- they only send requests to KRaft brokers.")
+    }
+    def validateControllerQuorumVotersMustContainNodeIDForKRaftController(): 
Unit = {
+      require(voterAddressSpecsByNodeId.containsKey(nodeId),
+        s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, 
the node id $nodeId must be included in the set of voters 
${KafkaConfig.QuorumVotersProp}=${voterAddressSpecsByNodeId.asScala.keySet.toSet}")
+    }
+    def validateControllerListenerExistsForKRaftController(): Unit = {
+      require(controllerListeners.nonEmpty,
+        s"${KafkaConfig.ControllerListenerNamesProp} must contain at least one 
value appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running the KRaft controller role")
+    }
+    def 
validateControllerListenerNamesMustAppearInListenersForKRaftController(): Unit 
= {
+      val listenerNameValues = listeners.map(_.listenerName.value).toSet
+      require(controllerListenerNames.forall(cln => 
listenerNameValues.contains(cln)),
+        s"${KafkaConfig.ControllerListenerNamesProp} must only contain values 
appearing in the '${KafkaConfig.ListenersProp}' configuration when running the 
KRaft controller role")
+    }
+    def validateAdvertisedListenersNonEmptyForBroker(): Unit = {
+      require(advertisedListenerNames.nonEmpty,
+        "There must be at least one advertised listener." + (
+          if (processRoles.contains(BrokerRole)) s" Perhaps all listeners 
appear in ${ControllerListenerNamesProp}?" else ""))
+    }
+    if (processRoles == Set(BrokerRole)) {
+      // KRaft broker-only
+      validateCanParseControllerQuorumVotersForKRaft()
+      validateControlPlaneListenerEmptyForKRaft()
+      
validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker()
+      // nodeId must not appear in controller.quorum.voters
+      require(!voterAddressSpecsByNodeId.containsKey(nodeId),
+        s"If ${KafkaConfig.ProcessRolesProp} contains just the 'broker' role, 
the node id $nodeId must not be included in the set of voters 
${KafkaConfig.QuorumVotersProp}=${voterAddressSpecsByNodeId.asScala.keySet.toSet}")
+      // controller.listener.names must be non-empty...
+      require(controllerListenerNames.exists(_.nonEmpty),
+        s"${KafkaConfig.ControllerListenerNamesProp} must contain at least one 
value when running KRaft with just the broker role")
+      // controller.listener.names are forbidden in listeners...
+      require(controllerListeners.isEmpty,
+        s"${KafkaConfig.ControllerListenerNamesProp} must not contain a value 
appearing in the '${KafkaConfig.ListenersProp}' configuration when running 
KRaft with just the broker role")
+      // controller.listener.names must all appear in 
listener.security.protocol.map
+      controllerListenerNames.filter(_.nonEmpty).foreach { name =>
+        val listenerName = ListenerName.normalised(name)
+        if (!listenerSecurityProtocolMap.contains(listenerName)) {
+          throw new ConfigException(s"Controller listener with name 
${listenerName.value} defined in " +
+            s"${KafkaConfig.ControllerListenerNamesProp} not found in 
${KafkaConfig.ListenerSecurityProtocolMapProp}.")
+        }
+      }
+      // warn that only the first controller listener is used if there is more 
than one
+      if (controllerListenerNames.size > 1) {
+        warn(s"${KafkaConfig.ControllerListenerNamesProp} has multiple 
entries; only the first will be used since 
${KafkaConfig.ProcessRolesProp}=broker: 
${controllerListenerNames.asJava.toString}")
+      }
+      validateAdvertisedListenersNonEmptyForBroker()
+    } else if (processRoles == Set(ControllerRole)) {
+      // KRaft controller-only
+      validateCanParseControllerQuorumVotersForKRaft()
+      validateControlPlaneListenerEmptyForKRaft()
+      // advertised listeners must be empty when not also running the broker 
role
+      require(advertisedListeners.isEmpty,
+        s"$sourceOfAdvertisedListeners must only contain KRaft controller 
listeners from ${KafkaConfig.ControllerListenerNamesProp} when 
${KafkaConfig.ProcessRolesProp}=controller")
+      validateControllerQuorumVotersMustContainNodeIDForKRaftController()
+      validateControllerListenerExistsForKRaftController()
+      validateControllerListenerNamesMustAppearInListenersForKRaftController()
+    } else if (processRoles == Set(BrokerRole, ControllerRole)) {
+      // KRaft colocated broker and controller
+      validateCanParseControllerQuorumVotersForKRaft()
+      validateControlPlaneListenerEmptyForKRaft()
+      
validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker()
+      validateControllerQuorumVotersMustContainNodeIDForKRaftController()
+      validateControllerListenerExistsForKRaftController()
+      validateControllerListenerNamesMustAppearInListenersForKRaftController()
+      validateAdvertisedListenersNonEmptyForBroker()

Review comment:
       The config you presented as being illegal is actually acceptable 
assuming we also set `inter.broker.listener.name` and 
`listener.security.protocol.map`:
   
   ```
   process.roles=broker,controller
   controller.listener.names=CONTROLLER
   listeners=DEFAULT://:9092,CONTROLLER://:9093
   # need to add these two configs to make it work
   inter.broker.listener.name=DEFAULT
   listener.security.protocol.map=DEFAULT:PLAINTEXT,CONTROLLER:PLAINTEXT
   ```
   
   We could make it simpler if we take advantage of various default values, 
like this:
   
   ```
   process.roles=broker,controller
   controller.listener.names=CONTROLLER
   listeners=PLAINTEXT://:9092,CONTROLLER://:9093
   # inter.broker.listener.name and listener.security.protocol.map default 
values will now allow it to work
   ```
   
   > An alternative approach we could use here would be to automatically 
exclude the controller listeners from the advertised listeners when they are 
specified by listeners
   
   Yes, this is what we do:
   ```
     def advertisedListeners: Seq[EndPoint] = {
   ...
         listeners.filterNot(l => 
controllerListenerNames.contains(l.listenerName.value()))
     }
   ```
   
   > By the way, would it make sense to rename advertisedListeners to 
effectiveAdvertisedListeners to make it clearer that the source might not be 
advertised.listeners?
   
   Yes!  I think that makes perfect sense.
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to