holdenk commented on code in PR #56327:
URL: https://github.com/apache/spark/pull/56327#discussion_r3477214314
##########
python/pyspark/sql/udf.py:
##########
@@ -206,6 +206,79 @@ def __init__(
)
self.evalType = evalType
self.deterministic = deterministic
+ # Extract Python UDF details if transpilation is enabled.
+ self.transpiled: list = []
+ self._transpiled_param_names: list[str] = []
+ # Per-option input-type categories ("numeric"/"string" per public
param),
+ # parallel to ``self.transpiled``; the JVM picks the option matching
the
+ # actual column types or falls back to interpreted Python.
+ self._transpiled_input_categories: list = []
+ # When we have a transpiled rewrite, ``__call__`` resolves any
+ # user-supplied kwargs against this positional parameter list so
+ # the JVM-side ``_udf_param_N`` substitution sees the inputs in
+ # the right order. Empty list when transpilation didn't happen.
+ from pyspark.sql import SparkSession
+
+ session = SparkSession._instantiatedSession
+ # A nondeterministic UDF must not be transpiled: replacing it with a
plain
+ # Catalyst expression would let the optimizer fold/reorder/duplicate
it,
+ # discarding the nondeterminism barrier. (asNondeterministic() also
clears
+ # any options set here, for the udf(f).asNondeterministic() ordering.)
+ transpile_enabled = (
+ False
+ if session is None
+ else (
+ deterministic
+ and evalType == PythonEvalType.SQL_BATCHED_UDF
+ and
session.conf.get("spark.sql.experimental.optimizer.transpilePyUDFs") == "true"
+ )
+ )
+ ansi_enabled = (
+ False if session is None else
session.conf.get("spark.sql.ansi.enabled") == "true"
+ )
+ self._transpile_errors = []
+ # Transpilation only attempts to reproduce ANSI-mode Spark SQL
semantics
+ # (no silent integer overflow, divide-by-zero raises, etc.). Running it
+ # against non-ANSI Spark would balloon the test matrix we'd have to
+ # maintain to verify Python-vs-SQL equivalence, so we gate on ANSI here
+ # and warn the user instead of trying to transpile in a mode we don't
+ # claim to support yet.
+ if transpile_enabled and not ansi_enabled:
+ warnings.warn(
+ "Python UDF transpilation "
+ "(spark.sql.experimental.optimizer.transpilePyUDFs) is only "
+ "supported when ANSI mode is enabled "
+ "(spark.sql.ansi.enabled=true). Skipping transpilation for "
+ f"{func} -- enable ANSI mode or set transpilePyUDFs=false to "
+ "silence this warning."
+ )
+ self._transpile_errors.append("Transpilation only functions in
ANSI mode.")
+ transpile_enabled = False
Review Comment:
Looking at how we handle this for Arrow, we do trigger the warning every time, so
let's match that behavior initially.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]