zero323 commented on a change in pull request #27433:
[SPARK-30682][SPARKR][SQL] Add SparkR interface for higher order functions
URL: https://github.com/apache/spark/pull/27433#discussion_r374069388
##########
File path: R/pkg/R/functions.R
##########
@@ -3281,6 +3322,126 @@ setMethod("row_number",
###################### Collection functions######################
+#' Create o.a.s.sql.expressions.UnresolvedNamedLambdaVariable,
+#' convert it to o.a.s.sql.Column and wrap with R Column.
+#' Used by higher order functions.
+#'
+#' @param ... character of length = 1
+#' if length(...) > 1 then argument is interpreted as a nested
+#' Column, for example \code{unresolved_named_lambda_var("a", "b", "c")}
+#' yields unresolved \code{a.b.c}
+#' @return Column object wrapping JVM UnresolvedNamedLambdaVariable
+unresolved_named_lambda_var <- function(...) {
+ jc <- sparkR.newJObject(
+ "org.apache.spark.sql.Column",
+ sparkR.newJObject(
+ "org.apache.spark.sql.catalyst.expressions.UnresolvedNamedLambdaVariable",
+ list(...)
+ )
+ )
+ column(jc)
+}
+
+#' Create o.a.s.sql.expressions.LambdaFunction corresponding
+#' to transformation described by func.
+#' Used by higher order functions.
+#'
+#' @param fun R \code{function} (unary, binary or ternary)
+#' that transforms \code{Columns} into a \code{Column}
+#' @param expected_nargs numeric a vector of the expected
+#' number of arguments. Used for validation
+#' @return JVM \code{LambdaFunction} object
+create_lambda <- function(fun, expected_nargs) {
+ as_jexpr <- function(x) callJMethod(x@jc, "expr")
+
+ # Process function arguments
+ parameters <- formals(fun)
+ nparameters <- length(parameters)
+
+ stopifnot(nparameters %in% expected_nargs)
Review comment:
I guess this is not the place for a broader discussion of the problem, so let
me defend this particular choice (by which I mean both the Python and the R
implementations):
- To create lambda placeholders we have to analyze the signature ‒ this
accounts for the majority of the related logic (it weighs more heavily on Python
than on R). This part is not optional, and we mostly piggyback on it (R adds one
line of code in `create_lambda` and one additional numeric that is passed from
the invoking wrapper).
- Even if this check were removed, arity validation would still happen anyway.
The check here is primarily about keeping things close to Scala and generating
placeholders with matching names (`x`, `y`, `z`) instead of something like
`x1`, `x2`, ... (see the sketch after the tracebacks below). It is not strictly
necessary, but it makes debugging easier and adds five lines of code (it could
fit in one without a significant loss of readability, if it weren't for the
line-length restriction).
- So we have literally fewer than 10 lines of lightweight code (out of 400
here, 500 in the PySpark variant) to perform argument validation. This gives us:
```
> array_filter("xs", function(x, y, z) TRUE)
Error in (function (fun, expected_nargs) :
nparameters %in% expected_nargs is not TRUE
11.
stop(simpleError(msg, call = if (p <- sys.parent(1L)) sys.call(p)))
10.
stopifnot(nparameters %in% expected_nargs) at functions.R#3361
9.
(function (fun, expected_nargs)
{
as_jexpr <- function(x) callJMethod(x@jc, "expr")
parameters <- formals(fun) ...
8.
do.call(create_lambda, args) at functions.R#3412
7.
FUN(X[[i]], ...)
6.
lapply(funs, function(args) do.call(create_lambda, args))
5.
lapply(funs, function(args) do.call(create_lambda, args)) at functions.R#3412
4.
do.call(sparkR.newJObject,
c(paste("org.apache.spark.sql.catalyst.expressions",
name, sep = "."), lapply(cols, as_jexpr), lapply(funs,
function(args) do.call(create_lambda,
args)))) at functions.R#3412
3.
invoke_higher_order_function("ArrayFilter", cols = list(x), funs =
list(list(fun = f,
expected_nargs = c(1, 2)))) at functions.R#3510
2.
array_filter("xs", function(x, y, z) TRUE) at generics.R#786
1.
array_filter("xs", function(x, y, z) TRUE)
```
vs.
```
20/02/03 12:38:22 ERROR RBackendHandler: select on 20 failed
java.lang.reflect.InvocationTargetException
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:164)
    at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:105)
    at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:39)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
    at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.sql.AnalysisException: The number of lambda function arguments '3' does not match the number of arguments expected by the higher order function '1'.;
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables.createLambda(higherOrderFunctions.scala:111)
    at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables.$anonfun$resolve$1(higherOrderFunctions.scala:144)
    at org.apache.spark.sql.catalyst.expressions.ArrayFilter.bind(higherOrderFunctions.scala:477)
    at org.apache.spark.sql.catalyst.expressions.ArrayFilter.bind(higherOrderFunctions.scala:462)
    at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables.org$apache$spark$sql$catalyst$analysis$ResolveLambdaVariables$$resolve(higherOrderFunctions.scala:144)
    at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables.$anonfun$resolve$6(higherOrderFunctions.scala:167)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:376)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:214)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:374)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
    at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables.org$apache$spark$sql$catalyst$analysis$ResolveLambdaVariables$$resolve(higherOrderFunctions.scala:167)
    at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables$$anonfun$apply$2.$anonfun$applyOrElse$3(higherOrderFunctions.scala:90)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:109)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:109)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:120)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:125)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:125)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:130)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:214)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:130)
    at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables$$anonfun$apply$2.applyOrElse(higherOrderFunctions.scala:90)
    at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables$$anonfun$apply$2.applyOrElse(higherOrderFunctions.scala:88)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$2(AnalysisHelper.scala:108)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$1(AnalysisHelper.scala:108)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:194)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown(AnalysisHelper.scala:106)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown$(AnalysisHelper.scala:104)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:73)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:72)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables.apply(higherOrderFunctions.scala:88)
    at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables.apply(higherOrderFunctions.scala:73)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:130)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:89)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:127)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:119)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:119)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:175)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:169)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:129)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:98)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:98)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:153)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:152)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:69)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:58)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:87)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3526)
    at org.apache.spark.sql.Dataset.select(Dataset.scala:1433)
    ... 37 more
Error in handleErrors(returnStatus, conn) :
  org.apache.spark.sql.AnalysisException: The number of lambda function arguments '3' does not match the number of arguments expected by the higher order function '1'.;
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables.createLambda(higherOrderFunctions.scala:111)
    at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables.$anonfun$resolve$1(higherOrderFunctions.scala:144)
    at org.apache.spark.sql.catalyst.expressions.ArrayFilter.bind(higherOrderFunctions.scala:477)
    at org.apache.spark.sql.catalyst.expressions.ArrayFilter.bind(higherOrderFunctions.scala:462)
    at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables.org$apache$spark$sql$catalyst$analysis$ResolveLambdaVariables$$resolve(higherOrderFunctions.scala:144)
    at org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables.$anonfun$resolve$6(higherOrderFunctions.scala:167)
    at org.apache.spark.
```
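
As a point of reference, here is a minimal sketch (not the code from this PR) of the `formals()`-based check described in the bullets above. `validate_lambda_arity` is a hypothetical helper name, and the `x`/`y`/`z` names are the Scala-style placeholders mentioned earlier:
```
# Minimal sketch: formals() drives both the arity check and the
# Scala-style placeholder names (x, y, z).
validate_lambda_arity <- function(fun, expected_nargs) {  # hypothetical helper
  nparameters <- length(formals(fun))
  if (!(nparameters %in% expected_nargs)) {
    stop(
      "Function must have ", paste(expected_nargs, collapse = " or "),
      " arguments, but has ", nparameters,
      call. = FALSE
    )
  }
  # Same pool of names Scala uses for lambda variables
  c("x", "y", "z")[seq_len(nparameters)]
}

validate_lambda_arity(function(x) x > 1, expected_nargs = c(1, 2))
# [1] "x"
validate_lambda_arity(function(x, y, z) TRUE, expected_nargs = c(1, 2))
# Error: Function must have 1 or 2 arguments, but has 3
```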
The problem is not so much the unfamiliar wall of JVM traceback, or the
amount of information conveyed to the user (a naked `stopifnot` instead of
`stop` with a message is a loser here, but that's easy to fix), but that I get
a reference to a specific piece of code in my language of choice, and a stack
that I can inspect using standard language tools. That's something a JVM
exception, no matter how nicely wrapped, just cannot compete with, and that
alone is a win by knockout.
As a bonus, guest-language failures usually fail fast ‒ depending on the
location, the arity pre-check can fail before any actual work is done. That is
not the case when we fall back to a JVM exception. R users are more likely to
inline expressions, but in Python it is not unusual to define an expression
outside of `DataFrame` method calls.
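To illustrate the "standard language tools" point, here is a sketch (using the `array_filter` call and the `"xs"` column from the example above, which come from this PR) of how the R-side failure can be handled with plain R error handling:
```
# Sketch only: the arity error is raised on the R side, before anything
# reaches the JVM, so ordinary R condition handling applies.
result <- tryCatch(
  array_filter("xs", function(x, y, z) TRUE),
  error = function(e) {
    message("lambda rejected on the R side: ", conditionMessage(e))
    NULL
  }
)
# After an uncaught error, traceback() points at functions.R, not at Netty frames.
```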