xkrogen commented on a change in pull request #32969:
URL: https://github.com/apache/spark/pull/32969#discussion_r655548950
##########
File path:
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
##########
@@ -202,34 +203,40 @@ private[sql] object AvroUtils extends Logging {
}
}
- /**
- * Extract a single field from `avroSchema` which has the desired field name,
- * performing the matching with proper case sensitivity according to
[[SQLConf.resolver]].
- *
- * @param avroSchema The schema in which to search for the field. Must be of
type RECORD.
- * @param name The name of the field to search for.
- * @param avroPath The seq of parent field names leading to `avroSchema`.
- * @return `Some(match)` if a matching Avro field is found, otherwise `None`.
- * @throws IncompatibleSchemaException if `avroSchema` is not a RECORD or
contains multiple
- * fields matching `name` (i.e.,
case-insensitive matching
- * is used and `avroSchema` has two or
more fields that have
- * the same name with difference case).
- */
- private[avro] def getAvroFieldByName(
- avroSchema: Schema,
- name: String,
- avroPath: Seq[String]): Option[Schema.Field] = {
+ class AvroSchemaHelper(avroSchema: Schema, avroPath: Seq[String]) {
if (avroSchema.getType != Schema.Type.RECORD) {
throw new IncompatibleSchemaException(
s"Attempting to treat ${avroSchema.getName} as a RECORD, but it was:
${avroSchema.getType}")
}
- avroSchema.getFields.asScala.filter(f => SQLConf.get.resolver(f.name(),
name)).toSeq match {
- case Seq(avroField) => Some(avroField)
- case Seq() => None
- case matches => throw new IncompatibleSchemaException(s"Searching for
'$name' in Avro " +
+
+ val schemaMap = avroSchema.getFields.asScala.groupBy { f =>
Review comment:
Minor nit: this could be declared as a `private[this] val`.
Also, maybe name it `fieldMap` instead of `schemaMap`, since it maps field
names to fields rather than to schemas?
##########
File path:
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
##########
@@ -250,11 +250,12 @@ private[sql] class AvroSerializer(
s"Avro $avroPathStr schema length (${avroFields.size}) doesn't match "
+
s"SQL ${toFieldStr(catalystPath)} schema length
(${catalystStruct.length})")
}
+ val avroSchemaHelper = new AvroUtils.AvroSchemaHelper(avroStruct, avroPath)
val (avroIndices: Array[Int], fieldConverters: Array[Converter]) =
catalystStruct.map { catalystField =>
- val avroField = AvroUtils
- .getAvroFieldByName(avroStruct, catalystField.name, avroPath)
match {
+ val avroField = avroSchemaHelper
+ .getFieldByName(catalystField.name) match {
Review comment:
Minor nit: you can combine these two lines back into one line now that the
method call is shorter.
##########
File path:
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
##########
@@ -202,34 +203,40 @@ private[sql] object AvroUtils extends Logging {
}
}
- /**
- * Extract a single field from `avroSchema` which has the desired field name,
- * performing the matching with proper case sensitivity according to
[[SQLConf.resolver]].
- *
- * @param avroSchema The schema in which to search for the field. Must be of
type RECORD.
- * @param name The name of the field to search for.
- * @param avroPath The seq of parent field names leading to `avroSchema`.
- * @return `Some(match)` if a matching Avro field is found, otherwise `None`.
- * @throws IncompatibleSchemaException if `avroSchema` is not a RECORD or
contains multiple
- * fields matching `name` (i.e.,
case-insensitive matching
- * is used and `avroSchema` has two or
more fields that have
- * the same name with difference case).
- */
- private[avro] def getAvroFieldByName(
- avroSchema: Schema,
- name: String,
- avroPath: Seq[String]): Option[Schema.Field] = {
+ class AvroSchemaHelper(avroSchema: Schema, avroPath: Seq[String]) {
Review comment:
can you keep the Scaladoc for `avroSchema` and `avroPath` from the
previous version?
##########
File path:
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
##########
@@ -202,34 +203,40 @@ private[sql] object AvroUtils extends Logging {
}
}
- /**
- * Extract a single field from `avroSchema` which has the desired field name,
- * performing the matching with proper case sensitivity according to
[[SQLConf.resolver]].
- *
- * @param avroSchema The schema in which to search for the field. Must be of
type RECORD.
- * @param name The name of the field to search for.
- * @param avroPath The seq of parent field names leading to `avroSchema`.
- * @return `Some(match)` if a matching Avro field is found, otherwise `None`.
- * @throws IncompatibleSchemaException if `avroSchema` is not a RECORD or
contains multiple
- * fields matching `name` (i.e.,
case-insensitive matching
- * is used and `avroSchema` has two or
more fields that have
- * the same name with difference case).
- */
- private[avro] def getAvroFieldByName(
- avroSchema: Schema,
- name: String,
- avroPath: Seq[String]): Option[Schema.Field] = {
+ class AvroSchemaHelper(avroSchema: Schema, avroPath: Seq[String]) {
if (avroSchema.getType != Schema.Type.RECORD) {
throw new IncompatibleSchemaException(
s"Attempting to treat ${avroSchema.getName} as a RECORD, but it was:
${avroSchema.getType}")
}
- avroSchema.getFields.asScala.filter(f => SQLConf.get.resolver(f.name(),
name)).toSeq match {
- case Seq(avroField) => Some(avroField)
- case Seq() => None
- case matches => throw new IncompatibleSchemaException(s"Searching for
'$name' in Avro " +
+
+ val schemaMap = avroSchema.getFields.asScala.groupBy { f =>
+ f.name.toLowerCase(Locale.ROOT)
+ }.map { case (k, v) =>
+ (k, v.toSeq) // needed for scala 2.13
+ }
+
+ /**
+ * Extract a single field from the contained avro schema which has the
desired field name,
+ * performing the matching with proper case sensitivity according to
SQLConf.resolver.
+ *
+ * @param name The name of the field to search for.
+ * @return `Some(match)` if a matching Avro field is found, otherwise
`None`.
+ */
+ def getFieldByName(name: String): Option[Schema.Field] = {
+
+ // get candidates, ignoring case of field name
+ val candidates = schemaMap.get(name.toLowerCase(Locale.ROOT))
+ .getOrElse(Seq.empty[Schema.Field])
+
+ // search candidates, taking into account case sensitivity settings
+ candidates.filter(f => SQLConf.get.resolver(f.name(), name)) match {
+ case Seq(avroField) => Some(avroField)
Review comment:
Can you provide any commentary on why you chose to always lowercase the map
keys and then resolve later based on case sensitivity, rather than storing the
keys as case-sensitive or case-insensitive depending on the SQL conf? Is it so
that we can make use of `SQLConf#resolver` to perform the resolution instead of
checking the value of the case-sensitivity conf ourselves?
##########
File path:
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
##########
@@ -202,34 +203,40 @@ private[sql] object AvroUtils extends Logging {
}
}
- /**
- * Extract a single field from `avroSchema` which has the desired field name,
- * performing the matching with proper case sensitivity according to
[[SQLConf.resolver]].
- *
- * @param avroSchema The schema in which to search for the field. Must be of
type RECORD.
- * @param name The name of the field to search for.
- * @param avroPath The seq of parent field names leading to `avroSchema`.
- * @return `Some(match)` if a matching Avro field is found, otherwise `None`.
- * @throws IncompatibleSchemaException if `avroSchema` is not a RECORD or
contains multiple
- * fields matching `name` (i.e.,
case-insensitive matching
- * is used and `avroSchema` has two or
more fields that have
- * the same name with difference case).
- */
- private[avro] def getAvroFieldByName(
- avroSchema: Schema,
- name: String,
- avroPath: Seq[String]): Option[Schema.Field] = {
+ class AvroSchemaHelper(avroSchema: Schema, avroPath: Seq[String]) {
if (avroSchema.getType != Schema.Type.RECORD) {
throw new IncompatibleSchemaException(
s"Attempting to treat ${avroSchema.getName} as a RECORD, but it was:
${avroSchema.getType}")
}
- avroSchema.getFields.asScala.filter(f => SQLConf.get.resolver(f.name(),
name)).toSeq match {
- case Seq(avroField) => Some(avroField)
- case Seq() => None
- case matches => throw new IncompatibleSchemaException(s"Searching for
'$name' in Avro " +
+
+ val schemaMap = avroSchema.getFields.asScala.groupBy { f =>
+ f.name.toLowerCase(Locale.ROOT)
+ }.map { case (k, v) =>
+ (k, v.toSeq) // needed for scala 2.13
+ }
Review comment:
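The formatting here looks a little odd to me; maybe we can do:
```
avroSchema.getFields.asScala
.groupBy(_.name.toLowerCase(Locale.ROOT))
.mapValues(_.toSeq) // toSeq needed for scala 2.13
```
(Note that on Scala 2.13 `mapValues` is deprecated and returns a lazy view, so
`_.toSeq` is re-evaluated on each lookup unless the result is materialized,
e.g. with a trailing `.toMap`.)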
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]