jsancio commented on a change in pull request #11503:
URL: https://github.com/apache/kafka/pull/11503#discussion_r753852672



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -2011,12 +2012,68 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
       "offsets.commit.required.acks must be greater or equal -1 and less or 
equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
       " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-    require(!processRoles.contains(ControllerRole) || 
controllerListeners.nonEmpty,
-      s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the 
server has the controller role")
-
     val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
+    if (usesSelfManagedQuorum) {
+      require(controlPlaneListenerName.isEmpty,
+        s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in 
KRaft mode.")

Review comment:
       It looks like we have a section for validating this property. Maybe move 
this error check there:
   ```scala
         // validate control.plane.listener.name config
         if (controlPlaneListenerName.isDefined) {
           
require(advertisedListenerNames.contains(controlPlaneListenerName.get),
             s"${KafkaConfig.ControlPlaneListenerNameProp} must be a listener 
name defined in ${KafkaConfig.AdvertisedListenersProp}. " +
             s"The valid options based on currently configured listeners are 
${advertisedListenerNames.map(_.value).mkString(",")}")
           // controlPlaneListenerName should be different from 
interBrokerListenerName
           
require(!controlPlaneListenerName.get.value().equals(interBrokerListenerName.value()),
             s"${KafkaConfig.ControlPlaneListenerNameProp}, when defined, 
should have a different value from the inter broker listener name. " +
             s"Currently they both have the value 
${controlPlaneListenerName.get}")
         }
   ```

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -2018,12 +2019,68 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
       "offsets.commit.required.acks must be greater or equal -1 and less or 
equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
       " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-    require(!processRoles.contains(ControllerRole) || 
controllerListeners.nonEmpty,
-      s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the 
server has the controller role")
-
     val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
+    if (usesSelfManagedQuorum) {
+      require(controlPlaneListenerName.isEmpty,
+        s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in 
KRaft mode.")
+      val sourceOfAdvertisedListeners = if 
(getString(KafkaConfig.AdvertisedListenersProp) != null)
+        s"${KafkaConfig.AdvertisedListenersProp}"
+      else
+        s"${KafkaConfig.ListenersProp}"
+      if (!processRoles.contains(BrokerRole)) {
+        // advertised listeners must be empty when not also running the broker 
role
+        require(advertisedListeners.isEmpty,
+          sourceOfAdvertisedListeners +
+            s" must only contain KRaft controller listeners from 
${KafkaConfig.ControllerListenerNamesProp} when 
${KafkaConfig.ProcessRolesProp}=controller")
+      } else {
+        // when running broker role advertised listeners cannot contain 
controller listeners
+        require(!advertisedListenerNames.exists(aln => 
controllerListenerNames.contains(aln.value())),
+          sourceOfAdvertisedListeners +
+            s" must not contain KRaft controller listeners from 
${KafkaConfig.ControllerListenerNamesProp} when ${KafkaConfig.ProcessRolesProp} 
contains the broker role")
+      }
+      if (processRoles.contains(ControllerRole)) {
+        // has controller role (and optionally broker role as well)
+        // controller.listener.names must be non-empty
+        // every one must appear in listeners
+        // the port appearing in controller.quorum.voters for this node must 
match the port of the first controller listener
+        // (we allow other nodes' voter ports to differ to support running 
multiple controllers on the same host)
+        require(controllerListeners.nonEmpty,
+          s"${KafkaConfig.ControllerListenerNamesProp} must contain at least 
one value appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running the KRaft controller role")
+        val listenerNameValues = listeners.map(_.listenerName.value).toSet
+        require(controllerListenerNames.forall(cln => 
listenerNameValues.contains(cln)),
+          s"${KafkaConfig.ControllerListenerNamesProp} must only contain 
values appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running the KRaft controller role")
+        val addressSpecForThisNode = 
RaftConfig.parseVoterConnections(quorumVoters).get(nodeId)
+        addressSpecForThisNode match {
+          case inetAddressSpec: RaftConfig.InetAddressSpec => {
+            val quorumVotersPort = inetAddressSpec.address.getPort
+            require(controllerListeners.head.port == quorumVotersPort,
+              s"Port in ${KafkaConfig.QuorumVotersProp} for this controller 
node (${KafkaConfig.NodeIdProp}=$nodeId, port=$quorumVotersPort) does not match 
the port for the first controller listener in 
${KafkaConfig.ControllerListenerNamesProp} 
(${controllerListeners.head.listenerName.value()}, 
port=${controllerListeners.head.port})")
+          }
+          case _ =>
+        }
+      } else {
+        // only broker role
+        // controller.listener.names must be non-empty
+        // none of them can appear in listeners
+        // warn that only the first one is used if there is more than one
+        require(controllerListenerNames.exists(_.nonEmpty),
+          s"${KafkaConfig.ControllerListenerNamesProp} must contain at least 
one value when running KRaft with just the broker role")
+        if (controllerListenerNames.size > 1) {
+          warn(s"${KafkaConfig.ControllerListenerNamesProp} has multiple 
entries; only the first will be used since 
${KafkaConfig.ProcessRolesProp}=broker: $controllerListenerNames")
+        }
+        require(controllerListeners.isEmpty,
+          s"${KafkaConfig.ControllerListenerNamesProp} must not contain a 
value appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running KRaft with just the broker role")

Review comment:
       Are we already checking this here? 
https://github.com/apache/kafka/pull/11503/files#diff-cbe6a8b71b05ed22cf09d97591225b588e9fca6caaf95d3b34a43262cfd23aa6R2037-R2039

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -2018,12 +2019,68 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
       "offsets.commit.required.acks must be greater or equal -1 and less or 
equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
       " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-    require(!processRoles.contains(ControllerRole) || 
controllerListeners.nonEmpty,
-      s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the 
server has the controller role")
-
     val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
+    if (usesSelfManagedQuorum) {
+      require(controlPlaneListenerName.isEmpty,
+        s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in 
KRaft mode.")
+      val sourceOfAdvertisedListeners = if 
(getString(KafkaConfig.AdvertisedListenersProp) != null)
+        s"${KafkaConfig.AdvertisedListenersProp}"
+      else
+        s"${KafkaConfig.ListenersProp}"
+      if (!processRoles.contains(BrokerRole)) {
+        // advertised listeners must be empty when not also running the broker 
role
+        require(advertisedListeners.isEmpty,
+          sourceOfAdvertisedListeners +
+            s" must only contain KRaft controller listeners from 
${KafkaConfig.ControllerListenerNamesProp} when 
${KafkaConfig.ProcessRolesProp}=controller")
+      } else {
+        // when running broker role advertised listeners cannot contain 
controller listeners
+        require(!advertisedListenerNames.exists(aln => 
controllerListenerNames.contains(aln.value())),
+          sourceOfAdvertisedListeners +
+            s" must not contain KRaft controller listeners from 
${KafkaConfig.ControllerListenerNamesProp} when ${KafkaConfig.ProcessRolesProp} 
contains the broker role")
+      }
+      if (processRoles.contains(ControllerRole)) {
+        // has controller role (and optionally broker role as well)
+        // controller.listener.names must be non-empty
+        // every one must appear in listeners
+        // the port appearing in controller.quorum.voters for this node must 
match the port of the first controller listener
+        // (we allow other nodes' voter ports to differ to support running 
multiple controllers on the same host)
+        require(controllerListeners.nonEmpty,
+          s"${KafkaConfig.ControllerListenerNamesProp} must contain at least 
one value appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running the KRaft controller role")
+        val listenerNameValues = listeners.map(_.listenerName.value).toSet
+        require(controllerListenerNames.forall(cln => 
listenerNameValues.contains(cln)),
+          s"${KafkaConfig.ControllerListenerNamesProp} must only contain 
values appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running the KRaft controller role")
+        val addressSpecForThisNode = 
RaftConfig.parseVoterConnections(quorumVoters).get(nodeId)
+        addressSpecForThisNode match {
+          case inetAddressSpec: RaftConfig.InetAddressSpec => {
+            val quorumVotersPort = inetAddressSpec.address.getPort
+            require(controllerListeners.head.port == quorumVotersPort,
+              s"Port in ${KafkaConfig.QuorumVotersProp} for this controller 
node (${KafkaConfig.NodeIdProp}=$nodeId, port=$quorumVotersPort) does not match 
the port for the first controller listener in 
${KafkaConfig.ControllerListenerNamesProp} 
(${controllerListeners.head.listenerName.value()}, 
port=${controllerListeners.head.port})")
+          }
+          case _ =>

Review comment:
       Can you add a comment as to why we are skipping this case? For example, 
I think we hit this case both when `get(nodeId)` is `null` and when it is 
`UnknownAddressSpec`.
   
   Please also check the code below and see if we can merge these two validation blocks:
   
   ```scala
           // Validate process.roles with controller.quorum.voters
           val voterIds: Set[Integer] = 
RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet
           if (voterIds.isEmpty) {
             throw new ConfigException(s"If using 
${KafkaConfig.ProcessRolesProp}, ${KafkaConfig.QuorumVotersProp} must contain a 
parseable set of voters.")
           } else if (processRoles.contains(ControllerRole)) {
             // Ensure that controllers use their node.id as a voter in 
controller.quorum.voters
             require(voterIds.contains(nodeId), s"If 
${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id 
$nodeId must be included in the set of voters 
${KafkaConfig.QuorumVotersProp}=$voterIds")
           } else {
             // Ensure that the broker's node.id is not an id in 
controller.quorum.voters
             require(!voterIds.contains(nodeId), s"If 
${KafkaConfig.ProcessRolesProp} does not contain the 'controller' role, the 
node id $nodeId must not be included in the set of voters 
${KafkaConfig.QuorumVotersProp}=$voterIds")
           }
   ```
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to