This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 822f58f0d26 [SPARK-45159][PYTHON] Handle named arguments only when necessary
822f58f0d26 is described below

commit 822f58f0d26b7d760469151a65eaf9ee863a07a1
Author: Takuya UESHIN <ues...@databricks.com>
AuthorDate: Fri Sep 15 11:13:02 2023 +0900

    [SPARK-45159][PYTHON] Handle named arguments only when necessary

    ### What changes were proposed in this pull request?

    Handles named arguments only when necessary.

    ### Why are the changes needed?

    Constructing `kwargs` as a `dict` on every call can be expensive; it should be done only when necessary.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Existing tests.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #42915 from ueshin/issues/SPARK-45159/kwargs.

    Authored-by: Takuya UESHIN <ues...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../tests/connect/test_parity_arrow_python_udf.py |  24 +++
 python/pyspark/worker.py                          | 199 ++++++++++++---------
 2 files changed, 137 insertions(+), 86 deletions(-)
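The heart of the change is the new wrap_kwargs_support helper: keyword-argument column offsets are resolved once, up front, so the per-row hot path stays purely positional and never builds a dict unless named arguments were actually used. A condensed, standalone sketch of the idea follows (it mirrors the helper added in the diff below; the driver at the end is hypothetical and not part of the patch):

def wrap_kwargs_support(f, args_offsets, kwargs_offsets):
    # args_offsets: column offsets of the positional arguments.
    # kwargs_offsets: dict mapping parameter name -> column offset.
    if len(kwargs_offsets):
        keys = list(kwargs_offsets.keys())
        len_args_offsets = len(args_offsets)

        if len_args_offsets > 0:

            def func(*args):
                # Split the flat positional values back into args and kwargs.
                return f(*args[:len_args_offsets], **dict(zip(keys, args[len_args_offsets:])))

        else:

            def func(*args):
                return f(**dict(zip(keys, args)))

        # All offsets are now positional from the caller's point of view.
        return func, args_offsets + [kwargs_offsets[key] for key in keys]
    else:
        # Fast path (the common case the patch optimizes): no named arguments,
        # so f and the offsets pass through unchanged and no per-call dict is
        # ever constructed.
        return f, args_offsets

# Hypothetical driver: "a" is positional at offset 0, "b" is named at offset 3.
add = lambda a, b: a + b
func, offsets = wrap_kwargs_support(add, [0], {"b": 3})  # offsets == [0, 3]
row = [10, None, None, 32]
print(func(*[row[o] for o in offsets]))  # 42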
diff --git a/python/pyspark/sql/tests/connect/test_parity_arrow_python_udf.py b/python/pyspark/sql/tests/connect/test_parity_arrow_python_udf.py
index e4a64a7d591..fa329b598d9 100644
--- a/python/pyspark/sql/tests/connect/test_parity_arrow_python_udf.py
+++ b/python/pyspark/sql/tests/connect/test_parity_arrow_python_udf.py
@@ -17,6 +17,8 @@

 import unittest

+from pyspark.errors import AnalysisException, PythonException
+from pyspark.sql.functions import udf
 from pyspark.sql.tests.connect.test_parity_udf import UDFParityTests
 from pyspark.sql.tests.test_arrow_python_udf import PythonUDFArrowTestsMixin

@@ -34,6 +36,28 @@ class ArrowPythonUDFParityTests(UDFParityTests, PythonUDFArrowTestsMixin):
         finally:
             super(ArrowPythonUDFParityTests, cls).tearDownClass()

+    def test_named_arguments_negative(self):
+        @udf("int")
+        def test_udf(a, b):
+            return a + b
+
+        self.spark.udf.register("test_udf", test_udf)
+
+        with self.assertRaisesRegex(
+            AnalysisException,
+            "DUPLICATE_ROUTINE_PARAMETER_ASSIGNMENT.DOUBLE_NAMED_ARGUMENT_REFERENCE",
+        ):
+            self.spark.sql("SELECT test_udf(a => id, a => id * 10) FROM range(2)").show()
+
+        with self.assertRaisesRegex(AnalysisException, "UNEXPECTED_POSITIONAL_ARGUMENT"):
+            self.spark.sql("SELECT test_udf(a => id, id * 10) FROM range(2)").show()
+
+        with self.assertRaises(PythonException):
+            self.spark.sql("SELECT test_udf(c => 'x') FROM range(2)").show()
+
+        with self.assertRaises(PythonException):
+            self.spark.sql("SELECT test_udf(id, a => id * 10) FROM range(2)").show()
+

 if __name__ == "__main__":
     import unittest
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 92bc622775b..eea6e8fa783 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -81,15 +81,19 @@ def chain(f, g):
     return lambda *a: g(f(*a))


-def wrap_udf(f, return_type):
+def wrap_udf(f, args_offsets, kwargs_offsets, return_type):
+    func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, kwargs_offsets)
+
     if return_type.needConversion():
         toInternal = return_type.toInternal
-        return lambda *a, **kw: toInternal(f(*a, **kw))
+        return args_kwargs_offsets, lambda *a: toInternal(func(*a))
     else:
-        return lambda *a, **kw: f(*a, **kw)
+        return args_kwargs_offsets, lambda *a: func(*a)
+
+
+def wrap_scalar_pandas_udf(f, args_offsets, kwargs_offsets, return_type):
+    func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, kwargs_offsets)

-def wrap_scalar_pandas_udf(f, return_type):
     arrow_return_type = to_arrow_type(return_type)

     def verify_result_type(result):
@@ -115,17 +119,20 @@ def wrap_scalar_pandas_udf(f, return_type):
             )
         return result

-    return lambda *a, **kw: (
-        verify_result_length(
-            verify_result_type(f(*a, **kw)), len((list(a) + list(kw.values()))[0])
+    return (
+        args_kwargs_offsets,
+        lambda *a: (
+            verify_result_length(verify_result_type(func(*a)), len(a[0])),
+            arrow_return_type,
         ),
-        arrow_return_type,
     )


-def wrap_arrow_batch_udf(f, return_type):
+def wrap_arrow_batch_udf(f, args_offsets, kwargs_offsets, return_type):
     import pandas as pd

+    func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, kwargs_offsets)
+
     arrow_return_type = to_arrow_type(return_type)

     # "result_func" ensures the result of a Python UDF to be consistent with/without Arrow
@@ -140,17 +147,8 @@ def wrap_arrow_batch_udf(f, return_type):
     result_func = lambda r: bytes(r) if r is not None else r  # noqa: E731

     @fail_on_stopiteration
-    def evaluate(*args: pd.Series, **kwargs: pd.Series) -> pd.Series:
-        keys = list(kwargs.keys())
-        len_args = len(args)
-        return pd.Series(
-            [
-                result_func(
-                    f(*row[:len_args], **{key: row[len_args + i] for i, key in enumerate(keys)})
-                )
-                for row in zip(*args, *[kwargs[key] for key in keys])
-            ]
-        )
+    def evaluate(*args: pd.Series) -> pd.Series:
+        return pd.Series([result_func(func(*row)) for row in zip(*args)])

     def verify_result_length(result, length):
         if len(result) != length:
@@ -163,9 +161,9 @@ def wrap_arrow_batch_udf(f, return_type):
             )
         return result

-    return lambda *a, **kw: (
-        verify_result_length(evaluate(*a, **kw), len((list(a) + list(kw.values()))[0])),
-        arrow_return_type,
+    return (
+        args_kwargs_offsets,
+        lambda *a: (verify_result_length(evaluate(*a), len(a[0])), arrow_return_type),
     )


@@ -436,25 +434,32 @@ def wrap_grouped_map_pandas_udf_with_state(f, return_type):
     return lambda k, v, s: [(wrapped(k, v, s), to_arrow_type(return_type))]


-def wrap_grouped_agg_pandas_udf(f, return_type):
+def wrap_grouped_agg_pandas_udf(f, args_offsets, kwargs_offsets, return_type):
+    func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, kwargs_offsets)
+
     arrow_return_type = to_arrow_type(return_type)

-    def wrapped(*args, **kwargs):
+    def wrapped(*series):
         import pandas as pd

-        result = f(*args, **kwargs)
+        result = func(*series)
         return pd.Series([result])

-    return lambda *a, **kw: (wrapped(*a, **kw), arrow_return_type)
+    return (
+        args_kwargs_offsets,
+        lambda *a: (wrapped(*a), arrow_return_type),
+    )


-def wrap_window_agg_pandas_udf(f, return_type, runner_conf, udf_index):
+def wrap_window_agg_pandas_udf(
+    f, args_offsets, kwargs_offsets, return_type, runner_conf, udf_index
+):
     window_bound_types_str = runner_conf.get("pandas_window_bound_types")
     window_bound_type = [t.strip().lower() for t in window_bound_types_str.split(",")][udf_index]
     if window_bound_type == "bounded":
-        return wrap_bounded_window_agg_pandas_udf(f, return_type)
+        return wrap_bounded_window_agg_pandas_udf(f, args_offsets, kwargs_offsets, return_type)
     elif window_bound_type == "unbounded":
-        return wrap_unbounded_window_agg_pandas_udf(f, return_type)
+        return wrap_unbounded_window_agg_pandas_udf(f, args_offsets, kwargs_offsets, return_type)
     else:
         raise PySparkRuntimeError(
             error_class="INVALID_WINDOW_BOUND_TYPE",
@@ -464,26 +469,35 @@ def wrap_window_agg_pandas_udf(f, return_type, runner_conf, udf_index):
         )


-def wrap_unbounded_window_agg_pandas_udf(f, return_type):
+def wrap_unbounded_window_agg_pandas_udf(f, args_offsets, kwargs_offsets, return_type):
+    func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, kwargs_offsets)
+
     # This is similar to grouped_agg_pandas_udf, the only difference
     # is that window_agg_pandas_udf needs to repeat the return value
     # to match window length, where grouped_agg_pandas_udf just returns
     # the scalar value.
     arrow_return_type = to_arrow_type(return_type)

-    def wrapped(*args, **kwargs):
+    def wrapped(*series):
         import pandas as pd

-        result = f(*args, **kwargs)
-        return pd.Series([result]).repeat(len((list(args) + list(kwargs.values()))[0]))
+        result = func(*series)
+        return pd.Series([result]).repeat(len(series[0]))
+
+    return (
+        args_kwargs_offsets,
+        lambda *a: (wrapped(*a), arrow_return_type),
+    )

-    return lambda *a, **kw: (wrapped(*a, **kw), arrow_return_type)

+def wrap_bounded_window_agg_pandas_udf(f, args_offsets, kwargs_offsets, return_type):
+    # args_offsets should have at least 2 for begin_index, end_index.
+    assert len(args_offsets) >= 2, len(args_offsets)
+    func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets[2:], kwargs_offsets)

-def wrap_bounded_window_agg_pandas_udf(f, return_type):
     arrow_return_type = to_arrow_type(return_type)

-    def wrapped(begin_index, end_index, *args, **kwargs):
+    def wrapped(begin_index, end_index, *series):
         import pandas as pd

         result = []
@@ -508,12 +522,34 @@ def wrap_bounded_window_agg_pandas_udf(f, return_type):
             # Note: Calling reset_index on the slices will increase the cost
             # of creating slices by about 100%. Therefore, for performance
             # reasons we don't do it here.
-            args_slices = [s.iloc[begin_array[i] : end_array[i]] for s in args]
-            kwargs_slices = {k: s.iloc[begin_array[i] : end_array[i]] for k, s in kwargs.items()}
-            result.append(f(*args_slices, **kwargs_slices))
+            series_slices = [s.iloc[begin_array[i] : end_array[i]] for s in series]
+            result.append(func(*series_slices))
         return pd.Series(result)

-    return lambda *a, **kw: (wrapped(*a, **kw), arrow_return_type)
+    return (
+        args_offsets[:2] + args_kwargs_offsets,
+        lambda *a: (wrapped(*a), arrow_return_type),
+    )
+
+
+def wrap_kwargs_support(f, args_offsets, kwargs_offsets):
+    if len(kwargs_offsets):
+        keys = list(kwargs_offsets.keys())
+
+        len_args_offsets = len(args_offsets)
+        if len_args_offsets > 0:
+
+            def func(*args):
+                return f(*args[:len_args_offsets], **dict(zip(keys, args[len_args_offsets:])))
+
+        else:
+
+            def func(*args):
+                return f(**dict(zip(keys, args)))
+
+        return func, args_offsets + [kwargs_offsets[key] for key in keys]
+    else:
+        return f, args_offsets


 def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index):
@@ -561,32 +597,33 @@ def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index):

     # the last returnType will be the return type of UDF
     if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
-        udf = wrap_scalar_pandas_udf(func, return_type)
+        return wrap_scalar_pandas_udf(func, args_offsets, kwargs_offsets, return_type)
     elif eval_type == PythonEvalType.SQL_ARROW_BATCHED_UDF:
-        udf = wrap_arrow_batch_udf(func, return_type)
+        return wrap_arrow_batch_udf(func, args_offsets, kwargs_offsets, return_type)
     elif eval_type == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF:
-        udf = wrap_pandas_batch_iter_udf(func, return_type)
+        return args_offsets, wrap_pandas_batch_iter_udf(func, return_type)
     elif eval_type == PythonEvalType.SQL_MAP_PANDAS_ITER_UDF:
-        udf = wrap_pandas_batch_iter_udf(func, return_type)
+        return args_offsets, wrap_pandas_batch_iter_udf(func, return_type)
     elif eval_type == PythonEvalType.SQL_MAP_ARROW_ITER_UDF:
-        udf = wrap_arrow_batch_iter_udf(func, return_type)
+        return args_offsets, wrap_arrow_batch_iter_udf(func, return_type)
     elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
         argspec = getfullargspec(chained_func)  # signature was lost when wrapping it
-        udf = wrap_grouped_map_pandas_udf(func, return_type, argspec, runner_conf)
+        return args_offsets, wrap_grouped_map_pandas_udf(func, return_type, argspec, runner_conf)
     elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE:
-        udf = wrap_grouped_map_pandas_udf_with_state(func, return_type)
+        return args_offsets, wrap_grouped_map_pandas_udf_with_state(func, return_type)
     elif eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
         argspec = getfullargspec(chained_func)  # signature was lost when wrapping it
-        udf = wrap_cogrouped_map_pandas_udf(func, return_type, argspec, runner_conf)
+        return args_offsets, wrap_cogrouped_map_pandas_udf(func, return_type, argspec, runner_conf)
     elif eval_type == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF:
-        udf = wrap_grouped_agg_pandas_udf(func, return_type)
+        return wrap_grouped_agg_pandas_udf(func, args_offsets, kwargs_offsets, return_type)
     elif eval_type == PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF:
-        udf = wrap_window_agg_pandas_udf(func, return_type, runner_conf, udf_index)
+        return wrap_window_agg_pandas_udf(
+            func, args_offsets, kwargs_offsets, return_type, runner_conf, udf_index
+        )
     elif eval_type == PythonEvalType.SQL_BATCHED_UDF:
-        udf = wrap_udf(func, return_type)
+        return wrap_udf(func, args_offsets, kwargs_offsets, return_type)
     else:
         raise ValueError("Unknown eval type: {}".format(eval_type))
-    return args_offsets, kwargs_offsets, udf


 # Used by SQL_GROUPED_MAP_PANDAS_UDF and SQL_SCALAR_PANDAS_UDF and SQL_ARROW_BATCHED_UDF when
@@ -789,9 +826,9 @@ def read_udtf(pickleSer, infile, eval_type):
                 return result

         # Wrap the exception thrown from the UDTF in a PySparkRuntimeError.
-        def func(*a: Any, **kw: Any) -> Any:
+        def func(*a: Any) -> Any:
             try:
-                return f(*a, **kw)
+                return f(*a)
             except Exception as e:
                 raise PySparkRuntimeError(
                     error_class="UDTF_EXEC_ERROR",
@@ -810,28 +847,26 @@ def read_udtf(pickleSer, infile, eval_type):
                     },
                 )

-        def evaluate(*args: pd.Series, **kwargs: pd.Series):
-            if len(args) == 0 and len(kwargs) == 0:
+        def evaluate(*args: pd.Series):
+            if len(args) == 0:
                 res = func()
                 check_return_value(res)
                 yield verify_result(pd.DataFrame(res)), arrow_return_type
             else:
                 # Create tuples from the input pandas Series, each tuple
                 # represents a row across all Series.
-                keys = list(kwargs.keys())
-                len_args = len(args)
-                row_tuples = zip(*args, *[kwargs[key] for key in keys])
+                row_tuples = zip(*args)
                 for row in row_tuples:
-                    res = func(
-                        *row[:len_args],
-                        **{key: row[len_args + i] for i, key in enumerate(keys)},
-                    )
+                    res = func(*row)
                     check_return_value(res)
                     yield verify_result(pd.DataFrame(res)), arrow_return_type

         return evaluate

-    eval = wrap_arrow_udtf(getattr(udtf, "eval"), return_type)
+    eval_func_kwargs_support, args_kwargs_offsets = wrap_kwargs_support(
+        getattr(udtf, "eval"), args_offsets, kwargs_offsets
+    )
+    eval = wrap_arrow_udtf(eval_func_kwargs_support, return_type)

     if hasattr(udtf, "terminate"):
         terminate = wrap_arrow_udtf(getattr(udtf, "terminate"), return_type)
@@ -843,10 +878,7 @@ def read_udtf(pickleSer, infile, eval_type):
             for a in it:
                 # The eval function yields an iterator. Each element produced by this
                 # iterator is a tuple in the form of (pandas.DataFrame, arrow_return_type).
-                yield from eval(
-                    *[a[o] for o in args_offsets],
-                    **{k: a[o] for k, o in kwargs_offsets.items()},
-                )
+                yield from eval(*[a[o] for o in args_kwargs_offsets])
         finally:
             if terminate is not None:
                 yield from terminate()
@@ -884,9 +916,9 @@ def read_udtf(pickleSer, infile, eval_type):
                 return toInternal(result)

         # Evaluate the function and return a tuple back to the executor.
-        def evaluate(*a, **kw) -> tuple:
+        def evaluate(*a) -> tuple:
             try:
-                res = f(*a, **kw)
+                res = f(*a)
             except Exception as e:
                 raise PySparkRuntimeError(
                     error_class="UDTF_EXEC_ERROR",
@@ -914,7 +946,10 @@ def read_udtf(pickleSer, infile, eval_type):

         return evaluate

-    eval = wrap_udtf(getattr(udtf, "eval"), return_type)
+    eval_func_kwargs_support, args_kwargs_offsets = wrap_kwargs_support(
+        getattr(udtf, "eval"), args_offsets, kwargs_offsets
+    )
+    eval = wrap_udtf(eval_func_kwargs_support, return_type)

     if hasattr(udtf, "terminate"):
         terminate = wrap_udtf(getattr(udtf, "terminate"), return_type)
@@ -925,10 +960,7 @@ def read_udtf(pickleSer, infile, eval_type):
     def mapper(_, it):
         try:
             for a in it:
-                yield eval(
-                    *[a[o] for o in args_offsets],
-                    **{k: a[o] for k, o in kwargs_offsets.items()},
-                )
+                yield eval(*[a[o] for o in args_kwargs_offsets])
         finally:
             if terminate is not None:
                 yield terminate()
@@ -1028,9 +1060,7 @@ def read_udfs(pickleSer, infile, eval_type):
         if is_map_arrow_iter:
             assert num_udfs == 1, "One MAP_ARROW_ITER UDF expected here."

-        arg_offsets, _, udf = read_single_udf(
-            pickleSer, infile, eval_type, runner_conf, udf_index=0
-        )
+        arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)

         def func(_, iterator):
             num_input_rows = 0
@@ -1120,7 +1150,7 @@ def read_udfs(pickleSer, infile, eval_type):

         # See FlatMapGroupsInPandasExec for how arg_offsets are used to
         # distinguish between grouping attributes and data attributes
-        arg_offsets, _, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
+        arg_offsets, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
         parsed_offsets = extract_key_value_indexes(arg_offsets)

         # Create function like this:
@@ -1137,7 +1167,7 @@ def read_udfs(pickleSer, infile, eval_type):

         # See FlatMapGroupsInPandas(WithState)Exec for how arg_offsets are used to
         # distinguish between grouping attributes and data attributes
-        arg_offsets, _, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
+        arg_offsets, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
         parsed_offsets = extract_key_value_indexes(arg_offsets)

         def mapper(a):
@@ -1171,7 +1201,7 @@ def read_udfs(pickleSer, infile, eval_type):

         # We assume there is only one UDF here because cogrouped map doesn't
         # support combining multiple UDFs.
         assert num_udfs == 1

-        arg_offsets, _, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
+        arg_offsets, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)

         parsed_offsets = extract_key_value_indexes(arg_offsets)

@@ -1188,10 +1218,7 @@ def read_udfs(pickleSer, infile, eval_type):
             udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))

         def mapper(a):
-            result = tuple(
-                f(*[a[o] for o in args_offsets], **{k: a[o] for k, o in kwargs_offsets.items()})
-                for args_offsets, kwargs_offsets, f in udfs
-            )
+            result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)
             # In the special case of a single UDF this will return a single result rather
             # than a tuple of results; this is the format that the JVM side expects.
             if len(result) == 1:

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org