[GitHub] [spark] hvanhovell commented on a diff in pull request #39517: [SPARK-41993][SQL] Move RowEncoder to AgnosticEncoders

2023-01-13 Thread GitBox


hvanhovell commented on code in PR #39517:
URL: https://github.com/apache/spark/pull/39517#discussion_r1069354107


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala:
##
@@ -46,35 +46,42 @@ object AgnosticEncoders {
 override val clsTag: ClassTag[Option[E]] = ClassTag(classOf[Option[E]])
   }
 
-  case class ArrayEncoder[E](element: AgnosticEncoder[E])
+  case class ArrayEncoder[E](element: AgnosticEncoder[E], containsNull: Boolean)
 extends AgnosticEncoder[Array[E]] {
 override def isPrimitive: Boolean = false
-override def dataType: DataType = ArrayType(element.dataType, element.nullable)
+override def dataType: DataType = ArrayType(element.dataType, containsNull)
 override val clsTag: ClassTag[Array[E]] = element.clsTag.wrap
   }
 
-  case class IterableEncoder[C <: Iterable[E], E](
+  case class IterableEncoder[C, E](
   override val clsTag: ClassTag[C],
-  element: AgnosticEncoder[E])
+  element: AgnosticEncoder[E],
+  containsNull: Boolean,
+  override val lenientSerialization: Boolean)

Review Comment:
   Yeah, will do. TBH I was quite surprised by it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #39517: [SPARK-41993][SQL] Move RowEncoder to AgnosticEncoders

2023-01-12 Thread GitBox


hvanhovell commented on code in PR #39517:
URL: https://github.com/apache/spark/pull/39517#discussion_r1068889994


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala:
##
@@ -155,11 +169,19 @@ object ScalaReflection extends ScalaReflection {
 val walkedTypePath = WalkedTypePath().recordRoot(enc.clsTag.runtimeClass.getName)
 // Assumes we are deserializing the first column of a row.
 val input = GetColumnByOrdinal(0, enc.dataType)
-val deserializer = deserializerFor(
-  enc,
-  upCastToExpectedType(input, enc.dataType, walkedTypePath),
-  walkedTypePath)
-expressionWithNullSafety(deserializer, enc.nullable, walkedTypePath)
+enc match {
+  case RowEncoder(fields) =>

Review Comment:
   We create one with null checks. This one does not need them because we always return a Row (the top-level row always exists).
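A minimal, hypothetical sketch of the distinction this comment draws, using plain Scala rather than Catalyst expression trees (`SimpleRow` and `requireNonNull` are made-up stand-ins, not Spark API): nested non-nullable fields still get a null check, but the top-level row handed to the deserializer always exists, so that check can be skipped.

```scala
// Hypothetical illustration only: Catalyst builds expression trees, not plain
// functions. `requireNonNull` stands in for expressionWithNullSafety.
final case class SimpleRow(values: IndexedSeq[Any])

def requireNonNull[T](value: T, path: String): T = {
  if (value == null) throw new NullPointerException(s"Null value at $path")
  value
}

// A nested, non-nullable field still needs the null check ...
def deserializeField(row: SimpleRow, ordinal: Int): Any =
  requireNonNull(row.values(ordinal), s"field $ordinal")

// ... but the top-level row is always present, so no check is added there.
def deserializeTopLevel(row: SimpleRow): SimpleRow = row

println(deserializeTopLevel(SimpleRow(Vector(1, "a"))))
```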







[GitHub] [spark] hvanhovell commented on a diff in pull request #39517: [SPARK-41993][SQL] Move RowEncoder to AgnosticEncoders

2023-01-12 Thread GitBox


hvanhovell commented on code in PR #39517:
URL: https://github.com/apache/spark/pull/39517#discussion_r106785


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala:
##
@@ -46,35 +46,42 @@ object AgnosticEncoders {
 override val clsTag: ClassTag[Option[E]] = ClassTag(classOf[Option[E]])
   }
 
-  case class ArrayEncoder[E](element: AgnosticEncoder[E])
+  case class ArrayEncoder[E](element: AgnosticEncoder[E], containsNull: Boolean)
 extends AgnosticEncoder[Array[E]] {
 override def isPrimitive: Boolean = false
-override def dataType: DataType = ArrayType(element.dataType, element.nullable)
+override def dataType: DataType = ArrayType(element.dataType, containsNull)
 override val clsTag: ClassTag[Array[E]] = element.clsTag.wrap
   }
 
-  case class IterableEncoder[C <: Iterable[E], E](
+  case class IterableEncoder[C, E](
   override val clsTag: ClassTag[C],
-  element: AgnosticEncoder[E])
+  element: AgnosticEncoder[E],
+  containsNull: Boolean,
+  override val lenientSerialization: Boolean)

Review Comment:
   It means we allow a Seq, a generic Array, or a primitive array as input for serialization.
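A hedged sketch of what "lenient" means here, using plain Scala collections rather than Spark's expression machinery (`toCatalystIntSeq` is a made-up name, not Spark API): the same element encoder accepts all three input shapes.

```scala
// Hypothetical illustration: under lenient serialization the same element
// encoder accepts a Seq, a generic (boxed) Array, or a primitive Array.
def toCatalystIntSeq(input: Any): Seq[Int] = input match {
  case arr: Array[Int] => arr.toSeq                          // primitive array
  case arr: Array[_]   => arr.toSeq.map(_.asInstanceOf[Int]) // generic array
  case seq: Seq[_]     => seq.map(_.asInstanceOf[Int])       // Seq
}

println(toCatalystIntSeq(Array(1, 2, 3)))
println(toCatalystIntSeq(Array[Any](1, 2, 3)))
println(toCatalystIntSeq(Seq(1, 2, 3)))
```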






[GitHub] [spark] hvanhovell commented on a diff in pull request #39517: [SPARK-41993][SQL] Move RowEncoder to AgnosticEncoders

2023-01-12 Thread GitBox


hvanhovell commented on code in PR #39517:
URL: https://github.com/apache/spark/pull/39517#discussion_r1068885292


##
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala:
##
@@ -125,7 +125,7 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest {
 new StructType()
   .add("mapOfIntAndString", MapType(IntegerType, StringType))
   .add("mapOfStringAndArray", MapType(StringType, arrayOfString))
-  .add("mapOfArrayAndInt", MapType(arrayOfString, IntegerType))

Review Comment:
   That is a mistake. My bad :)






[GitHub] [spark] hvanhovell commented on a diff in pull request #39517: [SPARK-41993][SQL] Move RowEncoder to AgnosticEncoders

2023-01-12 Thread GitBox


hvanhovell commented on code in PR #39517:
URL: https://github.com/apache/spark/pull/39517#discussion_r1068884900


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala:
##
@@ -377,27 +408,96 @@ object ScalaReflection extends ScalaReflection {
 val getter = Invoke(
   KnownNotNull(input),
   field.name,
-  dataTypeFor(field.enc),
-  returnNullable = field.enc.nullable)
+  externalDataTypeFor(field.enc),
+  returnNullable = field.nullable)
 field.name -> serializerFor(field.enc, getter)
   }
   createSerializerForObject(input, serializedFields)
+
+case RowEncoder(fields) =>
+  val serializedFields = fields.zipWithIndex.map { case (field, index) =>
+val fieldValue = serializerFor(
+  field.enc,
+  ValidateExternalType(
+GetExternalRowField(input, index, field.name),
+field.enc.dataType,
+lenientExternalDataTypeFor(field.enc)))
+
+val convertedField = if (field.nullable) {
+  exprs.If(
+Invoke(input, "isNullAt", BooleanType, exprs.Literal(index) :: Nil),
+// Because we strip UDTs, `field.dataType` can be different from `fieldValue.dataType`.
+// We should use `fieldValue.dataType` here.
+exprs.Literal.create(null, fieldValue.dataType),
+fieldValue
+  )
+} else {
+  AssertNotNull(fieldValue)
+}
+field.name -> convertedField
+  }
+  createSerializerForObject(input, serializedFields)
   }
 
   private def serializerForArray(
-  isArray: Boolean,
   elementEnc: AgnosticEncoder[_],
-  input: Expression): Expression = {
-dataTypeFor(elementEnc) match {
-  case dt: ObjectType =>
-createSerializerForMapObjects(input, dt, serializerFor(elementEnc, _))
-  case dt if isArray && elementEnc.isPrimitive =>
-createSerializerForPrimitiveArray(input, dt)
-  case dt =>
-createSerializerForGenericArray(input, dt, elementEnc.nullable)
+  elementNullable: Boolean,
+  input: Expression,
+  lenientSerialization: Boolean): Expression = {
+// Default serializer for Seq and generic Arrays. This does not work for primitive arrays.

Review Comment:
   I don't know. It was like that before this change.






[GitHub] [spark] hvanhovell commented on a diff in pull request #39517: [SPARK-41993][SQL] Move RowEncoder to AgnosticEncoders

2023-01-11 Thread GitBox


hvanhovell commented on code in PR #39517:
URL: https://github.com/apache/spark/pull/39517#discussion_r1067435036


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala:
##
@@ -377,27 +408,96 @@ object ScalaReflection extends ScalaReflection {
 val getter = Invoke(
   KnownNotNull(input),
   field.name,
-  dataTypeFor(field.enc),
-  returnNullable = field.enc.nullable)
+  externalDataTypeFor(field.enc),
+  returnNullable = field.nullable)
 field.name -> serializerFor(field.enc, getter)
   }
   createSerializerForObject(input, serializedFields)
+
+case RowEncoder(fields) =>
+  val serializedFields = fields.zipWithIndex.map { case (field, index) =>
+val fieldValue = serializerFor(
+  field.enc,
+  ValidateExternalType(
+GetExternalRowField(input, index, field.name),
+field.enc.dataType,
+lenientExternalDataTypeFor(field.enc)))
+
+val convertedField = if (field.nullable) {
+  exprs.If(
+Invoke(input, "isNullAt", BooleanType, exprs.Literal(index) :: Nil),
+// Because we strip UDTs, `field.dataType` can be different from `fieldValue.dataType`.
+// We should use `fieldValue.dataType` here.
+exprs.Literal.create(null, fieldValue.dataType),
+fieldValue
+  )
+} else {
+  AssertNotNull(fieldValue)
+}
+field.name -> convertedField
+  }
+  createSerializerForObject(input, serializedFields)
   }
 
   private def serializerForArray(
-  isArray: Boolean,
   elementEnc: AgnosticEncoder[_],
-  input: Expression): Expression = {
-dataTypeFor(elementEnc) match {
-  case dt: ObjectType =>
-createSerializerForMapObjects(input, dt, serializerFor(elementEnc, _))
-  case dt if isArray && elementEnc.isPrimitive =>
-createSerializerForPrimitiveArray(input, dt)
-  case dt =>
-createSerializerForGenericArray(input, dt, elementEnc.nullable)
+  elementNullable: Boolean,
+  input: Expression,
+  lenientSerialization: Boolean): Expression = {
+// Default serializer for Seq and generic Arrays. This does not work for primitive arrays.
+val genericSerializer = createSerializerForMapObjects(
+  input,
+  ObjectType(classOf[AnyRef]),
+  validateAndSerializeElement(elementEnc, elementNullable))
+
+// Check if it is possible the user can pass a primitive array. This is the only case when it
+// is safe to directly convert to an array (for generic arrays and Seqs the type and the
+// nullability can be violated). If the user has passed a primitive array we create a special
+// code path to deal with these.
+val primitiveEncoderOption = elementEnc match {
+  case _ if !lenientSerialization => None
+  case enc: PrimitiveLeafEncoder[_] => Option(enc)
+  case enc: BoxedLeafEncoder[_, _] => Option(enc.primitive)
+  case _ => None
+}
+primitiveEncoderOption match {
+  case Some(primitiveEncoder) =>
+val primitiveArrayClass = primitiveEncoder.clsTag.wrap.runtimeClass
+val check = Invoke(
+  targetObject = exprs.Literal.fromObject(primitiveArrayClass),
+  functionName = "isInstance",
+  BooleanType,
+  arguments = input :: Nil,
+  propagateNull = false,
+  returnNullable = false)
+exprs.If(
+  check,

Review Comment:
   We can widen this to arrays where the element is allowed to be null. In that case we do need to make sure the element type is sound.
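A sketch of the runtime check the diff above performs, pulled out of Catalyst into plain Scala (`isPrimitiveArrayOf` is a hypothetical helper, not Spark API): wrap the element's `ClassTag` to get the array-of-primitive class, then use `isInstance` to decide whether the input really is a primitive array.

```scala
import scala.reflect.ClassTag

// Sketch of the check: ClassTag.wrap yields the ClassTag of Array[E], whose
// runtimeClass for a primitive E is the primitive array class (e.g. int[]).
def isPrimitiveArrayOf[E](input: Any)(implicit tag: ClassTag[E]): Boolean = {
  val primitiveArrayClass = tag.wrap.runtimeClass // e.g. classOf[Array[Int]]
  primitiveArrayClass.isInstance(input)
}

println(isPrimitiveArrayOf[Int](Array(1, 2, 3)))      // int[]: true
println(isPrimitiveArrayOf[Int](Array[Any](1, 2, 3))) // Object[]: false
```

A boxed/generic `Array[Any]` fails the check because its runtime class is `Object[]`, which is why the generic serializer must handle that case instead.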






[GitHub] [spark] hvanhovell commented on a diff in pull request #39517: [SPARK-41993][SQL] Move RowEncoder to AgnosticEncoders

2023-01-11 Thread GitBox


hvanhovell commented on code in PR #39517:
URL: https://github.com/apache/spark/pull/39517#discussion_r1067433716


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala:
##
@@ -306,7 +330,7 @@ object ScalaReflection extends ScalaReflection {
 * input object is located at ordinal 0 of a row, i.e., `BoundReference(0, _)`.
*/
   def serializerFor(enc: AgnosticEncoder[_]): Expression = {

Review Comment:
   TODO check the generated code for boxed primitives. We might be doing double conversions there.


