gengliangwang commented on a change in pull request #31490:
URL: https://github.com/apache/spark/pull/31490#discussion_r659970793
##########
File path:
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
##########
@@ -335,36 +336,32 @@ private[sql] class AvroDeserializer(
avroPath: Seq[String],
catalystPath: Seq[String],
applyFilters: Int => Boolean): (CatalystDataUpdater, GenericRecord) =>
Boolean = {
- val validFieldIndexes = ArrayBuffer.empty[Int]
- val fieldWriters = ArrayBuffer.empty[(CatalystDataUpdater, Any) => Unit]
-
- val avroSchemaHelper = new AvroUtils.AvroSchemaHelper(avroType, avroPath)
- val length = catalystType.length
- var i = 0
- while (i < length) {
- val catalystField = catalystType.fields(i)
- avroSchemaHelper.getFieldByName(catalystField.name) match {
- case Some(avroField) =>
- validFieldIndexes += avroField.pos()
+ val avroSchemaHelper =
+ new AvroUtils.AvroSchemaHelper(avroType, catalystType, avroPath,
positionalFieldMatch)
+
+ avroSchemaHelper.getCatalystFieldsWithoutMatch.filterNot(_.nullable) match
{
Review comment:
Shall we have this improvement in another PR? And we can have test cases
of incompatible schemas for both by-name and by-position matching.
Currently I find this PR a bit complicated, and there are no test cases for
this.
Supporting the position matching in this one is enough.
##########
File path:
external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSerdeSuite.scala
##########
@@ -144,6 +173,43 @@ object AvroSerdeSuite {
private val CATALYST_STRUCT =
new StructType().add("foo", new StructType().add("bar", IntegerType))
+ /**
+ * Specifier for type of serde to be used for easy creation of tests that do
both
+ * serialization and deserialization.
+ */
+ object SerdeType extends Enumeration {
+ type SerdeType = Value
+ val SERIALIZE, DESERIALIZE = Value
+ }
+ import SerdeType._
+
+ /**
+ * Specifier for type of field matching to be used for easy creation of
tests that do both
+ * positional and by-name field matching.
+ */
+ object FieldMatchType extends Enumeration {
+ type FieldMatchType = Value
+ val BY_NAME, BY_POSITION = Value
+ }
+ import FieldMatchType._
+
+ private def createSerde(
+ catalystSchema: StructType,
+ avroSchema: Schema,
+ serdeType: SerdeType,
+ fieldMatchType: FieldMatchType): Either[AvroSerializer,
AvroDeserializer] = {
+ val positional = fieldMatchType match {
+ case BY_NAME => false
+ case BY_POSITION => true
+ }
+ serdeType match {
+ case SERIALIZE => Left(new AvroSerializer(catalystSchema, avroSchema,
false, positional,
Review comment:
QQ: why use Left/Right here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]