bersprockets commented on a change in pull request #32969:
URL: https://github.com/apache/spark/pull/32969#discussion_r655794745



##########
File path: external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroWriteBenchmark.scala
##########
@@ -31,7 +36,34 @@ package org.apache.spark.sql.execution.benchmark
  *  }}}
  */
 object AvroWriteBenchmark extends DataSourceWriteBenchmark {
+  private def wideColumnsBenchmark: Unit = {
+    import spark.implicits._
+
+    withTempPath { dir =>
+      withTempTable("t1") {
+        val width = 1000
+        val values = 500000
+        val files = 20
+        val selectExpr = (1 to width).map(i => s"value as c$i")
+        // repartition to ensure we will write multiple files
+        val df = spark.range(values)
+          .map(_ => Random.nextInt).selectExpr(selectExpr: _*).repartition(files)
+          .persist(StorageLevel.DISK_ONLY)
+        // cache the data to ensure we are not benchmarking range or repartition
+        df.filter("(c1*c2) = 12").collect

Review comment:
       I think I can replace this with `df.noop()`. I will check.
   
   I was using `collect` to force the evaluation (and caching) of df, but I didn't want to actually collect 500K wide rows, so I filtered away all of df's rows.
   
   The wacky filter expression `(c1*c2) = 12` was there to ensure no push-down to the file format's libraries (so that df actually reads all rows). It's not really needed in the case of Avro.
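   
   For illustration, the `df.noop()` mentioned above presumably wraps the built-in `noop` data source (available since Spark 3.0). A minimal sketch of the same idea, using `df` as defined in the diff:
   
```scala
// Force full evaluation (and DISK_ONLY caching) of df without collecting
// 500K wide rows to the driver: the noop sink scans every row and discards it.
df.write.format("noop").mode("overwrite").save()
```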
   

##########
File path: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
##########
@@ -202,34 +203,40 @@ private[sql] object AvroUtils extends Logging {
     }
   }
 
-  /**
-   * Extract a single field from `avroSchema` which has the desired field name,
-   * performing the matching with proper case sensitivity according to [[SQLConf.resolver]].
-   *
-   * @param avroSchema The schema in which to search for the field. Must be of type RECORD.
-   * @param name The name of the field to search for.
-   * @param avroPath The seq of parent field names leading to `avroSchema`.
-   * @return `Some(match)` if a matching Avro field is found, otherwise `None`.
-   * @throws IncompatibleSchemaException if `avroSchema` is not a RECORD or contains multiple
-   *                                     fields matching `name` (i.e., case-insensitive matching
-   *                                     is used and `avroSchema` has two or more fields that have
-   *                                     the same name with different case).
-   */
-  private[avro] def getAvroFieldByName(
-      avroSchema: Schema,
-      name: String,
-      avroPath: Seq[String]): Option[Schema.Field] = {
+  class AvroSchemaHelper(avroSchema: Schema, avroPath: Seq[String]) {
     if (avroSchema.getType != Schema.Type.RECORD) {
       throw new IncompatibleSchemaException(
         s"Attempting to treat ${avroSchema.getName} as a RECORD, but it was: 
${avroSchema.getType}")
     }
-    avroSchema.getFields.asScala.filter(f => SQLConf.get.resolver(f.name(), name)).toSeq match {
-      case Seq(avroField) => Some(avroField)
-      case Seq() => None
-      case matches => throw new IncompatibleSchemaException(s"Searching for '$name' in Avro " +
+
+    val schemaMap = avroSchema.getFields.asScala.groupBy { f =>
+      f.name.toLowerCase(Locale.ROOT)
+    }.map { case (k, v) =>
+      (k, v.toSeq) // needed for Scala 2.13
+    }
+
+    /**
+     * Extract a single field from the contained Avro schema which has the desired field name,
+     * performing the matching with proper case sensitivity according to SQLConf.resolver.
+     *
+     * @param name The name of the field to search for.
+     * @return `Some(match)` if a matching Avro field is found, otherwise `None`.
+     */
+    def getFieldByName(name: String): Option[Schema.Field] = {
+
+      // get candidates, ignoring case of field name
+      val candidates = schemaMap.get(name.toLowerCase(Locale.ROOT))
+        .getOrElse(Seq.empty[Schema.Field])
+
+      // search candidates, taking into account case sensitivity settings
+      candidates.filter(f => SQLConf.get.resolver(f.name(), name)) match {
+        case Seq(avroField) => Some(avroField)

Review comment:
       > Is it so that we can make use of SQLConf#resolver to perform the resolution
   
   Well, that was certainly one reason (so I don't have to decide the resolver, in case that decision logic changes).
   
   But I also wanted to keep the code similar to the original implementation (i.e., duplicates are detected at lookup time, etc.).
   
   I suppose my version wouldn't help much in the case of a wide table where all fields map to the same lower-case name (e.g., a 10-character field name that just repeats with many different permutations of case). Hopefully that is an extremely unusual situation.
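   
   For concreteness, a hypothetical sketch of how the two-stage lookup behaves in that ambiguous case (the record and field names here are invented for illustration; note `AvroUtils` is `private[sql]`, so this only compiles inside the `org.apache.spark.sql` package):
   
```scala
import org.apache.avro.SchemaBuilder
import org.apache.spark.sql.avro.AvroUtils.AvroSchemaHelper

// A record with two fields differing only in case -- both land in the same
// lower-cased bucket of schemaMap.
val schema = SchemaBuilder.record("rec").fields()
  .requiredInt("id")
  .requiredInt("ID")
  .endRecord()

val helper = new AvroSchemaHelper(schema, Seq.empty)

// With spark.sql.caseSensitive=false, both candidates survive the resolver,
// so this throws IncompatibleSchemaException at lookup time (matching the
// original implementation's behavior). With spark.sql.caseSensitive=true,
// only the exact-case field matches and Some(field) is returned.
helper.getFieldByName("ID")
```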






