tgravescs commented on a change in pull request #26682: [SPARK-29306][CORE] Stage Level Sched: Executors need to track what ResourceProfile they are created with URL: https://github.com/apache/spark/pull/26682#discussion_r367452689
########## File path: core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala ########## @@ -18,130 +18,164 @@ package org.apache.spark.resource import java.util.{Map => JMap} -import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} +import java.util.concurrent.atomic.AtomicInteger +import javax.annotation.concurrent.GuardedBy import scala.collection.JavaConverters._ -import scala.collection.mutable import org.apache.spark.SparkConf import org.apache.spark.annotation.Evolving import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ -import org.apache.spark.resource.ResourceUtils.RESOURCE_PREFIX +import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY /** * Resource profile to associate with an RDD. A ResourceProfile allows the user to * specify executor and task requirements for an RDD that will get applied during a * stage. This allows the user to change the resource requirements between stages. - * - * This class is private now for initial development, once we have the feature in place - * this will become public. + * This is meant to be immutable so user can't change it after building. 
*/ @Evolving -private[spark] class ResourceProfile() extends Serializable { +class ResourceProfile( + val executorResources: Map[String, ExecutorResourceRequest], + val taskResources: Map[String, TaskResourceRequest]) extends Serializable with Logging { - private val _id = ResourceProfile.getNextProfileId - private val _taskResources = new mutable.HashMap[String, TaskResourceRequest]() - private val _executorResources = new mutable.HashMap[String, ExecutorResourceRequest]() + // _id is only a var for testing purposes + private var _id = ResourceProfile.getNextProfileId def id: Int = _id - def taskResources: Map[String, TaskResourceRequest] = _taskResources.toMap - def executorResources: Map[String, ExecutorResourceRequest] = _executorResources.toMap /** * (Java-specific) gets a Java Map of resources to TaskResourceRequest */ - def taskResourcesJMap: JMap[String, TaskResourceRequest] = _taskResources.asJava + def taskResourcesJMap: JMap[String, TaskResourceRequest] = taskResources.asJava /** * (Java-specific) gets a Java Map of resources to ExecutorResourceRequest */ - def executorResourcesJMap: JMap[String, ExecutorResourceRequest] = _executorResources.asJava + def executorResourcesJMap: JMap[String, ExecutorResourceRequest] = { + executorResources.asJava + } + + // Note that some cluster managers don't set the executor cores explicitly so + // be sure to check the Option as required + private[spark] def getExecutorCores: Option[Int] = { + executorResources.get(ResourceProfile.CORES).map(_.amount.toInt) + } - def reset(): Unit = { - _taskResources.clear() - _executorResources.clear() + private[spark] def getTaskCpus: Option[Int] = { + taskResources.get(ResourceProfile.CPUS).map(_.amount.toInt) } - def require(requests: ExecutorResourceRequests): this.type = { - _executorResources ++= requests.requests - this + // testing only + private[spark] def setToDefaultProfile(): Unit = { + _id = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID } - def require(requests: 
TaskResourceRequests): this.type = { - _taskResources ++= requests.requests - this + override def equals(obj: Any): Boolean = { + obj match { + case that: ResourceProfile => + that.getClass == this.getClass && that.id == _id && + that.taskResources == taskResources && that.executorResources == executorResources + case _ => + false + } } + override def hashCode(): Int = Seq(taskResources, executorResources).hashCode() + override def toString(): String = { - s"Profile: id = ${_id}, executor resources: ${_executorResources}, " + - s"task resources: ${_taskResources}" + s"Profile: id = ${_id}, executor resources: ${executorResources.mkString(",")}, " + + s"task resources: ${taskResources.mkString(",")}" } } -private[spark] object ResourceProfile extends Logging { - val UNKNOWN_RESOURCE_PROFILE_ID = -1 - val DEFAULT_RESOURCE_PROFILE_ID = 0 - +object ResourceProfile extends Logging { + // task resources val CPUS = "cpus" + // Executor resources val CORES = "cores" val MEMORY = "memory" val OVERHEAD_MEM = "memoryOverhead" val PYSPARK_MEM = "pyspark.memory" + // all supported spark executor resources (minus the custom resources like GPUs/FPGAs) + val allSupportedExecutorResources = Seq(CORES, MEMORY, OVERHEAD_MEM, PYSPARK_MEM) + + val UNKNOWN_RESOURCE_PROFILE_ID = -1 + val DEFAULT_RESOURCE_PROFILE_ID = 0 Review comment: Note that UNKNOWN_RESOURCE_PROFILE_ID is used in other places in the code to really mean that we don't know what the profile is. For instance, when we get messages out of order for executor tracking - like a task start arriving before the executor added message - we mark the profile as unknown until we get the executor added message. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org