Re: [Spark-Core] Improving Reliability of spark when Executors OOM

2024-03-18 Thread Mridul Muralidharan
Hi Ashish,

  This is something we are still actively working on internally, but it is
unfortunately not yet in a state to share widely.

Regards,
Mridul

On Mon, Mar 11, 2024 at 6:23 PM Ashish Singh  wrote:

> Hi Kalyan,
>
> Is this something you are still interested in pursuing? There are some
> open discussion threads on the doc you shared.
>
> @Mridul Muralidharan  In what state are your efforts
> along this? Is it something that your team is actively pursuing/ building
> or are mostly planning right now? Asking so that we can align efforts on
> this.
>
> On Sun, Feb 18, 2024 at 10:32 PM xiaoping.huang <1754789...@qq.com> wrote:
>
>> Hi all,
>> Any updates on this project? This will be a very useful feature.
>>
>> xiaoping.huang
>> 1754789...@qq.com
>>
>>  Replied Message 
>> From kalyan 
>> Date 02/6/2024 10:08
>> To Jay Han 
>> Cc Ashish Singh ,
>>  Mridul Muralidharan ,
>>  dev ,
>>  
>> 
>> Subject Re: [Spark-Core] Improving Reliability of spark when Executors
>> OOM
>> Hey,
>> Disk space not enough is also a reliability concern, but might need a
>> diff strategy to handle it.
>> As suggested by Mridul, I am working on making things more configurable
>> in another(new) module… with that, we can plug in new rules for each type
>> of error.
>>
>> Regards
>> Kalyan.
>>
>> On Mon, 5 Feb 2024 at 1:10 PM, Jay Han  wrote:
>>
>>> Hi,
>>> what about supporting for solving the disk space problem of "device
>>> space isn't enough"? I think it's same as OOM exception.
>>>
>>> kalyan  wrote on Sat, Jan 27, 2024 at 13:00:
>>>
>>>> Hi all,
>>>>
>>>
>>>> Sorry for the delay in getting the first draft of (my first) SPIP out.
>>>>
>>>> https://docs.google.com/document/d/1hxEPUirf3eYwNfMOmUHpuI5dIt_HJErCdo7_yr9htQc/edit?pli=1
>>>>
>>>> Let me know what you think.
>>>>
>>>> Regards
>>>> kalyan.
>>>>
>>>> On Sat, Jan 20, 2024 at 8:19 AM Ashish Singh  wrote:
>>>>
>>>>> Hey all,
>>>>>
>>>>> Thanks for this discussion, the timing of this couldn't be better!
>>>>>
>>>>> At Pinterest, we recently started to look into reducing OOM failures
>>>>> while also reducing memory consumption of spark applications. We 
>>>>> considered
>>>>> the following options.
>>>>> 1. Changing core count on executor to change memory available per task
>>>>> in the executor.
>>>>> 2. Changing resource profile based on task failures and gc metrics to
>>>>> grow or shrink executor memory size. We do this at application level based
>>>>> on the app's past runs today.
>>>>> 3. K8s vertical pod autoscaler
>>>>> <https://github.com/kubernetes/autoscaler/tree/master/vertical-pod-autoscaler>
>>>>>
>>>>> Internally, we are mostly getting aligned on option 2. We would love
>>>>> to make this happen and are looking forward to the SPIP.
>>>>>
>>>>>
>>>>> On Wed, Jan 17, 2024 at 9:34 AM Mridul Muralidharan 
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>>   We are internally exploring adding support for dynamically changing
>>>>>> the resource profile of a stage based on runtime characteristics.
>>>>>> This includes failures due to OOM and the like, slowness due to
>>>>>> excessive GC, resource wastage due to excessive overprovisioning, etc.
>>>>>> Essentially handles scale up and scale down of resources.
>>>>>> Instead of baking these into the scheduler directly (which is already
>>>>>> complex), we are modeling it as a plugin - so that the 'business logic' 
>>>>>> of
>>>>>> how to handle task events and mutate state is pluggable.
>>>>>>
>>>>>> The main limitation I find with mutating only the cores is the limits
>>>>>> it places on what kind of problems can be solved with it - and mutating
>>>>>> resource profiles is a much more natural way to handle this
>>>>>> (spark.task.cpus predates RP).
>>>>>>
>>>>>> Regards,
>>>>>> Mridul
>>>>>>
>>>>>> On Wed, Jan 17, 2024 at 9:18 AM Tom 

Re: [Spark-Core] Improving Reliability of spark when Executors OOM

2024-03-11 Thread Ashish Singh
Hi Kalyan,

Is this something you are still interested in pursuing? There are some open
discussion threads on the doc you shared.

@Mridul Muralidharan  What is the current state of your efforts on this? Is
it something that your team is actively pursuing/building, or mostly planning
right now? Asking so that we can align efforts on this.

On Sun, Feb 18, 2024 at 10:32 PM xiaoping.huang <1754789...@qq.com> wrote:

> Hi all,
> Any updates on this project? This will be a very useful feature.
>
> xiaoping.huang
> 1754789...@qq.com
>
>  Replied Message 
> From kalyan 
> Date 02/6/2024 10:08
> To Jay Han 
> Cc Ashish Singh ,
>  Mridul Muralidharan ,
>  dev ,
>  
> 
> Subject Re: [Spark-Core] Improving Reliability of spark when Executors
> OOM
> Hey,
> Disk space not enough is also a reliability concern, but might need a diff
> strategy to handle it.
> As suggested by Mridul, I am working on making things more configurable in
> another(new) module… with that, we can plug in new rules for each type of
> error.
>
> Regards
> Kalyan.
>
> On Mon, 5 Feb 2024 at 1:10 PM, Jay Han  wrote:
>
>> Hi,
>> what about supporting for solving the disk space problem of "device space
>> isn't enough"? I think it's same as OOM exception.
>>
>> kalyan  wrote on Sat, Jan 27, 2024 at 13:00:
>>
>>> Hi all,
>>>
>>
>>> Sorry for the delay in getting the first draft of (my first) SPIP out.
>>>
>>> https://docs.google.com/document/d/1hxEPUirf3eYwNfMOmUHpuI5dIt_HJErCdo7_yr9htQc/edit?pli=1
>>>
>>> Let me know what you think.
>>>
>>> Regards
>>> kalyan.
>>>
>>> On Sat, Jan 20, 2024 at 8:19 AM Ashish Singh  wrote:
>>>
>>>> Hey all,
>>>>
>>>> Thanks for this discussion, the timing of this couldn't be better!
>>>>
>>>> At Pinterest, we recently started to look into reducing OOM failures
>>>> while also reducing memory consumption of spark applications. We considered
>>>> the following options.
>>>> 1. Changing core count on executor to change memory available per task
>>>> in the executor.
>>>> 2. Changing resource profile based on task failures and gc metrics to
>>>> grow or shrink executor memory size. We do this at application level based
>>>> on the app's past runs today.
>>>> 3. K8s vertical pod autoscaler
>>>> <https://github.com/kubernetes/autoscaler/tree/master/vertical-pod-autoscaler>
>>>>
>>>> Internally, we are mostly getting aligned on option 2. We would love to
>>>> make this happen and are looking forward to the SPIP.
>>>>
>>>>
>>>> On Wed, Jan 17, 2024 at 9:34 AM Mridul Muralidharan 
>>>> wrote:
>>>>
>>>>>
>>>>> Hi,
>>>>>
>>>>>   We are internally exploring adding support for dynamically changing
>>>>> the resource profile of a stage based on runtime characteristics.
>>>>> This includes failures due to OOM and the like, slowness due to
>>>>> excessive GC, resource wastage due to excessive overprovisioning, etc.
>>>>> Essentially handles scale up and scale down of resources.
>>>>> Instead of baking these into the scheduler directly (which is already
>>>>> complex), we are modeling it as a plugin - so that the 'business logic' of
>>>>> how to handle task events and mutate state is pluggable.
>>>>>
>>>>> The main limitation I find with mutating only the cores is the limits
>>>>> it places on what kind of problems can be solved with it - and mutating
>>>>> resource profiles is a much more natural way to handle this
>>>>> (spark.task.cpus predates RP).
>>>>>
>>>>> Regards,
>>>>> Mridul
>>>>>
>>>>> On Wed, Jan 17, 2024 at 9:18 AM Tom Graves
>>>>>  wrote:
>>>>>
>>>>>> It is interesting. I think there are definitely some discussion
>>>>>> points around this.  reliability vs performance is always a trade off and
>>>>>> its great it doesn't fail but if it doesn't meet someone's SLA now that
>>>>>> could be as bad if its hard to figure out why.   I think if something 
>>>>>> like
>>>>>> this kicks in, it needs to be very obvious to the user so they can see 
>>>>>> that
>>>>>> it occurred.  Do you have something in place on UI or something that
>

Re: [Spark-Core] Improving Reliability of spark when Executors OOM

2024-02-05 Thread kalyan
Hey,
Running out of disk space is also a reliability concern, but it might need a
different strategy to handle. As suggested by Mridul, I am working on making
things more configurable in another (new) module… with that, we can plug in a
new rule for each type of error.
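
(The module itself isn't public yet; as a purely hypothetical sketch of what
"a rule per error type" could look like -- none of the names below are Spark
or Uber APIs, just an illustration of the pluggable-rules idea:)

    // Hypothetical types only -- an illustration of "one rule per error type".
    sealed trait FailureKind
    case object ExecutorOom extends FailureKind
    case object DiskFull extends FailureKind

    final case class FailureContext(stageId: Int, partition: Int, kind: FailureKind)

    sealed trait Remediation
    case object RunRetryAlone extends Remediation // give the retry the whole executor
    case object NoAction extends Remediation

    trait RemediationRule {
      def matches(f: FailureContext): Boolean
      def remediate(f: FailureContext): Remediation
    }

    // Mirrors the OOM handling described in this thread; a disk-full rule would
    // plug in alongside it with its own, different strategy.
    object OomRule extends RemediationRule {
      def matches(f: FailureContext): Boolean = f.kind == ExecutorOom
      def remediate(f: FailureContext): Remediation = RunRetryAlone
    }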

Regards
Kalyan.

On Mon, 5 Feb 2024 at 1:10 PM, Jay Han  wrote:

> Hi,
> what about supporting for solving the disk space problem of "device space
> isn't enough"? I think it's same as OOM exception.
>
> kalyan  wrote on Sat, Jan 27, 2024 at 13:00:
>
>> Hi all,
>>
>
>> Sorry for the delay in getting the first draft of (my first) SPIP out.
>>
>> https://docs.google.com/document/d/1hxEPUirf3eYwNfMOmUHpuI5dIt_HJErCdo7_yr9htQc/edit?pli=1
>>
>> Let me know what you think.
>>
>> Regards
>> kalyan.
>>
>> On Sat, Jan 20, 2024 at 8:19 AM Ashish Singh  wrote:
>>
>>> Hey all,
>>>
>>> Thanks for this discussion, the timing of this couldn't be better!
>>>
>>> At Pinterest, we recently started to look into reducing OOM failures
>>> while also reducing memory consumption of spark applications. We considered
>>> the following options.
>>> 1. Changing core count on executor to change memory available per task
>>> in the executor.
>>> 2. Changing resource profile based on task failures and gc metrics to
>>> grow or shrink executor memory size. We do this at application level based
>>> on the app's past runs today.
>>> 3. K8s vertical pod autoscaler
>>> 
>>>
>>> Internally, we are mostly getting aligned on option 2. We would love to
>>> make this happen and are looking forward to the SPIP.
>>>
>>>
>>> On Wed, Jan 17, 2024 at 9:34 AM Mridul Muralidharan 
>>> wrote:
>>>

 Hi,

   We are internally exploring adding support for dynamically changing
 the resource profile of a stage based on runtime characteristics.
 This includes failures due to OOM and the like, slowness due to
 excessive GC, resource wastage due to excessive overprovisioning, etc.
 Essentially handles scale up and scale down of resources.
 Instead of baking these into the scheduler directly (which is already
 complex), we are modeling it as a plugin - so that the 'business logic' of
 how to handle task events and mutate state is pluggable.

 The main limitation I find with mutating only the cores is the limits
 it places on what kind of problems can be solved with it - and mutating
 resource profiles is a much more natural way to handle this
 (spark.task.cpus predates RP).

 Regards,
 Mridul

 On Wed, Jan 17, 2024 at 9:18 AM Tom Graves 
 wrote:

> It is interesting. I think there are definitely some discussion points
> around this.  reliability vs performance is always a trade off and its
> great it doesn't fail but if it doesn't meet someone's SLA now that could
> be as bad if its hard to figure out why.   I think if something like this
> kicks in, it needs to be very obvious to the user so they can see that it
> occurred.  Do you have something in place on UI or something that 
> indicates
> this? The nice thing is also you aren't wasting memory by increasing it 
> for
> all tasks when maybe you only need it for one or two.  The downside is you
> are only finding out after failure.
>
> I do also worry a little bit that in your blog post, the error you
> pointed out isn't a java OOM but an off heap memory issue (overhead + heap
> usage).  You don't really address heap memory vs off heap in that article.
> Only thing I see mentioned is spark.executor.memory which is heap memory.
> Obviously adjusting to only run one task is going to give that task more
> overall memory but the reasons its running out in the first place could be
> different.  If it was on heap memory for instance with more tasks I would
> expect to see more GC and not executor OOM.  If you are getting executor
> OOM you are likely using more off heap memory/stack space, etc then you
> allocated.   Ultimately it would be nice to know why that is happening and
> see if we can address it to not fail in the first place.  That could be
> extremely difficult though, especially if using software outside Spark 
> that
> is using that memory.
>
> As Holden said,  we need to make sure this would play nice with the
> resource profiles, or potentially if we can use the resource profile
> functionality.  Theoretically you could extend this to try to get new
> executor if using dynamic allocation for instance.
>
> I agree doing a SPIP would be a good place to start to have more
> discussions.
>
> Tom
>
> On Wednesday, January 17, 2024 at 12:47:51 AM CST, kalyan <
> justfors...@gmail.com> wrote:
>
>
> Hello All,
>
> At Uber, we had recently, done some work on improving the reliability
> of spark applications in scenarios of 

Re: [Spark-Core] Improving Reliability of spark when Executors OOM

2024-02-04 Thread Jay Han
Hi,
what about also supporting the disk space problem of "device space isn't
enough"? I think it's the same as the OOM exception.

kalyan  wrote on Sat, Jan 27, 2024 at 13:00:

> Hi all,
>
> Sorry for the delay in getting the first draft of (my first) SPIP out.
>
> https://docs.google.com/document/d/1hxEPUirf3eYwNfMOmUHpuI5dIt_HJErCdo7_yr9htQc/edit?pli=1
>
> Let me know what you think.
>
> Regards
> kalyan.
>
> On Sat, Jan 20, 2024 at 8:19 AM Ashish Singh  wrote:
>
>> Hey all,
>>
>> Thanks for this discussion, the timing of this couldn't be better!
>>
>> At Pinterest, we recently started to look into reducing OOM failures
>> while also reducing memory consumption of spark applications. We considered
>> the following options.
>> 1. Changing core count on executor to change memory available per task in
>> the executor.
>> 2. Changing resource profile based on task failures and gc metrics to
>> grow or shrink executor memory size. We do this at application level based
>> on the app's past runs today.
>> 3. K8s vertical pod autoscaler
>> 
>>
>> Internally, we are mostly getting aligned on option 2. We would love to
>> make this happen and are looking forward to the SPIP.
>>
>>
>> On Wed, Jan 17, 2024 at 9:34 AM Mridul Muralidharan 
>> wrote:
>>
>>>
>>> Hi,
>>>
>>>   We are internally exploring adding support for dynamically changing
>>> the resource profile of a stage based on runtime characteristics.
>>> This includes failures due to OOM and the like, slowness due to
>>> excessive GC, resource wastage due to excessive overprovisioning, etc.
>>> Essentially handles scale up and scale down of resources.
>>> Instead of baking these into the scheduler directly (which is already
>>> complex), we are modeling it as a plugin - so that the 'business logic' of
>>> how to handle task events and mutate state is pluggable.
>>>
>>> The main limitation I find with mutating only the cores is the limits it
>>> places on what kind of problems can be solved with it - and mutating
>>> resource profiles is a much more natural way to handle this
>>> (spark.task.cpus predates RP).
>>>
>>> Regards,
>>> Mridul
>>>
>>> On Wed, Jan 17, 2024 at 9:18 AM Tom Graves 
>>> wrote:
>>>
 It is interesting. I think there are definitely some discussion points
 around this.  reliability vs performance is always a trade off and its
 great it doesn't fail but if it doesn't meet someone's SLA now that could
 be as bad if its hard to figure out why.   I think if something like this
 kicks in, it needs to be very obvious to the user so they can see that it
 occurred.  Do you have something in place on UI or something that indicates
 this? The nice thing is also you aren't wasting memory by increasing it for
 all tasks when maybe you only need it for one or two.  The downside is you
 are only finding out after failure.

 I do also worry a little bit that in your blog post, the error you
 pointed out isn't a java OOM but an off heap memory issue (overhead + heap
 usage).  You don't really address heap memory vs off heap in that article.
 Only thing I see mentioned is spark.executor.memory which is heap memory.
 Obviously adjusting to only run one task is going to give that task more
 overall memory but the reasons its running out in the first place could be
 different.  If it was on heap memory for instance with more tasks I would
 expect to see more GC and not executor OOM.  If you are getting executor
 OOM you are likely using more off heap memory/stack space, etc then you
 allocated.   Ultimately it would be nice to know why that is happening and
 see if we can address it to not fail in the first place.  That could be
 extremely difficult though, especially if using software outside Spark that
 is using that memory.

 As Holden said,  we need to make sure this would play nice with the
 resource profiles, or potentially if we can use the resource profile
 functionality.  Theoretically you could extend this to try to get new
 executor if using dynamic allocation for instance.

 I agree doing a SPIP would be a good place to start to have more
 discussions.

 Tom

 On Wednesday, January 17, 2024 at 12:47:51 AM CST, kalyan <
 justfors...@gmail.com> wrote:


 Hello All,

 At Uber, we had recently, done some work on improving the reliability
 of spark applications in scenarios of fatter executors going out of memory
 and leading to application failure. Fatter executors are those that have
 more than 1 task running on it at a given time concurrently. This has
 significantly improved the reliability of many spark applications for us at
 Uber. We made a blog about this recently. Link:
 https://www.uber.com/en-US/blog/dynamic-executor-core-resizing-in-spark/

 At a high level, we have done the below 

Re: [Spark-Core] Improving Reliability of spark when Executors OOM

2024-01-26 Thread kalyan
Hi all,

Sorry for the delay in getting the first draft of (my first) SPIP out.
https://docs.google.com/document/d/1hxEPUirf3eYwNfMOmUHpuI5dIt_HJErCdo7_yr9htQc/edit?pli=1

Let me know what you think.

Regards
kalyan.

On Sat, Jan 20, 2024 at 8:19 AM Ashish Singh  wrote:

> Hey all,
>
> Thanks for this discussion, the timing of this couldn't be better!
>
> At Pinterest, we recently started to look into reducing OOM failures while
> also reducing memory consumption of spark applications. We considered the
> following options.
> 1. Changing core count on executor to change memory available per task in
> the executor.
> 2. Changing resource profile based on task failures and gc metrics to grow
> or shrink executor memory size. We do this at application level based on
> the app's past runs today.
> 3. K8s vertical pod autoscaler
> 
>
> Internally, we are mostly getting aligned on option 2. We would love to
> make this happen and are looking forward to the SPIP.
>
>
> On Wed, Jan 17, 2024 at 9:34 AM Mridul Muralidharan 
> wrote:
>
>>
>> Hi,
>>
>>   We are internally exploring adding support for dynamically changing the
>> resource profile of a stage based on runtime characteristics.
>> This includes failures due to OOM and the like, slowness due to excessive
>> GC, resource wastage due to excessive overprovisioning, etc.
>> Essentially handles scale up and scale down of resources.
>> Instead of baking these into the scheduler directly (which is already
>> complex), we are modeling it as a plugin - so that the 'business logic' of
>> how to handle task events and mutate state is pluggable.
>>
>> The main limitation I find with mutating only the cores is the limits it
>> places on what kind of problems can be solved with it - and mutating
>> resource profiles is a much more natural way to handle this
>> (spark.task.cpus predates RP).
>>
>> Regards,
>> Mridul
>>
>> On Wed, Jan 17, 2024 at 9:18 AM Tom Graves 
>> wrote:
>>
>>> It is interesting. I think there are definitely some discussion points
>>> around this.  reliability vs performance is always a trade off and its
>>> great it doesn't fail but if it doesn't meet someone's SLA now that could
>>> be as bad if its hard to figure out why.   I think if something like this
>>> kicks in, it needs to be very obvious to the user so they can see that it
>>> occurred.  Do you have something in place on UI or something that indicates
>>> this? The nice thing is also you aren't wasting memory by increasing it for
>>> all tasks when maybe you only need it for one or two.  The downside is you
>>> are only finding out after failure.
>>>
>>> I do also worry a little bit that in your blog post, the error you
>>> pointed out isn't a java OOM but an off heap memory issue (overhead + heap
>>> usage).  You don't really address heap memory vs off heap in that article.
>>> Only thing I see mentioned is spark.executor.memory which is heap memory.
>>> Obviously adjusting to only run one task is going to give that task more
>>> overall memory but the reasons its running out in the first place could be
>>> different.  If it was on heap memory for instance with more tasks I would
>>> expect to see more GC and not executor OOM.  If you are getting executor
>>> OOM you are likely using more off heap memory/stack space, etc then you
>>> allocated.   Ultimately it would be nice to know why that is happening and
>>> see if we can address it to not fail in the first place.  That could be
>>> extremely difficult though, especially if using software outside Spark that
>>> is using that memory.
>>>
>>> As Holden said,  we need to make sure this would play nice with the
>>> resource profiles, or potentially if we can use the resource profile
>>> functionality.  Theoretically you could extend this to try to get new
>>> executor if using dynamic allocation for instance.
>>>
>>> I agree doing a SPIP would be a good place to start to have more
>>> discussions.
>>>
>>> Tom
>>>
>>> On Wednesday, January 17, 2024 at 12:47:51 AM CST, kalyan <
>>> justfors...@gmail.com> wrote:
>>>
>>>
>>> Hello All,
>>>
>>> At Uber, we had recently, done some work on improving the reliability of
>>> spark applications in scenarios of fatter executors going out of memory and
>>> leading to application failure. Fatter executors are those that have more
>>> than 1 task running on it at a given time concurrently. This has
>>> significantly improved the reliability of many spark applications for us at
>>> Uber. We made a blog about this recently. Link:
>>> https://www.uber.com/en-US/blog/dynamic-executor-core-resizing-in-spark/
>>>
>>> At a high level, we have done the below changes:
>>>
>>>1. When a Task fails with the OOM of an executor, we update the core
>>>requirements of the task to max executor cores.
>>>2. When the task is picked for rescheduling, the new attempt of the
>>>task happens to be on an executor where no other task can 

Re: [Spark-Core] Improving Reliability of spark when Executors OOM

2024-01-19 Thread Ashish Singh
Hey all,

Thanks for this discussion, the timing of this couldn't be better!

At Pinterest, we recently started to look into reducing OOM failures while
also reducing the memory consumption of Spark applications. We considered the
following options:
1. Changing the core count on an executor to change the memory available per
task in the executor.
2. Changing the resource profile based on task failures and GC metrics to grow
or shrink executor memory. Today we do this at the application level based on
the app's past runs.
3. K8s vertical pod autoscaler
<https://github.com/kubernetes/autoscaler/tree/master/vertical-pod-autoscaler>

Internally, we are mostly getting aligned on option 2. We would love to
make this happen and are looking forward to the SPIP.
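
(For concreteness, a minimal sketch of the building block option 2 would lean
on: Spark's stage-level scheduling API, available for the RDD API since Spark
3.1. This is not the Pinterest implementation; the sizes, RDD, and transform
below are placeholders, and running a non-default profile generally requires
dynamic allocation on YARN/K8s:)

    import org.apache.spark.resource.ExecutorResourceRequests
    import org.apache.spark.resource.ResourceProfileBuilder
    import org.apache.spark.resource.TaskResourceRequests
    import org.apache.spark.sql.SparkSession

    object GrowStageMemorySketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("rp-growth-sketch").getOrCreate()
        val sc = spark.sparkContext

        // Hypothetical sizes a controller might request after seeing OOMs or heavy
        // GC in the app's past runs; the decision logic itself is assumed, not shown.
        val execReqs = new ExecutorResourceRequests()
          .cores(4)
          .memory("8g")          // per-executor heap for this stage
          .memoryOverhead("3g")  // extra off-heap headroom for this stage
        val taskReqs = new TaskResourceRequests().cpus(1)

        val biggerProfile = new ResourceProfileBuilder()
          .require(execReqs)
          .require(taskReqs)
          .build()

        // Executors for this stage are requested with the profile above instead of
        // the application defaults.
        val heavyStage = sc.parallelize(1L to 1000000L, 200)
          .withResources(biggerProfile)
          .map(i => i * i)
        println(heavyStage.count())

        spark.stop()
      }
    }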


On Wed, Jan 17, 2024 at 9:34 AM Mridul Muralidharan 
wrote:

>
> Hi,
>
>   We are internally exploring adding support for dynamically changing the
> resource profile of a stage based on runtime characteristics.
> This includes failures due to OOM and the like, slowness due to excessive
> GC, resource wastage due to excessive overprovisioning, etc.
> Essentially handles scale up and scale down of resources.
> Instead of baking these into the scheduler directly (which is already
> complex), we are modeling it as a plugin - so that the 'business logic' of
> how to handle task events and mutate state is pluggable.
>
> The main limitation I find with mutating only the cores is the limits it
> places on what kind of problems can be solved with it - and mutating
> resource profiles is a much more natural way to handle this
> (spark.task.cpus predates RP).
>
> Regards,
> Mridul
>
> On Wed, Jan 17, 2024 at 9:18 AM Tom Graves 
> wrote:
>
>> It is interesting. I think there are definitely some discussion points
>> around this.  reliability vs performance is always a trade off and its
>> great it doesn't fail but if it doesn't meet someone's SLA now that could
>> be as bad if its hard to figure out why.   I think if something like this
>> kicks in, it needs to be very obvious to the user so they can see that it
>> occurred.  Do you have something in place on UI or something that indicates
>> this? The nice thing is also you aren't wasting memory by increasing it for
>> all tasks when maybe you only need it for one or two.  The downside is you
>> are only finding out after failure.
>>
>> I do also worry a little bit that in your blog post, the error you
>> pointed out isn't a java OOM but an off heap memory issue (overhead + heap
>> usage).  You don't really address heap memory vs off heap in that article.
>> Only thing I see mentioned is spark.executor.memory which is heap memory.
>> Obviously adjusting to only run one task is going to give that task more
>> overall memory but the reasons its running out in the first place could be
>> different.  If it was on heap memory for instance with more tasks I would
>> expect to see more GC and not executor OOM.  If you are getting executor
>> OOM you are likely using more off heap memory/stack space, etc then you
>> allocated.   Ultimately it would be nice to know why that is happening and
>> see if we can address it to not fail in the first place.  That could be
>> extremely difficult though, especially if using software outside Spark that
>> is using that memory.
>>
>> As Holden said,  we need to make sure this would play nice with the
>> resource profiles, or potentially if we can use the resource profile
>> functionality.  Theoretically you could extend this to try to get new
>> executor if using dynamic allocation for instance.
>>
>> I agree doing a SPIP would be a good place to start to have more
>> discussions.
>>
>> Tom
>>
>> On Wednesday, January 17, 2024 at 12:47:51 AM CST, kalyan <
>> justfors...@gmail.com> wrote:
>>
>>
>> Hello All,
>>
>> At Uber, we had recently, done some work on improving the reliability of
>> spark applications in scenarios of fatter executors going out of memory and
>> leading to application failure. Fatter executors are those that have more
>> than 1 task running on it at a given time concurrently. This has
>> significantly improved the reliability of many spark applications for us at
>> Uber. We made a blog about this recently. Link:
>> https://www.uber.com/en-US/blog/dynamic-executor-core-resizing-in-spark/
>>
>> At a high level, we have done the below changes:
>>
>>1. When a Task fails with the OOM of an executor, we update the core
>>requirements of the task to max executor cores.
>>2. When the task is picked for rescheduling, the new attempt of the
>>task happens to be on an executor where no other task can run 
>> concurrently.
>>All cores get allocated to this task itself.
>>3. This way we ensure that the configured memory is completely at the
>>disposal of a single task. Thus eliminating contention of memory.
>>
>> The best part of this solution is that it's reactive. It kicks in only
>> when the executors fail with the OOM exception.
>>
>> We understand that the problem 

Re: [Spark-Core] Improving Reliability of spark when Executors OOM

2024-01-17 Thread Mridul Muralidharan
Hi,

  We are internally exploring adding support for dynamically changing the
resource profile of a stage based on runtime characteristics.
This includes failures due to OOM and the like, slowness due to excessive
GC, resource wastage due to excessive overprovisioning, etc.
Essentially, it handles both scale-up and scale-down of resources.
Instead of baking these into the scheduler directly (which is already
complex), we are modeling it as a plugin - so that the 'business logic' of
how to handle task events and mutate state is pluggable.

The main limitation I find with mutating only the cores is the limits it
places on what kind of problems can be solved with it - and mutating
resource profiles is a much more natural way to handle this
(spark.task.cpus predates RP).
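
(For concreteness -- and emphatically not the internal plugin described above --
here is a minimal sketch of the kind of runtime signals such a plugin could
consume, using only the public SparkListener API. The OOM-matching heuristic and
the 30% GC threshold are arbitrary placeholders:)

    import scala.collection.mutable
    import org.apache.spark.{ExceptionFailure, ExecutorLostFailure}
    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    // Counts memory-related task failures and flags GC-heavy tasks per stage.
    // A rule engine could turn these counters into a new resource profile for retries.
    class MemoryPressureListener extends SparkListener {
      private val memoryFailures = mutable.Map.empty[Int, Int] // stageId -> count

      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        taskEnd.reason match {
          case e: ExceptionFailure if e.className.contains("OutOfMemoryError") =>
            memoryFailures(taskEnd.stageId) = memoryFailures.getOrElse(taskEnd.stageId, 0) + 1
          case _: ExecutorLostFailure =>
            // Containers killed for exceeding memory limits often surface as executor loss.
            memoryFailures(taskEnd.stageId) = memoryFailures.getOrElse(taskEnd.stageId, 0) + 1
          case _ => // not a memory signal
        }

        // Rough GC-pressure signal: more than ~30% of task run time spent in GC.
        val m = taskEnd.taskMetrics
        if (m != null && m.executorRunTime > 0 &&
            m.jvmGCTime.toDouble / m.executorRunTime > 0.3) {
          // e.g. feed into a rule that recommends a larger heap or fewer
          // concurrent tasks for retries of this stage.
          println(s"stage ${taskEnd.stageId}: GC-heavy task ${taskEnd.taskInfo.index}")
        }
      }
    }

Registered with sc.addSparkListener(new MemoryPressureListener()); events for a
given listener are delivered on a single dispatcher thread, so the plain map is
enough for a sketch.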

Regards,
Mridul

On Wed, Jan 17, 2024 at 9:18 AM Tom Graves 
wrote:

> It is interesting. I think there are definitely some discussion points
> around this.  reliability vs performance is always a trade off and its
> great it doesn't fail but if it doesn't meet someone's SLA now that could
> be as bad if its hard to figure out why.   I think if something like this
> kicks in, it needs to be very obvious to the user so they can see that it
> occurred.  Do you have something in place on UI or something that indicates
> this? The nice thing is also you aren't wasting memory by increasing it for
> all tasks when maybe you only need it for one or two.  The downside is you
> are only finding out after failure.
>
> I do also worry a little bit that in your blog post, the error you pointed
> out isn't a java OOM but an off heap memory issue (overhead + heap usage).
> You don't really address heap memory vs off heap in that article.  Only
> thing I see mentioned is spark.executor.memory which is heap memory.
> Obviously adjusting to only run one task is going to give that task more
> overall memory but the reasons its running out in the first place could be
> different.  If it was on heap memory for instance with more tasks I would
> expect to see more GC and not executor OOM.  If you are getting executor
> OOM you are likely using more off heap memory/stack space, etc then you
> allocated.   Ultimately it would be nice to know why that is happening and
> see if we can address it to not fail in the first place.  That could be
> extremely difficult though, especially if using software outside Spark that
> is using that memory.
>
> As Holden said,  we need to make sure this would play nice with the
> resource profiles, or potentially if we can use the resource profile
> functionality.  Theoretically you could extend this to try to get new
> executor if using dynamic allocation for instance.
>
> I agree doing a SPIP would be a good place to start to have more
> discussions.
>
> Tom
>
> On Wednesday, January 17, 2024 at 12:47:51 AM CST, kalyan <
> justfors...@gmail.com> wrote:
>
>
> Hello All,
>
> At Uber, we had recently, done some work on improving the reliability of
> spark applications in scenarios of fatter executors going out of memory and
> leading to application failure. Fatter executors are those that have more
> than 1 task running on it at a given time concurrently. This has
> significantly improved the reliability of many spark applications for us at
> Uber. We made a blog about this recently. Link:
> https://www.uber.com/en-US/blog/dynamic-executor-core-resizing-in-spark/
>
> At a high level, we have done the below changes:
>
>1. When a Task fails with the OOM of an executor, we update the core
>requirements of the task to max executor cores.
>2. When the task is picked for rescheduling, the new attempt of the
>task happens to be on an executor where no other task can run concurrently.
>All cores get allocated to this task itself.
>3. This way we ensure that the configured memory is completely at the
>disposal of a single task. Thus eliminating contention of memory.
>
> The best part of this solution is that it's reactive. It kicks in only
> when the executors fail with the OOM exception.
>
> We understand that the problem statement is very common and we expect our
> solution to be effective in many cases.
>
> There could be more cases that can be covered. Executor failing with OOM
> is like a hard signal. The framework(making the driver aware of
> what's happening with the executor) can be extended to handle scenarios of
> other forms of memory pressure like excessive spilling to disk, etc.
>
> While we had developed this on Spark 2.4.3 in-house, we would like to
> collaborate and contribute this work to the latest versions of Spark.
>
> What is the best way forward here? Will an SPIP proposal to detail the
> changes help?
>
> Regards,
> Kalyan.
> Uber India.
>


Re: [Spark-Core] Improving Reliability of spark when Executors OOM

2024-01-17 Thread Tom Graves
It is interesting. I think there are definitely some discussion points around
this. Reliability vs. performance is always a trade-off, and it's great that it
doesn't fail, but if it no longer meets someone's SLA that could be just as bad
if it's hard to figure out why. I think if something like this kicks in, it
needs to be very obvious to the user so they can see that it occurred. Do you
have something in place on the UI or elsewhere that indicates this? The nice
thing is also that you aren't wasting memory by increasing it for all tasks
when maybe you only need it for one or two. The downside is that you are only
finding out after a failure.

I do also worry a little bit that in your blog post, the error you pointed out
isn't a Java OOM but an off-heap memory issue (overhead + heap usage). You
don't really address heap memory vs. off-heap in that article; the only thing I
see mentioned is spark.executor.memory, which is heap memory. Obviously,
adjusting to run only one task is going to give that task more overall memory,
but the reason it's running out in the first place could be different. If it
were on-heap memory, for instance, with more tasks I would expect to see more
GC and not an executor OOM. If you are getting an executor OOM, you are likely
using more off-heap memory/stack space, etc. than you allocated. Ultimately it
would be nice to know why that is happening and see whether we can address it
so it doesn't fail in the first place. That could be extremely difficult
though, especially if software outside Spark is using that memory.
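
(For readers following the heap vs. off-heap distinction here, a minimal
illustration of which knob governs which memory region -- the values are
arbitrary, only the comments matter:)

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // JVM heap: this is where a java.lang.OutOfMemoryError comes from.
      .set("spark.executor.memory", "8g")
      // Off-heap headroom (netty buffers, thread stacks, native libraries):
      // blowing past heap + overhead gets the container killed by YARN/K8s
      // rather than throwing a java OOM.
      .set("spark.executor.memoryOverhead", "2g")
      // Optional explicitly managed off-heap storage/execution memory, also
      // counted outside the heap.
      .set("spark.memory.offHeap.enabled", "true")
      .set("spark.memory.offHeap.size", "2g")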

As Holden said, we need to make sure this would play nicely with resource
profiles, or see whether we can use the resource profile functionality
directly. Theoretically you could extend this to try to get a new executor if
using dynamic allocation, for instance.

I agree doing a SPIP would be a good place to start to have more discussions.
Tom
On Wednesday, January 17, 2024 at 12:47:51 AM CST, kalyan 
 wrote:  
 
 Hello All,
At Uber, we had recently, done some work on improving the reliability of spark 
applications in scenarios of fatter executors going out of memory and leading 
to application failure. Fatter executors are those that have more than 1 task 
running on it at a given time concurrently. This has significantly improved the 
reliability of many spark applications for us at Uber. We made a blog about 
this recently. Link: 
https://www.uber.com/en-US/blog/dynamic-executor-core-resizing-in-spark/
At a high level, we have done the below changes:   
   - When a Task fails with the OOM of an executor, we update the core 
requirements of the task to max executor cores. 
   - When the task is picked for rescheduling, the new attempt of the task 
happens to be on an executor where no other task can run concurrently. All 
cores get allocated to this task itself.
   - This way we ensure that the configured memory is completely at the 
disposal of a single task. Thus eliminating contention of memory.
The best part of this solution is that it's reactive. It kicks in only when the 
executors fail with the OOM exception.
We understand that the problem statement is very common and we expect our 
solution to be effective in many cases. There could be more cases that can be 
covered. Executor failing with OOM is like a hard signal. The framework(making 
the driver aware of what's happening with the executor) can be extended to 
handle scenarios of other forms of memory pressure like excessive spilling to 
disk, etc. 
While we had developed this on Spark 2.4.3 in-house, we would like to 
collaborate and contribute this work to the latest versions of Spark.
What is the best way forward here? Will an SPIP proposal to detail the changes 
help?
Regards,
Kalyan.
Uber India.

Re: [Spark-Core] Improving Reliability of spark when Executors OOM

2024-01-16 Thread Holden Karau
Oh, interesting solution. A co-worker was suggesting something similar using
resource profiles to increase memory -- but your approach avoids a lot of
complexity; I like it (and we could extend it to support resource profile
growth too).

I think an SPIP sounds like a great next step.

On Tue, Jan 16, 2024 at 10:46 PM kalyan  wrote:

> Hello All,
>
> At Uber, we had recently, done some work on improving the reliability of
> spark applications in scenarios of fatter executors going out of memory and
> leading to application failure. Fatter executors are those that have more
> than 1 task running on it at a given time concurrently. This has
> significantly improved the reliability of many spark applications for us at
> Uber. We made a blog about this recently. Link:
> https://www.uber.com/en-US/blog/dynamic-executor-core-resizing-in-spark/
>
> At a high level, we have done the below changes:
>
>1. When a Task fails with the OOM of an executor, we update the core
>requirements of the task to max executor cores.
>2. When the task is picked for rescheduling, the new attempt of the
>task happens to be on an executor where no other task can run concurrently.
>All cores get allocated to this task itself.
>3. This way we ensure that the configured memory is completely at the
>disposal of a single task. Thus eliminating contention of memory.
>
> The best part of this solution is that it's reactive. It kicks in only
> when the executors fail with the OOM exception.
>
> We understand that the problem statement is very common and we expect our
> solution to be effective in many cases.
>
> There could be more cases that can be covered. Executor failing with OOM
> is like a hard signal. The framework(making the driver aware of
> what's happening with the executor) can be extended to handle scenarios of
> other forms of memory pressure like excessive spilling to disk, etc.
>
> While we had developed this on Spark 2.4.3 in-house, we would like to
> collaborate and contribute this work to the latest versions of Spark.
>
> What is the best way forward here? Will an SPIP proposal to detail the
> changes help?
>
> Regards,
> Kalyan.
> Uber India.
>


-- 
Cell : 425-233-8271


[Spark-Core] Improving Reliability of spark when Executors OOM

2024-01-16 Thread kalyan
Hello All,

At Uber, we recently did some work on improving the reliability of Spark
applications in scenarios where fatter executors go out of memory, leading to
application failure. Fatter executors are those that have more than one task
running on them concurrently at a given time. This has significantly improved
the reliability of many Spark applications for us at Uber. We recently wrote a
blog post about this. Link:
https://www.uber.com/en-US/blog/dynamic-executor-core-resizing-in-spark/

At a high level, we have done the below changes:

   1. When a task fails with an executor OOM, we update the core requirements
   of the task to the max executor cores.
   2. When the task is picked for rescheduling, the new attempt lands on an
   executor where no other task can run concurrently; all cores get allocated
   to this task itself.
   3. This way we ensure that the configured memory is completely at the
   disposal of a single task, thus eliminating memory contention.
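
(The change itself was made against Spark 2.4 internals and isn't shown in the
thread; purely as an illustration, the resizing rule above can be thought of as
a small function over a hypothetical task descriptor -- none of these types are
Spark APIs:)

    // Illustrative only, not the actual Uber change.
    final case class TaskSpec(stageId: Int, partition: Int, attempt: Int, cpusRequired: Int)

    // If the previous attempt died with an executor-level OOM, require every core
    // of the executor for the retry, so it schedules alone and gets the whole
    // executor memory to itself.
    def resizeForRetry(task: TaskSpec, failedWithExecutorOom: Boolean, executorCores: Int): TaskSpec = {
      val next = task.copy(attempt = task.attempt + 1)
      if (failedWithExecutorOom) next.copy(cpusRequired = executorCores) else next
    }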

The best part of this solution is that it's reactive. It kicks in only when
the executors fail with the OOM exception.

We understand that the problem statement is very common and we expect our
solution to be effective in many cases.

There could be more cases that can be covered. An executor failing with OOM is
a hard signal. The framework (making the driver aware of what's happening on
the executor) can be extended to handle other forms of memory pressure, such as
excessive spilling to disk.
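
(One such softer signal is already visible to the driver today through task
metrics; for instance, a listener along these lines -- the class name is a
placeholder -- could watch spill volume:)

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    // Sketch: per-task spill volume is already reported to the driver, so a rule
    // for "excessive spilling" could be driven from the same listener events.
    class SpillWatcher extends SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        val m = taskEnd.taskMetrics
        if (m != null && (m.memoryBytesSpilled > 0 || m.diskBytesSpilled > 0)) {
          println(s"stage ${taskEnd.stageId} task ${taskEnd.taskInfo.index}: spilled " +
            s"${m.memoryBytesSpilled} bytes from memory, ${m.diskBytesSpilled} bytes to disk")
        }
      }
    }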

While we developed this on Spark 2.4.3 in-house, we would like to
collaborate and contribute this work to the latest versions of Spark.

What is the best way forward here? Will an SPIP proposal to detail the
changes help?

Regards,
Kalyan.
Uber India.