c21 commented on pull request #32210: URL: https://github.com/apache/spark/pull/32210#issuecomment-823503243
@cloud-fan -

> sorting the stream-side at runtime may lead to slow query plan because the sort is not whole-stage-codegen-ed.

Note that this PR covers the non-codegen path only. I plan to add the codegen path later, as it depends on the sort merge join codegen work (SPARK-34705). Sort operator codegen is pretty [straightforward](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala#L170), so on the codegen path, shuffled hash join, the fallback sort, and the fallback sort merge join will all be codegen-ed.

> unlike SMJ, the output ordering can't be preserved if we sort the stream-side at runtime.

This is a good observation, and I noticed it too. I think it might be resolvable by propagating `skipSort` from the parent operators of `SortExec` (similar to `limitNotReachedCond` for limit), but I need to look more closely at how feasible that is. IMO this might not be a blocker, as the sort-based fallback of `HashAggregateExec` does not preserve the output ordering either.

> I think the eventual goal is to enable shuffle hash join by default, but I'm not sure adding the fallback can achieve this goal. Do you have some real data to show the benefits?

We enabled shuffled hash join by default with this feature. In our environment, roughly 25% of sort merge join queries now run with shuffled hash join after enabling it by default. Roughly 20% of shuffled hash join queries have at least one task that fell back, and roughly 10% of shuffled hash join tasks fell back. TL;DR: the fallback saves roughly 20% of shuffled hash join queries, and 10% of tasks, from failing.

> Another idea is to pick shuffle hash join in AQE when we know the per-partition size after shuffle.

I thought more about this. I assume you are referring to some kind of `HybridJoin` operator, where we can choose between shuffled hash join and sort merge join for each task independently, e.g. based on partition size, task1 can do shuffled hash join, task2 can do sort merge join, etc. This approach has some disadvantages though:

1. It does not work if the join child does not have a shuffle, e.g. join on bucketed tables, multiple joins, join after group-by, etc. Not working for 100% of cases sounds like a show stopper to me, since it means we cannot enable shuffled hash join by default.
2. Similarly, the output ordering can't be preserved out of the box. We only have a global view of `OutputOrdering` now, which applies to all tasks. We would need to introduce something else to accommodate the fact that some tasks preserve the order but some do not.
3. The per-partition build size is not exactly the same as the run-time hash relation size. The hash relation can be larger because it stores the extra keys and carries the overhead of the underlying `BytesToBytesMap`.
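To make the per-task choice and the ordering concern more concrete, here is a minimal Python sketch, not Spark code. All names (`hash_join`, `sort_merge_join`, `join_partition`, `max_build_rows`) are hypothetical, and the row-count threshold is only an assumed stand-in for a real memory budget. It picks a strategy per partition and shows that the two strategies return the same rows in different orders.

```python
from collections import defaultdict
from itertools import groupby
from operator import itemgetter


def hash_join(build, stream):
    """Inner equi-join: hash the build side, probe with the stream side."""
    table = defaultdict(list)
    for key, value in build:
        table[key].append(value)
    # Output follows the stream side's incoming order, not key order.
    return [(k, sv, bv) for k, sv in stream for bv in table.get(k, [])]


def sort_merge_join(build, stream):
    """Inner equi-join: sort both sides by key, then merge matching key groups."""
    key = itemgetter(0)
    left = [(k, list(g)) for k, g in groupby(sorted(stream, key=key), key=key)]
    right = [(k, list(g)) for k, g in groupby(sorted(build, key=key), key=key)]
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, lrows = left[i]
        rk, rrows = right[j]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Emit the cross product of the two matching key groups.
            out.extend((lk, sv, bv) for _, sv in lrows for _, bv in rrows)
            i += 1
            j += 1
    return out


def join_partition(build, stream, max_build_rows):
    """Choose a strategy for one task.

    max_build_rows is a hypothetical stand-in for a memory budget.
    """
    if len(build) <= max_build_rows:
        return "hash", hash_join(build, stream)
    return "sort_merge", sort_merge_join(build, stream)


build = [(1, "a"), (2, "b"), (2, "c")]
stream = [(2, "x"), (1, "y"), (3, "z")]

small_budget = join_partition(build, stream, max_build_rows=2)
large_budget = join_partition(build, stream, max_build_rows=10)
```

With these inputs, `large_budget` uses the hash strategy and keeps the stream order (key 2 before key 1), while `small_budget` falls back to sort merge and emits rows in key order. The row sets are identical, which is why a single global `OutputOrdering` cannot describe both tasks.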
