Hi all, I hope everyone is doing well. 
I'm currently working on a migration project that aims to move all of our
Spark SQL pipelines to Spark 3.x and take advantage of its performance
improvements. My company is on Spark 2.4.0, and we are targeting 3.1.1
officially for all Spark SQL data pipelines, but without AQE enabled yet. The
primary goal is to keep everything behaving the same while using the newer
version; later on, we can easily enable AQE for all data pipelines.
After migrating some pipelines, we discovered a query plan change introduced
by the version upgrade: instead of a SortMergeJoin, Spark 3 is using a
BroadcastHashJoin for the join between the tables in my query. On top of that,
the BroadcastExchange is happening on the big table side, which seems strange
from my perspective.
You can see some snapshots and a better explanation of the problem here: 
https://stackoverflow.com/questions/72793116/migration-from-spark-2-4-0-to-spark-3-1-1-caused-sortmergejoin-to-change-to-broa
I'm setting `spark.sql.adaptive.enabled` to false,
`spark.sql.autoBroadcastJoinThreshold` to 10 MB, and
`spark.sql.shuffle.partitions` to 200, yet apparently just moving this query
from Spark 2 to Spark 3 changed the query plan and degraded performance. In
this specific scenario, we are hitting a "Could not execute broadcast in 300
secs" error.
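For context, the workaround we're experimenting with (not fully validated on our side) is to pin the old join strategy, either per query with a join hint or globally by disabling automatic broadcast joins. The table and column names below are hypothetical placeholders, not our real schema:

```sql
-- Globally disable automatic broadcast joins (planner falls back to SortMergeJoin)
SET spark.sql.autoBroadcastJoinThreshold=-1;

-- Or force a SortMergeJoin for one query via the MERGE hint (available since Spark 3.0)
SELECT /*+ MERGE(b) */ b.id, s.name
FROM big_table b
JOIN small_table s
  ON b.id = s.id;
```

Both options keep the pipeline on SortMergeJoin, but they paper over the question of why the planner's size estimate changed between versions.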
Do you guys have any clue why this is happening? My questions are:
- Why has Spark 3 changed the join strategy in this situation, given that AQE
is disabled and `spark.sql.autoBroadcastJoinThreshold` is much smaller than
the data set size?
- Is this the expected behavior, or could it represent a potential bug in
Spark 3.x?
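If it helps anyone reproduce this, the plan difference shows up directly when comparing the physical plans on the two clusters (again with placeholder table names):

```sql
-- Run on both Spark 2.4 and 3.1; the 3.1-only FORMATTED mode also lists
-- per-operator details, so plain EXPLAIN is the common denominator
EXPLAIN
SELECT b.id, s.name
FROM big_table b
JOIN small_table s
  ON b.id = s.id;
```

On 2.4 we see SortMergeJoin with Exchange hashpartitioning on both sides; on 3.1 the same query shows BroadcastHashJoin with BroadcastExchange under the large table.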
Please, let me know your thoughts. I appreciate all the help in advance.
