spark git commit: [SPARK-11590][SQL] use native json_tuple in lateral view

2015-11-10 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 6e2e84f3e -> 5ccc1eb08


[SPARK-11590][SQL] use native json_tuple in lateral view

Author: Wenchen Fan 

Closes #9562 from cloud-fan/json-tuple.

(cherry picked from commit 53600854c270d4c953fe95fbae528740b5cf6603)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5ccc1eb0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5ccc1eb0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5ccc1eb0

Branch: refs/heads/branch-1.6
Commit: 5ccc1eb08c14291bb1e94b1cd9fa3bff1172529d
Parents: 6e2e84f
Author: Wenchen Fan 
Authored: Tue Nov 10 11:21:31 2015 -0800
Committer: Yin Huai 
Committed: Tue Nov 10 11:22:11 2015 -0800

--
 .../catalyst/expressions/jsonExpressions.scala  | 23 +--
 .../expressions/JsonExpressionsSuite.scala  | 30 +++
 .../scala/org/apache/spark/sql/DataFrame.scala  |  8 +++--
 .../scala/org/apache/spark/sql/functions.scala  | 12 
 .../apache/spark/sql/JsonFunctionsSuite.scala   | 23 ---
 .../org/apache/spark/sql/hive/HiveQl.scala  |  4 +++
 .../org/apache/spark/sql/hive/HiveQlSuite.scala | 13 
 .../sql/hive/execution/SQLQuerySuite.scala  | 31 
 8 files changed, 104 insertions(+), 40 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5ccc1eb0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 8c9853e..8cd7323 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -314,7 +314,7 @@ case class GetJsonObject(json: Expression, path: Expression)
 }
 
 case class JsonTuple(children: Seq[Expression])
-  extends Expression with CodegenFallback {
+  extends Generator with CodegenFallback {
 
   import SharedFactory._
 
@@ -324,8 +324,8 @@ case class JsonTuple(children: Seq[Expression])
   }
 
   // if processing fails this shared value will be returned
-  @transient private lazy val nullRow: InternalRow =
-new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length))
+  @transient private lazy val nullRow: Seq[InternalRow] =
+new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length)) :: Nil
 
   // the json body is the first child
   @transient private lazy val jsonExpr: Expression = children.head
@@ -344,15 +344,8 @@ case class JsonTuple(children: Seq[Expression])
   // and count the number of foldable fields, we'll use this later to optimize evaluation
   @transient private lazy val constantFields: Int = foldableFieldNames.count(_ != null)
 
-  override lazy val dataType: StructType = {
-val fields = fieldExpressions.zipWithIndex.map {
-  case (_, idx) => StructField(
-name = s"c$idx", // mirroring GenericUDTFJSONTuple.initialize
-dataType = StringType,
-nullable = true)
-}
-
-StructType(fields)
+  override def elementTypes: Seq[(DataType, Boolean, String)] = fieldExpressions.zipWithIndex.map {
+case (_, idx) => (StringType, true, s"c$idx")
   }
 
   override def prettyName: String = "json_tuple"
@@ -367,7 +360,7 @@ case class JsonTuple(children: Seq[Expression])
 }
   }
 
-  override def eval(input: InternalRow): InternalRow = {
+  override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
 val json = jsonExpr.eval(input).asInstanceOf[UTF8String]
 if (json == null) {
   return nullRow
@@ -383,7 +376,7 @@ case class JsonTuple(children: Seq[Expression])
 }
   }
 
