karuppayya opened a new pull request #28804:
URL: https://github.com/apache/spark/pull/28804
### What changes were proposed in this pull request?
In the case of hash aggregation, a partial aggregation (update) is done, followed by a final aggregation (merge). During partial aggregation we sort and spill to disk every time the fast map (when enabled) and the UnsafeFixedWidthAggregationMap get exhausted.

**When the cardinality of the grouping columns is close to the total number of records being processed, sorting and spilling the data to disk is not required: it is effectively a no-op, and the rows can be used directly in the final aggregation.**

Even when the user is aware of the nature of the data, there is currently no way to disable this sort-and-spill operation. This is similar to the following issues in Hive:
https://issues.apache.org/jira/browse/HIVE-223
https://issues.apache.org/jira/browse/HIVE-291

This PR adds the ability to disable the sort/spill during partial aggregation.

### Benchmark
spark.executor.memory = 12G

#### Init code
```
// init code
case class Data(name: String, value1: String, value2: String, value3: Long, random: Int)
val numRecords = Seq(60000000)
val tblName = "tbl"
```

#### Generate data
```
// init code
case class Data(name: String, value1: String, value2: String, value3: Long, random: Int)
val numRecords = Seq(30000000, 60000000)
val basePath = "s3://qubole-spar/karuppayya/SPAR-4477/benchmark/"
val rand = scala.util.Random

// write
numRecords.foreach { recordCount =>
  val dataLocation = s"$basePath/$recordCount"
  val dataDF = spark.range(recordCount).map { x =>
    if (x < 10) Data(s"name1", s"value1", s"value1", 10, rand.nextInt(100))
    else Data(s"name$x", s"value$x", s"value$x", 1, rand.nextInt(100))
  }
  // Create data to be processed by one task (also gzip the output to ensure
  // Spark doesn't create multiple splits).
  val randomDF = dataDF.orderBy("random")
  randomDF.drop("random").repartition(1)
    .write
    .mode("overwrite")
    .option("compression", "gzip")
    .parquet(dataLocation)
}
```

#### Query
```
val query = s"""
  |SELECT name, value1, value2, SUM(value3) s
  |FROM $tblName
  |GROUP BY name, value1, value2
  |""".stripMargin
```

#### Benchmark code
```
import java.util.concurrent.TimeUnit.NANOSECONDS
import org.apache.spark.sql.types._

val userSpecifiedSchema = new StructType()
  .add(StructField("name", StringType))
  .add(StructField("value1", StringType))
  .add(StructField("value2", StringType))
  .add(StructField("value3", LongType))

val query = """
  |SELECT name, value1, value2, SUM(value3) s
  |FROM tbl
  |GROUP BY name, value1, value2
  |""".stripMargin

case class Metric(recordCount: Long, partialAggregateEnabled: Boolean, timeTaken: Long)

val metrics = Seq(true, false).flatMap { enabled =>
  sql(s"set spark.sql.aggregate.partialaggregate.skip.enabled=$enabled").collect
  numRecords.map { recordCount =>
    val dataLocation = s"$basePath/$recordCount"
    spark.read
      .option("inferTimestamp", "false")
      .schema(userSpecifiedSchema)
      .json(dataLocation)
      .createOrReplaceTempView("tbl")
    val start = System.nanoTime()
    spark.sql(query).filter("s > 10").collect
    val end = System.nanoTime()
    val diff = end - start
    Metric(recordCount, enabled, NANOSECONDS.toMillis(diff))
  }
}
```

### Results
```
val df = metrics.toDF
df.createOrReplaceTempView("a")
val df = sql("select * from a order by recordcount desc, partialAggregateEnabled")
df.show()

scala> df.show
+-----------+-----------------------+---------+
|recordCount|partialAggregateEnabled|timeTaken|
+-----------+-----------------------+---------+
|   90000000|                  false|   593844|
|   90000000|                   true|   412958|
|   60000000|                  false|   377054|
|   60000000|                   true|   276363|
+-----------+-----------------------+---------+
```

### Percent improvement
90000000 records: 30.46%; 60000000 records: 26.70% (timeTaken above is in milliseconds, per `NANOSECONDS.toMillis`)
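For reference, a minimal usage sketch, assuming the configuration key exercised in the benchmark above (`spark.sql.aggregate.partialaggregate.skip.enabled`); the exact key name and default value are whatever this PR defines:

```
// Minimal usage sketch (assumes the config key from the benchmark above).
// Enable skipping of the sort/spill step of partial aggregation when the
// grouping keys are known to be near-unique.
spark.conf.set("spark.sql.aggregate.partialaggregate.skip.enabled", "true")

spark.sql(
  """
    |SELECT name, value1, value2, SUM(value3) s
    |FROM tbl
    |GROUP BY name, value1, value2
    |""".stripMargin).collect()
```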
### Why are the changes needed?
This change improves the performance of aggregation queries whose grouping-key cardinality is close to the number of input rows, by skipping the unnecessary sort and spill during partial aggregation (see the benchmark above).

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
This patch was tested manually.
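As a supplement, a minimal sketch of the kind of manual correctness check this implies, again assuming the config key from the benchmark above: the result with the flag enabled must match the result with it disabled.

```
// Hypothetical manual check: the aggregation result must be identical
// whether or not the partial-aggregate sort/spill is skipped.
def runQuery() = spark.sql(
  """
    |SELECT name, value1, value2, SUM(value3) s
    |FROM tbl
    |GROUP BY name, value1, value2
    |ORDER BY name, value1, value2
    |""".stripMargin).collect()

spark.conf.set("spark.sql.aggregate.partialaggregate.skip.enabled", "false")
val expected = runQuery()
spark.conf.set("spark.sql.aggregate.partialaggregate.skip.enabled", "true")
val actual = runQuery()
assert(expected.sameElements(actual))
```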
