spark git commit: [SPARK-16286][SQL] Implement stack table generating function
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 e32c29d86 -> 565e18cf7


[SPARK-16286][SQL] Implement stack table generating function

## What changes were proposed in this pull request?

This PR implements `stack` table generating function.

## How was this patch tested?

Pass the Jenkins tests including new testcases.

Author: Dongjoon Hyun

Closes #14033 from dongjoon-hyun/SPARK-16286.

(cherry picked from commit d0d28507cacfca5919dbfb4269892d58b62e8662)
Signed-off-by: Reynold Xin


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/565e18cf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/565e18cf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/565e18cf

Branch: refs/heads/branch-2.0
Commit: 565e18cf7670231b1fa9db84f907654da79e6cef
Parents: e32c29d
Author: Dongjoon Hyun
Authored: Wed Jul 6 10:54:43 2016 +0800
Committer: Reynold Xin
Committed: Thu Jul 7 21:09:09 2016 -0700

----------------------------------------------------------------------
 .../catalyst/analysis/FunctionRegistry.scala    |  1 +
 .../sql/catalyst/expressions/generators.scala   | 53 ++++++++++++++++++++
 .../expressions/GeneratorExpressionSuite.scala  | 18 +++++++
 .../spark/sql/GeneratorFunctionSuite.scala      | 53 ++++++++++++++++++++
 .../spark/sql/hive/HiveSessionCatalog.scala     |  2 +-
 5 files changed, 126 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/565e18cf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 021bec7..f6ebcae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -182,6 +182,7 @@ object FunctionRegistry {
     expression[PosExplode]("posexplode"),
     expression[Rand]("rand"),
     expression[Randn]("randn"),
+    expression[Stack]("stack"),
     expression[CreateStruct]("struct"),
     expression[CaseWhen]("when"),
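With the registration above in place, stack resolves by name in SQL. As a quick
illustration of the behavior this patch adds (taken from the doc example in the
generators.scala change below; the spark-sql session and NULL rendering are
illustrative, and the col0/col1 output names come from elementSchema):

    spark-sql> SELECT stack(2, 1, 2, 3);
    1    2
    3    NULL
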
http://git-wip-us.apache.org/repos/asf/spark/blob/565e18cf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index 99b97c8..9d5c856 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -94,6 +94,59 @@ case class UserDefinedGenerator(
 }
 
 /**
+ * Separate v1, ..., vk into n rows. Each row will have k/n columns. n must be constant.
+ * {{{
+ *   SELECT stack(2, 1, 2, 3) ->
+ *   1      2
+ *   3      NULL
+ * }}}
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(n, v1, ..., vk) - Separate v1, ..., vk into n rows.",
+  extended = "> SELECT _FUNC_(2, 1, 2, 3);\n [1,2]\n [3,null]")
+case class Stack(children: Seq[Expression])
+    extends Expression with Generator with CodegenFallback {
+
+  private lazy val numRows = children.head.eval().asInstanceOf[Int]
+  private lazy val numFields = Math.ceil((children.length - 1.0) / numRows).toInt
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    if (children.length <= 1) {
+      TypeCheckResult.TypeCheckFailure(s"$prettyName requires at least 2 arguments.")
+    } else if (children.head.dataType != IntegerType || !children.head.foldable || numRows < 1) {
+      TypeCheckResult.TypeCheckFailure("The number of rows must be a positive constant integer.")
+    } else {
+      for (i <- 1 until children.length) {
+        val j = (i - 1) % numFields
+        if (children(i).dataType != elementSchema.fields(j).dataType) {
+          return TypeCheckResult.TypeCheckFailure(
+            s"Argument ${j + 1} (${elementSchema.fields(j).dataType}) != " +
+              s"Argument $i (${children(i).dataType})")
+        }
+      }
+      TypeCheckResult.TypeCheckSuccess
+    }
+  }
+
+  override def elementSchema: StructType =
+    StructType(children.tail.take(numFields).zipWithIndex.map {
+      case (e, index) => StructField(s"col$index", e.dataType)
+    })
+
+  override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
+    val values = children.tail.map(_.eval(input)).toArray
+    for (row <- 0 until numRows) yield {
+      val fields = new Array[Any](numFields)
+      for (col <- 0 until numFields) {
+        val index = row * numFields + col
+        fields.update(col, if (index < values.length) values(index) else null)
+      }
+      InternalRow(fields: _*)
+    }
+  }
+}
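The eval loop above lays the k values out row-major into n rows of
ceil(k / n) columns, padding the final row with nulls when k is not a
multiple of n. A self-contained sketch of the same index arithmetic outside
Catalyst (StackSketch and stackValues are hypothetical names, not part of
the patch):

import scala.math

object StackSketch {
  // Mirrors Stack.eval's layout: k values become n rows of
  // ceil(k / n) columns, padded with null at the end.
  def stackValues(numRows: Int, values: Array[Any]): Seq[Array[Any]] = {
    val numFields = math.ceil(values.length.toDouble / numRows).toInt
    (0 until numRows).map { row =>
      Array.tabulate[Any](numFields) { col =>
        val index = row * numFields + col
        if (index < values.length) values(index) else null
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // stack(2, 1, 2, 3) => [1,2] and [3,null], matching the doc example.
    stackValues(2, Array(1, 2, 3)).foreach(r => println(r.mkString("[", ",", "]")))
  }
}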