Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r184837526
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
 ---
    @@ -222,6 +222,61 @@ class JoinBenchmark extends BenchmarkBase {
          */
       }
     
    +  val expensiveFunc = (first: Int, second: Int) => {
    +    for (i <- 1 to 2000) {
    +      Math.sqrt(i * i * i)
    +    }
    +    Math.abs(first - second)
    +  }
    +
    +  def innerRangeTest(N: Int, M: Int): Unit = {
    +    import sparkSession.implicits._
    +    val expUdf = sparkSession.udf.register("expensiveFunc", expensiveFunc)
    +    val df1 = sparkSession.sparkContext.parallelize(1 to M).
    +      cartesian(sparkSession.sparkContext.parallelize(1 to N)).
    +      toDF("col1a", "col1b")
    +    val df2 = sparkSession.sparkContext.parallelize(1 to M).
    +      cartesian(sparkSession.sparkContext.parallelize(1 to N)).
    +      toDF("col2a", "col2b")
    +    val df = df1.join(df2, 'col1a === 'col2a and ('col1b < 'col2b + 3) and 
('col1b > 'col2b - 3))
    +    
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
    +    df.where(expUdf('col1b, 'col2b) < 3).count()
    +  }
    +
    +  ignore("sort merge inner range join") {
    +    sparkSession.conf.set("spark.sql.join.smj.useInnerRangeOptimization", 
"false")
    +    val N = 2 << 5
    +    val M = 100
    +    runBenchmark("sort merge inner range join", N * M) {
    +      innerRangeTest(N, M)
    +    }
    +
    +    /*
    +     *AMD EPYC 7401 24-Core Processor
    +     *sort merge join:                      Best/Avg Time(ms)    Rate(M/s) 
  Per Row(ns)   Relative
    +     
*---------------------------------------------------------------------------------------------
    +     *sort merge join wholestage off            13822 / 14068          0.0 
    2159662.3       1.0X
    +     *sort merge join wholestage on               3863 / 4226          0.0 
     603547.0       3.6X
    +     */
    +  }
    +
    +  ignore("sort merge inner range join optimized") {
    +    sparkSession.conf.set("spark.sql.join.smj.useInnerRangeOptimization", 
"true")
    +    val N = 2 << 5
    +    val M = 100
    +    runBenchmark("sort merge inner range join optimized", N * M) {
    +      innerRangeTest(N, M)
    +    }
    +
    +    /*
    +     *AMD EPYC 7401 24-Core Processor
    +     *sort merge join:                      Best/Avg Time(ms)    Rate(M/s) 
  Per Row(ns)   Relative
    +     
*---------------------------------------------------------------------------------------------
    +     *sort merge join wholestage off            12723 / 12800          0.0 
    1988008.4       1.0X
    --- End diff --
    
    Why doesn't the wholestage-off case get as much improvement as the wholestage-on case?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to