Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21241#discussion_r186643210

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
@@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
     override def eventReceived(action: Action, pod: Pod): Unit = {
       val podName = pod.getMetadata.getName
       val podIP = pod.getStatus.getPodIP
-
+      val podPhase = pod.getStatus.getPhase
       action match {
-        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+        case Action.MODIFIED if (podPhase == "Running"
            && pod.getMetadata.getDeletionTimestamp == null) =>
          val clusterNodeName = pod.getSpec.getNodeName
          logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
          executorPodsByIPs.put(podIP, pod)

-        case Action.DELETED | Action.ERROR =>
+        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
+            && pod.getMetadata.getDeletionTimestamp == null =>
          val executorId = getExecutorId(pod)
-          logDebug(s"Executor pod $podName at IP $podIP was at $action.")
-          if (podIP != null) {
-            executorPodsByIPs.remove(podIP)
+          failedInitExecutors.add(executorId)
+          if (failedInitExecutors.size >= executorMaxInitErrors) {
+            val errorMessage = s"Aborting Spark application because $executorMaxInitErrors" +
+              s" executors failed to start. The maximum number of allowed startup failures is" +
+              s" $executorMaxInitErrors. Please contact your cluster administrator or increase" +
+              s" your setting of ${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
+            logError(errorMessage)
+            KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
+            throw new SparkException(errorMessage)
          }
+          handleFailedPod(action, pod, podName, podIP)

-          val executorExitReason = if (action == Action.ERROR) {
-            logWarning(s"Received error event of executor pod $podName. Reason: " +
-              pod.getStatus.getReason)
-            executorExitReasonOnError(pod)
-          } else if (action == Action.DELETED) {
-            logWarning(s"Received delete event of executor pod $podName. Reason: " +
-              pod.getStatus.getReason)
-            executorExitReasonOnDelete(pod)
-          } else {
-            throw new IllegalStateException(
-              s"Unknown action that should only be DELETED or ERROR: $action")
-          }
-          podsWithKnownExitReasons.put(pod.getMetadata.getName, executorExitReason)
-
-          if (!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
-            log.warn(s"Executor with id $executorId was not marked as disconnected, but the " +
-              s"watch received an event of type $action for this executor. The executor may " +
-              "have failed to start in the first place and never registered with the driver.")
-          }
-          disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
+        case Action.DELETED | Action.ERROR =>
+          handleFailedPod(action, pod, podName, podIP)

        case _ => logDebug(s"Received event of executor pod $podName: " + action)
      }
    }

+    private def handleFailedPod(action: Action, pod: Pod, podName: String, podIP: String) = {
+      val executorId = getExecutorId(pod)
+      logDebug(s"Executor pod $podName at IP $podIP was at $action.")
+      if (podIP != null) {
+        executorPodsByIPs.remove(podIP)
+      }
+
+      val executorExitReason = if (action == Action.ERROR) {
+        logWarning(s"Received error event of executor pod $podName. Reason: " +
+          pod.getStatus.getReason)
+        executorExitReasonOnError(pod)
+      } else if (action == Action.DELETED) {
+        logWarning(s"Received delete event of executor pod $podName. Reason: " +
+          pod.getStatus.getReason)
+        executorExitReasonOnDelete(pod)
+      } else if (action == Action.MODIFIED) {
+        executorExitReasonOnInitError(pod)
--- End diff --

I think `Action.MODIFIED` can be fired for a lot of other reasons. I was thinking that we should just use container exit status, and have that reflect either the main/init container status depending on which container failed.