[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450186#comment-16450186 ]

Julien Cuquemelle commented on SPARK-22683:
---

Thanks for all your comments and proposals :)

> DynamicAllocation wastes resources by allocating containers that will barely be used
>
> Key: SPARK-22683
> URL: https://issues.apache.org/jira/browse/SPARK-22683
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 2.1.0, 2.2.0
> Reporter: Julien Cuquemelle
> Assignee: Julien Cuquemelle
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.4.0
>
> While migrating a series of jobs from MR to Spark using dynamicAllocation, I've noticed more than a doubling (+114% exactly) of resource consumption of Spark w.r.t. MR, for a wall clock time gain of 43%.
>
> About the context:
> - resource usage stands for vcore-hours allocation for the whole job, as seen by YARN
> - I'm talking about a series of jobs because we provide our users with a way to define experiments (via UI / DSL) that automatically get translated to Spark / MR jobs and submitted on the cluster
> - we submit around 500 such jobs each day
> - these jobs are usually one-shot, and the amount of processing can vary a lot between jobs, so finding an efficient number of executors for each job is difficult to get right, which is the reason I took the path of dynamic allocation
> - some of the tests have been scheduled on an idle queue, some on a full queue
> - experiments have been conducted with spark.executor.cores = 5 and 10; only results for 5 cores are reported because efficiency was overall better than with 10 cores
> - the figures I give are averaged over a representative sample of those jobs (about 600 jobs) ranging from tens to thousands of splits in the data partitioning and between 400 and 9000 seconds of wall clock time
> - executor idle timeout is set to 30s
>
> Definition:
> - let's say an executor has spark.executor.cores / spark.task.cpus taskSlots, which represent the max number of tasks an executor will process in parallel.
> - the current behaviour of the dynamic allocation is to allocate enough containers to have one taskSlot per task, which minimizes latency, but wastes resources when tasks are small relative to the executor allocation and idling overhead.
>
> The results using the proposal (described below) over the job sample (600 jobs):
> - by using 2 tasks per taskSlot, we get a 5% reduction in resource usage (against a 114% increase with the current behaviour), for a 37% (against 43%) reduction in wall clock time for Spark w.r.t. MR
> - by trying to minimize the average resource consumption, I ended up with 6 tasks per taskSlot, with a 30% resource usage reduction, for a similar wall clock time w.r.t. MR
>
> What did I try to solve the issue with existing parameters (summing up a few points mentioned in the comments)?
> - change dynamicAllocation.maxExecutors: this would need to be adapted for each job (tens to thousands of splits can occur), and essentially defeats the purpose of using dynamic allocation.
> - use dynamicAllocation.backlogTimeout:
>   - setting this parameter right to avoid creating unused executors is very dependent on wall clock time. One basically needs to solve the exponential ramp up for the target time. So this is not an option for my use case, where I don't want per-job tuning.
>   - I've still done a series of experiments, details in the comments. The result is that after manual tuning, the best I could get was a similar resource consumption at the expense of 20% more wall clock time, or a similar wall clock time at the expense of 60% more resource consumption than what I got using my proposal @ 6 tasks per slot (this value being optimized over a much larger range of jobs, as already stated)
>   - as mentioned in another comment, tampering with the exponential ramp up might yield task imbalance, and such old executors could become contention points for other executors trying to remotely access blocks in the old executors (not witnessed in the jobs I'm talking about, but we did see this behavior in other jobs)
>
> Proposal:
> Simply add a tasksPerExecutorSlot parameter, which makes it possible to specify how many tasks a single taskSlot should ideally execute to mitigate the overhead of executor allocation.
>
> PR: https://github.com/apache/spark/pull/19881

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
---
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
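The taskSlot definition and the tasksPerExecutorSlot proposal from the issue description can be sketched as follows. This is only an illustration under stated assumptions: the object and method names (`TaskSlotSizing`, `taskSlots`, `executorsNeeded`) are hypothetical and are not taken from Spark or from the PR.

```scala
// Hypothetical sketch: names are illustrative, not Spark's internal API.
object TaskSlotSizing {

  /** Tasks one executor can run in parallel: spark.executor.cores / spark.task.cpus. */
  def taskSlots(executorCores: Int, taskCpus: Int): Int = {
    require(taskCpus > 0 && executorCores >= taskCpus, "an executor needs at least one taskSlot")
    executorCores / taskCpus
  }

  /** Executors to request so that each taskSlot gets about `tasksPerSlot` tasks, rounded up. */
  def executorsNeeded(tasks: Int, executorCores: Int, taskCpus: Int, tasksPerSlot: Int): Int = {
    require(tasks >= 0 && tasksPerSlot > 0, "tasks must be >= 0 and tasksPerSlot > 0")
    val tasksPerExecutor = taskSlots(executorCores, taskCpus) * tasksPerSlot
    // Integer ceiling division, as in the snippet quoted later in the thread.
    (tasks + tasksPerExecutor - 1) / tasksPerExecutor
  }
}
```

For a stage of 1000 tasks with 5 cores per executor and 1 CPU per task, this gives 200 executors with 1 task per taskSlot (the current behaviour), 100 with 2, and 34 with 6.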
[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16395364#comment-16395364 ]

Julien Cuquemelle commented on SPARK-22683:
---

PR updated, including [~xuefuz]'s proposal
[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387947#comment-16387947 ]

Julien Cuquemelle commented on SPARK-22683:
---

Yes, I have time. I was waiting for suggestions for the parameter name. How about spark.dynamicAllocation.fullParallelismDivisor (if we agree that the parameter could be a double)?
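If the parameter were a double, as suggested in this comment, the rounded-up division could no longer use integer arithmetic. A minimal sketch, assuming the divisor simply scales the number of tasks each executor is expected to absorb; `DivisorSizing` and its parameter names are hypothetical, not the PR's code:

```scala
// Hypothetical sketch of a double-valued divisor; not taken from the PR.
object DivisorSizing {

  /** Executors to request when full parallelism is divided by `divisor`, rounded up. */
  def executorsNeeded(tasks: Int, taskSlotsPerExecutor: Int, divisor: Double): Int = {
    require(tasks >= 0 && taskSlotsPerExecutor > 0, "tasks must be >= 0 and slots > 0")
    require(divisor > 0.0, "divisor must be positive")
    math.ceil(tasks / (taskSlotsPerExecutor * divisor)).toInt
  }
}
```

With 1000 tasks and 5 taskSlots per executor, a divisor of 1.0 keeps today's 200 executors, while 1.5 would request 134, a setting that an integer parameter cannot express.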
[jira] [Comment Edited] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356887#comment-16356887 ]

Julien Cuquemelle edited comment on SPARK-22683 at 2/8/18 12:38 PM:

Thanks a lot for your feedback, some clarifications:
- the default remains 1; it means resources are not an issue and you want to minimize latency.
- if you want to be resource-aware, use 2, which is the easiest resource-saving setting (and already seems very worthwhile).
- if you want to maximize resource savings, you need to measure resource consumption with higher values of the parameter.

Regarding the multi-job use case, I do agree that this will not be optimal (but it will not be less optimal than today's state), and that we need a per-job configuration. We have a use case where we had to split an application with 2 jobs into 2 applications, because the optimal tuning of the executors was very different. A per-job config does seem interesting, but it should not be limited to this new parameter: it should allow tuning the whole config. If that is ever implemented, the question of resource saving with dynamic allocation still remains for each job, so this new parameter will still be useful.

Regarding the possibility of setting the parameter programmatically: right now the number of tasks per executor is computed when the AllocationManager starts, which happens during SparkContext initialization, so it is not possible to update it. But it does not seem difficult to make it mutable so that the computation of the number of needed executors takes it into account:

{code:java}
/**
 * The maximum number of executors we would need under the current load to satisfy all running
 * and pending tasks, rounded up.
 */
private def maxNumExecutorsNeeded(): Int = {
  val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
  (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
}
{code}

was (Author: jcuquemelle):

Thanks a lot for your feedback, some clarifications:
- the default remains 1; it means resources are not an issue and you want to minimize latency.
- if you want to be resource-aware, use 2, which is the easiest resource-saving setting (and already seems very worthwhile).
- if you want to maximize resource savings, you need to measure resource consumption with higher values of the parameter.

Regarding the multi-job use case, I do agree that this will not be optimal (but it will not be less optimal than today's state), and that we need a per-job configuration. We have a use case where we had to split an application with 2 jobs into 2 applications, because the optimal tuning of the executors was very different. But if that happens, the question of resource saving with dynamic allocation still remains for each job, so this new parameter will still be useful.

Regarding the possibility of setting it programmatically: right now the number of tasks per executor is computed when the AllocationManager starts, which happens during SparkContext initialization, so it is not possible to update it. But it does not seem difficult to make it mutable so that the computation of the number of needed executors takes it into account:

{code:java}
/**
 * The maximum number of executors we would need under the current load to satisfy all running
 * and pending tasks, rounded up.
 */
private def maxNumExecutorsNeeded(): Int = {
  val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
  (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
}
{code}
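The idea of making the setting mutable, so that the executor computation picks up a new value on its next call, might look like the sketch below. `AllocationSketch` is a hypothetical stand-in written for this illustration; it is not Spark's allocation manager, and passing the task counts as arguments (instead of reading them from a listener) is an assumption made to keep the example self-contained.

```scala
// Hypothetical stand-in for the allocation manager; not Spark's actual class.
final class AllocationSketch(taskSlotsPerExecutor: Int, initialTasksPerSlot: Int = 1) {
  require(taskSlotsPerExecutor > 0 && initialTasksPerSlot > 0, "arguments must be positive")

  // Mutable and volatile so that an update made from another thread is visible
  // to the next computation.
  @volatile private var tasksPerSlot: Int = initialTasksPerSlot

  def setTasksPerSlot(value: Int): Unit = {
    require(value > 0, "tasksPerSlot must be positive")
    tasksPerSlot = value
  }

  /** Same rounded-up division as in the comment, reading the current setting on every call. */
  def maxNumExecutorsNeeded(pendingTasks: Int, runningTasks: Int): Int = {
    val tasksPerExecutor = taskSlotsPerExecutor * tasksPerSlot
    (pendingTasks + runningTasks + tasksPerExecutor - 1) / tasksPerExecutor
  }
}
```

Because the value is read inside `maxNumExecutorsNeeded` rather than captured once at construction, a change made between two jobs of the same application would take effect for the second job.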
[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353953#comment-16353953 ]

Julien Cuquemelle commented on SPARK-22683:
---

[~tgraves]: the issue appears at each stage boundary, because at the beginning of each stage we might request too many executors.

Regarding the end of a single-stage job: the over-allocated executors usually still consume resources, even though they are released without waiting for the (30s) idle timeout when the job ends, because there is usually at least one executor that runs a while longer than the others, which keeps the unused executors alive until the end of the job.

Releasing executors faster would probably reduce the overhead, but it would add overhead for multi-stage jobs, which would have to re-spawn more executors at each stage. The metric I don't know is how long a container lives if we request it and release it immediately once it's available.

The idea of something similar to speculative execution seems interesting, but in that case we will still have a latency impact, as we will need to wait for a significant number of tasks to finish before we can compute running time stats and decide whether we need more executors or not.
- If the tasks are very short, the metric is quickly available, but then we don't request more executors, so we get an effect similar to using 2 or more tasks per taskSlot.
- If the tasks are long, we will need to wait for a first batch of tasks and then compute the correct number of executors to process the rest of the tasks. In that case the latency would not be better than roughly twice the time to run a single task, which is exactly what we get with 2 tasks per taskSlot.

Another view would be to start upfront with n>1 tasks per taskSlot and gather stats about the time needed to spawn the required executors; then, if the running tasks are significantly longer than the executor allocation time, request new executors (in that case we don't need to wait until completion of the tasks).
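The last idea in this comment (start with n>1 tasks per taskSlot, then request more executors only when running tasks turn out to be much longer than the executor allocation time) could be expressed as a simple predicate. This is a speculative sketch: `RampUpHeuristic`, `shouldRequestMore` and the `minRatio` threshold are hypothetical names and tuning knobs introduced here, and the comment does not say what "significantly longer" should mean in numbers.

```scala
// Hypothetical sketch of the heuristic discussed in the comment; nothing here exists in Spark.
object RampUpHeuristic {

  /**
   * Decide whether to request extra executors for a stage whose tasks are still running.
   *
   * @param taskElapsedMs        how long the currently running tasks have been running
   * @param executorAllocationMs measured time to obtain a new executor
   * @param minRatio             assumed threshold: how many times longer than the allocation
   *                             time a task must already have run before more are requested
   */
  def shouldRequestMore(taskElapsedMs: Long, executorAllocationMs: Long, minRatio: Double = 2.0): Boolean = {
    require(taskElapsedMs >= 0 && executorAllocationMs > 0 && minRatio > 0, "invalid arguments")
    taskElapsedMs >= executorAllocationMs * minRatio
  }
}
```

With a 10 s allocation time and the assumed ratio of 2, tasks that have been running for 60 s would trigger a request, while tasks at 5 s would not; unlike the statistics-based approach, the decision does not have to wait for any task to complete.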
[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16303869#comment-16303869 ]

Julien Cuquemelle commented on SPARK-22683:
-------------------------------------------

[~sandyr], [~andrewor14], could we have your feedback on this proposal, as contributors to the dynamic allocation policy? Thanks in advance.

> DynamicAllocation wastes resources by allocating containers that will barely be used
> ------------------------------------------------------------------------------------
>
> Key: SPARK-22683
> URL: https://issues.apache.org/jira/browse/SPARK-22683
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 2.1.0, 2.2.0
> Reporter: Julien Cuquemelle
> Labels: pull-request-available
>
> While migrating a series of jobs from MR to Spark using dynamicAllocation, I've noticed almost a doubling (+114% exactly) of resource consumption of Spark w.r.t. MR, for a wall clock time gain of 43%.
>
> About the context:
> - resource usage stands for the vcore-hours allocated to the whole job, as seen by YARN
> - I'm talking about a series of jobs because we provide our users with a way to define experiments (via UI / DSL) that automatically get translated to Spark / MR jobs and submitted on the cluster
> - we submit around 500 such jobs each day
> - these jobs are usually one shot, and the amount of processing can vary a lot between jobs, so an efficient number of executors for each job is difficult to get right, which is the reason I took the path of dynamic allocation
> - some of the tests have been scheduled on an idle queue, some on a full queue
> - experiments have been conducted with spark.executor.cores = 5 and 10; only the results for 5 cores are reported because efficiency was better overall than with 10 cores
> - the figures I give are averaged over a representative sample of those jobs (about 600 jobs), ranging from tens to thousands of splits in the data partitioning and from 400 to 9000 seconds of wall clock time
> - executor idle timeout is set to 30s
>
> Definition:
> - let's say an executor has spark.executor.cores / spark.task.cpus taskSlots, which represent the max number of tasks an executor will process in parallel
> - the current behaviour of the dynamic allocation is to allocate enough containers to have one taskSlot per task, which minimizes latency, but wastes resources when tasks are small relative to the executor allocation and idling overhead
>
> The results using the proposal (described below) over the job sample (600 jobs):
> - by using 2 tasks per taskSlot, we get a 5% (against -114%) reduction in resource usage, for a 37% (against 43%) reduction in wall clock time for Spark w.r.t. MR
> - by trying to minimize the average resource consumption, I ended up with 6 tasks per core, with a 30% resource usage reduction, for a similar wall clock time w.r.t. MR
>
> What did I try to solve the issue with existing parameters (summing up a few points mentioned in the comments)?
> - change dynamicAllocation.maxExecutors: this would need to be adapted for each job (tens to thousands of splits can occur), and would essentially remove the point of using dynamic allocation
> - use dynamicAllocation.backlogTimeout:
>   - setting this parameter right to avoid creating unused executors is very dependent on wall clock time. One basically needs to solve the exponential ramp up for the target time. So this is not an option for my use case, where I don't want per-job tuning.
>   - I've still done a series of experiments, details in the comments. The result is that after manual tuning, the best I could get was a similar resource consumption at the expense of 20% more wall clock time, or a similar wall clock time at the expense of 60% more resource consumption, compared with what I got using my proposal @ 6 tasks per slot (this value being optimized over a much larger range of jobs, as already stated)
>   - as mentioned in another comment, tampering with the exponential ramp up might yield task imbalance, and the old executors could become contention points for other exes trying to remotely access blocks in the old exes (not witnessed in the jobs I'm talking about, but we did see this behavior in other jobs)
>
> Proposal:
> Simply add a tasksPerExecutorSlot parameter, which makes it possible to specify how many tasks a single taskSlot should ideally execute to mitigate the overhead of executor allocation.
>
> PR: https://github.com/apache/spark/pull/19881

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail:
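To make the definition and the proposal concrete, here is a minimal sketch of how a tasksPerExecutorSlot value could enter the target executor count. The function names and the exact rounding are assumptions made for illustration; the formula actually used in the PR may differ.

```python
import math


def task_slots_per_executor(executor_cores: int, task_cpus: int = 1) -> int:
    """Max tasks one executor runs in parallel: spark.executor.cores / spark.task.cpus."""
    return executor_cores // task_cpus


def target_executors(num_tasks: int, executor_cores: int,
                     task_cpus: int = 1, tasks_per_slot: int = 1) -> int:
    """Hypothetical target: enough executors so each taskSlot runs about tasks_per_slot tasks.

    tasks_per_slot=1 models the current policy (one taskSlot per task).
    """
    slots = task_slots_per_executor(executor_cores, task_cpus)
    return math.ceil(num_tasks / (slots * tasks_per_slot))


# Illustrative stage of 1000 tasks with 5 cores per executor (made-up task count).
for tasks_per_slot in (1, 2, 6):
    print(tasks_per_slot, target_executors(1000, 5, tasks_per_slot=tasks_per_slot))
# prints: 1 200, then 2 100, then 6 34
```

With these illustrative numbers, moving from 1 to 6 tasks per taskSlot cuts the requested executors from 200 to 34, at the price of running the tasks in several sequential waves.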
[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16300370#comment-16300370 ]

Julien Cuquemelle commented on SPARK-22683:
-------------------------------------------

Thanks [~xuefuz] for your tests.

I think it all boils down to:
- use the default if you want to minimize latency
- use 2 if you want the easiest resource-saving setting (which already seems very worthwhile)
- if you want to maximize resource savings, you need to measure ...

Regarding naming, the taskSlot concept was the first name that popped up, but I happened to read about Flink out of sheer curiosity, and they use precisely this concept:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/job_scheduling.html
However, I'm not at all attached to this name; we can pick something else. I had thought about something like totalToParallelTasksRatio ...

Regarding the exponential ramp up, I do agree that its benefit is debatable. However, in practice, with the default setting of 1s, the ramp up is pretty quick; I'm not sure there would be a big difference compared with an upfront allocation.
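The claim that the ramp up is quick can be checked with a simplified model of the exponential request policy, in which each round adds 1, 2, 4, 8, ... executors. The function below is a sketch under that assumption only: it ignores the time YARN takes to grant containers and assumes one round per backlog timeout (1s by default).

```python
def rounds_to_reach(target: int, initial: int = 0) -> int:
    """Number of doubling request rounds needed to have requested `target` executors."""
    total, to_add, rounds = initial, 1, 0
    while total < target:
        total += to_add   # executors requested in this round
        to_add *= 2       # the next round asks for twice as many
        rounds += 1
    return rounds


# Illustrative targets: 200 executors, and 34 executors.
print(rounds_to_reach(200))  # 8
print(rounds_to_reach(34))   # 6
```

At one round per second, even a 200-executor target is fully requested after about 8 seconds, which is small next to jobs lasting 400 to 9000 seconds.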
[jira] [Comment Edited] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291141#comment-16291141 ]

Julien Cuquemelle edited comment on SPARK-22683 at 12/14/17 5:09 PM:
---------------------------------------------------------------------

[~tgraves], thanks a lot for your remarks. I've updated the description and also included a summary of the various results and comments I got.

Answers to your other questions:

"The fact you are asking for 5+cores per executor will naturally waste more resources when the executor isn't being used"
In fact the resource usage will be similar with fewer cores, because if I set 1 core per exe, the dynamic allocation will ask for 5 times more exes.

"But if we can find something that by defaults works better for the majority of workloads that it makes sense to improve"
I'm pretty sure 2 tasks per slot would work for a very large set of workloads, especially short jobs.

"As with any config though, how do I know what to set the tasksPerSlot as? it requires configuration and it could affect performance."
I agree. What I'm trying to show in my argument is that:
- I don't have any parameter today to do what I want without optimizing each job, which is not feasible in my use case
- the efficiency of this parameter seems to vary more coarsely than that of other parameters (sweet-spot values are valid over a broader range of jobs than for maxNbExe or backLogTimeout)
- it seems to me some settings are quite simple to understand: if I want to minimize latency, keep the default value; if I want to save some resources, use a value of 2; if I want to really minimize resource consumption, find a higher number by analysis or aim at meeting a time budget

About dynamic allocation: with the default setting of 1s for backlogTimeout, the exponential ramp up is in practice very similar to an upfront request, given the duration of the jobs. I think upfront allocation could be used instead of exponential, but this wouldn't change the issue, which is related to the target number of exes. I don't think asking upfront vs. exponentially has any effect on how Yarn yields containers.

"Above you say "When running with 6 tasks per executor slot, our Spark jobs consume in average 30% less vcorehours than the MR jobs, this setting being valid for different workload sizes." Was this with this patch applied or without?"
The patch was applied; without it you cannot set the number of tasks per taskSlot (I mentioned "executor slot", which is incorrect; I was referring to taskSlot).

"the WallTimeGain wrt MR (%) , does this mean positive numbers ran faster then MR? "
Positive numbers mean faster in Spark.

"why is running with 6 or 8 slower? is it shuffle issues or mistuning with gc, or just unknown overhead?"
Running with 6 tasks per taskSlot means that 6 tasks will be processed sequentially by 6 times fewer task slots.
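Two of the answers above are plain arithmetic, sketched below with made-up numbers. The helper name, the task count and the way tasks_per_slot enters the formula are assumptions made for illustration, not the patch's actual code.

```python
import math


def requested(num_tasks: int, executor_cores: int,
              tasks_per_slot: int = 1, task_cpus: int = 1):
    """Return (executors, total vcores, sequential waves) for one stage."""
    slots = executor_cores // task_cpus                  # taskSlots per executor
    executors = math.ceil(num_tasks / (slots * tasks_per_slot))
    vcores = executors * executor_cores                  # what YARN accounts for
    waves = math.ceil(num_tasks / (executors * slots))   # tasks run back to back per slot
    return executors, vcores, waves


# Fewer cores per executor: 5 times more executors, same vcore total.
print(requested(1000, executor_cores=5))  # (200, 1000, 1)
print(requested(1000, executor_cores=1))  # (1000, 1000, 1)

# 6 tasks per taskSlot: about 6 times fewer slots, hence 6 sequential waves.
print(requested(1000, executor_cores=5, tasks_per_slot=6))  # (34, 170, 6)
```

The first two calls show why lowering the cores per executor alone does not reduce the vcores requested; the last one shows where the extra wall clock time at 6 tasks per taskSlot comes from.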
[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291156#comment-16291156 ]

Julien Cuquemelle commented on SPARK-22683:
-------------------------------------------

I did see SPARK-16158 before opening a new ticket. My proposal seemed simple enough to me not to need a full pluggable policy (which I do agree with, but which seems much more difficult to get accepted, IMHO).

[~tgraves], you mentioned yourself that the current policy could be improved, and that the complexity from the users' point of view should not be overlooked :-)
[jira] [Updated] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Julien Cuquemelle updated SPARK-22683:
--------------------------------------
Description:
While migrating a series of jobs from MR to Spark using dynamicAllocation, I've noticed that resource consumption more than doubled (+114% exactly) with Spark w.r.t. MR, for a wall clock time gain of 43%.

About the context:
- resource usage stands for the vcore-hours allocated for the whole job, as seen by YARN
- I'm talking about a series of jobs because we provide our users with a way to define experiments (via UI / DSL) that automatically get translated to Spark / MR jobs and submitted on the cluster
- we submit around 500 such jobs each day
- these jobs are usually one shot, and the amount of processing can vary a lot between jobs, so finding an efficient number of executors for each job is difficult to get right, which is the reason I took the path of dynamic allocation
- some of the tests have been scheduled on an idle queue, some on a full queue
- experiments have been conducted with spark.executor.cores = 5 and 10; only results for 5 cores are reported because efficiency was overall better than with 10 cores
- the figures I give are averaged over a representative sample of those jobs (about 600 jobs), ranging from tens to thousands of splits in the data partitioning and between 400 and 9000 seconds of wall clock time
- executor idle timeout is set to 30s

Definition:
- let's say an executor has spark.executor.cores / spark.task.cpus taskSlots, which represent the max number of tasks an executor will process in parallel
- the current behaviour of dynamic allocation is to allocate enough containers to have one taskSlot per task, which minimizes latency, but wastes resources when tasks are small relative to executor allocation and idling overhead

The results using the proposal (described below) over the job sample (600 jobs):
- by using 2 tasks per taskSlot, we get a 5% (against -114%) reduction in resource usage, for a 37% (against 43%) reduction in wall clock time for Spark w.r.t. MR
- by trying to minimize the average resource consumption, I ended up with 6 tasks per taskSlot, with a 30% resource usage reduction, for a similar wall clock time w.r.t. MR

What did I try to mitigate this (summing up a few points mentioned in the comments)?
- change dynamicAllocation.maxExecutors: this would need to be adapted for each job (tens to thousands of splits can occur), and essentially defeats the purpose of using dynamic allocation
- use dynamicAllocation.backlogTimeout:
    - setting this parameter right to avoid creating unused executors is highly dependent on the job's wall clock time. One basically needs to solve the exponential ramp-up for the target time. So this is not an option for my use case, where I don't want per-job tuning.
    - I've still done a series of experiments, details in the comments. The result is that after manual tuning, the best I could get was a similar resource consumption at the expense of 20% more wall clock time, or a similar wall clock time at the expense of 60% more resource consumption than what I got using my proposal @ 6 tasks per slot (this value being optimized over a much larger range of jobs, as already stated)
    - as mentioned in another comment, tampering with the exponential ramp-up might yield task imbalance, and the oldest executors could become contention points for other executors trying to remotely access their blocks (not witnessed in the jobs I'm talking about, but we did see this behaviour in other jobs)

Proposal:
Simply add a tasksPerExecutorSlot parameter, which makes it possible to specify how many tasks a single taskSlot should ideally execute to mitigate the overhead of executor allocation.
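The proposal can be illustrated with a minimal Python sketch of how the executor target might follow from the task count. This is an illustration under assumptions, not the code from the PR: the function names and the `tasks_per_executor_slot` argument are hypothetical stand-ins for the proposed tasksPerExecutorSlot parameter, and rounding up to a whole executor is assumed.

```python
import math


def task_slots_per_executor(executor_cores: int, task_cpus: int = 1) -> int:
    """taskSlots as defined above: spark.executor.cores / spark.task.cpus."""
    return executor_cores // task_cpus


def target_executors(num_tasks: int,
                     executor_cores: int,
                     task_cpus: int = 1,
                     tasks_per_executor_slot: int = 1) -> int:
    """Executors needed so that each taskSlot handles about
    `tasks_per_executor_slot` tasks (hypothetical parameter name).

    With tasks_per_executor_slot = 1 this is the behaviour described above:
    one taskSlot per pending or running task.
    """
    slots = task_slots_per_executor(executor_cores, task_cpus)
    tasks_per_executor = slots * tasks_per_executor_slot
    return math.ceil(num_tasks / tasks_per_executor)


if __name__ == "__main__":
    # 1000 tasks on 5-core executors, for the three settings discussed above.
    for k in (1, 2, 6):
        print(k, target_executors(1000, executor_cores=5,
                                  tasks_per_executor_slot=k))
```

For 1000 tasks on 5-core executors, the target drops from 200 executors at 1 task per slot to 100 at 2 and 34 at 6. The job-level gains reported above depend on much more than this count (allocation delay, idle timeout, stage structure).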
PR: https://github.com/apache/spark/pull/19881

was:
let's say an executor has spark.executor.cores / spark.task.cpus taskSlots

The current dynamic allocation policy allocates enough executors to have each taskSlot execute a single task, which minimizes latency, but wastes resources when tasks are small relative to executor allocation and idling overhead.

By adding the tasksPerExecutorSlot, it is made possible to specify how many tasks a single slot should ideally execute to mitigate the overhead of executor allocation.

PR: https://github.com/apache/spark/pull/19881
[jira] [Comment Edited] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16280548#comment-16280548 ]

Julien Cuquemelle edited comment on SPARK-22683 at 12/14/17 3:25 PM:
---------------------------------------------------------------------

I don't understand your statement about delaying executor addition. I want to cap the number of executors adaptively, based on the current number of tasks, not delay their creation.

Doing this with dynamicAllocation.maxExecutors requires each job to be tuned for efficiency; when we're doing experiments, a lot of jobs are one shot, so they can't be fine-tuned. The proposal gives a way to have an adaptive behaviour for a family of jobs.

Regarding slowing the ramp-up of executors with schedulerBacklogTimeout, I ran 2 series of experiments with this parameter (7 similar jobs for each test case, average figures reported in the following table), one on a busy queue and the other on an idle queue. I'll report only the idle queue, as the figures on the busy queue are even worse for the schedulerBacklogTimeout approach.

The first row uses the default 1s for schedulerBacklogTimeout together with the 6 tasks per executorSlot I've mentioned above; the other rows use the default dynamicAllocation behaviour and only change schedulerBacklogTimeout.

||SparkWallTimeSec||Spk-vCores-H||taskPerExeSlot||schedulerBacklogTimeout||
|693.571429|37.142857|6|1.0|
|584.857143|69.571429|1|30.0|
|763.428571|54.285714|1|60.0|
|826.714286|39.571429|1|90.0|

So basically I can tune the backlogTimeout to get a similar vCores-H consumption at the expense of almost 20% more wall clock time, or I can tune the parameter to get about the same wall clock time at the expense of about 60% more vCores-H consumption (very roughly interpolated between the 30s and 60s rows for schedulerBacklogTimeout).

It does not seem to solve the issue I'm trying to address. Moreover, this would again need to be tuned for each specific job's duration: to find the 90s timeout that gives a similar resource consumption, I had to solve the exponential ramp-up using the duration of the already-run job, which is not feasible in experimental use cases.

The previous experiments that allowed me to find the sweet spot at 6 tasks per slot involved job wall clock times between 400 and 9000 seconds.

Another way to look at the new parameter I'm proposing is as a simple way to tune the latency / resource consumption tradeoff.
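The "almost 20%" and "about 60%" figures can be re-derived from the schedulerBacklogTimeout table in the comment above. The sketch below is a worked check, not part of the original experiments: it assumes a straight-line interpolation between the 30s and 60s rows (the comment only says the estimate is very rough), and the variable names are illustrative.

```python
# Rows from the table: (wall_time_sec, vcore_hours, tasks_per_slot, backlog_timeout_sec)
proposal = (693.571429, 37.142857, 6, 1.0)
timeout_30 = (584.857143, 69.571429, 1, 30.0)
timeout_60 = (763.428571, 54.285714, 1, 60.0)
timeout_90 = (826.714286, 39.571429, 1, 90.0)


def interpolate(x, x0, y0, x1, y1):
    """Linear interpolation of y at x between the points (x0, y0) and (x1, y1)."""
    return y0 + (x - x0) * (y1 - y0) / (x1 - x0)


# Similar vCores-H (90s row): how much more wall clock time than the proposal?
wall_overhead_90 = timeout_90[0] / proposal[0] - 1.0

# Same wall clock time as the proposal: estimate vCores-H between the 30s and 60s rows.
vcores_at_same_wall = interpolate(proposal[0],
                                  timeout_30[0], timeout_30[1],
                                  timeout_60[0], timeout_60[1])
vcore_overhead_same_wall = vcores_at_same_wall / proposal[1] - 1.0

if __name__ == "__main__":
    print(f"extra wall clock time at 90s timeout: {wall_overhead_90:.1%}")
    print(f"extra vCores-H at equal wall clock time: {vcore_overhead_same_wall:.1%}")
```

Under that linear assumption the two overheads come out at roughly 19% and 62%, consistent with the rounded figures quoted in the comment.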
[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289101#comment-16289101 ]

Julien Cuquemelle commented on SPARK-22683:
-------------------------------------------

Hi [~srowen],

Just a small remark about the use of schedulerBacklogTimeout to slow down the build-up of executors: we have recently experienced an issue where executor allocation by YARN was slow. As a result, the first executors ran a lot of tasks (hence stored a lot of blocks) and became a contention point in the subsequent stage, when all executors had finally been allocated and tried to access the first executors' blocks remotely.

So I guess the fact that schedulerBacklogTimeout is very short and allows a quick ramp-up of the number of executors is not trivial to alter if we don't want to create too big an imbalance in the blocks stored by executors.

So I don't think that using this parameter to alter the resource usage of dynamic allocation is a good choice, for this very reason and also because it is highly dependent on the wall clock time of the job.

> DynamicAllocation wastes resources by allocating containers that will barely be used
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-22683
>                 URL: https://issues.apache.org/jira/browse/SPARK-22683
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 2.1.0, 2.2.0
>            Reporter: Julien Cuquemelle
>              Labels: pull-request-available
>
> let's say an executor has spark.executor.cores / spark.task.cpus taskSlots
> The current dynamic allocation policy allocates enough executors
> to have each taskSlot execute a single task, which minimizes latency,
> but wastes resources when tasks are small regarding executor allocation
> and idling overhead.
> By adding the tasksPerExecutorSlot, it is made possible to specify how many tasks
> a single slot should ideally execute to mitigate the overhead of executor allocation.
> PR: https://github.com/apache/spark/pull/19881

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
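To see why the ramp-up discussed in the comment above is so sensitive to schedulerBacklogTimeout, here is a small Python sketch of the request schedule. It assumes the policy described in Spark's job scheduling documentation: a first request once tasks have been backlogged for schedulerBacklogTimeout, further requests every sustainedSchedulerBacklogTimeout, and a request size that doubles each round (1, 2, 4, 8, ...). The function name is hypothetical, and container grant delays on the cluster manager side are ignored.

```python
from typing import Optional


def seconds_to_request(target_executors: int,
                       backlog_timeout: float,
                       sustained_timeout: Optional[float] = None) -> float:
    """Time until the cumulative number of requested executors reaches the target,
    assuming the backlog persists for the whole ramp-up."""
    if sustained_timeout is None:
        # Assumed default: same value as the initial backlog timeout.
        sustained_timeout = backlog_timeout
    requested = 0
    to_add = 1
    elapsed = 0.0
    first_round = True
    while requested < target_executors:
        elapsed += backlog_timeout if first_round else sustained_timeout
        first_round = False
        requested = min(target_executors, requested + to_add)
        to_add *= 2  # exponential ramp-up
    return elapsed


if __name__ == "__main__":
    for timeout in (1.0, 30.0, 60.0, 90.0):
        print(timeout, seconds_to_request(200, timeout))
```

Reaching 200 requested executors takes 8 rounds, so 8 seconds with a 1s timeout but 720 seconds with a 90s timeout. During that window the earliest executors run most of the tasks, which is the imbalance the comment warns about.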
[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289046#comment-16289046 ]

Julien Cuquemelle commented on SPARK-22683:
-------------------------------------------

Hi [~xuefuz],

Thanks for mentioning that you also experience resource usage efficiency issues when using dynamic allocation. I'll take the opportunity to provide a few clarifications:
- regarding tuning, the study I made is precisely about tuning for a type of workload and not for specific jobs. Our users launch several hundred such jobs per day (whose size spans a wide range, as already stated); the figures I gave were computed over a representative sample of such jobs.
- the resource waste is not specific to MR-style jobs, as this over-allocation of executors will happen every time a new stage is launched with more tasks than available taskSlots. I think it would be great to allow dynamicAllocation to be resource efficient, while still giving the benefit of elasticity over different stages.
- according to the tuning guide, Spark has been designed for larger executors (5 cores is regularly reported as a sweet spot) that should execute a large number of small tasks ("as short as 200ms"), in order to pool / mitigate various overheads (I/O, ...), so I'm not sure going back to smaller executors is the right move.
[jira] [Updated] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Julien Cuquemelle updated SPARK-22683:
--------------------------------------
Description:
let's say an executor has spark.executor.cores / spark.task.cpus taskSlots

The current dynamic allocation policy allocates enough executors to have each taskSlot execute a single task, which minimizes latency, but wastes resources when tasks are small relative to executor allocation and idling overhead.

By adding the tasksPerExecutorSlot, it is made possible to specify how many tasks a single slot should ideally execute to mitigate the overhead of executor allocation.

PR: https://github.com/apache/spark/pull/19881

was:
let's say an executor has spark.executor.cores / spark.task.cpus taskSlots

The current dynamic allocation policy allocates enough executors to have each taskSlot execute a single task, which minimizes latency, but wastes resources when tasks are small relative to executor allocation overhead.

By adding the tasksPerExecutorSlot, it is made possible to specify how many tasks a single slot should ideally execute to mitigate the overhead of executor allocation.

PR: https://github.com/apache/spark/pull/19881
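The phrase "small relative to executor allocation and idling overhead" can be made concrete with a toy cost model in Python. Everything in it is an assumption made for illustration: each executor is charged for a fixed startup delay (10s here, an invented value), for the time its slots spend running tasks, and for the idle timeout before release (30s, the setting quoted in the issue description). It ignores stragglers, multi-stage jobs and scheduling delays, so it shows the trend and does not reproduce the measured figures.

```python
import math


def vcore_seconds(num_tasks: int,
                  task_seconds: float,
                  executor_cores: int,
                  tasks_per_slot: int = 1,
                  startup_seconds: float = 10.0,
                  idle_timeout_seconds: float = 30.0) -> float:
    """Toy estimate of the vcore-seconds held by a single-stage job.

    Assumes one taskSlot per core (spark.task.cpus = 1) and that every slot
    runs `tasks_per_slot` tasks back to back.
    """
    executors = math.ceil(num_tasks / (executor_cores * tasks_per_slot))
    busy_seconds = tasks_per_slot * task_seconds
    lifetime = startup_seconds + busy_seconds + idle_timeout_seconds
    return executors * executor_cores * lifetime


if __name__ == "__main__":
    useful = 1000 * 10.0  # vcore-seconds actually spent running tasks
    for k in (1, 2, 6):
        held = vcore_seconds(1000, 10.0, executor_cores=5, tasks_per_slot=k)
        print(k, held, f"useful share: {useful / held:.0%}")
```

With 1000 tasks of 10 seconds each, the model holds 50000 vcore-seconds at 1 task per slot against 17000 at 6 tasks per slot, for the same 10000 vcore-seconds of useful work. The price is a longer busy phase per executor, which is the latency side of the tradeoff.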
[jira] [Updated] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Julien Cuquemelle updated SPARK-22683:
--------------------------------------
Summary: DynamicAllocation wastes resources by allocating containers that will barely be used (was: Allow tuning the number of dynamically allocated executors wrt task number)
[jira] [Comment Edited] (SPARK-22683) Allow tuning the number of dynamically allocated executors wrt task number
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16286253#comment-16286253 ]

Julien Cuquemelle edited comment on SPARK-22683 at 12/12/17 3:14 PM:
---------------------------------------------------------------------

The impression I get from our discussion is that you mainly focus on the latency of the jobs, and that the current setting is optimized for that, which is why you consider the current setup sufficient. If you consider your previous example from a resource usage point of view, my proposal would give about the same resource usage in both scenarios, whereas the current setup doubles the resource usage of the workload with small tasks.

I've tried to experiment with the parameters you've proposed, but right now I don't have a solution to optimize my type of workload (not every single job) for resource consumption.

I don't know if the majority of Spark users run on idle clusters, but ours is routinely full, so for us resource usage is more important than latency.
[jira] [Comment Edited] (SPARK-22683) Allow tuning the number of dynamically allocated executors wrt task number
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16286032#comment-16286032 ]

Julien Cuquemelle edited comment on SPARK-22683 at 12/11/17 3:21 PM:
---------------------------------------------------------------------

I'd like to point out that when referring to "my use case", I actually mean a wide range of job sizes, ranging from tens to thousands of splits in the data partitioning and between 400 and 9000 seconds of wall clock time.

Maybe I didn't emphasize enough in my last answer that the figures I presented using schedulerBacklogTimeout needed tuning for a specific job, and that the resulting value of the parameter will not be optimal for other job settings. My proposal provides a sensible default over the whole range of jobs. The same is true of the maxExecutors parameter: each job would need to be tuned using it.

Regarding "there's nothing inherently more efficient about the scheduling policy you're proposing", let me add some information: so far I have aimed at having the same latency as the legacy MR jobs, which yields the 6 tasksPerSlot figure and a 30% resource usage decrease. Here are the figures with other values of taskPerSlot, in terms of gain wrt the corresponding MR jobs:

||taskPerSlot||VcoreGain wrt MR (%)||WallTimeGain wrt MR (%)||
|1.0|-114.329984|43.777071|
|2.0|5.373456|37.610840|
|3.0|20.785099|28.528394|
|4.0|26.501597|18.516913|
|6.0|30.852756|-0.991918|
|8.0|31.147028|-14.633520|

It seems to me that the 2 tasks per slot setting is particularly interesting, as it goes from a doubling of used resources to a usage similar to MR with a very low impact on latency. I bet that this resource usage reduction might not be limited to our use case ...
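The gain percentages in the taskPerSlot table above are easier to compare once converted into ratios relative to MR. The Python sketch below does that conversion. It assumes that "gain wrt MR" means (MR - Spark) / MR expressed in percent, which is consistent with the "+114%" consumption figure in the issue description but is not spelled out in the comment. The function name is illustrative.

```python
# (taskPerSlot, VcoreGain wrt MR in %, WallTimeGain wrt MR in %), copied from the table.
GAINS = [
    (1.0, -114.329984, 43.777071),
    (2.0, 5.373456, 37.610840),
    (3.0, 20.785099, 28.528394),
    (4.0, 26.501597, 18.516913),
    (6.0, 30.852756, -0.991918),
    (8.0, 31.147028, -14.633520),
]


def ratio_from_gain(gain_percent: float) -> float:
    """Spark / MR ratio implied by a gain, assuming gain = (MR - Spark) / MR * 100."""
    return 1.0 - gain_percent / 100.0


if __name__ == "__main__":
    print("taskPerSlot  vcores(Spark/MR)  wallTime(Spark/MR)")
    for tasks_per_slot, vcore_gain, wall_gain in GAINS:
        print(f"{tasks_per_slot:11.1f}  {ratio_from_gain(vcore_gain):16.2f}"
              f"  {ratio_from_gain(wall_gain):18.2f}")
```

Read this way, 1 task per slot uses about 2.14 times the vcore-hours of MR, while 2 tasks per slot uses about 0.95 times as many at about 0.62 times the wall clock time.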
[jira] [Comment Edited] (SPARK-22683) Allow tuning the number of dynamically allocated executors wrt task number
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16286032#comment-16286032 ] Julien Cuquemelle edited comment on SPARK-22683 at 12/11/17 3:20 PM: - I'd like to point out that when refering to "my use case", it is actually a wide range of job sizes, ranging from tens to thousands splits in the data partitioning and between 400 to 9000 seconds of wall clock time. Maybe I didn't emphasize enough in my last answer that the figures I presented using schedulerBacklogTimeout needed tuning for a specific job, and that the resulting value of the parameter will not be optimal for other job settings. My proposal allows to have a sensible default over the whole range of jobs. This is also true of the maxExecutors parameters, each job would need to be tuned using this parameter. Regarding "there's nothing inherently more efficient about the scheduling policy you're proposing", I might add a few information: so far I have aimed at having the same latency as the legacy MR jobs, which yield the 6 tasksPerSlot figure and a 30% resource usage decrease. Here are the figures with other values of taskPerSlot, in terms of gain wrt the corresponding MR jobs: ||taskPerCore||VcoreGain wrt MR (%)||WallTimeGain wrt MR (%)|| |1.0|-114.329984|43.777071| |2.0|5.373456|37.610840| |3.0|20.785099|28.528394| |4.0|26.501597|18.516913| |6.0|30.852756|-0.991918| |8.0|31.147028|-14.633520| It seems to me the 2 tasks per slot is particularly interesting, as it allows to go from a doubling of used resources to a usage similar to MR with a very low impact on the latency. I bet that this resource usage reduction might not be limited to our use case ... was (Author: jcuquemelle): I'd like to point out that when refering to "my use case", it is actually a wide range of job sizes, ranging from tens to thousands splits in the data partitioning and between 400 to 9000 seconds of wall clock time. 
Maybe I didn't emphasize enough in my last answer that the figures I presented using schedulerBacklogTimeout needed tuning for a specific job, and that the resulting value of the parameter will not be optimal for other job settings. My proposal allows to have a sensible default over the whole range of jobs. This is also true of the maxExecutors parameters, each job would need to be tuned using this parameter. Regarding "there's nothing inherently more efficient about the scheduling policy you're proposing", I might add a few information: so far I have aimed at having the same latency as the legacy MR job, which yield the 6 tasksPerSlot figure and a 30% resource usage decrease. Here are the figures with other values of taskPerSlot, in terms of gain wrt the corresponding MR jobs: ||taskPerCore||VcoreGain wrt MR (%)||WallTimeGain wrt MR (%)|| |1.0|-114.329984|43.777071| |2.0|5.373456|37.610840| |3.0|20.785099|28.528394| |4.0|26.501597|18.516913| |6.0|30.852756|-0.991918| |8.0|31.147028|-14.633520| It seems to me the 2 tasks per slot is particularly interesting, as it allows to go from a doubling of used resources to a usage similar to MR with a very low impact on the latency. I bet that this resource usage reduction might not be limited to our use case ... > Allow tuning the number of dynamically allocated executors wrt task number > -- > > Key: SPARK-22683 > URL: https://issues.apache.org/jira/browse/SPARK-22683 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0, 2.2.0 >Reporter: Julien Cuquemelle > Labels: pull-request-available > > let's say an executor has spark.executor.cores / spark.task.cpus taskSlots > The current dynamic allocation policy allocates enough executors > to have each taskSlot execute a single task, which minimizes latency, > but wastes resources when tasks are small regarding executor allocation > overhead. 
> By adding the tasksPerExecutorSlot, it is made possible to specify how many > tasks > a single slot should ideally execute to mitigate the overhead of executor > allocation. > PR: https://github.com/apache/spark/pull/19881 -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-22683) Allow tuning the number of dynamically allocated executors wrt task number
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16286032#comment-16286032 ]

Julien Cuquemelle edited comment on SPARK-22683 at 12/11/17 3:19 PM:
---------------------------------------------------------------------

I'd like to point out that "my use case" actually covers a wide range of job sizes, from tens to thousands of splits in the data partitioning and from 400 to 9000 seconds of wall clock time.

Maybe I didn't emphasize enough in my last answer that the figures I presented using schedulerBacklogTimeout required tuning for a specific job, and that the resulting value of the parameter will not be optimal for other job settings. My proposal provides a sensible default over the whole range of jobs.
The same is true of the maxExecutors parameter: each job would need to be tuned using this parameter.

Regarding "there's nothing inherently more efficient about the scheduling policy you're proposing", I can add some information: so far I have aimed at the same latency as the legacy MR job, which yields the figure of 6 tasks per slot and a 30% decrease in resource usage. Here are the figures for other values of tasksPerSlot, expressed as gains wrt the corresponding MR jobs:

||taskPerCore||VcoreGain wrt MR (%)||WallTimeGain wrt MR (%)||
|1.0|-114.329984|43.777071|
|2.0|5.373456|37.610840|
|3.0|20.785099|28.528394|
|4.0|26.501597|18.516913|
|6.0|30.852756|-0.991918|
|8.0|31.147028|-14.633520|

It seems to me that 2 tasks per slot is particularly interesting, as it goes from a doubling of used resources to a usage similar to MR, with a very low impact on latency.
I bet that this reduction in resource usage is not limited to our use case ...

> Allow tuning the number of dynamically allocated executors wrt task number
> --------------------------------------------------------------------------
>
>                 Key: SPARK-22683
>                 URL: https://issues.apache.org/jira/browse/SPARK-22683
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 2.1.0, 2.2.0
>            Reporter: Julien Cuquemelle
>              Labels: pull-request-available
>
> Let's say an executor has spark.executor.cores / spark.task.cpus task slots.
> The current dynamic allocation policy allocates enough executors
> for each task slot to execute a single task, which minimizes latency
> but wastes resources when tasks are small relative to the executor
> allocation overhead.
> Adding a tasksPerExecutorSlot parameter makes it possible to specify how many
> tasks a single slot should ideally execute, to mitigate the overhead of
> executor allocation.
> PR: https://github.com/apache/spark/pull/19881

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
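The gains in the table can be turned into ratios relative to MR to make the trade-off easier to read. The sketch below copies the reported figures; it assumes that a gain of g% means Spark uses (1 - g/100) times what MR uses, which matches the "doubling" described for 1 task per slot. The names `rows` and `ratio_vs_mr` are illustrative only.

```python
# (tasks per slot, vcore gain % wrt MR, wall-time gain % wrt MR), copied from the table
rows = [
    (1, -114.329984, 43.777071),
    (2, 5.373456, 37.610840),
    (3, 20.785099, 28.528394),
    (4, 26.501597, 18.516913),
    (6, 30.852756, -0.991918),
    (8, 31.147028, -14.633520),
]


def ratio_vs_mr(gain_percent):
    """Assumed convention: a gain of g% means (1 - g/100) times the MR figure."""
    return 1.0 - gain_percent / 100.0


for tasks_per_slot, vcore_gain, wall_gain in rows:
    print(f"{tasks_per_slot} tasks/slot: "
          f"{ratio_vs_mr(vcore_gain):.2f}x MR vcore-hours, "
          f"{ratio_vs_mr(wall_gain):.2f}x MR wall time")
```

Read this way, 1 task per slot costs a bit more than twice the MR vcore-hours, while 2 tasks per slot lands slightly below the MR resource usage and still finishes well ahead of MR.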
[jira] [Comment Edited] (SPARK-22683) Allow tuning the number of dynamically allocated executors wrt task number
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16280548#comment-16280548 ]

Julien Cuquemelle edited comment on SPARK-22683 at 12/6/17 5:32 PM:
--------------------------------------------------------------------

I don't understand your statement about delaying executor addition. I want to cap the number of executors adaptively, based on the current number of tasks, not delay their creation.

Doing this with dynamicAllocation.maxExecutors requires each job to be tuned for efficiency; when we're running experiments, a lot of jobs are one-shot, so they can't be fine-tuned.
The proposal gives a way to get adaptive behaviour for a family of jobs.

Regarding slowing the ramp-up of executors with schedulerBacklogTimeout, I've run experiments with this parameter: 2 series of experiments (7 jobs per test case, average figures reported in the following table), one on a busy queue and the other on an idle queue. I'll report only the idle queue, as the figures on the busy queue are even worse for the schedulerBacklogTimeout approach.

The first row uses the default 1s schedulerBacklogTimeout and the 6 tasks per executor slot I mentioned above; the other rows use the default dynamicAllocation behaviour and only change schedulerBacklogTimeout.

||SparkWallTimeSec||Spk-vCores-H||taskPerExeSlot||schedulerBacklogTimeout (s)||
|693.571429|37.142857|6|1.0|
|584.857143|69.571429|1|30.0|
|763.428571|54.285714|1|60.0|
|826.714286|39.571429|1|90.0|

So basically I can tune the backlogTimeout to get a similar vCores-H consumption at the expense of almost 20% more wallClockTime, or I can tune the parameter to get about the same wallClockTime at the expense of about 60% more vCores-H consumption (very roughly extrapolated between 30 and 60 secs for schedulerBacklogTimeout).

It does not seem to solve the issue I'm trying to address. Moreover, it would again need to be tuned for each specific job's duration (to find the 90s timeout that gives similar resource consumption, I had to solve the exponential ramp-up against the duration of the already-run job, which is not feasible in experimental use cases).
The previous experiments that led me to the sweet spot of 6 tasks per slot involved jobs with wallClockTimes between 400 and 9000 seconds.

Another way to look at the new parameter I'm proposing is as a simple way to tune the latency / resource consumption tradeoff.
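The "almost 20%" and "about 60%" figures can be checked against the four reported runs. The sketch below uses the averages from the comment and assumes a plain linear interpolation between the 30s and 60s runs, which is only one reading of "very roughly extrapolated"; the variable names are illustrative.

```python
# Baseline: 6 tasks per executor slot, default 1s schedulerBacklogTimeout
baseline_wall, baseline_vcore_h = 693.571429, 37.142857

# Default allocation (1 task per slot): (timeout s, wall time s, vcore-hours)
runs = [
    (30.0, 584.857143, 69.571429),
    (60.0, 763.428571, 54.285714),
    (90.0, 826.714286, 39.571429),
]

# Similar vcore-hours is reached at the 90s timeout: how much slower is that run?
extra_wall = runs[2][1] / baseline_wall - 1

# Same wall time as the baseline: interpolate linearly between the 30s and 60s runs
(_, wall_30, vcore_30), (_, wall_60, vcore_60) = runs[0], runs[1]
fraction = (baseline_wall - wall_30) / (wall_60 - wall_30)
vcore_at_same_wall = vcore_30 + fraction * (vcore_60 - vcore_30)
extra_vcore = vcore_at_same_wall / baseline_vcore_h - 1

print(f"extra wall time at similar vcore-hours: {extra_wall:.1%}")
print(f"extra vcore-hours at similar wall time: {extra_vcore:.1%}")
```

Under that interpolation assumption, the results come out at roughly 19% more wall time and roughly 62% more vcore-hours, in line with the figures quoted in the comment.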
[jira] [Comment Edited] (SPARK-22683) Allow tuning the number of dynamically allocated executors wrt task number
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16278405#comment-16278405 ]

Julien Cuquemelle edited comment on SPARK-22683 at 12/5/17 1:09 PM:
--------------------------------------------------------------------

Thanks [~srowen] for your quick feedback; let me give you more context about what I'm trying to achieve here.

- I'm not trying to mitigate the overhead of launching small tasks; I'm trying to avoid allocating executors that will barely be used, or not used at all. As Spark tuning guidelines usually aim towards small tasks, it seems sub-optimal to allocate a large number of executors at application start (or at any stage that creates many more tasks than available executor slots), only for them to be discarded without having been used.

- I've scoped this parameter to dynamic allocation, as it only impacts the way the executor count ramps up when tasks are backlogged. The default value of the parameter falls back to the current behavior.

- I do agree that over-allocated containers will eventually be deallocated, but this resource occupation by idle executors appears to be non-trivial on some of our workloads:

I've run several experiments on a set of similar jobs working on different data sizes, on an idle or busy cluster.
The resource usage (vcore-hours allocated, as seen by YARN) of the Spark jobs is compared to the same job running in MapReduce.
The executor idle timeout is set to 30s.
With the current setting (1 task per executor slot), our Spark jobs consume on average twice as many vcore-hours as the MR jobs.
With 6 tasks per executor slot, our Spark jobs consume on average 30% fewer vcore-hours than the MR jobs, and this setting holds across different workload sizes.
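To illustrate why idle executors can weigh on vcore-hours, here is a deliberately crude cost model. It is not derived from the measurements in this thread and does not try to reproduce them: it assumes identical task durations, immediate executor grants, and a fixed per-executor overhead (startup plus idle time before release) during which the cores stay allocated. The function `toy_job_cost` and all its parameters are hypothetical.

```python
import math


def toy_job_cost(num_tasks, task_seconds, executor_cores, tasks_per_slot,
                 overhead_seconds):
    """Return (wall_seconds, vcore_hours) for a single stage under a toy model.

    Assumptions (illustrative only): every task takes `task_seconds`, one task
    uses one core, and each executor holds its cores for `overhead_seconds`
    on top of the time spent running tasks.
    """
    # Executors requested so that each slot runs about `tasks_per_slot` tasks
    executors = math.ceil(num_tasks / (executor_cores * tasks_per_slot))
    slots = executors * executor_cores
    # Tasks run in successive waves, one task per slot per wave
    waves = math.ceil(num_tasks / slots)
    wall = overhead_seconds + waves * task_seconds
    vcore_hours = slots * wall / 3600.0
    return wall, vcore_hours


# Hypothetical stage: 3000 tasks of 10s, 5 cores per executor,
# 40s overhead (e.g. 10s startup plus the 30s idle timeout)
for tasks_per_slot in (1, 2, 6):
    wall, vcore_h = toy_job_cost(3000, 10, 5, tasks_per_slot, 40)
    print(f"{tasks_per_slot} tasks/slot: wall={wall}s, vcore-hours={vcore_h:.1f}")
```

In this model the fixed overhead is paid once per allocated core, so with short tasks and 1 task per slot most of the allocated vcore-hours go to overhead; raising the number of tasks per slot spreads the overhead over more work at the cost of a longer wall time.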
[jira] [Updated] (SPARK-22683) Allow tuning the number of dynamically allocated executors wrt task number
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Julien Cuquemelle updated SPARK-22683:
--------------------------------------
         Labels: pull-request-available  (was: )
    Description:
Let's say an executor has spark.executor.cores / spark.task.cpus task slots.

The current dynamic allocation policy allocates enough executors for each task slot to execute a single task, which minimizes latency but wastes resources when tasks are small relative to the executor allocation overhead.

Adding a tasksPerExecutorSlot parameter makes it possible to specify how many tasks a single slot should ideally execute, to mitigate the overhead of executor allocation.

PR: https://github.com/apache/spark/pull/19881

  (was: the same description without the PR link)
[jira] [Updated] (SPARK-22683) Allow tuning the number of dynamically allocated executors wrt task number
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Julien Cuquemelle updated SPARK-22683:
--------------------------------------
    Priority: Major  (was: Minor)
[jira] [Created] (SPARK-22683) Allow tuning the number of dynamically allocated executors wrt task number
Julien Cuquemelle created SPARK-22683:
-----------------------------------------

             Summary: Allow tuning the number of dynamically allocated executors wrt task number
                 Key: SPARK-22683
                 URL: https://issues.apache.org/jira/browse/SPARK-22683
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 2.2.0, 2.1.0
            Reporter: Julien Cuquemelle
            Priority: Minor

Let's say an executor has spark.executor.cores / spark.task.cpus task slots.

The current dynamic allocation policy allocates enough executors for each task slot to execute a single task, which minimizes latency but wastes resources when tasks are small relative to the executor allocation overhead.

Adding a tasksPerExecutorSlot parameter makes it possible to specify how many tasks a single slot should ideally execute, to mitigate the overhead of executor allocation.
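The arithmetic behind the proposal can be sketched as follows. This is only an illustration of the description, not the code from the pull request: the function names are hypothetical, `tasks_per_slot=1` is assumed to correspond to the current policy, and the task count in the example is made up.

```python
import math


def task_slots_per_executor(executor_cores, task_cpus=1):
    """Tasks one executor can run concurrently: spark.executor.cores / spark.task.cpus."""
    return executor_cores // task_cpus


def target_executors(pending_tasks, executor_cores, task_cpus=1, tasks_per_slot=1):
    """Executors needed so that each slot runs about `tasks_per_slot` tasks.

    With tasks_per_slot=1 this is one slot per task, i.e. the behaviour the
    issue attributes to the current dynamic allocation policy.
    """
    slots = task_slots_per_executor(executor_cores, task_cpus)
    return math.ceil(pending_tasks / (slots * tasks_per_slot))


# Hypothetical stage with 1000 pending tasks and 5 cores per executor
print(target_executors(1000, executor_cores=5))                    # one task per slot
print(target_executors(1000, executor_cores=5, tasks_per_slot=6))  # six tasks per slot
```

For this made-up stage the target drops from 200 executors to 34, which is the kind of cap on executor count that the parameter is meant to provide.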