Ngone51 commented on a change in pull request #31496:
URL: https://github.com/apache/spark/pull/31496#discussion_r571388509



##########
File path: 
core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
##########
@@ -17,71 +17,67 @@
 
 package org.apache.spark.resource
 
-import java.util.{Map => JMap}
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 
+import org.apache.spark.SparkException
 import org.apache.spark.annotation.{Evolving, Since}
+import org.apache.spark.util.Utils
 
 
 /**
- * Resource profile builder to build a Resource profile to associate with an 
RDD.
- * A ResourceProfile allows the user to specify executor and task requirements 
for an RDD
- * that will get applied during a stage. This allows the user to change the 
resource
+ * Resource profile builder to build a [[ResourceProfile]] to associate with 
an RDD.
+ * A [[ResourceProfile]] allows the user to specify executor and task resource 
requirements
+ * for an RDD that will get applied during a stage. This allows the user to 
change the resource
  * requirements between stages.
  *
  */
 @Evolving
 @Since("3.1.0")
 class ResourceProfileBuilder() {
 
+  // Task resource requests that specified by users, mapped from resource name 
to the request.
   private val _taskResources = new ConcurrentHashMap[String, 
TaskResourceRequest]()
+  // Executor resource requests that specified by users, mapped from resource 
name to the request.
   private val _executorResources = new ConcurrentHashMap[String, 
ExecutorResourceRequest]()
 
-  def taskResources: Map[String, TaskResourceRequest] = 
_taskResources.asScala.toMap
-  def executorResources: Map[String, ExecutorResourceRequest] = 
_executorResources.asScala.toMap
-
-  /**
-   * (Java-specific) gets a Java Map of resources to TaskResourceRequest
-   */
-  def taskResourcesJMap: JMap[String, TaskResourceRequest] = 
_taskResources.asScala.asJava

Review comment:
       These resources should be available after we get the `ResourceProfile` 
after `build()`, and it's more reasonable to get them from `ResourceProfile` 
instead of `ResourceProfileBuilder`.

##########
File path: 
core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
##########
@@ -132,6 +144,14 @@ class ExecutorResourceRequests() extends Serializable {
     this
   }
 
+  /**
+   * Add a certain [[ExecutorResourceRequest]] to the request set.
+   */
+  def addRequest(treq: ExecutorResourceRequest): this.type = {

Review comment:
       Add this as same to what `TaskResourceRequest` does. 

##########
File path: 
core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
##########
@@ -17,71 +17,67 @@
 
 package org.apache.spark.resource
 
-import java.util.{Map => JMap}
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 
+import org.apache.spark.SparkException
 import org.apache.spark.annotation.{Evolving, Since}
+import org.apache.spark.util.Utils
 
 
 /**
- * Resource profile builder to build a Resource profile to associate with an 
RDD.
- * A ResourceProfile allows the user to specify executor and task requirements 
for an RDD
- * that will get applied during a stage. This allows the user to change the 
resource
+ * Resource profile builder to build a [[ResourceProfile]] to associate with 
an RDD.
+ * A [[ResourceProfile]] allows the user to specify executor and task resource 
requirements
+ * for an RDD that will get applied during a stage. This allows the user to 
change the resource
  * requirements between stages.
  *
  */
 @Evolving
 @Since("3.1.0")
 class ResourceProfileBuilder() {
 
+  // Task resource requests that specified by users, mapped from resource name 
to the request.
   private val _taskResources = new ConcurrentHashMap[String, 
TaskResourceRequest]()
+  // Executor resource requests that specified by users, mapped from resource 
name to the request.
   private val _executorResources = new ConcurrentHashMap[String, 
ExecutorResourceRequest]()
 
-  def taskResources: Map[String, TaskResourceRequest] = 
_taskResources.asScala.toMap
-  def executorResources: Map[String, ExecutorResourceRequest] = 
_executorResources.asScala.toMap
-
-  /**
-   * (Java-specific) gets a Java Map of resources to TaskResourceRequest
-   */
-  def taskResourcesJMap: JMap[String, TaskResourceRequest] = 
_taskResources.asScala.asJava
-
   /**
-   * (Java-specific) gets a Java Map of resources to ExecutorResourceRequest
+   * Add executor resource requests
+   * @param requests The detailed executor resource requests, see 
[[ExecutorResourceRequests]]
+   * @return this.type
    */
-  def executorResourcesJMap: JMap[String, ExecutorResourceRequest] = {
-    _executorResources.asScala.asJava
-  }
-
-  def require(requests: ExecutorResourceRequests): this.type = {
+  def executorRequire(requests: ExecutorResourceRequests): this.type = {
     _executorResources.putAll(requests.requests.asJava)
     this
   }
 
-  def require(requests: TaskResourceRequests): this.type = {
+  /**
+   * Add task resource requests
+   * @param requests The detailed task resource requests, see 
[[TaskResourceRequest]]
+   * @return this.type
+   */
+  def taskRequire(requests: TaskResourceRequests): this.type = {
     _taskResources.putAll(requests.requests.asJava)
     this
   }
 
-  def clearExecutorResourceRequests(): this.type = {
-    _executorResources.clear()
-    this
-  }
-
-  def clearTaskResourceRequests(): this.type = {
-    _taskResources.clear()
-    this
-  }

Review comment:
       These 2 are not used anywhere. And I think it's useless for a `builder`.

##########
File path: core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
##########
@@ -25,7 +25,7 @@ import org.apache.spark.SparkException
  * Trait used to help executor/worker allocate resources.
  * Please note that this is intended to be used in a single thread.
  */
-trait ResourceAllocator {
+private[spark] trait ResourceAllocator {

Review comment:
       This's probably mistakenly exposed in 3.0.

##########
File path: 
core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
##########
@@ -17,71 +17,67 @@
 
 package org.apache.spark.resource
 
-import java.util.{Map => JMap}
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 
+import org.apache.spark.SparkException
 import org.apache.spark.annotation.{Evolving, Since}
+import org.apache.spark.util.Utils
 
 
 /**
- * Resource profile builder to build a Resource profile to associate with an 
RDD.
- * A ResourceProfile allows the user to specify executor and task requirements 
for an RDD
- * that will get applied during a stage. This allows the user to change the 
resource
+ * Resource profile builder to build a [[ResourceProfile]] to associate with 
an RDD.
+ * A [[ResourceProfile]] allows the user to specify executor and task resource 
requirements
+ * for an RDD that will get applied during a stage. This allows the user to 
change the resource
  * requirements between stages.
  *
  */
 @Evolving
 @Since("3.1.0")
 class ResourceProfileBuilder() {
 
+  // Task resource requests that specified by users, mapped from resource name 
to the request.
   private val _taskResources = new ConcurrentHashMap[String, 
TaskResourceRequest]()
+  // Executor resource requests that specified by users, mapped from resource 
name to the request.
   private val _executorResources = new ConcurrentHashMap[String, 
ExecutorResourceRequest]()
 
-  def taskResources: Map[String, TaskResourceRequest] = 
_taskResources.asScala.toMap
-  def executorResources: Map[String, ExecutorResourceRequest] = 
_executorResources.asScala.toMap
-
-  /**
-   * (Java-specific) gets a Java Map of resources to TaskResourceRequest
-   */
-  def taskResourcesJMap: JMap[String, TaskResourceRequest] = 
_taskResources.asScala.asJava
-
   /**
-   * (Java-specific) gets a Java Map of resources to ExecutorResourceRequest
+   * Add executor resource requests
+   * @param requests The detailed executor resource requests, see 
[[ExecutorResourceRequests]]
+   * @return this.type
    */
-  def executorResourcesJMap: JMap[String, ExecutorResourceRequest] = {
-    _executorResources.asScala.asJava
-  }
-
-  def require(requests: ExecutorResourceRequests): this.type = {
+  def executorRequire(requests: ExecutorResourceRequests): this.type = {
     _executorResources.putAll(requests.requests.asJava)
     this
   }
 
-  def require(requests: TaskResourceRequests): this.type = {
+  /**
+   * Add task resource requests
+   * @param requests The detailed task resource requests, see 
[[TaskResourceRequest]]
+   * @return this.type
+   */
+  def taskRequire(requests: TaskResourceRequests): this.type = {

Review comment:
       Renamed `require` to `taskRequire` and `executorRequire` separately. So 
users can understand usage better from the naming.

##########
File path: 
core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
##########
@@ -132,6 +144,14 @@ class ExecutorResourceRequests() extends Serializable {
     this
   }
 
+  /**
+   * Add a certain [[ExecutorResourceRequest]] to the request set.
+   */
+  def addRequest(treq: ExecutorResourceRequest): this.type = {

Review comment:
       Actually, I'm thinking do we really need to expose  
`TaskResourceRequest` as a public API? As users can always add requests from 
the above convenient APIs. cc @tgravescs 

##########
File path: core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
##########
@@ -242,20 +251,40 @@ class ResourceProfile(
 
 object ResourceProfile extends Logging {
   // task resources
+  /**
+   * built-in task resource: cpus
+   */
   val CPUS = "cpus"
   // Executor resources
   // Make sure add new executor resource in below allSupportedExecutorResources
+  /**
+   * built-in executor resource: cores
+   */
   val CORES = "cores"
+  /**
+   * built-in executor resource: cores
+   */
   val MEMORY = "memory"
+  /**
+   * built-in executor resource: offHeap
+   */
   val OFFHEAP_MEM = "offHeap"
+  /**
+   * built-in executor resource: memoryOverhead
+   */
   val OVERHEAD_MEM = "memoryOverhead"
+  /**
+   * built-in executor resource: pyspark.memory
+   */
   val PYSPARK_MEM = "pyspark.memory"
 
-  // all supported spark executor resources (minus the custom resources like 
GPUs/FPGAs)
-  val allSupportedExecutorResources = Seq(CORES, MEMORY, OVERHEAD_MEM, 
PYSPARK_MEM, OFFHEAP_MEM)
+  /**
+   * All supported Spark built-in executor resources, custom resources like 
GPUs/FPGAs are excluded.
+   */
+  val allSupportedExecutorResources = Array(CORES, MEMORY, OVERHEAD_MEM, 
PYSPARK_MEM, OFFHEAP_MEM)
 
-  val UNKNOWN_RESOURCE_PROFILE_ID = -1
-  val DEFAULT_RESOURCE_PROFILE_ID = 0
+  private[spark] val UNKNOWN_RESOURCE_PROFILE_ID = -1
+  private[spark] val DEFAULT_RESOURCE_PROFILE_ID = 0

Review comment:
       No need to expose these two, right? I'm also thinking about whether it's 
necessary to expose the method `ResourceProfile.id`. cc @tgravescs 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to