Repository: spark
Updated Branches:
  refs/heads/master 30f5d0f2d -> b96fd44f0


[SPARK-25476][SPARK-25510][TEST] Refactor AggregateBenchmark and add a new 
trait to better support Dataset and DataFrame API

## What changes were proposed in this pull request?

This PR does two things:
1. Adds a new trait (`SqlBasedBenchmark`) to better support the Dataset and DataFrame APIs (see the usage sketch after the command below).
2. Refactors `AggregateBenchmark` to use a main method. Generate the benchmark results with:
```
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.AggregateBenchmark"
```
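
For context, a benchmark built on the new trait ends up looking roughly like the following sketch. It mirrors the refactored `AggregateBenchmark` in the diff below; `ExampleBenchmark` and the query are made-up placeholders, not code from this PR.

```scala
// Minimal sketch of a benchmark using the new SqlBasedBenchmark trait.
// ExampleBenchmark is a hypothetical name; see AggregateBenchmark.scala below for real usage.
object ExampleBenchmark extends SqlBasedBenchmark {

  override def benchmark(): Unit = {
    runBenchmark("aggregate without grouping") {
      val N = 500L << 22
      // codegenBenchmark (provided by SqlBasedBenchmark) runs the body with
      // whole-stage codegen off and on and records both timings.
      codegenBenchmark("agg w/o group", N) {
        spark.range(N).selectExpr("sum(id)").collect()
      }
    }
  }
}
```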

## How was this patch tested?

Manual tests.

Closes #22484 from wangyum/SPARK-25476.

Lead-authored-by: Yuming Wang <yumw...@ebay.com>
Co-authored-by: Dongjoon Hyun <dongj...@apache.org>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b96fd44f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b96fd44f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b96fd44f

Branch: refs/heads/master
Commit: b96fd44f0e91751c1ce3a617cb083bdf880701a1
Parents: 30f5d0f
Author: Yuming Wang <yumw...@ebay.com>
Authored: Mon Oct 1 07:32:40 2018 -0700
Committer: Dongjoon Hyun <dongj...@apache.org>
Committed: Mon Oct 1 07:32:40 2018 -0700

----------------------------------------------------------------------
 .../benchmarks/AggregateBenchmark-results.txt   | 143 +++
 .../benchmark/AggregateBenchmark.scala          | 943 +++++++++----------
 .../execution/benchmark/SqlBasedBenchmark.scala |  60 ++
 3 files changed, 633 insertions(+), 513 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b96fd44f/sql/core/benchmarks/AggregateBenchmark-results.txt
----------------------------------------------------------------------
diff --git a/sql/core/benchmarks/AggregateBenchmark-results.txt b/sql/core/benchmarks/AggregateBenchmark-results.txt
new file mode 100644
index 0000000..19e5247
--- /dev/null
+++ b/sql/core/benchmarks/AggregateBenchmark-results.txt
@@ -0,0 +1,143 @@
+================================================================================================
+aggregate without grouping
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+agg w/o group:                           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+agg w/o group wholestage off                65374 / 70665         32.1          31.2       1.0X
+agg w/o group wholestage on                   1178 / 1209       1779.8           0.6      55.5X
+
+
+================================================================================================
+stat functions
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+stddev:                                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+stddev wholestage off                         8667 / 8851         12.1          82.7       1.0X
+stddev wholestage on                          1266 / 1273         82.8          12.1       6.8X
+
+OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+kurtosis:                                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+kurtosis wholestage off                     41218 / 41231          2.5         393.1       1.0X
+kurtosis wholestage on                        1347 / 1357         77.8          12.8      30.6X
+
+
+================================================================================================
+aggregate with linear keys
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Aggregate w keys:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+codegen = F                                   9309 / 9389          9.0         111.0       1.0X
+codegen = T hashmap = F                       4417 / 4435         19.0          52.7       2.1X
+codegen = T hashmap = T                       1289 / 1298         65.1          15.4       7.2X
+
+
+================================================================================================
+aggregate with randomized keys
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Aggregate w keys:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+codegen = F                                 11424 / 11426          7.3         136.2       1.0X
+codegen = T hashmap = F                       6441 / 6496         13.0          76.8       1.8X
+codegen = T hashmap = T                       2333 / 2344         36.0          27.8       4.9X
+
+
+================================================================================================
+aggregate with string key
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Aggregate w string key:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+codegen = F                                   4751 / 4890          4.4         226.5       1.0X
+codegen = T hashmap = F                       3146 / 3182          6.7         150.0       1.5X
+codegen = T hashmap = T                       2211 / 2261          9.5         105.4       2.1X
+
+
+================================================================================================
+aggregate with decimal key
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Aggregate w decimal key:                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+codegen = F                                   3029 / 3062          6.9         144.4       1.0X
+codegen = T hashmap = F                       1534 / 1569         13.7          73.2       2.0X
+codegen = T hashmap = T                        575 /  578         36.5          27.4       5.3X
+
+
+================================================================================================
+aggregate with multiple key types
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Aggregate w multiple keys:               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+codegen = F                                   7506 / 7521          2.8         357.9       1.0X
+codegen = T hashmap = F                       4791 / 4808          4.4         228.5       1.6X
+codegen = T hashmap = T                       3553 / 3585          5.9         169.4       2.1X
+
+
+================================================================================================
+max function bytecode size of wholestagecodegen
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+max function bytecode size:              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+codegen = F                                    608 /  656          1.1         927.1       1.0X
+codegen = T hugeMethodLimit = 10000            402 /  419          1.6         613.5       1.5X
+codegen = T hugeMethodLimit = 1500             616 /  619          1.1         939.9       1.0X
+
+
+================================================================================================
+cube
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+cube:                                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+cube wholestage off                           3229 / 3237          1.6         615.9       1.0X
+cube wholestage on                            1285 / 1306          4.1         245.2       2.5X
+
+
+================================================================================================
+hash and BytesToBytesMap
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+BytesToBytesMap:                         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+UnsafeRowhash                                  328 /  330         64.0          15.6       1.0X
+murmur3 hash                                   167 /  167        125.4           8.0       2.0X
+fast hash                                       84 /   85        249.0           4.0       3.9X
+arrayEqual                                     192 /  192        109.3           9.1       1.7X
+Java HashMap (Long)                            144 /  147        145.9           6.9       2.3X
+Java HashMap (two ints)                        147 /  153        142.3           7.0       2.2X
+Java HashMap (UnsafeRow)                       785 /  788         26.7          37.4       0.4X
+LongToUnsafeRowMap (opt=false)                 456 /  457         46.0          21.8       0.7X
+LongToUnsafeRowMap (opt=true)                  125 /  125        168.3           5.9       2.6X
+BytesToBytesMap (off Heap)                     885 /  885         23.7          42.2       0.4X
+BytesToBytesMap (on Heap)                      860 /  864         24.4          41.0       0.4X
+Aggregate HashMap                               56 /   56        373.9           2.7       5.8X
+
+

http://git-wip-us.apache.org/repos/asf/spark/blob/b96fd44f/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
index 57a6fdb..296ae10 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
@@ -34,621 +34,538 @@ import org.apache.spark.unsafe.map.BytesToBytesMap
 
 /**
  * Benchmark to measure performance for aggregate primitives.
- * To run this:
- *  build/sbt "sql/test-only *benchmark.AggregateBenchmark"
- *
- * Benchmarks in this file are skipped in normal builds.
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt: bin/spark-submit --class <this class> <spark sql test jar>
+ *   2. build/sbt "sql/test:runMain <this class>"
+ *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
+ *      Results will be written to "benchmarks/AggregateBenchmark-results.txt".
+ * }}}
  */
-class AggregateBenchmark extends BenchmarkWithCodegen {
+object AggregateBenchmark extends SqlBasedBenchmark {
 
-  ignore("aggregate without grouping") {
-    val N = 500L << 22
-    val benchmark = new Benchmark("agg without grouping", N)
-    runBenchmark("agg w/o group", N) {
-      sparkSession.range(N).selectExpr("sum(id)").collect()
+  override def benchmark(): Unit = {
+    runBenchmark("aggregate without grouping") {
+      val N = 500L << 22
+      codegenBenchmark("agg w/o group", N) {
+        spark.range(N).selectExpr("sum(id)").collect()
+      }
     }
-    /*
-    agg w/o group:                           Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
-    
------------------------------------------------------------------------------------------------
-    agg w/o group wholestage off                30136 / 31885         69.6     
     14.4       1.0X
-    agg w/o group wholestage on                   1851 / 1860       1132.9     
      0.9      16.3X
-     */
-  }
 
-  ignore("stat functions") {
-    val N = 100L << 20
+    runBenchmark("stat functions") {
+      val N = 100L << 20
 
-    runBenchmark("stddev", N) {
-      sparkSession.range(N).groupBy().agg("id" -> "stddev").collect()
-    }
+      codegenBenchmark("stddev", N) {
+        spark.range(N).groupBy().agg("id" -> "stddev").collect()
+      }
 
-    runBenchmark("kurtosis", N) {
-      sparkSession.range(N).groupBy().agg("id" -> "kurtosis").collect()
+      codegenBenchmark("kurtosis", N) {
+        spark.range(N).groupBy().agg("id" -> "kurtosis").collect()
+      }
     }
 
-    /*
-    Using ImperativeAggregate (as implemented in Spark 1.6):
-
-      Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-      stddev:                            Avg Time(ms)    Avg Rate(M/s)  
Relative Rate
-      
-------------------------------------------------------------------------------
-      stddev w/o codegen                      2019.04            10.39         
1.00 X
-      stddev w codegen                        2097.29            10.00         
0.96 X
-      kurtosis w/o codegen                    2108.99             9.94         
0.96 X
-      kurtosis w codegen                      2090.69            10.03         
0.97 X
-
-      Using DeclarativeAggregate:
-
-      Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-      stddev:                             Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
-      
-------------------------------------------------------------------------------------------
-      stddev codegen=false                     5630 / 5776         18.0        
  55.6       1.0X
-      stddev codegen=true                      1259 / 1314         83.0        
  12.0       4.5X
-
-      Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-      kurtosis:                           Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
-      
-------------------------------------------------------------------------------------------
-      kurtosis codegen=false                 14847 / 15084          7.0        
 142.9       1.0X
-      kurtosis codegen=true                    1652 / 2124         63.0        
  15.9       9.0X
-    */
-  }
-
-  ignore("aggregate with linear keys") {
-    val N = 20 << 22
+    runBenchmark("aggregate with linear keys") {
+      val N = 20 << 22
 
-    val benchmark = new Benchmark("Aggregate w keys", N)
-    def f(): Unit = {
-      sparkSession.range(N).selectExpr("(id & 65535) as 
k").groupBy("k").sum().collect()
-    }
+      val benchmark = new Benchmark("Aggregate w keys", N, output = output)
 
-    benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "false")
-      f()
-    }
+      def f(): Unit = {
+        spark.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
+      }
 
-    benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      
sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", 
"false")
-      
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", 
"false")
-      f()
-    }
+      benchmark.addCase("codegen = F", numIters = 2) { _ =>
+        withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+          f()
+        }
+      }
 
-    benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      
sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", 
"true")
-      
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", 
"true")
-      f()
-    }
+      benchmark.addCase("codegen = T hashmap = F", numIters = 3) { _ =>
+        withSQLConf(
+          SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
+          SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
+          "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
+          f()
+        }
+      }
 
-    benchmark.run()
+      benchmark.addCase("codegen = T hashmap = T", numIters = 5) { _ =>
+        withSQLConf(
+          SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
+          SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true",
+          "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") {
+          f()
+        }
+      }
 
-    /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
-    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+      benchmark.run()
+    }
 
-    Aggregate w keys:                        Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
-    
------------------------------------------------------------------------------------------------
-    codegen = F                                   6619 / 6780         12.7     
     78.9       1.0X
-    codegen = T hashmap = F                       3935 / 4059         21.3     
     46.9       1.7X
-    codegen = T hashmap = T                        897 /  971         93.5     
     10.7       7.4X
-    */
-  }
+    runBenchmark("aggregate with randomized keys") {
+      val N = 20 << 22
 
-  ignore("aggregate with randomized keys") {
-    val N = 20 << 22
+      val benchmark = new Benchmark("Aggregate w keys", N, output = output)
+      spark.range(N).selectExpr("id", "floor(rand() * 10000) as k")
+        .createOrReplaceTempView("test")
 
-    val benchmark = new Benchmark("Aggregate w keys", N)
-    sparkSession.range(N).selectExpr("id", "floor(rand() * 10000) as k")
-      .createOrReplaceTempView("test")
+      def f(): Unit = spark.sql("select k, k, sum(id) from test group by k, k").collect()
 
-    def f(): Unit = sparkSession.sql("select k, k, sum(id) from test group by 
k, k").collect()
+      benchmark.addCase("codegen = F", numIters = 2) { _ =>
+        withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+          f()
+        }
+      }
 
-    benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", value = false)
-      f()
-    }
+      benchmark.addCase("codegen = T hashmap = F", numIters = 3) { _ =>
+        withSQLConf(
+          SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
+          SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
+          "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
+          f()
+        }
+      }
 
-    benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", value = true)
-      
sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", 
"false")
-      
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", 
"false")
-      f()
-    }
+      benchmark.addCase("codegen = T hashmap = T", numIters = 5) { _ =>
+        withSQLConf(
+          SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
+          SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true",
+          "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") {
+          f()
+        }
+      }
 
-    benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", value = true)
-      
sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", 
"true")
-      
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", 
"true")
-      f()
+      benchmark.run()
     }
 
-    benchmark.run()
-
-    /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
-    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+    runBenchmark("aggregate with string key") {
+      val N = 20 << 20
 
-    Aggregate w keys:                        Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
-    
------------------------------------------------------------------------------------------------
-    codegen = F                                   7445 / 7517         11.3     
     88.7       1.0X
-    codegen = T hashmap = F                       4672 / 4703         18.0     
     55.7       1.6X
-    codegen = T hashmap = T                       1764 / 1958         47.6     
     21.0       4.2X
-    */
-  }
+      val benchmark = new Benchmark("Aggregate w string key", N, output = 
output)
 
-  ignore("aggregate with string key") {
-    val N = 20 << 20
+      def f(): Unit = spark.range(N).selectExpr("id", "cast(id & 1023 as string) as k")
+        .groupBy("k").count().collect()
 
-    val benchmark = new Benchmark("Aggregate w string key", N)
-    def f(): Unit = sparkSession.range(N).selectExpr("id", "cast(id & 1023 as 
string) as k")
-      .groupBy("k").count().collect()
+      benchmark.addCase("codegen = F", numIters = 2) { _ =>
+        withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+          f()
+        }
+      }
 
-    benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "false")
-      f()
-    }
+      benchmark.addCase("codegen = T hashmap = F", numIters = 3) { _ =>
+        withSQLConf(
+          SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
+          SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
+          "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
+          f()
+        }
+      }
 
-    benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      
sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", 
"false")
-      
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", 
"false")
-      f()
-    }
+      benchmark.addCase("codegen = T hashmap = T", numIters = 5) { _ =>
+        withSQLConf(
+          SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
+          SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true",
+          "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") {
+          f()
+        }
+      }
 
-    benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      
sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", 
"true")
-      
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", 
"true")
-      f()
+      benchmark.run()
     }
 
-    benchmark.run()
-
-    /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
-    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-    Aggregate w string key:             Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
-    
-------------------------------------------------------------------------------------------
-    codegen = F                              3307 / 3376          6.3         
157.7       1.0X
-    codegen = T hashmap = F                  2364 / 2471          8.9         
112.7       1.4X
-    codegen = T hashmap = T                  1740 / 1841         12.0          
83.0       1.9X
-    */
-  }
-
-  ignore("aggregate with decimal key") {
-    val N = 20 << 20
-
-    val benchmark = new Benchmark("Aggregate w decimal key", N)
-    def f(): Unit = sparkSession.range(N).selectExpr("id", "cast(id & 65535 as 
decimal) as k")
-      .groupBy("k").count().collect()
-
-    benchmark.addCase(s"codegen = F") { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "false")
-      f()
-    }
+    runBenchmark("aggregate with decimal key") {
+      val N = 20 << 20
 
-    benchmark.addCase(s"codegen = T hashmap = F") { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      
sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", 
"false")
-      
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", 
"false")
-      f()
-    }
+      val benchmark = new Benchmark("Aggregate w decimal key", N, output = 
output)
 
-    benchmark.addCase(s"codegen = T hashmap = T") { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      
sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", 
"true")
-      
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", 
"true")
-      f()
-    }
+      def f(): Unit = spark.range(N).selectExpr("id", "cast(id & 65535 as decimal) as k")
+        .groupBy("k").count().collect()
 
-    benchmark.run()
-
-    /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
-    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-    Aggregate w decimal key:             Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
-    
-------------------------------------------------------------------------------------------
-    codegen = F                              2756 / 2817          7.6         
131.4       1.0X
-    codegen = T hashmap = F                  1580 / 1647         13.3          
75.4       1.7X
-    codegen = T hashmap = T                   641 /  662         32.7          
30.6       4.3X
-    */
-  }
+      benchmark.addCase("codegen = F") { _ =>
+        withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+          f()
+        }
+      }
 
-  ignore("aggregate with multiple key types") {
-    val N = 20 << 20
-
-    val benchmark = new Benchmark("Aggregate w multiple keys", N)
-    def f(): Unit = sparkSession.range(N)
-      .selectExpr(
-        "id",
-        "(id & 1023) as k1",
-        "cast(id & 1023 as string) as k2",
-        "cast(id & 1023 as int) as k3",
-        "cast(id & 1023 as double) as k4",
-        "cast(id & 1023 as float) as k5",
-        "id > 1023 as k6")
-      .groupBy("k1", "k2", "k3", "k4", "k5", "k6")
-      .sum()
-      .collect()
-
-    benchmark.addCase(s"codegen = F") { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "false")
-      f()
-    }
+      benchmark.addCase("codegen = T hashmap = F") { _ =>
+        withSQLConf(
+          SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
+          SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
+          "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
+          f()
+        }
+      }
 
-    benchmark.addCase(s"codegen = T hashmap = F") { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      
sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", 
"false")
-      
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", 
"false")
-      f()
-    }
+      benchmark.addCase("codegen = T hashmap = T") { _ =>
+        withSQLConf(
+          SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
+          SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true",
+          "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") {
+          f()
+        }
+      }
 
-    benchmark.addCase(s"codegen = T hashmap = T") { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      
sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", 
"true")
-      
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", 
"true")
-      f()
+      benchmark.run()
     }
 
-    benchmark.run()
-
-    /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
-    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-    Aggregate w decimal key:             Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
-    
-------------------------------------------------------------------------------------------
-    codegen = F                              5885 / 6091          3.6         
280.6       1.0X
-    codegen = T hashmap = F                  3625 / 4009          5.8         
172.8       1.6X
-    codegen = T hashmap = T                  3204 / 3271          6.5         
152.8       1.8X
-    */
-  }
+    runBenchmark("aggregate with multiple key types") {
+      val N = 20 << 20
 
-  ignore("max function bytecode size of wholestagecodegen") {
-    val N = 20 << 15
-
-    val benchmark = new Benchmark("max function bytecode size", N)
-    def f(): Unit = sparkSession.range(N)
-      .selectExpr(
-        "id",
-        "(id & 1023) as k1",
-        "cast(id & 1023 as double) as k2",
-        "cast(id & 1023 as int) as k3",
-        "case when id > 100 and id <= 200 then 1 else 0 end as v1",
-        "case when id > 200 and id <= 300 then 1 else 0 end as v2",
-        "case when id > 300 and id <= 400 then 1 else 0 end as v3",
-        "case when id > 400 and id <= 500 then 1 else 0 end as v4",
-        "case when id > 500 and id <= 600 then 1 else 0 end as v5",
-        "case when id > 600 and id <= 700 then 1 else 0 end as v6",
-        "case when id > 700 and id <= 800 then 1 else 0 end as v7",
-        "case when id > 800 and id <= 900 then 1 else 0 end as v8",
-        "case when id > 900 and id <= 1000 then 1 else 0 end as v9",
-        "case when id > 1000 and id <= 1100 then 1 else 0 end as v10",
-        "case when id > 1100 and id <= 1200 then 1 else 0 end as v11",
-        "case when id > 1200 and id <= 1300 then 1 else 0 end as v12",
-        "case when id > 1300 and id <= 1400 then 1 else 0 end as v13",
-        "case when id > 1400 and id <= 1500 then 1 else 0 end as v14",
-        "case when id > 1500 and id <= 1600 then 1 else 0 end as v15",
-        "case when id > 1600 and id <= 1700 then 1 else 0 end as v16",
-        "case when id > 1700 and id <= 1800 then 1 else 0 end as v17",
-        "case when id > 1800 and id <= 1900 then 1 else 0 end as v18")
-      .groupBy("k1", "k2", "k3")
-      .sum()
-      .collect()
-
-    benchmark.addCase("codegen = F") { iter =>
-      sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "false")
-      f()
-    }
+      val benchmark = new Benchmark("Aggregate w multiple keys", N, output = 
output)
 
-    benchmark.addCase("codegen = T hugeMethodLimit = 10000") { iter =>
-      sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
-      sparkSession.conf.set(SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key, "10000")
-      f()
-    }
+      def f(): Unit = spark.range(N)
+        .selectExpr(
+          "id",
+          "(id & 1023) as k1",
+          "cast(id & 1023 as string) as k2",
+          "cast(id & 1023 as int) as k3",
+          "cast(id & 1023 as double) as k4",
+          "cast(id & 1023 as float) as k5",
+          "id > 1023 as k6")
+        .groupBy("k1", "k2", "k3", "k4", "k5", "k6")
+        .sum()
+        .collect()
 
-    benchmark.addCase("codegen = T hugeMethodLimit = 1500") { iter =>
-      sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
-      sparkSession.conf.set(SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key, "1500")
-      f()
-    }
+      benchmark.addCase("codegen = F") { _ =>
+        withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+          f()
+        }
+      }
 
-    benchmark.run()
+      benchmark.addCase("codegen = T hashmap = F") { _ =>
+        withSQLConf(
+          SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
+          SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
+          "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
+          f()
+        }
+      }
 
-    /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_31-b13 on Mac OS X 10.10.2
-    Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+      benchmark.addCase("codegen = T hashmap = T") { _ =>
+        withSQLConf(
+          SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
+          SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true",
+          "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") {
+          f()
+        }
+      }
 
-    max function bytecode size:              Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
-    
------------------------------------------------------------------------------------------------
-    codegen = F                                    709 /  803          0.9     
   1082.1       1.0X
-    codegen = T hugeMethodLimit = 10000           3485 / 3548          0.2     
   5317.7       0.2X
-    codegen = T hugeMethodLimit = 1500             636 /  701          1.0     
    969.9       1.1X
-     */
-  }
+      benchmark.run()
+    }
+
+    runBenchmark("max function bytecode size of wholestagecodegen") {
+      val N = 20 << 15
+
+      val benchmark = new Benchmark("max function bytecode size", N, output = 
output)
+
+      def f(): Unit = spark.range(N)
+        .selectExpr(
+          "id",
+          "(id & 1023) as k1",
+          "cast(id & 1023 as double) as k2",
+          "cast(id & 1023 as int) as k3",
+          "case when id > 100 and id <= 200 then 1 else 0 end as v1",
+          "case when id > 200 and id <= 300 then 1 else 0 end as v2",
+          "case when id > 300 and id <= 400 then 1 else 0 end as v3",
+          "case when id > 400 and id <= 500 then 1 else 0 end as v4",
+          "case when id > 500 and id <= 600 then 1 else 0 end as v5",
+          "case when id > 600 and id <= 700 then 1 else 0 end as v6",
+          "case when id > 700 and id <= 800 then 1 else 0 end as v7",
+          "case when id > 800 and id <= 900 then 1 else 0 end as v8",
+          "case when id > 900 and id <= 1000 then 1 else 0 end as v9",
+          "case when id > 1000 and id <= 1100 then 1 else 0 end as v10",
+          "case when id > 1100 and id <= 1200 then 1 else 0 end as v11",
+          "case when id > 1200 and id <= 1300 then 1 else 0 end as v12",
+          "case when id > 1300 and id <= 1400 then 1 else 0 end as v13",
+          "case when id > 1400 and id <= 1500 then 1 else 0 end as v14",
+          "case when id > 1500 and id <= 1600 then 1 else 0 end as v15",
+          "case when id > 1600 and id <= 1700 then 1 else 0 end as v16",
+          "case when id > 1700 and id <= 1800 then 1 else 0 end as v17",
+          "case when id > 1800 and id <= 1900 then 1 else 0 end as v18")
+        .groupBy("k1", "k2", "k3")
+        .sum()
+        .collect()
+
+      benchmark.addCase("codegen = F") { _ =>
+        withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+          f()
+        }
+      }
 
+      benchmark.addCase("codegen = T hugeMethodLimit = 10000") { _ =>
+        withSQLConf(
+          SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
+          SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key -> "10000") {
+          f()
+        }
+      }
 
-  ignore("cube") {
-    val N = 5 << 20
+      benchmark.addCase("codegen = T hugeMethodLimit = 1500") { _ =>
+        withSQLConf(
+          SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
+          SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key -> "1500") {
+          f()
+        }
+      }
 
-    runBenchmark("cube", N) {
-      sparkSession.range(N).selectExpr("id", "id % 1000 as k1", "id & 256 as 
k2")
-        .cube("k1", "k2").sum("id").collect()
+      benchmark.run()
     }
 
-    /**
-    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-      cube:                               Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
-      
-------------------------------------------------------------------------------------------
-      cube codegen=false                       3188 / 3392          1.6        
 608.2       1.0X
-      cube codegen=true                        1239 / 1394          4.2        
 236.3       2.6X
-     */
-  }
-
-  ignore("hash and BytesToBytesMap") {
-    val N = 20 << 20
 
-    val benchmark = new Benchmark("BytesToBytesMap", N)
+    runBenchmark("cube") {
+      val N = 5 << 20
 
-    benchmark.addCase("UnsafeRowhash") { iter =>
-      var i = 0
-      val keyBytes = new Array[Byte](16)
-      val key = new UnsafeRow(1)
-      key.pointTo(keyBytes, Platform.BYTE_ARRAY_OFFSET, 16)
-      var s = 0
-      while (i < N) {
-        key.setInt(0, i % 1000)
-        val h = Murmur3_x86_32.hashUnsafeWords(
-          key.getBaseObject, key.getBaseOffset, key.getSizeInBytes, 42)
-        s += h
-        i += 1
+      codegenBenchmark("cube", N) {
+        spark.range(N).selectExpr("id", "id % 1000 as k1", "id & 256 as k2")
+          .cube("k1", "k2").sum("id").collect()
       }
     }
 
-    benchmark.addCase("murmur3 hash") { iter =>
-      var i = 0
-      val keyBytes = new Array[Byte](16)
-      val key = new UnsafeRow(1)
-      key.pointTo(keyBytes, Platform.BYTE_ARRAY_OFFSET, 16)
-      var p = 524283
-      var s = 0
-      while (i < N) {
-        var h = Murmur3_x86_32.hashLong(i, 42)
-        key.setInt(0, h)
-        s += h
-        i += 1
-      }
-    }
+    runBenchmark("hash and BytesToBytesMap") {
+      val N = 20 << 20
 
-    benchmark.addCase("fast hash") { iter =>
-      var i = 0
-      val keyBytes = new Array[Byte](16)
-      val key = new UnsafeRow(1)
-      key.pointTo(keyBytes, Platform.BYTE_ARRAY_OFFSET, 16)
-      var p = 524283
-      var s = 0
-      while (i < N) {
-        var h = i % p
-        if (h < 0) {
-          h += p
-        }
-        key.setInt(0, h)
-        s += h
-        i += 1
-      }
-    }
+      val benchmark = new Benchmark("BytesToBytesMap", N, output = output)
 
-    benchmark.addCase("arrayEqual") { iter =>
-      var i = 0
-      val keyBytes = new Array[Byte](16)
-      val valueBytes = new Array[Byte](16)
-      val key = new UnsafeRow(1)
-      key.pointTo(keyBytes, Platform.BYTE_ARRAY_OFFSET, 16)
-      val value = new UnsafeRow(1)
-      value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16)
-      value.setInt(0, 555)
-      var s = 0
-      while (i < N) {
-        key.setInt(0, i % 1000)
-        if (key.equals(value)) {
-          s += 1
-        }
-        i += 1
+      benchmark.addCase("UnsafeRowhash") { _ =>
+        var i = 0
+        val keyBytes = new Array[Byte](16)
+        val key = new UnsafeRow(1)
+        key.pointTo(keyBytes, Platform.BYTE_ARRAY_OFFSET, 16)
+        var s = 0
+        while (i < N) {
+          key.setInt(0, i % 1000)
+          val h = Murmur3_x86_32.hashUnsafeWords(
+            key.getBaseObject, key.getBaseOffset, key.getSizeInBytes, 42)
+          s += h
+          i += 1
+        }
       }
-    }
 
-    benchmark.addCase("Java HashMap (Long)") { iter =>
-      var i = 0
-      val keyBytes = new Array[Byte](16)
-      val valueBytes = new Array[Byte](16)
-      val value = new UnsafeRow(1)
-      value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16)
-      value.setInt(0, 555)
-      val map = new HashMap[Long, UnsafeRow]()
-      while (i < 65536) {
-        value.setInt(0, i)
-        map.put(i.toLong, value)
-        i += 1
-      }
-      var s = 0
-      i = 0
-      while (i < N) {
-        if (map.get(i % 100000) != null) {
-          s += 1
-        }
-        i += 1
+      benchmark.addCase("murmur3 hash") { _ =>
+        var i = 0
+        val keyBytes = new Array[Byte](16)
+        val key = new UnsafeRow(1)
+        key.pointTo(keyBytes, Platform.BYTE_ARRAY_OFFSET, 16)
+        var p = 524283
+        var s = 0
+        while (i < N) {
+          var h = Murmur3_x86_32.hashLong(i, 42)
+          key.setInt(0, h)
+          s += h
+          i += 1
+        }
       }
-    }
 
-    benchmark.addCase("Java HashMap (two ints) ") { iter =>
-      var i = 0
-      val valueBytes = new Array[Byte](16)
-      val value = new UnsafeRow(1)
-      value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16)
-      value.setInt(0, 555)
-      val map = new HashMap[Long, UnsafeRow]()
-      while (i < 65536) {
-        value.setInt(0, i)
-        val key = (i.toLong << 32) + Integer.rotateRight(i, 15)
-        map.put(key, value)
-        i += 1
-      }
-      var s = 0
-      i = 0
-      while (i < N) {
-        val key = ((i & 100000).toLong << 32) + Integer.rotateRight(i & 
100000, 15)
-        if (map.get(key) != null) {
-          s += 1
-        }
-        i += 1
+      benchmark.addCase("fast hash") { _ =>
+        var i = 0
+        val keyBytes = new Array[Byte](16)
+        val key = new UnsafeRow(1)
+        key.pointTo(keyBytes, Platform.BYTE_ARRAY_OFFSET, 16)
+        var p = 524283
+        var s = 0
+        while (i < N) {
+          var h = i % p
+          if (h < 0) {
+            h += p
+          }
+          key.setInt(0, h)
+          s += h
+          i += 1
+        }
       }
-    }
 
-    benchmark.addCase("Java HashMap (UnsafeRow)") { iter =>
-      var i = 0
-      val keyBytes = new Array[Byte](16)
-      val valueBytes = new Array[Byte](16)
-      val key = new UnsafeRow(1)
-      key.pointTo(keyBytes, Platform.BYTE_ARRAY_OFFSET, 16)
-      val value = new UnsafeRow(1)
-      value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16)
-      value.setInt(0, 555)
-      val map = new HashMap[UnsafeRow, UnsafeRow]()
-      while (i < 65536) {
-        key.setInt(0, i)
-        value.setInt(0, i)
-        map.put(key, value.copy())
-        i += 1
-      }
-      var s = 0
-      i = 0
-      while (i < N) {
-        key.setInt(0, i % 100000)
-        if (map.get(key) != null) {
-          s += 1
-        }
-        i += 1
+      benchmark.addCase("arrayEqual") { _ =>
+        var i = 0
+        val keyBytes = new Array[Byte](16)
+        val valueBytes = new Array[Byte](16)
+        val key = new UnsafeRow(1)
+        key.pointTo(keyBytes, Platform.BYTE_ARRAY_OFFSET, 16)
+        val value = new UnsafeRow(1)
+        value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16)
+        value.setInt(0, 555)
+        var s = 0
+        while (i < N) {
+          key.setInt(0, i % 1000)
+          if (key.equals(value)) {
+            s += 1
+          }
+          i += 1
+        }
       }
-    }
 
-    Seq(false, true).foreach { optimized =>
-      benchmark.addCase(s"LongToUnsafeRowMap (opt=$optimized)") { iter =>
+      benchmark.addCase("Java HashMap (Long)") { _ =>
         var i = 0
+        val keyBytes = new Array[Byte](16)
         val valueBytes = new Array[Byte](16)
         val value = new UnsafeRow(1)
         value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16)
         value.setInt(0, 555)
-        val taskMemoryManager = new TaskMemoryManager(
-          new StaticMemoryManager(
-            new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
-            Long.MaxValue,
-            Long.MaxValue,
-            1),
-          0)
-        val map = new LongToUnsafeRowMap(taskMemoryManager, 64)
+        val map = new HashMap[Long, UnsafeRow]()
         while (i < 65536) {
           value.setInt(0, i)
-          val key = i % 100000
-          map.append(key, value)
+          map.put(i.toLong, value)
           i += 1
         }
-        if (optimized) {
-          map.optimize()
+        var s = 0
+        i = 0
+        while (i < N) {
+          if (map.get(i % 100000) != null) {
+            s += 1
+          }
+          i += 1
+        }
+      }
+
+      benchmark.addCase("Java HashMap (two ints) ") { _ =>
+        var i = 0
+        val valueBytes = new Array[Byte](16)
+        val value = new UnsafeRow(1)
+        value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16)
+        value.setInt(0, 555)
+        val map = new HashMap[Long, UnsafeRow]()
+        while (i < 65536) {
+          value.setInt(0, i)
+          val key = (i.toLong << 32) + Integer.rotateRight(i, 15)
+          map.put(key, value)
+          i += 1
         }
         var s = 0
         i = 0
         while (i < N) {
-          val key = i % 100000
-          if (map.getValue(key, value) != null) {
+          val key = ((i & 100000).toLong << 32) + Integer.rotateRight(i & 
100000, 15)
+          if (map.get(key) != null) {
             s += 1
           }
           i += 1
         }
       }
-    }
 
-    Seq("off", "on").foreach { heap =>
-      benchmark.addCase(s"BytesToBytesMap ($heap Heap)") { iter =>
-        val taskMemoryManager = new TaskMemoryManager(
-          new StaticMemoryManager(
-            new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, s"${heap == 
"off"}")
-              .set(MEMORY_OFFHEAP_SIZE.key, "102400000"),
-            Long.MaxValue,
-            Long.MaxValue,
-            1),
-          0)
-        val map = new BytesToBytesMap(taskMemoryManager, 1024, 64L<<20)
+      benchmark.addCase("Java HashMap (UnsafeRow)") { _ =>
+        var i = 0
         val keyBytes = new Array[Byte](16)
         val valueBytes = new Array[Byte](16)
         val key = new UnsafeRow(1)
         key.pointTo(keyBytes, Platform.BYTE_ARRAY_OFFSET, 16)
         val value = new UnsafeRow(1)
         value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16)
-        var i = 0
-        val numKeys = 65536
-        while (i < numKeys) {
-          key.setInt(0, i % 65536)
-          val loc = map.lookup(key.getBaseObject, key.getBaseOffset, 
key.getSizeInBytes,
-            Murmur3_x86_32.hashLong(i % 65536, 42))
-          if (!loc.isDefined) {
-            loc.append(key.getBaseObject, key.getBaseOffset, 
key.getSizeInBytes,
-              value.getBaseObject, value.getBaseOffset, value.getSizeInBytes)
-          }
+        value.setInt(0, 555)
+        val map = new HashMap[UnsafeRow, UnsafeRow]()
+        while (i < 65536) {
+          key.setInt(0, i)
+          value.setInt(0, i)
+          map.put(key, value.copy())
           i += 1
         }
-        i = 0
         var s = 0
+        i = 0
         while (i < N) {
           key.setInt(0, i % 100000)
-          val loc = map.lookup(key.getBaseObject, key.getBaseOffset, 
key.getSizeInBytes,
-            Murmur3_x86_32.hashLong(i % 100000, 42))
-          if (loc.isDefined) {
+          if (map.get(key) != null) {
             s += 1
           }
           i += 1
         }
       }
-    }
 
-    benchmark.addCase("Aggregate HashMap") { iter =>
-      var i = 0
-      val numKeys = 65536
-      val schema = new StructType()
-        .add("key", LongType)
-        .add("value", LongType)
-      val map = new AggregateHashMap(schema)
-      while (i < numKeys) {
-        val row = map.findOrInsert(i.toLong)
-        row.setLong(1, row.getLong(1) +  1)
-        i += 1
-      }
-      var s = 0
-      i = 0
-      while (i < N) {
-        if (map.find(i % 100000) != -1) {
-          s += 1
-        }
-        i += 1
+      Seq(false, true).foreach { optimized =>
+        benchmark.addCase(s"LongToUnsafeRowMap (opt=$optimized)") { _ =>
+          var i = 0
+          val valueBytes = new Array[Byte](16)
+          val value = new UnsafeRow(1)
+          value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16)
+          value.setInt(0, 555)
+          val taskMemoryManager = new TaskMemoryManager(
+            new StaticMemoryManager(
+              new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
+              Long.MaxValue,
+              Long.MaxValue,
+              1),
+            0)
+          val map = new LongToUnsafeRowMap(taskMemoryManager, 64)
+          while (i < 65536) {
+            value.setInt(0, i)
+            val key = i % 100000
+            map.append(key, value)
+            i += 1
+          }
+          if (optimized) {
+            map.optimize()
+          }
+          var s = 0
+          i = 0
+          while (i < N) {
+            val key = i % 100000
+            if (map.getValue(key, value) != null) {
+              s += 1
+            }
+            i += 1
+          }
+        }
+      }
+
+      Seq("off", "on").foreach { heap =>
+        benchmark.addCase(s"BytesToBytesMap ($heap Heap)") { _ =>
+          val taskMemoryManager = new TaskMemoryManager(
+            new StaticMemoryManager(
+              new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, s"${heap == "off"}")
+                .set(MEMORY_OFFHEAP_SIZE.key, "102400000"),
+              Long.MaxValue,
+              Long.MaxValue,
+              1),
+            0)
+          val map = new BytesToBytesMap(taskMemoryManager, 1024, 64L << 20)
+          val keyBytes = new Array[Byte](16)
+          val valueBytes = new Array[Byte](16)
+          val key = new UnsafeRow(1)
+          key.pointTo(keyBytes, Platform.BYTE_ARRAY_OFFSET, 16)
+          val value = new UnsafeRow(1)
+          value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16)
+          var i = 0
+          val numKeys = 65536
+          while (i < numKeys) {
+            key.setInt(0, i % 65536)
+            val loc = map.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
+              Murmur3_x86_32.hashLong(i % 65536, 42))
+            if (!loc.isDefined) {
+              loc.append(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
+                value.getBaseObject, value.getBaseOffset, value.getSizeInBytes)
+            }
+            i += 1
+          }
+          i = 0
+          var s = 0
+          while (i < N) {
+            key.setInt(0, i % 100000)
+            val loc = map.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
+              Murmur3_x86_32.hashLong(i % 100000, 42))
+            if (loc.isDefined) {
+              s += 1
+            }
+            i += 1
+          }
+        }
       }
-    }
 
-    /*
-    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-    BytesToBytesMap:                    Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
-    
-------------------------------------------------------------------------------------------
-    UnsafeRow hash                            267 /  284         78.4          
12.8       1.0X
-    murmur3 hash                              102 /  129        205.5          
 4.9       2.6X
-    fast hash                                  79 /   96        263.8          
 3.8       3.4X
-    arrayEqual                                164 /  172        128.2          
 7.8       1.6X
-    Java HashMap (Long)                       321 /  399         65.4          
15.3       0.8X
-    Java HashMap (two ints)                   328 /  363         63.9          
15.7       0.8X
-    Java HashMap (UnsafeRow)                 1140 / 1200         18.4          
54.3       0.2X
-    LongToUnsafeRowMap (opt=false)            378 /  400         55.5          
18.0       0.7X
-    LongToUnsafeRowMap (opt=true)             144 /  152        145.2          
 6.9       1.9X
-    BytesToBytesMap (off Heap)               1300 / 1616         16.1          
62.0       0.2X
-    BytesToBytesMap (on Heap)                1165 / 1202         18.0          
55.5       0.2X
-    Aggregate HashMap                         121 /  131        173.3          
 5.8       2.2X
-    */
-    benchmark.run()
+      benchmark.addCase("Aggregate HashMap") { _ =>
+        var i = 0
+        val numKeys = 65536
+        val schema = new StructType()
+          .add("key", LongType)
+          .add("value", LongType)
+        val map = new AggregateHashMap(schema)
+        while (i < numKeys) {
+          val row = map.findOrInsert(i.toLong)
+          row.setLong(1, row.getLong(1) + 1)
+          i += 1
+        }
+        var s = 0
+        i = 0
+        while (i < N) {
+          if (map.find(i % 100000) != -1) {
+            s += 1
+          }
+          i += 1
+        }
+      }
+      benchmark.run()
+    }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b96fd44f/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala
new file mode 100644
index 0000000..e95e5a9
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Common base trait to run benchmark with the Dataset and DataFrame API.
+ */
+trait SqlBasedBenchmark extends BenchmarkBase with SQLHelper {
+
+  protected val spark: SparkSession = getSparkSession
+
+  /** Subclass can override this function to build their own SparkSession */
+  def getSparkSession: SparkSession = {
+    SparkSession.builder()
+      .master("local[1]")
+      .appName(this.getClass.getCanonicalName)
+      .config(SQLConf.SHUFFLE_PARTITIONS.key, 1)
+      .config(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, 1)
+      .getOrCreate()
+  }
+
+  /** Runs function `f` with whole stage codegen on and off. */
+  final def codegenBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = {
+    val benchmark = new Benchmark(name, cardinality, output = output)
+
+    benchmark.addCase(s"$name wholestage off", numIters = 2) { _ =>
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+        f
+      }
+    }
+
+    benchmark.addCase(s"$name wholestage on", numIters = 5) { _ =>
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
+        f
+      }
+    }
+
+    benchmark.run()
+  }
+}

