OK, got it. Someone asked a similar (though not shuffle-related) question in the Spark Slack channel. Below is a simple Python script that creates shuffle files in shuffle_directory = "/tmp/spark_shuffles" and simulates a long-lived app with a loop, periodically cleaning up shuffle files older than 1 second. Take it for a spin.
import glob
import os
import shutil
import time
from datetime import datetime, timedelta

from pyspark.sql import SparkSession


def generate_shuffle_data(spark, shuffle_directory):
    # Generate some Mickey Mouse data
    data = [("A", 1), ("B", 2), ("A", 3), ("C", 4), ("B", 5)]
    columns = ["column_to_check", "partition_column"]
    df = spark.createDataFrame(data, columns)
    df.printSchema()

    # Write DataFrame with shuffle to the specified output path
    df.write.mode("overwrite").partitionBy("partition_column").parquet(shuffle_directory)


def cleanup_unnecessary_shuffles(shuffle_directory, max_age_seconds):
    current_time = datetime.now()

    # Iterate through shuffle files in the directory
    for shuffle_file in os.listdir(shuffle_directory):
        shuffle_file_path = os.path.join(shuffle_directory, shuffle_file)

        # Check that it is a file and not a directory
        if os.path.isfile(shuffle_file_path):
            # Get the creation time of the file
            file_creation_time = datetime.fromtimestamp(os.path.getctime(shuffle_file_path))

            # Calculate the age of the file in seconds
            age_seconds = (current_time - file_creation_time).total_seconds()

            try:
                # Check if the file is older than the specified max_age_seconds
                if age_seconds > max_age_seconds:
                    # Perform cleanup (delete the file)
                    os.remove(shuffle_file_path)
                    print(f"Deleted old shuffle file: {shuffle_file_path}")
            except Exception as e:
                print(f"Error during cleanup: {str(e)}")


def simulate_long_lived_spark_app():
    shuffle_directory = "/tmp/spark_shuffles"

    # Remove the directory if it exists, then recreate it
    if os.path.exists(shuffle_directory):
        shutil.rmtree(shuffle_directory)
    os.makedirs(shuffle_directory)

    spark = SparkSession.builder.appName("shuffleCleanupExample").getOrCreate()

    try:
        for iteration in range(1, 6):  # Simulating 5 iterations of the long-lived Spark app
            print(f"Iteration {iteration}")

            # Generate and write shuffle data
            generate_shuffle_data(spark, shuffle_directory)

            # Perform some Spark operations (simulated processing)
            # your code

            # Periodically clean up shuffle files older than 1 second
            try:
                cleanup_unnecessary_shuffles(shuffle_directory, max_age_seconds=1)
                print("Shuffle cleanup successful.")
            except Exception as e:
                print(f"Error during shuffle cleanup: {str(e)}")

            # Simulate some delay between iterations
            time.sleep(2)
    finally:
        # Stop the Spark session
        spark.stop()


# Run the simulation
simulate_long_lived_spark_app()

...and the output:

Iteration 1
root
 |-- column_to_check: string (nullable = true)
 |-- partition_column: long (nullable = true)

Shuffle cleanup successful.
Iteration 2
root
 |-- column_to_check: string (nullable = true)
 |-- partition_column: long (nullable = true)

Shuffle cleanup successful.
Iteration 3
root
 |-- column_to_check: string (nullable = true)
 |-- partition_column: long (nullable = true)

Shuffle cleanup successful.
Iteration 4
root
 |-- column_to_check: string (nullable = true)
 |-- partition_column: long (nullable = true)

Shuffle cleanup successful.
Iteration 5
root
 |-- column_to_check: string (nullable = true)
 |-- partition_column: long (nullable = true)

Shuffle cleanup successful.

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom

view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions" (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).


On Mon, 19 Feb 2024 at 08:27, Saha, Daniel <dans...@amazon.com> wrote:

