bersprockets commented on a change in pull request #32969:
URL: https://github.com/apache/spark/pull/32969#discussion_r655794745
##########
File path:
external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroWriteBenchmark.scala
##########
@@ -31,7 +36,34 @@ package org.apache.spark.sql.execution.benchmark
* }}}
*/
object AvroWriteBenchmark extends DataSourceWriteBenchmark {
+ private def wideColumnsBenchmark: Unit = {
+ import spark.implicits._
+
+ withTempPath { dir =>
+ withTempTable("t1") {
+ val width = 1000
+ val values = 500000
+ val files = 20
+ val selectExpr = (1 to width).map(i => s"value as c$i")
+ // repartition to ensure we will write multiple files
+ val df = spark.range(values)
+ .map(_ => Random.nextInt).selectExpr(selectExpr: _*).repartition(files)
+ .persist(StorageLevel.DISK_ONLY)
+ // cache the data to ensure we are not benchmarking range or repartition
+ df.filter("(c1*c2) = 12").collect
Review comment:
I think I can replace this with `df.noop()`. I will check.
I was using `collect` to force the evaluation (and caching) of `df`. But I
didn't want to actually collect 500K wide rows, so I filtered out all of
`df`'s rows.
The wacky filter expression `(c1*c2) = 12` was there to prevent push-down to
the file format's libraries (so that `df` actually reads all rows). That isn't
really needed in the case of Avro.
##########
File path:
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
##########
@@ -202,34 +203,40 @@ private[sql] object AvroUtils extends Logging {
}
}
- /**
- * Extract a single field from `avroSchema` which has the desired field name,
- * performing the matching with proper case sensitivity according to [[SQLConf.resolver]].
- *
- * @param avroSchema The schema in which to search for the field. Must be of type RECORD.
- * @param name The name of the field to search for.
- * @param avroPath The seq of parent field names leading to `avroSchema`.
- * @return `Some(match)` if a matching Avro field is found, otherwise `None`.
- * @throws IncompatibleSchemaException if `avroSchema` is not a RECORD or contains multiple
- * fields matching `name` (i.e., case-insensitive matching
- * is used and `avroSchema` has two or more fields that have
- * the same name with difference case).
- */
- private[avro] def getAvroFieldByName(
- avroSchema: Schema,
- name: String,
- avroPath: Seq[String]): Option[Schema.Field] = {
+ class AvroSchemaHelper(avroSchema: Schema, avroPath: Seq[String]) {
if (avroSchema.getType != Schema.Type.RECORD) {
throw new IncompatibleSchemaException(
s"Attempting to treat ${avroSchema.getName} as a RECORD, but it was: ${avroSchema.getType}")
}
- avroSchema.getFields.asScala.filter(f => SQLConf.get.resolver(f.name(), name)).toSeq match {
- case Seq(avroField) => Some(avroField)
- case Seq() => None
- case matches => throw new IncompatibleSchemaException(s"Searching for '$name' in Avro " +
+
+ val schemaMap = avroSchema.getFields.asScala.groupBy { f =>
+ f.name.toLowerCase(Locale.ROOT)
+ }.map { case (k, v) =>
+ (k, v.toSeq) // needed for scala 2.13
+ }
+
+ /**
+ * Extract a single field from the contained avro schema which has the desired field name,
+ * performing the matching with proper case sensitivity according to SQLConf.resolver.
+ *
+ * @param name The name of the field to search for.
+ * @return `Some(match)` if a matching Avro field is found, otherwise `None`.
+ */
+ def getFieldByName(name: String): Option[Schema.Field] = {
+
+ // get candidates, ignoring case of field name
+ val candidates = schemaMap.get(name.toLowerCase(Locale.ROOT))
+ .getOrElse(Seq.empty[Schema.Field])
+
+ // search candidates, taking into account case sensitivity settings
+ candidates.filter(f => SQLConf.get.resolver(f.name(), name)) match {
+ case Seq(avroField) => Some(avroField)
Review comment:
> Is it so that we can make use of SQLConf#resolver to perform the resolution

Well, that was certainly one reason (so I don't have to decide the resolver,
in case that decision logic changes).
But I also wanted to keep the code similar to the original implementation
(i.e., duplicates are detected at lookup time, etc.).
I suppose my version wouldn't help much in the case of a wide table where
all fields map to the same lowercase name (e.g., a 10-character field name
that just repeats with many different permutations of case), since every
lookup would then still scan all of the fields. Hopefully that is an
extremely unusual situation.
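
For readers following along, the two-step lookup described above can be sketched without Spark or Avro on the classpath. This is only an illustration under stated assumptions, not the code in the PR: `Field` and `FieldIndex` are hypothetical stand-ins for `Schema.Field` and `AvroSchemaHelper`, the `caseSensitive` flag stands in for whatever `SQLConf.get.resolver` would decide, and `IllegalArgumentException` stands in for `IncompatibleSchemaException`.

```scala
import java.util.Locale

// Hypothetical stand-in for org.apache.avro.Schema.Field; only the name matters here.
final case class Field(name: String)

// Hypothetical helper: bucket the fields by lower-cased name once, then apply the
// case-sensitivity rule only to the (usually tiny) bucket at lookup time.
final class FieldIndex(fields: Seq[Field], caseSensitive: Boolean) {

  // Stand-in for SQLConf.get.resolver: exact match or case-insensitive match.
  private val resolver: (String, String) => Boolean =
    if (caseSensitive) (a, b) => a == b
    else (a, b) => a.equalsIgnoreCase(b)

  // Built once per schema, so each lookup avoids scanning every field.
  private val byLowerName: Map[String, Seq[Field]] =
    fields.groupBy(_.name.toLowerCase(Locale.ROOT))

  def getFieldByName(name: String): Option[Field] = {
    // Step 1: get candidates, ignoring the case of the field name.
    val candidates = byLowerName.getOrElse(name.toLowerCase(Locale.ROOT), Seq.empty)

    // Step 2: filter the candidates with the configured resolver;
    // duplicates are detected here, at lookup time.
    candidates.filter(f => resolver(f.name, name)) match {
      case Seq(field) => Some(field)
      case Seq() => None
      case matches =>
        throw new IllegalArgumentException(
          s"Searching for '$name' gave ${matches.size} matches: " +
            matches.map(_.name).mkString("[", ", ", "]"))
    }
  }
}
```

With fields `id`, `Name`, and `NAME`, a case-sensitive index resolves `Name` to exactly one field and finds nothing for `name`, while a case-insensitive index throws on `name` because two fields match. In the degenerate case mentioned above, where every field shares one lower-cased name, the single bucket holds all fields and each lookup is a linear scan again.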