zhengruifeng edited a comment on pull request #34602:
URL: https://github.com/apache/spark/pull/34602#issuecomment-994264151


   @advancedxy Sorry for the late reply, and thanks for pinging me.
   
   I did a quick test with https://github.com/apache/spark/pull/33893
   
   Unfortunately, https://github.com/apache/spark/pull/33893 failed to handle 
this case, because the whole plan was passed in, including `Exchange` nodes 
(which I did not expect when implementing 
https://github.com/apache/spark/pull/33893).
   
   test code:
   ```
   spark.conf.set("spark.sql.adaptive.enabled", true)
   spark.conf.set("spark.sql.adaptive.skewJoin.enabled", true)
   spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
   spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", false)
   spark.conf.set("spark.sql.shuffle.partitions", 10)
   spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "100")
   spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "100")
   
   spark.range(0, 1000, 1, 10).selectExpr("id % 3 as key1", "id % 3 as value1").createOrReplaceTempView("skewData1")
   spark.range(0, 1000, 1, 10).selectExpr("id % 1 as key2", "id as value2").createOrReplaceTempView("skewData2")
   spark.range(0, 1000, 1, 10).selectExpr("id % 1 as key3", "id as value3").createOrReplaceTempView("skewData3")
   
   spark.sql("SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 JOIN skewData3 ON value2 = value3").write.mode("overwrite").parquet("/tmp/tmp1.parquet")
   ```
   
   
   related log:
   
   ```
   21/12/15 11:15:33 DEBUG SparkSqlParser: Parsing command: SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 JOIN skewData3 ON value2 = value3
   21/12/15 11:15:34 DEBUG OptimizeSkewedJoin: Optimizing Project #75: ShuffledJoins: [SortMergeJoin, SortMergeJoin]
   21/12/15 11:15:34 DEBUG OptimizeSkewedJoin: Optimizing Project #75: Do NOT support operators [Exchange, Exchange, Range, Exchange, Range, Exchange, Range]
   21/12/15 11:15:35 DEBUG OptimizeSkewedJoin: Optimizing Project #161: ShuffledJoins: [SortMergeJoin, SortMergeJoin]
   21/12/15 11:15:35 DEBUG OptimizeSkewedJoin: Optimizing Project #161: Do NOT support operators [Exchange]
   21/12/15 11:15:35 DEBUG OptimizeSkewedJoin: Optimizing Project #200: ShuffledJoins: [SortMergeJoin, SortMergeJoin]
   21/12/15 11:15:35 DEBUG OptimizeSkewedJoin: Optimizing Project #200: Do NOT support operators [Exchange]
   21/12/15 11:15:35 DEBUG OptimizeSkewedJoin: Optimizing Project #247: ShuffledJoins: [SortMergeJoin]
   21/12/15 11:15:36 DEBUG OptimizeSkewedJoin: Optimizing Project #261: ShuffledJoins: [SortMergeJoin]
   21/12/15 11:15:36 DEBUG OptimizeSkewedJoin: Optimizing Project #261: ShuffleQueryStages: [3, 2]
   21/12/15 11:15:36 DEBUG OptimizeSkewedJoin: Optimizing Project #261: Splittable ShuffleQueryStages: [3, 2]
   21/12/15 11:15:36 DEBUG OptimizeSkewedJoin: Optimizing Project #261: Optimizing ShuffleQueryStage #3 in skew join, size info: median size: 21184, max size: 26854, min size: 18341, avg size: 21544
   21/12/15 11:15:36 DEBUG OptimizeSkewedJoin: Optimizing Project #261: Optimizing ShuffleQueryStage #2 in skew join, size info: median size: 1227, max size: 1308, min size: 1142, avg size: 1230
   21/12/15 11:15:36 DEBUG OptimizeSkewedJoin: Optimizing Project #261: Totally 0 skew partitions found
   ```
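
For reference, a minimal sketch of why the last stage pair reports 0 skew partitions. It is a simplified model, not the actual `OptimizeSkewedJoin` code: the object and method names are made up, and the factor of 5 is assumed to be the default of `spark.sql.adaptive.skewJoin.skewedPartitionFactor`, which the test code does not override. Under those assumptions, a partition counts as skewed only if it is larger than both the median times the factor and the configured threshold.

```scala
// Simplified model of the skew check; names are hypothetical, not Spark's API.
object SkewCheckSketch {
  // Assumed default of spark.sql.adaptive.skewJoin.skewedPartitionFactor.
  val skewedPartitionFactor: Long = 5L
  // spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes, as set in the test code.
  val skewedPartitionThresholdInBytes: Long = 100L

  // A partition is treated as skewed only if it exceeds both bounds.
  def isSkewed(size: Long, medianSize: Long): Boolean =
    size > medianSize * skewedPartitionFactor &&
      size > skewedPartitionThresholdInBytes

  def main(args: Array[String]): Unit = {
    // (stage id, median size, max size), copied from the log above.
    val stages = Seq((3, 21184L, 26854L), (2, 1227L, 1308L))
    stages.foreach { case (id, medianSize, maxSize) =>
      println(s"ShuffleQueryStage #$id: largest partition skewed = ${isSkewed(maxSize, medianSize)}")
    }
  }
}
```

With the sizes from the log, even the largest partition of each stage stays well below five times the median, so neither stage has a skewed partition at the last join. The joins that were rejected because of `Exchange` never reach this check.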
   
   ---------------------
   
   I tried allowing `Exchange` nodes in 
https://github.com/apache/spark/pull/33893, but then 
`ValidateRequirements.validate(optimized, 
ensureRequirements.requiredDistribution.get)` fails:
   ```
   java.lang.IllegalArgumentException: requirement failed: PartitioningCollection requires all of its partitionings have the same numPartitions.
           at scala.Predef$.require(Predef.scala:281)
           at org.apache.spark.sql.catalyst.plans.physical.PartitioningCollection.<init>(partitioning.scala:314)
           at org.apache.spark.sql.execution.joins.ShuffledJoin.outputPartitioning(ShuffledJoin.scala:49)
           at org.apache.spark.sql.execution.joins.ShuffledJoin.outputPartitioning$(ShuffledJoin.scala:47)
           at org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputPartitioning(SortMergeJoinExec.scala:38)
           at org.apache.spark.sql.execution.exchange.ValidateRequirements$.$anonfun$validateInternal$4(ValidateRequirements.scala:61)
           at org.apache.spark.sql.execution.exchange.ValidateRequirements$.$anonfun$validateInternal$4$adapted(ValidateRequirements.scala:59)
           at scala.collection.Iterator.forall(Iterator.scala:955)
           at scala.collection.Iterator.forall$(Iterator.scala:953)
           at scala.collection.AbstractIterator.forall(Iterator.scala:1431)
           at scala.collection.IterableLike.forall(IterableLike.scala:77)
           at scala.collection.IterableLike.forall$(IterableLike.scala:76)
           at scala.collection.AbstractIterable.forall(Iterable.scala:56)
           at org.apache.spark.sql.execution.exchange.ValidateRequirements$.validateInternal(ValidateRequirements.scala:59)
           at org.apache.spark.sql.execution.exchange.ValidateRequirements$.validate(ValidateRequirements.scala:38)
           at org.apache.spark.sql.execution.exchange.ValidateRequirements$.validate(ValidateRequirements.scala:34)
           at org.apache.spark.sql.execution.adaptive.OptimizeSkewedJoin.apply(OptimizeSkewedJoin.scala:419)
           at org.apache.spark.sql.execution.adaptive.OptimizeSkewedJoin.apply(OptimizeSkewedJoin.scala:59)
   ```
   
   It looks like I would need to manually modify the `Exchange` nodes whose 
children have been optimized, but I think that would be too complex.
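
To illustrate the requirement that fails in the stack trace, here is a minimal standalone sketch. The classes are simplified stand-ins with hypothetical names, not Spark's `PartitioningCollection`, and the partition counts 10 and 12 are made up. It only shows that the check fires as soon as the partitionings being combined report different `numPartitions`.

```scala
import scala.util.Try

// Simplified stand-ins for illustration only; not Spark's classes.
object PartitioningCollectionSketch {
  // Only the partition count matters for this check.
  final case class SimplePartitioning(name: String, numPartitions: Int)

  final case class SimplePartitioningCollection(partitionings: Seq[SimplePartitioning]) {
    // Same requirement message as in the stack trace above.
    require(
      partitionings.map(_.numPartitions).distinct.length == 1,
      "PartitioningCollection requires all of its partitionings have the same numPartitions.")

    val numPartitions: Int = partitionings.head.numPartitions
  }

  def main(args: Array[String]): Unit = {
    val left = SimplePartitioning("left", 10)
    // Hypothetical: one side ends up with a different partition count.
    val right = SimplePartitioning("right", 12)

    val consistent = Try(SimplePartitioningCollection(Seq(left, left.copy(name = "right"))))
    val mismatched = Try(SimplePartitioningCollection(Seq(left, right)))

    println(s"same numPartitions accepted: ${consistent.isSuccess}")
    println(s"different numPartitions rejected: ${mismatched.isFailure}")
  }
}
```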
   
   
   I think the same problems also exist in the current `OptimizeSkewedJoin` 
implementation in master. That would be a serious regression, since 
intermediate skewed joins cannot be optimized. Any thoughts? @cloud-fan 




