Hi Mich,

Thank you for the suggestions. I took a look at the other thread you
mentioned. One feature of my code that I'm not sure would be affected by
salting is the use of collect_list(). My understanding is that
collect_list() will retain the row ordering of values. You can see in my
Window definition that I'm ordering by a step_id field, so my expectation when
I run collect_list() over that window is that I'll get a list like [step_1,
step_2, step_3, ...]. I'm happy to try salting the dataset if I can retain
the order.
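To convince myself the order can be recovered after salting, I sketched the
idea in plain Python (not PySpark; the names and the salt-by-step_id scheme
are made up for illustration). The Spark analogue would be collecting
(step_id, value) structs per salted key, merging the partial arrays per
original key, and sorting the merged array by step_id afterwards:

```python
import random
from collections import defaultdict

N_SALTS = 4  # hypothetical salt fan-out

rows = [("u1", "s1", i, f"step_{i}") for i in range(10)]
random.shuffle(rows)  # simulate arbitrary row order across partitions

# Phase 1: aggregate within each salted group; order inside a group is arbitrary.
salted = defaultdict(list)
for user_id, session_id, step_id, value in rows:
    salt = step_id % N_SALTS
    salted[(user_id, session_id, salt)].append((step_id, value))

# Phase 2: merge the salted partials per original key, then sort by step_id
# to recover the ordering collect_list would give over an ordered window.
merged = defaultdict(list)
for (user_id, session_id, _salt), pairs in salted.items():
    merged[(user_id, session_id)].extend(pairs)

ordered = {k: [v for _, v in sorted(pairs)] for k, pairs in merged.items()}
print(ordered[("u1", "s1")])  # ['step_0', 'step_1', ..., 'step_9']
```

The point is that as long as step_id is carried along with the value, the
per-session order is recoverable after any repartitioning.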

On Tue, Apr 27, 2021 at 5:58 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi,
>
> Let us go back and understand this behaviour.
>
> Sounds like your partitioning with (user_id, group_id) results in skewed
> data.
>
> We just had a similar skewed data issue/thread with title
>
> "Tasks are skewed to one executor"
>
> Have a look at that one and see whether any of those suggestions, like
> adding a partition_id to repartition() or salting the key, will help
>
>
> HTH
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 27 Apr 2021 at 22:15, Michael Doo <michael....@gopuff.com> wrote:
>
>> Hello!
>>
>> I have a data set that I'm trying to process in PySpark. The data (on
>> disk as Parquet) contains user IDs, session IDs, and metadata related to
>> each session. I'm adding a number of columns to my dataframe that are the
>> result of aggregating over a window. The issue I'm running into is that all
>> but 4-6 executors will complete quickly and the rest run forever without
>> completing. My code sample is below this message.
>>
>> In my logs, I see this over and over:
>> INFO ExternalAppendOnlyUnsafeRowArray: Reached spill threshold of 4096
>> rows, switching to
>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
>> INFO UnsafeExternalSorter: Thread 92 spilling sort data of 9.2 GB to
>> disk (2 times so far)
>> INFO UnsafeExternalSorter: Thread 91 spilling sort data of 19.3 GB to
>> disk (0 time so far)
>>
>> This suggests that Spark can't hold all the windowed data in memory. I
>> tried increasing the internal settings
>> spark.sql.windowExec.buffer.in.memory.threshold and
>> spark.sql.windowExec.buffer.spill.threshold, which helped a little, but
>> there are still executors that don't complete.
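>> For reference, I set them roughly like the sketch below (the exact values
>> here are illustrative, not tuned recommendations):
>>
>> ```
>> spark.conf.set("spark.sql.windowExec.buffer.in.memory.threshold", 1048576)
>> spark.conf.set("spark.sql.windowExec.buffer.spill.threshold", 1048576)
>> ```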
>>
>> I believe this is all caused by some skew in the data. Grouping by both
>> user_id and session_id, there are 5 groups with a count >= 10,000, 100
>> groups with a count between 1,000 and 10,000, and 150,000 groups with a
>> count less than 1,000 (usually count = 1).
>>
>> Thanks in advance!
>> Michael
>>
>> Code:
>> ```
>>
>> import pyspark.sql.functions as f
>> from pyspark.sql.window import Window
>>
>> empty_col_a_cond = ((f.col("col_A").isNull()) |
>>                     (f.col("col_A") == ""))
>>
>> session_window = Window.partitionBy("user_id", "session_id") \
>>                        .orderBy(f.col("step_id").asc())
>>
>> output_df = (
>>     input_df
>>     .withColumn("col_A_val", f
>>                 .when(empty_col_a_cond, f.lit("NA"))
>>                 .otherwise(f.col("col_A")))
>>     # ... 10 more added columns replacing nulls/empty strings
>>     .repartition("user_id", "session_id")
>>     .withColumn("s_user_id", f.first("user_id", True).over(session_window))
>>     .withColumn("s_col_B", f.collect_list("col_B").over(session_window))
>>     .withColumn("s_col_C", f.min("col_C").over(session_window))
>>     .withColumn("s_col_D", f.max("col_D").over(session_window))
>>     # ... 16 more added columns aggregating over session_window
>>     .where(f.col("session_flag") == 1)
>>     .where(f.array_contains(f.col("s_col_B"), "some_val"))
>> )
>>
>> ```
>>
>
