Hi Mich,

Thank you for the suggestions. I took a look at the other thread you mentioned. One part of my code that I'm not sure how salting would affect is the use of collect_list(). My understanding is that collect_list() retains the row ordering of the values. As you can see in my Window definition, I'm ordering by a step_id field, so my expectation is that running collect_list() over that window gives me a list like [step_1, step_2, step_3, ...]. I'm happy to try salting the dataset if I can retain that order.
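For what it's worth, here is the kind of thing I had in mind for keeping the order if I do salt — just a sketch, using the column names from my code below (user_id, session_id, step_id, col_B): collect (step_id, value) structs over the session and sort_array them, so the list order comes from step_id rather than from the physical row order.

```
import pyspark.sql.functions as f
from pyspark.sql.window import Window

# Sketch only: partition as before but without orderBy, since the order is
# recovered by sorting on step_id instead of relying on the frame's row order.
session_window = Window.partitionBy("user_id", "session_id")

output_df = (
    input_df
    # collect (step_id, col_B) pairs for the whole session, sorted by step_id
    .withColumn(
        "s_col_B_steps",
        f.sort_array(f.collect_list(f.struct("step_id", "col_B")).over(session_window)))
    # then drop the step_id again (f.transform needs Spark 3.1+)
    .withColumn("s_col_B", f.transform(f.col("s_col_B_steps"), lambda x: x["col_B"]))
    .drop("s_col_B_steps")
)
```

If that works, collect_list() seems to be the only order-sensitive piece — the min/max aggregations shouldn't depend on row order anyway.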
On Tue, Apr 27, 2021 at 5:58 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Hi,
>
> Let us go back and understand this behaviour.
>
> Sounds like your partitioning with (user_id, group_id) results in skewed
> data.
>
> We just had a similar skewed data issue/thread with title
> "Tasks are skewed to one executor"
>
> Have a look at that one and see whether any of those suggestions like
> adding partition_id to repartition() or salt will help
>
> HTH
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
> On Tue, 27 Apr 2021 at 22:15, Michael Doo <michael....@gopuff.com> wrote:
>
>> Hello!
>>
>> I have a data set that I'm trying to process in PySpark. The data (on
>> disk as Parquet) contains user IDs, session IDs, and metadata related to
>> each session. I'm adding a number of columns to my dataframe that are the
>> result of aggregating over a window. The issue I'm running into is that
>> all but 4-6 executors will complete quickly and the rest run forever
>> without completing. My code sample is below this message.
>>
>> In my logs, I see this over and over:
>>
>> INFO ExternalAppendOnlyUnsafeRowArray: Reached spill threshold of 4096 rows, switching to org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
>> INFO UnsafeExternalSorter: Thread 92 spilling sort data of 9.2 GB to disk (2 times so far)
>> INFO UnsafeExternalSorter: Thread 91 spilling sort data of 19.3 GB to disk (0 time so far)
>>
>> Which suggests that Spark can't hold all the windowed data in memory. I
>> tried increasing the internal settings
>> spark.sql.windowExec.buffer.in.memory.threshold and
>> spark.sql.windowExec.buffer.spill.threshold, which helped a little but
>> there are still executors not completing.
>>
>> I believe this is all caused by some skew in the data. Grouping by both
>> user_id and session_id, there are 5 entries with a count >= 10,000, 100
>> records with a count between 1,000 and 10,000, and 150,000 entries with a
>> count less than 1,000 (usually count = 1).
>>
>> Thanks in advance!
>> Michael
>>
>> Code:
>> ```
>> import pyspark.sql.functions as f
>> from pyspark.sql.window import Window
>>
>> empty_col_a_cond = ((f.col("col_A").isNull()) |
>>                     (f.col("col_A") == ""))
>>
>> session_window = Window.partitionBy("user_id", "session_id") \
>>     .orderBy(f.col("step_id").asc())
>>
>> output_df = (
>>     input_df
>>     .withColumn("col_A_val", f
>>                 .when(empty_col_a_cond, f.lit("NA"))
>>                 .otherwise(f.col("col_A")))
>>     # ... 10 more added columns replacing nulls/empty strings
>>     .repartition("user_id", "session_id")
>>     .withColumn("s_user_id", f.first("user_id", True).over(session_window))
>>     .withColumn("s_col_B", f.collect_list("col_B").over(session_window))
>>     .withColumn("s_col_C", f.min("col_C").over(session_window))
>>     .withColumn("s_col_D", f.max("col_D").over(session_window))
>>     # ... 16 more added columns aggregating over session_window
>>     .where(f.col("session_flag") == 1)
>>     .where(f.array_contains(f.col("s_col_B"), "some_val"))
>> )
>> ```
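P.S. In case it's useful context, the skew figures I mentioned in the original message can be reproduced with a simple group-by count along these lines (again just a sketch, same column names as above):

```
import pyspark.sql.functions as f

# Sketch: count rows per (user_id, session_id) and bucket the counts
# to see how uneven the sessions are.
group_counts = input_df.groupBy("user_id", "session_id").count()

skew_summary = (
    group_counts
    .withColumn(
        "bucket",
        f.when(f.col("count") >= 10000, f.lit(">= 10,000"))
         .when(f.col("count") >= 1000, f.lit("1,000 - 10,000"))
         .otherwise(f.lit("< 1,000")))
    .groupBy("bucket")
    .count()
)

skew_summary.show()
```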