pan3793 commented on code in PR #53840:
URL: https://github.com/apache/spark/pull/53840#discussion_r2797033787
##########
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala:
##########
@@ -459,32 +463,52 @@ class ExecutorPodsAllocator(
.build()
val resources = replacePVCsIfNeeded(
podWithAttachedContainer,
resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)
- val createdExecutorPod =
-
kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()
- try {
- addOwnerReference(createdExecutorPod, resources)
- resources
- .filter(_.getKind == "PersistentVolumeClaim")
- .foreach { resource =>
- if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty) {
- addOwnerReference(driverPod.get, Seq(resource))
- }
- val pvc = resource.asInstanceOf[PersistentVolumeClaim]
- logInfo(log"Trying to create PersistentVolumeClaim " +
- log"${MDC(LogKeys.PVC_METADATA_NAME, pvc.getMetadata.getName)}
with " +
- log"StorageClass ${MDC(LogKeys.CLASS_NAME,
pvc.getSpec.getStorageClassName)}")
-
kubernetesClient.persistentVolumeClaims().inNamespace(namespace).resource(pvc).create()
- PVC_COUNTER.incrementAndGet()
- }
- newlyCreatedExecutors(newExecutorId) = (resourceProfileId,
clock.getTimeMillis())
- logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
+ val optCreatedExecutorPod = try {
+ Some(kubernetesClient
+ .pods()
+ .inNamespace(namespace)
+ .resource(podWithAttachedContainer)
+ .create())
} catch {
case NonFatal(e) =>
- kubernetesClient.pods()
- .inNamespace(namespace)
- .resource(createdExecutorPod)
- .delete()
- throw e
+ // Register failure with global tracker
+ val failureCount = totalFailedPodCreations.incrementAndGet()
+ lifecycleManager.registerPodCreationFailure()
+ logError(log"Failed to create executor pod
${MDC(LogKeys.EXECUTOR_ID, newExecutorId)}. " +
+ log"Total failures: ${MDC(LogKeys.TOTAL, failureCount)}", e)
+ None
+ }
+ if (optCreatedExecutorPod.nonEmpty) {
Review Comment:
For cases without an `else` branch, you can write it as:
```scala
optCreatedExecutorPod.foreach { createdExecutorPod =>
...
}
```
##########
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala:
##########
@@ -1027,4 +1038,21 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite
with BeforeAndAfter {
KubernetesExecutorSpec(executorPodWithId(k8sConf.executorId.toInt,
k8sConf.resourceProfileId.toInt), Seq.empty)
}
+
+ test("Pod creation failures are tracked immediately without retries") {
+ // Make all pod creation attempts fail
+ when(podResource.create()).thenThrow(new
KubernetesClientException("Simulated failure"))
Review Comment:
Small nit: make the error message more specific:
```suggestion
when(podResource.create()).thenThrow(new
KubernetesClientException("Simulated pod creation failure"))
```
##########
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala:
##########
@@ -1027,4 +1038,21 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite
with BeforeAndAfter {
KubernetesExecutorSpec(executorPodWithId(k8sConf.executorId.toInt,
k8sConf.resourceProfileId.toInt), Seq.empty)
}
+
+ test("Pod creation failures are tracked immediately without retries") {
Review Comment:
As we discussed before, the retry mechanism does not help here, so we don't
need to mention it. The test name could be:
```suggestion
test("Pod creation failures are tracked by ExecutorFailureTracker") {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]