Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8760#discussion_r46247212
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala ---
    @@ -0,0 +1,151 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkConf
    +import org.apache.spark.util.SystemClock
    +import org.apache.spark.util.Clock
    +
    +/**
    + * The interface to determine executor blacklist and node blacklist.
    + */
    +private [scheduler] trait BlacklistStrategy {
    +  /** Define a time interval to expire failure information of executors */
    +  val expireTimeInMilliseconds: Long
    +
    +  /** Return the executors in the blacklist that are related to the given stage and partition */
    +  def getExecutorBlacklist(
    +      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
    +      atomTask: StageAndPartition,
    +      clock: Clock): Set[String]
    +
    +  /** Return all nodes in the blacklist */
    +  def getNodeBlacklist(
    +      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus]): Set[String]
    +
    +  /**
    +   * Choose which executors should be removed from the blacklist. Return true if any executors are
    +   * removed from the blacklist, false otherwise. The default implementation removes executors from
    +   * the blacklist after [[expireTimeInMilliseconds]].
    +   */
    +  def expireExecutorsInBlackList(
    +      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
    +      clock: Clock): Boolean = {
    +    val now = clock.getTimeMillis()
    +    val expiredKey = executorIdToFailureStatus.filter {
    +      case (executorid, failureStatus) => {
    +        (now - failureStatus.updatedTime) >= expireTimeInMilliseconds
    +      }
    +    }.keySet
    +
    +    if (expiredKey.isEmpty) {
    +      false
    +    } else {
    +      executorIdToFailureStatus --= expiredKey
    +      true
    +    }
    +  }
    +}
    +
    +/**
    + * This strategy adds an executor to the blacklist for all tasks when the executor has too many
    + * task failures. An executor is placed in the blacklist when there are more than
    + * [[maxFailedTasks]] failed tasks. Furthermore, all executors in one node are put into the
    + * blacklist if there are more than [[maxBlacklistedExecutors]] blacklisted executors on one node.
    + * The benefit of this strategy is that different taskSets can learn from the experience of other
    + * taskSets to avoid allocating tasks on problematic executors.
    + */
    +private[scheduler] class ExecutorAndNodeStrategy(
    +    maxFailedTasks: Int,
    +    maxBlacklistedExecutors: Int,
    +    val expireTimeInMilliseconds: Long
    +  ) extends BlacklistStrategy {
    +
    +  private def getSelectedExecutorMap(
    --- End diff --
    
    This needs to be renamed to something more appropriate, such as filterFailedExecutors, so that it is clear what it does. "Selected executor" does not really convey what the method is supposed to do.
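To illustrate the rename suggested in the comment, here is a minimal sketch of what a helper named `filterFailedExecutors` might look like, together with the node-level rule from the ExecutorAndNodeStrategy doc comment. Under that rule, an executor is blacklisted after more than maxFailedTasks failures, and a node is blacklisted when more than maxBlacklistedExecutors of its executors are blacklisted. The body of getSelectedExecutorMap is not shown in this diff, so this is not the PR's code. The simplified FailureStatus below is a hypothetical stand-in: only `updatedTime` appears in the diff, while `totalNumFailures` and `host` are assumed fields. `BlacklistSketch` and `nodeBlacklist` are illustrative names.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for the PR's FailureStatus. Only
// `updatedTime` appears in the diff; `totalNumFailures` and `host` are assumed.
final case class FailureStatus(updatedTime: Long, totalNumFailures: Int, host: String)

object BlacklistSketch {

  /**
   * Keep only the executors whose failure count is strictly greater than
   * maxFailedTasks. The name follows the reviewer's suggestion; the PR's
   * actual helper may differ in signature and behavior.
   */
  def filterFailedExecutors(
      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
      maxFailedTasks: Int): Map[String, FailureStatus] =
    executorIdToFailureStatus.filter { case (_, status) =>
      status.totalNumFailures > maxFailedTasks
    }.toMap

  /**
   * Hosts with more than maxBlacklistedExecutors blacklisted executors.
   * Builds on filterFailedExecutors so both rules share one definition of
   * "failed executor".
   */
  def nodeBlacklist(
      executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
      maxFailedTasks: Int,
      maxBlacklistedExecutors: Int): Set[String] =
    filterFailedExecutors(executorIdToFailureStatus, maxFailedTasks).values
      .groupBy(_.host) // host -> blacklisted executors running on that host
      .filter { case (_, statuses) => statuses.size > maxBlacklistedExecutors }
      .keySet
}
```

Take maxFailedTasks = 2 and maxBlacklistedExecutors = 1 as an example. Two executors on host-a with 3 and 5 failures both pass the filter, so host-a is blacklisted. An executor on host-b with a single failure does not pass. Naming the helper after what it filters makes the call site in the node rule read directly as "failed executors grouped by host".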

