[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...
Github user gatorsmile closed the pull request at:

    https://github.com/apache/spark/pull/20217

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20217#discussion_r161129504

    --- Diff: python/pyspark/sql/context.py ---
    @@ -203,18 +203,46 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> _ = sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
             >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
    +        """
    +        return self.sparkSession.catalog.registerFunction(name, f, returnType)
    +
    +    @ignore_unicode_prefix
    +    @since(2.3)
    +    def registerUDF(self, name, f):
    --- End diff --

    Ok. Sounds good.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20217#discussion_r161126125

    --- Diff: python/pyspark/sql/context.py ---
    @@ -203,18 +203,48 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> _ = sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
             >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
    +        """
    +        return self.sparkSession.catalog.registerFunction(name, f, returnType)
    +
    +    @ignore_unicode_prefix
    +    @since(2.3)
    +    def registerUDF(self, name, f):
    +        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
    +        statements.
    +
    +        :param name: name of the UDF in SQL statements
    +        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
    +            scalar vectorized. For example, the object returned by udf or pandas_udf.
    +            Grouped vectorized UDFs are not supported.
    +        :return: a wrapped :class:`UserDefinedFunction`
    --- End diff --

    nit: I think this can be `a wrapped/native UserDefinedFunction.`.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20217#discussion_r161124529

    --- Diff: python/pyspark/sql/context.py ---
    @@ -203,18 +203,46 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> _ = sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
             >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
    +        """
    +        return self.sparkSession.catalog.registerFunction(name, f, returnType)
    +
    +    @ignore_unicode_prefix
    +    @since(2.3)
    +    def registerUDF(self, name, f):
    --- End diff --

    We have been deprecating sqlContext since Spark 2.0. The suggested calls should be `spark.udf.registerUDF` or `spark.catalog.registerUDF`. We can have a separate PR to clean up all these method descriptions in sqlContext.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20217#discussion_r160985677

    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
    +        """
    +
    +        # This is to check whether the input function is a wrapped/native UserDefinedFunction
    +        if hasattr(f, 'asNondeterministic'):
    +            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
    +                             "registerFunction is a Python function (including lambda function)")
    +        udf = UserDefinedFunction(f, returnType=returnType, name=name,
    +                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
    +        self._jsparkSession.udf().registerPython(name, udf._judf)
    +        return udf._wrapped()
    +
    +    @ignore_unicode_prefix
    +    @since(2.3)
    +    def registerUDF(self, name, f):
    +        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
    +        statement.
    +
    +        :param name: name of the UDF
    --- End diff --

    After checking with our Scala API, I found the name is not changed. Let me update it.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20217#discussion_r160985290

    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
    +        """
    +
    +        # This is to check whether the input function is a wrapped/native UserDefinedFunction
    +        if hasattr(f, 'asNondeterministic'):
    +            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
    +                             "registerFunction is a Python function (including lambda function)")
    +        udf = UserDefinedFunction(f, returnType=returnType, name=name,
    +                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
    +        self._jsparkSession.udf().registerPython(name, udf._judf)
    +        return udf._wrapped()
    +
    +    @ignore_unicode_prefix
    +    @since(2.3)
    +    def registerUDF(self, name, f):
    +        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
    +        statement.
    +
    +        :param name: name of the UDF
    +        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
    --- End diff --

    SGTM.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20217#discussion_r160984479

    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
    +        """
    +
    +        # This is to check whether the input function is a wrapped/native UserDefinedFunction
    +        if hasattr(f, 'asNondeterministic'):
    +            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
    +                             "registerFunction is a Python function (including lambda function)")
    +        udf = UserDefinedFunction(f, returnType=returnType, name=name,
    +                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
    +        self._jsparkSession.udf().registerPython(name, udf._judf)
    +        return udf._wrapped()
    +
    +    @ignore_unicode_prefix
    +    @since(2.3)
    +    def registerUDF(self, name, f):
    +        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
    +        statement.
    +
    +        :param name: name of the UDF
    +        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
    --- End diff --

    We can do it as a separate PR.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20217#discussion_r160983255

    --- Diff: python/pyspark/sql/context.py ---
    @@ -203,18 +203,46 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> _ = sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
             >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
    +        """
    +        return self.sparkSession.catalog.registerFunction(name, f, returnType)
    +
    +    @ignore_unicode_prefix
    +    @since(2.3)
    +    def registerUDF(self, name, f):
    --- End diff --

    I am not sure about the difference between:
    `spark.udf.registerUDF`
    `sqlContext.udf.registerUDF`
    and
    `sqlContext.registerUDF`

    Seems too many ways to do the same thing... But if we indeed need to keep multiple methods, I would lean towards having a comprehensive doc in one of them and having the doc for the rest be something like

    """
    Same as :meth:...
    """
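The "document once, delegate everywhere else" pattern suggested here can be sketched in plain Python. The class and method names below are illustrative stand-ins, not PySpark's actual classes; the point is only the shape of the delegation:

```python
# Sketch of one canonical, fully documented entry point plus a thin
# delegating wrapper whose docstring just points back at it.

class Catalog:
    def registerUDF(self, name, f):
        """Registers a UserDefinedFunction (the canonical, documented method)."""
        return (name, f)

class SQLContext:
    def __init__(self, catalog):
        self.catalog = catalog

    def registerUDF(self, name, f):
        """Same as :meth:`Catalog.registerUDF`."""
        # No logic of its own: just delegate to the canonical implementation.
        return self.catalog.registerUDF(name, f)

ctx = SQLContext(Catalog())
print(ctx.registerUDF("slen", len))
```

With this shape, behavior can never drift between the entry points, and only one docstring needs to carry the full examples.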
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20217#discussion_r160981094

    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
    +        """
    +
    +        # This is to check whether the input function is a wrapped/native UserDefinedFunction
    +        if hasattr(f, 'asNondeterministic'):
    --- End diff --

    I am fine with it as well.
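The `hasattr(f, 'asNondeterministic')` guard under discussion is plain duck typing, and can be sketched outside Spark. `WrappedUDF` below is a hypothetical stand-in for the object returned by `udf()`/`pandas_udf()`, not PySpark's actual class:

```python
# Minimal sketch (not PySpark itself) of the duck-typing guard in
# registerFunction: a wrapped UDF exposes asNondeterministic(), while a
# plain Python function or lambda does not.

class WrappedUDF:
    """Hypothetical stand-in for the object returned by udf()/pandas_udf()."""
    def asNondeterministic(self):
        return self

def register_function(name, f):
    # Reject wrapped UDFs: registerFunction expects a plain Python callable.
    if hasattr(f, 'asNondeterministic'):
        raise ValueError("Please use registerUDF for registering UDF.")
    return (name, f)

name, fn = register_function("slen", lambda s: len(s))   # accepted
print(name)

try:
    register_function("slen", WrappedUDF())              # rejected
except ValueError as e:
    print("rejected:", e)
```

One known hole of attribute-based checks is that any unrelated object that happens to define an `asNondeterministic` attribute would also be rejected, which is presumably acceptable here given how specific the name is.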
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20217#discussion_r160980765

    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
    +        """
    +
    +        # This is to check whether the input function is a wrapped/native UserDefinedFunction
    +        if hasattr(f, 'asNondeterministic'):
    +            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
    +                             "registerFunction is a Python function (including lambda function)")
    +        udf = UserDefinedFunction(f, returnType=returnType, name=name,
    +                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
    +        self._jsparkSession.udf().registerPython(name, udf._judf)
    +        return udf._wrapped()
    +
    +    @ignore_unicode_prefix
    +    @since(2.3)
    +    def registerUDF(self, name, f):
    +        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
    +        statement.
    +
    +        :param name: name of the UDF
    --- End diff --

    I also think changing the property (name) of the input udf object in `registerUDF` is unintuitive.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20217#discussion_r160980191

    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
    +        """
    +
    +        # This is to check whether the input function is a wrapped/native UserDefinedFunction
    +        if hasattr(f, 'asNondeterministic'):
    +            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
    +                             "registerFunction is a Python function (including lambda function)")
    +        udf = UserDefinedFunction(f, returnType=returnType, name=name,
    +                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
    +        self._jsparkSession.udf().registerPython(name, udf._judf)
    +        return udf._wrapped()
    +
    +    @ignore_unicode_prefix
    +    @since(2.3)
    +    def registerUDF(self, name, f):
    +        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
    +        statement.
    +
    +        :param name: name of the UDF
    +        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
    --- End diff --

    Yeah. I am fine with the doc here. But similar to https://github.com/apache/spark/pull/20217#discussion_r160979747 we should probably standardize the description.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20217#discussion_r160979747

    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
    +        """
    +
    +        # This is to check whether the input function is a wrapped/native UserDefinedFunction
    +        if hasattr(f, 'asNondeterministic'):
    +            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
    +                             "registerFunction is a Python function (including lambda function)")
    +        udf = UserDefinedFunction(f, returnType=returnType, name=name,
    +                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
    +        self._jsparkSession.udf().registerPython(name, udf._judf)
    +        return udf._wrapped()
    +
    +    @ignore_unicode_prefix
    +    @since(2.3)
    +    def registerUDF(self, name, f):
    +        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
    +        statement.
    +
    +        :param name: name of the UDF
    +        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
    +            scalar vectorized. Grouped vectorized UDFs are not supported.
    --- End diff --

    Ok. Maybe as a follow up we can standardize the language to describe the `udf` and `pandas_udf` objects.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20217#discussion_r160960075

    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
    +        """
    +
    +        # This is to check whether the input function is a wrapped/native UserDefinedFunction
    +        if hasattr(f, 'asNondeterministic'):
    +            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
    +                             "registerFunction is a Python function (including lambda function)")
    +        udf = UserDefinedFunction(f, returnType=returnType, name=name,
    +                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
    +        self._jsparkSession.udf().registerPython(name, udf._judf)
    +        return udf._wrapped()
    +
    +    @ignore_unicode_prefix
    +    @since(2.3)
    +    def registerUDF(self, name, f):
    +        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
    +        statement.
    +
    +        :param name: name of the UDF
    --- End diff --

    Because I thought it registers the UDF for use in SQL statements under the given name, and the defined UDF instance (`f`) shouldn't have its own name changed at register time. To my knowledge, we happened to add `registerUDF` to avoid a similar problem with `returnType`.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20217#discussion_r160958943

    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
    +        """
    +
    +        # This is to check whether the input function is a wrapped/native UserDefinedFunction
    +        if hasattr(f, 'asNondeterministic'):
    +            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
    +                             "registerFunction is a Python function (including lambda function)")
    +        udf = UserDefinedFunction(f, returnType=returnType, name=name,
    +                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
    +        self._jsparkSession.udf().registerPython(name, udf._judf)
    +        return udf._wrapped()
    +
    +    @ignore_unicode_prefix
    +    @since(2.3)
    +    def registerUDF(self, name, f):
    +        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
    +        statement.
    +
    +        :param name: name of the UDF
    --- End diff --

    Why?
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20217#discussion_r160953629

    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
    +        """
    +
    +        # This is to check whether the input function is a wrapped/native UserDefinedFunction
    +        if hasattr(f, 'asNondeterministic'):
    +            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
    +                             "registerFunction is a Python function (including lambda function)")
    +        udf = UserDefinedFunction(f, returnType=returnType, name=name,
    +                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
    +        self._jsparkSession.udf().registerPython(name, udf._judf)
    +        return udf._wrapped()
    +
    +    @ignore_unicode_prefix
    +    @since(2.3)
    +    def registerUDF(self, name, f):
    +        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
    +        statement.
    +
    +        :param name: name of the UDF
    --- End diff --

    Hm .. actually I think we should not change the name of the returned UDF?
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20217#discussion_r160950622

    --- Diff: python/pyspark/sql/context.py ---
    @@ -578,6 +606,9 @@ def __init__(self, sqlContext):
         def register(self, name, f, returnType=StringType()):
             return self.sqlContext.registerFunction(name, f, returnType)
     
    +    def registerUDF(self, name, f):
    --- End diff --

    The examples in these docs are different. Thus, I prefer to keep it untouched.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20217#discussion_r160950521

    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
    +        """
    +
    +        # This is to check whether the input function is a wrapped/native UserDefinedFunction
    +        if hasattr(f, 'asNondeterministic'):
    +            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
    +                             "registerFunction is a Python function (including lambda function)")
    +        udf = UserDefinedFunction(f, returnType=returnType, name=name,
    +                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
    +        self._jsparkSession.udf().registerPython(name, udf._judf)
    +        return udf._wrapped()
    +
    +    @ignore_unicode_prefix
    +    @since(2.3)
    +    def registerUDF(self, name, f):
    +        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
    +        statement.
    +
    +        :param name: name of the UDF
    +        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
    +            scalar vectorized. Grouped vectorized UDFs are not supported.
    +        :return: a wrapped :class:`UserDefinedFunction`
    +
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = spark.udf.registerUDF("slen", slen)
    +        >>> spark.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
     
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = spark.catalog.registerUDF("random_udf", random_udf)
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    +        [Row(random_udf()=82)]
             >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(random_udf()=62)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = spark.udf.registerUDF("add_one", add_one)  # doctest: +SKIP
    +        >>> spark.sql("SELECT add_one(id) FROM range(10)").collect()  # doctest: +SKIP
    --- End diff --

    Done.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20217#discussion_r160950460

    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
    +        """
    +
    +        # This is to check whether the input function is a wrapped/native UserDefinedFunction
    +        if hasattr(f, 'asNondeterministic'):
    +            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
    +                             "registerFunction is a Python function (including lambda function)")
    +        udf = UserDefinedFunction(f, returnType=returnType, name=name,
    +                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
    +        self._jsparkSession.udf().registerPython(name, udf._judf)
    +        return udf._wrapped()
    +
    +    @ignore_unicode_prefix
    +    @since(2.3)
    +    def registerUDF(self, name, f):
    +        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
    +        statement.
    +
    +        :param name: name of the UDF
    +        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
    +            scalar vectorized. Grouped vectorized UDFs are not supported.
    --- End diff --

    I think the following examples are self-descriptive. I can emphasize it in the description too.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20217#discussion_r160950226

    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
    +        """
    +
    +        # This is to check whether the input function is a wrapped/native UserDefinedFunction
    +        if hasattr(f, 'asNondeterministic'):
    +            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
    +                             "registerFunction is a Python function (including lambda function)")
    +        udf = UserDefinedFunction(f, returnType=returnType, name=name,
    +                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
    +        self._jsparkSession.udf().registerPython(name, udf._judf)
    +        return udf._wrapped()
    +
    +    @ignore_unicode_prefix
    +    @since(2.3)
    +    def registerUDF(self, name, f):
    +        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
    +        statement.
    +
    +        :param name: name of the UDF
    +        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
    --- End diff --

    > :param udf: A function object returned by :meth:`pyspark.sql.functions.pandas_udf`

    This is actually not accurate. We have another way to define the scalar vectorized UDF.

    ```
    >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    ... def add_one(x):
    ...     return x + 1
    ...
    ```
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20217#discussion_r160949917

    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
    +        """
    +
    +        # This is to check whether the input function is a wrapped/native UserDefinedFunction
    +        if hasattr(f, 'asNondeterministic'):
    --- End diff --

    Nope, to be clear I am fine with it as is.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20217#discussion_r160949520

    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
    +        """
    +
    +        # This is to check whether the input function is a wrapped/native UserDefinedFunction
    +        if hasattr(f, 'asNondeterministic'):
    +            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
    +                             "registerFunction is a Python function (including lambda function)")
    +        udf = UserDefinedFunction(f, returnType=returnType, name=name,
    +                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
    +        self._jsparkSession.udf().registerPython(name, udf._judf)
    +        return udf._wrapped()
    +
    +    @ignore_unicode_prefix
    +    @since(2.3)
    +    def registerUDF(self, name, f):
    +        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
    +        statement.
    +
    +        :param name: name of the UDF
    --- End diff --

    Actually, `name of the UDF in SQL statement` is wrong. The name is also used as the name of the returned UDF.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20217#discussion_r160949342

    --- Diff: python/pyspark/sql/catalog.py ---
    @@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
             >>> spark.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
    +        """
    +
    +        # This is to check whether the input function is a wrapped/native UserDefinedFunction
    +        if hasattr(f, 'asNondeterministic'):
    --- End diff --

    The current way is simple. Any hole?
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20217#discussion_r160948952

    --- Diff: python/pyspark/sql/context.py ---
    @@ -203,18 +203,46 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> _ = sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
             >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
    +        """
    +        return self.sparkSession.catalog.registerFunction(name, f, returnType)
    +
    +    @ignore_unicode_prefix
    +    @since(2.3)
    +    def registerUDF(self, name, f):
    --- End diff --

    The test cases are actually different. In this file, all the examples are using sqlContext.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20217#discussion_r160877051

    --- Diff: python/pyspark/sql/context.py ---
    @@ -578,6 +606,9 @@ def __init__(self, sqlContext):
         def register(self, name, f, returnType=StringType()):
             return self.sqlContext.registerFunction(name, f, returnType)
     
    +    def registerUDF(self, name, f):
    --- End diff --

    Yup +1 like https://github.com/apache/spark/pull/20217/files/f25669a4b6c2298359df1b9083037468652cd141#r160861434 but how about checking and doing this in batch? Seems we should fix the doctests too.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20217#discussion_r160876638

    --- Diff: python/pyspark/sql/context.py ---
    @@ -203,18 +203,46 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> _ = sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
             >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
    +        """
    +        return self.sparkSession.catalog.registerFunction(name, f, returnType)
    +
    +    @ignore_unicode_prefix
    +    @since(2.3)
    +    def registerUDF(self, name, f):
    --- End diff --

    And also, it seems we need to fix the examples too in this case: `spark` -> `sqlContext`.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20217#discussion_r160873545

    --- Diff: python/pyspark/sql/context.py ---
    @@ -203,18 +203,46 @@ def registerFunction(self, name, f, returnType=StringType()):
             >>> _ = sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
             >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
             [Row(stringLengthInt(test)=4)]
    +        """
    +        return self.sparkSession.catalog.registerFunction(name, f, returnType)
    +
    +    @ignore_unicode_prefix
    +    @since(2.3)
    +    def registerUDF(self, name, f):
    +        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
    +        statement.
    +
    +        :param name: name of the UDF
    +        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
    +            scalar vectorized. Grouped vectorized UDFs are not supported.
    +        :return: a wrapped :class:`UserDefinedFunction`
    +
    +        >>> from pyspark.sql.types import IntegerType
    +        >>> from pyspark.sql.functions import udf
    +        >>> slen = udf(lambda s: len(s), IntegerType())
    +        >>> _ = sqlContext.udf.registerUDF("slen", slen)
    +        >>> sqlContext.sql("SELECT slen('test')").collect()
    +        [Row(slen(test)=4)]
     
             >>> import random
             >>> from pyspark.sql.functions import udf
    -        >>> from pyspark.sql.types import IntegerType, StringType
    +        >>> from pyspark.sql.types import IntegerType
             >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
    -        >>> newRandom_udf = sqlContext.registerFunction("random_udf", random_udf, StringType())
    +        >>> newRandom_udf = sqlContext.registerUDF("random_udf", random_udf)
             >>> sqlContext.sql("SELECT random_udf()").collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'82')]
    +        [Row(random_udf()=82)]
             >>> sqlContext.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
    -        [Row(random_udf()=u'62')]
    +        [Row(random_udf()=62)]
    +
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
    +        ... def add_one(x):
    +        ...     return x + 1
    +        ...
    +        >>> _ = sqlContext.udf.registerUDF("add_one", add_one)  # doctest: +SKIP
    +        >>> sqlContext.sql("SELECT add_one(id) FROM range(10)").collect()  # doctest: +SKIP
    --- End diff --

    ditto.
[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/20217#discussion_r160872774

--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
+            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
+                             "registerFunction is a Python function (including lambda function)")
+        udf = UserDefinedFunction(f, returnType=returnType, name=name,
+                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
+        self._jsparkSession.udf().registerPython(name, udf._judf)
+        return udf._wrapped()
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
+        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
+        statement.
+
+        :param name: name of the UDF
+        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
+            scalar vectorized. Grouped vectorized UDFs are not supported.
+        :return: a wrapped :class:`UserDefinedFunction`
+
+        >>> from pyspark.sql.types import IntegerType
+        >>> from pyspark.sql.functions import udf
+        >>> slen = udf(lambda s: len(s), IntegerType())
+        >>> _ = spark.udf.registerUDF("slen", slen)
+        >>> spark.sql("SELECT slen('test')").collect()
+        [Row(slen(test)=4)]

         >>> import random
         >>> from pyspark.sql.functions import udf
-        >>> from pyspark.sql.types import IntegerType, StringType
+        >>> from pyspark.sql.types import IntegerType
         >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
-        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
+        >>> newRandom_udf = spark.catalog.registerUDF("random_udf", random_udf)
         >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
-        [Row(random_udf()=u'82')]
+        [Row(random_udf()=82)]
         >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
-        [Row(random_udf()=u'62')]
+        [Row(random_udf()=62)]
+
+        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
+        ... def add_one(x):
+        ...     return x + 1
+        ...
+        >>> _ = spark.udf.registerUDF("add_one", add_one)  # doctest: +SKIP
+        >>> spark.sql("SELECT add_one(id) FROM range(10)").collect()  # doctest: +SKIP
--- End diff --

Although this will be skipped, we should show the result like `[Row(add_one(id)=1), Row(add_one(id)=2), ...]`.
[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/20217#discussion_r160873927

--- Diff: python/pyspark/sql/context.py ---
@@ -578,6 +606,9 @@ def __init__(self, sqlContext):
     def register(self, name, f, returnType=StringType()):
         return self.sqlContext.registerFunction(name, f, returnType)

+    def registerUDF(self, name, f):
--- End diff --

Maybe we can add doc here by `registerUDF.__doc__ = SQLContext.registerUDF.__doc__`, similar to the doc for `register`. (We should do it for `registerJavaFunction` and `registerJavaUDAF`, too?)
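The `__doc__` reuse ueshin suggests is plain attribute assignment on a function object, so the delegating method surfaces the real documentation in `help()`. A minimal Spark-free sketch, where `Catalog` and `UDFRegistration` are stand-ins for the actual PySpark classes:

```python
class Catalog:
    def registerUDF(self, name, f):
        """Registers a :class:`UserDefinedFunction` so it can be used in SQL statements."""
        return (name, f)  # placeholder for the real registration logic


class UDFRegistration:
    def __init__(self, catalog):
        self._catalog = catalog

    def registerUDF(self, name, f):
        return self._catalog.registerUDF(name, f)

    # Copy the docstring so help(UDFRegistration.registerUDF) shows the real docs
    # instead of nothing; this runs in the class body, after Catalog is defined.
    registerUDF.__doc__ = Catalog.registerUDF.__doc__


print(UDFRegistration.registerUDF.__doc__)
```

The same one-line assignment works for `registerJavaFunction` and `registerJavaUDAF`, which is why the comment proposes doing it in batch.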
[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/20217#discussion_r160864450

--- Diff: python/pyspark/sql/context.py ---
@@ -203,18 +203,46 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
         >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+        return self.sparkSession.catalog.registerFunction(name, f, returnType)
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
--- End diff --

Ok. Sounds good to me.
[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20217#discussion_r160863048

--- Diff: python/pyspark/sql/tests.py ---
@@ -4085,33 +4091,50 @@ def test_vectorized_udf_timestamps_respect_session_timezone(self):
         finally:
             self.spark.conf.set("spark.sql.session.timeZone", orig_tz)

-    def test_nondeterministic_udf(self):
+    def test_nondeterministic_vectorized_udf(self):
         # Test that nondeterministic UDFs are evaluated only once in chained UDF evaluations
         from pyspark.sql.functions import udf, pandas_udf, col

         @pandas_udf('double')
         def plus_ten(v):
             return v + 10

-        random_udf = self.random_udf
+        random_udf = self.nondeterministic_vectorized_udf

         df = self.spark.range(10).withColumn('rand', random_udf(col('id')))
         result1 = df.withColumn('plus_ten(rand)', plus_ten(df['rand'])).toPandas()

         self.assertEqual(random_udf.deterministic, False)
         self.assertTrue(result1['plus_ten(rand)'].equals(result1['rand'] + 10))

-    def test_nondeterministic_udf_in_aggregate(self):
+    def test_nondeterministic_vectorized_udf_in_aggregate(self):
         from pyspark.sql.functions import pandas_udf, sum

         df = self.spark.range(10)
-        random_udf = self.random_udf
+        random_udf = self.nondeterministic_vectorized_udf

         with QuietTest(self.sc):
             with self.assertRaisesRegexp(AnalysisException, 'nondeterministic'):
                 df.groupby(df.id).agg(sum(random_udf(df.id))).collect()
             with self.assertRaisesRegexp(AnalysisException, 'nondeterministic'):
                 df.agg(sum(random_udf(df.id))).collect()

+    def test_register_vectorized_udf_basic(self):
+        from pyspark.rdd import PythonEvalType
+        from pyspark.sql.functions import pandas_udf, col, expr
+        df = self.spark.range(10).select(
+            col('id').cast('int').alias('a'),
+            col('id').cast('int').alias('b'))
+        originalAdd = pandas_udf(lambda x, y: x + y, IntegerType())
--- End diff --

`originalAdd` -> `original_add`
[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20217#discussion_r160863023

--- Diff: python/pyspark/sql/tests.py ---
@@ -4085,33 +4091,50 @@ def test_vectorized_udf_timestamps_respect_session_timezone(self):
         finally:
             self.spark.conf.set("spark.sql.session.timeZone", orig_tz)

-    def test_nondeterministic_udf(self):
+    def test_nondeterministic_vectorized_udf(self):
         # Test that nondeterministic UDFs are evaluated only once in chained UDF evaluations
         from pyspark.sql.functions import udf, pandas_udf, col

         @pandas_udf('double')
         def plus_ten(v):
             return v + 10

-        random_udf = self.random_udf
+        random_udf = self.nondeterministic_vectorized_udf

         df = self.spark.range(10).withColumn('rand', random_udf(col('id')))
         result1 = df.withColumn('plus_ten(rand)', plus_ten(df['rand'])).toPandas()

         self.assertEqual(random_udf.deterministic, False)
         self.assertTrue(result1['plus_ten(rand)'].equals(result1['rand'] + 10))

-    def test_nondeterministic_udf_in_aggregate(self):
+    def test_nondeterministic_vectorized_udf_in_aggregate(self):
         from pyspark.sql.functions import pandas_udf, sum

         df = self.spark.range(10)
-        random_udf = self.random_udf
+        random_udf = self.nondeterministic_vectorized_udf

         with QuietTest(self.sc):
             with self.assertRaisesRegexp(AnalysisException, 'nondeterministic'):
                 df.groupby(df.id).agg(sum(random_udf(df.id))).collect()
             with self.assertRaisesRegexp(AnalysisException, 'nondeterministic'):
                 df.agg(sum(random_udf(df.id))).collect()

+    def test_register_vectorized_udf_basic(self):
+        from pyspark.rdd import PythonEvalType
+        from pyspark.sql.functions import pandas_udf, col, expr
+        df = self.spark.range(10).select(
+            col('id').cast('int').alias('a'),
+            col('id').cast('int').alias('b'))
+        originalAdd = pandas_udf(lambda x, y: x + y, IntegerType())
+        self.assertEqual(originalAdd.deterministic, True)
+        self.assertEqual(originalAdd.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+        newAdd = self.spark.catalog.registerUDF("add1", originalAdd)
--- End diff --

`newAdd` -> `new_add`
[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20217#discussion_r160862588

--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
--- End diff --

BTW, wouldn't it break the compatibility?
[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20217#discussion_r160862123

--- Diff: python/pyspark/sql/context.py ---
@@ -203,18 +203,46 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
         >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+        return self.sparkSession.catalog.registerFunction(name, f, returnType)
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
--- End diff --

I think that's ideally right. But let's keep it consistent with the convention used here. Let's discuss and try it separately and do it in batch.
[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20217#discussion_r160861750

--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
--- End diff --

Yea, but the wrapped UDF is also callable ... unfortunately :(. I suggested this way for this reason: in `udf.py`

```diff
+        wrapper._unwrapped = lambda: self
         return wrapper
```

and then

```
if hasattr(f, "_unwrapped"):
    f = f._unwrapped()
if isinstance(f, UserDefinedFunction):
    ...
else:
    ...
```

but it was no strong opinion.
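The problem HyukjinKwon describes is that the object returned by `udf()` is a plain wrapper function, so `callable(f)` cannot distinguish it from an ordinary Python function; stashing an `_unwrapped` attribute on the wrapper lets the registration code recover the underlying UDF. A Spark-free sketch of the pattern (the class here is a reduced stand-in for PySpark's `UserDefinedFunction`):

```python
class UserDefinedFunction:
    """Stand-in for pyspark's UserDefinedFunction, reduced to what the sketch needs."""

    def __init__(self, func):
        self.func = func

    def _wrapped(self):
        # The wrapper is a plain function, so it is callable just like `self`;
        # `_unwrapped` lets callers recover the underlying UDF object reliably.
        def wrapper(*args):
            return self.func(*args)
        wrapper._unwrapped = lambda: self
        return wrapper


def resolve(f):
    # Mirror of the review suggestion: unwrap first, then branch on the type.
    if hasattr(f, "_unwrapped"):
        f = f._unwrapped()
    if isinstance(f, UserDefinedFunction):
        return "udf"
    return "plain function"


slen = UserDefinedFunction(lambda s: len(s))._wrapped()
print(resolve(slen))         # the wrapped UDF is detected as a UDF
print(resolve(lambda x: x))  # a bare lambda stays a plain function
```

Note that without `_unwrapped`, `resolve` would classify the wrapper as a plain function, which is exactly why the `callable`-based check discussed above is insufficient on its own.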
[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/20217#discussion_r160861434

--- Diff: python/pyspark/sql/context.py ---
@@ -203,18 +203,46 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
         >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+        return self.sparkSession.catalog.registerFunction(name, f, returnType)
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
--- End diff --

I prefer not to duplicate the doc string. Maybe we can put the docstring in the user facing API (I think it's the SQLContext one?) and reference the doc string in the other one. Or, maybe we can do something like this if we want both docstrings:

```
@ignore_unicode_prefix
@since(2.3)
def registerUDF(self, name, f):
    return self.sparkSession.catalog.registerUDF(name, f)
registerUDF.__doc__ = pyspark.sql.catalog.Catalog.registerUDF.__doc__
```
[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/20217#discussion_r160859938

--- Diff: python/pyspark/sql/tests.py ---
@@ -4147,6 +4170,21 @@ def test_simple(self):
         expected = df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True)
         self.assertFramesEqual(expected, result)

+    def test_registerGroupMapUDF(self):
--- End diff --

nit: test_register_group_map_udf
[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/20217#discussion_r160858810

--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
--- End diff --

It might be better to check `f` is (1) callable (2) not a UDF object
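icexelloss's two-part check can be sketched without any Spark dependency; the class and function names below are illustrative, not PySpark's actual API:

```python
class UserDefinedFunction:
    """Stand-in for the real UDF object; only its identity matters for the check."""

    def __call__(self, *args):
        pass


def register_function(name, f):
    # (1) f must be callable, (2) f must not already be a UDF object.
    if not callable(f):
        raise TypeError("f must be a callable Python function")
    if isinstance(f, UserDefinedFunction):
        raise ValueError("f is already a UDF; use registerUDF instead")
    return name


register_function("slen", lambda s: len(s))  # accepted: a plain lambda
try:
    register_function("bad", UserDefinedFunction())
except ValueError as e:
    print(e)
```

As HyukjinKwon points out elsewhere in this thread, the wrapped UDF returned by `udf()` is itself a plain callable, so an `isinstance` check alone only catches the unwrapped object; that is what motivates the `hasattr`/`_unwrapped` alternatives being discussed.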
[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/20217#discussion_r160858570

--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
+            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
+                             "registerFunction is a Python function (including lambda function)")
+        udf = UserDefinedFunction(f, returnType=returnType, name=name,
+                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
+        self._jsparkSession.udf().registerPython(name, udf._judf)
+        return udf._wrapped()
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
+        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
+        statement.
+
+        :param name: name of the UDF
+        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
+            scalar vectorized. Grouped vectorized UDFs are not supported.
+        :return: a wrapped :class:`UserDefinedFunction`
+
+        >>> from pyspark.sql.types import IntegerType
+        >>> from pyspark.sql.functions import udf
+        >>> slen = udf(lambda s: len(s), IntegerType())
+        >>> _ = spark.udf.registerUDF("slen", slen)
+        >>> spark.sql("SELECT slen('test')").collect()
+        [Row(slen(test)=4)]

         >>> import random
         >>> from pyspark.sql.functions import udf
-        >>> from pyspark.sql.types import IntegerType, StringType
+        >>> from pyspark.sql.types import IntegerType
         >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
-        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
+        >>> newRandom_udf = spark.catalog.registerUDF("random_udf", random_udf)
         >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
-        [Row(random_udf()=u'82')]
+        [Row(random_udf()=82)]
         >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
-        [Row(random_udf()=u'62')]
+        [Row(random_udf()=62)]
+
+        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
+        ... def add_one(x):
+        ...     return x + 1
+        ...
+        >>> _ = spark.udf.registerUDF("add_one", add_one)  # doctest: +SKIP
+        >>> spark.sql("SELECT add_one(id) FROM range(10)").collect()  # doctest: +SKIP
         """

         # This is to check whether the input function is a wrapped/native UserDefinedFunction
         if hasattr(f, 'asNondeterministic'):
-            udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
-                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
+            if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
+                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
+                raise ValueError(
+                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
+            udf = UserDefinedFunction(f.func, returnType=f.returnType, name=name,
+                                      evalType=f.evalType, deterministic=f.deterministic)
         else:
-            udf = UserDefinedFunction(f, returnType=returnType, name=name,
-                                      evalType=PythonEvalType.SQL_BATCHED_UDF)
+            raise TypeError("Please use registerFunction for registering a Python function "
--- End diff --

+1. I think the user might not necessarily pass a python function here. Maybe an error message like "Invalid UDF: f must be an object returned by `udf` or `pandas_udf`"
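The `evalType` whitelist being debated above can be illustrated standalone. The constant values below are arbitrary stand-ins for Spark's `PythonEvalType` fields, and the error string follows icexelloss's suggested wording rather than the patch's original message:

```python
# Illustrative stand-ins for PythonEvalType constants (values are arbitrary).
SQL_BATCHED_UDF = 100
SQL_PANDAS_SCALAR_UDF = 200
SQL_PANDAS_GROUP_MAP_UDF = 201

# Only row-at-a-time and scalar vectorized UDFs may be registered for SQL use.
SUPPORTED_EVAL_TYPES = (SQL_BATCHED_UDF, SQL_PANDAS_SCALAR_UDF)


def check_eval_type(eval_type):
    # Grouped map UDFs are rejected with a user-facing message that names
    # the public entry points rather than internal eval-type constants.
    if eval_type not in SUPPORTED_EVAL_TYPES:
        raise ValueError(
            "Invalid UDF: f must be an object returned by udf or "
            "pandas_udf with type SCALAR")
    return eval_type


check_eval_type(SQL_BATCHED_UDF)  # accepted
try:
    check_eval_type(SQL_PANDAS_GROUP_MAP_UDF)
except ValueError as e:
    print(e)
```

Phrasing the error in terms of `udf`/`pandas_udf` (the APIs users actually call) rather than `SQL_BATCHED_UDF`/`SQL_PANDAS_SCALAR_UDF` is the design point of the review comment.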
[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/20217#discussion_r160858423

--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
+            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
+                             "registerFunction is a Python function (including lambda function)")
+        udf = UserDefinedFunction(f, returnType=returnType, name=name,
+                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
+        self._jsparkSession.udf().registerPython(name, udf._judf)
+        return udf._wrapped()
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
+        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
+        statement.
+
+        :param name: name of the UDF
+        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
+            scalar vectorized. Grouped vectorized UDFs are not supported.
--- End diff --

I would probably say something like "The UDF can be either returned by `udf` or `pandas_udf` with type `SCALAR`" to be more specific
[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/20217#discussion_r160858034

--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
+            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
+                             "registerFunction is a Python function (including lambda function)")
+        udf = UserDefinedFunction(f, returnType=returnType, name=name,
+                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
+        self._jsparkSession.udf().registerPython(name, udf._judf)
+        return udf._wrapped()
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
+        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
+        statement.
+
+        :param name: name of the UDF
--- End diff --

Maybe something like "name of the UDF in SQL statement" to be more specific.
[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/20217#discussion_r160857927

--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
+            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
+                             "registerFunction is a Python function (including lambda function)")
+        udf = UserDefinedFunction(f, returnType=returnType, name=name,
+                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
+        self._jsparkSession.udf().registerPython(name, udf._judf)
+        return udf._wrapped()
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
+        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
+        statement.
+
+        :param name: name of the UDF
+        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
--- End diff --

I don't have a strong opinion on how to describe them, but I think it's easier for users to understand if the description is consistent.
[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/20217#discussion_r160857817

--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
+            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
+                             "registerFunction is a Python function (including lambda function)")
+        udf = UserDefinedFunction(f, returnType=returnType, name=name,
+                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
+        self._jsparkSession.udf().registerPython(name, udf._judf)
+        return udf._wrapped()
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
+        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
+        statement.
+
+        :param name: name of the UDF
+        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
--- End diff --

We should probably standardize how to describe the wrapped function object returned by `udf` and `pandas_udf` in param docstrings. Currently it's being described differently: https://github.com/apache/spark/blob/master/python/pyspark/sql/group.py#L215
[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20217#discussion_r160834039

--- Diff: python/pyspark/sql/catalog.py ---
@@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+        """
+
+        # This is to check whether the input function is a wrapped/native UserDefinedFunction
+        if hasattr(f, 'asNondeterministic'):
+            raise ValueError("Please use registerUDF for registering UDF. The expected function of "
+                             "registerFunction is a Python function (including lambda function)")
+        udf = UserDefinedFunction(f, returnType=returnType, name=name,
+                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
+        self._jsparkSession.udf().registerPython(name, udf._judf)
+        return udf._wrapped()
+
+    @ignore_unicode_prefix
+    @since(2.3)
+    def registerUDF(self, name, f):
+        """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL
+        statement.
+
+        :param name: name of the UDF
+        :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or
+            scalar vectorized. Grouped vectorized UDFs are not supported.
+        :return: a wrapped :class:`UserDefinedFunction`
+
+        >>> from pyspark.sql.types import IntegerType
+        >>> from pyspark.sql.functions import udf
+        >>> slen = udf(lambda s: len(s), IntegerType())
+        >>> _ = spark.udf.registerUDF("slen", slen)
+        >>> spark.sql("SELECT slen('test')").collect()
+        [Row(slen(test)=4)]

         >>> import random
         >>> from pyspark.sql.functions import udf
-        >>> from pyspark.sql.types import IntegerType, StringType
+        >>> from pyspark.sql.types import IntegerType
         >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
-        >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
+        >>> newRandom_udf = spark.catalog.registerUDF("random_udf", random_udf)
         >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
-        [Row(random_udf()=u'82')]
+        [Row(random_udf()=82)]
         >>> spark.range(1).select(newRandom_udf()).collect()  # doctest: +SKIP
-        [Row(random_udf()=u'62')]
+        [Row(random_udf()=62)]
+
+        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+        >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
+        ... def add_one(x):
+        ...     return x + 1
+        ...
+        >>> _ = spark.udf.registerUDF("add_one", add_one)  # doctest: +SKIP
+        >>> spark.sql("SELECT add_one(id) FROM range(10)").collect()  # doctest: +SKIP
         """

         # This is to check whether the input function is a wrapped/native UserDefinedFunction
         if hasattr(f, 'asNondeterministic'):
-            udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
-                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
+            if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
+                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
+                raise ValueError(
+                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
+            udf = UserDefinedFunction(f.func, returnType=f.returnType, name=name,
+                                      evalType=f.evalType, deterministic=f.deterministic)
         else:
-            udf = UserDefinedFunction(f, returnType=returnType, name=name,
-                                      evalType=PythonEvalType.SQL_BATCHED_UDF)
+            raise TypeError("Please use registerFunction for registering a Python function "
--- End diff --

If users call `spark.udf.registerUDF`, this error message looks confusing to them because there is no `spark.udf.registerFunction`.
[GitHub] spark pull request #20217: [SPARK-23026] [PySpark] Add RegisterUDF to PySpar...
GitHub user gatorsmile opened a pull request: https://github.com/apache/spark/pull/20217

[SPARK-23026] [PySpark] Add RegisterUDF to PySpark

## What changes were proposed in this pull request?

Add a new API for registering row-at-a-time or scalar vectorized UDFs. The registered UDFs can be used in SQL statements. For example,

```
>>> from pyspark.sql.types import IntegerType
>>> from pyspark.sql.functions import udf
>>> slen = udf(lambda s: len(s), IntegerType())
>>> _ = spark.udf.registerUDF("slen", slen)
>>> spark.sql("SELECT slen('test')").collect()
[Row(slen(test)=4)]

>>> import random
>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import IntegerType
>>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
>>> newRandom_udf = spark.catalog.registerUDF("random_udf", random_udf)
>>> spark.sql("SELECT random_udf()").collect()
[Row(random_udf()=82)]
>>> spark.range(1).select(newRandom_udf()).collect()
[Row(random_udf()=62)]

>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> @pandas_udf("integer", PandasUDFType.SCALAR)
... def add_one(x):
...     return x + 1
...
>>> _ = spark.udf.registerUDF("add_one", add_one)
>>> spark.sql("SELECT add_one(id) FROM range(10)").collect()
```

## How was this patch tested?

Added test cases

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gatorsmile/spark registerUDF

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20217.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20217

commit f25669a4b6c2298359df1b9083037468652cd141
Author: gatorsmile
Date: 2018-01-10T10:24:08Z

    fix