Repository: incubator-airflow
Updated Branches:
  refs/heads/master 4751abf8a -> 2920d0475


[AIRFLOW-2066] Add operator to create empty BQ table

- Add operator that creates a new, empty table in
the specified BigQuery dataset, optionally with
schema.

Closes #3006 from kaxil/bq_empty_table_op


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2920d047
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2920d047
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2920d047

Branch: refs/heads/master
Commit: 2920d047541c0c410e7db72c7ae81a6ee85bb08c
Parents: 4751abf
Author: Kaxil Naik <kaxiln...@gmail.com>
Authored: Fri Feb 9 10:04:18 2018 +0100
Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Committed: Fri Feb 9 10:04:18 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py          |  65 +++++++++
 airflow/contrib/hooks/gcs_hook.py               |  22 +++
 airflow/contrib/operators/bigquery_operator.py  | 146 +++++++++++++++++++
 docs/code.rst                                   |   1 +
 docs/integration.rst                            |   8 +
 tests/contrib/hooks/test_gcs_hook.py            |  46 ++++++
 .../contrib/operators/test_bigquery_operator.py |  53 +++++++
 7 files changed, 341 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2920d047/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index e0dea46..653cb1b 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -207,6 +207,71 @@ class BigQueryBaseCursor(LoggingMixin):
         self.use_legacy_sql = use_legacy_sql
         self.running_job_id = None
 
+    def create_empty_table(self,
+                           project_id,
+                           dataset_id,
+                           table_id,
+                           schema_fields=None,
+                           time_partitioning={}
+                           ):
+        """
+        Creates a new, empty table in the dataset.
+
+        :param project_id: The project to create the table into.
+        :type project_id: str
+        :param dataset_id: The dataset to create the table into.
+        :type dataset_id: str
+        :param table_id: The name of the table to be created.
+        :type table_id: str
+        :param schema_fields: If set, the schema field list as defined here:
+            https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema
+
+        **Example**: ::
+
+            schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
+                           {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}]
+
+        :type schema_fields: list
+        :param time_partitioning: configure optional time partitioning fields i.e.
+            partition by field, type and expiration as per API specifications.
+
+            .. seealso::
+                https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#timePartitioning
+        :type time_partitioning: dict
+
+        :return:
+        """
+        project_id = project_id if project_id is not None else self.project_id
+
+        table_resource = {
+            'tableReference': {
+                'tableId': table_id
+            }
+        }
+
+        if schema_fields:
+            table_resource['schema'] = {'fields': schema_fields}
+
+        if time_partitioning:
+            table_resource['timePartitioning'] = time_partitioning
+
+        self.log.info('Creating Table %s:%s.%s',
+                      project_id, dataset_id, table_id)
+
+        try:
+            self.service.tables().insert(
+                projectId=project_id,
+                datasetId=dataset_id,
+                body=table_resource).execute()
+
+            self.log.info('Table created successfully: %s:%s.%s',
+                          project_id, dataset_id, table_id)
+
+        except HttpError as err:
+            raise AirflowException(
+                'BigQuery job failed. Error was: {}'.format(err.content)
+            )
+
     def create_external_table(self,
                               external_project_dataset_table,
                               schema_fields,

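For illustration, a minimal sketch of calling the new hook method directly; the
connection id, project, dataset and table names below are placeholders, not part
of this change:

    from airflow.contrib.hooks.bigquery_hook import BigQueryHook

    # assumes a 'bigquery_default' connection is configured in Airflow
    hook = BigQueryHook(bigquery_conn_id='bigquery_default')
    cursor = hook.get_conn().cursor()

    # create an empty, day-partitioned table with an explicit schema
    cursor.create_empty_table(
        project_id='my-project',
        dataset_id='my_dataset',
        table_id='employees',
        schema_fields=[
            {'name': 'emp_name', 'type': 'STRING', 'mode': 'REQUIRED'},
            {'name': 'salary', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        ],
        time_partitioning={'type': 'DAY'},
    )
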
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2920d047/airflow/contrib/hooks/gcs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py
index f959f95..5312daa 100644
--- a/airflow/contrib/hooks/gcs_hook.py
+++ b/airflow/contrib/hooks/gcs_hook.py
@@ -17,6 +17,7 @@ from apiclient.http import MediaFileUpload
 from googleapiclient import errors
 
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+from airflow.exceptions import AirflowException
 
 
 class GoogleCloudStorageHook(GoogleCloudBaseHook):
@@ -352,3 +353,24 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
         except errors.HttpError as ex:
             if ex.resp['status'] == '404':
                 raise ValueError('Object Not Found')
+
+
+def _parse_gcs_url(gsurl):
+    """
+    Given a Google Cloud Storage URL (gs://<bucket>/<blob>), returns a
+    tuple containing the corresponding bucket and blob.
+    """
+    # Python 3
+    try:
+        from urllib.parse import urlparse
+    # Python 2
+    except ImportError:
+        from urlparse import urlparse
+
+    parsed_url = urlparse(gsurl)
+    if not parsed_url.netloc:
+        raise AirflowException('Please provide a bucket name')
+    else:
+        bucket = parsed_url.netloc
+        blob = parsed_url.path.strip('/')
+        return bucket, blob

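A quick sketch of the helper's behaviour (the URL is an example value):

    from airflow.contrib.hooks.gcs_hook import _parse_gcs_url

    bucket, blob = _parse_gcs_url('gs://schema-bucket/dir1/employee_schema.json')
    # bucket == 'schema-bucket'
    # blob == 'dir1/employee_schema.json'
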
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2920d047/airflow/contrib/operators/bigquery_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index e24315d..b1d64b8 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -12,7 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import json
+
 from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook, _parse_gcs_url
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 
@@ -121,3 +124,146 @@ class BigQueryOperator(BaseOperator):
         if self.bq_cursor is not None:
             self.log.info('Canceling running query due to execution timeout')
             self.bq_cursor.cancel_query()
+
+
+class BigQueryCreateEmptyTableOperator(BaseOperator):
+    """
+    Creates a new, empty table in the specified BigQuery dataset,
+    optionally with schema.
+
+    The schema to be used for the BigQuery table may be specified in one of
+    two ways. You may either directly pass the schema fields in, or you may
+    point the operator to a Google cloud storage object name. The object in
+    Google cloud storage must be a JSON file with the schema fields in it.
+    You can also create a table without schema.
+
+    :param project_id: The project to create the table into.
+    :type project_id: string
+    :param dataset_id: The dataset to create the table into.
+    :type dataset_id: string
+    :param table_id: The name of the table to be created.
+    :type table_id: string
+    :param schema_fields: If set, the schema field list as defined here:
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema
+
+        **Example**: ::
+
+            schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
+                           {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}]
+
+    :type schema_fields: list
+    :param gcs_schema_object: Full path to the JSON file containing schema. For
+        example: ``gs://test-bucket/dir1/dir2/employee_schema.json``
+    :type gcs_schema_object: string
+    :param time_partitioning: configure optional time partitioning fields i.e.
+        partition by field, type and expiration as per API specifications.
+
+        .. seealso::
+            https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#timePartitioning
+    :type time_partitioning: dict
+    :param bigquery_conn_id: Reference to a specific BigQuery hook.
+    :type bigquery_conn_id: string
+    :param google_cloud_storage_conn_id: Reference to a specific Google
+        cloud storage hook.
+    :type google_cloud_storage_conn_id: string
+    :param delegate_to: The account to impersonate, if any. For this to
+        work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: string
+
+    **Example (with schema JSON in GCS)**: ::
+
+        CreateTable = BigQueryCreateEmptyTableOperator(
+            task_id='BigQueryCreateEmptyTableOperator_task',
+            dataset_id='ODS',
+            table_id='Employees',
+            project_id='internal-gcp-project',
+            gcs_schema_object='gs://schema-bucket/employee_schema.json',
+            bigquery_conn_id='airflow-service-account',
+            google_cloud_storage_conn_id='airflow-service-account'
+        )
+
+    **Corresponding Schema file** (``employee_schema.json``): ::
+
+        [
+          {
+            "mode": "NULLABLE",
+            "name": "emp_name",
+            "type": "STRING"
+          },
+          {
+            "mode": "REQUIRED",
+            "name": "salary",
+            "type": "INTEGER"
+          }
+        ]
+
+    **Example (with schema in the DAG)**: ::
+
+        CreateTable = BigQueryCreateEmptyTableOperator(
+            task_id='BigQueryCreateEmptyTableOperator_task',
+            dataset_id='ODS',
+            table_id='Employees',
+            project_id='internal-gcp-project',
+            schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
+                           {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}],
+            bigquery_conn_id='airflow-service-account',
+            google_cloud_storage_conn_id='airflow-service-account'
+        )
+
+    """
+    template_fields = ('dataset_id', 'table_id', 'project_id', 'gcs_schema_object')
+    ui_color = '#f0eee4'
+
+    @apply_defaults
+    def __init__(self,
+                 dataset_id,
+                 table_id,
+                 project_id=None,
+                 schema_fields=None,
+                 gcs_schema_object=None,
+                 time_partitioning={},
+                 bigquery_conn_id='bigquery_default',
+                 google_cloud_storage_conn_id='google_cloud_storage_default',
+                 delegate_to=None,
+                 *args, **kwargs):
+
+        super(BigQueryCreateEmptyTableOperator, self).__init__(*args, **kwargs)
+
+        self.project_id = project_id
+        self.dataset_id = dataset_id
+        self.table_id = table_id
+        self.schema_fields = schema_fields
+        self.gcs_schema_object = gcs_schema_object
+        self.bigquery_conn_id = bigquery_conn_id
+        self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
+        self.delegate_to = delegate_to
+        self.time_partitioning = time_partitioning
+
+    def execute(self, context):
+        bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
+                               delegate_to=self.delegate_to)
+
+        if not self.schema_fields and self.gcs_schema_object:
+
+            gcs_bucket, gcs_object = _parse_gcs_url(self.gcs_schema_object)
+
+            gcs_hook = GoogleCloudStorageHook(
+                google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
+                delegate_to=self.delegate_to)
+            schema_fields = json.loads(gcs_hook.download(
+                gcs_bucket,
+                gcs_object).decode("utf-8"))
+        else:
+            schema_fields = self.schema_fields
+
+        conn = bq_hook.get_conn()
+        cursor = conn.cursor()
+
+        cursor.create_empty_table(
+            project_id=self.project_id,
+            dataset_id=self.dataset_id,
+            table_id=self.table_id,
+            schema_fields=schema_fields,
+            time_partitioning=self.time_partitioning
+        )

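For illustration, a minimal DAG sketch wiring in the new operator; the DAG id,
schedule, project, dataset, table and bucket names are placeholders, and the
default BigQuery/GCS connections are assumed:

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.bigquery_operator import \
        BigQueryCreateEmptyTableOperator

    # placeholder DAG; adjust start_date and schedule to your environment
    dag = DAG('bq_create_empty_table_example',
              start_date=datetime(2018, 2, 1),
              schedule_interval=None)

    # schema is loaded from the JSON object in GCS at runtime
    create_table = BigQueryCreateEmptyTableOperator(
        task_id='create_employees_table',
        project_id='internal-gcp-project',
        dataset_id='ODS',
        table_id='Employees',
        gcs_schema_object='gs://schema-bucket/employee_schema.json',
        dag=dag,
    )
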
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2920d047/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index 883f48c..a984f29 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -98,6 +98,7 @@ Community-contributed Operators
 .. autoclass:: airflow.contrib.operators.bigquery_check_operator.BigQueryValueCheckOperator
 .. autoclass:: airflow.contrib.operators.bigquery_check_operator.BigQueryIntervalCheckOperator
 .. autoclass:: airflow.contrib.operators.bigquery_get_data.BigQueryGetDataOperator
+.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyTableOperator
 .. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryOperator
 .. autoclass:: airflow.contrib.operators.bigquery_table_delete_operator.BigQueryTableDeleteOperator
 .. autoclass:: airflow.contrib.operators.bigquery_to_bigquery.BigQueryToBigQueryOperator

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2920d047/docs/integration.rst
----------------------------------------------------------------------
diff --git a/docs/integration.rst b/docs/integration.rst
index 5c26f9a..261649b 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -348,6 +348,7 @@ BigQuery Operators
 - :ref:`BigQueryCheckOperator` : Performs checks against a SQL query that will return a single row with different values.
 - :ref:`BigQueryValueCheckOperator` : Performs a simple value check using SQL code.
 - :ref:`BigQueryIntervalCheckOperator` : Checks that the values of metrics given as SQL expressions are within a certain tolerance of the ones from days_back before.
+- :ref:`BigQueryCreateEmptyTableOperator` : Creates a new, empty table in the specified BigQuery dataset, optionally with schema.
 - :ref:`BigQueryOperator` : Executes BigQuery SQL queries in a specific BigQuery database.
 - :ref:`BigQueryToBigQueryOperator` : Copy a BigQuery table to another BigQuery table.
 - :ref:`BigQueryToCloudStorageOperator` : Transfers a BigQuery table to a Google Cloud Storage bucket
@@ -381,6 +382,13 @@ BigQueryGetDataOperator
 
 .. autoclass:: airflow.contrib.operators.bigquery_get_data.BigQueryGetDataOperator
 
+.. _BigQueryCreateEmptyTableOperator:
+
+BigQueryCreateEmptyTableOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyTableOperator
+
 .. _BigQueryOperator:
 
 BigQueryOperator

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2920d047/tests/contrib/hooks/test_gcs_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_gcs_hook.py b/tests/contrib/hooks/test_gcs_hook.py
new file mode 100644
index 0000000..cc87fb7
--- /dev/null
+++ b/tests/contrib/hooks/test_gcs_hook.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from airflow.exceptions import AirflowException
+import airflow.contrib.hooks.gcs_hook as gcs_hook
+
+
+class TestGCSHookHelperFunctions(unittest.TestCase):
+
+    def test_parse_gcs_url(self):
+        """
+        Test GCS url parsing
+        """
+
+        self.assertEqual(
+            gcs_hook._parse_gcs_url('gs://bucket/path/to/blob'),
+            ('bucket', 'path/to/blob'))
+
+        # invalid URI
+        self.assertRaises(
+            AirflowException,
+            gcs_hook._parse_gcs_url,
+            'gs:/bucket/path/to/blob')
+
+        # trailing slash
+        self.assertEqual(
+            gcs_hook._parse_gcs_url('gs://bucket/path/to/blob/'),
+            ('bucket', 'path/to/blob'))
+
+        # bucket only
+        self.assertEqual(
+            gcs_hook._parse_gcs_url('gs://bucket/'),
+            ('bucket', ''))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2920d047/tests/contrib/operators/test_bigquery_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_bigquery_operator.py b/tests/contrib/operators/test_bigquery_operator.py
new file mode 100644
index 0000000..b7f7285
--- /dev/null
+++ b/tests/contrib/operators/test_bigquery_operator.py
@@ -0,0 +1,53 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from airflow.contrib.operators.bigquery_operator import BigQueryCreateEmptyTableOperator
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+TASK_ID = 'test-bq-create-empty-table-operator'
+TEST_DATASET = 'test-dataset'
+TEST_PROJECT_ID = 'test-project'
+TEST_TABLE_ID = 'test-table-id'
+
+
+class BigQueryCreateEmptyTableOperatorTest(unittest.TestCase):
+
+    @mock.patch('airflow.contrib.operators.bigquery_operator.BigQueryHook')
+    def test_execute(self, mock_hook):
+        operator = BigQueryCreateEmptyTableOperator(task_id=TASK_ID,
+                                                    dataset_id=TEST_DATASET,
+                                                    project_id=TEST_PROJECT_ID,
+                                                    table_id=TEST_TABLE_ID)
+
+        operator.execute(None)
+        mock_hook.return_value \
+            .get_conn() \
+            .cursor() \
+            .create_empty_table \
+            .assert_called_once_with(
+                dataset_id=TEST_DATASET,
+                project_id=TEST_PROJECT_ID,
+                table_id=TEST_TABLE_ID,
+                schema_fields=None,
+                time_partitioning={}
+            )
