Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 701aff074 -> 345fc6985


Set allow_nan=False on bigquery JSON encoding


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e173765a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e173765a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e173765a

Branch: refs/heads/python-sdk
Commit: e173765adffa4ca13f495a3f7e9fbfc2127cdc1e
Parents: 701aff0
Author: Alex Amato <ajam...@ajamato2016.sea.corp.google.com>
Authored: Thu Sep 8 17:57:28 2016 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Wed Sep 21 15:42:58 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/bigquery.py      | 24 +++++++++++----
 sdks/python/apache_beam/io/bigquery_test.py | 37 ++++++++++++++++++++++++
 2 files changed, 55 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e173765a/sdks/python/apache_beam/io/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery.py 
b/sdks/python/apache_beam/io/bigquery.py
index 50c2eaf..5508eaa 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -136,6 +136,8 @@ __all__ = [
     'BigQuerySink',
     ]
 
+JSON_COMPLIANCE_ERROR = 'NAN, INF and -INF values are not JSON compliant.'
+
 
 class RowAsDictJsonCoder(coders.Coder):
   """A coder for a table row (represented as a dict) to/from a JSON string.
@@ -145,7 +147,14 @@ class RowAsDictJsonCoder(coders.Coder):
   """
 
   def encode(self, table_row):
-    return json.dumps(table_row)
+    # The normal error when dumping NAN/INF values is:
+    # ValueError: Out of range float values are not JSON compliant
+    # This code will catch this error to emit an error that explains
+    # to the programmer that they have used NAN/INF values.
+    try:
+      return json.dumps(table_row, allow_nan=False)
+    except ValueError as e:
+      raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR))
 
   def decode(self, encoded_table_row):
     return json.loads(encoded_table_row)
@@ -173,10 +182,14 @@ class TableRowJsonCoder(coders.Coder):
       raise AttributeError(
           'The TableRowJsonCoder requires a table schema for '
           'encoding operations. Please specify a table_schema argument.')
-    return json.dumps(
-        collections.OrderedDict(
-            zip(self.field_names,
-                [from_json_value(f.v) for f in table_row.f])))
+    try:
+      return json.dumps(
+          collections.OrderedDict(
+              zip(self.field_names,
+                  [from_json_value(f.v) for f in table_row.f])),
+          allow_nan=False)
+    except ValueError as e:
+      raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR))
 
   def decode(self, encoded_table_row):
     od = json.loads(
@@ -428,7 +441,6 @@ class BigQuerySink(iobase.NativeSink):
           fs['fields'] = schema_list_as_object(f.fields)
         fields.append(fs)
       return fields
-
     return json.dumps(
         {'fields': schema_list_as_object(self.table_schema.fields)})
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e173765a/sdks/python/apache_beam/io/bigquery_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery_test.py 
b/sdks/python/apache_beam/io/bigquery_test.py
index 2bca0dc..09e64ce 100644
--- a/sdks/python/apache_beam/io/bigquery_test.py
+++ b/sdks/python/apache_beam/io/bigquery_test.py
@@ -40,6 +40,22 @@ class TestRowAsDictJsonCoder(unittest.TestCase):
     test_value = {'s': 'abc', 'i': 123, 'f': 123.456, 'b': True}
     self.assertEqual(test_value, coder.decode(coder.encode(test_value)))
 
+  def json_compliance_exception(self, value):
+    with self.assertRaises(ValueError) as exn:
+      coder = RowAsDictJsonCoder()
+      test_value = {'s': value}
+      self.assertEqual(test_value, coder.decode(coder.encode(test_value)))
+      self.assertTrue(bigquery.JSON_COMPLIANCE_ERROR in exn.exception.message)
+
+  def test_invalid_json_nan(self):
+    self.json_compliance_exception(float('nan'))
+
+  def test_invalid_json_inf(self):
+    self.json_compliance_exception(float('inf'))
+
+  def test_invalid_json_neg_inf(self):
+    self.json_compliance_exception(float('-inf'))
+
 
 class TestTableRowJsonCoder(unittest.TestCase):
 
@@ -71,6 +87,27 @@ class TestTableRowJsonCoder(unittest.TestCase):
     self.assertTrue(
         ctx.exception.message.startswith('The TableRowJsonCoder requires'))
 
+  def json_compliance_exception(self, value):
+    with self.assertRaises(ValueError) as exn:
+      schema_definition = [('f', 'FLOAT')]
+      schema = bigquery.TableSchema(
+          fields=[bigquery.TableFieldSchema(name=k, type=v)
+                  for k, v in schema_definition])
+      coder = TableRowJsonCoder(table_schema=schema)
+      test_row = bigquery.TableRow(
+          f=[bigquery.TableCell(v=to_json_value(value))])
+      coder.encode(test_row)
+      self.assertTrue(bigquery.JSON_COMPLIANCE_ERROR in exn.exception.message)
+
+  def test_invalid_json_nan(self):
+    self.json_compliance_exception(float('nan'))
+
+  def test_invalid_json_inf(self):
+    self.json_compliance_exception(float('inf'))
+
+  def test_invalid_json_neg_inf(self):
+    self.json_compliance_exception(float('-inf'))
+
 
 class TestBigQuerySource(unittest.TestCase):
 

Reply via email to