HyukjinKwon commented on a change in pull request #27433:
[SPARK-30682][SPARKR][SQL] Add SparkR interface for higher order functions
URL: https://github.com/apache/spark/pull/27433#discussion_r376324448
##########
File path: R/pkg/R/functions.R
##########
@@ -3281,6 +3322,121 @@ setMethod("row_number",
###################### Collection functions ######################
+#' Create an o.a.s.sql.catalyst.expressions.UnresolvedNamedLambdaVariable,
+#' convert it to an o.a.s.sql.Column and wrap it with an R Column.
+#' Used by higher order functions.
+#'
+#' @param ... character of length = 1
+#' if length(...) > 1 then the argument is interpreted as a nested
+#' Column, for example \code{unresolved_named_lambda_var("a", "b", "c")}
+#' yields unresolved \code{a.b.c}
+#' @return Column object wrapping JVM UnresolvedNamedLambdaVariable
+unresolved_named_lambda_var <- function(...) {
+  jc <- newJObject(
+    "org.apache.spark.sql.Column",
+    newJObject(
+      "org.apache.spark.sql.catalyst.expressions.UnresolvedNamedLambdaVariable",
+      list(...)
+    )
+  )
+  column(jc)
+}
+
+#' Create an o.a.s.sql.catalyst.expressions.LambdaFunction corresponding
+#' to the transformation described by \code{fun}.
+#' Used by higher order functions.
+#'
+#' @param fun R \code{function} (unary, binary or ternary)
+#' that transforms \code{Columns} into a \code{Column}
+#' @return JVM \code{LambdaFunction} object
+create_lambda <- function(fun) {
+  as_jexpr <- function(x) callJMethod(x@jc, "expr")
+
+  # Process function arguments
+  parameters <- formals(fun)
+  nparameters <- length(parameters)
+
+  stopifnot(
+    nparameters >= 1 &
+    nparameters <= 3 &
+    !"..." %in% names(parameters)
+  )
+
+  args <- lapply(c("x", "y", "z")[seq_along(parameters)], function(p) {
+    unresolved_named_lambda_var(p)
+  })
+
+  # Invoke function and validate return type
+  result <- do.call(fun, args)
+  stopifnot(class(result) == "Column")
+
+  # Convert both Columns to Scala expressions
+  jexpr <- as_jexpr(result)
+
+  jargs <- handledCallJStatic(
+    "org.apache.spark.api.python.PythonUtils",
+    "toSeq",
+    handledCallJStatic(
+      "java.util.Arrays", "asList", lapply(args, as_jexpr)
+    )
+  )
+
+  # Create Scala LambdaFunction
+  sparkR.newJObject(
+    "org.apache.spark.sql.catalyst.expressions.LambdaFunction",
+    jexpr,
+    jargs,
+    FALSE
+  )
+}
+
+#' Invokes a higher order function expression identified by \code{name}
+#' (relative to o.a.s.sql.catalyst.expressions).
+#'
+#' @param name character
+#' @param cols list of character or Column objects
+#' @param funs list of named list(fun = ..., expected_narg = ...)
+#' @return a \code{Column} representing \code{name} applied to \code{cols} with \code{funs}
+invoke_higher_order_function <- function(name, cols, funs) {
+  as_jexpr <- function(x) {
+    if (class(x) == "character") {
+      x <- column(x)
+    }
+    callJMethod(x@jc, "expr")
+  }
+
+  jexpr <- do.call(sparkR.newJObject, c(
+    paste("org.apache.spark.sql.catalyst.expressions", name, sep = "."),
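For readers following the diff above: `create_lambda` accepts only a unary, binary, or ternary R function (no `...`), and binds its parameters to placeholder lambda variables named `x`, `y`, `z`. That validation-and-binding step can be sketched in Python; `bind_lambda_args` is a hypothetical illustration, not part of SparkR or PySpark:

```python
import inspect

def bind_lambda_args(fun):
    """Sketch of create_lambda's argument handling: reject variadic
    functions, require 1-3 parameters, and map each parameter to a
    placeholder variable name, mirroring the R code's
    c("x", "y", "z")[seq_along(parameters)] indexing."""
    params = inspect.signature(fun).parameters
    if any(p.kind == inspect.Parameter.VAR_POSITIONAL for p in params.values()):
        raise ValueError("variadic functions are not supported")
    n = len(params)
    if not 1 <= n <= 3:
        raise ValueError("function must take 1 to 3 arguments")
    # One placeholder name per declared parameter
    return ["x", "y", "z"][:n]

print(bind_lambda_args(lambda a, b: a))  # ['x', 'y']
```

In the R helper, each of these placeholder names is then wrapped by `unresolved_named_lambda_var` before the user function is invoked on them.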
Review comment:
Oops, I misread the code. Yup.
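As a side note on the diff above: the roxygen comment for `unresolved_named_lambda_var` documents that multiple arguments are interpreted as a nested column reference (`"a", "b", "c"` yields `a.b.c`). The name composition itself is just a dot-join of the parts, sketched here in Python purely as an illustration (not SparkR code):

```python
def nested_column_name(*parts):
    """Illustrative only: mirrors the documented behavior that
    unresolved_named_lambda_var("a", "b", "c") refers to the
    nested field a.b.c."""
    if not parts:
        raise ValueError("at least one name part is required")
    return ".".join(parts)

print(nested_column_name("a", "b", "c"))  # a.b.c
```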
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]