zhengruifeng edited a comment on pull request #34602: URL: https://github.com/apache/spark/pull/34602#issuecomment-994264151
@advancedxy Sorry for the late reply, and thanks for pinging me. I did a quick test with https://github.com/apache/spark/pull/33893. Unfortunately, https://github.com/apache/spark/pull/33893 failed to handle this case, since the whole plan, including `Exchange` nodes (which were unexpected when I was implementing https://github.com/apache/spark/pull/33893), was passed in.

test code:
```
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", true)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", false)
spark.conf.set("spark.sql.shuffle.partitions", 10)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "100")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "100")

spark.range(0, 1000, 1, 10).selectExpr("id % 3 as key1", "id % 3 as value1").createOrReplaceTempView("skewData1")
spark.range(0, 1000, 1, 10).selectExpr("id % 1 as key2", "id as value2").createOrReplaceTempView("skewData2")
spark.range(0, 1000, 1, 10).selectExpr("id % 1 as key3", "id as value3").createOrReplaceTempView("skewData3")

spark.sql("SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 JOIN skewData3 ON value2 = value3").write.mode("overwrite").parquet("/tmp/tmp1.parquet")
```

related log:
```
21/12/15 11:15:33 DEBUG SparkSqlParser: Parsing command: SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 JOIN skewData3 ON value2 = value3
21/12/15 11:15:34 DEBUG OptimizeSkewedJoin: Optimizing Project #75: ShuffledJoins: [SortMergeJoin, SortMergeJoin]
21/12/15 11:15:34 DEBUG OptimizeSkewedJoin: Optimizing Project #75: Do NOT support operators [Exchange, Exchange, Range, Exchange, Range, Exchange, Range]
21/12/15 11:15:35 DEBUG OptimizeSkewedJoin: Optimizing Project #161: ShuffledJoins: [SortMergeJoin, SortMergeJoin]
21/12/15 11:15:35 DEBUG OptimizeSkewedJoin: Optimizing Project #161: Do NOT support operators [Exchange]
21/12/15 11:15:35 DEBUG OptimizeSkewedJoin: Optimizing Project #200: ShuffledJoins: [SortMergeJoin, SortMergeJoin]
21/12/15 11:15:35 DEBUG OptimizeSkewedJoin: Optimizing Project #200: Do NOT support operators [Exchange]
21/12/15 11:15:35 DEBUG OptimizeSkewedJoin: Optimizing Project #247: ShuffledJoins: [SortMergeJoin]
21/12/15 11:15:36 DEBUG OptimizeSkewedJoin: Optimizing Project #261: ShuffledJoins: [SortMergeJoin]
21/12/15 11:15:36 DEBUG OptimizeSkewedJoin: Optimizing Project #261: ShuffleQueryStages: [3, 2]
21/12/15 11:15:36 DEBUG OptimizeSkewedJoin: Optimizing Project #261: Splittable ShuffleQueryStages: [3, 2]
21/12/15 11:15:36 DEBUG OptimizeSkewedJoin: Optimizing Project #261: Optimizing ShuffleQueryStage #3 in skew join, size info: median size: 21184, max size: 26854, min size: 18341, avg size: 21544
21/12/15 11:15:36 DEBUG OptimizeSkewedJoin: Optimizing Project #261: Optimizing ShuffleQueryStage #2 in skew join, size info: median size: 1227, max size: 1308, min size: 1142, avg size: 1230
21/12/15 11:15:36 DEBUG OptimizeSkewedJoin: Optimizing Project #261: Totally 0 skew partitions found
```

---------------------

I tried to allow `Exchange` nodes in https://github.com/apache/spark/pull/33893, but `ValidateRequirements.validate(optimized, ensureRequirements.requiredDistribution.get)` fails:
```
java.lang.IllegalArgumentException: requirement failed: PartitioningCollection requires all of its partitionings have the same numPartitions.
    at scala.Predef$.require(Predef.scala:281)
    at org.apache.spark.sql.catalyst.plans.physical.PartitioningCollection.<init>(partitioning.scala:314)
    at org.apache.spark.sql.execution.joins.ShuffledJoin.outputPartitioning(ShuffledJoin.scala:49)
    at org.apache.spark.sql.execution.joins.ShuffledJoin.outputPartitioning$(ShuffledJoin.scala:47)
    at org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputPartitioning(SortMergeJoinExec.scala:38)
    at org.apache.spark.sql.execution.exchange.ValidateRequirements$.$anonfun$validateInternal$4(ValidateRequirements.scala:61)
    at org.apache.spark.sql.execution.exchange.ValidateRequirements$.$anonfun$validateInternal$4$adapted(ValidateRequirements.scala:59)
    at scala.collection.Iterator.forall(Iterator.scala:955)
    at scala.collection.Iterator.forall$(Iterator.scala:953)
    at scala.collection.AbstractIterator.forall(Iterator.scala:1431)
    at scala.collection.IterableLike.forall(IterableLike.scala:77)
    at scala.collection.IterableLike.forall$(IterableLike.scala:76)
    at scala.collection.AbstractIterable.forall(Iterable.scala:56)
    at org.apache.spark.sql.execution.exchange.ValidateRequirements$.validateInternal(ValidateRequirements.scala:59)
    at org.apache.spark.sql.execution.exchange.ValidateRequirements$.validate(ValidateRequirements.scala:38)
    at org.apache.spark.sql.execution.exchange.ValidateRequirements$.validate(ValidateRequirements.scala:34)
    at org.apache.spark.sql.execution.adaptive.OptimizeSkewedJoin.apply(OptimizeSkewedJoin.scala:419)
    at org.apache.spark.sql.execution.adaptive.OptimizeSkewedJoin.apply(OptimizeSkewedJoin.scala:59)
```

It looks like I would need to manually rewrite the `Exchange` nodes whose children have been optimized, but I think that would be too complex.

I think the above problems also exist in the current `OptimizeSkewedJoin` implementation in master. This should be a serious regression, since intermediate skewed joins cannot be optimized. Any thoughts? @cloud-fan

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
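[Editor's note] The "Totally 0 skew partitions found" log line above follows from the skew-detection rule used by `OptimizeSkewedJoin`: a partition is treated as skewed only if it exceeds both `spark.sql.adaptive.skewJoin.skewedPartitionFactor` (default 5.0) times the median partition size and `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes`. A minimal standalone sketch of that rule (not Spark's actual source):

```scala
// Hedged sketch of OptimizeSkewedJoin's skew-detection heuristic.
// A partition counts as skewed only when it is BOTH larger than
// factor * median AND larger than the byte threshold.
object SkewCheck {
  def isSkewed(size: Long, medianSize: Long,
               factor: Double = 5.0, thresholdBytes: Long = 100L): Boolean =
    size > medianSize * factor && size > thresholdBytes

  def main(args: Array[String]): Unit = {
    // Sizes from ShuffleQueryStage #3 in the log above:
    // max 26854 < 5 * 21184 = 105920, so no partition is skewed,
    // matching "Totally 0 skew partitions found".
    println(SkewCheck.isSkewed(size = 26854L, medianSize = 21184L))
  }
}
```

So even with the low 100-byte threshold set in the test, no partition of stage #3 clears 5x the median, which is why the one join that did reach skew detection still reported zero skew partitions.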

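[Editor's note] The `IllegalArgumentException` in the stack trace above comes from a constructor-time `require` in `PartitioningCollection`: every member partitioning must report the same `numPartitions`. The sketch below uses simplified stand-in classes (not the real Spark ones) to show why splitting the skewed partitions on one side of a `SortMergeJoin` trips that invariant when the join's `outputPartitioning` is rebuilt:

```scala
// Simplified stand-ins illustrating the failing invariant; the real Spark
// classes live in org.apache.spark.sql.catalyst.plans.physical.
final case class HashPartitioning(numPartitions: Int)

final case class PartitioningCollection(partitionings: Seq[HashPartitioning]) {
  // Same require() as in the exception message quoted above.
  require(
    partitionings.map(_.numPartitions).distinct.length == 1,
    "PartitioningCollection requires all of its partitionings have the same numPartitions.")
  def numPartitions: Int = partitionings.head.numPartitions
}

object Demo {
  def main(args: Array[String]): Unit = {
    // Both join sides still report 10 partitions: OK.
    println(PartitioningCollection(Seq(HashPartitioning(10), HashPartitioning(10))).numPartitions)
    // One side's skewed partitions were split (10 -> 13): require() throws.
    try PartitioningCollection(Seq(HashPartitioning(10), HashPartitioning(13)))
    catch { case e: IllegalArgumentException => println(s"rejected: ${e.getMessage}") }
  }
}
```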