EnricoMi opened a new pull request, #39717:
URL: https://github.com/apache/spark/pull/39717

   ### What changes were proposed in this pull request?
   Make `FlatMapCoGroupsInPandas` (used by PySpark) report its required child 
distribution as `HashClusteredDistribution`, rather than 
`ClusteredDistribution`. That is the same distribution as reported by `CoGroup` 
(used by Scala).
   
   ### Why are the changes needed?
   This allows the `EnsureRequirements` rule to correctly recognize that `FlatMapCoGroupsInPandas` requiring `HashClusteredDistribution(id, day)` is not compatible with `HashPartitioning(day, id)`, whereas `ClusteredDistribution(id, day)` is compatible with `HashPartitioning(day, id)`.
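   The difference can be sketched with a simplified model of the compatibility check (illustrative only; the function names mirror Spark's classes but this is not Spark's actual code):

   ```scala
   // HashClusteredDistribution (pre-3.3): satisfied only when the partitioning
   // expressions equal the clustering keys exactly, order included.
   def satisfiesHashClustered(partitioning: Seq[String], clustering: Seq[String]): Boolean =
     partitioning == clustering

   // ClusteredDistribution: satisfied whenever every partitioning expression
   // appears somewhere among the clustering keys -- order is ignored.
   def satisfiesClustered(partitioning: Seq[String], clustering: Seq[String]): Boolean =
     partitioning.nonEmpty && partitioning.forall(clustering.contains)

   // Required child distribution keys: (id, day); existing partitioning: (day, id)
   val reordered = Seq("day", "id")
   val required  = Seq("id", "day")

   satisfiesClustered(reordered, required)      // true  -> exchange skipped (the bug)
   satisfiesHashClustered(reordered, required)  // false -> exchange inserted (the fix)
   ```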
   
   The following example returns an incorrect result in Spark 3.0, 3.1, and 3.2.
   
   ```Scala
   import org.apache.spark.sql.expressions.Window
   import org.apache.spark.sql.functions.{col, lit, sum}
   
   val ids = 1000
   val days = 1000
   val parts = 10
   
   val id_df = spark.range(ids)
   val day_df = spark.range(days).withColumnRenamed("id", "day")
   val id_day_df = id_df.join(day_df)
   // these redundant aliases are needed to workaround bug SPARK-42132
   val left_df = id_day_df.select($"id".as("id"), $"day".as("day"), 
lit("left").as("side")).repartition(parts).cache()
   val right_df = id_day_df.select($"id".as("id"), $"day".as("day"), lit("right").as("side")).repartition(parts).cache()
   
   // note the column order is different from the groupBy("id", "day") column order below
   val window = Window.partitionBy("day", "id")
   
   case class Key(id: BigInt, day: BigInt)
   case class Value(id: BigInt, day: BigInt, side: String)
   case class Sum(id: BigInt, day: BigInt, side: String, day_sum: BigInt)
   
   val left_grouped_df = left_df.groupBy("id", "day").as[Key, Value]
   val right_grouped_df = right_df.withColumn("day_sum", 
sum(col("day")).over(window)).groupBy("id", "day").as[Key, Sum]
   
   val df = left_grouped_df.cogroup(right_grouped_df)((key: Key, left: 
Iterator[Value], right: Iterator[Sum]) => left)
   
   df.explain()
   df.show(5)
   ```
   
   Output was:
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], 
cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L), 
[id#64L, day#65L, lefts#66, rights#67]
      :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, 
[plan_id=117]
      :     +- ...
      +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
         +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
            +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) 
AS day_sum#54L], [day#30L, id#29L]
               +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], 
false, 0
                  +- Exchange hashpartitioning(day#30L, id#29L, 200), 
ENSURE_REQUIREMENTS, [plan_id=112]
                     +- ...
   
   
   +---+---+-----+------+
   | id|day|lefts|rights|
   +---+---+-----+------+
   |  0|  3|    0|     1|
   |  0|  4|    0|     1|
   |  0| 13|    1|     0|
   |  0| 27|    0|     1|
   |  0| 31|    0|     1|
   +---+---+-----+------+
   only showing top 5 rows
   ```
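   Why the rows never meet: hash partitioning is order-sensitive, so `hashpartitioning(id, day)` and `hashpartitioning(day, id)` can route the same logical key to different partitions. A toy sketch (Spark actually uses Murmur3; this stand-in hash only illustrates the order sensitivity):

   ```scala
   // A simple order-sensitive hash, folded left over the key values.
   def hashSeq(values: Seq[Long], numPartitions: Int): Int = {
     val h = values.foldLeft(42L)((acc, v) => acc * 31 + v)
     math.floorMod(h, numPartitions).toInt
   }

   val (id, day) = (0L, 13L)
   val leftPart  = hashSeq(Seq(id, day), 200)  // left:  Exchange hashpartitioning(id, day, 200)
   val rightPart = hashSeq(Seq(day, id), 200)  // right: Exchange hashpartitioning(day, id, 200)
   // leftPart != rightPart, so the left and right rows for (id=0, day=13) land
   // in different tasks, and cogroup sees each side without its counterpart.
   ```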
   
   Output now is:
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], 
cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L)#63, 
[id#64L, day#65L, lefts#66, rights#67]
      :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, 
[plan_id=117]
      :     +- ...
      +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
        +- Exchange hashpartitioning(id#29L, day#30L, 200), 
ENSURE_REQUIREMENTS, [plan_id=118]
            +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
               +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) 
AS day_sum#54L], [day#30L, id#29L]
                  +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], 
false, 0
                     +- Exchange hashpartitioning(day#30L, id#29L, 200), 
ENSURE_REQUIREMENTS, [plan_id=112]
                        +- ...
   
   +---+---+-----+------+
   | id|day|lefts|rights|
   +---+---+-----+------+
   |  0| 13|    1|     1|
   |  0| 63|    1|     1|
   |  0| 89|    1|     1|
   |  0| 95|    1|     1|
   |  0| 96|    1|     1|
   +---+---+-----+------+
   only showing top 5 rows
   ```
   
   Spark 3.3 [reworked](https://github.com/apache/spark/pull/32875/files#diff-e938569a4ca4eba8f7e10fe473d4f9c306ea253df151405bcaba880a601f075fR75-R76) `HashClusteredDistribution` (#32875), so it is not sensitive to this issue when using `ClusteredDistribution`.
   
   ### Does this PR introduce _any_ user-facing change?
   Yes, it fixes a correctness issue.
   
   ### How was this patch tested?
   A unit test in `EnsureRequirementsSuite`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

