[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-13 Thread gatorsmile
Github user gatorsmile closed the pull request at:

https://github.com/apache/spark/pull/20217


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-11 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r161129504
  
--- Diff: python/pyspark/sql/context.py ---
@@ -203,18 +203,46 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
         >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+        return self.sparkSession.catalog.registerFunction(name, f, returnType)
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
--- End diff --

Ok. Sounds good.


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r161126125
  
--- Diff: python/pyspark/sql/context.py ---
@@ -203,18 +203,48 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
         >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+        return self.sparkSession.catalog.registerFunction(name, f, returnType)
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
+        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
+        statements.
+
+        :param name: name of the UDF in SQL statements
+        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
+                  scalar vectorized. For example, the object returned by udf or pandas_udf.
+                  Grouped vectorized UDFs are not supported.
+        :return: a wrapped :class:`UserDefinedFunction`
--- End diff --

nit: I think this can be `a wrapped/native UserDefinedFunction.`.


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r161124529
  
--- Diff: python/pyspark/sql/context.py ---
@@ -203,18 +203,46 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
         >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+        return self.sparkSession.catalog.registerFunction(name, f, returnType)
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
--- End diff --

We have been deprecating sqlContext since Spark 2.0. The suggested calls should be `spark.udf.registerUDF` or `spark.catalog.registerUDF`.

We can have a separate PR to clean up all these method descriptions in sqlContext.
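(For illustration, a hedged doctest-style sketch of the two suggested entry points. It assumes a running SparkSession named `spark`, like the PR's own doctests; `slen2` is a hypothetical second registration name, and the exact Row repr is an assumption:)

```
>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import IntegerType
>>> slen = udf(lambda s: len(s), IntegerType())
>>> _ = spark.udf.registerUDF("slen", slen)       # via the UDF registration interface
>>> _ = spark.catalog.registerUDF("slen2", slen)  # via the catalog
>>> spark.sql("SELECT slen('test'), slen2('test')").collect()
[Row(slen(test)=4, slen2(test)=4)]
```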


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160985677
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
+            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
+                             "registerFunction is a Python function (including lambda function)")
+        udf = UserDefinedFunction(f, returnType=returnType, name=name,
+                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
+        self._jsparkSession.udf().registerPython(name, udf._judf)
+        return udf._wrapped()
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
+        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
+        statement.
+
+        :param name: name of the UDF
--- End diff --

After checking with our Scala API, I found the name is not changed. Let me 
update it.


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-11 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160985290
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
+            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
+                             "registerFunction is a Python function (including lambda function)")
+        udf = UserDefinedFunction(f, returnType=returnType, name=name,
+                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
+        self._jsparkSession.udf().registerPython(name, udf._judf)
+        return udf._wrapped()
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
+        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
+        statement.
+
+        :param name: name of the UDF
+        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
--- End diff --

SGTM.


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160984479
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
+            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
+                             "registerFunction is a Python function (including lambda function)")
+        udf = UserDefinedFunction(f, returnType=returnType, name=name,
+                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
+        self._jsparkSession.udf().registerPython(name, udf._judf)
+        return udf._wrapped()
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
+        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
+        statement.
+
+        :param name: name of the UDF
+        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
--- End diff --

We can do it as a separate PR.


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-11 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160983255
  
--- Diff: python/pyspark/sql/context.py ---
@@ -203,18 +203,46 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
         >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+        return self.sparkSession.catalog.registerFunction(name, f, returnType)
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
--- End diff --

I am not sure about the difference between
`spark.udf.registerUDF`, `sqlContext.udf.registerUDF`, and `sqlContext.registerUDF`.

There seem to be too many ways to do the same thing... But if we indeed need to keep multiple methods, I would lean towards having a comprehensive doc in one of them and having the docs for the rest be something like
"""
Same as :meth:...
"""


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-11 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160981094
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
--- End diff --

I am fine with it as well.


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-11 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160980765
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
+            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
+                             "registerFunction is a Python function (including lambda function)")
+        udf = UserDefinedFunction(f, returnType=returnType, name=name,
+                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
+        self._jsparkSession.udf().registerPython(name, udf._judf)
+        return udf._wrapped()
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
+        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
+        statement.
+
+        :param name: name of the UDF
--- End diff --

I also think changing the property (name) of the input udf object in `registerUDF` is unintuitive.


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-11 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160980191
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
+            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
+                             "registerFunction is a Python function (including lambda function)")
+        udf = UserDefinedFunction(f, returnType=returnType, name=name,
+                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
+        self._jsparkSession.udf().registerPython(name, udf._judf)
+        return udf._wrapped()
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
+        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
+        statement.
+
+        :param name: name of the UDF
+        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
--- End diff --

Yeah. I am fine with the doc here. But, similar to https://github.com/apache/spark/pull/20217#discussion_r160979747, we should probably standardize the description.


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-11 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160979747
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
+            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
+                             "registerFunction is a Python function (including lambda function)")
+        udf = UserDefinedFunction(f, returnType=returnType, name=name,
+                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
+        self._jsparkSession.udf().registerPython(name, udf._judf)
+        return udf._wrapped()
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
+        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
+        statement.
+
+        :param name: name of the UDF
+        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
+                  scalar vectorized. Grouped vectorized UDFs are not supported.
--- End diff --

Ok. Maybe as a follow-up we can standardize the language used to describe the `udf` and `pandas_udf` objects.


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160960075
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
+            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
+                             "registerFunction is a Python function (including lambda function)")
+        udf = UserDefinedFunction(f, returnType=returnType, name=name,
+                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
+        self._jsparkSession.udf().registerPython(name, udf._judf)
+        return udf._wrapped()
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
+        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
+        statement.
+
+        :param name: name of the UDF
--- End diff --

Because I thought it registers the UDF for SQL statements under the given name, and the defined UDF instance (`f`) shouldn't have its own name changed at registration time.

To my knowledge, we happened to add `registerUDF` to avoid a similar problem with `returnType`.
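(The `returnType` problem can be seen in this PR's own doctest change: before the PR, registering an `IntegerType` UDF through `registerFunction(..., StringType())` re-wrapped it with the new return type, so results came back as strings. A hedged doctest-style sketch, assuming a running session; `r1`/`r2` are illustrative names and the outputs follow the PR's doctests:)

```
>>> import random
>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import IntegerType, StringType
>>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
>>> # Old behavior: the returnType argument silently overrode the UDF's own type.
>>> _ = spark.catalog.registerFunction("r1", random_udf, StringType())  # doctest: +SKIP
>>> spark.sql("SELECT r1()").collect()  # doctest: +SKIP
[Row(r1()=u'82')]
>>> # New behavior: registerUDF takes no returnType and keeps the declared IntegerType.
>>> _ = spark.catalog.registerUDF("r2", random_udf)  # doctest: +SKIP
>>> spark.sql("SELECT r2()").collect()  # doctest: +SKIP
[Row(r2()=82)]
```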


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160958943
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
+            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
+                             "registerFunction is a Python function (including lambda function)")
+        udf = UserDefinedFunction(f, returnType=returnType, name=name,
+                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
+        self._jsparkSession.udf().registerPython(name, udf._judf)
+        return udf._wrapped()
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
+        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
+        statement.
+
+        :param name: name of the UDF
--- End diff --

Why?


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160953629
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
+            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
+                             "registerFunction is a Python function (including lambda function)")
+        udf = UserDefinedFunction(f, returnType=returnType, name=name,
+                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
+        self._jsparkSession.udf().registerPython(name, udf._judf)
+        return udf._wrapped()
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
+        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
+        statement.
+
+        :param name: name of the UDF
--- End diff --

Hm .. actually I think we should not change the name of the returned UDF?


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160950622
  
--- Diff: python/pyspark/sql/context.py ---
@@ -578,6 +606,9 @@ def __init__(self, sqlContext):
     def register(self, name, f, returnType=StringType()):
         return self.sqlContext.registerFunction(name, f, returnType)
 
+    def registerUDF(self, name, f):
--- End diff --

The examples in these docs are different. Thus, I prefer to keep it untouched.


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160950521
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
+            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
+                             "registerFunction is a Python function (including lambda function)")
+        udf = UserDefinedFunction(f, returnType=returnType, name=name,
+                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
+        self._jsparkSession.udf().registerPython(name, udf._judf)
+        return udf._wrapped()
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
+        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
+        statement.
+
+        :param name: name of the UDF
+        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
+                  scalar vectorized. Grouped vectorized UDFs are not supported.
+        :return: a wrapped :class:`UserDefinedFunction`
+
+        >>> from pyspark.sql.types import IntegerType
+        >>> from pyspark.sql.functions import udf
+        >>> slen = udf(lambda s: len(s), IntegerType())
+        >>> _ = spark.udf.registerUDF("slen", slen)
+        >>> spark.sql("SELECT slen('test')").collect()
+        [Row(slen(test)=4)]
 
         >>> import random
         >>> from pyspark.sql.functions import udf
->>> from pyspark.sql.types import IntegerType, StringType
+        >>> from pyspark.sql.types import IntegerType
         >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
->>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
+        >>> newRandom_udf = spark.catalog.registerUDF("random_udf", random_udf)
         >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
-[Row(random_udf()=u'82')]
+        [Row(random_udf()=82)]
         >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
-[Row(random_udf()=u'62')]
+        [Row(random_udf()=62)]
+
+        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
+        ... def add_one(x):
+        ...     return x + 1
+        ...
+        >>> _ = spark.udf.registerUDF("add_one", add_one)  # doctest: +SKIP
+        >>> spark.sql("SELECT add_one(id) FROM range(10)").collect()  # doctest: +SKIP
--- End diff --

Done.


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160950460
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
+            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
+                             "registerFunction is a Python function (including lambda function)")
+        udf = UserDefinedFunction(f, returnType=returnType, name=name,
+                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
+        self._jsparkSession.udf().registerPython(name, udf._judf)
+        return udf._wrapped()
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
+        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
+        statement.
+
+        :param name: name of the UDF
+        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
+                  scalar vectorized. Grouped vectorized UDFs are not supported.
--- End diff --

I think the following examples are self-descriptive. I can emphasize it in the description too.


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160950226
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
+            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
+                             "registerFunction is a Python function (including lambda function)")
+        udf = UserDefinedFunction(f, returnType=returnType, name=name,
+                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
+        self._jsparkSession.udf().registerPython(name, udf._judf)
+        return udf._wrapped()
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
+        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
+        statement.
+
+        :param name: name of the UDF
+        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
--- End diff --

> :param udf: A function object returned by :meth:`pyspark.sql.functions.pandas_udf`

This is actually not accurate. We have another way to define the scalar vectorized UDF:
```
>>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
... def add_one(x):
...     return x + 1
...
```
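(For context, the two ways to define a scalar vectorized UDF that appear in this PR — the functional form from the test case and the decorator form from the doctest. A sketch assuming PySpark 2.3 with Arrow available:)

```
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import IntegerType

# 1. Functional form: wrap an existing function (as in test_register_vectorized_udf_basic).
add = pandas_udf(lambda x, y: x + y, IntegerType())

# 2. Decorator form: the decorated function is itself the object returned by pandas_udf.
@pandas_udf("integer", PandasUDFType.SCALAR)
def add_one(x):
    return x + 1
```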


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160949917
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
--- End diff --

Nope, to be clear I am fine with it as is.


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160949520
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
+            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
+                             "registerFunction is a Python function (including lambda function)")
+        udf = UserDefinedFunction(f, returnType=returnType, name=name,
+                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
+        self._jsparkSession.udf().registerPython(name, udf._judf)
+        return udf._wrapped()
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
+        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
+        statement.
+
+        :param name: name of the UDF
--- End diff --

Actually, `name of the UDF in SQL statement` is wrong. The name is also used as the name of the returned UDF.


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160949342
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
--- End diff --

The current way is simple. Any holes?
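(To make the question concrete, a small runnable illustration of the duck-typing check and its main hole: any object that happens to expose an `asNondeterministic` attribute is classified as a UDF. Toy names, not the PR's code.)

```
def looks_like_udf(f):
    # The check under discussion: duck-typing on the asNondeterministic attribute.
    return hasattr(f, 'asNondeterministic')

def plain(x):
    return x

class NotAUDF(object):
    def asNondeterministic(self):  # unrelated method that happens to share the name
        return self

print(looks_like_udf(plain))      # False: a plain Python function passes through
print(looks_like_udf(NotAUDF()))  # True: misclassified, the "hole"
```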


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160948952
  
--- Diff: python/pyspark/sql/context.py ---
@@ -203,18 +203,46 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
         >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+        return self.sparkSession.catalog.registerFunction(name, f, returnType)
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
--- End diff --

The test cases are actually different. In this file, all the examples are 
using sqlContext.


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160877051
  
--- Diff: python/pyspark/sql/context.py ---
@@ -578,6 +606,9 @@ def __init__(self, sqlContext):
     def register(self, name, f, returnType=StringType()):
         return self.sqlContext.registerFunction(name, f, returnType)
 
+    def registerUDF(self, name, f):
--- End diff --

Yup, +1, like https://github.com/apache/spark/pull/20217/files/f25669a4b6c2298359df1b9083037468652cd141#r160861434, but how about checking and doing this in a batch? Seems we should fix the doctests too.


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160876638
  
--- Diff: python/pyspark/sql/context.py ---
@@ -203,18 +203,46 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
         >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+        return self.sparkSession.catalog.registerFunction(name, f, returnType)
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
--- End diff --

Also, it seems we need to fix the examples in this case too: `spark` -> `sqlContext`.


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-10 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160873545
  
--- Diff: python/pyspark/sql/context.py ---
@@ -203,18 +203,46 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
         >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+        return self.sparkSession.catalog.registerFunction(name, f, returnType)
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
+        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
+        statement.
+
+        :param name: name of the UDF
+        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
+                  scalar vectorized. Grouped vectorized UDFs are not supported.
+        :return: a wrapped :class:`UserDefinedFunction`
+
+        >>> from pyspark.sql.types import IntegerType
+        >>> from pyspark.sql.functions import udf
+        >>> slen = udf(lambda s: len(s), IntegerType())
+        >>> _ = sqlContext.udf.registerUDF("slen", slen)
+        >>> sqlContext.sql("SELECT slen('test')").collect()
+        [Row(slen(test)=4)]
 
         >>> import random
         >>> from pyspark.sql.functions import udf
->>> from pyspark.sql.types import IntegerType, StringType
+        >>> from pyspark.sql.types import IntegerType
         >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
->>> newRandom_udf = sqlContext.registerFunction("random_udf", random_udf, StringType())
+        >>> newRandom_udf = sqlContext.registerUDF("random_udf", random_udf)
         >>> sqlContext.sql("SELECT random_udf()").collect()  # doctest: +SKIP
-[Row(random_udf()=u'82')]
+        [Row(random_udf()=82)]
         >>> sqlContext.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
-[Row(random_udf()=u'62')]
+        [Row(random_udf()=62)]
+
+        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
+        ... def add_one(x):
+        ...     return x + 1
+        ...
+        >>> _ = sqlContext.udf.registerUDF("add_one", add_one)  # doctest: +SKIP
+        >>> sqlContext.sql("SELECT add_one(id) FROM range(10)").collect()  # doctest: +SKIP
--- End diff --

ditto.


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-10 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160872774
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
+            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
+                             "registerFunction is a Python function (including lambda function)")
+        udf = UserDefinedFunction(f, returnType=returnType, name=name,
+                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
+        self._jsparkSession.udf().registerPython(name, udf._judf)
+        return udf._wrapped()
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
+        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
+        statement.
+
+        :param name: name of the UDF
+        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
+                  scalar vectorized. Grouped vectorized UDFs are not supported.
+        :return: a wrapped :class:`UserDefinedFunction`
+
+        >>> from pyspark.sql.types import IntegerType
+        >>> from pyspark.sql.functions import udf
+        >>> slen = udf(lambda s: len(s), IntegerType())
+        >>> _ = spark.udf.registerUDF("slen", slen)
+        >>> spark.sql("SELECT slen('test')").collect()
+        [Row(slen(test)=4)]
 
         >>> import random
         >>> from pyspark.sql.functions import udf
->>> from pyspark.sql.types import IntegerType, StringType
+        >>> from pyspark.sql.types import IntegerType
         >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
->>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
+        >>> newRandom_udf = spark.catalog.registerUDF("random_udf", random_udf)
         >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
-[Row(random_udf()=u'82')]
+        [Row(random_udf()=82)]
         >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
-[Row(random_udf()=u'62')]
+        [Row(random_udf()=62)]
+
+        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
+        ... def add_one(x):
+        ...     return x + 1
+        ...
+        >>> _ = spark.udf.registerUDF("add_one", add_one)  # doctest: +SKIP
+        >>> spark.sql("SELECT add_one(id) FROM range(10)").collect()  # doctest: +SKIP
--- End diff --

Although this will be skipped, we should show the result like 
`[Row(add_one(id)=1), Row(add_one(id)=2), ...]`.


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-10 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160873927
  
--- Diff: python/pyspark/sql/context.py ---
@@ -578,6 +606,9 @@ def __init__(self, sqlContext):
     def register(self, name, f, returnType=StringType()):
         return self.sqlContext.registerFunction(name, f, returnType)
 
+    def registerUDF(self, name, f):
--- End diff --

Maybe we can add a doc here via `registerUDF.__doc__ = SQLContext.registerUDF.__doc__`, similar to the doc for `register`.
(We should do it for `registerJavaFunction` and `registerJavaUDAF`, too?)
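(A minimal runnable sketch of that docstring-sharing pattern, with hypothetical function names; the key detail is that `__doc__` is assigned, not compared:)

```
def canonical_register(name, f):
    """Registers a UDF under ``name``. The full documentation lives here."""
    return (name, f)

def register(name, f):
    return canonical_register(name, f)

# Reuse the canonical docstring instead of duplicating it.
register.__doc__ = canonical_register.__doc__

print(register.__doc__)  # prints the docstring of canonical_register
```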


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160864450
  
--- Diff: python/pyspark/sql/context.py ---
@@ -203,18 +203,46 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
         >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+        return self.sparkSession.catalog.registerFunction(name, f, returnType)
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
--- End diff --

Ok. Sounds good to me.


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160863048
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -4085,33 +4091,50 @@ def test_vectorized_udf_timestamps_respect_session_timezone(self):
         finally:
             self.spark.conf.set("spark.sql.session.timeZone", orig_tz)
 
-    def test_nondeterministic_udf(self):
+    def test_nondeterministic_vectorized_udf(self):
         # Test that nondeterministic UDFs are evaluated only once in chained UDF evaluations
         from pyspark.sql.functions import udf, pandas_udf, col
 
         @pandas_udf('double')
         def plus_ten(v):
             return v + 10
-        random_udf = self.random_udf
+        random_udf = self.nondeterministic_vectorized_udf
 
         df = self.spark.range(10).withColumn('rand', random_udf(col('id')))
         result1 = df.withColumn('plus_ten(rand)', plus_ten(df['rand'])).toPandas()
 
         self.assertEqual(random_udf.deterministic, False)
         self.assertTrue(result1['plus_ten(rand)'].equals(result1['rand'] + 10))
 
-    def test_nondeterministic_udf_in_aggregate(self):
+    def test_nondeterministic_vectorized_udf_in_aggregate(self):
         from pyspark.sql.functions import pandas_udf, sum
 
         df = self.spark.range(10)
-        random_udf = self.random_udf
+        random_udf = self.nondeterministic_vectorized_udf
 
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(AnalysisException, 'nondeterministic'):
                 df.groupby(df.id).agg(sum(random_udf(df.id))).collect()
             with self.assertRaisesRegexp(AnalysisException, 'nondeterministic'):
                 df.agg(sum(random_udf(df.id))).collect()
 
+    def test_register_vectorized_udf_basic(self):
+        from pyspark.rdd import PythonEvalType
+        from pyspark.sql.functions import pandas_udf, col, expr
+        df = self.spark.range(10).select(
+            col('id').cast('int').alias('a'),
+            col('id').cast('int').alias('b'))
+        originalAdd = pandas_udf(lambda x, y: x + y, IntegerType())
--- End diff --

`originalAdd` -> `original_add`


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160863023
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -4085,33 +4091,50 @@ def test_vectorized_udf_timestamps_respect_session_timezone(self):
         finally:
             self.spark.conf.set("spark.sql.session.timeZone", orig_tz)
 
-    def test_nondeterministic_udf(self):
+    def test_nondeterministic_vectorized_udf(self):
         # Test that nondeterministic UDFs are evaluated only once in chained UDF evaluations
         from pyspark.sql.functions import udf, pandas_udf, col
 
         @pandas_udf('double')
         def plus_ten(v):
             return v + 10
-        random_udf = self.random_udf
+        random_udf = self.nondeterministic_vectorized_udf
 
         df = self.spark.range(10).withColumn('rand', random_udf(col('id')))
         result1 = df.withColumn('plus_ten(rand)', plus_ten(df['rand'])).toPandas()
 
         self.assertEqual(random_udf.deterministic, False)
         self.assertTrue(result1['plus_ten(rand)'].equals(result1['rand'] + 10))
 
-    def test_nondeterministic_udf_in_aggregate(self):
+    def test_nondeterministic_vectorized_udf_in_aggregate(self):
         from pyspark.sql.functions import pandas_udf, sum
 
         df = self.spark.range(10)
-        random_udf = self.random_udf
+        random_udf = self.nondeterministic_vectorized_udf
 
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(AnalysisException, 'nondeterministic'):
                 df.groupby(df.id).agg(sum(random_udf(df.id))).collect()
             with self.assertRaisesRegexp(AnalysisException, 'nondeterministic'):
                 df.agg(sum(random_udf(df.id))).collect()
 
+    def test_register_vectorized_udf_basic(self):
+        from pyspark.rdd import PythonEvalType
+        from pyspark.sql.functions import pandas_udf, col, expr
+        df = self.spark.range(10).select(
+            col('id').cast('int').alias('a'),
+            col('id').cast('int').alias('b'))
+        originalAdd = pandas_udf(lambda x, y: x + y, IntegerType())
+        self.assertEqual(originalAdd.deterministic, True)
+        self.assertEqual(originalAdd.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+        newAdd = self.spark.catalog.registerUDF("add1", originalAdd)
--- End diff --

`newAdd` -> `new_add`


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160862588
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
--- End diff --

BTW, wouldn't it break compatibility?


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160862123
  
--- Diff: python/pyspark/sql/context.py ---
@@ -203,18 +203,46 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
         >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+        return self.sparkSession.catalog.registerFunction(name, f, returnType)
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
--- End diff --

I think that's ideally right. But let's keep it consistent with the convention used here. Let's discuss it separately and do it in a batch.


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160861750
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
--- End diff --

Yea, but the wrapped UDF is also callable .. unfortunately :(.

I suggested this way for this reason:

in `udf.py`

```diff
+        wrapper._unwrapped = lambda: self
         return wrapper
```

and then

```
if hasattr(f, "_unwrapped"):
    f = f._unwrapped()
if isinstance(f, UserDefinedFunction):
    ...
else:
    ...
```

but it was no strong opinion.
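(A self-contained sketch of that `_unwrapped` idea, with toy classes standing in for pyspark's `UserDefinedFunction` and its callable wrapper; not the PR's actual code:)

```
class UserDefinedFunction(object):
    def _wrapped(self):
        def wrapper(*args):
            return ("call", args)
        # Backdoor from the callable wrapper to the underlying UDF object.
        wrapper._unwrapped = lambda: self
        return wrapper

def register(f):
    if hasattr(f, "_unwrapped"):
        f = f._unwrapped()
    if isinstance(f, UserDefinedFunction):
        return "register as UDF"
    else:
        return "register as plain Python function"

print(register(UserDefinedFunction()._wrapped()))  # register as UDF
print(register(lambda x: x))                       # register as plain Python function
```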


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160861434
  
--- Diff: python/pyspark/sql/context.py ---
@@ -203,18 +203,46 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
         >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+        return self.sparkSession.catalog.registerFunction(name, f, returnType)
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
--- End diff --

I prefer not to duplicate the doc string. Maybe we can put the docstring in the user-facing API (I think it's the SQLContext one?) and reference the doc string in the other one.

Or, maybe we can do something like this if we want both docstrings:

```
@ignore_unicode_prefix
@since(2.3)
def registerUDF(self, name, f):
    return self.sparkSession.catalog.registerUDF(name, f)
registerUDF.__doc__ = pyspark.sql.catalog.Catalog.registerUDF.__doc__
```


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160859938
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -4147,6 +4170,21 @@ def test_simple(self):
         expected = df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True)
         self.assertFramesEqual(expected, result)
 
+    def test_registerGroupMapUDF(self):
--- End diff --

nit: `test_register_group_map_udf`


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160858810
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
--- End diff --

It might be better to check that `f` is (1) callable and (2) not a UDF object.
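(One way that suggested check might look, as a sketch; the UDF-object test reuses the `asNondeterministic` duck-typing from the PR, and the function name is hypothetical:)

```
def validate_plain_function(f):
    if not callable(f):                   # (1) must be callable
        raise TypeError("f must be callable")
    if hasattr(f, 'asNondeterministic'):  # (2) must not be a UDF object
        raise ValueError("f is a UDF object; use registerUDF instead")
    return f
```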


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160858570
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
+            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
+                             "registerFunction is a Python function (including lambda function)")
+        udf = UserDefinedFunction(f, returnType=returnType, name=name,
+                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
+        self._jsparkSession.udf().registerPython(name, udf._judf)
+        return udf._wrapped()
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
+        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
+        statement.
+
+        :param name: name of the UDF
+        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
+                  scalar vectorized. Grouped vectorized UDFs are not supported.
+        :return: a wrapped :class:`UserDefinedFunction`
+
+        >>> from pyspark.sql.types import IntegerType
+        >>> from pyspark.sql.functions import udf
+        >>> slen = udf(lambda s: len(s), IntegerType())
+        >>> _ = spark.udf.registerUDF("slen", slen)
+        >>> spark.sql("SELECT slen('test')").collect()
+        [Row(slen(test)=4)]
 
         >>> import random
         >>> from pyspark.sql.functions import udf
->>> from pyspark.sql.types import IntegerType, StringType
+        >>> from pyspark.sql.types import IntegerType
         >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
->>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
+        >>> newRandom_udf = spark.catalog.registerUDF("random_udf", random_udf)
         >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
-[Row(random_udf()=u'82')]
+        [Row(random_udf()=82)]
         >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
-[Row(random_udf()=u'62')]
+        [Row(random_udf()=62)]
+
+        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
+        ... def add_one(x):
+        ...     return x + 1
+        ...
+        >>> _ = spark.udf.registerUDF("add_one", add_one)  # doctest: +SKIP
+        >>> spark.sql("SELECT add_one(id) FROM range(10)").collect()  # doctest: +SKIP
         """
 
         # This is to check whether the input function is a wrapped/native UserDefinedFunction
         if hasattr(f, 'asNondeterministic'):
-            udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
-                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
+            if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
+                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
+                raise ValueError(
+                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
+            udf = UserDefinedFunction(f.func, returnType=f.returnType, name=name,
+                                      evalType=f.evalType,
                                       deterministic=f.deterministic)
         else:
-            udf = UserDefinedFunction(f, returnType=returnType, name=name,
-                                      evalType=PythonEvalType.SQL_BATCHED_UDF)
+            raise TypeError("Please use registerFunction for registering a Python function "
--- End diff --

+1. I think the user might not necessarily pass a Python function here. Maybe an error message like "Invalid UDF: f must be an object returned by `udf` or `pandas_udf`".


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160858423
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
+            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
+                             "registerFunction is a Python function (including lambda function)")
+        udf = UserDefinedFunction(f, returnType=returnType, name=name,
+                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
+        self._jsparkSession.udf().registerPython(name, udf._judf)
+        return udf._wrapped()
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
+        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
+        statement.
+
+        :param name: name of the UDF
+        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
+                  scalar vectorized. Grouped vectorized UDFs are not supported.
--- End diff --

I would probably say something like "The UDF can be returned by either `udf` or `pandas_udf` with type `SCALAR`" to be more specific.


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160858034
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
+            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
+                             "registerFunction is a Python function (including lambda function)")
+        udf = UserDefinedFunction(f, returnType=returnType, name=name,
+                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
+        self._jsparkSession.udf().registerPython(name, udf._judf)
+        return udf._wrapped()
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
+        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
+        statement.
+
+        :param name: name of the UDF
--- End diff --

Maybe something like "name of the UDF in SQL statement" to be more specific.


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160857927
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, 
returnType=StringType()):
 >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
 >>> spark.sql("SELECT stringLengthInt('test')").collect()
 [Row(stringLengthInt(test)=4)]
+"""
+
+# This is to check whether the input function is a wrapped/native 
UserDefinedFunction
+if hasattr(f, 'asNondeterministic'):
+raise ValueError("Please use registerUDF for registering UDF. 
The expected function of "
+ "registerFunction is a Python function 
(including lambda function)")
+udf = UserDefinedFunction(f, returnType=returnType, name=name,
+  evalType=PythonEvalType.SQL_BATCHED_UDF)
+self._jsparkSession.udf().registerPython(name, udf._judf)
+return udf._wrapped()
+
+@ignore_unicode_prefix
+@since(2.3)
+def registerUDF(self, name, f):
+"""Registers a :class:`UserDefinedFunction`. The registered UDF 
can be used in SQL
+statement.
+
+:param name: name of the UDF
+:param f: a wrapped/native UserDefinedFunction. The UDF can be 
either row-at-a-time or
--- End diff --

I don't have a strong opinion on how to describe them, but I think it's 
easier for users to understand if the description is consistent.


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160857817
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, 
returnType=StringType()):
 >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
 >>> spark.sql("SELECT stringLengthInt('test')").collect()
 [Row(stringLengthInt(test)=4)]
+"""
+
+# This is to check whether the input function is a wrapped/native 
UserDefinedFunction
+if hasattr(f, 'asNondeterministic'):
+raise ValueError("Please use registerUDF for registering UDF. 
The expected function of "
+ "registerFunction is a Python function 
(including lambda function)")
+udf = UserDefinedFunction(f, returnType=returnType, name=name,
+  evalType=PythonEvalType.SQL_BATCHED_UDF)
+self._jsparkSession.udf().registerPython(name, udf._judf)
+return udf._wrapped()
+
+@ignore_unicode_prefix
+@since(2.3)
+def registerUDF(self, name, f):
+"""Registers a :class:`UserDefinedFunction`. The registered UDF 
can be used in SQL
+statement.
+
+:param name: name of the UDF
+:param f: a wrapped/native UserDefinedFunction. The UDF can be 
either row-at-a-time or
--- End diff --

We should probably standardize how we describe the wrapped function object 
returned by `udf` and `pandas_udf` in the param docstrings. Currently it's 
described differently:

https://github.com/apache/spark/blob/master/python/pyspark/sql/group.py#L215
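
One way the shared phrasing could read (an illustrative docstring fragment, 
not the PR's final text):

```
def registerUDF(self, name, f):
    """Registers a :class:`UserDefinedFunction`.

    :param f: a user-defined function returned by :func:`udf`, or a scalar
              vectorized one returned by :func:`pandas_udf` with type `SCALAR`
    """
```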


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20217#discussion_r160834039
  
--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, 
returnType=StringType()):
 >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
 >>> spark.sql("SELECT stringLengthInt('test')").collect()
 [Row(stringLengthInt(test)=4)]
+"""
+
+# This is to check whether the input function is a wrapped/native 
UserDefinedFunction
+if hasattr(f, 'asNondeterministic'):
+raise ValueError("Please use registerUDF for registering UDF. 
The expected function of "
+ "registerFunction is a Python function 
(including lambda function)")
+udf = UserDefinedFunction(f, returnType=returnType, name=name,
+  evalType=PythonEvalType.SQL_BATCHED_UDF)
+self._jsparkSession.udf().registerPython(name, udf._judf)
+return udf._wrapped()
+
+@ignore_unicode_prefix
+@since(2.3)
+def registerUDF(self, name, f):
+"""Registers a :class:`UserDefinedFunction`. The registered UDF 
can be used in SQL
+statement.
+
+:param name: name of the UDF
+:param f: a wrapped/native UserDefinedFunction. The UDF can be 
either row-at-a-time or
+  scalar vectorized. Grouped vectorized UDFs are not 
supported.
+:return: a wrapped :class:`UserDefinedFunction`
+
+>>> from pyspark.sql.types import IntegerType
+>>> from pyspark.sql.functions import udf
+>>> slen = udf(lambda s: len(s), IntegerType())
+>>> _ = spark.udf.registerUDF("slen", slen)
+>>> spark.sql("SELECT slen('test')").collect()
+[Row(slen(test)=4)]
 
 >>> import random
 >>> from pyspark.sql.functions import udf
->>> from pyspark.sql.types import IntegerType, StringType
+>>> from pyspark.sql.types import IntegerType
 >>> random_udf = udf(lambda: random.randint(0, 100), 
IntegerType()).asNondeterministic()
->>> newRandom_udf = spark.catalog.registerFunction("random_udf", 
random_udf, StringType())
+>>> newRandom_udf = spark.catalog.registerUDF("random_udf", 
random_udf)
 >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
-[Row(random_udf()=u'82')]
+[Row(random_udf()=82)]
 >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: 
+SKIP
-[Row(random_udf()=u'62')]
+[Row(random_udf()=62)]
+
+>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+>>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
+... def add_one(x):
+... return x + 1
+...
+>>> _ = spark.udf.registerUDF("add_one", add_one)  # doctest: +SKIP
+>>> spark.sql("SELECT add_one(id) FROM range(10)").collect()  # 
doctest: +SKIP
 """
 
 # This is to check whether the input function is a wrapped/native 
UserDefinedFunction
 if hasattr(f, 'asNondeterministic'):
-udf = UserDefinedFunction(f.func, returnType=returnType, 
name=name,
-  
evalType=PythonEvalType.SQL_BATCHED_UDF,
+if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
+  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
+raise ValueError(
+"Invalid f: f must be either SQL_BATCHED_UDF or 
SQL_PANDAS_SCALAR_UDF")
+udf = UserDefinedFunction(f.func, returnType=f.returnType, 
name=name,
+  evalType=f.evalType,
   deterministic=f.deterministic)
 else:
-udf = UserDefinedFunction(f, returnType=returnType, name=name,
-  
evalType=PythonEvalType.SQL_BATCHED_UDF)
+raise TypeError("Please use registerFunction for registering a 
Python function "
--- End diff --

If users call `spark.udf.registerUDF`, this error message looks confusing 
to them because there is no `spark.udf.registerFunction`.
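
A sketch of wording that points at an entry point which does exist 
(hypothetical message; `spark.catalog.registerFunction` is defined in this 
same diff):

```
def registerUDF(self, name, f):
    # Hypothetical rewording of the TypeError discussed above (sketch only).
    if not hasattr(f, 'asNondeterministic'):
        raise TypeError(
            "Invalid UDF: f must be an object returned by `udf` or `pandas_udf`; "
            "to register a plain Python function, use spark.catalog.registerFunction")
    ...
```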


---




[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...

2018-01-10 Thread gatorsmile
GitHub user gatorsmile opened a pull request:

https://github.com/apache/spark/pull/20217

[SPARK-23026] [PySpark] Add RegisterUDF to PySpark

## What changes were proposed in this pull request?
Add a new API for registering row-at-a-time or scalar vectorized UDFs. The 
registered UDFs can be used in SQL statements. For example,

```
>>> from pyspark.sql.types import IntegerType
>>> from pyspark.sql.functions import udf
>>> slen = udf(lambda s: len(s), IntegerType())
>>> _ = spark.udf.registerUDF("slen", slen)
>>> spark.sql("SELECT slen('test')").collect()
[Row(slen(test)=4)]

>>> import random
>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import IntegerType
>>> random_udf = udf(lambda: random.randint(0, 100), 
IntegerType()).asNondeterministic()
>>> newRandom_udf = spark.catalog.registerUDF("random_udf", random_udf)
>>> spark.sql("SELECT random_udf()").collect()  
[Row(random_udf()=82)]
>>> spark.range(1).select(newRandom_udf()).collect()  
[Row(random_udf()=62)]

>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> @pandas_udf("integer", PandasUDFType.SCALAR)  
... def add_one(x):
... return x + 1
...
>>> _ = spark.udf.registerUDF("add_one", add_one)  
>>> spark.sql("SELECT add_one(id) FROM range(10)").collect()  
```
## How was this patch tested?
Added test cases

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gatorsmile/spark registerUDF

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20217.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20217


commit f25669a4b6c2298359df1b9083037468652cd141
Author: gatorsmile 
Date:   2018-01-10T10:24:08Z

fix




---
