spark git commit: [SPARK-16286][SQL] Implement stack table generating function

2016-07-07 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 e32c29d86 -> 565e18cf7


[SPARK-16286][SQL] Implement stack table generating function

This PR implements the `stack` table generating function.

Passes the Jenkins tests, including new test cases.
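
For context, a minimal usage sketch (not part of the commit; assumes a Spark
2.x spark-shell where `spark` is the SparkSession):

// stack(n, v1, ..., vk) reshapes the k values into n rows of ceil(k/n)
// columns, padding the last row with NULLs when k is not a multiple of n.
// Column names default to col0, col1, ... (see elementSchema in the diff).
spark.sql("SELECT stack(2, 1, 2, 3)").show()
// +----+----+
// |col0|col1|
// +----+----+
// |   1|   2|
// |   3|null|
// +----+----+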

Author: Dongjoon Hyun 

Closes #14033 from dongjoon-hyun/SPARK-16286.

(cherry picked from commit d0d28507cacfca5919dbfb4269892d58b62e8662)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/565e18cf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/565e18cf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/565e18cf

Branch: refs/heads/branch-2.0
Commit: 565e18cf7670231b1fa9db84f907654da79e6cef
Parents: e32c29d
Author: Dongjoon Hyun 
Authored: Wed Jul 6 10:54:43 2016 +0800
Committer: Reynold Xin 
Committed: Thu Jul 7 21:09:09 2016 -0700

--
 .../catalyst/analysis/FunctionRegistry.scala|  1 +
 .../sql/catalyst/expressions/generators.scala   | 53 
 .../expressions/GeneratorExpressionSuite.scala  | 18 +++
 .../spark/sql/GeneratorFunctionSuite.scala  | 53 
 .../spark/sql/hive/HiveSessionCatalog.scala |  2 +-
 5 files changed, 126 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/565e18cf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 021bec7..f6ebcae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -182,6 +182,7 @@ object FunctionRegistry {
     expression[PosExplode]("posexplode"),
     expression[Rand]("rand"),
     expression[Randn]("randn"),
+    expression[Stack]("stack"),
     expression[CreateStruct]("struct"),
     expression[CaseWhen]("when"),
 

http://git-wip-us.apache.org/repos/asf/spark/blob/565e18cf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index 99b97c8..9d5c856 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -94,6 +94,59 @@ case class UserDefinedGenerator(
 }
 
 /**
+ * Separate v1, ..., vk into n rows. Each row will have k/n columns. n must be constant.
+ * {{{
+ *   SELECT stack(2, 1, 2, 3) ->
+ *   1  2
+ *   3  NULL
+ * }}}
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(n, v1, ..., vk) - Separate v1, ..., vk into n rows.",
+  extended = "> SELECT _FUNC_(2, 1, 2, 3);\n  [1,2]\n  [3,null]")
+case class Stack(children: Seq[Expression])
+  extends Expression with Generator with CodegenFallback {
+
+  private lazy val numRows = children.head.eval().asInstanceOf[Int]
+  private lazy val numFields = Math.ceil((children.length - 1.0) / numRows).toInt
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    if (children.length <= 1) {
+      TypeCheckResult.TypeCheckFailure(s"$prettyName requires at least 2 arguments.")
+    } else if (children.head.dataType != IntegerType || !children.head.foldable || numRows < 1) {
+      TypeCheckResult.TypeCheckFailure("The number of rows must be a positive constant integer.")
+    } else {
+      for (i <- 1 until children.length) {
+        val j = (i - 1) % numFields
+        if (children(i).dataType != elementSchema.fields(j).dataType) {
+          return TypeCheckResult.TypeCheckFailure(
+            s"Argument ${j + 1} (${elementSchema.fields(j).dataType}) != " +
+              s"Argument $i (${children(i).dataType})")
+        }
+      }
+      TypeCheckResult.TypeCheckSuccess
+    }
+  }
+
+  override def elementSchema: StructType =
+    StructType(children.tail.take(numFields).zipWithIndex.map {
+      case (e, index) => StructField(s"col$index", e.dataType)
+    })
+
+  override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
+    val values = children.tail.map(_.eval(input)).toArray
+    for (row <- 0 until numRows) yield {
+      val fields = new Array[Any](numFields)
+      for (col <- 0 until numFields) {
+        val index = row * numFields + col
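
To make the type check in the hunk above concrete: column j's type is fixed
by the first row of arguments (children.tail.take(numFields)), and every
later argument must match the type at position (i - 1) % numFields. A hedged
sketch, assuming the same `spark` session as above:

// Accepted: both rows are (INT, DOUBLE).
spark.sql("SELECT stack(2, 1, 2.0, 3, 4.0)").show()
// |col0|col1|
// |   1| 2.0|
// |   3| 4.0|

// Rejected at analysis time: col0 is inferred as INT from the value 1, but
// the STRING 'x' lands in that same column, so checkInputDataTypes fails
// with roughly: Argument 1 (IntegerType) != Argument 3 (StringType)
// spark.sql("SELECT stack(2, 1, 2.0, 'x', 4.0)")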

spark git commit: [SPARK-16286][SQL] Implement stack table generating function

2016-07-05 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master fdde7d0aa -> d0d28507c


[SPARK-16286][SQL] Implement stack table generating function

## What changes were proposed in this pull request?

This PR implements the `stack` table generating function.

## How was this patch tested?

Passes the Jenkins tests, including new test cases.

Author: Dongjoon Hyun 

Closes #14033 from dongjoon-hyun/SPARK-16286.
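
As a quick orientation for the registration half of the patch (the
FunctionRegistry hunk below), a hedged check that the SQL name resolves,
assuming a Spark 2.x SparkSession named `spark`:

// The usage text comes from the @ExpressionDescription annotation in the diff.
spark.sql("DESCRIBE FUNCTION stack").show(truncate = false)
// Function: stack
// Class: org.apache.spark.sql.catalyst.expressions.Stack
// Usage: stack(n, v1, ..., vk) - Separate v1, ..., vk into n rows.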


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d0d28507
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d0d28507
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d0d28507

Branch: refs/heads/master
Commit: d0d28507cacfca5919dbfb4269892d58b62e8662
Parents: fdde7d0
Author: Dongjoon Hyun 
Authored: Wed Jul 6 10:54:43 2016 +0800
Committer: Wenchen Fan 
Committed: Wed Jul 6 10:54:43 2016 +0800

--
 .../catalyst/analysis/FunctionRegistry.scala|  1 +
 .../sql/catalyst/expressions/generators.scala   | 53 
 .../expressions/GeneratorExpressionSuite.scala  | 18 +++
 .../spark/sql/GeneratorFunctionSuite.scala  | 53 
 .../spark/sql/hive/HiveSessionCatalog.scala |  2 +-
 5 files changed, 126 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d0d28507/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 021bec7..f6ebcae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -182,6 +182,7 @@ object FunctionRegistry {
     expression[PosExplode]("posexplode"),
     expression[Rand]("rand"),
     expression[Randn]("randn"),
+    expression[Stack]("stack"),
     expression[CreateStruct]("struct"),
     expression[CaseWhen]("when"),
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d0d28507/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index 99b97c8..9d5c856 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -94,6 +94,59 @@ case class UserDefinedGenerator(
 }
 
 /**
+ * Separate v1, ..., vk into n rows. Each row will have k/n columns. n must be constant.
+ * {{{
+ *   SELECT stack(2, 1, 2, 3) ->
+ *   1  2
+ *   3  NULL
+ * }}}
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(n, v1, ..., vk) - Separate v1, ..., vk into n rows.",
+  extended = "> SELECT _FUNC_(2, 1, 2, 3);\n  [1,2]\n  [3,null]")
+case class Stack(children: Seq[Expression])
+  extends Expression with Generator with CodegenFallback {
+
+  private lazy val numRows = children.head.eval().asInstanceOf[Int]
+  private lazy val numFields = Math.ceil((children.length - 1.0) / numRows).toInt
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    if (children.length <= 1) {
+      TypeCheckResult.TypeCheckFailure(s"$prettyName requires at least 2 arguments.")
+    } else if (children.head.dataType != IntegerType || !children.head.foldable || numRows < 1) {
+      TypeCheckResult.TypeCheckFailure("The number of rows must be a positive constant integer.")
+    } else {
+      for (i <- 1 until children.length) {
+        val j = (i - 1) % numFields
+        if (children(i).dataType != elementSchema.fields(j).dataType) {
+          return TypeCheckResult.TypeCheckFailure(
+            s"Argument ${j + 1} (${elementSchema.fields(j).dataType}) != " +
+              s"Argument $i (${children(i).dataType})")
+        }
+      }
+      TypeCheckResult.TypeCheckSuccess
+    }
+  }
+
+  override def elementSchema: StructType =
+    StructType(children.tail.take(numFields).zipWithIndex.map {
+      case (e, index) => StructField(s"col$index", e.dataType)
+    })
+
+  override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
+    val values = children.tail.map(_.eval(input)).toArray
+    for (row <- 0 until numRows) yield {
+      val fields = new Array[Any](numFields)
+      for (col <- 0 until numFields) {
+        val index = row * numFields + col
+        fields.update(col,
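
The diff is cut off above in this archived message. As a self-contained model
of the reshaping that eval performs (plain Scala, hypothetical helper name,
not the committed code):

// Lay the k values out row-major into n rows of ceil(k/n) columns, padding
// the tail with nulls; this mirrors index = row * numFields + col above.
def stackRows(n: Int, values: Seq[Any]): Seq[Seq[Any]] = {
  val numFields = math.ceil(values.length.toDouble / n).toInt
  (0 until n).map { row =>
    (0 until numFields).map { col =>
      val index = row * numFields + col
      if (index >= values.length) null else values(index)
    }
  }
}

// stackRows(2, Seq(1, 2, 3)) == Seq(Seq(1, 2), Seq(3, null))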