spark git commit: [SPARK-11754][SQL] consolidate `ExpressionEncoder.tuple` and `Encoders.tuple`
Repository: spark Updated Branches: refs/heads/branch-1.6 38fe092ff -> fbe65c592 [SPARK-11754][SQL] consolidate `ExpressionEncoder.tuple` and `Encoders.tuple` These 2 are very similar, we can consolidate them into one. Also add tests for it and fix a bug. Author: Wenchen Fan Closes #9729 from cloud-fan/tuple. (cherry picked from commit b1a9662623951079e80bd7498e064c4cae4977e9) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fbe65c59 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fbe65c59 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fbe65c59 Branch: refs/heads/branch-1.6 Commit: fbe65c5924d2f5f4789bf54a1da0a7b6bbf1eb42 Parents: 38fe092 Author: Wenchen Fan Authored: Mon Nov 16 12:45:34 2015 -0800 Committer: Michael Armbrust Committed: Mon Nov 16 12:46:26 2015 -0800 -- .../scala/org/apache/spark/sql/Encoder.scala| 95 - .../catalyst/encoders/ExpressionEncoder.scala | 104 ++- .../catalyst/encoders/ProductEncoderSuite.scala | 29 ++ 3 files changed, 108 insertions(+), 120 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fbe65c59/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala index 5f619d6..c8b017e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala @@ -19,10 +19,8 @@ package org.apache.spark.sql import scala.reflect.ClassTag -import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{ObjectType, StructField, StructType} -import org.apache.spark.util.Utils +import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, encoderFor} +import org.apache.spark.sql.types.StructType /** * Used to convert a JVM object of type `T` to and from the internal Spark SQL representation. @@ -49,83 +47,34 @@ object Encoders { def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder(flat = true) def STRING: Encoder[java.lang.String] = ExpressionEncoder(flat = true) - def tuple[T1, T2](enc1: Encoder[T1], enc2: Encoder[T2]): Encoder[(T1, T2)] = { -tuple(Seq(enc1, enc2).map(_.asInstanceOf[ExpressionEncoder[_]])) - .asInstanceOf[ExpressionEncoder[(T1, T2)]] + def tuple[T1, T2]( + e1: Encoder[T1], + e2: Encoder[T2]): Encoder[(T1, T2)] = { +ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2)) } def tuple[T1, T2, T3]( - enc1: Encoder[T1], - enc2: Encoder[T2], - enc3: Encoder[T3]): Encoder[(T1, T2, T3)] = { -tuple(Seq(enc1, enc2, enc3).map(_.asInstanceOf[ExpressionEncoder[_]])) - .asInstanceOf[ExpressionEncoder[(T1, T2, T3)]] + e1: Encoder[T1], + e2: Encoder[T2], + e3: Encoder[T3]): Encoder[(T1, T2, T3)] = { +ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3)) } def tuple[T1, T2, T3, T4]( - enc1: Encoder[T1], - enc2: Encoder[T2], - enc3: Encoder[T3], - enc4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = { -tuple(Seq(enc1, enc2, enc3, enc4).map(_.asInstanceOf[ExpressionEncoder[_]])) - .asInstanceOf[ExpressionEncoder[(T1, T2, T3, T4)]] + e1: Encoder[T1], + e2: Encoder[T2], + e3: Encoder[T3], + e4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = { +ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4)) } def tuple[T1, T2, T3, T4, T5]( - enc1: Encoder[T1], - enc2: Encoder[T2], - enc3: Encoder[T3], - enc4: Encoder[T4], - enc5: Encoder[T5]): Encoder[(T1, T2, T3, T4, T5)] = { -tuple(Seq(enc1, enc2, enc3, enc4, enc5).map(_.asInstanceOf[ExpressionEncoder[_]])) - .asInstanceOf[ExpressionEncoder[(T1, T2, T3, T4, T5)]] - } - - private def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = { -assert(encoders.length > 1) -// make sure all encoders are resolved, i.e. `Attribute` has been resolved to `BoundReference`. - assert(encoders.forall(_.fromRowExpression.find(_.isInstanceOf[Attribute]).isEmpty)) - -val schema = StructType(encoders.zipWithIndex.map { - case (e, i) => StructField(s"_${i + 1}", if (e.flat) e.schema.head.dataType else e.schema) -}) - -val cls = Utils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple${encoders.size}") - -val extractExpressions = encoders.map { - case e if e.flat => e.toRowExpressions.head - case other => CreateStruct(other.toRowExpressions) -}.zipWithIndex.m
spark git commit: [SPARK-11754][SQL] consolidate `ExpressionEncoder.tuple` and `Encoders.tuple`
Repository: spark Updated Branches: refs/heads/master 24477d270 -> b1a966262 [SPARK-11754][SQL] consolidate `ExpressionEncoder.tuple` and `Encoders.tuple` These 2 are very similar, we can consolidate them into one. Also add tests for it and fix a bug. Author: Wenchen Fan Closes #9729 from cloud-fan/tuple. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b1a96626 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b1a96626 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b1a96626 Branch: refs/heads/master Commit: b1a9662623951079e80bd7498e064c4cae4977e9 Parents: 24477d2 Author: Wenchen Fan Authored: Mon Nov 16 12:45:34 2015 -0800 Committer: Michael Armbrust Committed: Mon Nov 16 12:45:34 2015 -0800 -- .../scala/org/apache/spark/sql/Encoder.scala| 95 - .../catalyst/encoders/ExpressionEncoder.scala | 104 ++- .../catalyst/encoders/ProductEncoderSuite.scala | 29 ++ 3 files changed, 108 insertions(+), 120 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b1a96626/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala index 5f619d6..c8b017e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala @@ -19,10 +19,8 @@ package org.apache.spark.sql import scala.reflect.ClassTag -import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{ObjectType, StructField, StructType} -import org.apache.spark.util.Utils +import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, encoderFor} +import org.apache.spark.sql.types.StructType /** * Used to convert a JVM object of type `T` to and from the internal Spark SQL representation. @@ -49,83 +47,34 @@ object Encoders { def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder(flat = true) def STRING: Encoder[java.lang.String] = ExpressionEncoder(flat = true) - def tuple[T1, T2](enc1: Encoder[T1], enc2: Encoder[T2]): Encoder[(T1, T2)] = { -tuple(Seq(enc1, enc2).map(_.asInstanceOf[ExpressionEncoder[_]])) - .asInstanceOf[ExpressionEncoder[(T1, T2)]] + def tuple[T1, T2]( + e1: Encoder[T1], + e2: Encoder[T2]): Encoder[(T1, T2)] = { +ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2)) } def tuple[T1, T2, T3]( - enc1: Encoder[T1], - enc2: Encoder[T2], - enc3: Encoder[T3]): Encoder[(T1, T2, T3)] = { -tuple(Seq(enc1, enc2, enc3).map(_.asInstanceOf[ExpressionEncoder[_]])) - .asInstanceOf[ExpressionEncoder[(T1, T2, T3)]] + e1: Encoder[T1], + e2: Encoder[T2], + e3: Encoder[T3]): Encoder[(T1, T2, T3)] = { +ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3)) } def tuple[T1, T2, T3, T4]( - enc1: Encoder[T1], - enc2: Encoder[T2], - enc3: Encoder[T3], - enc4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = { -tuple(Seq(enc1, enc2, enc3, enc4).map(_.asInstanceOf[ExpressionEncoder[_]])) - .asInstanceOf[ExpressionEncoder[(T1, T2, T3, T4)]] + e1: Encoder[T1], + e2: Encoder[T2], + e3: Encoder[T3], + e4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = { +ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4)) } def tuple[T1, T2, T3, T4, T5]( - enc1: Encoder[T1], - enc2: Encoder[T2], - enc3: Encoder[T3], - enc4: Encoder[T4], - enc5: Encoder[T5]): Encoder[(T1, T2, T3, T4, T5)] = { -tuple(Seq(enc1, enc2, enc3, enc4, enc5).map(_.asInstanceOf[ExpressionEncoder[_]])) - .asInstanceOf[ExpressionEncoder[(T1, T2, T3, T4, T5)]] - } - - private def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = { -assert(encoders.length > 1) -// make sure all encoders are resolved, i.e. `Attribute` has been resolved to `BoundReference`. - assert(encoders.forall(_.fromRowExpression.find(_.isInstanceOf[Attribute]).isEmpty)) - -val schema = StructType(encoders.zipWithIndex.map { - case (e, i) => StructField(s"_${i + 1}", if (e.flat) e.schema.head.dataType else e.schema) -}) - -val cls = Utils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple${encoders.size}") - -val extractExpressions = encoders.map { - case e if e.flat => e.toRowExpressions.head - case other => CreateStruct(other.toRowExpressions) -}.zipWithIndex.map { case (expr, index) => - expr.transformUp { -case BoundReference(0, t: ObjectType, _) => -