spark git commit: [SPARK-15981][SQL][STREAMING] Fixed bug and added tests in DataStreamReader Python API
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 0a2291cd1 -> e11c27918

[SPARK-15981][SQL][STREAMING] Fixed bug and added tests in DataStreamReader Python API

## What changes were proposed in this pull request?

- Fixed a bug in the Python API of DataStreamReader. Because a single path was being converted to an array before calling the Java DataStreamReader method (which takes a string only), it gave the following error.

```
File "/Users/tdas/Projects/Spark/spark/python/pyspark/sql/readwriter.py", line 947, in pyspark.sql.readwriter.DataStreamReader.json
Failed example:
    json_sdf = spark.readStream.json(os.path.join(tempfile.mkdtemp(), 'data'), schema = sdf_schema)
Exception raised:
    Traceback (most recent call last):
      File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/doctest.py", line 1253, in __run
        compileflags, 1) in test.globs
      File "", line 1, in
        json_sdf = spark.readStream.json(os.path.join(tempfile.mkdtemp(), 'data'), schema = sdf_schema)
      File "/Users/tdas/Projects/Spark/spark/python/pyspark/sql/readwriter.py", line 963, in json
        return self._df(self._jreader.json(path))
      File "/Users/tdas/Projects/Spark/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
        answer, self.gateway_client, self.target_id, self.name)
      File "/Users/tdas/Projects/Spark/spark/python/pyspark/sql/utils.py", line 63, in deco
        return f(*a, **kw)
      File "/Users/tdas/Projects/Spark/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 316, in get_return_value
        format(target_id, ".", name, value))
    Py4JError: An error occurred while calling o121.json. Trace:
    py4j.Py4JException: Method json([class java.util.ArrayList]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
        at py4j.Gateway.invoke(Gateway.java:272)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:211)
        at java.lang.Thread.run(Thread.java:744)
```

- Reduced code duplication between DataStreamReader and DataFrameWriter
- Added missing Python doctests

## How was this patch tested?

New tests

Author: Tathagata Das

Closes #13703 from tdas/SPARK-15981.

(cherry picked from commit 084dca770f5c26f906e7555707c7894cf05fb86b)
Signed-off-by: Shixiong Zhu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e11c2791
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e11c2791
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e11c2791
Branch: refs/heads/branch-2.0
Commit: e11c279188b34d410f6ecf17cb1773c95f24a19e
Parents: 0a2291c
Author: Tathagata Das
Authored: Thu Jun 16 13:17:41 2016 -0700
Committer: Shixiong Zhu
Committed: Thu Jun 16 13:17:50 2016 -0700

--
 python/pyspark/sql/readwriter.py | 258 ++
 1 file changed, 136 insertions(+), 122 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/e11c2791/python/pyspark/sql/readwriter.py

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index c982de6..72fd184 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -44,7 +44,82 @@ def to_str(value):
         return str(value)


-class DataFrameReader(object):
+class ReaderUtils(object):
+
+    def _set_json_opts(self, schema, primitivesAsString, prefersDecimal,
+                       allowComments, allowUnquotedFieldNames, allowSingleQuotes,
+                       allowNumericLeadingZero, allowBackslashEscapingAnyCharacter,
+                       mode, columnNameOfCorruptRecord):
+        """
+        Set options based on the Json optional parameters
+        """
+        if schema is not None:
+            self.schema(schema)
+        if primitivesAsString is not None:
+            self.option("primitivesAsString", primitivesAsString)
+        if prefersDecimal is not None:
+            self.option("prefersDecimal", prefersDecimal)
+        if allowComments is not None:
+            self.option("allowComments", allowComments)
+        if allowUnquotedFieldNames is not None:
+            self.option("allowUnquotedFieldNames", allowUnquotedFieldNames)
+        if allowSingleQuotes is not None:
+            self.option("allowSingleQuotes", allowSingleQuotes)
+        if allowNumericLeadingZero is not None:
+            self.option("allowNumericLeadingZero", allowNumericLeadingZero)
+        if allo
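The root cause of the bug above can be illustrated with a minimal sketch. py4j dispatches on the Java-side argument type, so wrapping a single path in a Python list makes the gateway look for a `json(ArrayList)` overload that does not exist. The class and function names below are hypothetical stand-ins, not actual Spark or py4j code:

```python
class FakeJavaReader:
    """Stand-in for the Java DataStreamReader, whose json() takes a string only."""

    def json(self, path):
        if not isinstance(path, str):
            # py4j would surface this as:
            # Py4JException: Method json([class java.util.ArrayList]) does not exist
            raise TypeError("Method json([%s]) does not exist" % type(path).__name__)
        return "stream over " + path


def json_buggy(jreader, path):
    # The bug: a single path was converted to an array before the Java call.
    return jreader.json([path])


def json_fixed(jreader, path):
    # The fix: a single path is passed through as a plain string.
    return jreader.json(path)
```

Calling `json_fixed` succeeds, while `json_buggy` fails with the same kind of "method does not exist" dispatch error the traceback shows.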
spark git commit: [SPARK-15981][SQL][STREAMING] Fixed bug and added tests in DataStreamReader Python API
Repository: spark
Updated Branches:
  refs/heads/master a865f6e05 -> 084dca770

[SPARK-15981][SQL][STREAMING] Fixed bug and added tests in DataStreamReader Python API

## What changes were proposed in this pull request?

- Fixed a bug in the Python API of DataStreamReader. Because a single path was being converted to an array before calling the Java DataStreamReader method (which takes a string only), it gave the following error.

```
File "/Users/tdas/Projects/Spark/spark/python/pyspark/sql/readwriter.py", line 947, in pyspark.sql.readwriter.DataStreamReader.json
Failed example:
    json_sdf = spark.readStream.json(os.path.join(tempfile.mkdtemp(), 'data'), schema = sdf_schema)
Exception raised:
    Traceback (most recent call last):
      File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/doctest.py", line 1253, in __run
        compileflags, 1) in test.globs
      File "", line 1, in
        json_sdf = spark.readStream.json(os.path.join(tempfile.mkdtemp(), 'data'), schema = sdf_schema)
      File "/Users/tdas/Projects/Spark/spark/python/pyspark/sql/readwriter.py", line 963, in json
        return self._df(self._jreader.json(path))
      File "/Users/tdas/Projects/Spark/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
        answer, self.gateway_client, self.target_id, self.name)
      File "/Users/tdas/Projects/Spark/spark/python/pyspark/sql/utils.py", line 63, in deco
        return f(*a, **kw)
      File "/Users/tdas/Projects/Spark/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 316, in get_return_value
        format(target_id, ".", name, value))
    Py4JError: An error occurred while calling o121.json. Trace:
    py4j.Py4JException: Method json([class java.util.ArrayList]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
        at py4j.Gateway.invoke(Gateway.java:272)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:211)
        at java.lang.Thread.run(Thread.java:744)
```

- Reduced code duplication between DataStreamReader and DataFrameWriter
- Added missing Python doctests

## How was this patch tested?

New tests

Author: Tathagata Das

Closes #13703 from tdas/SPARK-15981.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/084dca77
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/084dca77
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/084dca77
Branch: refs/heads/master
Commit: 084dca770f5c26f906e7555707c7894cf05fb86b
Parents: a865f6e
Author: Tathagata Das
Authored: Thu Jun 16 13:17:41 2016 -0700
Committer: Shixiong Zhu
Committed: Thu Jun 16 13:17:41 2016 -0700

--
 python/pyspark/sql/readwriter.py | 258 ++
 1 file changed, 136 insertions(+), 122 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/084dca77/python/pyspark/sql/readwriter.py

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index c982de6..72fd184 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -44,7 +44,82 @@ def to_str(value):
         return str(value)


-class DataFrameReader(object):
+class ReaderUtils(object):
+
+    def _set_json_opts(self, schema, primitivesAsString, prefersDecimal,
+                       allowComments, allowUnquotedFieldNames, allowSingleQuotes,
+                       allowNumericLeadingZero, allowBackslashEscapingAnyCharacter,
+                       mode, columnNameOfCorruptRecord):
+        """
+        Set options based on the Json optional parameters
+        """
+        if schema is not None:
+            self.schema(schema)
+        if primitivesAsString is not None:
+            self.option("primitivesAsString", primitivesAsString)
+        if prefersDecimal is not None:
+            self.option("prefersDecimal", prefersDecimal)
+        if allowComments is not None:
+            self.option("allowComments", allowComments)
+        if allowUnquotedFieldNames is not None:
+            self.option("allowUnquotedFieldNames", allowUnquotedFieldNames)
+        if allowSingleQuotes is not None:
+            self.option("allowSingleQuotes", allowSingleQuotes)
+        if allowNumericLeadingZero is not None:
+            self.option("allowNumericLeadingZero", allowNumericLeadingZero)
+        if allowBackslashEscapingAnyCharacter is not None:
+            self.option("allowBackslashEscapingAnyCharacter",
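The deduplication part of the patch hinges on the `ReaderUtils` mixin shown in the hunk: each JSON option is forwarded only when the caller actually supplied it, so Java-side defaults stay in effect for everything else, and both the batch and streaming readers can share one helper. A minimal sketch of that pattern, using a hypothetical toy reader with a subset of the options (the real classes delegate `schema()` and `option()` to a Java reader over py4j):

```python
class ReaderUtils(object):
    """Mixin shared by reader classes; assumes the host class
    provides schema() and option()."""

    def _set_json_opts(self, schema=None, primitivesAsString=None,
                       prefersDecimal=None, mode=None):
        # Forward only the options the caller set; unset ones keep
        # their defaults on the other side of the gateway.
        if schema is not None:
            self.schema(schema)
        if primitivesAsString is not None:
            self.option("primitivesAsString", primitivesAsString)
        if prefersDecimal is not None:
            self.option("prefersDecimal", prefersDecimal)
        if mode is not None:
            self.option("mode", mode)


class ToyReader(ReaderUtils):
    """Hypothetical in-memory reader standing in for DataFrameReader
    and DataStreamReader."""

    def __init__(self):
        self._schema = None
        self._options = {}

    def schema(self, s):
        self._schema = s
        return self

    def option(self, key, value):
        self._options[key] = value
        return self
```

With this shape, `ToyReader()._set_json_opts(primitivesAsString="true")` records only that one option and leaves the schema untouched, which is the behavior the patch factors out of the two reader classes.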