[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r208282941 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.DataType
+
+/**
+ * Resolve a higher order functions from the catalog. This is different from regular function
+ * resolution because lambda functions can only be resolved after the function has been resolved;
+ * so we need to resolve higher order function when all children are either resolved or a lambda
+ * function.
+ */
+case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    case q: LogicalPlan =>
+      q.transformExpressions {
+        case u @ UnresolvedFunction(fn, children, false)
+            if hasLambdaAndResolvedArguments(children) =>
+          withPosition(u) {
+            catalog.lookupFunction(fn, children) match {
+              case func: HigherOrderFunction => func
+              case other => other.failAnalysis(
+                "A lambda function should only be used in a higher order function. However, " +
+                  s"its class is ${other.getClass.getCanonicalName}, which is not a " +
+                  s"higher order function.")
+            }
+          }
+      }
+  }
+
+  /**
+   * Check if the arguments of a function are either resolved or a lambda function.
+   */
+  private def hasLambdaAndResolvedArguments(expressions: Seq[Expression]): Boolean = {
+    val (lambdas, others) = expressions.partition(_.isInstanceOf[LambdaFunction])
+    lambdas.nonEmpty && others.forall(_.resolved)
+  }
+}
+
+/**
+ * Resolve the lambda variables exposed by a higher order functions.
+ *
+ * This rule works in two steps:
+ * [1]. Bind the anonymous variables exposed by the higher order function to the lambda function's
+ *      arguments; this creates named and typed lambda variables. The argument names are checked
+ *      for duplicates and the number of arguments are checked during this step.
+ * [2]. Resolve the used lambda variables used in the lambda function's function expression tree.
+ *      Note that we allow the use of variables from outside the current lambda, this can either
+ *      be a lambda function defined in an outer scope, or a attribute in produced by the plan's
+ *      child. If names are duplicate, the name defined in the most inner scope is used.
+ */
+case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] {
+
+  type LambdaVariableMap = Map[String, NamedExpression]
+
+  private val canonicalizer = {
+    if (!conf.caseSensitiveAnalysis) {
+      s: String => s.toLowerCase
+    } else {
+      s: String => s
+    }
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.resolveOperators {
+      case q: LogicalPlan =>
+        q.mapExpressions(resolve(_, Map.empty))
+    }
+  }
+
+  /**
+   * Create a bound lambda function by binding the arguments of a lambda function to the given
+   * partial arguments (dataType and nullability only). If the expression happens to be an already
+   * bound lambda function then we assume it has been bound to the correct arguments and do
+   * nothing. This function will produce a lambda function with hidden arguments when it is passed
+   * an arbi
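The quoted rule is cut off before the interesting part of `ResolveLambdaVariables`, but the class doc above already states the scoping contract: lambda arguments shadow same-named variables from enclosing lambdas, anything not found in a lambda scope falls through to ordinary attribute resolution, and names are canonicalized case-insensitively unless `caseSensitiveAnalysis` is set. A minimal standalone sketch of that contract, using an invented `Expr` ADT rather than the Catalyst classes, might look like this:

```scala
// Standalone sketch of the scoping contract described in the class doc above; the Expr ADT and
// resolve() are invented for illustration and are not the Catalyst implementation.
object LambdaScopingSketch {
  sealed trait Expr
  final case class UnresolvedName(name: String) extends Expr
  final case class LambdaVar(name: String) extends Expr
  final case class Lambda(args: Seq[String], body: Expr) extends Expr
  final case class HigherOrder(input: Expr, function: Expr) extends Expr

  // conf.caseSensitiveAnalysis = false would canonicalize names like this.
  private val canonicalize: String => String = _.toLowerCase

  def resolve(e: Expr, scope: Map[String, LambdaVar]): Expr = e match {
    case Lambda(args, body) =>
      // Inner arguments are added last, so on a name clash the innermost definition wins.
      val inner = scope ++ args.map(a => canonicalize(a) -> LambdaVar(a))
      Lambda(args, resolve(body, inner))
    case HigherOrder(input, function) =>
      HigherOrder(resolve(input, scope), resolve(function, scope))
    case UnresolvedName(n) =>
      // Not a lambda variable in any enclosing scope: fall through to normal column resolution.
      scope.getOrElse(canonicalize(n), e)
    case other => other
  }

  def main(args: Array[String]): Unit = {
    // transform(zs, z -> transform(z, z -> z)): the inner `z` shadows the outer one.
    val expr = HigherOrder(
      UnresolvedName("zs"),
      Lambda(Seq("z"), HigherOrder(UnresolvedName("z"), Lambda(Seq("z"), UnresolvedName("z")))))
    println(resolve(expr, Map.empty))
  }
}
```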
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r208282971 --- Diff: sql/core/src/test/resources/sql-tests/inputs/higher-order-functions.sql ---
@@ -0,0 +1,26 @@
+create or replace temporary view nested as values
+  (1, array(32, 97), array(array(12, 99), array(123, 42), array(1))),
+  (2, array(77, -76), array(array(6, 96, 65), array(-1, -2))),
+  (3, array(12), array(array(17)))
+  as t(x, ys, zs);
+
+-- Only allow lambda's in higher order functions.
+select upper(x -> x) as v;
+
+-- Identity transform an array
+select transform(zs, z -> z) as v from nested;
+
+-- Transform an array
+select transform(ys, y -> y * y) as v from nested;
+
+-- Transform an array with index
+select transform(ys, (y, i) -> y + i) as v from nested;
+
+-- Transform an array with reference
+select transform(zs, z -> concat(ys, z)) as v from nested;
+
+-- Transform an array to an array of 0's
+select transform(ys, 0) as v from nested;
+
+-- Transform a null array
+select transform(cast(null as array<int>), x -> x + 1) as v;
--- End diff -- Actually we have some at #21965 and #21982.
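As a reading aid for the quoted test file, here is a plain-Scala mirror of two of the queries over the `ys` column. It assumes the 0-based element index that Spark's `transform` passes to a two-argument lambda; the expected values are illustrative and are not taken from the checked-in result file:

```scala
// Plain-Scala mirror of two of the queries above, to make the expected semantics concrete.
// Assumes the element index passed to a two-argument lambda is 0-based, as in Spark's
// documentation of transform.
object TransformSemanticsSketch {
  def main(args: Array[String]): Unit = {
    val ys = Seq(Seq(32, 97), Seq(77, -76), Seq(12))

    // select transform(ys, y -> y * y) as v from nested
    println(ys.map(_.map(y => y * y)))
    // List(List(1024, 9409), List(5929, 5776), List(144))

    // select transform(ys, (y, i) -> y + i) as v from nested
    println(ys.map(_.zipWithIndex.map { case (y, i) => y + i }))
    // List(List(32, 98), List(77, -75), List(12))
  }
}
```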
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r208282782 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala --- [quotes the same diff context as above; the comment itself is cut off in the archive]
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r208282200 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+    name: String,
+    dataType: DataType,
+    nullable: Boolean,
+    value: AtomicReference[Any] = new AtomicReference(),
+    exprId: ExprId = NamedExpression.newExprId)
+  extends LeafExpression
+  with NamedExpression
+  with CodegenFallback {
+
+  override def qualifier: Option[String] = None
+
+  override def newInstance(): NamedExpression =
+    copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
+
+  override def toAttribute: Attribute = {
+    AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
+  }
+
+  override def eval(input: InternalRow): Any = value.get
+
+  override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
+
+  override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
+}
+
+/**
+ * A lambda function and its arguments. A lambda function can be hidden when a user wants to
+ * process an completely independent expression in a [[HigherOrderFunction]], the lambda function
+ * and its variables are then only used for internal bookkeeping within the higher order function.
+ */
+case class LambdaFunction(
+    function: Expression,
+    arguments: Seq[NamedExpression],
+    hidden: Boolean = false)
+  extends Expression with CodegenFallback {
+
+  override def children: Seq[Expression] = function +: arguments
+  override def dataType: DataType = function.dataType
+  override def nullable: Boolean = function.nullable
+
+  lazy val bound: Boolean = arguments.forall(_.resolved)
+
+  override def eval(input: InternalRow): Any = function.eval(input)
+}
+
+/**
+ * A higher order function takes one or more (lambda) functions and applies these to some objects.
+ * The function produces a number of variables which can be consumed by some lambda function.
+ */
+trait HigherOrderFunction extends Expression {
+
+  override def children: Seq[Expression] = inputs ++ functions
+
+  /**
+   * Inputs to the higher ordered function.
+   */
+  def inputs: Seq[Expression]
+
+  /**
+   * All inputs have been resolved. This means that the types and nullabilty of (most of) the
+   * lambda function arguments is known, and that we can start binding the lambda functions.
+   */
+  lazy val inputResolved: Boolean = inputs.forall(_.resolved)
+
+  /**
+   * Functions applied by the higher order function.
+   */
+  def functions: Seq[Expression]
+
+  /**
+   * All inputs must be resolved and all functions must be resolved lambda functions.
+   */
+  override lazy val resolved: Boolean = inputResolved && functions.forall {
+    case l: LambdaFunction => l.resolved
+    case _ => false
+  }
+
+  /**
+   * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The
+   * bind function takes the potential lambda and it's (partial) arguments and converts this into
+   * a bound lambda function.
+   */
+  def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction
+
+  @transie
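The quote stops before any concrete `HigherOrderFunction` implementation, so the role of the `AtomicReference` in `NamedLambdaVariable` is easy to miss: a higher-order function evaluates its already-bound lambda once per element by writing the current element into the variable's holder and re-evaluating the lambda body, which reads the holder via `value.get`. Below is a deliberately simplified, non-Catalyst sketch of that evaluation pattern; the PR's real `ArrayTransform` works on `InternalRow`/`ArrayData` instead:

```scala
import java.util.concurrent.atomic.AtomicReference

// Simplified, non-Catalyst sketch of the evaluation pattern the AtomicReference enables: the
// higher-order function writes each element into the shared holder, then re-evaluates the
// already-bound lambda body, which reads the holder.
object LambdaEvalSketch {
  final class Variable(val name: String, val value: AtomicReference[Any] = new AtomicReference())

  // A "bound lambda": the body closes over the variable and reads it on every invocation.
  final case class BoundLambda(variable: Variable, body: () => Any)

  def transformArray(input: Seq[Any], lambda: BoundLambda): Seq[Any] =
    input.map { element =>
      lambda.variable.value.set(element)  // expose the current element to the lambda
      lambda.body()                       // the body reads variable.value.get
    }

  def main(args: Array[String]): Unit = {
    val x = new Variable("x")
    val double = BoundLambda(x, () => x.value.get.asInstanceOf[Int] * 2)  // x -> x * 2
    println(transformArray(Seq(1, 2, 3), double))                         // List(2, 4, 6)
  }
}
```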
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r208281900 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- [quotes the same diff context as above; the comment itself is cut off in the archive]
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r208277492 --- Diff: sql/core/src/test/resources/sql-tests/inputs/higher-order-functions.sql --- [quotes the same test file as above, down to its last line]
+select transform(cast(null as array<int>), x -> x + 1) as v;
--- End diff -- shall we add a test for nested lambda?
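For readers who have not seen one, a nested lambda is simply a `transform` (or another higher-order function) used inside another lambda's body. The snippet below is a hypothetical example of such a test against the `nested` view from the quoted file (`x int`, `ys array<int>`, `zs array<array<int>>`); it assumes an active `SparkSession` named `spark` (e.g. spark-shell on a build containing this PR) and is not claimed to be the test added by the follow-up PRs mentioned in the reply above:

```scala
// Hypothetical nested-lambda queries against the `nested` view from the quoted .sql file.

// One transform nested inside another: square every element of every inner array.
spark.sql(
  "select x, transform(zs, z -> transform(z, zz -> zz * zz)) as v from nested").show(false)

// The inner lambda can also refer to the outer lambda's variable, here via size(z).
spark.sql(
  "select x, transform(zs, z -> transform(z, zz -> zz + size(z))) as v from nested").show(false)
```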
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r208276367 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala --- [quotes the same diff context as above; the comment itself is cut off in the archive]
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r208273712 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala --- [quotes the same diff context as above; the comment itself is cut off in the archive]
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r208272808 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala --- [quotes the same diff context as above; the comment itself is cut off in the archive]
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r208269300 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- [quotes the same diff context as above; the comment itself is cut off in the archive]
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r208268129 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- [quotes the same diff context as above; the comment itself is cut off in the archive]
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r208267632 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- [quotes the same diff context as above, down to]
+  /**
+   * Functions applied by the higher order function.
+   */
+  def functions: Seq[Expression]
--- End diff -- seems function must be `LambdaFunction`, why don't we enforce it at type level?
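A sketch of the trade-off behind that question may help. The quoted test file includes `select transform(ys, 0)`, i.e. a plain expression in the function position, and the quoted `LambdaFunction` doc says such an expression only becomes a hidden lambda once `bind()` wraps it, so before binding the member cannot simply be declared as `Seq[LambdaFunction]`. The following plain-Scala sketch (invented names, not Catalyst code) shows the loose declaration plus a checked accessor as one possible middle ground:

```scala
// Illustrative-only sketch of why the field stays Seq[Expression]: a non-lambda argument is only
// wrapped into a hidden lambda during binding, so the stricter type would not hold before that.
object TypeLevelSketch {
  sealed trait Expr
  final case class Column(name: String) extends Expr
  final case class Literal(value: Int) extends Expr
  final case class Lambda(arguments: Seq[String], body: Expr, hidden: Boolean = false) extends Expr

  final case class Transform(input: Expr, function: Expr) {
    // Mirrors what bind() would do with a non-lambda argument: wrap it as a hidden lambda.
    def bound: Transform = function match {
      case _: Lambda => this
      case other     => copy(function = Lambda(Nil, other, hidden = true))
    }

    // Safe only after binding; before it, this fails, much like the `resolved` check in the
    // quoted trait requires every function to be a resolved LambdaFunction.
    def boundFunction: Lambda = function match {
      case l: Lambda => l
      case other     => sys.error(s"not bound yet: $other")
    }
  }

  def main(args: Array[String]): Unit = {
    val raw = Transform(Column("ys"), Literal(0))   // transform(ys, 0)
    println(raw.bound.boundFunction)                // Lambda(List(),Literal(0),true)
  }
}
```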
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r208266333 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- [quotes the same diff context as above; the comment itself is cut off in the archive]
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21954
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r207172497 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- [quotes the same diff context as above, down to]
+    value: AtomicReference[Any] = new AtomicReference(),
--- End diff -- I see. Thanks.
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r207171941 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- [quotes the same diff context as above, down to]
+    value: AtomicReference[Any] = new AtomicReference(),
--- End diff -- Yeah, that makes sense. Let's leave it for now.
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r207169967 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- [quotes the same diff context as above, down to]
+    value: AtomicReference[Any] = new AtomicReference(),
--- End diff -- Hmm, seems like overriding `fastEquals` is not enough..
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r207162371 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import java.util.concurrent.atomic.AtomicReference + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} +import org.apache.spark.sql.types._ + +/** + * A named lambda variable. + */ +case class NamedLambdaVariable( +name: String, +dataType: DataType, +nullable: Boolean, +value: AtomicReference[Any] = new AtomicReference(), --- End diff -- Ah, maybe I should override `fastEquals` instead of using `AtomicReference`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r207162167 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import java.util.concurrent.atomic.AtomicReference + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} +import org.apache.spark.sql.types._ + +/** + * A named lambda variable. + */ +case class NamedLambdaVariable( +name: String, +dataType: DataType, +nullable: Boolean, +value: AtomicReference[Any] = new AtomicReference(), --- End diff -- When I tried to make copies of `NamedLambdaVariable`s, `transformUp` didn't replace the variables and generated wrong results.
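A self-contained illustration of the failure mode described here, using a hypothetical `Variable` stand-in rather than the PR's `NamedLambdaVariable`: once a copy carries a fresh `AtomicReference`, writes through the original are invisible to the copy, and the two instances no longer compare equal (an `AtomicReference` only has reference equality), which is the kind of mismatch that can leave stale variables behind after a tree rewrite such as `transformUp`.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for the variable, reduced to the fields that matter here.
final case class Variable(name: String, value: AtomicReference[Any] = new AtomicReference())

object CopySketch extends App {
  val original = Variable("x")
  val copied   = original.copy(value = new AtomicReference()) // what a copy with a fresh slot looks like

  original.value.set(42)
  println(copied.value.get)   // null: the copy does not see the write
  println(original == copied) // false: AtomicReference compares by reference
}
```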
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r207158636 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import java.util.concurrent.atomic.AtomicReference + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} +import org.apache.spark.sql.types._ + +/** + * A named lambda variable. + */ +case class NamedLambdaVariable( +name: String, +dataType: DataType, +nullable: Boolean, +value: AtomicReference[Any] = new AtomicReference(), --- End diff -- You did? Could you elaborate? There shouldn't be any concurrent access here.
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r207148555 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import java.util.concurrent.atomic.AtomicReference + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} +import org.apache.spark.sql.types._ + +/** + * A named lambda variable. + */ +case class NamedLambdaVariable( +name: String, +dataType: DataType, +nullable: Boolean, +value: AtomicReference[Any] = new AtomicReference(), --- End diff -- Actually, also when creating `functionsForEval`. I needed it to make `transformUp` work properly.
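To make the `functionsForEval` remark concrete, here is a toy, Spark-free sketch (hypothetical `Expr` tree, not Catalyst) of the substitution the PR's last commit message also describes: before evaluation, variables inside the lambda body are swapped for the very instances held in the argument list, so a write to the argument's slot is visible to the body.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical toy expression tree, standing in for Catalyst expressions.
sealed trait Expr
final case class Var(id: Long, slot: AtomicReference[Any] = new AtomicReference()) extends Expr
final case class Add(left: Expr, right: Expr) extends Expr
final case class Lit(value: Int) extends Expr

object SubstituteSketch extends App {
  // Rebuild the body so every Var with a known id points at the argument's instance.
  def substitute(body: Expr, byId: Map[Long, Var]): Expr = body match {
    case v: Var    => byId.getOrElse(v.id, v)
    case Add(l, r) => Add(substitute(l, byId), substitute(r, byId))
    case lit: Lit  => lit
  }

  def eval(e: Expr): Int = e match {
    case v: Var    => v.slot.get.asInstanceOf[Int]
    case Add(l, r) => eval(l) + eval(r)
    case Lit(v)    => v
  }

  val argument = Var(id = 1)              // the instance the higher-order function writes to
  val body     = Add(Var(id = 1), Lit(1)) // the parsed body holds a different instance
  val shared   = substitute(body, Map(1L -> argument))

  argument.slot.set(41)
  println(eval(shared)) // 42: the body now reads the argument's slot
}
```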
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r207145478 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import java.util.concurrent.atomic.AtomicReference + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} +import org.apache.spark.sql.types._ + +/** + * A named lambda variable. + */ +case class NamedLambdaVariable( +name: String, +dataType: DataType, +nullable: Boolean, +value: AtomicReference[Any] = new AtomicReference(), --- End diff -- You are only using the `AtomicReference` as a container, right?
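A minimal, self-contained sketch of the container usage being asked about here (hypothetical `LambdaVar`/`transform` names, not the PR's classes): the `AtomicReference` is just a mutable value slot that the higher-order function fills for each element before evaluating the lambda body.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in; only meant to illustrate "AtomicReference as a container".
final case class LambdaVar(name: String, value: AtomicReference[Any] = new AtomicReference())

object ContainerSketch extends App {
  // The "higher order function": write each element into the variable's slot,
  // then evaluate the lambda body, which reads the slot back.
  def transform(input: Seq[Any], variable: LambdaVar, body: () => Any): Seq[Any] =
    input.map { elem =>
      variable.value.set(elem) // bind the current element
      body()                   // the body sees it via variable.value.get
    }

  val x = LambdaVar("x")
  val plusOne = () => x.value.get.asInstanceOf[Int] + 1 // body for `x -> x + 1`
  println(transform(Seq(1, 2, 3), x, plusOne)) // List(2, 3, 4)
}
```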
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r207143138 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import java.util.concurrent.atomic.AtomicReference + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} +import org.apache.spark.sql.types._ + +/** + * A named lambda variable. + */ +case class NamedLambdaVariable( +name: String, +dataType: DataType, +nullable: Boolean, +value: AtomicReference[Any] = new AtomicReference(), +exprId: ExprId = NamedExpression.newExprId) + extends LeafExpression + with NamedExpression + with CodegenFallback { + + override def qualifier: Option[String] = None + + override def newInstance(): NamedExpression = +copy(value = new AtomicReference(), exprId = NamedExpression.newExprId) + + override def toAttribute: Attribute = { +AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None) + } + + override def eval(input: InternalRow): Any = value.get + + override def toString: String = s"lambda $name#${exprId.id}$typeSuffix" + + override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}" +} + +/** + * A lambda function and its arguments. A lambda function can be hidden when a user wants to + * process an completely independent expression in a [[HigherOrderFunction]], the lambda function + * and its variables are then only used for internal bookkeeping within the higher order function. + */ +case class LambdaFunction( +function: Expression, +arguments: Seq[NamedExpression], +hidden: Boolean = false) + extends Expression with CodegenFallback { + + override def children: Seq[Expression] = function +: arguments + override def dataType: DataType = function.dataType + override def nullable: Boolean = function.nullable + + lazy val bound: Boolean = arguments.forall(_.resolved) + + override def eval(input: InternalRow): Any = function.eval(input) +} + +/** + * A higher order function takes one or more (lambda) functions and applies these to some objects. + * The function produces a number of variables which can be consumed by some lambda function. + */ +trait HigherOrderFunction extends Expression { + + override def children: Seq[Expression] = inputs ++ functions + + /** + * Inputs to the higher ordered function. + */ + def inputs: Seq[Expression] + + /** + * All inputs have been resolved. 
This means that the types and nullabilty of (most of) the + * lambda function arguments is known, and that we can start binding the lambda functions. + */ + lazy val inputResolved: Boolean = inputs.forall(_.resolved) + + /** + * Functions applied by the higher order function. + */ + def functions: Seq[Expression] + + /** + * All inputs must be resolved and all functions must be resolved lambda functions. + */ + override lazy val resolved: Boolean = inputResolved && functions.forall { +case l: LambdaFunction => l.resolved +case _ => false + } + + /** + * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The + * bind function takes the potential lambda and it's (partial) arguments and converts this into + * a bound lambda function. + */ + def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction + + @transie
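The `bind` contract quoted above is the least obvious part of the trait, so here is a toy, Spark-free sketch of the idea (all names hypothetical, not the PR's `ArrayTransform`): only the higher-order function knows which types its lambda arguments will take, here the element type of its array input, so resolution hands it a binder that turns the not-yet-typed lambda into one with typed, named arguments.

```scala
// Toy, Spark-free sketch of the bind step; all names are hypothetical.
object BindSketch extends App {
  final case class TypedArg(name: String, dataType: String, nullable: Boolean)
  final case class ToyLambda(body: String, argNames: Seq[String], typedArgs: Seq[TypedArg] = Nil) {
    def bound: Boolean = typedArgs.nonEmpty
  }

  // An array-based higher-order function: it knows its element type, so it can
  // supply the (dataType, nullable) pairs the binder needs, mirroring the quoted
  // `bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction)` signature.
  final case class ToyArrayTransform(elementType: String, containsNull: Boolean, function: ToyLambda) {
    def bind(f: (ToyLambda, Seq[(String, Boolean)]) => ToyLambda): ToyArrayTransform =
      copy(function = f(function, Seq((elementType, containsNull))))
  }

  // The binder the analyzer would supply: zip argument names with the offered types.
  val binder = (lambda: ToyLambda, types: Seq[(String, Boolean)]) =>
    lambda.copy(typedArgs = lambda.argNames.zip(types).map { case (n, (t, nl)) => TypedArg(n, t, nl) })

  val unbound = ToyArrayTransform("int", containsNull = false, ToyLambda("x + 1", Seq("x")))
  val bound   = unbound.bind(binder)
  println(bound.function.bound)     // true
  println(bound.function.typedArgs) // List(TypedArg(x,int,false))
}
```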
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r207141916 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import java.util.concurrent.atomic.AtomicReference + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} +import org.apache.spark.sql.types._ + +/** + * A named lambda variable. + */ +case class NamedLambdaVariable( +name: String, +dataType: DataType, +nullable: Boolean, +value: AtomicReference[Any] = new AtomicReference(), +exprId: ExprId = NamedExpression.newExprId) + extends LeafExpression + with NamedExpression + with CodegenFallback { + + override def qualifier: Option[String] = None + + override def newInstance(): NamedExpression = +copy(value = new AtomicReference(), exprId = NamedExpression.newExprId) + + override def toAttribute: Attribute = { +AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None) + } + + override def eval(input: InternalRow): Any = value.get + + override def toString: String = s"lambda $name#${exprId.id}$typeSuffix" + + override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}" +} + +/** + * A lambda function and its arguments. A lambda function can be hidden when a user wants to + * process an completely independent expression in a [[HigherOrderFunction]], the lambda function + * and its variables are then only used for internal bookkeeping within the higher order function. + */ +case class LambdaFunction( +function: Expression, +arguments: Seq[NamedExpression], +hidden: Boolean = false) + extends Expression with CodegenFallback { + + override def children: Seq[Expression] = function +: arguments + override def dataType: DataType = function.dataType + override def nullable: Boolean = function.nullable + + lazy val bound: Boolean = arguments.forall(_.resolved) + + override def eval(input: InternalRow): Any = function.eval(input) +} + +/** + * A higher order function takes one or more (lambda) functions and applies these to some objects. + * The function produces a number of variables which can be consumed by some lambda function. + */ +trait HigherOrderFunction extends Expression { + + override def children: Seq[Expression] = inputs ++ functions + + /** + * Inputs to the higher ordered function. + */ + def inputs: Seq[Expression] + + /** + * All inputs have been resolved. 
This means that the types and nullabilty of (most of) the + * lambda function arguments is known, and that we can start binding the lambda functions. + */ + lazy val inputResolved: Boolean = inputs.forall(_.resolved) + + /** + * Functions applied by the higher order function. + */ + def functions: Seq[Expression] + + /** + * All inputs must be resolved and all functions must be resolved lambda functions. + */ + override lazy val resolved: Boolean = inputResolved && functions.forall { +case l: LambdaFunction => l.resolved +case _ => false + } + + /** + * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The + * bind function takes the potential lambda and it's (partial) arguments and converts this into + * a bound lambda function. + */ + def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction + + @tra
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r207102738 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import java.util.concurrent.atomic.AtomicReference + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} +import org.apache.spark.sql.types._ + +/** + * A named lambda variable. + */ +case class NamedLambdaVariable( +name: String, +dataType: DataType, +nullable: Boolean, +value: AtomicReference[Any] = new AtomicReference(), +exprId: ExprId = NamedExpression.newExprId) + extends LeafExpression + with NamedExpression { + + override def qualifier: Option[String] = None + + override def newInstance(): NamedExpression = +copy(value = new AtomicReference(), exprId = NamedExpression.newExprId) + + override def toAttribute: Attribute = { +AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None) + } + + override def eval(input: InternalRow): Any = value.get + + override def genCode(ctx: CodegenContext): ExprCode = { +val suffix = "_lambda_variable_" + exprId.id +ExprCode( + if (nullable) JavaCode.isNullVariable(s"isNull_${name}$suffix") else FalseLiteral, + JavaCode.variable(s"value_${name}$suffix", dataType)) + } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +throw new IllegalStateException("NamedLambdaVariable.doGenCode should not be called.") + } + + override def toString: String = s"lambda $name#${exprId.id}$typeSuffix" + + override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}" +} + +/** + * A lambda function and its arguments. A lambda function can be hidden when a user wants to + * process an completely independent expression in a [[HigherOrderFunction]], the lambda function + * and its variables are then only used for internal bookkeeping within the higher order function. 
+ */ +case class LambdaFunction( +function: Expression, +arguments: Seq[NamedExpression], +hidden: Boolean = false) + extends Expression { + + override def children: Seq[Expression] = function +: arguments + override def dataType: DataType = function.dataType + override def nullable: Boolean = function.nullable + + lazy val bound: Boolean = arguments.forall(_.resolved) + + override def eval(input: InternalRow): Any = function.eval(input) + + override def genCode(ctx: CodegenContext): ExprCode = { +function.genCode(ctx) + } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +throw new IllegalStateException("LambdaFunction.doGenCode should not be called.") + } +} + +/** + * A higher order function takes one or more (lambda) functions and applies these to some objects. + * The function produces a number of variables which can be consumed by some lambda function. + */ +trait HigherOrderFunction extends Expression { + + override def children: Seq[Expression] = inputs ++ functions + + /** + * Inputs to the higher ordered function. + */ + def inputs: Seq[Expression] + + /** + * All inputs have been resolved. This means that the types and nullabilty of (most of) the + * lambda function arguments is known, and that we can start binding the lambda functions. + */ + lazy val inputResolved: Boolean = inputs.forall(_.resolved) + + /** +
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r207098029 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import java.util.concurrent.atomic.AtomicReference + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} +import org.apache.spark.sql.types._ + +/** + * A named lambda variable. + */ +case class NamedLambdaVariable( +name: String, +dataType: DataType, +nullable: Boolean, +value: AtomicReference[Any] = new AtomicReference(), +exprId: ExprId = NamedExpression.newExprId) + extends LeafExpression + with NamedExpression { + + override def qualifier: Option[String] = None + + override def newInstance(): NamedExpression = +copy(value = new AtomicReference(), exprId = NamedExpression.newExprId) + + override def toAttribute: Attribute = { +AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None) + } + + override def eval(input: InternalRow): Any = value.get + + override def genCode(ctx: CodegenContext): ExprCode = { +val suffix = "_lambda_variable_" + exprId.id +ExprCode( + if (nullable) JavaCode.isNullVariable(s"isNull_${name}$suffix") else FalseLiteral, + JavaCode.variable(s"value_${name}$suffix", dataType)) + } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +throw new IllegalStateException("NamedLambdaVariable.doGenCode should not be called.") + } + + override def toString: String = s"lambda $name#${exprId.id}$typeSuffix" + + override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}" +} + +/** + * A lambda function and its arguments. A lambda function can be hidden when a user wants to + * process an completely independent expression in a [[HigherOrderFunction]], the lambda function + * and its variables are then only used for internal bookkeeping within the higher order function. 
+ */ +case class LambdaFunction( +function: Expression, +arguments: Seq[NamedExpression], +hidden: Boolean = false) + extends Expression { + + override def children: Seq[Expression] = function +: arguments + override def dataType: DataType = function.dataType + override def nullable: Boolean = function.nullable + + lazy val bound: Boolean = arguments.forall(_.resolved) + + override def eval(input: InternalRow): Any = function.eval(input) + + override def genCode(ctx: CodegenContext): ExprCode = { +function.genCode(ctx) + } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +throw new IllegalStateException("LambdaFunction.doGenCode should not be called.") + } +} + +/** + * A higher order function takes one or more (lambda) functions and applies these to some objects. + * The function produces a number of variables which can be consumed by some lambda function. + */ +trait HigherOrderFunction extends Expression { + + override def children: Seq[Expression] = inputs ++ functions + + /** + * Inputs to the higher ordered function. + */ + def inputs: Seq[Expression] + + /** + * All inputs have been resolved. This means that the types and nullabilty of (most of) the + * lambda function arguments is known, and that we can start binding the lambda functions. + */ + lazy val inputResolved: Boolean = inputs.forall(_.resolved) + + /** +
[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/21954

[SPARK-23908][SQL] Add transform function.

## What changes were proposed in this pull request?

This PR adds a `transform` function, which transforms the elements of an array using a lambda function. Optionally, the lambda can take the index of each element as a second argument.

```sql
> SELECT transform(array(1, 2, 3), x -> x + 1);
 array(2, 3, 4)
> SELECT transform(array(1, 2, 3), (x, i) -> x + i);
 array(1, 3, 5)
```

## How was this patch tested?

Added tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark issues/SPARK-23908/transform

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21954.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #21954

commit e860249c96854daefccc580ca52e3fad7d1acf67
Author: Takuya UESHIN
Date: 2018-08-01T03:46:00Z
Add `LambdaFunction` and its parser.

commit 06825b6a3ae6b3085c4b4e5c010dffa75e988801
Author: Takuya UESHIN
Date: 2018-08-01T05:52:40Z
Add `ResolveHigherOrderFunctions`.

commit 17ab2ffc73a664ba2b00d49f0835faff055274b0
Author: Takuya UESHIN
Date: 2018-08-01T05:59:32Z
Add `ArrayTransform`.

commit 95a06b4fe3f660c617a6a53bf05a0a62306b
Author: Takuya UESHIN
Date: 2018-08-01T08:06:01Z
Test in sql/core.

commit 4448d0b7a085c1613dc0dd52009b6d50388ec605
Author: Takuya UESHIN
Date: 2018-08-01T09:20:14Z
Add negative cases.

commit abc685f86ee205f2a64b065c98a65fc2d36bfd75
Author: Takuya UESHIN
Date: 2018-08-01T09:46:00Z
Add sql file.

commit ee450c5ef3f99d3bbf8dbbd05273bc63005bbccb
Author: Takuya UESHIN
Date: 2018-08-01T10:15:44Z
Replace lambda variable in function by one in arguments to make sure the variables are the same as them in arguments.
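For completeness, a hedged usage sketch of the new function from the DataFrame side via `expr` (the column name `xs`, the sample data, and the local `SparkSession` setup are illustrative assumptions, not part of the PR):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

object TransformUsage extends App {
  val spark = SparkSession.builder().master("local[*]").appName("transform-demo").getOrCreate()
  import spark.implicits._

  val df = Seq(Seq(1, 2, 3), Seq(4, 5)).toDF("xs")

  df.select(
      expr("transform(xs, x -> x + 1)").as("plus_one"),      // [2, 3, 4] and [5, 6]
      expr("transform(xs, (x, i) -> x + i)").as("plus_index") // [1, 3, 5] and [4, 6]
    ).show(truncate = false)

  spark.stop()
}
```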