Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 701aff074 -> 345fc6985
Set allow_nan=False on bigquery JSON encoding Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e173765a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e173765a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e173765a Branch: refs/heads/python-sdk Commit: e173765adffa4ca13f495a3f7e9fbfc2127cdc1e Parents: 701aff0 Author: Alex Amato <ajam...@ajamato2016.sea.corp.google.com> Authored: Thu Sep 8 17:57:28 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Wed Sep 21 15:42:58 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/bigquery.py | 24 +++++++++++---- sdks/python/apache_beam/io/bigquery_test.py | 37 ++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e173765a/sdks/python/apache_beam/io/bigquery.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py index 50c2eaf..5508eaa 100644 --- a/sdks/python/apache_beam/io/bigquery.py +++ b/sdks/python/apache_beam/io/bigquery.py @@ -136,6 +136,8 @@ __all__ = [ 'BigQuerySink', ] +JSON_COMPLIANCE_ERROR = 'NAN, INF and -INF values are not JSON compliant.' + class RowAsDictJsonCoder(coders.Coder): """A coder for a table row (represented as a dict) to/from a JSON string. @@ -145,7 +147,14 @@ class RowAsDictJsonCoder(coders.Coder): """ def encode(self, table_row): - return json.dumps(table_row) + # The normal error when dumping NAN/INF values is: + # ValueError: Out of range float values are not JSON compliant + # This code will catch this error to emit an error that explains + # to the programmer that they have used NAN/INF values. 
+ try: + return json.dumps(table_row, allow_nan=False) + except ValueError as e: + raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR)) def decode(self, encoded_table_row): return json.loads(encoded_table_row) @@ -173,10 +182,14 @@ class TableRowJsonCoder(coders.Coder): raise AttributeError( 'The TableRowJsonCoder requires a table schema for ' 'encoding operations. Please specify a table_schema argument.') - return json.dumps( - collections.OrderedDict( - zip(self.field_names, - [from_json_value(f.v) for f in table_row.f]))) + try: + return json.dumps( + collections.OrderedDict( + zip(self.field_names, + [from_json_value(f.v) for f in table_row.f])), + allow_nan=False) + except ValueError as e: + raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR)) def decode(self, encoded_table_row): od = json.loads( @@ -428,7 +441,6 @@ class BigQuerySink(iobase.NativeSink): fs['fields'] = schema_list_as_object(f.fields) fields.append(fs) return fields - return json.dumps( {'fields': schema_list_as_object(self.table_schema.fields)}) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e173765a/sdks/python/apache_beam/io/bigquery_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/bigquery_test.py b/sdks/python/apache_beam/io/bigquery_test.py index 2bca0dc..09e64ce 100644 --- a/sdks/python/apache_beam/io/bigquery_test.py +++ b/sdks/python/apache_beam/io/bigquery_test.py @@ -40,6 +40,22 @@ class TestRowAsDictJsonCoder(unittest.TestCase): test_value = {'s': 'abc', 'i': 123, 'f': 123.456, 'b': True} self.assertEqual(test_value, coder.decode(coder.encode(test_value))) + def json_compliance_exception(self, value): + with self.assertRaises(ValueError) as exn: + coder = RowAsDictJsonCoder() + test_value = {'s': value} + self.assertEqual(test_value, coder.decode(coder.encode(test_value))) + self.assertTrue(bigquery.JSON_COMPLIANCE_ERROR in exn.exception.message) + + def test_invalid_json_nan(self): + 
self.json_compliance_exception(float('nan')) + + def test_invalid_json_inf(self): + self.json_compliance_exception(float('inf')) + + def test_invalid_json_neg_inf(self): + self.json_compliance_exception(float('-inf')) + class TestTableRowJsonCoder(unittest.TestCase): @@ -71,6 +87,27 @@ class TestTableRowJsonCoder(unittest.TestCase): self.assertTrue( ctx.exception.message.startswith('The TableRowJsonCoder requires')) + def json_compliance_exception(self, value): + with self.assertRaises(ValueError) as exn: + schema_definition = [('f', 'FLOAT')] + schema = bigquery.TableSchema( + fields=[bigquery.TableFieldSchema(name=k, type=v) + for k, v in schema_definition]) + coder = TableRowJsonCoder(table_schema=schema) + test_row = bigquery.TableRow( + f=[bigquery.TableCell(v=to_json_value(value))]) + coder.encode(test_row) + self.assertTrue(bigquery.JSON_COMPLIANCE_ERROR in exn.exception.message) + + def test_invalid_json_nan(self): + self.json_compliance_exception(float('nan')) + + def test_invalid_json_inf(self): + self.json_compliance_exception(float('inf')) + + def test_invalid_json_neg_inf(self): + self.json_compliance_exception(float('-inf')) + class TestBigQuerySource(unittest.TestCase):