ueshin commented on a change in pull request #24232: [SPARK-27297] [SQL] Add 
higher order functions to scala API
URL: https://github.com/apache/spark/pull/24232#discussion_r330283268
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/functions.scala
 ##########
 @@ -3385,6 +3385,162 @@ object functions {
     ArrayExcept(col1.expr, col2.expr)
   }
 
+  private def createLambda(f: Column => Column) = {
+    val x = UnresolvedNamedLambdaVariable(Seq("x"))
+    val function = f(Column(x)).expr
+    LambdaFunction(function, Seq(x))
+  }
+
+  private def createLambda(f: (Column, Column) => Column) = {
+    val x = UnresolvedNamedLambdaVariable(Seq("x"))
+    val y = UnresolvedNamedLambdaVariable(Seq("y"))
+    val function = f(Column(x), Column(y)).expr
+    LambdaFunction(function, Seq(x, y))
+  }
+
+  private def createLambda(f: (Column, Column, Column) => Column) = {
+    val x = UnresolvedNamedLambdaVariable(Seq("x"))
+    val y = UnresolvedNamedLambdaVariable(Seq("y"))
+    val z = UnresolvedNamedLambdaVariable(Seq("z"))
+    val function = f(Column(x), Column(y), Column(z)).expr
+    LambdaFunction(function, Seq(x, y, z))
+  }
+
+  /**
+   * Returns an array of elements after applying a transformation to each 
element
+   * in the input array.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def transform(column: Column, f: Column => Column): Column = withExpr {
+    ArrayTransform(column.expr, createLambda(f))
+  }
+
+  /**
+   * Returns an array of elements after applying a transformation to each 
element
+   * in the input array.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def transform(column: Column, f: (Column, Column) => Column): Column = 
withExpr {
+    ArrayTransform(column.expr, createLambda(f))
+  }
+
+  /**
+   * Returns whether a predicate holds for one or more elements in the array.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def exists(column: Column, f: Column => Column): Column = withExpr {
+    ArrayExists(column.expr, createLambda(f))
+  }
+
+  /**
+   * Returns whether a predicate holds for every element in the array.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def forall(column: Column, f: Column => Column): Column = withExpr {
+    ArrayForAll(column.expr, createLambda(f))
+  }
+
+  /**
+   * Returns an array of elements for which a predicate holds in a given array.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def filter(column: Column, f: Column => Column): Column = withExpr {
+    ArrayFilter(column.expr, createLambda(f))
+  }
+
+  /**
+   * Applies a binary operator to an initial state and all elements in the 
array,
+   * and reduces this to a single state. The final state is converted into the 
final result
+   * by applying a finish function.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def aggregate(expr: Column, zero: Column, merge: (Column, Column) => Column,
+                finish: Column => Column): Column = withExpr {
+    ArrayAggregate(
+      expr.expr,
+      zero.expr,
+      createLambda(merge),
+      createLambda(finish)
+    )
+  }
+
+  /**
+   * Applies a binary operator to an initial state and all elements in the 
array,
+   * and reduces this to a single state.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def aggregate(expr: Column, zero: Column, merge: (Column, Column) => 
Column): Column =
+    aggregate(expr, zero, merge, c => c)
+
+  /**
+   * Merge two given arrays, element-wise, into a single array using a 
function.
+   * If one array is shorter, nulls are appended at the end to match the 
length of the longer
+   * array, before applying the function.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def zip_with(left: Column, right: Column, f: (Column, Column) => Column): 
Column = withExpr {
+    ZipWith(left.expr, right.expr, createLambda(f))
+  }
+
+  /**
+   * Applies a function to every key-value pair in a map and returns
+   * a map with the results of those applications as the new keys for the 
pairs.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def transform_keys(expr: Column, f: (Column, Column) => Column): Column = 
withExpr {
+    TransformKeys(expr.expr, createLambda(f))
+  }
+
+  /**
+   * Applies a function to every key-value pair in a map and returns
+   * a map with the results of those applications as the new values for the 
pairs.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def transform_values(expr: Column, f: (Column, Column) => Column): Column = 
withExpr {
+    TransformValues(expr.expr, createLambda(f))
+  }
+
+  /**
+   * Returns a map whose key-value pairs satisfy a predicate.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def map_filter(expr: Column, f: (Column, Column) => Column): Column = 
withExpr {
+    MapFilter(expr.expr, createLambda(f))
+  }
+
+  /**
+   * Merge two given maps, key-wise, into a single map using a function.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def map_zip_with(left: Column, right: Column,
+                   f: (Column, Column, Column) => Column): Column = withExpr {
 
 Review comment:
   ditto.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to