manuzhang opened a new pull request #29665:
URL: https://github.com/apache/spark/pull/29665


   This PR backports https://github.com/apache/spark/pull/29593 to branch-3.0
   
   ### What changes were proposed in this pull request?
   Only copy tags to node with no tags when transforming plans.
   
   ### Why are the changes needed?
   @cloud-fan [made a good 
point](https://github.com/apache/spark/pull/29593#discussion_r482013121) that 
it doesn't make sense to append tags to existing nodes when nodes are removed. 
That will cause such bugs as duplicate rows when deduplicating and 
repartitioning by the same column with AQE.
   
   ```
   spark.range(10).union(spark.range(10)).createOrReplaceTempView("v1")
   val df = spark.sql("select id from v1 group by id distribute by id") 
   println(df.collect().toArray.mkString(","))
   println(df.queryExecution.executedPlan)
   
   // With AQE
   
[4],[0],[3],[2],[1],[7],[6],[8],[5],[9],[4],[0],[3],[2],[1],[7],[6],[8],[5],[9]
   AdaptiveSparkPlan(isFinalPlan=true)
   +- CustomShuffleReader local
      +- ShuffleQueryStage 0
         +- Exchange hashpartitioning(id#183L, 10), true
            +- *(3) HashAggregate(keys=[id#183L], functions=[], 
output=[id#183L])
               +- Union
                  :- *(1) Range (0, 10, step=1, splits=2)
                  +- *(2) Range (0, 10, step=1, splits=2)
   
   // Without AQE
   [4],[7],[0],[6],[8],[3],[2],[5],[1],[9]
   *(4) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
   +- Exchange hashpartitioning(id#206L, 10), true
      +- *(3) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
         +- Union
            :- *(1) Range (0, 10, step=1, splits=2)
            +- *(2) Range (0, 10, step=1, splits=2)
   ```
   
   It's too expensive to detect node removal so we make a compromise only to 
copy tags to node with no tags.
   
   ### Does this PR introduce any user-facing change?
   Yes. Fix a bug.
   
   ### How was this patch tested?
   Add test.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to