[jira] [Commented] (SPARK-36446) YARN shuffle server restart crashes all dynamic allocation jobs that have deallocated an executor

2021-08-31 Thread Adam Kennedy (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17407714#comment-17407714
 ] 

Adam Kennedy commented on SPARK-36446:
--

[~tgraves] Yes, we are running with recovery enabled (in the case where shuffle 
server connections are secure).

But the same problem occurs when the shuffle server runs as an independent 
process outside of YARN (insecurely), if it crashes and restarts.
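
For concreteness, this is roughly the setup under which we see it (a minimal 
sketch only; the values are illustrative, not a recommendation):

  // Dynamic allocation backed by the YARN external shuffle service.
  // NodeManager recovery (yarn.nodemanager.recovery.enabled=true plus a
  // yarn.nodemanager.recovery.dir in yarn-site.xml) preserves the shuffle
  // service's executor registrations across an NM restart, but not the
  // block state that deallocated executors used to hold.
  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .appName("dynamic-allocation-job")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.shuffle.service.enabled", "true")
    .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
    .getOrCreate()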

> YARN shuffle server restart crashes all dynamic allocation jobs that have 
> deallocated an executor
> -
>
> Key: SPARK-36446
> URL: https://issues.apache.org/jira/browse/SPARK-36446
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.4.8, 3.1.2
>Reporter: Adam Kennedy
>Priority: Critical
>
> When dynamic allocation is enabled, executors that deallocate rely on the 
> shuffle server to hold blocks and supply them to remaining executors.
> When YARN Shuffle Server restarts (either intentionally or due to a crash), 
> it loses block information and relies on being able to contact Executors (the 
> locations of which it durably stores) to refetch the list of blocks.
> This mutual dependency on the other to hold block information fails fatally 
> under some common scenarios.
> For example, if a Spark application is running under dynamic allocation, some 
> amount of executors will almost always shut down.
> If, after this has occurred, any shuffle server crashes, or is restarted 
> (either directly when running as a standalone service, or as part of a YARN 
> node manager restart) then there is no way to restore block data and it is 
> permanently lost.
> Worse, when Executors try to fetch blocks from the shuffle server, the 
> shuffle server cannot locate the executor, decides it doesn't exist, treats 
> it as a fatal exception, and causes the application to terminate and crash.
> Thus, in a real-world scenario that we observe on a 1000+ node multi-tenant 
> cluster where dynamic allocation is on by default, a rolling restart of the 
> YARN node managers will crash ALL jobs that have deallocated any executor 
> and that have shuffle data, or that have transferred blocks to the shuffle 
> server in order to shut down.






[jira] [Comment Edited] (SPARK-36446) YARN shuffle server restart crashes all dynamic allocation jobs that have deallocated an executor

2021-08-06 Thread Adam Kennedy (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17394905#comment-17394905
 ] 

Adam Kennedy edited comment on SPARK-36446 at 8/6/21, 5:56 PM:
---

The problem was particularly amplified by the Executor deallocation 
improvements in  SPARK-27677 and SPARK-21097 that came about as a result of 
SPARK-25888. It is now much more likely that Executors will deallocate, even in 
the face of persist() or cache().
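
To illustrate the kind of blocks involved (a sketch, assuming an existing 
SparkSession named spark and a hypothetical input path, not code from the 
issue itself): an executor holding blocks like these can now be deallocated, 
leaving the shuffle service as the only holder of the on-disk copies.

  import org.apache.spark.storage.StorageLevel

  val events = spark.read.parquet("/data/events")              // hypothetical input path
  val cached = events.persist(StorageLevel.MEMORY_AND_DISK)    // DISK+MEMORY blocks
  cached.count()   // materialize the blocks before executors may idle out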

 


was (Author: adamkennedy77):
The problem was particularly amplified by the Executor deallocation 
improvements in  SPARK-27677 that came about as a result of SPARK-25888. It is 
now much more likely that Executors will deallocate, even in the face of 
persist() or cache().

 

> YARN shuffle server restart crashes all dynamic allocation jobs that have 
> deallocated an executor
> -
>
> Key: SPARK-36446
> URL: https://issues.apache.org/jira/browse/SPARK-36446
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.4.8, 3.1.2
>Reporter: Adam Kennedy
>Priority: Critical
>
> When dynamic allocation is enabled, executors that deallocate rely on the 
> shuffle server to hold blocks and supply them to remaining executors.
> When YARN Shuffle Server restarts (either intentionally or due to a crash), 
> it loses block information and relies on being able to contact Executors (the 
> locations of which it durably stores) to refetch the list of blocks.
> This mutual dependency on the other to hold block information fails fatally 
> under some common scenarios.
> For example, if a Spark application is running under dynamic allocation, some 
> amount of executors will almost always shut down.
> If, after this has occurred, any shuffle server crashes, or is restarted 
> (either directly when running as a standalone service, or as part of a YARN 
> node manager restart) then there is no way to restore block data and it is 
> permanently lost.
> Worse, when Executors try to fetch blocks from the shuffle server, the 
> shuffle server cannot locate the executor, decides it doesn't exist, treats 
> it as a fatal exception, and causes the application to terminate and crash.
> Thus, in a real-world scenario that we observe on a 1000+ node multi-tenant 
> cluster where dynamic allocation is on by default, a rolling restart of the 
> YARN node managers will crash ALL jobs that have deallocated any executor 
> and that have shuffle data, or that have transferred blocks to the shuffle 
> server in order to shut down.






[jira] [Comment Edited] (SPARK-36446) YARN shuffle server restart crashes all dynamic allocation jobs that have deallocated an executor

2021-08-06 Thread Adam Kennedy (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17394905#comment-17394905
 ] 

Adam Kennedy edited comment on SPARK-36446 at 8/6/21, 5:06 PM:
---

The problem was particularly amplified by the Executor deallocation 
improvements in  SPARK-27677 that came about as a result of SPARK-25888. It is 
now much more likely that Executors will deallocate, even in the face of 
persist() or cache().

 


was (Author: adamkennedy77):
The problem was particularly amplified by the Executor deallocation 
improvements in  SPARK-27677 that came about as a result of SPARK-25888.

 

> YARN shuffle server restart crashes all dynamic allocation jobs that have 
> deallocated an executor
> -
>
> Key: SPARK-36446
> URL: https://issues.apache.org/jira/browse/SPARK-36446
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.4.8, 3.1.2
>Reporter: Adam Kennedy
>Priority: Critical
>
> When dynamic allocation is enabled, executors that deallocate rely on the 
> shuffle server to hold blocks and supply them to remaining executors.
> When YARN Shuffle Server restarts (either intentionally or due to a crash), 
> it loses block information and relies on being able to contact Executors (the 
> locations of which it durably stores) to refetch the list of blocks.
> This mutual dependency on the other to hold block information fails fatally 
> under some common scenarios.
> For example, if a Spark application is running under dynamic allocation, some 
> amount of executors will almost always shut down.
> If, after this has occurred, any shuffle server crashes, or is restarted 
> (either directly when running as a standalone service, or as part of a YARN 
> node manager restart) then there is no way to restore block data and it is 
> permanently lost.
> Worse, when Executors try to fetch blocks from the shuffle server, the 
> shuffle server cannot locate the executor, decides it doesn't exist, treats 
> it as a fatal exception, and causes the application to terminate and crash.
> Thus, in a real-world scenario that we observe on a 1000+ node multi-tenant 
> cluster where dynamic allocation is on by default, a rolling restart of the 
> YARN node managers will crash ALL jobs that have deallocated any executor 
> and that have shuffle data, or that have transferred blocks to the shuffle 
> server in order to shut down.






[jira] [Comment Edited] (SPARK-36446) YARN shuffle server restart crashes all dynamic allocation jobs that have deallocated an executor

2021-08-06 Thread Adam Kennedy (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17394905#comment-17394905
 ] 

Adam Kennedy edited comment on SPARK-36446 at 8/6/21, 5:06 PM:
---

The problem was particularly amplified by the Executor deallocation 
improvements in  SPARK-27677 that came about as a result of SPARK-25888.

 


was (Author: adamkennedy77):
The problem was particularly amplified by the Executor deallocation 
improvements in  SPARK-27677 that came about as a result of SPARK 25888.

 

> YARN shuffle server restart crashes all dynamic allocation jobs that have 
> deallocated an executor
> -
>
> Key: SPARK-36446
> URL: https://issues.apache.org/jira/browse/SPARK-36446
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.4.8, 3.1.2
>Reporter: Adam Kennedy
>Priority: Critical
>
> When dynamic allocation is enabled, executors that deallocate rely on the 
> shuffle server to hold blocks and supply them to remaining executors.
> When YARN Shuffle Server restarts (either intentionally or due to a crash), 
> it loses block information and relies on being able to contact Executors (the 
> locations of which it durably stores) to refetch the list of blocks.
> This mutual dependency on the other to hold block information fails fatally 
> under some common scenarios.
> For example, if a Spark application is running under dynamic allocation, some 
> amount of executors will almost always shut down.
> If, after this has occurred, any shuffle server crashes, or is restarted 
> (either directly when running as a standalone service, or as part of a YARN 
> node manager restart) then there is no way to restore block data and it is 
> permanently lost.
> Worse, when Executors try to fetch blocks from the shuffle server, the 
> shuffle server cannot locate the executor, decides it doesn't exist, treats 
> it as a fatal exception, and causes the application to terminate and crash.
> Thus, in a real-world scenario that we observe on a 1000+ node multi-tenant 
> cluster where dynamic allocation is on by default, a rolling restart of the 
> YARN node managers will crash ALL jobs that have deallocated any executor 
> and that have shuffle data, or that have transferred blocks to the shuffle 
> server in order to shut down.






[jira] [Commented] (SPARK-36446) YARN shuffle server restart crashes all dynamic allocation jobs that have deallocated an executor

2021-08-06 Thread Adam Kennedy (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17394905#comment-17394905
 ] 

Adam Kennedy commented on SPARK-36446:
--

The problem was particularly amplified by the Executor deallocation 
improvements in  SPARK-27677 that came about as a result of SPARK 25888.

 

> YARN shuffle server restart crashes all dynamic allocation jobs that have 
> deallocated an executor
> -
>
> Key: SPARK-36446
> URL: https://issues.apache.org/jira/browse/SPARK-36446
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.4.8, 3.1.2
>Reporter: Adam Kennedy
>Priority: Critical
>
> When dynamic allocation is enabled, executors that deallocate rely on the 
> shuffle server to hold blocks and supply them to remaining executors.
> When YARN Shuffle Server restarts (either intentionally or due to a crash), 
> it loses block information and relies on being able to contact Executors (the 
> locations of which it durably stores) to refetch the list of blocks.
> This mutual dependency on the other to hold block information fails fatally 
> under some common scenarios.
> For example, if a Spark application is running under dynamic allocation, some 
> amount of executors will almost always shut down.
> If, after this has occurred, any shuffle server crashes, or is restarted 
> (either directly when running as a standalone service, or as part of a YARN 
> node manager restart) then there is no way to restore block data and it is 
> permanently lost.
> Worse, when Executors try to fetch blocks from the shuffle server, the 
> shuffle server cannot locate the executor, decides it doesn't exist, treats 
> it as a fatal exception, and causes the application to terminate and crash.
> Thus, in a real-world scenario that we observe on a 1000+ node multi-tenant 
> cluster where dynamic allocation is on by default, a rolling restart of the 
> YARN node managers will crash ALL jobs that have deallocated any executor 
> and that have shuffle data, or that have transferred blocks to the shuffle 
> server in order to shut down.






[jira] [Commented] (SPARK-36446) YARN shuffle server restart crashes all dynamic allocation jobs that have deallocated an executor

2021-08-06 Thread Adam Kennedy (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17394873#comment-17394873
 ] 

Adam Kennedy commented on SPARK-36446:
--

Note: While I haven't investigated any shuffle servers other than the YARN 
shuffle server, this may affect other shuffle server implementations, and they 
should be checked for the same problem.

> YARN shuffle server restart crashes all dynamic allocation jobs that have 
> deallocated an executor
> -
>
> Key: SPARK-36446
> URL: https://issues.apache.org/jira/browse/SPARK-36446
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.4.8, 3.1.2
>Reporter: Adam Kennedy
>Priority: Critical
>
> When dynamic allocation is enabled, executors that deallocate rely on the 
> shuffle server to hold blocks and supply them to remaining executors.
> When YARN Shuffle Server restarts (either intentionally or due to a crash), 
> it loses block information and relies on being able to contact Executors (the 
> locations of which it durably stores) to refetch the list of blocks.
> This mutual dependency on the other to hold block information fails fatally 
> under some common scenarios.
> For example, if a Spark application is running under dynamic allocation, some 
> amount of executors will almost always shut down.
> If, after this has occurred, any shuffle server crashes, or is restarted 
> (either directly when running as a standalone service, or as part of a YARN 
> node manager restart) then there is no way to restore block data and it is 
> permanently lost.
> Worse, when Executors try to fetch blocks from the shuffle server, the 
> shuffle server cannot locate the executor, decides it doesn't exist, treats 
> it as a fatal exception, and causes the application to terminate and crash.
> Thus, in a real-world scenario that we observe on a 1000+ node multi-tenant 
> cluster where dynamic allocation is on by default, a rolling restart of the 
> YARN node managers will crash ALL jobs that have deallocated any executor 
> and that have shuffle data, or that have transferred blocks to the shuffle 
> server in order to shut down.






[jira] [Updated] (SPARK-36446) YARN shuffle server restart crashes all dynamic allocation jobs that have deallocated an executor

2021-08-06 Thread Adam Kennedy (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Kennedy updated SPARK-36446:
-
Summary: YARN shuffle server restart crashes all dynamic allocation jobs 
that have deallocated an executor  (was: YARN shuffle server restart crashes 
all dynamic allocation jobs that have deallocated)

> YARN shuffle server restart crashes all dynamic allocation jobs that have 
> deallocated an executor
> -
>
> Key: SPARK-36446
> URL: https://issues.apache.org/jira/browse/SPARK-36446
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.4.8, 3.1.2
>Reporter: Adam Kennedy
>Priority: Critical
>
> When dynamic allocation is enabled, executors that deallocate rely on the 
> shuffle server to hold blocks and supply them to remaining executors.
> When YARN Shuffle Server restarts (either intentionally or due to a crash), 
> it loses block information and relies on being able to contact Executors (the 
> locations of which it durably stores) to refetch the list of blocks.
> This mutual dependency on the other to hold block information fails fatally 
> under some common scenarios.
> For example, if a Spark application is running under dynamic allocation, some 
> amount of executors will almost always shut down.
> If, after this has occurred, any shuffle server crashes, or is restarted 
> (either directly when running as a standalone service, or as part of a YARN 
> node manager restart) then there is no way to restore block data and it is 
> permanently lost.
> Worse, when Executors try to fetch blocks from the shuffle server, the 
> shuffle server cannot locate the executor, decides it doesn't exist, treats 
> it as a fatal exception, and causes the application to terminate and crash.
> Thus, in a real-world scenario that we observe on a 1000+ node multi-tenant 
> cluster where dynamic allocation is on by default, a rolling restart of the 
> YARN node managers will crash ALL jobs that have deallocated any executor 
> and that have shuffle data, or that have transferred blocks to the shuffle 
> server in order to shut down.






[jira] [Updated] (SPARK-36446) YARN shuffle server restart crashes all dynamic allocation jobs that have deallocated

2021-08-06 Thread Adam Kennedy (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Kennedy updated SPARK-36446:
-
Summary: YARN shuffle server restart crashes all dynamic allocation jobs 
that have deallocated  (was: After dynamic deallocation YARN shuffle server 
restart crashes all jobs)

> YARN shuffle server restart crashes all dynamic allocation jobs that have 
> deallocated
> -
>
> Key: SPARK-36446
> URL: https://issues.apache.org/jira/browse/SPARK-36446
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.4.8, 3.1.2
>Reporter: Adam Kennedy
>Priority: Critical
>
> When dynamic allocation is enabled, executors that deallocate rely on the 
> shuffle server to hold blocks and supply them to remaining executors.
> When YARN Shuffle Server restarts (either intentionally or due to a crash), 
> it loses block information and relies on being able to contact Executors (the 
> locations of which it durably stores) to refetch the list of blocks.
> This mutual dependency on the other to hold block information fails fatally 
> under some common scenarios.
> For example, if a Spark application is running under dynamic allocation, some 
> amount of executors will almost always shut down.
> If, after this has occurred, any shuffle server crashes, or is restarted 
> (either directly when running as a standalone service, or as part of a YARN 
> node manager restart) then there is no way to restore block data and it is 
> permanently lost.
> Worse, when Executors try to fetch blocks from the shuffle server, the 
> shuffle server cannot locate the executor, decides it doesn't exist, treats 
> it as a fatal exception, and causes the application to terminate and crash.
> Thus, in a real-world scenario that we observe on a 1000+ node multi-tenant 
> cluster where dynamic allocation is on by default, a rolling restart of the 
> YARN node managers will crash ALL jobs that have deallocated any executor 
> and that have shuffle data, or that have transferred blocks to the shuffle 
> server in order to shut down.






[jira] [Created] (SPARK-36446) After dynamic deallocation YARN shuffle server restart crashes all jobs

2021-08-06 Thread Adam Kennedy (Jira)
Adam Kennedy created SPARK-36446:


 Summary: After dynamic deallocation YARN shuffle server restart 
crashes all jobs
 Key: SPARK-36446
 URL: https://issues.apache.org/jira/browse/SPARK-36446
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 3.1.2, 2.4.8
Reporter: Adam Kennedy


When dynamic allocation is enabled, executors that deallocate rely on the 
shuffle server to hold blocks and supply them to remaining executors.

When YARN Shuffle Server restarts (either intentionally or due to a crash), it 
loses block information and relies on being able to contact Executors (the 
locations of which it durably stores) to refetch the list of blocks.

This mutual dependency on the other to hold block information fails fatally 
under some common scenarios.

For example, if a Spark application is running under dynamic allocation, some 
amount of executors will almost always shut down.

If, after this has occurred, any shuffle server crashes, or is restarted 
(either directly when running as a standalone service, or as part of a YARN 
node manager restart) then there is no way to restore block data and it is 
permanently lost.

Worse, when Executors try to fetch blocks from the shuffle server, the shuffle 
server cannot locate the executor, decides it doesn't exist, treats it as a 
fatal exception, and causes the application to terminate and crash.

Thus, in a real-world scenario that we observe on a 1000+ node multi-tenant 
cluster where dynamic allocation is on by default, a rolling restart of the 
YARN node managers will crash ALL jobs that have deallocated any executor and 
that have shuffle data, or that have transferred blocks to the shuffle server 
in order to shut down.
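
A hedged outline of how we reproduce this (the job body is illustrative and 
assumes an existing SparkSession named spark; the node manager restart happens 
out of band, e.g. via a rolling restart):

  // 1. A shuffle-heavy stage; its map output is written to local disk and
  //    served via the external shuffle service.
  val shuffled = spark.sparkContext
    .parallelize(0L until 100000000L, 2000)
    .map(i => (i % 1000, 1L))
    .reduceByKey(_ + _)
  shuffled.count()   // run the shuffle once

  // 2. Wait past spark.dynamicAllocation.executorIdleTimeout so executors
  //    deallocate; their shuffle files are now reachable only through the
  //    shuffle service.
  // 3. Restart the YARN NodeManagers (rolling restart); the shuffle service
  //    comes back without the state it needs.
  // 4. A second action reuses that map output, the fetch fails, and the
  //    application terminates as described above.
  shuffled.values.sum()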






[jira] [Commented] (SPARK-21097) Dynamic allocation will preserve cached data

2019-08-16 Thread Adam Kennedy (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16909342#comment-16909342
 ] 

Adam Kennedy commented on SPARK-21097:
--

Another reason to support transfer of memory cache blocks in general: in an 
application where executors are spinning up and down all the time, we are 
likely to see situations in which the memory cache blocks are heavily skewed 
towards a small percentage of the total executors. If we had support for 
replicating cache blocks, we could also look at a more general cache balancer, 
which could redistribute the cache blocks over time (perhaps during idle 
periods, or using long-tail resources when no new tasks are waiting) to make it 
more likely that subsequent stages or jobs get their tasks distributed evenly.
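
For what it's worth, the closest existing knob I'm aware of is the replicated 
storage levels, which replicate a block eagerly at persist time rather than 
rebalancing it later (a sketch only; someRdd is a placeholder, and the balancer 
proposed above would go further than this):

  import org.apache.spark.storage.StorageLevel

  // Keep two copies of each cached block so losing one executor does not
  // lose the block; this is eager replication, not adaptive rebalancing.
  val twice = someRdd.persist(StorageLevel.MEMORY_ONLY_2)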

> Dynamic allocation will preserve cached data
> 
>
> Key: SPARK-21097
> URL: https://issues.apache.org/jira/browse/SPARK-21097
> Project: Spark
>  Issue Type: Improvement
>  Components: Block Manager, Scheduler, Spark Core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Brad
>Priority: Major
> Attachments: Preserving Cached Data with Dynamic Allocation.pdf
>
>
> We want to use dynamic allocation to distribute resources among many notebook 
> users on our spark clusters. One difficulty is that if a user has cached data 
> then we are either prevented from de-allocating any of their executors, or we 
> are forced to drop their cached data, which can lead to a bad user experience.
> We propose adding a feature to preserve cached data by copying it to other 
> executors before de-allocation. This behavior would be enabled by a simple 
> spark config. Now when an executor reaches its configured idle timeout, 
> instead of just killing it on the spot, we will stop sending it new tasks, 
> replicate all of its rdd blocks onto other executors, and then kill it. If 
> there is an issue while we replicate the data, like an error, it takes too 
> long, or there isn't enough space, then we will fall back to the original 
> behavior and drop the data and kill the executor.
> This feature should allow anyone with notebook users to use their cluster 
> resources more efficiently. Also, since it will be completely opt-in, it is 
> unlikely to cause problems for other use cases.






[jira] [Updated] (SPARK-25889) Dynamic allocation load-aware ramp up

2019-02-21 Thread Adam Kennedy (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Kennedy updated SPARK-25889:
-
Description: 
The time based exponential ramp up behavior for dynamic allocation is naive and 
destructive, making it very difficult to run very large jobs.

On a large (64,000 core) YARN cluster with a high number of input partitions 
(200,000+) the default dynamic allocation approach of requesting containers in 
waves, doubling exponentially once per second, results in 50% of the entire 
cluster being requested in the final 1 second wave.

This can easily overwhelm RPC processing, or cause expensive Executor startup 
steps to break systems. With the interval so short, many additional containers 
may be requested beyond what is actually needed and then complete very little 
work before sitting around waiting to be deallocated.

Delaying the time between these fixed doublings only has limited impact. 
Setting double intervals to once per minute would result in a very slow ramp up 
speed, at the end of which we still face large potentially crippling waves of 
executor startup.

An alternative approach to spooling up large jobs appears to be needed, one 
that is still relatively simple but more adaptable to different cluster sizes 
and to differing cluster and job performance.

I would like to propose a few different approaches based around the general 
idea of controlling outstanding requests for new containers based on the number 
of executors that are currently running, for some definition of "running".

One example might be to limit requests to one new executor for every existing 
executor that currently has an active task, or some ratio of that, to allow for 
more or less aggressive spool-up. A lower ratio would let us approximate 
something like Fibonacci ramp-up; a higher ratio of say 2x would spool up 
quickly while still staying aligned with the rate at which broadcast blocks can 
be distributed to new members.

An alternative approach might be to limit the escalation rate of new executor 
requests based on the number of outstanding executors requested which have not 
yet fully completed startup and are not available for tasks. To protect against 
a potentially suboptimal very early ramp, a minimum concurrent executor startup 
threshold might allow an initial burst of say 10 executors, after which the 
more gradual ramp math would apply.
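
To make the 50%-in-the-final-wave point concrete, a quick sketch of the 
doubling behaviour (numbers are illustrative; the real scheduler also respects 
spark.dynamicAllocation.schedulerBacklogTimeout and the configured maximum):

  // Outstanding request doubles every interval: 1, 2, 4, ..., 2^k executors.
  // The final increment, 2^(k-1), is about half of the final total on its own.
  val totals = (0 to 16).map(1 << _)                 // 1 .. 65536, roughly a 64,000 core cluster
  val increments = totals.zip(0 +: totals).map { case (now, before) => now - before }
  println(increments.last)                           // 32768
  println(increments.last.toDouble / totals.last)    // 0.5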

  was:
The time based exponential ramp up behavior for dynamic allocation is naive and 
destructive, making it very difficult to run very large jobs.

On a large (64,000 core) YARN cluster with a high number of input partitions 
(200,000+) the default dynamic allocation approach of requesting containers in 
waves, doubling exponentially once per second, results in 50% of the entire 
cluster being requested in the final 1 second wave.

This can easily overwhelm RPC processing, or cause expensive Executor startup 
steps to break systems. With the interval so short, many additional containers 
may be requested beyond what is actually needed and then complete very little 
work before sitting around waiting to be deallocated.

Delaying the time between these fixed doublings only has limited impact. 
Setting double intervals to once per minute would result in a very slow ramp up 
speed, at the end of which we still face large potentially crippling waves of 
executor startup.

An alternative approach to spooling up large job appears to be needed, which is 
still relatively simple but could be more adaptable to different cluster sizes 
and differing cluster and job performance.

I would like to propose a few different approaches based around the general 
idea of controlling outstanding requests for new containers based on the number 
of executors that are currently running, for some definition of "running".

One example might be to limit requests to one new executor for every existing 
executor that currently has an active task. Or some ratio of that, to allow for 
more or less aggressive spool up. A lower number would let us approximate 
something like fibonacci ramp up, a higher number of say 2x would spool up 
quickly, but still aligned with the rate at which broadcast blocks can be 
easily distributed to new members.

 


> Dynamic allocation load-aware ramp up
> -
>
> Key: SPARK-25889
> URL: https://issues.apache.org/jira/browse/SPARK-25889
> Project: Spark
>  Issue Type: New Feature
>  Components: Scheduler, YARN
>Affects Versions: 2.3.2
>Reporter: Adam Kennedy
>Priority: Major
>
> The time based exponential ramp up behavior for dynamic allocation is naive 
> and destructive, making it very difficult to run very large jobs.
> On a large (64,000 core) YARN cluster with a high number of input partitions 
> (200,000+) the default dynamic 

[jira] [Updated] (SPARK-25888) Service requests for persist() blocks via external service after dynamic deallocation

2018-10-30 Thread Adam Kennedy (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Kennedy updated SPARK-25888:
-
Description: 
Large and highly multi-tenant Spark on YARN clusters with diverse job execution 
often display terrible utilization rates (we have observed as low as 3-7% CPU 
at max container allocation, but 50% CPU utilization on even a well policed 
cluster is not uncommon).

As a sizing example, consider a scenario with 1,000 nodes, 50,000 cores, 250 
users and 50,000 runs of 1,000 distinct applications per week, with 
predominantly Spark including a mixture of ETL, Ad Hoc tasks and PySpark 
Notebook jobs (no streaming)

Utilization problems appear to be due in large part to difficulties with 
persist() blocks (DISK or DISK+MEMORY) preventing dynamic deallocation.

In situations where an external shuffle service is present (which is typical on 
clusters of this type) we already solve this for the shuffle block case by 
offloading the IO handling of shuffle blocks to the external service, allowing 
dynamic deallocation to proceed.

Allowing Executors to transfer persist() blocks to some external "shuffle" 
service in a similar manner would be an enormous win for Spark multi-tenancy as 
it would limit deallocation blocking scenarios to only MEMORY-only cache() 
scenarios.

I'm not sure if I'm correct, but I seem to recall from the original external 
shuffle service commits that this may have been considered at the time, but 
getting shuffle blocks moved to the external shuffle service was the first 
priority.

With support for external persist() DISK blocks in place, we could also then 
handle deallocation of DISK+MEMORY, as the memory instance could first be 
dropped, changing the block to DISK only, and then further transferred to the 
shuffle service.

We have tried to resolve the persist() issue via extensive user training, but 
that has typically only allowed us to improve utilization of the worst 
offenders (10% utilization) up to around 40-60% utilization, as the need for 
persist() is often legitimate and occurs during the middle stages of a job.

In a healthy multi-tenant scenario, a large job might spool up to say 10,000 
cores, persist() data, release executors across a long tail down to 100 cores, 
and then spool back up to 10,000 cores for the following stage without impact 
on the persist() data.

In an ideal world, if a new executor started up on a node on which blocks had 
been transferred to the shuffle service, the new executor might even be able to 
"recapture" control of those blocks (if that would help with performance in 
some way).

And the behavior of gradually expanding and contracting several times over the 
course of a job would not just improve utilization, but would allow resources 
to be redistributed more easily to other jobs that start on the cluster during 
the long-tail periods, which would improve multi-tenancy and bring us closer to 
optimal "envy free" YARN scheduling.

  was:
Large and highly multi-tenant Spark on YARN clusters with diverse job execution 
often display terrible utilization rates (we have observed as low as 3-7% CPU 
at max container allocation, but 50% CPU utilization on even a well policed 
cluster is not uncommon).

As a sizing example, one of our larger instances has 1,000 nodes, 50,000 cores, 
250 users and 1,000 distinct applications run per week, with predominantly 
Spark including a mixture of ETL, Ad Hoc tasks and PySpark Notebook jobs (no 
streaming)

Utilization problems appear to be due in large part to difficulties with 
persist() blocks (DISK or DISK+MEMORY) preventing dynamic deallocation.

In situations where an external shuffle service is present (which is typical on 
clusters of this type) we already solve this for the shuffle block case by 
offloading the IO handling of shuffle blocks to the external service, allowing 
dynamic deallocation to proceed.

Allowing Executors to transfer persist() blocks to some external "shuffle" 
service in a similar manner would be an enormous win for Spark multi-tenancy as 
it would limit deallocation blocking scenarios to only MEMORY-only cache() 
scenarios.

I'm not sure if I'm correct, but I seem to recall seeing in the original 
external shuffle service commits that may have been considered at the time but 
getting shuffle blocks moved to the external shuffle service was the first 
priority.

With support for external persist() DISK blocks in place, we could also then 
handle deallocation of DISK+MEMORY, as the memory instance could first be 
dropped, changing the block to DISK only, and then further transferred to the 
shuffle service.

We have tried to resolve the persist() issue via extensive user training, but 
that has typically only allowed us to improve utilization of the worst 
offenders (10% utilization) up to around 40-60% utilization, as the need for 
persist() is often legitimate and occurs 

[jira] [Updated] (SPARK-25888) Service requests for persist() blocks via external service after dynamic deallocation

2018-10-30 Thread Adam Kennedy (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Kennedy updated SPARK-25888:
-
Issue Type: New Feature  (was: Improvement)

> Service requests for persist() blocks via external service after dynamic 
> deallocation
> -
>
> Key: SPARK-25888
> URL: https://issues.apache.org/jira/browse/SPARK-25888
> Project: Spark
>  Issue Type: New Feature
>  Components: Block Manager, Shuffle, YARN
>Affects Versions: 2.3.2
>Reporter: Adam Kennedy
>Priority: Major
>
> Large and highly multi-tenant Spark on YARN clusters with diverse job 
> execution often display terrible utilization rates (we have observed as low 
> as 3-7% CPU at max container allocation, but 50% CPU utilization on even a 
> well policed cluster is not uncommon).
> As a sizing example, one of our larger instances has 1,000 nodes, 50,000 
> cores, 250 users and 1,000 distinct applications run per week, with 
> predominantly Spark including a mixture of ETL, Ad Hoc tasks and PySpark 
> Notebook jobs (no streaming)
> Utilization problems appear to be due in large part to difficulties with 
> persist() blocks (DISK or DISK+MEMORY) preventing dynamic deallocation.
> In situations where an external shuffle service is present (which is typical 
> on clusters of this type) we already solve this for the shuffle block case by 
> offloading the IO handling of shuffle blocks to the external service, 
> allowing dynamic deallocation to proceed.
> Allowing Executors to transfer persist() blocks to some external "shuffle" 
> service in a similar manner would be an enormous win for Spark multi-tenancy 
> as it would limit deallocation blocking scenarios to only MEMORY-only cache() 
> scenarios.
> I'm not sure if I'm correct, but I seem to recall from the original external 
> shuffle service commits that this may have been considered at the time, but 
> getting shuffle blocks moved to the external shuffle service was the first 
> priority.
> With support for external persist() DISK blocks in place, we could also then 
> handle deallocation of DISK+MEMORY, as the memory instance could first be 
> dropped, changing the block to DISK only, and then further transferred to the 
> shuffle service.
> We have tried to resolve the persist() issue via extensive user training, but 
> that has typically only allowed us to improve utilization of the worst 
> offenders (10% utilization) up to around 40-60% utilization, as the need for 
> persist() is often legitimate and occurs during the middle stages of a job.
> In a healthy multi-tenant scenario, a large job might spool up to say 10,000 
> cores, persist() data, release executors across a long tail down to 100 
> cores, and then spool back up to 10,000 cores for the following stage without 
> impact on the persist() data.
> In an ideal world, if a new executor started up on a node on which blocks 
> had been transferred to the shuffle service, the new executor might even be 
> able to "recapture" control of those blocks (if that would help with 
> performance in some way).
> And the behavior of gradually expanding and contracting several times over 
> the course of a job would not just improve utilization, but would allow 
> resources to be redistributed more easily to other jobs that start on the 
> cluster during the long-tail periods, which would improve multi-tenancy and 
> bring us closer to optimal "envy free" YARN scheduling.






[jira] [Updated] (SPARK-25888) Service requests for persist() blocks via external service after dynamic deallocation

2018-10-30 Thread Adam Kennedy (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Kennedy updated SPARK-25888:
-
Shepherd: DB Tsai

> Service requests for persist() blocks via external service after dynamic 
> deallocation
> -
>
> Key: SPARK-25888
> URL: https://issues.apache.org/jira/browse/SPARK-25888
> Project: Spark
>  Issue Type: Improvement
>  Components: Block Manager, Shuffle, YARN
>Affects Versions: 2.3.2
>Reporter: Adam Kennedy
>Priority: Major
>
> Large and highly multi-tenant Spark on YARN clusters with diverse job 
> execution often display terrible utilization rates (we have observed as low 
> as 3-7% CPU at max container allocation, but 50% CPU utilization on even a 
> well policed cluster is not uncommon).
> As a sizing example, one of our larger instances has 1,000 nodes, 50,000 
> cores, 250 users and 1,000 distinct applications run per week, with 
> predominantly Spark including a mixture of ETL, Ad Hoc tasks and PySpark 
> Notebook jobs (no streaming)
> Utilization problems appear to be due in large part to difficulties with 
> persist() blocks (DISK or DISK+MEMORY) preventing dynamic deallocation.
> In situations where an external shuffle service is present (which is typical 
> on clusters of this type) we already solve this for the shuffle block case by 
> offloading the IO handling of shuffle blocks to the external service, 
> allowing dynamic deallocation to proceed.
> Allowing Executors to transfer persist() blocks to some external "shuffle" 
> service in a similar manner would be an enormous win for Spark multi-tenancy 
> as it would limit deallocation blocking scenarios to only MEMORY-only cache() 
> scenarios.
> I'm not sure if I'm correct, but I seem to recall from the original external 
> shuffle service commits that this may have been considered at the time, but 
> getting shuffle blocks moved to the external shuffle service was the first 
> priority.
> With support for external persist() DISK blocks in place, we could also then 
> handle deallocation of DISK+MEMORY, as the memory instance could first be 
> dropped, changing the block to DISK only, and then further transferred to the 
> shuffle service.
> We have tried to resolve the persist() issue via extensive user training, but 
> that has typically only allowed us to improve utilization of the worst 
> offenders (10% utilization) up to around 40-60% utilization, as the need for 
> persist() is often legitimate and occurs during the middle stages of a job.
> In a healthy multi-tenant scenario, a large job might spool up to say 10,000 
> cores, persist() data, release executors across a long tail down to 100 
> cores, and then spool back up to 10,000 cores for the following stage without 
> impact on the persist() data.
> In an ideal world, if a new executor started up on a node on which blocks 
> had been transferred to the shuffle service, the new executor might even be 
> able to "recapture" control of those blocks (if that would help with 
> performance in some way).
> And the behavior of gradually expanding and contracting several times over 
> the course of a job would not just improve utilization, but would allow 
> resources to be redistributed more easily to other jobs that start on the 
> cluster during the long-tail periods, which would improve multi-tenancy and 
> bring us closer to optimal "envy free" YARN scheduling.






[jira] [Created] (SPARK-25889) Dynamic allocation load-aware ramp up

2018-10-30 Thread Adam Kennedy (JIRA)
Adam Kennedy created SPARK-25889:


 Summary: Dynamic allocation load-aware ramp up
 Key: SPARK-25889
 URL: https://issues.apache.org/jira/browse/SPARK-25889
 Project: Spark
  Issue Type: New Feature
  Components: Scheduler, YARN
Affects Versions: 2.3.2
Reporter: Adam Kennedy


The time based exponential ramp up behavior for dynamic allocation is naive and 
destructive, making it very difficult to run very large jobs.

On a large (64,000 core) YARN cluster with a high number of input partitions 
(200,000+) the default dynamic allocation approach of requesting containers in 
waves, doubling exponentially once per second, results in 50% of the entire 
cluster being requested in the final 1 second wave.

This can easily overwhelm RPC processing, or cause expensive Executor startup 
steps to break systems. With the interval so short, many additional containers 
may be requested beyond what is actually needed and then complete very little 
work before sitting around waiting to be deallocated.

Delaying the time between these fixed doublings only has limited impact. 
Setting double intervals to once per minute would result in a very slow ramp up 
speed, at the end of which we still face large potentially crippling waves of 
executor startup.

An alternative approach to spooling up large jobs appears to be needed, one 
that is still relatively simple but more adaptable to different cluster sizes 
and to differing cluster and job performance.

I would like to propose a few different approaches based around the general 
idea of controlling outstanding requests for new containers based on the number 
of executors that are currently running, for some definition of "running".

One example might be to limit requests to one new executor for every existing 
executor that currently has an active task, or some ratio of that, to allow for 
more or less aggressive spool-up. A lower ratio would let us approximate 
something like Fibonacci ramp-up; a higher ratio of say 2x would spool up 
quickly while still staying aligned with the rate at which broadcast blocks can 
be distributed to new members.

 






[jira] [Updated] (SPARK-25888) Service requests for persist() blocks via external service after dynamic deallocation

2018-10-30 Thread Adam Kennedy (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Kennedy updated SPARK-25888:
-
Description: 
Large and highly multi-tenant Spark on YARN clusters with diverse job execution 
often display terrible utilization rates (we have observed as low as 3-7% CPU 
at max container allocation, but 50% CPU utilization on even a well policed 
cluster is not uncommon).

As a sizing example, one of our larger instances has 1,000 nodes, 50,000 cores, 
250 users and 1,000 distinct applications run per week, with predominantly 
Spark including a mixture of ETL, Ad Hoc tasks and PySpark Notebook jobs (no 
streaming)

Utilization problems appear to be due in large part to difficulties with 
persist() blocks (DISK or DISK+MEMORY) preventing dynamic deallocation.

In situations where an external shuffle service is present (which is typical on 
clusters of this type) we already solve this for the shuffle block case by 
offloading the IO handling of shuffle blocks to the external service, allowing 
dynamic deallocation to proceed.

Allowing Executors to transfer persist() blocks to some external "shuffle" 
service in a similar manner would be an enormous win for Spark multi-tenancy as 
it would limit deallocation blocking scenarios to only MEMORY-only cache() 
scenarios.

I'm not sure if I'm correct, but I seem to recall from the original external 
shuffle service commits that this may have been considered at the time, but 
getting shuffle blocks moved to the external shuffle service was the first 
priority.

With support for external persist() DISK blocks in place, we could also then 
handle deallocation of DISK+MEMORY, as the memory instance could first be 
dropped, changing the block to DISK only, and then further transferred to the 
shuffle service.

We have tried to resolve the persist() issue via extensive user training, but 
that has typically only allowed us to improve utilization of the worst 
offenders (10% utilization) up to around 40-60% utilization, as the need for 
persist() is often legitimate and occurs during the middle stages of a job.

In a healthy multi-tenant scenario, a large job might spool up to say 10,000 
cores, persist() data, release executors across a long tail down to 100 cores, 
and then spool back up to 10,000 cores for the following stage without impact 
on the persist() data.

In an ideal world, if a new executor started up on a node on which blocks had 
been transferred to the shuffle service, the new executor might even be able to 
"recapture" control of those blocks (if that would help with performance in 
some way).

And the behavior of gradually expanding and contracting several times over the 
course of a job would not just improve utilization, but would allow resources 
to be redistributed more easily to other jobs that start on the cluster during 
the long-tail periods, which would improve multi-tenancy and bring us closer to 
optimal "envy free" YARN scheduling.

  was:
Large and highly multi-tenant Spark on YARN clusters with diverse job execution 
often display terrible utilization rates (we have observed as low as 3-7% CPU 
at max container allocation, but 50% CPU utilization on even a well policed 
cluster is not uncommon).

As a sizing example, one of our larger instances has 1,000 nodes, 50,000 cores, 
250 users and 1,000 distinct applications run per week, with predominantly 
Spark including a mixture of ETL, Ad Hoc tasks and PySpark Notebook jobs (no 
streaming)

Utilization problems appear to be due in large part to difficulties with 
persist() blocks (DISK or DISK+MEMORY) preventing dynamic deallocation.

In situations where an external shuffle service is present (which is typical on 
clusters of this type) we already solve this for the shuffle block case by 
offloading the IO handling of shuffle blocks to the external service, allowing 
dynamic deallocation to proceed.

Allowing Executors to transfer persist() blocks to some external "shuffle" 
service in a similar manner would be an enormous win for Spark multi-tenancy as 
it would limit deallocation blocking scenarios to only MEMORY-only cache() 
scenarios.

I'm not sure if I'm correct, but I seem to recall seeing in the original 
external shuffle service commits that may have been considered at the time but 
getting shuffle blocks moved to the external shuffle service was the first 
priority.

With support for external persist() DISK blocks in place, we could also then 
handle deallocation of DISK+MEMORY, as the memory instance could first be 
dropped, changing the block to DISK only, and then further transferred to the 
shuffle service.

We have tried to resolve the persist() issue via extensive user training, but 
that has typically only allowed us to improve utilization of the worst 
offenders (10% utilization) up to around 40-60% utilization, as the need for 
persist() is often legitimate and occurs 

[jira] [Updated] (SPARK-25888) Service requests for persist() blocks via external service after dynamic deallocation

2018-10-30 Thread Adam Kennedy (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Kennedy updated SPARK-25888:
-
Description: 
Large and highly multi-tenant Spark on YARN clusters with diverse job execution 
often display terrible utilization rates (we have observed as low as 3-7% CPU 
at max container allocation, but 50% CPU utilization on even a well policed 
cluster is not uncommon).

As a sizing example, one of our larger instances has 1,000 nodes, 50,000 cores, 
250 users and 1,000 distinct applications run per week, with predominantly 
Spark including a mixture of ETL, Ad Hoc tasks and PySpark Notebook jobs (no 
streaming)

Utilization problems appear to be due in large part to difficulties with 
persist() blocks (DISK or DISK+MEMORY) preventing dynamic deallocation.

In situations where an external shuffle service is present (which is typical on 
clusters of this type) we already solve this for the shuffle block case by 
offloading the IO handling of shuffle blocks to the external service, allowing 
dynamic deallocation to proceed.

Allowing Executors to transfer persist() blocks to some external "shuffle" 
service in a similar manner would be an enormous win for Spark multi-tenancy as 
it would limit deallocation blocking scenarios to only MEMORY-only cache() 
scenarios.

I'm not sure if I'm correct, but I seem to recall from the original external 
shuffle service commits that this may have been considered at the time, but 
getting shuffle blocks moved to the external shuffle service was the first 
priority.

With support for external persist() DISK blocks in place, we could also then 
handle deallocation of DISK+MEMORY, as the memory instance could first be 
dropped, changing the block to DISK only, and then further transferred to the 
shuffle service.

We have tried to resolve the persist() issue via extensive user training, but 
that has typically only allowed us to improve utilization of the worst 
offenders (10% utilization) up to around 40-60% utilization, as the need for 
persist() is often legitimate and occurs during the middle stages of a job.

In a healthy multi-tenant scenario, the job could spool up to say 10,000 cores, 
persist() data, release executors across a long tail (to say 100 cores) and 
then spool back up to 10,000 cores for the following stage without impact on 
the persist() data.

In an ideal world, if a new executor started up on a node on which blocks had 
been transferred to the shuffle service, the new executor might even be able to 
"recapture" control of those blocks (if that would help with performance in 
some way).

  was:
Large and highly multi-tenant Spark on YARN clusters with large populations and 
diverse job execution often display terrible utilization rates (we have 
observed as low as 3-7% CPU at max container allocation) due in large part to 
difficulties with persist() blocks (DISK or DISK+MEMORY) preventing dynamic 
deallocation.

As a sizing example, one of our larger instances has 1,000 nodes, 50,000 cores, 
250 users and 1,000 distinct applications run per week, with predominantly 
Spark workloads including a mixture of ETL, Ad Hoc and PySpark Notebook jobs, 
plus Hive and some MapReduce (no streaming Spark)

In situations where an external shuffle service is present (which is typical on 
clusters of this type) we already solve this for the shuffle block case by 
offloading the IO handling of shuffle blocks to the external service, allowing 
dynamic deallocation to proceed.

Allowing Executors to transfer persist() blocks to some external "shuffle" 
service in a similar manner would be an enormous win for Spark multi-tenancy as 
it would limit deallocation blocking scenarios to only MEMORY-only cache() 
scenarios.

I'm not sure if I'm correct, but I seem to recall seeing in the original 
external shuffle service commits that may have been considered at the time but 
getting shuffle blocks moved to the external shuffle service was the first 
priority.

With support for external persist() DISK blocks in place, we could also then 
handle deallocation of DISK+MEMORY, as the memory instance could first be 
dropped, changing the block to DISK only, and then further transferred to the 
shuffle service.

We have tried to resolve the persist() issue via extensive user training, but 
that has typically only allowed us to improve utilization of the worst 
offenders (10% utilization) up to around 40-60% utilization, as the need for 
persist() is often legitimate and occurs during the middle stages of a job.

In a healthy multi-tenant scenario, the job could spool up to say 10,000 cores, 
persist() data, release executors across a long tail (to say 100 cores) and 
then spool back up to 10,000 cores for the following stage without impact on 
the persist() data.

In an ideal world, if an new executor started up on a node on which blocks had 
been transferred to the shuffle 

[jira] [Updated] (SPARK-25888) Service requests for persist() blocks via external service after dynamic deallocation

2018-10-30 Thread Adam Kennedy (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Kennedy updated SPARK-25888:
-
Environment: (was: Large YARN cluster with 1,000 nodes, 50,000 cores 
and 250 users, with predominantly Spark workloads including a mixture of ETL, 
Ad Hoc and PySpark Notebook jobs.

No streaming Spark.)
Description: 
Large and highly multi-tenant Spark on YARN clusters with large populations and 
diverse job execution often display terrible utilization rates (we have 
observed as low as 3-7% CPU at max container allocation) due in large part to 
difficulties with persist() blocks (DISK or DISK+MEMORY) preventing dynamic 
deallocation.

As a sizing example, one of our larger instances has 1,000 nodes, 50,000 cores, 
250 users and 1,000 distinct applications run per week, with predominantly 
Spark workloads including a mixture of ETL, Ad Hoc and PySpark Notebook jobs, 
plus Hive and some MapReduce (no streaming Spark)

In situations where an external shuffle service is present (which is typical on 
clusters of this type) we already solve this for the shuffle block case by 
offloading the IO handling of shuffle blocks to the external service, allowing 
dynamic deallocation to proceed.

Allowing Executors to transfer persist() blocks to some external "shuffle" 
service in a similar manner would be an enormous win for Spark multi-tenancy as 
it would limit deallocation blocking scenarios to only MEMORY-only cache() 
scenarios.

I'm not sure if I'm correct, but I seem to recall seeing in the original 
external shuffle service commits that this may have been considered at the 
time, but getting shuffle blocks moved to the external shuffle service was the 
first priority.

With support for external persist() DISK blocks in place, we could also then 
handle deallocation of DISK+MEMORY, as the memory instance could first be 
dropped, changing the block to DISK only, and then further transferred to the 
shuffle service.

We have tried to resolve the persist() issue via extensive user training, but 
that has typically only allowed us to improve utilization of the worst 
offenders (10% utilization) up to around 40-60% utilization, as the need for 
persist() is often legitimate and occurs during the middle stages of a job.

In a healthy multi-tenant scenario, the job could spool up to say 10,000 cores, 
persist() data, release executors across a long tail (to say 100 cores) and 
then spool back up to 10,000 cores for the following stage without impact on 
the persist() data.

In an ideal world, if a new executor started up on a node on which blocks had 
been transferred to the shuffle service, the new executor might even be able to 
"recapture" control of those blocks (if that would help with performance in 
some way).

  was:
Large and highly multi-tenant Spark on YARN clusters with large populations and 
diverse job execution often display terrible utilization rates (we have 
observed as low as 3-7% CPU at max container allocation) due in large part to 
difficulties with persist() blocks (DISK or DISK+MEMORY) preventing dynamic 
deallocation.

In situations where an external shuffle service is present (which is typical on 
clusters of this type) we already solve this for the shuffle block case by 
offloading the IO handling of shuffle blocks to the external service, allowing 
dynamic deallocation to proceed.

Allowing Executors to transfer persist() blocks to some external "shuffle" 
service in a similar manner would be an enormous win for Spark multi-tenancy as 
it would limit deallocation blocking scenarios to only MEMORY-only cache() 
scenarios.

I'm not sure if I'm correct, but I seem to recall seeing in the original 
external shuffle service commits that this may have been considered at the 
time, but getting shuffle blocks moved to the external shuffle service was the 
first priority.

With support for external persist() DISK blocks in place, we could also then 
handle deallocation of DISK+MEMORY, as the memory instance could first be 
dropped, changing the block to DISK only, and then further transferred to the 
shuffle service.

We have tried to resolve the persist() issue via extensive user training, but 
that has typically only allowed us to improve utilization of the worst 
offenders (10% utilization) up to around 40-60% utilization, as the need for 
persist() is often legitimate and occurs during the middle stages of a job.

In a healthy multi-tenant scenario, the job could spool up to say 10,000 cores, 
persist() data, release executors across a long tail (to say 100 cores) and 
then spool back up to 10,000 cores for the following stage without impact on 
the persist() data.

In an ideal world, if a new executor started up on a node on which blocks had 
been transferred to the shuffle service, the new executor might even be able to 
"recapture" control of those blocks (if that would help with performance in 
some way).

[jira] [Updated] (SPARK-25888) Service requests for persist() blocks via external service after dynamic deallocation

2018-10-30 Thread Adam Kennedy (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Kennedy updated SPARK-25888:
-
Environment: 
Large YARN cluster with 1,000 nodes, 50,000 cores and 250 users, with 
predominantly Spark workloads including a mixture of ETL, Ad Hoc and PySpark 
Notebook jobs.

No streaming Spark.

> Service requests for persist() blocks via external service after dynamic 
> deallocation
> -
>
> Key: SPARK-25888
> URL: https://issues.apache.org/jira/browse/SPARK-25888
> Project: Spark
>  Issue Type: Improvement
>  Components: Block Manager, Shuffle, YARN
>Affects Versions: 2.3.2
> Environment: Large YARN cluster with 1,000 nodes, 50,000 cores and 
> 250 users, with predominantly Spark workloads including a mixture of ETL, Ad 
> Hoc and PySpark Notebook jobs.
> No streaming Spark.
>Reporter: Adam Kennedy
>Priority: Major
>
> Large and highly multi-tenant Spark on YARN clusters with large populations 
> and diverse job execution often display terrible utilization rates (we have 
> observed as low as 3-7% CPU at max container allocation) due in large part to 
> difficulties with persist() blocks (DISK or DISK+MEMORY) preventing dynamic 
> deallocation.
> In situations where an external shuffle service is present (which is typical 
> on clusters of this type) we already solve this for the shuffle block case by 
> offloading the IO handling of shuffle blocks to the external service, 
> allowing dynamic deallocation to proceed.
> Allowing Executors to transfer persist() blocks to some external "shuffle" 
> service in a similar manner would be an enormous win for Spark multi-tenancy 
> as it would limit deallocation blocking scenarios to only MEMORY-only cache() 
> scenarios.
> I'm not sure if I'm correct, but I seem to recall seeing in the original 
> external shuffle service commits that this may have been considered at the 
> time, but getting shuffle blocks moved to the external shuffle service was 
> the first priority.
> With support for external persist() DISK blocks in place, we could also then 
> handle deallocation of DISK+MEMORY, as the memory instance could first be 
> dropped, changing the block to DISK only, and then further transferred to the 
> shuffle service.
> We have tried to resolve the persist() issue via extensive user training, but 
> that has typically only allowed us to improve utilization of the worst 
> offenders (10% utilization) up to around 40-60% utilization, as the need for 
> persist() is often legitimate and occurs during the middle stages of a job.
> In a healthy multi-tenant scenario, the job could spool up to say 10,000 
> cores, persist() data, release executors across a long tail (to say 100 
> cores) and then spool back up to 10,000 cores for the following stage without 
> impact on the persist() data.
> In an ideal world, if a new executor started up on a node on which blocks 
> had been transferred to the shuffle service, the new executor might even be 
> able to "recapture" control of those blocks (if that would help with 
> performance in some way).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-25888) Service requests for persist() blocks via external service after dynamic deallocation

2018-10-30 Thread Adam Kennedy (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Kennedy updated SPARK-25888:
-
Summary: Service requests for persist() blocks via external service after 
dynamic deallocation  (was: Service requests for persist() block via external 
service after dynamic deallocation)

> Service requests for persist() blocks via external service after dynamic 
> deallocation
> -
>
> Key: SPARK-25888
> URL: https://issues.apache.org/jira/browse/SPARK-25888
> Project: Spark
>  Issue Type: Improvement
>  Components: Block Manager, Shuffle, YARN
>Affects Versions: 2.3.2
>Reporter: Adam Kennedy
>Priority: Major
>
> Large and highly multi-tenant Spark on YARN clusters with large populations 
> and diverse job execution often display terrible utilization rates (we have 
> observed as low as 3-7% CPU at max container allocation) due in large part to 
> difficulties with persist() blocks (DISK or DISK+MEMORY) preventing dynamic 
> deallocation.
> In situations where an external shuffle service is present (which is typical 
> on clusters of this type) we already solve this for the shuffle block case by 
> offloading the IO handling of shuffle blocks to the external service, 
> allowing dynamic deallocation to proceed.
> Allowing Executors to transfer persist() blocks to some external "shuffle" 
> service in a similar manner would be an enormous win for Spark multi-tenancy 
> as it would limit deallocation blocking scenarios to only MEMORY-only cache() 
> scenarios.
> I'm not sure if I'm correct, but I seem to recall seeing in the original 
> external shuffle service commits that this may have been considered at the 
> time, but getting shuffle blocks moved to the external shuffle service was 
> the first priority.
> With support for external persist() DISK blocks in place, we could also then 
> handle deallocation of DISK+MEMORY, as the memory instance could first be 
> dropped, changing the block to DISK only, and then further transferred to the 
> shuffle service.
> We have tried to resolve the persist() issue via extensive user training, but 
> that has typically only allowed us to improve utilization of the worst 
> offenders (10% utilization) up to around 40-60% utilization, as the need for 
> persist() is often legitimate and occurs during the middle stages of a job.
> In a healthy multi-tenant scenario, the job could spool up to say 10,000 
> cores, persist() data, release executors across a long tail (to say 100 
> cores) and then spool back up to 10,000 cores for the following stage without 
> impact on the persist() data.
> In an ideal world, if a new executor started up on a node on which blocks 
> had been transferred to the shuffle service, the new executor might even be 
> able to "recapture" control of those blocks (if that would help with 
> performance in some way).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-25888) Service requests for persist() block via external service after dynamic deallocation

2018-10-30 Thread Adam Kennedy (JIRA)
Adam Kennedy created SPARK-25888:


 Summary: Service requests for persist() block via external service 
after dynamic deallocation
 Key: SPARK-25888
 URL: https://issues.apache.org/jira/browse/SPARK-25888
 Project: Spark
  Issue Type: Improvement
  Components: Block Manager, Shuffle, YARN
Affects Versions: 2.3.2
Reporter: Adam Kennedy


Large and highly multi-tenant Spark on YARN clusters with large populations and 
diverse job execution often display terrible utilization rates (we have 
observed as low as 3-7% CPU at max container allocation) due in large part to 
difficulties with persist() blocks (DISK or DISK+MEMORY) preventing dynamic 
deallocation.

In situations where an external shuffle service is present (which is typical on 
clusters of this type) we already solve this for the shuffle block case by 
offloading the IO handling of shuffle blocks to the external service, allowing 
dynamic deallocation to proceed.

Allowing Executors to transfer persist() blocks to some external "shuffle" 
service in a similar manner would be an enormous win for Spark multi-tenancy as 
it would limit deallocation blocking scenarios to only MEMORY-only cache() 
scenarios.

I'm not sure if I'm correct, but I seem to recall seeing in the original 
external shuffle service commits that this may have been considered at the 
time, but getting shuffle blocks moved to the external shuffle service was the 
first priority.

With support for external persist() DISK blocks in place, we could also then 
handle deallocation of DISK+MEMORY, as the memory instance could first be 
dropped, changing the block to DISK only, and then further transferred to the 
shuffle service.

We have tried to resolve the persist() issue via extensive user training, but 
that has typically only allowed us to improve utilization of the worst 
offenders (10% utilization) up to around 40-60% utilization, as the need for 
persist() is often legitimate and occurs during the middle stages of a job.

In a healthy multi-tenant scenario, the job could spool up to say 10,000 cores, 
persist() data, release executors across a long tail (to say 100 cores) and 
then spool back up to 10,000 cores for the following stage without impact on 
the persist() data.

In an ideal world, if a new executor started up on a node on which blocks had 
been transferred to the shuffle service, the new executor might even be able to 
"recapture" control of those blocks (if that would help with performance in 
some way).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21155) Add (? running tasks) into Spark UI progress

2017-10-27 Thread Adam Kennedy (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222120#comment-16222120
 ] 

Adam Kennedy commented on SPARK-21155:
--

It can already get fairly crowded in that run area on large jobs (since it also 
shows killed jobs and other things).

Could we use a simpler format, like the progress bar in the console? 
Something similar to this...

x+y/z

... meaning ...

Done + Running / Total 
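
Purely as an illustration of that label (not actual UI code; the helper name is 
made up):

  // Renders "done+running/total", e.g. progressLabel(1234, 56, 5000) == "1234+56/5000"
  def progressLabel(done: Int, running: Int, total: Int): String =
    s"$done+$running/$total"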

> Add (? running tasks) into Spark UI progress
> 
>
> Key: SPARK-21155
> URL: https://issues.apache.org/jira/browse/SPARK-21155
> Project: Spark
>  Issue Type: Improvement
>  Components: Web UI
>Affects Versions: 2.1.1
>Reporter: Eric Vandenberg
>Assignee: Eric Vandenberg
>Priority: Minor
> Fix For: 2.3.0
>
> Attachments: Screen Shot 2017-06-20 at 12.32.58 PM.png, Screen Shot 
> 2017-06-20 at 3.40.39 PM.png, Screen Shot 2017-06-22 at 9.58.08 AM.png
>
>
> The progress UI for Active Jobs / Tasks should show the exact number of 
> running tasks. See screen shot attachment for what this looks like.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19112) add codec for ZStandard

2017-08-03 Thread Adam Kennedy (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16113653#comment-16113653
 ] 

Adam Kennedy commented on SPARK-19112:
--

Will this be impacted by LEGAL-303? zstd-jni embeds zstd, which has the Facebook 
PATENTS file in it.

> add codec for ZStandard
> ---
>
> Key: SPARK-19112
> URL: https://issues.apache.org/jira/browse/SPARK-19112
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Reporter: Thomas Graves
>Priority: Minor
>
> ZStandard: https://github.com/facebook/zstd and 
> http://facebook.github.io/zstd/ has been in use for a while now. v1.0 was 
> recently released. Hadoop 
> (https://issues.apache.org/jira/browse/HADOOP-13578) and others 
> (https://issues.apache.org/jira/browse/KAFKA-4514) are adopting it.
> Zstd seems to give great results => Gzip-level compression with LZ4-level CPU.
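
Side note: once a codec lands, selecting it would presumably follow the existing 
codec configuration; a sketch, assuming it ends up registered under the name 
"zstd" alongside lz4/lzf/snappy:

  import org.apache.spark.sql.SparkSession

  // Sketch only: choosing the compression codec by name via configuration.
  val spark = SparkSession.builder()
    .config("spark.io.compression.codec", "zstd")
    .getOrCreate()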



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org