Repository: incubator-airflow
Updated Branches:
  refs/heads/master 573fb991f -> c970b09c4
[AIRFLOW-539] Updated BQ hook and BQ operator to support Standard SQL

Closes #1820 from illop/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c970b09c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c970b09c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c970b09c

Branch: refs/heads/master
Commit: c970b09c4c9c15556c7ca0792c225655df1617b5
Parents: 573fb99
Author: Ilya Rakoshes <il...@wepay.com>
Authored: Wed Oct 5 08:09:11 2016 -0700
Committer: Chris Riccomini <chr...@wepay.com>
Committed: Wed Oct 5 08:09:21 2016 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py         | 33 +++++++++++++--------
 airflow/contrib/operators/bigquery_operator.py |  6 +++-
 2 files changed, 25 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c970b09c/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 833dc8a..c5b57a9 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -161,7 +161,8 @@ class BigQueryBaseCursor(object):
         self, bql, destination_dataset_table = False,
         write_disposition = 'WRITE_EMPTY',
         allow_large_results=False,
-        udf_config = False):
+        udf_config = False,
+        use_legacy_sql=True):
         """
         Executes a BigQuery SQL query. Optionally persists results in a BigQuery
         table. See here:
@@ -181,10 +182,13 @@ class BigQueryBaseCursor(object):
         :param udf_config: The User Defined Function configuration for the query.
             See https://cloud.google.com/bigquery/user-defined-functions for details.
         :type udf_config: list
+        :param use_legacy_sql: Whether to use legacy SQL (true) or standard SQL (false).
+        :type use_legacy_sql: boolean
         """
         configuration = {
             'query': {
                 'query': bql,
+                'useLegacySql': use_legacy_sql
             }
         }
 
@@ -193,7 +197,8 @@ class BigQueryBaseCursor(object):
                 'Expected destination_dataset_table in the format of '
                 '<dataset>.<table>. Got: {}').format(destination_dataset_table)
             destination_project, destination_dataset, destination_table = \
-                _split_tablename(destination_dataset_table, self.project_id)
+                _split_tablename(table_input=destination_dataset_table,
+                                 default_project_id=self.project_id)
             configuration['query'].update({
                 'allowLargeResults': allow_large_results,
                 'writeDisposition': write_disposition,
@@ -241,9 +246,9 @@ class BigQueryBaseCursor(object):
         :type print_header: boolean
         """
         source_project, source_dataset, source_table = \
-            _split_tablename(
-                source_project_dataset_table, self.project_id,
-                'source_project_dataset_table')
+            _split_tablename(table_input=source_project_dataset_table,
+                             default_project_id=self.project_id,
+                             var_name='source_project_dataset_table')
         configuration = {
             'extract': {
                 'sourceTable': {
@@ -302,9 +307,9 @@ class BigQueryBaseCursor(object):
         source_project_dataset_tables_fixup = []
         for source_project_dataset_table in source_project_dataset_tables:
             source_project, source_dataset, source_table = \
-                _split_tablename(
-                    source_project_dataset_table, self.project_id,
-                    'source_project_dataset_table')
+                _split_tablename(table_input=source_project_dataset_table,
+                                 default_project_id=self.project_id,
+                                 var_name='source_project_dataset_table')
             source_project_dataset_tables_fixup.append({
                 'projectId': source_project,
                 'datasetId': source_dataset,
@@ -312,7 +317,8 @@ class BigQueryBaseCursor(object):
             })
 
         destination_project, destination_dataset, destination_table = \
-            _split_tablename(destination_project_dataset_table, self.project_id)
+            _split_tablename(table_input=destination_project_dataset_table,
+                             default_project_id=self.project_id)
         configuration = {
             'copy': {
                 'createDisposition': create_disposition,
@@ -368,9 +374,9 @@ class BigQueryBaseCursor(object):
         :type field_delimiter: string
         """
         destination_project, destination_dataset, destination_table = \
-            _split_tablename(
-                destination_project_dataset_table, self.project_id,
-                'destination_project_dataset_table')
+            _split_tablename(table_input=destination_project_dataset_table,
+                             default_project_id=self.project_id,
+                             var_name='destination_project_dataset_table')
 
         configuration = {
             'load': {
@@ -495,7 +501,8 @@ class BigQueryBaseCursor(object):
             'Expected deletion_dataset_table in the format of '
             '<dataset>.<table>. Got: {}').format(deletion_dataset_table)
         deletion_project, deletion_dataset, deletion_table = \
-            _split_tablename(deletion_dataset_table, self.project_id)
+            _split_tablename(table_input=deletion_dataset_table,
+                             default_project_id=self.project_id)
 
         try:
             tables_resource = self.service.tables() \


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c970b09c/airflow/contrib/operators/bigquery_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index b4e4b0e..de97b7a 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -36,6 +36,7 @@ class BigQueryOperator(BaseOperator):
                  bigquery_conn_id='bigquery_default',
                  delegate_to=None,
                  udf_config=False,
+                 use_legacy_sql=True,
                  *args,
                  **kwargs):
         """
@@ -58,6 +59,8 @@ class BigQueryOperator(BaseOperator):
         :param udf_config: The User Defined Function configuration for the query.
             See https://cloud.google.com/bigquery/user-defined-functions for details.
         :type udf_config: list
+        :param use_legacy_sql: Whether to use legacy SQL (true) or standard SQL (false).
+        :type use_legacy_sql: boolean
         """
         super(BigQueryOperator, self).__init__(*args, **kwargs)
         self.bql = bql
@@ -67,6 +70,7 @@ class BigQueryOperator(BaseOperator):
         self.bigquery_conn_id = bigquery_conn_id
         self.delegate_to = delegate_to
         self.udf_config = udf_config
+        self.use_legacy_sql = use_legacy_sql
 
     def execute(self, context):
         logging.info('Executing: %s', str(self.bql))
@@ -75,4 +79,4 @@ class BigQueryOperator(BaseOperator):
         conn = hook.get_conn()
         cursor = conn.cursor()
         cursor.run_query(self.bql, self.destination_dataset_table, self.write_disposition,
-                self.allow_large_results, self.udf_config)
+                self.allow_large_results, self.udf_config, self.use_legacy_sql)
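
For context, a minimal usage sketch of the new flag (not part of the commit itself): passing
use_legacy_sql=False to BigQueryOperator is forwarded to the hook's run_query, which sets
useLegacySql on the query configuration so the bql string is executed as standard SQL. The DAG
name, dates, connection id, destination table, and query below are illustrative assumptions,
not values taken from this change.

    # Hypothetical example DAG; only the use_legacy_sql parameter comes from this commit,
    # everything else (ids, tables, schedule) is a placeholder.
    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.bigquery_operator import BigQueryOperator

    dag = DAG('bq_standard_sql_example',
              start_date=datetime(2016, 10, 1),
              schedule_interval=None)

    run_standard_sql = BigQueryOperator(
        task_id='run_standard_sql',
        # Standard SQL uses backtick-quoted `project.dataset.table` references.
        bql='SELECT word, COUNT(*) AS c '
            'FROM `bigquery-public-data.samples.shakespeare` GROUP BY word',
        destination_dataset_table='my_dataset.word_counts',
        write_disposition='WRITE_TRUNCATE',
        use_legacy_sql=False,  # new flag; defaults to True (legacy SQL) for compatibility
        bigquery_conn_id='bigquery_default',
        dag=dag)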