holdenk commented on code in PR #56327: URL: https://github.com/apache/spark/pull/56327#discussion_r3469647975
########## python/pyspark/sql/tests/test_udf_transpile_hypothesis.py: ########## @@ -0,0 +1,658 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +Differential / property-based tests for UDF transpilation (SPARK-54783). + +These tests run a fixed set of small Python UDFs twice -- once with +``spark.sql.experimental.optimizer.transpilePyUDFs`` enabled (so the catalyst +transpiler in :mod:`pyspark.sql.transpile` rewrites them into native +expressions) and once without -- and assert that the two runs produce the +same results for inputs generated by Hypothesis. + +The transpiler is intentionally minimal at this point so we expect this +suite to surface bugs (e.g. truthiness / NULL-handling mismatches between +Python's ``if x:`` semantics and SQL's ``CASE WHEN``). Failures here should +be treated as real correctness gaps in the transpiler, not as test bugs to +silence. + +The suite is gated on two things, both required: + +* the ``RUN_HYPOTHESIS`` env var must be present in the environment + (its value doesn't matter, only its presence), and +* the ``hypothesis`` package must be installed. + +If either gate is unmet the entire suite is skipped cleanly so it never +becomes a CI tax for folks who haven't opted in. 
In CI, this opt-in suite +is wired through ``.github/workflows/build_and_test.yml``, which flips both +gates on for the relevant job when PR changes touch the transpiler or this +test file. + +To run locally:: + + pip install hypothesis + RUN_HYPOTHESIS=1 RUN_HYPOTHESIS_MAX_EXAMPLES=1000 \ + python/run-tests --testnames pyspark.sql.tests.test_udf_transpile_hypothesis + +Set ``RUN_HYPOTHESIS_MAX_EXAMPLES`` to override the per-test example count +(default 100, kept small for CI). +""" + +import os +import unittest +import warnings +from typing import Optional + +from pyspark.sql import Row +from pyspark.sql.types import ( + BooleanType, + LongType, + StructField, + StructType, +) +from pyspark.sql.udf import UserDefinedFunction +from pyspark.testing.sqlutils import ReusedSQLTestCase +from pyspark.testing.utils import have_package +from pyspark.util import is_remote_only + + +# Sentinel value used by ``_run`` to mark "this side raised". A unique +# object is sufficient because we only ever compare it against itself +# inside the helper. +_SENTINEL_RAISED = object() + + +_HYPOTHESIS_ENV = "RUN_HYPOTHESIS" +_have_hypothesis = have_package("hypothesis") +# Presence-based: any value (including empty) opts in. We just check for the +# key being in os.environ so e.g. `RUN_HYPOTHESIS= python ...` still counts. +_hypothesis_enabled = _HYPOTHESIS_ENV in os.environ +# Transpilation is only supported in regular (non-Connect) Spark for now, +# so the hypothesis suite skips cleanly under a pyspark-client-only install. +_regular_spark = not is_remote_only() +_skip_reason = ( + f"Set {_HYPOTHESIS_ENV} in the environment to run; hypothesis must also be installed, " + "and the suite only runs under regular (non-Connect) Spark." 
+) + + +if _have_hypothesis: + from hypothesis import HealthCheck, example, given, settings, strategies as st + + _DEFAULT_MAX_EXAMPLES = int(os.environ.get("RUN_HYPOTHESIS_MAX_EXAMPLES", "1000")) + + # The ``function_scoped_fixture` health check is suppressed because we intentionally reuse the + # class-level SparkSession across examples; the per-example ``deadline`` is disabled because + # Spark task execution is much slower than hypothesis's default budget. + _hyp_settings = settings( + max_examples=_DEFAULT_MAX_EXAMPLES, + deadline=None, + suppress_health_check=[HealthCheck.function_scoped_fixture], + ) + + # 64-bit signed range, kept clear of overflow so arithmetic in both + # Python and Spark stays in the safe LongType range. + _LONG_BOUND = 2**31 - 1 Review Comment: Range mismatch between the comment and the implementation: the comment describes a 64-bit signed range, but `_LONG_BOUND = 2**31 - 1` is the int32 maximum. ########## common/utils/src/main/resources/error/error-conditions.json: ########## @@ -5002,6 +5002,18 @@ ], "sqlState" : "38000" }, + "INVALID_UDF_PARAMETER_PLACEHOLDER" : { + "message" : [ + "Invalid Python UDF transpiled-expression parameter placeholder: <placeholder>. Placeholders must be of the form `_udf_param_N` where N is a non-negative integer index into the UDF's positional arguments. This is an internal error in the Python UDF transpiler." + ], + "sqlState" : "42000" Review Comment: Is 42000 the right SQLSTATE here, given that the message describes this as an internal error in the Python UDF transpiler? ########## python/pyspark/sql/tests/test_udf_transpile_hypothesis.py: ########## @@ -0,0 +1,658 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +Differential / property-based tests for UDF transpilation (SPARK-54783). + +These tests run a fixed set of small Python UDFs twice -- once with +``spark.sql.experimental.optimizer.transpilePyUDFs`` enabled (so the catalyst +transpiler in :mod:`pyspark.sql.transpile` rewrites them into native +expressions) and once without -- and assert that the two runs produce the +same results for inputs generated by Hypothesis. + +The transpiler is intentionally minimal at this point so we expect this +suite to surface bugs (e.g. truthiness / NULL-handling mismatches between +Python's ``if x:`` semantics and SQL's ``CASE WHEN``). Failures here should +be treated as real correctness gaps in the transpiler, not as test bugs to +silence. + +The suite is gated on two things, both required: + +* the ``RUN_HYPOTHESIS`` env var must be present in the environment + (its value doesn't matter, only its presence), and +* the ``hypothesis`` package must be installed. + +If either gate is unmet the entire suite is skipped cleanly so it never +becomes a CI tax for folks who haven't opted in. In CI, this opt-in suite +is wired through ``.github/workflows/build_and_test.yml``, which flips both +gates on for the relevant job when PR changes touch the transpiler or this +test file. + +To run locally:: + + pip install hypothesis + RUN_HYPOTHESIS=1 RUN_HYPOTHESIS_MAX_EXAMPLES=1000 \ + python/run-tests --testnames pyspark.sql.tests.test_udf_transpile_hypothesis + +Set ``RUN_HYPOTHESIS_MAX_EXAMPLES`` to override the per-test example count +(default 100, kept small for CI). 
+""" + +import os +import unittest +import warnings +from typing import Optional + +from pyspark.sql import Row +from pyspark.sql.types import ( + BooleanType, + LongType, + StructField, + StructType, +) +from pyspark.sql.udf import UserDefinedFunction +from pyspark.testing.sqlutils import ReusedSQLTestCase +from pyspark.testing.utils import have_package +from pyspark.util import is_remote_only + + +# Sentinel value used by ``_run`` to mark "this side raised". A unique +# object is sufficient because we only ever compare it against itself +# inside the helper. +_SENTINEL_RAISED = object() + + +_HYPOTHESIS_ENV = "RUN_HYPOTHESIS" +_have_hypothesis = have_package("hypothesis") +# Presence-based: any value (including empty) opts in. We just check for the +# key being in os.environ so e.g. `RUN_HYPOTHESIS= python ...` still counts. +_hypothesis_enabled = _HYPOTHESIS_ENV in os.environ +# Transpilation is only supported in regular (non-Connect) Spark for now, +# so the hypothesis suite skips cleanly under a pyspark-client-only install. +_regular_spark = not is_remote_only() +_skip_reason = ( + f"Set {_HYPOTHESIS_ENV} in the environment to run; hypothesis must also be installed, " + "and the suite only runs under regular (non-Connect) Spark." +) + + +if _have_hypothesis: + from hypothesis import HealthCheck, example, given, settings, strategies as st + + _DEFAULT_MAX_EXAMPLES = int(os.environ.get("RUN_HYPOTHESIS_MAX_EXAMPLES", "1000")) + + # The ``function_scoped_fixture` health check is suppressed because we intentionally reuse the + # class-level SparkSession across examples; the per-example ``deadline`` is disabled because + # Spark task execution is much slower than hypothesis's default budget. + _hyp_settings = settings( + max_examples=_DEFAULT_MAX_EXAMPLES, + deadline=None, + suppress_health_check=[HealthCheck.function_scoped_fixture], + ) + + # 64-bit signed range, kept clear of overflow so arithmetic in both + # Python and Spark stays in the safe LongType range. 
+ _LONG_BOUND = 2**31 - 1 + _long_strategy = st.one_of( + st.none(), st.integers(min_value=-_LONG_BOUND, max_value=_LONG_BOUND) + ) + _bool_strategy = st.one_of(st.none(), st.booleans()) + + # ---- Edge-case seeds (scalacheck-style) ----------------------------- + # + # Hypothesis already biases toward "interesting" boundary values, but + # explicit ``@example`` decorators make a regression on a specific + # value -- e.g. NULL, zero, the type's max -- deterministic across + # runs. These are the values we always want to try, before random + # generation kicks in. + _LONG_EDGES = (None, 0, 1, -1, 7, -7, _LONG_BOUND, -_LONG_BOUND) + # Bool space is exhaustive (only three values) so the @example + # decorators here serve more as documentation of the NULL handling + # we care about than as new coverage on top of hypothesis's + # generator. + _BOOL_EDGES = (None, True, False) + # Multi-arg edges -- nulls plus the four sign-combos for non-zero + # values. Catches off-by-one errors in parameter-index plumbing + # better than random generation alone. + _LONG_PAIR_EDGES = ( Review Comment: Let's add int32 bounds too ########## python/pyspark/sql/udf.py: ########## @@ -206,6 +206,79 @@ def __init__( ) self.evalType = evalType self.deterministic = deterministic + # Extract Python UDF details if transpilation is enabled. + self.transpiled: list = [] + self._transpiled_param_names: list[str] = [] + # Per-option input-type categories ("numeric"/"string" per public param), + # parallel to ``self.transpiled``; the JVM picks the option matching the + # actual column types or falls back to interpreted Python. + self._transpiled_input_categories: list = [] + # When we have a transpiled rewrite, ``__call__`` resolves any + # user-supplied kwargs against this positional parameter list so + # the JVM-side ``_udf_param_N`` substitution sees the inputs in + # the right order. Empty list when transpilation didn't happen. 
+ from pyspark.sql import SparkSession + + session = SparkSession._instantiatedSession + # A nondeterministic UDF must not be transpiled: replacing it with a plain + # Catalyst expression would let the optimizer fold/reorder/duplicate it, + # discarding the nondeterminism barrier. (asNondeterministic() also clears + # any options set here, for the udf(f).asNondeterministic() ordering.) + transpile_enabled = ( + False + if session is None + else ( + deterministic + and evalType == PythonEvalType.SQL_BATCHED_UDF + and session.conf.get("spark.sql.experimental.optimizer.transpilePyUDFs") == "true" + ) + ) + ansi_enabled = ( + False if session is None else session.conf.get("spark.sql.ansi.enabled") == "true" + ) + self._transpile_errors = [] + # Transpilation only attempts to reproduce ANSI-mode Spark SQL semantics + # (no silent integer overflow, divide-by-zero raises, etc.). Running it + # against non-ANSI Spark would balloon the test matrix we'd have to + # maintain to verify Python-vs-SQL equivalence, so we gate on ANSI here + # and warn the user instead of trying to transpile in a mode we don't + # claim to support yet. + if transpile_enabled and not ansi_enabled: + warnings.warn( + "Python UDF transpilation " + "(spark.sql.experimental.optimizer.transpilePyUDFs) is only " + "supported when ANSI mode is enabled " + "(spark.sql.ansi.enabled=true). Skipping transpilation for " + f"{func} -- enable ANSI mode or set transpilePyUDFs=false to " + "silence this warning." + ) + self._transpile_errors.append("Transpilation only functions in ANSI mode.") + transpile_enabled = False Review Comment: Consider moving this check so we're not re-running it every time. ########## python/pyspark/sql/udf.py: ########## @@ -557,6 +663,14 @@ def asNondeterministic(self) -> "UserDefinedFunction": # with 'deterministic' updated. See SPARK-23233.
self._judf_placeholder = None self.deterministic = False + # A transpiled rewrite replaces the (now nondeterministic) Python UDF + # with a plain Catalyst expression, which the optimizer is free to + # fold, reorder, or duplicate -- discarding the nondeterminism barrier + # the caller just asked for. Drop any transpiled options so a + # nondeterministic UDF always runs as interpreted Python. + self.transpiled = [] + self._transpiled_param_names = [] Review Comment: The more I think about this the less certain I am, we could (for example) have a non-deterministic UDF calling rand and transpile it into the sql rand / randn function but lets do that as a follow up. ########## python/pyspark/sql/tests/test_udf_transpile_hypothesis.py: ########## @@ -0,0 +1,658 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +Differential / property-based tests for UDF transpilation (SPARK-54783). + +These tests run a fixed set of small Python UDFs twice -- once with +``spark.sql.experimental.optimizer.transpilePyUDFs`` enabled (so the catalyst +transpiler in :mod:`pyspark.sql.transpile` rewrites them into native +expressions) and once without -- and assert that the two runs produce the +same results for inputs generated by Hypothesis. 
+ +The transpiler is intentionally minimal at this point so we expect this +suite to surface bugs (e.g. truthiness / NULL-handling mismatches between +Python's ``if x:`` semantics and SQL's ``CASE WHEN``). Failures here should +be treated as real correctness gaps in the transpiler, not as test bugs to +silence. + +The suite is gated on two things, both required: + +* the ``RUN_HYPOTHESIS`` env var must be present in the environment + (its value doesn't matter, only its presence), and +* the ``hypothesis`` package must be installed. + +If either gate is unmet the entire suite is skipped cleanly so it never +becomes a CI tax for folks who haven't opted in. In CI, this opt-in suite +is wired through ``.github/workflows/build_and_test.yml``, which flips both +gates on for the relevant job when PR changes touch the transpiler or this +test file. + +To run locally:: + + pip install hypothesis + RUN_HYPOTHESIS=1 RUN_HYPOTHESIS_MAX_EXAMPLES=1000 \ + python/run-tests --testnames pyspark.sql.tests.test_udf_transpile_hypothesis + +Set ``RUN_HYPOTHESIS_MAX_EXAMPLES`` to override the per-test example count +(default 100, kept small for CI). +""" + +import os +import unittest +import warnings +from typing import Optional + +from pyspark.sql import Row +from pyspark.sql.types import ( + BooleanType, + LongType, + StructField, + StructType, +) +from pyspark.sql.udf import UserDefinedFunction +from pyspark.testing.sqlutils import ReusedSQLTestCase +from pyspark.testing.utils import have_package +from pyspark.util import is_remote_only + + +# Sentinel value used by ``_run`` to mark "this side raised". A unique +# object is sufficient because we only ever compare it against itself +# inside the helper. +_SENTINEL_RAISED = object() + + +_HYPOTHESIS_ENV = "RUN_HYPOTHESIS" +_have_hypothesis = have_package("hypothesis") +# Presence-based: any value (including empty) opts in. We just check for the +# key being in os.environ so e.g. `RUN_HYPOTHESIS= python ...` still counts. 
+_hypothesis_enabled = _HYPOTHESIS_ENV in os.environ +# Transpilation is only supported in regular (non-Connect) Spark for now, +# so the hypothesis suite skips cleanly under a pyspark-client-only install. +_regular_spark = not is_remote_only() +_skip_reason = ( + f"Set {_HYPOTHESIS_ENV} in the environment to run; hypothesis must also be installed, " + "and the suite only runs under regular (non-Connect) Spark." +) + + +if _have_hypothesis: + from hypothesis import HealthCheck, example, given, settings, strategies as st + + _DEFAULT_MAX_EXAMPLES = int(os.environ.get("RUN_HYPOTHESIS_MAX_EXAMPLES", "1000")) + + # The ``function_scoped_fixture` health check is suppressed because we intentionally reuse the + # class-level SparkSession across examples; the per-example ``deadline`` is disabled because + # Spark task execution is much slower than hypothesis's default budget. + _hyp_settings = settings( + max_examples=_DEFAULT_MAX_EXAMPLES, + deadline=None, + suppress_health_check=[HealthCheck.function_scoped_fixture], + ) + + # 64-bit signed range, kept clear of overflow so arithmetic in both + # Python and Spark stays in the safe LongType range. + _LONG_BOUND = 2**31 - 1 + _long_strategy = st.one_of( + st.none(), st.integers(min_value=-_LONG_BOUND, max_value=_LONG_BOUND) + ) + _bool_strategy = st.one_of(st.none(), st.booleans()) + + # ---- Edge-case seeds (scalacheck-style) ----------------------------- + # + # Hypothesis already biases toward "interesting" boundary values, but + # explicit ``@example`` decorators make a regression on a specific + # value -- e.g. NULL, zero, the type's max -- deterministic across + # runs. These are the values we always want to try, before random + # generation kicks in. 
+ _LONG_EDGES = (None, 0, 1, -1, 7, -7, _LONG_BOUND, -_LONG_BOUND) + # Bool space is exhaustive (only three values) so the @example + # decorators here serve more as documentation of the NULL handling + # we care about than as new coverage on top of hypothesis's + # generator. + _BOOL_EDGES = (None, True, False) + # Multi-arg edges -- nulls plus the four sign-combos for non-zero + # values. Catches off-by-one errors in parameter-index plumbing + # better than random generation alone. + _LONG_PAIR_EDGES = ( + (None, None), + (None, 0), + (0, None), + (0, 0), + (1, -1), + (-1, 1), + (_LONG_BOUND, 1), + (1, -_LONG_BOUND), + ) + # Sign-combo edges (plus NULL combinations) for the boolean tests. + # The bodies (``x > 0 and y > 0`` / ``x > 0 or y > 0``) raise in + # pure Python on a None input (``TypeError``), and the transpiler's + # NULL-guarded Compare also raises -- so the ``_run`` helper's "both + # raised" equivalence covers the NULL cases here. + _BOOLEAN_PAIR_EDGES = ( + (None, None), + (None, 0), + (0, None), + (0, 0), + (1, -1), + (-1, 1), + (1, 1), + (-1, -1), + (_LONG_BOUND, 1), + (1, -_LONG_BOUND), + ) + + def _seed_examples(values, key="value"): + """Stack one ``@example`` decorator per seed value.""" + + def wrapper(method): + for v in reversed(values): + method = example(**{key: v})(method) + return method + + return wrapper + + def _seed_pair_examples(pairs, keys=("x", "y")): + def wrapper(method): + for v0, v1 in reversed(pairs): + method = example(**{keys[0]: v0, keys[1]: v1})(method) + return method + + return wrapper + + +# ---- The UDF templates we exercise -------------------------------------- +# +# We keep these as module-level callables so that ``inspect.getsource`` works +# (the transpiler reads source via inspection). They are deliberately written +# the way a user would: idiomatic Python, including ``if x is not None`` +# guards and bare ``if x:`` truthiness checks. 
+ + +def plus_four(x): + if x is not None: + return x + 4 Review Comment: Let's also add plus_four unsafe where we don't check None ########## python/pyspark/sql/tests/test_udf_transpile_hypothesis.py: ########## @@ -0,0 +1,658 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +Differential / property-based tests for UDF transpilation (SPARK-54783). + +These tests run a fixed set of small Python UDFs twice -- once with +``spark.sql.experimental.optimizer.transpilePyUDFs`` enabled (so the catalyst +transpiler in :mod:`pyspark.sql.transpile` rewrites them into native +expressions) and once without -- and assert that the two runs produce the +same results for inputs generated by Hypothesis. + +The transpiler is intentionally minimal at this point so we expect this +suite to surface bugs (e.g. truthiness / NULL-handling mismatches between +Python's ``if x:`` semantics and SQL's ``CASE WHEN``). Failures here should +be treated as real correctness gaps in the transpiler, not as test bugs to +silence. + +The suite is gated on two things, both required: + +* the ``RUN_HYPOTHESIS`` env var must be present in the environment + (its value doesn't matter, only its presence), and +* the ``hypothesis`` package must be installed. 
+ +If either gate is unmet the entire suite is skipped cleanly so it never +becomes a CI tax for folks who haven't opted in. In CI, this opt-in suite +is wired through ``.github/workflows/build_and_test.yml``, which flips both +gates on for the relevant job when PR changes touch the transpiler or this +test file. + +To run locally:: + + pip install hypothesis + RUN_HYPOTHESIS=1 RUN_HYPOTHESIS_MAX_EXAMPLES=1000 \ + python/run-tests --testnames pyspark.sql.tests.test_udf_transpile_hypothesis + +Set ``RUN_HYPOTHESIS_MAX_EXAMPLES`` to override the per-test example count +(default 100, kept small for CI). +""" + +import os +import unittest +import warnings +from typing import Optional + +from pyspark.sql import Row +from pyspark.sql.types import ( + BooleanType, + LongType, + StructField, + StructType, +) +from pyspark.sql.udf import UserDefinedFunction +from pyspark.testing.sqlutils import ReusedSQLTestCase +from pyspark.testing.utils import have_package +from pyspark.util import is_remote_only + + +# Sentinel value used by ``_run`` to mark "this side raised". A unique +# object is sufficient because we only ever compare it against itself +# inside the helper. +_SENTINEL_RAISED = object() + + +_HYPOTHESIS_ENV = "RUN_HYPOTHESIS" +_have_hypothesis = have_package("hypothesis") +# Presence-based: any value (including empty) opts in. We just check for the +# key being in os.environ so e.g. `RUN_HYPOTHESIS= python ...` still counts. +_hypothesis_enabled = _HYPOTHESIS_ENV in os.environ +# Transpilation is only supported in regular (non-Connect) Spark for now, +# so the hypothesis suite skips cleanly under a pyspark-client-only install. +_regular_spark = not is_remote_only() +_skip_reason = ( + f"Set {_HYPOTHESIS_ENV} in the environment to run; hypothesis must also be installed, " + "and the suite only runs under regular (non-Connect) Spark." 
+) + + +if _have_hypothesis: + from hypothesis import HealthCheck, example, given, settings, strategies as st + + _DEFAULT_MAX_EXAMPLES = int(os.environ.get("RUN_HYPOTHESIS_MAX_EXAMPLES", "1000")) + + # The ``function_scoped_fixture` health check is suppressed because we intentionally reuse the + # class-level SparkSession across examples; the per-example ``deadline`` is disabled because + # Spark task execution is much slower than hypothesis's default budget. + _hyp_settings = settings( + max_examples=_DEFAULT_MAX_EXAMPLES, + deadline=None, + suppress_health_check=[HealthCheck.function_scoped_fixture], + ) + + # 64-bit signed range, kept clear of overflow so arithmetic in both + # Python and Spark stays in the safe LongType range. + _LONG_BOUND = 2**31 - 1 + _long_strategy = st.one_of( + st.none(), st.integers(min_value=-_LONG_BOUND, max_value=_LONG_BOUND) + ) + _bool_strategy = st.one_of(st.none(), st.booleans()) + + # ---- Edge-case seeds (scalacheck-style) ----------------------------- + # + # Hypothesis already biases toward "interesting" boundary values, but + # explicit ``@example`` decorators make a regression on a specific + # value -- e.g. NULL, zero, the type's max -- deterministic across + # runs. These are the values we always want to try, before random + # generation kicks in. + _LONG_EDGES = (None, 0, 1, -1, 7, -7, _LONG_BOUND, -_LONG_BOUND) Review Comment: Let's add int32 bounds too ########## python/pyspark/sql/transpile.py: ########## @@ -0,0 +1,847 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +Experimental tools for transpiling UDFS. + +Transpilation is only attempted when both +``spark.sql.experimental.optimizer.transpilePyUDFs=true`` and +``spark.sql.ansi.enabled=true``. The generated Catalyst expressions +target ANSI-mode SQL semantics (overflow raises, divide-by-zero raises, +etc.); running them under non-ANSI mode would silently diverge from the +Python interpretation in ways we don't currently track. If you flip +transpilation on with ANSI off the UDF will fall back to interpreted +Python execution and a warning is logged at UDF construction time. + +Python's ``+`` and ``*`` are overloaded for text (concat / repeat), so an +untyped parameter is transpiled into one option per input-type category +(numeric and string) and the JVM picks the one matching the bound column +types -- falling back to interpreted Python when none fit. Annotating the +UDF's parameters (e.g. ``def f(a: int, b: str)``) pins each category and +keeps the option matrix small; prefer doing so. To bound plan growth, +functions with more than three untyped parameters only emit the +all-numeric and all-string variants. 
+""" + +import ast +from typing import Any, Callable, List, Optional, Tuple, TYPE_CHECKING +import inspect +import itertools +import textwrap +from pyspark.errors import UnsupportedOperationException +from pyspark.sql.column import Column +from pyspark.sql.functions import ( + abs as _abs, + coalesce, + col, + concat, + lit, + pmod, + raise_error, + repeat, + sign, + when, +) + + +if TYPE_CHECKING: + from pyspark.sql import SparkSession + from pyspark.sql._typing import DataTypeOrString + + +class AbstractTranspiler(object): + """Base class for transpilers. All experimental.""" + + varieties: dict[str, type["AbstractTranspiler"]] = {} + # Specify the "friendly" name a user can add to spark.sql.experimental.optimizer.transpilers + # to enable this transpiler. + variety: str = "" + + @classmethod + def register(cls) -> None: + AbstractTranspiler.varieties[cls.variety] = cls + + def _transpile_from_ast( + self, + src: Optional[str], + ast_info: ast.AST, + function_ast: ast.FunctionDef, + params: List[str], + returnType: "DataTypeOrString", + param_categories: Optional[dict] = None, + ) -> Optional[Column]: + pass + + +def _is_definitely_boolean(node: ast.AST) -> bool: + """Return True when ``node`` is statically guaranteed to produce a Python + ``bool`` (or ``None``, which round-trips through ``coalesce``). + + Used to gate ``if``/ternary lowering: we only allow the test expression + into Catalyst's ``when(coalesce(test, false), ...)`` form when it provably + produces a boolean. Everything else (bare Name, arithmetic, function calls, + subscript, …) must force a fallback to interpreted Python instead of + silently diverging. + """ + match node: + case ast.Constant(value=v): + return v is None or isinstance(v, bool) + case ast.Compare(): + # All comparison operators produce bool. + return True + case ast.BoolOp(): + # and / or of booleans produces bool. + return True Review Comment: We need to check that the values of the bool op are boolean. 
########## python/pyspark/sql/transpile.py: ########## @@ -0,0 +1,847 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +Experimental tools for transpiling UDFS. + +Transpilation is only attempted when both +``spark.sql.experimental.optimizer.transpilePyUDFs=true`` and +``spark.sql.ansi.enabled=true``. The generated Catalyst expressions +target ANSI-mode SQL semantics (overflow raises, divide-by-zero raises, +etc.); running them under non-ANSI mode would silently diverge from the +Python interpretation in ways we don't currently track. If you flip +transpilation on with ANSI off the UDF will fall back to interpreted +Python execution and a warning is logged at UDF construction time. + +Python's ``+`` and ``*`` are overloaded for text (concat / repeat), so an +untyped parameter is transpiled into one option per input-type category +(numeric and string) and the JVM picks the one matching the bound column +types -- falling back to interpreted Python when none fit. Annotating the +UDF's parameters (e.g. ``def f(a: int, b: str)``) pins each category and +keeps the option matrix small; prefer doing so. To bound plan growth, +functions with more than three untyped parameters only emit the +all-numeric and all-string variants. 
+""" + +import ast +from typing import Any, Callable, List, Optional, Tuple, TYPE_CHECKING +import inspect +import itertools +import textwrap +from pyspark.errors import UnsupportedOperationException +from pyspark.sql.column import Column +from pyspark.sql.functions import ( + abs as _abs, + coalesce, + col, + concat, + lit, + pmod, + raise_error, + repeat, + sign, + when, +) + + +if TYPE_CHECKING: + from pyspark.sql import SparkSession + from pyspark.sql._typing import DataTypeOrString + + +class AbstractTranspiler(object): + """Base class for transpilers. All experimental.""" + + varieties: dict[str, type["AbstractTranspiler"]] = {} + # Specify the "friendly" name a user can add to spark.sql.experimental.optimizer.transpilers + # to enable this transpiler. + variety: str = "" + + @classmethod + def register(cls) -> None: + AbstractTranspiler.varieties[cls.variety] = cls + + def _transpile_from_ast( + self, + src: Optional[str], + ast_info: ast.AST, + function_ast: ast.FunctionDef, + params: List[str], + returnType: "DataTypeOrString", + param_categories: Optional[dict] = None, + ) -> Optional[Column]: + pass + + +def _is_definitely_boolean(node: ast.AST) -> bool: + """Return True when ``node`` is statically guaranteed to produce a Python + ``bool`` (or ``None``, which round-trips through ``coalesce``). + + Used to gate ``if``/ternary lowering: we only allow the test expression + into Catalyst's ``when(coalesce(test, false), ...)`` form when it provably + produces a boolean. Everything else (bare Name, arithmetic, function calls, + subscript, …) must force a fallback to interpreted Python instead of + silently diverging. + """ + match node: + case ast.Constant(value=v): + return v is None or isinstance(v, bool) + case ast.Compare(): + # All comparison operators produce bool. + return True + case ast.BoolOp(): + # and / or of booleans produces bool. + return True + case ast.UnaryOp(op=ast.Not()): + # `not x` always produces bool. 
+ return True + case ast.IfExp(body=body, orelse=orelse): + # Ternary is boolean only if both branches are. + return _is_definitely_boolean(body) and _is_definitely_boolean(orelse) + case _: + return False + + +def _is_definitely_non_boolean(node: ast.AST) -> bool: + """Return True when ``node`` is statically guaranteed to evaluate to a + value that is *not* a Python ``bool``. + + Used to gate the bitwise lowering of ``and`` / ``or`` / ``not``: Python's + short-circuit operators return one of their operands rather than a strict + bool, so ``x or 0`` against an int column would silently get + bitwise-style behaviour from Spark's ``|`` instead of Python's truthiness + fallback. We can't always tell statically (a bare ``ast.Name`` could be + bound to any type), so we conservatively only refuse to lower when an + operand is *provably* non-boolean -- numeric / string literals, an + arithmetic ``BinOp``, a numeric ``UnaryOp(USub/UAdd)``. Everything else + (Names, Compare, Not, nested BoolOps, IfExp, conservative cases) is + treated as "possibly boolean" and we let the bitwise lowering proceed, + relying on the input being a boolean column at runtime. + """ + match node: + case ast.Constant(value=v): + # ``True`` and ``False`` are themselves bool; ``None`` we + # accept (it round-trips through coalesce). Everything else + # is definitely not bool. + return not (v is None or isinstance(v, bool)) + case ast.BinOp( + op=ast.Add() + | ast.Sub() + | ast.Mult() + | ast.Div() + | ast.FloorDiv() + | ast.Mod() + | ast.Pow() + | ast.LShift() + | ast.RShift() + | ast.MatMult() + ): + # Arithmetic / shift BinOps produce numeric (or matrix) results, + # never booleans, so they're provably non-boolean. Bitwise + # ``&`` / ``|`` / ``^`` are deliberately NOT matched: they + # produce a boolean when both operands are boolean (e.g. + # ``(x > 0) & (y > 0)``), so leaving them in the "possibly + # boolean" bucket lets the BoolOp / Not lowering proceed. 
+ return True + case ast.UnaryOp(op=ast.USub()) | ast.UnaryOp(op=ast.UAdd()): + return True + case ast.IfExp(body=body, orelse=orelse): + # Conditional only known non-boolean if both branches are. + return _is_definitely_non_boolean(body) and _is_definitely_non_boolean(orelse) + case ast.Call(): + # Function calls: we don't know the return type statically, so we + # can't claim they're non-boolean. Leave as "possibly boolean" and + # let the caller attempt lowering; if the call itself is not + # supported the catch-all arm will raise UnsupportedOperationException. + return False Review Comment: Can we look at the annotated return type if it's strongly typed? ########## python/pyspark/sql/tests/test_udf_transpile_hypothesis.py: ########## @@ -0,0 +1,658 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +Differential / property-based tests for UDF transpilation (SPARK-54783). + +These tests run a fixed set of small Python UDFs twice -- once with +``spark.sql.experimental.optimizer.transpilePyUDFs`` enabled (so the catalyst +transpiler in :mod:`pyspark.sql.transpile` rewrites them into native +expressions) and once without -- and assert that the two runs produce the +same results for inputs generated by Hypothesis. 
+ +The transpiler is intentionally minimal at this point so we expect this +suite to surface bugs (e.g. truthiness / NULL-handling mismatches between +Python's ``if x:`` semantics and SQL's ``CASE WHEN``). Failures here should +be treated as real correctness gaps in the transpiler, not as test bugs to +silence. + +The suite is gated on two things, both required: + +* the ``RUN_HYPOTHESIS`` env var must be present in the environment + (its value doesn't matter, only its presence), and +* the ``hypothesis`` package must be installed. + +If either gate is unmet the entire suite is skipped cleanly so it never +becomes a CI tax for folks who haven't opted in. In CI, this opt-in suite +is wired through ``.github/workflows/build_and_test.yml``, which flips both +gates on for the relevant job when PR changes touch the transpiler or this +test file. + +To run locally:: + + pip install hypothesis + RUN_HYPOTHESIS=1 RUN_HYPOTHESIS_MAX_EXAMPLES=1000 \ + python/run-tests --testnames pyspark.sql.tests.test_udf_transpile_hypothesis + +Set ``RUN_HYPOTHESIS_MAX_EXAMPLES`` to override the per-test example count +(default 100, kept small for CI). +""" + +import os +import unittest +import warnings +from typing import Optional + +from pyspark.sql import Row +from pyspark.sql.types import ( + BooleanType, + LongType, + StructField, + StructType, +) +from pyspark.sql.udf import UserDefinedFunction +from pyspark.testing.sqlutils import ReusedSQLTestCase +from pyspark.testing.utils import have_package +from pyspark.util import is_remote_only + + +# Sentinel value used by ``_run`` to mark "this side raised". A unique +# object is sufficient because we only ever compare it against itself +# inside the helper. +_SENTINEL_RAISED = object() + + +_HYPOTHESIS_ENV = "RUN_HYPOTHESIS" +_have_hypothesis = have_package("hypothesis") +# Presence-based: any value (including empty) opts in. We just check for the +# key being in os.environ so e.g. `RUN_HYPOTHESIS= python ...` still counts. 
+_hypothesis_enabled = _HYPOTHESIS_ENV in os.environ +# Transpilation is only supported in regular (non-Connect) Spark for now, +# so the hypothesis suite skips cleanly under a pyspark-client-only install. +_regular_spark = not is_remote_only() +_skip_reason = ( + f"Set {_HYPOTHESIS_ENV} in the environment to run; hypothesis must also be installed, " + "and the suite only runs under regular (non-Connect) Spark." +) + + +if _have_hypothesis: + from hypothesis import HealthCheck, example, given, settings, strategies as st + + _DEFAULT_MAX_EXAMPLES = int(os.environ.get("RUN_HYPOTHESIS_MAX_EXAMPLES", "1000")) + + # The ``function_scoped_fixture` health check is suppressed because we intentionally reuse the + # class-level SparkSession across examples; the per-example ``deadline`` is disabled because + # Spark task execution is much slower than hypothesis's default budget. + _hyp_settings = settings( + max_examples=_DEFAULT_MAX_EXAMPLES, + deadline=None, + suppress_health_check=[HealthCheck.function_scoped_fixture], + ) + + # 64-bit signed range, kept clear of overflow so arithmetic in both + # Python and Spark stays in the safe LongType range. + _LONG_BOUND = 2**31 - 1 + _long_strategy = st.one_of( + st.none(), st.integers(min_value=-_LONG_BOUND, max_value=_LONG_BOUND) + ) + _bool_strategy = st.one_of(st.none(), st.booleans()) + + # ---- Edge-case seeds (scalacheck-style) ----------------------------- + # + # Hypothesis already biases toward "interesting" boundary values, but + # explicit ``@example`` decorators make a regression on a specific + # value -- e.g. NULL, zero, the type's max -- deterministic across + # runs. These are the values we always want to try, before random + # generation kicks in. 
+ _LONG_EDGES = (None, 0, 1, -1, 7, -7, _LONG_BOUND, -_LONG_BOUND) + # Bool space is exhaustive (only three values) so the @example + # decorators here serve more as documentation of the NULL handling + # we care about than as new coverage on top of hypothesis's + # generator. + _BOOL_EDGES = (None, True, False) + # Multi-arg edges -- nulls plus the four sign-combos for non-zero + # values. Catches off-by-one errors in parameter-index plumbing + # better than random generation alone. + _LONG_PAIR_EDGES = ( + (None, None), + (None, 0), + (0, None), + (0, 0), + (1, -1), + (-1, 1), + (_LONG_BOUND, 1), + (1, -_LONG_BOUND), + ) + # Sign-combo edges (plus NULL combinations) for the boolean tests. + # The bodies (``x > 0 and y > 0`` / ``x > 0 or y > 0``) raise in + # pure Python on a None input (``TypeError``), and the transpiler's + # NULL-guarded Compare also raises -- so the ``_run`` helper's "both + # raised" equivalence covers the NULL cases here. + _BOOLEAN_PAIR_EDGES = ( + (None, None), + (None, 0), + (0, None), + (0, 0), + (1, -1), + (-1, 1), + (1, 1), + (-1, -1), + (_LONG_BOUND, 1), + (1, -_LONG_BOUND), + ) Review Comment: Let's also add a str/int comparison case, and we should verify that we throw an error on the Scala and Python sides respectively. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
