xinrong-meng commented on code in PR #39384:
URL: https://github.com/apache/spark/pull/39384#discussion_r1063078787
##########
python/pyspark/sql/udf.py:
##########
@@ -75,6 +81,104 @@ def _create_udf(
return udf_obj._wrapped()
+def _create_py_udf(
+ f: Callable[..., Any],
+ returnType: "DataTypeOrString",
+ evalType: int,
+ useArrow: Optional[bool] = None,
+) -> "UserDefinedFunctionLike":
+ # The following table shows the results when the type coercion in Arrow is
needed, that is,
+ # when the user-specified return type(SQL Type) of the UDF and the actual
instance(Python
+ # Value(Type)) that the UDF returns are different.
+ # Arrow and Pickle have different type coercion rules, so a UDF might have
a different result
+ # with/without Arrow optimization. That's the main reason the Arrow
optimization for Python
+ # UDFs is disabled by default.
+ #
+-----------------------------+--------------+----------+------+------+----------------+-----------------------------+----------+----------------------+---------+-----------+----------------------------+----------+--------------+
# noqa
+ # |SQL Type \ Python
Value(Type)|None(NoneType)|True(bool)|1(int)|a(str)|1970-01-01(date)|1970-01-01
00:00:00(datetime)|1.0(float)|array('i',
[1])(array)|[1](list)|(1,)(tuple)|bytearray(b'ABC')(bytearray)|1(Decimal)|{'a':
1}(dict)| # noqa
+ #
+-----------------------------+--------------+----------+------+------+----------------+-----------------------------+----------+----------------------+---------+-----------+----------------------------+----------+--------------+
# noqa
+ # | boolean| X| X| X| X|
X| X| X| X|
X| X| X| X| X| #
noqa
+ # | tinyint| X| X| X| X|
X| X| X| X|
X| X| X| X| X| #
noqa
+ # | smallint| X| X| X| X|
X| X| X| X|
X| X| X| X| X| #
noqa
+ # | int| X| X| X| X|
X| X| X| X|
X| X| X| X| X| #
noqa
+ # | bigint| X| X| X| X|
X| X| X| X|
X| X| X| X| X| #
noqa
+ # | string| X| X| X| X|
X| X| X| X|
X| X| X| X| X| #
noqa
+ # | date| X| X| X| X|
X| X| X| X|
X| X| X| X| X| #
noqa
+ # | timestamp| X| X| X| X|
X| X| X| X|
X| X| X| X| X| #
noqa
+ # | float| X| X| X| X|
X| X| X| X|
X| X| X| X| X| #
noqa
+ # | double| X| X| X| X|
X| X| X| X|
X| X| X| X| X| #
noqa
+ # | binary| X| X| X| X|
X| X| X| X|
X| X| X| X| X| #
noqa
+ # | decimal(10,0)| X| X| X| X|
X| X| X| X|
X| X| X| X| X| #
noqa
+ #
+-----------------------------+--------------+----------+------+------+----------------+-----------------------------+----------+----------------------+---------+-----------+----------------------------+----------+--------------+
# noqa
+ # Note: The values of 'SQL Type' are DDL formatted strings, which can be
used as `returnType`s.
+ # Note: The values inside the table are generated by `repr`. 'X' means it
throws an exception
+ # during the conversion.
+
+ from pyspark.sql import SparkSession
+
+ session = SparkSession._instantiatedSession
+ if session is None:
+ is_arrow_enabled = False
+ else:
+ is_arrow_enabled = (
+ session.conf.get("spark.sql.execution.pythonUDF.arrow.enabled") ==
"true"
+ if useArrow is None
+ else useArrow
+ )
+
+ regular_udf = _create_udf(f, returnType, evalType)
+ return_type = regular_udf.returnType
+ try:
+ is_func_with_args = len(getfullargspec(f).args) > 0
+ except TypeError:
+ is_func_with_args = False
+ is_output_atomic_type = (
Review Comment:
   This limitation should be removed in a follow-up.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]