[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-07 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r208282941
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.DataType
+
+/**
+ * Resolve a higher order function from the catalog. This is different from regular function
+ * resolution because lambda functions can only be resolved after the function has been resolved;
+ * so we need to resolve a higher order function when all children are either resolved or a lambda
+ * function.
+ */
+case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    case q: LogicalPlan =>
+      q.transformExpressions {
+        case u @ UnresolvedFunction(fn, children, false)
+            if hasLambdaAndResolvedArguments(children) =>
+          withPosition(u) {
+            catalog.lookupFunction(fn, children) match {
+              case func: HigherOrderFunction => func
+              case other => other.failAnalysis(
+                "A lambda function should only be used in a higher order function. However, " +
+                  s"its class is ${other.getClass.getCanonicalName}, which is not a " +
+                  s"higher order function.")
+            }
+          }
+      }
+  }
+
+  /**
+   * Check if the arguments of a function are either resolved or a lambda function.
+   */
+  private def hasLambdaAndResolvedArguments(expressions: Seq[Expression]): Boolean = {
+    val (lambdas, others) = expressions.partition(_.isInstanceOf[LambdaFunction])
+    lambdas.nonEmpty && others.forall(_.resolved)
+  }
+}
+
+/**
+ * Resolve the lambda variables exposed by a higher order function.
+ *
+ * This rule works in two steps:
+ * [1]. Bind the anonymous variables exposed by the higher order function to the lambda function's
+ *      arguments; this creates named and typed lambda variables. The argument names are checked
+ *      for duplicates and the number of arguments is checked during this step.
+ * [2]. Resolve the lambda variables used in the lambda function's function expression tree.
+ *      Note that we allow the use of variables from outside the current lambda; this can either
+ *      be a lambda function defined in an outer scope, or an attribute produced by the plan's
+ *      child. If names are duplicated, the name defined in the innermost scope is used.
+ */
+case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] {
+
+  type LambdaVariableMap = Map[String, NamedExpression]
+
+  private val canonicalizer = {
+    if (!conf.caseSensitiveAnalysis) {
+      s: String => s.toLowerCase
+    } else {
+      s: String => s
+    }
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.resolveOperators {
+      case q: LogicalPlan =>
+        q.mapExpressions(resolve(_, Map.empty))
+    }
+  }
+
+  /**
+   * Create a bound lambda function by binding the arguments of a lambda function to the given
+   * partial arguments (dataType and nullability only). If the expression happens to be an already
+   * bound lambda function then we assume it has been bound to the correct arguments and do
+   * nothing. This function will produce a lambda function with hidden arguments when it is passed
+   * an arbi
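
The argument check that gates `ResolveHigherOrderFunctions` in the diff above can be illustrated in isolation. What follows is only a minimal sketch: `Expr`, `Literal`, `UnresolvedAttr` and `Lambda` are hypothetical toy stand-ins, not the Catalyst classes, and the object name `LambdaGateSketch` is invented for illustration.

```scala
object LambdaGateSketch {
  // Hypothetical toy expression model; not the Catalyst Expression hierarchy.
  sealed trait Expr { def resolved: Boolean }
  final case class Literal(value: Any) extends Expr { val resolved = true }
  final case class UnresolvedAttr(name: String) extends Expr { val resolved = false }
  final case class Lambda(body: Expr, argNames: Seq[String]) extends Expr {
    // In this toy model a lambda is never resolved at this stage:
    // its variables are only bound by a later step.
    val resolved = false
  }

  // Same shape as the check in the diff: there must be at least one lambda,
  // and every argument that is not a lambda must already be resolved.
  def hasLambdaAndResolvedArguments(args: Seq[Expr]): Boolean = {
    val (lambdas, others) = args.partition(_.isInstanceOf[Lambda])
    lambdas.nonEmpty && others.forall(_.resolved)
  }
}
```

Under these assumptions, a call with one resolved argument and one lambda passes the check, while a call with no lambda at all, or with an unresolved non-lambda argument, is left for a later analysis pass.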

[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-07 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r208282971
  
--- Diff: sql/core/src/test/resources/sql-tests/inputs/higher-order-functions.sql ---
@@ -0,0 +1,26 @@
+create or replace temporary view nested as values
+  (1, array(32, 97), array(array(12, 99), array(123, 42), array(1))),
+  (2, array(77, -76), array(array(6, 96, 65), array(-1, -2))),
+  (3, array(12), array(array(17)))
+  as t(x, ys, zs);
+
+-- Only allow lambdas in higher order functions.
+select upper(x -> x) as v;
+
+-- Identity transform an array
+select transform(zs, z -> z) as v from nested;
+
+-- Transform an array
+select transform(ys, y -> y * y) as v from nested;
+
+-- Transform an array with index
+select transform(ys, (y, i) -> y + i) as v from nested;
+
+-- Transform an array with reference
+select transform(zs, z -> concat(ys, z)) as v from nested;
+
+-- Transform an array to an array of 0's
+select transform(ys, 0) as v from nested;
+
+-- Transform a null array
+select transform(cast(null as array<int>), x -> x + 1) as v;
--- End diff --

Actually we have some at #21965 and #21982.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r208282782
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala ---

[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-07 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r208282200
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+    name: String,
+    dataType: DataType,
+    nullable: Boolean,
+    value: AtomicReference[Any] = new AtomicReference(),
+    exprId: ExprId = NamedExpression.newExprId)
+  extends LeafExpression
+  with NamedExpression
+  with CodegenFallback {
+
+  override def qualifier: Option[String] = None
+
+  override def newInstance(): NamedExpression =
+    copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
+
+  override def toAttribute: Attribute = {
+    AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
+  }
+
+  override def eval(input: InternalRow): Any = value.get
+
+  override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
+
+  override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
+}
+
+/**
+ * A lambda function and its arguments. A lambda function can be hidden when a user wants to
+ * process a completely independent expression in a [[HigherOrderFunction]]; the lambda function
+ * and its variables are then only used for internal bookkeeping within the higher order function.
+ */
+case class LambdaFunction(
+    function: Expression,
+    arguments: Seq[NamedExpression],
+    hidden: Boolean = false)
+  extends Expression with CodegenFallback {
+
+  override def children: Seq[Expression] = function +: arguments
+  override def dataType: DataType = function.dataType
+  override def nullable: Boolean = function.nullable
+
+  lazy val bound: Boolean = arguments.forall(_.resolved)
+
+  override def eval(input: InternalRow): Any = function.eval(input)
+}
+
+/**
+ * A higher order function takes one or more (lambda) functions and applies these to some objects.
+ * The function produces a number of variables which can be consumed by some lambda function.
+ */
+trait HigherOrderFunction extends Expression {
+
+  override def children: Seq[Expression] = inputs ++ functions
+
+  /**
+   * Inputs to the higher order function.
+   */
+  def inputs: Seq[Expression]
+
+  /**
+   * All inputs have been resolved. This means that the types and nullability of (most of) the
+   * lambda function arguments are known, and that we can start binding the lambda functions.
+   */
+  lazy val inputResolved: Boolean = inputs.forall(_.resolved)
+
+  /**
+   * Functions applied by the higher order function.
+   */
+  def functions: Seq[Expression]
+
+  /**
+   * All inputs must be resolved and all functions must be resolved lambda functions.
+   */
+  override lazy val resolved: Boolean = inputResolved && functions.forall {
+    case l: LambdaFunction => l.resolved
+    case _ => false
+  }
+
+  /**
+   * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The
+   * bind function takes the potential lambda and its (partial) arguments and converts this into
+   * a bound lambda function.
+   */
+  def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction
+
+  @transie
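
In the diff above, `NamedLambdaVariable.eval` simply reads `value.get`, which suggests that the enclosing higher order function writes each element into the variable's `AtomicReference` before evaluating the lambda body. The quoted diff is truncated before the evaluation code, so the following is only a sketch of that idea under stated assumptions: `LambdaVar`, `transform` and `LambdaEvalSketch` are hypothetical names, not the PR's API.

```scala
import java.util.concurrent.atomic.AtomicReference

object LambdaEvalSketch {
  // Hypothetical stand-in for a named lambda variable:
  // evaluating it just reads whatever is currently in the slot.
  final class LambdaVar[A](val name: String) {
    val value = new AtomicReference[A]()
    def eval(): A = value.get
  }

  // Hypothetical stand-in for a transform-style higher order function:
  // for each element, set the variable's slot, then evaluate the body,
  // which reads the element back through the variable.
  def transform[A, B](input: Seq[A], elem: LambdaVar[A])(body: () => B): Seq[B] =
    input.map { a =>
      elem.value.set(a)
      body()
    }

  def main(args: Array[String]): Unit = {
    val y = new LambdaVar[Int]("y")
    // Toy counterpart of: select transform(ys, y -> y * y)
    println(transform(Seq(32, 97), y)(() => y.eval() * y.eval()))
  }
}
```

The design point this illustrates is that the lambda body never receives the element as a parameter; it reaches it only through the shared mutable slot, which is why the variable carries its own reference.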

[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-07 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r208281900
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---

[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r208277492
  
--- Diff: sql/core/src/test/resources/sql-tests/inputs/higher-order-functions.sql ---
@@ -0,0 +1,26 @@
+create or replace temporary view nested as values
+  (1, array(32, 97), array(array(12, 99), array(123, 42), array(1))),
+  (2, array(77, -76), array(array(6, 96, 65), array(-1, -2))),
+  (3, array(12), array(array(17)))
+  as t(x, ys, zs);
+
+-- Only allow lambdas in higher order functions.
+select upper(x -> x) as v;
+
+-- Identity transform an array
+select transform(zs, z -> z) as v from nested;
+
+-- Transform an array
+select transform(ys, y -> y * y) as v from nested;
+
+-- Transform an array with index
+select transform(ys, (y, i) -> y + i) as v from nested;
+
+-- Transform an array with reference
+select transform(zs, z -> concat(ys, z)) as v from nested;
+
+-- Transform an array to an array of 0's
+select transform(ys, 0) as v from nested;
+
+-- Transform a null array
+select transform(cast(null as array<int>), x -> x + 1) as v;
--- End diff --

shall we add a test for nested lambda?
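
A nested lambda mainly exercises the scoping rule stated in the `ResolveLambdaVariables` doc comment: names are looked up from the innermost lambda outwards, and anything not bound by a lambda falls back to an attribute of the plan's child. A minimal sketch of that rule, assuming a toy expression model (`Ref`, `Bound`, `Column`, `Call`, `Lambda` and `LambdaScopeSketch` are hypothetical names) and case-insensitive matching, as the rule's `canonicalizer` does when `caseSensitiveAnalysis` is off:

```scala
object LambdaScopeSketch {
  // Hypothetical toy expression model; not the Catalyst classes.
  sealed trait Expr
  final case class Ref(name: String) extends Expr                // unresolved name
  final case class Bound(name: String, depth: Int) extends Expr  // lambda variable, with nesting depth
  final case class Column(name: String) extends Expr             // attribute from the plan's child
  final case class Call(fn: String, args: Seq[Expr]) extends Expr
  final case class Lambda(params: Seq[String], body: Expr) extends Expr

  // scope maps a canonical (lower-cased) name to the depth of the lambda that binds it.
  def resolve(e: Expr, scope: Map[String, Int] = Map.empty, depth: Int = 0): Expr = e match {
    case Lambda(params, body) =>
      val keys = params.map(_.toLowerCase)
      require(keys.distinct.size == keys.size, "duplicate lambda argument names")
      // ++ overwrites existing keys, so inner bindings shadow outer ones.
      Lambda(params, resolve(body, scope ++ keys.map(_ -> (depth + 1)), depth + 1))
    case Ref(name) =>
      // Lambda variables win; otherwise assume the name is a column.
      scope.get(name.toLowerCase).map(Bound(name, _)).getOrElse(Column(name))
    case Call(fn, args) =>
      Call(fn, args.map(resolve(_, scope, depth)))
    case other => other
  }
}
```

In this model, resolving an outer lambda over `z` whose body contains an inner lambda that also binds `z` yields `Bound("z", 1)` for uses in the outer body and `Bound("z", 2)` for uses inside the inner lambda, while a name such as `ys` that no lambda binds becomes `Column("ys")`.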



[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r208276367
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala ---

[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-07 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r208273712
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala ---

[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r208272808
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.DataType
+
+/**
+ * Resolve a higher order functions from the catalog. This is different 
from regular function
+ * resolution because lambda functions can only be resolved after the 
function has been resolved;
+ * so we need to resolve higher order function when all children are 
either resolved or a lambda
+ * function.
+ */
+case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends 
Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperators {
+case q: LogicalPlan =>
+  q.transformExpressions {
+case u @ UnresolvedFunction(fn, children, false)
+if hasLambdaAndResolvedArguments(children) =>
+  withPosition(u) {
+catalog.lookupFunction(fn, children) match {
+  case func: HigherOrderFunction => func
+  case other => other.failAnalysis(
+"A lambda function should only be used in a higher order function. However, " +
+  s"its class is ${other.getClass.getCanonicalName}, which is not a " +
+  s"higher order function.")
+}
+  }
+  }
+  }
+
+  /**
+   * Check if the arguments of a function are either resolved or a lambda function.
+   */
+  private def hasLambdaAndResolvedArguments(expressions: Seq[Expression]): Boolean = {
+val (lambdas, others) = expressions.partition(_.isInstanceOf[LambdaFunction])
+lambdas.nonEmpty && others.forall(_.resolved)
+  }
+}
+
+/**
+ * Resolve the lambda variables exposed by a higher order functions.
+ *
+ * This rule works in two steps:
+ * [1]. Bind the anonymous variables exposed by the higher order function to the lambda function's
+ *  arguments; this creates named and typed lambda variables. The argument names are checked
+ *  for duplicates and the number of arguments are checked during this step.
+ * [2]. Resolve the used lambda variables used in the lambda function's function expression tree.
+ *  Note that we allow the use of variables from outside the current lambda, this can either
+ *  be a lambda function defined in an outer scope, or a attribute in produced by the plan's
+ *  child. If names are duplicate, the name defined in the most inner scope is used.
+ */
+case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] {
+
+  type LambdaVariableMap = Map[String, NamedExpression]
+
+  private val canonicalizer = {
+if (!conf.caseSensitiveAnalysis) {
+  s: String => s.toLowerCase
+} else {
+  s: String => s
+}
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+plan.resolveOperators {
+  case q: LogicalPlan =>
+q.mapExpressions(resolve(_, Map.empty))
+}
+  }
+
+  /**
+   * Create a bound lambda function by binding the arguments of a lambda function to the given
+   * partial arguments (dataType and nullability only). If the expression happens to be an already
+   * bound lambda function then we assume it has been bound to the correct arguments and do
+   * nothing. This function will produce a lambda function with hidden arguments when it is passed
+   * an a

[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r208269300
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+name: String,
+dataType: DataType,
+nullable: Boolean,
+value: AtomicReference[Any] = new AtomicReference(),
+exprId: ExprId = NamedExpression.newExprId)
+  extends LeafExpression
+  with NamedExpression
+  with CodegenFallback {
+
+  override def qualifier: Option[String] = None
+
+  override def newInstance(): NamedExpression =
+copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
+
+  override def toAttribute: Attribute = {
+AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
+  }
+
+  override def eval(input: InternalRow): Any = value.get
+
+  override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
+
+  override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
+}
+
+/**
+ * A lambda function and its arguments. A lambda function can be hidden when a user wants to
+ * process an completely independent expression in a [[HigherOrderFunction]], the lambda function
+ * and its variables are then only used for internal bookkeeping within the higher order function.
+ */
+case class LambdaFunction(
+function: Expression,
+arguments: Seq[NamedExpression],
+hidden: Boolean = false)
+  extends Expression with CodegenFallback {
+
+  override def children: Seq[Expression] = function +: arguments
+  override def dataType: DataType = function.dataType
+  override def nullable: Boolean = function.nullable
+
+  lazy val bound: Boolean = arguments.forall(_.resolved)
+
+  override def eval(input: InternalRow): Any = function.eval(input)
+}
+
+/**
+ * A higher order function takes one or more (lambda) functions and applies these to some objects.
+ * The function produces a number of variables which can be consumed by some lambda function.
+ */
+trait HigherOrderFunction extends Expression {
+
+  override def children: Seq[Expression] = inputs ++ functions
+
+  /**
+   * Inputs to the higher ordered function.
+   */
+  def inputs: Seq[Expression]
+
+  /**
+   * All inputs have been resolved. This means that the types and nullabilty of (most of) the
+   * lambda function arguments is known, and that we can start binding the lambda functions.
+   */
+  lazy val inputResolved: Boolean = inputs.forall(_.resolved)
+
+  /**
+   * Functions applied by the higher order function.
+   */
+  def functions: Seq[Expression]
+
+  /**
+   * All inputs must be resolved and all functions must be resolved lambda functions.
+   */
+  override lazy val resolved: Boolean = inputResolved && functions.forall {
+case l: LambdaFunction => l.resolved
+case _ => false
+  }
+
+  /**
+   * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The
+   * bind function takes the potential lambda and it's (partial) arguments and converts this into
+   * a bound lambda function.
+   */
+  def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction
+
+  @tran

[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r208268129
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+name: String,
+dataType: DataType,
+nullable: Boolean,
+value: AtomicReference[Any] = new AtomicReference(),
+exprId: ExprId = NamedExpression.newExprId)
+  extends LeafExpression
+  with NamedExpression
+  with CodegenFallback {
+
+  override def qualifier: Option[String] = None
+
+  override def newInstance(): NamedExpression =
+copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
+
+  override def toAttribute: Attribute = {
+AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
+  }
+
+  override def eval(input: InternalRow): Any = value.get
+
+  override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
+
+  override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
+}
+
+/**
+ * A lambda function and its arguments. A lambda function can be hidden when a user wants to
+ * process an completely independent expression in a [[HigherOrderFunction]], the lambda function
+ * and its variables are then only used for internal bookkeeping within the higher order function.
+ */
+case class LambdaFunction(
+function: Expression,
+arguments: Seq[NamedExpression],
+hidden: Boolean = false)
+  extends Expression with CodegenFallback {
+
+  override def children: Seq[Expression] = function +: arguments
+  override def dataType: DataType = function.dataType
+  override def nullable: Boolean = function.nullable
+
+  lazy val bound: Boolean = arguments.forall(_.resolved)
+
+  override def eval(input: InternalRow): Any = function.eval(input)
+}
+
+/**
+ * A higher order function takes one or more (lambda) functions and applies these to some objects.
+ * The function produces a number of variables which can be consumed by some lambda function.
+ */
+trait HigherOrderFunction extends Expression {
+
+  override def children: Seq[Expression] = inputs ++ functions
+
+  /**
+   * Inputs to the higher ordered function.
+   */
+  def inputs: Seq[Expression]
+
+  /**
+   * All inputs have been resolved. This means that the types and nullabilty of (most of) the
+   * lambda function arguments is known, and that we can start binding the lambda functions.
+   */
+  lazy val inputResolved: Boolean = inputs.forall(_.resolved)
+
+  /**
+   * Functions applied by the higher order function.
+   */
+  def functions: Seq[Expression]
+
+  /**
+   * All inputs must be resolved and all functions must be resolved lambda functions.
+   */
+  override lazy val resolved: Boolean = inputResolved && functions.forall {
+case l: LambdaFunction => l.resolved
+case _ => false
+  }
+
+  /**
+   * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The
+   * bind function takes the potential lambda and it's (partial) arguments and converts this into
+   * a bound lambda function.
+   */
+  def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction
+
+  @tran

[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r208267632
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+name: String,
+dataType: DataType,
+nullable: Boolean,
+value: AtomicReference[Any] = new AtomicReference(),
+exprId: ExprId = NamedExpression.newExprId)
+  extends LeafExpression
+  with NamedExpression
+  with CodegenFallback {
+
+  override def qualifier: Option[String] = None
+
+  override def newInstance(): NamedExpression =
+copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
+
+  override def toAttribute: Attribute = {
+AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
+  }
+
+  override def eval(input: InternalRow): Any = value.get
+
+  override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
+
+  override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
+}
+
+/**
+ * A lambda function and its arguments. A lambda function can be hidden when a user wants to
+ * process an completely independent expression in a [[HigherOrderFunction]], the lambda function
+ * and its variables are then only used for internal bookkeeping within the higher order function.
+ */
+case class LambdaFunction(
+function: Expression,
+arguments: Seq[NamedExpression],
+hidden: Boolean = false)
+  extends Expression with CodegenFallback {
+
+  override def children: Seq[Expression] = function +: arguments
+  override def dataType: DataType = function.dataType
+  override def nullable: Boolean = function.nullable
+
+  lazy val bound: Boolean = arguments.forall(_.resolved)
+
+  override def eval(input: InternalRow): Any = function.eval(input)
+}
+
+/**
+ * A higher order function takes one or more (lambda) functions and applies these to some objects.
+ * The function produces a number of variables which can be consumed by some lambda function.
+ */
+trait HigherOrderFunction extends Expression {
+
+  override def children: Seq[Expression] = inputs ++ functions
+
+  /**
+   * Inputs to the higher ordered function.
+   */
+  def inputs: Seq[Expression]
+
+  /**
+   * All inputs have been resolved. This means that the types and nullabilty of (most of) the
+   * lambda function arguments is known, and that we can start binding the lambda functions.
+   */
+  lazy val inputResolved: Boolean = inputs.forall(_.resolved)
+
+  /**
+   * Functions applied by the higher order function.
+   */
+  def functions: Seq[Expression]
--- End diff --

It seems each function must be a `LambdaFunction`; why don't we enforce that
at the type level?
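
For illustration, the trade-off behind this question can be sketched with simplified stand-in types. `Expr`, `Lambda`, `UntypedTransform`, and `TypedTransform` are hypothetical names invented for this sketch, not the Catalyst classes from the PR. The first trait mirrors the quoted `functions: Seq[Expression]` with a runtime match in `resolved`; the second narrows the element type so that a non-lambda child no longer compiles.

```scala
object TypedFunctionsSketch {
  // Simplified stand-ins for the Catalyst types (assumption: NOT the real Spark classes).
  sealed trait Expr { def resolved: Boolean }
  final case class Literal(value: Any) extends Expr {
    def resolved: Boolean = true
  }
  final case class Lambda(body: Expr, argNames: Seq[String]) extends Expr {
    def resolved: Boolean = body.resolved
  }

  // Shape quoted in the diff: functions are plain expressions, so `resolved`
  // must pattern match and reject non-lambda children at runtime.
  trait UntypedHigherOrder {
    def inputs: Seq[Expr]
    def functions: Seq[Expr]
    def resolved: Boolean = inputs.forall(_.resolved) && functions.forall {
      case l: Lambda => l.resolved
      case _ => false
    }
  }

  // Shape suggested in the review: the element type is Lambda, so passing
  // anything else is a compile error and the runtime match disappears.
  trait TypedHigherOrder {
    def inputs: Seq[Expr]
    def functions: Seq[Lambda]
    def resolved: Boolean = inputs.forall(_.resolved) && functions.forall(_.resolved)
  }

  final case class UntypedTransform(input: Expr, function: Expr) extends UntypedHigherOrder {
    def inputs: Seq[Expr] = Seq(input)
    def functions: Seq[Expr] = Seq(function)
  }

  final case class TypedTransform(input: Expr, function: Lambda) extends TypedHigherOrder {
    def inputs: Seq[Expr] = Seq(input)
    def functions: Seq[Lambda] = Seq(function)
  }
}
```

The quoted `bind` doc comment speaks of a "potential lambda", so the PR may deliberately accept a plain `Expression` before binding; the sketch does not model that step.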


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r208266333
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+name: String,
+dataType: DataType,
+nullable: Boolean,
+value: AtomicReference[Any] = new AtomicReference(),
+exprId: ExprId = NamedExpression.newExprId)
+  extends LeafExpression
+  with NamedExpression
+  with CodegenFallback {
+
+  override def qualifier: Option[String] = None
+
+  override def newInstance(): NamedExpression =
+copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
+
+  override def toAttribute: Attribute = {
+AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
+  }
+
+  override def eval(input: InternalRow): Any = value.get
+
+  override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
+
+  override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
+}
+
+/**
+ * A lambda function and its arguments. A lambda function can be hidden when a user wants to
+ * process an completely independent expression in a [[HigherOrderFunction]], the lambda function
+ * and its variables are then only used for internal bookkeeping within the higher order function.
+ */
+case class LambdaFunction(
+function: Expression,
+arguments: Seq[NamedExpression],
+hidden: Boolean = false)
+  extends Expression with CodegenFallback {
+
+  override def children: Seq[Expression] = function +: arguments
+  override def dataType: DataType = function.dataType
+  override def nullable: Boolean = function.nullable
+
+  lazy val bound: Boolean = arguments.forall(_.resolved)
+
+  override def eval(input: InternalRow): Any = function.eval(input)
+}
+
+/**
+ * A higher order function takes one or more (lambda) functions and applies these to some objects.
+ * The function produces a number of variables which can be consumed by some lambda function.
+ */
+trait HigherOrderFunction extends Expression {
+
+  override def children: Seq[Expression] = inputs ++ functions
+
+  /**
+   * Inputs to the higher ordered function.
+   */
+  def inputs: Seq[Expression]
+
+  /**
+   * All inputs have been resolved. This means that the types and nullabilty of (most of) the
+   * lambda function arguments is known, and that we can start binding the lambda functions.
+   */
+  lazy val inputResolved: Boolean = inputs.forall(_.resolved)
+
+  /**
+   * Functions applied by the higher order function.
+   */
+  def functions: Seq[Expression]
+
+  /**
+   * All inputs must be resolved and all functions must be resolved lambda functions.
+   */
+  override lazy val resolved: Boolean = inputResolved && functions.forall {
+case l: LambdaFunction => l.resolved
+case _ => false
+  }
+
+  /**
+   * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The
+   * bind function takes the potential lambda and it's (partial) arguments and converts this into
+   * a bound lambda function.
+   */
+  def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction
+
+  @tran

[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21954





[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-02 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r207172497
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+name: String,
+dataType: DataType,
+nullable: Boolean,
+value: AtomicReference[Any] = new AtomicReference(),
--- End diff --

I see. Thanks.





[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-02 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r207171941
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+name: String,
+dataType: DataType,
+nullable: Boolean,
+value: AtomicReference[Any] = new AtomicReference(),
--- End diff --

Yeah, that makes sense. Let's leave it for now.





[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-02 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r207169967
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+name: String,
+dataType: DataType,
+nullable: Boolean,
+value: AtomicReference[Any] = new AtomicReference(),
--- End diff --

Hmm, it seems that overriding `fastEquals` is not enough.





[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-02 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r207162371
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+name: String,
+dataType: DataType,
+nullable: Boolean,
+value: AtomicReference[Any] = new AtomicReference(),
--- End diff --

Ah, maybe I should override `fastEquals` instead of using `AtomicReference`?





[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-02 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r207162167
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+name: String,
+dataType: DataType,
+nullable: Boolean,
+value: AtomicReference[Any] = new AtomicReference(),
--- End diff --

When I tried to make copies of the `NamedLambdaVariable`s, `transformUp`
didn't replace the variables, which generated wrong results.
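
One plausible reading of this comment, sketched with simplified stand-in classes rather than the real Catalyst ones: if a tree transform keeps the original node whenever the rewritten node compares equal, a fresh copy of a purely structural case class is silently dropped. Adding an `AtomicReference` field changes that, because `AtomicReference` does not override `equals`, so two variables are equal only when they share the same slot. `PlainVar`, `SlotVar`, and `replaceIfChanged` are hypothetical names, and the "keep the original when equal" rule is an assumption about the transform, not something stated in the thread.

```scala
import java.util.concurrent.atomic.AtomicReference

object LambdaVariableEqualitySketch {
  // Stand-in variable with purely structural equality (assumption: not a Spark class).
  final case class PlainVar(name: String)

  // Stand-in variable carrying a mutable slot. AtomicReference uses reference
  // equality, so two SlotVars are equal only if they share the same slot object.
  final case class SlotVar(
      name: String,
      value: AtomicReference[Any] = new AtomicReference[Any]())

  // Hypothetical transform step: keep the original node when the rewritten
  // node compares equal, otherwise take the rewritten node.
  def replaceIfChanged[A <: AnyRef](original: A, rewritten: A): A =
    if (original == rewritten) original else rewritten
}
```

In this model, `replaceIfChanged(PlainVar("x"), PlainVar("x"))` returns the original instance, while two `SlotVar("x")` values built separately are unequal and the rewritten one is kept. A `copy` that does not touch `value` shares the slot, so a value set through one instance is visible through the other.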





[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-02 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r207158636
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+name: String,
+dataType: DataType,
+nullable: Boolean,
+value: AtomicReference[Any] = new AtomicReference(),
--- End diff --

You did? Could you elaborate? There shouldn't be any concurrent access here.


---




[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-02 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r207148555
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+name: String,
+dataType: DataType,
+nullable: Boolean,
+value: AtomicReference[Any] = new AtomicReference(),
--- End diff --

Actually, it is also used when creating `functionsForEval`. I needed it for 
`transformUp` to work properly.


---




[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-02 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r207145478
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+name: String,
+dataType: DataType,
+nullable: Boolean,
+value: AtomicReference[Any] = new AtomicReference(),
--- End diff --

You are only using the `AtomicReference` as a container, right?
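
To illustrate what "container" could mean here, the following is a minimal sketch and not the PR's code: a variable reads from a mutable slot, and the enclosing function fills that slot before each evaluation of the lambda body. `Slot`, `transformInts`, and `body` are hypothetical names introduced only for this example.

```scala
import java.util.concurrent.atomic.AtomicReference

object ContainerSketch {
  // Hypothetical stand-in for a lambda variable: evaluation just reads the slot.
  final class Slot(val value: AtomicReference[Any] = new AtomicReference[Any]()) {
    def eval(): Any = value.get
  }

  // Evaluate `body` once per element, writing the element into the slot first.
  def transformInts(xs: Seq[Int], slot: Slot)(body: () => Any): Seq[Any] =
    xs.map { x =>
      // Single-threaded use: the reference serves only as a mutable box.
      slot.value.set(x)
      body()
    }
}
```

A caller would create one `Slot`, build a body that calls `slot.eval()`, and pass both to `transformInts`; no atomic operations beyond `set` and `get` are involved.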


---




[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-02 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r207143138
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+name: String,
+dataType: DataType,
+nullable: Boolean,
+value: AtomicReference[Any] = new AtomicReference(),
+exprId: ExprId = NamedExpression.newExprId)
+  extends LeafExpression
+  with NamedExpression
+  with CodegenFallback {
+
+  override def qualifier: Option[String] = None
+
+  override def newInstance(): NamedExpression =
+copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
+
+  override def toAttribute: Attribute = {
+AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
+  }
+
+  override def eval(input: InternalRow): Any = value.get
+
+  override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
+
+  override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
+}
+
+/**
+ * A lambda function and its arguments. A lambda function can be hidden when a user wants to
+ * process an completely independent expression in a [[HigherOrderFunction]], the lambda function
+ * and its variables are then only used for internal bookkeeping within the higher order function.
+ */
+case class LambdaFunction(
+function: Expression,
+arguments: Seq[NamedExpression],
+hidden: Boolean = false)
+  extends Expression with CodegenFallback {
+
+  override def children: Seq[Expression] = function +: arguments
+  override def dataType: DataType = function.dataType
+  override def nullable: Boolean = function.nullable
+
+  lazy val bound: Boolean = arguments.forall(_.resolved)
+
+  override def eval(input: InternalRow): Any = function.eval(input)
+}
+
+/**
+ * A higher order function takes one or more (lambda) functions and applies these to some objects.
+ * The function produces a number of variables which can be consumed by some lambda function.
+ */
+trait HigherOrderFunction extends Expression {
+
+  override def children: Seq[Expression] = inputs ++ functions
+
+  /**
+   * Inputs to the higher ordered function.
+   */
+  def inputs: Seq[Expression]
+
+  /**
+   * All inputs have been resolved. This means that the types and nullabilty of (most of) the
+   * lambda function arguments is known, and that we can start binding the lambda functions.
+   */
+  lazy val inputResolved: Boolean = inputs.forall(_.resolved)
+
+  /**
+   * Functions applied by the higher order function.
+   */
+  def functions: Seq[Expression]
+
+  /**
+   * All inputs must be resolved and all functions must be resolved lambda functions.
+   */
+  override lazy val resolved: Boolean = inputResolved && functions.forall {
+case l: LambdaFunction => l.resolved
+case _ => false
+  }
+
+  /**
+   * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The
+   * bind function takes the potential lambda and it's (partial) arguments and converts this into
+   * a bound lambda function.
+   */
+  def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction
+
+  @transie

[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-02 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r207141916
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+name: String,
+dataType: DataType,
+nullable: Boolean,
+value: AtomicReference[Any] = new AtomicReference(),
+exprId: ExprId = NamedExpression.newExprId)
+  extends LeafExpression
+  with NamedExpression
+  with CodegenFallback {
+
+  override def qualifier: Option[String] = None
+
+  override def newInstance(): NamedExpression =
+copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
+
+  override def toAttribute: Attribute = {
+AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
+  }
+
+  override def eval(input: InternalRow): Any = value.get
+
+  override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
+
+  override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
+}
+
+/**
+ * A lambda function and its arguments. A lambda function can be hidden when a user wants to
+ * process an completely independent expression in a [[HigherOrderFunction]], the lambda function
+ * and its variables are then only used for internal bookkeeping within the higher order function.
+ */
+case class LambdaFunction(
+function: Expression,
+arguments: Seq[NamedExpression],
+hidden: Boolean = false)
+  extends Expression with CodegenFallback {
+
+  override def children: Seq[Expression] = function +: arguments
+  override def dataType: DataType = function.dataType
+  override def nullable: Boolean = function.nullable
+
+  lazy val bound: Boolean = arguments.forall(_.resolved)
+
+  override def eval(input: InternalRow): Any = function.eval(input)
+}
+
+/**
+ * A higher order function takes one or more (lambda) functions and applies these to some objects.
+ * The function produces a number of variables which can be consumed by some lambda function.
+ */
+trait HigherOrderFunction extends Expression {
+
+  override def children: Seq[Expression] = inputs ++ functions
+
+  /**
+   * Inputs to the higher ordered function.
+   */
+  def inputs: Seq[Expression]
+
+  /**
+   * All inputs have been resolved. This means that the types and nullabilty of (most of) the
+   * lambda function arguments is known, and that we can start binding the lambda functions.
+   */
+  lazy val inputResolved: Boolean = inputs.forall(_.resolved)
+
+  /**
+   * Functions applied by the higher order function.
+   */
+  def functions: Seq[Expression]
+
+  /**
+   * All inputs must be resolved and all functions must be resolved lambda functions.
+   */
+  override lazy val resolved: Boolean = inputResolved && functions.forall {
+case l: LambdaFunction => l.resolved
+case _ => false
+  }
+
+  /**
+   * Bind the lambda functions to the [[HigherOrderFunction]] using the given bind function. The
+   * bind function takes the potential lambda and it's (partial) arguments and converts this into
+   * a bound lambda function.
+   */
+  def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): HigherOrderFunction
+
+  @tra

[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-01 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r207102738
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+name: String,
+dataType: DataType,
+nullable: Boolean,
+value: AtomicReference[Any] = new AtomicReference(),
+exprId: ExprId = NamedExpression.newExprId)
+  extends LeafExpression
+  with NamedExpression {
+
+  override def qualifier: Option[String] = None
+
+  override def newInstance(): NamedExpression =
+copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
+
+  override def toAttribute: Attribute = {
+AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
+  }
+
+  override def eval(input: InternalRow): Any = value.get
+
+  override def genCode(ctx: CodegenContext): ExprCode = {
+val suffix = "_lambda_variable_" + exprId.id
+ExprCode(
+  if (nullable) JavaCode.isNullVariable(s"isNull_${name}$suffix") else FalseLiteral,
+  JavaCode.variable(s"value_${name}$suffix", dataType))
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+throw new IllegalStateException("NamedLambdaVariable.doGenCode should not be called.")
+  }
+
+  override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
+
+  override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
+}
+
+/**
+ * A lambda function and its arguments. A lambda function can be hidden when a user wants to
+ * process an completely independent expression in a [[HigherOrderFunction]], the lambda function
+ * and its variables are then only used for internal bookkeeping within the higher order function.
+ */
+case class LambdaFunction(
+function: Expression,
+arguments: Seq[NamedExpression],
+hidden: Boolean = false)
+  extends Expression {
+
+  override def children: Seq[Expression] = function +: arguments
+  override def dataType: DataType = function.dataType
+  override def nullable: Boolean = function.nullable
+
+  lazy val bound: Boolean = arguments.forall(_.resolved)
+
+  override def eval(input: InternalRow): Any = function.eval(input)
+
+  override def genCode(ctx: CodegenContext): ExprCode = {
+function.genCode(ctx)
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+throw new IllegalStateException("LambdaFunction.doGenCode should not be called.")
+  }
+}
+
+/**
+ * A higher order function takes one or more (lambda) functions and applies these to some objects.
+ * The function produces a number of variables which can be consumed by some lambda function.
+ */
+trait HigherOrderFunction extends Expression {
+
+  override def children: Seq[Expression] = inputs ++ functions
+
+  /**
+   * Inputs to the higher ordered function.
+   */
+  def inputs: Seq[Expression]
+
+  /**
+   * All inputs have been resolved. This means that the types and nullabilty of (most of) the
+   * lambda function arguments is known, and that we can start binding the lambda functions.
+   */
+  lazy val inputResolved: Boolean = inputs.forall(_.resolved)
+
+  /**
+   

[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-01 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r207098029
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+name: String,
+dataType: DataType,
+nullable: Boolean,
+value: AtomicReference[Any] = new AtomicReference(),
+exprId: ExprId = NamedExpression.newExprId)
+  extends LeafExpression
+  with NamedExpression {
+
+  override def qualifier: Option[String] = None
+
+  override def newInstance(): NamedExpression =
+copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
+
+  override def toAttribute: Attribute = {
+AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, None)
+  }
+
+  override def eval(input: InternalRow): Any = value.get
+
+  override def genCode(ctx: CodegenContext): ExprCode = {
+val suffix = "_lambda_variable_" + exprId.id
+ExprCode(
+  if (nullable) JavaCode.isNullVariable(s"isNull_${name}$suffix") else FalseLiteral,
+  JavaCode.variable(s"value_${name}$suffix", dataType))
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+throw new IllegalStateException("NamedLambdaVariable.doGenCode should not be called.")
+  }
+
+  override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
+
+  override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
+}
+
+/**
+ * A lambda function and its arguments. A lambda function can be hidden when a user wants to
+ * process an completely independent expression in a [[HigherOrderFunction]], the lambda function
+ * and its variables are then only used for internal bookkeeping within the higher order function.
+ */
+case class LambdaFunction(
+function: Expression,
+arguments: Seq[NamedExpression],
+hidden: Boolean = false)
+  extends Expression {
+
+  override def children: Seq[Expression] = function +: arguments
+  override def dataType: DataType = function.dataType
+  override def nullable: Boolean = function.nullable
+
+  lazy val bound: Boolean = arguments.forall(_.resolved)
+
+  override def eval(input: InternalRow): Any = function.eval(input)
+
+  override def genCode(ctx: CodegenContext): ExprCode = {
+function.genCode(ctx)
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+throw new IllegalStateException("LambdaFunction.doGenCode should not be called.")
+  }
+}
+
+/**
+ * A higher order function takes one or more (lambda) functions and applies these to some objects.
+ * The function produces a number of variables which can be consumed by some lambda function.
+ */
+trait HigherOrderFunction extends Expression {
+
+  override def children: Seq[Expression] = inputs ++ functions
+
+  /**
+   * Inputs to the higher ordered function.
+   */
+  def inputs: Seq[Expression]
+
+  /**
+   * All inputs have been resolved. This means that the types and nullabilty of (most of) the
+   * lambda function arguments is known, and that we can start binding the lambda functions.
+   */
+  lazy val inputResolved: Boolean = inputs.forall(_.resolved)
+
+  /**
+   

[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-01 Thread ueshin
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/21954

[SPARK-23908][SQL] Add transform function.

## What changes were proposed in this pull request?

This PR adds a `transform` function, which transforms the elements of an array 
using the given function.
Optionally, the function can take the index of each element as a second argument.

```sql
> SELECT transform(array(1, 2, 3), x -> x + 1);
 array(2, 3, 4)
> SELECT transform(array(1, 2, 3), (x, i) -> x + i);
 array(1, 3, 5)
```
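
For readers more familiar with Scala collections, the two forms above behave roughly like `map` and `zipWithIndex.map`. The sketch below uses plain Scala collections only; `transform1` and `transform2` are hypothetical names for illustration and are not part of Spark.

```scala
object TransformSketch {
  // One-argument form: x -> f(x)
  def transform1[A, B](xs: Seq[A])(f: A => B): Seq[B] = xs.map(f)

  // Two-argument form: (x, i) -> f(x, i), where i is the 0-based index of x
  def transform2[A, B](xs: Seq[A])(f: (A, Int) => B): Seq[B] =
    xs.zipWithIndex.map { case (x, i) => f(x, i) }
}
```

With `Seq(1, 2, 3)` as input, `transform1(...)(x => x + 1)` and `transform2(...)((x, i) => x + i)` mirror the two SQL queries shown above.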

## How was this patch tested?

Added tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark issues/SPARK-23908/transform

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21954.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21954


commit e860249c96854daefccc580ca52e3fad7d1acf67
Author: Takuya UESHIN 
Date:   2018-08-01T03:46:00Z

Add `LambdaFunction` and its parser.

commit 06825b6a3ae6b3085c4b4e5c010dffa75e988801
Author: Takuya UESHIN 
Date:   2018-08-01T05:52:40Z

Add `ResolveHigherOrderFunctions`.

commit 17ab2ffc73a664ba2b00d49f0835faff055274b0
Author: Takuya UESHIN 
Date:   2018-08-01T05:59:32Z

Add `ArrayTransform`.

commit 95a06b4fe3f660c617a6a53bf05a0a62306b
Author: Takuya UESHIN 
Date:   2018-08-01T08:06:01Z

Test in sql/core.

commit 4448d0b7a085c1613dc0dd52009b6d50388ec605
Author: Takuya UESHIN 
Date:   2018-08-01T09:20:14Z

Add negative cases.

commit abc685f86ee205f2a64b065c98a65fc2d36bfd75
Author: Takuya UESHIN 
Date:   2018-08-01T09:46:00Z

Add sql file.

commit ee450c5ef3f99d3bbf8dbbd05273bc63005bbccb
Author: Takuya UESHIN 
Date:   2018-08-01T10:15:44Z

Replace lambda variable in function by one in arguments to make sure the 
variables are the same as them in arguments.




---
