Hi Shay,

Let me address the points you raised using the STAR methodology. I
apologize if it sounds a bit formal, but I find it effective for clarity.

*Situation:*
You encountered an issue while working with a Spark DataFrame where a
shuffle was unexpectedly triggered during the application of a window
function. This happened even though the data was already partitioned and
sorted by the key column (`key_col`). Specifically, the issue arose after
joining a large, bucketed table with a smaller DataFrame on the same column
used in the window function.

*Task:*
Your objective, as evident from your question, was to understand why Spark
introduced a shuffle for the window function despite the data being
pre-partitioned and sorted. In summary, we needed to identify the
underlying cause of this behavior and explore possible solutions to prevent
the unnecessary shuffle.

*Action:*
To investigate the issue and provide a reasonable explanation, I considered
several possibilities:

1. Partitioning Requirements:
   I mentioned the possibility that Spark introduced the shuffle to satisfy
the window function's internal distribution requirement: the Window
operator needs its input hash-partitioned by the partition key and sorted
within each partition. Although the data was already partitioned by
`key_col`, Spark may still insert an Exchange (ENSURE_REQUIREMENTS in your
plan) if it cannot prove that the incoming distribution meets that
requirement.

2. Locality and Ordering:
   I considered that Spark might have required a shuffle to co-locate all
rows for a given key in a single partition and sort them within it. Even
though the data was locally sorted within each bucket, Spark could still
introduce a shuffle if it cannot guarantee that this co-location and
ordering survive the join, so that the window function operates correctly
across all partitions.

3. Adaptive Query Execution (AQE):
   You inquired whether AQE might have introduced the shuffle to optimize
performance based on runtime statistics. This is indeed a possibility, as
AQE can adjust the execution plan dynamically.

4. Compatibility and Partitioning Mismatch:
   There may be a mismatch in partitioning recognition between the join
operation and the window function. In your plan, the key used downstream
is `coalesce(key_col#0, key_col#5009)` produced by the full outer join,
i.e. a new expression rather than the original bucketed column, so the
optimizer may no longer recognise the data as hash-partitioned by
`key_col` and inserts a shuffle. The short sketch after this list shows
one way to reproduce and inspect this.
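
A minimal sketch to illustrate points 1 and 4 (the table, path and
ordering-column names are placeholders, not taken from your code):

import pyspark.sql.functions as F
from pyspark.sql import Window

big = spark.table("big_bucketed_tbl")        # assumed: bucketed/sorted by key_col
small = spark.read.parquet("/tmp/small_df")  # assumed smaller input

# A full outer join on the column name emits coalesce(left.key_col,
# right.key_col) as the output key, a new expression from the
# optimizer's point of view.
joined = big.join(small, on="key_col", how="full_outer")

# "event_ts" is a hypothetical ordering column
w = Window.partitionBy("key_col").orderBy("event_ts")
ranked = joined.withColumn("rn", F.row_number().over(w))

# Look for an extra "Exchange hashpartitioning(key_col, ...)" just above
# the Window node; it means Spark could not reuse the join's output
# distribution.
ranked.explain(mode="formatted")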

*Recommendations:*
To address these potential causes, I recommend the following steps:

- Check Spark's Understanding of Partitioning: Inspect the DataFrame’s
partitioning (e.g. via explain) after the join operation to ensure it
aligns with expectations.
- Disable AQE Temporarily: Turn off AQE to determine whether it is
influencing the shuffle.
- Force Specific Partitioning: Repartition the DataFrame explicitly by
`key_col` before applying the window function to see if this prevents the
extra shuffle before the Window node. A short sketch of all three steps
follows this list.
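
A hedged sketch of the three steps (variable names follow the sketch
above and are placeholders):

# 1) Inspect what Spark thinks the partitioning is after the join;
#    look for Exchange nodes and their reason (e.g. ENSURE_REQUIREMENTS)
joined.explain(mode="formatted")

# 2) Temporarily disable AQE to rule it out (re-enable afterwards)
spark.conf.set("spark.sql.adaptive.enabled", "false")

# 3) Force the partitioning you expect before the window function;
#    80000 matches the shuffle partition count visible in your plan
repartitioned = joined.repartition(80000, "key_col")
ranked2 = repartitioned.withColumn("rn", F.row_number().over(w))
ranked2.explain(mode="formatted")

Note that repartition() itself shuffles, so the test is whether a second
Exchange still appears before the Window node, not whether the plan is
shuffle-free.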

HTH


Mich Talebzadeh,

Architect | Data Engineer | Data Science | Financial Crime
PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
London <https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions" (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>).


On Fri, 16 Aug 2024 at 14:55, Shay Elbaz <sel...@paypal.com> wrote:

> Hi Mich, thank you for answering - much appreciated.
>
> This can cause uneven distribution of data, triggering a shuffle for the
> window function.
>
> Could you elaborate on the mechanism that can "trigger a shuffle for the
> window function"? I'm not familiar with it. (or are you referring to AQE?)
> In any case, there is no skew - the keys are GUIDs of events. Even if the
> data was skewed, the shuffle would end up exactly the same way as before
> the shuffle - the DF was already partitioned (and locally sorted) by the
> same key.
>
> Thanks again,
>
> Shay
>
>
>
> ------------------------------
> *From:* Mich Talebzadeh <mich.talebza...@gmail.com>
> *Sent:* Thursday, August 15, 2024 17:21
> *To:* Shay Elbaz <sel...@paypal.com.invalid>
> *Cc:* user@spark.apache.org <user@spark.apache.org>
> *Subject:* Re: Redundant(?) shuffle after join
>
> The actual code is not given, so I am going with the plan output and your
> explanation
>
>
>    - You're joining a large, bucketed table with a smaller DataFrame on a
>    common column (key_col).
>    - The subsequent window function also uses key_col
>    - However, a shuffle occurs for the window function even though the
>    data is already partitioned by key_col
>
>
> Potential data skew: though the table is bucketed, there might be
> significant data skew within the buckets. This can cause uneven
> distribution of data, triggering a shuffle for the window function.
>
> import pyspark.sql.functions as F
> df = spark.table("your_bucketed_table")
> # count rows per key to spot skew within the buckets
> df.groupBy("key_col").agg(F.count("*").alias("cnt")).orderBy(F.desc("cnt")).show()
>
>
> HTH
> Mich Talebzadeh,
>
> Architect | Data Engineer | Data Science | Financial Crime
> PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
> London <https://en.wikipedia.org/wiki/Imperial_College_London>
> London, United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
> Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>
>
> On Thu, 15 Aug 2024 at 14:30, Shay Elbaz <sel...@paypal.com.invalid>
> wrote:
>
> Hi Spark community,
>
> Please review the cleansed plan below. It is the result of joining a
> large, bucketed table with a smaller DF, and then applying a window
> function. Both the join and the window function use the same column, which
> is also the bucket column of the table ("key_col" in the plan).
> The join results in a map-side-join as expected, but then there is a
> shuffle for the window function, even though the data is already
> partitioned accordingly.
>
> Can anyone explain why?
>
> Using Spark 3.5.0
>
>
> Thanks,
> Shay
>
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project ...
>    +- Filter (rn#5441 = 1)
>       +- Window [row_number() windowspecdefinition(key_col#5394, _w0#5442 ASC 
> NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), 
> currentrow$())) AS rn#5441], [key_col#5394], [_w0#5442 ASC NULLS FIRST]
>          +- WindowGroupLimit [key_col#5394], [_w0#5442 ASC NULLS FIRST], 
> row_number(), 1, Final
>             +- Sort [key_col#5394 ASC NULLS FIRST, _w0#5442 ASC NULLS FIRST], 
> false, 0
>                +- Exchange hashpartitioning(key_col#5394, 80000), 
> ENSURE_REQUIREMENTS, [plan_id=592]
>                   +- WindowGroupLimit [key_col#5394], [_w0#5442 ASC NULLS 
> FIRST], row_number(), 1, Partial
>                      +- Sort [key_col#5394 ASC NULLS FIRST, _w0#5442 ASC 
> NULLS FIRST], false, 0
>                         +- Project ...  (key_col stays the same)
>                            +- Project [coalesce(key_col#0, key_col#5009) AS 
> key_col#5394, CASE WHEN ...
>                               +- SortMergeJoin [key_col#0], [key_col#5009], 
> FullOuter
>                                  :- Sort [key_col#0 ASC NULLS FIRST], false, 0
>                                  :  +- Project key_
>                                  :     +- FileScan parquet bucketed table ...
>                                  +- Sort [key_col#5009 ASC NULLS FIRST], 
> false, 0
>                                     +- Exchange 
> hashpartitioning(key_col#5009, 80000), REPARTITION_BY_NUM, [plan_id=572]
>                                        +- Project
>                                           +- Filter
>                                              +- Scan small table...
>
>
>
