Repository: incubator-beam Updated Branches: refs/heads/python-sdk aa9071d56 -> 739a43197
Parse table schema from JSON Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b4c2f62b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b4c2f62b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b4c2f62b Branch: refs/heads/python-sdk Commit: b4c2f62be8a809b666089e7b2fe5dada9cbd6c16 Parents: aa9071d Author: Sourabh Bajaj <sourabhba...@google.com> Authored: Wed Nov 30 13:48:28 2016 -0800 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Thu Dec 1 09:07:27 2016 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/io/bigquery.py | 37 ++++++++++++++++++++++++ sdks/python/apache_beam/io/bigquery_test.py | 22 ++++++++++++++ 2 files changed, 59 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b4c2f62b/sdks/python/apache_beam/io/bigquery.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py index 8d7892a..0885e3a 100644 --- a/sdks/python/apache_beam/io/bigquery.py +++ b/sdks/python/apache_beam/io/bigquery.py @@ -200,6 +200,43 @@ class TableRowJsonCoder(coders.Coder): f=[bigquery.TableCell(v=to_json_value(e)) for e in od.itervalues()]) +def parse_table_schema_from_json(schema_string): + """Parse a BigQuery TableSchema from its JSON string form. + + Args: + schema_string: JSON string of an object with a top-level 'fields' list. + + Returns: + A bigquery.TableSchema with one TableFieldSchema per entry in 'fields'. + """ + json_schema = json.loads(schema_string) + + def _parse_schema_field(field): + """Parse a single (possibly nested) schema field from a dictionary. + + Args: + field: Dict with 'name', 'type' and optional 'mode'/'description'/'fields'. + + Returns: + A TableFieldSchema; mode defaults to 'NULLABLE' when not given. 
+ """ + schema = bigquery.TableFieldSchema() + schema.name = field['name'] + schema.type = field['type'] + if 'mode' in field: + schema.mode = field['mode'] + else: + schema.mode = 'NULLABLE' + if 'description' in field: + schema.description = field['description'] + if 'fields' in field: + schema.fields = [_parse_schema_field(x) for x in field['fields']] + return schema + + fields = [_parse_schema_field(f) for f in json_schema['fields']] + return bigquery.TableSchema(fields=fields) + + class BigQueryDisposition(object): """Class holding standard strings used for create and write dispositions.""" http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b4c2f62b/sdks/python/apache_beam/io/bigquery_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/bigquery_test.py b/sdks/python/apache_beam/io/bigquery_test.py index b0c3bbe..e263e13 100644 --- a/sdks/python/apache_beam/io/bigquery_test.py +++ b/sdks/python/apache_beam/io/bigquery_test.py @@ -32,6 +32,7 @@ from apache_beam.internal.clients import bigquery from apache_beam.internal.json_value import to_json_value from apache_beam.io.bigquery import RowAsDictJsonCoder from apache_beam.io.bigquery import TableRowJsonCoder +from apache_beam.io.bigquery import parse_table_schema_from_json from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher from apache_beam.utils.options import PipelineOptions @@ -113,6 +114,27 @@ class TestTableRowJsonCoder(unittest.TestCase): self.json_compliance_exception(float('-inf')) +class TestTableSchemaParser(unittest.TestCase): + def test_parse_table_schema_from_json(self): + string_field = bigquery.TableFieldSchema( + name='s', type='STRING', mode='NULLABLE', description='s description') + number_field = bigquery.TableFieldSchema( + name='n', type='INTEGER', mode='REQUIRED', description='n description') + record_field = bigquery.TableFieldSchema( 
name='r', type='RECORD', mode='REQUIRED', description='r description', + fields=[string_field, number_field]) + expected_schema = bigquery.TableSchema(fields=[record_field]) + json_str = json.dumps({'fields': [ + {'name': 'r', 'type': 'RECORD', 'mode': 'REQUIRED', + 'description': 'r description', 'fields': [ + {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE', + 'description': 's description'}, + {'name': 'n', 'type': 'INTEGER', 'mode': 'REQUIRED', + 'description': 'n description'}]}]}) + self.assertEqual(parse_table_schema_from_json(json_str), + expected_schema) + + class TestBigQuerySource(unittest.TestCase): def test_display_data_item_on_validate_true(self):