Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20217#discussion_r160950521 --- Diff: python/pyspark/sql/catalog.py --- @@ -255,26 +255,67 @@ def registerFunction(self, name, f, returnType=StringType()): >>> _ = spark.udf.register("stringLengthInt", len, IntegerType()) >>> spark.sql("SELECT stringLengthInt('test')").collect() [Row(stringLengthInt(test)=4)] + """ + + # This is to check whether the input function is a wrapped/native UserDefinedFunction + if hasattr(f, 'asNondeterministic'): + raise ValueError("Please use registerUDF for registering UDF. The expected function of " + "registerFunction is a Python function (including lambda function)") + udf = UserDefinedFunction(f, returnType=returnType, name=name, + evalType=PythonEvalType.SQL_BATCHED_UDF) + self._jsparkSession.udf().registerPython(name, udf._judf) + return udf._wrapped() + + @ignore_unicode_prefix + @since(2.3) + def registerUDF(self, name, f): + """Registers a :class:`UserDefinedFunction`. The registered UDF can be used in SQL + statement. + + :param name: name of the UDF + :param f: a wrapped/native UserDefinedFunction. The UDF can be either row-at-a-time or + scalar vectorized. Grouped vectorized UDFs are not supported. 
+ :return: a wrapped :class:`UserDefinedFunction` + + >>> from pyspark.sql.types import IntegerType + >>> from pyspark.sql.functions import udf + >>> slen = udf(lambda s: len(s), IntegerType()) + >>> _ = spark.udf.registerUDF("slen", slen) + >>> spark.sql("SELECT slen('test')").collect() + [Row(slen(test)=4)] >>> import random >>> from pyspark.sql.functions import udf - >>> from pyspark.sql.types import IntegerType, StringType + >>> from pyspark.sql.types import IntegerType >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic() - >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType()) + >>> newRandom_udf = spark.catalog.registerUDF("random_udf", random_udf) >>> spark.sql("SELECT random_udf()").collect() # doctest: +SKIP - [Row(random_udf()=u'82')] + [Row(random_udf()=82)] >>> spark.range(1).select(newRandom_udf()).collect() # doctest: +SKIP - [Row(random_udf()=u'62')] + [Row(random_udf()=62)] + + >>> from pyspark.sql.functions import pandas_udf, PandasUDFType + >>> @pandas_udf("integer", PandasUDFType.SCALAR) # doctest: +SKIP + ... def add_one(x): + ... return x + 1 + ... + >>> _ = spark.udf.registerUDF("add_one", add_one) # doctest: +SKIP + >>> spark.sql("SELECT add_one(id) FROM range(10)").collect() # doctest: +SKIP --- End diff -- Done.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org