rondagostino commented on a change in pull request #11503:
URL: https://github.com/apache/kafka/pull/11503#discussion_r759519621



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -2007,12 +2008,47 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
       "offsets.commit.required.acks must be greater or equal -1 and less or 
equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
       " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-    require(!processRoles.contains(ControllerRole) || 
controllerListeners.nonEmpty,
-      s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the 
server has the controller role")
+    if (usesSelfManagedQuorum) {
+      require(controlPlaneListenerName.isEmpty,
+        s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in 
KRaft mode.")
+      if (processRoles.contains(ControllerRole)) {
+        // has controller role (and optionally broker role as well)
+        // controller.listener.names must be non-empty
+        // every one must appear in listeners
+        // each port appearing in controller.quorum.voters must match the port 
in exactly one controller listener
+        require(controllerListeners.nonEmpty,
+          s"${KafkaConfig.ControllerListenerNamesProp} must contain at least 
one value appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running the KRaft controller role")
+        val listenerNameValues = listeners.map(_.listenerName.value).toSet
+        require(controllerListenerNames.forall(cln => 
listenerNameValues.contains(cln)),
+          s"${KafkaConfig.ControllerListenerNamesProp} must only contain 
values appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running the KRaft controller role")
+        RaftConfig.parseVoterConnections(quorumVoters).asScala.foreach { case 
(nodeId, addressSpec) =>
+          addressSpec match {
+            case inetAddressSpec: RaftConfig.InetAddressSpec => {
+              val quorumVotersPort = inetAddressSpec.address.getPort
+              val controllerListenersWithSamePort = 
controllerListeners.filter(_.port == quorumVotersPort)
+              require(controllerListenersWithSamePort.size == 1,
+                s"Port in ${KafkaConfig.QuorumVotersProp} for controller node 
with ${KafkaConfig.NodeIdProp}=$nodeId ($quorumVotersPort) does not match the 
port for any controller listener in ${KafkaConfig.ControllerListenerNamesProp}")
+            }
+            case _ =>
+          }
+        }
+      } else {
+        // only broker role
+        // controller.listener.names must be non-empty
+        // none of them can appear in listeners
+        require(controllerListenerNames.exists(_.nonEmpty),
+          s"${KafkaConfig.ControllerListenerNamesProp} must contain at least 
one value when running KRaft with just the broker role")
+        require(controllerListeners.isEmpty,
+          s"${KafkaConfig.ControllerListenerNamesProp} must not contain a 
value appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running KRaft with just the broker role")
+      }
+    }
 
     val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
     val listenerNames = listeners.map(_.listenerName).toSet
     if (processRoles.isEmpty || processRoles.contains(BrokerRole)) {
+      require(advertisedListenerNames.nonEmpty,
+        "There must be at least one advertised listener." + (
+          if (processRoles.contains(BrokerRole)) s" Perhaps all listeners 
appear in ${ControllerListenerNamesProp}?" else ""))

Review comment:
   > Could we move it to the other BrokerRole handling above so that all the logic is consolidated
   
   Done, though I moved only this check and not the two immediately following
   it, since this check is the only one that is even remotely related to a KRaft config.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to