Github user attilapiros commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20203#discussion_r162041751
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala ---
    @@ -128,13 +130,17 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
         }
     
         // Check if enough tasks have failed on the executor to blacklist it for the entire stage.
    -    if (execFailures.numUniqueTasksWithFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
    +    val numFailures = execFailures.numUniqueTasksWithFailures
    +    if (numFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
           if (blacklistedExecs.add(exec)) {
             logInfo(s"Blacklisting executor ${exec} for stage $stageId")
             // This executor has been pushed into the blacklist for this stage.  Let's check if it
             // pushes the whole node into the blacklist.
             val blacklistedExecutorsOnNode =
               execsWithFailuresOnNode.filter(blacklistedExecs.contains(_))
    +        val now = clock.getTimeMillis()
    +        listenerBus.post(
    +          SparkListenerExecutorBlacklistedForStage(now, exec, numFailures, stageId, stageAttemptId))
             if (blacklistedExecutorsOnNode.size >= MAX_FAILED_EXEC_PER_NODE_STAGE) {
               if (blacklistedNodes.add(host)) {
                 logInfo(s"Blacklisting ${host} for stage $stageId")
    --- End diff --
    
    There are two possible solutions I can see for this right now:
    
    1) Extending org.apache.spark.scheduler.TaskSetBlacklist#updateBlacklistForFailedTask() with the hostToExecutors map and sending SparkListenerExecutorBlacklistedForStage for all the executors on the node. But this change would propagate to TaskSetManager and even to TaskSchedulerImpl (where this data is available).
    
    2) Introducing a new event, SparkListenerNodeBlacklistedForStage. This is more consistent with the existing solution we have in BlacklistTracker. In this case, in AppStatusListener I guess I would use the **liveExecutors** map to iterate over the currently available executors on the blacklisted node and fill in their executor summaries for the stage (node-level data is not stored for blacklisting; it is only mapped onto the currently available executors). This way, if the very first metrics update (SparkListenerExecutorMetricsUpdate) arrives for an executor **after** the node blacklisting, the blacklisted flag will still be correct; on the other hand, executors that were never used will also appear for the stage (this side effect would probably be the very same for the first option too). A rough sketch of this bookkeeping is right below.
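
    To make the 2nd option more concrete, here is a minimal, self-contained sketch of the bookkeeping I have in mind. It is only an illustration: the event fields just mirror SparkListenerExecutorBlacklistedForStage from the diff above, and LiveExecutor, liveExecutors and the returned ids are stand-ins for the real AppStatusListener types, not the final API.

    ```scala
    // Hypothetical stage-level node blacklisting event, mirroring the
    // executor-level event added in this diff (field order is an assumption).
    case class SparkListenerNodeBlacklistedForStage(
        time: Long,
        hostId: String,
        executorFailures: Int,
        stageId: Int,
        stageAttemptId: Int)

    // Stand-in for the listener's live executor bookkeeping.
    case class LiveExecutor(executorId: String, hostname: String)

    object NodeBlacklistForStageSketch {

      // The liveExecutors analogue: executors currently known to the listener.
      val liveExecutors: Map[String, LiveExecutor] = Map(
        "1" -> LiveExecutor("1", "host-a"),
        "2" -> LiveExecutor("2", "host-a"),
        "3" -> LiveExecutor("3", "host-b"))

      // On a node-level event, every currently available executor on that host
      // would get the blacklisted flag in its executor summary for the stage;
      // here we just return the affected executor ids.
      def onNodeBlacklistedForStage(event: SparkListenerNodeBlacklistedForStage): Seq[String] = {
        liveExecutors.values
          .filter(_.hostname == event.hostId)
          .map(_.executorId)
          .toSeq
      }

      def main(args: Array[String]): Unit = {
        val event = SparkListenerNodeBlacklistedForStage(
          time = 0L, hostId = "host-a", executorFailures = 2, stageId = 0, stageAttemptId = 0)
        // Executors "1" and "2" get flagged for the stage even if they never ran
        // a task for it (the side effect mentioned above); "3" on host-b is untouched.
        println(onNodeBlacklistedForStage(event))  // List(1, 2)
      }
    }
    ```

    In the real listener this would of course update the per-stage executor summaries instead of returning ids, but it shows where the unused-executor side effect comes from.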
    
    I plan to go with the 2nd solution, covered by a new HistoryServerSuite test.
    What is your opinion? Do you see any problems with this solution?

