spark git commit: [SPARK-18251][SQL] the type of Dataset can't be Option of non-flat type
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 f542df310 -> 9e96ac5a9

[SPARK-18251][SQL] the type of Dataset can't be Option of non-flat type

## What changes were proposed in this pull request?

For an input object of non-flat type, we can't encode it to a row if it's null, as Spark SQL doesn't allow the entire row to be null; only its columns can be null. That's the reason we forbid users to use top-level null objects in https://github.com/apache/spark/pull/13469. However, if users wrap a non-flat type with `Option`, then we may still encode a top-level null object to a row, which is not allowed. This PR fixes this case, and suggests users wrap their type with `Tuple1` if they do want top-level null objects.

## How was this patch tested?

new test

Author: Wenchen Fan

Closes #15979 from cloud-fan/option.

(cherry picked from commit f135b70fd590438bebb2a54012a6f73074219758)
Signed-off-by: Cheng Lian

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e96ac5a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e96ac5a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e96ac5a

Branch: refs/heads/branch-2.1
Commit: 9e96ac5a986c53ca1689e3d1f1365cc5107b5d88
Parents: f542df3
Author: Wenchen Fan
Authored: Wed Nov 30 13:36:17 2016 -0800
Committer: Cheng Lian
Committed: Wed Nov 30 13:54:37 2016 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/ScalaReflection.scala | 13 +
 .../sql/catalyst/encoders/ExpressionEncoder.scala   | 14 --
 .../scala/org/apache/spark/sql/DatasetSuite.scala   | 13 +++--
 .../org/apache/spark/sql/JsonFunctionsSuite.scala   |  2 +-
 4 files changed, 37 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/9e96ac5a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 7bcaea7..0aa21b9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -606,6 +606,19 @@ object ScalaReflection extends ScalaReflection {
   }
 
   /**
+   * Returns true if the given type is option of product type, e.g. `Option[Tuple2]`. Note that,
+   * we also treat [[DefinedByConstructorParams]] as product type.
+   */
+  def optionOfProductType(tpe: `Type`): Boolean = ScalaReflectionLock.synchronized {
+    tpe match {
+      case t if t <:< localTypeOf[Option[_]] =>
+        val TypeRef(_, _, Seq(optType)) = t
+        definedByConstructorParams(optType)
+      case _ => false
+    }
+  }
+
+  /**
    * Returns the parameter names and types for the primary constructor of this class.
    *
    * Note that it only works for scala classes with primary constructor, and currently doesn't

http://git-wip-us.apache.org/repos/asf/spark/blob/9e96ac5a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index 82e1a8a..9c4818d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -47,6 +47,16 @@ object ExpressionEncoder {
     // We convert the not-serializable TypeTag into StructType and ClassTag.
     val mirror = typeTag[T].mirror
     val tpe = typeTag[T].tpe
+
+    if (ScalaReflection.optionOfProductType(tpe)) {
+      throw new UnsupportedOperationException(
+        "Cannot create encoder for Option of Product type, because Product type is represented " +
+          "as a row, and the entire row can not be null in Spark SQL like normal databases. " +
+          "You can wrap your type with Tuple1 if you do want top level null Product objects, " +
+          "e.g. instead of creating `Dataset[Option[MyClass]]`, you can do something like " +
+          "`val ds: Dataset[Tuple1[MyClass]] = Seq(Tuple1(MyClass(...)), Tuple1(null)).toDS`")
+    }
+
     val cls = mirror.runtimeClass(tpe)
     val flat = !ScalaReflection.definedByConstructorParams(tpe)
 
@@ -54,9 +64,9 @@ object ExpressionEncoder {
     val nullSafeInput = if (flat) {
       inputObject
     } else {
-      // For
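The `optionOfProductType` check added above can be sketched outside Spark with plain `scala-reflect` (bundled with Scala 2). This is a simplified stand-in, not Spark's actual code: it treats "product type" as any subtype of `Product`, whereas Spark's real check calls `definedByConstructorParams`, which also accepts `DefinedByConstructorParams` classes; `MyClass` is a hypothetical case class for illustration.

```scala
import scala.reflect.runtime.universe._

object OptionOfProductDemo {
  case class MyClass(i: Int)

  // Simplified sketch of ScalaReflection.optionOfProductType: "product type"
  // here means any subtype of Product (Spark's check is broader).
  def optionOfProductType(tpe: Type): Boolean = tpe match {
    case t if t <:< typeOf[Option[_]] =>
      // Extract the Option's type argument, e.g. MyClass from Option[MyClass].
      val TypeRef(_, _, Seq(optType)) = t
      optType <:< typeOf[Product]
    case _ => false
  }

  def main(args: Array[String]): Unit = {
    assert(optionOfProductType(typeOf[Option[MyClass]]))     // Option of case class: rejected
    assert(optionOfProductType(typeOf[Option[(Int, Int)]]))  // Option of tuple: rejected
    assert(!optionOfProductType(typeOf[Option[Int]]))        // Option of flat type: allowed
    println("ok")
  }
}
```

Note the pattern `case t if t <:< typeOf[Option[_]]` mirrors the `localTypeOf[Option[_]]` test in the patch; destructuring the `TypeRef` is how the Option's element type is recovered in both versions.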
spark git commit: [SPARK-18251][SQL] the type of Dataset can't be Option of non-flat type
Repository: spark
Updated Branches:
  refs/heads/master 60022bfd6 -> f135b70fd

[SPARK-18251][SQL] the type of Dataset can't be Option of non-flat type

## What changes were proposed in this pull request?

For an input object of non-flat type, we can't encode it to a row if it's null, as Spark SQL doesn't allow the entire row to be null; only its columns can be null. That's the reason we forbid users to use top-level null objects in https://github.com/apache/spark/pull/13469. However, if users wrap a non-flat type with `Option`, then we may still encode a top-level null object to a row, which is not allowed. This PR fixes this case, and suggests users wrap their type with `Tuple1` if they do want top-level null objects.

## How was this patch tested?

new test

Author: Wenchen Fan

Closes #15979 from cloud-fan/option.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f135b70f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f135b70f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f135b70f

Branch: refs/heads/master
Commit: f135b70fd590438bebb2a54012a6f73074219758
Parents: 60022bf
Author: Wenchen Fan
Authored: Wed Nov 30 13:36:17 2016 -0800
Committer: Cheng Lian
Committed: Wed Nov 30 13:36:17 2016 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/ScalaReflection.scala | 13 +
 .../sql/catalyst/encoders/ExpressionEncoder.scala   | 14 --
 .../scala/org/apache/spark/sql/DatasetSuite.scala   | 13 +++--
 .../org/apache/spark/sql/JsonFunctionsSuite.scala   |  2 +-
 4 files changed, 37 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/f135b70f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 7bcaea7..0aa21b9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -606,6 +606,19 @@ object ScalaReflection extends ScalaReflection {
   }
 
   /**
+   * Returns true if the given type is option of product type, e.g. `Option[Tuple2]`. Note that,
+   * we also treat [[DefinedByConstructorParams]] as product type.
+   */
+  def optionOfProductType(tpe: `Type`): Boolean = ScalaReflectionLock.synchronized {
+    tpe match {
+      case t if t <:< localTypeOf[Option[_]] =>
+        val TypeRef(_, _, Seq(optType)) = t
+        definedByConstructorParams(optType)
+      case _ => false
+    }
+  }
+
+  /**
    * Returns the parameter names and types for the primary constructor of this class.
    *
    * Note that it only works for scala classes with primary constructor, and currently doesn't

http://git-wip-us.apache.org/repos/asf/spark/blob/f135b70f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index 82e1a8a..9c4818d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -47,6 +47,16 @@ object ExpressionEncoder {
     // We convert the not-serializable TypeTag into StructType and ClassTag.
     val mirror = typeTag[T].mirror
     val tpe = typeTag[T].tpe
+
+    if (ScalaReflection.optionOfProductType(tpe)) {
+      throw new UnsupportedOperationException(
+        "Cannot create encoder for Option of Product type, because Product type is represented " +
+          "as a row, and the entire row can not be null in Spark SQL like normal databases. " +
+          "You can wrap your type with Tuple1 if you do want top level null Product objects, " +
+          "e.g. instead of creating `Dataset[Option[MyClass]]`, you can do something like " +
+          "`val ds: Dataset[Tuple1[MyClass]] = Seq(Tuple1(MyClass(...)), Tuple1(null)).toDS`")
+    }
+
     val cls = mirror.runtimeClass(tpe)
     val flat = !ScalaReflection.definedByConstructorParams(tpe)
 
@@ -54,9 +64,9 @@ object ExpressionEncoder {
    val nullSafeInput = if (flat) {
      inputObject
    } else {
-      // For input object of non-flat type, we can't encode it to row if it's null, as Spark SQL
+      // For input object of Product type,
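The `Tuple1` workaround suggested by the new error message can be illustrated in plain Scala, without a SparkSession (`MyClass` is a hypothetical case class): an `Option` at the top level makes the absent value an entirely null row, which Spark SQL forbids, while `Tuple1` keeps every row object present and pushes the null down into the row's single (nullable) column.

```scala
object Tuple1WorkaroundDemo {
  case class MyClass(i: Int)

  // With Option, `None` has no row object at all -- encoding it would require
  // an entirely null row, which Spark SQL disallows.
  val asOption: Seq[Option[MyClass]] = Seq(Some(MyClass(1)), None)

  // With Tuple1, every element is a non-null row object; the "missing" value
  // becomes a null in the row's single column, which Spark SQL allows.
  val asTuple1: Seq[Tuple1[MyClass]] = Seq(Tuple1(MyClass(1)), Tuple1(null))

  def main(args: Array[String]): Unit = {
    assert(asOption.exists(_.isEmpty))     // None: a would-be all-null row
    assert(asTuple1.forall(_ != null))     // every row object is present
    assert(asTuple1.exists(_._1 == null))  // the null lives in a column instead
    println("ok")
  }
}
```

In Spark itself the second shape is what the error message recommends: `Seq(Tuple1(MyClass(1)), Tuple1(null)).toDS` produces a `Dataset[Tuple1[MyClass]]` whose rows are never null as a whole.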