spark git commit: [SPARK-15981][SQL][STREAMING] Fixed bug and added tests in DataStreamReader Python API

2016-06-16 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 0a2291cd1 -> e11c27918


[SPARK-15981][SQL][STREAMING] Fixed bug and added tests in DataStreamReader 
Python API

## What changes were proposed in this pull request?

- Fixed a bug in the Python API of DataStreamReader. Because a single path was being 
converted to an array before calling the Java DataStreamReader method (which takes a 
string only), it gave the following error (a sketch of the corrected path handling follows this list).
```
File "/Users/tdas/Projects/Spark/spark/python/pyspark/sql/readwriter.py", line 
947, in pyspark.sql.readwriter.DataStreamReader.json
Failed example:
json_sdf = spark.readStream.json(os.path.join(tempfile.mkdtemp(), 'data'),  
   schema = sdf_schema)
Exception raised:
Traceback (most recent call last):
  File 
"/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/doctest.py",
 line 1253, in __run
compileflags, 1) in test.globs
  File "", line 1, 
in 
json_sdf = spark.readStream.json(os.path.join(tempfile.mkdtemp(), 
'data'), schema = sdf_schema)
  File "/Users/tdas/Projects/Spark/spark/python/pyspark/sql/readwriter.py", 
line 963, in json
return self._df(self._jreader.json(path))
  File 
"/Users/tdas/Projects/Spark/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py",
 line 933, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File "/Users/tdas/Projects/Spark/spark/python/pyspark/sql/utils.py", line 
63, in deco
return f(*a, **kw)
  File 
"/Users/tdas/Projects/Spark/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py",
 line 316, in get_return_value
format(target_id, ".", name, value))
Py4JError: An error occurred while calling o121.json. Trace:
py4j.Py4JException: Method json([class java.util.ArrayList]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:272)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:211)
at java.lang.Thread.run(Thread.java:744)
```

- Reduced code duplication between DataStreamReader and DataFrameReader
- Added missing Python doctests
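
For illustration only, the shape of the path-handling fix is sketched below. This is not the patch itself: `self._jreader` (the wrapped Java DataStreamReader) and `self._df` (which wraps the returned Java DataFrame) are assumed from the surrounding PySpark internals, and the plain `str` check stands in for PySpark's Python 2/3 string-compatibility shim.
```
# Hedged sketch of the corrected dispatch, not the code as merged.
def json(self, path, schema=None):
    if schema is not None:
        self.schema(schema)
    if isinstance(path, str):
        # Hand the JVM a plain string: the Java DataStreamReader only
        # exposes json(String), so wrapping the path in a list makes
        # Py4J look for a non-existent json(ArrayList) overload.
        return self._df(self._jreader.json(path))
    raise TypeError("path can be only a single string")
```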

## How was this patch tested?
New tests
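The added coverage is of the doctest kind shown in the failing example above. An illustrative (not verbatim) doctest, assuming the `spark` session and `sdf_schema` fixture provided by the module's doctest globals, would look like:
```
>>> import os, tempfile
>>> json_sdf = spark.readStream.json(os.path.join(tempfile.mkdtemp(), 'data'),
...                                  schema=sdf_schema)
>>> json_sdf.isStreaming
True
>>> json_sdf.schema == sdf_schema
True
```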

Author: Tathagata Das 

Closes #13703 from tdas/SPARK-15981.

(cherry picked from commit 084dca770f5c26f906e7555707c7894cf05fb86b)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e11c2791
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e11c2791
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e11c2791

Branch: refs/heads/branch-2.0
Commit: e11c279188b34d410f6ecf17cb1773c95f24a19e
Parents: 0a2291c
Author: Tathagata Das 
Authored: Thu Jun 16 13:17:41 2016 -0700
Committer: Shixiong Zhu 
Committed: Thu Jun 16 13:17:50 2016 -0700

--
 python/pyspark/sql/readwriter.py | 258 ++
 1 file changed, 136 insertions(+), 122 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e11c2791/python/pyspark/sql/readwriter.py
--
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index c982de6..72fd184 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -44,7 +44,82 @@ def to_str(value):
 return str(value)
 
 
-class DataFrameReader(object):
+class ReaderUtils(object):
+
+    def _set_json_opts(self, schema, primitivesAsString, prefersDecimal,
+                       allowComments, allowUnquotedFieldNames, allowSingleQuotes,
+                       allowNumericLeadingZero, allowBackslashEscapingAnyCharacter,
+                       mode, columnNameOfCorruptRecord):
+        """
+        Set options based on the Json optional parameters
+        """
+        if schema is not None:
+            self.schema(schema)
+        if primitivesAsString is not None:
+            self.option("primitivesAsString", primitivesAsString)
+        if prefersDecimal is not None:
+            self.option("prefersDecimal", prefersDecimal)
+        if allowComments is not None:
+            self.option("allowComments", allowComments)
+        if allowUnquotedFieldNames is not None:
+            self.option("allowUnquotedFieldNames", allowUnquotedFieldNames)
+        if allowSingleQuotes is not None:
+            self.option("allowSingleQuotes", allowSingleQuotes)
+        if allowNumericLeadingZero is not None:
+            self.option("allowNumericLeadingZero", allowNumericLeadingZero)
+        if allowBackslashEscapingAnyCharacter is not None:
+            self.option("allowBackslashEscapingAnyCharacter",

spark git commit: [SPARK-15981][SQL][STREAMING] Fixed bug and added tests in DataStreamReader Python API

2016-06-16 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master a865f6e05 -> 084dca770


[SPARK-15981][SQL][STREAMING] Fixed bug and added tests in DataStreamReader 
Python API

## What changes were proposed in this pull request?

- Fixed a bug in the Python API of DataStreamReader. Because a single path was being 
converted to an array before calling the Java DataStreamReader method (which takes a 
string only), it gave the following error.
```
File "/Users/tdas/Projects/Spark/spark/python/pyspark/sql/readwriter.py", line 
947, in pyspark.sql.readwriter.DataStreamReader.json
Failed example:
json_sdf = spark.readStream.json(os.path.join(tempfile.mkdtemp(), 'data'),  
   schema = sdf_schema)
Exception raised:
Traceback (most recent call last):
  File 
"/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/doctest.py",
 line 1253, in __run
compileflags, 1) in test.globs
  File "", line 1, 
in 
json_sdf = spark.readStream.json(os.path.join(tempfile.mkdtemp(), 
'data'), schema = sdf_schema)
  File "/Users/tdas/Projects/Spark/spark/python/pyspark/sql/readwriter.py", 
line 963, in json
return self._df(self._jreader.json(path))
  File 
"/Users/tdas/Projects/Spark/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py",
 line 933, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File "/Users/tdas/Projects/Spark/spark/python/pyspark/sql/utils.py", line 
63, in deco
return f(*a, **kw)
  File 
"/Users/tdas/Projects/Spark/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py",
 line 316, in get_return_value
format(target_id, ".", name, value))
Py4JError: An error occurred while calling o121.json. Trace:
py4j.Py4JException: Method json([class java.util.ArrayList]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:272)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:211)
at java.lang.Thread.run(Thread.java:744)
```

- Reduced code duplication between DataStreamReader and DataFrameReader (a condensed sketch of the shared helper follows this list)
- Added missing Python doctests
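
The deduplication follows the pattern visible in the diff below: the per-option `if x is not None: self.option(...)` ladder moves into a shared base class that both the batch and streaming readers inherit. A condensed, hedged sketch of the idea (only a few JSON options shown; the real helper covers the full set, and the `...` bodies stand in for the JVM hand-off):
```
# Condensed sketch of the shared-helper pattern, not the code as merged.
class ReaderUtils(object):
    def _set_json_opts(self, schema=None, primitivesAsString=None, mode=None):
        # Forward only the options the caller actually supplied.
        if schema is not None:
            self.schema(schema)
        if primitivesAsString is not None:
            self.option("primitivesAsString", primitivesAsString)
        if mode is not None:
            self.option("mode", mode)

class DataFrameReader(ReaderUtils):          # batch reader
    def json(self, path, **opts):
        self._set_json_opts(**opts)
        ...  # hand off to the JVM DataFrameReader

class DataStreamReader(ReaderUtils):         # streaming reader
    def json(self, path, **opts):
        self._set_json_opts(**opts)
        ...  # hand off to the JVM DataStreamReader
```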

## How was this patch tested?
New tests

Author: Tathagata Das 

Closes #13703 from tdas/SPARK-15981.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/084dca77
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/084dca77
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/084dca77

Branch: refs/heads/master
Commit: 084dca770f5c26f906e7555707c7894cf05fb86b
Parents: a865f6e
Author: Tathagata Das 
Authored: Thu Jun 16 13:17:41 2016 -0700
Committer: Shixiong Zhu 
Committed: Thu Jun 16 13:17:41 2016 -0700

--
 python/pyspark/sql/readwriter.py | 258 ++
 1 file changed, 136 insertions(+), 122 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/084dca77/python/pyspark/sql/readwriter.py
--
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index c982de6..72fd184 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -44,7 +44,82 @@ def to_str(value):
 return str(value)
 
 
-class DataFrameReader(object):
+class ReaderUtils(object):
+
+    def _set_json_opts(self, schema, primitivesAsString, prefersDecimal,
+                       allowComments, allowUnquotedFieldNames, allowSingleQuotes,
+                       allowNumericLeadingZero, allowBackslashEscapingAnyCharacter,
+                       mode, columnNameOfCorruptRecord):
+        """
+        Set options based on the Json optional parameters
+        """
+        if schema is not None:
+            self.schema(schema)
+        if primitivesAsString is not None:
+            self.option("primitivesAsString", primitivesAsString)
+        if prefersDecimal is not None:
+            self.option("prefersDecimal", prefersDecimal)
+        if allowComments is not None:
+            self.option("allowComments", allowComments)
+        if allowUnquotedFieldNames is not None:
+            self.option("allowUnquotedFieldNames", allowUnquotedFieldNames)
+        if allowSingleQuotes is not None:
+            self.option("allowSingleQuotes", allowSingleQuotes)
+        if allowNumericLeadingZero is not None:
+            self.option("allowNumericLeadingZero", allowNumericLeadingZero)
+        if allowBackslashEscapingAnyCharacter is not None:
+            self.option("allowBackslashEscapingAnyCharacter",