BEAM-873 Support for BigQuery 2 SQL
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0a339653 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0a339653 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0a339653 Branch: refs/heads/python-sdk Commit: 0a339653b0c7f6225a2b12e6c0988af8ad758098 Parents: 7270471 Author: Sourabh Bajaj <sourabhba...@google.com> Authored: Fri Oct 28 13:59:23 2016 -0700 Committer: Robert Bradshaw <rober...@google.com> Committed: Thu Nov 3 10:38:44 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/bigquery.py | 26 ++++++++++++++------ .../apache_beam/runners/dataflow_runner.py | 2 ++ sdks/python/apache_beam/utils/names.py | 1 + 3 files changed, 21 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0a339653/sdks/python/apache_beam/io/bigquery.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py index 4eecaa5..41b1bdc 100644 --- a/sdks/python/apache_beam/io/bigquery.py +++ b/sdks/python/apache_beam/io/bigquery.py @@ -284,7 +284,7 @@ class BigQuerySource(dataflow_io.NativeSource): """A source based on a BigQuery table.""" def __init__(self, table=None, dataset=None, project=None, query=None, - validate=False, coder=None): + validate=False, coder=None, use_legacy_sql=True): """Initialize a BigQuerySource. Args: @@ -312,6 +312,10 @@ class BigQuerySource(dataflow_io.NativeSource): in a file as a JSON serialized dictionary. This argument needs a value only in special cases when returning table rows as dictionaries is not desirable. + use_legacy_sql: Specifies whether to use BigQuery's legacy + SQL dialect for this query. The default value is true.
If set to false, + the query will use BigQuery's updated SQL dialect with improved + standards compliance. This parameter is forced to True for table inputs. Raises: ValueError: if any of the following is true @@ -328,8 +332,10 @@ class BigQuerySource(dataflow_io.NativeSource): elif table is not None: self.table_reference = _parse_table_reference(table, dataset, project) self.query = None + self.use_legacy_sql = True else: self.query = query + self.use_legacy_sql = use_legacy_sql self.table_reference = None self.validate = validate @@ -342,7 +348,9 @@ class BigQuerySource(dataflow_io.NativeSource): def reader(self, test_bigquery_client=None): return BigQueryReader( - source=self, test_bigquery_client=test_bigquery_client) + source=self, + test_bigquery_client=test_bigquery_client, + use_legacy_sql=self.use_legacy_sql) class BigQuerySink(dataflow_io.NativeSink): @@ -462,7 +470,7 @@ class BigQuerySink(dataflow_io.NativeSink): class BigQueryReader(dataflow_io.NativeSourceReader): """A reader for a BigQuery source.""" - def __init__(self, source, test_bigquery_client=None): + def __init__(self, source, test_bigquery_client=None, use_legacy_sql=True): self.source = source self.test_bigquery_client = test_bigquery_client if auth.is_running_in_gce: @@ -484,6 +492,7 @@ class BigQueryReader(dataflow_io.NativeSourceReader): # for reading the field values in each row but could be useful for # getting additional details. self.schema = None + self.use_legacy_sql = use_legacy_sql if self.source.query is None: # If table schema did not define a project we default to executing # project. 
@@ -506,7 +515,8 @@ class BigQueryReader(dataflow_io.NativeSourceReader): def __iter__(self): for rows, schema in self.client.run_query( - project_id=self.executing_project, query=self.query): + project_id=self.executing_project, query=self.query, + use_legacy_sql=self.use_legacy_sql): if self.schema is None: self.schema = schema for row in rows: @@ -607,14 +617,14 @@ class BigQueryWrapper(object): return '%s_%d' % (self._row_id_prefix, self._unique_row_id) @retry.with_exponential_backoff() # Using retry defaults from utils/retry.py - def _start_query_job(self, project_id, query, dry_run=False): + def _start_query_job(self, project_id, query, use_legacy_sql, dry_run=False): request = bigquery.BigqueryJobsInsertRequest( projectId=project_id, job=bigquery.Job( configuration=bigquery.JobConfiguration( dryRun=dry_run, query=bigquery.JobConfigurationQuery( - query=query)))) + query=query, useLegacySql=use_legacy_sql)))) response = self.client.jobs.Insert(request) return response.jobReference.jobId @@ -745,8 +755,8 @@ class BigQueryWrapper(object): table_id=table_id, schema=schema or found_table.schema) - def run_query(self, project_id, query, dry_run=False): - job_id = self._start_query_job(project_id, query, dry_run) + def run_query(self, project_id, query, use_legacy_sql, dry_run=False): + job_id = self._start_query_job(project_id, query, use_legacy_sql, dry_run) if dry_run: # If this was a dry run then the fact that we get here means the # query has no errors. The start_query_job would raise an error otherwise. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0a339653/sdks/python/apache_beam/runners/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py index 1d0398c..57867fa 100644 --- a/sdks/python/apache_beam/runners/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow_runner.py @@ -528,6 +528,8 @@ class DataflowPipelineRunner(PipelineRunner): transform.source.table_reference.projectId) elif transform.source.query is not None: step.add_property(PropertyNames.BIGQUERY_QUERY, transform.source.query) + step.add_property(PropertyNames.BIGQUERY_USE_LEGACY_SQL, + transform.source.use_legacy_sql) else: raise ValueError('BigQuery source %r must specify either a table or' ' a query', http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0a339653/sdks/python/apache_beam/utils/names.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/names.py b/sdks/python/apache_beam/utils/names.py index 41b5b43..be8c92a 100644 --- a/sdks/python/apache_beam/utils/names.py +++ b/sdks/python/apache_beam/utils/names.py @@ -45,6 +45,7 @@ class PropertyNames(object): BIGQUERY_CREATE_DISPOSITION = 'create_disposition' BIGQUERY_DATASET = 'dataset' BIGQUERY_QUERY = 'bigquery_query' + BIGQUERY_USE_LEGACY_SQL = 'bigquery_use_legacy_sql' BIGQUERY_TABLE = 'table' BIGQUERY_PROJECT = 'project' BIGQUERY_SCHEMA = 'schema'