Repository: incubator-airflow
Updated Branches:
  refs/heads/master 573fb991f -> c970b09c4


[AIRFLOW-539] Updated BQ hook and BQ operator to support Standard SQL.

Closes #1820 from illop/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c970b09c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c970b09c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c970b09c

Branch: refs/heads/master
Commit: c970b09c4c9c15556c7ca0792c225655df1617b5
Parents: 573fb99
Author: Ilya Rakoshes <il...@wepay.com>
Authored: Wed Oct 5 08:09:11 2016 -0700
Committer: Chris Riccomini <chr...@wepay.com>
Committed: Wed Oct 5 08:09:21 2016 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py         | 33 +++++++++++++--------
 airflow/contrib/operators/bigquery_operator.py |  6 +++-
 2 files changed, 25 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c970b09c/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 833dc8a..c5b57a9 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -161,7 +161,8 @@ class BigQueryBaseCursor(object):
             self, bql, destination_dataset_table = False,
             write_disposition = 'WRITE_EMPTY',
             allow_large_results=False,
-            udf_config = False):
+            udf_config = False,
+            use_legacy_sql=True):
         """
         Executes a BigQuery SQL query. Optionally persists results in a BigQuery
         table. See here:
@@ -181,10 +182,13 @@ class BigQueryBaseCursor(object):
         :param udf_config: The User Defined Function configuration for the query.
             See https://cloud.google.com/bigquery/user-defined-functions for details.
         :type udf_config: list
+        :param use_legacy_sql: Whether to use legacy SQL (true) or standard SQL (false).
+        :type use_legacy_sql: boolean
         """
         configuration = {
             'query': {
                 'query': bql,
+                'useLegacySql': use_legacy_sql
             }
         }
 
@@ -193,7 +197,8 @@ class BigQueryBaseCursor(object):
                 'Expected destination_dataset_table in the format of '
                 '<dataset>.<table>. Got: {}').format(destination_dataset_table)
             destination_project, destination_dataset, destination_table = \
-                _split_tablename(destination_dataset_table, self.project_id)
+                _split_tablename(table_input=destination_dataset_table,
+                                 default_project_id=self.project_id)
             configuration['query'].update({
                 'allowLargeResults': allow_large_results,
                 'writeDisposition': write_disposition,
@@ -241,9 +246,9 @@ class BigQueryBaseCursor(object):
         :type print_header: boolean
         """
         source_project, source_dataset, source_table = \
-            _split_tablename(
-                source_project_dataset_table, self.project_id,
-                'source_project_dataset_table')
+            _split_tablename(table_input=source_project_dataset_table,
+                             default_project_id=self.project_id,
+                             var_name='source_project_dataset_table')
         configuration = {
             'extract': {
                 'sourceTable': {
@@ -302,9 +307,9 @@ class BigQueryBaseCursor(object):
         source_project_dataset_tables_fixup = []
         for source_project_dataset_table in source_project_dataset_tables:
             source_project, source_dataset, source_table = \
-                _split_tablename(
-                    source_project_dataset_table, self.project_id,
-                    'source_project_dataset_table')
+                _split_tablename(table_input=source_project_dataset_table,
+                                 default_project_id=self.project_id,
+                                 var_name='source_project_dataset_table')
             source_project_dataset_tables_fixup.append({
                 'projectId': source_project,
                 'datasetId': source_dataset,
@@ -312,7 +317,8 @@ class BigQueryBaseCursor(object):
             })
 
         destination_project, destination_dataset, destination_table = \
-            _split_tablename(destination_project_dataset_table, self.project_id)
+            _split_tablename(table_input=destination_project_dataset_table,
+                             default_project_id=self.project_id)
         configuration = {
             'copy': {
                 'createDisposition': create_disposition,
@@ -368,9 +374,9 @@ class BigQueryBaseCursor(object):
         :type field_delimiter: string
         """
         destination_project, destination_dataset, destination_table = \
-            _split_tablename(
-                destination_project_dataset_table, self.project_id,
-                'destination_project_dataset_table')
+            _split_tablename(table_input=destination_project_dataset_table,
+                             default_project_id=self.project_id,
+                             var_name='destination_project_dataset_table')
 
         configuration = {
             'load': {
@@ -495,7 +501,8 @@ class BigQueryBaseCursor(object):
             'Expected deletion_dataset_table in the format of '
             '<dataset>.<table>. Got: {}').format(deletion_dataset_table)
         deletion_project, deletion_dataset, deletion_table = \
-            _split_tablename(deletion_dataset_table, self.project_id)
+            _split_tablename(table_input=deletion_dataset_table,
+                             default_project_id=self.project_id)
 
         try:
             tables_resource = self.service.tables() \
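
With this change a caller can opt into standard SQL by passing use_legacy_sql=False
to run_query; the default of True keeps the existing legacy-SQL behavior. A minimal
sketch of cursor usage (the connection id, project, dataset, and table names below
are illustrative placeholders, not part of this commit):

    from airflow.contrib.hooks.bigquery_hook import BigQueryHook

    # Placeholder connection id; any configured BigQuery connection works.
    hook = BigQueryHook(bigquery_conn_id='bigquery_default')
    cursor = hook.get_conn().cursor()

    # Standard SQL expects backtick-quoted table references, so the flag
    # and the query dialect must agree. Omitting use_legacy_sql (or passing
    # True) preserves the previous legacy-SQL behavior.
    cursor.run_query(
        'SELECT COUNT(*) FROM `my-project.my_dataset.my_table`',
        use_legacy_sql=False)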

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c970b09c/airflow/contrib/operators/bigquery_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index b4e4b0e..de97b7a 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -36,6 +36,7 @@ class BigQueryOperator(BaseOperator):
                  bigquery_conn_id='bigquery_default',
                  delegate_to=None,
                  udf_config=False,
+                 use_legacy_sql=True,
                  *args,
                  **kwargs):
         """
@@ -58,6 +59,8 @@ class BigQueryOperator(BaseOperator):
         :param udf_config: The User Defined Function configuration for the query.
             See https://cloud.google.com/bigquery/user-defined-functions for details.
         :type udf_config: list
+        :param use_legacy_sql: Whether to use legacy SQL (true) or standard SQL (false).
+        :type use_legacy_sql: boolean
         """
         super(BigQueryOperator, self).__init__(*args, **kwargs)
         self.bql = bql
@@ -67,6 +70,7 @@ class BigQueryOperator(BaseOperator):
         self.bigquery_conn_id = bigquery_conn_id
         self.delegate_to = delegate_to
         self.udf_config = udf_config
+        self.use_legacy_sql = use_legacy_sql
 
     def execute(self, context):
         logging.info('Executing: %s', str(self.bql))
@@ -75,4 +79,4 @@ class BigQueryOperator(BaseOperator):
         conn = hook.get_conn()
         cursor = conn.cursor()
         cursor.run_query(self.bql, self.destination_dataset_table, self.write_disposition,
-                         self.allow_large_results, self.udf_config)
+                         self.allow_large_results, self.udf_config, self.use_legacy_sql)
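
On the operator side the new flag is simply stored and forwarded to the cursor. A
hedged sketch of a DAG task that opts into standard SQL (the dag id, task id, and
query are illustrative placeholders):

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.bigquery_operator import BigQueryOperator

    # Placeholder DAG; only use_legacy_sql is new in this commit.
    dag = DAG('bq_standard_sql_example',
              start_date=datetime(2016, 10, 1),
              schedule_interval=None)

    # use_legacy_sql=False runs the job with standard SQL; omitting the
    # argument keeps the legacy-SQL default and the pre-commit behavior.
    count_rows = BigQueryOperator(
        task_id='count_rows',
        bql='SELECT COUNT(*) FROM `my-project.my_dataset.my_table`',
        use_legacy_sql=False,
        dag=dag)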
