hachikuji commented on a change in pull request #11503:
URL: https://github.com/apache/kafka/pull/11503#discussion_r765427639



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1887,15 +1891,21 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   }
 
   def listeners: Seq[EndPoint] =
-    CoreUtils.listenerListToEndPoints(getString(KafkaConfig.ListenersProp), listenerSecurityProtocolMap)
+    CoreUtils.listenerListToEndPoints(getString(KafkaConfig.ListenersProp), effectiveListenerSecurityProtocolMap)
 
-  def controllerListenerNames: Seq[String] =
-    Option(getString(KafkaConfig.ControllerListenerNamesProp)).getOrElse("").split(",")
+  def controllerListenerNames: Seq[String] = {
+    val value = Option(getString(KafkaConfig.ControllerListenerNamesProp)).getOrElse("")
+      if (value.isEmpty) {

Review comment:
       nit: this looks misaligned
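       For the record, a whitespace-only sketch of the realignment (the branch bodies below are hypothetical, since the diff is truncated at this point):

   ```scala
   def controllerListenerNames: Seq[String] = {
     val value = Option(getString(KafkaConfig.ControllerListenerNamesProp)).getOrElse("")
     if (value.isEmpty) {   // aligned with `val value` above
       Seq.empty            // hypothetical body; not shown in this hunk
     } else {
       value.split(",")     // hypothetical body, mirroring the old implementation
     }
   }
   ```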

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1959,10 +1969,28 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
     }
   }
 
-  def listenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = {
-    getMap(KafkaConfig.ListenerSecurityProtocolMapProp, getString(KafkaConfig.ListenerSecurityProtocolMapProp))
+  def effectiveListenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = {
+    val mapValue = getMap(KafkaConfig.ListenerSecurityProtocolMapProp, getString(KafkaConfig.ListenerSecurityProtocolMapProp))
       .map { case (listenerName, protocolName) =>
-      ListenerName.normalised(listenerName) -> getSecurityProtocol(protocolName, KafkaConfig.ListenerSecurityProtocolMapProp)
+        ListenerName.normalised(listenerName) -> getSecurityProtocol(protocolName, KafkaConfig.ListenerSecurityProtocolMapProp)
+      }
+    if (usesSelfManagedQuorum && !originals.containsKey(ListenerSecurityProtocolMapProp)) {
+      // Nothing was specified explicitly for listener.security.protocol.map, so we are using the default value,
+      // and we are using KRaft.
+      // Add PLAINTEXT mappings for controller listeners as long as there is no SSL or SASL_{PLAINTEXT,SSL} in use
+      def isSslOrSasl(name: String) : Boolean = name.equals(SecurityProtocol.SSL.name) || name.equals(SecurityProtocol.SASL_SSL.name) || name.equals(SecurityProtocol.SASL_PLAINTEXT.name)

Review comment:
       nit: remove space before colon
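       i.e., a sketch of the fix:

   ```scala
   // before: def isSslOrSasl(name: String) : Boolean = ...
   def isSslOrSasl(name: String): Boolean =
     name.equals(SecurityProtocol.SSL.name) ||
       name.equals(SecurityProtocol.SASL_SSL.name) ||
       name.equals(SecurityProtocol.SASL_PLAINTEXT.name)
   ```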

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1959,10 +1969,28 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
     }
   }
 
-  def listenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = {
-    getMap(KafkaConfig.ListenerSecurityProtocolMapProp, getString(KafkaConfig.ListenerSecurityProtocolMapProp))
+  def effectiveListenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = {

Review comment:
       One more thing to check is that the validations in `DynamicListenerConfig` make sense in the context of these changes.
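       For example (a hypothetical sketch, not the actual `DynamicListenerConfig` code): any check there that compares listeners against the security protocol map should now go through the effective map, so the implicit KRaft controller mappings are not misread as removals:

   ```scala
   // Hypothetical illustration of the concern -- the method name and structure are assumptions.
   def validateListenerUpdate(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
     // Using effectiveListenerSecurityProtocolMap (not the raw config value) keeps the
     // default PLAINTEXT controller mappings visible on both sides of the comparison.
     val oldMap = oldConfig.effectiveListenerSecurityProtocolMap
     val newMap = newConfig.effectiveListenerSecurityProtocolMap
     require(oldMap.keySet.subsetOf(newMap.keySet),
       "Listener security protocol mappings must not be removed dynamically")
   }
   ```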

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -2018,12 +2034,103 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
       "offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type : " + compressionType + " is not valid." +
       " Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-    require(!processRoles.contains(ControllerRole) || controllerListeners.nonEmpty,
-      s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the server has the controller role")
+    val advertisedListenerNames = effectiveAdvertisedListeners.map(_.listenerName).toSet
+
+    // validate KRaft-related configs
+    val voterAddressSpecsByNodeId = RaftConfig.parseVoterConnections(quorumVoters)
+    def validateNonEmptyQuorumVotersForKRaft(): Unit = {
+      if (voterAddressSpecsByNodeId.isEmpty) {
+        throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, ${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
+      }
+    }
+    def validateControlPlaneListenerEmptyForKRaft(): Unit = {
+      require(controlPlaneListenerName.isEmpty,
+        s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in KRaft mode. KRaft uses ${KafkaConfig.ControllerListenerNamesProp} instead.")
+    }
+    def validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker(): Unit = {
+      require(!advertisedListenerNames.exists(aln => controllerListenerNames.contains(aln.value())),
+        s"The advertised.listeners config must not contain KRaft controller listeners from ${KafkaConfig.ControllerListenerNamesProp} when ${KafkaConfig.ProcessRolesProp} contains the broker role because Kafka clients that send requests via advertised listeners do not send requests to KRaft controllers -- they only send requests to KRaft brokers.")
+    }
+    def validateControllerQuorumVotersMustContainNodeIDForKRaftController(): Unit = {

Review comment:
       nit: everywhere else, we use `NodeId` instead of `NodeID`
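       i.e. `validateControllerQuorumVotersMustContainNodeIdForKRaftController` (rename only), matching `voterAddressSpecsByNodeId` and `nodeId` in the same block.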

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -748,7 +750,9 @@ object KafkaConfig {
     "Different security (SSL and SASL) settings can be configured for each listener by adding a normalised " +
     "prefix (the listener name is lowercased) to the config name. For example, to set a different keystore for the " +
     "INTERNAL listener, a config with name <code>listener.name.internal.ssl.keystore.location</code> would be set. " +
-    "If the config for the listener name is not set, the config will fallback to the generic config (i.e. <code>ssl.keystore.location</code>). "
+    "If the config for the listener name is not set, the config will fallback to the generic config (i.e. <code>ssl.keystore.location</code>). " +
+    "Note that in KRaft a default mapping from the listener names defined by controller.listener.names to PLAINTEXT " +

Review comment:
       nit: for consistency with the rest of the comment, can we use `<code>` blocks around `controller.listener.names`?
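       Something like:

   ```scala
   "Note that in KRaft a default mapping from the listener names defined by <code>controller.listener.names</code> to PLAINTEXT " +
   ```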

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -2018,12 +2034,103 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
       "offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type : " + compressionType + " is not valid." +
       " Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-    require(!processRoles.contains(ControllerRole) || controllerListeners.nonEmpty,
-      s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the server has the controller role")
+    val advertisedListenerNames = effectiveAdvertisedListeners.map(_.listenerName).toSet
+
+    // validate KRaft-related configs
+    val voterAddressSpecsByNodeId = RaftConfig.parseVoterConnections(quorumVoters)
+    def validateNonEmptyQuorumVotersForKRaft(): Unit = {
+      if (voterAddressSpecsByNodeId.isEmpty) {
+        throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, ${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
+      }
+    }
+    def validateControlPlaneListenerEmptyForKRaft(): Unit = {
+      require(controlPlaneListenerName.isEmpty,
+        s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in KRaft mode. KRaft uses ${KafkaConfig.ControllerListenerNamesProp} instead.")
+    }
+    def validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker(): Unit = {
+      require(!advertisedListenerNames.exists(aln => controllerListenerNames.contains(aln.value())),
+        s"The advertised.listeners config must not contain KRaft controller listeners from ${KafkaConfig.ControllerListenerNamesProp} when ${KafkaConfig.ProcessRolesProp} contains the broker role because Kafka clients that send requests via advertised listeners do not send requests to KRaft controllers -- they only send requests to KRaft brokers.")
+    }
+    def validateControllerQuorumVotersMustContainNodeIDForKRaftController(): Unit = {
+      require(voterAddressSpecsByNodeId.containsKey(nodeId),
+        s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${KafkaConfig.QuorumVotersProp}=${voterAddressSpecsByNodeId.asScala.keySet.toSet}")
+    }
+    def validateControllerListenerExistsForKRaftController(): Unit = {
+      require(controllerListeners.nonEmpty,
+        s"${KafkaConfig.ControllerListenerNamesProp} must contain at least one value appearing in the '${KafkaConfig.ListenersProp}' configuration when running the KRaft controller role")
+    }
+    def validateControllerListenerNamesMustAppearInListenersForKRaftController(): Unit = {
+      val listenerNameValues = listeners.map(_.listenerName.value).toSet
+      require(controllerListenerNames.forall(cln => listenerNameValues.contains(cln)),
+        s"${KafkaConfig.ControllerListenerNamesProp} must only contain values appearing in the '${KafkaConfig.ListenersProp}' configuration when running the KRaft controller role")
+    }
+    def validateAdvertisedListenersNonEmptyForBroker(): Unit = {
+      require(advertisedListenerNames.nonEmpty,
+        "There must be at least one advertised listener." + (
+          if (processRoles.contains(BrokerRole)) s" Perhaps all listeners appear in ${ControllerListenerNamesProp}?" else ""))
+    }
+    if (processRoles == Set(BrokerRole)) {
+      // KRaft broker-only
+      validateNonEmptyQuorumVotersForKRaft()
+      validateControlPlaneListenerEmptyForKRaft()
+      validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker()
+      // nodeId must not appear in controller.quorum.voters
+      require(!voterAddressSpecsByNodeId.containsKey(nodeId),
+        s"If ${KafkaConfig.ProcessRolesProp} contains just the 'broker' role, the node id $nodeId must not be included in the set of voters ${KafkaConfig.QuorumVotersProp}=${voterAddressSpecsByNodeId.asScala.keySet.toSet}")
+      // controller.listener.names must be non-empty...
+      require(controllerListenerNames.nonEmpty,
+        s"${KafkaConfig.ControllerListenerNamesProp} must contain at least one value when running KRaft with just the broker role")
+      // controller.listener.names are forbidden in listeners...
+      require(controllerListeners.isEmpty,
+        s"${KafkaConfig.ControllerListenerNamesProp} must not contain a value appearing in the '${KafkaConfig.ListenersProp}' configuration when running KRaft with just the broker role")
+      // controller.listener.names must all appear in listener.security.protocol.map
+      controllerListenerNames.foreach { name =>
+        val listenerName = ListenerName.normalised(name)
+        if (!effectiveListenerSecurityProtocolMap.contains(listenerName)) {
+          throw new ConfigException(s"Controller listener with name ${listenerName.value} defined in " +
+            s"${KafkaConfig.ControllerListenerNamesProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp}.")

Review comment:
       It might be nice to add the following in parentheses:
   `(an explicit security mapping for each controller listener is required if ${KafkaConfig.ListenerSecurityProtocolMapProp} is non-empty, or if there are security protocols other than PLAINTEXT in use)`
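       i.e., the thrown message could read (a sketch combining the existing text with the suggested parenthetical):

   ```scala
   throw new ConfigException(s"Controller listener with name ${listenerName.value} defined in " +
     s"${KafkaConfig.ControllerListenerNamesProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp} " +
     s"(an explicit security mapping for each controller listener is required if " +
     s"${KafkaConfig.ListenerSecurityProtocolMapProp} is non-empty, or if there are security protocols other than PLAINTEXT in use).")
   ```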



