Re: Spark decommission

2024-07-04 Thread Arun Ravi
Hi Rajesh,

We use it in production at scale. We run Spark on Kubernetes on AWS, and
here are the key things that we do:
1) we run the driver on an on-demand node
2) we have configured decommissioning along with the fallback option to
S3; try the newer single-zone S3 offering (S3 Express One Zone) for this.
3) we use PVC-aware scheduling, i.e. Spark ensures executors try to reuse
available storage volumes created by the driver before requesting a new
one.
4) we have enabled the Kubernetes shuffle IO plugin
(KubernetesLocalDiskShuffleDataIO); this allows a new executor to
re-register the shuffle blocks it finds on a reused PVC, so shuffle data
from lost executors is served by the new executor that reuses the disk.
5) we also configure Spark to retain decommissioned executor details so
that it can ignore intermittent shuffle fetch failures.

Some of these are best effort; you could also tune the number of threads
used for decommissioning, etc., based on your workload and runtime
environment.
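
The settings behind items 2, 4, and 5 above look roughly like the
following. This is a sketch assembled from the configs discussed later in
this archive; the fallback path is a placeholder, and values should be
tuned per workload:

spark.decommission.enabled: "true"
spark.storage.decommission.enabled: "true"
spark.storage.decommission.shuffleBlocks.enabled: "true"
spark.storage.decommission.fallbackStorage.path: "s3a://some-bucket/fallback/"
spark.storage.decommission.shuffleBlocks.maxThreads: "16"
spark.shuffle.sort.io.plugin.class: "org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO"
spark.stage.ignoreDecommissionFetchFailure: "true"
spark.scheduler.maxRetainedRemovedDecommissionExecutors: "20"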

On Thu, 27 Jun 2024, 09:03 Rajesh Mahindra wrote:

> Hi folks,
>
> I am planning to leverage the "Spark Decommission" feature in production,
> since our company uses Spot instances on Kubernetes. I wanted to get a
> sense of how stable the feature is for production usage and whether anyone
> has thoughts on trying it out in production, especially in a Kubernetes
> environment.
>
> Thanks,
> Rajesh
>
>


Re: External Spark shuffle service for k8s

2024-04-10 Thread Arun Ravi
Hi Everyone,

Some time back I explored IBM's and AWS's S3 shuffle plugins, and I have
also used AWS FSx for Lustre in a few of my production jobs, which have
~20 TB of shuffle operations across 200-300 executors. What I observed is
that both S3 and FSx behaved fine during the write phase; however, I faced
IOPS throttling during the read phase (reads taking forever to complete). I
think this may be caused by the heavy use of shuffle index files (I didn't
perform any extensive research on this), so I believe the shuffle manager
logic has to be intelligent enough to reduce the fetching of files from the
object store. In the end, for my use case, I started using PVCs and
PVC-aware scheduling along with decommissioning. So far performance is good
with this choice.
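
For reference, PVC reuse is driven by a handful of Kubernetes-specific
settings; a minimal sketch, with the storage class, size, and mount path as
placeholders:

spark.kubernetes.driver.ownPersistentVolumeClaim: "true"
spark.kubernetes.driver.reusePersistentVolumeClaim: "true"
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName: "OnDemand"
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass: "gp3"
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit: "500Gi"
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path: "/data"
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly: "false"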

Thank you

On Tue, 9 Apr 2024, 15:17 Mich Talebzadeh wrote:

> Hi,
>
> First thanks everyone for their contributions
>
> I was going to reply to @Enrico Minack but noticed additional info. As I
> understand it, Apache Uniffle, for example, is an incubating project aimed
> at providing a pluggable shuffle service for Spark. Basically, what all
> these "external shuffle services" have in common is that they offload
> shuffle data management to external services, thus reducing the memory and
> CPU overhead on Spark executors. That is great. While Uniffle and others
> enhance shuffle performance and scalability, it would be great to
> integrate them with the Spark UI. This may require additional development
> effort. I suppose the interest would be to have these external metrics
> incorporated into Spark with one look and feel. This may require
> customizing the UI to fetch and display metrics or statistics from the
> external shuffle services. Has any project done this?
>
> Thanks
>
> Mich Talebzadeh,
> Technologist | Solutions Architect | Data Engineer | Generative AI
> London
> United Kingdom
>
> View my LinkedIn profile
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed. It is essential to note
> that, as with any advice, "one test result is worth one-thousand expert
> opinions" (Wernher von Braun).
>
>
> On Mon, 8 Apr 2024 at 14:19, Vakaris Baškirov wrote:
>
>> I see that both Uniffle and Celeborn support S3/HDFS backends, which is
>> great. In the case where someone is using S3/HDFS, I wonder what the
>> advantages would be of using Celeborn or Uniffle vs the IBM shuffle
>> service plugin or the Cloud Shuffle Storage Plugin from AWS?
>>
>> These plugins do not require deploying a separate service. Are there any
>> advantages to using Uniffle/Celeborn with an S3 backend, which would
>> require deploying a separate service?
>>
>> Thanks
>> Vakaris
>>
>> On Mon, Apr 8, 2024 at 10:03 AM roryqi wrote:
>>
>>> Apache Uniffle (incubating) may be another solution.
>>> You can see
>>> https://github.com/apache/incubator-uniffle
>>>
>>> https://uniffle.apache.org/blog/2023/07/21/Uniffle%20-%20New%20chapter%20for%20the%20shuffle%20in%20the%20cloud%20native%20era
>>>
>>> On Mon, 8 Apr 2024 at 07:15, Mich Talebzadeh wrote:
>>>
 Splendid

 The configurations below can be used with k8s deployments of Spark.
 Spark applications running on k8s can utilize these configurations to
 seamlessly access data stored in Google Cloud Storage (GCS) and Amazon S3.

 For Google GCS we may have

 spark_config_gcs = {
     "spark.kubernetes.authenticate.driver.serviceAccountName":
         "service_account_name",
     "spark.hadoop.fs.gs.impl":
         "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
     "spark.hadoop.google.cloud.auth.service.account.enable": "true",
     "spark.hadoop.google.cloud.auth.service.account.json.keyfile":
         "/path/to/keyfile.json",
 }

 For Amazon S3, similarly:

 spark_config_s3 = {
     "spark.kubernetes.authenticate.driver.serviceAccountName":
         "service_account_name",
     "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
     "spark.hadoop.fs.s3a.access.key": "s3_access_key",
     "spark.hadoop.fs.s3a.secret.key": "secret_key",
 }


 To implement these configurations and enable Spark applications to
 interact with GCS and S3, I guess we can approach it this way:

 1) Spark Repository Integration: These configurations need to be added
 to the Spark repository as part of the supported configuration options for
 k8s deployments.

 2) Configuration Settings: Users need to specify these configurations
 when submitting Spark applications to a Kubernetes cluster. They can
 include these
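
To make the quoted config dicts concrete, here is a minimal PySpark sketch
of applying such a dict when building a session (untested; the keys and
placeholder values mirror the spark_config_s3 snippet quoted above):

from pyspark.sql import SparkSession

# Placeholder credentials/values, mirroring the quoted spark_config_s3.
spark_config = {
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.s3a.access.key": "s3_access_key",
    "spark.hadoop.fs.s3a.secret.key": "secret_key",
}

builder = SparkSession.builder.appName("s3-access-example")
for key, value in spark_config.items():
    builder = builder.config(key, value)  # apply each setting
spark = builder.getOrCreate()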

Re: Clarification on ExecutorRoll Plugin & Ignore Decommission Fetch Failure

2023-08-25 Thread Arun Ravi
Hi Team,

Thank you for clarifying the decommission ignore-fetch-failure behaviour.
Previously I was using Executor Rolling, Decommissioning, and Ignore
Decommission Fetch Failure as a solution for all the problems. I understand
that executor rolling must be carefully tuned to minimize fetch failures,
along with the best-effort failure-ignore feature. Sorry, I have two more
follow-up questions.

   - When is the block address broadcast from the master? Is it at the
   beginning of a shuffle fetch stage, and/or is it refreshed before each
   shuffle fetch task?
      - It would be great if you could point me to this logic in the
      codebase so that I can read and understand it better.
   - If it gets refreshed before each task, would using the
   'excludeOnFailure' feature
   (spark.excludeOnFailure.task.maxTaskAttemptsPerExecutor, sketched below)
   give better reliability?
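
For context, the exclude-on-failure settings referenced above look like
this; a sketch with illustrative values, not recommendations:

spark.excludeOnFailure.enabled: "true"
spark.excludeOnFailure.task.maxTaskAttemptsPerExecutor: "1"
spark.excludeOnFailure.stage.maxFailedTasksPerExecutor: "2"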

Thank you once again.

Arun Ravi M V
B.Tech (Batch: 2010-2014)

Computer Science and Engineering

Govt. Model Engineering College
Cochin University Of Science And Technology
Kochi


On Sat, 26 Aug 2023 at 05:49, Dongjoon Hyun wrote:

> Hi, Arun.
>
> Here are some answers to your questions.
>
> First, the fetch failure is irrelevant to the Executor Rolling feature,
> because the plugin itself only asks the Spark scheduler to decommission
> the executor, not terminate it. More specifically, it's independent of the
> underlying decommissioning feature's behavior. FYI, the following is the
> code. In other words, it's entirely a behavior of the storage
> decommissioning feature and the
> `spark.stage.ignoreDecommissionFetchFailure` configuration.
>
>
> https://github.com/apache/spark/blob/12f3c81c26ef639842b8a155e5fd5ccfa7705bea/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala#L84
>
> Second, regarding your comment quoted below:
> `spark.stage.ignoreDecommissionFetchFailure` is not designed to prevent
> FetchFailure. As you see in the documentation, it tries to ignore stage
> fetch failures caused by executor decommissioning when counting
> spark.stage.maxConsecutiveAttempts. Here is the SPARK-40481 PR for details.
>
> > I notice that there are shuffle fetch failures in tasks and the above
> ignore decommission
> > configurations are not respected. The stage will go into retry. The
> decommissioned
> > executor logs clearly show the decommission was fully graceful and
> blocks were replicated
> > to other active executors/fallback.
>
> https://github.com/apache/spark/pull/37924
> [SPARK-40481][CORE] Ignore stage fetch failure caused by decommissioned
> executor
>
> Lastly, SPARK-40481 was not designed as a silver bullet from the
> beginning. Instead, it was a best-effort approach at that time. The
> limitation was pointed out during the PR review, and the PR description
> has the following warning.
>
> > Fetch failure might not be ignored when executors are in below condition,
> > but this is best effort approach based on current mechanism.
> > Stopped or terminated after finishing decommission
> > Under decommission process, then removed by driver with other reasons
>
>
> Dongjoon.
>
>
>
> On Fri, Aug 25, 2023 at 8:21 AM Arun Ravi wrote:
>
>> Hi Team,
>> I am running an Apache Spark 3.4.1 application on K8s with the below
>> configuration related to executor rolling and Ignore Decommission Fetch
>> Failure.
>>
>> spark.plugins: "org.apache.spark.scheduler.cluster.k8s.ExecutorRollPlugin"
>> spark.kubernetes.executor.rollInterval: "1800s"
>> spark.kubernetes.executor.rollPolicy: "OUTLIER_NO_FALLBACK"
>> spark.kubernetes.executor.minTasksPerExecutorBeforeRolling: "100"
>>
>> spark.stage.ignoreDecommissionFetchFailure: "true"
>> spark.scheduler.maxRetainedRemovedDecommissionExecutors: "20"
>>
>> spark.decommission.enabled: "true"
>> spark.storage.decommission.enabled: "true"
>> spark.storage.decommission.fallbackStorage.path: "some-s3-path"
>> spark.storage.decommission.shuffleBlocks.maxThreads: "16"
>>
>> When an executor is decommissioned in the middle of a stage, I notice
>> that there are shuffle fetch failures in tasks and the above ignore
>> decommission configurations are not respected. The stage will go into
>> retry. The decommissioned executor's logs clearly show the decommission
>> was fully graceful and blocks were replicated to other active
>> executors/fallback.
>>
>> May I know how I should be using executor rolling without triggering
>> stage failures? I am using executor rolling to avoid executors being
>> removed by K8s due to memory pressure or OOM issues, as my Spark job is
>> shuffle-heavy and has a lot of window functions. Any help will be super
>> useful.
>>
>>
>>
>> Arun Ravi M V
>> B.Tech (Batch: 2010-2014)
>>
>> Computer Science and Engineering
>>
>> Govt. Model Engineering College
>> Cochin University Of Science And Technology
>> Kochi
>>
>


Clarification on ExecutorRoll Plugin & Ignore Decommission Fetch Failure

2023-08-25 Thread Arun Ravi
Hi Team,
I am running an Apache Spark 3.4.1 application on K8s with the below
configuration related to executor rolling and Ignore Decommission Fetch
Failure.

spark.plugins: "org.apache.spark.scheduler.cluster.k8s.ExecutorRollPlugin"
spark.kubernetes.executor.rollInterval: "1800s"
spark.kubernetes.executor.rollPolicy: "OUTLIER_NO_FALLBACK"
spark.kubernetes.executor.minTasksPerExecutorBeforeRolling: "100"

spark.stage.ignoreDecommissionFetchFailure: "true"
spark.scheduler.maxRetainedRemovedDecommissionExecutors: "20"

spark.decommission.enabled: "true"
spark.storage.decommission.enabled: "true"
spark.storage.decommission.fallbackStorage.path: "some-s3-path"
spark.storage.decommission.shuffleBlocks.maxThreads: "16"

When an executor is decommissioned in the middle of a stage, I notice
that there are shuffle fetch failures in tasks and the above ignore
decommission configurations are not respected. The stage will go into
retry. The decommissioned executor's logs clearly show the decommission was
fully graceful and blocks were replicated to other active
executors/fallback.

May I know how I should be using executor rolling without triggering stage
failures? I am using executor rolling to avoid executors being removed by
K8s due to memory pressure or OOM issues, as my Spark job is shuffle-heavy
and has a lot of window functions. Any help will be super useful.



Arun Ravi M V
B.Tech (Batch: 2010-2014)

Computer Science and Engineering

Govt. Model Engineering College
Cochin University Of Science And Technology
Kochi


Re: KubernetesLocalDiskShuffleDataIO mount path dependency doubt.

2023-08-11 Thread Arun Ravi
Hi Dongjoon,

Thank you for sharing the details about the old protocol and clearing up my
doubt. I was able to understand the difference between Spark 2 and 3. For
now, `KubernetesLocalDiskShuffleDataIO` works fine for me.
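
For readers who wonder what the plugin actually scans, the legacy
executor-local layout it recovers looks roughly like this. This is a sketch
from general Spark local-storage behaviour, not something spelled out in
this thread; the spark-x/executor-x prefix is the pattern discussed below,
and UUIDs/IDs vary per run:

<pvc-mount>/spark-x/executor-x/blockmgr-<uuid>/<0x>/shuffle_<shuffleId>_<mapId>_0.data
<pvc-mount>/spark-x/executor-x/blockmgr-<uuid>/<0x>/shuffle_<shuffleId>_<mapId>_0.index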


Thanks,
Arun Ravi M V
B.Tech (Batch: 2010-2014)

Computer Science and Engineering

Govt. Model Engineering College
Cochin University Of Science And Technology
Kochi
arunrav...@gmail.com
+91 9995354581
Skype : arunravimv


On Fri, 11 Aug 2023 at 23:52, Dongjoon Hyun wrote:

> Hi, Arun.
>
> SPARK-35593 (Support shuffle data recovery on the reused PVCs) was an
> Apache Spark 3.2.0 feature, whose plugin follows only the legacy Spark
> shuffle directory structure, to be safe.
>
> You can see the AS-IS test coverage in the corresponding
> `KubernetesLocalDiskShuffleDataIOSuite`.
>
>
> https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala
>
> To be clear, Apache Spark keeps the supported directory structure without
> any changes for historic reasons.
>
> You can use different structures by simply implementing your own plugin
> like KubernetesLocalDiskShuffleDataIO. It's extensible.
>
> Dongjoon.
>
>
> On Fri, Aug 11, 2023 at 4:52 AM Arun Ravi wrote:
>
>> Hi Team,
>>
>> I am using the recently released shuffle recovery feature via the
>> `KubernetesLocalDiskShuffleDataIO` plugin class on Spark 3.4.1.
>>
>> Can someone explain why the mount path has the spark-x/executor-x pattern
>> dependency? I got this path detail from this PR
>> <https://github.com/apache/spark/pull/42417>. Is it to avoid other
>> folders in the volume? Also, does this mean the path should use the
>> executor ID and Spark app ID, or just the hardcoded spark-x/executor-x?
>> Sorry, I couldn't fully understand the reasoning for this. Any help will
>> be super useful.
>>
>>
>> Arun Ravi M V
>> B.Tech (Batch: 2010-2014)
>>
>> Computer Science and Engineering
>>
>> Govt. Model Engineering College
>> Cochin University Of Science And Technology
>> Kochi
>> arunrav...@gmail.com
>> +91 9995354581
>> Skype : arunravimv
>>
>


KubernetesLocalDiskShuffleDataIO mount path dependency doubt.

2023-08-11 Thread Arun Ravi
Hi Team,

I am using the recently released shuffle recovery feature via the
`KubernetesLocalDiskShuffleDataIO` plugin class on Spark 3.4.1.

Can someone explain why the mount path has the spark-x/executor-x pattern
dependency? I got this path detail from this PR
<https://github.com/apache/spark/pull/42417>. Is it to avoid other folders
in the volume? Also, does this mean the path should use the executor ID and
Spark app ID, or just the hardcoded spark-x/executor-x? Sorry, I couldn't
fully understand the reasoning for this. Any help will be super useful.


Arun Ravi M V
B.Tech (Batch: 2010-2014)

Computer Science and Engineering

Govt. Model Engineering College
Cochin University Of Science And Technology
Kochi
arunrav...@gmail.com
+91 9995354581
Skype : arunravimv


Discussing the idea of Shared Volume block store client

2023-04-23 Thread Arun Ravi
Hi Spark Community,

Thank you for all the awesome features of Apache Spark 3.x, like

   - GA of Spark on K8s
   - Push-based external shuffle
   - Easy mounting of volume claims in Spark on K8s

We run 1000s of Spark jobs on K8s, YARN (EMR), and Databricks. We run into
shuffle block loss and RDD block loss due to a variety of infrastructure
reasons like spot interruptions, memory issues, etc. Not all resource
managers that we use today support the external shuffle service natively,
and the existing solutions to this block loss revolve around using the
ShuffleManager interface to move shuffle data to more reliable locations
like cloud storage or remotely managed storage (e.g. Uber's shuffle
service). My understanding is that a shuffle manager only handles shuffle
blocks, whereas the external shuffle service (client) can handle both
shuffle and RDD blocks and can also support push-based shuffle merge. I was
wondering what the impact would be of implementing an alternative solution
using shared mounted volumes on high-performance filesystems and forcing
the host-local read mechanism in Spark to fetch these blocks on any
executor that needs them. This should ideally solve the loss of both
shuffle and RDD blocks.

The abstract of the idea is as follows (a config sketch for the first step
appears after this list):

   - Mount shared volumes (e.g. AWS FSx for Lustre, EFS, or other network
   filesystems) in the Spark driver and executors.
   - The Network Block Store Client (extending the existing block store
   client) retrieves executor info based on metadata stored in the shared
   volume.
   - Override getHostLocalDirs in the external block store client to return
   all registered executors' local directories (to trigger the host-local
   read path in the Spark codebase).
   - Update ShuffleBlockFetcherIterator's partitionBlocksByFetchMode to
   treat blocks on the shared volume mount as host-local.
   - Deploy a small number of external shuffle servers to perform shuffle
   block merging based on the existing push-based merge logic, thereby
   adding support for it in non-YARN resource managers.
      - Implement the push-based merge functions in the Network Block Store
      Client to work with these external shuffle servers.
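
A minimal sketch of the first step, using the standard Spark on K8s volume
options; the volume name, claim name, and mount path are illustrative, and
the shared filesystem's storage class must support ReadWriteMany access:

spark.kubernetes.driver.volumes.persistentVolumeClaim.shared-shuffle.options.claimName: "fsx-shared-claim"
spark.kubernetes.driver.volumes.persistentVolumeClaim.shared-shuffle.mount.path: "/shared-shuffle"
spark.kubernetes.executor.volumes.persistentVolumeClaim.shared-shuffle.options.claimName: "fsx-shared-claim"
spark.kubernetes.executor.volumes.persistentVolumeClaim.shared-shuffle.mount.path: "/shared-shuffle"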


Thanks in advance for all the feedback and suggestions.

Arun Ravi M V
B.Tech (Batch: 2010-2014)

Computer Science and Engineering

Govt. Model Engineering College
Cochin University Of Science And Technology
Kochi
arunrav...@gmail.com
+91 9995354581
Skype : arunravimv