ivoson commented on code in PR #37268:
URL: https://github.com/apache/spark/pull/37268#discussion_r964917903
##########
core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala:
##########
@@ -59,35 +59,64 @@ private[spark] class ResourceProfileManager(sparkConf:
SparkConf,
private val testExceptionThrown =
sparkConf.get(RESOURCE_PROFILE_MANAGER_TESTING)
/**
- * If we use anything except the default profile, it's only supported on
YARN and Kubernetes
- * with dynamic allocation enabled. Throw an exception if not supported.
+ * If we use anything except the default profile, it's supported on YARN,
Kubernetes and
+ * Standalone with dynamic allocation enabled, and task resource profile
with dynamic allocation
+ * disabled on Standalone. Throw an exception if not supported.
*/
private[spark] def isSupported(rp: ResourceProfile): Boolean = {
- val isNotDefaultProfile = rp.id !=
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
- val notYarnOrK8sOrStandaloneAndNotDefaultProfile =
- isNotDefaultProfile && !(isYarn || isK8s || isStandalone)
- val YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile =
- isNotDefaultProfile && (isYarn || isK8s || isStandalone) &&
!dynamicEnabled
- // We want the exception to be thrown only when we are specifically
testing for the
- // exception or in a real application. Otherwise in all other testing
scenarios we want
- // to skip throwing the exception so that we can test in other modes to
make testing easier.
- if ((notRunningUnitTests || testExceptionThrown) &&
+ if (rp.isInstanceOf[TaskResourceProfile] && !dynamicEnabled) {
+ if ((notRunningUnitTests || testExceptionThrown) && !isStandalone) {
+ throw new SparkException("TaskResourceProfiles are only supported for
Standalone " +
+ "cluster for now when dynamic allocation is disabled.")
+ }
+ } else {
+ val isNotDefaultProfile = rp.id !=
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+ val notYarnOrK8sOrStandaloneAndNotDefaultProfile =
+ isNotDefaultProfile && !(isYarn || isK8s || isStandalone)
+ val YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile =
+ isNotDefaultProfile && (isYarn || isK8s || isStandalone) &&
!dynamicEnabled
+
+ // We want the exception to be thrown only when we are specifically
testing for the
+ // exception or in a real application. Otherwise in all other testing
scenarios we want
+ // to skip throwing the exception so that we can test in other modes to
make testing easier.
+ if ((notRunningUnitTests || testExceptionThrown) &&
(notYarnOrK8sOrStandaloneAndNotDefaultProfile ||
YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile)) {
- throw new SparkException("ResourceProfiles are only supported on YARN
and Kubernetes " +
- "and Standalone with dynamic allocation enabled.")
- }
+ throw new SparkException("ResourceProfiles are only supported on YARN
and Kubernetes " +
+ "and Standalone with dynamic allocation enabled.")
+ }
- if (isStandalone && rp.getExecutorCores.isEmpty &&
- sparkConf.getOption(config.EXECUTOR_CORES.key).isEmpty) {
- logWarning("Neither executor cores is set for resource profile, nor
spark.executor.cores " +
- "is explicitly set, you may get more executors allocated than
expected. It's recommended " +
- "to set executor cores explicitly. Please check SPARK-30299 for more
details.")
+ if (isStandalone && dynamicEnabled && rp.getExecutorCores.isEmpty &&
+ sparkConf.getOption(config.EXECUTOR_CORES.key).isEmpty) {
+ logWarning("Neither executor cores is set for resource profile, nor
spark.executor.cores " +
+ "is explicitly set, you may get more executors allocated than
expected. " +
+ "It's recommended to set executor cores explicitly. " +
+ "Please check SPARK-30299 for more details.")
+ }
}
true
}
+ /**
+ * Check whether a task with specific taskRpId can be scheduled to executors
+ * with executorRpId.
+ *
+ * Here are the rules:
+ * 1. Tasks with [[TaskResourceProfile]] can be scheduled to executors with
+ * default resource profile when dynamic allocation is disabled.
+ * 2. Other tasks can be scheduled to executors where resource profile
exactly matches.
+ */
+ private[spark] def canBeScheduled(taskRpId: Int, executorRpId: Int): Boolean
= {
+ assert(resourceProfileIdToResourceProfile.contains(taskRpId) &&
+ resourceProfileIdToResourceProfile.contains(executorRpId),
+ "Tasks and executors must have valid resource profile id")
+ val taskRp = resourceProfileFromId(taskRpId)
+
+ taskRpId == executorRpId || (!dynamicEnabled &&
taskRp.isInstanceOf[TaskResourceProfile] &&
+ executorRpId == ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
Review Comment:
The UI page shows the resource profile id and the task resource requests.
Here is an example (not included in this message; see the comment at the URL
above): cc @tgravescs @Ngone51

Regarding the condition: we compare resource profile ids here, just as we
already did before when dynamic allocation is enabled. At this point we only
filter for the executors that match the resource profile bound to a `taskSet`;
when assigning tasks, `TaskSchedulerImpl` will further check each executor's
available resources.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]