Lior Regev created SPARK-47037:
----------------------------------

             Summary: JoinWith on non-inner joins adds an unnecessary shuffle
                 Key: SPARK-47037
                 URL: https://issues.apache.org/jira/browse/SPARK-47037
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.5.0
            Reporter: Lior Regev


Assume the following code:
{code:java}
final case class AAA(i: Int, s: String)

val ds1 = spark.createDataset(Seq(
  AAA(1, "s")
)).repartition($"i")

val ds2 = spark.createDataset(Seq(
  AAA(2, "sd")
)).repartition($"i")

val value = ds1.as("a")
  .joinWith(ds2.as("b"), $"a.i" === $"b.i", "left")

val plan = value.queryExecution.executedPlan
println(plan)
{code}
This code creates the following plan:
{code:java}
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [_1#30.i], [_2#31.i], LeftOuter
   :- Sort [_1#30.i ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(_1#30.i, 200), ENSURE_REQUIREMENTS, [id=#29]
   :     +- Project [named_struct(i, i#2, s, s#3) AS _1#30]
   :        +- Exchange hashpartitioning(i#2, 200), REPARTITION_BY_COL, [id=#21]
   :           +- LocalTableScan [i#2, s#3]
   +- Sort [_2#31.i ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(_2#31.i, 200), ENSURE_REQUIREMENTS, [id=#30]
         +- Project [named_struct(i, i#12, s, s#13) AS _2#31]
            +- Exchange hashpartitioning(i#12, 200), REPARTITION_BY_COL, [id=#24]
               +- LocalTableScan [i#12, s#13]
{code}
Note how the plan has two {{Exchange}} steps on each side, even though the datasets were already partitioned on the join column.

The second ({{ENSURE_REQUIREMENTS}}) exchange is, of course, an unnecessary shuffle and should be skipped.
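
For comparison, a minimal sketch (reusing {{ds1}} and {{ds2}} from the snippet above; the variable names and the resulting plans are not verified here) that prints the plans for an inner {{joinWith}} and for a plain DataFrame {{join}} on the same condition. Per the summary, the extra {{ENSURE_REQUIREMENTS}} exchanges would be expected only in the non-inner {{joinWith}} case:
{code:java}
// Same condition as above, but with an inner joinWith -- per the issue summary,
// the extra exchanges on top of the repartition are specific to non-inner joins.
val innerValue = ds1.as("a")
  .joinWith(ds2.as("b"), $"a.i" === $"b.i", "inner")
println(innerValue.queryExecution.executedPlan)

// A plain DataFrame left join on the same keys, as a second point of comparison.
val dfValue = ds1.as("a")
  .join(ds2.as("b"), $"a.i" === $"b.i", "left")
println(dfValue.queryExecution.executedPlan)
{code}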


