[jira] [Commented] (SPARK-36446) YARN shuffle server restart crashes all dynamic allocation jobs that have deallocated an executor
[ https://issues.apache.org/jira/browse/SPARK-36446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17407714#comment-17407714 ] Adam Kennedy commented on SPARK-36446: -- [~tgraves] Yes, we are running with recovery enabled (in the case where shuffle server connections are secure). But the same problem occurs when the shuffle server is run as an independent process outside of YARN (insecurely) if it crashes and restarts.

> YARN shuffle server restart crashes all dynamic allocation jobs that have deallocated an executor
> -
>
> Key: SPARK-36446
> URL: https://issues.apache.org/jira/browse/SPARK-36446
> Project: Spark
> Issue Type: Bug
> Components: Shuffle
> Affects Versions: 2.4.8, 3.1.2
> Reporter: Adam Kennedy
> Priority: Critical
>
> When dynamic allocation is enabled, executors that deallocate rely on the shuffle server to hold blocks and supply them to remaining executors.
> When the YARN Shuffle Server restarts (either intentionally or due to a crash), it loses block information and relies on being able to contact Executors (the locations of which it durably stores) to refetch the list of blocks.
> This mutual dependency, in which each side relies on the other to hold block information, fails fatally under some common scenarios.
> For example, if a Spark application is running under dynamic allocation, some number of executors will almost always shut down.
> If, after this has occurred, any shuffle server crashes or is restarted (either directly when running as a standalone service, or as part of a YARN node manager restart), there is no way to restore the block data and it is permanently lost.
> Worse, when Executors try to fetch blocks from the shuffle server, the shuffle server cannot locate the executor, decides it doesn't exist, treats it as a fatal exception, and causes the application to terminate and crash.
> Thus, in a real-world scenario that we observe on a 1000+ node multi-tenant cluster where dynamic allocation is on by default, a rolling restart of the YARN node managers will cause ALL jobs that have deallocated any executor (and therefore handed shuffle or block data to the shuffle server in order to shut down) to crash.

-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
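For reference, the configuration that produces the scenario described above (dynamic allocation plus the external YARN shuffle service, with NodeManager recovery enabled so the service restores executor registrations after a restart) looks roughly like the following. The property names are the standard Spark and YARN keys; the values are illustrative only:

```properties
# spark-defaults.conf (illustrative values)
spark.dynamicAllocation.enabled              true
spark.shuffle.service.enabled                true
spark.dynamicAllocation.executorIdleTimeout  60s

# yarn-site.xml equivalents (shown as key=value for brevity)
yarn.nodemanager.aux-services=mapreduce_shuffle,spark_shuffle
yarn.nodemanager.aux-services.spark_shuffle.class=org.apache.spark.network.yarn.YarnShuffleService
yarn.nodemanager.recovery.enabled=true
```

Note that, as the comment above observes, even with recovery enabled the recovered state covers executor registrations and locations, not the block lists themselves.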
[jira] [Comment Edited] (SPARK-36446) YARN shuffle server restart crashes all dynamic allocation jobs that have deallocated an executor
[ https://issues.apache.org/jira/browse/SPARK-36446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17394905#comment-17394905 ] Adam Kennedy edited comment on SPARK-36446 at 8/6/21, 5:56 PM: --- The problem was particularly amplified by the Executor deallocation improvements in SPARK-27677 and SPARK-21097 that came about as a result of SPARK-25888. It is now much more likely that Executors will deallocate, even in the face of persist() or cache().

was (Author: adamkennedy77): The problem was particularly amplified by the Executor deallocation improvements in SPARK-27677 that came about as a result of SPARK-25888. It is now much more likely that Executors will deallocate, even in the face of persist() or cache().
[jira] [Comment Edited] (SPARK-36446) YARN shuffle server restart crashes all dynamic allocation jobs that have deallocated an executor
[ https://issues.apache.org/jira/browse/SPARK-36446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17394905#comment-17394905 ] Adam Kennedy edited comment on SPARK-36446 at 8/6/21, 5:06 PM: --- The problem was particularly amplified by the Executor deallocation improvements in SPARK-27677 that came about as a result of SPARK-25888. It is now much more likely that Executors will deallocate, even in the face of persist() or cache().

was (Author: adamkennedy77): The problem was particularly amplified by the Executor deallocation improvements in SPARK-27677 that came about as a result of SPARK-25888.
[jira] [Comment Edited] (SPARK-36446) YARN shuffle server restart crashes all dynamic allocation jobs that have deallocated an executor
[ https://issues.apache.org/jira/browse/SPARK-36446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17394905#comment-17394905 ] Adam Kennedy edited comment on SPARK-36446 at 8/6/21, 5:06 PM: --- The problem was particularly amplified by the Executor deallocation improvements in SPARK-27677 that came about as a result of SPARK-25888.

was (Author: adamkennedy77): The problem was particularly amplified by the Executor deallocation improvements in SPARK-27677 that came about as a result of SPARK-25888.
[jira] [Commented] (SPARK-36446) YARN shuffle server restart crashes all dynamic allocation jobs that have deallocated an executor
[ https://issues.apache.org/jira/browse/SPARK-36446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17394905#comment-17394905 ] Adam Kennedy commented on SPARK-36446: -- The problem was particularly amplified by the Executor deallocation improvements in SPARK-27677 that came about as a result of SPARK-25888.
[jira] [Commented] (SPARK-36446) YARN shuffle server restart crashes all dynamic allocation jobs that have deallocated an executor
[ https://issues.apache.org/jira/browse/SPARK-36446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17394873#comment-17394873 ] Adam Kennedy commented on SPARK-36446: -- Note: While I haven't investigated any shuffle server implementations other than the YARN shuffle server, this may impact other implementations as well, and they should be checked for the same problem.
[jira] [Updated] (SPARK-36446) YARN shuffle server restart crashes all dynamic allocation jobs that have deallocated an executor
[ https://issues.apache.org/jira/browse/SPARK-36446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Kennedy updated SPARK-36446: - Summary: YARN shuffle server restart crashes all dynamic allocation jobs that have deallocated an executor (was: YARN shuffle server restart crashes all dynamic allocation jobs that have deallocated)
[jira] [Updated] (SPARK-36446) YARN shuffle server restart crashes all dynamic allocation jobs that have deallocated
[ https://issues.apache.org/jira/browse/SPARK-36446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Kennedy updated SPARK-36446: - Summary: YARN shuffle server restart crashes all dynamic allocation jobs that have deallocated (was: After dynamic deallocation YARN shuffle server restart crashes all jobs)
[jira] [Created] (SPARK-36446) After dynamic deallocation YARN shuffle server restart crashes all jobs
Adam Kennedy created SPARK-36446: Summary: After dynamic deallocation YARN shuffle server restart crashes all jobs Key: SPARK-36446 URL: https://issues.apache.org/jira/browse/SPARK-36446 Project: Spark Issue Type: Bug Components: Shuffle Affects Versions: 3.1.2, 2.4.8 Reporter: Adam Kennedy
[jira] [Commented] (SPARK-21097) Dynamic allocation will preserve cached data
[ https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16909342#comment-16909342 ] Adam Kennedy commented on SPARK-21097: -- Another reason to support transfer of memory cache blocks in general: in an application where executors are spinning up and down all the time, we are likely to see situations in which the memory cache blocks are heavily skewed towards a small percentage of the total executors. If we had support for replicating cache blocks around, we could also look at a more general cache balancer, which could redistribute the cache blocks over time (perhaps during idle periods, or using long-tail resources when no new tasks exist to distribute) to make subsequent stages or jobs more likely to get all tasks distributed evenly.

> Dynamic allocation will preserve cached data
> -
>
> Key: SPARK-21097
> URL: https://issues.apache.org/jira/browse/SPARK-21097
> Project: Spark
> Issue Type: Improvement
> Components: Block Manager, Scheduler, Spark Core
> Affects Versions: 2.2.0, 2.3.0
> Reporter: Brad
> Priority: Major
> Attachments: Preserving Cached Data with Dynamic Allocation.pdf
>
> We want to use dynamic allocation to distribute resources among many notebook users on our spark clusters. One difficulty is that if a user has cached data then we are either prevented from de-allocating any of their executors, or we are forced to drop their cached data, which can lead to a bad user experience.
> We propose adding a feature to preserve cached data by copying it to other executors before de-allocation. This behavior would be enabled by a simple spark config. Now when an executor reaches its configured idle timeout, instead of just killing it on the spot, we will stop sending it new tasks, replicate all of its rdd blocks onto other executors, and then kill it. If there is an issue while we replicate the data, like an error, it takes too long, or there isn't enough space, then we will fall back to the original behavior: drop the data and kill the executor.
> This feature should allow anyone with notebook users to use their cluster resources more efficiently. Also, since it will be completely opt-in, it is unlikely to cause problems for other use cases.

-- This message was sent by Atlassian JIRA (v7.6.14#76016) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
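The cache-balancer idea raised in the comment above can be illustrated with a toy greedy rebalancing pass. This is purely hypothetical code, not Spark's API: executor ids and block sizes are plain Python values, and a real implementation would drive the BlockManager's replication machinery instead.

```python
def rebalance(cache):
    """Greedy cache balancer sketch: repeatedly move a block from the
    most loaded executor to the least loaded one, as long as each move
    strictly narrows the gap between them.

    `cache` maps executor id -> list of cached block sizes; returns a
    new, more evenly loaded mapping with the same blocks."""
    cache = {ex: sorted(blocks) for ex, blocks in cache.items()}
    while True:
        load = {ex: sum(bs) for ex, bs in cache.items()}
        src = max(load, key=load.get)   # most loaded executor
        dst = min(load, key=load.get)   # least loaded executor
        gap = load[src] - load[dst]
        # Move the smallest block only if it fits inside the gap, which
        # guarantees the imbalance strictly shrinks (so the loop ends).
        if not cache[src] or cache[src][0] >= gap:
            return cache
        block = cache[src].pop(0)
        cache[dst].append(block)
        cache[dst].sort()
```

On a skewed layout such as `{"e1": [4, 4, 4], "e2": [], "e3": [2]}` this settles at per-executor loads of 4, 4 and 6. A real balancer would additionally weigh transfer cost and run only during the idle periods the comment describes.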
[jira] [Updated] (SPARK-25889) Dynamic allocation load-aware ramp up
[ https://issues.apache.org/jira/browse/SPARK-25889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Kennedy updated SPARK-25889: - Description: The time-based exponential ramp up behavior for dynamic allocation is naive and destructive, making it very difficult to run very large jobs.

On a large (64,000 core) YARN cluster with a high number of input partitions (200,000+), the default dynamic allocation approach of requesting containers in waves, doubling exponentially once per second, results in 50% of the entire cluster being requested in the final 1 second wave. This can easily overwhelm RPC processing, or cause expensive Executor startup steps to break systems. With the interval so short, many additional containers may be requested beyond what is actually needed, and then complete very little work before sitting around waiting to be deallocated.

Delaying the time between these fixed doublings has only limited impact. Setting the doubling interval to once per minute would result in a very slow ramp up, at the end of which we still face large, potentially crippling waves of executor startup.

An alternative approach to spooling up large jobs appears to be needed: one which is still relatively simple, but could be more adaptable to different cluster sizes and differing cluster and job performance. I would like to propose a few different approaches based around the general idea of controlling outstanding requests for new containers based on the number of executors that are currently running, for some definition of "running".

One example might be to limit requests to one new executor for every existing executor that currently has an active task, or some ratio of that, to allow for more or less aggressive spool up. A lower ratio would let us approximate something like a Fibonacci ramp up; a higher ratio of say 2x would spool up quickly, but stay aligned with the rate at which broadcast blocks can be distributed to new members.

An alternative approach might be to limit the escalation rate of new executor requests based on the number of outstanding requested executors which have not yet fully completed startup and are not available for tasks.

To protect against a potentially suboptimal very early ramp, a minimum concurrent executor startup threshold might allow an initial burst of say 10 executors, after which the more gradual ramp math would apply.

> Dynamic allocation load-aware ramp up
> -
>
> Key: SPARK-25889
> URL: https://issues.apache.org/jira/browse/SPARK-25889
> Project: Spark
> Issue Type: New Feature
> Components: Scheduler, YARN
> Affects Versions: 2.3.2
> Reporter: Adam Kennedy
> Priority: Major
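The two ramp-up behaviors discussed above can be contrasted with a toy model. Everything here is illustrative: `exponential_waves` models the existing doubling behavior, and `load_aware_waves` sketches the proposed policy, under the simplifying assumption that every executor requested in one wave is running (and busy) by the next wave.

```python
def exponential_waves(target):
    """Toy model of the default ramp: each wave doubles the total number
    of requested executors until the target is reached, so the final
    wave can be roughly half the whole target.  Returns the wave sizes."""
    total, waves = 0, []
    while total < target:
        wave = min(max(1, total), target - total)  # doubling the total
        waves.append(wave)
        total += wave
    return waves

def load_aware_waves(target, active_ratio=0.5, initial_burst=10):
    """Sketch of the proposed load-aware ramp: request at most
    `active_ratio` new executors per executor already running tasks,
    with a small `initial_burst` so the earliest waves are not too slow."""
    running, waves = 0, []
    while running < target:
        wave = max(initial_burst, int(running * active_ratio))
        wave = min(wave, target - running)
        waves.append(wave)
        running += wave
    return waves
```

With a 1,000-executor target, the doubling model issues a final wave of 488 executors (about half the cluster at once), while the load-aware ramp at a 0.5 ratio never requests more than about 250 in a single wave, at the cost of a couple of extra waves.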
[jira] [Updated] (SPARK-25888) Service requests for persist() blocks via external service after dynamic deallocation
[ https://issues.apache.org/jira/browse/SPARK-25888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Kennedy updated SPARK-25888: - Description:

Large and highly multi-tenant Spark on YARN clusters with diverse job execution often display terrible utilization rates (we have observed as low as 3-7% CPU at max container allocation, but 50% CPU utilization on even a well policed cluster is not uncommon).

As a sizing example, consider a scenario with 1,000 nodes, 50,000 cores, 250 users and 50,000 runs of 1,000 distinct applications per week, predominantly Spark, including a mixture of ETL, Ad Hoc tasks and PySpark Notebook jobs (no streaming).

Utilization problems appear to be due in large part to difficulties with persist() blocks (DISK or DISK+MEMORY) preventing dynamic deallocation. In situations where an external shuffle service is present (which is typical on clusters of this type), we already solve this for the shuffle block case by offloading the IO handling of shuffle blocks to the external service, allowing dynamic deallocation to proceed.

Allowing Executors to transfer persist() blocks to some external "shuffle" service in a similar manner would be an enormous win for Spark multi-tenancy, as it would limit deallocation-blocking scenarios to only MEMORY-only cache() scenarios. I'm not sure if I'm correct, but I seem to recall seeing in the original external shuffle service commits that this may have been considered at the time, but getting shuffle blocks moved to the external shuffle service was the first priority.

With support for external persist() DISK blocks in place, we could also then handle deallocation of DISK+MEMORY, as the memory instance could first be dropped, changing the block to DISK only, and then further transferred to the shuffle service.

We have tried to resolve the persist() issue via extensive user training, but that has typically only allowed us to improve utilization of the worst offenders (10% utilization) up to around 40-60% utilization, as the need for persist() is often legitimate and occurs during the middle stages of a job. In a healthy multi-tenant scenario, a large job might spool up to, say, 10,000 cores, persist() data, release executors across a long tail down to 100 cores, and then spool back up to 10,000 cores for the following stage without impact on the persist() data.

In an ideal world, if a new executor started up on a node on which blocks had been transferred to the shuffle service, the new executor might even be able to "recapture" control of those blocks (if that would help with performance in some way). And the behavior of gradually expanding up and down several times over the course of a job would not just improve utilization, but would allow resources to more easily be redistributed to other jobs which start on the cluster during the long-tail periods, which would improve multi-tenancy and bring us closer to optimal "envy free" YARN scheduling.

was: Large and highly multi-tenant Spark on YARN clusters with diverse job execution often display terrible utilization rates (we have observed as low as 3-7% CPU at max container allocation, but 50% CPU utilization on even a well policed cluster is not uncommon). As a sizing example, one of our larger instances has 1,000 nodes, 50,000 cores, 250 users and 1,000 distinct applications run per week, with predominantly Spark including a mixture of ETL, Ad Hoc tasks and PySpark Notebook jobs (no streaming) Utilization problems appear to be due in large part to difficulties with persist() blocks (DISK or DISK+MEMORY) preventing dynamic deallocation. In situations where an external shuffle service is present (which is typical on clusters of this type) we already solve this for the shuffle block case by offloading the IO handling of shuffle blocks to the external service, allowing dynamic deallocation to proceed. Allowing Executors to transfer persist() blocks to some external "shuffle" service in a similar manner would be an enormous win for Spark multi-tenancy as it would limit deallocation blocking scenarios to only MEMORY-only cache() scenarios. I'm not sure if I'm correct, but I seem to recall seeing in the original external shuffle service commits that may have been considered at the time but getting shuffle blocks moved to the external shuffle service was the first priority. With support for external persist() DISK blocks in place, we could also then handle deallocation of DISK+MEMORY, as the memory instance could first be dropped, changing the block to DISK only, and then further transferred to the shuffle service. We have tried to resolve the persist() issue via extensive user training, but that has typically only allowed us to improve utilization of the worst offenders (10% utilization) up to around 40-60% utilization, as the need for persist() is often legitimate and occurs
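The deallocation-blocking rule described in the issue can be sketched as a small decision function. This is an illustrative model only, not Spark's actual ExecutorAllocationManager logic; the storage-level names mirror Spark's StorageLevel constants, and the `offload_disk_supported` flag is a hypothetical stand-in for the proposed external persist() support.

```python
# Sketch: which cached blocks prevent an idle executor from being released.
# Illustrative only -- not Spark's real ExecutorAllocationManager logic.
# "offload_disk_supported" models the proposed external persist() service.

DISK_BACKED = {"DISK_ONLY", "MEMORY_AND_DISK", "MEMORY_AND_DISK_SER"}
MEMORY_ONLY = {"MEMORY_ONLY", "MEMORY_ONLY_SER"}

def can_deallocate(cached_block_levels, offload_disk_supported):
    """Return True if an idle executor could be released.

    cached_block_levels: storage levels of persist() blocks on the executor.
    offload_disk_supported: True if DISK blocks can move to an external
    service (the proposal); MEMORY-only blocks would still pin the executor.
    """
    for level in cached_block_levels:
        if level in MEMORY_ONLY:
            return False  # nowhere external to put a memory-only block
        if level in DISK_BACKED and not offload_disk_supported:
            return False  # today: disk-backed blocks also pin the executor
        # DISK_BACKED with offload support: drop any in-memory copy first
        # (MEMORY_AND_DISK -> DISK_ONLY), then hand the file to the service.
    return True

# Today, any persist() block pins the executor:
assert can_deallocate(["MEMORY_AND_DISK"], offload_disk_supported=False) is False
# Under the proposal, only MEMORY-only caching still blocks deallocation:
assert can_deallocate(["MEMORY_AND_DISK", "DISK_ONLY"], offload_disk_supported=True) is True
assert can_deallocate(["MEMORY_ONLY"], offload_disk_supported=True) is False
```

The DISK+MEMORY case in the loop is the two-step demotion the issue describes: drop the in-memory instance, leaving a DISK-only block, then transfer it.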
[jira] [Updated] (SPARK-25888) Service requests for persist() blocks via external service after dynamic deallocation
[ https://issues.apache.org/jira/browse/SPARK-25888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Kennedy updated SPARK-25888: - Issue Type: New Feature (was: Improvement)
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-25888) Service requests for persist() blocks via external service after dynamic deallocation
[ https://issues.apache.org/jira/browse/SPARK-25888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Kennedy updated SPARK-25888: - Shepherd: DB Tsai
[jira] [Created] (SPARK-25889) Dynamic allocation load-aware ramp up
Adam Kennedy created SPARK-25889: Summary: Dynamic allocation load-aware ramp up Key: SPARK-25889 URL: https://issues.apache.org/jira/browse/SPARK-25889 Project: Spark Issue Type: New Feature Components: Scheduler, YARN Affects Versions: 2.3.2 Reporter: Adam Kennedy

The time-based exponential ramp-up behavior for dynamic allocation is naive and destructive, making it very difficult to run very large jobs. On a large (64,000 core) YARN cluster with a high number of input partitions (200,000+), the default dynamic allocation approach of requesting containers in waves, doubling exponentially once per second, results in 50% of the entire cluster being requested in the final 1-second wave. This can easily overwhelm RPC processing, or cause expensive Executor startup steps to break systems. With the interval so short, many containers may be requested beyond what is actually needed, and then complete very little work before sitting around waiting to be deallocated.

Delaying the time between these fixed doublings has only limited impact. Setting the doubling interval to once per minute would result in a very slow ramp-up, at the end of which we still face large, potentially crippling waves of executor startup.

An alternative approach to spooling up large jobs appears to be needed, one which is still relatively simple but more adaptable to different cluster sizes and differing cluster and job performance. I would like to propose a few different approaches based around the general idea of controlling outstanding requests for new containers based on the number of executors that are currently running, for some definition of "running". One example might be to limit requests to one new executor for every existing executor that currently has an active task, or some ratio of that, to allow for more or less aggressive spool-up. A lower ratio would let us approximate something like Fibonacci ramp-up; a higher ratio of, say, 2x would spool up quickly, but still aligned with the rate at which broadcast blocks can be easily distributed to new members.
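The proposed load-aware policy can be sketched next to the current exponential one, using a simple model where each wave issues one request per interval. The function names and the `ratio` parameter are hypothetical illustrations, not Spark APIs.

```python
# Sketch: exponential doubling vs. the proposed load-aware ramp-up.
# Illustrative model only -- these are not Spark APIs.

def exponential_wave(current_target: int) -> int:
    """Current behavior: each wave doubles the outstanding executor target."""
    return max(1, current_target * 2)

def load_aware_wave(pending_tasks: int, busy_executors: int, ratio: float = 1.0) -> int:
    """Proposed: request at most `ratio` new executors per executor that
    currently has an active task, and never more than the work remaining."""
    cap = max(1, int(busy_executors * ratio))
    return min(pending_tasks, cap)

# Exponential: from 1 executor, ten 1-second waves reach a 1024-executor
# target, so the final wave alone requests half of the total (512).
target = 1
for _ in range(10):
    target = exponential_wave(target)
assert target == 1024

# Load-aware: with 10 busy executors and 200,000 pending tasks, the next
# wave asks for only 10 more (ratio 1.0) or 20 more (ratio 2.0).
assert load_aware_wave(200_000, 10, ratio=1.0) == 10
assert load_aware_wave(200_000, 10, ratio=2.0) == 20
# Near the end of a job, the remaining work bounds the request:
assert load_aware_wave(5, 10, ratio=2.0) == 5
```

Because the cap scales with executors that are demonstrably doing work, the request rate adapts to how fast the cluster can actually absorb new containers, rather than to wall-clock time.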
[jira] [Updated] (SPARK-25888) Service requests for persist() blocks via external service after dynamic deallocation
[ https://issues.apache.org/jira/browse/SPARK-25888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Kennedy updated SPARK-25888: - Description:

Large and highly multi-tenant Spark on YARN clusters with diverse job execution often display terrible utilization rates (we have observed as low as 3-7% CPU at max container allocation, but 50% CPU utilization on even a well policed cluster is not uncommon). As a sizing example, one of our larger instances has 1,000 nodes, 50,000 cores, 250 users and 1,000 distinct applications run per week, predominantly Spark, including a mixture of ETL, Ad Hoc tasks and PySpark Notebook jobs (no streaming).

Utilization problems appear to be due in large part to difficulties with persist() blocks (DISK or DISK+MEMORY) preventing dynamic deallocation. In situations where an external shuffle service is present (which is typical on clusters of this type), we already solve this for the shuffle block case by offloading the IO handling of shuffle blocks to the external service, allowing dynamic deallocation to proceed. Allowing Executors to transfer persist() blocks to some external "shuffle" service in a similar manner would be an enormous win for Spark multi-tenancy, as it would limit deallocation-blocking scenarios to only MEMORY-only cache() scenarios. I'm not sure if I'm correct, but I seem to recall seeing in the original external shuffle service commits that this may have been considered at the time, but getting shuffle blocks moved to the external shuffle service was the first priority. With support for external persist() DISK blocks in place, we could also then handle deallocation of DISK+MEMORY, as the memory instance could first be dropped, changing the block to DISK only, and then further transferred to the shuffle service.

We have tried to resolve the persist() issue via extensive user training, but that has typically only allowed us to improve utilization of the worst offenders (10% utilization) up to around 40-60% utilization, as the need for persist() is often legitimate and occurs during the middle stages of a job. In a healthy multi-tenant scenario, a large job might spool up to, say, 10,000 cores, persist() data, release executors across a long tail down to 100 cores, and then spool back up to 10,000 cores for the following stage without impact on the persist() data. In an ideal world, if a new executor started up on a node on which blocks had been transferred to the shuffle service, the new executor might even be able to "recapture" control of those blocks (if that would help with performance in some way). And the behavior of gradually expanding up and down several times over the course of a job would not just improve utilization, but would allow resources to more easily be redistributed to other jobs which start on the cluster during the long-tail periods, which would improve multi-tenancy and bring us closer to optimal "envy free" YARN scheduling.

was: Large and highly multi-tenant Spark on YARN clusters with diverse job execution often display terrible utilization rates (we have observed as low as 3-7% CPU at max container allocation, but 50% CPU utilization on even a well policed cluster is not uncommon). As a sizing example, one of our larger instances has 1,000 nodes, 50,000 cores, 250 users and 1,000 distinct applications run per week, with predominantly Spark including a mixture of ETL, Ad Hoc tasks and PySpark Notebook jobs (no streaming) Utilization problems appear to be due in large part to difficulties with persist() blocks (DISK or DISK+MEMORY) preventing dynamic deallocation. In situations where an external shuffle service is present (which is typical on clusters of this type) we already solve this for the shuffle block case by offloading the IO handling of shuffle blocks to the external service, allowing dynamic deallocation to proceed. Allowing Executors to transfer persist() blocks to some external "shuffle" service in a similar manner would be an enormous win for Spark multi-tenancy as it would limit deallocation blocking scenarios to only MEMORY-only cache() scenarios. I'm not sure if I'm correct, but I seem to recall seeing in the original external shuffle service commits that may have been considered at the time but getting shuffle blocks moved to the external shuffle service was the first priority. With support for external persist() DISK blocks in place, we could also then handle deallocation of DISK+MEMORY, as the memory instance could first be dropped, changing the block to DISK only, and then further transferred to the shuffle service. We have tried to resolve the persist() issue via extensive user training, but that has typically only allowed us to improve utilization of the worst offenders (10% utilization) up to around 40-60% utilization, as the need for persist() is often legitimate and occurs
[jira] [Updated] (SPARK-25888) Service requests for persist() blocks via external service after dynamic deallocation
[ https://issues.apache.org/jira/browse/SPARK-25888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Kennedy updated SPARK-25888: - Description:

Large and highly multi-tenant Spark on YARN clusters with diverse job execution often display terrible utilization rates (we have observed as low as 3-7% CPU at max container allocation, but 50% CPU utilization on even a well policed cluster is not uncommon). As a sizing example, one of our larger instances has 1,000 nodes, 50,000 cores, 250 users and 1,000 distinct applications run per week, predominantly Spark, including a mixture of ETL, Ad Hoc tasks and PySpark Notebook jobs (no streaming).

Utilization problems appear to be due in large part to difficulties with persist() blocks (DISK or DISK+MEMORY) preventing dynamic deallocation. In situations where an external shuffle service is present (which is typical on clusters of this type), we already solve this for the shuffle block case by offloading the IO handling of shuffle blocks to the external service, allowing dynamic deallocation to proceed. Allowing Executors to transfer persist() blocks to some external "shuffle" service in a similar manner would be an enormous win for Spark multi-tenancy, as it would limit deallocation-blocking scenarios to only MEMORY-only cache() scenarios. I'm not sure if I'm correct, but I seem to recall seeing in the original external shuffle service commits that this may have been considered at the time, but getting shuffle blocks moved to the external shuffle service was the first priority. With support for external persist() DISK blocks in place, we could also then handle deallocation of DISK+MEMORY, as the memory instance could first be dropped, changing the block to DISK only, and then further transferred to the shuffle service.

We have tried to resolve the persist() issue via extensive user training, but that has typically only allowed us to improve utilization of the worst offenders (10% utilization) up to around 40-60% utilization, as the need for persist() is often legitimate and occurs during the middle stages of a job. In a healthy multi-tenant scenario, the job could spool up to, say, 10,000 cores, persist() data, release executors across a long tail (to, say, 100 cores) and then spool back up to 10,000 cores for the following stage without impact on the persist() data. In an ideal world, if a new executor started up on a node on which blocks had been transferred to the shuffle service, the new executor might even be able to "recapture" control of those blocks (if that would help with performance in some way).

was: Large and highly multi-tenant Spark on YARN clusters with large populations and diverse job execution often display terrible utilization rates (we have observed as low as 3-7% CPU at max container allocation) due in large part to difficulties with persist() blocks (DISK or DISK+MEMORY) preventing dynamic deallocation. As a sizing example, one of our larger instances has 1,000 nodes, 50,000 cores, 250 users and 1,000 distinct applications run per week, with predominantly Spark workloads including a mixture of ETL, Ad Hoc and PySpark Notebook jobs, plus Hive and some MapReduce (no streaming Spark) In situations where an external shuffle service is present (which is typical on clusters of this type) we already solve this for the shuffle block case by offloading the IO handling of shuffle blocks to the external service, allowing dynamic deallocation to proceed. Allowing Executors to transfer persist() blocks to some external "shuffle" service in a similar manner would be an enormous win for Spark multi-tenancy as it would limit deallocation blocking scenarios to only MEMORY-only cache() scenarios. I'm not sure if I'm correct, but I seem to recall seeing in the original external shuffle service commits that may have been considered at the time but getting shuffle blocks moved to the external shuffle service was the first priority. With support for external persist() DISK blocks in place, we could also then handle deallocation of DISK+MEMORY, as the memory instance could first be dropped, changing the block to DISK only, and then further transferred to the shuffle service. We have tried to resolve the persist() issue via extensive user training, but that has typically only allowed us to improve utilization of the worst offenders (10% utilization) up to around 40-60% utilization, as the need for persist() is often legitimate and occurs during the middle stages of a job. In a healthy multi-tenant scenario, the job could spool up to say 10,000 cores, persist() data, release executors across a long tail (to say 100 cores) and then spool back up to 10,000 cores for the following stage without impact on the persist() data. In an ideal world, if an new executor started up on a node on which blocks had been transferred to the shuffle
[jira] [Updated] (SPARK-25888) Service requests for persist() blocks via external service after dynamic deallocation
[ https://issues.apache.org/jira/browse/SPARK-25888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Kennedy updated SPARK-25888: - Environment: (was: Large YARN cluster with 1,000 nodes, 50,000 cores and 250 users, with predominantly Spark workloads including a mixture of ETL, Ad Hoc and PySpark Notebook jobs. No streaming Spark.) Description:

Large and highly multi-tenant Spark on YARN clusters with large populations and diverse job execution often display terrible utilization rates (we have observed as low as 3-7% CPU at max container allocation), due in large part to difficulties with persist() blocks (DISK or DISK+MEMORY) preventing dynamic deallocation. As a sizing example, one of our larger instances has 1,000 nodes, 50,000 cores, 250 users and 1,000 distinct applications run per week, with predominantly Spark workloads including a mixture of ETL, Ad Hoc and PySpark Notebook jobs, plus Hive and some MapReduce (no streaming Spark).

In situations where an external shuffle service is present (which is typical on clusters of this type), we already solve this for the shuffle block case by offloading the IO handling of shuffle blocks to the external service, allowing dynamic deallocation to proceed. Allowing Executors to transfer persist() blocks to some external "shuffle" service in a similar manner would be an enormous win for Spark multi-tenancy, as it would limit deallocation-blocking scenarios to only MEMORY-only cache() scenarios. I'm not sure if I'm correct, but I seem to recall seeing in the original external shuffle service commits that this may have been considered at the time, but getting shuffle blocks moved to the external shuffle service was the first priority. With support for external persist() DISK blocks in place, we could also then handle deallocation of DISK+MEMORY, as the memory instance could first be dropped, changing the block to DISK only, and then further transferred to the shuffle service.

We have tried to resolve the persist() issue via extensive user training, but that has typically only allowed us to improve utilization of the worst offenders (10% utilization) up to around 40-60% utilization, as the need for persist() is often legitimate and occurs during the middle stages of a job. In a healthy multi-tenant scenario, the job could spool up to, say, 10,000 cores, persist() data, release executors across a long tail (to, say, 100 cores) and then spool back up to 10,000 cores for the following stage without impact on the persist() data. In an ideal world, if a new executor started up on a node on which blocks had been transferred to the shuffle service, the new executor might even be able to "recapture" control of those blocks (if that would help with performance in some way).

was: Large and highly multi-tenant Spark on YARN clusters with large populations and diverse job execution often display terrible utilization rates (we have observed as low as 3-7% CPU at max container allocation) due in large part to difficulties with persist() blocks (DISK or DISK+MEMORY) preventing dynamic deallocation. In situations where an external shuffle service is present (which is typical on clusters of this type) we already solve this for the shuffle block case by offloading the IO handling of shuffle blocks to the external service, allowing dynamic deallocation to proceed. Allowing Executors to transfer persist() blocks to some external "shuffle" service in a similar manner would be an enormous win for Spark multi-tenancy as it would limit deallocation blocking scenarios to only MEMORY-only cache() scenarios. I'm not sure if I'm correct, but I seem to recall seeing in the original external shuffle service commits that may have been considered at the time but getting shuffle blocks moved to the external shuffle service was the first priority. With support for external persist() DISK blocks in place, we could also then handle deallocation of DISK+MEMORY, as the memory instance could first be dropped, changing the block to DISK only, and then further transferred to the shuffle service. We have tried to resolve the persist() issue via extensive user training, but that has typically only allowed us to improve utilization of the worst offenders (10% utilization) up to around 40-60% utilization, as the need for persist() is often legitimate and occurs during the middle stages of a job. In a healthy multi-tenant scenario, the job could spool up to say 10,000 cores, persist() data, release executors across a long tail (to say 100 cores) and then spool back up to 10,000 cores for the following stage without impact on the persist() data. In an ideal world, if an new executor started up on a node on which blocks had been transferred to the shuffle service, the new executor might even be able to "recapture" control of those blocks (if that would help with performance
[jira] [Updated] (SPARK-25888) Service requests for persist() blocks via external service after dynamic deallocation
[ https://issues.apache.org/jira/browse/SPARK-25888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Kennedy updated SPARK-25888: - Environment: Large YARN cluster with 1,000 nodes, 50,000 cores and 250 users, with predominantly Spark workloads including a mixture of ETL, Ad Hoc and PySpark Notebook jobs. No streaming Spark.
[jira] [Updated] (SPARK-25888) Service requests for persist() blocks via external service after dynamic deallocation
[ https://issues.apache.org/jira/browse/SPARK-25888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Kennedy updated SPARK-25888: - Summary: Service requests for persist() blocks via external service after dynamic deallocation (was: Service requests for persist() block via external service after dynamic deallocation)
[jira] [Created] (SPARK-25888) Service requests for persist() block via external service after dynamic deallocation
Adam Kennedy created SPARK-25888:
------------------------------------

             Summary: Service requests for persist() block via external service after dynamic deallocation
                 Key: SPARK-25888
                 URL: https://issues.apache.org/jira/browse/SPARK-25888
             Project: Spark
          Issue Type: Improvement
          Components: Block Manager, Shuffle, YARN
    Affects Versions: 2.3.2
            Reporter: Adam Kennedy

Large and highly multi-tenant Spark on YARN clusters with large populations and diverse job execution often display terrible utilization rates (we have observed as low as 3-7% CPU at max container allocation), due in large part to persist() blocks (DISK or DISK+MEMORY) preventing dynamic deallocation.

In situations where an external shuffle service is present (which is typical on clusters of this type), we already solve this for the shuffle block case by offloading the IO handling of shuffle blocks to the external service, allowing dynamic deallocation to proceed.

Allowing executors to transfer persist() blocks to some external "shuffle" service in a similar manner would be an enormous win for Spark multi-tenancy, as it would limit deallocation-blocking scenarios to MEMORY-only cache() scenarios.

I'm not sure if I'm correct, but I seem to recall from the original external shuffle service commits that this may have been considered at the time, but getting shuffle blocks moved to the external shuffle service was the first priority.

With support for external persist() DISK blocks in place, we could then also handle deallocation of DISK+MEMORY blocks, as the in-memory copy could first be dropped, changing the block to DISK-only, and the block then transferred to the shuffle service.

We have tried to resolve the persist() issue via extensive user training, but that has typically only allowed us to improve utilization of the worst offenders (10% utilization) to around 40-60%, as the need for persist() is often legitimate and occurs during the middle stages of a job.
In a healthy multi-tenant scenario, a job could spool up to, say, 10,000 cores, persist() data, release executors across a long tail (down to, say, 100 cores), and then spool back up to 10,000 cores for the following stage without any impact on the persist() data.

In an ideal world, if a new executor started up on a node to which blocks had been transferred to the shuffle service, the new executor might even be able to "recapture" control of those blocks (if that would help with performance in some way).
[jira] [Commented] (SPARK-21155) Add (? running tasks) into Spark UI progress
[ https://issues.apache.org/jira/browse/SPARK-21155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222120#comment-16222120 ]

Adam Kennedy commented on SPARK-21155:
--------------------------------------

It can already get fairly crowded in that run area on large jobs (since it also shows killed jobs and other things). Could we use a simpler format, similar to the progress bar in the console? Something like:

    x+y/z ... meaning ... Done + Running / Total

> Add (? running tasks) into Spark UI progress
> --------------------------------------------
>
>                 Key: SPARK-21155
>                 URL: https://issues.apache.org/jira/browse/SPARK-21155
>             Project: Spark
>          Issue Type: Improvement
>          Components: Web UI
>    Affects Versions: 2.1.1
>            Reporter: Eric Vandenberg
>            Assignee: Eric Vandenberg
>            Priority: Minor
>             Fix For: 2.3.0
>
>         Attachments: Screen Shot 2017-06-20 at 12.32.58 PM.png, Screen Shot
> 2017-06-20 at 3.40.39 PM.png, Screen Shot 2017-06-22 at 9.58.08 AM.png
>
> The progress UI for Active Jobs / Tasks should show the exact number of
> running tasks. See the screen shot attachments for what this looks like.
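The compact `x+y/z` format proposed in the comment is easy to pin down with a small formatter. This is a sketch of the proposal only (the function name `progress_label` is invented here, and this is not the code Spark's UI actually uses):

```python
def progress_label(done, running, total):
    """Render stage progress in the compact console-style form
    done+running/total, collapsing to done/total when nothing is running."""
    if running:
        return f"{done}+{running}/{total}"
    return f"{done}/{total}"

# A stage with 120 finished and 35 in-flight tasks out of 400:
print(progress_label(120, 35, 400))  # 120+35/400
# A completed stage drops the "+running" part:
print(progress_label(400, 0, 400))   # 400/400
```

Collapsing the `+0` case keeps the label short for the common completed-stage rows, which addresses the crowding concern raised above.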
[jira] [Commented] (SPARK-19112) add codec for ZStandard
[ https://issues.apache.org/jira/browse/SPARK-19112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16113653#comment-16113653 ]

Adam Kennedy commented on SPARK-19112:
--------------------------------------

Will this be impacted by LEGAL-303? zstd-jni embeds zstd, which includes the Facebook PATENTS file.

> add codec for ZStandard
> -----------------------
>
>                 Key: SPARK-19112
>                 URL: https://issues.apache.org/jira/browse/SPARK-19112
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>            Reporter: Thomas Graves
>            Priority: Minor
>
> ZStandard (https://github.com/facebook/zstd and
> http://facebook.github.io/zstd/) has been in use for a while now, and v1.0
> was recently released. Hadoop
> (https://issues.apache.org/jira/browse/HADOOP-13578) and others
> (https://issues.apache.org/jira/browse/KAFKA-4514) are adopting it.
> Zstd seems to give great results: gzip-level compression at lz4-level CPU
> cost.
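For context on how a zstd codec would be used once added: Spark selects its internal compression codec through `spark.io.compression.codec`. The fragment below is a sketch of a `spark-defaults.conf` using the property names as they eventually shipped (in Spark 2.3+); the specific level and buffer-size values are illustrative, not recommendations.

```
# spark-defaults.conf -- enable zstd for shuffle/broadcast/RDD block compression
spark.io.compression.codec            zstd
# Trade-off knobs (values here are examples only):
spark.io.compression.zstd.level       1
spark.io.compression.zstd.bufferSize  32k
```

The appeal noted in the issue is exactly this trade-off: compared with the default lz4 codec, zstd typically shrinks shuffle data closer to gzip ratios without gzip's CPU cost.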