Github user foxish commented on a diff in the pull request:
https://github.com/apache/spark/pull/21241#discussion_r186861330
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
@@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
     override def eventReceived(action: Action, pod: Pod): Unit = {
       val podName = pod.getMetadata.getName
       val podIP = pod.getStatus.getPodIP
-
+      val podPhase = pod.getStatus.getPhase
       action match {
-        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+        case Action.MODIFIED if (podPhase == "Running"
             && pod.getMetadata.getDeletionTimestamp == null) =>
           val clusterNodeName = pod.getSpec.getNodeName
           logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
           executorPodsByIPs.put(podIP, pod)
-        case Action.DELETED | Action.ERROR =>
+        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
+            && pod.getMetadata.getDeletionTimestamp == null =>
           val executorId = getExecutorId(pod)
-          logDebug(s"Executor pod $podName at IP $podIP was at $action.")
-          if (podIP != null) {
-            executorPodsByIPs.remove(podIP)
+          failedInitExecutors.add(executorId)
+          if (failedInitExecutors.size >= executorMaxInitErrors) {
+            val errorMessage = s"Aborting Spark application because $executorMaxInitErrors" +
+              s" executors failed to start. The maximum number of allowed startup failures is" +
+              s" $executorMaxInitErrors. Please contact your cluster administrator or increase" +
+              s" your setting of ${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
+            logError(errorMessage)
+            KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
+            throw new SparkException(errorMessage)
           }
+          handleFailedPod(action, pod, podName, podIP)
-          val executorExitReason = if (action == Action.ERROR) {
-            logWarning(s"Received error event of executor pod $podName. Reason: " +
-              pod.getStatus.getReason)
-            executorExitReasonOnError(pod)
-          } else if (action == Action.DELETED) {
-            logWarning(s"Received delete event of executor pod $podName. Reason: " +
-              pod.getStatus.getReason)
-            executorExitReasonOnDelete(pod)
-          } else {
-            throw new IllegalStateException(
-              s"Unknown action that should only be DELETED or ERROR: $action")
-          }
-          podsWithKnownExitReasons.put(pod.getMetadata.getName, executorExitReason)
-
-          if (!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
-            log.warn(s"Executor with id $executorId was not marked as disconnected, but the " +
-              s"watch received an event of type $action for this executor. The executor may " +
-              "have failed to start in the first place and never registered with the driver.")
-          }
-          disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
+        case Action.DELETED | Action.ERROR =>
+          handleFailedPod(action, pod, podName, podIP)
         case _ => logDebug(s"Received event of executor pod $podName: " + action)
       }
     }
+    private def handleFailedPod(action: Action, pod: Pod, podName: String, podIP: String) = {
+      val executorId = getExecutorId(pod)
+      logDebug(s"Executor pod $podName at IP $podIP was at $action.")
+      if (podIP != null) {
+        executorPodsByIPs.remove(podIP)
+      }
+
+      val executorExitReason = if (action == Action.ERROR) {
+        logWarning(s"Received error event of executor pod $podName. Reason: " +
+          pod.getStatus.getReason)
+        executorExitReasonOnError(pod)
+      } else if (action == Action.DELETED) {
+        logWarning(s"Received delete event of executor pod $podName. Reason: " +
+          pod.getStatus.getReason)
+        executorExitReasonOnDelete(pod)
+      } else if (action == Action.MODIFIED) {
+        executorExitReasonOnInitError(pod)
--- End diff ---

There isn't a doc, but I'm putting together an initial list. We can grow it as we discover more during operations. It would be good to include cases like "0/1 nodes are available: 1 node(s) had disk pressure".
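To make it concrete, here is a rough sketch of the kind of check I have in mind; the object and method names are illustrative and not part of this PR. Note that scheduling failures such as the disk-pressure message above surface through pod events and node conditions rather than the pod phase, so this sketch only covers the phase-based init failures that the diff already matches on.

```scala
import io.fabric8.kubernetes.api.model.Pod

object InitFailureCheck {
  // Hypothetical starting list of pod phases treated as executor init failures.
  // This would grow as we catalogue more failure modes during operations.
  val initFailurePhases: Set[String] = Set(
    "Init:Error",
    "Init:CrashLoopBackoff")

  // Sketch of a phase-based predicate, mirroring the guard in the diff above,
  // that the watch handler could apply before counting a pod against the
  // executor init-error budget.
  def isInitFailure(pod: Pod): Boolean = {
    val phase = pod.getStatus.getPhase
    pod.getMetadata.getDeletionTimestamp == null &&
      initFailurePhases.contains(phase)
  }
}
```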