DenineLu commented on code in PR #54688:
URL: https://github.com/apache/spark/pull/54688#discussion_r2922994969
##########
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala:
##########
@@ -446,77 +566,94 @@ class ExecutorPodsAllocator(
log"Wait to reuse one of the existing ${MDC(LogKeys.COUNT,
PVC_COUNTER.get())} PVCs.")
return
}
- val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
- if (newExecutorId >= podAllocationMaximum) {
- throw new SparkException(s"Exceed the pod creation limit:
$podAllocationMaximum")
+ val (newExecutorId, podWithAttachedContainer, resources) =
+ buildExecutorPod(applicationId, resourceProfileId, reusablePVCs)
+ createExecutorPodAndPVCs(newExecutorId, podWithAttachedContainer,
resources).foreach { _ =>
+ newlyCreatedExecutors(newExecutorId) = (resourceProfileId,
clock.getTimeMillis())
}
- val executorConf = KubernetesConf.createExecutorConf(
- conf,
- newExecutorId.toString,
- applicationId,
- driverPod,
- resourceProfileId)
- val resolvedExecutorSpec =
executorBuilder.buildFromFeatures(executorConf, secMgr,
- kubernetesClient, rpIdToResourceProfile(resourceProfileId))
- val executorPod = resolvedExecutorSpec.pod
- val podWithAttachedContainer = new PodBuilder(executorPod.pod)
- .editOrNewSpec()
- .addToContainers(executorPod.container)
- .endSpec()
- .build()
- val resources = replacePVCsIfNeeded(
- podWithAttachedContainer,
resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)
- val optCreatedExecutorPod = try {
- Some(kubernetesClient
- .pods()
- .inNamespace(namespace)
- .resource(podWithAttachedContainer)
- .create())
- } catch {
- case NonFatal(e) =>
- // Register failure with global tracker if lifecycle manager is
available
- val failureCount = registerPodCreationFailure()
- logError(log"Failed to create executor pod
${MDC(LogKeys.EXECUTOR_ID, newExecutorId)}. " +
- log"Total failures: ${MDC(LogKeys.TOTAL, failureCount)}", e)
- None
+ }
+ }
+
+ /**
+ * Parallel pod creation: builds pod specs serially (they depend on shared
mutable state like
+ * EXECUTOR_ID_COUNTER and reusablePVCs), then creates them via K8s API in
parallel using a
+ * thread pool sized by `spark.kubernetes.allocation.batch.concurrency`.
+ */
+ private def requestNewExecutorsParallel(
+ numExecutorsToAllocate: Int,
+ applicationId: String,
+ resourceProfileId: Int,
+ pvcsInUse: Seq[String]): Unit = {
+ val batchStartTime = clock.getTimeMillis()
+
+ // Check reusable PVCs for this executor allocation batch
+ val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse)
+
+ // Phase 1: Build all pod specs serially
+ val podSpecs = mutable.ArrayBuffer.empty[(Long, Pod, Seq[HasMetadata])]
+ var i = 0
+ while (i < numExecutorsToAllocate) {
+ if (reusablePVCs.isEmpty && podAllocOnPVC && maxPVCs <=
PVC_COUNTER.get()) {
+ logInfo(
+ log"Wait to reuse one of the existing ${MDC(LogKeys.COUNT,
PVC_COUNTER.get())} PVCs.")
+ // Stop building more specs; proceed to create whatever we've built so
far
+ i = numExecutorsToAllocate
+ } else {
+ podSpecs += buildExecutorPod(applicationId, resourceProfileId,
reusablePVCs)
+ i += 1
}
- optCreatedExecutorPod.foreach { createdExecutorPod =>
+ }
+ if (podSpecs.isEmpty) return
+
+ val podBuildTotalMs = clock.getTimeMillis() - batchStartTime
+
+ // Phase 2: Create pods via K8s API in parallel
+ val k8sApiStartTime = clock.getTimeMillis()
+
+ implicit val ec: ExecutionContext = podCreationExecutionContext
+
+ val futures = podSpecs.map { case (newExecutorId,
podWithAttachedContainer, resources) =>
+ Future {
try {
- addOwnerReference(createdExecutorPod, resources)
- resources
- .filter(_.getKind == "PersistentVolumeClaim")
- .foreach { resource =>
- if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty) {
- addOwnerReference(driverPod.get, Seq(resource))
- }
- val pvc = resource.asInstanceOf[PersistentVolumeClaim]
- logInfo(log"Trying to create PersistentVolumeClaim " +
- log"${MDC(LogKeys.PVC_METADATA_NAME, pvc.getMetadata.getName)}
with " +
- log"StorageClass ${MDC(LogKeys.CLASS_NAME,
pvc.getSpec.getStorageClassName)}")
- kubernetesClient
- .persistentVolumeClaims()
- .inNamespace(namespace)
- .resource(pvc)
- .create()
- PVC_COUNTER.incrementAndGet()
- }
- newlyCreatedExecutors(newExecutorId) = (resourceProfileId,
clock.getTimeMillis())
- logDebug(s"Requested executor with id $newExecutorId from
Kubernetes.")
+ val result = createExecutorPodAndPVCs(newExecutorId,
podWithAttachedContainer, resources)
+ result.map(execId => (execId, resourceProfileId,
clock.getTimeMillis()))
} catch {
- case NonFatal(e) =>
- // Register failure with global tracker if lifecycle manager is
available
- val failureCount = registerPodCreationFailure()
- logError(log"Failed to add owner reference or create PVC for
executor pod " +
- log"${MDC(LogKeys.EXECUTOR_ID, newExecutorId)}. " +
- log"Total failures: ${MDC(LogKeys.TOTAL, failureCount)}", e)
- kubernetesClient.pods()
- .inNamespace(namespace)
- .resource(createdExecutorPod)
- .delete()
- throw e
+ // createExecutorPodAndPVCs already logs the error and registers the
failure
+ // before rethrowing. We catch here only to prevent a failed Future
from
+ // short-circuiting Future.sequence, so that other concurrent pods
can proceed.
+ case NonFatal(_) => None
}
}
}
+
+ val results = try {
+ val allResults = ThreadUtils.awaitResult(
+ Future.sequence(futures), Duration(podCreationTimeout,
TimeUnit.MILLISECONDS))
+ allResults.flatten
+ } catch {
+ case NonFatal(e) =>
+ logError(log"Timed out waiting for parallel pod creation to complete "
+
+ log"after ${MDC(LogKeys.TIMEOUT, podCreationTimeout)} ms", e)
+ Seq.empty
Review Comment:
> If one creation action times out, none of the Pods in this batch will be
considered successfully created. this should **not** be batch-atomic, right?
Yes — this should not be batch-atomic. I've updated the code to await each
future individually instead of using Future.sequence, so a single timed-out or
failed pod creation no longer discards the results of the other pods in the batch.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]