xkrogen edited a comment on pull request #32969:
URL: https://github.com/apache/spark/pull/32969#issuecomment-866130603


   @bersprockets -- @gengliangwang raised a good point on #31490: these two 
PRs interact. I believe my changes in #31490 will also address the 
performance issue (I plan to run your benchmarks shortly) while also 
supporting matching fields by position (the main goal of #31490). Can you 
take a look there and see what you think?
   
   Edit: Sorry, that's not quite right. The current code in #31490 still 
does O(N^2) matching. However, it restructures the code so that all field 
matching happens in a single method call rather than one call per field, 
which makes it easier to implement your changes IMO -- we can just 
construct the map within the new `matchCatalystAvroSchema` call. Here's the 
diff to implement your changes on top of #31490:
   ```
   @@ -228,12 +227,15 @@ private[sql] object AvroUtils extends Logging {
          throw new IncompatibleSchemaException(
            s"Attempted to treat ${toFieldStr(avroPath)} as RECORD, but it was: ${avroSchema.getType}")
        }
   -    val avroFields = avroSchema.getFields.asScala
   +    val avroFieldMap = avroSchema.getFields.asScala.groupBy(_.name.toLowerCase(Locale.ROOT))
   +    val avroFields = avroSchema.getFields.asScala.map(Some(_)).toArray
        val catalystMatches = catalystType.zipWithIndex.map { case (catalystField, catalystPos) =>
          val avroField = if (positionalFieldMatch) {
   -        avroFields.drop(catalystPos).headOption
   +        avroFields.applyOrElse(catalystPos, _ => None)
          } else {
   -        avroFields.filter(f => SQLConf.get.resolver(f.name(), catalystField.name)).toSeq match {
   +        avroFieldMap
   +          .getOrElse(catalystField.name.toLowerCase(Locale.ROOT), Nil)
   +          .filter(f => SQLConf.get.resolver(f.name(), catalystField.name)).toSeq match {
              case Seq(avroField) => Some(avroField)
              case Seq() => None
              case matches => throw new IncompatibleSchemaException("Searching for " +
   @@ -244,8 +246,8 @@ private[sql] object AvroUtils extends Logging {
          }
          (Some(catalystField), avroField)
        }
   -    val extraAvroFields = (avroFields.toSet -- catalystMatches.flatMap(_._2))
   -      .map(avroField => (None, Some(avroField)))
   +    val extraAvroFields = (avroFields.toSet -- catalystMatches.flatMap(_._2).map(Some(_)))
   +      .map(avroField => (None, avroField))
        catalystMatches ++ extraAvroFields
      }
   ```
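
For readers who want the idea behind the diff in isolation, here is a minimal, self-contained sketch of the map-based lookup. It is an illustration under stated assumptions, not the code in either PR: `AvroField`, `CatalystField`, `Resolver`, and `matchByName` are hypothetical stand-ins for Avro's `Schema.Field`, Catalyst's `StructField`, `SQLConf.get.resolver`, and the matching logic. The Avro fields are grouped once by lowercased name, so each Catalyst field only has to run the resolver against the few candidates in its bucket instead of scanning every Avro field.

```scala
import java.util.Locale

// Hypothetical stand-ins for Avro's Schema.Field and Catalyst's StructField.
final case class AvroField(name: String)
final case class CatalystField(name: String)

object FieldMatchSketch {
  // Assumed shape of a resolver: decides whether two names denote the same field.
  type Resolver = (String, String) => Boolean
  val caseSensitive: Resolver = _ == _
  val caseInsensitive: Resolver = _.equalsIgnoreCase(_)

  def matchByName(
      catalystFields: Seq[CatalystField],
      avroFields: Seq[AvroField],
      resolver: Resolver): Seq[(CatalystField, Option[AvroField])] = {
    // Built once: lowercased name -> all Avro fields whose names differ only by case.
    val avroFieldMap: Map[String, Seq[AvroField]] =
      avroFields.groupBy(_.name.toLowerCase(Locale.ROOT))

    catalystFields.map { catalystField =>
      // Look up the bucket, then let the resolver apply the configured case sensitivity.
      val candidates = avroFieldMap
        .getOrElse(catalystField.name.toLowerCase(Locale.ROOT), Seq.empty)
        .filter(f => resolver(f.name, catalystField.name))
      candidates match {
        case Seq(single) => (catalystField, Some(single))
        case Seq() => (catalystField, None)
        case multiple =>
          throw new IllegalArgumentException(
            s"Field ${catalystField.name} matched multiple Avro fields: " +
              multiple.map(_.name).mkString(", "))
      }
    }
  }
}
```

The lowercased key only narrows the candidates; the resolver still makes the final decision, so a case-sensitive resolver rejects `id` for `ID` even though both land in the same bucket.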


