Hi Shay,

Let me address the points you raised using the STAR methodology. I apologize if it sounds a bit formal, but I find it effective for clarity.
*Situation:* You encountered an issue while working with a Spark DataFrame where a shuffle was unexpectedly triggered when a window function was applied. This happened even though the data was already partitioned and sorted by the key column (`key_col`). Specifically, the issue arose after joining a large, bucketed table with a smaller DataFrame on the same column used in the window function.

*Task:* Your objective, as evident from your question, was to understand why Spark introduced a shuffle for the window function despite the data being pre-partitioned and sorted. In short, we needed to identify the underlying cause of this behavior and explore ways to prevent the unnecessary shuffle.

*Action:* To investigate the issue and offer a reasonable explanation, I considered several possibilities:

1. Partitioning requirements: Spark may have introduced the shuffle to satisfy the window function's required distribution. Although the data was already partitioned by `key_col`, Spark will still add an exchange if it does not recognize the incoming partitioning as matching what the window function needs (that is what the ENSURE_REQUIREMENTS exchange in your plan indicates).
2. Locality and ordering: Spark might have required a shuffle to enforce the sort order it needs within each partition. Even though the data was locally sorted within each bucket, Spark can still introduce a shuffle to guarantee the window function operates correctly across all partitions.
3. Adaptive Query Execution (AQE): You asked whether AQE might have introduced the shuffle to optimize performance based on runtime statistics. This is indeed a possibility, as AQE can adjust the execution plan dynamically.
4. Partitioning mismatch between the join and the window: There may be a mismatch in how the partitioning produced by the join is recognized by the window function. One thing visible in your plan is that the window partitions on key_col#5394, which is coalesce(key_col#0, key_col#5009) produced by the full outer join, so Spark may not treat the join output as partitioned on that derived column even though it is logically the same `key_col`.

*Recommendations:* To narrow down these potential causes, I recommend the following steps (I have put a couple of rough, untested sketches at the very end of this message, below the quoted plan):

- Check Spark's understanding of the partitioning: inspect the DataFrame's partitioning after the join operation to ensure it aligns with expectations.
- Disable AQE temporarily: turn off AQE to determine whether it is influencing the shuffle.
- Force specific partitioning: repartition the DataFrame explicitly by `key_col` before applying the window function to see if this prevents the shuffle.

HTH

Mich Talebzadeh,
Architect | Data Engineer | Data Science | Financial Crime
PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College London <https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom


view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


https://en.everybodywiki.com/Mich_Talebzadeh


*Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, "one test result is worth one-thousand expert opinions" (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).


On Fri, 16 Aug 2024 at 14:55, Shay Elbaz <sel...@paypal.com> wrote:

> Hi Mich, thank you for answering - much appreciated.
>
> This can cause uneven distribution of data, triggering a shuffle for the
> window function.
>
> Could you elaborate on the mechanism that can "trigger a shuffle for the
> window function"? I'm not familiar with it. (or are you referring to AQE?)
> In any case, there is no skew - the keys are GUIDs of events. Even if the
> data was skewed, the shuffle would end up exactly the same way as before
> the shuffle - the DF was already partitioned (and locally sorted) by the
> same key.
>
> Thanks again,
>
> Shay
>
>
>
> ------------------------------
> *From:* Mich Talebzadeh <mich.talebza...@gmail.com>
> *Sent:* Thursday, August 15, 2024 17:21
> *To:* Shay Elbaz <sel...@paypal.com.invalid>
> *Cc:* user@spark.apache.org <user@spark.apache.org>
> *Subject:* Re: Redundant(?) shuffle after join
>
> This message contains hyperlinks, take precaution before opening these
> links.
> The actual code is not given, so I am going with the plan output and your
> explanation
>
> - You're joining a large, bucketed table with a smaller DataFrame on a
>   common column (key_col).
> - The subsequent window function also uses key_col
> - However, a shuffle occurs for the window function even though the
>   data is already partitioned by key_col
>
> Potential data skew: Though
> the table is bucketed, there might be significant data skew within the
> buckets. This can cause uneven distribution of data, triggering a shuffle
> for the window function.
>
> import pyspark.sql.functions as F
> df = spark.table("your_bucketed_table")
> df = df.withColumn("approx_count", F.approx_count_distinct("key_col"))
> df.groupBy("key_col").agg(F.avg("approx_count").alias("avg_count")).show()
>
>
> HTH
> Mich Talebzadeh,
>
> Architect | Data Engineer | Data Science | Financial Crime
> PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
> London <https://en.wikipedia.org/wiki/Imperial_College_London>
> London, United Kingdom
>
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
> Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>
>
> On Thu, 15 Aug 2024 at 14:30, Shay Elbaz <sel...@paypal.com.invalid>
> wrote:
>
> Hi Spark community,
>
> Please review the cleansed plan below. It is the result of joining a
> large, bucketed table with a smaller DF, and then applying a window
> function. Both the join and the window function use the same column, which
> is also the bucket column of the table ("key_col" in the plan).
> The join results in a map-side-join as expected, but then there is a
> shuffle for the window function, even though the data is already
> partitioned accordingly.
>
> Can anyone explain why?
>
> Using Spark 3.5.0
>
>
> Thanks,
> Shay
>
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project ...
>    +- Filter (rn#5441 = 1)
>       +- Window [row_number() windowspecdefinition(key_col#5394, _w0#5442 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#5441], [key_col#5394], [_w0#5442 ASC NULLS FIRST]
>          +- WindowGroupLimit [key_col#5394], [_w0#5442 ASC NULLS FIRST], row_number(), 1, Final
>             +- Sort [key_col#5394 ASC NULLS FIRST, _w0#5442 ASC NULLS FIRST], false, 0
>                +- Exchange hashpartitioning(key_col#5394, 80000), ENSURE_REQUIREMENTS, [plan_id=592]
>                   +- WindowGroupLimit [key_col#5394], [_w0#5442 ASC NULLS FIRST], row_number(), 1, Partial
>                      +- Sort [key_col#5394 ASC NULLS FIRST, _w0#5442 ASC NULLS FIRST], false, 0
>                         +- Project ... (key_col stays the same)
>                            +- Project [coalesce(key_col#0, key_col#5009) AS key_col#5394, CASE WHEN ...
>                               +- SortMergeJoin [key_col#0], [key_col#5009], FullOuter
>                                  :- Sort [key_col#0 ASC NULLS FIRST], false, 0
>                                  :  +- Project key_
>                                  :     +- FileScan parquet bucketed table ...
>                                  +- Sort [key_col#5009 ASC NULLS FIRST], false, 0
>                                     +- Exchange hashpartitioning(key_col#5009, 80000), REPARTITION_BY_NUM, [plan_id=572]
>                                        +- Project
>                                           +- Filter
>                                              +- Scan small table...
>
>
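
P.S. As promised in the Recommendations above, here is a minimal, untested sketch for the first two checks: inspect the partitioning Spark reports after the join with AQE switched off. The table names ("your_bucketed_table", "small_table") are placeholders, not your actual objects.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Temporarily disable AQE so the plan you read is the static one
spark.conf.set("spark.sql.adaptive.enabled", "false")

big_df = spark.table("your_bucketed_table")   # bucketed by key_col
small_df = spark.table("small_table")         # the smaller side of the join

joined = big_df.join(small_df, "key_col", "full_outer")

# Look at the exchanges in the join plan first; the next sketch adds the
# window on top so you can see where the ENSURE_REQUIREMENTS exchange appears
joined.explain(mode="formatted")

# Quick look at how many partitions the join output actually has
print("partitions after join:", joined.rdd.getNumPartitions())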
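
And a sketch of the third suggestion: force the partitioning yourself immediately before the window function and compare the two plans. The ordering column ("ts_col") and the row_number()/filter pattern are my assumptions based on the plan you posted, so adjust them to your actual code.

from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.getOrCreate()

joined = (spark.table("your_bucketed_table")   # placeholder names again
          .join(spark.table("small_table"), "key_col", "full_outer"))

w = Window.partitionBy("key_col").orderBy(F.col("ts_col").asc())

# Baseline: Spark inserts Exchange hashpartitioning(key_col, ...),
# ENSURE_REQUIREMENTS, before the Window if it does not trust the
# incoming partitioning
baseline = joined.withColumn("rn", F.row_number().over(w)).filter("rn = 1")
baseline.explain(mode="formatted")

# Forced: repartition explicitly by the same key first (you can also pass a
# partition count, e.g. repartition(80000, "key_col"), to match your setup)
forced = (joined
          .repartition("key_col")
          .withColumn("rn", F.row_number().over(w))
          .filter("rn = 1"))
forced.explain(mode="formatted")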