GitHub user dongjoon-hyun commented on a diff in the pull request:
https://github.com/apache/spark/pull/22484#discussion_r220800365
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
---
@@ -34,621 +34,539 @@ import org.apache.spark.unsafe.map.BytesToBytesMap
/**
* Benchmark to measure performance for aggregate primitives.
- * To run this:
- * build/sbt "sql/test-only *benchmark.AggregateBenchmark"
- *
- * Benchmarks in this file are skipped in normal builds.
+ * To run this benchmark:
+ * {{{
+ * 1. without sbt: bin/spark-submit --class <this class> <spark sql test
jar>
+ * 2. build/sbt "sql/test:runMain <this class>"
+ * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt
"sql/test:runMain <this class>"
+ * Results will be written to
"benchmarks/AggregateBenchmark-results.txt".
+ * }}}
*/
-class AggregateBenchmark extends BenchmarkWithCodegen {
+object AggregateBenchmark extends SqlBasedBenchmark {
- ignore("aggregate without grouping") {
- val N = 500L << 22
- val benchmark = new Benchmark("agg without grouping", N)
- runBenchmark("agg w/o group", N) {
- sparkSession.range(N).selectExpr("sum(id)").collect()
+ override def benchmark(): Unit = {
+ runBenchmark("aggregate without grouping") {
+ val N = 500L << 22
+ runBenchmarkWithCodegen("agg w/o group", N) {
+ spark.range(N).selectExpr("sum(id)").collect()
+ }
}
- /*
- agg w/o group: Best/Avg Time(ms)
Rate(M/s) Per Row(ns) Relative
-
------------------------------------------------------------------------------------------------
- agg w/o group wholestage off 30136 / 31885 69.6
14.4 1.0X
- agg w/o group wholestage on 1851 / 1860 1132.9
0.9 16.3X
- */
- }
- ignore("stat functions") {
- val N = 100L << 20
+ runBenchmark("stat functions") {
+ val N = 100L << 20
- runBenchmark("stddev", N) {
- sparkSession.range(N).groupBy().agg("id" -> "stddev").collect()
- }
+ runBenchmarkWithCodegen("stddev", N) {
+ spark.range(N).groupBy().agg("id" -> "stddev").collect()
+ }
- runBenchmark("kurtosis", N) {
- sparkSession.range(N).groupBy().agg("id" -> "kurtosis").collect()
+ runBenchmarkWithCodegen("kurtosis", N) {
+ spark.range(N).groupBy().agg("id" -> "kurtosis").collect()
+ }
}
- /*
- Using ImperativeAggregate (as implemented in Spark 1.6):
-
- Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
- stddev: Avg Time(ms) Avg Rate(M/s)
Relative Rate
-
-------------------------------------------------------------------------------
- stddev w/o codegen 2019.04 10.39
1.00 X
- stddev w codegen 2097.29 10.00
0.96 X
- kurtosis w/o codegen 2108.99 9.94
0.96 X
- kurtosis w codegen 2090.69 10.03
0.97 X
-
- Using DeclarativeAggregate:
-
- Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
- stddev: Best/Avg Time(ms) Rate(M/s)
Per Row(ns) Relative
-
-------------------------------------------------------------------------------------------
- stddev codegen=false 5630 / 5776 18.0
55.6 1.0X
- stddev codegen=true 1259 / 1314 83.0
12.0 4.5X
-
- Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
- kurtosis: Best/Avg Time(ms) Rate(M/s)
Per Row(ns) Relative
-
-------------------------------------------------------------------------------------------
- kurtosis codegen=false 14847 / 15084 7.0
142.9 1.0X
- kurtosis codegen=true 1652 / 2124 63.0
15.9 9.0X
- */
- }
-
- ignore("aggregate with linear keys") {
- val N = 20 << 22
+ runBenchmark("aggregate with linear keys") {
+ val N = 20 << 22
- val benchmark = new Benchmark("Aggregate w keys", N)
- def f(): Unit = {
- sparkSession.range(N).selectExpr("(id & 65535) as
k").groupBy("k").sum().collect()
- }
+ val benchmark = new Benchmark("Aggregate w keys", N, output = output)
- benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
- sparkSession.conf.set("spark.sql.codegen.wholeStage", "false")
- f()
- }
+ def f(): Unit = {
+ spark.range(N).selectExpr("(id & 65535) as
k").groupBy("k").sum().collect()
+ }
- benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
- sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-
sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled",
"false")
-
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable",
"false")
- f()
- }
+ benchmark.addCase(s"codegen = F", numIters = 2) { _ =>
--- End diff --
`s"codegen = F"` -> `"codegen = F"`?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]