Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22661#discussion_r223220438
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala ---
    @@ -19,229 +19,164 @@ package org.apache.spark.sql.execution.benchmark
     
     import org.apache.spark.sql.execution.joins._
     import org.apache.spark.sql.functions._
    +import org.apache.spark.sql.internal.SQLConf
     import org.apache.spark.sql.types.IntegerType
     
     /**
       * Benchmark to measure performance for join operations.
    - * To run this:
    - *  build/sbt "sql/test-only *benchmark.JoinBenchmark"
    - *
    - * Benchmarks in this file are skipped in normal builds.
    + * To run this benchmark:
    + * {{{
    + *   1. without sbt:
     + *      bin/spark-submit --class <this class> --jars <spark core test jar> <spark sql test jar>
    + *   2. build/sbt "sql/test:runMain <this class>"
    + *   3. generate result:
     + *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
    + *      Results will be written to "benchmarks/JoinBenchmark-results.txt".
    + * }}}
      */
    -class JoinBenchmark extends BenchmarkWithCodegen {
    +object JoinBenchmark extends SqlBasedBenchmark {
     
    -  ignore("broadcast hash join, long key") {
    +  def broadcastHashJoinLongKey(): Unit = {
         val N = 20 << 20
         val M = 1 << 16
     
     -    val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v"))
    -    runBenchmark("Join w long", N) {
     -      val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"))
     +    val dim = broadcast(spark.range(M).selectExpr("id as k", "cast(id as string) as v"))
    +    codegenBenchmark("Join w long", N) {
     +      val df = spark.range(N).join(dim, (col("id") % M) === col("k"))
          assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
           df.count()
         }
    -
    -    /*
    -    Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
    -    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
     -    Join w long:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     -    -------------------------------------------------------------------------------------------
     -    Join w long codegen=false                3002 / 3262          7.0        143.2       1.0X
     -    Join w long codegen=true                  321 /  371         65.3         15.3       9.3X
    -    */
       }
     
    -  ignore("broadcast hash join, long key with duplicates") {
    +
    +  def broadcastHashJoinLongKeyWithDuplicates(): Unit = {
         val N = 20 << 20
         val M = 1 << 16
     
     -    val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v"))
    -    runBenchmark("Join w long duplicated", N) {
     -      val dim = broadcast(sparkSession.range(M).selectExpr("cast(id/10 as long) as k"))
     -      val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"))
    +    codegenBenchmark("Join w long duplicated", N) {
     +      val dim = broadcast(spark.range(M).selectExpr("cast(id/10 as long) as k"))
     +      val df = spark.range(N).join(dim, (col("id") % M) === col("k"))
          assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
           df.count()
         }
    -
    -    /*
    -     *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
    -     *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
     -     *Join w long duplicated:             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     -     *-------------------------------------------------------------------------------------------
     -     *Join w long duplicated codegen=false      3446 / 3478          6.1        164.3       1.0X
     -     *Join w long duplicated codegen=true       322 /  351         65.2         15.3      10.7X
    -     */
       }
     
    -  ignore("broadcast hash join, two int key") {
    +  def broadcastHashJoinTwoIntKey(): Unit = {
         val N = 20 << 20
         val M = 1 << 16
    -    val dim2 = broadcast(sparkSession.range(M)
    +    val dim2 = broadcast(spark.range(M)
           .selectExpr("cast(id as int) as k1", "cast(id as int) as k2", 
"cast(id as string) as v"))
     
    -    runBenchmark("Join w 2 ints", N) {
    -      val df = sparkSession.range(N).join(dim2,
    +    codegenBenchmark("Join w 2 ints", N) {
    +      val df = spark.range(N).join(dim2,
             (col("id") % M).cast(IntegerType) === col("k1")
               && (col("id") % M).cast(IntegerType) === col("k2"))
           
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
           df.count()
         }
    -
    -    /*
    -     *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
    -     *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
     -     *Join w 2 ints:                      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     -     *-------------------------------------------------------------------------------------------
     -     *Join w 2 ints codegen=false              4426 / 4501          4.7        211.1       1.0X
     -     *Join w 2 ints codegen=true                791 /  818         26.5         37.7       5.6X
    -     */
       }
     
    -  ignore("broadcast hash join, two long key") {
    +  def broadcastHashJoinTwoLongKey(): Unit = {
         val N = 20 << 20
         val M = 1 << 16
    -    val dim3 = broadcast(sparkSession.range(M)
    +    val dim3 = broadcast(spark.range(M)
           .selectExpr("id as k1", "id as k2", "cast(id as string) as v"))
     
    -    runBenchmark("Join w 2 longs", N) {
    -      val df = sparkSession.range(N).join(dim3,
    +    codegenBenchmark("Join w 2 longs", N) {
    +      val df = spark.range(N).join(dim3,
             (col("id") % M) === col("k1") && (col("id") % M) === col("k2"))
           
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
           df.count()
         }
    -
    -    /*
    -     *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
    -     *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
     -     *Join w 2 longs:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     -     *-------------------------------------------------------------------------------------------
     -     *Join w 2 longs codegen=false             5905 / 6123          3.6        281.6       1.0X
     -     *Join w 2 longs codegen=true              2230 / 2529          9.4        106.3       2.6X
    -     */
       }
     
    -  ignore("broadcast hash join, two long key with duplicates") {
    +  def broadcastHashJoinTwoLongKeyWithDuplicates(): Unit = {
         val N = 20 << 20
         val M = 1 << 16
    -    val dim4 = broadcast(sparkSession.range(M)
    +    val dim4 = broadcast(spark.range(M)
           .selectExpr("cast(id/10 as long) as k1", "cast(id/10 as long) as 
k2"))
     
    -    runBenchmark("Join w 2 longs duplicated", N) {
    -      val df = sparkSession.range(N).join(dim4,
    +    codegenBenchmark("Join w 2 longs duplicated", N) {
    +      val df = spark.range(N).join(dim4,
             (col("id") bitwiseAND M) === col("k1") && (col("id") bitwiseAND M) 
=== col("k2"))
           
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
           df.count()
         }
    -
    -    /*
    -     *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
     -     *Join w 2 longs duplicated:          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     -     *-------------------------------------------------------------------------------------------
     -     *Join w 2 longs duplicated codegen=false      6420 / 6587          3.3        306.1       1.0X
     -     *Join w 2 longs duplicated codegen=true       2080 / 2139         10.1         99.2       3.1X
    -     */
       }
     
    -  ignore("broadcast hash join, outer join long key") {
    +
    +  def broadcastHashJoinOuterJoinLongKey(): Unit = {
         val N = 20 << 20
         val M = 1 << 16
     -    val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v"))
    -    runBenchmark("outer join w long", N) {
     -      val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"), "left")
     +    val dim = broadcast(spark.range(M).selectExpr("id as k", "cast(id as string) as v"))
    +    codegenBenchmark("outer join w long", N) {
     +      val df = spark.range(N).join(dim, (col("id") % M) === col("k"), "left")
          assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
           df.count()
         }
    -
    -    /*
    -     *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
    -     *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
     -     *outer join w long:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     -     *-------------------------------------------------------------------------------------------
     -     *outer join w long codegen=false          3055 / 3189          6.9        145.7       1.0X
     -     *outer join w long codegen=true            261 /  276         80.5         12.4      11.7X
    -     */
       }
     
    -  ignore("broadcast hash join, semi join long key") {
    +
    +  def broadcastHashJoinSemiJoinLongKey(): Unit = {
         val N = 20 << 20
         val M = 1 << 16
     -    val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v"))
    -    runBenchmark("semi join w long", N) {
     -      val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"), "leftsemi")
     +    val dim = broadcast(spark.range(M).selectExpr("id as k", "cast(id as string) as v"))
    +    codegenBenchmark("semi join w long", N) {
     +      val df = spark.range(N).join(dim, (col("id") % M) === col("k"), "leftsemi")
          assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
           df.count()
         }
    -
    -    /*
    -     *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
    -     *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
     -     *semi join w long:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     -     *-------------------------------------------------------------------------------------------
     -     *semi join w long codegen=false           1912 / 1990         11.0         91.2       1.0X
     -     *semi join w long codegen=true             237 /  244         88.3         11.3       8.1X
    -     */
       }
     
    -  ignore("sort merge join") {
    +  def sortMergeJoin(): Unit = {
         val N = 2 << 20
    -    runBenchmark("merge join", N) {
    -      val df1 = sparkSession.range(N).selectExpr(s"id * 2 as k1")
    -      val df2 = sparkSession.range(N).selectExpr(s"id * 3 as k2")
    +    codegenBenchmark("merge join", N) {
    +      val df1 = spark.range(N).selectExpr(s"id * 2 as k1")
    +      val df2 = spark.range(N).selectExpr(s"id * 3 as k2")
          val df = df1.join(df2, col("k1") === col("k2"))
          assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
           df.count()
         }
    -
    -    /*
    -     *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
     -     *merge join:                         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     -     *-------------------------------------------------------------------------------------------
     -     *merge join codegen=false                 1588 / 1880          1.3        757.1       1.0X
     -     *merge join codegen=true                  1477 / 1531          1.4        704.2       1.1X
    -     */
       }
     
    -  ignore("sort merge join with duplicates") {
    +  def sortMergeJoinWithDuplicates(): Unit = {
         val N = 2 << 20
    -    runBenchmark("sort merge join", N) {
    -      val df1 = sparkSession.range(N)
    +    codegenBenchmark("sort merge join with duplicates", N) {
    +      val df1 = spark.range(N)
             .selectExpr(s"(id * 15485863) % ${N*10} as k1")
    -      val df2 = sparkSession.range(N)
    +      val df2 = spark.range(N)
             .selectExpr(s"(id * 15485867) % ${N*10} as k2")
          val df = df1.join(df2, col("k1") === col("k2"))
          assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
           df.count()
         }
    -
    -    /*
    -     *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
     -     *sort merge join:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     -     *-------------------------------------------------------------------------------------------
     -     *sort merge join codegen=false            3626 / 3667          0.6       1728.9       1.0X
     -     *sort merge join codegen=true             3405 / 3438          0.6       1623.8       1.1X
    -     */
       }
     
    -  ignore("shuffle hash join") {
    -    val N = 4 << 20
    -    sparkSession.conf.set("spark.sql.shuffle.partitions", "2")
    -    sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", 
"10000000")
    -    sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")
    -    runBenchmark("shuffle hash join", N) {
    -      val df1 = sparkSession.range(N).selectExpr(s"id as k1")
    -      val df2 = sparkSession.range(N / 3).selectExpr(s"id * 3 as k2")
    -      val df = df1.join(df2, col("k1") === col("k2"))
     -      assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[ShuffledHashJoinExec]).isDefined)
    -      df.count()
    +  def shuffleHashJoin(): Unit = {
    +    val N: Long = 4 << 20
    +    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "2",
    --- End diff ---
    
    nit. Could you put `SQLConf.SHUFFLE_PARTITIONS.key` on the next line?
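    
    For example, a sketch of the wrapping I mean (the diff is truncated here, so the
    remaining conf pairs and the body below are assumed to mirror the removed
    `conf.set` calls and the old benchmark body):
    
        def shuffleHashJoin(): Unit = {
          val N: Long = 4 << 20
          withSQLConf(
              SQLConf.SHUFFLE_PARTITIONS.key -> "2",
              // Assumed: these pairs mirror the removed conf.set calls above.
              SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10000000",
              SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
            codegenBenchmark("shuffle hash join", N) {
              val df1 = spark.range(N).selectExpr("id as k1")
              val df2 = spark.range(N / 3).selectExpr("id * 3 as k2")
              val df = df1.join(df2, col("k1") === col("k2"))
              assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[ShuffledHashJoinExec]).isDefined)
              df.count()
            }
          }
        }
    
    Putting each key-value pair on its own line keeps the conf list aligned and
    easy to extend.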

