holdenk commented on code in PR #56327:
URL: https://github.com/apache/spark/pull/56327#discussion_r3477214314


##########
python/pyspark/sql/udf.py:
##########
@@ -206,6 +206,79 @@ def __init__(
         )
         self.evalType = evalType
         self.deterministic = deterministic
+        # Extract Python UDF details if transpilation is enabled.
+        self.transpiled: list = []
+        self._transpiled_param_names: list[str] = []
+        # Per-option input-type categories ("numeric"/"string" per public 
param),
+        # parallel to ``self.transpiled``; the JVM picks the option matching 
the
+        # actual column types or falls back to interpreted Python.
+        self._transpiled_input_categories: list = []
+        # When we have a transpiled rewrite, ``__call__`` resolves any
+        # user-supplied kwargs against this positional parameter list so
+        # the JVM-side ``_udf_param_N`` substitution sees the inputs in
+        # the right order. Empty list when transpilation didn't happen.
+        from pyspark.sql import SparkSession
+
+        session = SparkSession._instantiatedSession
+        # A nondeterministic UDF must not be transpiled: replacing it with a 
plain
+        # Catalyst expression would let the optimizer fold/reorder/duplicate 
it,
+        # discarding the nondeterminism barrier. (asNondeterministic() also 
clears
+        # any options set here, for the udf(f).asNondeterministic() ordering.)
+        transpile_enabled = (
+            False
+            if session is None
+            else (
+                deterministic
+                and evalType == PythonEvalType.SQL_BATCHED_UDF
+                and 
session.conf.get("spark.sql.experimental.optimizer.transpilePyUDFs") == "true"
+            )
+        )
+        ansi_enabled = (
+            False if session is None else 
session.conf.get("spark.sql.ansi.enabled") == "true"
+        )
+        self._transpile_errors = []
+        # Transpilation only attempts to reproduce ANSI-mode Spark SQL 
semantics
+        # (no silent integer overflow, divide-by-zero raises, etc.). Running it
+        # against non-ANSI Spark would balloon the test matrix we'd have to
+        # maintain to verify Python-vs-SQL equivalence, so we gate on ANSI here
+        # and warn the user instead of trying to transpile in a mode we don't
+        # claim to support yet.
+        if transpile_enabled and not ansi_enabled:
+            warnings.warn(
+                "Python UDF transpilation "
+                "(spark.sql.experimental.optimizer.transpilePyUDFs) is only "
+                "supported when ANSI mode is enabled "
+                "(spark.sql.ansi.enabled=true). Skipping transpilation for "
+                f"{func} -- enable ANSI mode or set transpilePyUDFs=false to "
+                "silence this warning."
+            )
+            self._transpile_errors.append("Transpilation only functions in 
ANSI mode.")
+            transpile_enabled = False

Review Comment:
   Looking at how we do it for Arrow we do trigger the warning everytime so 
lets actually match that behavior initially.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to