Github user attilapiros commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21068#discussion_r195922985
  
    --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala ---
    @@ -0,0 +1,187 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.deploy.yarn
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +import scala.collection.mutable.HashMap
    +
    +import org.apache.hadoop.yarn.client.api.AMRMClient
    +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.yarn.config._
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.scheduler.BlacklistTracker
    +import org.apache.spark.util.{Clock, SystemClock, Utils}
    +
    +/**
    + * YarnAllocatorBlacklistTracker is responsible for tracking the blacklisted nodes
    + * and synchronizing the node list to YARN.
    + *
    + * Blacklisted nodes are coming from two different sources:
    + *
    + * <ul>
    + *   <li> from the scheduler as task level blacklisted nodes
    + *   <li> from this class (tracked here) as YARN resource allocation problems
    + * </ul>
    + *
    + * The reason to realize this logic here (and not in the driver) is to avoid possible delays
    + * between synchronizing the blacklisted nodes with YARN and resource allocations.
    + */
    +private[spark] class YarnAllocatorBlacklistTracker(
    +    sparkConf: SparkConf,
    +    amClient: AMRMClient[ContainerRequest],
    +    failureTracker: FailureTracker)
    +  extends Logging {
    +
    +  private val blacklistTimeoutMillis = BlacklistTracker.getBlacklistTimeout(sparkConf)
    +
    +  private val launchBlacklistEnabled = sparkConf.get(YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED)
    +
    +  private val maxFailuresPerHost = sparkConf.get(MAX_FAILED_EXEC_PER_NODE)
    +
    +  private val allocatorBlacklist = new HashMap[String, Long]()
    +
    +  private var currentBlacklistedYarnNodes = Set.empty[String]
    +
    +  private var schedulerBlacklist = Set.empty[String]
    +
    +  private var numClusterNodes = Int.MaxValue
    +
    +  def setNumClusterNodes(numClusterNodes: Int): Unit = {
    +    this.numClusterNodes = numClusterNodes
    +  }
    +
    +  def handleResourceAllocationFailure(hostOpt: Option[String]): Unit = {
    +    hostOpt match {
    +      case Some(hostname) if launchBlacklistEnabled =>
    +        // failures on an already blacklisted node are not even tracked.
    +        // otherwise, such failures could shutdown the application
    +        // as resource requests are asynchronous
    +        // and a late failure response could exceed MAX_EXECUTOR_FAILURES
    +        if (!schedulerBlacklist.contains(hostname) &&
    +            !allocatorBlacklist.contains(hostname)) {
    +          failureTracker.registerFailureOnHost(hostname)
    +          updateAllocationBlacklistedNodes(hostname)
    +        }
    +      case _ =>
    +        failureTracker.registerExecutorFailure()
    +    }
    +  }
    +
    +  private def updateAllocationBlacklistedNodes(hostname: String): Unit = {
    +    val failuresOnHost = failureTracker.numFailuresOnHost(hostname)
    +    if (failuresOnHost > maxFailuresPerHost) {
    +      logInfo(s"blacklisting $hostname as YARN allocation failed $failuresOnHost times")
    --- End diff --
    
    Thanks, I am happy you consider this change useful.
    
    Regarding logInfo: I chose it to be consistent with the logging of the existing BlacklistTracker, where blacklisting is treated as part of normal behaviour and logInfo is used. But if you feel strongly about logWarning, I can make the change.
    
    As for the metrics, I did a quick search in the yarn module and it seems no metrics are currently reported from there, so the change is probably more than just a few lines. How about I create a new JIRA task for it? Would that work for you?
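For context, the synchronization step the tracker performs against YARN boils down to diffing the combined blacklist against the set previously reported, then passing the additions and removals to the resource manager (via something like `AMRMClient.updateBlacklist(additions, removals)`). A minimal, YARN-independent sketch of that bookkeeping (the `BlacklistSyncSketch` object and its names are illustrative, not from the patch):

```scala
// Sketch of the blacklist-synchronization bookkeeping: given the set of
// hosts already reported to YARN and the current combined blacklist
// (scheduler-blacklisted union allocator-blacklisted), compute which
// hosts must be newly added and which must be removed.
object BlacklistSyncSketch {
  def diff(previouslyReported: Set[String],
           current: Set[String]): (Seq[String], Seq[String]) = {
    // hosts newly blacklisted since the last report
    val additions = (current -- previouslyReported).toSeq.sorted
    // hosts whose blacklisting has expired or been lifted
    val removals = (previouslyReported -- current).toSeq.sorted
    (additions, removals)
  }

  def main(args: Array[String]): Unit = {
    val (add, rem) = diff(Set("host1", "host2"), Set("host2", "host3"))
    println(s"additions=$add removals=$rem")
  }
}
```

Reporting only the delta (rather than the full set every time) keeps the AM-to-RM traffic proportional to what actually changed, which is why the tracker keeps `currentBlacklistedYarnNodes` around between updates.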


---
