ekoifman commented on pull request #30829: URL: https://github.com/apache/spark/pull/30829#issuecomment-760544855
A rule in `queryStageOptimizerRules` adds a shuffle indirectly:

1. `OptimizeSkewJoin` runs and adds a new shuffle below `e` in a `newQueryStage(e: Exchange)` call.
2. We can't keep this `optimizedPlan`, because the new shuffle should become the root of its own stage. Instead, the rule remembers the location in the plan where the new shuffle needs to go and throws `ShuffleAddedException`, carrying that location, back to the main loop in `getFinalPhysicalPlan()`. There, `val z = insertShuffle(e.sp, currentPhysicalPlan)` inserts the new shuffle into `currentPhysicalPlan`. Call the new shuffle `e'`.
3. The main loop then restarts `createQueryStages(currentPhysicalPlan)` from the top, now with the new shuffle in place. When `newQueryStage` finds `e'`, `e'` becomes the root of a stage with the same leaves as `e`. When `OptimizeSkewJoin` runs on this stage, it can do skew mitigation without adding another shuffle, since `e'` "protects" `e`.
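
To make the throw-and-restart flow described above concrete, here is a minimal, self-contained Scala sketch. It is a toy model, not the code in this PR: the `Plan`, `Scan`, `SkewJoin` and `Exchange` classes, the `inserted` flag, the `ShuffleAdded` exception, and the simplified signatures of `newQueryStage`, `createQueryStages`, `insertShuffle` and `getFinalPhysicalPlan` are all assumptions made for illustration. The only thing it tries to show is that a rule signals "a shuffle is needed here" by throwing, the main loop patches the plan, and the second pass succeeds because `e'` now sits between `e` and the join.

```scala
object SkewRestartSketch {
  // Toy physical plan; these node types are hypothetical, not Spark's.
  sealed trait Plan
  final case class Scan(name: String) extends Plan
  final case class SkewJoin(left: Plan, right: Plan) extends Plan
  // `inserted` marks a shuffle added by the main loop (the e' of the comment).
  final case class Exchange(id: String, child: Plan, inserted: Boolean = false) extends Plan

  // Stand-in for ShuffleAddedException: carries the node that needs a shuffle above it.
  final case class ShuffleAdded(location: Plan)
    extends Exception("a new shuffle is required above the skewed join")

  // Stand-in for the optimizer rule running inside newQueryStage(e).
  // Assumption: a stage rooted at an original exchange directly over a skew join
  // cannot be kept, while a stage rooted at an inserted exchange can.
  def newQueryStage(e: Exchange): String = e match {
    case Exchange(_, join: SkewJoin, false) => throw ShuffleAdded(join)
    case _                                  => e.id
  }

  // Bottom-up traversal; returns stage ids in creation order.
  def createQueryStages(plan: Plan): List[String] = plan match {
    case Scan(_)            => Nil
    case SkewJoin(l, r)     => createQueryStages(l) ++ createQueryStages(r)
    case e: Exchange        => createQueryStages(e.child) :+ newQueryStage(e)
  }

  // Wrap the remembered location in a new exchange e'.
  def insertShuffle(location: Plan, plan: Plan): Plan = plan match {
    case p if p == location => Exchange("e'", p, inserted = true)
    case SkewJoin(l, r)     => SkewJoin(insertShuffle(location, l), insertShuffle(location, r))
    case e: Exchange        => e.copy(child = insertShuffle(location, e.child))
    case s: Scan            => s
  }

  // Main loop: restart stage creation from the top whenever a rule asks for a shuffle.
  def getFinalPhysicalPlan(initial: Plan): (Plan, List[String], Int) = {
    var current = initial
    var stages = List.empty[String]
    var restarts = 0
    var done = false
    while (!done) {
      try {
        stages = createQueryStages(current)
        done = true
      } catch {
        case ShuffleAdded(location) =>
          current = insertShuffle(location, current)
          restarts += 1
      }
    }
    (current, stages, restarts)
  }

  def main(args: Array[String]): Unit = {
    val join = SkewJoin(Exchange("a", Scan("t1")), Exchange("b", Scan("t2")))
    val (plan, stages, restarts) = getFinalPhysicalPlan(Exchange("e", join))
    println(s"restarts=$restarts stages=$stages")
    println(plan)
  }
}
```

In this toy run the first pass throws when it reaches `e`, the loop wraps the join in `e'`, and the second pass creates the stages for `a`, `b`, `e'` and `e` without throwing again. The sketch locates the insertion point by structural equality, which is only adequate for a small example; how the real rule records the location is not something this sketch claims to reproduce.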
