[jira] [Commented] (SPARK-20178) Improve Scheduler fetch failures
[ https://issues.apache.org/jira/browse/SPARK-20178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17476710#comment-17476710 ]

Venkat Sambath commented on SPARK-20178:

Is the design doc referenced in this JIRA (https://docs.google.com/document/d/1D3b_ishMfm5sXmRS494JrOJmL9V_TRVZUB4TgK1l1fY/edit?usp=sharing) available anywhere else?

> Improve Scheduler fetch failures
>
> Key: SPARK-20178
> URL: https://issues.apache.org/jira/browse/SPARK-20178
> Project: Spark
> Issue Type: Epic
> Components: Scheduler, Spark Core
> Affects Versions: 2.1.0
> Reporter: Thomas Graves
> Priority: Major
> Labels: bulk-closed
>
> We have been having a lot of discussions around improving the handling of fetch failures. There are 4 JIRAs currently related to this. We should try to get a list of things we want to improve and come up with one cohesive design: SPARK-20163, SPARK-20091, SPARK-14649, and SPARK-19753. I will put my initial thoughts in a follow-on comment.

--
This message was sent by Atlassian Jira (v8.20.1#820001)
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[ https://issues.apache.org/jira/browse/SPARK-20178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16043622#comment-16043622 ]

Josh Rosen commented on SPARK-20178:

Update: I commented over on https://github.com/apache/spark/pull/18150#discussion_r121018254. I now think that [~sitalke...@gmail.com]'s original approach is a good move for now. If there's controversy, then I propose adding an experimental feature flag to let users fall back to the older behavior.
[ https://issues.apache.org/jira/browse/SPARK-20178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16030284#comment-16030284 ]

Sital Kedia commented on SPARK-20178:

https://github.com/apache/spark/pull/18150
[ https://issues.apache.org/jira/browse/SPARK-20178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16029425#comment-16029425 ]

Thomas Graves commented on SPARK-20178:

Yeah, I think we should do something here. I never looked closely at the code details in https://github.com/apache/spark/pull/17088, but you can resurrect it and we can look at it in more detail. [~joshrosen], are you OK with that, or did you have something else in progress?
[ https://issues.apache.org/jira/browse/SPARK-20178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027036#comment-16027036 ]

Sital Kedia commented on SPARK-20178:

{quote}So to get the robustness for now I'm fine with just invalidating it immediately and see how that works.{quote}

[~tgraves] - Let me know if you want me to resurrect https://github.com/apache/spark/pull/17088, which does exactly that. It was closed inadvertently as a stale PR.
[ https://issues.apache.org/jira/browse/SPARK-20178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16021199#comment-16021199 ]

Thomas Graves commented on SPARK-20178:

{quote}My understanding of today's code is that a single FetchFailed task will trigger a stage failure and parent stage retry, and that the task which experienced the fetch failure will not be retried within the same task set that scheduled it. I'm basing this off the comment at https://github.com/apache/spark/blob/9b09101938399a3490c3c9bde9e5f07031140fdf/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L77 and the code at https://github.com/apache/spark/blob/9b09101938399a3490c3c9bde9e5f07031140fdf/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L770 where the TSM prevents re-attempts of FetchFailed tasks.{quote}

That is correct, but that doesn't mean we can't track the fetch failures on a host across stages. You may or may not get multiple fetch failures in the first stage before it is aborted (very timing dependent), so you are correct that you can't rely on that. But if you track those across stage attempts, and the max is set to 2 or 3, then the entire host will be cleared before the 4 default stage failures. This might give us a little more confidence that it's a hard failure vs. a transient failure. But that does take extra tracking, and right now I don't have good metrics to tell me how many of each kind of failure we see. So to get the robustness for now, I'm fine with just invalidating it immediately and seeing how that works.
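The cross-stage tracking described above can be sketched as follows. This is a hypothetical illustration, not Spark's actual implementation; the class and method names are invented. It counts fetch failures per host across stage attempts and signals when the host's map outputs should all be invalidated:

```scala
import scala.collection.mutable

// Hypothetical tracker: counts fetch failures attributed to each host
// across stage attempts. Once the (configurable) threshold is hit, the
// caller would invalidate every map output on that host.
class HostFetchFailureTracker(maxFailuresPerHost: Int = 3) {
  private val failures = mutable.Map.empty[String, Int].withDefaultValue(0)

  /** Record one fetch failure attributed to `host`; returns true when the
    * host has crossed the threshold and should be treated as lost. */
  def recordFailure(host: String): Boolean = {
    failures(host) += 1
    failures(host) >= maxFailuresPerHost
  }

  /** Clear the count, e.g. after the host's outputs were invalidated. */
  def reset(host: String): Unit = { failures.remove(host); () }
}
```

With a threshold of 2 or 3, as the comment suggests, the whole host would be cleared before the default of 4 stage attempt failures aborts the job.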
[ https://issues.apache.org/jira/browse/SPARK-20178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020166#comment-16020166 ]

Josh Rosen commented on SPARK-20178:

Sure, let me clarify:

When a FetchFailure occurs, the DAGScheduler receives a fetch failure message of the form {{FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage)}}. As of today's Spark master branch, the DAGScheduler handles this failure by marking that individual output as unavailable and by marking all outputs on that executor as unavailable (see https://github.com/apache/spark/blob/9b09101938399a3490c3c9bde9e5f07031140fdf/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1339). As a shorthand, let's call the current behavior {{remove(shuffleId, mapId)}} followed by {{remove(blockManagerId)}}. My proposal is to replace this with {{remove(shuffleId, blockManagerId)}}, removing all outputs from the fetch-failed shuffle on that block manager.

{quote}I think this is basically what you are proposing except waiting for a configurable amount of failures rather than doing it immediately. Thoughts?{quote}

My understanding of today's code is that a single FetchFailed task will trigger a stage failure and parent stage retry, and that the task which experienced the fetch failure will not be retried within the same task set that scheduled it. I'm basing this off the comment at https://github.com/apache/spark/blob/9b09101938399a3490c3c9bde9e5f07031140fdf/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L77 and the code at https://github.com/apache/spark/blob/9b09101938399a3490c3c9bde9e5f07031140fdf/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L770 where the TSM prevents re-attempts of FetchFailed tasks.
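The three invalidation granularities contrasted in the comment above can be modeled with a toy registry. This is illustrative only (Spark's real MapOutputTracker is considerably more involved); outputs are keyed by (shuffleId, mapId) and point at the block manager holding them:

```scala
import scala.collection.mutable

// Toy model of map-output registration: (shuffleId, mapId) -> blockManagerId.
class OutputRegistry {
  private val outputs = mutable.Map.empty[(Int, Int), String]

  def register(shuffleId: Int, mapId: Int, bm: String): Unit =
    outputs((shuffleId, mapId)) = bm

  // Today's step 1: remove(shuffleId, mapId) -- drop the single failed output.
  def remove(shuffleId: Int, mapId: Int): Unit =
    outputs -= ((shuffleId, mapId))

  // Today's step 2: remove(blockManagerId) -- drop everything on that executor.
  def removeAllOn(bm: String): Unit =
    outputs --= outputs.collect { case (k, loc) if loc == bm => k }

  // The proposal: remove(shuffleId, blockManagerId) -- drop only the failed
  // shuffle's outputs on that block manager; other shuffles' outputs survive.
  def removeShuffleOn(shuffleId: Int, bm: String): Unit =
    outputs --= outputs.collect { case (k @ (sid, _), loc) if sid == shuffleId && loc == bm => k }

  def size: Int = outputs.size
}
```

The middle ground is visible here: {{removeShuffleOn}} protects shuffle outputs that other concurrently running jobs may still be consuming from the same block manager.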
[ https://issues.apache.org/jira/browse/SPARK-20178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020141#comment-16020141 ]

Josh Rosen commented on SPARK-20178:

Sure, let me clarify:

* When a FetchFailure occurs, the DAGScheduler receives a fetch failure message of the form {{FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage)}}.
* As of today's Spark master branch, the DAGScheduler handles this failure by marking that individual output as unavailable and by marking all outputs on that executor as unavailable.
** See https://github.com/apache/spark/blob/9b09101938399a3490c3c9bde9e5f07031140fdf/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1339 and https://github.com/apache/spark/blob/9b09101938399a3490c3c9bde9e5f07031140fdf/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1346.
** As a shorthand, let's call this {{remove(shuffleId, mapId)}} followed by {{remove(blockManagerId)}}.
[ https://issues.apache.org/jira/browse/SPARK-20178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019650#comment-16019650 ]

Thomas Graves commented on SPARK-20178:

{quote}when the DAGScheduler is notified of a FetchFailure from a node then mark that shuffle's output locations on that node as unavailable (rather than all shuffles' outputs on that node){quote}

Can you please clarify this? Do you mean only the map stage output locations for that particular shuffle id that got the fetch failure?

{quote}the likelihood of the FetchFailure being a transient failure is relatively small{quote}

Unfortunately this is not what we have seen in the past with MapReduce and Tez. Both of those used to be very aggressive about invalidating outputs, and we had to tune that back because we do see a lot of transient failures, and the aggressive invalidation caused a lot of extra work and delays. But I do agree that this one is a lot of work and will take some time to implement, and based on the way things work now we should do something shorter term as well.

I was actually just looking at this more last Friday, because I am seeing more of these fetch failures cause job failures, so something needs to be done. Based on the way we handle fetch failures now, invalidating everything for that stage and causing it to take longer is better than job failure. I was thinking we could invalidate all the map outputs on that host for that map stage after a certain number of failures, which could be across attempts. I think this is basically what you are proposing, except waiting for a configurable number of failures rather than doing it immediately. Thoughts?

I also think adding blacklisting after a certain number of fetch failures would be good for those cases where the YARN NM crashes but there could still be executors running on the node. That one isn't as difficult; you just have to track it in the BlacklistTracker. Work started in SPARK-13669; it would just need tracking of multiple failures again.
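The node-blacklisting idea in the last paragraph could be tracked along these lines. This is a sketch with invented names and made-up thresholds; Spark's actual BlacklistTracker has a richer model (per-task and per-executor levels, expiry handling):

```scala
import scala.collection.mutable

// Sketch: count fetch failures per node and exclude the node from
// scheduling once a threshold is crossed, with a timeout after which
// the node becomes eligible again. Defaults are illustrative only.
class FetchFailureBlacklist(maxFetchFailures: Int = 2, timeoutMs: Long = 60000L) {
  private val counts = mutable.Map.empty[String, Int].withDefaultValue(0)
  private val blacklistedUntil = mutable.Map.empty[String, Long]

  def onFetchFailure(node: String, nowMs: Long): Unit = {
    counts(node) += 1
    if (counts(node) >= maxFetchFailures)
      blacklistedUntil(node) = nowMs + timeoutMs
  }

  def isBlacklisted(node: String, nowMs: Long): Boolean =
    blacklistedUntil.get(node).exists(_ > nowMs)
}
```

This covers the crashed-NodeManager case: even though executors on the node may still be alive, the scheduler would stop placing tasks there once fetches from it keep failing.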
[ https://issues.apache.org/jira/browse/SPARK-20178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019123#comment-16019123 ]

Josh Rosen commented on SPARK-20178:

Looking over a few of the tickets linked to this fetch-failure-handling umbrella, I've noticed a commonality in several linked JIRAs where folks are proposing to treat a single fetch failure from a node as though all outputs on that node were lost. While this is beneficial for avoiding the behavior where we keep repeatedly trying to refetch from a malfunctioning node or an external shuffle service which has disappeared, it may go too far in some situations and can cause unnecessary recomputation. For example, in a multi-user, multi-job environment there could be a high cost to a false positive where you mark a healthy block manager/shuffle service as unavailable following a single FetchFailure: this takes a failure which might be isolated to a single stage and promotes it into a wider failure that can impact other concurrently running stages (or which can destroy the ability to leverage the implicit caching of shuffle outputs across job runs).

To work around this problem, it looks like there are several proposals (but no PRs yet) for more complex approaches which attempt to infer whether a fetch failure indicates complete unavailability by keeping statistics on the number of fetch failures attributed to each node. The idea here is very similar to executor blacklisting, except applied to output locations. This is a good idea for the longer term because it can help to mitigate against nodes which silently corrupt most data written to disk (a failure mode we won't tolerate well today), but I don't think it's the right fix for the immediate issue being discussed in this ticket: these proposals will require significant amounts of new bookkeeping logic to implement (which is hard to do efficiently and without causing memory leaks / perf issues) and involve threshold-based detection logic which can require tuning to get correct.

As a compromise, I would like to propose a slightly weaker version of SPARK-20115 and SPARK-19753: when the DAGScheduler is notified of a FetchFailure from a node, mark _that shuffle's output locations on that node_ as unavailable (rather than all shuffles' outputs on that node). The rationale behind this is that the FetchFailure is already going to cause recomputation of that shuffle, and the likelihood of the FetchFailure being a transient failure is relatively small: tasks already have internal retries when fetching (see both RetryingBlockFetcher and [~davies]'s patch for retrying within the task when small fetched shuffle blocks are determined to be corrupt), so if a task fails with a FetchFailure then it seems likely that the actual output we tried to fetch is unavailable or corrupt.

I think that this proposal should be simple to implement (and backport, optionally in a feature-flagged manner), and hopefully it won't be controversial because it's much more limited in the scope of the extra inferences it draws from FetchFailures. It also does not preclude the other proposals from being implemented later. Feedback on this is very welcome. If there's support, then I'd like to take a shot at implementing it.
[ https://issues.apache.org/jira/browse/SPARK-20178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985447#comment-15985447 ]

Thomas Graves commented on SPARK-20178:

Another thing we should tie in here is handling preempted containers better. This mostly matches my point above, "Improve logic around deciding which node is actually bad when you get fetch failures," but it is a bit of a special case. If a container gets preempted on the YARN side, we need to properly detect that and not count it as a normal fetch failure. Right now that seems pretty difficult with the way we handle stage failures, but I guess you would just handle that case separately and not count it as a normal stage failure.
[ https://issues.apache.org/jira/browse/SPARK-20178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15969388#comment-15969388 ]

Thomas Graves commented on SPARK-20178:

One thing I ran into today which is somewhat related to this is a combination of failure types. In this case it was broadcast fetch failures combined with shuffle fetch failures, which led to 4 task failures and failed the job. I believe they were all from the same host and happened really quickly (within 3 seconds). This seems like it should fall under the fetch failure case as well.

{noformat}
Failed to get broadcast_646_piece0 of broadcast_646
  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1222)
  at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
  at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
  at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
  at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
  at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
  at org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:691)
  at org.apache.spark.MapOutputTracker.getStatuses(MapOutputTracker.scala:204)
  at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:143)
  at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:47)
  at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:147)
  at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:136)
  at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
  at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:136)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
{noformat}
[ https://issues.apache.org/jira/browse/SPARK-20178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15957341#comment-15957341 ]

Sital Kedia commented on SPARK-20178:

[~tgraves] - Thanks for creating the JIRA and driving the discussion. [~tgraves], [~imranr], [~markhamstra], [~kayousterhout] - As discussed over the PR, I have created a doc consolidating the issues related to fetch failures and the high-level design goals for the scheduler: https://docs.google.com/document/d/1D3b_ishMfm5sXmRS494JrOJmL9V_TRVZUB4TgK1l1fY/edit?usp=sharing. Please take a look and let me know what you think. Once we reach a consensus about the desired behavior and identify the changes that need to be made, I can work on a more detailed design doc and also split the change into multiple small, logical PRs.
[ https://issues.apache.org/jira/browse/SPARK-20178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15951194#comment-15951194 ]

Imran Rashid commented on SPARK-20178:

Thanks for writing this up, Tom.

The only way I see to have a pluggable interface in the current code is to abstract out the *entire* thing -- DAGScheduler, TSM, TSI, perhaps also CGSB and OCC. That would be pretty extreme, though; I'd only consider it if we actually have some reason to think we'd come up with a better version (e.g. new abstractions with less shared state).

In addition to not destabilizing the current scheduler, we should also think about what the migration path would be for enabling these new changes. Will there be a way for Spark to auto-tune? Or will we need to create a number of new confs? I know everyone hates having a huge set of configuration that needs to be tuned, but at some point I think it's OK if Spark works reasonably well on small clusters by default, and for large clusters you've just got to have somebody who knows how to configure it carefully.

Another thing to keep in mind is that Spark is used on a huge variety of workloads. I feel like right now we're very focused on large jobs on big clusters with long tasks, but Spark is also used with very small tasks, especially streaming. I think all the ideas we're considering only affect behavior after there is a failure, so hopefully it wouldn't matter. But we need to be careful that we don't introduce complexity which affects performance even before any failures.
[ https://issues.apache.org/jira/browse/SPARK-20178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950917#comment-15950917 ]

Thomas Graves commented on SPARK-20178:

Overall, what I would like to accomplish is not throwing away work and making the failure case very performant. More and more people are running Spark on larger clusters, which means failures are going to occur more often; we need those failures to be handled as quickly as possible. We need to be careful here and make sure we handle the node being totally down, the NodeManager being totally down, and the NodeManager or node just having an intermittent issue. Generally I see the last case, where the issue is just intermittent, but some people have recently hit more of the NodeManager-totally-down case, in which case you want to fail all maps on that node quickly. The decision on what to rerun is hard now because it could be very costly to rerun more, but at the same time it could be very costly not to rerun everything immediately, because you can fail all 4 stage attempts. This really depends on how long the maps and reduces run. A lot of the discussion on https://github.com/apache/spark/pull/17088 relates to that.

- We should not kill the reduce tasks on fetch failure. Leave the reduce tasks running, since they could have done useful work already, like fetching X map outputs. A reducer can simply fail that map output, which would cause the map to be rerun, and only that specific map output would need to be refetched. This does require checking that there are enough resources to run the map, and if not, possibly killing a reducer or getting more resources via dynamic allocation.
- Improve the logic around deciding which node is actually bad when you get fetch failures. Was it really the node the reduce was on, or the node the map was on? You can do something here like a percentage of reducers that failed to fetch from the map output node.
- We should only rerun the maps that failed (or have logic around how to make this decision); other map outputs could already have been fetched (per bullet one), so there is no need to rerun them if all reducers already fetched them. Since the reduce tasks keep running, other fetch failures can happen in parallel, and those would just cause other maps to be rerun. At some point, based on bullet 2 above, we can decide the entire node is bad.
- Improve the blacklisting based on the above improvements.
- Make sure to think about how this plays into the stage attempt max failures (4, now settable).
- Try not to waste resources. Right now we can have 2 copies of the same reduce task running, which uses twice the resources, and there are a bunch of different conditions that determine whether this duplicated work is actually useful.

Questions:
- Should we consider fetching all map output from a host at once (rather than per executor)? This could improve fetch times (we would have to test) as well as fetch failure handling. It could also cause more maps to be failed, which is somewhat contradictory to bullet 3 above; need to think about this more.
- Do we need a pluggable interface, or how do we avoid destabilizing the current scheduler?

Bonus or future:
- Make the decision on when and how many maps to rerun a cost-based estimate. If maps only take a few seconds to run, we could rerun all maps on the host immediately.
- Add an option to pre-start reduce tasks so that they can start fetching while the last few maps are finishing (if you have long-tail maps).
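The first and third bullets above -- keep reducers running and rerun only the map outputs that actually failed to fetch -- can be illustrated with a toy reducer-side view. The names here are hypothetical, not Spark code:

```scala
import scala.collection.mutable

// Toy reducer-side bookkeeping: which map outputs have already been
// fetched, and which single map must be rerun after a fetch failure.
class ReducerFetchState(numMaps: Int) {
  private val fetched = mutable.Set.empty[Int]

  def onFetchSuccess(mapId: Int): Unit = fetched += mapId

  /** On a fetch failure, only this one map needs recomputation; work
    * already fetched from other maps is kept instead of the reducer
    * being killed and all fetches redone. */
  def onFetchFailure(mapId: Int): Seq[Int] = Seq(mapId)

  /** Map outputs this reducer still needs. */
  def remaining: Set[Int] = (0 until numMaps).toSet -- fetched
}
```

The point of the sketch is the contrast with today's behavior: the fetch failure costs one map's recomputation rather than the reducer's entire fetch phase.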