kings129 commented on code in PR #40755:
URL: https://github.com/apache/spark/pull/40755#discussion_r1167250242
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala:
##########
@@ -97,26 +97,33 @@ object ExpressionEncoder {
}
val newSerializer = CreateStruct(serializers)
+ def nullSafe(input: Expression, result: Expression): Expression = {
+ If(IsNull(input), Literal.create(null, result.dataType), result)
+ }
+
val newDeserializerInput = GetColumnByOrdinal(0, newSerializer.dataType)
- val deserializers = encoders.zipWithIndex.map { case (enc, index) =>
+ val childrenDeserializers = encoders.zipWithIndex.map { case (enc, index)
=>
val getColExprs = enc.objDeserializer.collect { case c:
GetColumnByOrdinal => c }.distinct
assert(getColExprs.size == 1, "object deserializer should have only one
" +
s"`GetColumnByOrdinal`, but there are ${getColExprs.size}")
val input = GetStructField(newDeserializerInput, index)
- enc.objDeserializer.transformUp {
+ val newDeserializer = enc.objDeserializer.transformUp {
case GetColumnByOrdinal(0, _) => input
}
- }
- val newDeserializer = NewInstance(cls, deserializers, ObjectType(cls),
propagateNull = false)
- def nullSafe(input: Expression, result: Expression): Expression = {
- If(IsNull(input), Literal.create(null, result.dataType), result)
+ if (enc.objSerializer.nullable) {
+ nullSafe(input, newDeserializer)
+ } else {
+ newDeserializer
+ }
}
+ val newDeserializer =
+ NewInstance(cls, childrenDeserializers, ObjectType(cls), propagateNull =
false)
new ExpressionEncoder[Any](
nullSafe(newSerializerInput, newSerializer),
- nullSafe(newDeserializerInput, newDeserializer),
Review Comment:
This change is intended to create a deserializer type `newinstance(class
scala.Tuple*)` that can convert to a single null value. This behavior is the
same as it was before the
[commit](https://github.com/apache/spark/commit/cd92f25be5a221e0d4618925f7bc9dfd3bb8cb59)
that introduced the regression.
Regarding the serializer: in the new unit test added in this pull request,
when the tuple is not null, a `named_struct` is created for each element, and
a null element is handled there, as the serializer expression below shows.
> if (isnull(input[0, scala.Tuple2, true])) null else named_struct(_1, if
(isnull(input[0, scala.Tuple2, true]._1)) null else named_struct(a, if
(input[0, scala.Tuple2, true]._1.isNullAt) null else staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
validateexternaltype(getexternalrowfield(input[0, scala.Tuple2, true]._1, 0,
a), StringType, ObjectType(class java.lang.String)), true, false, true), b,
assertnotnull(validateexternaltype(getexternalrowfield(input[0, scala.Tuple2,
true]._1, 1, b), IntegerType, ObjectType(class java.lang.Integer)).intValue))
AS _1#18, _2, if (isnull(input[0, scala.Tuple2, true]._2)) null else
named_struct(a, if (input[0, scala.Tuple2, true]._2.isNullAt) null else
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType,
fromString, validateexternaltype(getexternalrowfield(input[0, scala.Tuple2,
true]._2, 0, a), StringType, ObjectType(class java.lang.String)), true, false,
true), b, assertnotnull(validateexternaltype(getexternalrowfield(input[0,
scala.Tuple2, true]._2, 1, b),
IntegerType, ObjectType(class java.lang.Integer)).intValue)) AS _2#19)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]