[ 
https://issues.apache.org/jira/browse/SPARK-37328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lietong Liu updated SPARK-37328:
--------------------------------
    Description: 
Since OptimizeSkewedJoin was moved from queryStageOptimizerRules to 
queryStagePreparationRules, the point at which OptimizeSkewedJoin is applied 
has moved from newQueryStage() to reOptimize(). As a result, the plan it is 
applied to changed from the plan of the new stage that is about to be 
submitted to the whole Spark plan.

In cases where the skewed join is not in the last stage, OptimizeSkewedJoin 
may not work because the number of collected shuffleStages is more than 2.

The following test will prove it:

 

 
{code:java}
test("OptimizeSkewJoin may not work") {
  withSQLConf(
    SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
    SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "100",
    SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100",
    SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
    SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
    withTempView("skewData1", "skewData2", "skewData3") {
      spark
        .range(0, 1000, 1, 10)
        .selectExpr("id % 3 as key1", "id % 3 as value1")
        .createOrReplaceTempView("skewData1")
      spark
        .range(0, 1000, 1, 10)
        .selectExpr("id % 1 as key2", "id as value2")
        .createOrReplaceTempView("skewData2")
      spark
        .range(0, 1000, 1, 10)
        .selectExpr("id % 1 as key3", "id as value3")
        .createOrReplaceTempView("skewData3")

      // Query has two skewedJoin in two continuous stages.
      val (_, adaptive1) =
        runAdaptiveAndVerifyResult(
          """
            |SELECT key1 FROM skewData1 s1
            |JOIN skewData2 s2
            |ON s1.key1 = s2.key2
            |JOIN skewData3
            |ON s1.value1 = value3
            |""".stripMargin)
      val shuffles1 = collect(adaptive1) {
        case s: ShuffleExchangeExec => s
      }
      assert(shuffles1.size == 4)
      val smj1 = findTopLevelSortMergeJoin(adaptive1)
      assert(smj1.size == 2 && smj1.forall(_.isSkewJoin))
    }
  }
} {code}
I'll open a PR shortly to fix this issue

 

  was:
Since OptimizeSkewedJoin was moved from queryStageOptimizerRules to 

queryStagePreparationRules, the position OptimizeSkewedJoin was applied has 
been moved from newQueryStage() to reOptimize(). The plan OptimizeSkewedJoin 
applied on changed from plan of new stage which is about to submit to whole 
spark plan.

In the cases where skewedJoin is not last stage, OptimizeSkewedJoin may not 
work because the number of collected shuffleStages is more than 2.

The following test will prove it:

 

 
{code:java}

test("OptimizeSkewJoin may not work") {
  withSQLConf(
    SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
    SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "100",
    SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100",
    SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
    SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
    withTempView("skewData1", "skewData2", "skewData3") {
      spark
        .range(0, 1000, 1, 10)
        .selectExpr("id % 3 as key1", "id % 3 as value1")
        .createOrReplaceTempView("skewData1")
      spark
        .range(0, 1000, 1, 10)
        .selectExpr("id % 1 as key2", "id as value2")
        .createOrReplaceTempView("skewData2")
      spark
        .range(0, 1000, 1, 10)
        .selectExpr("id % 1 as key3", "id as value3")
        .createOrReplaceTempView("skewData3")

      // Query has two skewedJoin in two continuous stages.
      val (_, adaptive1) =
        runAdaptiveAndVerifyResult(
          """
            |SELECT key1 FROM skewData1 s1
            |JOIN skewData2 s2
            |ON s1.key1 = s2.key2
            |JOIN skewData3
            |ON s1.value1 = value3
            |""".stripMargin)
      val shuffles1 = collect(adaptive1) {
        case s: ShuffleExchangeExec => s
      }
      assert(shuffles1.size == 4)
      val smj1 = findTopLevelSortMergeJoin(adaptive1)
      assert(smj1.size == 2 && smj1.forall(_.isSkewJoin))
    }
  }
} {code}
 

 


> SPARK-33832 brings the bug that OptimizeSkewedJoin may not work since it was 
> applied on whole plan instead of new stage plan
> ------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-37328
>                 URL: https://issues.apache.org/jira/browse/SPARK-37328
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.2.0
>            Reporter: Lietong Liu
>            Priority: Major
>
> Since OptimizeSkewedJoin was moved from queryStageOptimizerRules to 
> queryStagePreparationRules, the point at which OptimizeSkewedJoin is applied 
> has moved from newQueryStage() to reOptimize(). As a result, the plan it is 
> applied to changed from the plan of the new stage that is about to be 
> submitted to the whole Spark plan.
> In cases where the skewed join is not in the last stage, OptimizeSkewedJoin 
> may not work because the number of collected shuffleStages is more than 2.
> The following test will prove it:
>  
>  
> {code:java}
> test("OptimizeSkewJoin may not work") {
>   withSQLConf(
>     SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
>     SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
>     SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "100",
>     SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100",
>     SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
>     SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
>     withTempView("skewData1", "skewData2", "skewData3") {
>       spark
>         .range(0, 1000, 1, 10)
>         .selectExpr("id % 3 as key1", "id % 3 as value1")
>         .createOrReplaceTempView("skewData1")
>       spark
>         .range(0, 1000, 1, 10)
>         .selectExpr("id % 1 as key2", "id as value2")
>         .createOrReplaceTempView("skewData2")
>       spark
>         .range(0, 1000, 1, 10)
>         .selectExpr("id % 1 as key3", "id as value3")
>         .createOrReplaceTempView("skewData3")
>       // Query has two skewedJoin in two continuous stages.
>       val (_, adaptive1) =
>         runAdaptiveAndVerifyResult(
>           """
>             |SELECT key1 FROM skewData1 s1
>             |JOIN skewData2 s2
>             |ON s1.key1 = s2.key2
>             |JOIN skewData3
>             |ON s1.value1 = value3
>             |""".stripMargin)
>       val shuffles1 = collect(adaptive1) {
>         case s: ShuffleExchangeExec => s
>       }
>       assert(shuffles1.size == 4)
>       val smj1 = findTopLevelSortMergeJoin(adaptive1)
>       assert(smj1.size == 2 && smj1.forall(_.isSkewJoin))
>     }
>   }
> } {code}
> I'll open a PR shortly to fix this issue
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to