kiszk commented on a change in pull request #26682: [SPARK-29306][CORE] Stage 
Level Sched: Executors need to track what ResourceProfile they are created with 
URL: https://github.com/apache/spark/pull/26682#discussion_r365902407
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
 ##########
 @@ -19,129 +19,164 @@ package org.apache.spark.resource
 
 import java.util.{Map => JMap}
 import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
+import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
-import org.apache.spark.resource.ResourceUtils.RESOURCE_PREFIX
+import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
 
 /**
- * Resource profile to associate with an RDD. A ResourceProfile allows the 
user to
- * specify executor and task requirements for an RDD that will get applied 
during a
- * stage. This allows the user to change the resource requirements between 
stages.
- *
- * This class is private now for initial development, once we have the feature 
in place
- * this will become public.
+ * Resource profile to associate with an RDD.
+ * A ResourceProfile allows the user to specify executor and task requirements 
for an RDD
+ * that will get applied during a stage. This allows the user to change the 
resource
+ * requirements between stages.
+ * This is meant to be immutable so the user can't change it after building.
  */
 @Evolving
-private[spark] class ResourceProfile() extends Serializable {
+class ResourceProfile(
+    val executorResources: Map[String, ExecutorResourceRequest],
+    val taskResources: Map[String, TaskResourceRequest]) extends Serializable 
with Logging {
 
-  private val _id = ResourceProfile.getNextProfileId
-  private val _taskResources = new mutable.HashMap[String, 
TaskResourceRequest]()
-  private val _executorResources = new mutable.HashMap[String, 
ExecutorResourceRequest]()
+  // _id is only a var for testing purposes
+  private var _id = ResourceProfile.getNextProfileId
 
   def id: Int = _id
-  def taskResources: Map[String, TaskResourceRequest] = _taskResources.toMap
-  def executorResources: Map[String, ExecutorResourceRequest] = 
_executorResources.toMap
 
   /**
    * (Java-specific) gets a Java Map of resources to TaskResourceRequest
    */
-  def taskResourcesJMap: JMap[String, TaskResourceRequest] = 
_taskResources.asJava
+  def taskResourcesJMap: JMap[String, TaskResourceRequest] = 
taskResources.asJava
 
   /**
    * (Java-specific) gets a Java Map of resources to ExecutorResourceRequest
    */
-  def executorResourcesJMap: JMap[String, ExecutorResourceRequest] = 
_executorResources.asJava
+  def executorResourcesJMap: JMap[String, ExecutorResourceRequest] = {
+    executorResources.asJava
+  }
+
+  // Note that some cluster managers don't set the executor cores explicitly so
+  // be sure to check the Option as required
+ private[spark] def getExecutorCores: Option[Int] = {
 
 Review comment:
   nit: indent issue — this `private[spark] def` line is indented by one space, while the surrounding members use two.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to