Hi,

Let us go back and understand this behaviour.

It sounds like partitioning by (user_id, session_id) results in skewed
data.

We just had a similar skewed-data thread, titled

"Tasks are skewed to one executor"

Have a look at that one and see whether any of those suggestions, such as
adding a partition_id to repartition() or salting the key, will help.


HTH









On Tue, 27 Apr 2021 at 22:15, Michael Doo <michael....@gopuff.com> wrote:

> Hello!
>
> I have a data set that I'm trying to process in PySpark. The data (on disk
> as Parquet) contains user IDs, session IDs, and metadata related to each
> session. I'm adding a number of columns to my dataframe that are the result
> of aggregating over a window. The issue I'm running into is that all but
> 4-6 executors will complete quickly and the rest run forever without
> completing. My code sample is below this message.
>
> In my logs, I see this over and over:
> INFO ExternalAppendOnlyUnsafeRowArray: Reached spill threshold of 4096
> rows, switching to
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
> INFO UnsafeExternalSorter: Thread 92 spilling sort data of 9.2 GB to disk
> (2 times so far)
> INFO UnsafeExternalSorter: Thread 91 spilling sort data of 19.3 GB to
> disk (0 time so far)
>
> This suggests that Spark can't hold all the windowed data in memory. I
> tried increasing the internal settings
> spark.sql.windowExec.buffer.in.memory.threshold and
> spark.sql.windowExec.buffer.spill.threshold, which helped a little, but
> there are still executors that never complete.
>
> I believe this is all caused by some skew in the data. Grouping by both
> user_id and session_id, there are 5 entries with a count >= 10,000, 100
> records with a count between 1,000 and 10,000, and 150,000 entries with a
> count less than 1,000 (usually count = 1).
>
> Thanks in advance!
> Michael
>
> Code:
> ```
>
> import pyspark.sql.functions as f
> from pyspark.sql.window import Window
>
> empty_col_a_cond = ((f.col("col_A").isNull()) |
>                          (f.col("col_A") == ""))
>
> session_window = Window.partitionBy("user_id", "session_id") \
>                        .orderBy(f.col("step_id").asc())
>
> output_df = (
>     input_df
>     .withColumn("col_A_val", f
>                 .when(empty_col_a_cond, f.lit("NA"))
>                 .otherwise(f.col("col_A")))
>     # ... 10 more added columns replacing nulls/empty strings
>     .repartition("user_id", "session_id")
>     .withColumn("s_user_id", f.first("user_id", True).over(session_window))
>     .withColumn("s_col_B", f.collect_list("col_B").over(session_window))
>     .withColumn("s_col_C", f.min("col_C").over(session_window))
>     .withColumn("s_col_D", f.max("col_D").over(session_window))
>     # ... 16 more added columns aggregating over session_window
>     .where(f.col("session_flag") == 1)
>     .where(f.array_contains(f.col("s_col_B"), "some_val"))
> )
>
> ```
>
