Hey Ankur,

I think the significant decrease in "spark.blacklist.timeout" (1 hr down to
5 minutes) in your updated suggestion is the key here.

Looking at a few *successful* runs of the application I was debugging, here
are the error rates when I did *not* have blacklisting enabled:

Run A: 8 executors with 36 total errors over the last 25 minutes of a 1
hour and 6 minute run.
Run B: 8 executors with 50 total errors over the last 30 minutes of a 1
hour run.

Increasing "spark.blacklist.application.maxFailedTasksPerExecutor" to 5
would have allowed run A (~3 failures/executor) to pass, but run B (~6
failures/executor) would not have without the change to
"spark.blacklist.timeout".

With such a small timeout of 5 minutes, the worst you get is executors
flipping between blacklisted and not blacklisted (e.g. fail 5 tasks quickly
due to disk failures, wait 5 minutes, fail 5 tasks quickly, wait 5
minutes). For catastrophic errors, this is probably OK. The executor will
fail fast each time it comes back online and will effectively be
blacklisted 90+% of the time. For transient errors, the executor will come
back online and probably be fine. The only trouble you get into is if you
run out of executors for a stage due to a high number of transient errors,
but you're right, perhaps that many transient errors is something worth
failing for.
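
To put a rough number on that claim (the timings here are assumptions, not
measurements):

    // Back-of-the-envelope for the "90+%" figure above: assume a broken
    // executor burns through 5 quick failures in ~30 seconds, then sits out
    // the 5 minute blacklist before repeating the cycle.
    val failureWindowSec = 30
    val blacklistTimeoutSec = 5 * 60
    val blacklistedFraction =
      blacklistTimeoutSec.toDouble / (blacklistTimeoutSec + failureWindowSec)
    // ~= 0.91, i.e. effectively blacklisted 90+% of the time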

In the case I was debugging with fetch failures, only the 5 minute timeout
applies (the failure-count thresholds don't apply to fetch failures), but I
don't think it would have mattered. Fetch task attempts were "hanging" for
30+ minutes without failing (it took that long for the netty channel to
time out). As such, there was no opportunity to blacklist. Even reducing
the number of fetch retry attempts didn't help, as the first attempt
occasionally stalled due to the underlying networking issues.
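
For context, these are the fetch-side knobs relevant to that hang (names and
defaults are from the Spark configuration docs; whether tightening them
surfaces the failure sooner depends on where the channel actually stalls):

    import org.apache.spark.SparkConf

    // Sketch only: values shown are the documented defaults, not a
    // recommendation.
    val fetchConf = new SparkConf()
      .set("spark.network.timeout", "120s")     // idle timeout on connections
      .set("spark.shuffle.io.maxRetries", "3")  // fetch retry attempts
      .set("spark.shuffle.io.retryWait", "5s")  // wait between fetch retries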

A few thoughts:
- Correct me if I'm wrong, but once a task fails on an executor, even if
maxTaskAttemptsPerExecutor > 1, that executor will get a failed task count
against it. It looks like "TaskSetBlacklist.updateBlacklistForFailedTask"
only adds to the executor failures. If the task recovers on the second
attempt on the same executor, there is no way to remove the failure. I'd
argue that if the task succeeds on a second attempt on the same executor,
then it is definitely transient and the first attempt's failure should not
count towards the executor's total stage/application failure count.
- Rather than a fixed timeout, could we do some sort of exponential
backoff? Start with a 10 or 20 second blacklist and increase from there
(rough sketch below)? The nodes with catastrophic errors should quickly hit
long blacklist intervals.
- W.r.t. turning it on by default: do we have a sense of how many teams
are using blacklisting today with the current default settings? It may be
worth changing the defaults for a release or two and gathering feedback to
help make a call on turning it on by default. We could potentially get that
feedback now with a two-question survey: "Have you enabled blacklisting?"
and "What settings did you use?"

-Chris

On Mon, Apr 1, 2019 at 9:05 AM Ankur Gupta <ankur.gu...@cloudera.com> wrote:

> Hi Chris,
>
> Thanks for sending over the example. As far as I can understand, it seems
> that this would not have been a problem if
> "spark.blacklist.application.maxFailedTasksPerExecutor" was set to a higher
> threshold, as mentioned in my previous email.
>
> Though, with 8/7 executors and 2 failedTasksPerExecutor, if the
> application runs out of executors, that would imply at least 14 task
> failures in a short period of time. So, I am not sure if the application
> should still continue to run or fail. If this was not a transient issue,
> maybe failing was the correct outcome, as it saves a lot of unnecessary
> computation and also alerts admins to look for transient/permanent hardware
> failures.
>
> Please let me know if you think we should enable the blacklisting feature
> by default with the higher threshold.
>
> Thanks,
> Ankur
>
> On Fri, Mar 29, 2019 at 3:23 PM Chris Stevens <
> chris.stev...@databricks.com> wrote:
>
>> Hey All,
>>
>> My initial reply got lost because I wasn't on the dev list. Hopefully
>> this goes through.
>>
>> Back story for my experiments: customer was hitting network errors due to
>> cloud infrastructure problems. Basically, executor X couldn't fetch from Y.
>> The NIC backing the VM for executor Y was swallowing packets. I wanted to
>> blacklist node Y.
>>
>> What I learned:
>>
>> 1. `spark.blacklist.application.fetchFailure.enabled` requires
>> `spark.blacklist.enabled` to also be enabled (BlacklistTracker isn't created
>> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L948>
>> without the latter). This was a problem because the defaults for
>> `spark.blacklist.[task|stage|application].*` are aggressive and don't even
>> apply to fetch failures. Those are always treated as non-transient. It
>> would be nice to have fetch blacklisting without regular blacklisting.
>>
>> 2. Due to the conf coupling in #1 and transient cloud storage errors in
>> the job (FileScanRDD was failing due to corrupted files), I had to set the
>> `max*PerExecutor` and `max*PerNode` to really high values (i.e. 1000).
>> Without these high settings, the customer was running out of nodes on the
>> cluster (as we don't have blacklisting enabled by default, we haven't
>> hooked it up to any sort of dynamic cloud VM re-provisioning - something
>> like `killBlacklistedNodes`). Why? The same transient FileScanRDD failure
>> hit over multiple stages, so even though executors were aggressively
>> removed within one
>> stage, `spark.blacklist.application.maxFailedTasksPerExecutor = 2` was
>> reached. The stages were succeeding because the FileScanRDD attempts on
>> other executors succeeded. As such, the 8 node cluster ran out of executors
>> after 3 stages. I did not have `spark.blacklist.killBlacklistedExecutors`
>> enabled.
>> If I did, then `spark.blacklist.application.maxFailedExecutorsPerNode`
>> would have kicked in and the job might have failed after 4-6 stages,
>> depending on how it played out. (FWIW, this was running one executor per
>> node).
>>
>> -Chris
>>
>> On Fri, Mar 29, 2019 at 1:48 PM Ankur Gupta <ankur.gu...@cloudera.com>
>> wrote:
>>
>>> Thanks Reynold! That is certainly useful to know.
>>>
>>> @Chris Will it be possible for you to send out those details if you
>>> still have them or, better, create a JIRA so someone can work on those
>>> improvements? If there is already a JIRA, can you please provide a link
>>> to it?
>>>
>>> Additionally, if the concern is with the aggressiveness of the
>>> blacklisting, then we can enable blacklisting feature by default with
>>> higher thresholds for failures. Below is an alternate set of defaults that
>>> were also proposed in the design document for max cluster utilization:
>>>
>>>    1. spark.blacklist.task.maxTaskAttemptsPerExecutor = 2
>>>    2. spark.blacklist.task.maxTaskAttemptsPerNode = 2
>>>    3. spark.blacklist.stage.maxFailedTasksPerExecutor = 5
>>>    4. spark.blacklist.stage.maxFailedExecutorsPerNode = 4
>>>    5. spark.blacklist.application.maxFailedTasksPerExecutor = 5
>>>    6. spark.blacklist.application.maxFailedExecutorsPerNode = 4
>>>    7. spark.blacklist.timeout = 5 mins
>>>
>>>
>>>
>>> On Fri, Mar 29, 2019 at 11:18 AM Reynold Xin <r...@databricks.com>
>>> wrote:
>>>
>>>> We tried enabling blacklisting for some customers and in the cloud,
>>>> very quickly they end up having 0 executors due to various transient
>>>> errors. So unfortunately I think the current implementation is terrible for
>>>> cloud deployments, and shouldn't be on by default. The heart of the issue
>>>> is that the current implementation is not great at dealing with transient
>>>> errors vs catastrophic errors.
>>>>
>>>> +Chris who was involved with those tests.
>>>>
>>>>
>>>>
>>>> On Thu, Mar 28, 2019 at 3:32 PM, Ankur Gupta <
>>>> ankur.gu...@cloudera.com.invalid> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> This is a follow-on to my PR:
>>>>> https://github.com/apache/spark/pull/24208, where I aimed to enable
>>>>> blacklisting for fetch failure by default. From the comments, there is
>>>>> interest in the community to enable overall blacklisting feature by
>>>>> default. I have listed down 3 different things that we can do and would
>>>>> like to gather feedback and see if anyone has objections with regards to
>>>>> this. Otherwise, I will just create a PR for the same.
>>>>>
>>>>> 1. *Enable blacklisting feature by default*. The blacklisting feature
>>>>> was added as part of SPARK-8425 and is available since 2.2.0. This feature
>>>>> was deemed experimental and was disabled by default. The feature 
>>>>> blacklists
>>>>> an executor/node from running a particular task, any task in a particular
>>>>> stage, or all tasks in the application, based on the number of failures.
>>>>> There are
>>>>> various configurations available which control those thresholds.
>>>>> Additionally, the executor/node is only blacklisted for a configurable 
>>>>> time
>>>>> period. The idea is to enable blacklisting feature with existing defaults,
>>>>> which are following:
>>>>>
>>>>>    1. spark.blacklist.task.maxTaskAttemptsPerExecutor = 1
>>>>>    2. spark.blacklist.task.maxTaskAttemptsPerNode = 2
>>>>>    3. spark.blacklist.stage.maxFailedTasksPerExecutor = 2
>>>>>    4. spark.blacklist.stage.maxFailedExecutorsPerNode = 2
>>>>>    5. spark.blacklist.application.maxFailedTasksPerExecutor = 2
>>>>>    6. spark.blacklist.application.maxFailedExecutorsPerNode = 2
>>>>>    7. spark.blacklist.timeout = 1 hour
>>>>>
>>>>> 2. *Kill blacklisted executors/nodes by default*. This feature was
>>>>> added as part of SPARK-16554 and is available since 2.2.0. This is a
>>>>> follow-on feature to blacklisting, such that if an executor/node is
>>>>> blacklisted for the application, then it also terminates all running tasks
>>>>> on that executor for faster failure recovery.
>>>>>
>>>>> 3. *Remove legacy blacklisting timeout config*:
>>>>> spark.scheduler.executorTaskBlacklistTime
>>>>>
>>>>> Thanks,
>>>>> Ankur
>>>>>
>>>>
>>>>
