[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22749 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/22749#discussion_r228135443

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
@@ -212,21 +181,91 @@ object ExpressionEncoder {
  * A generic encoder for JVM objects that uses Catalyst Expressions for a `serializer`
  * and a `deserializer`.
  *
- * @param schema The schema after converting `T` to a Spark SQL row.
- * @param serializer A set of expressions, one for each top-level field that can be used to
- *                   extract the values from a raw object into an [[InternalRow]].
- * @param deserializer An expression that will construct an object given an [[InternalRow]].
+ * @param objSerializer An expression that can be used to encode a raw object to corresponding
+ *                      Spark SQL representation that can be a primitive column, array, map or a
+ *                      struct. This represents how Spark SQL generally serializes an object of
+ *                      type `T`.
+ * @param objDeserializer An expression that will construct an object given a Spark SQL
+ *                        representation. This represents how Spark SQL generally deserializes
+ *                        a serialized value in Spark SQL representation back to an object of
+ *                        type `T`.
  * @param clsTag A classtag for `T`.
  */
 case class ExpressionEncoder[T](
-    schema: StructType,
-    flat: Boolean,
-    serializer: Seq[Expression],
-    deserializer: Expression,
+    objSerializer: Expression,
+    objDeserializer: Expression,
     clsTag: ClassTag[T])
   extends Encoder[T] {

-  if (flat) require(serializer.size == 1)
+  /**
+   * A sequence of expressions, one for each top-level field that can be used to
+   * extract the values from a raw object into an [[InternalRow]]:
+   * 1. If `serializer` encodes a raw object to a struct, strip the outer If-IsNull and get
+   *    the `CreateNamedStruct`.
+   * 2. For other cases, wrap the single serializer with `CreateNamedStruct`.
+   */
+  val serializer: Seq[NamedExpression] = {
+    val clsName = Utils.getSimpleName(clsTag.runtimeClass)
+
+    if (isSerializedAsStruct) {
+      val nullSafeSerializer = objSerializer.transformUp {
+        case r: BoundReference =>
+          // For input object of Product type, we can't encode it to row if it's null, as Spark SQL
+          // doesn't allow top-level row to be null, only its columns can be null.
+          AssertNotNull(r, Seq("top level Product or row object"))
+      }
+      nullSafeSerializer match {
+        case If(_: IsNull, _, s: CreateNamedStruct) => s
+        case s: CreateNamedStruct => s
--- End diff --

Ok. Sounds good to me.
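The two rules in the new `serializer` doc comment quoted above can be illustrated with a small, self-contained sketch. It does not use Spark: `Expr`, `Input`, `IsNull`, `NullLit`, `If` and `NamedStruct` are hypothetical toy classes standing in for Catalyst's `BoundReference`, `IsNull`, a null literal, `If` and `CreateNamedStruct`, and the single-field name `value` used for rule 2 is an assumption of this sketch.

```scala
// Toy expression tree: hypothetical stand-ins, not Spark's Catalyst classes.
sealed trait Expr
final case class Input(name: String) extends Expr                       // role of BoundReference
final case class IsNull(child: Expr) extends Expr
case object NullLit extends Expr                                        // role of a null literal
final case class If(cond: Expr, whenTrue: Expr, whenFalse: Expr) extends Expr
final case class NamedStruct(fields: Seq[(String, Expr)]) extends Expr  // role of CreateNamedStruct

// Normalizes an object serializer into one struct whose fields are the top-level columns.
def topLevelStruct(objSerializer: Expr): NamedStruct = objSerializer match {
  // Rule 1: the object is encoded as a struct; strip an outer null guard if present.
  case If(IsNull(_), _, s: NamedStruct) => s
  // Unguarded struct: this mirrors the second case the reviewers question below.
  case s: NamedStruct => s
  // Rule 2: any other encoding (primitive, array, map) is wrapped as a single field.
  case other => NamedStruct(Seq("value" -> other))
}

// Usage: a Product-like object with two fields, guarded by a null check.
val productSerializer =
  If(IsNull(Input("obj")), NullLit, NamedStruct(Seq("a" -> Input("obj.a"), "b" -> Input("obj.b"))))
val columns = topLevelStruct(productSerializer).fields.map(_._1)
```

In the quoted Spark code the choice between the two rules is made from `isSerializedAsStruct` rather than by pattern matching alone; the sketch folds both into one `match` only to keep it short.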
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22749#discussion_r228134985

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
@@ -212,21 +181,91 @@ object ExpressionEncoder {
[... same hunk as quoted above ...]
+      nullSafeSerializer match {
+        case If(_: IsNull, _, s: CreateNamedStruct) => s
+        case s: CreateNamedStruct => s
--- End diff --

This is minor; we can update it in another PR. We don't need to wait for another Jenkins QA round.
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/22749#discussion_r228133724

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
@@ -212,21 +181,91 @@ object ExpressionEncoder {
[... same hunk as quoted above ...]
+      nullSafeSerializer match {
+        case If(_: IsNull, _, s: CreateNamedStruct) => s
+        case s: CreateNamedStruct => s
--- End diff --

Oh, good catch! I think this is a redundant pattern.
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22749#discussion_r228132840

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
@@ -212,21 +181,91 @@ object ExpressionEncoder {
[... same hunk as quoted above ...]
+      nullSafeSerializer match {
+        case If(_: IsNull, _, s: CreateNamedStruct) => s
+        case s: CreateNamedStruct => s
--- End diff --

When will we hit this case?
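The `objSerializer.transformUp { case r: BoundReference => AssertNotNull(...) }` call in the diff above is a bottom-up tree rewrite. A minimal sketch of that technique on a hypothetical toy tree (not Catalyst's `TreeNode`; `Input`, `GetField`, `NamedStruct` and the string `context` argument are illustrative assumptions) shows what the null-safe serializer looks like for a two-field object.

```scala
// Toy expression tree with a bottom-up rewrite; hypothetical, not Catalyst's TreeNode.
sealed trait Expr {
  def children: Seq[Expr]
  def withChildren(newChildren: Seq[Expr]): Expr

  // Rewrite the children first, then offer the rebuilt node to the rule exactly once.
  def transformUp(rule: PartialFunction[Expr, Expr]): Expr = {
    val rebuilt = withChildren(children.map(_.transformUp(rule)))
    rule.applyOrElse(rebuilt, identity[Expr])
  }
}

// Role of BoundReference: the raw input object.
final case class Input(ordinal: Int) extends Expr {
  def children: Seq[Expr] = Nil
  def withChildren(newChildren: Seq[Expr]): Expr = this
}

// Marks a value that must not be null; this toy never evaluates it.
final case class AssertNotNull(child: Expr, context: String) extends Expr {
  def children: Seq[Expr] = Seq(child)
  def withChildren(newChildren: Seq[Expr]): Expr = copy(child = newChildren.head)
}

// Reads one field of the object.
final case class GetField(child: Expr, name: String) extends Expr {
  def children: Seq[Expr] = Seq(child)
  def withChildren(newChildren: Seq[Expr]): Expr = copy(child = newChildren.head)
}

// Role of CreateNamedStruct.
final case class NamedStruct(fields: Seq[(String, Expr)]) extends Expr {
  def children: Seq[Expr] = fields.map(_._2)
  def withChildren(newChildren: Seq[Expr]): Expr =
    copy(fields = fields.map(_._1).zip(newChildren))
}

val objSerializer = NamedStruct(Seq(
  "a" -> GetField(Input(0), "a"),
  "b" -> GetField(Input(0), "b")))

// Wrap every reference to the input object, as the quoted serializer code does.
val nullSafeSerializer = objSerializer.transformUp {
  case r: Input => AssertNotNull(r, "top level Product or row object")
}
```

Because the sketch applies the rule once per node after that node's children are rebuilt, the freshly created `AssertNotNull` is not visited again, so the rewrite cannot wrap the same reference twice.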
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/22749#discussion_r227996228

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala ---
@@ -280,59 +281,59 @@ class ScalaReflectionSuite extends SparkFunSuite {
   test("serialize and deserialize arbitrary sequence types") {
     import scala.collection.immutable.Queue
-    val queueSerializer = serializerFor[Queue[Int]](BoundReference(
-      0, ObjectType(classOf[Queue[Int]]), nullable = false))
-    assert(queueSerializer.dataType.head.dataType ==
+    val queueSerializer = serializerForType(ScalaReflection.localTypeOf[Queue[Int]])
+    assert(queueSerializer.dataType ==
       ArrayType(IntegerType, containsNull = false))
     val queueDeserializer = deserializerFor[Queue[Int]]
     assert(queueDeserializer.dataType == ObjectType(classOf[Queue[_]]))

     import scala.collection.mutable.ArrayBuffer
-    val arrayBufferSerializer = serializerFor[ArrayBuffer[Int]](BoundReference(
-      0, ObjectType(classOf[ArrayBuffer[Int]]), nullable = false))
-    assert(arrayBufferSerializer.dataType.head.dataType ==
+    val arrayBufferSerializer = serializerForType(ScalaReflection.localTypeOf[ArrayBuffer[Int]])
+    assert(arrayBufferSerializer.dataType ==
       ArrayType(IntegerType, containsNull = false))
     val arrayBufferDeserializer = deserializerFor[ArrayBuffer[Int]]
     assert(arrayBufferDeserializer.dataType == ObjectType(classOf[ArrayBuffer[_]]))
   }

   test("serialize and deserialize arbitrary map types") {
-    val mapSerializer = serializerFor[Map[Int, Int]](BoundReference(
-      0, ObjectType(classOf[Map[Int, Int]]), nullable = false))
-    assert(mapSerializer.dataType.head.dataType ==
+    val mapSerializer = serializerForType(ScalaReflection.localTypeOf[Map[Int, Int]])
+    assert(mapSerializer.dataType ==
       MapType(IntegerType, IntegerType, valueContainsNull = false))
     val mapDeserializer = deserializerFor[Map[Int, Int]]
     assert(mapDeserializer.dataType == ObjectType(classOf[Map[_, _]]))

     import scala.collection.immutable.HashMap
-    val hashMapSerializer = serializerFor[HashMap[Int, Int]](BoundReference(
-      0, ObjectType(classOf[HashMap[Int, Int]]), nullable = false))
-    assert(hashMapSerializer.dataType.head.dataType ==
+    val hashMapSerializer = serializerForType(ScalaReflection.localTypeOf[HashMap[Int, Int]])
+    assert(hashMapSerializer.dataType ==
       MapType(IntegerType, IntegerType, valueContainsNull = false))
     val hashMapDeserializer = deserializerFor[HashMap[Int, Int]]
     assert(hashMapDeserializer.dataType == ObjectType(classOf[HashMap[_, _]]))

     import scala.collection.mutable.{LinkedHashMap => LHMap}
-    val linkedHashMapSerializer = serializerFor[LHMap[Long, String]](BoundReference(
-      0, ObjectType(classOf[LHMap[Long, String]]), nullable = false))
-    assert(linkedHashMapSerializer.dataType.head.dataType ==
+    val linkedHashMapSerializer = serializerForType(
+      ScalaReflection.localTypeOf[LHMap[Long, String]])
+    assert(linkedHashMapSerializer.dataType ==
       MapType(LongType, StringType, valueContainsNull = true))
     val linkedHashMapDeserializer = deserializerFor[LHMap[Long, String]]
     assert(linkedHashMapDeserializer.dataType == ObjectType(classOf[LHMap[_, _]]))
   }

   test("SPARK-22442: Generate correct field names for special characters") {
-    val serializer = serializerFor[SpecialCharAsFieldData](BoundReference(
-      0, ObjectType(classOf[SpecialCharAsFieldData]), nullable = false))
+    val serializer = serializerForType(ScalaReflection.localTypeOf[SpecialCharAsFieldData])
--- End diff --

Ok. I see.
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22749#discussion_r227867735

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala ---
@@ -280,59 +281,59 @@ class ScalaReflectionSuite extends SparkFunSuite {
[... same hunk as quoted above ...]
   test("SPARK-22442: Generate correct field names for special characters") {
-    val serializer = serializerFor[SpecialCharAsFieldData](BoundReference(
-      0, ObjectType(classOf[SpecialCharAsFieldData]), nullable = false))
+    val serializer = serializerForType(ScalaReflection.localTypeOf[SpecialCharAsFieldData])
--- End diff --

Like `deserializerFor` in this suite, let's also create a `serializerFor` helper.
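cloud-fan's suggestion amounts to a thin generic wrapper, so that call sites read `serializerFor[T]` instead of `serializerForType(ScalaReflection.localTypeOf[T])`. The sketch below shows that pattern with the Scala standard library only: `describeForClass` and `describeFor` are hypothetical names, and it uses a `ClassTag` context bound purely so it runs without Spark, whereas `ScalaReflection.localTypeOf` is driven by a `TypeTag`.

```scala
import scala.reflect.ClassTag

// Hypothetical stand-in for an API that needs an explicit runtime type descriptor,
// in the role that serializerForType(ScalaReflection.localTypeOf[T]) plays in the suite.
def describeForClass(cls: Class[_]): String = s"serializer for ${cls.getSimpleName}"

// Thin generic wrapper: the context bound supplies the descriptor,
// so each call site shrinks to describeFor[T].
def describeFor[T: ClassTag]: String =
  describeForClass(implicitly[ClassTag[T]].runtimeClass)

// Usage: the wrapper and the explicit call produce the same result.
val viaWrapper = describeFor[String]
val direct = describeForClass(classOf[String])
```

In the suite itself such a wrapper would presumably take a `TypeTag` context bound and delegate to `serializerForType`; its exact signature is not shown in this thread.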
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/22749#discussion_r227797378

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala ---
@@ -280,59 +281,59 @@ class ScalaReflectionSuite extends SparkFunSuite {
[... same hunk as quoted above ...]
   test("SPARK-22442: Generate correct field names for special characters") {
-    val serializer = serializerFor[SpecialCharAsFieldData](BoundReference(
-      0, ObjectType(classOf[SpecialCharAsFieldData]), nullable = false))
+    val serializer = serializerForType(ScalaReflection.localTypeOf[SpecialCharAsFieldData])
--- End diff --

Do you mean to create a method `serializerFor` in this suite? Or replace `serializerForType` with `ScalaReflection.serializerFor`?
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/22749#discussion_r227796616

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala ---
@@ -280,59 +281,59 @@ class ScalaReflectionSuite extends SparkFunSuite {
[... same hunk as quoted above ...]
   test("SPARK-22442: Generate correct field names for special characters") {
-    val serializer = serializerFor[SpecialCharAsFieldData](BoundReference(
-      0, ObjectType(classOf[SpecialCharAsFieldData]), nullable = false))
+    val serializer = serializerForType(ScalaReflection.localTypeOf[SpecialCharAsFieldData])
--- End diff --

Ok.
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22749#discussion_r227783724

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala ---
@@ -280,59 +281,59 @@ class ScalaReflectionSuite extends SparkFunSuite {
[... same hunk as quoted above ...]
   test("SPARK-22442: Generate correct field names for special characters") {
-    val serializer = serializerFor[SpecialCharAsFieldData](BoundReference(
-      0, ObjectType(classOf[SpecialCharAsFieldData]), nullable = false))
+    val serializer = serializerForType(ScalaReflection.localTypeOf[SpecialCharAsFieldData])
--- End diff --

Can we replace all the `serializerForType` calls with `serializerFor` in this suite?
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/22749#discussion_r227745900

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala ---
@@ -351,11 +347,15 @@ class ScalaReflectionSuite extends SparkFunSuite {
   test("SPARK-23835: add null check to non-nullable types in Tuples") {
     def numberOfCheckedArguments(deserializer: Expression): Int = {
-      assert(deserializer.isInstanceOf[NewInstance])
-      deserializer.asInstanceOf[NewInstance].arguments.count(_.isInstanceOf[AssertNotNull])
+      val newInstance = deserializer.collect { case n: NewInstance => n}.head
+      newInstance.arguments.count(_.isInstanceOf[AssertNotNull])
     }
-    assert(numberOfCheckedArguments(deserializerFor[(Double, Double)]) == 2)
-    assert(numberOfCheckedArguments(deserializerFor[(java.lang.Double, Int)]) == 1)
-    assert(numberOfCheckedArguments(deserializerFor[(java.lang.Integer, java.lang.Integer)]) == 0)
+    assert(numberOfCheckedArguments(
+      deserializerForType(ScalaReflection.localTypeOf[(Double, Double)])) == 2)
--- End diff --

Sounds good.
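The revised `numberOfCheckedArguments` above no longer requires the deserializer root to be a `NewInstance`; it searches the tree for the first one. A toy sketch of that pre-order `collect` (hypothetical classes, not Catalyst's) shows why the count still works when the `NewInstance` is nested under a null-check `If`, which is presumably the kind of wrapping that motivated the change.

```scala
// Toy expression tree; hypothetical classes, not Catalyst's.
sealed trait Expr {
  def children: Seq[Expr]
  // Pre-order search: this node first, then its children left to right.
  def collect[B](pf: PartialFunction[Expr, B]): Seq[B] =
    pf.lift(this).toList ++ children.flatMap(_.collect(pf))
}
final case class Input(ordinal: Int) extends Expr { def children: Seq[Expr] = Nil }
case object NullLit extends Expr { def children: Seq[Expr] = Nil }
final case class IsNull(child: Expr) extends Expr { def children: Seq[Expr] = Seq(child) }
final case class AssertNotNull(child: Expr) extends Expr { def children: Seq[Expr] = Seq(child) }
final case class If(cond: Expr, whenTrue: Expr, whenFalse: Expr) extends Expr {
  def children: Seq[Expr] = Seq(cond, whenTrue, whenFalse)
}
final case class NewInstance(cls: String, arguments: Seq[Expr]) extends Expr {
  def children: Seq[Expr] = arguments
}

// Same logic as the revised test helper: find the first NewInstance anywhere in the tree.
def numberOfCheckedArguments(deserializer: Expr): Int = {
  val newInstance = deserializer.collect { case n: NewInstance => n }.head
  newInstance.arguments.count(_.isInstanceOf[AssertNotNull])
}

// A tuple-like constructor with one null-checked argument, bare and behind a null guard.
val bare = NewInstance("Tuple2", Seq(AssertNotNull(Input(0)), Input(1)))
val guarded = If(IsNull(Input(-1)), NullLit, bare)
```

The old helper's `assert(deserializer.isInstanceOf[NewInstance])` would fail on `guarded`, while the `collect`-based version returns the same count for both shapes.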
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22749#discussion_r227742062

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala ---
@@ -351,11 +347,15 @@ class ScalaReflectionSuite extends SparkFunSuite {
[... same hunk as quoted above ...]
+    assert(numberOfCheckedArguments(
+      deserializerForType(ScalaReflection.localTypeOf[(Double, Double)])) == 2)
--- End diff --

Shall we create a `deserializerFor` method in this test suite to keep the code diff smaller?
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r227739775

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
@@ -212,21 +181,90 @@ object ExpressionEncoder {
  * A generic encoder for JVM objects that uses Catalyst Expressions for a `serializer`
  * and a `deserializer`.
  *
- * @param schema The schema after converting `T` to a Spark SQL row.
- * @param serializer A set of expressions, one for each top-level field that can be used to
- *                   extract the values from a raw object into an [[InternalRow]].
- * @param deserializer An expression that will construct an object given an [[InternalRow]].
+ * @param objSerializer An expression that can be used to encode a raw object to corresponding
+ *                      Spark SQL representation that can be a primitive column, array, map or a
+ *                      struct. This represents how Spark SQL generally serializes an object of
+ *                      type `T`.
+ * @param objDeserializer An expression that will construct an object given a Spark SQL
+ *                        representation. This represents how Spark SQL generally deserializes
+ *                        a serialized value in Spark SQL representation back to an object of
+ *                        type `T`.
  * @param clsTag A classtag for `T`.
  */
 case class ExpressionEncoder[T](
-    schema: StructType,
-    flat: Boolean,
-    serializer: Seq[Expression],
-    deserializer: Expression,
+    objSerializer: Expression,
+    objDeserializer: Expression,
     clsTag: ClassTag[T])
   extends Encoder[T] {
-  if (flat) require(serializer.size == 1)
+  /**
+   * A sequence of expressions, one for each top-level field that can be used to
+   * extract the values from a raw object into an [[InternalRow]]:
+   * 1. If `serializer` encodes a raw object to a struct, we directly use the `serializer`.
+   * 2. For other cases, we create a struct to wrap the `serializer`.
--- End diff --

Let's make these 2 comments more precise:
```
1. If `serializer` encodes a raw object to a struct, strip the outer If-IsNull and get the CreateNamedStruct
2. For other cases, wrap the single serializer with CreateNamedStruct
```
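The two rules in the suggested comment can be sketched as a single pattern match. The expression classes below are hypothetical toy stand-ins (only the names echo Catalyst), and the "value" field name follows the `CreateNamedStruct(expressions.Literal("value") :: other :: Nil)` fallback removed elsewhere in this PR; this is an illustration, not the encoder's actual code.

```scala
// Toy expression model; names echo Catalyst but these are illustrative stand-ins.
sealed trait Expr
final case class Input(name: String) extends Expr
case object NullLiteral extends Expr
final case class IsNull(child: Expr) extends Expr
final case class If(cond: Expr, onTrue: Expr, onFalse: Expr) extends Expr
final case class CreateNamedStruct(fields: Seq[(String, Expr)]) extends Expr

object TopLevelSerializer {
  // 1. A struct serializer wrapped in If(IsNull(...), null, struct):
  //    strip the outer If-IsNull and keep the CreateNamedStruct.
  // 2. Any other serializer: wrap it in a one-field struct named "value".
  def toNamedStruct(objSerializer: Expr): CreateNamedStruct = objSerializer match {
    case If(IsNull(_), NullLiteral, s: CreateNamedStruct) => s
    case other => CreateNamedStruct(Seq("value" -> other))
  }

  // One expression per top-level field of the resulting row.
  def serializer(objSerializer: Expr): Seq[Expr] =
    toNamedStruct(objSerializer).fields.map(_._2)
}
```

Under these assumptions a case-class serializer yields one expression per field, while a primitive serializer yields a single expression exposed under the column name `value`.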
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r227695574

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---
@@ -434,17 +426,34 @@ object ScalaReflection extends ScalaReflection {
    *  * the element type of [[Array]] or [[Seq]]: `array element class: "abc.xyz.MyClass"`
    *  * the field of [[Product]]: `field (class: "abc.xyz.MyClass", name: "myField")`
    */
-  def serializerFor[T : TypeTag](inputObject: Expression): CreateNamedStruct = {
-    val tpe = localTypeOf[T]
+  def serializerForType(tpe: `Type`,
+      cls: RuntimeClass): Expression = ScalaReflection.cleanUpReflectionObjects {
     val clsName = getClassNameFromType(tpe)
     val walkedTypePath = s"""- root class: "$clsName :: Nil
-    serializerFor(inputObject, tpe, walkedTypePath) match {
-      case expressions.If(_, _, s: CreateNamedStruct) if definedByConstructorParams(tpe) => s
-      case other => CreateNamedStruct(expressions.Literal("value") :: other :: Nil)
-    }
+
+    // The input object to `ExpressionEncoder` is located at first column of an row.
+    val inputObject = BoundReference(0, dataTypeFor(tpe),
+      nullable = !cls.isPrimitive)
+
+    serializerFor(inputObject, tpe, walkedTypePath)
   }
-  /** Helper for extracting internal fields from a case class. */
+  /**
+   * Returns an expression for serializing the value of an input expression into Spark SQL
--- End diff --

I did simplify a lot of it.
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r227682823

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala ---
@@ -58,12 +58,10 @@ object RowEncoder {
   def apply(schema: StructType): ExpressionEncoder[Row] = {
     val cls = classOf[Row]
     val inputObject = BoundReference(0, ObjectType(cls), nullable = true)
-    val serializer = serializerFor(AssertNotNull(inputObject, Seq("top level row object")), schema)
-    val deserializer = deserializerFor(schema)
+    val serializer = serializerFor(inputObject, schema)
+    val deserializer = deserializerFor(GetColumnByOrdinal(0, serializer.dataType), schema)
--- End diff --

Ah, I see. Then let's leave it.
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r227682672

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---
@@ -434,17 +426,34 @@ object ScalaReflection extends ScalaReflection {
    *  * the element type of [[Array]] or [[Seq]]: `array element class: "abc.xyz.MyClass"`
    *  * the field of [[Product]]: `field (class: "abc.xyz.MyClass", name: "myField")`
    */
-  def serializerFor[T : TypeTag](inputObject: Expression): CreateNamedStruct = {
-    val tpe = localTypeOf[T]
+  def serializerForType(tpe: `Type`,
+      cls: RuntimeClass): Expression = ScalaReflection.cleanUpReflectionObjects {
     val clsName = getClassNameFromType(tpe)
     val walkedTypePath = s"""- root class: "$clsName :: Nil
-    serializerFor(inputObject, tpe, walkedTypePath) match {
-      case expressions.If(_, _, s: CreateNamedStruct) if definedByConstructorParams(tpe) => s
-      case other => CreateNamedStruct(expressions.Literal("value") :: other :: Nil)
-    }
+
+    // The input object to `ExpressionEncoder` is located at first column of an row.
+    val inputObject = BoundReference(0, dataTypeFor(tpe),
+      nullable = !cls.isPrimitive)
--- End diff --

Good, then we don't need `cls` as a parameter.
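Once nullability of the top-level input is derived from the type itself, a separate runtime-class parameter is redundant. The sketch below is a self-contained approximation under stated assumptions: it uses the standard library's `ClassTag` in place of Spark's reflection `Type` (the thread proposes `tpe.typeSymbol.asClass.isPrimitive` for the real code), and `BoundReference` here is a hypothetical toy case class, not Catalyst's.

```scala
import scala.reflect.ClassTag

// Toy stand-in for Catalyst's BoundReference: an input read from a fixed ordinal.
final case class BoundReference(ordinal: Int, typeName: String, nullable: Boolean)

object InputObject {
  // The encoder's input object sits at ordinal 0 of the row.
  // JVM primitives (int, double, ...) can never be null; every other
  // class (boxed types, String, case classes) can.
  def inputObjectFor[T](implicit ct: ClassTag[T]): BoundReference = {
    val cls = ct.runtimeClass
    BoundReference(0, cls.getName, nullable = !cls.isPrimitive)
  }
}
```

Here `Int` maps to the primitive `int` class and yields a non-nullable reference, while `java.lang.Integer` and `String` yield nullable ones, which is the distinction `!cls.isPrimitive` expresses in the diff.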
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r227681844

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -1087,7 +1087,7 @@ class Dataset[T] private[sql](
     // Note that we do this before joining them, to enable the join operator to return null for one
     // side, in cases like outer-join.
     val left = {
-      val combined = if (this.exprEnc.flat) {
+      val combined = if (!this.exprEnc.objSerializer.dataType.isInstanceOf[StructType]) {
--- End diff --

Shall we create a method in `ExpressionEncoder` for this check?
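The diff replaces `exprEnc.flat` with the negation of "the object serializer produces a struct", so a named predicate on the encoder would carry that meaning at call sites. This is a minimal sketch with hypothetical toy types; the method name `isSerializedAsStruct` is chosen here for illustration and is not taken from the thread.

```scala
// Toy data types and encoder; the names are illustrative, not Spark's API.
sealed trait DataType
case object IntegerType extends DataType
case object StringType extends DataType
final case class StructType(fieldNames: Seq[String]) extends DataType

// Only the output type of the object serializer matters for this check.
final case class ObjSerializer(dataType: DataType)

final case class ToyEncoder(objSerializer: ObjSerializer) {
  // Hypothetical helper: true when an object of T serializes to a struct,
  // false when it serializes to a single non-struct value (the old "flat" case).
  def isSerializedAsStruct: Boolean = objSerializer.dataType.isInstanceOf[StructType]
}
```

With such a helper, the condition in `joinWith` would read `if (!this.exprEnc.isSerializedAsStruct)` instead of inspecting `objSerializer.dataType` inline.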
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r227678714

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala ---
@@ -58,12 +58,10 @@ object RowEncoder {
   def apply(schema: StructType): ExpressionEncoder[Row] = {
     val cls = classOf[Row]
     val inputObject = BoundReference(0, ObjectType(cls), nullable = true)
-    val serializer = serializerFor(AssertNotNull(inputObject, Seq("top level row object")), schema)
-    val deserializer = deserializerFor(schema)
+    val serializer = serializerFor(inputObject, schema)
+    val deserializer = deserializerFor(GetColumnByOrdinal(0, serializer.dataType), schema)
--- End diff --

Ah, we need to access `serializer.dataType` here. So if we want to create `GetColumnByOrdinal` in `deserializeFor`, we need to pass this data type too. What do you think?
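The trade-off can be sketched with two signatures that produce the same deserializer: one where the caller passes the input expression (as in the diff), and one where the helper builds `GetColumnByOrdinal` itself and therefore needs the serialized data type as an extra argument. All types and the name `deserializerForSchema` are hypothetical toys for illustration, not Spark's code.

```scala
// Toy types; not Spark's classes.
sealed trait DataType
case object IntegerType extends DataType
final case class StructType(fields: Seq[(String, DataType)]) extends DataType

// Reads column `ordinal` of the input row, typed as `dataType`.
final case class GetColumnByOrdinal(ordinal: Int, dataType: DataType)
// Placeholder for "build a Row from `input` according to `schema`".
final case class RowDeserializer(input: GetColumnByOrdinal, schema: StructType)

object RowEncoderSketch {
  // Variant kept in the diff: the caller supplies the input expression,
  // typed with the serializer's output type.
  def deserializerFor(input: GetColumnByOrdinal, schema: StructType): RowDeserializer =
    RowDeserializer(input, schema)

  // Variant discussed in review: create GetColumnByOrdinal internally,
  // which still requires the serializer's data type to be passed in.
  def deserializerForSchema(schema: StructType, serializedType: DataType): RowDeserializer =
    deserializerFor(GetColumnByOrdinal(0, serializedType), schema)
}
```

Since the second variant only moves the `GetColumnByOrdinal` construction without removing the dependency on `serializer.dataType`, keeping the original call shape is a reasonable outcome.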
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r227677176

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala ---
@@ -58,12 +58,10 @@ object RowEncoder {
   def apply(schema: StructType): ExpressionEncoder[Row] = {
     val cls = classOf[Row]
     val inputObject = BoundReference(0, ObjectType(cls), nullable = true)
-    val serializer = serializerFor(AssertNotNull(inputObject, Seq("top level row object")), schema)
-    val deserializer = deserializerFor(schema)
+    val serializer = serializerFor(inputObject, schema)
+    val deserializer = deserializerFor(GetColumnByOrdinal(0, serializer.dataType), schema)
--- End diff --

OK, sounds better.
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r227675871

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---
@@ -434,17 +426,34 @@ object ScalaReflection extends ScalaReflection {
    *  * the element type of [[Array]] or [[Seq]]: `array element class: "abc.xyz.MyClass"`
    *  * the field of [[Product]]: `field (class: "abc.xyz.MyClass", name: "myField")`
    */
-  def serializerFor[T : TypeTag](inputObject: Expression): CreateNamedStruct = {
-    val tpe = localTypeOf[T]
+  def serializerForType(tpe: `Type`,
+      cls: RuntimeClass): Expression = ScalaReflection.cleanUpReflectionObjects {
     val clsName = getClassNameFromType(tpe)
     val walkedTypePath = s"""- root class: "$clsName :: Nil
-    serializerFor(inputObject, tpe, walkedTypePath) match {
-      case expressions.If(_, _, s: CreateNamedStruct) if definedByConstructorParams(tpe) => s
-      case other => CreateNamedStruct(expressions.Literal("value") :: other :: Nil)
-    }
+
+    // The input object to `ExpressionEncoder` is located at first column of an row.
+    val inputObject = BoundReference(0, dataTypeFor(tpe),
+      nullable = !cls.isPrimitive)
--- End diff --

Yes, we can check `tpe.typeSymbol.asClass.isPrimitive` instead.
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r227675880

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala ---
@@ -58,12 +58,10 @@ object RowEncoder {
   def apply(schema: StructType): ExpressionEncoder[Row] = {
     val cls = classOf[Row]
     val inputObject = BoundReference(0, ObjectType(cls), nullable = true)
-    val serializer = serializerFor(AssertNotNull(inputObject, Seq("top level row object")), schema)
-    val deserializer = deserializerFor(schema)
+    val serializer = serializerFor(inputObject, schema)
+    val deserializer = deserializerFor(GetColumnByOrdinal(0, serializer.dataType), schema)
--- End diff --

In `ScalaReflection`, we create `GetColumnByOrdinal` in `deserializeFor`. Shall we follow that here?
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r227673675

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---
@@ -434,17 +426,34 @@ object ScalaReflection extends ScalaReflection {
    *  * the element type of [[Array]] or [[Seq]]: `array element class: "abc.xyz.MyClass"`
    *  * the field of [[Product]]: `field (class: "abc.xyz.MyClass", name: "myField")`
    */
-  def serializerFor[T : TypeTag](inputObject: Expression): CreateNamedStruct = {
-    val tpe = localTypeOf[T]
+  def serializerForType(tpe: `Type`,
+      cls: RuntimeClass): Expression = ScalaReflection.cleanUpReflectionObjects {
     val clsName = getClassNameFromType(tpe)
     val walkedTypePath = s"""- root class: "$clsName :: Nil
-    serializerFor(inputObject, tpe, walkedTypePath) match {
-      case expressions.If(_, _, s: CreateNamedStruct) if definedByConstructorParams(tpe) => s
-      case other => CreateNamedStruct(expressions.Literal("value") :: other :: Nil)
-    }
+
+    // The input object to `ExpressionEncoder` is located at first column of an row.
+    val inputObject = BoundReference(0, dataTypeFor(tpe),
+      nullable = !cls.isPrimitive)
+
+    serializerFor(inputObject, tpe, walkedTypePath)
   }
-  /** Helper for extracting internal fields from a case class. */
+  /**
+   * Returns an expression for serializing the value of an input expression into Spark SQL
--- End diff --

Do we really need to duplicate the doc in this private method?
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r227672066

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---
@@ -434,17 +426,34 @@ object ScalaReflection extends ScalaReflection {
    *  * the element type of [[Array]] or [[Seq]]: `array element class: "abc.xyz.MyClass"`
    *  * the field of [[Product]]: `field (class: "abc.xyz.MyClass", name: "myField")`
    */
-  def serializerFor[T : TypeTag](inputObject: Expression): CreateNamedStruct = {
-    val tpe = localTypeOf[T]
+  def serializerForType(tpe: `Type`,
+      cls: RuntimeClass): Expression = ScalaReflection.cleanUpReflectionObjects {
     val clsName = getClassNameFromType(tpe)
     val walkedTypePath = s"""- root class: "$clsName :: Nil
-    serializerFor(inputObject, tpe, walkedTypePath) match {
-      case expressions.If(_, _, s: CreateNamedStruct) if definedByConstructorParams(tpe) => s
-      case other => CreateNamedStruct(expressions.Literal("value") :: other :: Nil)
-    }
+
+    // The input object to `ExpressionEncoder` is located at first column of an row.
+    val inputObject = BoundReference(0, dataTypeFor(tpe),
+      nullable = !cls.isPrimitive)
--- End diff --

We only check `isPrimitive` on the given `cls`; can we check `tpe` directly?
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
GitHub user viirya reopened a pull request: https://github.com/apache/spark/pull/22749

[SPARK-25746][SQL] Refactoring ExpressionEncoder to get rid of flat flag

## What changes were proposed in this pull request?

This was inspired while implementing #21732. Currently, `ScalaReflection` needs to consider how `ExpressionEncoder` uses the generated serializers and deserializers, and `ExpressionEncoder` has a weird `flat` flag. After discussion with @cloud-fan, it seems better to refactor `ExpressionEncoder`. It should also make SPARK-24762 easier to do.

To summarize the proposed changes:

1. `serializerFor` and `deserializerFor` return expressions for serializing/deserializing an input expression for a given type. They are private and should not be called directly.
2. `serializerForType` and `deserializerForType` return an expression for serializing/deserializing an object of type T to/from the Spark SQL representation. They assume the input object/Spark SQL representation is located at ordinal 0 of a row. In other words, `serializerForType` and `deserializerForType` return expressions for atomically serializing/deserializing a JVM object to/from a Spark SQL value. A serializer returned by `serializerForType` serializes an object at `row(0)` to the corresponding Spark SQL representation, e.g. a primitive type, array, map or struct. A deserializer returned by `deserializerForType` deserializes an input field at `row(0)` to an object of the given type.
3. The construction of `ExpressionEncoder` takes a pair of serializer and deserializer for type `T`. It uses them to create the serializer and deserializer for T <-> row serialization. Now `ExpressionEncoder` doesn't need to remember whether the serializer is flat or not. When we need to construct a new `ExpressionEncoder` based on existing ones, we only need to change the input location in the atomic serializer and deserializer.

## How was this patch tested?

Existing tests.
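Point 3 of the description says that building a new encoder from existing ones only requires changing the input location of the atomic serializer. The sketch below is a deliberately simplified model under that reading: relocation is reduced to rewriting `BoundReference(0)` into `BoundReference(ordinal)` with a bottom-up transform. The expression classes, `withInputAt` and the `eval` helper are hypothetical toys; the real encoder may relocate inputs with different expressions.

```scala
// Toy expressions; illustrative only, not Catalyst's classes.
sealed trait Expr {
  // Rewrite this tree bottom-up with `rule`, like a simplified transformUp.
  def transformUp(rule: PartialFunction[Expr, Expr]): Expr = {
    val withNewChildren = this match {
      case Add(l, r) => Add(l.transformUp(rule), r.transformUp(rule))
      case other => other
    }
    rule.applyOrElse(withNewChildren, identity[Expr])
  }
}
final case class BoundReference(ordinal: Int) extends Expr
final case class Literal(value: Int) extends Expr
final case class Add(left: Expr, right: Expr) extends Expr

object Rebind {
  // An "atomic" serializer reads its input object from row(0). To reuse it
  // inside a larger encoder (e.g. for a tuple), only the input location
  // changes: every BoundReference(0) becomes BoundReference(ordinal).
  def withInputAt(atomic: Expr, ordinal: Int): Expr =
    atomic.transformUp { case BoundReference(0) => BoundReference(ordinal) }

  // Evaluate against a row of ints, to check where the input is read from.
  def eval(e: Expr, row: IndexedSeq[Int]): Int = e match {
    case BoundReference(i) => row(i)
    case Literal(v) => v
    case Add(l, r) => eval(l, row) + eval(r, row)
  }
}
```

For instance, the toy serializer `Add(BoundReference(0), Literal(1))` reads the first column; after `withInputAt(_, 1)` the same logic reads the second column, without the serializer needing to know anything about "flat" layouts.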
You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viirya/spark-1 SPARK-24762-refactor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22749.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #22749

commit e1b5deebe715479125c8878f0c90a55dc9ab3e85 Author: Liang-Chi Hsieh Date: 2018-07-09T03:42:04Z Aggregator should be able to use Option of Product encoder.
commit 80506f4e98184ccd66dbaac14ec52d69c358020d Author: Liang-Chi Hsieh Date: 2018-07-13T04:40:55Z Enable top-level Option of Product encoders.
commit ed3d5cb697b10af2e2cf4c78ab521d4d0b2f3c9b Author: Liang-Chi Hsieh Date: 2018-08-24T04:26:28Z Remove topLevel parameter.
commit 9fc3f6165156051142a8366a32726badaaa16bb7 Author: Liang-Chi Hsieh Date: 2018-08-24T04:37:39Z Merge remote-tracking branch 'upstream/master' into SPARK-24762
commit 5f95bd0cf1bd308c7df55c41caef7a9f19368f5d Author: Liang-Chi Hsieh Date: 2018-08-24T04:42:33Z Remove useless change.
commit a4f04055b2ba22f371663565710328791942855a Author: Liang-Chi Hsieh Date: 2018-08-24T14:38:16Z Add more tests.
commit c1f798f7e9cba0d04223eed06f1b1f547ec29dc5 Author: Liang-Chi Hsieh Date: 2018-08-25T01:52:01Z Add test.
commit 80e11d289d7775863cb9c28b2c1d4364292048a4 Author: Liang-Chi Hsieh Date: 2018-10-06T04:06:57Z Merge remote-tracking branch 'upstream/master' into SPARK-24762
commit 0f029b0a28700334dc6334f1ad89b3124f235a51 Author: Liang-Chi Hsieh Date: 2018-10-06T04:40:07Z Improve code comments.
commit 84f3ce07f2f6a9236bd27f927fbb877e937f6917 Author: Liang-Chi Hsieh Date: 2018-10-15T09:55:03Z Refactoring ExpressionEncoder.
commit 6a6fa454e22728cc2ad8e5515cd587fe0be84b26 Author: Liang-Chi Hsieh Date: 2018-10-17T02:07:40Z Fix Malformed class name.
commit 25a616286075ca4f0a7d528095b387172b05c6c3 Author: Liang-Chi Hsieh Date: 2018-10-17T05:11:10Z Fix error message.
commit 295ecde8103c26dda169d931f939f8a2fe641c4c Author: Liang-Chi Hsieh Date: 2018-10-18T15:58:03Z Fix test.
commit 85a91220ec4eb00bd9d5020ecf980eac0301f716 Author: Liang-Chi Hsieh Date: 2018-10-18T16:05:22Z Merge remote-tracking branch 'upstream/master' into SPARK-24762-refactor
commit 35700f4a0f36fb397ac028a68011a2753c5c2c75 Author: Liang-Chi Hsieh Date: 2018-10-19T00:07:29Z Fix rebase error.
commit b211ed069dceb33c45cf6caf12c19527334d4ad8 Author: Liang-Chi Hsieh Date: 2018-10-19T00:16:24Z Fix unintentional style change.
commit 0c78b73e5abce2a51763c860e43aab214c8634d9 Author: Liang-Chi Hsieh Date: 2018-10-19T00:51:52Z Address comments.
commit 5b9abb67907dfdb0c0c64751db3525564f832422 Author: Liang-Chi Hsieh Date: 2018-10-20T02:26:07Z Address ComplexTypeMergingExpress
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user viirya closed the pull request at: https://github.com/apache/spark/pull/22749
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r226846925

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
@@ -43,10 +44,11 @@ import org.apache.spark.util.Utils
  *    to the name `value`.
  */
 object ExpressionEncoder {
+
   def apply[T : TypeTag](): ExpressionEncoder[T] = {
-    // We convert the not-serializable TypeTag into StructType and ClassTag.
     val mirror = ScalaReflection.mirror
-    val tpe = typeTag[T].in(mirror).tpe
+    val tpe = ScalaReflection.localTypeOf[T]
--- End diff --

I think it should be fine, but let me revert this change first.