This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 02be408  [SPARK-34384][CORE] Add missing docs for ResourceProfile APIs
02be408 is described below

commit 02be4088b7b58a1fd270b212f3d97cea8561d173
Author: yi.wu <yi...@databricks.com>
AuthorDate: Sun Feb 21 18:29:44 2021 +0900

    [SPARK-34384][CORE] Add missing docs for ResourceProfile APIs

    ### What changes were proposed in this pull request?

    This PR adds missing docs for ResourceProfile related APIs. Besides, it
    includes a few minor changes to the API:

    * ResourceProfileBuilder.build -> ResourceProfileBuilder.build()
    * Provides the Java-specific API `allSupportedExecutorResourcesJList`
    * Makes `ResourceAllocator` private since it was mistakenly exposed previously

    ### Why are the changes needed?

    Adds missing API docs.

    ### Does this PR introduce _any_ user-facing change?

    No, as Apache Spark 3.1 hasn't been officially released.

    ### How was this patch tested?

    Updated unit tests due to the signature change of `build()`.

    Closes #31496 from Ngone51/resource-profile-api-cleanup.

    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
    (cherry picked from commit 546d2eb5d46813a14c7bd30113fb6bb038cdd2fc)
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 .../spark/resource/ExecutorResourceRequest.scala   |  4 +--
 .../spark/resource/ExecutorResourceRequests.scala  | 12 +++++++
 .../apache/spark/resource/ResourceAllocator.scala  |  2 +-
 .../apache/spark/resource/ResourceProfile.scala    | 37 ++++++++++++++++++++--
 .../spark/resource/ResourceProfileBuilder.scala    | 20 +++++++++---
 .../spark/resource/TaskResourceRequest.scala       | 10 ++++--
 .../spark/resource/TaskResourceRequests.scala      | 13 +++++++-
 .../features/BasicExecutorFeatureStepSuite.scala   |  4 +--
 .../cluster/k8s/ExecutorPodsAllocatorSuite.scala   |  2 +-
 9 files changed, 88 insertions(+), 16 deletions(-)
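For illustration, a minimal sketch (not part of the patch) of how the documented
builder API fits together after this change; the commented-out RDD handle `rdd`
is assumed to exist:

    import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

    // Executor-side and task-side requests are accumulated separately,
    // then combined into an immutable ResourceProfile.
    val ereq = new ExecutorResourceRequests().cores(4).memory("2g")
    val treq = new TaskResourceRequests().cpus(2)

    val rp = new ResourceProfileBuilder()
      .require(ereq)
      .require(treq)
      .build()  // note: now a method call, `build()` rather than `build`

    // The profile is then applied at the stage level, e.g.:
    // rdd.withResources(rp)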
diff --git a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala
index 3e3db7e..76af41a 100644
--- a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala
@@ -20,7 +20,7 @@ package org.apache.spark.resource
 import org.apache.spark.annotation.{Evolving, Since}
 
 /**
- * An Executor resource request. This is used in conjunction with the ResourceProfile to
+ * An Executor resource request. This is used in conjunction with the [[ResourceProfile]] to
  * programmatically specify the resources needed for an RDD that will be applied at the
  * stage level.
  *
@@ -39,7 +39,7 @@ import org.apache.spark.annotation.{Evolving, Since}
  *
  * See the configuration and cluster specific docs for more details.
  *
- * Use ExecutorResourceRequests class as a convenience API.
+ * Use [[ExecutorResourceRequests]] class as a convenience API.
  *
  * @param resourceName Name of the resource
  * @param amount Amount requesting
diff --git a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
index 654afa0..b6992f4 100644
--- a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
@@ -37,12 +37,19 @@ class ExecutorResourceRequests() extends Serializable {
 
   private val _executorResources = new ConcurrentHashMap[String, ExecutorResourceRequest]()
 
+  /**
+   * Returns all the resource requests for the executor.
+   */
   def requests: Map[String, ExecutorResourceRequest] = _executorResources.asScala.toMap
 
+  /**
+   * (Java-specific) Returns all the resource requests for the executor.
+   */
   def requestsJMap: JMap[String, ExecutorResourceRequest] = requests.asJava
 
   /**
    * Specify heap memory. The value specified will be converted to MiB.
+   * This is a convenient API to add [[ExecutorResourceRequest]] for "memory" resource.
    *
    * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
    *               Default unit is MiB if not specified.
@@ -57,6 +64,7 @@ class ExecutorResourceRequests() extends Serializable {
   /**
    * Specify off heap memory. The value specified will be converted to MiB.
    * This value only take effect when MEMORY_OFFHEAP_ENABLED is true.
+   * This is a convenient API to add [[ExecutorResourceRequest]] for "offHeap" resource.
    *
    * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
    *               Default unit is MiB if not specified.
@@ -70,6 +78,7 @@ class ExecutorResourceRequests() extends Serializable {
 
   /**
    * Specify overhead memory. The value specified will be converted to MiB.
+   * This is a convenient API to add [[ExecutorResourceRequest]] for "memoryOverhead" resource.
    *
    * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
    *               Default unit is MiB if not specified.
@@ -83,6 +92,7 @@ class ExecutorResourceRequests() extends Serializable {
 
   /**
    * Specify pyspark memory. The value specified will be converted to MiB.
+   * This is a convenient API to add [[ExecutorResourceRequest]] for "pyspark.memory" resource.
    *
    * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
    *               Default unit is MiB if not specified.
@@ -96,6 +106,7 @@ class ExecutorResourceRequests() extends Serializable {
 
   /**
    * Specify number of cores per Executor.
+   * This is a convenient API to add [[ExecutorResourceRequest]] for "cores" resource.
    *
    * @param amount Number of cores to allocate per Executor.
    */
@@ -111,6 +122,7 @@ class ExecutorResourceRequests() extends Serializable {
    * like GPUs are gpu (spark configs spark.executor.resource.gpu.*). If you pass in a resource
    * that the cluster manager doesn't support the result is undefined, it may error or may just
    * be ignored.
+   * This is a convenient API to add [[ExecutorResourceRequest]] for custom resources.
    *
    * @param resourceName Name of the resource.
    * @param amount amount of that resource per executor to use.
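As a rough sketch of what the convenience methods documented above do (the
discovery-script path here is illustrative only), each call registers one
ExecutorResourceRequest under its built-in resource name:

    import org.apache.spark.resource.ExecutorResourceRequests

    val ereq = new ExecutorResourceRequests()
      .memory("2g")          // "memory" (heap), converted to MiB
      .memoryOverhead("1g")  // "memoryOverhead"
      .offHeapMemory("1g")   // "offHeap"; only takes effect when spark.memory.offHeap.enabled=true
      .resource("gpu", 2, discoveryScript = "/opt/getGpusResources.sh")  // custom resource

    // The accumulated requests, keyed by resource name:
    ereq.requests.foreach { case (name, req) => println(s"$name -> ${req.amount}") }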
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
index 22d10a9..7605e8c 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
@@ -25,7 +25,7 @@ import org.apache.spark.SparkException
  * Trait used to help executor/worker allocate resources.
  * Please note that this is intended to be used in a single thread.
  */
-trait ResourceAllocator {
+private[spark] trait ResourceAllocator {
 
   protected def resourceName: String
   protected def resourceAddresses: Seq[String]
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index ac7e8e8..1ebd8bd 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -35,7 +35,13 @@ import org.apache.spark.util.Utils
  * Resource profile to associate with an RDD. A ResourceProfile allows the user to
  * specify executor and task requirements for an RDD that will get applied during a
  * stage. This allows the user to change the resource requirements between stages.
- * This is meant to be immutable so user can't change it after building.
+ * This is meant to be immutable so user can't change it after building. Users
+ * should use [[ResourceProfileBuilder]] to build it.
+ *
+ * @param executorResources Resource requests for executors. Mapped from the resource
+ *                          name (e.g., cores, memory, CPU) to its specific request.
+ * @param taskResources Resource requests for tasks. Mapped from the resource
+ *                      name (e.g., cores, memory, CPU) to its specific request.
  */
 @Evolving
 @Since("3.1.0")
@@ -53,6 +59,9 @@ class ResourceProfile(
   private var _maxTasksPerExecutor: Option[Int] = None
   private var _coresLimitKnown: Boolean = false
 
+  /**
+   * A unique id of this ResourceProfile.
+   */
   def id: Int = _id
 
   /**
@@ -242,17 +251,39 @@ class ResourceProfile(
 object ResourceProfile extends Logging {
 
   // task resources
+  /**
+   * built-in task resource: cpus
+   */
   val CPUS = "cpus"
   // Executor resources
   // Make sure add new executor resource in below allSupportedExecutorResources
+  /**
+   * built-in executor resource: cores
+   */
   val CORES = "cores"
+  /**
+   * built-in executor resource: memory
+   */
   val MEMORY = "memory"
+  /**
+   * built-in executor resource: offHeap
+   */
   val OFFHEAP_MEM = "offHeap"
+  /**
+   * built-in executor resource: memoryOverhead
+   */
   val OVERHEAD_MEM = "memoryOverhead"
+  /**
+   * built-in executor resource: pyspark.memory
+   */
   val PYSPARK_MEM = "pyspark.memory"
-  // all supported spark executor resources (minus the custom resources like GPUs/FPGAs)
-  val allSupportedExecutorResources = Seq(CORES, MEMORY, OVERHEAD_MEM, PYSPARK_MEM, OFFHEAP_MEM)
+  /**
+   * Return all supported Spark built-in executor resources; custom resources like
+   * GPUs/FPGAs are excluded.
+   */
+  def allSupportedExecutorResources: Array[String] =
+    Array(CORES, MEMORY, OVERHEAD_MEM, PYSPARK_MEM, OFFHEAP_MEM)
 
   val UNKNOWN_RESOURCE_PROFILE_ID = -1
   val DEFAULT_RESOURCE_PROFILE_ID = 0
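A small sketch of how the reworked `allSupportedExecutorResources` (now a def
returning Array[String] rather than a public Seq val) might be used to tell
built-in resource names apart from custom ones; the `isCustomResource` helper
below is hypothetical:

    import org.apache.spark.resource.ResourceProfile

    // Built-in executor resource names, excluding custom ones like GPUs/FPGAs.
    val builtIn = ResourceProfile.allSupportedExecutorResources

    // Hypothetical helper: anything outside the built-in set is a custom resource.
    def isCustomResource(name: String): Boolean = !builtIn.contains(name)

    println(builtIn.mkString(", "))   // cores, memory, memoryOverhead, pyspark.memory, offHeap
    println(isCustomResource("gpu"))  // true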
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
index 29a117b..f6b30d3 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
@@ -26,9 +26,9 @@ import org.apache.spark.annotation.{Evolving, Since}
 
 /**
- * Resource profile builder to build a Resource profile to associate with an RDD.
- * A ResourceProfile allows the user to specify executor and task requirements for an RDD
- * that will get applied during a stage. This allows the user to change the resource
+ * Resource profile builder to build a [[ResourceProfile]] to associate with an RDD.
+ * A [[ResourceProfile]] allows the user to specify executor and task resource requirements
+ * for an RDD that will get applied during a stage. This allows the user to change the resource
  * requirements between stages.
  *
  */
@@ -36,7 +36,9 @@ import org.apache.spark.annotation.{Evolving, Since}
 @Since("3.1.0")
 class ResourceProfileBuilder() {
 
+  // Task resource requests specified by users, mapped from resource name to the request.
   private val _taskResources = new ConcurrentHashMap[String, TaskResourceRequest]()
+  // Executor resource requests specified by users, mapped from resource name to the request.
   private val _executorResources = new ConcurrentHashMap[String, ExecutorResourceRequest]()
 
   def taskResources: Map[String, TaskResourceRequest] = _taskResources.asScala.toMap
@@ -54,11 +56,21 @@ class ResourceProfileBuilder() {
     _executorResources.asScala.asJava
   }
 
+  /**
+   * Add executor resource requests
+   * @param requests The detailed executor resource requests, see [[ExecutorResourceRequests]]
+   * @return This ResourceProfileBuilder
+   */
   def require(requests: ExecutorResourceRequests): this.type = {
     _executorResources.putAll(requests.requests.asJava)
     this
   }
 
+  /**
+   * Add task resource requests
+   * @param requests The detailed task resource requests, see [[TaskResourceRequests]]
+   * @return This ResourceProfileBuilder
+   */
   def require(requests: TaskResourceRequests): this.type = {
     _taskResources.putAll(requests.requests.asJava)
     this
@@ -80,7 +92,7 @@ class ResourceProfileBuilder() {
       s"task resources: ${_taskResources.asScala.map(pair => s"${pair._1}=${pair._2.toString()}")}"
   }
 
-  def build: ResourceProfile = {
+  def build(): ResourceProfile = {
     new ResourceProfile(executorResources, taskResources)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
index 12ef342..cbd5780 100644
--- a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
+++ b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
@@ -20,11 +20,17 @@ package org.apache.spark.resource
 import org.apache.spark.annotation.{Evolving, Since}
 
 /**
- * A task resource request. This is used in conjunction with the ResourceProfile to
+ * A task resource request. This is used in conjunction with the [[ResourceProfile]] to
  * programmatically specify the resources needed for an RDD that will be applied at the
  * stage level.
  *
- * Use TaskResourceRequests class as a convenience API.
+ * Use [[TaskResourceRequests]] class as a convenience API.
+ *
+ * @param resourceName Resource name
+ * @param amount Amount requesting as a Double to support fractional resource requests.
+ *               Valid values are less than or equal to 0.5 or whole numbers. This essentially
+ *               lets you configure X number of tasks to run on a single resource,
+ *               i.e., amount equals 0.5 translates into 2 tasks per resource address.
  */
 @Evolving
 @Since("3.1.0")
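To make the fractional-amount rule documented above concrete, a brief sketch
(resource names illustrative) of how amounts at or below 0.5 map to task
sharing:

    import org.apache.spark.resource.TaskResourceRequests

    // amount = 0.5 means two concurrent tasks share one resource address;
    // 0.25 would mean four. Whole numbers give each task that many addresses.
    val treq = new TaskResourceRequests()
      .cpus(1)
      .resource("gpu", 0.5)

    treq.requests.foreach { case (name, req) => println(s"$name -> ${req.amount}") }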
diff --git a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
index b4e70b3..1d5fc73 100644
--- a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
+++ b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
@@ -36,12 +36,19 @@ class TaskResourceRequests() extends Serializable {
 
   private val _taskResources = new ConcurrentHashMap[String, TaskResourceRequest]()
 
+  /**
+   * Returns all the resource requests for the task.
+   */
   def requests: Map[String, TaskResourceRequest] = _taskResources.asScala.toMap
 
+  /**
+   * (Java-specific) Returns all the resource requests for the task.
+   */
   def requestsJMap: JMap[String, TaskResourceRequest] = requests.asJava
 
   /**
    * Specify number of cpus per Task.
+   * This is a convenient API to add [[TaskResourceRequest]] for cpus.
    *
    * @param amount Number of cpus to allocate per Task.
    */
@@ -52,7 +59,8 @@ class TaskResourceRequests() extends Serializable {
   }
 
   /**
-   * Amount of a particular custom resource(GPU, FPGA, etc) to use.
+   * Amount of a particular custom resource (GPU, FPGA, etc.) to use.
+   * This is a convenient API to add [[TaskResourceRequest]] for custom resources.
    *
    * @param resourceName Name of the resource.
    * @param amount Amount requesting as a Double to support fractional resource requests.
@@ -66,6 +74,9 @@ class TaskResourceRequests() extends Serializable {
     this
   }
 
+  /**
+   * Add a certain [[TaskResourceRequest]] to the request set.
+   */
   def addRequest(treq: TaskResourceRequest): this.type = {
     _taskResources.put(treq.resourceName, treq)
     this
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index 44e8e57..66ece81 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -290,7 +290,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     ereq.cores(4).memory("2g").memoryOverhead("1g").pysparkMemory("3g")
     treq.cpus(2)
     rpb.require(ereq).require(treq)
-    val rp = rpb.build
+    val rp = rpb.build()
     val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf),
       rp)
     val executor = step.configurePod(SparkPod.initialPod())
@@ -307,7 +307,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     ereq.cores(2).resource("gpu", 2, "/path/getGpusResources.sh", "nvidia.com")
     treq.cpus(1)
     rpb.require(ereq).require(treq)
-    val rp = rpb.build
+    val rp = rpb.build()
     val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf),
       rp)
     val executor = step.configurePod(SparkPod.initialPod())
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index c7e2f1a..349bbcd 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -325,7 +325,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     ereq.cores(4).memory("2g")
     treq.cpus(2)
     rpb.require(ereq).require(treq)
-    val rp = rpb.build
+    val rp = rpb.build()
 
     // Target 1 executor for default profile, 2 for other profile,
     // make sure it's requested, even with an empty initial snapshot.

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org