OK, got it.

Someone asked a similar (though not shuffle-related) question in the Spark
Slack channel. Below is a simple Python script that writes files under
shuffle_directory = "/tmp/spark_shuffles", simulates a long-lived app with
a loop, and periodically cleans up files older than 1 second. Take it for
a spin.

import os
import shutil
import time
from datetime import datetime

from pyspark.sql import SparkSession

def generate_shuffle_data(spark, shuffle_directory):
    # Generate some Mickey Mouse data
    data = [("A", 1), ("B", 2), ("A", 3), ("C", 4), ("B", 5)]
    columns = ["column_to_check", "partition_column"]
    df = spark.createDataFrame(data, columns)
    df.printSchema()

    # Write the DataFrame (with a shuffle) to the specified output path
    df.write.mode("overwrite").partitionBy("partition_column").parquet(shuffle_directory)

def simulate_long_lived_spark_app():
    shuffle_directory = "/tmp/spark_shuffles"

    # Remove the directory if it exists
    if os.path.exists(shuffle_directory):
        shutil.rmtree(shuffle_directory)

    # Create the directory
    os.makedirs(shuffle_directory)

    spark = SparkSession.builder.appName("shuffleCleanupExample").getOrCreate()

    try:
        for iteration in range(1, 6):  # Simulate 5 iterations of the long-lived Spark app
            print(f"Iteration {iteration}")

            # Generate and write shuffle data
            generate_shuffle_data(spark, shuffle_directory)

            # Perform some Spark operations (simulated processing)
            # ... your code here ...

            # Periodically clean up shuffle files older than 1 second
            try:
                cleanup_unnecessary_shuffles(shuffle_directory, max_age_seconds=1)
                print("Shuffle cleanup successful.")
            except Exception as e:
                print(f"Error during shuffle cleanup: {str(e)}")

            # Simulate some delay between iterations
            time.sleep(2)

    finally:
        # Stop the Spark session
        spark.stop()

def cleanup_unnecessary_shuffles(shuffle_directory, max_age_seconds):
    current_time = datetime.now()

    # Iterate through shuffle files in the directory
    for shuffle_file in os.listdir(shuffle_directory):
        shuffle_file_path = os.path.join(shuffle_directory, shuffle_file)

        # Check that it's a file and not a directory
        if os.path.isfile(shuffle_file_path):
            # Get the file's ctime (note: on Linux this is the inode-change
            # time rather than the true creation time)
            file_creation_time = datetime.fromtimestamp(os.path.getctime(shuffle_file_path))

            # Calculate the age of the file in seconds
            age_seconds = (current_time - file_creation_time).total_seconds()

            try:
                # Check if the file is older than the specified max_age_seconds
                if age_seconds > max_age_seconds:
                    # Perform cleanup (delete the file)
                    os.remove(shuffle_file_path)
                    print(f"Deleted old shuffle file: {shuffle_file_path}")

            except Exception as e:
                print(f"Error during cleanup: {str(e)}")

# Run the simulation
simulate_long_lived_spark_app()
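
A caveat on the cleanup function above: partitionBy() writes the data into
per-partition subdirectories (partition_column=1/ and so on), and the
os.path.isfile() check means the loop only inspects top-level entries, so
in practice it mostly removes marker files such as _SUCCESS. If you also
want to age out the data files inside the partition directories, something
along these lines should work. This is a sketch only; it uses modification
time via getmtime, since getctime on Linux reports inode-change time rather
than creation time.

import os
import time

def cleanup_recursively(directory, max_age_seconds):
    # Remove files older than max_age_seconds anywhere under directory,
    # then prune any partition subdirectories left empty
    now = time.time()
    # Walk bottom-up so child directories are emptied before their
    # parents are considered for removal
    for root, dirs, files in os.walk(directory, topdown=False):
        for name in files:
            path = os.path.join(root, name)
            if now - os.path.getmtime(path) > max_age_seconds:
                os.remove(path)
                print(f"Deleted old file: {path}")
        for name in dirs:
            path = os.path.join(root, name)
            if not os.listdir(path):  # prune now-empty partition dirs
                os.rmdir(path)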

…and the output of the main script:

Iteration 1
root
 |-- column_to_check: string (nullable = true)
 |-- partition_column: long (nullable = true)

Shuffle cleanup successful.
Iteration 2
root
 |-- column_to_check: string (nullable = true)
 |-- partition_column: long (nullable = true)

Shuffle cleanup successful.
Iteration 3
root
 |-- column_to_check: string (nullable = true)
 |-- partition_column: long (nullable = true)

Shuffle cleanup successful.
Iteration 4
root
 |-- column_to_check: string (nullable = true)
 |-- partition_column: long (nullable = true)

Shuffle cleanup successful.
Iteration 5
root
 |-- column_to_check: string (nullable = true)
 |-- partition_column: long (nullable = true)

Shuffle cleanup successful.
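
On the original question of recreating the SparkContext between iterations:
in PySpark, one approach is simply to stop the session at the end of each
iteration and call getOrCreate() again at the top of the next, since
stopping the context is what lets the DiskBlockManager remove that
context's directories under spark.local.dir. A minimal sketch follows, with
the caveat Daniel raised: whether getOrCreate() hands back a genuinely
fresh context after stop() has varied across Spark versions, so verify this
on your own cluster first.

from pyspark.sql import SparkSession

for iteration in range(1, 6):
    # Assumption: running in client mode, where a new context can be
    # created after the previous one is stopped
    spark = SparkSession.builder.appName("perIterationSession").getOrCreate()
    try:
        spark.range(10).count()  # stand-in for one independent job
    finally:
        # Stopping the context triggers DiskBlockManager.stop(), which
        # removes this context's blockmgr-* directories
        spark.stop()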

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand expert
opinions" (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>).


On Mon, 19 Feb 2024 at 08:27, Saha, Daniel <dans...@amazon.com> wrote:

> Thanks for the suggestions Mich, Jörn, and Adam.
>
>
>
> The rationale for a long-lived app with a loop, versus submitting multiple
> YARN applications, is mainly simplicity. The plan is to run the app on a
> multi-tenant EMR cluster alongside other YARN apps. Implementing the loop
> outside the Spark app will work but introduces more complexity compared to
> a single long-lived Spark app with dynamic allocation + min executors.
> Specifically,
>
>    - Introduce a component that submits an EMR step to run `spark-submit`
>    - Define a YARN queue for my app such that resources are reserved and
>    other tenants will not prevent my app from entering the RUNNING state
>    - Determine whether the previous YARN app is FINISHED (or just submit
>    a bunch of apps up front and rely on YARN SUBMITTED/ACCEPTED states)
>
>
>
> So I was really hoping to be able to recreate the SparkContext, or at
> least find some way to trigger a cleanup of the DiskBlockManager between
> loop iterations. If there is no way to do this, I will test the performance
> of cloud-based shuffle. This might be better for cost savings too (S3 vs.
> EBS) and allow me to use smaller instances (I was using beefy instances and
> beefy executors to improve shuffle locality).
>
>
>
> To the other points:
>
>    1. Dynamic allocation is enabled; I suspect it is not the issue here.
>    Enabling `spark.shuffle.service.removeShuffle` didn't seem to help much,
>    likely because executors are not being decommissioned often, given the
>    nature of the tight loop and the fact that the executor timeout was
>    already raised from the 60s default to 300s.
>    2. Cloud shuffle + an S3 lifecycle policy, or brute force/cron removal
>    of files, will certainly work, but I am looking for something more
>    "elegant".
>    3. Shuffle data should be deleted after it's no longer needed. From
>    my understanding of the Spark codebase, the only time the DiskBlockManager
>    cleans the `spark.local.dir` directory [1] is when stop() is called, which
>    only happens when the SparkEnv is stopped [2].
>    4. I suspect spilled data is not what's filling up the disk, since the
>    app barely spills to disk [3]. Also supporting this hypothesis: raising
>    `spark.shuffle.sort.bypassMergeThreshold` above the max reducer
>    partitions significantly slowed the rate of disk usage.
>
> Daniel
>
>
>
> [1]
> https://github.com/apache/spark/blob/8f5a647b0bbb6e83ee484091d3422b24baea5a80/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala#L369
>
> [2]
> https://github.com/apache/spark/blob/c4e4497ff7e747eb71d087cdfb1b51673c53b83b/core/src/main/scala/org/apache/spark/SparkEnv.scala#L112
>
> [3] Was able to eliminate most of the skew during repartitionByRange by
> dynamically salting keys using the results of df.stat.countMinSketch
>
>
>
>
>
> *From: *Mich Talebzadeh <mich.talebza...@gmail.com>
> *Date: *Sunday, February 18, 2024 at 1:38 AM
> *Cc: *"user@spark.apache.org" <user@spark.apache.org>
> *Subject: *RE: [EXTERNAL] Re-create SparkContext of SparkSession inside
> long-lived Spark app
>
>
>
>
> Hi,
>
>
>
> What do you propose, or what do you think will help, given that these
> Spark jobs are independent of each other? As you say, once a job/iteration
> is complete, there is no need to retain these shuffle files. You have a
> number of options to consider, starting with the Spark configuration
> parameters for shuffle behaviour:
>
>
>
> https://spark.apache.org/docs/latest/configuration.html#shuffle-behavior
>
>
>
> As an aside, have you turned on dynamic resource allocation and the
> relevant parameters? Can you also increase executor memory, and raise
> spark.storage.memoryFraction and spark.shuffle.spillThreshold? You can of
> course use brute force with shutil.rmtree(path) to remove these files.
>
>
>
> HTH
>
>
>
> Mich Talebzadeh,
>
> Dad | Technologist | Solutions Architect | Engineer
>
> London
>
> United Kingdom
>
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed. It is essential to note
> that, as with any advice, one verified and tested result holds more weight
> than a thousand expert opinions.
>
>
>
>
>
> On Sat, 17 Feb 2024 at 23:40, Saha, Daniel <dans...@amazon.com.invalid>
> wrote:
>
> Hi,
>
>
>
> *Background*: I am running into executor disk space issues when running a
> long-lived Spark 3.3 app with YARN on AWS EMR. The app performs
> back-to-back spark jobs in a sequential loop with each iteration performing
> 100gb+ shuffles. The files taking up the space are related to shuffle
> blocks [1]. Disk is only cleared when restarting the YARN app. For all
> intents and purposes, each job is independent. So once a job/iteration is
> complete, there is no need to retain these shuffle files. I want to try
> stopping and recreating the Spark context between loop iterations/jobs to
> indicate to Spark DiskBlockManager that these intermediate results are no
> longer needed [2].
>
>
>
> *Questions*:
>
>    - Are there better ways to remove/clean the directory containing these
>    old, no-longer-used shuffle results (aside from cron or restarting the
>    YARN app)?
>    - How can I recreate the SparkContext within a single application? I
>    see no methods in SparkSession for doing this, and each new SparkSession
>    re-uses the existing SparkContext. After stopping the SparkContext,
>    SparkSession does not re-create a new one. Further, creating a new
>    SparkSession via the constructor and passing in a new SparkContext is
>    not allowed, as it is a protected/private method.
>
>
>
> Thanks
>
> Daniel
>
>
>
> [1]
> /mnt/yarn/usercache/hadoop/appcache/application_1706835946137_0110/blockmgr-eda47882-56d6-4248-8e30-a959ddb912c5
>
> [2] https://stackoverflow.com/a/38791921
>
>
