EnricoMi opened a new pull request, #39717:
URL: https://github.com/apache/spark/pull/39717
### What changes were proposed in this pull request?
Make `FlatMapCoGroupsInPandas` (used by PySpark) report its required child
distribution as `HashClusteredDistribution`, rather than
`ClusteredDistribution`. That is the same distribution as reported by `CoGroup`
(used by Scala).
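The distinction between the two distributions can be modeled roughly as follows. This is a simplified Python sketch, not Spark's actual classes: `ClusteredDistribution` is order-insensitive about its clustering keys, while `HashClusteredDistribution` demands the exact expressions in the exact order.

```python
# Simplified model of the two distribution checks (not Spark's real classes):
# an existing HashPartitioning is compared against the distribution that an
# operator reports as its required child distribution.

def satisfies_clustered(partitioning_cols, required_cols):
    """ClusteredDistribution: any hash partitioning over a subset of the
    clustering columns co-locates each key group, so order does not matter."""
    return set(partitioning_cols) <= set(required_cols)

def satisfies_hash_clustered(partitioning_cols, required_cols):
    """HashClusteredDistribution: both sides of a cogroup must hash the very
    same expressions in the very same order to get identical row routing."""
    return list(partitioning_cols) == list(required_cols)

# Existing partitioning from the window operator vs. the cogroup's keys:
existing = ("day", "id")
required = ("id", "day")
print(satisfies_clustered(existing, required))       # True  -> no shuffle inserted
print(satisfies_hash_clustered(existing, required))  # False -> shuffle inserted
```

Under this model, reporting `HashClusteredDistribution` forces `EnsureRequirements` to insert the missing shuffle on the right side of the cogroup.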
### Why are the changes needed?
This allows the `EnsureRequirements` rule to correctly recognize that `FlatMapCoGroupsInPandas`, which requires `HashClusteredDistribution(id, day)`, is not compatible with `HashPartitioning(day, id)`, whereas `ClusteredDistribution(id, day)` *is* compatible with `HashPartitioning(day, id)`.
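To see why accepting the wrong column order corrupts the result, consider a toy model of hash partitioning (Spark really uses Murmur3 over the partitioning expressions; Python's tuple hash stands in here): hashing `(id, day)` and `(day, id)` generally route the same logical key to different partitions, so the two sides of the cogroup miss each other.

```python
# Toy model: a row is assigned to a partition by hashing its partitioning
# columns *in order*. Spark uses Murmur3; Python's tuple hash stands in.
NUM_PARTITIONS = 10

def partition_for(*cols):
    """Partition id for a row, given its partitioning column values in order."""
    return hash(cols) % NUM_PARTITIONS

# Keys whose partition changes when the column order flips from
# (id, day) to (day, id):
mismatches = [
    (i, d)
    for i in range(20)
    for d in range(20)
    if i != d and partition_for(i, d) != partition_for(d, i)
]
# Many keys land in different partitions depending on column order, so a
# cogroup that treats both partitionings as equivalent pairs an empty
# iterator on one side with the matching rows on the other.
print(f"{len(mismatches)} of 380 keys change partition when the order flips")
```

This mirrors the bug below: the left side is shuffled by `(id, day)` while the right side keeps the window's `(day, id)` partitioning, yet no shuffle is inserted between them.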
The following example returns an incorrect result in Spark 3.0, 3.1, and 3.2.
```Scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, sum}
import spark.implicits._  // for the $"..." column syntax (pre-imported in spark-shell)

val ids = 1000
val days = 1000
val parts = 10

val id_df = spark.range(ids)
val day_df = spark.range(days).withColumnRenamed("id", "day")
val id_day_df = id_df.join(day_df)
// these redundant aliases are needed to work around bug SPARK-42132
val left_df = id_day_df.select($"id".as("id"), $"day".as("day"), lit("left").as("side")).repartition(parts).cache()
val right_df = id_day_df.select($"id".as("id"), $"day".as("day"), lit("right").as("side")).repartition(parts).cache()

// note the column order is different to the groupBy("id", "day") column order below
val window = Window.partitionBy("day", "id")

case class Key(id: BigInt, day: BigInt)
case class Value(id: BigInt, day: BigInt, side: String)
case class Sum(id: BigInt, day: BigInt, side: String, day_sum: BigInt)

val left_grouped_df = left_df.groupBy("id", "day").as[Key, Value]
val right_grouped_df = right_df.withColumn("day_sum", sum(col("day")).over(window)).groupBy("id", "day").as[Key, Sum]
val df = left_grouped_df.cogroup(right_grouped_df)((key: Key, left: Iterator[Value], right: Iterator[Sum]) => left)

df.explain()
df.show(5)
```
Output before this change:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L), [id#64L, day#65L, lefts#66, rights#67]
   :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, [plan_id=117]
   :     +- ...
   +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
      +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
         +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS day_sum#54L], [day#30L, id#29L]
            +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(day#30L, id#29L, 200), ENSURE_REQUIREMENTS, [plan_id=112]
                  +- ...

+---+---+-----+------+
| id|day|lefts|rights|
+---+---+-----+------+
|  0|  3|    0|     1|
|  0|  4|    0|     1|
|  0| 13|    1|     0|
|  0| 27|    0|     1|
|  0| 31|    0|     1|
+---+---+-----+------+
only showing top 5 rows
```
Output with this change:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L)#63, [id#64L, day#65L, lefts#66, rights#67]
   :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, [plan_id=117]
   :     +- ...
   +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#29L, day#30L, 200), ENSURE_REQUIREMENTS, [plan_id=118]
         +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
            +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS day_sum#54L], [day#30L, id#29L]
               +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(day#30L, id#29L, 200), ENSURE_REQUIREMENTS, [plan_id=112]
                     +- ...

+---+---+-----+------+
| id|day|lefts|rights|
+---+---+-----+------+
|  0| 13|    1|     1|
|  0| 63|    1|     1|
|  0| 89|    1|     1|
|  0| 95|    1|     1|
|  0| 96|    1|     1|
+---+---+-----+------+
only showing top 5 rows
```
Spark 3.3 [reworked](https://github.com/apache/spark/pull/32875/files#diff-e938569a4ca4eba8f7e10fe473d4f9c306ea253df151405bcaba880a601f075fR75-R76) `HashClusteredDistribution` and is not affected by this issue when using `ClusteredDistribution`: #32875
### Does this PR introduce _any_ user-facing change?
Yes, it fixes a correctness issue.
### How was this patch tested?
A unit test in `EnsureRequirementsSuite`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]