Repository: spark Updated Branches: refs/heads/master 30f5d0f2d -> b96fd44f0
[SPARK-25476][SPARK-25510][TEST] Refactor AggregateBenchmark and add a new trait to better support Dataset and DataFrame API ## What changes were proposed in this pull request? This PR does 2 things: 1. Add a new trait(`SqlBasedBenchmark`) to better support Dataset and DataFrame API. 2. Refactor `AggregateBenchmark` to use main method. Generate benchmark result: ``` SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.AggregateBenchmark" ``` ## How was this patch tested? manual tests Closes #22484 from wangyum/SPARK-25476. Lead-authored-by: Yuming Wang <yumw...@ebay.com> Co-authored-by: Dongjoon Hyun <dongj...@apache.org> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b96fd44f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b96fd44f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b96fd44f Branch: refs/heads/master Commit: b96fd44f0e91751c1ce3a617cb083bdf880701a1 Parents: 30f5d0f Author: Yuming Wang <yumw...@ebay.com> Authored: Mon Oct 1 07:32:40 2018 -0700 Committer: Dongjoon Hyun <dongj...@apache.org> Committed: Mon Oct 1 07:32:40 2018 -0700 ---------------------------------------------------------------------- .../benchmarks/AggregateBenchmark-results.txt | 143 +++ .../benchmark/AggregateBenchmark.scala | 943 +++++++++---------- .../execution/benchmark/SqlBasedBenchmark.scala | 60 ++ 3 files changed, 633 insertions(+), 513 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b96fd44f/sql/core/benchmarks/AggregateBenchmark-results.txt ---------------------------------------------------------------------- diff --git a/sql/core/benchmarks/AggregateBenchmark-results.txt b/sql/core/benchmarks/AggregateBenchmark-results.txt new file mode 100644 index 0000000..19e5247 --- /dev/null +++ 
b/sql/core/benchmarks/AggregateBenchmark-results.txt @@ -0,0 +1,143 @@ +================================================================================================ +aggregate without grouping +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +agg w/o group: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +agg w/o group wholestage off 65374 / 70665 32.1 31.2 1.0X +agg w/o group wholestage on 1178 / 1209 1779.8 0.6 55.5X + + +================================================================================================ +stat functions +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +stddev: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +stddev wholestage off 8667 / 8851 12.1 82.7 1.0X +stddev wholestage on 1266 / 1273 82.8 12.1 6.8X + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +kurtosis: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +kurtosis wholestage off 41218 / 41231 2.5 393.1 1.0X +kurtosis wholestage on 1347 / 1357 77.8 12.8 30.6X + + +================================================================================================ +aggregate with linear keys +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU 
E5-2670 v2 @ 2.50GHz +Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +codegen = F 9309 / 9389 9.0 111.0 1.0X +codegen = T hashmap = F 4417 / 4435 19.0 52.7 2.1X +codegen = T hashmap = T 1289 / 1298 65.1 15.4 7.2X + + +================================================================================================ +aggregate with randomized keys +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +codegen = F 11424 / 11426 7.3 136.2 1.0X +codegen = T hashmap = F 6441 / 6496 13.0 76.8 1.8X +codegen = T hashmap = T 2333 / 2344 36.0 27.8 4.9X + + +================================================================================================ +aggregate with string key +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +Aggregate w string key: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +codegen = F 4751 / 4890 4.4 226.5 1.0X +codegen = T hashmap = F 3146 / 3182 6.7 150.0 1.5X +codegen = T hashmap = T 2211 / 2261 9.5 105.4 2.1X + + +================================================================================================ +aggregate with decimal key +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 
2.50GHz +Aggregate w decimal key: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +codegen = F 3029 / 3062 6.9 144.4 1.0X +codegen = T hashmap = F 1534 / 1569 13.7 73.2 2.0X +codegen = T hashmap = T 575 / 578 36.5 27.4 5.3X + + +================================================================================================ +aggregate with multiple key types +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +Aggregate w multiple keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +codegen = F 7506 / 7521 2.8 357.9 1.0X +codegen = T hashmap = F 4791 / 4808 4.4 228.5 1.6X +codegen = T hashmap = T 3553 / 3585 5.9 169.4 2.1X + + +================================================================================================ +max function bytecode size of wholestagecodegen +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +max function bytecode size: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +codegen = F 608 / 656 1.1 927.1 1.0X +codegen = T hugeMethodLimit = 10000 402 / 419 1.6 613.5 1.5X +codegen = T hugeMethodLimit = 1500 616 / 619 1.1 939.9 1.0X + + +================================================================================================ +cube +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) 
Xeon(R) CPU E5-2670 v2 @ 2.50GHz +cube: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +cube wholestage off 3229 / 3237 1.6 615.9 1.0X +cube wholestage on 1285 / 1306 4.1 245.2 2.5X + + +================================================================================================ +hash and BytesToBytesMap +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +BytesToBytesMap: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +UnsafeRowhash 328 / 330 64.0 15.6 1.0X +murmur3 hash 167 / 167 125.4 8.0 2.0X +fast hash 84 / 85 249.0 4.0 3.9X +arrayEqual 192 / 192 109.3 9.1 1.7X +Java HashMap (Long) 144 / 147 145.9 6.9 2.3X +Java HashMap (two ints) 147 / 153 142.3 7.0 2.2X +Java HashMap (UnsafeRow) 785 / 788 26.7 37.4 0.4X +LongToUnsafeRowMap (opt=false) 456 / 457 46.0 21.8 0.7X +LongToUnsafeRowMap (opt=true) 125 / 125 168.3 5.9 2.6X +BytesToBytesMap (off Heap) 885 / 885 23.7 42.2 0.4X +BytesToBytesMap (on Heap) 860 / 864 24.4 41.0 0.4X +Aggregate HashMap 56 / 56 373.9 2.7 5.8X + + http://git-wip-us.apache.org/repos/asf/spark/blob/b96fd44f/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala index 57a6fdb..296ae10 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala @@ 
-34,621 +34,538 @@ import org.apache.spark.unsafe.map.BytesToBytesMap /** * Benchmark to measure performance for aggregate primitives. - * To run this: - * build/sbt "sql/test-only *benchmark.AggregateBenchmark" - * - * Benchmarks in this file are skipped in normal builds. + * To run this benchmark: + * {{{ + * 1. without sbt: bin/spark-submit --class <this class> <spark sql test jar> + * 2. build/sbt "sql/test:runMain <this class>" + * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>" + * Results will be written to "benchmarks/AggregateBenchmark-results.txt". + * }}} */ -class AggregateBenchmark extends BenchmarkWithCodegen { +object AggregateBenchmark extends SqlBasedBenchmark { - ignore("aggregate without grouping") { - val N = 500L << 22 - val benchmark = new Benchmark("agg without grouping", N) - runBenchmark("agg w/o group", N) { - sparkSession.range(N).selectExpr("sum(id)").collect() + override def benchmark(): Unit = { + runBenchmark("aggregate without grouping") { + val N = 500L << 22 + codegenBenchmark("agg w/o group", N) { + spark.range(N).selectExpr("sum(id)").collect() + } } - /* - agg w/o group: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - agg w/o group wholestage off 30136 / 31885 69.6 14.4 1.0X - agg w/o group wholestage on 1851 / 1860 1132.9 0.9 16.3X - */ - } - ignore("stat functions") { - val N = 100L << 20 + runBenchmark("stat functions") { + val N = 100L << 20 - runBenchmark("stddev", N) { - sparkSession.range(N).groupBy().agg("id" -> "stddev").collect() - } + codegenBenchmark("stddev", N) { + spark.range(N).groupBy().agg("id" -> "stddev").collect() + } - runBenchmark("kurtosis", N) { - sparkSession.range(N).groupBy().agg("id" -> "kurtosis").collect() + codegenBenchmark("kurtosis", N) { + spark.range(N).groupBy().agg("id" -> "kurtosis").collect() + } } - /* - Using ImperativeAggregate (as 
implemented in Spark 1.6): - - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - stddev: Avg Time(ms) Avg Rate(M/s) Relative Rate - ------------------------------------------------------------------------------- - stddev w/o codegen 2019.04 10.39 1.00 X - stddev w codegen 2097.29 10.00 0.96 X - kurtosis w/o codegen 2108.99 9.94 0.96 X - kurtosis w codegen 2090.69 10.03 0.97 X - - Using DeclarativeAggregate: - - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - stddev: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - stddev codegen=false 5630 / 5776 18.0 55.6 1.0X - stddev codegen=true 1259 / 1314 83.0 12.0 4.5X - - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - kurtosis: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - kurtosis codegen=false 14847 / 15084 7.0 142.9 1.0X - kurtosis codegen=true 1652 / 2124 63.0 15.9 9.0X - */ - } - - ignore("aggregate with linear keys") { - val N = 20 << 22 + runBenchmark("aggregate with linear keys") { + val N = 20 << 22 - val benchmark = new Benchmark("Aggregate w keys", N) - def f(): Unit = { - sparkSession.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect() - } + val benchmark = new Benchmark("Aggregate w keys", N, output = output) - benchmark.addCase(s"codegen = F", numIters = 2) { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", "false") - f() - } + def f(): Unit = { + spark.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect() + } - benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false") - f() - } + benchmark.addCase("codegen = F", numIters = 2) { _ => + 
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + f() + } + } - benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true") - f() - } + benchmark.addCase("codegen = T hashmap = F", numIters = 3) { _ => + withSQLConf( + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", + SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false", + "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") { + f() + } + } - benchmark.run() + benchmark.addCase("codegen = T hashmap = T", numIters = 5) { _ => + withSQLConf( + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", + SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true", + "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") { + f() + } + } - /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11 - Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz + benchmark.run() + } - Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - codegen = F 6619 / 6780 12.7 78.9 1.0X - codegen = T hashmap = F 3935 / 4059 21.3 46.9 1.7X - codegen = T hashmap = T 897 / 971 93.5 10.7 7.4X - */ - } + runBenchmark("aggregate with randomized keys") { + val N = 20 << 22 - ignore("aggregate with randomized keys") { - val N = 20 << 22 + val benchmark = new Benchmark("Aggregate w keys", N, output = output) + spark.range(N).selectExpr("id", "floor(rand() * 10000) as k") + .createOrReplaceTempView("test") - val benchmark = new Benchmark("Aggregate w keys", N) - sparkSession.range(N).selectExpr("id", "floor(rand() * 10000) as k") - .createOrReplaceTempView("test") + def f(): Unit = spark.sql("select k, k, sum(id) from test group by k, k").collect() - def f(): Unit = sparkSession.sql("select 
k, k, sum(id) from test group by k, k").collect() + benchmark.addCase("codegen = F", numIters = 2) { _ => + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + f() + } + } - benchmark.addCase(s"codegen = F", numIters = 2) { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", value = false) - f() - } + benchmark.addCase("codegen = T hashmap = F", numIters = 3) { _ => + withSQLConf( + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", + SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false", + "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") { + f() + } + } - benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", value = true) - sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false") - f() - } + benchmark.addCase("codegen = T hashmap = T", numIters = 5) { _ => + withSQLConf( + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", + SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true", + "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") { + f() + } + } - benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", value = true) - sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true") - f() + benchmark.run() } - benchmark.run() - - /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11 - Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz + runBenchmark("aggregate with string key") { + val N = 20 << 20 - Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - codegen = F 7445 / 7517 11.3 88.7 1.0X - codegen = T hashmap = F 4672 / 4703 18.0 55.7 1.6X - codegen = T 
hashmap = T 1764 / 1958 47.6 21.0 4.2X - */ - } + val benchmark = new Benchmark("Aggregate w string key", N, output = output) - ignore("aggregate with string key") { - val N = 20 << 20 + def f(): Unit = spark.range(N).selectExpr("id", "cast(id & 1023 as string) as k") + .groupBy("k").count().collect() - val benchmark = new Benchmark("Aggregate w string key", N) - def f(): Unit = sparkSession.range(N).selectExpr("id", "cast(id & 1023 as string) as k") - .groupBy("k").count().collect() + benchmark.addCase("codegen = F", numIters = 2) { _ => + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + f() + } + } - benchmark.addCase(s"codegen = F", numIters = 2) { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", "false") - f() - } + benchmark.addCase("codegen = T hashmap = F", numIters = 3) { _ => + withSQLConf( + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", + SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false", + "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") { + f() + } + } - benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false") - f() - } + benchmark.addCase("codegen = T hashmap = T", numIters = 5) { _ => + withSQLConf( + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", + SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true", + "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") { + f() + } + } - benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true") - f() + benchmark.run() } - benchmark.run() - - /* - Java HotSpot(TM) 64-Bit Server 
VM 1.8.0_73-b02 on Mac OS X 10.11.4 - Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz - Aggregate w string key: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - codegen = F 3307 / 3376 6.3 157.7 1.0X - codegen = T hashmap = F 2364 / 2471 8.9 112.7 1.4X - codegen = T hashmap = T 1740 / 1841 12.0 83.0 1.9X - */ - } - - ignore("aggregate with decimal key") { - val N = 20 << 20 - - val benchmark = new Benchmark("Aggregate w decimal key", N) - def f(): Unit = sparkSession.range(N).selectExpr("id", "cast(id & 65535 as decimal) as k") - .groupBy("k").count().collect() - - benchmark.addCase(s"codegen = F") { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", "false") - f() - } + runBenchmark("aggregate with decimal key") { + val N = 20 << 20 - benchmark.addCase(s"codegen = T hashmap = F") { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false") - f() - } + val benchmark = new Benchmark("Aggregate w decimal key", N, output = output) - benchmark.addCase(s"codegen = T hashmap = T") { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true") - f() - } + def f(): Unit = spark.range(N).selectExpr("id", "cast(id & 65535 as decimal) as k") + .groupBy("k").count().collect() - benchmark.run() - - /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4 - Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz - Aggregate w decimal key: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - codegen = F 2756 / 2817 7.6 131.4 1.0X 
- codegen = T hashmap = F 1580 / 1647 13.3 75.4 1.7X - codegen = T hashmap = T 641 / 662 32.7 30.6 4.3X - */ - } + benchmark.addCase("codegen = F") { _ => + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + f() + } + } - ignore("aggregate with multiple key types") { - val N = 20 << 20 - - val benchmark = new Benchmark("Aggregate w multiple keys", N) - def f(): Unit = sparkSession.range(N) - .selectExpr( - "id", - "(id & 1023) as k1", - "cast(id & 1023 as string) as k2", - "cast(id & 1023 as int) as k3", - "cast(id & 1023 as double) as k4", - "cast(id & 1023 as float) as k5", - "id > 1023 as k6") - .groupBy("k1", "k2", "k3", "k4", "k5", "k6") - .sum() - .collect() - - benchmark.addCase(s"codegen = F") { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", "false") - f() - } + benchmark.addCase("codegen = T hashmap = F") { _ => + withSQLConf( + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", + SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false", + "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") { + f() + } + } - benchmark.addCase(s"codegen = T hashmap = F") { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false") - f() - } + benchmark.addCase("codegen = T hashmap = T") { _ => + withSQLConf( + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", + SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true", + "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") { + f() + } + } - benchmark.addCase(s"codegen = T hashmap = T") { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true") - f() + benchmark.run() } - benchmark.run() - - /* - Java HotSpot(TM) 64-Bit Server VM 
1.8.0_73-b02 on Mac OS X 10.11.4 - Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz - Aggregate w decimal key: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - codegen = F 5885 / 6091 3.6 280.6 1.0X - codegen = T hashmap = F 3625 / 4009 5.8 172.8 1.6X - codegen = T hashmap = T 3204 / 3271 6.5 152.8 1.8X - */ - } + runBenchmark("aggregate with multiple key types") { + val N = 20 << 20 - ignore("max function bytecode size of wholestagecodegen") { - val N = 20 << 15 - - val benchmark = new Benchmark("max function bytecode size", N) - def f(): Unit = sparkSession.range(N) - .selectExpr( - "id", - "(id & 1023) as k1", - "cast(id & 1023 as double) as k2", - "cast(id & 1023 as int) as k3", - "case when id > 100 and id <= 200 then 1 else 0 end as v1", - "case when id > 200 and id <= 300 then 1 else 0 end as v2", - "case when id > 300 and id <= 400 then 1 else 0 end as v3", - "case when id > 400 and id <= 500 then 1 else 0 end as v4", - "case when id > 500 and id <= 600 then 1 else 0 end as v5", - "case when id > 600 and id <= 700 then 1 else 0 end as v6", - "case when id > 700 and id <= 800 then 1 else 0 end as v7", - "case when id > 800 and id <= 900 then 1 else 0 end as v8", - "case when id > 900 and id <= 1000 then 1 else 0 end as v9", - "case when id > 1000 and id <= 1100 then 1 else 0 end as v10", - "case when id > 1100 and id <= 1200 then 1 else 0 end as v11", - "case when id > 1200 and id <= 1300 then 1 else 0 end as v12", - "case when id > 1300 and id <= 1400 then 1 else 0 end as v13", - "case when id > 1400 and id <= 1500 then 1 else 0 end as v14", - "case when id > 1500 and id <= 1600 then 1 else 0 end as v15", - "case when id > 1600 and id <= 1700 then 1 else 0 end as v16", - "case when id > 1700 and id <= 1800 then 1 else 0 end as v17", - "case when id > 1800 and id <= 1900 then 1 else 0 end as v18") - .groupBy("k1", "k2", "k3") - .sum() - .collect() - - 
benchmark.addCase("codegen = F") { iter => - sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "false") - f() - } + val benchmark = new Benchmark("Aggregate w multiple keys", N, output = output) - benchmark.addCase("codegen = T hugeMethodLimit = 10000") { iter => - sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true") - sparkSession.conf.set(SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key, "10000") - f() - } + def f(): Unit = spark.range(N) + .selectExpr( + "id", + "(id & 1023) as k1", + "cast(id & 1023 as string) as k2", + "cast(id & 1023 as int) as k3", + "cast(id & 1023 as double) as k4", + "cast(id & 1023 as float) as k5", + "id > 1023 as k6") + .groupBy("k1", "k2", "k3", "k4", "k5", "k6") + .sum() + .collect() - benchmark.addCase("codegen = T hugeMethodLimit = 1500") { iter => - sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true") - sparkSession.conf.set(SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key, "1500") - f() - } + benchmark.addCase("codegen = F") { _ => + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + f() + } + } - benchmark.run() + benchmark.addCase("codegen = T hashmap = F") { _ => + withSQLConf( + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", + SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false", + "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") { + f() + } + } - /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_31-b13 on Mac OS X 10.10.2 - Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz + benchmark.addCase("codegen = T hashmap = T") { _ => + withSQLConf( + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", + SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true", + "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") { + f() + } + } - max function bytecode size: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - codegen = F 709 / 803 0.9 1082.1 1.0X - codegen = T hugeMethodLimit = 10000 3485 / 3548 
0.2 5317.7 0.2X - codegen = T hugeMethodLimit = 1500 636 / 701 1.0 969.9 1.1X - */ - } + benchmark.run() + } + + runBenchmark("max function bytecode size of wholestagecodegen") { + val N = 20 << 15 + + val benchmark = new Benchmark("max function bytecode size", N, output = output) + + def f(): Unit = spark.range(N) + .selectExpr( + "id", + "(id & 1023) as k1", + "cast(id & 1023 as double) as k2", + "cast(id & 1023 as int) as k3", + "case when id > 100 and id <= 200 then 1 else 0 end as v1", + "case when id > 200 and id <= 300 then 1 else 0 end as v2", + "case when id > 300 and id <= 400 then 1 else 0 end as v3", + "case when id > 400 and id <= 500 then 1 else 0 end as v4", + "case when id > 500 and id <= 600 then 1 else 0 end as v5", + "case when id > 600 and id <= 700 then 1 else 0 end as v6", + "case when id > 700 and id <= 800 then 1 else 0 end as v7", + "case when id > 800 and id <= 900 then 1 else 0 end as v8", + "case when id > 900 and id <= 1000 then 1 else 0 end as v9", + "case when id > 1000 and id <= 1100 then 1 else 0 end as v10", + "case when id > 1100 and id <= 1200 then 1 else 0 end as v11", + "case when id > 1200 and id <= 1300 then 1 else 0 end as v12", + "case when id > 1300 and id <= 1400 then 1 else 0 end as v13", + "case when id > 1400 and id <= 1500 then 1 else 0 end as v14", + "case when id > 1500 and id <= 1600 then 1 else 0 end as v15", + "case when id > 1600 and id <= 1700 then 1 else 0 end as v16", + "case when id > 1700 and id <= 1800 then 1 else 0 end as v17", + "case when id > 1800 and id <= 1900 then 1 else 0 end as v18") + .groupBy("k1", "k2", "k3") + .sum() + .collect() + + benchmark.addCase("codegen = F") { _ => + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + f() + } + } + benchmark.addCase("codegen = T hugeMethodLimit = 10000") { _ => + withSQLConf( + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", + SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key -> "10000") { + f() + } + } - ignore("cube") { - val N = 5 << 20 + 
benchmark.addCase("codegen = T hugeMethodLimit = 1500") { _ => + withSQLConf( + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", + SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key -> "1500") { + f() + } + } - runBenchmark("cube", N) { - sparkSession.range(N).selectExpr("id", "id % 1000 as k1", "id & 256 as k2") - .cube("k1", "k2").sum("id").collect() + benchmark.run() } - /** - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - cube: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - cube codegen=false 3188 / 3392 1.6 608.2 1.0X - cube codegen=true 1239 / 1394 4.2 236.3 2.6X - */ - } - - ignore("hash and BytesToBytesMap") { - val N = 20 << 20 - val benchmark = new Benchmark("BytesToBytesMap", N) + runBenchmark("cube") { + val N = 5 << 20 - benchmark.addCase("UnsafeRowhash") { iter => - var i = 0 - val keyBytes = new Array[Byte](16) - val key = new UnsafeRow(1) - key.pointTo(keyBytes, Platform.BYTE_ARRAY_OFFSET, 16) - var s = 0 - while (i < N) { - key.setInt(0, i % 1000) - val h = Murmur3_x86_32.hashUnsafeWords( - key.getBaseObject, key.getBaseOffset, key.getSizeInBytes, 42) - s += h - i += 1 + codegenBenchmark("cube", N) { + spark.range(N).selectExpr("id", "id % 1000 as k1", "id & 256 as k2") + .cube("k1", "k2").sum("id").collect() } } - benchmark.addCase("murmur3 hash") { iter => - var i = 0 - val keyBytes = new Array[Byte](16) - val key = new UnsafeRow(1) - key.pointTo(keyBytes, Platform.BYTE_ARRAY_OFFSET, 16) - var p = 524283 - var s = 0 - while (i < N) { - var h = Murmur3_x86_32.hashLong(i, 42) - key.setInt(0, h) - s += h - i += 1 - } - } + runBenchmark("hash and BytesToBytesMap") { + val N = 20 << 20 - benchmark.addCase("fast hash") { iter => - var i = 0 - val keyBytes = new Array[Byte](16) - val key = new UnsafeRow(1) - key.pointTo(keyBytes, Platform.BYTE_ARRAY_OFFSET, 16) - var p = 524283 - var s = 0 - while (i < N) { - var h = i % p - if (h < 0) { - h += p - } - key.setInt(0, h) 
- s += h - i += 1 - } - } + val benchmark = new Benchmark("BytesToBytesMap", N, output = output) - benchmark.addCase("arrayEqual") { iter => - var i = 0 - val keyBytes = new Array[Byte](16) - val valueBytes = new Array[Byte](16) - val key = new UnsafeRow(1) - key.pointTo(keyBytes, Platform.BYTE_ARRAY_OFFSET, 16) - val value = new UnsafeRow(1) - value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16) - value.setInt(0, 555) - var s = 0 - while (i < N) { - key.setInt(0, i % 1000) - if (key.equals(value)) { - s += 1 - } - i += 1 + benchmark.addCase("UnsafeRowhash") { _ => + var i = 0 + val keyBytes = new Array[Byte](16) + val key = new UnsafeRow(1) + key.pointTo(keyBytes, Platform.BYTE_ARRAY_OFFSET, 16) + var s = 0 + while (i < N) { + key.setInt(0, i % 1000) + val h = Murmur3_x86_32.hashUnsafeWords( + key.getBaseObject, key.getBaseOffset, key.getSizeInBytes, 42) + s += h + i += 1 + } } - } - benchmark.addCase("Java HashMap (Long)") { iter => - var i = 0 - val keyBytes = new Array[Byte](16) - val valueBytes = new Array[Byte](16) - val value = new UnsafeRow(1) - value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16) - value.setInt(0, 555) - val map = new HashMap[Long, UnsafeRow]() - while (i < 65536) { - value.setInt(0, i) - map.put(i.toLong, value) - i += 1 - } - var s = 0 - i = 0 - while (i < N) { - if (map.get(i % 100000) != null) { - s += 1 - } - i += 1 + benchmark.addCase("murmur3 hash") { _ => + var i = 0 + val keyBytes = new Array[Byte](16) + val key = new UnsafeRow(1) + key.pointTo(keyBytes, Platform.BYTE_ARRAY_OFFSET, 16) + var p = 524283 + var s = 0 + while (i < N) { + var h = Murmur3_x86_32.hashLong(i, 42) + key.setInt(0, h) + s += h + i += 1 + } } - } - benchmark.addCase("Java HashMap (two ints) ") { iter => - var i = 0 - val valueBytes = new Array[Byte](16) - val value = new UnsafeRow(1) - value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16) - value.setInt(0, 555) - val map = new HashMap[Long, UnsafeRow]() - while (i < 65536) { - value.setInt(0, i) 
- val key = (i.toLong << 32) + Integer.rotateRight(i, 15) - map.put(key, value) - i += 1 - } - var s = 0 - i = 0 - while (i < N) { - val key = ((i & 100000).toLong << 32) + Integer.rotateRight(i & 100000, 15) - if (map.get(key) != null) { - s += 1 - } - i += 1 + benchmark.addCase("fast hash") { _ => + var i = 0 + val keyBytes = new Array[Byte](16) + val key = new UnsafeRow(1) + key.pointTo(keyBytes, Platform.BYTE_ARRAY_OFFSET, 16) + var p = 524283 + var s = 0 + while (i < N) { + var h = i % p + if (h < 0) { + h += p + } + key.setInt(0, h) + s += h + i += 1 + } } - } - benchmark.addCase("Java HashMap (UnsafeRow)") { iter => - var i = 0 - val keyBytes = new Array[Byte](16) - val valueBytes = new Array[Byte](16) - val key = new UnsafeRow(1) - key.pointTo(keyBytes, Platform.BYTE_ARRAY_OFFSET, 16) - val value = new UnsafeRow(1) - value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16) - value.setInt(0, 555) - val map = new HashMap[UnsafeRow, UnsafeRow]() - while (i < 65536) { - key.setInt(0, i) - value.setInt(0, i) - map.put(key, value.copy()) - i += 1 - } - var s = 0 - i = 0 - while (i < N) { - key.setInt(0, i % 100000) - if (map.get(key) != null) { - s += 1 - } - i += 1 + benchmark.addCase("arrayEqual") { _ => + var i = 0 + val keyBytes = new Array[Byte](16) + val valueBytes = new Array[Byte](16) + val key = new UnsafeRow(1) + key.pointTo(keyBytes, Platform.BYTE_ARRAY_OFFSET, 16) + val value = new UnsafeRow(1) + value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16) + value.setInt(0, 555) + var s = 0 + while (i < N) { + key.setInt(0, i % 1000) + if (key.equals(value)) { + s += 1 + } + i += 1 + } } - } - Seq(false, true).foreach { optimized => - benchmark.addCase(s"LongToUnsafeRowMap (opt=$optimized)") { iter => + benchmark.addCase("Java HashMap (Long)") { _ => var i = 0 + val keyBytes = new Array[Byte](16) val valueBytes = new Array[Byte](16) val value = new UnsafeRow(1) value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16) value.setInt(0, 555) - val 
taskMemoryManager = new TaskMemoryManager( - new StaticMemoryManager( - new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"), - Long.MaxValue, - Long.MaxValue, - 1), - 0) - val map = new LongToUnsafeRowMap(taskMemoryManager, 64) + val map = new HashMap[Long, UnsafeRow]() while (i < 65536) { value.setInt(0, i) - val key = i % 100000 - map.append(key, value) + map.put(i.toLong, value) i += 1 } - if (optimized) { - map.optimize() + var s = 0 + i = 0 + while (i < N) { + if (map.get(i % 100000) != null) { + s += 1 + } + i += 1 + } + } + + benchmark.addCase("Java HashMap (two ints) ") { _ => + var i = 0 + val valueBytes = new Array[Byte](16) + val value = new UnsafeRow(1) + value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16) + value.setInt(0, 555) + val map = new HashMap[Long, UnsafeRow]() + while (i < 65536) { + value.setInt(0, i) + val key = (i.toLong << 32) + Integer.rotateRight(i, 15) + map.put(key, value) + i += 1 } var s = 0 i = 0 while (i < N) { - val key = i % 100000 - if (map.getValue(key, value) != null) { + val key = ((i & 100000).toLong << 32) + Integer.rotateRight(i & 100000, 15) + if (map.get(key) != null) { s += 1 } i += 1 } } - } - Seq("off", "on").foreach { heap => - benchmark.addCase(s"BytesToBytesMap ($heap Heap)") { iter => - val taskMemoryManager = new TaskMemoryManager( - new StaticMemoryManager( - new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, s"${heap == "off"}") - .set(MEMORY_OFFHEAP_SIZE.key, "102400000"), - Long.MaxValue, - Long.MaxValue, - 1), - 0) - val map = new BytesToBytesMap(taskMemoryManager, 1024, 64L<<20) + benchmark.addCase("Java HashMap (UnsafeRow)") { _ => + var i = 0 val keyBytes = new Array[Byte](16) val valueBytes = new Array[Byte](16) val key = new UnsafeRow(1) key.pointTo(keyBytes, Platform.BYTE_ARRAY_OFFSET, 16) val value = new UnsafeRow(1) value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16) - var i = 0 - val numKeys = 65536 - while (i < numKeys) { - key.setInt(0, i % 65536) - val loc = 
map.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes, - Murmur3_x86_32.hashLong(i % 65536, 42)) - if (!loc.isDefined) { - loc.append(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes, - value.getBaseObject, value.getBaseOffset, value.getSizeInBytes) - } + value.setInt(0, 555) + val map = new HashMap[UnsafeRow, UnsafeRow]() + while (i < 65536) { + key.setInt(0, i) + value.setInt(0, i) + map.put(key, value.copy()) i += 1 } - i = 0 var s = 0 + i = 0 while (i < N) { key.setInt(0, i % 100000) - val loc = map.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes, - Murmur3_x86_32.hashLong(i % 100000, 42)) - if (loc.isDefined) { + if (map.get(key) != null) { s += 1 } i += 1 } } - } - benchmark.addCase("Aggregate HashMap") { iter => - var i = 0 - val numKeys = 65536 - val schema = new StructType() - .add("key", LongType) - .add("value", LongType) - val map = new AggregateHashMap(schema) - while (i < numKeys) { - val row = map.findOrInsert(i.toLong) - row.setLong(1, row.getLong(1) + 1) - i += 1 - } - var s = 0 - i = 0 - while (i < N) { - if (map.find(i % 100000) != -1) { - s += 1 - } - i += 1 + Seq(false, true).foreach { optimized => + benchmark.addCase(s"LongToUnsafeRowMap (opt=$optimized)") { _ => + var i = 0 + val valueBytes = new Array[Byte](16) + val value = new UnsafeRow(1) + value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16) + value.setInt(0, 555) + val taskMemoryManager = new TaskMemoryManager( + new StaticMemoryManager( + new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"), + Long.MaxValue, + Long.MaxValue, + 1), + 0) + val map = new LongToUnsafeRowMap(taskMemoryManager, 64) + while (i < 65536) { + value.setInt(0, i) + val key = i % 100000 + map.append(key, value) + i += 1 + } + if (optimized) { + map.optimize() + } + var s = 0 + i = 0 + while (i < N) { + val key = i % 100000 + if (map.getValue(key, value) != null) { + s += 1 + } + i += 1 + } + } + } + + Seq("off", "on").foreach { heap => + 
benchmark.addCase(s"BytesToBytesMap ($heap Heap)") { _ => + val taskMemoryManager = new TaskMemoryManager( + new StaticMemoryManager( + new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, s"${heap == "off"}") + .set(MEMORY_OFFHEAP_SIZE.key, "102400000"), + Long.MaxValue, + Long.MaxValue, + 1), + 0) + val map = new BytesToBytesMap(taskMemoryManager, 1024, 64L << 20) + val keyBytes = new Array[Byte](16) + val valueBytes = new Array[Byte](16) + val key = new UnsafeRow(1) + key.pointTo(keyBytes, Platform.BYTE_ARRAY_OFFSET, 16) + val value = new UnsafeRow(1) + value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16) + var i = 0 + val numKeys = 65536 + while (i < numKeys) { + key.setInt(0, i % 65536) + val loc = map.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes, + Murmur3_x86_32.hashLong(i % 65536, 42)) + if (!loc.isDefined) { + loc.append(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes, + value.getBaseObject, value.getBaseOffset, value.getSizeInBytes) + } + i += 1 + } + i = 0 + var s = 0 + while (i < N) { + key.setInt(0, i % 100000) + val loc = map.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes, + Murmur3_x86_32.hashLong(i % 100000, 42)) + if (loc.isDefined) { + s += 1 + } + i += 1 + } + } } - } - /* - Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz - BytesToBytesMap: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - UnsafeRow hash 267 / 284 78.4 12.8 1.0X - murmur3 hash 102 / 129 205.5 4.9 2.6X - fast hash 79 / 96 263.8 3.8 3.4X - arrayEqual 164 / 172 128.2 7.8 1.6X - Java HashMap (Long) 321 / 399 65.4 15.3 0.8X - Java HashMap (two ints) 328 / 363 63.9 15.7 0.8X - Java HashMap (UnsafeRow) 1140 / 1200 18.4 54.3 0.2X - LongToUnsafeRowMap (opt=false) 378 / 400 55.5 18.0 0.7X - LongToUnsafeRowMap (opt=true) 144 / 152 145.2 6.9 1.9X - BytesToBytesMap (off Heap) 1300 / 1616 16.1 62.0 0.2X - BytesToBytesMap (on Heap) 1165 / 1202 18.0 55.5 0.2X - 
Aggregate HashMap 121 / 131 173.3 5.8 2.2X - */ - benchmark.run() + benchmark.addCase("Aggregate HashMap") { _ => + var i = 0 + val numKeys = 65536 + val schema = new StructType() + .add("key", LongType) + .add("value", LongType) + val map = new AggregateHashMap(schema) + while (i < numKeys) { + val row = map.findOrInsert(i.toLong) + row.setLong(1, row.getLong(1) + 1) + i += 1 + } + var s = 0 + i = 0 + while (i < N) { + if (map.find(i % 100000) != -1) { + s += 1 + } + i += 1 + } + } + benchmark.run() + } } - } http://git-wip-us.apache.org/repos/asf/spark/blob/b96fd44f/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala new file mode 100644 index 0000000..e95e5a9 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.benchmark + +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.plans.SQLHelper +import org.apache.spark.sql.internal.SQLConf + +/** + * Common base trait to run benchmark with the Dataset and DataFrame API. + * + * Mixing in this trait creates the shared `spark` session by calling `getSparkSession` while the + * trait is initialized, and provides `codegenBenchmark` for timing a query with whole-stage + * code generation switched off and on. + */ +trait SqlBasedBenchmark extends BenchmarkBase with SQLHelper { + + protected val spark: SparkSession = getSparkSession + + /** + * Subclass can override this function to build their own SparkSession. + * + * The default implementation builds a session with master `local[1]`, names the application + * after the canonical class name of the concrete benchmark, and sets both + * `SQLConf.SHUFFLE_PARTITIONS` and `SQLConf.AUTO_BROADCASTJOIN_THRESHOLD` to 1. + * + * NOTE(review): the session is obtained through `getOrCreate()`, so presumably an already + * active session is reused instead of a new one being built - confirm against the + * SparkSession builder documentation. + * NOTE(review): this method is called from the initializer of the `spark` val, so an override + * should not depend on subclass fields that are only initialized later - confirm against the + * subclasses in use. + * + * @return the session stored in `spark` + */ + def getSparkSession: SparkSession = { + SparkSession.builder() + .master("local[1]") + .appName(this.getClass.getCanonicalName) + .config(SQLConf.SHUFFLE_PARTITIONS.key, 1) + .config(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, 1) + .getOrCreate() + } + + /** + * Runs function `f` with whole stage codegen on and off. + * + * A new `Benchmark` writing to the `output` member is created and two cases are registered on + * it: `$name wholestage off` (numIters = 2, `SQLConf.WHOLESTAGE_CODEGEN_ENABLED` set to false) + * and `$name wholestage on` (numIters = 5, the same key set to true). The benchmark is then + * run. + * + * @param name benchmark name, also used as the prefix of both case names + * @param cardinality forwarded to `Benchmark` as its second constructor argument; presumably + * the number of rows handled per iteration - confirm against `Benchmark` + * @param f by-name body to measure; it is evaluated again every time a case function is invoked + */ + final def codegenBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = { + val benchmark = new Benchmark(name, cardinality, output = output) + + benchmark.addCase(s"$name wholestage off", numIters = 2) { _ => + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + f + } + } + + benchmark.addCase(s"$name wholestage on", numIters = 5) { _ => + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { + f + } + } + + benchmark.run() + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org