cloud-fan commented on code in PR #39068:
URL: https://github.com/apache/spark/pull/39068#discussion_r1051941366


##########
python/pyspark/sql/connect/functions.py:
##########
@@ -80,6 +84,78 @@ def _invoke_binary_math_function(name: str, col1: Any, col2: Any) -> Column:
     return _invoke_function(name, *_cols)
 
 
+def _get_lambda_parameters(f: Callable) -> ValuesView[inspect.Parameter]:
+    signature = inspect.signature(f)
+    parameters = signature.parameters.values()
+
+    # We should exclude functions that use variable args and keyword argument
+    # names, as well as keyword-only args.
+    supported_parameter_types = {
+        inspect.Parameter.POSITIONAL_OR_KEYWORD,
+        inspect.Parameter.POSITIONAL_ONLY,
+    }
+
+    # Validate that the function arity is between 1 and 3.
+    if not (1 <= len(parameters) <= 3):
+        raise ValueError(
+            "f should take between 1 and 3 arguments, but provided function takes {}".format(
+                len(parameters)
+            )
+        )
+
+    # Verify that all arguments can be used as positional arguments.
+    if not all(p.kind in supported_parameter_types for p in parameters):
+        raise ValueError("All arguments of f must be usable as POSITIONAL arguments")
+
+    return parameters
+
+
+def _create_lambda(f: Callable) -> LambdaFunction:
+    """
+    Create `o.a.s.sql.expressions.LambdaFunction` corresponding
+    to transformation described by f
+
+    :param f: A Python function of one of the following forms:
+            - (Column) -> Column: ...
+            - (Column, Column) -> Column: ...
+            - (Column, Column, Column) -> Column: ...
+    """
+    parameters = _get_lambda_parameters(f)
+
+    arg_names = ["x", "y", "z"]
+    arg_cols = [column(arg) for arg in arg_names[: len(parameters)]]
+
+    result = f(*arg_cols)
+
+    if not isinstance(result, Column):
+        raise ValueError(f"Callable {f} should return Column, got {type(result)}")
+
+    return LambdaFunction(result._expr, [arg_col._expr for arg_col in arg_cols])
+
+
+def _invoke_higher_order_function(
+    name: str,
+    cols: List["ColumnOrName"],
+    funs: List[Callable],
+) -> Column:
+    """
+    Invokes expression identified by name,
+    (relative to ``org.apache.spark.sql.catalyst.expressions``)
+    and wraps the result with Column (first Scala one, then Python).
+
+    :param name: Name of the expression
+    :param cols: a list of columns
+    :param funs: a list of ((*Column) -> Column) functions.
+
+    :return: a Column
+    """
+    assert len(funs) == 1
+    _cols = [_to_col(c) for c in cols]
+    _funs = [_create_lambda(f) for f in funs]
+
+    return _invoke_function(name, *_cols, *_funs)

Review Comment:
   This looks like a hacky way to create a lambda function...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to