This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new e0dd075d1b  AIP-21: yandexcloud: rename files, emit deprecation warning (#39618)
e0dd075d1b is described below

commit e0dd075d1b4ef48bdae5a9a690a27518e4438104
Author: uzhastik <uz...@ydb.tech>
AuthorDate: Wed May 15 21:19:05 2024 +0300

    AIP-21: yandexcloud: rename files, emit deprecation warning (#39618)

    * AIP-21: rename files, emit deprecation warning
    * revert most of the changes
    * fix static checks
    * remove first line
    * revert file
    * fix static checks
    * revert
    * remove 4 ignores
    * ignore missing tests for yandexcloud deprecated modules
---
 .../hooks/{yandexcloud_dataproc.py => dataproc.py} |   0
 .../providers/yandex/hooks/yandexcloud_dataproc.py |  25 +-
 airflow/providers/yandex/hooks/yq.py               |  15 +-
 .../{yandexcloud_dataproc.py => dataproc.py}       |   2 +-
 .../yandex/operators/yandexcloud_dataproc.py       | 521 +--------------------
 airflow/providers/yandex/provider.yaml             |   7 +-
 generated/provider_dependencies.json               |   2 +-
 .../in_container/run_provider_yaml_files_check.py  |   2 +
 tests/always/test_project_structure.py             |   2 +
 tests/deprecations_ignore.yml                      |   4 -
 ...st_yandexcloud_dataproc.py => test_dataproc.py} |   2 +-
 tests/providers/yandex/hooks/test_yandex.py        |  14 +-
 tests/providers/yandex/hooks/test_yq.py            |  21 +-
 ...st_yandexcloud_dataproc.py => test_dataproc.py} |  10 +-
 .../yandex/example_yandexcloud_dataproc.py         |   2 +-
 .../example_yandexcloud_dataproc_lightweight.py    |   2 +-
 16 files changed, 50 insertions(+), 581 deletions(-)

diff --git a/airflow/providers/yandex/hooks/yandexcloud_dataproc.py b/airflow/providers/yandex/hooks/dataproc.py
similarity index 100%
copy from airflow/providers/yandex/hooks/yandexcloud_dataproc.py
copy to airflow/providers/yandex/hooks/dataproc.py
diff --git a/airflow/providers/yandex/hooks/yandexcloud_dataproc.py b/airflow/providers/yandex/hooks/yandexcloud_dataproc.py
index 9b1862205e..6256769c92 100644
--- a/airflow/providers/yandex/hooks/yandexcloud_dataproc.py
+++ b/airflow/providers/yandex/hooks/yandexcloud_dataproc.py
@@ -14,22 +14,17 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from __future__ import annotations
-
-from airflow.providers.yandex.hooks.yandex import YandexCloudBaseHook
+"""This module is deprecated. Please use :mod:`airflow.providers.yandex.hooks.dataproc` instead."""
 
+from __future__ import annotations
 
-class DataprocHook(YandexCloudBaseHook):
-    """
-    A base hook for Yandex.Cloud Data Proc.
+import warnings
 
-    :param yandex_conn_id: The connection ID to use when fetching connection info.
-    """
+from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.providers.yandex.hooks.dataproc import *  # noqa: F403
 
-    def __init__(self, *args, **kwargs) -> None:
-        super().__init__(*args, **kwargs)
-        self.cluster_id = None
-        self.client = self.sdk.wrappers.Dataproc(
-            default_folder_id=self.default_folder_id,
-            default_public_ssh_key=self.default_public_ssh_key,
-        )
+warnings.warn(
+    "This module is deprecated. Please use `airflow.providers.yandex.hooks.dataproc` instead.",
+    AirflowProviderDeprecationWarning,
+    stacklevel=2,
+)
diff --git a/airflow/providers/yandex/hooks/yq.py b/airflow/providers/yandex/hooks/yq.py
index 37f7550df6..c62595fe0a 100644
--- a/airflow/providers/yandex/hooks/yq.py
+++ b/airflow/providers/yandex/hooks/yq.py
@@ -19,9 +19,7 @@ from __future__ import annotations
 from datetime import timedelta
 from typing import Any
 
-import yandexcloud
-import yandexcloud._auth_fabric as auth_fabric
-from yandex.cloud.iam.v1.iam_token_service_pb2_grpc import IamTokenServiceStub
+import yandexcloud.auth as yc_auth
 from yandex_query_client import YQHttpClient, YQHttpClientConfig
 
 from airflow.providers.yandex.hooks.yandex import YandexCloudBaseHook
@@ -100,13 +98,4 @@ class YQHook(YandexCloudBaseHook):
         if iam_token is not None:
             return iam_token
 
-        service_account_key = self.credentials.get("service_account_key")
-        # if service_account_key is None metadata server will be used
-        token_requester = auth_fabric.get_auth_token_requester(service_account_key=service_account_key)
-
-        if service_account_key is None:
-            return token_requester.get_token()
-
-        sdk = yandexcloud.SDK()
-        client = sdk.client(IamTokenServiceStub)
-        return client.Create(token_requester.get_token_request()).iam_token
+        return yc_auth.get_auth_token(service_account_key=self.credentials.get("service_account_key"))
diff --git a/airflow/providers/yandex/operators/yandexcloud_dataproc.py b/airflow/providers/yandex/operators/dataproc.py
similarity index 99%
copy from airflow/providers/yandex/operators/yandexcloud_dataproc.py
copy to airflow/providers/yandex/operators/dataproc.py
index 49bf136c3f..94bf096b66 100644
--- a/airflow/providers/yandex/operators/yandexcloud_dataproc.py
+++ b/airflow/providers/yandex/operators/dataproc.py
@@ -22,7 +22,7 @@ from typing import TYPE_CHECKING, Iterable, Sequence
 
 from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.models import BaseOperator
