spark git commit: [SPARK-18251][SQL] the type of Dataset can't be Option of non-flat type

2016-11-30 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 f542df310 -> 9e96ac5a9


[SPARK-18251][SQL] the type of Dataset can't be Option of non-flat type

## What changes were proposed in this pull request?

For an input object of non-flat type, we can't encode it to a row if it's null,
as Spark SQL doesn't allow the entire row to be null; only its columns can be
null. That's why we forbid top-level null objects in
https://github.com/apache/spark/pull/13469

However, if users wrap a non-flat type with `Option`, we may still encode a
top-level null object to a row, which is not allowed.

This PR fixes this case, and suggests that users wrap their type with `Tuple1`
if they do want top-level null objects.
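As an illustrative sketch (not part of the patch): assuming a spark-shell session with `spark.implicits._` in scope and a made-up case class `MyClass`, the failure mode and the suggested `Tuple1` workaround look roughly like this:

```scala
// Hypothetical sketch, assuming spark-shell with spark.implicits._ imported.
// MyClass is an invented Product type for illustration only.
case class MyClass(a: Int)

// Option of a Product type would force the encoder to emit a null row,
// which Spark SQL forbids; after this patch it fails fast with
// UnsupportedOperationException at encoder creation:
// Seq(Some(MyClass(1)), None).toDS()

// Workaround suggested by the error message: wrap in Tuple1, so the null
// lives in a (nullable) column instead of the top-level row.
val ds = Seq(Tuple1(MyClass(1)), Tuple1(null)).toDS()
```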

## How was this patch tested?

A new test in `DatasetSuite`.

Author: Wenchen Fan 

Closes #15979 from cloud-fan/option.

(cherry picked from commit f135b70fd590438bebb2a54012a6f73074219758)
Signed-off-by: Cheng Lian 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e96ac5a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e96ac5a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e96ac5a

Branch: refs/heads/branch-2.1
Commit: 9e96ac5a986c53ca1689e3d1f1365cc5107b5d88
Parents: f542df3
Author: Wenchen Fan 
Authored: Wed Nov 30 13:36:17 2016 -0800
Committer: Cheng Lian 
Committed: Wed Nov 30 13:54:37 2016 -0800

--
 .../apache/spark/sql/catalyst/ScalaReflection.scala   | 13 +
 .../sql/catalyst/encoders/ExpressionEncoder.scala | 14 --
 .../scala/org/apache/spark/sql/DatasetSuite.scala | 13 +++--
 .../org/apache/spark/sql/JsonFunctionsSuite.scala |  2 +-
 4 files changed, 37 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9e96ac5a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 7bcaea7..0aa21b9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -606,6 +606,19 @@ object ScalaReflection extends ScalaReflection {
   }
 
   /**
+   * Returns true if the given type is option of product type, e.g. `Option[Tuple2]`. Note that,
+   * we also treat [[DefinedByConstructorParams]] as product type.
+   */
+  def optionOfProductType(tpe: `Type`): Boolean = ScalaReflectionLock.synchronized {
+    tpe match {
+      case t if t <:< localTypeOf[Option[_]] =>
+        val TypeRef(_, _, Seq(optType)) = t
+        definedByConstructorParams(optType)
+      case _ => false
+    }
+  }
+
+  /**
    * Returns the parameter names and types for the primary constructor of this class.
    *
    * Note that it only works for scala classes with primary constructor, and currently doesn't
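The check above can be sketched outside Spark with plain `scala-reflect`; this simplified stand-in (my own, not from the patch) substitutes a `Product` test for the fuller `definedByConstructorParams` check and omits the reflection lock:

```scala
import scala.reflect.runtime.universe._

// Simplified stand-in for ScalaReflection.optionOfProductType: true when the
// type is Option[T] and T is a Product (tuple or case class). The real check
// also accepts DefinedByConstructorParams and synchronizes on a global lock,
// since Scala runtime reflection is not thread-safe in 2.11.
def optionOfProduct(tpe: Type): Boolean = tpe match {
  case t if t <:< typeOf[Option[_]] =>
    // Destructure Option[T] to get at the element type T.
    val TypeRef(_, _, Seq(optType)) = t
    optType <:< typeOf[Product]
  case _ => false
}

// optionOfProduct(typeOf[Option[(Int, String)]])  // true
// optionOfProduct(typeOf[Option[Int]])            // false
```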

http://git-wip-us.apache.org/repos/asf/spark/blob/9e96ac5a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index 82e1a8a..9c4818d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -47,6 +47,16 @@ object ExpressionEncoder {
     // We convert the not-serializable TypeTag into StructType and ClassTag.
     val mirror = typeTag[T].mirror
     val tpe = typeTag[T].tpe
+
+    if (ScalaReflection.optionOfProductType(tpe)) {
+      throw new UnsupportedOperationException(
+        "Cannot create encoder for Option of Product type, because Product type is represented " +
+          "as a row, and the entire row can not be null in Spark SQL like normal databases. " +
+          "You can wrap your type with Tuple1 if you do want top level null Product objects, " +
+          "e.g. instead of creating `Dataset[Option[MyClass]]`, you can do something like " +
+          "`val ds: Dataset[Tuple1[MyClass]] = Seq(Tuple1(MyClass(...)), Tuple1(null)).toDS`")
+    }
+
     val cls = mirror.runtimeClass(tpe)
     val flat = !ScalaReflection.definedByConstructorParams(tpe)
@@ -54,9 +64,9 @@
     val nullSafeInput = if (flat) {
       inputObject
     } else {
-      // For input object of non-flat type, we can't encode it to row if it's null, as Spark SQL
+      // For input object of Product type,
spark git commit: [SPARK-18251][SQL] the type of Dataset can't be Option of non-flat type

2016-11-30 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master 60022bfd6 -> f135b70fd


[SPARK-18251][SQL] the type of Dataset can't be Option of non-flat type


Author: Wenchen Fan 

Closes #15979 from cloud-fan/option.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f135b70f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f135b70f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f135b70f

Branch: refs/heads/master
Commit: f135b70fd590438bebb2a54012a6f73074219758
Parents: 60022bf
Author: Wenchen Fan 
Authored: Wed Nov 30 13:36:17 2016 -0800
Committer: Cheng Lian 
Committed: Wed Nov 30 13:36:17 2016 -0800
