Lior Regev created SPARK-47037:
----------------------------------

             Summary: JoinWith on non-inner joins adds an unnecessary shuffle
                 Key: SPARK-47037
                 URL: https://issues.apache.org/jira/browse/SPARK-47037
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.5.0
            Reporter: Lior Regev
Assume the following code:
{code:java}
final case class AAA(i: Int, s: String)

val ds1 = spark.createDataset(Seq(
  AAA(1, "s")
)).repartition($"i")

val ds2 = spark.createDataset(Seq(
  AAA(2, "sd")
)).repartition($"i")

val value = ds1.as("a")
  .joinWith(ds2.as("b"), $"a.i" === $"b.i", "left")

val plan = value.queryExecution.executedPlan
println(plan)
{code}
This code produces the following plan:
{code:java}
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [_1#30.i], [_2#31.i], LeftOuter
   :- Sort [_1#30.i ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(_1#30.i, 200), ENSURE_REQUIREMENTS, [id=#29]
   :     +- Project [named_struct(i, i#2, s, s#3) AS _1#30]
   :        +- Exchange hashpartitioning(i#2, 200), REPARTITION_BY_COL, [id=#21]
   :           +- LocalTableScan [i#2, s#3]
   +- Sort [_2#31.i ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(_2#31.i, 200), ENSURE_REQUIREMENTS, [id=#30]
         +- Project [named_struct(i, i#12, s, s#13) AS _2#31]
            +- Exchange hashpartitioning(i#12, 200), REPARTITION_BY_COL, [id=#24]
               +- LocalTableScan [i#12, s#13]
{code}
Note how the plan has two `Exchange` steps on each side, even though both datasets were already partitioned on the join key by `repartition($"i")`. The second, ENSURE_REQUIREMENTS exchange is an unnecessary shuffle and should be skipped.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
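For comparison, the same query with an inner join can be inspected the same way in a Spark shell (a sketch reusing the `spark` session, `AAA`, `ds1`, and `ds2` from the reproduction above; per the issue title the extra `ENSURE_REQUIREMENTS` exchange is expected only for non-inner join types, which this check would confirm or refute on a given Spark version):
{code:java}
// Sketch: same join condition, but with joinType = "inner".
// If only the non-inner code path adds the redundant exchange, this plan
// should show a single REPARTITION_BY_COL Exchange per side.
val inner = ds1.as("a")
  .joinWith(ds2.as("b"), $"a.i" === $"b.i", "inner")
println(inner.queryExecution.executedPlan)
{code}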