GitHub user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22484#discussion_r220799981
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala ---
    @@ -34,621 +34,539 @@ import org.apache.spark.unsafe.map.BytesToBytesMap
     
     /**
      * Benchmark to measure performance for aggregate primitives.
    - * To run this:
    - *  build/sbt "sql/test-only *benchmark.AggregateBenchmark"
    - *
    - * Benchmarks in this file are skipped in normal builds.
    + * To run this benchmark:
    + * {{{
    + *   1. without sbt: bin/spark-submit --class <this class> <spark sql test jar>
    + *   2. build/sbt "sql/test:runMain <this class>"
    + *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
    + *      Results will be written to "benchmarks/AggregateBenchmark-results.txt".
    + * }}}
      */
    -class AggregateBenchmark extends BenchmarkWithCodegen {
    +object AggregateBenchmark extends SqlBasedBenchmark {
     
    -  ignore("aggregate without grouping") {
    -    val N = 500L << 22
    -    val benchmark = new Benchmark("agg without grouping", N)
    -    runBenchmark("agg w/o group", N) {
    -      sparkSession.range(N).selectExpr("sum(id)").collect()
    +  override def benchmark(): Unit = {
    +    runBenchmark("aggregate without grouping") {
    +      val N = 500L << 22
    +      runBenchmarkWithCodegen("agg w/o group", N) {
    +        spark.range(N).selectExpr("sum(id)").collect()
    +      }
         }
    -    /*
    -    agg w/o group:                           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -    ------------------------------------------------------------------------------------------------
    -    agg w/o group wholestage off                30136 / 31885         69.6          14.4       1.0X
    -    agg w/o group wholestage on                   1851 / 1860       1132.9           0.9      16.3X
    -     */
    -  }
     
    -  ignore("stat functions") {
    -    val N = 100L << 20
    +    runBenchmark("stat functions") {
    +      val N = 100L << 20
     
    -    runBenchmark("stddev", N) {
    -      sparkSession.range(N).groupBy().agg("id" -> "stddev").collect()
    -    }
    +      runBenchmarkWithCodegen("stddev", N) {
    +        spark.range(N).groupBy().agg("id" -> "stddev").collect()
    +      }
     
    -    runBenchmark("kurtosis", N) {
    -      sparkSession.range(N).groupBy().agg("id" -> "kurtosis").collect()
    +      runBenchmarkWithCodegen("kurtosis", N) {
    +        spark.range(N).groupBy().agg("id" -> "kurtosis").collect()
    +      }
         }
     
    -    /*
    -    Using ImperativeAggregate (as implemented in Spark 1.6):
    -
    -      Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
    -      stddev:                            Avg Time(ms)    Avg Rate(M/s)  Relative Rate
    -      -------------------------------------------------------------------------------
    -      stddev w/o codegen                      2019.04            10.39         1.00 X
    -      stddev w codegen                        2097.29            10.00         0.96 X
    -      kurtosis w/o codegen                    2108.99             9.94         0.96 X
    -      kurtosis w codegen                      2090.69            10.03         0.97 X
    -
    -      Using DeclarativeAggregate:
    -
    -      Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
    -      stddev:                             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -      -------------------------------------------------------------------------------------------
    -      stddev codegen=false                     5630 / 5776         18.0          55.6       1.0X
    -      stddev codegen=true                      1259 / 1314         83.0          12.0       4.5X
    -
    -      Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
    -      kurtosis:                           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -      -------------------------------------------------------------------------------------------
    -      kurtosis codegen=false                 14847 / 15084          7.0         142.9       1.0X
    -      kurtosis codegen=true                    1652 / 2124         63.0          15.9       9.0X
    -    */
    -  }
    -
    -  ignore("aggregate with linear keys") {
    -    val N = 20 << 22
    +    runBenchmark("aggregate with linear keys") {
    +      val N = 20 << 22
     
    -    val benchmark = new Benchmark("Aggregate w keys", N)
    -    def f(): Unit = {
    -      sparkSession.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
    -    }
    +      val benchmark = new Benchmark("Aggregate w keys", N, output = output)
     
    -    benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
    -      sparkSession.conf.set("spark.sql.codegen.wholeStage", "false")
    -      f()
    -    }
    +      def f(): Unit = {
    +        spark.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
    +      }
     
    -    benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
    -      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
    -      sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false")
    -      sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false")
    -      f()
    -    }
    +      benchmark.addCase(s"codegen = F", numIters = 2) { _ =>
    +        withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
    +          f()
    +        }
    +      }
     
    -    benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
    -      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
    -      sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true")
    -      sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
    -      f()
    -    }
    +      benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { _ =>
    +        withSQLConf(
    +          SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
    +          SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
    +          "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
    +          f()
    +        }
    +      }
     
    -    benchmark.run()
    +      benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { _ =>
    +        withSQLConf(
    +          SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
    +          SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true",
    +          "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") {
    +          f()
    +        }
    +      }
     
    -    /*
    -    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
    -    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
    +      benchmark.run()
    +    }
     
    -    Aggregate w keys:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -    ------------------------------------------------------------------------------------------------
    -    codegen = F                                   6619 / 6780         12.7          78.9       1.0X
    -    codegen = T hashmap = F                       3935 / 4059         21.3          46.9       1.7X
    -    codegen = T hashmap = T                        897 /  971         93.5          10.7       7.4X
    -    */
    -  }
    +    runBenchmark("aggregate with randomized keys") {
    +      val N = 20 << 22
     
    -  ignore("aggregate with randomized keys") {
    -    val N = 20 << 22
    +      val benchmark = new Benchmark("Aggregate w keys", N, output = output)
    +      spark.range(N).selectExpr("id", "floor(rand() * 10000) as k")
    +        .createOrReplaceTempView("test")
     
    -    val benchmark = new Benchmark("Aggregate w keys", N)
    -    sparkSession.range(N).selectExpr("id", "floor(rand() * 10000) as k")
    -      .createOrReplaceTempView("test")
    +      def f(): Unit = spark.sql("select k, k, sum(id) from test group by k, k").collect()
     
    -    def f(): Unit = sparkSession.sql("select k, k, sum(id) from test group by k, k").collect()
    +      benchmark.addCase(s"codegen = F", numIters = 2) { _ =>
    +        withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
    +          f()
    +        }
    +      }
     
    -    benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
    -      sparkSession.conf.set("spark.sql.codegen.wholeStage", value = false)
    -      f()
    -    }
    +      benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { _ =>
    +        withSQLConf(
    +          SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
    +          SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
    +          "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
    +          f()
    +        }
    +      }
     
    -    benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
    -      sparkSession.conf.set("spark.sql.codegen.wholeStage", value = true)
    -      sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false")
    -      sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false")
    -      f()
    -    }
    +      benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { _ =>
    +        withSQLConf(
    +          SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
    +          SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true",
    +          "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") {
    +          f()
    +        }
    +      }
     
    -    benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
    -      sparkSession.conf.set("spark.sql.codegen.wholeStage", value = true)
    -      sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true")
    -      sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
    -      f()
    +      benchmark.run()
         }
     
    -    benchmark.run()
    -
    -    /*
    -    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
    -    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
    +    runBenchmark("aggregate with string key") {
    +      val N = 20 << 20
     
    -    Aggregate w keys:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -    ------------------------------------------------------------------------------------------------
    -    codegen = F                                   7445 / 7517         11.3          88.7       1.0X
    -    codegen = T hashmap = F                       4672 / 4703         18.0          55.7       1.6X
    -    codegen = T hashmap = T                       1764 / 1958         47.6          21.0       4.2X
    -    */
    -  }
    +      val benchmark = new Benchmark("Aggregate w string key", N, output = output)
     
    -  ignore("aggregate with string key") {
    -    val N = 20 << 20
    +      def f(): Unit = spark.range(N).selectExpr("id", "cast(id & 1023 as string) as k")
    +        .groupBy("k").count().collect()
     
    -    val benchmark = new Benchmark("Aggregate w string key", N)
    -    def f(): Unit = sparkSession.range(N).selectExpr("id", "cast(id & 1023 as string) as k")
    -      .groupBy("k").count().collect()
    +      benchmark.addCase(s"codegen = F", numIters = 2) { _ =>
    +        spark.conf.set("spark.sql.codegen.wholeStage", "false")
    --- End diff ---
    
    Shall we remove this redundant `spark.conf.set` call at line 148? The other cases in this file already switch configurations via `withSQLConf`.
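    
    For reference, a minimal sketch of how this case could look once that call is dropped, assuming it should follow the same `withSQLConf` pattern as the other cases above (`f()` and `benchmark` are the definitions already in scope in this method):
    
        benchmark.addCase(s"codegen = F", numIters = 2) { _ =>
          // withSQLConf already sets spark.sql.codegen.wholeStage for the body
          // and restores the previous value afterwards, so a separate
          // spark.conf.set call is unnecessary here.
          withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
            f()
          }
        }
    
    Using `withSQLConf` everywhere also keeps the benchmark cases from leaking configuration changes into each other.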

