Hi Mich, thank you for answering - much appreciated.

> This can cause uneven distribution of data, triggering a shuffle for the window function.

Could you elaborate on the mechanism that can "trigger a shuffle for the window function"? I'm not familiar with it (or are you referring to AQE?). In any case, there is no skew here - the keys are GUIDs of events. And even if the data were skewed, the shuffle would produce exactly the same layout the data already had: the DataFrame was already partitioned (and locally sorted) by the same key.
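For concreteness, the pattern boils down to something like this - a minimal sketch with hypothetical table/column names, not the actual code:

    import pyspark.sql.functions as F
    from pyspark.sql import Window

    # Hypothetical names throughout; the real code is not shown here.
    big = spark.table("events_bucketed")    # large table, bucketed (and sorted) by key_col
    small = spark.table("small_table").repartition(80000, "key_col")  # the REPARTITION_BY_NUM side

    # Full outer join on the bucket column. A USING-style join, so the output
    # key is coalesce(left.key_col, right.key_col), as seen in the plan below.
    joined = big.join(small, on="key_col", how="full_outer")

    # Window over the same column. Since the data is already hash-partitioned
    # by key_col, no further shuffle should be needed here - yet one appears.
    w = Window.partitionBy("key_col").orderBy(F.col("event_time").asc_nulls_first())
    top1 = joined.withColumn("rn", F.row_number().over(w)).filter(F.col("rn") == 1)
    top1.explain()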
Thanks again,
Shay

________________________________
From: Mich Talebzadeh <mich.talebza...@gmail.com>
Sent: Thursday, August 15, 2024 17:21
To: Shay Elbaz <sel...@paypal.com.invalid>
Cc: user@spark.apache.org <user@spark.apache.org>
Subject: Re: Redundant(?) shuffle after join

The actual code is not given, so I am going with the plan output and your explanation:

* You are joining a large, bucketed table with a smaller DataFrame on a common column (key_col).
* The subsequent window function also uses key_col.
* However, a shuffle occurs for the window function even though the data is already partitioned by key_col.

Potential data skew: though the table is bucketed, there might be significant data skew within the buckets. This can cause an uneven distribution of data, triggering a shuffle for the window function. You can get a rough picture of the per-key distribution like this:

    import pyspark.sql.functions as F

    df = spark.table("your_bucketed_table")
    # Count rows per key and inspect the heaviest keys to check for skew
    (df.groupBy("key_col")
       .count()
       .orderBy(F.desc("count"))
       .show(20))

HTH

Mich Talebzadeh,
Architect | Data Engineer | Data Science | Financial Crime
PhD, Imperial College London
London, United Kingdom

view my Linkedin profile: https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/
https://en.everybodywiki.com/Mich_Talebzadeh

Disclaimer: The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, "one test result is worth one-thousand expert opinions" (Wernher von Braun, https://en.wikipedia.org/wiki/Wernher_von_Braun).


On Thu, 15 Aug 2024 at 14:30, Shay Elbaz <sel...@paypal.com.invalid> wrote:

Hi Spark community,

Please review the cleansed plan below. It is the result of joining a large, bucketed table with a smaller DF, and then applying a window function. Both the join and the window function use the same column, which is also the bucket column of the table ("key_col" in the plan). The join requires no shuffle on the bucketed side, as expected, but then there is a shuffle for the window function, even though the data is already partitioned accordingly. Can anyone explain why?

Using Spark 3.5.0.

Thanks,
Shay

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project ...
   +- Filter (rn#5441 = 1)
      +- Window [row_number() windowspecdefinition(key_col#5394, _w0#5442 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#5441], [key_col#5394], [_w0#5442 ASC NULLS FIRST]
         +- WindowGroupLimit [key_col#5394], [_w0#5442 ASC NULLS FIRST], row_number(), 1, Final
            +- Sort [key_col#5394 ASC NULLS FIRST, _w0#5442 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(key_col#5394, 80000), ENSURE_REQUIREMENTS, [plan_id=592]
                  +- WindowGroupLimit [key_col#5394], [_w0#5442 ASC NULLS FIRST], row_number(), 1, Partial
                     +- Sort [key_col#5394 ASC NULLS FIRST, _w0#5442 ASC NULLS FIRST], false, 0
                        +- Project ... (key_col stays the same)
                           +- Project [coalesce(key_col#0, key_col#5009) AS key_col#5394, CASE WHEN ...
                              +- SortMergeJoin [key_col#0], [key_col#5009], FullOuter
                                 :- Sort [key_col#0 ASC NULLS FIRST], false, 0
                                 :  +- Project key_
                                 :     +- FileScan parquet bucketed table ...
                                 +- Sort [key_col#5009 ASC NULLS FIRST], false, 0
                                    +- Exchange hashpartitioning(key_col#5009, 80000), REPARTITION_BY_NUM, [plan_id=572]
                                       +- Project
                                          +- Filter
                                             +- Scan small table ...
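(For completeness, this is roughly how a plan like the above can be rendered for inspection - a sketch using standard Spark SQL conf names, with a placeholder DataFrame name:)

    # Settings that shape the plan above (all standard Spark SQL confs)
    spark.conf.set("spark.sql.shuffle.partitions", "80000")        # hashpartitioning(..., 80000)
    spark.conf.set("spark.sql.adaptive.enabled", "true")           # AdaptiveSparkPlan isFinalPlan=false
    spark.conf.set("spark.sql.sources.bucketing.enabled", "true")  # shuffle-free scan of the bucketed side

    # Render the full plan; 'joined_with_window' stands for the final DataFrame
    joined_with_window.explain(mode="formatted")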