-  private def parseRow(parser: JsonParser, input: InternalRow): InternalRow = {
+  private def parseRow(parser: JsonParser, input: InternalRow): Seq[InternalRow] = {
 // only objects are supported
 if (parser.nextToken() != JsonToken.START_OBJECT) {
   return nullRow
@@ -433,7 +426,7 @@ case class JsonTuple(children: Seq[Expression])
   parser.skipChildren()
 }
 
-new GenericInternalRow(row)
+new GenericInternalRow(row) :: Nil
   }
 
   private def copyCurrentStructure(generator: JsonGenerator, parser: JsonParser): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/5ccc1eb0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
--
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala

spark git commit: [SPARK-11590][SQL] use native json_tuple in lateral view

2015-11-10 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master dfcfcbcc0 -> 53600854c


[SPARK-11590][SQL] use native json_tuple in lateral view

Author: Wenchen Fan 

Closes #9562 from cloud-fan/json-tuple.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53600854
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53600854
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53600854

Branch: refs/heads/master
Commit: 53600854c270d4c953fe95fbae528740b5cf6603
Parents: dfcfcbc
Author: Wenchen Fan 
Authored: Tue Nov 10 11:21:31 2015 -0800
Committer: Yin Huai 
Committed: Tue Nov 10 11:21:31 2015 -0800

--
 .../catalyst/expressions/jsonExpressions.scala  | 23 +--
 .../expressions/JsonExpressionsSuite.scala  | 30 +++
 .../scala/org/apache/spark/sql/DataFrame.scala  |  8 +++--
 .../scala/org/apache/spark/sql/functions.scala  | 12 
 .../apache/spark/sql/JsonFunctionsSuite.scala   | 23 ---
 .../org/apache/spark/sql/hive/HiveQl.scala  |  4 +++
 .../org/apache/spark/sql/hive/HiveQlSuite.scala | 13 
 .../sql/hive/execution/SQLQuerySuite.scala  | 31 
 8 files changed, 104 insertions(+), 40 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/53600854/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 8c9853e..8cd7323 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -314,7 +314,7 @@ case class GetJsonObject(json: Expression, path: Expression)
 }
 
 case class JsonTuple(children: Seq[Expression])
-  extends Expression with CodegenFallback {
+  extends Generator with CodegenFallback {
 
   import SharedFactory._
 
@@ -324,8 +324,8 @@ case class JsonTuple(children: Seq[Expression])
   }
 
   // if processing fails this shared value will be returned
-  @transient private lazy val nullRow: InternalRow =
-new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length))
+  @transient private lazy val nullRow: Seq[InternalRow] =
+new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length)) :: Nil
 
   // the json body is the first child
   @transient private lazy val jsonExpr: Expression = children.head
@@ -344,15 +344,8 @@ case class JsonTuple(children: Seq[Expression])
   // and count the number of foldable fields, we'll use this later to optimize evaluation
   @transient private lazy val constantFields: Int = foldableFieldNames.count(_ != null)
 
-  override lazy val dataType: StructType = {
-val fields = fieldExpressions.zipWithIndex.map {
-  case (_, idx) => StructField(
-name = s"c$idx", // mirroring GenericUDTFJSONTuple.initialize
-dataType = StringType,
-nullable = true)
-}
-
-StructType(fields)
+  override def elementTypes: Seq[(DataType, Boolean, String)] = fieldExpressions.zipWithIndex.map {
+case (_, idx) => (StringType, true, s"c$idx")
   }
 
   override def prettyName: String = "json_tuple"
@@ -367,7 +360,7 @@ case class JsonTuple(children: Seq[Expression])
 }
   }
 
-  override def eval(input: InternalRow): InternalRow = {
+  override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
 val json = jsonExpr.eval(input).asInstanceOf[UTF8String]
 if (json == null) {
   return nullRow
@@ -383,7 +376,7 @@ case class JsonTuple(children: Seq[Expression])
 }
   }
 
-  private def parseRow(parser: JsonParser, input: InternalRow): InternalRow = {
+  private def parseRow(parser: JsonParser, input: InternalRow): Seq[InternalRow] = {
 // only objects are supported
 if (parser.nextToken() != JsonToken.START_OBJECT) {
   return nullRow
@@ -433,7 +426,7 @@ case class JsonTuple(children: Seq[Expression])
   parser.skipChildren()
 }
 
-new GenericInternalRow(row)
+new GenericInternalRow(row) :: Nil
   }
 
   private def copyCurrentStructure(generator: JsonGenerator, parser: JsonParser): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/53600854/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
--
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index f