xinrong-meng commented on code in PR #40725:
URL: https://github.com/apache/spark/pull/40725#discussion_r1161969893


##########
python/pyspark/sql/udf.py:
##########
@@ -139,48 +139,52 @@ def _create_py_udf(
         and not isinstance(return_type, ArrayType)
     )
     if is_arrow_enabled and is_output_atomic_type and is_func_with_args:
-        require_minimum_pandas_version()
-        require_minimum_pyarrow_version()
-
-        import pandas as pd
-        from pyspark.sql.pandas.functions import _create_pandas_udf  # type: 
ignore[attr-defined]
-
-        # "result_func" ensures the result of a Python UDF to be consistent 
with/without Arrow
-        # optimization.
-        # Otherwise, an Arrow-optimized Python UDF raises 
"pyarrow.lib.ArrowTypeError: Expected a
-        # string or bytes dtype, got ..." whereas a non-Arrow-optimized Python 
UDF returns
-        # successfully.
-        result_func = lambda pdf: pdf  # noqa: E731
-        if type(return_type) == StringType:
-            result_func = lambda r: str(r) if r is not None else r  # noqa: 
E731
-        elif type(return_type) == BinaryType:
-            result_func = lambda r: bytes(r) if r is not None else r  # noqa: 
E731
-
-        def vectorized_udf(*args: pd.Series) -> pd.Series:
-            if any(map(lambda arg: isinstance(arg, pd.DataFrame), args)):
-                raise NotImplementedError(
-                    "Struct input type are not supported with Arrow 
optimization "
-                    "enabled in Python UDFs. Disable "
-                    "'spark.sql.execution.pythonUDF.arrow.enabled' to 
workaround."
-                )
-            return pd.Series(result_func(f(*a)) for a in zip(*args))
-
-        # Regular UDFs can take callable instances too.
-        vectorized_udf.__name__ = f.__name__ if hasattr(f, "__name__") else 
f.__class__.__name__
-        vectorized_udf.__module__ = (
-            f.__module__ if hasattr(f, "__module__") else 
f.__class__.__module__
-        )
-        vectorized_udf.__doc__ = f.__doc__
-        pudf = _create_pandas_udf(vectorized_udf, returnType, None)
-        # Keep the attributes as if this is a regular Python UDF.
-        pudf.func = f
-        pudf.returnType = return_type
-        pudf.evalType = regular_udf.evalType
-        return pudf
+        return _create_arrow_py_udf(f, regular_udf)
     else:
         return regular_udf
 
 
+def _create_arrow_py_udf(f, regular_udf):  # type: ignore

Review Comment:
   This function is simply an extraction of the original code (L142–L179) 
for code reuse.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to