Hi Mich, thank you for answering - much appreciated.

"This can cause uneven distribution of data, triggering a shuffle for the window function."

Could you elaborate on the mechanism that can "trigger a shuffle for the window function"? I'm not familiar with it. (Or are you referring to AQE?)
In any case, there is no skew - the keys are GUIDs of events. Even if the data were skewed, the shuffle would reproduce exactly the same layout it had before - the DF was already partitioned (and locally sorted) by the same key.
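
For what it's worth, the extra Exchange is easy to spot in a formatted plan. A minimal sketch, assuming illustrative table names:

# Stand-ins for the actual tables (illustrative only)
joined_df = spark.table("big_bucketed_table").join(
    spark.table("small_table"), "key_col", "full_outer")

# Formatted mode lists each Exchange with its reason,
# e.g. ENSURE_REQUIREMENTS or REPARTITION_BY_NUM
joined_df.explain(mode="formatted")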

Thanks again,

Shay



________________________________
From: Mich Talebzadeh <mich.talebza...@gmail.com>
Sent: Thursday, August 15, 2024 17:21
To: Shay Elbaz <sel...@paypal.com.invalid>
Cc: user@spark.apache.org <user@spark.apache.org>
Subject: Re: Redundant(?) shuffle after join

The actual code is not given, so I am going by the plan output and your explanation:


  *   You're joining a large, bucketed table with a smaller DataFrame on a common column (key_col).
  *   The subsequent window function also uses key_col.
  *   However, a shuffle occurs for the window function even though the data is already partitioned by key_col.
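
As a point of reference, a table bucketed for this pattern would have been written roughly as below. This is a sketch only; the source path and table name are placeholders, and the bucket count of 80000 is an assumption inferred from the hashpartitioning(..., 80000) in your plan.

df = spark.read.parquet("/path/to/source_data")  # placeholder source
(df.write
   .format("parquet")
   .bucketBy(80000, "key_col")  # bucket count assumed from the plan's Exchange
   .sortBy("key_col")
   .saveAsTable("your_bucketed_table"))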


Potential data skew: though the table is bucketed, there might be significant data skew within the buckets. This can cause uneven distribution of data, triggering a shuffle for the window function. A quick way to check the per-key distribution (the table name is a placeholder):

import pyspark.sql.functions as F

df = spark.table("your_bucketed_table")
# Rows per key, heaviest keys first; a handful of counts much larger
# than the rest indicates skew
df.groupBy("key_col").count().orderBy(F.desc("count")).show(20)
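
If the counts are roughly uniform, skew can be ruled out; if a few keys dominate, the usual remedies are salting the key or AQE's skew-join handling (spark.sql.adaptive.skewJoin.enabled).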


HTH
Mich Talebzadeh,

Architect | Data Engineer | Data Science | Financial Crime
PhD<https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College London<https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom


 
view my LinkedIn profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, "one test result is worth one-thousand expert opinions" (Wernher von Braun<https://en.wikipedia.org/wiki/Wernher_von_Braun>).


On Thu, 15 Aug 2024 at 14:30, Shay Elbaz <sel...@paypal.com.invalid> wrote:
Hi Spark community,

Please review the cleansed plan below. It is the result of joining a large, 
bucketed table with a smaller DF, and then applying a window function. Both the 
join and the window function use the same column, which is also the bucket 
column of the table ("key_col" in the plan).
The join is executed as a map-side join as expected, but then there is a shuffle for the window function, even though the data is already partitioned accordingly.
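
Roughly, the code has this shape (a simplified sketch, not the actual code; names are illustrative):

import pyspark.sql.functions as F
from pyspark.sql.window import Window

big = spark.table("big_bucketed_table")  # bucketed by key_col
# The explicit repartition is consistent with REPARTITION_BY_NUM in the plan
small = spark.read.parquet("/path/to/small_data").repartition(80000, "key_col")

joined = big.join(small, on="key_col", how="full_outer")

w = Window.partitionBy("key_col").orderBy("order_col")  # order_col is illustrative
result = (joined
          .withColumn("rn", F.row_number().over(w))
          .filter(F.col("rn") == 1))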

Can anyone explain why?

Using Spark 3.5.0


Thanks,
Shay


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project ...
   +- Filter (rn#5441 = 1)
      +- Window [row_number() windowspecdefinition(key_col#5394, _w0#5442 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#5441], [key_col#5394], [_w0#5442 ASC NULLS FIRST]
         +- WindowGroupLimit [key_col#5394], [_w0#5442 ASC NULLS FIRST], row_number(), 1, Final
            +- Sort [key_col#5394 ASC NULLS FIRST, _w0#5442 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(key_col#5394, 80000), ENSURE_REQUIREMENTS, [plan_id=592]
                  +- WindowGroupLimit [key_col#5394], [_w0#5442 ASC NULLS FIRST], row_number(), 1, Partial
                     +- Sort [key_col#5394 ASC NULLS FIRST, _w0#5442 ASC NULLS FIRST], false, 0
                        +- Project ...  (key_col stays the same)
                           +- Project [coalesce(key_col#0, key_col#5009) AS key_col#5394, CASE WHEN ...
                              +- SortMergeJoin [key_col#0], [key_col#5009], FullOuter
                                 :- Sort [key_col#0 ASC NULLS FIRST], false, 0
                                 :  +- Project key_
                                 :     +- FileScan parquet bucketed table ...
                                 +- Sort [key_col#5009 ASC NULLS FIRST], false, 0
                                    +- Exchange hashpartitioning(key_col#5009, 80000), REPARTITION_BY_NUM, [plan_id=572]
                                       +- Project
                                          +- Filter
                                             +- Scan small table...
