[jira] [Commented] (SPARK-21682) Caching 100k-task RDD GC-kills driver (due to updatedBlockStatuses?)

2017-08-09 Thread DjvuLee (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16120959#comment-16120959
 ] 

DjvuLee commented on SPARK-21682:
-

Yes, our company has also run into this scalability problem; the driver can 
easily die with 70K partitions.

> Caching 100k-task RDD GC-kills driver (due to updatedBlockStatuses?)
> 
>
> Key: SPARK-21682
> URL: https://issues.apache.org/jira/browse/SPARK-21682
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.0.2, 2.1.1, 2.2.0
>Reporter: Ryan Williams
>
> h3. Summary
> * {{sc.parallelize(1 to 100000, 100000).cache.count}} causes a driver GC 
> stall midway through on every configuration and version I've tried in 2.x.
> * It runs fine with no Full GCs as of 1.6.3
> * I think that {{internal.metrics.updatedBlockStatuses}} is the culprit, and 
> breaks a contract about what big-O sizes accumulators' values can be:
> ** they are each of size O(P), where P is the number of partitions in a 
> cached RDD
> ** ⇒ the driver must process O(P²) data from {{TaskEnd}} events, instead of 
> O(P)
> ** ⇒ the driver also must process O(P*E) work every 10s from 
> {{ExecutorMetricsUpdates}} (where E is the number of executors; cf. 
> {{spark.executor.heartbeatInterval}})
> * when operating on a 100k-partition cached RDD, the driver enters a GC loop 
> due to all the allocations it must do to process {{ExecutorMetricsUpdate}} 
> and {{TaskEnd}} events with {{updatedBlockStatuses}} attached
> * this metric should be disabled, or some ability to blacklist it from the 
> command-line should be added.
> * [SPARK-20084|https://issues.apache.org/jira/browse/SPARK-20084] addressed 
> one part of this - the event-log size had exploded - but the root problem 
> still exists / is worse
> h3. {{count}} a 100k-partition RDD: works fine without {{.cache}}
> In Spark 2.2.0 or 2.1.1:
> {code}
> spark-shell --conf spark.driver.extraJavaOptions="-XX:+PrintGCDetails 
> -XX:+PrintGCTimeStamps -verbose:gc"
> scala> val rdd = sc.parallelize(1 to 100000, 100000)
> scala> rdd.count
> {code}
> In YARN and local modes, this finishes in ~20 seconds with ~20 partial GCs 
> logged, all taking under 0.1s ([example 
> output|https://gist.github.com/ryan-williams/a3d02ea49d2b5021b53e1680f8fedc37#file-local-mode-rdd-not-cached-works-fine]);
>  all is well!
> h3. {{count}} a 100k-partition cached RDD: GC-dies
> If we {{cache}} the RDD first, the same {{count}} job quickly sends the 
> driver into a GC death spiral: Full GCs start after a few thousand tasks and 
> increase in frequency and length until they last minutes / become continuous 
> (and, in YARN, the driver loses contact with any executors).
> Example outputs: 
> [local|https://gist.github.com/ryan-williams/a3d02ea49d2b5021b53e1680f8fedc37#file-local-mode-rdd-cached-crashes],
>  
> [YARN|https://gist.github.com/ryan-williams/a3d02ea49d2b5021b53e1680f8fedc37#file-yarn-mode-cached-rdd-dies].
> The YARN example removes any confusion about whether the storing of the 
> blocks is causing memory pressure on the driver; the driver is basically 
> doing no work except receiving executor updates and events, and yet it 
> becomes overloaded. 
> h3. Can't effectively throw driver heap at the problem
> I've tested with 1GB, 10GB, and 20GB heaps, and the larger heaps do what we'd 
> expect: delay the first Full GC, and make Full GCs longer when they happen. 
> I don't have a clear sense on whether the onset is linear or quadratic (i.e. 
> do I get twice as far into the job before the first Full GC with a 20GB as 
> with a 10GB heap, or only sqrt(2) times as far?).
> h3. Mostly memory pressure, not OOMs
> An interesting note is that I'm rarely seeing OOMs as a result of this, even 
> on small heaps.
> I think this is consistent with the idea that all this data is being 
> immediately discarded by the driver, as opposed to kept around to serve web 
> UIs or somesuch.
> h3. Eliminating {{ExecutorMetricsUpdate}}'s doesn't seem to help
> Interestingly, setting large values of {{spark.executor.heartbeatInterval}} 
> doesn't seem to mitigate the problem; GC-stall sets in at about the same 
> point in the {{count}} job.
> This implies that, in this example, the {{TaskEnd}} events are doing most or 
> all of the damage.
> h3. CMS helps but doesn't solve the problem
> In some rough testing, I saw the {{count}} get about twice as far before 
> dying when using the CMS collector.
> h3. What bandwidth do we expect the driver to process events at?
> IIUC, every 10s the driver gets O(T) (~100k?) block updates from each of ~500 
> executors, and allocating objects for these updates is pushing it over a 
> tipping point where it can't keep up. 
> I don't know how to get good numbers on how much data the drive

[jira] [Commented] (SPARK-21682) Caching 100k-task RDD GC-kills driver (due to updatedBlockStatuses?)

2017-08-09 Thread Ryan Williams (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16120537#comment-16120537
 ] 

Ryan Williams commented on SPARK-21682:
---

bq. But do you really need to create so many partitions? Could you use 
`coalesce` to reduce the number of partitions?

In my real app, that is exactly what I am trying to do! I am {{count}}'ing the 
records to determine how many partitions I should {{coalesce}} to, and I am 
{{cache}}'ing to avoid computing the RDD twice (once for the {{count}}, once 
for the repartition) because that would be very expensive.

More detail: I have a large RDD (~100BN records, 100k partitions) that I am 
filtering down to what is likely to be <1MM records (but might not be!). ~1MM 
records/partition is a good size for this data, both before and after the 
filtering/repartitioning, based on what I know about it (and have already 
observed in this app when I try to put much more than that on a single 
partition, and see GC problems). 
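
To make the sizing step concrete, here is a minimal sketch of how the partition count for {{coalesce}} could be derived from the {{count}} result and the ~1MM records/partition target mentioned above. {{PartitionSizing}} and {{targetPartitions}} are hypothetical names for illustration, not part of Spark, and the RDD name {{filtered}} in the comments is likewise assumed.

```scala
// Hypothetical helper (not a Spark API): pick a partition count for coalesce
// from a record count and a target number of records per partition.
object PartitionSizing {
  def targetPartitions(recordCount: Long, recordsPerPartition: Long): Int = {
    require(recordCount >= 0, "recordCount must be non-negative")
    require(recordsPerPartition > 0, "recordsPerPartition must be positive")
    // Ceiling division, so a partial last partition still gets its own slot.
    val needed = (recordCount + recordsPerPartition - 1) / recordsPerPartition
    // At least one partition, and never more than an Int can hold.
    math.min(math.max(needed, 1L), Int.MaxValue.toLong).toInt
  }
}

// Intended use in spark-shell, assuming an existing RDD named `filtered`:
//   val cached = filtered.cache()
//   val n      = cached.count()   // the job that GC-stalls the driver at 100k partitions
//   val sized  = cached.coalesce(PartitionSizing.targetPartitions(n, 1000000L))
```

The {{cache}} is what avoids computing the RDD twice, and it is also what triggers the driver problem described in this issue, so the helper itself does not work around anything.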

If that sounds crazy to you, please tell me.


Otherwise, can we instead talk about:
* should Spark fall over at 100k partitions?
* this worked fine in 1.6.3 but afaict it's impossible to {{count}} a 
100k-partition cached RDD in the 2.x line. Is that a problem?
* does anyone know how much work the driver is doing before and after e.g. a 
large accumulator is added, and how much the "maximum number of partitions 
before GC-stall-death" ceiling is lowered by a given change?
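
As a rough starting point for the last question, a back-of-envelope upper bound can be computed from the figures in the issue description (100k partitions, ~500 executors). This is only a sketch: it assumes the worst case in which every {{TaskEnd}} event and every heartbeat carries one block-status entry per partition, and the object and method names are made up for illustration, not Spark APIs.

```scala
// Hypothetical estimate, not measured data: counts block-status entries the
// driver would have to deserialize under the worst-case assumption above.
object DriverLoadEstimate {
  // One TaskEnd event per partition, each carrying up to `partitions` entries: O(P^2).
  def taskEndEntries(partitions: Long): Long = partitions * partitions

  // Per heartbeat interval, each executor reports up to `partitions` entries: O(P*E).
  def heartbeatEntries(partitions: Long, executors: Long): Long = partitions * executors
}

// Figures taken from the issue description.
val p = 100000L
val e = 500L
println(s"TaskEnd entries over the job: ${DriverLoadEstimate.taskEndEntries(p)}")
println(s"Entries per heartbeat interval: ${DriverLoadEstimate.heartbeatEntries(p, e)}")
```

Under these assumptions the job implies up to 10^10 entries across all {{TaskEnd}} events, versus 10^5 if each event carried a constant-size value, and up to 5 x 10^7 entries per heartbeat interval. Actual numbers would need to be measured on the driver.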



[jira] [Commented] (SPARK-21682) Caching 100k-task RDD GC-kills driver (due to updatedBlockStatuses?)

2017-08-09 Thread Shixiong Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16120415#comment-16120415
 ] 

Shixiong Zhu commented on SPARK-21682:
--

I agree that the driver is a bottleneck. I have already seen several issues about 
driver scalability. But do you really need to create so many partitions? Could 
you use `coalesce` to reduce the number of partitions?


[jira] [Commented] (SPARK-21682) Caching 100k-task RDD GC-kills driver due to updatedBlockStatuses

2017-08-09 Thread Ryan Williams (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16120409#comment-16120409
 ] 

Ryan Williams commented on SPARK-21682:
---

Interestingly, I thought the {{updatedBlockStatuses}} issue started in 2.1.x, 
but I can't {{sc.parallelize(1 to 100000, 100000).cache.count}} on 2.0.2 either.

Maybe the issue started earlier than I thought, or something else is causing 
the driver GC stall?

I can {{sc.parallelize(1 to 100000, 100000).cache.count}} with no Full GCs / no 
problem on 1.6.3 in local mode with a 1GB driver.
