[GitHub] [airflow] mik-laj commented on a change in pull request #6123: [AIRFLOW-5502][depends on AIRFLOW-5498, AIRFLOW-5499] Move GCP base hook to core
mik-laj commented on a change in pull request #6123: [AIRFLOW-5502][depends on AIRFLOW-5498, AIRFLOW-5499] Move GCP base hook to core URL: https://github.com/apache/airflow/pull/6123#discussion_r325936782 ## File path: airflow/contrib/utils/gcp_field_sanitizer.py ## @@ -16,150 +16,16 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""Sanitizer for body fields sent via GCP API. - -The sanitizer removes fields specified from the body. - -Context -In some cases where GCP operation requires modification of existing resources (such -as instances or instance templates) we need to sanitize body of the resources returned -via GCP APIs. This is in the case when we retrieve information from GCP first, -modify the body and either update the existing resource or create a new one with the -modified body. Usually when you retrieve resource from GCP you get some extra fields which -are Output-only, and we need to delete those fields if we want to use -the body as input for subsequent create/insert type operation. - - -Field specification - -Specification of fields is an array of strings which denote names of fields to be removed. -The field can be either direct field name to remove from the body or the full -specification of the path you should delete - separated with '.' 
- - ->>> FIELDS_TO_SANITIZE = [ ->>>"kind", ->>>"properties.disks.kind", ->>>"properties.metadata.kind", ->>>] ->>> body = { ->>> "kind": "compute#instanceTemplate", ->>> "name": "instance", ->>> "properties": { ->>> "disks": [ ->>> { ->>> "name": "a", ->>> "kind": "compute#attachedDisk", ->>> "type": "PERSISTENT", ->>> "mode": "READ_WRITE", ->>> }, ->>> { ->>> "name": "b", ->>> "kind": "compute#attachedDisk", ->>> "type": "PERSISTENT", ->>> "mode": "READ_WRITE", ->>> } ->>> ], ->>> "metadata": { ->>> "kind": "compute#metadata", ->>> "fingerprint": "GDPUYxlwHe4=" ->>> }, ->>> } ->>> } ->>> sanitizer=GcpBodyFieldSanitizer(FIELDS_TO_SANITIZE) ->>> SANITIZED_BODY = sanitizer.sanitize(body) ->>> json.dumps(SANITIZED_BODY, indent=2) -{ -"name": "instance", -"properties": { -"disks": [ -{ -"name": "a", -"type": "PERSISTENT", -"mode": "READ_WRITE", -}, -{ -"name": "b", -"type": "PERSISTENT", -"mode": "READ_WRITE", -} -], -"metadata": { -"fingerprint": "GDPUYxlwHe4=" -}, -} -} - -Note that the components of the path can be either dictionaries or arrays of dictionaries. -In case they are dictionaries, subsequent component names key of the field, in case of -arrays - the sanitizer iterates through all dictionaries in the array and searches -components in all elements of the array. +""" +This module is deprecated. Please use `airflow.gcp.utils.field_sanitizer`. """ -from typing import List - -from airflow import LoggingMixin, AirflowException - - -class GcpFieldSanitizerException(AirflowException): -"""Thrown when sanitizer finds unexpected field type in the path -(other than dict or array). -""" - - -class GcpBodyFieldSanitizer(LoggingMixin): -"""Sanitizes the body according to specification. 
- -:param sanitize_specs: array of strings that specifies which fields to remove -:type sanitize_specs: list[str] - -""" -def __init__(self, sanitize_specs: List[str]) -> None: -super().__init__() -self._sanitize_specs = sanitize_specs +import warnings -def _sanitize(self, dictionary, remaining_field_spec, current_path): -field_split = remaining_field_spec.split(".", 1) -if len(field_split) == 1: # pylint: disable=too-many-nested-blocks -field_name = field_split[0] -if field_name in dictionary: -self.log.info("Deleted %s [%s]", field_name, current_path) -del dictionary[field_name] -else: -self.log.debug( -"The field %s is missing in %s at the path %s.", field_name, dictionary, current_path -) -else: -field_name = field_split[0] -remaining_path = field_split[1] -child = dictionary.get(field_name) -if child is None: -self.log.debug( -"The field %s is missing in %s at the path %s. ", field_name, dictionary, current_path -) -elif isinstance(child, dict): -self._sanitize(child, remaining_path, "{}.{}".format( -
[GitHub] [airflow] mik-laj commented on a change in pull request #6123: [AIRFLOW-5502][depends on AIRFLOW-5498, AIRFLOW-5499] Move GCP base hook to core
mik-laj commented on a change in pull request #6123: [AIRFLOW-5502][depends on AIRFLOW-5498, AIRFLOW-5499] Move GCP base hook to core URL: https://github.com/apache/airflow/pull/6123#discussion_r325936631 ## File path: airflow/contrib/hooks/gcp_api_base_hook.py ## @@ -16,299 +16,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -# -""" -This module contains a Google Cloud API base hook. -""" - -import json -import functools -import os -import tempfile -from typing import Any, Optional, Dict, Callable, TypeVar, Sequence - -import httplib2 - -import google.auth -import google.oauth2.service_account -from google.api_core.gapic_v1.client_info import ClientInfo -from google.api_core.exceptions import GoogleAPICallError, AlreadyExists, RetryError -from google.auth.environment_vars import CREDENTIALS - -import google_auth_httplib2 -from googleapiclient.errors import HttpError - -from airflow import version -from airflow.exceptions import AirflowException -from airflow.hooks.base_hook import BaseHook - - -_DEFAULT_SCOPES = ('https://www.googleapis.com/auth/cloud-platform',) # type: Sequence[str] - - -RT = TypeVar('RT') # pylint: disable=invalid-name - - -class GoogleCloudBaseHook(BaseHook): -""" -A base hook for Google cloud-related hooks. Google cloud has a shared REST -API client that is built in the same way no matter which service you use. -This class helps construct and authorize the credentials needed to then -call googleapiclient.discovery.build() to actually discover and build a client -for a Google cloud service. - -The class also contains some miscellaneous helper functions. - -All hook derived from this base hook use the 'Google Cloud Platform' connection -type. Three ways of authentication are supported: - -Default credentials: Only the 'Project Id' is required. 
You'll need to -have set up default credentials, such as by the -``GOOGLE_APPLICATION_DEFAULT`` environment variable or from the metadata -server on Google Compute Engine. - -JSON key file: Specify 'Project Id', 'Keyfile Path' and 'Scope'. - -Legacy P12 key files are not supported. - -JSON data provided in the UI: Specify 'Keyfile JSON'. - -:param gcp_conn_id: The connection ID to use when fetching connection info. -:type gcp_conn_id: str -:param delegate_to: The account to impersonate, if any. -For this to work, the service account making the request must have -domain-wide delegation enabled. -:type delegate_to: str -""" - -def __init__(self, gcp_conn_id: str = 'google_cloud_default', delegate_to: str = None) -> None: -self.gcp_conn_id = gcp_conn_id -self.delegate_to = delegate_to -self.extras = self.get_connection(self.gcp_conn_id).extra_dejson # type: Dict - -def _get_credentials_and_project_id(self) -> google.auth.credentials.Credentials: -""" -Returns the Credentials object for Google API and the associated project_id -""" -key_path = self._get_field('key_path', None) # type: Optional[str] -keyfile_dict = self._get_field('keyfile_dict', None) # type: Optional[str] -if not key_path and not keyfile_dict: -self.log.info('Getting connection using `google.auth.default()` ' - 'since no key file is defined for hook.') -credentials, project_id = google.auth.default(scopes=self.scopes) -elif key_path: -# Get credentials from a JSON file. -if key_path.endswith('.json'): -self.log.debug('Getting connection using JSON key file %s' % key_path) -credentials = ( - google.oauth2.service_account.Credentials.from_service_account_file( -key_path, scopes=self.scopes) -) -project_id = credentials.project_id -elif key_path.endswith('.p12'): -raise AirflowException('Legacy P12 key file are not supported, ' - 'use a JSON key file.') -else: -raise AirflowException('Unrecognised extension for key file.') -else: -# Get credentials from JSON data provided in the UI. 
-try: -assert keyfile_dict is not None -keyfile_dict_json = json.loads(keyfile_dict) # type: Dict[str, str] - -# Depending on how the JSON was formatted, it may contain -# escaped newlines. Convert those to actual newlines. -keyfile_dict_json['private_key'] = keyfile_dict_json['private_key'].replace( -'\\n', '\n') - -credentials = ( - google.oauth2.service_account.Credentials.from_service_account_info( -keyfile_dict_json,
[GitHub] [airflow] mik-laj commented on a change in pull request #6123: [AIRFLOW-5502][depends on AIRFLOW-5498, AIRFLOW-5499] Move GCP base hook to core
mik-laj commented on a change in pull request #6123: [AIRFLOW-5502][depends on AIRFLOW-5498, AIRFLOW-5499] Move GCP base hook to core URL: https://github.com/apache/airflow/pull/6123#discussion_r325936746 ## File path: airflow/contrib/operators/google_api_to_s3_transfer.py ## @@ -16,162 +16,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -# -""" -This module allows you to transfer data from any Google API endpoint into a S3 Bucket. -""" -import json -import sys - -from airflow.models import BaseOperator -from airflow.models.xcom import MAX_XCOM_SIZE -from airflow.utils.decorators import apply_defaults - -from airflow.contrib.hooks.google_discovery_api_hook import GoogleDiscoveryApiHook -from airflow.hooks.S3_hook import S3Hook - - -class GoogleApiToS3Transfer(BaseOperator): -""" -Basic class for transferring data from a Google API endpoint into a S3 Bucket. - -:param google_api_service_name: The specific API service that is being requested. -:type google_api_service_name: str -:param google_api_service_version: The version of the API that is being requested. -:type google_api_service_version: str -:param google_api_endpoint_path: The client libraries path to the api call's executing method. -For example: 'analyticsreporting.reports.batchGet' - -.. note:: See https://developers.google.com/apis-explorer -for more information on which methods are available. - -:type google_api_endpoint_path: str -:param google_api_endpoint_params: The params to control the corresponding endpoint result. -:type google_api_endpoint_params: dict -:param s3_destination_key: The url where to put the data retrieved from the endpoint in S3. -:type s3_destination_key: str -:param google_api_response_via_xcom: Can be set to expose the google api response to xcom. 
-:type google_api_response_via_xcom: str -:param google_api_endpoint_params_via_xcom: If set to a value this value will be used as a key -for pulling from xcom and updating the google api endpoint params. -:type google_api_endpoint_params_via_xcom: str -:param google_api_endpoint_params_via_xcom_task_ids: Task ids to filter xcom by. -:type google_api_endpoint_params_via_xcom_task_ids: str or list of str -:param google_api_pagination: If set to True Pagination will be enabled for this request -to retrieve all data. - -.. note:: This means the response will be a list of responses. - -:type google_api_pagination: bool -:param google_api_num_retries: Define the number of retries for the google api requests being made -if it fails. -:type google_api_num_retries: int -:param s3_overwrite: Specifies whether the s3 file will be overwritten if exists. -:type s3_overwrite: bool -:param gcp_conn_id: The connection ID to use when fetching connection info. -:type gcp_conn_id: string -:param delegate_to: The account to impersonate, if any. -For this to work, the service account making the request must have -domain-wide delegation enabled. -:type delegate_to: string -:param aws_conn_id: The connection id specifying the authentication information for the S3 Bucket. 
-:type aws_conn_id: str -""" - -template_fields = ( -'google_api_endpoint_params', -'s3_destination_key', -) -template_ext = () -ui_color = '#cc181e' - -@apply_defaults -def __init__( -self, -google_api_service_name, -google_api_service_version, -google_api_endpoint_path, -google_api_endpoint_params, -s3_destination_key, -*args, -google_api_response_via_xcom=None, -google_api_endpoint_params_via_xcom=None, -google_api_endpoint_params_via_xcom_task_ids=None, -google_api_pagination=False, -google_api_num_retries=0, -s3_overwrite=False, -gcp_conn_id='google_cloud_default', -delegate_to=None, -aws_conn_id='aws_default', -**kwargs -): -super(GoogleApiToS3Transfer, self).__init__(*args, **kwargs) -self.google_api_service_name = google_api_service_name -self.google_api_service_version = google_api_service_version -self.google_api_endpoint_path = google_api_endpoint_path -self.google_api_endpoint_params = google_api_endpoint_params -self.s3_destination_key = s3_destination_key -self.google_api_response_via_xcom = google_api_response_via_xcom -self.google_api_endpoint_params_via_xcom = google_api_endpoint_params_via_xcom -self.google_api_endpoint_params_via_xcom_task_ids = google_api_endpoint_params_via_xcom_task_ids -self.google_api_pagination = google_api_pagination -self.google_api_num_retries = google_api_num_retries -
[GitHub] [airflow] mik-laj commented on a change in pull request #6123: [AIRFLOW-5502][depends on AIRFLOW-5498, AIRFLOW-5499] Move GCP base hook to core
mik-laj commented on a change in pull request #6123: [AIRFLOW-5502][depends on AIRFLOW-5498, AIRFLOW-5499] Move GCP base hook to core URL: https://github.com/apache/airflow/pull/6123#discussion_r325936701 ## File path: airflow/contrib/hooks/google_discovery_api_hook.py ## @@ -16,132 +16,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -# -""" -This module allows you to connect to the Google Discovery API Service and query it. -""" -from googleapiclient.discovery import build - -from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook - - -class GoogleDiscoveryApiHook(GoogleCloudBaseHook): -""" -A hook to use the Google API Discovery Service. - -:param api_service_name: The name of the api service that is needed to get the data -for example 'youtube'. -:type api_service_name: str -:param api_version: The version of the api that will be requested for example 'v3'. -:type api_version: str -:param gcp_conn_id: The connection ID to use when fetching connection info. -:type gcp_conn_id: str -:param delegate_to: The account to impersonate, if any. -For this to work, the service account making the request must have -domain-wide delegation enabled. -:type delegate_to: str -""" -_conn = None - -def __init__(self, api_service_name, api_version, gcp_conn_id='google_cloud_default', delegate_to=None): -super(GoogleDiscoveryApiHook, self).__init__(gcp_conn_id=gcp_conn_id, delegate_to=delegate_to) -self.api_service_name = api_service_name -self.api_version = api_version - -def get_conn(self): -""" -Creates an authenticated api client for the given api service name and credentials. - -:return: the authenticated api service. 
-:rtype: Resource -""" -self.log.info("Authenticating Google API Client") - -if not self._conn: -http_authorized = self._authorize() -self._conn = build( -serviceName=self.api_service_name, -version=self.api_version, -http=http_authorized, -cache_discovery=False -) -return self._conn - -def query(self, endpoint, data, paginate=False, num_retries=0): -""" -Creates a dynamic API call to any Google API registered in Google's API Client Library -and queries it. - -:param endpoint: The client libraries path to the api call's executing method. -For example: 'analyticsreporting.reports.batchGet' - -.. seealso:: https://developers.google.com/apis-explorer -for more information on what methods are available. -:type endpoint: str -:param data: The data (endpoint params) needed for the specific request to given endpoint. -:type data: dict -:param paginate: If set to True, it will collect all pages of data. -:type paginate: bool -:param num_retries: Define the number of retries for the requests being made if it fails. -:type num_retries: int -:return: the API response from the passed endpoint. 
-:rtype: dict -""" -google_api_conn_client = self.get_conn() - -api_response = self._call_api_request(google_api_conn_client, endpoint, data, paginate, num_retries) -return api_response - -def _call_api_request(self, google_api_conn_client, endpoint, data, paginate, num_retries): -api_endpoint_parts = endpoint.split('.') - -google_api_endpoint_instance = self._build_api_request( -google_api_conn_client, -api_sub_functions=api_endpoint_parts[1:], -api_endpoint_params=data -) - -if paginate: -return self._paginate_api( -google_api_endpoint_instance, google_api_conn_client, api_endpoint_parts, num_retries -) - -return google_api_endpoint_instance.execute(num_retries=num_retries) - -def _build_api_request(self, google_api_conn_client, api_sub_functions, api_endpoint_params): -for sub_function in api_sub_functions: -google_api_conn_client = getattr(google_api_conn_client, sub_function) -if sub_function != api_sub_functions[-1]: -google_api_conn_client = google_api_conn_client() -else: -google_api_conn_client = google_api_conn_client(**api_endpoint_params) - -return google_api_conn_client - -def _paginate_api( -self, google_api_endpoint_instance, google_api_conn_client, api_endpoint_parts, num_retries -): -api_responses = [] - -while google_api_endpoint_instance: -api_response = google_api_endpoint_instance.execute(num_retries=num_retries) -api_responses.append(api_response) -
[GitHub] [airflow] mik-laj commented on a change in pull request #6123: [AIRFLOW-5502][depends on AIRFLOW-5498, AIRFLOW-5499] Move GCP base hook to core
mik-laj commented on a change in pull request #6123: [AIRFLOW-5502][depends on AIRFLOW-5498, AIRFLOW-5499] Move GCP base hook to core URL: https://github.com/apache/airflow/pull/6123#discussion_r325936838 ## File path: airflow/contrib/utils/gcp_field_validator.py ## @@ -16,417 +16,20 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""Validator for body fields sent via GCP API. - -The validator performs validation of the body (being dictionary of fields) that -is sent in the API request to Google Cloud (via googleclient API usually). - -Context -The specification mostly focuses on helping Airflow DAG developers in the development -phase. You can build your own GCP operator (such as GcfDeployOperator for example) which -can have built-in validation specification for the particular API. It's super helpful -when developer plays with different fields and their values at the initial phase of -DAG development. Most of the Google Cloud APIs perform their own validation on the -server side, but most of the requests are asynchronous and you need to wait for result -of the operation. This takes precious times and slows -down iteration over the API. BodyFieldValidator is meant to be used on the client side -and it should therefore provide an instant feedback to the developer on misspelled or -wrong type of parameters. - -The validation should be performed in "execute()" method call in order to allow -template parameters to be expanded before validation is performed. - -Types of fields - -Specification is an array of dictionaries - each dictionary describes field, its type, -validation, optionality, api_version supported and nested fields (for unions and dicts). - -Typically (for clarity and in order to aid syntax highlighting) the array of -dicts should be defined as series of dict() executions. 
Fragment of example -specification might look as follows:: - -SPECIFICATION =[ - dict(name="an_union", type="union", optional=True, fields=[ - dict(name="variant_1", type="dict"), - dict(name="variant_2", regexp=r'^.+$', api_version='v1beta2'), - ), - dict(name="an_union", type="dict", fields=[ - dict(name="field_1", type="dict"), - dict(name="field_2", regexp=r'^.+$'), - ), - ... -] - - -Each field should have key = "name" indicating field name. The field can be of one of the -following types: - -* Dict fields: (key = "type", value="dict"): - Field of this type should contain nested fields in form of an array of dicts. - Each of the fields in the array is then expected (unless marked as optional) - and validated recursively. If an extra field is present in the dictionary, warning is - printed in log file (but the validation succeeds - see the Forward-compatibility notes) -* List fields: (key = "type", value="list"): - Field of this type should be a list. Only the type correctness is validated. - The contents of a list are not subject to validation. -* Union fields (key = "type", value="union"): field of this type should contain nested - fields in form of an array of dicts. One of the fields (and only one) should be - present (unless the union is marked as optional). If more than one union field is - present, FieldValidationException is raised. If none of the union fields is - present - warning is printed in the log (see below Forward-compatibility notes). -* Fields validated for non-emptiness: (key = "allow_empty") - this applies only to - fields the value of which is a string, and it allows to check for non-emptiness of - the field (allow_empty=False). -* Regexp-validated fields: (key = "regexp") - fields of this type are assumed to be - strings and they are validated with the regexp specified. Remember that the regexps - should ideally contain ^ at the beginning and $ at the end to make sure that - the whole field content is validated. 
Typically such regexp - validations should be used carefully and sparingly (see Forward-compatibility - notes below). -* Custom-validated fields: (key = "custom_validation") - fields of this type are validated - using method specified via custom_validation field. Any exception thrown in the custom - validation will be turned into FieldValidationException and will cause validation to - fail. Such custom validations might be used to check numeric fields (including - ranges of values), booleans or any other types of fields. -* API version: (key="api_version") if API version is specified, then the field will only - be validated when api_version used at field validator initialization matches exactly the - version specified. If you want to declare fields that are available in several - versions of the APIs, you should specify the field as many times as many API versions - should be supported (each time with different