ulysses-you commented on PR #43435:
URL: https://github.com/apache/spark/pull/43435#issuecomment-1769841139
I think the issue is that we propagate a coalesced shuffle exchange through
`InMemoryTableScanExec`, and then `EnsureRequirements` uses that coalesced
shuffle exchange to create the shuffle exchange on the other side of the join.
However, the two shuffle exchanges are not actually compatible: one side
shuffles with `HashPartitioning(200)` and is then coalesced to
`HashPartitioning(10)`, while the other side shuffles directly with
`HashPartitioning(10)`. Rows with the same join key can therefore land in
different partitions on the two sides, which causes the incorrect join results.
```
                    Scan
                      |
                Shuffle(200)
                      |
Scan         AQEShuffleRead(10)
 |                    |
Shuffle(10) InMemoryTableScanExec
     \                /
            Join
```
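To make the incompatibility concrete, here is a small pure-Python simulation (not Spark code) of where one key lands on each side of the join above. It assumes a hypothetical stand-in hash (the key's integer value, whereas Spark uses Murmur3 with a non-negative modulo) and assumes AQE merged the 200 reducer partitions into 10 equal contiguous ranges of 20. Real AQE coalescing is driven by partition sizes, so the actual ranges will differ, but the mismatch is of the same kind.

```python
# Pure-Python simulation (not Spark): where does a key land on each join side?
# Hypothetical assumptions, for illustration only:
#   - the partition hash is the key itself (Spark really uses Murmur3 + pmod)
#   - AQE coalesced 200 partitions into 10 equal contiguous ranges of 20

INITIAL_PARTITIONS = 200
COALESCED_PARTITIONS = 10
RANGE_SIZE = INITIAL_PARTITIONS // COALESCED_PARTITIONS  # 20 reducers per coalesced partition


def hash_partition(key: int, num_partitions: int) -> int:
    """Stand-in for HashPartitioning: non-negative modulo of the key's hash."""
    return key % num_partitions  # Python's % is non-negative for a positive divisor


def coalesced_side(key: int) -> int:
    """Cached side: Shuffle(200) followed by AQEShuffleRead(10)."""
    return hash_partition(key, INITIAL_PARTITIONS) // RANGE_SIZE


def direct_side(key: int) -> int:
    """Other side: Shuffle(10), planned to 'match' the reported partitioning."""
    return hash_partition(key, COALESCED_PARTITIONS)


def mismatched_keys(keys):
    """Keys whose rows end up in different join partitions on the two sides."""
    return [k for k in keys if coalesced_side(k) != direct_side(k)]


if __name__ == "__main__":
    for key in (5, 21, 137):
        print(key, coalesced_side(key), direct_side(key))
```

Under these assumptions key 5 goes to coalesced partition 0 on the cached side but to partition 5 on the other side, so the join task for either partition never sees both sets of matching rows.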
BTW, I think this issue should not reproduce if you set
`spark.sql.shuffle.partitions=5`.
There are two places in the code related to this issue:
1. `AQEShuffleRead` always assumes the coalesced partitioning is unchanged,
so it just refreshes the partition number. I think this is based on the
assumption that all the initial shuffle partition numbers are the same, but
that does not seem to hold: `EnsureRequirements` supports
`shouldConsiderMinParallelism`, which can produce different initial shuffle
partition numbers within one query execution.
2. `InMemoryTableScanExec` propagates its output partitioning. Since
`InMemoryTableScanExec` introduces one more query execution (for the cached
plan), it also breaks the assumption of `AQEShuffleRead`.
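The toy Python model below (not Spark's actual classes) sketches why point 1 matters once point 2 lets a partitioning from a separate query execution leak out. `HashPartitioning` here is a simplified stand-in for Spark's class of the same name; `CoalescedFrom`, `naive_refresh`, `tracked_refresh` and `co_partitioned` are hypothetical names made up for this illustration. It shows that if the coalesced read only refreshes the partition number, an equality-style compatibility check cannot tell "200 coalesced to 10" apart from a fresh 10-partition shuffle.

```python
from dataclasses import dataclass, replace
from typing import Tuple


@dataclass(frozen=True)
class HashPartitioning:
    """Toy stand-in for a hash partitioning: key columns plus partition count."""
    keys: Tuple[str, ...]
    num_partitions: int


@dataclass(frozen=True)
class CoalescedFrom:
    """Hypothetical partitioning that remembers the shuffle it was coalesced from."""
    original: HashPartitioning
    num_partitions: int


def naive_refresh(child: HashPartitioning, coalesced: int) -> HashPartitioning:
    # Point 1: keep the same partitioning and only refresh the partition number.
    # This silently assumes every shuffle in the query started with the same count.
    return replace(child, num_partitions=coalesced)


def tracked_refresh(child: HashPartitioning, coalesced: int) -> CoalescedFrom:
    # Hypothetical alternative: keep the original partitioning, so a later check
    # can distinguish "200 coalesced to 10" from a plain 10-partition shuffle.
    return CoalescedFrom(original=child, num_partitions=coalesced)


def co_partitioned(left, right) -> bool:
    # Toy compatibility check: equal partitionings are assumed to place keys identically.
    return left == right


# Point 2: the cached side comes from the cached plan's own query execution (200 partitions).
cached_side = HashPartitioning(("id",), 200)
# New shuffle planned to match whatever partitioning the cached side reports.
other_side = HashPartitioning(("id",), 10)

print(co_partitioned(naive_refresh(cached_side, 10), other_side))    # True (wrong)
print(co_partitioned(tracked_refresh(cached_side, 10), other_side))  # False
```

The design point is only that the coalesced output has to carry enough information about its origin for a compatibility check to reject it; how Spark itself should encode that is left to the PR.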