DenineLu commented on code in PR #54688:
URL: https://github.com/apache/spark/pull/54688#discussion_r2922994969


##########
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala:
##########
@@ -446,77 +566,94 @@ class ExecutorPodsAllocator(
           log"Wait to reuse one of the existing ${MDC(LogKeys.COUNT, 
PVC_COUNTER.get())} PVCs.")
         return
       }
-      val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
-      if (newExecutorId >= podAllocationMaximum) {
-        throw new SparkException(s"Exceed the pod creation limit: 
$podAllocationMaximum")
+      val (newExecutorId, podWithAttachedContainer, resources) =
+        buildExecutorPod(applicationId, resourceProfileId, reusablePVCs)
+      createExecutorPodAndPVCs(newExecutorId, podWithAttachedContainer, 
resources).foreach { _ =>
+        newlyCreatedExecutors(newExecutorId) = (resourceProfileId, 
clock.getTimeMillis())
       }
-      val executorConf = KubernetesConf.createExecutorConf(
-        conf,
-        newExecutorId.toString,
-        applicationId,
-        driverPod,
-        resourceProfileId)
-      val resolvedExecutorSpec = 
executorBuilder.buildFromFeatures(executorConf, secMgr,
-        kubernetesClient, rpIdToResourceProfile(resourceProfileId))
-      val executorPod = resolvedExecutorSpec.pod
-      val podWithAttachedContainer = new PodBuilder(executorPod.pod)
-        .editOrNewSpec()
-        .addToContainers(executorPod.container)
-        .endSpec()
-        .build()
-      val resources = replacePVCsIfNeeded(
-        podWithAttachedContainer, 
resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)
-      val optCreatedExecutorPod = try {
-        Some(kubernetesClient
-          .pods()
-          .inNamespace(namespace)
-          .resource(podWithAttachedContainer)
-          .create())
-      } catch {
-        case NonFatal(e) =>
-          // Register failure with global tracker if lifecycle manager is 
available
-          val failureCount = registerPodCreationFailure()
-          logError(log"Failed to create executor pod 
${MDC(LogKeys.EXECUTOR_ID, newExecutorId)}. " +
-            log"Total failures: ${MDC(LogKeys.TOTAL, failureCount)}", e)
-          None
+    }
+  }
+
+  /**
+   * Parallel pod creation: builds pod specs serially (they depend on shared 
mutable state like
+   * EXECUTOR_ID_COUNTER and reusablePVCs), then creates them via K8s API in 
parallel using a
+   * thread pool sized by `spark.kubernetes.allocation.batch.concurrency`.
+   */
+  private def requestNewExecutorsParallel(
+      numExecutorsToAllocate: Int,
+      applicationId: String,
+      resourceProfileId: Int,
+      pvcsInUse: Seq[String]): Unit = {
+    val batchStartTime = clock.getTimeMillis()
+
+    // Check reusable PVCs for this executor allocation batch
+    val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse)
+
+    // Phase 1: Build all pod specs serially
+    val podSpecs = mutable.ArrayBuffer.empty[(Long, Pod, Seq[HasMetadata])]
+    var i = 0
+    while (i < numExecutorsToAllocate) {
+      if (reusablePVCs.isEmpty && podAllocOnPVC && maxPVCs <= 
PVC_COUNTER.get()) {
+        logInfo(
+          log"Wait to reuse one of the existing ${MDC(LogKeys.COUNT, 
PVC_COUNTER.get())} PVCs.")
+        // Stop building more specs; proceed to create whatever we've built so 
far
+        i = numExecutorsToAllocate
+      } else {
+        podSpecs += buildExecutorPod(applicationId, resourceProfileId, 
reusablePVCs)
+        i += 1
       }
-      optCreatedExecutorPod.foreach { createdExecutorPod =>
+    }
+    if (podSpecs.isEmpty) return
+
+    val podBuildTotalMs = clock.getTimeMillis() - batchStartTime
+
+    // Phase 2: Create pods via K8s API in parallel
+    val k8sApiStartTime = clock.getTimeMillis()
+
+    implicit val ec: ExecutionContext = podCreationExecutionContext
+
+    val futures = podSpecs.map { case (newExecutorId, 
podWithAttachedContainer, resources) =>
+      Future {
         try {
-          addOwnerReference(createdExecutorPod, resources)
-          resources
-            .filter(_.getKind == "PersistentVolumeClaim")
-            .foreach { resource =>
-              if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty) {
-                addOwnerReference(driverPod.get, Seq(resource))
-              }
-              val pvc = resource.asInstanceOf[PersistentVolumeClaim]
-              logInfo(log"Trying to create PersistentVolumeClaim " +
-                log"${MDC(LogKeys.PVC_METADATA_NAME, pvc.getMetadata.getName)} 
with " +
-                log"StorageClass ${MDC(LogKeys.CLASS_NAME, 
pvc.getSpec.getStorageClassName)}")
-              kubernetesClient
-                .persistentVolumeClaims()
-                .inNamespace(namespace)
-                .resource(pvc)
-                .create()
-              PVC_COUNTER.incrementAndGet()
-            }
-          newlyCreatedExecutors(newExecutorId) = (resourceProfileId, 
clock.getTimeMillis())
-          logDebug(s"Requested executor with id $newExecutorId from 
Kubernetes.")
+          val result = createExecutorPodAndPVCs(newExecutorId, 
podWithAttachedContainer, resources)
+          result.map(execId => (execId, resourceProfileId, 
clock.getTimeMillis()))
         } catch {
-          case NonFatal(e) =>
-            // Register failure with global tracker if lifecycle manager is 
available
-            val failureCount = registerPodCreationFailure()
-            logError(log"Failed to add owner reference or create PVC for 
executor pod " +
-              log"${MDC(LogKeys.EXECUTOR_ID, newExecutorId)}. " +
-              log"Total failures: ${MDC(LogKeys.TOTAL, failureCount)}", e)
-            kubernetesClient.pods()
-              .inNamespace(namespace)
-              .resource(createdExecutorPod)
-              .delete()
-            throw e
+          // createExecutorPodAndPVCs already logs the error and registers the 
failure
+          // before rethrowing. We catch here only to prevent a failed Future 
from
+          // short-circuiting Future.sequence, so that other concurrent pods 
can proceed.
+          case NonFatal(_) => None
         }
       }
     }
+
+    val results = try {
+      val allResults = ThreadUtils.awaitResult(
+        Future.sequence(futures), Duration(podCreationTimeout, 
TimeUnit.MILLISECONDS))
+      allResults.flatten
+    } catch {
+      case NonFatal(e) =>
+        logError(log"Timed out waiting for parallel pod creation to complete " 
+
+          log"after ${MDC(LogKeys.TIMEOUT, podCreationTimeout)} ms", e)
+        Seq.empty

Review Comment:
   > If one creation action times out, none of the Pods in this batch will be 
considered successfully created. This should **not** be batch-atomic, right?
   
   Yes! This should not be batch-atomic. I've updated the code to await each 
future individually instead of using Future.sequence.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to