Re: Fair scheduler pool details

2016-03-02 Thread Mark Hamstra
If I'm understanding you correctly, then you are correct that the fair
scheduler doesn't currently do everything that you want to achieve.  Fair
scheduler pools currently can be configured with a minimum number of cores
that they will need before accepting Tasks, but there isn't a way to
restrict a pool to use no more than a certain number of cores.  That means
that a lower-priority pool can grab all of the cores as long as there is no
demand on high-priority pools, and then the higher-priority pools will have
to wait for the lower-priority pool to complete Tasks before the
higher-priority pools will be able to run Tasks.  That means that fair
scheduling pools really aren't a sufficient means to satisfy multi-tenancy
requirements or other scenarios where you want a guarantee that there will
always be some cores available to run a high-priority job.  There is a JIRA
issue and a PR out there to address some of this, and I've been starting to
come around to the notion that we should support a max-cores configuration
for fair scheduler pools, but nothing like that is available right now.
Neither is there a way at the application level in a standalone-mode cluster
for one application to pre-empt another in order to acquire its cores or
other resources.  YARN does provide some support for that, and Mesos may as
well, so that is the closest option I think currently exists to satisfy your
requirement.
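For reference, pool minimum shares are declared in fairscheduler.xml along these lines (the pool names and numbers here are only illustrative):

```xml
<?xml version="1.0"?>
<allocations>
  <!-- minShare is the minimum number of CPU cores the fair scheduler
       tries to give this pool before redistributing the rest by weight.
       Note there is a minShare and a weight, but no corresponding
       max-cores element -- which is exactly the gap described above. -->
  <pool name="high-priority">
    <schedulingMode>FAIR</schedulingMode>
    <weight>4</weight>
    <minShare>8</minShare>
  </pool>
  <pool name="low-priority">
    <schedulingMode>FIFO</schedulingMode>
    <weight>1</weight>
    <minShare>0</minShare>
  </pool>
</allocations>
```

A job is then routed to a pool by calling sc.setLocalProperty("spark.scheduler.pool", "high-priority") on the SparkContext before submitting it, with the file itself pointed to by spark.scheduler.allocation.file.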


Re: Fair scheduler pool details

2016-03-02 Thread Eugene Morozov
Mark,

I'm trying to configure a Spark cluster to share resources between two pools.

I can do that by assigning minimal shares (it works fine), but that means a
specific number of cores is going to be wasted by just being kept ready to
run anything. While that's better than nothing, I'd like to specify a
percentage of cores instead of a specific number of cores, as the cluster
might be resized either up or down. Is there such an option?

Also, I haven't found anything about any sort of preemptive scheduler for
standalone deployment (it is briefly mentioned in SPARK-9882, but that seems
to be abandoned). Do you know if there is any such activity?
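One workaround sketch, in case it helps others: generate the fairscheduler.xml at submit time from the current total core count, so the minimum shares track the cluster size. This is only a sketch; the pool names and percentages below are placeholders, and the total core count has to be supplied from outside (e.g. from the master's status):

```python
# Sketch: derive each pool's minShare from a percentage of the cluster's
# current total cores, then write out a fairscheduler.xml to hand to Spark
# via spark.scheduler.allocation.file. Pool names/percentages are examples.

POOL_PERCENTAGES = {"high-priority": 0.25, "low-priority": 0.0}

def min_shares(total_cores, percentages):
    """Map each pool name to floor(percentage * total_cores) cores."""
    return {pool: int(total_cores * pct) for pool, pct in percentages.items()}

def allocation_xml(shares, weight=1):
    """Render the pools as a fairscheduler.xml allocations document."""
    pools = "\n".join(
        "  <pool name=\"{0}\">\n"
        "    <schedulingMode>FAIR</schedulingMode>\n"
        "    <weight>{1}</weight>\n"
        "    <minShare>{2}</minShare>\n"
        "  </pool>".format(name, weight, share)
        for name, share in sorted(shares.items())
    )
    return "<?xml version=\"1.0\"?>\n<allocations>\n" + pools + "\n</allocations>\n"

shares = min_shares(64, POOL_PERCENTAGES)
print(shares["high-priority"])  # 16 cores on a 64-core cluster
print(allocation_xml(shares))
```

This only approximates a percentage guarantee: the file is read when the application starts, so a resize after startup still requires regenerating it and restarting.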

--
Be well!
Jean Morozov



Re: Fair scheduler pool details

2016-02-20 Thread Mark Hamstra
It's 2 -- and it's pretty hard to point to a line of code, a method, or
even a class since the scheduling of Tasks involves a pretty complex
interaction of several Spark components -- mostly the DAGScheduler,
TaskScheduler/TaskSchedulerImpl, TaskSetManager, Schedulable and Pool, as
well as the SchedulerBackend (CoarseGrainedSchedulerBackend in this case).
The key thing to understand, though, is the comment at the top of
SchedulerBackend.scala: "A backend interface for scheduling systems that
allows plugging in different ones under TaskSchedulerImpl. We assume a
Mesos-like model where the application gets resource offers as machines
become available and can launch tasks on them."  In other words, the whole
scheduling system is built on a model that starts with offers made by
workers when resources are available to run Tasks.  Other than the big
hammer of canceling a Job while interruptOnCancel is true, there isn't
really any facility for stopping or rescheduling Tasks that are already
started, so that rules out your option 1.  Similarly, option 3 is out
because the scheduler doesn't know when Tasks will complete; it just knows
when a new offer comes in and it is time to send more Tasks to be run on
the machine making the offer.

What actually happens is that the Pool with which a Job is associated
maintains a queue of TaskSets needing to be scheduled.  When in
resourceOffers the TaskSchedulerImpl needs sortedTaskSets, the Pool
supplies those from its scheduling queue after first sorting it according
to the Pool's taskSetSchedulingAlgorithm.  In other words, what Spark's
fair scheduling does in essence is, in response to worker resource offers,
to send new Tasks to be run; those Tasks are taken in sets from the queue
of waiting TaskSets, sorted according to a scheduling algorithm.  There is
no pre-emption or rescheduling of Tasks that the scheduler has already sent
to the workers, nor is there any attempt to anticipate when already running
Tasks will complete.
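To make that ordering concrete, the sort behind sortedTaskSets can be modeled roughly as follows. This is a simplified Python rendering of the comparator logic in FairSchedulingAlgorithm, not the actual Scala source, and the pool names and numbers are made up:

```python
# Simplified model of how the fair scheduler orders pools/TaskSets when
# responding to a resource offer:
#   1. entities running below their minShare ("needy") come first;
#   2. among needy entities, the smaller runningTasks/minShare ratio wins;
#      among satisfied entities, the smaller runningTasks/weight ratio wins;
#   3. ties break on name.
from dataclasses import dataclass

@dataclass
class Schedulable:  # stand-in for a Pool or TaskSetManager
    name: str
    running_tasks: int
    min_share: int
    weight: int

def fair_sort_key(s):
    needy = s.running_tasks < s.min_share
    min_share_ratio = s.running_tasks / max(s.min_share, 1)
    weight_ratio = s.running_tasks / s.weight
    # needy entities sort first (False < True, hence the negation),
    # then by the applicable ratio, then by name
    return (not needy, min_share_ratio if needy else weight_ratio, s.name)

pools = [
    Schedulable("greedy", running_tasks=40, min_share=0, weight=1),
    Schedulable("high-priority", running_tasks=1, min_share=8, weight=2),
]
ordered = sorted(pools, key=fair_sort_key)
print([p.name for p in ordered])  # ['high-priority', 'greedy']
```

Note that this ordering only decides which waiting TaskSets get the next offers; consistent with the above, nothing in it ever takes resources back from Tasks already running.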


On Sat, Feb 20, 2016 at 4:14 PM, Eugene Morozov 
wrote:

> Hi,
>
> I'm trying to understand how this thing works underneath. Let's say I have
> two types of jobs: a high-priority one that might use a small number of
> cores and has to run pretty fast, and a less important but greedy one that
> uses as many cores as are available. So, the idea is to use two
> corresponding pools.
>
> The thing I'm trying to understand is the following.
> I use standalone Spark deployment (no YARN, no Mesos).
> Let's say the less important job took all the cores and then someone runs
> the high-priority job. Then I see three possibilities:
> 1. Spark kills some executors that currently run the less important job's
> partitions so they can be assigned to the high-priority job.
> 2. Spark waits until some partitions of the less important job are
> completely processed, and the first executors that become free are
> assigned to process the high-priority job.
> 3. Spark figures out the specific time when particular stages of the less
> important job's partitions will be done and, instead of continuing with
> that job, reassigns those executors to the high-priority one.
>
> Which one it is? Could you please point me to a class / method / line of
> code?
> --
> Be well!
> Jean Morozov
>