jsancio commented on code in PR #16892:
URL: https://github.com/apache/kafka/pull/16892#discussion_r1719855994
##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -890,47 +890,49 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
// validate KRaft-related configs
val voterIds = QuorumConfig.parseVoterIds(quorumVoters)
- def validateNonEmptyQuorumVotersForKRaft(): Unit = {
- if (voterIds.isEmpty) {
- throw new ConfigException(s"If using
${KRaftConfigs.PROCESS_ROLES_CONFIG}, ${QuorumConfig.QUORUM_VOTERS_CONFIG} must
contain a parseable set of voters.")
+ def validateQuorumVotersAndQuorumBootstrapServerForKRaft(): Unit = {
+ if (voterIds.isEmpty && quorumBootstrapServers.isEmpty) {
+ throw new ConfigException(
+ s"""If using ${KRaftConfigs.PROCESS_ROLES_CONFIG}, either
${QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG} must
+ | contain the set of bootstrap controllers or
${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable
+ | set of controllers.""".stripMargin
+ )
}
}
- def validateNonEmptyQuorumVotersForMigration(): Unit = {
- if (voterIds.isEmpty) {
- throw new ConfigException(s"If using
${KRaftConfigs.MIGRATION_ENABLED_CONFIG}, ${QuorumConfig.QUORUM_VOTERS_CONFIG}
must contain a parseable set of voters.")
+ def validateQuorumVotersAndQuorumBootstrapServerForMigration(): Unit = {
+ if (voterIds.isEmpty && quorumBootstrapServers.isEmpty) {
+ throw new ConfigException(
+ s"""If using ${KRaftConfigs.MIGRATION_ENABLED_CONFIG}, either
${QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG} must
+ | contain the set of bootstrap controllers or
${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable
+ | set of controllers.""".stripMargin)
}
}
def validateControlPlaneListenerEmptyForKRaft(): Unit = {
require(controlPlaneListenerName.isEmpty,
s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} is not
supported in KRaft mode.")
}
- def
validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker():
Unit = {
- require(advertisedBrokerListenerNames.forall(aln =>
!controllerListenerNames.contains(aln.value())),
- s"The advertised.listeners config must not contain KRaft controller
listeners from ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} when
${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the broker role because Kafka
clients that send requests via advertised listeners do not send requests to
KRaft controllers -- they only send requests to KRaft brokers.")
- }
def validateControllerQuorumVotersMustContainNodeIdForKRaftController():
Unit = {
- require(voterIds.contains(nodeId),
+ require(voterIds.isEmpty || voterIds.contains(nodeId),
s"If ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the 'controller'
role, the node id $nodeId must be included in the set of voters
${QuorumConfig.QUORUM_VOTERS_CONFIG}=${voterIds.asScala.toSet}")
}
- def validateControllerListenerExistsForKRaftController(): Unit = {
- require(controllerListeners.nonEmpty,
+ def validateAdvertisedControllerListenersNonEmptyForKRaftController():
Unit = {
+ require(effectiveAdvertisedControllerListeners.nonEmpty,
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must contain at
least one value appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}'
configuration when running the KRaft controller role")
}
def
validateControllerListenerNamesMustAppearInListenersForKRaftController(): Unit
= {
val listenerNameValues = listeners.map(_.listenerName.value).toSet
require(controllerListenerNames.forall(cln =>
listenerNameValues.contains(cln)),
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must only contain
values appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' configuration
when running the KRaft controller role")
}
- def validateAdvertisedListenersNonEmptyForBroker(): Unit = {
+ def validateAdvertisedBrokerListenersNonEmptyForBroker(): Unit = {
require(advertisedBrokerListenerNames.nonEmpty,
- "There must be at least one advertised listener." + (
+ "There must be at least one broker advertised listener." + (
if (processRoles.contains(ProcessRole.BrokerRole)) s" Perhaps all
listeners appear in ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG}?" else ""))
}
if (processRoles == Set(ProcessRole.BrokerRole)) {
// KRaft broker-only
- validateNonEmptyQuorumVotersForKRaft()
+ validateQuorumVotersAndQuorumBootstrapServerForKRaft()
validateControlPlaneListenerEmptyForKRaft()
-
validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker()
Review Comment:
No. There are two advertised listeners that are used by each node when
"registering" with the active controller. The brokers use
`KafkaConfig#effectiveAdvertisedBrokerListeners()`. The controllers use
`KafkaConfig#effectiveAdvertisedControllerListeners()`. Each of those methods
filter the `listeners` and `advertised.listeners` according to the names in
`controller.listener.names`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]