This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 822f58f0d26 [SPARK-45159][PYTHON] Handle named arguments only when necessary
822f58f0d26 is described below

commit 822f58f0d26b7d760469151a65eaf9ee863a07a1
Author: Takuya UESHIN <ues...@databricks.com>
AuthorDate: Fri Sep 15 11:13:02 2023 +0900

    [SPARK-45159][PYTHON] Handle named arguments only when necessary
    
    ### What changes were proposed in this pull request?
    
    Handles named arguments only when necessary.
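
    "Named arguments" here are SQL calls that bind Python UDF parameters by name. For illustration only (assuming an active SparkSession `spark`; `test_udf` is the same two-parameter UDF registered by the parity test added below):

    ```python
    from pyspark.sql.functions import udf

    @udf("int")
    def test_udf(a, b):
        return a + b

    # `spark` is assumed to be an active SparkSession.
    spark.udf.register("test_udf", test_udf)
    # Both parameters are bound by name via the `=>` syntax.
    spark.sql("SELECT test_udf(a => id, b => id * 10) FROM range(2)").show()
    ```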
    
    ### Why are the changes needed?
    
    Constructing `kwargs` as a `dict` can be expensive, so it should be done only when necessary.
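
    The fix funnels this through a small helper so the decision is made once, when the UDF is set up: only UDFs actually invoked with named arguments pay for building the `dict`, and everything else is passed positionally. A simplified sketch of the new `wrap_kwargs_support` helper in `python/pyspark/worker.py` (offsets are column indexes; see the diff below for the exact code):

    ```python
    def wrap_kwargs_support(f, args_offsets, kwargs_offsets):
        if kwargs_offsets:
            keys = list(kwargs_offsets.keys())
            num_args = len(args_offsets)

            def func(*values):
                # Split positional from keyword values; the dict is built only
                # for UDFs that were called with named arguments.
                return f(*values[:num_args], **dict(zip(keys, values[num_args:])))

            # Keyword offsets are appended so callers can pass one flat list.
            return func, args_offsets + [kwargs_offsets[key] for key in keys]
        # No named arguments: return the function unchanged; no dict is built.
        return f, args_offsets
    ```

    The mappers then call every UDF uniformly as `f(*[a[o] for o in offsets])`, dropping the per-row `**{...}` construction that the previous code performed for all UDFs.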
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #42915 from ueshin/issues/SPARK-45159/kwargs.
    
    Authored-by: Takuya UESHIN <ues...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../tests/connect/test_parity_arrow_python_udf.py  |  24 +++
 python/pyspark/worker.py                           | 199 ++++++++++++---------
 2 files changed, 137 insertions(+), 86 deletions(-)

diff --git a/python/pyspark/sql/tests/connect/test_parity_arrow_python_udf.py b/python/pyspark/sql/tests/connect/test_parity_arrow_python_udf.py
index e4a64a7d591..fa329b598d9 100644
--- a/python/pyspark/sql/tests/connect/test_parity_arrow_python_udf.py
+++ b/python/pyspark/sql/tests/connect/test_parity_arrow_python_udf.py
@@ -17,6 +17,8 @@
 
 import unittest
 
+from pyspark.errors import AnalysisException, PythonException
+from pyspark.sql.functions import udf
 from pyspark.sql.tests.connect.test_parity_udf import UDFParityTests
 from pyspark.sql.tests.test_arrow_python_udf import PythonUDFArrowTestsMixin
 
@@ -34,6 +36,28 @@ class ArrowPythonUDFParityTests(UDFParityTests, PythonUDFArrowTestsMixin):
         finally:
             super(ArrowPythonUDFParityTests, cls).tearDownClass()
 
+    def test_named_arguments_negative(self):
+        @udf("int")
+        def test_udf(a, b):
+            return a + b
+
+        self.spark.udf.register("test_udf", test_udf)
+
+        with self.assertRaisesRegex(
+            AnalysisException,
+            "DUPLICATE_ROUTINE_PARAMETER_ASSIGNMENT.DOUBLE_NAMED_ARGUMENT_REFERENCE",
+        ):
+            self.spark.sql("SELECT test_udf(a => id, a => id * 10) FROM range(2)").show()
+
+        with self.assertRaisesRegex(AnalysisException, "UNEXPECTED_POSITIONAL_ARGUMENT"):
+            self.spark.sql("SELECT test_udf(a => id, id * 10) FROM range(2)").show()
+
+        with self.assertRaises(PythonException):
+            self.spark.sql("SELECT test_udf(c => 'x') FROM range(2)").show()
+
+        with self.assertRaises(PythonException):
+            self.spark.sql("SELECT test_udf(id, a => id * 10) FROM range(2)").show()
+
 
 if __name__ == "__main__":
     import unittest
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 92bc622775b..eea6e8fa783 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -81,15 +81,19 @@ def chain(f, g):
     return lambda *a: g(f(*a))
 
 
-def wrap_udf(f, return_type):
+def wrap_udf(f, args_offsets, kwargs_offsets, return_type):
+    func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, kwargs_offsets)
+
     if return_type.needConversion():
         toInternal = return_type.toInternal
-        return lambda *a, **kw: toInternal(f(*a, **kw))
+        return args_kwargs_offsets, lambda *a: toInternal(func(*a))
     else:
-        return lambda *a, **kw: f(*a, **kw)
+        return args_kwargs_offsets, lambda *a: func(*a)
+
 
+def wrap_scalar_pandas_udf(f, args_offsets, kwargs_offsets, return_type):
+    func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, kwargs_offsets)
 
-def wrap_scalar_pandas_udf(f, return_type):
     arrow_return_type = to_arrow_type(return_type)
 
     def verify_result_type(result):
@@ -115,17 +119,20 @@ def wrap_scalar_pandas_udf(f, return_type):
             )
         return result
 
-    return lambda *a, **kw: (
-        verify_result_length(
-            verify_result_type(f(*a, **kw)), len((list(a) + list(kw.values()))[0])
+    return (
+        args_kwargs_offsets,
+        lambda *a: (
+            verify_result_length(verify_result_type(func(*a)), len(a[0])),
+            arrow_return_type,
         ),
-        arrow_return_type,
     )
 
 
-def wrap_arrow_batch_udf(f, return_type):
+def wrap_arrow_batch_udf(f, args_offsets, kwargs_offsets, return_type):
     import pandas as pd
 
+    func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, kwargs_offsets)
+
     arrow_return_type = to_arrow_type(return_type)
 
     # "result_func" ensures the result of a Python UDF to be consistent 
with/without Arrow
@@ -140,17 +147,8 @@ def wrap_arrow_batch_udf(f, return_type):
         result_func = lambda r: bytes(r) if r is not None else r  # noqa: E731
 
     @fail_on_stopiteration
-    def evaluate(*args: pd.Series, **kwargs: pd.Series) -> pd.Series:
-        keys = list(kwargs.keys())
-        len_args = len(args)
-        return pd.Series(
-            [
-                result_func(
-                    f(*row[:len_args], **{key: row[len_args + i] for i, key in enumerate(keys)})
-                )
-                for row in zip(*args, *[kwargs[key] for key in keys])
-            ]
-        )
+    def evaluate(*args: pd.Series) -> pd.Series:
+        return pd.Series([result_func(func(*row)) for row in zip(*args)])
 
     def verify_result_length(result, length):
         if len(result) != length:
@@ -163,9 +161,9 @@ def wrap_arrow_batch_udf(f, return_type):
             )
         return result
 
-    return lambda *a, **kw: (
-        verify_result_length(evaluate(*a, **kw), len((list(a) + list(kw.values()))[0])),
-        arrow_return_type,
+    return (
+        args_kwargs_offsets,
+        lambda *a: (verify_result_length(evaluate(*a), len(a[0])), arrow_return_type),
     )
 
 
@@ -436,25 +434,32 @@ def wrap_grouped_map_pandas_udf_with_state(f, return_type):
     return lambda k, v, s: [(wrapped(k, v, s), to_arrow_type(return_type))]
 
 
-def wrap_grouped_agg_pandas_udf(f, return_type):
+def wrap_grouped_agg_pandas_udf(f, args_offsets, kwargs_offsets, return_type):
+    func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, kwargs_offsets)
+
     arrow_return_type = to_arrow_type(return_type)
 
-    def wrapped(*args, **kwargs):
+    def wrapped(*series):
         import pandas as pd
 
-        result = f(*args, **kwargs)
+        result = func(*series)
         return pd.Series([result])
 
-    return lambda *a, **kw: (wrapped(*a, **kw), arrow_return_type)
+    return (
+        args_kwargs_offsets,
+        lambda *a: (wrapped(*a), arrow_return_type),
+    )
 
 
-def wrap_window_agg_pandas_udf(f, return_type, runner_conf, udf_index):
+def wrap_window_agg_pandas_udf(
+    f, args_offsets, kwargs_offsets, return_type, runner_conf, udf_index
+):
     window_bound_types_str = runner_conf.get("pandas_window_bound_types")
     window_bound_type = [t.strip().lower() for t in window_bound_types_str.split(",")][udf_index]
     if window_bound_type == "bounded":
-        return wrap_bounded_window_agg_pandas_udf(f, return_type)
+        return wrap_bounded_window_agg_pandas_udf(f, args_offsets, kwargs_offsets, return_type)
     elif window_bound_type == "unbounded":
-        return wrap_unbounded_window_agg_pandas_udf(f, return_type)
+        return wrap_unbounded_window_agg_pandas_udf(f, args_offsets, kwargs_offsets, return_type)
     else:
         raise PySparkRuntimeError(
             error_class="INVALID_WINDOW_BOUND_TYPE",
@@ -464,26 +469,35 @@ def wrap_window_agg_pandas_udf(f, return_type, runner_conf, udf_index):
         )
 
 
-def wrap_unbounded_window_agg_pandas_udf(f, return_type):
+def wrap_unbounded_window_agg_pandas_udf(f, args_offsets, kwargs_offsets, return_type):
+    func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, kwargs_offsets)
+
     # This is similar to grouped_agg_pandas_udf, the only difference
     # is that window_agg_pandas_udf needs to repeat the return value
     # to match window length, where grouped_agg_pandas_udf just returns
     # the scalar value.
     arrow_return_type = to_arrow_type(return_type)
 
-    def wrapped(*args, **kwargs):
+    def wrapped(*series):
         import pandas as pd
 
-        result = f(*args, **kwargs)
-        return pd.Series([result]).repeat(len((list(args) + list(kwargs.values()))[0]))
+        result = func(*series)
+        return pd.Series([result]).repeat(len(series[0]))
+
+    return (
+        args_kwargs_offsets,
+        lambda *a: (wrapped(*a), arrow_return_type),
+    )
 
-    return lambda *a, **kw: (wrapped(*a, **kw), arrow_return_type)
 
+def wrap_bounded_window_agg_pandas_udf(f, args_offsets, kwargs_offsets, return_type):
+    # args_offsets should have at least 2 for begin_index, end_index.
+    assert len(args_offsets) >= 2, len(args_offsets)
+    func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets[2:], kwargs_offsets)
 
-def wrap_bounded_window_agg_pandas_udf(f, return_type):
     arrow_return_type = to_arrow_type(return_type)
 
-    def wrapped(begin_index, end_index, *args, **kwargs):
+    def wrapped(begin_index, end_index, *series):
         import pandas as pd
 
         result = []
@@ -508,12 +522,34 @@ def wrap_bounded_window_agg_pandas_udf(f, return_type):
             # Note: Calling reset_index on the slices will increase the cost
             #       of creating slices by about 100%. Therefore, for performance
             #       reasons we don't do it here.
-            args_slices = [s.iloc[begin_array[i] : end_array[i]] for s in args]
-            kwargs_slices = {k: s.iloc[begin_array[i] : end_array[i]] for k, s in kwargs.items()}
-            result.append(f(*args_slices, **kwargs_slices))
+            series_slices = [s.iloc[begin_array[i] : end_array[i]] for s in series]
+            result.append(func(*series_slices))
         return pd.Series(result)
 
-    return lambda *a, **kw: (wrapped(*a, **kw), arrow_return_type)
+    return (
+        args_offsets[:2] + args_kwargs_offsets,
+        lambda *a: (wrapped(*a), arrow_return_type),
+    )
+
+
+def wrap_kwargs_support(f, args_offsets, kwargs_offsets):
+    if len(kwargs_offsets):
+        keys = list(kwargs_offsets.keys())
+
+        len_args_offsets = len(args_offsets)
+        if len_args_offsets > 0:
+
+            def func(*args):
+                return f(*args[:len_args_offsets], **dict(zip(keys, args[len_args_offsets:])))
+
+        else:
+
+            def func(*args):
+                return f(**dict(zip(keys, args)))
+
+        return func, args_offsets + [kwargs_offsets[key] for key in keys]
+    else:
+        return f, args_offsets
 
 
 def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index):
@@ -561,32 +597,33 @@ def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index):
 
     # the last returnType will be the return type of UDF
     if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
-        udf = wrap_scalar_pandas_udf(func, return_type)
+        return wrap_scalar_pandas_udf(func, args_offsets, kwargs_offsets, return_type)
     elif eval_type == PythonEvalType.SQL_ARROW_BATCHED_UDF:
-        udf = wrap_arrow_batch_udf(func, return_type)
+        return wrap_arrow_batch_udf(func, args_offsets, kwargs_offsets, return_type)
     elif eval_type == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF:
-        udf = wrap_pandas_batch_iter_udf(func, return_type)
+        return args_offsets, wrap_pandas_batch_iter_udf(func, return_type)
     elif eval_type == PythonEvalType.SQL_MAP_PANDAS_ITER_UDF:
-        udf = wrap_pandas_batch_iter_udf(func, return_type)
+        return args_offsets, wrap_pandas_batch_iter_udf(func, return_type)
     elif eval_type == PythonEvalType.SQL_MAP_ARROW_ITER_UDF:
-        udf = wrap_arrow_batch_iter_udf(func, return_type)
+        return args_offsets, wrap_arrow_batch_iter_udf(func, return_type)
     elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
         argspec = getfullargspec(chained_func)  # signature was lost when wrapping it
-        udf = wrap_grouped_map_pandas_udf(func, return_type, argspec, runner_conf)
+        return args_offsets, wrap_grouped_map_pandas_udf(func, return_type, argspec, runner_conf)
     elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE:
-        udf = wrap_grouped_map_pandas_udf_with_state(func, return_type)
+        return args_offsets, wrap_grouped_map_pandas_udf_with_state(func, return_type)
     elif eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
         argspec = getfullargspec(chained_func)  # signature was lost when wrapping it
-        udf = wrap_cogrouped_map_pandas_udf(func, return_type, argspec, runner_conf)
+        return args_offsets, wrap_cogrouped_map_pandas_udf(func, return_type, argspec, runner_conf)
     elif eval_type == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF:
-        udf = wrap_grouped_agg_pandas_udf(func, return_type)
+        return wrap_grouped_agg_pandas_udf(func, args_offsets, kwargs_offsets, return_type)
     elif eval_type == PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF:
-        udf = wrap_window_agg_pandas_udf(func, return_type, runner_conf, udf_index)
+        return wrap_window_agg_pandas_udf(
+            func, args_offsets, kwargs_offsets, return_type, runner_conf, udf_index
+        )
     elif eval_type == PythonEvalType.SQL_BATCHED_UDF:
-        udf = wrap_udf(func, return_type)
+        return wrap_udf(func, args_offsets, kwargs_offsets, return_type)
     else:
         raise ValueError("Unknown eval type: {}".format(eval_type))
-    return args_offsets, kwargs_offsets, udf
 
 
 # Used by SQL_GROUPED_MAP_PANDAS_UDF and SQL_SCALAR_PANDAS_UDF and SQL_ARROW_BATCHED_UDF when
@@ -789,9 +826,9 @@ def read_udtf(pickleSer, infile, eval_type):
                 return result
 
             # Wrap the exception thrown from the UDTF in a PySparkRuntimeError.
-            def func(*a: Any, **kw: Any) -> Any:
+            def func(*a: Any) -> Any:
                 try:
-                    return f(*a, **kw)
+                    return f(*a)
                 except Exception as e:
                     raise PySparkRuntimeError(
                         error_class="UDTF_EXEC_ERROR",
@@ -810,28 +847,26 @@ def read_udtf(pickleSer, infile, eval_type):
                         },
                     )
 
-            def evaluate(*args: pd.Series, **kwargs: pd.Series):
-                if len(args) == 0 and len(kwargs) == 0:
+            def evaluate(*args: pd.Series):
+                if len(args) == 0:
                     res = func()
                     check_return_value(res)
                     yield verify_result(pd.DataFrame(res)), arrow_return_type
                 else:
                     # Create tuples from the input pandas Series, each tuple
                     # represents a row across all Series.
-                    keys = list(kwargs.keys())
-                    len_args = len(args)
-                    row_tuples = zip(*args, *[kwargs[key] for key in keys])
+                    row_tuples = zip(*args)
                     for row in row_tuples:
-                        res = func(
-                            *row[:len_args],
-                            **{key: row[len_args + i] for i, key in enumerate(keys)},
-                        )
+                        res = func(*row)
                         check_return_value(res)
                         yield verify_result(pd.DataFrame(res)), arrow_return_type
 
             return evaluate
 
-        eval = wrap_arrow_udtf(getattr(udtf, "eval"), return_type)
+        eval_func_kwargs_support, args_kwargs_offsets = wrap_kwargs_support(
+            getattr(udtf, "eval"), args_offsets, kwargs_offsets
+        )
+        eval = wrap_arrow_udtf(eval_func_kwargs_support, return_type)
 
         if hasattr(udtf, "terminate"):
             terminate = wrap_arrow_udtf(getattr(udtf, "terminate"), return_type)
@@ -843,10 +878,7 @@ def read_udtf(pickleSer, infile, eval_type):
                 for a in it:
                     # The eval function yields an iterator. Each element produced by this
                     # iterator is a tuple in the form of (pandas.DataFrame, arrow_return_type).
-                    yield from eval(
-                        *[a[o] for o in args_offsets],
-                        **{k: a[o] for k, o in kwargs_offsets.items()},
-                    )
+                    yield from eval(*[a[o] for o in args_kwargs_offsets])
             finally:
                 if terminate is not None:
                     yield from terminate()
@@ -884,9 +916,9 @@ def read_udtf(pickleSer, infile, eval_type):
                 return toInternal(result)
 
             # Evaluate the function and return a tuple back to the executor.
-            def evaluate(*a, **kw) -> tuple:
+            def evaluate(*a) -> tuple:
                 try:
-                    res = f(*a, **kw)
+                    res = f(*a)
                 except Exception as e:
                     raise PySparkRuntimeError(
                         error_class="UDTF_EXEC_ERROR",
@@ -914,7 +946,10 @@ def read_udtf(pickleSer, infile, eval_type):
 
             return evaluate
 
-        eval = wrap_udtf(getattr(udtf, "eval"), return_type)
+        eval_func_kwargs_support, args_kwargs_offsets = wrap_kwargs_support(
+            getattr(udtf, "eval"), args_offsets, kwargs_offsets
+        )
+        eval = wrap_udtf(eval_func_kwargs_support, return_type)
 
         if hasattr(udtf, "terminate"):
             terminate = wrap_udtf(getattr(udtf, "terminate"), return_type)
@@ -925,10 +960,7 @@ def read_udtf(pickleSer, infile, eval_type):
         def mapper(_, it):
             try:
                 for a in it:
-                    yield eval(
-                        *[a[o] for o in args_offsets],
-                        **{k: a[o] for k, o in kwargs_offsets.items()},
-                    )
+                    yield eval(*[a[o] for o in args_kwargs_offsets])
             finally:
                 if terminate is not None:
                     yield terminate()
@@ -1028,9 +1060,7 @@ def read_udfs(pickleSer, infile, eval_type):
         if is_map_arrow_iter:
             assert num_udfs == 1, "One MAP_ARROW_ITER UDF expected here."
 
-        arg_offsets, _, udf = read_single_udf(
-            pickleSer, infile, eval_type, runner_conf, udf_index=0
-        )
+        arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
 
         def func(_, iterator):
             num_input_rows = 0
@@ -1120,7 +1150,7 @@ def read_udfs(pickleSer, infile, eval_type):
 
         # See FlatMapGroupsInPandasExec for how arg_offsets are used to
         # distinguish between grouping attributes and data attributes
-        arg_offsets, _, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
+        arg_offsets, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
         parsed_offsets = extract_key_value_indexes(arg_offsets)
 
         # Create function like this:
@@ -1137,7 +1167,7 @@ def read_udfs(pickleSer, infile, eval_type):
 
         # See FlatMapGroupsInPandas(WithState)Exec for how arg_offsets are used to
         # distinguish between grouping attributes and data attributes
-        arg_offsets, _, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
+        arg_offsets, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
         parsed_offsets = extract_key_value_indexes(arg_offsets)
 
         def mapper(a):
@@ -1171,7 +1201,7 @@ def read_udfs(pickleSer, infile, eval_type):
         # We assume there is only one UDF here because cogrouped map doesn't
         # support combining multiple UDFs.
         assert num_udfs == 1
-        arg_offsets, _, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
+        arg_offsets, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
 
         parsed_offsets = extract_key_value_indexes(arg_offsets)
 
@@ -1188,10 +1218,7 @@ def read_udfs(pickleSer, infile, eval_type):
             udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
 
         def mapper(a):
-            result = tuple(
-                f(*[a[o] for o in args_offsets], **{k: a[o] for k, o in kwargs_offsets.items()})
-                for args_offsets, kwargs_offsets, f in udfs
-            )
+            result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)
             # In the special case of a single UDF this will return a single result rather
             # than a tuple of results; this is the format that the JVM side expects.
             if len(result) == 1:


