Repository: incubator-airflow
Updated Branches:
  refs/heads/master 4751abf8a -> 2920d0475
[AIRFLOW-2066] Add operator to create empty BQ table - Add operator that creates a new, empty table in the specified BigQuery dataset, optionally with schema. Closes #3006 from kaxil/bq_empty_table_op Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2920d047 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2920d047 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2920d047 Branch: refs/heads/master Commit: 2920d047541c0c410e7db72c7ae81a6ee85bb08c Parents: 4751abf Author: Kaxil Naik <kaxiln...@gmail.com> Authored: Fri Feb 9 10:04:18 2018 +0100 Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com> Committed: Fri Feb 9 10:04:18 2018 +0100 ---------------------------------------------------------------------- airflow/contrib/hooks/bigquery_hook.py | 65 +++++++++ airflow/contrib/hooks/gcs_hook.py | 22 +++ airflow/contrib/operators/bigquery_operator.py | 146 +++++++++++++++++++ docs/code.rst | 1 + docs/integration.rst | 8 + tests/contrib/hooks/test_gcs_hook.py | 46 ++++++ .../contrib/operators/test_bigquery_operator.py | 53 +++++++ 7 files changed, 341 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2920d047/airflow/contrib/hooks/bigquery_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py index e0dea46..653cb1b 100644 --- a/airflow/contrib/hooks/bigquery_hook.py +++ b/airflow/contrib/hooks/bigquery_hook.py @@ -207,6 +207,71 @@ class BigQueryBaseCursor(LoggingMixin): self.use_legacy_sql = use_legacy_sql self.running_job_id = None + def create_empty_table(self, + project_id, + dataset_id, + table_id, + schema_fields=None, + time_partitioning={} + ): + """ + Creates a new, empty table in the dataset. 
+ + :param project_id: The project to create the table into. + :type project_id: str + :param dataset_id: The dataset to create the table into. + :type dataset_id: str + :param table_id: The Name of the table to be created. + :type table_id: str + :param schema_fields: If set, the schema field list as defined here: + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema + + **Example**: :: + + schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, + {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}] + + :type schema_fields: list + :param time_partitioning: configure optional time partitioning fields i.e. + partition by field, type and expiration as per API specifications. + + .. seealso:: + https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#timePartitioning + :type time_partitioning: dict + + :return: + """ + project_id = project_id if project_id is not None else self.project_id + + table_resource = { + 'tableReference': { + 'tableId': table_id + } + } + + if schema_fields: + table_resource['schema'] = {'fields': schema_fields} + + if time_partitioning: + table_resource['timePartitioning'] = time_partitioning + + self.log.info('Creating Table %s:%s.%s', + project_id, dataset_id, table_id) + + try: + self.service.tables().insert( + projectId=project_id, + datasetId=dataset_id, + body=table_resource).execute() + + self.log.info('Table created successfully: %s:%s.%s', + project_id, dataset_id, table_id) + + except HttpError as err: + raise AirflowException( + 'BigQuery job failed. 
Error was: {}'.format(err.content) + ) + def create_external_table(self, external_project_dataset_table, schema_fields, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2920d047/airflow/contrib/hooks/gcs_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py index f959f95..5312daa 100644 --- a/airflow/contrib/hooks/gcs_hook.py +++ b/airflow/contrib/hooks/gcs_hook.py @@ -17,6 +17,7 @@ from apiclient.http import MediaFileUpload from googleapiclient import errors from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook +from airflow.exceptions import AirflowException class GoogleCloudStorageHook(GoogleCloudBaseHook): @@ -352,3 +353,24 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook): except errors.HttpError as ex: if ex.resp['status'] == '404': raise ValueError('Object Not Found') + + +def _parse_gcs_url(gsurl): + """ + Given a Google Cloud Storage URL (gs://<bucket>/<blob>), returns a + tuple containing the corresponding bucket and blob. + """ + # Python 3 + try: + from urllib.parse import urlparse + # Python 2 + except ImportError: + from urlparse import urlparse + + parsed_url = urlparse(gsurl) + if not parsed_url.netloc: + raise AirflowException('Please provide a bucket name') + else: + bucket = parsed_url.netloc + blob = parsed_url.path.strip('/') + return bucket, blob http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2920d047/airflow/contrib/operators/bigquery_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py index e24315d..b1d64b8 100644 --- a/airflow/contrib/operators/bigquery_operator.py +++ b/airflow/contrib/operators/bigquery_operator.py @@ -12,7 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import json + from airflow.contrib.hooks.bigquery_hook import BigQueryHook +from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook, _parse_gcs_url from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults @@ -121,3 +124,146 @@ class BigQueryOperator(BaseOperator): if self.bq_cursor is not None: self.log.info('Canceling running query due to execution timeout') self.bq_cursor.cancel_query() + + +class BigQueryCreateEmptyTableOperator(BaseOperator): + """ + Creates a new, empty table in the specified BigQuery dataset, + optionally with schema. + + The schema to be used for the BigQuery table may be specified in one of + two ways. You may either directly pass the schema fields in, or you may + point the operator to a Google cloud storage object name. The object in + Google cloud storage must be a JSON file with the schema fields in it. + You can also create a table without schema. + + :param project_id: The project to create the table into. + :type project_id: string + :param dataset_id: The dataset to create the table into. + :type dataset_id: string + :param table_id: The Name of the table to be created. + :type table_id: string + :param schema_fields: If set, the schema field list as defined here: + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema + + **Example**: :: + + schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, + {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}] + + :type schema_fields: list + :param gcs_schema_object: Full path to the JSON file containing schema. For + example: ``gs://test-bucket/dir1/dir2/employee_schema.json`` + :type gcs_schema_object: string + :param time_partitioning: configure optional time partitioning fields i.e. + partition by field, type and expiration as per API specifications. + + .. 
seealso:: + https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#timePartitioning + :type time_partitioning: dict + :param bigquery_conn_id: Reference to a specific BigQuery hook. + :type bigquery_conn_id: string + :param google_cloud_storage_conn_id: Reference to a specific Google + cloud storage hook. + :type google_cloud_storage_conn_id: string + :param delegate_to: The account to impersonate, if any. For this to + work, the service account making the request must have domain-wide + delegation enabled. + :type delegate_to: string + + **Example (with schema JSON in GCS)**: :: + + CreateTable = BigQueryCreateEmptyTableOperator( + task_id='BigQueryCreateEmptyTableOperator_task', + dataset_id='ODS', + table_id='Employees', + project_id='internal-gcp-project', + gcs_schema_object='gs://schema-bucket/employee_schema.json', + bigquery_conn_id='airflow-service-account', + google_cloud_storage_conn_id='airflow-service-account' + ) + + **Corresponding Schema file** (``employee_schema.json``): :: + + [ + { + "mode": "NULLABLE", + "name": "emp_name", + "type": "STRING" + }, + { + "mode": "REQUIRED", + "name": "salary", + "type": "INTEGER" + } + ] + + **Example (with schema in the DAG)**: :: + + CreateTable = BigQueryCreateEmptyTableOperator( + task_id='BigQueryCreateEmptyTableOperator_task', + dataset_id='ODS', + table_id='Employees', + project_id='internal-gcp-project', + schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, + {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}], + bigquery_conn_id='airflow-service-account', + google_cloud_storage_conn_id='airflow-service-account' + ) + + """ + template_fields = ('dataset_id', 'table_id', 'project_id', 'gcs_schema_object') + ui_color = '#f0eee4' + + @apply_defaults + def __init__(self, + dataset_id, + table_id, + project_id=None, + schema_fields=None, + gcs_schema_object=None, + time_partitioning={}, + bigquery_conn_id='bigquery_default', + 
google_cloud_storage_conn_id='google_cloud_storage_default', + delegate_to=None, + *args, **kwargs): + + super(BigQueryCreateEmptyTableOperator, self).__init__(*args, **kwargs) + + self.project_id = project_id + self.dataset_id = dataset_id + self.table_id = table_id + self.schema_fields = schema_fields + self.gcs_schema_object = gcs_schema_object + self.bigquery_conn_id = bigquery_conn_id + self.google_cloud_storage_conn_id = google_cloud_storage_conn_id + self.delegate_to = delegate_to + self.time_partitioning = time_partitioning + + def execute(self, context): + bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id, + delegate_to=self.delegate_to) + + if not self.schema_fields and self.gcs_schema_object: + + gcs_bucket, gcs_object = _parse_gcs_url(self.gcs_schema_object) + + gcs_hook = GoogleCloudStorageHook( + google_cloud_storage_conn_id=self.google_cloud_storage_conn_id, + delegate_to=self.delegate_to) + schema_fields = json.loads(gcs_hook.download( + gcs_bucket, + gcs_object).decode("utf-8")) + else: + schema_fields = self.schema_fields + + conn = bq_hook.get_conn() + cursor = conn.cursor() + + cursor.create_empty_table( + project_id=self.project_id, + dataset_id=self.dataset_id, + table_id=self.table_id, + schema_fields=schema_fields, + time_partitioning=self.time_partitioning + ) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2920d047/docs/code.rst ---------------------------------------------------------------------- diff --git a/docs/code.rst b/docs/code.rst index 883f48c..a984f29 100644 --- a/docs/code.rst +++ b/docs/code.rst @@ -98,6 +98,7 @@ Community-contributed Operators .. autoclass:: airflow.contrib.operators.bigquery_check_operator.BigQueryValueCheckOperator .. autoclass:: airflow.contrib.operators.bigquery_check_operator.BigQueryIntervalCheckOperator .. autoclass:: airflow.contrib.operators.bigquery_get_data.BigQueryGetDataOperator +.. 
autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyTableOperator .. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryOperator .. autoclass:: airflow.contrib.operators.bigquery_table_delete_operator.BigQueryTableDeleteOperator .. autoclass:: airflow.contrib.operators.bigquery_to_bigquery.BigQueryToBigQueryOperator http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2920d047/docs/integration.rst ---------------------------------------------------------------------- diff --git a/docs/integration.rst b/docs/integration.rst index 5c26f9a..261649b 100644 --- a/docs/integration.rst +++ b/docs/integration.rst @@ -348,6 +348,7 @@ BigQuery Operators - :ref:`BigQueryCheckOperator` : Performs checks against a SQL query that will return a single row with different values. - :ref:`BigQueryValueCheckOperator` : Performs a simple value check using SQL code. - :ref:`BigQueryIntervalCheckOperator` : Checks that the values of metrics given as SQL expressions are within a certain tolerance of the ones from days_back before. +- :ref:`BigQueryCreateEmptyTableOperator` : Creates a new, empty table in the specified BigQuery dataset optionally with schema. - :ref:`BigQueryOperator` : Executes BigQuery SQL queries in a specific BigQuery database. - :ref:`BigQueryToBigQueryOperator` : Copy a BigQuery table to another BigQuery table. - :ref:`BigQueryToCloudStorageOperator` : Transfers a BigQuery table to a Google Cloud Storage bucket @@ -381,6 +382,13 @@ BigQueryGetDataOperator .. autoclass:: airflow.contrib.operators.bigquery_get_data.BigQueryGetDataOperator +.. _BigQueryCreateEmptyTableOperator: + +BigQueryCreateEmptyTableOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyTableOperator + .. 
_BigQueryOperator: BigQueryOperator http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2920d047/tests/contrib/hooks/test_gcs_hook.py ---------------------------------------------------------------------- diff --git a/tests/contrib/hooks/test_gcs_hook.py b/tests/contrib/hooks/test_gcs_hook.py new file mode 100644 index 0000000..cc87fb7 --- /dev/null +++ b/tests/contrib/hooks/test_gcs_hook.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +from airflow.exceptions import AirflowException +import airflow.contrib.hooks.gcs_hook as gcs_hook + + +class TestGCSHookHelperFunctions(unittest.TestCase): + + def test_parse_gcs_url(self): + """ + Test GCS url parsing + """ + + self.assertEqual( + gcs_hook._parse_gcs_url('gs://bucket/path/to/blob'), + ('bucket', 'path/to/blob')) + + # invalid URI + self.assertRaises( + AirflowException, + gcs_hook._parse_gcs_url, + 'gs:/bucket/path/to/blob') + + # trailing slash + self.assertEqual( + gcs_hook._parse_gcs_url('gs://bucket/path/to/blob/'), + ('bucket', 'path/to/blob')) + + # bucket only + self.assertEqual( + gcs_hook._parse_gcs_url('gs://bucket/'), + ('bucket', '')) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2920d047/tests/contrib/operators/test_bigquery_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_bigquery_operator.py 
b/tests/contrib/operators/test_bigquery_operator.py new file mode 100644 index 0000000..b7f7285 --- /dev/null +++ b/tests/contrib/operators/test_bigquery_operator.py @@ -0,0 +1,53 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +from airflow.contrib.operators.bigquery_operator import BigQueryCreateEmptyTableOperator + +try: + from unittest import mock +except ImportError: + try: + import mock + except ImportError: + mock = None + +TASK_ID = 'test-bq-create-empty-table-operator' +TEST_DATASET = 'test-dataset' +TEST_PROJECT_ID = 'test-project' +TEST_TABLE_ID = 'test-table-id' + + +class BigQueryCreateEmptyTableOperatorTest(unittest.TestCase): + + @mock.patch('airflow.contrib.operators.bigquery_operator.BigQueryHook') + def test_execute(self, mock_hook): + operator = BigQueryCreateEmptyTableOperator(task_id=TASK_ID, + dataset_id=TEST_DATASET, + project_id=TEST_PROJECT_ID, + table_id=TEST_TABLE_ID) + + operator.execute(None) + mock_hook.return_value \ + .get_conn() \ + .cursor() \ + .create_empty_table \ + .assert_called_once_with( + dataset_id=TEST_DATASET, + project_id=TEST_PROJECT_ID, + table_id=TEST_TABLE_ID, + schema_fields=None, + time_partitioning={} + )