karuppayya opened a new pull request #28804:
URL: https://github.com/apache/spark/pull/28804


   ### What changes were proposed in this pull request?
   In HashAggregation, a partial aggregation (update) is performed first, followed by a final aggregation (merge).
   
   During partial aggregation, whenever the fast hash map (when enabled) and the UnsafeFixedWidthAggregationMap are exhausted, the in-memory rows are sorted and spilled to disk.
   
   **When the cardinality of the grouping columns is close to the total number of records being processed, sorting the data and spilling it to disk is effectively a no-op — the partial aggregation barely reduces the data, and the rows could be fed directly into the final aggregation.**
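   The intuition can be sketched in plain Scala (this is an illustration only, not Spark's internal implementation): partial aggregation groups and sums within each "partition", and the final step merges the partial maps. When every grouping key is unique, the partial step emits as many rows as it consumed, so the work (and any sort/spill it triggers) buys nothing.
   
   ```
   // Illustration of two-phase aggregation (plain Scala, not Spark internals).
   object TwoPhaseAgg {
     // Partial aggregation (update): group and sum within one partition.
     def partial(rows: Seq[(String, Long)]): Map[String, Long] =
       rows.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }
   
     // Final aggregation (merge): combine the per-partition partial results.
     def merge(parts: Seq[Map[String, Long]]): Map[String, Long] =
       parts.flatten.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }
   
     def main(args: Array[String]): Unit = {
       // Low cardinality: the partial step shrinks the data.
       val lowCard = Seq(Seq(("a", 1L), ("a", 2L)), Seq(("a", 3L), ("b", 4L)))
       assert(merge(lowCard.map(partial)) == Map("a" -> 6L, "b" -> 4L))
   
       // High cardinality (all keys unique): the partial step is a no-op —
       // its output is exactly as large as its input.
       val highCard = Seq(Seq(("k1", 1L), ("k2", 1L)), Seq(("k3", 1L), ("k4", 1L)))
       assert(highCard.map(partial).map(_.size).sum == highCard.map(_.size).sum)
     }
   }
   ```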
   
   Currently, even users who know the nature of their data have no way to disable this sort-and-spill step.
   
   This is similar to following issues in Hive:
   https://issues.apache.org/jira/browse/HIVE-223
   https://issues.apache.org/jira/browse/HIVE-291
   
   This PR adds the ability to disable the sort/spill during partial aggregation.
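   The switch is exposed as a SQL conf — the same one set in the benchmark code below. For example, given an active `SparkSession` named `spark`:
   
   ```
   // Skip the sort/spill in partial aggregation
   // (conf name as used in the benchmark below).
   spark.sql("set spark.sql.aggregate.partialaggregate.skip.enabled=true")
   // or equivalently:
   spark.conf.set("spark.sql.aggregate.partialaggregate.skip.enabled", "true")
   ```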
   
   ### Benchmark
   spark.executor.memory = 12G
   #### Init code
   ```
   // init code
   case class Data(name: String, value1: String, value2: String, value3: Long, 
random: Int)
   val numRecords = Seq(60000000)
   val tblName = "tbl"
   ```
   #### Generate data
   
   ```
   // generate skewed data: a handful of heavy keys plus mostly unique keys
   case class Data(name: String, value1: String, value2: String, value3: Long, random: Int)
   val numRecords = Seq(30000000, 60000000)
   
   val basePath = "s3://qubole-spar/karuppayya/SPAR-4477/benchmark/"
   val rand = scala.util.Random
   // write
   numRecords.foreach {
     recordCount =>
       val dataLocation = s"$basePath/$recordCount"
       val dataDF = spark.range(recordCount).map {
         x =>
           if (x < 10) Data("name1", "value1", "value1", 10, rand.nextInt(100))
           else Data(s"name$x", s"value$x", s"value$x", 1, rand.nextInt(100))
       }
       // shuffle the rows, then write a single gzip-compressed file so that
       // Spark doesn't create multiple splits and the data is processed by one task
       val randomDF = dataDF.orderBy("random")
       randomDF.drop("random").repartition(1)
         .write
         .mode("overwrite")
         .option("compression", "gzip")
         .parquet(dataLocation)
   }
   ```
   #### query
   ```
   val query =
     s"""
       |SELECT name, value1, value2, SUM(value3) s
       |FROM $tblName
       |GROUP BY name, value1, value2
       |""".stripMargin
   ```
   
   #### Benchmark code
   ```
   import java.util.concurrent.TimeUnit.NANOSECONDS
   import org.apache.spark.sql.types._
   
   val userSpecifiedSchema = new StructType()
     .add(StructField("name", StringType))
     .add(StructField("value1", StringType))
     .add(StructField("value2", StringType))
     .add(StructField("value3", LongType))
   val query =
     """
       |SELECT name, value1, value2, SUM(value3) s
       |FROM tbl
       |GROUP BY name, value1, value2
       |""".stripMargin
   
   case class Metric(recordCount: Long, partialAggregateEnabled: Boolean, timeTaken: Long)
   val metrics = Seq(true, false).flatMap {
     enabled =>
       sql(s"set spark.sql.aggregate.partialaggregate.skip.enabled=$enabled").collect
       numRecords.map {
         recordCount =>
           val dataLocation = s"$basePath/$recordCount"
           spark.read
             .option("inferTimestamp", "false")
             .schema(userSpecifiedSchema)
             .json(dataLocation)
             .createOrReplaceTempView("tbl")
           val start = System.nanoTime()
           spark.sql(query).filter("s > 10").collect
           val end = System.nanoTime()
           Metric(recordCount, enabled, NANOSECONDS.toMillis(end - start))
       }
   }
   ```
   ### Results
   ```
   val df = metrics.toDF
   df.createOrReplaceTempView("a")
   val ordered = sql("select * from a order by recordCount desc, partialAggregateEnabled")
   ordered.show()
   +-----------+-----------------------+---------+
   |recordCount|partialAggregateEnabled|timeTaken|
   +-----------+-----------------------+---------+
   |   90000000|                  false|   593844|
   |   90000000|                   true|   412958|
   |   60000000|                  false|   377054|
   |   60000000|                   true|   276363|
   +-----------+-----------------------+---------+
   ```
   ### Percent improvement: 
   90000000 → 30.46%, 60000000 → 26.70%
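   The percentages follow directly from the `timeTaken` column above (a quick sanity check):
   
   ```
   // Recompute the reported improvement from the timeTaken column (ms).
   object Improvement {
     def pct(disabled: Long, enabled: Long): Double =
       (disabled - enabled).toDouble / disabled * 100
   
     def main(args: Array[String]): Unit = {
       println(f"90M records: ${pct(593844L, 412958L)}%.2f%%") // 30.46%
       println(f"60M records: ${pct(377054L, 276363L)}%.2f%%") // 26.70%
     }
   }
   ```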
   
    
   ### Why are the changes needed?
   This change can significantly improve the performance of aggregation queries whose grouping-key cardinality is close to the number of input records, by skipping an unnecessary sort and spill (see the benchmark above).
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   This patch was tested manually, using the benchmark described above.

