ueshin commented on code in PR #43611:
URL: https://github.com/apache/spark/pull/43611#discussion_r1390014123


##########
python/pyspark/sql/worker/analyze_udtf.py:
##########
@@ -116,12 +118,150 @@ def main(infile: IO, outfile: IO) -> None:
         handler = read_udtf(infile)
         args, kwargs = read_arguments(infile)
 
+        error_prefix = f"Failed to evaluate the user-defined table function 
'{handler.__name__}'"
+
+        def format_error(msg: str) -> str:
+            return dedent(msg).replace("\n", " ")
+
+        # Check invariants about the 'analyze' and 'eval' methods before 
running them.
+        def check_method_invariants_before_running(
+            expected: inspect.FullArgSpec, method: str, is_static: bool
+        ) -> None:
+            num_expected_args = len(expected.args)
+            num_provided_args = len(args) + len(kwargs)
+            num_provided_non_kw_args = len(args)
+            if not is_static:
+                num_provided_args += 1
+                num_provided_non_kw_args += 1
+            if (
+                expected.varargs is None
+                and expected.varkw is None
+                and expected.defaults is None
+                and num_expected_args != num_provided_args
+            ):
+                # The UDTF call provided the wrong number of positional 
arguments.
+                def arguments(num: int) -> str:
+                    return f"{num} argument{'' if num == 1 else 's'}"
+
+                raise PySparkValueError(
+                    format_error(
+                        f"""
+                    {error_prefix} because its '{method}' method expects 
exactly

Review Comment:
   nit: indentation — should the continuation lines use the same indent as the `f"""` opener, or one additional level?
   Ditto for the following error messages.



##########
python/pyspark/sql/worker/analyze_udtf.py:
##########
@@ -116,12 +118,150 @@ def main(infile: IO, outfile: IO) -> None:
         handler = read_udtf(infile)
         args, kwargs = read_arguments(infile)
 
+        error_prefix = f"Failed to evaluate the user-defined table function 
'{handler.__name__}'"
+
+        def format_error(msg: str) -> str:
+            return dedent(msg).replace("\n", " ")
+
+        # Check invariants about the 'analyze' and 'eval' methods before 
running them.
+        def check_method_invariants_before_running(
+            expected: inspect.FullArgSpec, method: str, is_static: bool
+        ) -> None:

Review Comment:
   I'm not sure these checks fully cover Python's errors.
   I guess we should use `inspect.Signature.bind` and its error message to 
build new error messages.



##########
python/pyspark/sql/worker/analyze_udtf.py:
##########
@@ -116,12 +118,150 @@ def main(infile: IO, outfile: IO) -> None:
         handler = read_udtf(infile)
         args, kwargs = read_arguments(infile)
 
+        error_prefix = f"Failed to evaluate the user-defined table function 
'{handler.__name__}'"
+
+        def format_error(msg: str) -> str:
+            return dedent(msg).replace("\n", " ")
+
+        # Check invariants about the 'analyze' and 'eval' methods before 
running them.
+        def check_method_invariants_before_running(
+            expected: inspect.FullArgSpec, method: str, is_static: bool
+        ) -> None:
+            num_expected_args = len(expected.args)
+            num_provided_args = len(args) + len(kwargs)
+            num_provided_non_kw_args = len(args)
+            if not is_static:
+                num_provided_args += 1
+                num_provided_non_kw_args += 1
+            if (
+                expected.varargs is None
+                and expected.varkw is None
+                and expected.defaults is None
+                and num_expected_args != num_provided_args
+            ):
+                # The UDTF call provided the wrong number of positional 
arguments.
+                def arguments(num: int) -> str:
+                    return f"{num} argument{'' if num == 1 else 's'}"
+
+                raise PySparkValueError(
+                    format_error(
+                        f"""
+                    {error_prefix} because its '{method}' method expects 
exactly
+                    {arguments(num_expected_args)}, but the function call 
provided
+                    {arguments(num_provided_args)} instead. Please update the 
query so that it
+                    provides exactly {arguments(num_expected_args)}, or else 
update the table
+                    function so that its '{method}' method accepts exactly
+                    {arguments(num_provided_non_kw_args)}, and then try the 
query again."""
+                    )
+                )
+            expected_arg_names = set(expected.args)
+            provided_positional_arg_names = set(expected.args[: len(args)])
+            for arg_name in kwargs.keys():
+                if expected.varkw is None and arg_name not in 
expected_arg_names:
+                    # The UDTF call provided a keyword argument whose name was 
not expected.
+                    raise PySparkValueError(
+                        format_error(
+                            f"""
+                        {error_prefix} because its '{method}' method expects 
arguments whose names
+                        appear in the set ({', '.join(expected.args)}), but 
the function call
+                        provided a keyword argument with unexpected name 
'{arg_name}' instead.
+                        Please update the query so that it provides only 
keyword arguments whose
+                        names appear in this set, or else update the table 
function so that its
+                        '{method}' method accepts argument names including 
'{arg_name}', and then
+                        try the query again."""
+                        )
+                    )
+                elif arg_name in provided_positional_arg_names:
+                    # The UDTF call provided a duplicate keyword argument when 
a value for that
+                    # argument was already specified positionally.
+                    raise PySparkValueError(
+                        format_error(
+                            f"""
+                        {error_prefix} because the function call provided 
keyword argument
+                        '{arg_name}' whose corresponding value was already 
specified positionally.
+                        Please update the query so that it provides this 
argument's value exactly
+                        once instead, and then try the query again."""
+                        )
+                    )
+
+        check_method_invariants_before_running(
+            inspect.getfullargspec(handler.analyze),  # type: 
ignore[attr-defined]
+            "static analyze",
+            is_static=True,
+        )
+        if hasattr(handler, "eval"):
+            check_method_invariants_before_running(
+                inspect.getfullargspec(handler.eval),  # type: 
ignore[attr-defined]
+                "eval",
+                is_static=False,
+            )

Review Comment:
   Is the same (or a similar) check also necessary in `worker.py` for the case 
where `analyze` is not defined?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to