> Thanks for the suggestions Mich, Jörn, and Adam.
>
> The rationale for a long-lived app with a loop versus submitting multiple
> YARN applications is mainly simplicity. The plan is to run the app on a
> multi-tenant EMR cluster alongside other YARN apps.
> Implementing the loop outside the Spark app will work but introduces more
> complexity compared to a single long-lived Spark app with dynamic
> allocation + min executors. Specifically:
>
>    - Introduce a component that submits an EMR step to run `spark-submit`
>    - Define a YARN queue for my app such that resources are reserved and
>      other tenants will not prevent my app from entering the RUNNING state
>    - Determine whether the previous YARN app is FINISHED (or just submit
>      a bunch of apps up front and rely on YARN SUBMITTED/ACCEPTED states)
>
> So I really was hoping to be able to recreate the SparkContext, or at
> least find some way to trigger a clean of the DiskBlockManager between
> loop iterations. If there is no way to do this, I will test the performance
> of cloud-based shuffle. This might be better for cost savings too (S3 vs.
> EBS) and allow me to use smaller instances (I was using beefy instances
> and beefy executors to improve shuffle locality).
>
> To the other points:
>
> 1. Dynamic allocation is enabled; I suspect it is not the issue here.
>    Enabling `spark.shuffle.service.removeShuffle` didn't seem to help much,
>    likely because executors are not being decommissioned often due to the
>    nature of the tight loop and the fact that the executor timeout was
>    already raised from the 60s default to 300s.
> 2. Cloud shuffle + an S3 lifecycle policy, or brute force/cron removing
>    files, will for sure work, but I am looking for something more "elegant".
> 3. Shuffle data should be deleted after it's no longer needed. From my
>    understanding of the Spark codebase, the only time the DiskBlockManager
>    cleans the `spark.local.dir` directory [1] is when stop() is called,
>    which only happens when the SparkEnv is stopped [2].
> 4. I suspect spilled data is not what's filling up the disk, since the app
>    barely spills to disk [3]. Also supporting this hypothesis: raising
>    `spark.shuffle.sort.bypassMergeThreshold` above the max reducer
>    partitions significantly slowed the rate of disk usage.
> 5.
>
> Daniel
>
> [1] https://github.com/apache/spark/blob/8f5a647b0bbb6e83ee484091d3422b24baea5a80/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala#L369
>
> [2] https://github.com/apache/spark/blob/c4e4497ff7e747eb71d087cdfb1b51673c53b83b/core/src/main/scala/org/apache/spark/SparkEnv.scala#L112
>
> [3] Was able to eliminate most of the skew during repartitionByRange by
> dynamically salting keys using the results of df.stat.countMinSketch
>
>
> *From:* Mich Talebzadeh <mich.talebza...@gmail.com>
> *Date:* Sunday, February 18, 2024 at 1:38 AM
> *Cc:* "user@spark.apache.org" <user@spark.apache.org>
> *Subject:* RE: [EXTERNAL] Re-create SparkContext of SparkSession inside
> long-lived Spark app
>
> *CAUTION*: This email originated from outside of the organization. Do not
> click links or open attachments unless you can confirm the sender and know
> the content is safe.
>
> Hi,
>
> What do you propose, or what do you think will help, when these Spark jobs
> are independent of each other --> so once a job/iteration is complete,
> there is no need to retain these shuffle files? You have a number of
> options to consider, starting from Spark configuration parameters and so
> forth:
>
> https://spark.apache.org/docs/latest/configuration.html#shuffle-behavior
>
> Aside, have you turned on dynamic resource allocation and the relevant
> parameters? Can you up executor memory -> spark.storage.memoryFraction
> and spark.shuffle.spillThreshold as well? You can of course use brute
> force with shutil.rmtree(path) to remove these files.
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed.
> It is essential to note that, as with any advice, one verified and tested
> result holds more weight than a thousand expert opinions.
>
>
> On Sat, 17 Feb 2024 at 23:40, Saha, Daniel <dans...@amazon.com.invalid>
> wrote:
>
> Hi,
>
> *Background*: I am running into executor disk space issues when running a
> long-lived Spark 3.3 app with YARN on AWS EMR. The app performs
> back-to-back Spark jobs in a sequential loop, with each iteration
> performing 100 GB+ shuffles. The files taking up the space are related to
> shuffle blocks [1]. Disk is only cleared when restarting the YARN app. For
> all intents and purposes, each job is independent. So once a job/iteration
> is complete, there is no need to retain these shuffle files. I want to try
> stopping and recreating the SparkContext between loop iterations/jobs to
> indicate to the Spark DiskBlockManager that these intermediate results are
> no longer needed [2].
>
> *Questions*:
>
> - Are there better ways to remove/clean the directory containing these
>   old, no-longer-used shuffle results (aside from cron or restarting the
>   YARN app)?
> - How can I recreate the SparkContext within a single application? I see
>   no methods in SparkSession for doing this, and each new Spark session
>   re-uses the existing SparkContext. After stopping the SparkContext,
>   SparkSession does not re-create a new one. Further, creating a new
>   SparkSession via the constructor and passing in a new SparkContext is
>   not allowed, as it is a protected/private method.
>
> Thanks
>
> Daniel
>
> [1] /mnt/yarn/usercache/hadoop/appcache/application_1706835946137_0110/blockmgr-eda47882-56d6-4248-8e30-a959ddb912c5
>
> [2] https://stackoverflow.com/a/38791921