zhengruifeng commented on code in PR #39068:
URL: https://github.com/apache/spark/pull/39068#discussion_r1049238906
##########
python/pyspark/sql/connect/functions.py:
##########
@@ -79,6 +85,84 @@ def _invoke_binary_math_function(name: str, col1: Any, col2:
Any) -> Column:
return _invoke_function(name, *_cols)
+def _get_lambda_parameters(f: Callable) -> ValuesView[inspect.Parameter]:
+ signature = inspect.signature(f)
+ parameters = signature.parameters.values()
+
+ # We should exclude functions that use
+ # variable args and keyword argnames
+ # as well as keyword only args
+ supported_parameter_types = {
+ inspect.Parameter.POSITIONAL_OR_KEYWORD,
+ inspect.Parameter.POSITIONAL_ONLY,
+ }
+
+ # Validate that
+ # function arity is between 1 and 3
+ if not (1 <= len(parameters) <= 3):
+ raise ValueError(
+ "f should take between 1 and 3 arguments, but provided function
takes {}".format(
+ len(parameters)
+ )
+ )
+
+ # and all arguments can be used as positional
+ if not all(p.kind in supported_parameter_types for p in parameters):
+ raise ValueError("f should use only POSITIONAL or POSITIONAL OR
KEYWORD arguments")
+
+ return parameters
+
+
+def _create_lambda(f: Callable) -> LambdaFunction:
+ """
+ Create `o.a.s.sql.expressions.LambdaFunction` corresponding
+ to transformation described by f
+
+ :param f: A Python of one of the following forms:
+ - (Column) -> Column: ...
+ - (Column, Column) -> Column: ...
+ - (Column, Column, Column) -> Column: ...
+ """
+ parameters = _get_lambda_parameters(f)
+
+ arg_names = ["x", "y", "z"]
+
+ arg_cols: List[Column] = []
+ for arg in arg_names[: len(parameters)]:
+ # TODO: How to make sure lambda variable names are unique? RPC for
increasing ID?
Review Comment:
PySpark invokes `UnresolvedNamedLambdaVariable.freshVarName` in the JVM to get a
unique variable name:
```
object UnresolvedNamedLambdaVariable {
// Counter to ensure lambda variable names are unique
private val nextVarNameId = new AtomicInteger(0)
def freshVarName(name: String): String = {
s"${name}_${nextVarNameId.getAndIncrement()}"
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]