Hi all, I hope everyone is doing well.
I'm currently working on a Spark migration project that aims to migrate all
Spark SQL pipelines for Spark 3.x version and take advantage of all performance
improvements on it. My company is using Spark 2.4.0 but we are targeting to use
officially the 3.1.1 for all Spark SQL data pipelines but without AQE enabled
yet. The primary goal is to keep everything the same but use the newest
version. Later on, we can easily enable AQE for all data pipelines.
After migrating some pipelines, we discovered a slight query plan change in the
version upgrade. We found out that instead of SortMergeJoin it is using the
BroadcastHashJoin to do the join between the tables of my query. Not only this,
but the BroadcastExchange operation is occurring on the big table side, which
seems strange from my perspective.
You can see some snapshots and a better explanation of the problem here:
https://stackoverflow.com/questions/72793116/migration-from-spark-2-4-0-to-spark-3-1-1-caused-sortmergejoin-to-change-to-broa
I'm setting `spark.sql.adaptive.enabled` to false,
`spark.sql.autoBroadcastJoinThreshold` to 10Mb, and
`spark.sql.shuffle.partitions` to 200, but apparently only by changing the
Spark 2 to 3 for this query, it has made the query plan changes and the
performance has been degraded. In this specific scenario, we are facing a
"Could not execute broadcast in 300 secs" error.
Do you guys have any clue on why this is happening? My questions are:
- Why Spark 3 has changed the join approach in this situation given that AQE is
disabled and the spark.sql.autoBroadcastJoinThreshold is much smaller than the
data set size?- Is this the expected behavior or could this represents a
potential bug in Spark 3.x?
Please, let me know your thoughts. I appreciate all the help in advance.