Hi,
I am using PySpark for some projects, and one of the things we are doing is
finding the tables/columns used by Spark by inspecting the execution plan.
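
For context, this is roughly the kind of extraction we do from the plan text
(the regex and the stdout capture are simplified assumptions for illustration,
not our production code):

    import io
    import re
    from contextlib import redirect_stdout

    def extract_column_refs(plan_text):
        # Attribute references appear as <name>#<exprId>, e.g. id#0L or col1#1L.
        # Exchange ids like [id=#53] are not matched because '=' precedes the '#'.
        return set(re.findall(r'\b\w+#\d+L?\b', plan_text))

    # df.explain() prints to stdout in PySpark, so capture it first, e.g. for
    # the df4 built in the session below:
    # buf = io.StringIO()
    # with redirect_stdout(buf):
    #     df4.explain()
    # print(extract_column_refs(buf.getvalue()))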

When we upgraded to Spark 3.2, the physical plan turned out to be different
from previous versions, mainly when we are doing joins.
Below is a reproducible example (you can run the same code in versions 2.3 to
3.1 to see the difference).

My original DataFrames have the columns id#0 and id#4, but after doing the
joins we are seeing new columns id#34 and id#19 which are not created from
the original DataFrames I was working with.
In previous versions of Spark, this used a ReusedExchange step instead
(shown in the Spark 3.1.1 plan below).
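
To trace where the new ids first show up, I have also been dumping the
analyzed and optimized logical plans. This goes through the private
_jdf/py4j handle rather than a public API, so treat it as an unofficial
sketch:

    # Assumes the df4 built in the session below; _jdf is a private attribute.
    qe = df4._jdf.queryExecution()
    print(qe.analyzed().toString())       # plan right after analysis
    print(qe.optimizedPlan().toString())  # plan after the optimizer runs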

I am trying to understand whether this is expected in Spark 3.2, where the
execution plan seems to create a new data source that does not originate
from the df1 and df2 I provided.
NOTE: The same thing happens even if I read from parquet files.
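
For reference, this is roughly how I checked the parquet case (the paths are
made up for the example, and it reuses the df1/df2 and join structure from
the session below):

    df1.write.mode('overwrite').parquet('/tmp/df1_parquet')
    df2.write.mode('overwrite').parquet('/tmp/df2_parquet')
    p1 = spark.read.parquet('/tmp/df1_parquet')
    p2 = spark.read.parquet('/tmp/df2_parquet')
    p3 = p1.join(p2, p1['id'] == p2['id']).drop(p2['id'])
    p4 = p2.join(p3, p1['id'] == p2['id'])
    p4.explain()  # same behaviour: new attribute ids instead of a ReusedExchange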

In Spark 3.2:
In [1]: import pyspark
   ...: spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [2]: df1 = spark.createDataFrame([[1, 10], [2, 20]], ['id', 'col1'])
   ...: df2 = spark.createDataFrame([[1, 11], [2, 22], [2, 222]], ['id', 'col2'])
   ...: df1.explain()
   ...: df2.explain()
== Physical Plan ==
*(1) Scan ExistingRDD[id#0L,col1#1L]

== Physical Plan ==
*(1) Scan ExistingRDD[id#4L,col2#5L]

In [3]: df3 = df1.join(df2, df1['id'] == df2['id']).drop(df2['id'])
   ...: df4 = df2.join(df3, df1['id'] == df2['id'])
   ...: df4.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#4L], [id#0L], Inner
   :- Sort [id#4L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#53]
   :     +- Filter isnotnull(id#4L)
   :        +- Scan ExistingRDD[id#4L,col2#5L]
   +- Project [id#0L, col1#1L, col2#20L]
      +- SortMergeJoin [id#0L], [id#19L], Inner
         :- Sort [id#0L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#45]
         :     +- Filter isnotnull(id#0L)
         :        +- Scan ExistingRDD[id#0L,col1#1L]
         +- Sort [id#19L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#19L, 200), ENSURE_REQUIREMENTS, [id=#46]
               +- Filter isnotnull(id#19L)
                  +- Scan ExistingRDD[id#19L,col2#20L]

In [4]: df1.createOrReplaceTempView('df1')
   ...: df2.createOrReplaceTempView('df2')
   ...: df3 = spark.sql("""
   ...:     SELECT df1.id, df1.col1, df2.col2
   ...:     FROM df1 JOIN df2 ON df1.id = df2.id
   ...: """)
   ...: df3.createOrReplaceTempView('df3')
   ...: df4 = spark.sql("""
   ...:     SELECT df2.*, df3.*
   ...:     FROM df2 JOIN df3 ON df2.id = df3.id
   ...: """)
   ...: df4.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#4L], [id#0L], Inner
   :- Sort [id#4L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#110]
   :     +- Filter isnotnull(id#4L)
   :        +- Scan ExistingRDD[id#4L,col2#5L]
   +- Project [id#0L, col1#1L, col2#35L]
      +- SortMergeJoin [id#0L], [id#34L], Inner
         :- Sort [id#0L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#102]
         :     +- Filter isnotnull(id#0L)
         :        +- Scan ExistingRDD[id#0L,col1#1L]
         +- Sort [id#34L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#34L, 200), ENSURE_REQUIREMENTS, [id=#103]
               +- Filter isnotnull(id#34L)
                  +- Scan ExistingRDD[id#34L,col2#35L]


Doing the same in Spark 3.1.1, the plan is:

*(8) SortMergeJoin [id#4L], [id#0L], Inner
:- *(2) Sort [id#4L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#56]
:     +- *(1) Filter isnotnull(id#4L)
:        +- *(1) Scan ExistingRDD[id#4L,col2#5L]
+- *(7) Project [id#0L, col1#1L, col2#20L]
   +- *(7) SortMergeJoin [id#0L], [id#19L], Inner
      :- *(4) Sort [id#0L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#62]
      :     +- *(3) Filter isnotnull(id#0L)
      :        +- *(3) Scan ExistingRDD[id#0L,col1#1L]
      +- *(6) Sort [id#19L ASC NULLS FIRST], false, 0
         +- ReusedExchange [id#19L, col2#20L], Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#56]