-from airflow.providers.yandex.hooks.yandexcloud_dataproc import DataprocHook
+from airflow.providers.yandex.hooks.dataproc import DataprocHook
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
diff --git a/airflow/providers/yandex/operators/yandexcloud_dataproc.py b/airflow/providers/yandex/operators/yandexcloud_dataproc.py
index 49bf136c3f..1f7db5d512 100644
--- a/airflow/providers/yandex/operators/yandexcloud_dataproc.py
+++ b/airflow/providers/yandex/operators/yandexcloud_dataproc.py
@@ -14,522 +14,17 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+"""This module is deprecated. Please use :mod:`airflow.providers.yandex.operators.dataproc` instead."""
+
 from __future__ import annotations
 
 import warnings
-from dataclasses import dataclass
-from typing import TYPE_CHECKING, Iterable, Sequence
 
 from airflow.exceptions import AirflowProviderDeprecationWarning
-from airflow.models import BaseOperator
-from airflow.providers.yandex.hooks.yandexcloud_dataproc import DataprocHook
-
-if TYPE_CHECKING:
-    from airflow.utils.context import Context
-
-
-@dataclass
-class InitializationAction:
-    """Data for initialization action to be run at start of DataProc cluster."""
-
-    uri: str  # Uri of the executable file
-    args: Sequence[str]  # Arguments to the initialization action
-    timeout: int  # Execution timeout
-
-
-class DataprocCreateClusterOperator(BaseOperator):
-    """Creates Yandex.Cloud Data Proc cluster.
- - :param folder_id: ID of the folder in which cluster should be created. - :param cluster_name: Cluster name. Must be unique inside the folder. - :param cluster_description: Cluster description. - :param cluster_image_version: Cluster image version. Use default. - :param ssh_public_keys: List of SSH public keys that will be deployed to created compute instances. - :param subnet_id: ID of the subnetwork. All Data Proc cluster nodes will use one subnetwork. - :param services: List of services that will be installed to the cluster. Possible options: - HDFS, YARN, MAPREDUCE, HIVE, TEZ, ZOOKEEPER, HBASE, SQOOP, FLUME, SPARK, SPARK, ZEPPELIN, OOZIE - :param s3_bucket: Yandex.Cloud S3 bucket to store cluster logs. - Jobs will not work if the bucket is not specified. - :param zone: Availability zone to create cluster in. - Currently there are ru-central1-a, ru-central1-b and ru-central1-c. - :param service_account_id: Service account id for the cluster. - Service account can be created inside the folder. - :param masternode_resource_preset: Resources preset (CPU+RAM configuration) - for the primary node of the cluster. - :param masternode_disk_size: Masternode storage size in GiB. - :param masternode_disk_type: Masternode storage type. Possible options: network-ssd, network-hdd. - :param datanode_resource_preset: Resources preset (CPU+RAM configuration) - for the data nodes of the cluster. - :param datanode_disk_size: Datanodes storage size in GiB. - :param datanode_disk_type: Datanodes storage type. Possible options: network-ssd, network-hdd. - :param computenode_resource_preset: Resources preset (CPU+RAM configuration) - for the compute nodes of the cluster. - :param computenode_disk_size: Computenodes storage size in GiB. - :param computenode_disk_type: Computenodes storage type. Possible options: network-ssd, network-hdd. - :param connection_id: ID of the Yandex.Cloud Airflow connection. - :param computenode_max_count: Maximum number of nodes of compute autoscaling subcluster. - :param computenode_warmup_duration: The warmup time of the instance in seconds. During this time, - traffic is sent to the instance, - but instance metrics are not collected. In seconds. - :param computenode_stabilization_duration: Minimum amount of time in seconds for monitoring before - Instance Groups can reduce the number of instances in the group. - During this time, the group size doesn't decrease, - even if the new metric values indicate that it should. In seconds. - :param computenode_preemptible: Preemptible instances are stopped at least once every 24 hours, - and can be stopped at any time if their resources are needed by Compute. - :param computenode_cpu_utilization_target: Defines an autoscaling rule - based on the average CPU utilization of the instance group. - in percents. 10-100. - By default is not set and default autoscaling strategy is used. - :param computenode_decommission_timeout: Timeout to gracefully decommission nodes during downscaling. - In seconds - :param properties: Properties passed to main node software. - Docs: https://cloud.yandex.com/docs/data-proc/concepts/settings-list - :param enable_ui_proxy: Enable UI Proxy feature for forwarding Hadoop components web interfaces - Docs: https://cloud.yandex.com/docs/data-proc/concepts/ui-proxy - :param host_group_ids: Dedicated host groups to place VMs of cluster on. - Docs: https://cloud.yandex.com/docs/compute/concepts/dedicated-host - :param security_group_ids: User security groups. 
- Docs: https://cloud.yandex.com/docs/data-proc/concepts/network#security-groups - :param log_group_id: Id of log group to write logs. By default logs will be sent to default log group. - To disable cloud log sending set cluster property dataproc:disable_cloud_logging = true - Docs: https://cloud.yandex.com/docs/data-proc/concepts/logs - :param initialization_actions: Set of init-actions to run when cluster starts. - Docs: https://cloud.yandex.com/docs/data-proc/concepts/init-action - :param labels: Cluster labels as key:value pairs. No more than 64 per resource. - Docs: https://cloud.yandex.com/docs/resource-manager/concepts/labels - """ - - def __init__( - self, - *, - folder_id: str | None = None, - cluster_name: str | None = None, - cluster_description: str | None = "", - cluster_image_version: str | None = None, - ssh_public_keys: str | Iterable[str] | None = None, - subnet_id: str | None = None, - services: Iterable[str] = ("HDFS", "YARN", "MAPREDUCE", "HIVE", "SPARK"), - s3_bucket: str | None = None, - zone: str = "ru-central1-b", - service_account_id: str | None = None, - masternode_resource_preset: str | None = None, - masternode_disk_size: int | None = None, - masternode_disk_type: str | None = None, - datanode_resource_preset: str | None = None, - datanode_disk_size: int | None = None, - datanode_disk_type: str | None = None, - datanode_count: int = 1, - computenode_resource_preset: str | None = None, - computenode_disk_size: int | None = None, - computenode_disk_type: str | None = None, - computenode_count: int = 0, - computenode_max_hosts_count: int | None = None, - computenode_measurement_duration: int | None = None, - computenode_warmup_duration: int | None = None, - computenode_stabilization_duration: int | None = None, - computenode_preemptible: bool = False, - computenode_cpu_utilization_target: int | None = None, - computenode_decommission_timeout: int | None = None, - connection_id: str | None = None, - properties: dict[str, str] | None = None, - enable_ui_proxy: bool = False, - host_group_ids: Iterable[str] | None = None, - security_group_ids: Iterable[str] | None = None, - log_group_id: str | None = None, - initialization_actions: Iterable[InitializationAction] | None = None, - labels: dict[str, str] | None = None, - **kwargs, - ) -> None: - super().__init__(**kwargs) - self.folder_id = folder_id - self.yandex_conn_id = connection_id - self.cluster_name = cluster_name - self.cluster_description = cluster_description - self.cluster_image_version = cluster_image_version - self.ssh_public_keys = ssh_public_keys - self.subnet_id = subnet_id - self.services = services - self.s3_bucket = s3_bucket - self.zone = zone - self.service_account_id = service_account_id - self.masternode_resource_preset = masternode_resource_preset - self.masternode_disk_size = masternode_disk_size - self.masternode_disk_type = masternode_disk_type - self.datanode_resource_preset = datanode_resource_preset - self.datanode_disk_size = datanode_disk_size - self.datanode_disk_type = datanode_disk_type - self.datanode_count = datanode_count - self.computenode_resource_preset = computenode_resource_preset - self.computenode_disk_size = computenode_disk_size - self.computenode_disk_type = computenode_disk_type - self.computenode_count = computenode_count - self.computenode_max_hosts_count = computenode_max_hosts_count - self.computenode_measurement_duration = computenode_measurement_duration - self.computenode_warmup_duration = computenode_warmup_duration - self.computenode_stabilization_duration = 
computenode_stabilization_duration - self.computenode_preemptible = computenode_preemptible - self.computenode_cpu_utilization_target = computenode_cpu_utilization_target - self.computenode_decommission_timeout = computenode_decommission_timeout - self.properties = properties - self.enable_ui_proxy = enable_ui_proxy - self.host_group_ids = host_group_ids - self.security_group_ids = security_group_ids - self.log_group_id = log_group_id - self.initialization_actions = initialization_actions - self.labels = labels - - self.hook: DataprocHook | None = None - - def execute(self, context: Context) -> dict: - self.hook = DataprocHook( - yandex_conn_id=self.yandex_conn_id, - ) - operation_result = self.hook.client.create_cluster( - folder_id=self.folder_id, - cluster_name=self.cluster_name, - cluster_description=self.cluster_description, - cluster_image_version=self.cluster_image_version, - ssh_public_keys=self.ssh_public_keys, - subnet_id=self.subnet_id, - services=self.services, - s3_bucket=self.s3_bucket, - zone=self.zone, - service_account_id=self.service_account_id or self.hook.default_service_account_id, - masternode_resource_preset=self.masternode_resource_preset, - masternode_disk_size=self.masternode_disk_size, - masternode_disk_type=self.masternode_disk_type, - datanode_resource_preset=self.datanode_resource_preset, - datanode_disk_size=self.datanode_disk_size, - datanode_disk_type=self.datanode_disk_type, - datanode_count=self.datanode_count, - computenode_resource_preset=self.computenode_resource_preset, - computenode_disk_size=self.computenode_disk_size, - computenode_disk_type=self.computenode_disk_type, - computenode_count=self.computenode_count, - computenode_max_hosts_count=self.computenode_max_hosts_count, - computenode_measurement_duration=self.computenode_measurement_duration, - computenode_warmup_duration=self.computenode_warmup_duration, - computenode_stabilization_duration=self.computenode_stabilization_duration, - computenode_preemptible=self.computenode_preemptible, - computenode_cpu_utilization_target=self.computenode_cpu_utilization_target, - computenode_decommission_timeout=self.computenode_decommission_timeout, - properties=self.properties, - enable_ui_proxy=self.enable_ui_proxy, - host_group_ids=self.host_group_ids, - security_group_ids=self.security_group_ids, - log_group_id=self.log_group_id, - labels=self.labels, - initialization_actions=self.initialization_actions - and [ - self.hook.sdk.wrappers.InitializationAction( - uri=init_action.uri, - args=init_action.args, - timeout=init_action.timeout, - ) - for init_action in self.initialization_actions - ], - ) - cluster_id = operation_result.response.id - - context["task_instance"].xcom_push(key="cluster_id", value=cluster_id) - # Deprecated - context["task_instance"].xcom_push(key="yandexcloud_connection_id", value=self.yandex_conn_id) - return cluster_id - - @property - def cluster_id(self): - return self.output - - -class DataprocBaseOperator(BaseOperator): - """Base class for DataProc operators working with given cluster. - - :param connection_id: ID of the Yandex.Cloud Airflow connection. - :param cluster_id: ID of the cluster to remove. 
(templated) - """ - - template_fields: Sequence[str] = ("cluster_id",) - - def __init__(self, *, yandex_conn_id: str | None = None, cluster_id: str | None = None, **kwargs) -> None: - super().__init__(**kwargs) - self.cluster_id = cluster_id - self.yandex_conn_id = yandex_conn_id - - def _setup(self, context: Context) -> DataprocHook: - if self.cluster_id is None: - self.cluster_id = context["task_instance"].xcom_pull(key="cluster_id") - if self.yandex_conn_id is None: - xcom_yandex_conn_id = context["task_instance"].xcom_pull(key="yandexcloud_connection_id") - if xcom_yandex_conn_id: - warnings.warn( - "Implicit pass of `yandex_conn_id` is deprecated, please pass it explicitly", - AirflowProviderDeprecationWarning, - stacklevel=2, - ) - self.yandex_conn_id = xcom_yandex_conn_id - - return DataprocHook(yandex_conn_id=self.yandex_conn_id) - - def execute(self, context: Context): - raise NotImplementedError() - - -class DataprocDeleteClusterOperator(DataprocBaseOperator): - """Deletes Yandex.Cloud Data Proc cluster. - - :param connection_id: ID of the Yandex.Cloud Airflow connection. - :param cluster_id: ID of the cluster to remove. (templated) - """ - - def __init__(self, *, connection_id: str | None = None, cluster_id: str | None = None, **kwargs) -> None: - super().__init__(yandex_conn_id=connection_id, cluster_id=cluster_id, **kwargs) - - def execute(self, context: Context) -> None: - hook = self._setup(context) - hook.client.delete_cluster(self.cluster_id) - - -class DataprocCreateHiveJobOperator(DataprocBaseOperator): - """Runs Hive job in Data Proc cluster. - - :param query: Hive query. - :param query_file_uri: URI of the script that contains Hive queries. Can be placed in HDFS or S3. - :param properties: A mapping of property names to values, used to configure Hive. - :param script_variables: Mapping of query variable names to values. - :param continue_on_failure: Whether to continue executing queries if a query fails. - :param name: Name of the job. Used for labeling. - :param cluster_id: ID of the cluster to run job in. - Will try to take the ID from Dataproc Hook object if it's specified. (templated) - :param connection_id: ID of the Yandex.Cloud Airflow connection. - """ - - def __init__( - self, - *, - query: str | None = None, - query_file_uri: str | None = None, - script_variables: dict[str, str] | None = None, - continue_on_failure: bool = False, - properties: dict[str, str] | None = None, - name: str = "Hive job", - cluster_id: str | None = None, - connection_id: str | None = None, - **kwargs, - ) -> None: - super().__init__(yandex_conn_id=connection_id, cluster_id=cluster_id, **kwargs) - self.query = query - self.query_file_uri = query_file_uri - self.script_variables = script_variables - self.continue_on_failure = continue_on_failure - self.properties = properties - self.name = name - - def execute(self, context: Context) -> None: - hook = self._setup(context) - hook.client.create_hive_job( - query=self.query, - query_file_uri=self.query_file_uri, - script_variables=self.script_variables, - continue_on_failure=self.continue_on_failure, - properties=self.properties, - name=self.name, - cluster_id=self.cluster_id, - ) - - -class DataprocCreateMapReduceJobOperator(DataprocBaseOperator): - """Runs Mapreduce job in Data Proc cluster. - - :param main_jar_file_uri: URI of jar file with job. - Can be placed in HDFS or S3. Can be specified instead of main_class. - :param main_class: Name of the main class of the job. Can be specified instead of main_jar_file_uri. 
- :param file_uris: URIs of files used in the job. Can be placed in HDFS or S3. - :param archive_uris: URIs of archive files used in the job. Can be placed in HDFS or S3. - :param jar_file_uris: URIs of JAR files used in the job. Can be placed in HDFS or S3. - :param properties: Properties for the job. - :param args: Arguments to be passed to the job. - :param name: Name of the job. Used for labeling. - :param cluster_id: ID of the cluster to run job in. - Will try to take the ID from Dataproc Hook object if it's specified. (templated) - :param connection_id: ID of the Yandex.Cloud Airflow connection. - """ - - def __init__( - self, - *, - main_class: str | None = None, - main_jar_file_uri: str | None = None, - jar_file_uris: Iterable[str] | None = None, - archive_uris: Iterable[str] | None = None, - file_uris: Iterable[str] | None = None, - args: Iterable[str] | None = None, - properties: dict[str, str] | None = None, - name: str = "Mapreduce job", - cluster_id: str | None = None, - connection_id: str | None = None, - **kwargs, - ) -> None: - super().__init__(yandex_conn_id=connection_id, cluster_id=cluster_id, **kwargs) - self.main_class = main_class - self.main_jar_file_uri = main_jar_file_uri - self.jar_file_uris = jar_file_uris - self.archive_uris = archive_uris - self.file_uris = file_uris - self.args = args - self.properties = properties - self.name = name - - def execute(self, context: Context) -> None: - hook = self._setup(context) - hook.client.create_mapreduce_job( - main_class=self.main_class, - main_jar_file_uri=self.main_jar_file_uri, - jar_file_uris=self.jar_file_uris, - archive_uris=self.archive_uris, - file_uris=self.file_uris, - args=self.args, - properties=self.properties, - name=self.name, - cluster_id=self.cluster_id, - ) - - -class DataprocCreateSparkJobOperator(DataprocBaseOperator): - """Runs Spark job in Data Proc cluster. - - :param main_jar_file_uri: URI of jar file with job. Can be placed in HDFS or S3. - :param main_class: Name of the main class of the job. - :param file_uris: URIs of files used in the job. Can be placed in HDFS or S3. - :param archive_uris: URIs of archive files used in the job. Can be placed in HDFS or S3. - :param jar_file_uris: URIs of JAR files used in the job. Can be placed in HDFS or S3. - :param properties: Properties for the job. - :param args: Arguments to be passed to the job. - :param name: Name of the job. Used for labeling. - :param cluster_id: ID of the cluster to run job in. - Will try to take the ID from Dataproc Hook object if it's specified. (templated) - :param connection_id: ID of the Yandex.Cloud Airflow connection. - :param packages: List of maven coordinates of jars to include on the driver and executor classpaths. - :param repositories: List of additional remote repositories to search for the maven coordinates - given with --packages. - :param exclude_packages: List of groupId:artifactId, to exclude while resolving the dependencies - provided in --packages to avoid dependency conflicts. 
- """ - - def __init__( - self, - *, - main_class: str | None = None, - main_jar_file_uri: str | None = None, - jar_file_uris: Iterable[str] | None = None, - archive_uris: Iterable[str] | None = None, - file_uris: Iterable[str] | None = None, - args: Iterable[str] | None = None, - properties: dict[str, str] | None = None, - name: str = "Spark job", - cluster_id: str | None = None, - connection_id: str | None = None, - packages: Iterable[str] | None = None, - repositories: Iterable[str] | None = None, - exclude_packages: Iterable[str] | None = None, - **kwargs, - ) -> None: - super().__init__(yandex_conn_id=connection_id, cluster_id=cluster_id, **kwargs) - self.main_class = main_class - self.main_jar_file_uri = main_jar_file_uri - self.jar_file_uris = jar_file_uris - self.archive_uris = archive_uris - self.file_uris = file_uris - self.args = args - self.properties = properties - self.name = name - self.packages = packages - self.repositories = repositories - self.exclude_packages = exclude_packages - - def execute(self, context: Context) -> None: - hook = self._setup(context) - hook.client.create_spark_job( - main_class=self.main_class, - main_jar_file_uri=self.main_jar_file_uri, - jar_file_uris=self.jar_file_uris, - archive_uris=self.archive_uris, - file_uris=self.file_uris, - args=self.args, - properties=self.properties, - packages=self.packages, - repositories=self.repositories, - exclude_packages=self.exclude_packages, - name=self.name, - cluster_id=self.cluster_id, - ) - - -class DataprocCreatePysparkJobOperator(DataprocBaseOperator): - """Runs Pyspark job in Data Proc cluster. - - :param main_python_file_uri: URI of python file with job. Can be placed in HDFS or S3. - :param python_file_uris: URIs of python files used in the job. Can be placed in HDFS or S3. - :param file_uris: URIs of files used in the job. Can be placed in HDFS or S3. - :param archive_uris: URIs of archive files used in the job. Can be placed in HDFS or S3. - :param jar_file_uris: URIs of JAR files used in the job. Can be placed in HDFS or S3. - :param properties: Properties for the job. - :param args: Arguments to be passed to the job. - :param name: Name of the job. Used for labeling. - :param cluster_id: ID of the cluster to run job in. - Will try to take the ID from Dataproc Hook object if it's specified. (templated) - :param connection_id: ID of the Yandex.Cloud Airflow connection. - :param packages: List of maven coordinates of jars to include on the driver and executor classpaths. - :param repositories: List of additional remote repositories to search for the maven coordinates - given with --packages. - :param exclude_packages: List of groupId:artifactId, to exclude while resolving the dependencies - provided in --packages to avoid dependency conflicts. 
- """ - - def __init__( - self, - *, - main_python_file_uri: str | None = None, - python_file_uris: Iterable[str] | None = None, - jar_file_uris: Iterable[str] | None = None, - archive_uris: Iterable[str] | None = None, - file_uris: Iterable[str] | None = None, - args: Iterable[str] | None = None, - properties: dict[str, str] | None = None, - name: str = "Pyspark job", - cluster_id: str | None = None, - connection_id: str | None = None, - packages: Iterable[str] | None = None, - repositories: Iterable[str] | None = None, - exclude_packages: Iterable[str] | None = None, - **kwargs, - ) -> None: - super().__init__(yandex_conn_id=connection_id, cluster_id=cluster_id, **kwargs) - self.main_python_file_uri = main_python_file_uri - self.python_file_uris = python_file_uris - self.jar_file_uris = jar_file_uris - self.archive_uris = archive_uris - self.file_uris = file_uris - self.args = args - self.properties = properties - self.name = name - self.packages = packages - self.repositories = repositories - self.exclude_packages = exclude_packages +from airflow.providers.yandex.operators.dataproc import * # noqa: F403 - def execute(self, context: Context) -> None: - hook = self._setup(context) - hook.client.create_pyspark_job( - main_python_file_uri=self.main_python_file_uri, - python_file_uris=self.python_file_uris, - jar_file_uris=self.jar_file_uris, - archive_uris=self.archive_uris, - file_uris=self.file_uris, - args=self.args, - properties=self.properties, - packages=self.packages, - repositories=self.repositories, - exclude_packages=self.exclude_packages, - name=self.name, - cluster_id=self.cluster_id, - ) +warnings.warn( + "This module is deprecated. Please use `airflow.providers.yandex.operators.dataproc` instead.", + AirflowProviderDeprecationWarning, + stacklevel=2, +) diff --git a/airflow/providers/yandex/provider.yaml b/airflow/providers/yandex/provider.yaml index efbf4bba93..764a08586f 100644 --- a/airflow/providers/yandex/provider.yaml +++ b/airflow/providers/yandex/provider.yaml @@ -50,7 +50,7 @@ versions: dependencies: - apache-airflow>=2.7.0 - - yandexcloud>=0.228.0 + - yandexcloud>=0.278.0 - yandex-query-client>=0.1.4 integrations: @@ -76,8 +76,7 @@ integrations: operators: - integration-name: Yandex.Cloud Dataproc python-modules: - - airflow.providers.yandex.operators.yandexcloud_dataproc - + - airflow.providers.yandex.operators.dataproc - integration-name: Yandex.Cloud YQ python-modules: - airflow.providers.yandex.operators.yq @@ -88,7 +87,7 @@ hooks: - airflow.providers.yandex.hooks.yandex - integration-name: Yandex.Cloud Dataproc python-modules: - - airflow.providers.yandex.hooks.yandexcloud_dataproc + - airflow.providers.yandex.hooks.dataproc - integration-name: Yandex.Cloud YQ python-modules: - airflow.providers.yandex.hooks.yq diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index d2c0896ba8..f10e93060e 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -1199,7 +1199,7 @@ "deps": [ "apache-airflow>=2.7.0", "yandex-query-client>=0.1.4", - "yandexcloud>=0.228.0" + "yandexcloud>=0.278.0" ], "devel-deps": [], "cross-providers-deps": [], diff --git a/scripts/in_container/run_provider_yaml_files_check.py b/scripts/in_container/run_provider_yaml_files_check.py index 4f74846147..d3cf8b1e2e 100755 --- a/scripts/in_container/run_provider_yaml_files_check.py +++ b/scripts/in_container/run_provider_yaml_files_check.py @@ -51,6 +51,8 @@ DEPRECATED_MODULES = [ 
"airflow.providers.cncf.kubernetes.triggers.kubernetes_pod", "airflow.providers.cncf.kubernetes.operators.kubernetes_pod", "airflow.providers.tabular.hooks.tabular", + "airflow.providers.yandex.hooks.yandexcloud_dataproc", + "airflow.providers.yandex.operators.yandexcloud_dataproc", ] KNOWN_DEPRECATED_CLASSES = [ diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py index 428570ee58..ba4a467351 100644 --- a/tests/always/test_project_structure.py +++ b/tests/always/test_project_structure.py @@ -159,6 +159,8 @@ class TestProjectStructure: "tests/providers/redis/sensors/test_redis_key.py", "tests/providers/slack/notifications/test_slack_notifier.py", "tests/providers/snowflake/triggers/test_snowflake_trigger.py", + "tests/providers/yandex/hooks/test_yandexcloud_dataproc.py", + "tests/providers/yandex/operators/test_yandexcloud_dataproc.py", ] # TODO: Should we extend this test to cover other directories? diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml index e63df14631..d6400074e6 100644 --- a/tests/deprecations_ignore.yml +++ b/tests/deprecations_ignore.yml @@ -691,8 +691,4 @@ - tests/providers/weaviate/operators/test_weaviate.py::TestWeaviateIngestOperator::test_execute_with_input_json - tests/providers/yandex/hooks/test_yandex.py::TestYandexHook::test_provider_user_agent - tests/providers/yandex/hooks/test_yandex.py::TestYandexHook::test_backcompat_prefix_works -- tests/providers/yandex/operators/test_yandexcloud_dataproc.py::TestDataprocClusterCreateOperator::test_create_hive_job_operator -- tests/providers/yandex/operators/test_yandexcloud_dataproc.py::TestDataprocClusterCreateOperator::test_create_mapreduce_job_operator -- tests/providers/yandex/operators/test_yandexcloud_dataproc.py::TestDataprocClusterCreateOperator::test_create_spark_job_operator -- tests/providers/yandex/operators/test_yandexcloud_dataproc.py::TestDataprocClusterCreateOperator::test_create_pyspark_job_operator - tests/providers/yandex/secrets/test_lockbox.py::TestLockboxSecretBackend::test_yandex_lockbox_secret_backend_get_connection_from_json diff --git a/tests/providers/yandex/hooks/test_yandexcloud_dataproc.py b/tests/providers/yandex/hooks/test_dataproc.py similarity index 99% rename from tests/providers/yandex/hooks/test_yandexcloud_dataproc.py rename to tests/providers/yandex/hooks/test_dataproc.py index 5629a3678e..0665b17b47 100644 --- a/tests/providers/yandex/hooks/test_yandexcloud_dataproc.py +++ b/tests/providers/yandex/hooks/test_dataproc.py @@ -20,7 +20,7 @@ import json from unittest import mock from airflow.models import Connection -from airflow.providers.yandex.hooks.yandexcloud_dataproc import DataprocHook +from airflow.providers.yandex.hooks.dataproc import DataprocHook # Airflow connection with type "yandexcloud" must be created CONNECTION_ID = "yandexcloud_default" diff --git a/tests/providers/yandex/hooks/test_yandex.py b/tests/providers/yandex/hooks/test_yandex.py index ea885c6b3f..ae6dac568b 100644 --- a/tests/providers/yandex/hooks/test_yandex.py +++ b/tests/providers/yandex/hooks/test_yandex.py @@ -40,7 +40,7 @@ class TestYandexHook: mock_get_connection["extra_dejson"] = "sds" mock_get_connection.extra_dejson = '{"extras": "extra"}' mock_get_connection.return_value = mock.Mock( - connection_id="yandexcloud_default", extra_dejson=extra_dejson + yandex_conn_id="yandexcloud_default", extra_dejson=extra_dejson ) mock_get_credentials.return_value = {"token": 122323} @@ -54,7 +54,7 @@ class TestYandexHook: 
@mock.patch("airflow.hooks.base.BaseHook.get_connection") @mock.patch("airflow.providers.yandex.utils.credentials.get_credentials") def test_provider_user_agent(self, mock_get_credentials, mock_get_connection): - mock_get_connection.return_value = mock.Mock(connection_id="yandexcloud_default", extra_dejson="{}") + mock_get_connection.return_value = mock.Mock(yandex_conn_id="yandexcloud_default", extra_dejson="{}") mock_get_credentials.return_value = {"token": 122323} sdk_prefix = "MyAirflow" @@ -65,7 +65,7 @@ class TestYandexHook: @mock.patch("airflow.hooks.base.BaseHook.get_connection") @mock.patch("airflow.providers.yandex.utils.credentials.get_credentials") def test_sdk_user_agent(self, mock_get_credentials, mock_get_connection): - mock_get_connection.return_value = mock.Mock(connection_id="yandexcloud_default", extra_dejson="{}") + mock_get_connection.return_value = mock.Mock(yandex_conn_id="yandexcloud_default", extra_dejson="{}") mock_get_credentials.return_value = {"token": 122323} sdk_prefix = "MyAirflow" @@ -97,7 +97,7 @@ class TestYandexHook: extra_dejson = {"endpoint": "my_endpoint", "something_else": "some_value"} mock_get_connection.return_value = mock.Mock( - connection_id="yandexcloud_default", extra_dejson=extra_dejson + yandex_conn_id="yandexcloud_default", extra_dejson=extra_dejson ) mock_get_credentials.return_value = {"token": 122323} @@ -117,7 +117,7 @@ class TestYandexHook: extra_dejson = {"something_else": "some_value"} mock_get_connection.return_value = mock.Mock( - connection_id="yandexcloud_default", extra_dejson=extra_dejson + yandex_conn_id="yandexcloud_default", extra_dejson=extra_dejson ) mock_get_credentials.return_value = {"token": 122323} @@ -140,7 +140,7 @@ class TestYandexHook: mock_get_connection["extra_dejson"] = "sds" mock_get_connection.extra_dejson = '{"extras": "extra"}' mock_get_connection.return_value = mock.Mock( - connection_id="yandexcloud_default", extra_dejson=extra_dejson + yandex_conn_id="yandexcloud_default", extra_dejson=extra_dejson ) hook = YandexCloudBaseHook( @@ -163,7 +163,7 @@ class TestYandexHook: get_connection_mock["extra_dejson"] = "sds" get_connection_mock.extra_dejson = '{"extras": "extra"}' get_connection_mock.return_value = mock.Mock( - connection_id="yandexcloud_default", extra_dejson=extra_dejson + yandex_conn_id="yandexcloud_default", extra_dejson=extra_dejson ) hook = YandexCloudBaseHook() diff --git a/tests/providers/yandex/hooks/test_yq.py b/tests/providers/yandex/hooks/test_yq.py index 2864642c0e..71723d97b8 100644 --- a/tests/providers/yandex/hooks/test_yq.py +++ b/tests/providers/yandex/hooks/test_yq.py @@ -31,20 +31,11 @@ IAM_TOKEN = "my_iam_token" SERVICE_ACCOUNT_AUTH_KEY_JSON = """{"id":"my_id", "service_account_id":"my_sa1", "private_key":"my_pk"}""" -class FakeTokenRequester: - def get_token(self) -> str: - return IAM_TOKEN - - def get_token_request(self) -> str: - return "my_dummy_request" - - class TestYandexCloudYqHook: def _init_hook(self): with mock.patch("airflow.hooks.base.BaseHook.get_connection") as mock_get_connection: mock_get_connection.return_value = self.connection - with mock.patch("airflow.providers.yandex.hooks.yq.yandexcloud.SDK.client"): - self.hook = YQHook(default_folder_id="my_folder_id") + self.hook = YQHook(default_folder_id="my_folder_id") def setup_method(self): self.connection = Connection(extra={"service_account_json": SERVICE_ACCOUNT_AUTH_KEY_JSON}) @@ -74,8 +65,8 @@ class TestYandexCloudYqHook: m.assert_called_once_with("query1") @responses.activate() - 
@mock.patch("yandexcloud._auth_fabric.get_auth_token_requester", return_value=FakeTokenRequester()) - def test_metadata_token_usage(self, mock_get_auth_token_requester): + @mock.patch("yandexcloud.auth.get_auth_token", return_value=IAM_TOKEN) + def test_metadata_token_usage(self, mock_get_auth_token): responses.post( "https://api.yandex-query.cloud.yandex.net/api/fq/v1/queries", match=[ @@ -94,8 +85,8 @@ class TestYandexCloudYqHook: assert query_id == "query1" @mock.patch("yandexcloud._auth_fabric.__validate_service_account_key") - @mock.patch("yandexcloud._auth_fabric.get_auth_token_requester", return_value=FakeTokenRequester()) - def test_select_results(self, mock_get_auth_token_requester, mock_validate): + @mock.patch("yandexcloud.auth.get_auth_token", return_value=IAM_TOKEN) + def test_select_results(self, mock_get_auth_token, mock_validate): with mock.patch.multiple( "yandex_query_client.YQHttpClient", create_query=mock.DEFAULT, @@ -107,7 +98,7 @@ class TestYandexCloudYqHook: ) as mocks: self._init_hook() mock_validate.assert_called() - mock_get_auth_token_requester.assert_called_once_with( + mock_get_auth_token.assert_called_once_with( service_account_key=json.loads(SERVICE_ACCOUNT_AUTH_KEY_JSON) ) diff --git a/tests/providers/yandex/operators/test_yandexcloud_dataproc.py b/tests/providers/yandex/operators/test_dataproc.py similarity index 98% rename from tests/providers/yandex/operators/test_yandexcloud_dataproc.py rename to tests/providers/yandex/operators/test_dataproc.py index 0a055dd377..9711ba55ee 100644 --- a/tests/providers/yandex/operators/test_yandexcloud_dataproc.py +++ b/tests/providers/yandex/operators/test_dataproc.py @@ -20,7 +20,7 @@ import datetime from unittest.mock import MagicMock, call, patch from airflow.models.dag import DAG -from airflow.providers.yandex.operators.yandexcloud_dataproc import ( +from airflow.providers.yandex.operators.dataproc import ( DataprocCreateClusterOperator, DataprocCreateHiveJobOperator, DataprocCreateMapReduceJobOperator, @@ -161,6 +161,7 @@ class TestDataprocClusterCreateOperator: operator = DataprocCreateHiveJobOperator( task_id="create_hive_job", query="SELECT 1;", + connection_id=CONNECTION_ID, ) context = {"task_instance": MagicMock()} context["task_instance"].xcom_pull.return_value = "my_cluster_id" @@ -169,7 +170,6 @@ class TestDataprocClusterCreateOperator: context["task_instance"].xcom_pull.assert_has_calls( [ call(key="cluster_id"), - call(key="yandexcloud_connection_id"), ] ) @@ -189,6 +189,7 @@ class TestDataprocClusterCreateOperator: def test_create_mapreduce_job_operator(self, mock_create_mapreduce_job, *_): operator = DataprocCreateMapReduceJobOperator( task_id="run_mapreduce_job", + connection_id=CONNECTION_ID, main_class="org.apache.hadoop.streaming.HadoopStreaming", file_uris=[ "s3a://some-in-bucket/jobs/sources/mapreduce-001/mapper.py", @@ -219,7 +220,6 @@ class TestDataprocClusterCreateOperator: context["task_instance"].xcom_pull.assert_has_calls( [ call(key="cluster_id"), - call(key="yandexcloud_connection_id"), ] ) @@ -259,6 +259,7 @@ class TestDataprocClusterCreateOperator: def test_create_spark_job_operator(self, mock_create_spark_job, *_): operator = DataprocCreateSparkJobOperator( task_id="create_spark_job", + connection_id=CONNECTION_ID, main_jar_file_uri="s3a://data-proc-public/jobs/sources/java/dataproc-examples-1.0.jar", main_class="ru.yandex.cloud.dataproc.examples.PopulationSparkJob", file_uris=[ @@ -288,7 +289,6 @@ class TestDataprocClusterCreateOperator: 
context["task_instance"].xcom_pull.assert_has_calls( [ call(key="cluster_id"), - call(key="yandexcloud_connection_id"), ] ) @@ -321,6 +321,7 @@ class TestDataprocClusterCreateOperator: def test_create_pyspark_job_operator(self, mock_create_pyspark_job, *_): operator = DataprocCreatePysparkJobOperator( task_id="create_pyspark_job", + connection_id=CONNECTION_ID, main_python_file_uri="s3a://some-in-bucket/jobs/sources/pyspark-001/main.py", python_file_uris=[ "s3a://some-in-bucket/jobs/sources/pyspark-001/geonames.py", @@ -351,7 +352,6 @@ class TestDataprocClusterCreateOperator: context["task_instance"].xcom_pull.assert_has_calls( [ call(key="cluster_id"), - call(key="yandexcloud_connection_id"), ] ) diff --git a/tests/system/providers/yandex/example_yandexcloud_dataproc.py b/tests/system/providers/yandex/example_yandexcloud_dataproc.py index cfae4e94e0..7ff4aa541d 100644 --- a/tests/system/providers/yandex/example_yandexcloud_dataproc.py +++ b/tests/system/providers/yandex/example_yandexcloud_dataproc.py @@ -20,7 +20,7 @@ import uuid from datetime import datetime from airflow import DAG -from airflow.providers.yandex.operators.yandexcloud_dataproc import ( +from airflow.providers.yandex.operators.dataproc import ( DataprocCreateClusterOperator, DataprocCreateHiveJobOperator, DataprocCreateMapReduceJobOperator, diff --git a/tests/system/providers/yandex/example_yandexcloud_dataproc_lightweight.py b/tests/system/providers/yandex/example_yandexcloud_dataproc_lightweight.py index fa5a3c758b..475bc789ec 100644 --- a/tests/system/providers/yandex/example_yandexcloud_dataproc_lightweight.py +++ b/tests/system/providers/yandex/example_yandexcloud_dataproc_lightweight.py @@ -19,7 +19,7 @@ from __future__ import annotations from datetime import datetime from airflow import DAG -from airflow.providers.yandex.operators.yandexcloud_dataproc import ( +from airflow.providers.yandex.operators.dataproc import ( DataprocCreateClusterOperator, DataprocCreateSparkJobOperator, DataprocDeleteClusterOperator,