tgravescs commented on code in PR #37268:
URL: https://github.com/apache/spark/pull/37268#discussion_r980053270


##########
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala:
##########
@@ -253,6 +263,39 @@ class ResourceProfile(
   }
 }
 
+/**
+ * Resource profile which only contains task resources, can be used for stage 
level task schedule
+ * when dynamic allocation is disabled, tasks will be scheduled to executors 
with default resource
+ * profile based on task resources described by this task resource profile.
+ * And when dynamic allocation is enabled, will require new executors for this 
profile base on
+ * default build-in executor resources and assign tasks by resource profile id.

Review Comment:
   should say something like: based on the default executor resources requested 
at startup and assign tasks only on executors created with this resource 
profile.



##########
core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala:
##########
@@ -59,35 +59,65 @@ private[spark] class ResourceProfileManager(sparkConf: 
SparkConf,
   private val testExceptionThrown = 
sparkConf.get(RESOURCE_PROFILE_MANAGER_TESTING)
 
   /**
-   * If we use anything except the default profile, it's only supported on 
YARN and Kubernetes
-   * with dynamic allocation enabled. Throw an exception if not supported.
+   * If we use anything except the default profile, it's supported on YARN, 
Kubernetes and
+   * Standalone with dynamic allocation enabled, and task resource profile 
with dynamic allocation
+   * disabled on Standalone. Throw an exception if not supported.
    */
   private[spark] def isSupported(rp: ResourceProfile): Boolean = {
-    val isNotDefaultProfile = rp.id != 
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
-    val notYarnOrK8sOrStandaloneAndNotDefaultProfile =
-      isNotDefaultProfile && !(isYarn || isK8s || isStandalone)
-    val YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile =
-      isNotDefaultProfile && (isYarn || isK8s || isStandalone) && 
!dynamicEnabled
-    // We want the exception to be thrown only when we are specifically 
testing for the
-    // exception or in a real application. Otherwise in all other testing 
scenarios we want
-    // to skip throwing the exception so that we can test in other modes to 
make testing easier.
-    if ((notRunningUnitTests || testExceptionThrown) &&
+    if (rp.isInstanceOf[TaskResourceProfile] && !dynamicEnabled) {
+      if ((notRunningUnitTests || testExceptionThrown) && !isStandalone) {
+        throw new SparkException("TaskResourceProfiles are only supported for 
Standalone " +
+          "cluster for now when dynamic allocation is disabled.")
+      }
+    } else {
+      val isNotDefaultProfile = rp.id != 
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+      val notYarnOrK8sOrStandaloneAndNotDefaultProfile =
+        isNotDefaultProfile && !(isYarn || isK8s || isStandalone)
+      val YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile =
+        isNotDefaultProfile && (isYarn || isK8s || isStandalone) && 
!dynamicEnabled
+
+      // We want the exception to be thrown only when we are specifically 
testing for the
+      // exception or in a real application. Otherwise in all other testing 
scenarios we want
+      // to skip throwing the exception so that we can test in other modes to 
make testing easier.
+      if ((notRunningUnitTests || testExceptionThrown) &&
         (notYarnOrK8sOrStandaloneAndNotDefaultProfile ||
           YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile)) {
-      throw new SparkException("ResourceProfiles are only supported on YARN 
and Kubernetes " +
-        "and Standalone with dynamic allocation enabled.")
-    }
+        throw new SparkException("ResourceProfiles are only supported on YARN 
and Kubernetes " +
+          "and Standalone with dynamic allocation enabled.")
+      }
 
-    if (isStandalone && rp.getExecutorCores.isEmpty &&
-      sparkConf.getOption(config.EXECUTOR_CORES.key).isEmpty) {
-      logWarning("Neither executor cores is set for resource profile, nor 
spark.executor.cores " +
-        "is explicitly set, you may get more executors allocated than 
expected. It's recommended " +
-        "to set executor cores explicitly. Please check SPARK-30299 for more 
details.")
+      if (isStandalone && dynamicEnabled && rp.getExecutorCores.isEmpty &&
+        sparkConf.getOption(config.EXECUTOR_CORES.key).isEmpty) {
+        logWarning("Neither executor cores is set for resource profile, nor 
spark.executor.cores " +
+          "is explicitly set, you may get more executors allocated than 
expected. " +
+          "It's recommended to set executor cores explicitly. " +
+          "Please check SPARK-30299 for more details.")
+      }
     }
 
     true
   }
 
+  /**
+   * Check whether a task with specific taskRpId can be scheduled to executors
+   * with executorRpId.
+   *
+   * Here are the rules:
+   * 1. Tasks with [[TaskResourceProfile]] can be scheduled to executors with

Review Comment:
   this makes it sounds like that is the only time TaskResourceProfile used, 
perhaps put dynamic allocation disabled and tasks with TaskResourceProfile.... 
or possibly add another rule that has dynamic allocation enabled and 
TaskResourceProfile...



##########
core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala:
##########
@@ -59,35 +59,65 @@ private[spark] class ResourceProfileManager(sparkConf: 
SparkConf,
   private val testExceptionThrown = 
sparkConf.get(RESOURCE_PROFILE_MANAGER_TESTING)
 
   /**
-   * If we use anything except the default profile, it's only supported on 
YARN and Kubernetes
-   * with dynamic allocation enabled. Throw an exception if not supported.
+   * If we use anything except the default profile, it's supported on YARN, 
Kubernetes and
+   * Standalone with dynamic allocation enabled, and task resource profile 
with dynamic allocation
+   * disabled on Standalone. Throw an exception if not supported.
    */
   private[spark] def isSupported(rp: ResourceProfile): Boolean = {
-    val isNotDefaultProfile = rp.id != 
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
-    val notYarnOrK8sOrStandaloneAndNotDefaultProfile =
-      isNotDefaultProfile && !(isYarn || isK8s || isStandalone)
-    val YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile =
-      isNotDefaultProfile && (isYarn || isK8s || isStandalone) && 
!dynamicEnabled
-    // We want the exception to be thrown only when we are specifically 
testing for the
-    // exception or in a real application. Otherwise in all other testing 
scenarios we want
-    // to skip throwing the exception so that we can test in other modes to 
make testing easier.
-    if ((notRunningUnitTests || testExceptionThrown) &&
+    if (rp.isInstanceOf[TaskResourceProfile] && !dynamicEnabled) {

Review Comment:
   yes it can be used with dynamic allocation, in that case it uses the default 
resource profile executor resources but it must acquire new executors.  The 
TaskResourceProfile gets a unique rpid just like standard resource profile and 
it should go through the same path to get executors via dynamic allocation like 
a normal ResourceProfile (ie stage submitted kicks off).   Is there something 
I'm not thinking about here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to