spark git commit: [SPARK-11754][SQL] consolidate `ExpressionEncoder.tuple` and `Encoders.tuple`

2015-11-16 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 38fe092ff -> fbe65c592


[SPARK-11754][SQL] consolidate `ExpressionEncoder.tuple` and `Encoders.tuple`

These two are very similar; we can consolidate them into one.

Also add tests for it and fix a bug.
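
For readers skimming the archive, a minimal usage sketch of the consolidated public API, using only the `Encoders.DOUBLE` and `Encoders.STRING` primitives visible in the diff below (the value name is illustrative, not from the patch):

    import org.apache.spark.sql.{Encoder, Encoders}

    // Combine two primitive encoders into an encoder for a Scala pair.
    // Encoders.tuple now simply delegates to ExpressionEncoder.tuple.
    val pairEncoder: Encoder[(java.lang.Double, String)] =
      Encoders.tuple(Encoders.DOUBLE, Encoders.STRING)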

Author: Wenchen Fan 

Closes #9729 from cloud-fan/tuple.

(cherry picked from commit b1a9662623951079e80bd7498e064c4cae4977e9)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fbe65c59
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fbe65c59
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fbe65c59

Branch: refs/heads/branch-1.6
Commit: fbe65c5924d2f5f4789bf54a1da0a7b6bbf1eb42
Parents: 38fe092
Author: Wenchen Fan 
Authored: Mon Nov 16 12:45:34 2015 -0800
Committer: Michael Armbrust 
Committed: Mon Nov 16 12:46:26 2015 -0800

--
 .../scala/org/apache/spark/sql/Encoder.scala|  95 -
 .../catalyst/encoders/ExpressionEncoder.scala   | 104 ++-
 .../catalyst/encoders/ProductEncoderSuite.scala |  29 ++
 3 files changed, 108 insertions(+), 120 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fbe65c59/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
index 5f619d6..c8b017e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
@@ -19,10 +19,8 @@ package org.apache.spark.sql
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{ObjectType, StructField, StructType}
-import org.apache.spark.util.Utils
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, encoderFor}
+import org.apache.spark.sql.types.StructType
 
 /**
  * Used to convert a JVM object of type `T` to and from the internal Spark SQL representation.
@@ -49,83 +47,34 @@ object Encoders {
   def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder(flat = true)
   def STRING: Encoder[java.lang.String] = ExpressionEncoder(flat = true)
 
-  def tuple[T1, T2](enc1: Encoder[T1], enc2: Encoder[T2]): Encoder[(T1, T2)] = {
-    tuple(Seq(enc1, enc2).map(_.asInstanceOf[ExpressionEncoder[_]]))
-      .asInstanceOf[ExpressionEncoder[(T1, T2)]]
+  def tuple[T1, T2](
+      e1: Encoder[T1],
+      e2: Encoder[T2]): Encoder[(T1, T2)] = {
+    ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2))
   }
 
   def tuple[T1, T2, T3](
-      enc1: Encoder[T1],
-      enc2: Encoder[T2],
-      enc3: Encoder[T3]): Encoder[(T1, T2, T3)] = {
-    tuple(Seq(enc1, enc2, enc3).map(_.asInstanceOf[ExpressionEncoder[_]]))
-      .asInstanceOf[ExpressionEncoder[(T1, T2, T3)]]
+      e1: Encoder[T1],
+      e2: Encoder[T2],
+      e3: Encoder[T3]): Encoder[(T1, T2, T3)] = {
+    ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3))
   }
 
   def tuple[T1, T2, T3, T4](
-      enc1: Encoder[T1],
-      enc2: Encoder[T2],
-      enc3: Encoder[T3],
-      enc4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = {
-    tuple(Seq(enc1, enc2, enc3, enc4).map(_.asInstanceOf[ExpressionEncoder[_]]))
-      .asInstanceOf[ExpressionEncoder[(T1, T2, T3, T4)]]
+      e1: Encoder[T1],
+      e2: Encoder[T2],
+      e3: Encoder[T3],
+      e4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = {
+    ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4))
   }
 
   def tuple[T1, T2, T3, T4, T5](
-      enc1: Encoder[T1],
-      enc2: Encoder[T2],
-      enc3: Encoder[T3],
-      enc4: Encoder[T4],
-      enc5: Encoder[T5]): Encoder[(T1, T2, T3, T4, T5)] = {
-    tuple(Seq(enc1, enc2, enc3, enc4, enc5).map(_.asInstanceOf[ExpressionEncoder[_]]))
-      .asInstanceOf[ExpressionEncoder[(T1, T2, T3, T4, T5)]]
-  }
-
-  private def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = {
-    assert(encoders.length > 1)
-    // make sure all encoders are resolved, i.e. `Attribute` has been resolved to `BoundReference`.
-    assert(encoders.forall(_.fromRowExpression.find(_.isInstanceOf[Attribute]).isEmpty))
-
-    val schema = StructType(encoders.zipWithIndex.map {
-      case (e, i) => StructField(s"_${i + 1}", if (e.flat) e.schema.head.dataType else e.schema)
-    })
-
-    val cls = Utils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple${encoders.size}")
-
-    val extractExpressions = encoders.map {
-      case e if e.flat => e.toRowExpressions.head
-      case other => CreateStruct(other.toRowExpressions)
-    }.zipWithIndex.m
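
A note on the schema of the combined encoder, inferred from the removed private helper above rather than from any output in this patch: columns are named _1, _2, ..., a flat component encoder contributes just its single column's data type, and a non-flat component (for example another tuple encoder) contributes its whole struct. A sketch of what that implies (the printed schema is an assumption based on that removed code):

    import org.apache.spark.sql.Encoders

    // Nesting a tuple encoder exercises the flat vs. non-flat handling.
    val nested = Encoders.tuple(Encoders.STRING, Encoders.tuple(Encoders.DOUBLE, Encoders.STRING))
    // nested.schema is expected to look roughly like:
    //   StructType(
    //     StructField("_1", StringType),
    //     StructField("_2", StructType(StructField("_1", DoubleType), StructField("_2", StringType))))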

spark git commit: [SPARK-11754][SQL] consolidate `ExpressionEncoder.tuple` and `Encoders.tuple`

2015-11-16 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 24477d270 -> b1a966262


[SPARK-11754][SQL] consolidate `ExpressionEncoder.tuple` and `Encoders.tuple`

These two are very similar; we can consolidate them into one.

Also add tests for it and fix a bug.

Author: Wenchen Fan 

Closes #9729 from cloud-fan/tuple.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b1a96626
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b1a96626
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b1a96626

Branch: refs/heads/master
Commit: b1a9662623951079e80bd7498e064c4cae4977e9
Parents: 24477d2
Author: Wenchen Fan 
Authored: Mon Nov 16 12:45:34 2015 -0800
Committer: Michael Armbrust 
Committed: Mon Nov 16 12:45:34 2015 -0800

--
 .../scala/org/apache/spark/sql/Encoder.scala|  95 -
 .../catalyst/encoders/ExpressionEncoder.scala   | 104 ++-
 .../catalyst/encoders/ProductEncoderSuite.scala |  29 ++
 3 files changed, 108 insertions(+), 120 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b1a96626/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
index 5f619d6..c8b017e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
@@ -19,10 +19,8 @@ package org.apache.spark.sql
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{ObjectType, StructField, StructType}
-import org.apache.spark.util.Utils
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, encoderFor}
+import org.apache.spark.sql.types.StructType
 
 /**
  * Used to convert a JVM object of type `T` to and from the internal Spark SQL representation.
@@ -49,83 +47,34 @@ object Encoders {
   def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder(flat = true)
   def STRING: Encoder[java.lang.String] = ExpressionEncoder(flat = true)
 
-  def tuple[T1, T2](enc1: Encoder[T1], enc2: Encoder[T2]): Encoder[(T1, T2)] = {
-    tuple(Seq(enc1, enc2).map(_.asInstanceOf[ExpressionEncoder[_]]))
-      .asInstanceOf[ExpressionEncoder[(T1, T2)]]
+  def tuple[T1, T2](
+      e1: Encoder[T1],
+      e2: Encoder[T2]): Encoder[(T1, T2)] = {
+    ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2))
   }
 
   def tuple[T1, T2, T3](
-      enc1: Encoder[T1],
-      enc2: Encoder[T2],
-      enc3: Encoder[T3]): Encoder[(T1, T2, T3)] = {
-    tuple(Seq(enc1, enc2, enc3).map(_.asInstanceOf[ExpressionEncoder[_]]))
-      .asInstanceOf[ExpressionEncoder[(T1, T2, T3)]]
+      e1: Encoder[T1],
+      e2: Encoder[T2],
+      e3: Encoder[T3]): Encoder[(T1, T2, T3)] = {
+    ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3))
   }
 
   def tuple[T1, T2, T3, T4](
-      enc1: Encoder[T1],
-      enc2: Encoder[T2],
-      enc3: Encoder[T3],
-      enc4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = {
-    tuple(Seq(enc1, enc2, enc3, enc4).map(_.asInstanceOf[ExpressionEncoder[_]]))
-      .asInstanceOf[ExpressionEncoder[(T1, T2, T3, T4)]]
+      e1: Encoder[T1],
+      e2: Encoder[T2],
+      e3: Encoder[T3],
+      e4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = {
+    ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4))
   }
 
   def tuple[T1, T2, T3, T4, T5](
-      enc1: Encoder[T1],
-      enc2: Encoder[T2],
-      enc3: Encoder[T3],
-      enc4: Encoder[T4],
-      enc5: Encoder[T5]): Encoder[(T1, T2, T3, T4, T5)] = {
-    tuple(Seq(enc1, enc2, enc3, enc4, enc5).map(_.asInstanceOf[ExpressionEncoder[_]]))
-      .asInstanceOf[ExpressionEncoder[(T1, T2, T3, T4, T5)]]
-  }
-
-  private def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = {
-    assert(encoders.length > 1)
-    // make sure all encoders are resolved, i.e. `Attribute` has been resolved to `BoundReference`.
-    assert(encoders.forall(_.fromRowExpression.find(_.isInstanceOf[Attribute]).isEmpty))
-
-    val schema = StructType(encoders.zipWithIndex.map {
-      case (e, i) => StructField(s"_${i + 1}", if (e.flat) e.schema.head.dataType else e.schema)
-    })
-
-    val cls = Utils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple${encoders.size}")
-
-    val extractExpressions = encoders.map {
-      case e if e.flat => e.toRowExpressions.head
-      case other => CreateStruct(other.toRowExpressions)
-    }.zipWithIndex.map { case (expr, index) =>
-      expr.transformUp {
-        case BoundReference(0, t: ObjectType, _) =>
-
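
The consolidation itself is visible in the rewritten `Encoders.tuple` bodies above: every arity now converts its arguments with `encoderFor` and funnels into `ExpressionEncoder.tuple`. A hypothetical helper mirroring that pattern, written as a sketch only (it assumes the code lives inside the org.apache.spark.sql package, since `encoderFor` and `ExpressionEncoder` are catalyst internals rather than public API):

    package org.apache.spark.sql

    import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, encoderFor}

    object TupleEncoderSketch {
      // Convert each public Encoder to an ExpressionEncoder, then combine them,
      // exactly as the rewritten Encoders.tuple overloads do in the diff above.
      def pair[A, B](ea: Encoder[A], eb: Encoder[B]): Encoder[(A, B)] =
        ExpressionEncoder.tuple(encoderFor(ea), encoderFor(eb))
    }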