Hi Michael,

As ever, your mileage may vary. My suggestion is that you try salting and
see whether the ordering is retained. The most significant column will be
step_id, so I expect it will be OK.
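
To be concrete, something along these lines is what I have in mind (a rough,
untested sketch: the 8 salt buckets are an arbitrary choice, and it swaps the
window aggregation for a two-stage groupBy so the salted pieces can be merged
and re-sorted by step_id afterwards):

```
import pyspark.sql.functions as f

# input_df is your original dataframe from the snippet below.
# Spread each hot (user_id, session_id) key across N salt buckets.
# N = 8 is arbitrary -- tune it to the size of your hottest keys.
salted = (input_df
          .withColumn("salt", (f.rand() * 8).cast("int"))
          .repartition("user_id", "session_id", "salt"))

# First pass: collect (step_id, col_B) pairs within each salted bucket.
per_bucket = (salted
              .groupBy("user_id", "session_id", "salt")
              .agg(f.collect_list(f.struct("step_id", "col_B")).alias("pairs")))

# Second pass: merge the buckets and sort by step_id, so the final list
# order comes from step_id itself rather than from row order.
merged = (per_bucket
          .groupBy("user_id", "session_id")
          .agg(f.flatten(f.collect_list("pairs")).alias("pairs"))
          .withColumn("s_col_B",
                      f.expr("transform(array_sort(pairs), x -> x.col_B)"))
          .drop("pairs"))
```

The same struct-collect-then-array_sort trick would apply to the other
collected columns.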

HTH


Mich




   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 28 Apr 2021 at 18:23, Michael Doo <michael....@gopuff.com> wrote:

> Hi Mich,
>
> Thank you for the suggestions. I took a look at the other thread you
> mentioned. One feature of my code that I'm not sure would be affected by
> salting is the use of collect_list(). My understanding is that
> collect_list() will retain the row ordering of values. You can see in my
> Window definition that I'm ordering by a step_id field, so my expectation
> when I run collect_list() over that window is that I'll get a list like
> [step_1, step_2, step_3, ...]. I'm happy to try salting the dataset if I
> can retain the order.
>
> On Tue, Apr 27, 2021 at 5:58 PM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Let us go back and understand this behaviour.
>>
>> Sounds like your partitioning with (user_id, session_id) results in
>> skewed data.
>>
>> We just had a similar skewed data issue/thread with title
>>
>> "Tasks are skewed to one executor"
>>
>> Have a look at that one and see whether any of those suggestions, like
>> adding partition_id to repartition() or salting, will help.
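>>
>> To illustrate what I mean by salting (a rough, untested sketch; the
>> number of buckets is arbitrary): add a salt column derived from rand()
>> and include it in both the repartition and the window's partitionBy, so
>> a single hot (user_id, session_id) key is split across several tasks.
>>
>> ```
>> import pyspark.sql.functions as f
>> from pyspark.sql.window import Window
>>
>> num_buckets = 16  # arbitrary; tune to the skew you are seeing
>>
>> salted_df = (input_df
>>              .withColumn("salt", (f.rand() * num_buckets).cast("int"))
>>              .repartition("user_id", "session_id", "salt"))
>>
>> salted_window = (Window.partitionBy("user_id", "session_id", "salt")
>>                        .orderBy(f.col("step_id").asc()))
>>
>> # Caveat: aggregates over this salted window are per bucket, so they
>> # need a second pass to combine buckets back into per-session values.
>> ```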
>>
>>
>> HTH
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Tue, 27 Apr 2021 at 22:15, Michael Doo <michael....@gopuff.com> wrote:
>>
>>> Hello!
>>>
>>> I have a data set that I'm trying to process in PySpark. The data (on
>>> disk as Parquet) contains user IDs, session IDs, and metadata related to
>>> each session. I'm adding a number of columns to my dataframe that are the
>>> result of aggregating over a window. The issue I'm running into is that all
>>> but 4-6 executors will complete quickly and the rest run forever without
>>> completing. My code sample is below this message.
>>>
>>> In my logs, I see this over and over:
>>> INFO ExternalAppendOnlyUnsafeRowArray: Reached spill threshold of 4096
>>> rows, switching to
>>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
>>> INFO UnsafeExternalSorter: Thread 92 spilling sort data of 9.2 GB to
>>> disk (2 times so far)
>>> INFO UnsafeExternalSorter: Thread 91 spilling sort data of 19.3 GB to
>>> disk (0 time so far)
>>>
>>> This suggests that Spark can't hold all the windowed data in memory. I
>>> tried increasing the internal settings
>>> spark.sql.windowExec.buffer.in.memory.threshold and
>>> spark.sql.windowExec.buffer.spill.threshold, which helped a little but
>>> there are still executors not completing.
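>>>
>>> For reference, bumping them looks roughly like this (the values below
>>> are placeholders, not the exact ones I used):
>>>
>>> ```
>>> from pyspark.sql import SparkSession
>>>
>>> spark = (SparkSession.builder
>>>          # Placeholder values for illustration only.
>>>          .config("spark.sql.windowExec.buffer.in.memory.threshold", 65536)
>>>          .config("spark.sql.windowExec.buffer.spill.threshold", 2097152)
>>>          .getOrCreate())
>>> ```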
>>>
>>> I believe this is all caused by some skew in the data. Grouping by both
>>> user_id and session_id, there are 5 entries with a count >= 10,000, 100
>>> records with a count between 1,000 and 10,000, and 150,000 entries with a
>>> count less than 1,000 (usually count = 1).
>>>
>>> Thanks in advance!
>>> Michael
>>>
>>> Code:
>>> ```
>>>
>>> import pyspark.sql.functions as f
>>> from pyspark.sql.window import Window
>>>
>>> empty_col_a_cond = ((f.col("col_A").isNull()) |
>>>                          (f.col("col_A") == ""))
>>>
>>> session_window = Window.partitionBy("user_id", "session_id") \
>>>                        .orderBy(f.col("step_id").asc())
>>>
>>> output_df = (
>>>     input_df
>>>     .withColumn("col_A_val", f
>>>                 .when(empty_col_a_cond, f.lit("NA"))
>>>                 .otherwise(f.col("col_A")))
>>>     # ... 10 more added columns replacing nulls/empty strings
>>>     .repartition("user_id", "session_id")
>>>     .withColumn("s_user_id", f.first("user_id", True).over(session_window))
>>>     .withColumn("s_col_B", f.collect_list("col_B").over(session_window))
>>>     .withColumn("s_col_C", f.min("col_C").over(session_window))
>>>     .withColumn("s_col_D", f.max("col_D").over(session_window))
>>>     # ... 16 more added columns aggregating over session_window
>>>     .where(f.col("session_flag") == 1)
>>>     .where(f.array_contains(f.col("s_col_B"), "some_val"))
>>> )
>>>
>>> ```
>>>
>>
