zero323 commented on a change in pull request #27433: 
[SPARK-30682][SPARKR][SQL] Add SparkR interface for higher order functions
URL: https://github.com/apache/spark/pull/27433#discussion_r374069388
 
 

 ##########
 File path: R/pkg/R/functions.R
 ##########
 @@ -3281,6 +3322,126 @@ setMethod("row_number",
 
 ###################### Collection functions######################
 
+#' Create an o.a.s.sql.expressions.UnresolvedNamedLambdaVariable,
+#' convert it to an o.a.s.sql.Column and wrap it with an R Column.
+#' Used by higher order functions.
+#'
+#' @param ... character of length = 1
+#'        if length(...) > 1 then argument is interpreted as a nested
+#'        Column, for example \code{unresolved_named_lambda_var("a", "b", "c")}
+#'        yields unresolved \code{a.b.c}
+#' @return Column object wrapping JVM UnresolvedNamedLambdaVariable
+unresolved_named_lambda_var <- function(...) {
+  jc <- sparkR.newJObject(
+    "org.apache.spark.sql.Column",
+    sparkR.newJObject(
+      "org.apache.spark.sql.catalyst.expressions.UnresolvedNamedLambdaVariable",
+      list(...)
+    )
+  )
+  column(jc)
+}
+
+#' Create an o.a.s.sql.expressions.LambdaFunction corresponding
+#' to the transformation described by fun.
+#' Used by higher order functions.
+#'
+#' @param fun R \code{function} (unary, binary or ternary)
+#'        that transforms \code{Columns} into a \code{Column}
+#' @param expected_nargs numeric vector of the expected
+#'        numbers of arguments. Used for validation
+#' @return JVM \code{LambdaFunction} object
+create_lambda <- function(fun, expected_nargs) {
+  as_jexpr <- function(x) callJMethod(x@jc, "expr")
+
+  # Process function arguments
+  parameters <- formals(fun)
+  nparameters <- length(parameters)
+
+  stopifnot(nparameters %in% expected_nargs)
 
 Review comment:
   I guess this is not the place for a broader discussion about the problem,
   so let me defend this particular choice (by which I mean both the Python
   and R implementations):
   
   - To create lambda placeholders we have to analyze the function signature ‒
     this accounts for the majority of the related logic (it weighs more
     heavily on Python than on R). This part is not optional, and the check
     mostly piggybacks on it (R adds one line of code in `create_lambda` and
     one additional numeric that is passed from the invoking wrapper).
   - Even if this check is removed we still do arity validation. That is
     primarily to keep things close to Scala and to generate placeholders with
     matching names (`x`, `y`, `z`) instead of something like `x1`, `x2`, ....
     It is not strictly necessary, but it makes debugging easier and adds 5
     lines of code (it could fit in one without significant loss of
     readability, if it wasn't for the line length restriction).
   - So we have literally < 10 lines of lightweight code (out of 400 here, 500
     in the PySpark variant) to perform argument validation. This gives us
   
    ```
    > array_filter("xs", function(x, y, z) TRUE)
     Error in (function (fun, expected_nargs)  : 
      nparameters %in% expected_nargs is not TRUE 
    11.
    stop(simpleError(msg, call = if (p <- sys.parent(1L)) sys.call(p))) 
    10.
    stopifnot(nparameters %in% expected_nargs) at functions.R#3361
    9.
    (function (fun, expected_nargs) 
    {
        as_jexpr <- function(x) callJMethod(x@jc, "expr")
        parameters <- formals(fun) ... 
    8.
    do.call(create_lambda, args) at functions.R#3412
    7.
    FUN(X[[i]], ...) 
    6.
    lapply(funs, function(args) do.call(create_lambda, args)) 
    5.
    lapply(funs, function(args) do.call(create_lambda, args)) at functions.R#3412
    4.
    do.call(sparkR.newJObject, c(paste("org.apache.spark.sql.catalyst.expressions", 
        name, sep = "."), lapply(cols, as_jexpr), lapply(funs, function(args) do.call(create_lambda, 
        args)))) at functions.R#3412
    3.
    invoke_higher_order_function("ArrayFilter", cols = list(x), funs = list(list(fun = f, 
        expected_nargs = c(1, 2)))) at functions.R#3510
    2.
    array_filter("xs", function(x, y, z) TRUE) at generics.R#786
    1.
    array_filter("xs", function(x, y, z) TRUE) 
    ```

    vs. 

    ```
    20/02/03 12:38:22 ERROR RBackendHandler: select on 20 failed
    java.lang.reflect.InvocationTargetException
      at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.base/java.lang.reflect.Method.invoke(Method.java:566)
      at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:164)
      at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:105)
      at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:39)
      at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
      at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
      at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
      at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
      at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
      at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
      at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
      at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
      at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
      at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
      at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
      at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
      at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
      at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
      at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
      at java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: org.apache.spark.sql.AnalysisException: The number of lambda function arguments '3' does not match the number of arguments expected by the higher order function '1'.;
      at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
      at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables.createLambda(higherOrderFunctions.scala:111)
      at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables.$anonfun$resolve$1(higherOrderFunctions.scala:144)
      at org.apache.spark.sql.catalyst.expressions.ArrayFilter.bind(higherOrderFunctions.scala:477)
      at org.apache.spark.sql.catalyst.expressions.ArrayFilter.bind(higherOrderFunctions.scala:462)
      at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables.org$apache$spark$sql$catalyst$analysis$ResolveLambdaVariables$$resolve(higherOrderFunctions.scala:144)
      at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables.$anonfun$resolve$6(higherOrderFunctions.scala:167)
      at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:376)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:214)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:374)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
      at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables.org$apache$spark$sql$catalyst$analysis$ResolveLambdaVariables$$resolve(higherOrderFunctions.scala:167)
      at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables$$anonfun$apply$2.$anonfun$applyOrElse$3(higherOrderFunctions.scala:90)
      at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:109)
      at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
      at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:109)
      at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:120)
      at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:125)
      at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
      at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
      at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
      at scala.collection.TraversableLike.map(TraversableLike.scala:238)
      at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
      at scala.collection.AbstractTraversable.map(Traversable.scala:108)
      at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:125)
      at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:130)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:214)
      at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:130)
      at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables$$anonfun$apply$2.applyOrElse(higherOrderFunctions.scala:90)
      at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables$$anonfun$apply$2.applyOrElse(higherOrderFunctions.scala:88)
      at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$2(AnalysisHelper.scala:108)
      at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
      at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$1(AnalysisHelper.scala:108)
      at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:194)
      at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown(AnalysisHelper.scala:106)
      at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown$(AnalysisHelper.scala:104)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDown(LogicalPlan.scala:29)
      at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:73)
      at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:72)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:29)
      at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables.apply(higherOrderFunctions.scala:88)
      at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables.apply(higherOrderFunctions.scala:73)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:130)
      at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
      at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
      at scala.collection.immutable.List.foldLeft(List.scala:89)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:127)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:119)
      at scala.collection.immutable.List.foreach(List.scala:392)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:119)
      at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:175)
      at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:169)
      at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:129)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:98)
      at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:98)
      at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:153)
      at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
      at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:152)
      at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:69)
      at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
      at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:66)
      at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:66)
      at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:58)
      at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:87)
      at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3526)
      at org.apache.spark.sql.Dataset.select(Dataset.scala:1433)
      ... 37 more
    Error in handleErrors(returnStatus, conn) : 
     org.apache.spark.sql.AnalysisException: The number of lambda function arguments '3' does not match the number of arguments expected by the higher order function '1'.;
      at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
      at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables.createLambda(higherOrderFunctions.scala:111)
      at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables.$anonfun$resolve$1(higherOrderFunctions.scala:144)
      at org.apache.spark.sql.catalyst.expressions.ArrayFilter.bind(higherOrderFunctions.scala:477)
      at org.apache.spark.sql.catalyst.expressions.ArrayFilter.bind(higherOrderFunctions.scala:462)
      at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables.org$apache$spark$sql$catalyst$analysis$ResolveLambdaVariables$$resolve(higherOrderFunctions.scala:144)
      at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables.$anonfun$resolve$6(higherOrderFunctions.scala:167)
      at org.apache.spark. 
    ```
   
   The problem is not so much the unfamiliar wall of JVM traceback, or the
   amount of information that is conveyed to the user (the naked `stopifnot`
   instead of `stop` with a message is the loser here, but that's easy to
   fix), but that I get a reference to a specific piece of code in my language
   of choice, and a stack that I can inspect using standard language tools.

   That's something a JVM exception, no matter how nicely wrapped, just cannot
   compete with, and that alone is a win by knockout.
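
   Just to illustrate the "easy to fix" part ‒ a rough sketch (not what is in
   the current diff) of how the bare `stopifnot` in `create_lambda` could be
   turned into an explicit error message, keeping the same check:

    ```
    # Sketch only: same arity check as in the diff above, but raising a
    # descriptive error instead of the generic stopifnot failure.
    create_lambda <- function(fun, expected_nargs) {
      parameters <- formals(fun)
      nparameters <- length(parameters)

      if (!(nparameters %in% expected_nargs)) {
        stop(paste0(
          "fun should take ", paste(expected_nargs, collapse = " or "),
          " argument(s), but takes ", nparameters
        ))
      }
      # ... remainder as in the current diff (placeholder creation and
      # LambdaFunction construction)
    }
    ```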
   
   As a bonus, guest-language failures are usually fail-fast ‒ depending on
   the location, the arity pre-check can fail before any actual work is done.
   That is not the case when we fall back to the JVM exception. R users are
   more likely to inline expressions, but in Python it is not unusual to
   define an expression outside `DataFrame` method calls.
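
   For example (assuming the `array_filter` wrapper added in this PR and a
   DataFrame `df` with an array column `xs`), the mismatch below is caught
   while the Column is still being built on the R side, before `select` ever
   reaches the JVM:

    ```
    f <- function(x, y, z) TRUE

    # With the R-side check this fails immediately, at Column construction time:
    filtered <- array_filter("xs", f)

    # Without it, the same mistake would surface only once the plan is
    # analyzed on the JVM, i.e. somewhere around:
    # head(select(df, array_filter("xs", f)))
    ```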
   
     
   
   
