[GitHub] [airflow] mik-laj commented on a change in pull request #6123: [AIRFLOW-5502][depends on AIRFLOW-5498, AIRFLOW-5499] Move GCP base hook to core

2019-09-18 Thread GitBox
URL: https://github.com/apache/airflow/pull/6123#discussion_r325936782
 
 

 ##
 File path: airflow/contrib/utils/gcp_field_sanitizer.py
 ##
 @@ -16,150 +16,16 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Sanitizer for body fields sent via GCP API.
-
-The sanitizer removes fields specified from the body.
-
-Context
--------
-In some cases where GCP operation requires modification of existing resources (such
-as instances or instance templates) we need to sanitize body of the resources returned
-via GCP APIs. This is in the case when we retrieve information from GCP first,
-modify the body and either update the existing resource or create a new one with the
-modified body. Usually when you retrieve resource from GCP you get some extra fields which
-are Output-only, and we need to delete those fields if we want to use
-the body as input for subsequent create/insert type operation.
-
-
-Field specification
--------------------
-
-Specification of fields is an array of strings which denote names of fields to be removed.
-The field can be either direct field name to remove from the body or the full
-specification of the path you should delete - separated with '.'
-
-
->>> FIELDS_TO_SANITIZE = [
->>>     "kind",
->>>     "properties.disks.kind",
->>>     "properties.metadata.kind",
->>> ]
->>> body = {
->>>     "kind": "compute#instanceTemplate",
->>>     "name": "instance",
->>>     "properties": {
->>>         "disks": [
->>>             {
->>>                 "name": "a",
->>>                 "kind": "compute#attachedDisk",
->>>                 "type": "PERSISTENT",
->>>                 "mode": "READ_WRITE",
->>>             },
->>>             {
->>>                 "name": "b",
->>>                 "kind": "compute#attachedDisk",
->>>                 "type": "PERSISTENT",
->>>                 "mode": "READ_WRITE",
->>>             }
->>>         ],
->>>         "metadata": {
->>>             "kind": "compute#metadata",
->>>             "fingerprint": "GDPUYxlwHe4="
->>>         },
->>>     }
->>> }
->>> sanitizer = GcpBodyFieldSanitizer(FIELDS_TO_SANITIZE)
->>> SANITIZED_BODY = sanitizer.sanitize(body)
->>> json.dumps(SANITIZED_BODY, indent=2)
-{
-    "name": "instance",
-    "properties": {
-        "disks": [
-            {
-                "name": "a",
-                "type": "PERSISTENT",
-                "mode": "READ_WRITE",
-            },
-            {
-                "name": "b",
-                "type": "PERSISTENT",
-                "mode": "READ_WRITE",
-            }
-        ],
-        "metadata": {
-            "fingerprint": "GDPUYxlwHe4="
-        },
-    }
-}
-
-Note that the components of the path can be either dictionaries or arrays of dictionaries.
-In case they are dictionaries, subsequent component names are keys of the field; in case of
-arrays, the sanitizer iterates through all dictionaries in the array and searches for
-the components in all elements of the array.
+"""
+This module is deprecated. Please use `airflow.gcp.utils.field_sanitizer`.
 """
 
-from typing import List
-
-from airflow import LoggingMixin, AirflowException
-
-
-class GcpFieldSanitizerException(AirflowException):
-    """Thrown when sanitizer finds unexpected field type in the path
-    (other than dict or array).
-    """
-
-
-class GcpBodyFieldSanitizer(LoggingMixin):
-    """Sanitizes the body according to specification.
-
-    :param sanitize_specs: array of strings that specifies which fields to remove
-    :type sanitize_specs: list[str]
-
-    """
-    def __init__(self, sanitize_specs: List[str]) -> None:
-        super().__init__()
-        self._sanitize_specs = sanitize_specs
+import warnings
 
-    def _sanitize(self, dictionary, remaining_field_spec, current_path):
-        field_split = remaining_field_spec.split(".", 1)
-        if len(field_split) == 1:  # pylint: disable=too-many-nested-blocks
-            field_name = field_split[0]
-            if field_name in dictionary:
-                self.log.info("Deleted %s [%s]", field_name, current_path)
-                del dictionary[field_name]
-            else:
-                self.log.debug(
-                    "The field %s is missing in %s at the path %s.", field_name, dictionary, current_path
-                )
-        else:
-            field_name = field_split[0]
-            remaining_path = field_split[1]
-            child = dictionary.get(field_name)
-            if child is None:
-                self.log.debug(
-                    "The field %s is missing in %s at the path %s. ", field_name, dictionary, current_path
-                )
-            elif isinstance(child, dict):
-                self._sanitize(child, remaining_path, "{}.{}".format(
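The `_sanitize` method shown (truncated) in the diff above recursively walks a dotted field spec, descending into dicts by key and into lists element by element. A self-contained sketch of that dotted-path deletion, assuming a hypothetical free function `sanitize` rather than the original class:

```python
def sanitize(body, field_spec):
    """Delete the field named by a dotted path from body, in place.

    Dict components are descended into by key; list components are
    iterated so the remaining path is applied to every element.
    """
    head, _, rest = field_spec.partition(".")
    if not rest:
        # Leaf of the spec: remove the key if present, ignore if missing.
        body.pop(head, None)
        return
    child = body.get(head)
    if isinstance(child, dict):
        sanitize(child, rest)
    elif isinstance(child, list):
        for element in child:
            if isinstance(element, dict):
                sanitize(element, rest)
```

Run against a body shaped like the doctest in the module docstring, `sanitize(body, "properties.disks.kind")` strips the Output-only `kind` field from every attached disk.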

[GitHub] [airflow] mik-laj commented on a change in pull request #6123: [AIRFLOW-5502][depends on AIRFLOW-5498, AIRFLOW-5499] Move GCP base hook to core

2019-09-18 Thread GitBox
URL: https://github.com/apache/airflow/pull/6123#discussion_r325936631
 
 

 ##
 File path: airflow/contrib/hooks/gcp_api_base_hook.py
 ##
 @@ -16,299 +16,14 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
-"""
-This module contains a Google Cloud API base hook.
-"""
-
-import json
-import functools
-import os
-import tempfile
-from typing import Any, Optional, Dict, Callable, TypeVar, Sequence
-
-import httplib2
-
-import google.auth
-import google.oauth2.service_account
-from google.api_core.gapic_v1.client_info import ClientInfo
-from google.api_core.exceptions import GoogleAPICallError, AlreadyExists, RetryError
-from google.auth.environment_vars import CREDENTIALS
-
-import google_auth_httplib2
-from googleapiclient.errors import HttpError
-
-from airflow import version
-from airflow.exceptions import AirflowException
-from airflow.hooks.base_hook import BaseHook
-
-
-_DEFAULT_SCOPES = ('https://www.googleapis.com/auth/cloud-platform',)  # type: Sequence[str]
-
-
-RT = TypeVar('RT')  # pylint: disable=invalid-name
-
-
-class GoogleCloudBaseHook(BaseHook):
-    """
-    A base hook for Google cloud-related hooks. Google cloud has a shared REST
-    API client that is built in the same way no matter which service you use.
-    This class helps construct and authorize the credentials needed to then
-    call googleapiclient.discovery.build() to actually discover and build a client
-    for a Google cloud service.
-
-    The class also contains some miscellaneous helper functions.
-
-    All hooks derived from this base hook use the 'Google Cloud Platform' connection
-    type. Three ways of authentication are supported:
-
-    Default credentials: Only the 'Project Id' is required. You'll need to
-    have set up default credentials, such as by the
-    ``GOOGLE_APPLICATION_DEFAULT`` environment variable or from the metadata
-    server on Google Compute Engine.
-
-    JSON key file: Specify 'Project Id', 'Keyfile Path' and 'Scope'.
-
-    Legacy P12 key files are not supported.
-
-    JSON data provided in the UI: Specify 'Keyfile JSON'.
-
-    :param gcp_conn_id: The connection ID to use when fetching connection info.
-    :type gcp_conn_id: str
-    :param delegate_to: The account to impersonate, if any.
-        For this to work, the service account making the request must have
-        domain-wide delegation enabled.
-    :type delegate_to: str
-    """
-
-    def __init__(self, gcp_conn_id: str = 'google_cloud_default', delegate_to: str = None) -> None:
-        self.gcp_conn_id = gcp_conn_id
-        self.delegate_to = delegate_to
-        self.extras = self.get_connection(self.gcp_conn_id).extra_dejson  # type: Dict
-
-    def _get_credentials_and_project_id(self) -> google.auth.credentials.Credentials:
-        """
-        Returns the Credentials object for Google API and the associated project_id
-        """
-        key_path = self._get_field('key_path', None)  # type: Optional[str]
-        keyfile_dict = self._get_field('keyfile_dict', None)  # type: Optional[str]
-        if not key_path and not keyfile_dict:
-            self.log.info('Getting connection using `google.auth.default()` '
-                          'since no key file is defined for hook.')
-            credentials, project_id = google.auth.default(scopes=self.scopes)
-        elif key_path:
-            # Get credentials from a JSON file.
-            if key_path.endswith('.json'):
-                self.log.debug('Getting connection using JSON key file %s' % key_path)
-                credentials = (
-                    google.oauth2.service_account.Credentials.from_service_account_file(
-                        key_path, scopes=self.scopes)
-                )
-                project_id = credentials.project_id
-            elif key_path.endswith('.p12'):
-                raise AirflowException('Legacy P12 key file are not supported, '
-                                       'use a JSON key file.')
-            else:
-                raise AirflowException('Unrecognised extension for key file.')
-        else:
-            # Get credentials from JSON data provided in the UI.
-            try:
-                assert keyfile_dict is not None
-                keyfile_dict_json = json.loads(keyfile_dict)  # type: Dict[str, str]
-
-                # Depending on how the JSON was formatted, it may contain
-                # escaped newlines. Convert those to actual newlines.
-                keyfile_dict_json['private_key'] = keyfile_dict_json['private_key'].replace(
-                    '\\n', '\n')
-
-                credentials = (
-                    google.oauth2.service_account.Credentials.from_service_account_info(
-                        keyfile_dict_json,
[GitHub] [airflow] mik-laj commented on a change in pull request #6123: [AIRFLOW-5502][depends on AIRFLOW-5498, AIRFLOW-5499] Move GCP base hook to core

2019-09-18 Thread GitBox
URL: https://github.com/apache/airflow/pull/6123#discussion_r325936746
 
 

 ##
 File path: airflow/contrib/operators/google_api_to_s3_transfer.py
 ##
 @@ -16,162 +16,14 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
-"""
-This module allows you to transfer data from any Google API endpoint into a S3 Bucket.
-"""
-import json
-import sys
-
-from airflow.models import BaseOperator
-from airflow.models.xcom import MAX_XCOM_SIZE
-from airflow.utils.decorators import apply_defaults
-
-from airflow.contrib.hooks.google_discovery_api_hook import GoogleDiscoveryApiHook
-from airflow.hooks.S3_hook import S3Hook
-
-
-class GoogleApiToS3Transfer(BaseOperator):
-    """
-    Basic class for transferring data from a Google API endpoint into a S3 Bucket.
-
-    :param google_api_service_name: The specific API service that is being requested.
-    :type google_api_service_name: str
-    :param google_api_service_version: The version of the API that is being requested.
-    :type google_api_service_version: str
-    :param google_api_endpoint_path: The client libraries path to the api call's executing method.
-        For example: 'analyticsreporting.reports.batchGet'
-
-        .. note:: See https://developers.google.com/apis-explorer
-            for more information on which methods are available.
-
-    :type google_api_endpoint_path: str
-    :param google_api_endpoint_params: The params to control the corresponding endpoint result.
-    :type google_api_endpoint_params: dict
-    :param s3_destination_key: The url where to put the data retrieved from the endpoint in S3.
-    :type s3_destination_key: str
-    :param google_api_response_via_xcom: Can be set to expose the google api response to xcom.
-    :type google_api_response_via_xcom: str
-    :param google_api_endpoint_params_via_xcom: If set to a value this value will be used as a key
-        for pulling from xcom and updating the google api endpoint params.
-    :type google_api_endpoint_params_via_xcom: str
-    :param google_api_endpoint_params_via_xcom_task_ids: Task ids to filter xcom by.
-    :type google_api_endpoint_params_via_xcom_task_ids: str or list of str
-    :param google_api_pagination: If set to True Pagination will be enabled for this request
-        to retrieve all data.
-
-        .. note:: This means the response will be a list of responses.
-
-    :type google_api_pagination: bool
-    :param google_api_num_retries: Define the number of retries for the google api requests being made
-        if it fails.
-    :type google_api_num_retries: int
-    :param s3_overwrite: Specifies whether the s3 file will be overwritten if exists.
-    :type s3_overwrite: bool
-    :param gcp_conn_id: The connection ID to use when fetching connection info.
-    :type gcp_conn_id: string
-    :param delegate_to: The account to impersonate, if any.
-        For this to work, the service account making the request must have
-        domain-wide delegation enabled.
-    :type delegate_to: string
-    :param aws_conn_id: The connection id specifying the authentication information for the S3 Bucket.
-    :type aws_conn_id: str
-    """
-
-    template_fields = (
-        'google_api_endpoint_params',
-        's3_destination_key',
-    )
-    template_ext = ()
-    ui_color = '#cc181e'
-
-    @apply_defaults
-    def __init__(
-        self,
-        google_api_service_name,
-        google_api_service_version,
-        google_api_endpoint_path,
-        google_api_endpoint_params,
-        s3_destination_key,
-        *args,
-        google_api_response_via_xcom=None,
-        google_api_endpoint_params_via_xcom=None,
-        google_api_endpoint_params_via_xcom_task_ids=None,
-        google_api_pagination=False,
-        google_api_num_retries=0,
-        s3_overwrite=False,
-        gcp_conn_id='google_cloud_default',
-        delegate_to=None,
-        aws_conn_id='aws_default',
-        **kwargs
-    ):
-        super(GoogleApiToS3Transfer, self).__init__(*args, **kwargs)
-        self.google_api_service_name = google_api_service_name
-        self.google_api_service_version = google_api_service_version
-        self.google_api_endpoint_path = google_api_endpoint_path
-        self.google_api_endpoint_params = google_api_endpoint_params
-        self.s3_destination_key = s3_destination_key
-        self.google_api_response_via_xcom = google_api_response_via_xcom
-        self.google_api_endpoint_params_via_xcom = google_api_endpoint_params_via_xcom
-        self.google_api_endpoint_params_via_xcom_task_ids = google_api_endpoint_params_via_xcom_task_ids
-        self.google_api_pagination = google_api_pagination
-        self.google_api_num_retries = google_api_num_retries
-
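The operator above imports `sys` and `MAX_XCOM_SIZE`, which suggests the API response is size-checked before any XCom push. A hedged sketch of such a guard (`can_push_to_xcom` is a hypothetical helper; the 49344-byte limit is the `MAX_XCOM_SIZE` value Airflow used at the time, included here as an assumption):

```python
import sys

# Assumed value of airflow.models.xcom.MAX_XCOM_SIZE at the time of this PR.
MAX_XCOM_SIZE = 49344

def can_push_to_xcom(response):
    """Return True if the in-memory size of the response fits under the
    XCom size limit, so pushing it will not be rejected."""
    return sys.getsizeof(response) < MAX_XCOM_SIZE
```

A small response dict passes the check; a multi-page response list from a paginated request can easily exceed it, which is why exposing the response via XCom is opt-in.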

[GitHub] [airflow] mik-laj commented on a change in pull request #6123: [AIRFLOW-5502][depends on AIRFLOW-5498, AIRFLOW-5499] Move GCP base hook to core

2019-09-18 Thread GitBox
URL: https://github.com/apache/airflow/pull/6123#discussion_r325936701
 
 

 ##
 File path: airflow/contrib/hooks/google_discovery_api_hook.py
 ##
 @@ -16,132 +16,14 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
-"""
-This module allows you to connect to the Google Discovery API Service and query it.
-"""
-from googleapiclient.discovery import build
-
-from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
-
-
-class GoogleDiscoveryApiHook(GoogleCloudBaseHook):
-    """
-    A hook to use the Google API Discovery Service.
-
-    :param api_service_name: The name of the api service that is needed to get the data
-        for example 'youtube'.
-    :type api_service_name: str
-    :param api_version: The version of the api that will be requested for example 'v3'.
-    :type api_version: str
-    :param gcp_conn_id: The connection ID to use when fetching connection info.
-    :type gcp_conn_id: str
-    :param delegate_to: The account to impersonate, if any.
-        For this to work, the service account making the request must have
-        domain-wide delegation enabled.
-    :type delegate_to: str
-    """
-    _conn = None
-
-    def __init__(self, api_service_name, api_version, gcp_conn_id='google_cloud_default', delegate_to=None):
-        super(GoogleDiscoveryApiHook, self).__init__(gcp_conn_id=gcp_conn_id, delegate_to=delegate_to)
-        self.api_service_name = api_service_name
-        self.api_version = api_version
-
-    def get_conn(self):
-        """
-        Creates an authenticated api client for the given api service name and credentials.
-
-        :return: the authenticated api service.
-        :rtype: Resource
-        """
-        self.log.info("Authenticating Google API Client")
-
-        if not self._conn:
-            http_authorized = self._authorize()
-            self._conn = build(
-                serviceName=self.api_service_name,
-                version=self.api_version,
-                http=http_authorized,
-                cache_discovery=False
-            )
-        return self._conn
-
-    def query(self, endpoint, data, paginate=False, num_retries=0):
-        """
-        Creates a dynamic API call to any Google API registered in Google's API Client Library
-        and queries it.
-
-        :param endpoint: The client libraries path to the api call's executing method.
-            For example: 'analyticsreporting.reports.batchGet'
-
-            .. seealso:: https://developers.google.com/apis-explorer
-                for more information on what methods are available.
-        :type endpoint: str
-        :param data: The data (endpoint params) needed for the specific request to given endpoint.
-        :type data: dict
-        :param paginate: If set to True, it will collect all pages of data.
-        :type paginate: bool
-        :param num_retries: Define the number of retries for the requests being made if it fails.
-        :type num_retries: int
-        :return: the API response from the passed endpoint.
-        :rtype: dict
-        """
-        google_api_conn_client = self.get_conn()
-
-        api_response = self._call_api_request(google_api_conn_client, endpoint, data, paginate, num_retries)
-        return api_response
-
-    def _call_api_request(self, google_api_conn_client, endpoint, data, paginate, num_retries):
-        api_endpoint_parts = endpoint.split('.')
-
-        google_api_endpoint_instance = self._build_api_request(
-            google_api_conn_client,
-            api_sub_functions=api_endpoint_parts[1:],
-            api_endpoint_params=data
-        )
-
-        if paginate:
-            return self._paginate_api(
-                google_api_endpoint_instance, google_api_conn_client, api_endpoint_parts, num_retries
-            )
-
-        return google_api_endpoint_instance.execute(num_retries=num_retries)
-
-    def _build_api_request(self, google_api_conn_client, api_sub_functions, api_endpoint_params):
-        for sub_function in api_sub_functions:
-            google_api_conn_client = getattr(google_api_conn_client, sub_function)
-            if sub_function != api_sub_functions[-1]:
-                google_api_conn_client = google_api_conn_client()
-            else:
-                google_api_conn_client = google_api_conn_client(**api_endpoint_params)
-
-        return google_api_conn_client
-
-    def _paginate_api(
-        self, google_api_endpoint_instance, google_api_conn_client, api_endpoint_parts, num_retries
-    ):
-        api_responses = []
-
-        while google_api_endpoint_instance:
-            api_response = google_api_endpoint_instance.execute(num_retries=num_retries)
-            api_responses.append(api_response)
-

[GitHub] [airflow] mik-laj commented on a change in pull request #6123: [AIRFLOW-5502][depends on AIRFLOW-5498, AIRFLOW-5499] Move GCP base hook to core

2019-09-18 Thread GitBox
URL: https://github.com/apache/airflow/pull/6123#discussion_r325936838
 
 

 ##
 File path: airflow/contrib/utils/gcp_field_validator.py
 ##
 @@ -16,417 +16,20 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Validator for body fields sent via GCP API.
-
-The validator performs validation of the body (being dictionary of fields) that
-is sent in the API request to Google Cloud (via googleclient API usually).
-
-Context
--------
-The specification mostly focuses on helping Airflow DAG developers in the development
-phase. You can build your own GCP operator (such as GcfDeployOperator for example) which
-can have a built-in validation specification for the particular API. It's super helpful
-when a developer plays with different fields and their values at the initial phase of
-DAG development. Most of the Google Cloud APIs perform their own validation on the
-server side, but most of the requests are asynchronous and you need to wait for the result
-of the operation. This takes precious time and slows
-down iteration over the API. BodyFieldValidator is meant to be used on the client side
-and it should therefore provide instant feedback to the developer on misspelled or
-wrongly typed parameters.
-
-The validation should be performed in the "execute()" method call in order to allow
-template parameters to be expanded before validation is performed.
-
-Types of fields
----------------
-
-Specification is an array of dictionaries - each dictionary describes a field: its type,
-validation, optionality, api_version supported and nested fields (for unions and dicts).
-
-Typically (for clarity and in order to aid syntax highlighting) the array of
-dicts should be defined as a series of dict() executions. A fragment of an example
-specification might look as follows::
-
-    SPECIFICATION = [
-        dict(name="an_union", type="union", optional=True, fields=[
-            dict(name="variant_1", type="dict"),
-            dict(name="variant_2", regexp=r'^.+$', api_version='v1beta2'),
-        ]),
-        dict(name="an_union", type="dict", fields=[
-            dict(name="field_1", type="dict"),
-            dict(name="field_2", regexp=r'^.+$'),
-        ]),
-        ...
-    ]
-
-
-Each field should have key = "name" indicating the field name. The field can be of one of the
-following types:
-
-* Dict fields: (key = "type", value="dict"):
-  Field of this type should contain nested fields in form of an array of dicts.
-  Each of the fields in the array is then expected (unless marked as optional)
-  and validated recursively. If an extra field is present in the dictionary, a warning is
-  printed in the log file (but the validation succeeds - see the Forward-compatibility notes)
-* List fields: (key = "type", value="list"):
-  Field of this type should be a list. Only the type correctness is validated.
-  The contents of a list are not subject to validation.
-* Union fields (key = "type", value="union"): field of this type should contain nested
-  fields in form of an array of dicts. One of the fields (and only one) should be
-  present (unless the union is marked as optional). If more than one union field is
-  present, FieldValidationException is raised. If none of the union fields is
-  present - a warning is printed in the log (see the Forward-compatibility notes below).
-* Fields validated for non-emptiness: (key = "allow_empty") - this applies only to
-  fields the value of which is a string, and it allows checking for non-emptiness of
-  the field (allow_empty=False).
-* Regexp-validated fields: (key = "regexp") - fields of this type are assumed to be
-  strings and they are validated with the regexp specified. Remember that the regexps
-  should ideally contain ^ at the beginning and $ at the end to make sure that
-  the whole field content is validated. Typically such regexp
-  validations should be used carefully and sparingly (see the Forward-compatibility
-  notes below).
-* Custom-validated fields: (key = "custom_validation") - fields of this type are validated
-  using the method specified via the custom_validation field. Any exception thrown in the custom
-  validation will be turned into FieldValidationException and will cause validation to
-  fail. Such custom validations might be used to check numeric fields (including
-  ranges of values), booleans or any other types of fields.
-* API version: (key="api_version") if API version is specified, then the field will only
-  be validated when the api_version used at field validator initialization matches exactly
-  the version specified. If you want to declare fields that are available in several
-  versions of the APIs, you should specify the field as many times as many API versions
-  should be supported (each time with different