mridulm commented on a change in pull request #30204:
URL: https://github.com/apache/spark/pull/30204#discussion_r516532600



##########
File path: 
core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
##########
@@ -52,18 +52,21 @@ private[spark] class ResourceProfileManager(sparkConf: 
SparkConf,
 
   private val dynamicEnabled = Utils.isDynamicAllocationEnabled(sparkConf)
   private val master = sparkConf.getOption("spark.master")
-  private val isNotYarn = master.isDefined && !master.get.equals("yarn")
+  private val isYarn = master.isDefined && master.get.equals("yarn")
+  private val isK8s = master.isDefined && master.get.startsWith("k8s://")
   private val errorForTesting = !isTesting || 
sparkConf.get(RESOURCE_PROFILE_MANAGER_TESTING)
 
   // If we use anything except the default profile, its only supported on YARN 
right now.
   // Throw an exception if not supported.
   private[spark] def isSupported(rp: ResourceProfile): Boolean = {
     val isNotDefaultProfile = rp.id != 
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
-    val notYarnAndNotDefaultProfile = isNotDefaultProfile && isNotYarn
-    val YarnNotDynAllocAndNotDefaultProfile = isNotDefaultProfile && 
!isNotYarn && !dynamicEnabled
-    if (errorForTesting && (notYarnAndNotDefaultProfile || 
YarnNotDynAllocAndNotDefaultProfile)) {
-      throw new SparkException("ResourceProfiles are only supported on YARN 
with dynamic " +
-        "allocation enabled.")
+    val notYarnOrK8sAndNotDefaultProfile = isNotDefaultProfile && !(isYarn || 
isK8s)
+    val YarnOrK8sNotDynAllocAndNotDefaultProfile =
+      isNotDefaultProfile && (isYarn || isK8s) && !dynamicEnabled
+    if (errorForTesting &&

Review comment:
       The `!isTesting` in `errorForTesting` threw me off :-)

##########
File path: docs/running-on-kubernetes.md
##########
@@ -1399,3 +1399,6 @@ Spark automatically handles translating the Spark configs 
<code>spark.{driver/ex
 
 Kubernetes does not tell Spark the addresses of the resources allocated to 
each container. For that reason, the user must specify a discovery script that 
gets run by the executor on startup to discover what resources are available to 
that executor. You can find an example scripts in 
`examples/src/main/scripts/getGpusResources.sh`. The script must have execute 
permissions set and the user should setup permissions to not allow malicious 
users to modify it. The script should write to STDOUT a JSON string in the 
format of the ResourceInformation class. This has the resource name and an 
array of resource addresses available to just that executor.
 
+### Stage Level Scheduling Overview
+
+Stage level scheduling is supported on Kubernetes when dynamic allocation is 
enabled. This also requires 
<code>spark.dynamicAllocation.shuffleTracking.enabled</code> to be enabled 
since Kubernetes doesn't support an external shuffle service a this time. The 
order in which containers for different profiles is requested from Kubernetes 
is not guaranteed. Note that since dynamic allocation on Kubernetes requires 
the shuffle tracking feature, this means that executors from previous stages 
that used a different ResourceProfile may not idle timeout due to having 
shuffle data on them. This could result in using more cluster resources and in 
the worst case if there are no remaining resources on the Kubernetes cluster 
then Spark could potentially hang. You may consider looking at config 
<code>spark.dynamicAllocation.shuffleTracking.timeout</code> to set a timeout, 
but that could result in data having to be recomputed if the shuffle data is 
really needed.

Review comment:
       nit: shuffle service a this time -> shuffle service at this time

##########
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
##########
@@ -50,25 +53,42 @@ private[spark] class BasicExecutorFeatureStep(
     kubernetesConf.get(DRIVER_HOST_ADDRESS),
     kubernetesConf.sparkConf.getInt(DRIVER_PORT.key, DEFAULT_DRIVER_PORT),
     CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
-  private val executorMemoryMiB = kubernetesConf.get(EXECUTOR_MEMORY)
-  private val executorMemoryString = kubernetesConf.get(
-    EXECUTOR_MEMORY.key, EXECUTOR_MEMORY.defaultValueString)
 
-  private val memoryOverheadMiB = kubernetesConf
+  private var executorMemoryMiB = kubernetesConf.get(EXECUTOR_MEMORY)
+
+  private var memoryOverheadMiB = kubernetesConf
     .get(EXECUTOR_MEMORY_OVERHEAD)
     .getOrElse(math.max(
       (kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt,
       MEMORY_OVERHEAD_MIN_MIB))
-  private val executorMemoryWithOverhead = executorMemoryMiB + 
memoryOverheadMiB
-  private val executorMemoryTotal =
-    if (kubernetesConf.get(APP_RESOURCE_TYPE) == 
Some(APP_RESOURCE_TYPE_PYTHON)) {
-      executorMemoryWithOverhead +
-        kubernetesConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
-    } else {
-      executorMemoryWithOverhead
+
+  private var executorCores = kubernetesConf.sparkConf.get(EXECUTOR_CORES)
+
+  private var pysparkMemoryMiB =
+    
kubernetesConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0).toLong
+
+  private var memoryOffHeapMiB = 
Utils.executorOffHeapMemorySizeAsMb(kubernetesConf.sparkConf)
+
+  private val customResources = new mutable.HashSet[ExecutorResourceRequest]
+  resourceProfile.executorResources.foreach { case (resource, execReq) =>
+    resource match {
+      case ResourceProfile.MEMORY =>
+        executorMemoryMiB = execReq.amount
+      case ResourceProfile.OVERHEAD_MEM =>
+        memoryOverheadMiB = execReq.amount
+      case ResourceProfile.PYSPARK_MEM =>
+        pysparkMemoryMiB = execReq.amount
+      case ResourceProfile.OFFHEAP_MEM =>
+        memoryOffHeapMiB = execReq.amount.toInt
+      case ResourceProfile.CORES =>
+        executorCores = execReq.amount.toInt
+      case rName =>
+        customResources += execReq
     }
+  }

Review comment:
       nit: Can we unify this with `createYarnResourceForResourceProfile` in 
YarnAllocator?
   (They diverge w.r.t. gpu/fpga, though I assume that is coming for k8s too.)

##########
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
##########
@@ -114,7 +124,7 @@ private[spark] class ExecutorPodsAllocator(
     // both the creation and deletion events. In either case, delete the 
missing pod

Review comment:
       How often is `onNewSnapshots` called?
   Trying to understand the cost of this method given the changes.

##########
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
##########
@@ -147,136 +157,171 @@ private[spark] class ExecutorPodsAllocator(
       lastSnapshot = snapshots.last
     }
 
-    val currentRunningCount = lastSnapshot.executorPods.values.count {
-      case PodRunning(_) => true
-      case _ => false
-    }
-
-    val currentPendingExecutors = lastSnapshot.executorPods
-      .filter {
-        case (_, PodPending(_)) => true
-        case _ => false
-      }
-
     // Make a local, non-volatile copy of the reference since it's used 
multiple times. This
     // is the only method that modifies the list, so this is safe.
     var _deletedExecutorIds = deletedExecutorIds
-
     if (snapshots.nonEmpty) {
-      logDebug(s"Pod allocation status: $currentRunningCount running, " +
-        s"${currentPendingExecutors.size} pending, " +
-        s"${newlyCreatedExecutors.size} unacknowledged.")
-
       val existingExecs = lastSnapshot.executorPods.keySet
       _deletedExecutorIds = _deletedExecutorIds.filter(existingExecs.contains)
     }
 
-    val currentTotalExpectedExecutors = totalExpectedExecutors.get
-
-    // This variable is used later to print some debug logs. It's updated when 
cleaning up
-    // excess pod requests, since currentPendingExecutors is immutable.
-    var knownPendingCount = currentPendingExecutors.size
-
-    // It's possible that we have outstanding pods that are outdated when 
dynamic allocation
-    // decides to downscale the application. So check if we can release any 
pending pods early
-    // instead of waiting for them to time out. Drop them first from the 
unacknowledged list,
-    // then from the pending. However, in order to prevent too frequent 
frunctuation, newly
-    // requested pods are protected during executorIdleTimeout period.
-    //
-    // TODO: with dynamic allocation off, handle edge cases if we end up with 
more running
-    // executors than expected.
-    val knownPodCount = currentRunningCount + currentPendingExecutors.size +
-      newlyCreatedExecutors.size
-    if (knownPodCount > currentTotalExpectedExecutors) {
-      val excess = knownPodCount - currentTotalExpectedExecutors
-      val knownPendingToDelete = currentPendingExecutors
-        .filter(x => isExecutorIdleTimedOut(x._2, currentTime))
-        .map { case (id, _) => id }
-        .take(excess - newlyCreatedExecutors.size)
-      val toDelete = newlyCreatedExecutors
-        .filter(x => currentTime - x._2 > executorIdleTimeout)
-        .keys.take(excess).toList ++ knownPendingToDelete
-
-      if (toDelete.nonEmpty) {
-        logInfo(s"Deleting ${toDelete.size} excess pod requests 
(${toDelete.mkString(",")}).")
-        _deletedExecutorIds = _deletedExecutorIds ++ toDelete
+    // Map the pods into per ResourceProfile id so we can check per 
ResourceProfile,
+    // add a fast path if not using other ResourceProfiles.
+    val rpIdToExecsAndPodState =
+      mutable.HashMap[Int, mutable.LinkedHashMap[Long, ExecutorPodState]]()

Review comment:
       Do we need `LinkedHashMap`? `lastSnapshot.executorPods` does not look 
ordered?

##########
File path: 
resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
##########
@@ -85,6 +85,7 @@ case "$1" in
       --cores $SPARK_EXECUTOR_CORES
       --app-id $SPARK_APPLICATION_ID
       --hostname $SPARK_EXECUTOR_POD_IP
+      --resourceProfileId $SPARK_RESOURCE_PROFILE_ID

Review comment:
       This will default to 0 if unspecified, right?
   

##########
File path: 
resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
##########
@@ -85,6 +85,7 @@ case "$1" in
       --cores $SPARK_EXECUTOR_CORES
       --app-id $SPARK_APPLICATION_ID
       --hostname $SPARK_EXECUTOR_POD_IP
+      --resourceProfileId $SPARK_RESOURCE_PROFILE_ID

Review comment:
       This will default to 0 if unspecified, right? (in case of custom recipes 
for docker images)
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to