[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22749


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-25 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r228135443
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
@@ -212,21 +181,91 @@ object ExpressionEncoder {
  * A generic encoder for JVM objects that uses Catalyst Expressions for a `serializer`
  * and a `deserializer`.
  *
- * @param schema The schema after converting `T` to a Spark SQL row.
- * @param serializer A set of expressions, one for each top-level field that can be used to
- *                   extract the values from a raw object into an [[InternalRow]].
- * @param deserializer An expression that will construct an object given an [[InternalRow]].
+ * @param objSerializer An expression that can be used to encode a raw object to corresponding
+ *                      Spark SQL representation that can be a primitive column, array, map or a
+ *                      struct. This represents how Spark SQL generally serializes an object of
+ *                      type `T`.
+ * @param objDeserializer An expression that will construct an object given a Spark SQL
+ *                        representation. This represents how Spark SQL generally deserializes
+ *                        a serialized value in Spark SQL representation back to an object of
+ *                        type `T`.
  * @param clsTag A classtag for `T`.
  */
 case class ExpressionEncoder[T](
-    schema: StructType,
-    flat: Boolean,
-    serializer: Seq[Expression],
-    deserializer: Expression,
+    objSerializer: Expression,
+    objDeserializer: Expression,
     clsTag: ClassTag[T])
   extends Encoder[T] {
 
-  if (flat) require(serializer.size == 1)
+  /**
+   * A sequence of expressions, one for each top-level field that can be used to
+   * extract the values from a raw object into an [[InternalRow]]:
+   * 1. If `serializer` encodes a raw object to a struct, strip the outer If-IsNull and get
+   *    the `CreateNamedStruct`.
+   * 2. For other cases, wrap the single serializer with `CreateNamedStruct`.
+   */
+  val serializer: Seq[NamedExpression] = {
+    val clsName = Utils.getSimpleName(clsTag.runtimeClass)
+
+    if (isSerializedAsStruct) {
+      val nullSafeSerializer = objSerializer.transformUp {
+        case r: BoundReference =>
+          // For input object of Product type, we can't encode it to row if it's null, as
+          // Spark SQL doesn't allow top-level row to be null, only its columns can be null.
+          AssertNotNull(r, Seq("top level Product or row object"))
+      }
+      nullSafeSerializer match {
+        case If(_: IsNull, _, s: CreateNamedStruct) => s
+        case s: CreateNamedStruct => s
--- End diff --

Ok. Sounds good to me.


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r228134985
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
@@ -212,21 +181,91 @@ object ExpressionEncoder {
  * A generic encoder for JVM objects that uses Catalyst Expressions for a `serializer`
  * and a `deserializer`.
  *
- * @param schema The schema after converting `T` to a Spark SQL row.
- * @param serializer A set of expressions, one for each top-level field that can be used to
- *                   extract the values from a raw object into an [[InternalRow]].
- * @param deserializer An expression that will construct an object given an [[InternalRow]].
+ * @param objSerializer An expression that can be used to encode a raw object to corresponding
+ *                      Spark SQL representation that can be a primitive column, array, map or a
+ *                      struct. This represents how Spark SQL generally serializes an object of
+ *                      type `T`.
+ * @param objDeserializer An expression that will construct an object given a Spark SQL
+ *                        representation. This represents how Spark SQL generally deserializes
+ *                        a serialized value in Spark SQL representation back to an object of
+ *                        type `T`.
  * @param clsTag A classtag for `T`.
  */
 case class ExpressionEncoder[T](
-    schema: StructType,
-    flat: Boolean,
-    serializer: Seq[Expression],
-    deserializer: Expression,
+    objSerializer: Expression,
+    objDeserializer: Expression,
     clsTag: ClassTag[T])
   extends Encoder[T] {
 
-  if (flat) require(serializer.size == 1)
+  /**
+   * A sequence of expressions, one for each top-level field that can be used to
+   * extract the values from a raw object into an [[InternalRow]]:
+   * 1. If `serializer` encodes a raw object to a struct, strip the outer If-IsNull and get
+   *    the `CreateNamedStruct`.
+   * 2. For other cases, wrap the single serializer with `CreateNamedStruct`.
+   */
+  val serializer: Seq[NamedExpression] = {
+    val clsName = Utils.getSimpleName(clsTag.runtimeClass)
+
+    if (isSerializedAsStruct) {
+      val nullSafeSerializer = objSerializer.transformUp {
+        case r: BoundReference =>
+          // For input object of Product type, we can't encode it to row if it's null, as
+          // Spark SQL doesn't allow top-level row to be null, only its columns can be null.
+          AssertNotNull(r, Seq("top level Product or row object"))
+      }
+      nullSafeSerializer match {
+        case If(_: IsNull, _, s: CreateNamedStruct) => s
+        case s: CreateNamedStruct => s
--- End diff --

This is minor; we can update it in another PR. We don't need to wait for another Jenkins QA round.


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-25 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r228133724
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
@@ -212,21 +181,91 @@ object ExpressionEncoder {
  * A generic encoder for JVM objects that uses Catalyst Expressions for a `serializer`
  * and a `deserializer`.
  *
- * @param schema The schema after converting `T` to a Spark SQL row.
- * @param serializer A set of expressions, one for each top-level field that can be used to
- *                   extract the values from a raw object into an [[InternalRow]].
- * @param deserializer An expression that will construct an object given an [[InternalRow]].
+ * @param objSerializer An expression that can be used to encode a raw object to corresponding
+ *                      Spark SQL representation that can be a primitive column, array, map or a
+ *                      struct. This represents how Spark SQL generally serializes an object of
+ *                      type `T`.
+ * @param objDeserializer An expression that will construct an object given a Spark SQL
+ *                        representation. This represents how Spark SQL generally deserializes
+ *                        a serialized value in Spark SQL representation back to an object of
+ *                        type `T`.
  * @param clsTag A classtag for `T`.
  */
 case class ExpressionEncoder[T](
-    schema: StructType,
-    flat: Boolean,
-    serializer: Seq[Expression],
-    deserializer: Expression,
+    objSerializer: Expression,
+    objDeserializer: Expression,
     clsTag: ClassTag[T])
   extends Encoder[T] {
 
-  if (flat) require(serializer.size == 1)
+  /**
+   * A sequence of expressions, one for each top-level field that can be used to
+   * extract the values from a raw object into an [[InternalRow]]:
+   * 1. If `serializer` encodes a raw object to a struct, strip the outer If-IsNull and get
+   *    the `CreateNamedStruct`.
+   * 2. For other cases, wrap the single serializer with `CreateNamedStruct`.
+   */
+  val serializer: Seq[NamedExpression] = {
+    val clsName = Utils.getSimpleName(clsTag.runtimeClass)
+
+    if (isSerializedAsStruct) {
+      val nullSafeSerializer = objSerializer.transformUp {
+        case r: BoundReference =>
+          // For input object of Product type, we can't encode it to row if it's null, as
+          // Spark SQL doesn't allow top-level row to be null, only its columns can be null.
+          AssertNotNull(r, Seq("top level Product or row object"))
+      }
+      nullSafeSerializer match {
+        case If(_: IsNull, _, s: CreateNamedStruct) => s
+        case s: CreateNamedStruct => s
--- End diff --

Oh, good catch! I think this is a redundant pattern.
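
For reference, a minimal sketch of the simplified match, assuming struct-typed serializers always arrive wrapped in the outer null check (so the bare `CreateNamedStruct` case never fires and can become an error):

```
nullSafeSerializer match {
  case If(_: IsNull, _, s: CreateNamedStruct) => s
  case other => throw new RuntimeException(
    s"class $clsName has unexpected serializer: $other")
}
```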


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r228132840
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
@@ -212,21 +181,91 @@ object ExpressionEncoder {
  * A generic encoder for JVM objects that uses Catalyst Expressions for a `serializer`
  * and a `deserializer`.
  *
- * @param schema The schema after converting `T` to a Spark SQL row.
- * @param serializer A set of expressions, one for each top-level field that can be used to
- *                   extract the values from a raw object into an [[InternalRow]].
- * @param deserializer An expression that will construct an object given an [[InternalRow]].
+ * @param objSerializer An expression that can be used to encode a raw object to corresponding
+ *                      Spark SQL representation that can be a primitive column, array, map or a
+ *                      struct. This represents how Spark SQL generally serializes an object of
+ *                      type `T`.
+ * @param objDeserializer An expression that will construct an object given a Spark SQL
+ *                        representation. This represents how Spark SQL generally deserializes
+ *                        a serialized value in Spark SQL representation back to an object of
+ *                        type `T`.
  * @param clsTag A classtag for `T`.
  */
 case class ExpressionEncoder[T](
-    schema: StructType,
-    flat: Boolean,
-    serializer: Seq[Expression],
-    deserializer: Expression,
+    objSerializer: Expression,
+    objDeserializer: Expression,
     clsTag: ClassTag[T])
   extends Encoder[T] {
 
-  if (flat) require(serializer.size == 1)
+  /**
+   * A sequence of expressions, one for each top-level field that can be used to
+   * extract the values from a raw object into an [[InternalRow]]:
+   * 1. If `serializer` encodes a raw object to a struct, strip the outer If-IsNull and get
+   *    the `CreateNamedStruct`.
+   * 2. For other cases, wrap the single serializer with `CreateNamedStruct`.
+   */
+  val serializer: Seq[NamedExpression] = {
+    val clsName = Utils.getSimpleName(clsTag.runtimeClass)
+
+    if (isSerializedAsStruct) {
+      val nullSafeSerializer = objSerializer.transformUp {
+        case r: BoundReference =>
+          // For input object of Product type, we can't encode it to row if it's null, as
+          // Spark SQL doesn't allow top-level row to be null, only its columns can be null.
+          AssertNotNull(r, Seq("top level Product or row object"))
+      }
+      nullSafeSerializer match {
+        case If(_: IsNull, _, s: CreateNamedStruct) => s
+        case s: CreateNamedStruct => s
--- End diff --

when will we hit this?


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227996228
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala ---
@@ -280,59 +281,59 @@ class ScalaReflectionSuite extends SparkFunSuite {
 
   test("serialize and deserialize arbitrary sequence types") {
     import scala.collection.immutable.Queue
-    val queueSerializer = serializerFor[Queue[Int]](BoundReference(
-      0, ObjectType(classOf[Queue[Int]]), nullable = false))
-    assert(queueSerializer.dataType.head.dataType ==
+    val queueSerializer = serializerForType(ScalaReflection.localTypeOf[Queue[Int]])
+    assert(queueSerializer.dataType ==
       ArrayType(IntegerType, containsNull = false))
     val queueDeserializer = deserializerFor[Queue[Int]]
     assert(queueDeserializer.dataType == ObjectType(classOf[Queue[_]]))
 
     import scala.collection.mutable.ArrayBuffer
-    val arrayBufferSerializer = serializerFor[ArrayBuffer[Int]](BoundReference(
-      0, ObjectType(classOf[ArrayBuffer[Int]]), nullable = false))
-    assert(arrayBufferSerializer.dataType.head.dataType ==
+    val arrayBufferSerializer = serializerForType(ScalaReflection.localTypeOf[ArrayBuffer[Int]])
+    assert(arrayBufferSerializer.dataType ==
       ArrayType(IntegerType, containsNull = false))
     val arrayBufferDeserializer = deserializerFor[ArrayBuffer[Int]]
     assert(arrayBufferDeserializer.dataType == ObjectType(classOf[ArrayBuffer[_]]))
   }
 
   test("serialize and deserialize arbitrary map types") {
-    val mapSerializer = serializerFor[Map[Int, Int]](BoundReference(
-      0, ObjectType(classOf[Map[Int, Int]]), nullable = false))
-    assert(mapSerializer.dataType.head.dataType ==
+    val mapSerializer = serializerForType(ScalaReflection.localTypeOf[Map[Int, Int]])
+    assert(mapSerializer.dataType ==
      MapType(IntegerType, IntegerType, valueContainsNull = false))
     val mapDeserializer = deserializerFor[Map[Int, Int]]
     assert(mapDeserializer.dataType == ObjectType(classOf[Map[_, _]]))
 
     import scala.collection.immutable.HashMap
-    val hashMapSerializer = serializerFor[HashMap[Int, Int]](BoundReference(
-      0, ObjectType(classOf[HashMap[Int, Int]]), nullable = false))
-    assert(hashMapSerializer.dataType.head.dataType ==
+    val hashMapSerializer = serializerForType(ScalaReflection.localTypeOf[HashMap[Int, Int]])
+    assert(hashMapSerializer.dataType ==
       MapType(IntegerType, IntegerType, valueContainsNull = false))
     val hashMapDeserializer = deserializerFor[HashMap[Int, Int]]
     assert(hashMapDeserializer.dataType == ObjectType(classOf[HashMap[_, _]]))
 
     import scala.collection.mutable.{LinkedHashMap => LHMap}
-    val linkedHashMapSerializer = serializerFor[LHMap[Long, String]](BoundReference(
-      0, ObjectType(classOf[LHMap[Long, String]]), nullable = false))
-    assert(linkedHashMapSerializer.dataType.head.dataType ==
+    val linkedHashMapSerializer = serializerForType(
+      ScalaReflection.localTypeOf[LHMap[Long, String]])
+    assert(linkedHashMapSerializer.dataType ==
       MapType(LongType, StringType, valueContainsNull = true))
     val linkedHashMapDeserializer = deserializerFor[LHMap[Long, String]]
     assert(linkedHashMapDeserializer.dataType == ObjectType(classOf[LHMap[_, _]]))
   }
 
   test("SPARK-22442: Generate correct field names for special characters") {
-    val serializer = serializerFor[SpecialCharAsFieldData](BoundReference(
-      0, ObjectType(classOf[SpecialCharAsFieldData]), nullable = false))
+    val serializer = serializerForType(ScalaReflection.localTypeOf[SpecialCharAsFieldData])
--- End diff --

Ok. I see.


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227867735
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala ---
@@ -280,59 +281,59 @@ class ScalaReflectionSuite extends SparkFunSuite {
 
   test("serialize and deserialize arbitrary sequence types") {
     import scala.collection.immutable.Queue
-    val queueSerializer = serializerFor[Queue[Int]](BoundReference(
-      0, ObjectType(classOf[Queue[Int]]), nullable = false))
-    assert(queueSerializer.dataType.head.dataType ==
+    val queueSerializer = serializerForType(ScalaReflection.localTypeOf[Queue[Int]])
+    assert(queueSerializer.dataType ==
       ArrayType(IntegerType, containsNull = false))
     val queueDeserializer = deserializerFor[Queue[Int]]
     assert(queueDeserializer.dataType == ObjectType(classOf[Queue[_]]))
 
     import scala.collection.mutable.ArrayBuffer
-    val arrayBufferSerializer = serializerFor[ArrayBuffer[Int]](BoundReference(
-      0, ObjectType(classOf[ArrayBuffer[Int]]), nullable = false))
-    assert(arrayBufferSerializer.dataType.head.dataType ==
+    val arrayBufferSerializer = serializerForType(ScalaReflection.localTypeOf[ArrayBuffer[Int]])
+    assert(arrayBufferSerializer.dataType ==
       ArrayType(IntegerType, containsNull = false))
     val arrayBufferDeserializer = deserializerFor[ArrayBuffer[Int]]
     assert(arrayBufferDeserializer.dataType == ObjectType(classOf[ArrayBuffer[_]]))
   }
 
   test("serialize and deserialize arbitrary map types") {
-    val mapSerializer = serializerFor[Map[Int, Int]](BoundReference(
-      0, ObjectType(classOf[Map[Int, Int]]), nullable = false))
-    assert(mapSerializer.dataType.head.dataType ==
+    val mapSerializer = serializerForType(ScalaReflection.localTypeOf[Map[Int, Int]])
+    assert(mapSerializer.dataType ==
      MapType(IntegerType, IntegerType, valueContainsNull = false))
     val mapDeserializer = deserializerFor[Map[Int, Int]]
     assert(mapDeserializer.dataType == ObjectType(classOf[Map[_, _]]))
 
     import scala.collection.immutable.HashMap
-    val hashMapSerializer = serializerFor[HashMap[Int, Int]](BoundReference(
-      0, ObjectType(classOf[HashMap[Int, Int]]), nullable = false))
-    assert(hashMapSerializer.dataType.head.dataType ==
+    val hashMapSerializer = serializerForType(ScalaReflection.localTypeOf[HashMap[Int, Int]])
+    assert(hashMapSerializer.dataType ==
       MapType(IntegerType, IntegerType, valueContainsNull = false))
     val hashMapDeserializer = deserializerFor[HashMap[Int, Int]]
     assert(hashMapDeserializer.dataType == ObjectType(classOf[HashMap[_, _]]))
 
     import scala.collection.mutable.{LinkedHashMap => LHMap}
-    val linkedHashMapSerializer = serializerFor[LHMap[Long, String]](BoundReference(
-      0, ObjectType(classOf[LHMap[Long, String]]), nullable = false))
-    assert(linkedHashMapSerializer.dataType.head.dataType ==
+    val linkedHashMapSerializer = serializerForType(
+      ScalaReflection.localTypeOf[LHMap[Long, String]])
+    assert(linkedHashMapSerializer.dataType ==
       MapType(LongType, StringType, valueContainsNull = true))
     val linkedHashMapDeserializer = deserializerFor[LHMap[Long, String]]
    assert(linkedHashMapDeserializer.dataType == ObjectType(classOf[LHMap[_, _]]))
   }
 
   test("SPARK-22442: Generate correct field names for special characters") {
-    val serializer = serializerFor[SpecialCharAsFieldData](BoundReference(
-      0, ObjectType(classOf[SpecialCharAsFieldData]), nullable = false))
+    val serializer = serializerForType(ScalaReflection.localTypeOf[SpecialCharAsFieldData])
--- End diff --

like `deserializerFor` in this suite, let's also create a `serializerFor`
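
A suite-local `serializerFor` could mirror that existing helper, for example (hypothetical sketch, not the PR's code):

```
private def serializerFor[T: TypeTag]: Expression =
  serializerForType(ScalaReflection.localTypeOf[T])
```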


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227797378
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala ---
@@ -280,59 +281,59 @@ class ScalaReflectionSuite extends SparkFunSuite {
 
   test("serialize and deserialize arbitrary sequence types") {
     import scala.collection.immutable.Queue
-    val queueSerializer = serializerFor[Queue[Int]](BoundReference(
-      0, ObjectType(classOf[Queue[Int]]), nullable = false))
-    assert(queueSerializer.dataType.head.dataType ==
+    val queueSerializer = serializerForType(ScalaReflection.localTypeOf[Queue[Int]])
+    assert(queueSerializer.dataType ==
       ArrayType(IntegerType, containsNull = false))
     val queueDeserializer = deserializerFor[Queue[Int]]
     assert(queueDeserializer.dataType == ObjectType(classOf[Queue[_]]))
 
     import scala.collection.mutable.ArrayBuffer
-    val arrayBufferSerializer = serializerFor[ArrayBuffer[Int]](BoundReference(
-      0, ObjectType(classOf[ArrayBuffer[Int]]), nullable = false))
-    assert(arrayBufferSerializer.dataType.head.dataType ==
+    val arrayBufferSerializer = serializerForType(ScalaReflection.localTypeOf[ArrayBuffer[Int]])
+    assert(arrayBufferSerializer.dataType ==
       ArrayType(IntegerType, containsNull = false))
     val arrayBufferDeserializer = deserializerFor[ArrayBuffer[Int]]
     assert(arrayBufferDeserializer.dataType == ObjectType(classOf[ArrayBuffer[_]]))
   }
 
   test("serialize and deserialize arbitrary map types") {
-    val mapSerializer = serializerFor[Map[Int, Int]](BoundReference(
-      0, ObjectType(classOf[Map[Int, Int]]), nullable = false))
-    assert(mapSerializer.dataType.head.dataType ==
+    val mapSerializer = serializerForType(ScalaReflection.localTypeOf[Map[Int, Int]])
+    assert(mapSerializer.dataType ==
      MapType(IntegerType, IntegerType, valueContainsNull = false))
     val mapDeserializer = deserializerFor[Map[Int, Int]]
     assert(mapDeserializer.dataType == ObjectType(classOf[Map[_, _]]))
 
     import scala.collection.immutable.HashMap
-    val hashMapSerializer = serializerFor[HashMap[Int, Int]](BoundReference(
-      0, ObjectType(classOf[HashMap[Int, Int]]), nullable = false))
-    assert(hashMapSerializer.dataType.head.dataType ==
+    val hashMapSerializer = serializerForType(ScalaReflection.localTypeOf[HashMap[Int, Int]])
+    assert(hashMapSerializer.dataType ==
       MapType(IntegerType, IntegerType, valueContainsNull = false))
     val hashMapDeserializer = deserializerFor[HashMap[Int, Int]]
     assert(hashMapDeserializer.dataType == ObjectType(classOf[HashMap[_, _]]))
 
     import scala.collection.mutable.{LinkedHashMap => LHMap}
-    val linkedHashMapSerializer = serializerFor[LHMap[Long, String]](BoundReference(
-      0, ObjectType(classOf[LHMap[Long, String]]), nullable = false))
-    assert(linkedHashMapSerializer.dataType.head.dataType ==
+    val linkedHashMapSerializer = serializerForType(
+      ScalaReflection.localTypeOf[LHMap[Long, String]])
+    assert(linkedHashMapSerializer.dataType ==
       MapType(LongType, StringType, valueContainsNull = true))
     val linkedHashMapDeserializer = deserializerFor[LHMap[Long, String]]
     assert(linkedHashMapDeserializer.dataType == ObjectType(classOf[LHMap[_, _]]))
   }
 
   test("SPARK-22442: Generate correct field names for special characters") {
-    val serializer = serializerFor[SpecialCharAsFieldData](BoundReference(
-      0, ObjectType(classOf[SpecialCharAsFieldData]), nullable = false))
+    val serializer = serializerForType(ScalaReflection.localTypeOf[SpecialCharAsFieldData])
--- End diff --

Do you mean to create a method `serializerFor` in this suite? Or replace 
`serializerForType` with `ScalaReflection.serializerFor`?


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227796616
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala ---
@@ -280,59 +281,59 @@ class ScalaReflectionSuite extends SparkFunSuite {
 
   test("serialize and deserialize arbitrary sequence types") {
     import scala.collection.immutable.Queue
-    val queueSerializer = serializerFor[Queue[Int]](BoundReference(
-      0, ObjectType(classOf[Queue[Int]]), nullable = false))
-    assert(queueSerializer.dataType.head.dataType ==
+    val queueSerializer = serializerForType(ScalaReflection.localTypeOf[Queue[Int]])
+    assert(queueSerializer.dataType ==
       ArrayType(IntegerType, containsNull = false))
     val queueDeserializer = deserializerFor[Queue[Int]]
     assert(queueDeserializer.dataType == ObjectType(classOf[Queue[_]]))
 
     import scala.collection.mutable.ArrayBuffer
-    val arrayBufferSerializer = serializerFor[ArrayBuffer[Int]](BoundReference(
-      0, ObjectType(classOf[ArrayBuffer[Int]]), nullable = false))
-    assert(arrayBufferSerializer.dataType.head.dataType ==
+    val arrayBufferSerializer = serializerForType(ScalaReflection.localTypeOf[ArrayBuffer[Int]])
+    assert(arrayBufferSerializer.dataType ==
       ArrayType(IntegerType, containsNull = false))
     val arrayBufferDeserializer = deserializerFor[ArrayBuffer[Int]]
     assert(arrayBufferDeserializer.dataType == ObjectType(classOf[ArrayBuffer[_]]))
   }
 
   test("serialize and deserialize arbitrary map types") {
-    val mapSerializer = serializerFor[Map[Int, Int]](BoundReference(
-      0, ObjectType(classOf[Map[Int, Int]]), nullable = false))
-    assert(mapSerializer.dataType.head.dataType ==
+    val mapSerializer = serializerForType(ScalaReflection.localTypeOf[Map[Int, Int]])
+    assert(mapSerializer.dataType ==
      MapType(IntegerType, IntegerType, valueContainsNull = false))
     val mapDeserializer = deserializerFor[Map[Int, Int]]
     assert(mapDeserializer.dataType == ObjectType(classOf[Map[_, _]]))
 
     import scala.collection.immutable.HashMap
-    val hashMapSerializer = serializerFor[HashMap[Int, Int]](BoundReference(
-      0, ObjectType(classOf[HashMap[Int, Int]]), nullable = false))
-    assert(hashMapSerializer.dataType.head.dataType ==
+    val hashMapSerializer = serializerForType(ScalaReflection.localTypeOf[HashMap[Int, Int]])
+    assert(hashMapSerializer.dataType ==
       MapType(IntegerType, IntegerType, valueContainsNull = false))
     val hashMapDeserializer = deserializerFor[HashMap[Int, Int]]
     assert(hashMapDeserializer.dataType == ObjectType(classOf[HashMap[_, _]]))
 
     import scala.collection.mutable.{LinkedHashMap => LHMap}
-    val linkedHashMapSerializer = serializerFor[LHMap[Long, String]](BoundReference(
-      0, ObjectType(classOf[LHMap[Long, String]]), nullable = false))
-    assert(linkedHashMapSerializer.dataType.head.dataType ==
+    val linkedHashMapSerializer = serializerForType(
+      ScalaReflection.localTypeOf[LHMap[Long, String]])
+    assert(linkedHashMapSerializer.dataType ==
      MapType(LongType, StringType, valueContainsNull = true))
     val linkedHashMapDeserializer = deserializerFor[LHMap[Long, String]]
     assert(linkedHashMapDeserializer.dataType == ObjectType(classOf[LHMap[_, _]]))
   }
 
   test("SPARK-22442: Generate correct field names for special characters") {
-    val serializer = serializerFor[SpecialCharAsFieldData](BoundReference(
-      0, ObjectType(classOf[SpecialCharAsFieldData]), nullable = false))
+    val serializer = serializerForType(ScalaReflection.localTypeOf[SpecialCharAsFieldData])
--- End diff --

Ok.


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227783724
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala ---
@@ -280,59 +281,59 @@ class ScalaReflectionSuite extends SparkFunSuite {
 
   test("serialize and deserialize arbitrary sequence types") {
     import scala.collection.immutable.Queue
-    val queueSerializer = serializerFor[Queue[Int]](BoundReference(
-      0, ObjectType(classOf[Queue[Int]]), nullable = false))
-    assert(queueSerializer.dataType.head.dataType ==
+    val queueSerializer = serializerForType(ScalaReflection.localTypeOf[Queue[Int]])
+    assert(queueSerializer.dataType ==
       ArrayType(IntegerType, containsNull = false))
     val queueDeserializer = deserializerFor[Queue[Int]]
     assert(queueDeserializer.dataType == ObjectType(classOf[Queue[_]]))
 
     import scala.collection.mutable.ArrayBuffer
-    val arrayBufferSerializer = serializerFor[ArrayBuffer[Int]](BoundReference(
-      0, ObjectType(classOf[ArrayBuffer[Int]]), nullable = false))
-    assert(arrayBufferSerializer.dataType.head.dataType ==
+    val arrayBufferSerializer = serializerForType(ScalaReflection.localTypeOf[ArrayBuffer[Int]])
+    assert(arrayBufferSerializer.dataType ==
       ArrayType(IntegerType, containsNull = false))
     val arrayBufferDeserializer = deserializerFor[ArrayBuffer[Int]]
     assert(arrayBufferDeserializer.dataType == ObjectType(classOf[ArrayBuffer[_]]))
   }
 
   test("serialize and deserialize arbitrary map types") {
-    val mapSerializer = serializerFor[Map[Int, Int]](BoundReference(
-      0, ObjectType(classOf[Map[Int, Int]]), nullable = false))
-    assert(mapSerializer.dataType.head.dataType ==
+    val mapSerializer = serializerForType(ScalaReflection.localTypeOf[Map[Int, Int]])
+    assert(mapSerializer.dataType ==
      MapType(IntegerType, IntegerType, valueContainsNull = false))
     val mapDeserializer = deserializerFor[Map[Int, Int]]
     assert(mapDeserializer.dataType == ObjectType(classOf[Map[_, _]]))
 
     import scala.collection.immutable.HashMap
-    val hashMapSerializer = serializerFor[HashMap[Int, Int]](BoundReference(
-      0, ObjectType(classOf[HashMap[Int, Int]]), nullable = false))
-    assert(hashMapSerializer.dataType.head.dataType ==
+    val hashMapSerializer = serializerForType(ScalaReflection.localTypeOf[HashMap[Int, Int]])
+    assert(hashMapSerializer.dataType ==
       MapType(IntegerType, IntegerType, valueContainsNull = false))
     val hashMapDeserializer = deserializerFor[HashMap[Int, Int]]
     assert(hashMapDeserializer.dataType == ObjectType(classOf[HashMap[_, _]]))
 
     import scala.collection.mutable.{LinkedHashMap => LHMap}
-    val linkedHashMapSerializer = serializerFor[LHMap[Long, String]](BoundReference(
-      0, ObjectType(classOf[LHMap[Long, String]]), nullable = false))
-    assert(linkedHashMapSerializer.dataType.head.dataType ==
+    val linkedHashMapSerializer = serializerForType(
+      ScalaReflection.localTypeOf[LHMap[Long, String]])
+    assert(linkedHashMapSerializer.dataType ==
      MapType(LongType, StringType, valueContainsNull = true))
     val linkedHashMapDeserializer = deserializerFor[LHMap[Long, String]]
     assert(linkedHashMapDeserializer.dataType == ObjectType(classOf[LHMap[_, _]]))
   }
 
   test("SPARK-22442: Generate correct field names for special characters") {
-    val serializer = serializerFor[SpecialCharAsFieldData](BoundReference(
-      0, ObjectType(classOf[SpecialCharAsFieldData]), nullable = false))
+    val serializer = serializerForType(ScalaReflection.localTypeOf[SpecialCharAsFieldData])
--- End diff --

can we replace all the `serializerForType` with `serializerFor` in this 
suite?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227745900
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala ---
@@ -351,11 +347,15 @@ class ScalaReflectionSuite extends SparkFunSuite {
 
   test("SPARK-23835: add null check to non-nullable types in Tuples") {
     def numberOfCheckedArguments(deserializer: Expression): Int = {
-      assert(deserializer.isInstanceOf[NewInstance])
-      deserializer.asInstanceOf[NewInstance].arguments.count(_.isInstanceOf[AssertNotNull])
+      val newInstance = deserializer.collect { case n: NewInstance => n }.head
+      newInstance.arguments.count(_.isInstanceOf[AssertNotNull])
     }
-    assert(numberOfCheckedArguments(deserializerFor[(Double, Double)]) == 2)
-    assert(numberOfCheckedArguments(deserializerFor[(java.lang.Double, Int)]) == 1)
-    assert(numberOfCheckedArguments(deserializerFor[(java.lang.Integer, java.lang.Integer)]) == 0)
+    assert(numberOfCheckedArguments(
+      deserializerForType(ScalaReflection.localTypeOf[(Double, Double)])) == 2)
--- End diff --

Sounds good.


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227742062
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala ---
@@ -351,11 +347,15 @@ class ScalaReflectionSuite extends SparkFunSuite {
 
   test("SPARK-23835: add null check to non-nullable types in Tuples") {
     def numberOfCheckedArguments(deserializer: Expression): Int = {
-      assert(deserializer.isInstanceOf[NewInstance])
-      deserializer.asInstanceOf[NewInstance].arguments.count(_.isInstanceOf[AssertNotNull])
+      val newInstance = deserializer.collect { case n: NewInstance => n }.head
+      newInstance.arguments.count(_.isInstanceOf[AssertNotNull])
     }
-    assert(numberOfCheckedArguments(deserializerFor[(Double, Double)]) == 2)
-    assert(numberOfCheckedArguments(deserializerFor[(java.lang.Double, Int)]) == 1)
-    assert(numberOfCheckedArguments(deserializerFor[(java.lang.Integer, java.lang.Integer)]) == 0)
+    assert(numberOfCheckedArguments(
+      deserializerForType(ScalaReflection.localTypeOf[(Double, Double)])) == 2)
--- End diff --

shall we create a `deserializerFor` method in this test suite to save some 
code diff?
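
One way such a helper could look (a hypothetical sketch, not the PR's code; it just fixes the input type so the call sites keep their pre-refactoring shape):

```
private def deserializerFor[T: TypeTag]: Expression =
  deserializerForType(ScalaReflection.localTypeOf[T])

// call sites then shrink back to e.g.:
// assert(numberOfCheckedArguments(deserializerFor[(Double, Double)]) == 2)
```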


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227739775
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
@@ -212,21 +181,90 @@ object ExpressionEncoder {
  * A generic encoder for JVM objects that uses Catalyst Expressions for a `serializer`
  * and a `deserializer`.
  *
- * @param schema The schema after converting `T` to a Spark SQL row.
- * @param serializer A set of expressions, one for each top-level field that can be used to
- *                   extract the values from a raw object into an [[InternalRow]].
- * @param deserializer An expression that will construct an object given an [[InternalRow]].
+ * @param objSerializer An expression that can be used to encode a raw object to corresponding
+ *                      Spark SQL representation that can be a primitive column, array, map or a
+ *                      struct. This represents how Spark SQL generally serializes an object of
+ *                      type `T`.
+ * @param objDeserializer An expression that will construct an object given a Spark SQL
+ *                        representation. This represents how Spark SQL generally deserializes
+ *                        a serialized value in Spark SQL representation back to an object of
+ *                        type `T`.
  * @param clsTag A classtag for `T`.
  */
 case class ExpressionEncoder[T](
-    schema: StructType,
-    flat: Boolean,
-    serializer: Seq[Expression],
-    deserializer: Expression,
+    objSerializer: Expression,
+    objDeserializer: Expression,
     clsTag: ClassTag[T])
   extends Encoder[T] {
 
-  if (flat) require(serializer.size == 1)
+  /**
+   * A sequence of expressions, one for each top-level field that can be used to
+   * extract the values from a raw object into an [[InternalRow]]:
+   * 1. If `serializer` encodes a raw object to a struct, we directly use the `serializer`.
+   * 2. For other cases, we create a struct to wrap the `serializer`.
--- End diff --

Let's make these 2 comments more precise:
```
1. If `serializer` encodes a raw object to a struct, strip the outer If-IsNull and get the CreateNamedStruct
2. For other cases, wrap the single serializer with CreateNamedStruct
```


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227695574
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---
@@ -434,17 +426,34 @@ object ScalaReflection extends ScalaReflection {
    *  * the element type of [[Array]] or [[Seq]]: `array element class: "abc.xyz.MyClass"`
    *  * the field of [[Product]]: `field (class: "abc.xyz.MyClass", name: "myField")`
    */
-  def serializerFor[T : TypeTag](inputObject: Expression): CreateNamedStruct = {
-    val tpe = localTypeOf[T]
+  def serializerForType(tpe: `Type`,
+      cls: RuntimeClass): Expression = ScalaReflection.cleanUpReflectionObjects {
     val clsName = getClassNameFromType(tpe)
     val walkedTypePath = s"""- root class: "$clsName"""" :: Nil
-    serializerFor(inputObject, tpe, walkedTypePath) match {
-      case expressions.If(_, _, s: CreateNamedStruct) if definedByConstructorParams(tpe) => s
-      case other => CreateNamedStruct(expressions.Literal("value") :: other :: Nil)
-    }
+
+    // The input object to `ExpressionEncoder` is located at the first column of a row.
+    val inputObject = BoundReference(0, dataTypeFor(tpe),
+      nullable = !cls.isPrimitive)
+
+    serializerFor(inputObject, tpe, walkedTypePath)
   }
 
-  /** Helper for extracting internal fields from a case class. */
+  /**
+   * Returns an expression for serializing the value of an input expression into Spark SQL
--- End diff --

I did simplify a lot of it.


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227682823
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala ---
@@ -58,12 +58,10 @@ object RowEncoder {
   def apply(schema: StructType): ExpressionEncoder[Row] = {
     val cls = classOf[Row]
     val inputObject = BoundReference(0, ObjectType(cls), nullable = true)
-    val serializer = serializerFor(AssertNotNull(inputObject, Seq("top level row object")), schema)
-    val deserializer = deserializerFor(schema)
+    val serializer = serializerFor(inputObject, schema)
+    val deserializer = deserializerFor(GetColumnByOrdinal(0, serializer.dataType), schema)
--- End diff --

Ah, I see. Then let's leave it.


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227682672
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---
@@ -434,17 +426,34 @@ object ScalaReflection extends ScalaReflection {
    *  * the element type of [[Array]] or [[Seq]]: `array element class: "abc.xyz.MyClass"`
    *  * the field of [[Product]]: `field (class: "abc.xyz.MyClass", name: "myField")`
    */
-  def serializerFor[T : TypeTag](inputObject: Expression): CreateNamedStruct = {
-    val tpe = localTypeOf[T]
+  def serializerForType(tpe: `Type`,
+      cls: RuntimeClass): Expression = ScalaReflection.cleanUpReflectionObjects {
     val clsName = getClassNameFromType(tpe)
     val walkedTypePath = s"""- root class: "$clsName"""" :: Nil
-    serializerFor(inputObject, tpe, walkedTypePath) match {
-      case expressions.If(_, _, s: CreateNamedStruct) if definedByConstructorParams(tpe) => s
-      case other => CreateNamedStruct(expressions.Literal("value") :: other :: Nil)
-    }
+
+    // The input object to `ExpressionEncoder` is located at the first column of a row.
+    val inputObject = BoundReference(0, dataTypeFor(tpe),
+      nullable = !cls.isPrimitive)
--- End diff --

good, then we don't need `cls` as a parameter.


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227681844
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -1087,7 +1087,7 @@ class Dataset[T] private[sql](
     // Note that we do this before joining them, to enable the join operator to return null for one
     // side, in cases like outer-join.
     val left = {
-      val combined = if (this.exprEnc.flat) {
+      val combined = if (!this.exprEnc.objSerializer.dataType.isInstanceOf[StructType]) {
--- End diff --

shall we create a method in `ExpressionEncoder` for this check?
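
One shape such a helper could take; this is a sketch, though the `isSerializedAsStruct` already used in this PR's `ExpressionEncoder` diff suggests exactly this check:

```
// In ExpressionEncoder[T]: whether T is encoded as a single struct column.
def isSerializedAsStruct: Boolean = objSerializer.dataType.isInstanceOf[StructType]

// the Dataset.scala call site above would then read:
// val combined = if (!this.exprEnc.isSerializedAsStruct) { ... }
```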


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227678714
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala ---
@@ -58,12 +58,10 @@ object RowEncoder {
   def apply(schema: StructType): ExpressionEncoder[Row] = {
     val cls = classOf[Row]
     val inputObject = BoundReference(0, ObjectType(cls), nullable = true)
-    val serializer = serializerFor(AssertNotNull(inputObject, Seq("top level row object")), schema)
-    val deserializer = deserializerFor(schema)
+    val serializer = serializerFor(inputObject, schema)
+    val deserializer = deserializerFor(GetColumnByOrdinal(0, serializer.dataType), schema)
--- End diff --

Ah, we need to access `serializer.dataType` here. So if we want to create `GetColumnByOrdinal` in `deserializerFor`, we need to pass this data type too. What do you think?
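
If `GetColumnByOrdinal` moved into `deserializerFor`, the signature would need to grow a data-type parameter, roughly (hypothetical sketch; `serializedType` is an illustrative name):

```
private def deserializerFor(schema: StructType, serializedType: DataType): Expression =
  deserializerFor(GetColumnByOrdinal(0, serializedType), schema)
```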


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227677176
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala ---
@@ -58,12 +58,10 @@ object RowEncoder {
   def apply(schema: StructType): ExpressionEncoder[Row] = {
     val cls = classOf[Row]
     val inputObject = BoundReference(0, ObjectType(cls), nullable = true)
-    val serializer = serializerFor(AssertNotNull(inputObject, Seq("top level row object")), schema)
-    val deserializer = deserializerFor(schema)
+    val serializer = serializerFor(inputObject, schema)
+    val deserializer = deserializerFor(GetColumnByOrdinal(0, serializer.dataType), schema)
--- End diff --

Ok. Sounds better.


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227675871
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---
@@ -434,17 +426,34 @@ object ScalaReflection extends ScalaReflection {
    *  * the element type of [[Array]] or [[Seq]]: `array element class: "abc.xyz.MyClass"`
    *  * the field of [[Product]]: `field (class: "abc.xyz.MyClass", name: "myField")`
    */
-  def serializerFor[T : TypeTag](inputObject: Expression): CreateNamedStruct = {
-    val tpe = localTypeOf[T]
+  def serializerForType(tpe: `Type`,
+      cls: RuntimeClass): Expression = ScalaReflection.cleanUpReflectionObjects {
     val clsName = getClassNameFromType(tpe)
     val walkedTypePath = s"""- root class: "$clsName"""" :: Nil
-    serializerFor(inputObject, tpe, walkedTypePath) match {
-      case expressions.If(_, _, s: CreateNamedStruct) if definedByConstructorParams(tpe) => s
-      case other => CreateNamedStruct(expressions.Literal("value") :: other :: Nil)
-    }
+
+    // The input object to `ExpressionEncoder` is located at the first column of a row.
+    val inputObject = BoundReference(0, dataTypeFor(tpe),
+      nullable = !cls.isPrimitive)
--- End diff --

Yes, we can check `tpe.typeSymbol.asClass.isPrimitive` instead.
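
The bound reference could then be built from `tpe` alone, along these lines (sketch):

```
val inputObject = BoundReference(0, dataTypeFor(tpe),
  nullable = !tpe.typeSymbol.asClass.isPrimitive)
```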


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227675880
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala ---
@@ -58,12 +58,10 @@ object RowEncoder {
   def apply(schema: StructType): ExpressionEncoder[Row] = {
     val cls = classOf[Row]
     val inputObject = BoundReference(0, ObjectType(cls), nullable = true)
-    val serializer = serializerFor(AssertNotNull(inputObject, Seq("top level row object")), schema)
-    val deserializer = deserializerFor(schema)
+    val serializer = serializerFor(inputObject, schema)
+    val deserializer = deserializerFor(GetColumnByOrdinal(0, serializer.dataType), schema)
--- End diff --

In `ScalaReflection`, we create `GetColumnByOrdinal` in `deserializerFor`; shall we follow that here?


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227673675
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---
@@ -434,17 +426,34 @@ object ScalaReflection extends ScalaReflection {
    *  * the element type of [[Array]] or [[Seq]]: `array element class: "abc.xyz.MyClass"`
    *  * the field of [[Product]]: `field (class: "abc.xyz.MyClass", name: "myField")`
    */
-  def serializerFor[T : TypeTag](inputObject: Expression): CreateNamedStruct = {
-    val tpe = localTypeOf[T]
+  def serializerForType(tpe: `Type`,
+      cls: RuntimeClass): Expression = ScalaReflection.cleanUpReflectionObjects {
     val clsName = getClassNameFromType(tpe)
     val walkedTypePath = s"""- root class: "$clsName"""" :: Nil
-    serializerFor(inputObject, tpe, walkedTypePath) match {
-      case expressions.If(_, _, s: CreateNamedStruct) if definedByConstructorParams(tpe) => s
-      case other => CreateNamedStruct(expressions.Literal("value") :: other :: Nil)
-    }
+
+    // The input object to `ExpressionEncoder` is located at the first column of a row.
+    val inputObject = BoundReference(0, dataTypeFor(tpe),
+      nullable = !cls.isPrimitive)
+
+    serializerFor(inputObject, tpe, walkedTypePath)
   }
 
-  /** Helper for extracting internal fields from a case class. */
+  /**
+   * Returns an expression for serializing the value of an input expression into Spark SQL
--- End diff --

do we really need to duplicate the doc in this private method?


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r227672066
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---
@@ -434,17 +426,34 @@ object ScalaReflection extends ScalaReflection {
    *  * the element type of [[Array]] or [[Seq]]: `array element class: "abc.xyz.MyClass"`
    *  * the field of [[Product]]: `field (class: "abc.xyz.MyClass", name: "myField")`
    */
-  def serializerFor[T : TypeTag](inputObject: Expression): CreateNamedStruct = {
-    val tpe = localTypeOf[T]
+  def serializerForType(tpe: `Type`,
+      cls: RuntimeClass): Expression = ScalaReflection.cleanUpReflectionObjects {
     val clsName = getClassNameFromType(tpe)
     val walkedTypePath = s"""- root class: "$clsName"""" :: Nil
-    serializerFor(inputObject, tpe, walkedTypePath) match {
-      case expressions.If(_, _, s: CreateNamedStruct) if definedByConstructorParams(tpe) => s
-      case other => CreateNamedStruct(expressions.Literal("value") :: other :: Nil)
-    }
+
+    // The input object to `ExpressionEncoder` is located at the first column of a row.
+    val inputObject = BoundReference(0, dataTypeFor(tpe),
+      nullable = !cls.isPrimitive)
--- End diff --

We just check `isPrimitive` of the given `cls`; can we check `tpe` directly?


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-22 Thread viirya
GitHub user viirya reopened a pull request:

https://github.com/apache/spark/pull/22749

[SPARK-25746][SQL] Refactoring ExpressionEncoder to get rid of flat flag

## What changes were proposed in this pull request?

This was inspired while implementing #21732. Currently `ScalaReflection` needs to consider how `ExpressionEncoder` uses the generated serializers and deserializers, and `ExpressionEncoder` carries an awkward `flat` flag. After discussion with @cloud-fan, it seems better to refactor `ExpressionEncoder`. It should also make SPARK-24762 easier to do.

To summarize the proposed changes:

1. `serializerFor` and `deserializerFor` return expressions for serializing/deserializing an input expression for a given type. They are private and should not be called directly.
2. `serializerForType` and `deserializerForType` return an expression for serializing/deserializing an object of type `T` to/from its Spark SQL representation. They assume the input object/Spark SQL representation is located at ordinal 0 of a row.

In other words, `serializerForType` and `deserializerForType` return expressions for atomically serializing/deserializing a JVM object to/from a Spark SQL value.

A serializer returned by `serializerForType` will serialize an object at `row(0)` to a corresponding Spark SQL representation, e.g. primitive type, array, map, struct.

A deserializer returned by `deserializerForType` will deserialize an input field at `row(0)` to an object of the given type.

3. The construction of `ExpressionEncoder` takes a pair of serializer and deserializer for type `T`. It uses them to create the serializer and deserializer for `T` <-> row serialization. `ExpressionEncoder` no longer needs to remember whether the serializer is flat or not. When we need to construct a new `ExpressionEncoder` based on an existing one, we only need to change the input location in the atomic serializer and deserializer. A rough sketch of this construction follows.
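
The following is a hypothetical sketch of the construction path described above, written against the names in this description; the exact signatures in the PR may differ (e.g. `serializerForType` also takes the runtime class in one revision of the diff):

```
// Hypothetical glue code, not the PR's exact implementation.
// Both expressions assume their input sits at ordinal 0 of a row.
def apply[T : TypeTag](): ExpressionEncoder[T] = {
  val mirror = ScalaReflection.mirror
  val tpe = ScalaReflection.localTypeOf[T]
  val cls = mirror.runtimeClass(tpe)

  val objSerializer = ScalaReflection.serializerForType(tpe, cls)   // object at row(0) -> SQL value
  val objDeserializer = ScalaReflection.deserializerForType(tpe)    // SQL value at row(0) -> object

  new ExpressionEncoder[T](objSerializer, objDeserializer, ClassTag[T](cls))
}
```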

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viirya/spark-1 SPARK-24762-refactor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22749.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22749


commit e1b5deebe715479125c8878f0c90a55dc9ab3e85
Author: Liang-Chi Hsieh 
Date:   2018-07-09T03:42:04Z

Aggregator should be able to use Option of Product encoder.

commit 80506f4e98184ccd66dbaac14ec52d69c358020d
Author: Liang-Chi Hsieh 
Date:   2018-07-13T04:40:55Z

Enable top-level Option of Product encoders.

commit ed3d5cb697b10af2e2cf4c78ab521d4d0b2f3c9b
Author: Liang-Chi Hsieh 
Date:   2018-08-24T04:26:28Z

Remove topLevel parameter.

commit 9fc3f6165156051142a8366a32726badaaa16bb7
Author: Liang-Chi Hsieh 
Date:   2018-08-24T04:37:39Z

Merge remote-tracking branch 'upstream/master' into SPARK-24762

commit 5f95bd0cf1bd308c7df55c41caef7a9f19368f5d
Author: Liang-Chi Hsieh 
Date:   2018-08-24T04:42:33Z

Remove useless change.

commit a4f04055b2ba22f371663565710328791942855a
Author: Liang-Chi Hsieh 
Date:   2018-08-24T14:38:16Z

Add more tests.

commit c1f798f7e9cba0d04223eed06f1b1f547ec29dc5
Author: Liang-Chi Hsieh 
Date:   2018-08-25T01:52:01Z

Add test.

commit 80e11d289d7775863cb9c28b2c1d4364292048a4
Author: Liang-Chi Hsieh 
Date:   2018-10-06T04:06:57Z

Merge remote-tracking branch 'upstream/master' into SPARK-24762

commit 0f029b0a28700334dc6334f1ad89b3124f235a51
Author: Liang-Chi Hsieh 
Date:   2018-10-06T04:40:07Z

Improve code comments.

commit 84f3ce07f2f6a9236bd27f927fbb877e937f6917
Author: Liang-Chi Hsieh 
Date:   2018-10-15T09:55:03Z

Refactoring ExpressionEncoder.

commit 6a6fa454e22728cc2ad8e5515cd587fe0be84b26
Author: Liang-Chi Hsieh 
Date:   2018-10-17T02:07:40Z

Fix Malformed class name.

commit 25a616286075ca4f0a7d528095b387172b05c6c3
Author: Liang-Chi Hsieh 
Date:   2018-10-17T05:11:10Z

Fix error message.

commit 295ecde8103c26dda169d931f939f8a2fe641c4c
Author: Liang-Chi Hsieh 
Date:   2018-10-18T15:58:03Z

Fix test.

commit 85a91220ec4eb00bd9d5020ecf980eac0301f716
Author: Liang-Chi Hsieh 
Date:   2018-10-18T16:05:22Z

Merge remote-tracking branch 'upstream/master' into SPARK-24762-refactor

commit 35700f4a0f36fb397ac028a68011a2753c5c2c75
Author: Liang-Chi Hsieh 
Date:   2018-10-19T00:07:29Z

Fix rebase error.

commit b211ed069dceb33c45cf6caf12c19527334d4ad8
Author: Liang-Chi Hsieh 
Date:   2018-10-19T00:16:24Z

Fix unintentional style change.

commit 0c78b73e5abce2a51763c860e43aab214c8634d9
Author: Liang-Chi Hsieh 
Date:   2018-10-19T00:51:52Z

Address comments.

commit 5b9abb67907dfdb0c0c64751db3525564f832422
Author: Liang-Chi Hsieh 
Date:   2018-10-20T02:26:07Z

Address ComplexTypeMergingExpress

[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-22 Thread viirya
Github user viirya closed the pull request at:

https://github.com/apache/spark/pull/22749


---




[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...

2018-10-20 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r226846925
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
@@ -43,10 +44,11 @@ import org.apache.spark.util.Utils
  *    to the name `value`.
  */
 object ExpressionEncoder {
+
   def apply[T : TypeTag](): ExpressionEncoder[T] = {
-    // We convert the not-serializable TypeTag into StructType and ClassTag.
     val mirror = ScalaReflection.mirror
-    val tpe = typeTag[T].in(mirror).tpe
+    val tpe = ScalaReflection.localTypeOf[T]
--- End diff --

I think it should be fine, but let me revert this change first.
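
For reference, `ScalaReflection.localTypeOf[T]` is essentially the replaced expression evaluated in `ScalaReflection`'s runtime mirror, something like the following sketch (details such as dealiasing may vary by version):

```
def localTypeOf[T: TypeTag]: `Type` = {
  val tag = implicitly[TypeTag[T]]
  tag.in(mirror).tpe.dealias
}
```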


---
