Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2169#discussion_r16897811
  
    --- Diff: 
yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
    @@ -17,18 +17,430 @@
     
     package org.apache.spark.deploy.yarn
     
    +import java.util.{List => JList}
    +import java.util.concurrent.ConcurrentHashMap
    +import java.util.concurrent.atomic.AtomicInteger
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
    +
    +import org.apache.spark.{Logging, SparkConf, SparkEnv}
    +import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
    +
     object AllocationType extends Enumeration {
       type AllocationType = Value
       val HOST, RACK, ANY = Value
     }
     
    +// TODO:
    +// Too many params.
    +// Needs to be mt-safe
    +// Need to refactor this to make it 'cleaner' ... right now, all 
computation is reactive - should
    +// make it more proactive and decoupled.
    +
    +// Note that right now, we assume all node asks as uniform in terms of 
capabilities and priority
    +// Refer to 
http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/
 for
    +// more info on how we are requesting for containers.
    +
     /**
    - * Interface that defines a Yarn allocator.
    + * Common code for the Yarn container allocator. Contains all the 
version-agnostic code to
    + * manage container allocation for a running Spark application.
      */
    -trait YarnAllocator {
    +private[yarn] abstract class YarnAllocator(
    +    conf: Configuration,
    +    sparkConf: SparkConf,
    +    args: ApplicationMasterArguments,
    +    preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
    +  extends Logging {
     
    -  def allocateResources(): Unit
    -  def getNumExecutorsFailed: Int
    -  def getNumExecutorsRunning: Int
    +  // These three are locked on allocatedHostToContainersMap. Complementary 
data structures
    +  // allocatedHostToContainersMap : containers which are running : host, 
Set<containerid>
    +  // allocatedContainerToHostMap: container to host mapping.
    +  private val allocatedHostToContainersMap =
    +    new HashMap[String, collection.mutable.Set[ContainerId]]()
     
    -}
    +  private val allocatedContainerToHostMap = new HashMap[ContainerId, 
String]()
    +
    +  // allocatedRackCount is populated ONLY if allocation happens (or 
decremented if this is an
    +  // allocated node)
    +  // As with the two data structures above, tightly coupled with them, and 
to be locked on
    +  // allocatedHostToContainersMap
    +  private val allocatedRackCount = new HashMap[String, Int]()
    +
    +  // Containers to be released in next request to RM
    +  private val releasedContainers = new ConcurrentHashMap[ContainerId, 
Boolean]
    +
    +  // Additional memory overhead - in mb.
    +  protected val memoryOverhead: Int = 
sparkConf.getInt("spark.yarn.executor.memoryOverhead",
    +    YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
    +
    +  // Number of container requests that have been sent to, but not yet 
allocated by the
    +  // ApplicationMaster.
    +  private val numPendingAllocate = new AtomicInteger()
    +  private val numExecutorsRunning = new AtomicInteger()
    +  // Used to generate a unique id per executor
    +  private val executorIdCounter = new AtomicInteger()
    +  private val numExecutorsFailed = new AtomicInteger()
    +
    +  private val maxExecutors = args.numExecutors
    +
    +  protected val executorMemory = args.executorMemory
    +  protected val executorCores = args.executorCores
    +  protected val (preferredHostToCount, preferredRackToCount) =
    +    generateNodeToWeight(conf, preferredNodes)
    +
    +  def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
    +
    +  def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
    +
    +  def allocateResources() = {
    +    val missing = maxExecutors - numPendingAllocate.get() - 
numExecutorsRunning.get()
    +
    +    if (missing > 0) {
    +      numPendingAllocate.addAndGet(missing)
    +      logInfo("Will Allocate %d executor containers, each with %d 
memory".format(
    +        missing,
    +        (executorMemory + memoryOverhead)))
    +    } else {
    +      logDebug("Empty allocation request ...")
    +    }
    +
    +    val allocateResponse = allocateContainers(missing)
    +    val allocatedContainers = allocateResponse.getAllocatedContainers()
    +
    +    if (allocatedContainers.size > 0) {
    +      var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * 
allocatedContainers.size)
    +
    +      if (numPendingAllocateNow < 0) {
    +        numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * 
numPendingAllocateNow)
    +      }
    +
    +      logDebug("""
    +        Allocated containers: %d
    +        Current executor count: %d
    +        Containers released: %s
    +        Cluster resources: %s
    +        """.format(
    +          allocatedContainers.size,
    +          numExecutorsRunning.get(),
    +          releasedContainers,
    +          allocateResponse.getAvailableResources))
    +
    +      val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
    +
    +      for (container <- allocatedContainers) {
    +        if (isResourceConstraintSatisfied(container)) {
    +          // Add the accepted `container` to the host's list of already 
accepted,
    +          // allocated containers
    +          val host = container.getNodeId.getHost
    +          val containersForHost = hostToContainers.getOrElseUpdate(host,
    +            new ArrayBuffer[Container]())
    +          containersForHost += container
    +        } else {
    +          // Release container, since it doesn't satisfy resource 
constraints.
    +          (container)
    +        }
    +      }
    +
    +       // Find the appropriate containers to use.
    +      // TODO: Cleanup this group-by...
    +      val dataLocalContainers = new HashMap[String, 
ArrayBuffer[Container]]()
    +      val rackLocalContainers = new HashMap[String, 
ArrayBuffer[Container]]()
    +      val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
    +
    +      for (candidateHost <- hostToContainers.keySet) {
    +        val maxExpectedHostCount = 
preferredHostToCount.getOrElse(candidateHost, 0)
    +        val requiredHostCount = maxExpectedHostCount - 
allocatedContainersOnHost(candidateHost)
    +
    +        val remainingContainersOpt = hostToContainers.get(candidateHost)
    +        assert(remainingContainersOpt.isDefined)
    +        var remainingContainers = remainingContainersOpt.get
    +
    +        if (requiredHostCount >= remainingContainers.size) {
    +          // Since we have <= required containers, add all remaining 
containers to
    +          // `dataLocalContainers`.
    +          dataLocalContainers.put(candidateHost, remainingContainers)
    +          // There are no more free containers remaining.
    +          remainingContainers = null
    +        } else if (requiredHostCount > 0) {
    +          // Container list has more containers than we need for data 
locality.
    +          // Split the list into two: one based on the data local 
container count,
    +          // (`remainingContainers.size` - `requiredHostCount`), and the 
other to hold remaining
    +          // containers.
    +          val (dataLocal, remaining) = remainingContainers.splitAt(
    +            remainingContainers.size - requiredHostCount)
    +          dataLocalContainers.put(candidateHost, dataLocal)
    +
    +          // Invariant: remainingContainers == remaining
    +
    +          // YARN has a nasty habit of allocating a ton of containers on a 
host - discourage this.
    +          // Add each container in `remaining` to list of containers to 
release. If we have an
    +          // insufficient number of containers, then the next allocation 
cycle will reallocate
    +          // (but won't treat it as data local).
    +          // TODO(harvey): Rephrase this comment some more.
    +          for (container <- remaining) (container)
    +          remainingContainers = null
    +        }
    +
    +        // For rack local containers
    +        if (remainingContainers != null) {
    +          val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
    +          if (rack != null) {
    +            val maxExpectedRackCount = 
preferredRackToCount.getOrElse(rack, 0)
    +            val requiredRackCount = maxExpectedRackCount - 
allocatedContainersOnRack(rack) -
    +              rackLocalContainers.getOrElse(rack, List()).size
    +
    +            if (requiredRackCount >= remainingContainers.size) {
    +              // Add all remaining containers to to `dataLocalContainers`.
    +              dataLocalContainers.put(rack, remainingContainers)
    +              remainingContainers = null
    +            } else if (requiredRackCount > 0) {
    +              // Container list has more containers that we need for data 
locality.
    +              // Split the list into two: one based on the data local 
container count,
    +              // (`remainingContainers.size` - `requiredHostCount`), and 
the other to hold remaining
    +              // containers.
    +              val (rackLocal, remaining) = remainingContainers.splitAt(
    +                remainingContainers.size - requiredRackCount)
    +              val existingRackLocal = 
rackLocalContainers.getOrElseUpdate(rack,
    +                new ArrayBuffer[Container]())
    +
    +              existingRackLocal ++= rackLocal
    +
    +              remainingContainers = remaining
    +            }
    +          }
    +        }
    +
    +        if (remainingContainers != null) {
    +          // Not all containers have been consumed - add them to the list 
of off-rack containers.
    +          offRackContainers.put(candidateHost, remainingContainers)
    +        }
    +      }
    +
    +      // Now that we have split the containers into various groups, go 
through them in order:
    +      // first host-local, then rack-local, and finally off-rack.
    +      // Note that the list we create below tries to ensure that not all 
containers end up within
    +      // a host if there is a sufficiently large number of 
hosts/containers.
    +      val allocatedContainersToProcess = new 
ArrayBuffer[Container](allocatedContainers.size)
    +      allocatedContainersToProcess ++= 
TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
    +      allocatedContainersToProcess ++= 
TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
    +      allocatedContainersToProcess ++= 
TaskSchedulerImpl.prioritizeContainers(offRackContainers)
    +
    +      // Run each of the allocated containers.
    +      for (container <- allocatedContainersToProcess) {
    +        val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
    +        val executorHostname = container.getNodeId.getHost
    +        val containerId = container.getId
    +
    +        val executorMemoryOverhead = (executorMemory + memoryOverhead)
    +        assert(container.getResource.getMemory >= executorMemoryOverhead)
    +
    +        if (numExecutorsRunningNow > maxExecutors) {
    +          logInfo("""Ignoring container %s at host %s, since we already 
have the required number of
    +            containers for it.""".format(containerId, executorHostname))
    +          numExecutorsRunning.decrementAndGet()
    +        } else {
    +          val executorId = executorIdCounter.incrementAndGet().toString
    +          val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
    +            SparkEnv.driverActorSystemName,
    +            sparkConf.get("spark.driver.host"),
    +            sparkConf.get("spark.driver.port"),
    +            CoarseGrainedSchedulerBackend.ACTOR_NAME)
    +
    +          logInfo("Launching container %s for on host 
%s".format(containerId, executorHostname))
    +
    +          // To be safe, remove the container from `releasedContainers`.
    +          releasedContainers.remove(containerId)
    +
    +          val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname)
    +          allocatedHostToContainersMap.synchronized {
    +            val containerSet = 
allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
    +              new HashSet[ContainerId]())
    +
    +            containerSet += containerId
    +            allocatedContainerToHostMap.put(containerId, executorHostname)
    +
    +            if (rack != null) {
    +              allocatedRackCount.put(rack, 
allocatedRackCount.getOrElse(rack, 0) + 1)
    +            }
    +          }
    +          logInfo("Launching ExecutorRunnable. driverUrl: %s,  
executorHostname: %s".format(
    +            driverUrl, executorHostname))
    +          val executorRunnable = new ExecutorRunnable(
    +            container,
    +            conf,
    +            sparkConf,
    +            driverUrl,
    +            executorId,
    +            executorHostname,
    +            executorMemory,
    +            executorCores)
    +          new Thread(executorRunnable).start()
    +        }
    +      }
    +      logDebug("""
    +        Finished allocating %s containers (from %s originally).
    +        Current number of executors running: %d,
    +        Released containers: %s
    +        """.format(
    +          allocatedContainersToProcess,
    +          allocatedContainers,
    +          numExecutorsRunning.get(),
    +          releasedContainers))
    +    }
    +
    +    val completedContainers = 
allocateResponse.getCompletedContainersStatuses()
    +    if (completedContainers.size > 0) {
    +      logDebug("Completed %d containers".format(completedContainers.size))
    +
    +      for (completedContainer <- completedContainers) {
    +        val containerId = completedContainer.getContainerId
    +
    +        if (releasedContainers.containsKey(containerId)) {
    +          // YarnAllocationHandler already marked the container for 
release, so remove it from
    +          // `releasedContainers`.
    +          releasedContainers.remove(containerId)
    +        } else {
    +          // Decrement the number of executors running. The next iteration 
of
    +          // the ApplicationMaster's reporting thread will take care of 
allocating.
    +          numExecutorsRunning.decrementAndGet()
    +          logInfo("Completed container %s (state: %s, exit status: 
%s)".format(
    +            containerId,
    +            completedContainer.getState,
    +            completedContainer.getExitStatus()))
    +          // Hadoop 2.2.X added a ContainerExitStatus we should switch to 
use
    +          // there are some exit status' we shouldn't necessarily count 
against us, but for
    +          // now I think its ok as none of the containers are expected to 
exit
    +          if (completedContainer.getExitStatus() != 0) {
    +            logInfo("Container marked as failed: " + containerId)
    +            numExecutorsFailed.incrementAndGet()
    +          }
    +        }
    +
    +        allocatedHostToContainersMap.synchronized {
    +          if (allocatedContainerToHostMap.containsKey(containerId)) {
    +            val hostOpt = allocatedContainerToHostMap.get(containerId)
    +            assert(hostOpt.isDefined)
    +            val host = hostOpt.get
    +
    +            val containerSetOpt = allocatedHostToContainersMap.get(host)
    +            assert(containerSetOpt.isDefined)
    +            val containerSet = containerSetOpt.get
    +
    +            containerSet.remove(containerId)
    +            if (containerSet.isEmpty) {
    +              allocatedHostToContainersMap.remove(host)
    +            } else {
    +              allocatedHostToContainersMap.update(host, containerSet)
    +            }
    +
    +            allocatedContainerToHostMap.remove(containerId)
    +
    +            // TODO: Move this part outside the synchronized block?
    +            val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
    +            if (rack != null) {
    +              val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
    +              if (rackCount > 0) {
    +                allocatedRackCount.put(rack, rackCount)
    +              } else {
    +                allocatedRackCount.remove(rack)
    +              }
    +            }
    +          }
    +        }
    +      }
    +      logDebug("""
    +        Finished processing %d completed containers.
    +        Current number of executors running: %d,
    +        Released containers: %s
    +        """.format(
    +          completedContainers.size,
    +          numExecutorsRunning.get(),
    +          releasedContainers))
    +    }
    +  }
    +
    +  protected def allocatedContainersOnHost(host: String): Int = {
    +    var retval = 0
    +    allocatedHostToContainersMap.synchronized {
    +      retval = allocatedHostToContainersMap.getOrElse(host, Set()).size
    +    }
    +    retval
    +  }
    +
    +  protected def allocatedContainersOnRack(rack: String): Int = {
    +    var retval = 0
    +    allocatedHostToContainersMap.synchronized {
    +      retval = allocatedRackCount.getOrElse(rack, 0)
    +    }
    +    retval
    +  }
    +
    +  private def isResourceConstraintSatisfied(container: Container): Boolean 
= {
    +    container.getResource.getMemory >= (executorMemory + memoryOverhead)
    +  }
    +
    +  // A simple method to copy the split info map.
    +  private def generateNodeToWeight(
    +      conf: Configuration,
    +      input: collection.Map[String, collection.Set[SplitInfo]]
    +    ): (Map[String, Int], Map[String, Int]) = {
    +
    +    if (input == null) {
    +      return (Map[String, Int](), Map[String, Int]())
    +    }
    +
    +    val hostToCount = new HashMap[String, Int]
    +    val rackToCount = new HashMap[String, Int]
    +
    +    for ((host, splits) <- input) {
    +      val hostCount = hostToCount.getOrElse(host, 0)
    +      hostToCount.put(host, hostCount + splits.size)
    +
    +      val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
    +      if (rack != null) {
    +        val rackCount = rackToCount.getOrElse(host, 0)
    +        rackToCount.put(host, rackCount + splits.size)
    +      }
    +    }
    +
    +    (hostToCount.toMap, rackToCount.toMap)
    +  }
    +
    +  private def internalReleaseContainer(container: Container) = {
    --- End diff --
    
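    This method (`internalReleaseContainer`) is never called? The release sites in
    `allocateResources` above are bare `(container)` expressions rather than calls to it.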


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to