GitHub user dongjoon-hyun commented on a diff in the pull request:
https://github.com/apache/spark/pull/22484#discussion_r220799981
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala ---
@@ -34,621 +34,539 @@ import org.apache.spark.unsafe.map.BytesToBytesMap
/**
* Benchmark to measure performance for aggregate primitives.
- * To run this:
- * build/sbt "sql/test-only *benchmark.AggregateBenchmark"
- *
- * Benchmarks in this file are skipped in normal builds.
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt: bin/spark-submit --class <this class> <spark sql test jar>
+ *   2. build/sbt "sql/test:runMain <this class>"
+ *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
+ *      Results will be written to "benchmarks/AggregateBenchmark-results.txt".
+ * }}}
*/
-class AggregateBenchmark extends BenchmarkWithCodegen {
+object AggregateBenchmark extends SqlBasedBenchmark {
- ignore("aggregate without grouping") {
- val N = 500L << 22
- val benchmark = new Benchmark("agg without grouping", N)
- runBenchmark("agg w/o group", N) {
- sparkSession.range(N).selectExpr("sum(id)").collect()
+ override def benchmark(): Unit = {
+ runBenchmark("aggregate without grouping") {
+ val N = 500L << 22
+ runBenchmarkWithCodegen("agg w/o group", N) {
+ spark.range(N).selectExpr("sum(id)").collect()
+ }
}
-    /*
-    agg w/o group:                       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    ------------------------------------------------------------------------------------------------
-    agg w/o group wholestage off             30136 / 31885         69.6          14.4       1.0X
-    agg w/o group wholestage on                1851 / 1860       1132.9           0.9      16.3X
-    */
-  }
- ignore("stat functions") {
- val N = 100L << 20
+ runBenchmark("stat functions") {
+ val N = 100L << 20
- runBenchmark("stddev", N) {
- sparkSession.range(N).groupBy().agg("id" -> "stddev").collect()
- }
+ runBenchmarkWithCodegen("stddev", N) {
+ spark.range(N).groupBy().agg("id" -> "stddev").collect()
+ }
- runBenchmark("kurtosis", N) {
- sparkSession.range(N).groupBy().agg("id" -> "kurtosis").collect()
+ runBenchmarkWithCodegen("kurtosis", N) {
+ spark.range(N).groupBy().agg("id" -> "kurtosis").collect()
+ }
}
-    /*
-    Using ImperativeAggregate (as implemented in Spark 1.6):
-
-      Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-      stddev:                     Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-      -------------------------------------------------------------------------------
-      stddev w/o codegen               2019.04            10.39         1.00 X
-      stddev w codegen                 2097.29            10.00         0.96 X
-      kurtosis w/o codegen             2108.99             9.94         0.96 X
-      kurtosis w codegen               2090.69            10.03         0.97 X
-
-      Using DeclarativeAggregate:
-
-      Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-      stddev:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-      -------------------------------------------------------------------------------------------
-      stddev codegen=false         5630 / 5776         18.0          55.6       1.0X
-      stddev codegen=true          1259 / 1314         83.0          12.0       4.5X
-
-      Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-      kurtosis:              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-      -------------------------------------------------------------------------------------------
-      kurtosis codegen=false      14847 / 15084          7.0         142.9       1.0X
-      kurtosis codegen=true         1652 / 2124         63.0          15.9       9.0X
-    */
-  }
-
- ignore("aggregate with linear keys") {
- val N = 20 << 22
+ runBenchmark("aggregate with linear keys") {
+ val N = 20 << 22
- val benchmark = new Benchmark("Aggregate w keys", N)
- def f(): Unit = {
- sparkSession.range(N).selectExpr("(id & 65535) as
k").groupBy("k").sum().collect()
- }
+ val benchmark = new Benchmark("Aggregate w keys", N, output = output)
- benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
- sparkSession.conf.set("spark.sql.codegen.wholeStage", "false")
- f()
- }
+ def f(): Unit = {
+ spark.range(N).selectExpr("(id & 65535) as
k").groupBy("k").sum().collect()
+ }
- benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
- sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-
sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled",
"false")
-
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable",
"false")
- f()
- }
+ benchmark.addCase(s"codegen = F", numIters = 2) { _ =>
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+ f()
+ }
+ }
- benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
- sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-
sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled",
"true")
-
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable",
"true")
- f()
- }
+ benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { _ =>
+ withSQLConf(
+ SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
+ SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
+ "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
+ f()
+ }
+ }
- benchmark.run()
+ benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { _ =>
+ withSQLConf(
+ SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
+ SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true",
+ "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") {
+ f()
+ }
+ }
-    /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
-    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+      benchmark.run()
+    }
 
-    Aggregate w keys:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    ------------------------------------------------------------------------------------------------
-    codegen = F                               6619 / 6780         12.7          78.9       1.0X
-    codegen = T hashmap = F                   3935 / 4059         21.3          46.9       1.7X
-    codegen = T hashmap = T                     897 / 971         93.5          10.7       7.4X
-    */
-  }
+ runBenchmark("aggregate with randomized keys") {
+ val N = 20 << 22
- ignore("aggregate with randomized keys") {
- val N = 20 << 22
+ val benchmark = new Benchmark("Aggregate w keys", N, output = output)
+ spark.range(N).selectExpr("id", "floor(rand() * 10000) as k")
+ .createOrReplaceTempView("test")
- val benchmark = new Benchmark("Aggregate w keys", N)
- sparkSession.range(N).selectExpr("id", "floor(rand() * 10000) as k")
- .createOrReplaceTempView("test")
+ def f(): Unit = spark.sql("select k, k, sum(id) from test group by
k, k").collect()
- def f(): Unit = sparkSession.sql("select k, k, sum(id) from test group
by k, k").collect()
+ benchmark.addCase(s"codegen = F", numIters = 2) { _ =>
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+ f()
+ }
+ }
- benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
- sparkSession.conf.set("spark.sql.codegen.wholeStage", value = false)
- f()
- }
+ benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { _ =>
+ withSQLConf(
+ SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
+ SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
+ "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
+ f()
+ }
+ }
- benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
- sparkSession.conf.set("spark.sql.codegen.wholeStage", value = true)
-
sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled",
"false")
-
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable",
"false")
- f()
- }
+ benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { _ =>
+ withSQLConf(
+ SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
+ SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true",
+ "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") {
+ f()
+ }
+ }
- benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
- sparkSession.conf.set("spark.sql.codegen.wholeStage", value = true)
-
sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled",
"true")
-
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable",
"true")
- f()
+ benchmark.run()
}
- benchmark.run()
-
-    /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
-    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+    runBenchmark("aggregate with string key") {
+      val N = 20 << 20
 
-    Aggregate w keys:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    ------------------------------------------------------------------------------------------------
-    codegen = F                               7445 / 7517         11.3          88.7       1.0X
-    codegen = T hashmap = F                   4672 / 4703         18.0          55.7       1.6X
-    codegen = T hashmap = T                   1764 / 1958         47.6          21.0       4.2X
-    */
-  }
+      val benchmark = new Benchmark("Aggregate w string key", N, output = output)
- ignore("aggregate with string key") {
- val N = 20 << 20
+ def f(): Unit = spark.range(N).selectExpr("id", "cast(id & 1023 as
string) as k")
+ .groupBy("k").count().collect()
- val benchmark = new Benchmark("Aggregate w string key", N)
- def f(): Unit = sparkSession.range(N).selectExpr("id", "cast(id & 1023
as string) as k")
- .groupBy("k").count().collect()
+ benchmark.addCase(s"codegen = F", numIters = 2) { _ =>
+ spark.conf.set("spark.sql.codegen.wholeStage", "false")
--- End diff --
Shall we remove this redundant line 148?
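
For reference, the other cases in this diff wrap the body in `withSQLConf`, which also restores the previous value after the case runs. A sketch of what this case could look like instead (assuming the same pattern as the surrounding cases; `f()` is the helper defined just above):

```scala
benchmark.addCase(s"codegen = F", numIters = 2) { _ =>
  // withSQLConf sets the conf for the body and resets it afterwards
  withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
    f()
  }
}
```

That way later cases don't inherit the disabled whole-stage codegen setting.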