c21 commented on pull request #32210:
URL: https://github.com/apache/spark/pull/32210#issuecomment-823503243


   @cloud-fan -
   
   > sorting the stream-side at runtime may lead to slow query plan because the 
sort is not whole-stage-codegen-ed.
   
   Note that this PR covers only the non-code-gen path. I plan to add the code-gen path later, as it depends on the sort merge join code-gen work (SPARK-34705). Code-gen for the sort operator is pretty [straightforward](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala#L170), so on the code-gen path the shuffled hash join, the fallback sort and the fallback sort merge join will all be code-gen-ed.
   
   > unlike SMJ, the output ordering can't be preserved if we sort the 
stream-side at runtime.
   
   This is a good observation, and I noticed it too. I think it may be resolvable by propagating `skipSort` from the parent operators of `SortExec` (similar to `limitNotReachedCond` for limit), but I need to look more closely at whether that is feasible. IMO this need not be a blocker: the sort-based fallback of `HashAggregateExec` does not preserve the output ordering either.
   
   > I think the eventual goal is to enable shuffle hash join by default, but 
I'm not sure adding the fallback can achieve this goal. Do you have some real 
data to show the benefits?
   
   We enabled shuffled hash join by default together with this feature. In our environment, after enabling shuffled hash join by default, roughly 25% of the queries that used sort merge join now run with shuffled hash join. Roughly 20% of shuffled hash join queries have at least one task that fell back, and roughly 10% of shuffled hash join tasks fell back. TL;DR: the fallback saves about 20% of shuffled hash join queries, and about 10% of their tasks, from failing.
   
   > Another idea is to pick shuffle hash join in AQE when we know the 
per-partition size after shuffle.
   
   I thought more about this, and I assume you are referring to some kind of `HybridJoin` operator that chooses between shuffled hash join and sort merge join independently for each task, e.g. based on partition size, task1 does a shuffled hash join while task2 does a sort merge join. This approach has some disadvantages, though:
   
   1. It does not work if the join child has no shuffle, e.g. a join on a bucketed table, multiple joins, a join after group-by, etc. Not working in 100% of cases sounds like a show stopper to me, as it means we still could not enable shuffled hash join by default.
   2. Similarly, the output ordering can't be preserved out of the box. We currently have only a global view of `OutputOrdering`, which applies to all tasks. We would need to introduce something else to accommodate the fact that some tasks preserve the ordering and some do not.
   3. The per-partition build-side size is not exactly the same as the run-time hash relation size. The hash relation can be larger because it stores the extra keys and carries the overhead of the underlying `BytesToBytesMap`.
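
A rough sketch of the per-task decision a hypothetical `HybridJoin` might make, to make the three points above concrete. Everything here is an illustrative assumption rather than Spark code: the function names, the memory budget, and especially the overhead constants, which are made-up numbers and not measured `BytesToBytesMap` figures.

```python
from typing import List, Optional

SHUFFLED_HASH_JOIN = "shuffled_hash_join"
SORT_MERGE_JOIN = "sort_merge_join"

# Illustrative overhead model (assumption, not measured BytesToBytesMap numbers):
# the hash relation stores extra key copies plus map bookkeeping, so it is
# larger than the raw shuffled partition (point 3).
OVERHEAD_FACTOR = 1.5
PER_KEY_OVERHEAD_BYTES = 16


def estimated_hash_relation_bytes(partition_bytes: int, num_keys: int) -> int:
    return int(partition_bytes * OVERHEAD_FACTOR) + num_keys * PER_KEY_OVERHEAD_BYTES


def choose_join_per_task(
    partition_bytes: Optional[List[int]],
    key_counts: Optional[List[int]],
    memory_budget_bytes: int,
) -> Optional[List[str]]:
    """Pick a join strategy per task from shuffle statistics.

    Returns None when there are no per-partition statistics, e.g. the join
    child has no shuffle (point 1); no per-task choice is possible then.
    """
    if partition_bytes is None or key_counts is None:
        return None
    choices = []
    for size, keys in zip(partition_bytes, key_counts):
        if estimated_hash_relation_bytes(size, keys) <= memory_budget_bytes:
            choices.append(SHUFFLED_HASH_JOIN)
        else:
            choices.append(SORT_MERGE_JOIN)
    return choices


def has_global_output_ordering(choices: List[str]) -> bool:
    """A single plan-level ordering only holds if every task sort-merges (point 2)."""
    return all(c == SORT_MERGE_JOIN for c in choices)
```

For example, with partitions of 100, 800 and 1200 bytes (10 keys each) and a 1000-byte budget, the 800-byte partition fits by raw size, but its estimated hash relation (1360 bytes under these made-up constants) does not, so only the first task would use shuffled hash join, and the mixed result has no global output ordering.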

