mridulm commented on a change in pull request #31490:
URL: https://github.com/apache/spark/pull/31490#discussion_r657650239
##########
File path:
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
##########
@@ -203,34 +203,50 @@ private[sql] object AvroUtils extends Logging {
}
/**
- * Extract a single field from `avroSchema` which has the desired field name,
- * performing the matching with proper case sensitivity according to
[[SQLConf.resolver]].
+ * Match the provided `avroSchema` against the provided `catalystType`,
attempting to find a
+ * matching field in the Avro schema for each field in the Catalyst schema
and vice-versa. Fields
+ * are matched by position (i.e. structural match) if `positionalFieldMatch`
is true, otherwise
+ * they are matched by name.
*
- * @param avroSchema The schema in which to search for the field. Must be of
type RECORD.
- * @param name The name of the field to search for.
+ * @param avroSchema The Avro schema to use for matching
+ * @param catalystType The Catalyst type to use for matching
* @param avroPath The seq of parent field names leading to `avroSchema`.
- * @return `Some(match)` if a matching Avro field is found, otherwise `None`.
+ * @return A sequence of `(catalystField, avroField)` tuples. Every field
from `avroSchema` and
+ * `catalystType` will appear exactly once, either with a matched
field from the other
+ * type, or with `None` if no match was found. No entry will contain
`(None, None)`.
* @throws IncompatibleSchemaException if `avroSchema` is not a RECORD or
contains multiple
- * fields matching `name` (i.e.,
case-insensitive matching
- * is used and `avroSchema` has two or
more fields that have
- * the same name with difference case).
+ * fields matching a single Catalyst
field (i.e.,
+ * case-insensitive matching is used and
`avroSchema` has 2 or
+ * more fields that have the same name
with different case)
*/
- private[avro] def getAvroFieldByName(
+ private[avro] def matchCatalystAvroSchema(
avroSchema: Schema,
- name: String,
- avroPath: Seq[String]): Option[Schema.Field] = {
+ catalystType: StructType,
+ avroPath: Seq[String],
+ positionalFieldMatch: Boolean): Seq[(Option[StructField],
Option[Schema.Field])] = {
if (avroSchema.getType != Schema.Type.RECORD) {
throw new IncompatibleSchemaException(
- s"Attempting to treat ${avroSchema.getName} as a RECORD, but it was:
${avroSchema.getType}")
+ s"Attempted to treat ${toFieldStr(avroPath)} as RECORD, but it was:
${avroSchema.getType}")
}
- avroSchema.getFields.asScala.filter(f => SQLConf.get.resolver(f.name(),
name)).toSeq match {
- case Seq(avroField) => Some(avroField)
- case Seq() => None
- case matches => throw new IncompatibleSchemaException(s"Searching for
'$name' in Avro " +
- s"schema at ${toFieldStr(avroPath)} gave ${matches.size} matches.
Candidates: " +
- matches.map(_.name()).mkString("[", ", ", "]")
- )
+ val avroFields = avroSchema.getFields.asScala
+ val catalystMatches = catalystType.zipWithIndex.map { case (catalystField,
catalystPos) =>
+ val avroField = if (positionalFieldMatch) {
Review comment:
Thanks for clarifying!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]