Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r207141916 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import java.util.concurrent.atomic.AtomicReference + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} +import org.apache.spark.sql.types._ + +/** + * A named lambda variable. + */ +case class NamedLambdaVariable( + name: String, + dataType: DataType, + nullable: Boolean, + value: AtomicReference[Any] = new AtomicReference(), + exprId: ExprId = NamedExpression.newExprId) + extends LeafExpression + with NamedExpression + with CodegenFallback { + + override def qualifier: Option[String] = None + + override def newInstance(): NamedExpression = + copy(value = new AtomicReference(), exprId = NamedExpression.newExprId) + + override def toAttribute: Attribute = { + AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None) + } + + override def eval(input: InternalRow): Any = value.get + + override def toString: String = s"lambda $name#${exprId.id}$typeSuffix" + + override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}" +} + +/** + * A lambda function and its arguments. A lambda function can be hidden when a user wants to + * process an completely independent expression in a [[HigherOrderFunction]], the lambda function + * and its variables are then only used for internal bookkeeping within the higher order function. + */ +case class LambdaFunction( + function: Expression, + arguments: Seq[NamedExpression], + hidden: Boolean = false) + extends Expression with CodegenFallback { + + override def children: Seq[Expression] = function +: arguments + override def dataType: DataType = function.dataType + override def nullable: Boolean = function.nullable + + lazy val bound: Boolean = arguments.forall(_.resolved) + + override def eval(input: InternalRow): Any = function.eval(input) +} + +/** + * A higher order function takes one or more (lambda) functions and applies these to some objects. + * The function produces a number of variables which can be consumed by some lambda function. + */ +trait HigherOrderFunction extends Expression { + + override def children: Seq[Expression] = inputs ++ functions + + /** + * Inputs to the higher ordered function. + */ + def inputs: Seq[Expression] + + /** + * All inputs have been resolved. This means that the types and nullabilty of (most of) the + * lambda function arguments is known, and that we can start binding the lambda functions. + */ + lazy val inputResolved: Boolean = inputs.forall(_.resolved) + + /** + * Functions applied by the higher order function. + */ + def functions: Seq[Expression] + + /** + * All inputs must be resolved and all functions must be resolved lambda functions. + */ + override lazy val resolved: Boolean = inputResolved && functions.forall { + case l: LambdaFunction => l.resolved + case _ => false + } + + /** + * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The + * bind function takes the potential lambda and it's (partial) arguments and converts this into + * a bound lambda function. + */ + def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction + + @transient lazy val functionsForEval: Seq[Expression] = functions.map { + case LambdaFunction(function, arguments, hidden) => + val argumentMap = arguments.map { arg => arg.exprId -> arg }.toMap + function.transformUp { + case variable: NamedLambdaVariable if argumentMap.contains(variable.exprId) => + argumentMap(variable.exprId) + } + } +} + +trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes { + + def input: Expression + + override def inputs: Seq[Expression] = input :: Nil + + def function: Expression + + override def functions: Seq[Expression] = function :: Nil + + def expectingFunctionType: AbstractDataType = AnyDataType + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, expectingFunctionType) + + @transient lazy val functionForEval: Expression = functionsForEval.head +} + +/** + * Transform elements in an array using the transform function. This is similar to + * a `map` in functional programming. + */ +@ExpressionDescription( + usage = "_FUNC_(expr, func) - Transforms elements in an array using the function.", + examples = """ + Examples: + > SELECT _FUNC_(array(1, 2, 3), x -> x + 1); + array(2, 3, 4) + > SELECT _FUNC_(array(1, 2, 3), (x, i) -> x + i); + array(1, 3, 5) + """, + since = "2.4.0") +case class ArrayTransform( + input: Expression, + function: Expression) + extends ArrayBasedHigherOrderFunction with CodegenFallback { + + override def nullable: Boolean = input.nullable + + override def dataType: ArrayType = ArrayType(function.dataType, function.nullable) + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArrayTransform = { + val (elementType, containsNull) = input.dataType match { + case ArrayType(elementType, containsNull) => (elementType, containsNull) + case _ => + val ArrayType(elementType, containsNull) = ArrayType.defaultConcreteType --- End diff -- When does this happen?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org