tgravescs commented on a change in pull request #26682: [SPARK-29306][CORE] Stage Level Sched: Executors need to track what ResourceProfile they are created with
URL: https://github.com/apache/spark/pull/26682#discussion_r364761891
##########
File path: core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
##########
@@ -27,121 +27,238 @@ import org.apache.spark.SparkConf
import org.apache.spark.annotation.Evolving
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
-import org.apache.spark.resource.ResourceUtils.RESOURCE_PREFIX
+import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
/**
- * Resource profile to associate with an RDD. A ResourceProfile allows the user to
- * specify executor and task requirements for an RDD that will get applied during a
- * stage. This allows the user to change the resource requirements between stages.
- *
- * This class is private now for initial development, once we have the feature in place
- * this will become public.
+ * Resource profile to associate with an RDD.
+ * A ResourceProfile allows the user to specify executor and task requirements for an RDD
+ * that will get applied during a stage. This allows the user to change the resource
+ * requirements between stages.
+ * This is meant to be immutable so the user can't change it after building.
*/
@Evolving
-private[spark] class ResourceProfile() extends Serializable {
+class ResourceProfile(
+ val executorResources: Map[String, ExecutorResourceRequest],
+ val taskResources: Map[String, TaskResourceRequest]) extends Serializable with Logging {
- private val _id = ResourceProfile.getNextProfileId
- private val _taskResources = new mutable.HashMap[String, TaskResourceRequest]()
- private val _executorResources = new mutable.HashMap[String, ExecutorResourceRequest]()
+ // _id is only a var for testing purposes
+ private var _id = ResourceProfile.getNextProfileId
def id: Int = _id
- def taskResources: Map[String, TaskResourceRequest] = _taskResources.toMap
- def executorResources: Map[String, ExecutorResourceRequest] = _executorResources.toMap
/**
* (Java-specific) gets a Java Map of resources to TaskResourceRequest
*/
- def taskResourcesJMap: JMap[String, TaskResourceRequest] = _taskResources.asJava
+ def taskResourcesJMap: JMap[String, TaskResourceRequest] = taskResources.asJava
/**
* (Java-specific) gets a Java Map of resources to ExecutorResourceRequest
*/
- def executorResourcesJMap: JMap[String, ExecutorResourceRequest] = _executorResources.asJava
+ def executorResourcesJMap: JMap[String, ExecutorResourceRequest] = {
+ executorResources.asJava
+ }
+
+ // Note that some cluster managers don't set the executor cores explicitly so
+ // be sure to check the Option as required
+ private[spark] def getExecutorCores: Option[Int] = {
+ executorResources.get(ResourceProfile.CORES).map(_.amount.toInt)
+ }
- def reset(): Unit = {
- _taskResources.clear()
- _executorResources.clear()
+ private[spark] def getTaskCpus: Option[Int] = {
+ taskResources.get(ResourceProfile.CPUS).map(_.amount.toInt)
}
- def require(requests: ExecutorResourceRequests): this.type = {
- _executorResources ++= requests.requests
- this
+ // testing only
+ private[spark] def setToDefaultProfile(): Unit = {
+ _id = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
}
- def require(requests: TaskResourceRequests): this.type = {
- _taskResources ++= requests.requests
- this
+ override def equals(obj: Any): Boolean = {
+ obj match {
+ case that: ResourceProfile =>
+ that.getClass == this.getClass && that.id == _id &&
+ that.taskResources == taskResources && that.executorResources == executorResources
+ case _ =>
+ false
+ }
}
+ override def hashCode(): Int = Seq(taskResources, executorResources).hashCode()
+
override def toString(): String = {
- s"Profile: id = ${_id}, executor resources: ${_executorResources}, " +
- s"task resources: ${_taskResources}"
+ s"Profile: id = ${_id}, executor resources:
${executorResources.mkString(",")}, " +
+ s"task resources: ${taskResources.mkString(",")}"
}
}
-private[spark] object ResourceProfile extends Logging {
- val UNKNOWN_RESOURCE_PROFILE_ID = -1
- val DEFAULT_RESOURCE_PROFILE_ID = 0
+object ResourceProfile extends Logging {
+ // task resources
val CPUS = "cpus"
+ // Executor resources
val CORES = "cores"
val MEMORY = "memory"
val OVERHEAD_MEM = "memoryOverhead"
val PYSPARK_MEM = "pyspark.memory"
+ // all supported spark executor resources (minus the custom resources like GPUs/FPGAs)
+ val allSupportedExecutorResources = Seq(CORES, MEMORY, OVERHEAD_MEM, PYSPARK_MEM)
+
+ val UNKNOWN_RESOURCE_PROFILE_ID = -1
+ val DEFAULT_RESOURCE_PROFILE_ID = 0
+
+ private[spark] val SPARK_RP_EXEC_PREFIX = "spark.resourceProfile.executor"
+
+ private[spark] def resourceProfileIntConfPrefix(rpId: Int): String = {
+ s"$SPARK_RP_EXEC_PREFIX.$rpId."
+ }
+
+ private[spark] def resourceProfileCustomResourceIntConfPrefix(rpId: Int): String = {
+ s"${resourceProfileIntConfPrefix(rpId)}resource."
+ }
+
+ // Helper class for constructing the resource profile internal configs used to pass to
+ // executors. The configs look like:
+ // spark.resourceProfile.executor.[rpId].[resourceName].[amount, vendor, discoveryScript]
+ // Note the prefix is passed in because custom resource configs have an extra "resource."
+ // in the name to differentiate them from the standard spark configs.
+ private[spark] case class ResourceProfileInternalConf(prefix: String, resourceName: String) {
+ def resourceNameConf: String = s"$prefix$resourceName"
+ def resourceNameAndAmount: String = s"$resourceName.${ResourceUtils.AMOUNT}"
+ def resourceNameAndDiscovery: String = s"$resourceName.${ResourceUtils.DISCOVERY_SCRIPT}"
+ def resourceNameAndVendor: String = s"$resourceName.${ResourceUtils.VENDOR}"
+
+ def amountConf: String = s"$prefix$resourceNameAndAmount"
+ def discoveryScriptConf: String = s"$prefix$resourceNameAndDiscovery"
+ def vendorConf: String = s"$prefix$resourceNameAndVendor"
+ }
+
private lazy val nextProfileId = new AtomicInteger(0)
// The default resource profile uses the application level configs.
- // Create the default profile immediately to get ID 0, its initialized later when fetched.
- private val defaultProfileRef: AtomicReference[ResourceProfile] =
- new AtomicReference[ResourceProfile](new ResourceProfile())
+ // var so that it can be reset for testing purposes.
+ private var defaultProfileRef: AtomicReference[ResourceProfile] =
+ new AtomicReference[ResourceProfile]()
- assert(defaultProfileRef.get().id == DEFAULT_RESOURCE_PROFILE_ID,
- s"Default Profile must have the default profile id:
$DEFAULT_RESOURCE_PROFILE_ID")
+ private[spark] def getNextProfileId: Int = nextProfileId.getAndIncrement()
- def getNextProfileId: Int = nextProfileId.getAndIncrement()
-
- def getOrCreateDefaultProfile(conf: SparkConf): ResourceProfile = {
+ private[spark] def getOrCreateDefaultProfile(conf: SparkConf): ResourceProfile = {
val defaultProf = defaultProfileRef.get()
// check to see if the default profile was initialized yet
- if (defaultProf.executorResources == Map.empty) {
- synchronized {
- val prof = defaultProfileRef.get()
- if (prof.executorResources == Map.empty) {
- addDefaultTaskResources(prof, conf)
- addDefaultExecutorResources(prof, conf)
- }
- prof
+ if (defaultProf == null) {
+ val prof = synchronized {
+ val taskResources = getDefaultTaskResources(conf)
+ val executorResources = getDefaultExecutorResources(conf)
+ val defProf = new ResourceProfile(executorResources, taskResources)
+ logInfo("Default ResourceProfile created, executor resources: " +
+ s"${defProf.executorResources}, task resources: " +
+ s"${defProf.taskResources}")
+ defProf.setToDefaultProfile
+ defProf
}
+ prof
} else {
defaultProf
}
}
- private def addDefaultTaskResources(rprof: ResourceProfile, conf: SparkConf): Unit = {
+ private def getDefaultTaskResources(conf: SparkConf): Map[String, TaskResourceRequest] = {
val cpusPerTask = conf.get(CPUS_PER_TASK)
val treqs = new TaskResourceRequests().cpus(cpusPerTask)
- val taskReq = ResourceUtils.parseResourceRequirements(conf, SPARK_TASK_PREFIX)
- taskReq.foreach { req =>
- val name = s"${RESOURCE_PREFIX}.${req.resourceName}"
- treqs.resource(name, req.amount)
- }
- rprof.require(treqs)
+ ResourceUtils.addTaskResourceRequests(conf, treqs)
+ treqs.requests
}
- private def addDefaultExecutorResources(rprof: ResourceProfile, conf: SparkConf): Unit = {
+ private def getDefaultExecutorResources(conf: SparkConf): Map[String, ExecutorResourceRequest] = {
val ereqs = new ExecutorResourceRequests()
ereqs.cores(conf.get(EXECUTOR_CORES))
ereqs.memory(conf.get(EXECUTOR_MEMORY).toString)
+ conf.get(EXECUTOR_MEMORY_OVERHEAD).map(mem => ereqs.memoryOverhead(mem.toString))
+ conf.get(PYSPARK_EXECUTOR_MEMORY).map(mem => ereqs.pysparkMemory(mem.toString))
val execReq = ResourceUtils.parseAllResourceRequests(conf, SPARK_EXECUTOR_PREFIX)
execReq.foreach { req =>
- val name = s"${RESOURCE_PREFIX}.${req.id.resourceName}"
+ val name = req.id.resourceName
ereqs.resource(name, req.amount, req.discoveryScript.getOrElse(""),
req.vendor.getOrElse(""))
}
- rprof.require(ereqs)
+ ereqs.requests
}
- // for testing purposes
- def resetDefaultProfile(conf: SparkConf): Unit = getOrCreateDefaultProfile(conf).reset()
+ // for testing only
+ private[spark] def reInitDefaultProfile(conf: SparkConf): Unit = {
+ clearDefaultProfile
+ val prof = getOrCreateDefaultProfile(conf)
Review comment:
will remove and comment
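
Roughly what I have in mind (a sketch only, assuming `clearDefaultProfile` stays as it is in this diff):

```scala
// for testing only: rebuild the default profile from the given conf
private[spark] def reInitDefaultProfile(conf: SparkConf): Unit = {
  clearDefaultProfile()
  // recreate immediately; the returned profile is intentionally not kept
  getOrCreateDefaultProfile(conf)
}
```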
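For context on the now-public constructor in this file, a minimal usage sketch (names taken from this diff; `ExecutorResourceRequests`/`TaskResourceRequests` are the builder classes defined elsewhere in this PR):

```scala
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, TaskResourceRequests}

// build up executor and task requirements, then snapshot them into an immutable profile
val ereqs = new ExecutorResourceRequests()
ereqs.cores(4)
ereqs.memory("4g")
val treqs = new TaskResourceRequests().cpus(1)
val rp = new ResourceProfile(ereqs.requests, treqs.requests)
```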
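And to illustrate the internal conf naming `ResourceProfileInternalConf` produces (hypothetical values: profile id 1 and a custom "gpu" resource; the literal suffixes come from the `ResourceUtils` constants):

```scala
val prefix = ResourceProfile.resourceProfileCustomResourceIntConfPrefix(1)
val conf = ResourceProfile.ResourceProfileInternalConf(prefix, "gpu")
conf.amountConf          // spark.resourceProfile.executor.1.resource.gpu.amount
conf.discoveryScriptConf // spark.resourceProfile.executor.1.resource.gpu.discoveryScript
conf.vendorConf          // spark.resourceProfile.executor.1.resource.gpu.vendor
```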