Github user foxish commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186643210
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
    @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
         override def eventReceived(action: Action, pod: Pod): Unit = {
           val podName = pod.getMetadata.getName
           val podIP = pod.getStatus.getPodIP
    -
    +      val podPhase = pod.getStatus.getPhase
           action match {
    -        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
    +        case Action.MODIFIED if (podPhase == "Running"
                 && pod.getMetadata.getDeletionTimestamp == null) =>
               val clusterNodeName = pod.getSpec.getNodeName
               logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
               executorPodsByIPs.put(podIP, pod)
     
    -        case Action.DELETED | Action.ERROR =>
    +        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
    +          && pod.getMetadata.getDeletionTimestamp == null =>
               val executorId = getExecutorId(pod)
    -          logDebug(s"Executor pod $podName at IP $podIP was at $action.")
    -          if (podIP != null) {
    -            executorPodsByIPs.remove(podIP)
    +          failedInitExecutors.add(executorId)
    +          if (failedInitExecutors.size >= executorMaxInitErrors) {
    +            val errorMessage = s"Aborting Spark application because $executorMaxInitErrors" +
    +              s" executors failed to start. The maximum number of allowed startup failures is" +
    +              s" $executorMaxInitErrors. Please contact your cluster administrator or increase" +
    +              s" your setting of ${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
    +            logError(errorMessage)
    +            KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
    +            throw new SparkException(errorMessage)
               }
    +          handleFailedPod(action, pod, podName, podIP)
     
    -          val executorExitReason = if (action == Action.ERROR) {
    -            logWarning(s"Received error event of executor pod $podName. Reason: " +
    -              pod.getStatus.getReason)
    -            executorExitReasonOnError(pod)
    -          } else if (action == Action.DELETED) {
    -            logWarning(s"Received delete event of executor pod $podName. Reason: " +
    -              pod.getStatus.getReason)
    -            executorExitReasonOnDelete(pod)
    -          } else {
    -            throw new IllegalStateException(
    -              s"Unknown action that should only be DELETED or ERROR: $action")
    -          }
    -          podsWithKnownExitReasons.put(pod.getMetadata.getName, executorExitReason)
    -
    -          if (!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
    -            log.warn(s"Executor with id $executorId was not marked as disconnected, but the " +
    -              s"watch received an event of type $action for this executor. The executor may " +
    -              "have failed to start in the first place and never registered with the driver.")
    -          }
    -          disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
    +        case Action.DELETED | Action.ERROR =>
    +          handleFailedPod(action, pod, podName, podIP)
     
             case _ => logDebug(s"Received event of executor pod $podName: " + action)
           }
         }
     
    +    private def handleFailedPod(action: Action, pod: Pod, podName: String, podIP: String) = {
    +      val executorId = getExecutorId(pod)
    +      logDebug(s"Executor pod $podName at IP $podIP was at $action.")
    +      if (podIP != null) {
    +        executorPodsByIPs.remove(podIP)
    +      }
    +
    +      val executorExitReason = if (action == Action.ERROR) {
    +        logWarning(s"Received error event of executor pod $podName. Reason: " +
    +          pod.getStatus.getReason)
    +        executorExitReasonOnError(pod)
    +      } else if (action == Action.DELETED) {
    +        logWarning(s"Received delete event of executor pod $podName. Reason: " +
    +          pod.getStatus.getReason)
    +        executorExitReasonOnDelete(pod)
    +      } else if (action == Action.MODIFIED) {
    +        executorExitReasonOnInitError(pod)
    --- End diff --
    
    I think `Action.MODIFIED` can be fired for a lot of other reasons. I was thinking we should just use the container exit status instead, and have it reflect either the main or the init container's status, depending on which container failed.
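
    Roughly the direction I have in mind, as a sketch only: the object and method names below are made up for illustration and are not part of this PR; it assumes the fabric8 model's `getInitContainerStatuses`/`getContainerStatuses` accessors.
    
    ```scala
    import scala.collection.JavaConverters._
    import io.fabric8.kubernetes.api.model.{ContainerStatus, Pod}
    
    // Illustrative helper (names are mine, not from this PR): inspect container
    // statuses instead of matching the pod phase string.
    object ContainerExitStatus {
    
      // Find a container (init or main) that terminated with a non-zero exit code.
      def failedContainerStatus(pod: Pod): Option[ContainerStatus] = {
        val status = pod.getStatus
        val allStatuses =
          Option(status.getInitContainerStatuses).map(_.asScala).getOrElse(Nil) ++
            Option(status.getContainerStatuses).map(_.asScala).getOrElse(Nil)
        allStatuses.find { containerStatus =>
          Option(containerStatus.getState)
            .flatMap(state => Option(state.getTerminated))
            .exists(_.getExitCode.intValue() != 0)
        }
      }
    
      // Build an exit reason from whichever container actually failed, so one
      // code path covers both init-container and main-container failures.
      def failureMessage(pod: Pod): String =
        failedContainerStatus(pod) match {
          case Some(containerStatus) =>
            val terminated = containerStatus.getState.getTerminated
            s"Container ${containerStatus.getName} in pod ${pod.getMetadata.getName} " +
              s"exited with code ${terminated.getExitCode}" +
              Option(terminated.getReason).map(reason => s" ($reason)").getOrElse("")
          case None =>
            s"Pod ${pod.getMetadata.getName} failed without a terminated container status."
        }
    }
    ```
    
    With something like that, the watcher could key the abort logic off the failing container's exit code and reason rather than off phase strings like `Init:Error`.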


---
