[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used

2018-04-24 Thread Julien Cuquemelle (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450186#comment-16450186
 ] 

Julien Cuquemelle commented on SPARK-22683:
---

Thanks for all your comments and proposals :)

> DynamicAllocation wastes resources by allocating containers that will barely 
> be used
> 
>
> Key: SPARK-22683
> URL: https://issues.apache.org/jira/browse/SPARK-22683
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.2.0
>Reporter: Julien Cuquemelle
>Assignee: Julien Cuquemelle
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.4.0
>
>
> While migrating a series of jobs from MR to Spark using dynamicAllocation, 
> I've noticed almost a doubling (+114% exactly) of resource consumption of 
> Spark w.r.t MR, for a wall clock time gain of 43%
> About the context: 
> - resource usage stands for vcore-hours allocation for the whole job, as seen 
> by YARN
> - I'm talking about a series of jobs because we provide our users with a way 
> to define experiments (via UI / DSL) that automatically get translated to 
> Spark / MR jobs and submitted on the cluster
> - we submit around 500 of such jobs each day
> - these jobs are usually one shot, and the amount of processing can vary a 
> lot between jobs, and as such finding an efficient number of executors for 
> each job is difficult to get right, which is the reason I took the path of 
> dynamic allocation.  
> - Some of the tests have been scheduled on an idle queue, some on a full 
> queue.
> - experiments have been conducted with spark.executor.cores = 5 and 10; only 
> results for 5 cores have been reported because efficiency was overall better 
> than with 10 cores
> - the figures I give are averaged over a representative sample of those jobs 
> (about 600 jobs) ranging from tens to thousands of splits in the data 
> partitioning and between 400 and 9000 seconds of wall clock time.
> - executor idle timeout is set to 30s;
>  
> Definition: 
> - let's say an executor has spark.executor.cores / spark.task.cpus taskSlots, 
> which represent the max number of tasks an executor will process in parallel.
> - the current behaviour of the dynamic allocation is to allocate enough 
> containers to have one taskSlot per task, which minimizes latency, but wastes 
> resources when tasks are small relative to the executor allocation and idling 
> overhead. 
> The results using the proposal (described below) over the job sample (600 
> jobs):
> - by using 2 tasks per taskSlot, we get a 5% (against -114%) reduction in 
> resource usage, for a 37% (against 43%) reduction in wall clock time for 
> Spark w.r.t MR
> - by trying to minimize the average resource consumption, I ended up with 6 
> tasks per core, with a 30% resource usage reduction, for a similar wall clock 
> time w.r.t. MR
> What did I try to solve the issue with existing parameters (summing up a few 
> points mentioned in the comments) ?
> - change dynamicAllocation.maxExecutors: this would need to be adapted for 
> each job (tens to thousands of splits can occur), and would essentially defeat 
> the purpose of using dynamic allocation.
> - use dynamicAllocation.backlogTimeout: 
> - setting this parameter right to avoid creating unused executors is very 
> dependent on wall clock time. One basically needs to solve the exponential 
> ramp-up for the target time. So this is not an option for my use case, where I 
> don't want per-job tuning. 
> - I've still done a series of experiments, details in the comments. 
> Result is that after manual tuning, the best I could get was a similar 
> resource consumption at the expense of 20% more wall clock time, or a similar 
> wall clock time at the expense of 60% more resource consumption than what I 
> got using my proposal @ 6 tasks per slot (this value being optimized over a 
> much larger range of jobs as already stated)
> - as mentioned in another comment, tampering with the exponential ramp-up 
> might yield task imbalance, and old executors could become contention points 
> for other executors trying to remotely access blocks held by those old 
> executors (not witnessed in the jobs I'm talking about, but we did see this 
> behavior in other jobs)
> Proposal: 
> Simply add a tasksPerExecutorSlot parameter, which makes it possible to 
> specify how many tasks a single taskSlot should ideally execute to mitigate 
> the overhead of executor allocation.
> PR: https://github.com/apache/spark/pull/19881
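For illustration, here is a minimal sketch of how such a parameter could shrink the executor target computed by dynamic allocation (the method and parameter names below are assumptions made for this example, not the actual code of the PR):

{code:java}
// Illustrative sketch only; names are assumed, see the PR above for the real change.
object ExecutorTargetSketch {
  // With tasksPerExecutorSlot = 1 this matches the current behaviour
  // (one taskSlot per task); higher values divide the executor target.
  def maxExecutorsNeeded(pendingAndRunningTasks: Int,
                         executorCores: Int,
                         taskCpus: Int,
                         tasksPerExecutorSlot: Int): Int = {
    val taskSlotsPerExecutor = executorCores / taskCpus
    val tasksPerExecutor = taskSlotsPerExecutor * tasksPerExecutorSlot
    (pendingAndRunningTasks + tasksPerExecutor - 1) / tasksPerExecutor  // rounded up
  }

  def main(args: Array[String]): Unit = {
    // 600 tasks, spark.executor.cores = 5, spark.task.cpus = 1:
    println(maxExecutorsNeeded(600, 5, 1, tasksPerExecutorSlot = 1)) // 120 executors
    println(maxExecutorsNeeded(600, 5, 1, tasksPerExecutorSlot = 2)) // 60 executors
    println(maxExecutorsNeeded(600, 5, 1, tasksPerExecutorSlot = 6)) // 20 executors
  }
}
{code}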




[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used

2018-03-12 Thread Julien Cuquemelle (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16395364#comment-16395364
 ] 

Julien Cuquemelle commented on SPARK-22683:
---

PR updated, including [~xuefuz]'s proposal




[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used

2018-03-06 Thread Julien Cuquemelle (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387947#comment-16387947
 ] 

Julien Cuquemelle commented on SPARK-22683:
---

Yes, I have time.
I was waiting for suggestions for the parameter name.

How about spark.dynamicAllocation.fullParallelismDivisor (assuming we agree that 
the parameter could be a double)?
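As a quick REPL-style sketch of why a fractional divisor would matter (the name and the way it is applied are assumptions at this point), a double allows targets between the current one-slot-per-task behaviour and coarser settings:

{code:java}
// Sketch assuming the divisor is applied to the "full parallelism" target;
// the config name discussed above is only a suggestion.
def targetExecutors(tasks: Int, taskSlotsPerExecutor: Int, divisor: Double): Int =
  math.ceil(tasks.toDouble / (taskSlotsPerExecutor * divisor)).toInt

targetExecutors(600, 5, 1.0)  // 120 executors, current behaviour
targetExecutors(600, 5, 1.5)  // 80 executors
targetExecutors(600, 5, 2.0)  // 60 executors
{code}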


[jira] [Comment Edited] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used

2018-02-08 Thread Julien Cuquemelle (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356887#comment-16356887
 ] 

Julien Cuquemelle edited comment on SPARK-22683 at 2/8/18 12:38 PM:


Thanks a lot for your feedback; some clarifications: 
 - the default remains 1, meaning resources are not a concern and you want to 
minimize latency.
 - if you want to be resource-aware, use 2, which is the easiest resource-saving 
setting (and already seems very worthwhile).
 - if you want to maximize resource savings, you need to measure resource 
consumption with higher values of the parameter.

Regarding the multi-job use case, I do agree that this will not be optimal (but 
it will not be less optimal than today's state), and that we would need a per-job 
configuration. We have a use case where we had to split an application with 2 
jobs into 2 applications, because the optimal executor tuning was very 
different. A per-job config actually seems interesting, but it is not limited 
to this new parameter; it should allow tuning the whole config. If that is 
ever implemented, the question of resource savings with dynamic allocation 
still remains for each job, so this new parameter will still be useful.

Regarding the possibility of setting the parameter programmatically: right now the 
number of tasks per executor is computed when the ExecutorAllocationManager starts, 
which happens during SparkContext initialization, so it is not possible to 
update it. But it does not seem difficult to make it mutable so that the 
computation of the number of needed executors takes it into account: 
{code:java}
/**
 * The maximum number of executors we would need under the current load to
 * satisfy all running and pending tasks, rounded up.
 */
private def maxNumExecutorsNeeded(): Int = {
  val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
  (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
}
{code}
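A rough sketch of what that mutable factor could look like (the class, field and default below are assumptions for illustration, not the actual change):

{code:java}
// Sketch only: the tasks-per-slot factor is a @volatile field read on every
// computation, so it could be updated after SparkContext initialization.
class AllocationTargetSketch(executorCores: Int, taskCpus: Int) {
  @volatile var tasksPerExecutorSlot: Int = 1  // assumed default, current behaviour

  private def tasksPerExecutor: Int = (executorCores / taskCpus) * tasksPerExecutorSlot

  def maxNumExecutorsNeeded(pendingTasks: Int, runningTasks: Int): Int = {
    val numRunningOrPendingTasks = pendingTasks + runningTasks
    (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
  }
}
{code}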


was (Author: jcuquemelle):
Thanks a lot for your feedbacks, some clarification: 
 - the default remains 1, it means resource is not an issue and you want to 
minimize latency.
 - if you want to be resource-aware, use 2 which brings the easiest resource 
saving setting (which seems to be already very interesting)
 - if you want to maximize resource saving you need to measure resource 
consumption with higher values of the parameter.

regarding the multi-job use case, I do agree that this will not be optimal (but 
it will not be less optimal than today's state), and that we need a per-job 
configuration. We have a use-case where we had to split an application with 2 
jobs into 2 applications, because the optimal tuning of the executors was very 
different. But if that happens, the question of resource saving with the 
dynamic allocation still remains for each job, so this new parameter will still 
be useful.

Regarding the possibility to set it programmatically, right now the number of 
tasks per executors is computed during the starting of the AllocationManager, 
which happens during the SparkContext initialization, so it is not possible to 
update it. But it does not seem difficult to make it mutable so that the number 
of needed executors computation takes it into account: 
{code:java}
/**
 * The maximum number of executors we would need under the current load to 
satisfy all running
 * and pending tasks, rounded up.
 */
private def maxNumExecutorsNeeded(): Int = {
  val numRunningOrPendingTasks = listener.totalPendingTasks + 
listener.totalRunningTasks
  (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
}
{code}


[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used

2018-02-08 Thread Julien Cuquemelle (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356887#comment-16356887
 ] 

Julien Cuquemelle commented on SPARK-22683:
---

Thanks a lot for your feedback; some clarifications: 
 - the default remains 1, meaning resources are not a concern and you want to 
minimize latency.
 - if you want to be resource-aware, use 2, which is the easiest resource-saving 
setting (and already seems very worthwhile).
 - if you want to maximize resource savings, you need to measure resource 
consumption with higher values of the parameter.

Regarding the multi-job use case, I do agree that this will not be optimal (but 
it will not be less optimal than today's state), and that we would need a per-job 
configuration. We have a use case where we had to split an application with 2 
jobs into 2 applications, because the optimal executor tuning was very 
different. But if that happens, the question of resource savings with dynamic 
allocation still remains for each job, so this new parameter will still be useful.

Regarding the possibility of setting it programmatically: right now the number of 
tasks per executor is computed when the ExecutorAllocationManager starts, 
which happens during SparkContext initialization, so it is not possible to 
update it. But it does not seem difficult to make it mutable so that the 
computation of the number of needed executors takes it into account: 
{code:java}
/**
 * The maximum number of executors we would need under the current load to
 * satisfy all running and pending tasks, rounded up.
 */
private def maxNumExecutorsNeeded(): Int = {
  val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
  (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
}
{code}


[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used

2018-02-06 Thread Julien Cuquemelle (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353953#comment-16353953
 ] 

Julien Cuquemelle commented on SPARK-22683:
---

[~tgraves]: the issue appears at each stage boundary, as at the start of each 
stage we might request too many executors. 
Regarding the end of a single-stage job, the over-allocated executors usually 
still consume resources even if they are released before the (30s) idle timeout 
at the end of the job, because we usually have at least one executor running a 
while longer than the others, which keeps the unused executors alive until the 
end of the job.

Releasing executors faster would probably reduce the overhead, but it would then 
add overhead for multi-stage jobs, which would have to re-spawn more executors 
at each stage. 

The metric I don't know is how long a container lives if we request it and 
release it immediately once it's available.

The idea of something similar to speculative execution seems interesting, 
but in that case we will still have a latency impact, as we will need to wait 
for a significant number of tasks to finish before computing running-time stats 
and deciding whether we need more executors or not. If the tasks are very short, 
the metric is quickly available but then we don't request more executors, so we 
get a similar effect to using 2 or more tasks per taskSlot. If the tasks are 
long, we will need to wait for a first batch of tasks and then compute the 
correct number of executors for the rest of the tasks. In that case the 
latency would not be better than roughly twice the time to run a single task, 
which is exactly what we get with 2 tasks per taskSlot.

Another view would be to start upfront with n>1 tasks per taskSlot, and gather 
stats about the time needed to spawn the required executors; then, if the 
running tasks are significantly longer than the executor allocation time, request 
new executors (in that case we don't need to wait for the tasks to complete).
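A rough sketch of that heuristic (the threshold, factor and names are made up for illustration):

{code:java}
// Start with n > 1 tasks per slot, then request extra executors only if the
// observed task duration clearly dominates the executor allocation time.
def shouldRequestMoreExecutors(medianTaskTimeMs: Long,
                               executorAllocationTimeMs: Long,
                               factor: Double = 2.0): Boolean =
  medianTaskTimeMs > factor * executorAllocationTimeMs

shouldRequestMoreExecutors(45000, 8000)  // true: tasks are long, ask for more executors
shouldRequestMoreExecutors(3000, 8000)   // false: tasks are short, keep the reduced target
{code}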

 


[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used

2017-12-26 Thread Julien Cuquemelle (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16303869#comment-16303869
 ] 

Julien Cuquemelle commented on SPARK-22683:
---

[~sandyr], [~andrewor14], could we have your feedback on this proposal, as 
contributors to the dynamic allocation policy?
Thanks in advance.


[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used

2017-12-21 Thread Julien Cuquemelle (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16300370#comment-16300370
 ] 

Julien Cuquemelle commented on SPARK-22683:
---

Thanks [~xuefuz] for your tests.

I think it all boils down to:
- use the default if you want to minimize latency
- use 2 if you want the easiest resource-saving setting (which already seems 
very interesting)
- if you want to maximize resource savings you need to measure ...

Regarding naming, the taskSlot concept was the first name that popped up, but 
I happened to read about Flink out of sheer curiosity, and it uses precisely this 
concept: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/job_scheduling.html

However, I'm not at all attached to this name; we can use something else. I 
had thought about something like totalToParallelTasksRatio ... 

Regarding the exponential ramp-up, I do agree that its interest is debatable. 
However, in practice, with the default setting of 1s, the ramp-up is pretty 
quick; I'm not sure there would be a big difference with an upfront allocation.
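As a usage illustration, this is how the settings discussed above could be expressed, assuming the parameter ends up exposed as spark.dynamicAllocation.tasksPerExecutorSlot (an assumed key derived from the parameter name in the description; the final name is still open):

{code:java}
import org.apache.spark.SparkConf

// Illustration only: the last key below is the proposed one and may be renamed.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.executor.cores", "5")
  .set("spark.dynamicAllocation.executorIdleTimeout", "30s")
  .set("spark.dynamicAllocation.tasksPerExecutorSlot", "2")  // easy resource-saving setting
{code}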



[jira] [Updated] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used

2017-12-14 Thread Julien Cuquemelle (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Cuquemelle updated SPARK-22683:
--
Description: 
While migrating a series of jobs from MR to Spark using dynamicAllocation, I've 
noticed almost a doubling (+114% exactly) of resource consumption of Spark 
w.r.t MR, for a wall clock time gain of 43%

About the context: 
- resource usage stands for vcore-hours allocation for the whole job, as seen 
by YARN
- I'm talking about a series of jobs because we provide our users with a way to 
define experiments (via UI / DSL) that automatically get translated to Spark / 
MR jobs and submitted on the cluster
- we submit around 500 of such jobs each day
- these jobs are usually one shot, and the amount of processing can vary a lot 
between jobs, and as such finding an efficient number of executors for each job 
is difficult to get right, which is the reason I took the path of dynamic 
allocation.  
- Some of the tests have been scheduled on an idle queue, some on a full queue.
- experiments have been conducted with spark.executor.cores = 5 and 10; only 
results for 5 cores have been reported because efficiency was overall better 
than with 10 cores
- the figures I give are averaged over a representative sample of those jobs 
(about 600 jobs) ranging from tens to thousands of splits in the data partitioning 
and between 400 and 9000 seconds of wall clock time.
- executor idle timeout is set to 30s;
 

Definition: 
- let's say an executor has spark.executor.cores / spark.task.cpus taskSlots, 
which represent the max number of tasks an executor will process in parallel.
- the current behaviour of the dynamic allocation is to allocate enough 
containers to have one taskSlot per task, which minimizes latency, but wastes 
resources when tasks are small relative to the executor allocation and idling 
overhead. 

The results using the proposal (described below) over the job sample (600 jobs):
- by using 2 tasks per taskSlot, we get a 5% (against -114%) reduction in 
resource usage, for a 37% (against 43%) reduction in wall clock time for Spark 
w.r.t MR
- by trying to minimize the average resource consumption, I ended up with 6 
tasks per core, with a 30% resource usage reduction, for a similar wall clock 
time w.r.t. MR

What did I try to solve the issue with existing parameters (summing up a few 
points mentioned in the comments) ?
- change dynamicAllocation.maxExecutors: this would need to be adapted for each 
job (tens to thousands of splits can occur), and would essentially defeat the 
purpose of using dynamic allocation.
- use dynamicAllocation.backlogTimeout: 
- setting this parameter right to avoid creating unused executors is very 
dependent on wall clock time. One basically needs to solve the exponential ramp-up 
for the target time. So this is not an option for my use case, where I don't 
want per-job tuning. 
- I've still done a series of experiments, details in the comments. Result 
is that after manual tuning, the best I could get was a similar resource 
consumption at the expense of 20% more wall clock time, or a similar wall clock 
time at the expense of 60% more resource consumption than what I got using my 
proposal @ 6 tasks per slot (this value being optimized over a much larger 
range of jobs as already stated)
- as mentioned in another comment, tampering with the exponential ramp-up 
might yield task imbalance, and old executors could become contention points 
for other executors trying to remotely access blocks held by those old executors 
(not witnessed in the jobs I'm talking about, but we did see this behavior in 
other jobs)

Proposal: 

Simply add a tasksPerExecutorSlot parameter, which makes it possible to specify 
how many tasks a single taskSlot should ideally execute to mitigate the 
overhead of executor allocation.

PR: https://github.com/apache/spark/pull/19881

  was:
While migrating a series of jobs from MR to Spark using dynamicAllocation, I've 
noticed almost a doubling (+114% exactly) of resource consumption of Spark 
w.r.t MR, for a wall clock time gain of 43%

About the context: 
- resource usage stands for vcore-hours allocation for the whole job, as seen 
by YARN
- I'm talking about a series of jobs because we provide our users with a way to 
define experiments (via UI / DSL) that automatically get translated to Spark / 
MR jobs and submitted on the cluster
- we submit around 500 of such jobs each day
- these jobs are usually one shot, and the amount of processing can vary a lot 
between jobs, and as such finding an efficient number of executors for each job 
is difficult to get right, which is the reason I took the path of dynamic 
allocation.  
- Some of the tests have been scheduled on an idle queue, some on a full queue.
- experiments have been conducted with spark.executor-cores = 5 and 10, only 
results for 5 cores have been reported because efficiency was overall 

[jira] [Comment Edited] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used

2017-12-14 Thread Julien Cuquemelle (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291141#comment-16291141
 ] 

Julien Cuquemelle edited comment on SPARK-22683 at 12/14/17 5:09 PM:
-

[~tgraves], thanks a lot for your remarks; I've updated the description and 
also included a summary of the various results and comments I got.

Answers to your other questions: 

"The fact you are asking for 5+cores per executor will naturally waste more 
resources when the executor isn't being used"
In fact the resource usage will be similar with fewer cores, because if I set 1 
core per executor, dynamic allocation will ask for 5 times more executors.

"But if we can find something that by defaults works better for the majority of 
workloads that it makes sense to improve"
I'm pretty sure 2 tasks per slot would work for a very large set of workloads, 
especially short jobs.

"As with any config though, how do I know what to set the tasksPerSlot as? it 
requires configuration and it could affect performance."
I agree; what I'm trying to show in my argumentation is that:
- I don't have any parameter today to do what I want without optimizing each 
job, which is not feasible in my use case
- the granularity of the efficiency of this parameter seems coarser than that of 
other parameters (sweet-spot values are valid on a much broader range of jobs 
than maxNbExe or backLogTimeout)
- it seems to me some settings are quite simple to understand: if I want to 
minimize latency, leave the default value; if I want to save some resources, use 
a value of 2; if I want to really minimize resource consumption, find a higher 
number by analysis or aim at meeting a time budget

About dynamic allocation: 
with the default backlogTimeout of 1s, the exponential ramp-up is in practice 
very similar to an upfront request, given the duration of the jobs. I think 
upfront allocation could be used instead of exponential, but this wouldn't 
change the issue, which is related to the target number of executors.
I don't think asking upfront vs exponentially has any effect on how YARN yields 
containers.

"Above you say "When running with 6 tasks per executor slot, our Spark jobs 
consume in average 30% less vcorehours than the MR jobs, this setting being 
valid for different workload sizes." Was this with this patch applied or 
without?"
The patch was applied; otherwise you cannot set the number of tasks per taskSlot 
(I mentioned "executor slot", which is incorrect; I was referring to taskSlot).

"the WallTimeGain wrt MR (%) , does this mean positive numbers ran faster then 
MR? "
Positive numbers mean faster in Spark.

"why is running with 6 or 8 slower? is it shuffle issues or mistuning with gc, 
or just unknown overhead?"
Running with 6 tasks per taskSlot means that 6 tasks will be processed 
sequentially by 6 times fewer task slots.
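A back-of-the-envelope illustration of the first point above (the numbers are just an example, using the 600-task, 5-core setup from the description):

{code:java}
// The vcore total requested by dynamic allocation is the same regardless of
// the number of cores per executor: fewer cores just means more executors.
val tasks = 600
val taskCpus = 1
def executorsRequested(executorCores: Int): Int =
  math.ceil(tasks.toDouble / (executorCores / taskCpus)).toInt

executorsRequested(5) * 5  // 120 executors x 5 cores = 600 vcores
executorsRequested(1) * 1  // 600 executors x 1 core  = 600 vcores
{code}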


was (Author: jcuquemelle):
[~tgraves], thanks a lot for your remarks, I've updated the description and 
also included a summary of various results and comments I got.

Answers about your other questions: 

"The fact you are asking for 5+cores per executor will naturally waste more 
resources when the executor isn't being used"
In fact the resource usage will be similar with fewer cores, because if I set 1 
core per exe, the dynamic allocation will ask for 5 times more exes

"But if we can find something that by defaults works better for the majority of 
workloads that it makes sense to improve"
I'm pretty sure 2 tasks per Slot would work for a very large set of workloads, 
especially short jobs

"As with any config though, how do I know what to set the tasksPerSlot as? it 
requires configuration and it could affect performance."
I agree, what I'm trying to show in my argumentation is that:
- I don't have any parameter today to do what I want without optimizing each 
job, which is not feasible in my use case
- the granularity of the efficiency of this parameter seems coarser that other 
parameters (sweetspot values are valid on a more broader range of jobs than 
maxNbExe or backLogTimeout
- it seems to me some settings are quite simple to understand : if I want to 
minimize latency, let the default value; If I want to save some resources, use 
a value of 2; If I want to really minimize resource consumption, find a higher 
number by analysis or aim at maximizing a time budget

About dynamic allocation : 
with the default setting of 1s of backlogTimeout, the exponential ramp up is in 
practise very similar to an upfront request, regarding the duration of jobs. I 
think upfront allocation could be used instead of exponential, but this 
wouldn't change the issue which is related to the target number of exes
I don't think asking upfront vs exponential has any effect over how Yarn yields 
containers.

"Above you say "When running with 6 tasks per executor slot, our Spark jobs 
consume in average 30% less vcorehours than the MR 

[jira] [Comment Edited] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used

2017-12-14 Thread Julien Cuquemelle (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291141#comment-16291141
 ] 

Julien Cuquemelle edited comment on SPARK-22683 at 12/14/17 5:09 PM:
-

[~tgraves], thanks a lot for your remarks; I've updated the description and 
also included a summary of the various results and comments I got.

Answers to your other questions: 

"The fact you are asking for 5+cores per executor will naturally waste more 
resources when the executor isn't being used"
In fact the resource usage will be similar with fewer cores, because if I set 1 
core per executor, dynamic allocation will ask for 5 times more executors.

"But if we can find something that by defaults works better for the majority of 
workloads that it makes sense to improve"
I'm pretty sure 2 tasks per slot would work for a very large set of workloads, 
especially short jobs.

"As with any config though, how do I know what to set the tasksPerSlot as? it 
requires configuration and it could affect performance."
I agree; what I'm trying to show in my argumentation is that:
- I don't have any parameter today to do what I want without optimizing each 
job, which is not feasible in my use case
- the granularity of the efficiency of this parameter seems coarser than that of 
other parameters (sweet-spot values are valid on a much broader range of jobs 
than maxNbExe or backLogTimeout)
- it seems to me some settings are quite simple to understand: if I want to 
minimize latency, leave the default value; if I want to save some resources, use 
a value of 2; if I want to really minimize resource consumption, find a higher 
number by analysis or aim at maximizing a time budget

About dynamic allocation: 
with the default backlogTimeout of 1s, the exponential ramp-up is in practice 
very similar to an upfront request, given the duration of the jobs. I think 
upfront allocation could be used instead of exponential, but this wouldn't 
change the issue, which is related to the target number of executors.
I don't think asking upfront vs exponentially has any effect on how YARN yields 
containers.

"Above you say "When running with 6 tasks per executor slot, our Spark jobs 
consume in average 30% less vcorehours than the MR jobs, this setting being 
valid for different workload sizes." Was this with this patch applied or 
without?"
The patch was applied; otherwise you cannot set the number of tasks per taskSlot 
(I mentioned "executor slot", which is incorrect; I was referring to taskSlot).

"the WallTimeGain wrt MR (%) , does this mean positive numbers ran faster then 
MR? "
Positive numbers mean faster in Spark.

"why is running with 6 or 8 slower? is it shuffle issues or mistuning with gc, 
or just unknown overhead?"
Running with 6 tasks per taskSlot means that 6 tasks will be processed 
sequentially by 6 times fewer task slots.


was (Author: jcuquemelle):
[~tgraves], thanks a lot for your remarks, I've updated the description and 
also included a summary of various results and comments I got.

Answers about your other questions: 

"The fact you are asking for 5+cores per executor will naturally waste more 
resources when the executor isn't being used"
In fact the resource usage will be similar with fewer cores, because if I set 1 
core per exe, the dynamic allocation will ask for 5 times more exes

"But if we can find something that by defaults works better for the majority of 
workloads that it makes sense to improve"
I'm pretty sure 2 tasks per Slot would work for a very large set of workloads, 
especially short jobs

"As with any config though, how do I know what to set the tasksPerSlot as? it 
requires configuration and it could affect performance."
I agree, what I'm trying to show in my argumentation is that:
- I don't have any parameter today to do what I want without optimizing each 
job, which is not feasible in my use case
- the granularity of the efficiency of this parameter seems coarser that other 
parameters (sweetspot values are valid on a more broader range of jobs than 
maxNbExe or backLogTimeout
- it seems to me some settings are quite simple to understand : if I want to 
minimize latency, let the default value; If I want to save some resources, use 
a value of 2; If I want to really minimize resource consumption, do an analysis 
or aim at maximizing a time budget

About dynamic allocation : 
with the default setting of 1s of backlogTimeout, the exponential ramp up is in 
practise very similar to an upfront request, regarding the duration of jobs. I 
think upfront allocation could be used instead of exponential, but this 
wouldn't change the issue which is related to the target number of exes
I don't think asking upfront vs exponential has any effect over how Yarn yields 
containers.

"Above you say "When running with 6 tasks per executor slot, our Spark jobs 
consume in average 30% less vcorehours than the MR jobs, this 

[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used

2017-12-14 Thread Julien Cuquemelle (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291156#comment-16291156
 ] 

Julien Cuquemelle commented on SPARK-22683:
---

I did see SPARK-16158 before opening a new ticket; my proposal seemed simple 
enough not to need a full pluggable policy (which I do agree with, but which 
seems much more difficult to get accepted, IMHO).
[~tgraves], you mentioned yourself that the current policy could be improved, 
and that the complexity from the users' point of view should not be overlooked 
:-)




--
This message was sent by Atlassian 

[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used

2017-12-14 Thread Julien Cuquemelle (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291141#comment-16291141
 ] 

Julien Cuquemelle commented on SPARK-22683:
---

[~tgraves], thanks a lot for your remarks, I've updated the description and 
also included a summary of various results and comments I got.

Answers about your other questions: 

"The fact you are asking for 5+cores per executor will naturally waste more 
resources when the executor isn't being used"
In fact the resource usage will be similar with fewer cores: if I set 1 core 
per executor, the dynamic allocation will simply ask for 5 times as many executors.
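
To illustrate the point (a rough sketch, not Spark's actual code; the function 
and the task count below are only illustrative): with one task slot per pending 
task, the total number of vcores requested is roughly the same whatever the 
executor size.

{code:scala}
// Minimal sketch: under a one-slot-per-task policy, the executor target is
// ceil(pendingTasks / slotsPerExecutor), so the total requested vcores barely
// depend on the number of cores per executor.
object VcoreEstimate {
  def targetExecutors(pendingTasks: Int, executorCores: Int, taskCpus: Int = 1): Int = {
    val slotsPerExecutor = executorCores / taskCpus
    math.ceil(pendingTasks.toDouble / slotsPerExecutor).toInt
  }

  def main(args: Array[String]): Unit = {
    val tasks = 1000
    for (cores <- Seq(1, 5, 10)) {
      val execs = targetExecutors(tasks, cores)
      println(s"$cores core(s)/executor -> $execs executors -> ${execs * cores} vcores requested")
    }
  }
}
{code}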

"But if we can find something that by defaults works better for the majority of 
workloads that it makes sense to improve"
I'm pretty sure 2 tasks per slot would work well for a very large set of 
workloads, especially short jobs

"As with any config though, how do I know what to set the tasksPerSlot as? it 
requires configuration and it could affect performance."
I agree; what I'm trying to show in my argumentation is that:
- I don't have any parameter today to do what I want without optimizing each 
job, which is not feasible in my use case
- the granularity of this parameter's effect seems coarser than that of other 
parameters (sweet-spot values stay valid over a broader range of jobs than for 
maxNbExe or backLogTimeout)
- some settings seem quite simple to understand: to minimize latency, keep the 
default value; to save some resources, use a value of 2; to really minimize 
resource consumption, do an analysis or aim at maximizing a time budget (see 
the configuration sketch below)
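
As a configuration sketch of those three regimes (hedged: the key name 
spark.dynamicAllocation.tasksPerExecutorSlot is only the name used in this 
proposal, not an existing Spark setting, and the values are illustrative):

{code:scala}
import org.apache.spark.SparkConf

// Illustrative only: pick the tasks-per-slot value according to the goal.
//   1 (default) -> minimize latency (current behaviour)
//   2           -> save resources with a small latency impact
//   6           -> minimize resource consumption for this family of jobs
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.executor.cores", "5")
  .set("spark.dynamicAllocation.tasksPerExecutorSlot", "2") // proposed key, not in released Spark
{code}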

About dynamic allocation: 
with the default backlogTimeout of 1s, the exponential ramp-up is in practice 
very similar to an upfront request, given the duration of our jobs. I think 
upfront allocation could be used instead of the exponential one, but this 
wouldn't change the issue, which is related to the target number of executors.
I don't think asking upfront vs exponentially has any effect on how Yarn yields 
containers.
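
A back-of-the-envelope sketch of why the 1s ramp-up is effectively upfront (the 
doubling model and the numbers are illustrative, not taken from Spark's code):

{code:scala}
// With a doubling ramp-up, the requested executor count reaches the target in
// about log2(target) rounds, i.e. a few seconds with the 1s default backlog
// timeout, which is negligible compared to jobs lasting 400 to 9000 seconds.
def secondsToReachTarget(targetExecutors: Int, backlogTimeoutSec: Double = 1.0): Double = {
  var requested = 1
  var elapsed = 0.0
  while (requested < targetExecutors) {
    requested *= 2               // one doubling per backlog-timeout interval
    elapsed += backlogTimeoutSec
  }
  elapsed
}

println(secondsToReachTarget(1000))        // ~10s with the default 1s timeout
println(secondsToReachTarget(1000, 30.0))  // ~300s with a 30s timeout
{code}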

"Above you say "When running with 6 tasks per executor slot, our Spark jobs 
consume in average 30% less vcorehours than the MR jobs, this setting being 
valid for different workload sizes." Was this with this patch applied or 
without?"
The patch was applied; without it you cannot set the number of tasks per taskSlot 
(I mentioned "executor slot", which is incorrect, I was referring to taskSlot)

"the WallTimeGain wrt MR (%) , does this mean positive numbers ran faster then 
MR? "
Positive numbers mean faster in Spark.

"why is running with 6 or 8 slower? is it shuffle issues or mistuning with gc, 
or just unknown overhead?"
running with 6 tasks per taskSlot means that 6 tasks will be processed 
sequentially by 6 times fewer task slots, so the slowdown comes from reduced 
parallelism rather than from shuffle or GC issues
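
The sizing rule behind those figures can be sketched as follows (illustrative 
code, not the PR's implementation; the parameter names and the task count are 
assumptions):

{code:scala}
// Target executor count when each slot is expected to run `tasksPerSlot` tasks.
def targetExecutors(pendingTasks: Int,
                    executorCores: Int,
                    taskCpus: Int,
                    tasksPerSlot: Int): Int = {
  val slotsPerExecutor = executorCores / taskCpus
  math.ceil(pendingTasks.toDouble / (slotsPerExecutor * tasksPerSlot)).toInt
}

// 1000 pending tasks, 5-core executors, 1 cpu per task:
println(targetExecutors(1000, 5, 1, tasksPerSlot = 1)) // 200 executors (current behaviour)
println(targetExecutors(1000, 5, 1, tasksPerSlot = 6)) // 34 executors, each slot runs ~6 tasks in sequence
{code}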


[jira] [Updated] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used

2017-12-14 Thread Julien Cuquemelle (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Cuquemelle updated SPARK-22683:
--
Description: 
While migrating a series of jobs from MR to Spark using dynamicAllocation, I've 
noticed almost a doubling (+114% exactly) of resource consumption of Spark 
w.r.t MR, for a wall clock time gain of 43%

About the context: 
- resource usage stands for vcore-hours allocation for the whole job, as seen 
by YARN
- I'm talking about a series of jobs because we provide our users with a way to 
define experiments (via UI / DSL) that automatically get translated to Spark / 
MR jobs and submitted on the cluster
- we submit around 500 of such jobs each day
- these jobs are usually one shot, and the amount of processing can vary a lot 
between jobs, and as such finding an efficient number of executors for each job 
is difficult to get right, which is the reason I took the path of dynamic 
allocation.  
- Some of the tests have been scheduled on an idle queue, some on a full queue.
- experiments have been conducted with spark.executor-cores = 5 and 10, only 
results for 5 cores have been reported because efficiency was overall better 
than with 10 cores
- the figures I give are averaged over a representative sample of those jobs 
(about 600 jobs) ranging from tens to thousands splits in the data partitioning 
and between 400 to 9000 seconds of wall clock time.
- executor idle timeout is set to 30s;
 

Definition: 
- let's say an executor has spark.executor.cores / spark.task.cpus taskSlots, 
which represent the max number of tasks an executor will process in parallel.
- the current behaviour of the dynamic allocation is to allocate enough 
containers to have one taskSlot per task, which minimizes latency, but wastes 
resources when tasks are small regarding executor allocation and idling 
overhead. 

The results using the proposal (described below) over the job sample (600 jobs):
- by using 2 tasks per taskSlot, we get a 5% (against -114%) reduction in 
resource usage, for a 37% (against 43%) reduction in wall clock time for Spark 
w.r.t MR
- by trying to minimize the average resource consumption, I ended up with 6 
tasks per core, with a 30% resource usage reduction, for a similar wall clock 
time w.r.t. MR

What did I try to mitigate this (summing up a few points mentioned in the 
comments)?
- change dynamicAllocation.maxExecutors: this would need to be adapted for each 
job (tens to thousands splits can occur), and essentially remove the interest 
of using the dynamic allocation.
- use dynamicAllocation.backlogTimeout: 
- setting this parameter right to avoid creating unused executors is very 
dependent on wall clock time. One basically needs to solve the exponential ramp-up 
for the target time. So this is not an option for my use case where I don't 
want a per-job tuning. 
- I've still done a series of experiments, details in the comments. Result 
is that after manual tuning, the best I could get was a similar resource 
consumption at the expense of 20% more wall clock time, or a similar wall clock 
time at the expense of 60% more resource consumption than what I got using my 
proposal @ 6 tasks per slot (this value being optimized over a much larger 
range of jobs as already stated)
- as mentioned in another comment, tampering with the exponential ramp up 
might yield task imbalance and such old executors could become contention 
points for other exes trying to remotely access blocks in the old exes (not 
witnessed in the jobs I'm talking about, but we did see this behavior in other 
jobs)

Proposal: 

Simply add a tasksPerExecutorSlot parameter, which makes it possible to specify 
how many tasks a single taskSlot should ideally execute to mitigate the 
overhead of executor allocation.

PR: https://github.com/apache/spark/pull/19881

  was:
let's say an executor has spark.executor.cores / spark.task.cpus taskSlots

The current dynamic allocation policy allocates enough executors
to have each taskSlot execute a single task, which minimizes latency, 
but wastes resources when tasks are small regarding executor allocation
and idling overhead. 

By adding the tasksPerExecutorSlot, it is made possible to specify how many 
tasks
a single slot should ideally execute to mitigate the overhead of executor
allocation.

PR: https://github.com/apache/spark/pull/19881



[jira] [Comment Edited] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used

2017-12-14 Thread Julien Cuquemelle (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16280548#comment-16280548
 ] 

Julien Cuquemelle edited comment on SPARK-22683 at 12/14/17 3:25 PM:
-

I don't understand your statement about delaying executor addition? I want to 
cap the number of executors adaptively with respect to the current number of 
tasks, not delay their creation.

Doing this with dynamicAllocation.maxExecutors requires each job to be tuned 
for efficiency; when we're doing experiments, a lot of jobs are one shot, so 
they can't be fine tuned.
The proposal gives a way to have an adaptive behaviour for a family of jobs.

Regarding slowing the ramp-up of executors with schedulerBacklogTimeout, I've 
run 2 series of experiments with this parameter (7 similar jobs in each test 
case, average figures reported in the following table), one on a busy queue and 
the other on an idle queue. I'll report only the idle queue, as the figures 
on the busy queue are even worse for the schedulerBacklogTimeout approach: 

The first row uses the default 1s schedulerBacklogTimeout together with the 
6 tasks per executorSlot I've mentioned above; the other rows use the default 
dynamicAllocation behaviour and only change schedulerBacklogTimeout

||SparkWallTimeSec||Spk-vCores-H||taskPerExeSlot||schedulerBacklogTimeout||
|693.571429|37.142857|6|1.0|
|584.857143|69.571429|1|30.0|
|763.428571|54.285714|1|60.0|
|826.714286|39.571429|1|90.0|

So basically I can tune the backlogTimeout to get a similar vCores-H 
consumption at the expense of almost 20% more wallClockTime, or I can tune the 
parameter to get about the same wallClockTime at the expense of about 60% more 
vcoreH consumption (very roughly extrapolated between 30 and 60 secs for 
schedulerBacklogTimeout).

It does not seem to solve the issue I'm trying to address; moreover, this would 
again need to be tuned for each specific job's duration (to find the 90s 
timeout giving a similar resource consumption, I had to solve the exponential 
ramp-up against the duration of an already-run job, which is not feasible in 
experimental use cases).
The previous experiments that allowed me to find the sweet spot at 6 tasks per 
slot involved job wallClockTimes between 400 and 9000 seconds
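
To make the "solve the exponential ramp-up" step concrete, here is a rough 
model (my own sketch, not Spark code; the 2^(t/timeout) growth and the executor 
counts are simplifying assumptions):

{code:scala}
// With a doubling ramp-up the request grows roughly like 2^(t / timeout), so
// keeping it below `maxUsefulExecutors` over a job of `jobSec` seconds needs
// about timeout >= jobSec / log2(maxUsefulExecutors): the "right" timeout
// scales with the job's wall clock time, hence the per-job tuning.
def requiredBacklogTimeout(jobSec: Double, maxUsefulExecutors: Int): Double =
  jobSec / (math.log(maxUsefulExecutors) / math.log(2))

println(requiredBacklogTimeout(700, 128))   // ~100s for a ~700s job, close to the 90s found empirically
println(requiredBacklogTimeout(7000, 128))  // ~1000s for a ~7000s job
{code}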

Another way to look at the parameter I'm proposing is as a simple way to tune 
the latency / resource consumption tradeoff. 


was (Author: jcuquemelle):
I don't understand your statement about delaying executor addition ? I want to 
cap the number of executors in an adaptive way regarding the current number of 
tasks, not delay their creation.

Doing this with dynamicAllocation.maxExecutors requires each job to be tuned 
for efficiency; when we're doing experiments, a lot of jobs are one shot, so 
they can't be fine tuned.
The proposal gives a way to have an adaptive behaviour for a family of jobs.

Regarding slowing the ramp up of executors with schedulerBacklogTimeout, I've 
made experiments to play with this parameter; I have made 2 series of 
experiments (7 jobs on each test case, average figures reported in the 
following table), one on a busy queue, and the other on an idle queue. I'll 
report only the idle queue, as the figures 
on the busy queue are even worse for the schedulerBacklogTimeout approach: 

First row is using the default 1s for the schedulerBacklogTimeout, and uses the 
6 tasks per executorSlot I've mentioned above, other rows use the default 
dynamicAllocation behaviour and only change schedulerBacklogTimeout

||SparkWallTimeSec||Spk-vCores-H||taskPerExeSlot||schedulerBacklogTimeout||
|693.571429|37.142857|6|1.0|
|584.857143|69.571429|1|30.0|
|763.428571|54.285714|1|60.0|
|826.714286|39.571429|1|90.0|

So basically I can tune the backlogTimeout to get a similar vCores-H 
consumption at the expense of almost 20% more wallClockTime, or I can tune the 
parameter to get about the same wallClockTime at the expense of about 60% more 
vcoreH consumption (very roughly extrapolated between 30 and 60 secs for 
schedulerBacklogTimeout).

It does not seem to solve the issue I'm trying to address, moreover this would 
again need to be tuned for each specific job's duration (to find the 90s 
timeout to get the similar resource consumption, I had to solve the exponential 
ramp-up with the duration of the already run job, which is not feasible in 
experimental use cases ).
The previous experiments that allowed me to find the sweet spot at 6 tasks per 
slot has involved job wallClockTimes between 400 and 9000 seconds

Another way to have a look at this new parameter I'm proposing is to have a 
simple way to tune the latency / resource consumption tradeoff. 


[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used

2017-12-13 Thread Julien Cuquemelle (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289101#comment-16289101
 ] 

Julien Cuquemelle commented on SPARK-22683:
---

Hi [~srowen], 

Just a small remark about the use of schedulerBacklogTimeout to slow down the 
build-up of executors: 
We have recently experienced an issue where the executor allocation by Yarn 
was slow; this resulted in the first executors having run a lot of tasks 
(hence storing a lot of blocks) and becoming a contention point in the 
subsequent stage, when all executors had finally been allocated and tried to 
access the first executors' blocks remotely.

So I guess the fact that the schedulerBacklogTimeout is very short and allows 
quickly ramping up the number of executors is not trivial to alter if we don't 
want to create too big an imbalance in the blocks stored by executors.

So I don't think that using this parameter to alter the resource usage of 
dynamic allocation is a good choice, for this very reason and also because it 
is highly dependent on the wall clock time of the job.




[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used

2017-12-13 Thread Julien Cuquemelle (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289046#comment-16289046
 ] 

Julien Cuquemelle commented on SPARK-22683:
---

Hi [~xuefuz], 
Thanks for noticing you also experience resource usage efficiency issues when 
using dynamic allocation. I'll take the opportunity to provide a few 
clarifications: 

- regarding tuning, the study I made is precisely about tuning for a type of 
workload and not for specific jobs. Our users launch several hundreds of such 
jobs per day (their sizes spanning a wide range, as already stated); the 
figures I gave were computed over a representative sample of such jobs.

- The resource waste is not specific to MR-style jobs, as this over-allocation 
of executors will happen every time a new stage is launched with more tasks 
than available taskSlots. I think that it would be great to allow the 
dynamicAllocation to be resource efficient, while still giving the benefit of 
elasticity over different stages.

- According to the tuning guide, Spark has been designed for larger executors 
(5 cores is regularly reported as a sweet spot) that should execute a large 
number of small tasks ("as short as 200ms") to mutualize / mitigate various 
overheads, IOs, ..., so I'm not sure going back to smaller executors is the 
right move




[jira] [Updated] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used

2017-12-13 Thread Julien Cuquemelle (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Cuquemelle updated SPARK-22683:
--
Description: 
let's say an executor has spark.executor.cores / spark.task.cpus taskSlots

The current dynamic allocation policy allocates enough executors
to have each taskSlot execute a single task, which minimizes latency, 
but wastes resources when tasks are small regarding executor allocation
and idling overhead. 

By adding the tasksPerExecutorSlot, it is made possible to specify how many 
tasks
a single slot should ideally execute to mitigate the overhead of executor
allocation.

PR: https://github.com/apache/spark/pull/19881

  was:
let's say an executor has spark.executor.cores / spark.task.cpus taskSlots

The current dynamic allocation policy allocates enough executors
to have each taskSlot execute a single task, which minimizes latency, 
but wastes resources when tasks are small regarding executor allocation
overhead. 

By adding the tasksPerExecutorSlot, it is made possible to specify how many 
tasks
a single slot should ideally execute to mitigate the overhead of executor
allocation.

PR: https://github.com/apache/spark/pull/19881





[jira] [Updated] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used

2017-12-13 Thread Julien Cuquemelle (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Cuquemelle updated SPARK-22683:
--
Summary: DynamicAllocation wastes resources by allocating containers that 
will barely be used  (was: Allow tuning the number of dynamically allocated 
executors wrt task number)




[jira] [Comment Edited] (SPARK-22683) Allow tuning the number of dynamically allocated executors wrt task number

2017-12-12 Thread Julien Cuquemelle (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16286253#comment-16286253
 ] 

Julien Cuquemelle edited comment on SPARK-22683 at 12/12/17 3:14 PM:
-

The impression I get from our discussion is that you mainly focus on the 
latency of the jobs, and that the current setting is optimized for that, which 
is why you consider the current setup sufficient.

If you consider your previous example from a resource usage point of view, my 
proposal would give about the same resource usage in both scenarios, whereas 
the current setup doubles the resource usage of the workload with small 
tasks... 

I've tried to experiment with the parameters you've proposed, but right now I 
don't have a solution to optimize my type of workload (not every single job) 
for resource consumption. 

I don't know if the majority of Spark users run on idle clusters, but ours is 
routinely full, so for us resource usage is more important than latency.


was (Author: jcuquemelle):
The impression I get from our discussion is that you mainly focus on the 
latency of the jobs, and that the current setting is optimized for that, which 
is why you consider the current setup sufficient.

If you consider your previous example from a resource usage point of view, my 
proposal would allow to have about the same resource usage in both scenarios, 
but the current setup doubles the resource usage of the workload with small 
tasks... 

I've tried to experiment with the parameters you've proposed, but right now I 
don't have a solution to optimize my type of workload (nor every single job) 
for resource consumption. 

I don't know if the majority of Spark users run on idle clusters, but ours is 
routinely full, so for us resource usage is more important than latency.




[jira] [Commented] (SPARK-22683) Allow tuning the number of dynamically allocated executors wrt task number

2017-12-11 Thread Julien Cuquemelle (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16286253#comment-16286253
 ] 

Julien Cuquemelle commented on SPARK-22683:
---

The impression I get from our discussion is that you mainly focus on the 
latency of the jobs, and that the current setting is optimized for that, which 
is why you consider the current setup sufficient.

If you consider your previous example from a resource usage point of view, my 
proposal would allow to have about the same resource usage in both scenarios, 
but the current setup doubles the resource usage of the workload with small 
tasks... 

I've tried to experiment with the parameters you've proposed, but right now I 
don't have a solution to optimize my type of workload (nor every single job) 
for resource consumption. 

I don't know if the majority of Spark users run on idle clusters, but ours is 
routinely full, so for us resource usage is more important than latency.




[jira] [Comment Edited] (SPARK-22683) Allow tuning the number of dynamically allocated executors wrt task number

2017-12-11 Thread Julien Cuquemelle (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16286032#comment-16286032
 ] 

Julien Cuquemelle edited comment on SPARK-22683 at 12/11/17 3:21 PM:
-

I'd like to point out that when referring to "my use case", it is actually a 
wide range of job sizes, ranging from tens to thousands of splits in the data 
partitioning and between 400 and 9000 seconds of wall clock time.

Maybe I didn't emphasize enough in my last answer that the figures I presented 
using schedulerBacklogTimeout needed tuning for a specific job, and that the 
resulting value of the parameter will not be optimal for other job settings. My 
proposal allows to have a sensible default over the whole range of jobs.

This is also true of the maxExecutors parameters, each job would need to be 
tuned using this parameter.

Regarding "there's nothing inherently more efficient about the scheduling 
policy you're proposing", I might add a few information: so far I have aimed at 
having the same latency as the legacy MR jobs, which yield the 6 tasksPerSlot 
figure and a 30% resource usage decrease. 

Here are the figures with other values of taskPerSlot, in terms of gain wrt the 
corresponding MR jobs: 
||taskPerSlot||VcoreGain wrt MR (%)||WallTimeGain wrt MR (%)||
|1.0|-114.329984|43.777071|
|2.0|5.373456|37.610840|
|3.0|20.785099|28.528394|
|4.0|26.501597|18.516913|
|6.0|30.852756|-0.991918|
|8.0|31.147028|-14.633520|

It seems to me the 2 tasks per slot setting is particularly interesting, as it 
allows going from a doubling of used resources to a usage similar to MR, with a 
very low impact on latency.

I bet that this resource usage reduction might not be limited to our use case 
...


was (Author: jcuquemelle):
I'd like to point out that when refering to "my use case", it is actually a 
wide range of job sizes, ranging from tens to thousands splits in the data 
partitioning and between 400 to 9000 seconds of wall clock time.

Maybe I didn't emphasize enough in my last answer that the figures I presented 
using schedulerBacklogTimeout needed tuning for a specific job, and that the 
resulting value of the parameter will not be optimal for other job settings. My 
proposal allows to have a sensible default over the whole range of jobs.

This is also true of the maxExecutors parameters, each job would need to be 
tuned using this parameter.

Regarding "there's nothing inherently more efficient about the scheduling 
policy you're proposing", I might add a few information: so far I have aimed at 
having the same latency as the legacy MR jobs, which yield the 6 tasksPerSlot 
figure and a 30% resource usage decrease. 

Here are the figures with other values of taskPerSlot, in terms of gain wrt the 
corresponding MR jobs: 
||taskPerCore||VcoreGain wrt MR (%)||WallTimeGain wrt MR (%)||
|1.0|-114.329984|43.777071|
|2.0|5.373456|37.610840|
|3.0|20.785099|28.528394|
|4.0|26.501597|18.516913|
|6.0|30.852756|-0.991918|
|8.0|31.147028|-14.633520|

It seems to me the 2 tasks per slot is particularly interesting, as it allows 
to go from a doubling of used resources to a usage similar to MR with a very 
low impact on the latency.

I bet that this resource usage reduction might not be limited to our use case 
...




[jira] [Comment Edited] (SPARK-22683) Allow tuning the number of dynamically allocated executors wrt task number

2017-12-11 Thread Julien Cuquemelle (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16286032#comment-16286032
 ] 

Julien Cuquemelle edited comment on SPARK-22683 at 12/11/17 3:20 PM:
-

I'd like to point out that when refering to "my use case", it is actually a 
wide range of job sizes, ranging from tens to thousands splits in the data 
partitioning and between 400 to 9000 seconds of wall clock time.

Maybe I didn't emphasize enough in my last answer that the figures I presented 
using schedulerBacklogTimeout needed tuning for a specific job, and that the 
resulting value of the parameter will not be optimal for other job settings. My 
proposal allows to have a sensible default over the whole range of jobs.

This is also true of the maxExecutors parameters, each job would need to be 
tuned using this parameter.

Regarding "there's nothing inherently more efficient about the scheduling 
policy you're proposing", I might add a few information: so far I have aimed at 
having the same latency as the legacy MR jobs, which yield the 6 tasksPerSlot 
figure and a 30% resource usage decrease. 

Here are the figures with other values of taskPerSlot, in terms of gain wrt the 
corresponding MR jobs: 
||taskPerCore||VcoreGain wrt MR (%)||WallTimeGain wrt MR (%)||
|1.0|-114.329984|43.777071|
|2.0|5.373456|37.610840|
|3.0|20.785099|28.528394|
|4.0|26.501597|18.516913|
|6.0|30.852756|-0.991918|
|8.0|31.147028|-14.633520|

It seems to me the 2 tasks per slot is particularly interesting, as it allows 
to go from a doubling of used resources to a usage similar to MR with a very 
low impact on the latency.

I bet that this resource usage reduction might not be limited to our use case 
...


was (Author: jcuquemelle):
I'd like to point out that when refering to "my use case", it is actually a 
wide range of job sizes, ranging from tens to thousands splits in the data 
partitioning and between 400 to 9000 seconds of wall clock time.

Maybe I didn't emphasize enough in my last answer that the figures I presented 
using schedulerBacklogTimeout needed tuning for a specific job, and that the 
resulting value of the parameter will not be optimal for other job settings. My 
proposal allows to have a sensible default over the whole range of jobs.

This is also true of the maxExecutors parameters, each job would need to be 
tuned using this parameter.

Regarding "there's nothing inherently more efficient about the scheduling 
policy you're proposing", I might add a few information: so far I have aimed at 
having the same latency as the legacy MR job, which yield the 6 tasksPerSlot 
figure and a 30% resource usage decrease. 

Here are the figures with other values of taskPerSlot, in terms of gain wrt the 
corresponding MR jobs: 
||taskPerCore||VcoreGain wrt MR (%)||WallTimeGain wrt MR (%)||
|1.0|-114.329984|43.777071|
|2.0|5.373456|37.610840|
|3.0|20.785099|28.528394|
|4.0|26.501597|18.516913|
|6.0|30.852756|-0.991918|
|8.0|31.147028|-14.633520|

It seems to me the 2 tasks per slot is particularly interesting, as it allows 
to go from a doubling of used resources to a usage similar to MR with a very 
low impact on the latency.

I bet that this resource usage reduction might not be limited to our use case 
...




[jira] [Comment Edited] (SPARK-22683) Allow tuning the number of dynamically allocated executors wrt task number

2017-12-11 Thread Julien Cuquemelle (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16286032#comment-16286032
 ] 

Julien Cuquemelle edited comment on SPARK-22683 at 12/11/17 3:19 PM:
-

I'd like to point out that when refering to "my use case", it is actually a 
wide range of job sizes, ranging from tens to thousands splits in the data 
partitioning and between 400 to 9000 seconds of wall clock time.

Maybe I didn't emphasize enough in my last answer that the figures I presented 
using schedulerBacklogTimeout needed tuning for a specific job, and that the 
resulting value of the parameter will not be optimal for other job settings. My 
proposal allows to have a sensible default over the whole range of jobs.

This is also true of the maxExecutors parameters, each job would need to be 
tuned using this parameter.

Regarding "there's nothing inherently more efficient about the scheduling 
policy you're proposing", I might add a few information: so far I have aimed at 
having the same latency as the legacy MR job, which yield the 6 tasksPerSlot 
figure and a 30% resource usage decrease. 

Here are the figures with other values of taskPerSlot, in terms of gain wrt the 
corresponding MR jobs: 
||taskPerCore||VcoreGain wrt MR (%)||WallTimeGain wrt MR (%)||
|1.0|-114.329984|43.777071|
|2.0|5.373456|37.610840|
|3.0|20.785099|28.528394|
|4.0|26.501597|18.516913|
|6.0|30.852756|-0.991918|
|8.0|31.147028|-14.633520|

It seems to me the 2 tasks per slot is particularly interesting, as it allows 
to go from a doubling of used resources to a usage similar to MR with a very 
low impact on the latency.

I bet that this resource usage reduction might not be limited to our use case 
...


was (Author: jcuquemelle):
I'd like to point out that when refering to "my use case", it is actually a 
wide range of job sizes, ranging from tens to thousands splits in the data 
partitioning and between 400 to 9000 seconds of wall clock time.

Maybe I didn't emphasize enough in my last answer that the figures I presented 
using schedulerBacklogTimeout needed tuning for a specific job, and that the 
resulting value of the parameter will not be optimal for other job settings. My 
proposal allows to have a sensible default over the whole range of jobs.

This is also true of the maxExecutors parameters, each job needs to be tuned 
using this parameter.

Regarding "there's nothing inherently more efficient about the scheduling 
policy you're proposing", I might add a few information: so far I have aimed at 
having the same latency as the legacy MR job, which yield the 6 tasksPerSlot 
figure and a 30% resource usage decrease. 

Here are the figures with other values of taskPerSlot, in terms of gain wrt the 
corresponding MR jobs: 
||taskPerCore||VcoreGain wrt MR (%)||WallTimeGain wrt MR (%)||
|1.0|-114.329984|43.777071|
|2.0|5.373456|37.610840|
|3.0|20.785099|28.528394|
|4.0|26.501597|18.516913|
|6.0|30.852756|-0.991918|
|8.0|31.147028|-14.633520|

It seems to me the 2 tasks per slot is particularly interesting, as it allows 
to go from a doubling of used resources to a usage similar to MR with a very 
low impact on the latency.

I bet that this resource usage reduction might not be limited to our use case 
...




[jira] [Commented] (SPARK-22683) Allow tuning the number of dynamically allocated executors wrt task number

2017-12-11 Thread Julien Cuquemelle (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16286032#comment-16286032
 ] 

Julien Cuquemelle commented on SPARK-22683:
---

I'd like to point out that when refering to "my use case", it is actually a 
wide range of job sizes, ranging from tens to thousands splits in the data 
partitioning and between 400 to 9000 seconds of wall clock time.

Maybe I didn't emphasize enough in my last answer that the figures I presented 
using schedulerBacklogTimeout needed tuning for a specific job, and that the 
resulting value of the parameter will not be optimal for other job settings. My 
proposal allows to have a sensible default over the whole range of jobs.

This is also true of the maxExecutors parameters, each job needs to be tuned 
using this parameter.

Regarding "there's nothing inherently more efficient about the scheduling 
policy you're proposing", I might add a few information: so far I have aimed at 
having the same latency as the legacy MR job, which yield the 6 tasksPerSlot 
figure and a 30% resource usage decrease. 

Here are the figures with other values of taskPerSlot, in terms of gain wrt the 
corresponding MR jobs: 
||taskPerCore||VcoreGain wrt MR (%)||WallTimeGain wrt MR (%)||
|1.0|-114.329984|43.777071|
|2.0|5.373456|37.610840|
|3.0|20.785099|28.528394|
|4.0|26.501597|18.516913|
|6.0|30.852756|-0.991918|
|8.0|31.147028|-14.633520|

It seems to me the 2 tasks per slot is particularly interesting, as it allows 
to go from a doubling of used resources to a usage similar to MR with a very 
low impact on the latency.

I bet that this resource usage reduction might not be limited to our use case 
...




[jira] [Comment Edited] (SPARK-22683) Allow tuning the number of dynamically allocated executors wrt task number

2017-12-06 Thread Julien Cuquemelle (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16280548#comment-16280548
 ] 

Julien Cuquemelle edited comment on SPARK-22683 at 12/6/17 5:32 PM:


I don't understand your statement about delaying executor addition ? I want to 
cap the number of executors in an adaptive way regarding the current number of 
tasks, not delay their creation.

Doing this with dynamicAllocation.maxExecutors requires each job to be tuned 
for efficiency; when we're doing experiments, a lot of jobs are one shot, so 
they can't be fine tuned.
The proposal gives a way to have an adaptive behaviour for a family of jobs.

Regarding slowing the ramp up of executors with schedulerBacklogTimeout, I've 
made experiments to play with this parameter; I have made 2 series of 
experiments (7 jobs on each test case, average figures reported in the 
following table), one on a busy queue, and the other on an idle queue. I'll 
report only the idle queue, as the figures 
on the busy queue are even worse for the schedulerBacklogTimeout approach: 

First row is using the default 1s for the schedulerBacklogTimeout, and uses the 
6 tasks per executorSlot I've mentioned above, other rows use the default 
dynamicAllocation behaviour and only change schedulerBacklogTimeout

||SparkWallTimeSec||Spk-vCores-H||taskPerExeSlot||schedulerBacklogTimeout||
|693.571429|37.142857|6|1.0|
|584.857143|69.571429|1|30.0|
|763.428571|54.285714|1|60.0|
|826.714286|39.571429|1|90.0|

So basically I can tune the backlogTimeout to get a similar vCores-H 
consumption at the expense of almost 20% more wallClockTime, or I can tune the 
parameter to get about the same wallClockTime at the expense of about 60% more 
vcoreH consumption (very roughly extrapolated between 30 and 60 secs for 
schedulerBacklogTimeout).

It does not seem to solve the issue I'm trying to address, moreover this would 
again need to be tuned for each specific job's duration (to find the 90s 
timeout to get the similar resource consumption, I had to solve the exponential 
ramp-up with the duration of the already run job, which is not feasible in 
experimental use cases ).
The previous experiments that allowed me to find the sweet spot at 6 tasks per 
slot has involved job wallClockTimes between 400 and 9000 seconds

Another way to have a look at this new parameter I'm proposing is to have a 
simple way to tune the latency / resource consumption tradeoff. 


was (Author: jcuquemelle):
I don't understand your statement about delaying executor addition ? I want to 
cap the number of executors in an adaptive way regarding the current number of 
tasks, not delay their creation.

Doing this with dynamicAllocation.maxExecutors requires each job to be tuned 
for efficiency; when we're doing experiments, a lot of jobs are one shot, so 
they can't be fine tuned.
The proposal gives a way to have an adaptive behaviour for a family of jobs.

Regarding slowing the ramp up of executors with schedulerBacklogTimeout, I've 
made experiments to play with this parameter; I have made 2 series of 
experiments (7 jobs on each test case, average figures reported in the 
following table), one on a busy queue, and the other on an idle queue. I'll 
report only the idle queue, as the figures 
on the busy queue are even worse for the schedulerBacklogTimeout approach: 

First row is using the default 1s for the schedulerBacklogTimeout, and uses the 
6 tasks per executorSlot I've mentioned above, other rows use the default 
dynamicAllocation behaviour and only change schedulerBacklogTimeout

||SparkWallTimeSec||Spk-vCores-H||taskPerExeSlot||schedulerBacklogTimeout||
|693.571429|37.142857|6|1.0|
|584.857143|69.571429|1|30.0|
|763.428571|54.285714|1|60.0|
|826.714286|39.571429|1|90.0|

So basically I can tune the backlogTimeout to get a similar vCores-H 
consumption at the expense of almost 20% more wallClockTime, or I can tune the 
parameter to get about the same wallClockTime at the expense of about 60% more 
vcoreH consumption (very roughly extrapolated between 30 and 60 secs for 
schedulerBacklogTimeout).

It does not seem to solve the issue I'm trying to address, moreover this would 
again need to be tuned for each specific job's duration (to find the 90s 
timeout to get the similar resource consumption, I had to solve the exponential 
ramp-up with the duration of the already run job, which is not feasible in 
experimental use cases ).
The previous experiments that allowed me to find the sweet spot at 6 tasks per 
slot has involved job wallClockTimes between 400 and 9000 seconds

Another way to have a look at this new parameter I'm proposing is to have a 
simple way to tune the latency / resource 

[jira] [Commented] (SPARK-22683) Allow tuning the number of dynamically allocated executors wrt task number

2017-12-06 Thread Julien Cuquemelle (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16280548#comment-16280548
 ] 

Julien Cuquemelle commented on SPARK-22683:
---

I don't understand your statement about delaying executor addition ? I want to 
cap the number of executors in an adaptive way regarding the current number of 
tasks, not delay their creation.

Doing this with dynamicAllocation.maxExecutors requires each job to be tuned 
for efficiency; when we're doing experiments, a lot of jobs are one shot, so 
they can't be fine tuned.
The proposal gives a way to have an adaptive behaviour for a family of jobs.

Regarding slowing the ramp up of executors with schedulerBacklogTimeout, I've 
made experiments to play with this parameter; I have made 2 series of 
experiments (7 jobs on each test case, average figures reported in the 
following table), one on a busy queue, and the other on an idle queue. I'll 
report only the idle queue, as the figures 
on the busy queue are even worse for the schedulerBacklogTimeout approach: 

First row is using the default 1s for the schedulerBacklogTimeout, and uses the 
6 tasks per executorSlot I've mentioned above, other rows use the default 
dynamicAllocation behaviour and only change schedulerBacklogTimeout

||SparkWallTimeSec||Spk-vCores-H||taskPerExeSlot||schedulerBacklogTimeout||
|693.571429|37.142857|6|1.0|
|584.857143|69.571429|1|30.0|
|763.428571|54.285714|1|60.0|
|826.714286|39.571429|1|90.0|

So basically I can tune the backlogTimeout to get a similar vCores-H 
consumption at the expense of almost 20% more wallClockTime, or I can tune the 
parameter to get about the same wallClockTime at the expense of about 60% more 
vcoreH consumption (very roughly extrapolated between 30 and 60 secs for 
schedulerBacklogTimeout).

It does not seem to solve the issue I'm trying to address, moreover this would 
again need to be tuned for each specific job's duration (to find the 90s 
timeout to get the similar resource consumption, I had to solve the exponential 
ramp-up with the duration of the already run job, which is not feasible in 
experimental use cases ).
The previous experiments that allowed me to find the sweet spot at 6 tasks per 
slot has involved job wallClockTimes between 400 and 9000 seconds

Another way to have a look at this new parameter I'm proposing is to have a 
simple way to tune the latency / resource consumption tradeoff. 




[jira] [Comment Edited] (SPARK-22683) Allow tuning the number of dynamically allocated executors wrt task number

2017-12-05 Thread Julien Cuquemelle (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278405#comment-16278405
 ] 

Julien Cuquemelle edited comment on SPARK-22683 at 12/5/17 1:09 PM:


Thanks [~srowen] for your quick feedback, let me give you more context / 
explanation about what I'm trying to achieve here. 

- I'm not trying to mitigate the overhead of launching small tasks; I'm trying 
to avoid allocating executors that will not or barely be used.
As Spark tuning guidelines usually aim for small tasks, it seems sub-optimal to 
allocate a large number of executors at application start (or at any stage that 
creates many more tasks than the available executor slots), only for them to be 
discarded without having been used.

- I've scoped this parameter to dynamic allocation, as it only impacts the way 
the executor count ramps up when tasks are backlogged. The default value of the 
parameter falls back to the current behavior.

- I do agree that over-allocated containers will eventually be deallocated, but 
the resources held by idle executors are non-trivial on some of our workloads: 
I ran several experiments on a set of similar jobs working on different data 
sizes, on both an idle and a busy cluster; the resource usage (vcore-hours 
allocated as seen by YARN) of the Spark jobs is compared to the same jobs 
running in MapReduce, with the executor idle timeout set to 30s.
With the current behaviour (1 task per executor slot), our Spark jobs consume 
on average twice as many vcore-hours as the MR jobs; with 6 tasks per executor 
slot, they consume on average 30% fewer vcore-hours than the MR jobs, and this 
single setting holds across the different workload sizes.
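
For completeness, the vcore-hours metric used in these comparisons boils down to the following (a sketch of the definition; the actual figures are the ones YARN reports for each application):

{code:scala}
// "vcore-hours allocated": for every container YARN grants to the application,
// vcores * allocated lifetime, summed over all containers.
final case class ContainerAllocation(vcores: Int, startMs: Long, endMs: Long)

def vcoreHours(allocations: Seq[ContainerAllocation]): Double =
  allocations.map(a => a.vcores * (a.endMs - a.startMs) / 3600000.0).sum
{code}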



[jira] [Updated] (SPARK-22683) Allow tuning the number of dynamically allocated executors wrt task number

2017-12-04 Thread Julien Cuquemelle (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Cuquemelle updated SPARK-22683:
--
 Labels: pull-request-available  (was: )
Description: 
let's say an executor has spark.executor.cores / spark.task.cpus taskSlots

The current dynamic allocation policy allocates enough executors
to have each taskSlot execute a single task, which minimizes latency, 
but wastes resources when tasks are small regarding executor allocation
overhead. 

By adding the tasksPerExecutorSlot, it is made possible to specify how many 
tasks
a single slot should ideally execute to mitigate the overhead of executor
allocation.

PR: https://github.com/apache/spark/pull/19881

  was:
let's say an executor has spark.executor.cores / spark.task.cpus taskSlots

The current dynamic allocation policy allocates enough executors
to have each taskSlot execute a single task, which minimizes latency, 
but wastes resources when tasks are small regarding executor allocation
overhead. 

By adding the tasksPerExecutorSlot, it is made possible to specify how many 
tasks
a single slot should ideally execute to mitigate the overhead of executor
allocation.






[jira] [Updated] (SPARK-22683) Allow tuning the number of dynamically allocated executors wrt task number

2017-12-04 Thread Julien Cuquemelle (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Cuquemelle updated SPARK-22683:
--
Priority: Major  (was: Minor)





[jira] [Created] (SPARK-22683) Allow tuning the number of dynamically allocated executors wrt task number

2017-12-04 Thread Julien Cuquemelle (JIRA)
Julien Cuquemelle created SPARK-22683:
-

 Summary: Allow tuning the number of dynamically allocated 
executors wrt task number
 Key: SPARK-22683
 URL: https://issues.apache.org/jira/browse/SPARK-22683
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.2.0, 2.1.0
Reporter: Julien Cuquemelle
Priority: Minor


let's say an executor has spark.executor.cores / spark.task.cpus taskSlots

The current dynamic allocation policy allocates enough executors
to have each taskSlot execute a single task, which minimizes latency, 
but wastes resources when tasks are small regarding executor allocation
overhead. 

By adding the tasksPerExecutorSlot, it is made possible to specify how many 
tasks
a single slot should ideally execute to mitigate the overhead of executor
allocation.


