Hi, I am using PySpark for a few projects, and one of the things we do is work out which tables/columns Spark is actually using by inspecting the execution plan.
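For context, this is roughly how we pull column references out of a plan today (a minimal sketch, not our exact code; the `plan_columns` name and the `#N` regex are just one way to do it, assuming `explain()` output is captured from stdout):

```python
import io
import re
from contextlib import redirect_stdout

def plan_columns(df):
    """Collect column references such as 'id#0L' from a DataFrame's physical plan."""
    buf = io.StringIO()
    with redirect_stdout(buf):
        df.explain()  # explain() prints the plan to stdout, so we capture it
    # Spark prints attribute references as <name>#<exprId>, often with a type suffix like L
    return set(re.findall(r"\w+#\d+L?", buf.getvalue()))
```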
After upgrading to Spark 3.2, the physical plan looks different from previous versions, mainly when we do joins. Below is a reproducible example (you can run the same code on versions 2.3 through 3.1 to see the difference).

My original DataFrames have the columns id#0 and id#4, but after the joins the plan shows new columns id#19 and id#34 that do not come from the original DataFrames I was working with. In previous versions of Spark, this part of the plan used a ReusedExchange step (shown below).

I am trying to understand whether this is expected in Spark 3.2, since the execution plan appears to introduce a new data source that does not originate from the df1 and df2 I provided.

NOTE: the same thing happens even if I read the data from Parquet files.

In Spark 3.2:

```
In [1]: import pyspark
   ...: spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [2]: df1 = spark.createDataFrame([[1, 10], [2, 20]], ['id', 'col1'])
   ...: df2 = spark.createDataFrame([[1, 11], [2, 22], [2, 222]], ['id', 'col2'])
   ...: df1.explain()
   ...: df2.explain()
== Physical Plan ==
*(1) Scan ExistingRDD[id#0L,col1#1L]

== Physical Plan ==
*(1) Scan ExistingRDD[id#4L,col2#5L]

In [3]: df3 = df1.join(df2, df1['id'] == df2['id']).drop(df2['id'])
   ...: df4 = df2.join(df3, df1['id'] == df2['id'])
   ...: df4.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#4L], [id#0L], Inner
   :- Sort [id#4L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#53]
   :     +- Filter isnotnull(id#4L)
   :        +- Scan ExistingRDD[id#4L,col2#5L]
   +- Project [id#0L, col1#1L, col2#20L]
      +- SortMergeJoin [id#0L], [id#19L], Inner
         :- Sort [id#0L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#45]
         :     +- Filter isnotnull(id#0L)
         :        +- Scan ExistingRDD[id#0L,col1#1L]
         +- Sort [id#19L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#19L, 200), ENSURE_REQUIREMENTS, [id=#46]
               +- Filter isnotnull(id#19L)
                  +- Scan ExistingRDD[id#19L,col2#20L]

In [4]: df1.createOrReplaceTempView('df1')
   ...: df2.createOrReplaceTempView('df2')
   ...: df3 = spark.sql("""
   ...:     SELECT df1.id, df1.col1, df2.col2
   ...:     FROM df1 JOIN df2 ON df1.id = df2.id
   ...: """)
   ...: df3.createOrReplaceTempView('df3')
   ...: df4 = spark.sql("""
   ...:     SELECT df2.*, df3.*
   ...:     FROM df2 JOIN df3 ON df2.id = df3.id
   ...: """)
   ...: df4.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#4L], [id#0L], Inner
   :- Sort [id#4L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#110]
   :     +- Filter isnotnull(id#4L)
   :        +- Scan ExistingRDD[id#4L,col2#5L]
   +- Project [id#0L, col1#1L, col2#35L]
      +- SortMergeJoin [id#0L], [id#34L], Inner
         :- Sort [id#0L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#102]
         :     +- Filter isnotnull(id#0L)
         :        +- Scan ExistingRDD[id#0L,col1#1L]
         +- Sort [id#34L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#34L, 200), ENSURE_REQUIREMENTS, [id=#103]
               +- Filter isnotnull(id#34L)
                  +- Scan ExistingRDD[id#34L,col2#35L]
```

Running the same thing on Spark 3.1.1, the plan is:

```
*(8) SortMergeJoin [id#4L], [id#0L], Inner
:- *(2) Sort [id#4L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#56]
:     +- *(1) Filter isnotnull(id#4L)
:        +- *(1) Scan ExistingRDD[id#4L,col2#5L]
+- *(7) Project [id#0L, col1#1L, col2#20L]
   +- *(7) SortMergeJoin [id#0L], [id#19L], Inner
      :- *(4) Sort [id#0L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#62]
      :     +- *(3) Filter isnotnull(id#0L)
      :        +- *(3) Scan ExistingRDD[id#0L,col1#1L]
      +- *(6) Sort [id#19L ASC NULLS FIRST], false, 0
         +- ReusedExchange [id#19L, col2#20L], Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#56]
```
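For what it's worth, this is the kind of check that surfaced the issue for us: on 3.2 the joined DataFrame's plan references expression IDs that never appear in the plans of df1 or df2. Again only a sketch, reusing the hypothetical `plan_columns` helper from above:

```python
# Assuming df1, df2 and df4 from the example above and the plan_columns sketch:
# find column references in the joined plan that do not map back to the source DataFrames.
source_cols = plan_columns(df1) | plan_columns(df2)
unknown_cols = plan_columns(df4) - source_cols
print(unknown_cols)  # on Spark 3.2 this includes references such as id#19L and col2#20L
```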