[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread foxish
Github user foxish commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186879387
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -320,50 +322,83 @@ private[spark] class 
KubernetesClusterSchedulerBackend(
 override def eventReceived(action: Action, pod: Pod): Unit = {
   val podName = pod.getMetadata.getName
   val podIP = pod.getStatus.getPodIP
-
+  val podPhase = pod.getStatus.getPhase
   action match {
-case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+case Action.MODIFIED if (podPhase == "Running"
 && pod.getMetadata.getDeletionTimestamp == null) =>
   val clusterNodeName = pod.getSpec.getNodeName
   logInfo(s"Executor pod $podName ready, launched at 
$clusterNodeName as IP $podIP.")
   executorPodsByIPs.put(podIP, pod)
 
-case Action.DELETED | Action.ERROR =>
+case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == 
"Init:CrashLoopBackoff")
+  && pod.getMetadata.getDeletionTimestamp == null =>
   val executorId = getExecutorId(pod)
-  logDebug(s"Executor pod $podName at IP $podIP was at $action.")
-  if (podIP != null) {
-executorPodsByIPs.remove(podIP)
+  failedInitExecutors.add(executorId)
+  if (failedInitExecutors.size >= executorMaxInitErrors) {
+val errorMessage = s"Aborting Spark application because 
$executorMaxInitErrors" +
+  s" executors failed to start. The maximum number of allowed 
startup failures is" +
+  s" $executorMaxInitErrors. Please contact your cluster 
administrator or increase" +
+  s" your setting of 
${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
+logError(errorMessage)
+
KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
--- End diff --

The controllers have rate-limiting queues with exponential backoff. In the 
past, we've had issues (https://github.com/kubernetes/kubernetes/issues/30628, 
https://github.com/kubernetes/kubernetes/issues/27634 and many more) where a 
misconfigured queue has caused controllers to spew pods and retry. 
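
To illustrate the rate-limiting idea: the retry delay for a failing item in such a queue grows exponentially with the number of consecutive failures, up to a cap. The sketch below is not the Kubernetes controller code. The helper name `backoffDelayMs` and the default values for `baseMs` and `capMs` are assumptions chosen only for illustration.

```scala
object BackoffSketch {
  // Hypothetical helper: delay before the next retry after `failures`
  // consecutive failures. The delay doubles per failure and is capped at capMs.
  def backoffDelayMs(failures: Int, baseMs: Long = 5L, capMs: Long = 1000000L): Long = {
    require(failures >= 0, "failures must be non-negative")
    require(baseMs > 0 && capMs >= baseMs, "need 0 < baseMs <= capMs")
    var delay = baseMs
    var i = 0
    while (i < failures && delay < capMs) {
      // Guard against overflow before doubling.
      delay = if (delay > capMs / 2) capMs else delay * 2
      i += 1
    }
    delay
  }
}
```

With a cap in place, a pod that never starts costs a bounded number of retries per unit time instead of a tight loop against the API server.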


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186876513
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -320,50 +322,83 @@ private[spark] class 
KubernetesClusterSchedulerBackend(
 override def eventReceived(action: Action, pod: Pod): Unit = {
   val podName = pod.getMetadata.getName
   val podIP = pod.getStatus.getPodIP
-
+  val podPhase = pod.getStatus.getPhase
   action match {
-case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+case Action.MODIFIED if (podPhase == "Running"
 && pod.getMetadata.getDeletionTimestamp == null) =>
   val clusterNodeName = pod.getSpec.getNodeName
   logInfo(s"Executor pod $podName ready, launched at 
$clusterNodeName as IP $podIP.")
   executorPodsByIPs.put(podIP, pod)
 
-case Action.DELETED | Action.ERROR =>
+case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == 
"Init:CrashLoopBackoff")
+  && pod.getMetadata.getDeletionTimestamp == null =>
   val executorId = getExecutorId(pod)
-  logDebug(s"Executor pod $podName at IP $podIP was at $action.")
-  if (podIP != null) {
-executorPodsByIPs.remove(podIP)
+  failedInitExecutors.add(executorId)
+  if (failedInitExecutors.size >= executorMaxInitErrors) {
+val errorMessage = s"Aborting Spark application because 
$executorMaxInitErrors" +
+  s" executors failed to start. The maximum number of allowed 
startup failures is" +
+  s" $executorMaxInitErrors. Please contact your cluster 
administrator or increase" +
+  s" your setting of 
${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
+logError(errorMessage)
+
KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
--- End diff --

@foxish out of curiosity, how do the replication controller, replica set, 
and similar controllers handle the same problem? Presumably there would also 
be heavy etcd load in the case where pods always fail to start up and the 
controller keeps retrying.


---




[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186872150
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -320,50 +322,83 @@ private[spark] class 
KubernetesClusterSchedulerBackend(
 override def eventReceived(action: Action, pod: Pod): Unit = {
   val podName = pod.getMetadata.getName
   val podIP = pod.getStatus.getPodIP
-
+  val podPhase = pod.getStatus.getPhase
   action match {
-case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+case Action.MODIFIED if (podPhase == "Running"
 && pod.getMetadata.getDeletionTimestamp == null) =>
   val clusterNodeName = pod.getSpec.getNodeName
   logInfo(s"Executor pod $podName ready, launched at 
$clusterNodeName as IP $podIP.")
   executorPodsByIPs.put(podIP, pod)
 
-case Action.DELETED | Action.ERROR =>
+case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == 
"Init:CrashLoopBackoff")
--- End diff --

Yes, we need similar logic here. The status values with the form 
`Init:<...>` are printed forms and won't be returned by the client library.


---




[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread foxish
Github user foxish commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186869138
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -320,50 +322,83 @@ private[spark] class 
KubernetesClusterSchedulerBackend(
 override def eventReceived(action: Action, pod: Pod): Unit = {
   val podName = pod.getMetadata.getName
   val podIP = pod.getStatus.getPodIP
-
+  val podPhase = pod.getStatus.getPhase
   action match {
-case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+case Action.MODIFIED if (podPhase == "Running"
 && pod.getMetadata.getDeletionTimestamp == null) =>
   val clusterNodeName = pod.getSpec.getNodeName
   logInfo(s"Executor pod $podName ready, launched at 
$clusterNodeName as IP $podIP.")
   executorPodsByIPs.put(podIP, pod)
 
-case Action.DELETED | Action.ERROR =>
+case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == 
"Init:CrashLoopBackoff")
--- End diff --

The termination reason is also a good source of info. kubectl looks at a 
set of these and turns them into what you see in the describe output, so 
[similar logic](https://github.com/kubernetes/kubernetes/blob/6b94e872c63eeea2ed4fdc510c008e4ff9713953/pkg/printers/internalversion/printers.go#L547-L573) 
could be exercised. 
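
A minimal sketch of what such logic might look like on the Spark side, loosely modeled on the linked printer code and simplified. The case classes below are hypothetical stand-ins for the client's container status types, not the real client API, and the total in the `Init:i/n` form is taken from the number of statuses rather than from the pod spec.

```scala
// Hypothetical, simplified stand-ins for the Kubernetes container status types.
final case class Terminated(exitCode: Int, signal: Int = 0, reason: String = "")
final case class Waiting(reason: String)
final case class ContainerStatus(
    terminated: Option[Terminated] = None,
    waiting: Option[Waiting] = None)

object InitStatusSketch {
  // Returns Some("Init:...") for the first init container that has not
  // completed successfully, or None if every init container exited with code 0.
  def initDisplayStatus(statuses: Seq[ContainerStatus]): Option[String] = {
    val total = statuses.size
    statuses.zipWithIndex.collectFirst {
      // Terminated with a non-zero exit code: initialization failed.
      case (ContainerStatus(Some(t), _), _) if t.exitCode != 0 =>
        if (t.reason.nonEmpty) s"Init:${t.reason}"
        else if (t.signal != 0) s"Init:Signal:${t.signal}"
        else s"Init:ExitCode:${t.exitCode}"
      // Waiting with a meaningful reason, e.g. a crash loop.
      case (ContainerStatus(None, Some(w)), _)
          if w.reason.nonEmpty && w.reason != "PodInitializing" =>
        s"Init:${w.reason}"
      // Still running or waiting to initialize: report progress.
      case (ContainerStatus(None, _), i) =>
        s"Init:$i/$total"
    }
  }
}
```

Note that the waiting reason reported by Kubernetes is spelled `CrashLoopBackOff`, so a derived status would read `Init:CrashLoopBackOff`.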


---




[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186866387
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -320,50 +322,83 @@ private[spark] class 
KubernetesClusterSchedulerBackend(
 override def eventReceived(action: Action, pod: Pod): Unit = {
   val podName = pod.getMetadata.getName
   val podIP = pod.getStatus.getPodIP
-
+  val podPhase = pod.getStatus.getPhase
   action match {
-case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+case Action.MODIFIED if (podPhase == "Running"
 && pod.getMetadata.getDeletionTimestamp == null) =>
   val clusterNodeName = pod.getSpec.getNodeName
   logInfo(s"Executor pod $podName ready, launched at 
$clusterNodeName as IP $podIP.")
   executorPodsByIPs.put(podIP, pod)
 
-case Action.DELETED | Action.ERROR =>
+case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == 
"Init:CrashLoopBackoff")
--- End diff --

Yes, you will see it with `kubectl describe`. But I doubt you will see it 
with the client library. This whole PR makes little sense if those values are 
never seen. I think you need to check both whether the pod phase is `Failed` 
and whether the `Initialized` condition is `true` to determine if the pod 
failed on any init-container.


---




[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186864312
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -320,50 +322,83 @@ private[spark] class 
KubernetesClusterSchedulerBackend(
 override def eventReceived(action: Action, pod: Pod): Unit = {
   val podName = pod.getMetadata.getName
   val podIP = pod.getStatus.getPodIP
-
+  val podPhase = pod.getStatus.getPhase
   action match {
-case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+case Action.MODIFIED if (podPhase == "Running"
 && pod.getMetadata.getDeletionTimestamp == null) =>
   val clusterNodeName = pod.getSpec.getNodeName
   logInfo(s"Executor pod $podName ready, launched at 
$clusterNodeName as IP $podIP.")
   executorPodsByIPs.put(podIP, pod)
 
-case Action.DELETED | Action.ERROR =>
+case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == 
"Init:CrashLoopBackoff")
--- End diff --

Seen it using `kubectl`, haven't tried hitting the API directly. When we 
start seeing these kinds of errors come up again, we can check our pod YAML.


---




[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186862624
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -320,50 +322,83 @@ private[spark] class 
KubernetesClusterSchedulerBackend(
 override def eventReceived(action: Action, pod: Pod): Unit = {
   val podName = pod.getMetadata.getName
   val podIP = pod.getStatus.getPodIP
-
+  val podPhase = pod.getStatus.getPhase
   action match {
-case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+case Action.MODIFIED if (podPhase == "Running"
 && pod.getMetadata.getDeletionTimestamp == null) =>
   val clusterNodeName = pod.getSpec.getNodeName
   logInfo(s"Executor pod $podName ready, launched at 
$clusterNodeName as IP $podIP.")
   executorPodsByIPs.put(podIP, pod)
 
-case Action.DELETED | Action.ERROR =>
+case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == 
"Init:CrashLoopBackoff")
--- End diff --

I took a look at the client code, and it appears to me that `getPhase` 
simply returns the value of json property `phase` of `PodStatus`. Have you seen 
`Init:Error` as the return value in practice?
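
To make the concern concrete: the Kubernetes API defines five values for `status.phase` (`Pending`, `Running`, `Succeeded`, `Failed`, `Unknown`). If `getPhase` only passes that raw string through, the guard in the diff can never match. The sketch below uses plain strings and a hypothetical `matchesInitGuard` helper that mirrors the guard; it does not call the client library.

```scala
object PodPhaseSketch {
  // The five pod phases defined by the Kubernetes API for status.phase.
  val knownPhases: Set[String] =
    Set("Pending", "Running", "Succeeded", "Failed", "Unknown")

  // Mirrors the guard in the diff: true only if the raw phase string equals
  // one of the printed "Init:..." forms.
  def matchesInitGuard(phase: String): Boolean =
    phase == "Init:Error" || phase == "Init:CrashLoopBackoff"

  // True if at least one real phase value would trigger the guard.
  def guardIsReachable: Boolean = knownPhases.exists(matchesInitGuard)
}
```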


---




[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread foxish
Github user foxish commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186861330
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -320,50 +322,83 @@ private[spark] class 
KubernetesClusterSchedulerBackend(
 override def eventReceived(action: Action, pod: Pod): Unit = {
   val podName = pod.getMetadata.getName
   val podIP = pod.getStatus.getPodIP
-
+  val podPhase = pod.getStatus.getPhase
   action match {
-case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+case Action.MODIFIED if (podPhase == "Running"
 && pod.getMetadata.getDeletionTimestamp == null) =>
   val clusterNodeName = pod.getSpec.getNodeName
   logInfo(s"Executor pod $podName ready, launched at 
$clusterNodeName as IP $podIP.")
   executorPodsByIPs.put(podIP, pod)
 
-case Action.DELETED | Action.ERROR =>
+case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == 
"Init:CrashLoopBackoff")
+  && pod.getMetadata.getDeletionTimestamp == null =>
   val executorId = getExecutorId(pod)
-  logDebug(s"Executor pod $podName at IP $podIP was at $action.")
-  if (podIP != null) {
-executorPodsByIPs.remove(podIP)
+  failedInitExecutors.add(executorId)
+  if (failedInitExecutors.size >= executorMaxInitErrors) {
+val errorMessage = s"Aborting Spark application because 
$executorMaxInitErrors" +
+  s" executors failed to start. The maximum number of allowed 
startup failures is" +
+  s" $executorMaxInitErrors. Please contact your cluster 
administrator or increase" +
+  s" your setting of 
${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
+logError(errorMessage)
+
KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
+throw new SparkException(errorMessage)
   }
+  handleFailedPod(action, pod, podName, podIP)
 
-  val executorExitReason = if (action == Action.ERROR) {
-logWarning(s"Received error event of executor pod $podName. 
Reason: " +
-  pod.getStatus.getReason)
-executorExitReasonOnError(pod)
-  } else if (action == Action.DELETED) {
-logWarning(s"Received delete event of executor pod $podName. 
Reason: " +
-  pod.getStatus.getReason)
-executorExitReasonOnDelete(pod)
-  } else {
-throw new IllegalStateException(
-  s"Unknown action that should only be DELETED or ERROR: 
$action")
-  }
-  podsWithKnownExitReasons.put(pod.getMetadata.getName, 
executorExitReason)
-
-  if 
(!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
-log.warn(s"Executor with id $executorId was not marked as 
disconnected, but the " +
-  s"watch received an event of type $action for this executor. 
The executor may " +
-  "have failed to start in the first place and never 
registered with the driver.")
-  }
-  disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
+case Action.DELETED | Action.ERROR =>
+  handleFailedPod(action, pod, podName, podIP)
 
 case _ => logDebug(s"Received event of executor pod $podName: " + 
action)
   }
 }
 
+private def handleFailedPod(action: Action, pod: Pod, podName: String, 
podIP: String) = {
+  val executorId = getExecutorId(pod)
+  logDebug(s"Executor pod $podName at IP $podIP was at $action.")
+  if (podIP != null) {
+executorPodsByIPs.remove(podIP)
+  }
+
+  val executorExitReason = if (action == Action.ERROR) {
+logWarning(s"Received error event of executor pod $podName. 
Reason: " +
+  pod.getStatus.getReason)
+executorExitReasonOnError(pod)
+  } else if (action == Action.DELETED) {
+logWarning(s"Received delete event of executor pod $podName. 
Reason: " +
+  pod.getStatus.getReason)
+executorExitReasonOnDelete(pod)
+  } else if (action == Action.MODIFIED) {
+executorExitReasonOnInitError(pod)
--- End diff --

There isn't a doc, but I'm putting together an initial list. We can grow it 
as we discover more during operations. Would be good to 0/1 nodes are 
available: 1 node(s) had disk pressure.



---


[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread erikerlandson
Github user erikerlandson commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186859389
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -320,50 +322,83 @@ private[spark] class 
KubernetesClusterSchedulerBackend(
 override def eventReceived(action: Action, pod: Pod): Unit = {
   val podName = pod.getMetadata.getName
   val podIP = pod.getStatus.getPodIP
-
+  val podPhase = pod.getStatus.getPhase
   action match {
-case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+case Action.MODIFIED if (podPhase == "Running"
 && pod.getMetadata.getDeletionTimestamp == null) =>
   val clusterNodeName = pod.getSpec.getNodeName
   logInfo(s"Executor pod $podName ready, launched at 
$clusterNodeName as IP $podIP.")
   executorPodsByIPs.put(podIP, pod)
 
-case Action.DELETED | Action.ERROR =>
+case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == 
"Init:CrashLoopBackoff")
+  && pod.getMetadata.getDeletionTimestamp == null =>
   val executorId = getExecutorId(pod)
-  logDebug(s"Executor pod $podName at IP $podIP was at $action.")
-  if (podIP != null) {
-executorPodsByIPs.remove(podIP)
+  failedInitExecutors.add(executorId)
+  if (failedInitExecutors.size >= executorMaxInitErrors) {
+val errorMessage = s"Aborting Spark application because 
$executorMaxInitErrors" +
+  s" executors failed to start. The maximum number of allowed 
startup failures is" +
+  s" $executorMaxInitErrors. Please contact your cluster 
administrator or increase" +
+  s" your setting of 
${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
+logError(errorMessage)
+
KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
--- End diff --

Leaning on configured minimum executors could be construed as a way to let 
the user express application-dependent throughput minimums. If the use case is 
terabyte-scale, then setting minimum executors appropriately would address 
that.  A %-threshold is also reasonable, it just adds a new knob.

Requesting new executors indefinitely seems plausible, as long as failed 
pods don't accumulate. Having a submission hang indefinitely while kube churns 
may be confusing behavior, at least if users are not familiar with kube and 
hoping to treat it transparently. Maybe mesos backend policies could provide an 
analogy?


---




[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread foxish
Github user foxish commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186857969
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -320,50 +322,83 @@ private[spark] class 
KubernetesClusterSchedulerBackend(
 override def eventReceived(action: Action, pod: Pod): Unit = {
   val podName = pod.getMetadata.getName
   val podIP = pod.getStatus.getPodIP
-
+  val podPhase = pod.getStatus.getPhase
   action match {
-case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+case Action.MODIFIED if (podPhase == "Running"
 && pod.getMetadata.getDeletionTimestamp == null) =>
   val clusterNodeName = pod.getSpec.getNodeName
   logInfo(s"Executor pod $podName ready, launched at 
$clusterNodeName as IP $podIP.")
   executorPodsByIPs.put(podIP, pod)
 
-case Action.DELETED | Action.ERROR =>
+case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == 
"Init:CrashLoopBackoff")
+  && pod.getMetadata.getDeletionTimestamp == null =>
   val executorId = getExecutorId(pod)
-  logDebug(s"Executor pod $podName at IP $podIP was at $action.")
-  if (podIP != null) {
-executorPodsByIPs.remove(podIP)
+  failedInitExecutors.add(executorId)
+  if (failedInitExecutors.size >= executorMaxInitErrors) {
+val errorMessage = s"Aborting Spark application because 
$executorMaxInitErrors" +
+  s" executors failed to start. The maximum number of allowed 
startup failures is" +
+  s" $executorMaxInitErrors. Please contact your cluster 
administrator or increase" +
+  s" your setting of 
${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
+logError(errorMessage)
+
KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
--- End diff --

Always re-requesting executors has the potential to overwhelm etcd with 
failed pods, especially for long-running jobs. Failing the entire application 
upon N failures seems overly conservative, but it is a reasonable place to 
start. With dynamic allocation, it should be possible to check that at least 
`spark.dynamicAllocation.minExecutors` are alive and make a decision 
accordingly.  


---




[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread erikerlandson
Github user erikerlandson commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186855467
  
--- Diff: docs/running-on-kubernetes.md ---
@@ -561,6 +561,13 @@ specific to Spark on Kubernetes.
 This is distinct from spark.executor.cores: it is only 
used and takes precedence over spark.executor.cores for specifying 
the executor pod cpu request if set. Task 
 parallelism, e.g., number of tasks an executor can run concurrently is 
not affected by this.
 
+
+  spark.kubernetes.executor.maxInitFailures
+  10
+  
+Maximum number of times executors are allowed to fail with an 
Init:Error state before failing the application. Note that Init:Error failures 
should not be caused by Spark itself because Spark does not attach 
init-containers to pods. Init-containers can be attached by the cluster itself. 
Users should check with their cluster administrator if these kinds of failures 
to start the executor pod occur frequently.
--- End diff --

As long as it's relatively easy to extend, generalizing on a case by case 
basis should be OK


---




[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186853331
  
--- Diff: docs/running-on-kubernetes.md ---
@@ -561,6 +561,13 @@ specific to Spark on Kubernetes.
 This is distinct from spark.executor.cores: it is only 
used and takes precedence over spark.executor.cores for specifying 
the executor pod cpu request if set. Task 
 parallelism, e.g., number of tasks an executor can run concurrently is 
not affected by this.
 
+
+  spark.kubernetes.executor.maxInitFailures
+  10
+  
+Maximum number of times executors are allowed to fail with an 
Init:Error state before failing the application. Note that Init:Error failures 
should not be caused by Spark itself because Spark does not attach 
init-containers to pods. Init-containers can be attached by the cluster itself. 
Users should check with their cluster administrator if these kinds of failures 
to start the executor pod occur frequently.
--- End diff --

We can also just start with a minimal set and just keep adding them as we 
find more root causes.
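
One possible shape for a minimal set that can grow over time is sketched below. The `Classifier` type and the starting reasons are assumptions for illustration only; they are not a list taken from this PR.

```scala
object StartupFailureReasonsSketch {
  // Illustrative starting set of container status reasons; not from the PR.
  val initial: Set[String] = Set("Error", "CrashLoopBackOff")

  // Hypothetical immutable classifier; adding a reason returns a new instance.
  final case class Classifier(reasons: Set[String]) {
    def withReason(reason: String): Classifier = copy(reasons = reasons + reason)
    def isStartupFailure(reason: String): Boolean = reasons.contains(reason)
  }
}
```

Keeping the set as data rather than as hard-coded `case` guards means a newly discovered root cause is a one-line addition.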


---




[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186853242
  
--- Diff: docs/running-on-kubernetes.md ---
@@ -561,6 +561,13 @@ specific to Spark on Kubernetes.
 This is distinct from spark.executor.cores: it is only 
used and takes precedence over spark.executor.cores for specifying 
the executor pod cpu request if set. Task 
 parallelism, e.g., number of tasks an executor can run concurrently is 
not affected by this.
 
+
+  spark.kubernetes.executor.maxInitFailures
+  10
+  
+Maximum number of times executors are allowed to fail with an 
Init:Error state before failing the application. Note that Init:Error failures 
should not be caused by Spark itself because Spark does not attach 
init-containers to pods. Init-containers can be attached by the cluster itself. 
Users should check with their cluster administrator if these kinds of failures 
to start the executor pod occur frequently.
--- End diff --

As per https://github.com/apache/spark/pull/21241#discussion_r186789848 I 
think it's important to define what that full set of error types is. Do we have 
a comprehensive list we can follow?


---




[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186852387
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -320,50 +322,83 @@ private[spark] class 
KubernetesClusterSchedulerBackend(
 override def eventReceived(action: Action, pod: Pod): Unit = {
   val podName = pod.getMetadata.getName
   val podIP = pod.getStatus.getPodIP
-
+  val podPhase = pod.getStatus.getPhase
   action match {
-case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+case Action.MODIFIED if (podPhase == "Running"
 && pod.getMetadata.getDeletionTimestamp == null) =>
   val clusterNodeName = pod.getSpec.getNodeName
   logInfo(s"Executor pod $podName ready, launched at 
$clusterNodeName as IP $podIP.")
   executorPodsByIPs.put(podIP, pod)
 
-case Action.DELETED | Action.ERROR =>
+case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == 
"Init:CrashLoopBackoff")
+  && pod.getMetadata.getDeletionTimestamp == null =>
   val executorId = getExecutorId(pod)
-  logDebug(s"Executor pod $podName at IP $podIP was at $action.")
-  if (podIP != null) {
-executorPodsByIPs.remove(podIP)
+  failedInitExecutors.add(executorId)
+  if (failedInitExecutors.size >= executorMaxInitErrors) {
+val errorMessage = s"Aborting Spark application because 
$executorMaxInitErrors" +
+  s" executors failed to start. The maximum number of allowed 
startup failures is" +
+  s" $executorMaxInitErrors. Please contact your cluster 
administrator or increase" +
+  s" your setting of 
${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
+logError(errorMessage)
+
KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
--- End diff --

Even running at degraded capacity can be concerning, though. What if we only get 1 
executor for an application that's processing terabytes of data? I don't think 
there's an intuitive default here, though perhaps if we're at say 75% of 
requested capacity or some arbitrary number then we can continue. 
Alternatively, we can just retry requesting executors forever and never 
consider these executor startup failures to be real problems (which would 
remove this if statement, for example). I don't think it's entirely 
unreasonable to always retry requesting executors when they fail to start up.
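
For reference, the percentage-of-capacity idea could be sketched roughly as follows. The helper name `hasEnoughCapacity` and the `minFraction` parameter are hypothetical, and 0.75 is just the arbitrary figure mentioned above.

```scala
object CapacitySketch {
  // Hypothetical check: are enough of the requested executors running
  // to keep the application going?
  def hasEnoughCapacity(running: Int, requested: Int, minFraction: Double = 0.75): Boolean = {
    require(running >= 0 && requested >= 0, "counts must be non-negative")
    require(minFraction >= 0.0 && minFraction <= 1.0, "minFraction must be in [0, 1]")
    // Nothing requested means there is nothing to fall short of.
    if (requested == 0) true
    else running.toDouble / requested >= minFraction
  }
}
```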


---




[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread erikerlandson
Github user erikerlandson commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186846341
  
--- Diff: docs/running-on-kubernetes.md ---
@@ -561,6 +561,13 @@ specific to Spark on Kubernetes.
 This is distinct from spark.executor.cores: it is only 
used and takes precedence over spark.executor.cores for specifying 
the executor pod cpu request if set. Task 
 parallelism, e.g., number of tasks an executor can run concurrently is 
not affected by this.
 
+
+  spark.kubernetes.executor.maxInitFailures
+  10
+  
+Maximum number of times executors are allowed to fail with an 
Init:Error state before failing the application. Note that Init:Error failures 
should not be caused by Spark itself because Spark does not attach 
init-containers to pods. Init-containers can be attached by the cluster itself. 
Users should check with their cluster administrator if these kinds of failures 
to start the executor pod occur frequently.
--- End diff --

I'm in agreement w/ @foxish about designing for wider categories of error, 
which can be future-proofed


---




[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread erikerlandson
Github user erikerlandson commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186839692
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -320,50 +322,83 @@ private[spark] class 
KubernetesClusterSchedulerBackend(
 override def eventReceived(action: Action, pod: Pod): Unit = {
   val podName = pod.getMetadata.getName
   val podIP = pod.getStatus.getPodIP
-
+  val podPhase = pod.getStatus.getPhase
   action match {
-case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+case Action.MODIFIED if (podPhase == "Running"
 && pod.getMetadata.getDeletionTimestamp == null) =>
   val clusterNodeName = pod.getSpec.getNodeName
   logInfo(s"Executor pod $podName ready, launched at 
$clusterNodeName as IP $podIP.")
   executorPodsByIPs.put(podIP, pod)
 
-case Action.DELETED | Action.ERROR =>
+case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == 
"Init:CrashLoopBackoff")
+  && pod.getMetadata.getDeletionTimestamp == null =>
   val executorId = getExecutorId(pod)
-  logDebug(s"Executor pod $podName at IP $podIP was at $action.")
-  if (podIP != null) {
-executorPodsByIPs.remove(podIP)
+  failedInitExecutors.add(executorId)
+  if (failedInitExecutors.size >= executorMaxInitErrors) {
+val errorMessage = s"Aborting Spark application because 
$executorMaxInitErrors" +
+  s" executors failed to start. The maximum number of allowed 
startup failures is" +
+  s" $executorMaxInitErrors. Please contact your cluster 
administrator or increase" +
+  s" your setting of 
${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
+logError(errorMessage)
+
KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
--- End diff --

I'm unsure how elaborate we can or should get here, but a basic test from
the back-end side is "are there any registered executors?". It isn't perfect,
but it might be a decent heuristic for whether it's worth continuing or better
to fail the job entirely. If no executors are currently registered and we hit
the failure threshold, failing outright seems like a sane response. If
executors are running (enough to satisfy the configured minimum?), then
continuing might be reasonable, since that is graceful degradation.
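
As a rough sketch of that heuristic (all names here are hypothetical and none
exist in Spark; `minExecutors` stands in for whatever configured minimum would
be consulted), the decision could look something like this:

```scala
// Hypothetical names throughout; none of these exist in Spark.
sealed trait Decision
case object KeepRunning extends Decision
case object AbortApplication extends Decision

object StartupFailurePolicy {
  /**
   * @param startupFailures     executors that have failed to start so far
   * @param maxStartupFailures  configured failure threshold
   * @param registeredExecutors executors currently registered with the driver
   * @param minExecutors        minimum executors considered enough to keep going
   */
  def decide(
      startupFailures: Int,
      maxStartupFailures: Int,
      registeredExecutors: Int,
      minExecutors: Int): Decision = {
    val thresholdHit = startupFailures >= maxStartupFailures
    // Abort only when the threshold is hit and too few executors are registered.
    if (thresholdHit && registeredExecutors < math.max(minExecutors, 1)) AbortApplication
    else KeepRunning
  }
}
```

With a minimum of 0, this aborts only when nothing is registered at all, which
is the simplest form of the check.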


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186825312
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -320,50 +322,83 @@ private[spark] class 
KubernetesClusterSchedulerBackend(
 override def eventReceived(action: Action, pod: Pod): Unit = {
   val podName = pod.getMetadata.getName
   val podIP = pod.getStatus.getPodIP
-
+  val podPhase = pod.getStatus.getPhase
   action match {
-case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+case Action.MODIFIED if (podPhase == "Running"
 && pod.getMetadata.getDeletionTimestamp == null) =>
   val clusterNodeName = pod.getSpec.getNodeName
   logInfo(s"Executor pod $podName ready, launched at 
$clusterNodeName as IP $podIP.")
   executorPodsByIPs.put(podIP, pod)
 
-case Action.DELETED | Action.ERROR =>
+case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == 
"Init:CrashLoopBackoff")
--- End diff --

The Kubernetes client doesn't use any enumerations from the underlying API;
it only takes the raw strings in the response body. So if the response gives us
those values, we should be fine.


---




[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186797151
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -320,50 +322,83 @@ private[spark] class 
KubernetesClusterSchedulerBackend(
 override def eventReceived(action: Action, pod: Pod): Unit = {
   val podName = pod.getMetadata.getName
   val podIP = pod.getStatus.getPodIP
-
+  val podPhase = pod.getStatus.getPhase
   action match {
-case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+case Action.MODIFIED if (podPhase == "Running"
 && pod.getMetadata.getDeletionTimestamp == null) =>
   val clusterNodeName = pod.getSpec.getNodeName
   logInfo(s"Executor pod $podName ready, launched at 
$clusterNodeName as IP $podIP.")
   executorPodsByIPs.put(podIP, pod)
 
-case Action.DELETED | Action.ERROR =>
+case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == 
"Init:CrashLoopBackoff")
--- End diff --

Are `Init:Error` and `Init:CrashLoopBackoff` valid `PodPhase` values? I
don't see them in
https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/api/core/v1/types.go#L2215.
It appears to me that those are the printed form of `PodStatus`. Or does the
fabric8 client we use return those?
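
For context, a minimal sketch of the distinction, using simplified stand-in
types rather than the fabric8 model classes. The phase set lists the five
`PodPhase` values in the core/v1 API; `displayStatus` only approximates how a
printed status like `Init:Error` could be derived from init-container states,
and is an assumption about the printer's behavior, not a copy of it.

```scala
// Simplified stand-ins for the Kubernetes API objects; not the fabric8 model classes.
object PodStatusSketch {
  // The five PodPhase values defined by the Kubernetes core/v1 API.
  val validPhases: Set[String] = Set("Pending", "Running", "Succeeded", "Failed", "Unknown")

  sealed trait ContainerState
  case class Waiting(reason: String) extends ContainerState
  case class Terminated(exitCode: Int, reason: String) extends ContainerState
  case object Running extends ContainerState

  case class PodStatus(phase: String, initContainerStates: Seq[ContainerState])

  /** Approximates a printed status column; falls back to the raw phase. */
  def displayStatus(status: PodStatus): String =
    status.initContainerStates.collectFirst {
      // An init-container that exited with a non-zero code.
      case Terminated(code, reason) if code != 0 =>
        if (reason.nonEmpty) s"Init:$reason" else s"Init:ExitCode:$code"
      // An init-container stuck waiting for a reason other than normal startup.
      case Waiting(reason) if reason.nonEmpty && reason != "PodInitializing" =>
        s"Init:$reason"
    }.getOrElse(status.phase)
}
```

In this model a pod whose init-container exited with an error still reports
the phase `Pending`, so comparing the phase against `Init:Error` would never
match.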


---




[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186793785
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -320,50 +322,83 @@ private[spark] class 
KubernetesClusterSchedulerBackend(
 override def eventReceived(action: Action, pod: Pod): Unit = {
   val podName = pod.getMetadata.getName
   val podIP = pod.getStatus.getPodIP
-
+  val podPhase = pod.getStatus.getPhase
   action match {
-case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+case Action.MODIFIED if (podPhase == "Running"
 && pod.getMetadata.getDeletionTimestamp == null) =>
   val clusterNodeName = pod.getSpec.getNodeName
   logInfo(s"Executor pod $podName ready, launched at 
$clusterNodeName as IP $podIP.")
   executorPodsByIPs.put(podIP, pod)
 
-case Action.DELETED | Action.ERROR =>
+case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == 
"Init:CrashLoopBackoff")
+  && pod.getMetadata.getDeletionTimestamp == null =>
   val executorId = getExecutorId(pod)
-  logDebug(s"Executor pod $podName at IP $podIP was at $action.")
-  if (podIP != null) {
-executorPodsByIPs.remove(podIP)
+  failedInitExecutors.add(executorId)
+  if (failedInitExecutors.size >= executorMaxInitErrors) {
+val errorMessage = s"Aborting Spark application because 
$executorMaxInitErrors" +
+  s" executors failed to start. The maximum number of allowed 
startup failures is" +
+  s" $executorMaxInitErrors. Please contact your cluster 
administrator or increase" +
+  s" your setting of 
${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
+logError(errorMessage)
+
KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
+throw new SparkException(errorMessage)
   }
+  handleFailedPod(action, pod, podName, podIP)
 
-  val executorExitReason = if (action == Action.ERROR) {
-logWarning(s"Received error event of executor pod $podName. 
Reason: " +
-  pod.getStatus.getReason)
-executorExitReasonOnError(pod)
-  } else if (action == Action.DELETED) {
-logWarning(s"Received delete event of executor pod $podName. 
Reason: " +
-  pod.getStatus.getReason)
-executorExitReasonOnDelete(pod)
-  } else {
-throw new IllegalStateException(
-  s"Unknown action that should only be DELETED or ERROR: 
$action")
-  }
-  podsWithKnownExitReasons.put(pod.getMetadata.getName, 
executorExitReason)
-
-  if 
(!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
-log.warn(s"Executor with id $executorId was not marked as 
disconnected, but the " +
-  s"watch received an event of type $action for this executor. 
The executor may " +
-  "have failed to start in the first place and never 
registered with the driver.")
-  }
-  disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
+case Action.DELETED | Action.ERROR =>
+  handleFailedPod(action, pod, podName, podIP)
 
 case _ => logDebug(s"Received event of executor pod $podName: " + 
action)
   }
 }
 
+private def handleFailedPod(action: Action, pod: Pod, podName: String, 
podIP: String) = {
+  val executorId = getExecutorId(pod)
+  logDebug(s"Executor pod $podName at IP $podIP was at $action.")
+  if (podIP != null) {
+executorPodsByIPs.remove(podIP)
+  }
+
+  val executorExitReason = if (action == Action.ERROR) {
+logWarning(s"Received error event of executor pod $podName. 
Reason: " +
+  pod.getStatus.getReason)
+executorExitReasonOnError(pod)
+  } else if (action == Action.DELETED) {
+logWarning(s"Received delete event of executor pod $podName. 
Reason: " +
+  pod.getStatus.getReason)
+executorExitReasonOnDelete(pod)
+  } else if (action == Action.MODIFIED) {
+executorExitReasonOnInitError(pod)
--- End diff --

That only covers init-containers - I think @foxish is referring to other 
classes of initialization errors, like failing to pull the image.


---




[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186790719
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -320,50 +322,83 @@ private[spark] class 
KubernetesClusterSchedulerBackend(
 override def eventReceived(action: Action, pod: Pod): Unit = {
   val podName = pod.getMetadata.getName
   val podIP = pod.getStatus.getPodIP
-
+  val podPhase = pod.getStatus.getPhase
   action match {
-case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+case Action.MODIFIED if (podPhase == "Running"
 && pod.getMetadata.getDeletionTimestamp == null) =>
   val clusterNodeName = pod.getSpec.getNodeName
   logInfo(s"Executor pod $podName ready, launched at 
$clusterNodeName as IP $podIP.")
   executorPodsByIPs.put(podIP, pod)
 
-case Action.DELETED | Action.ERROR =>
+case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == 
"Init:CrashLoopBackoff")
+  && pod.getMetadata.getDeletionTimestamp == null =>
   val executorId = getExecutorId(pod)
-  logDebug(s"Executor pod $podName at IP $podIP was at $action.")
-  if (podIP != null) {
-executorPodsByIPs.remove(podIP)
+  failedInitExecutors.add(executorId)
+  if (failedInitExecutors.size >= executorMaxInitErrors) {
+val errorMessage = s"Aborting Spark application because 
$executorMaxInitErrors" +
+  s" executors failed to start. The maximum number of allowed 
startup failures is" +
+  s" $executorMaxInitErrors. Please contact your cluster 
administrator or increase" +
+  s" your setting of 
${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
+logError(errorMessage)
+
KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
--- End diff --

The problem is that executors that fail to start up won't create task
failures, so they will not cause the job(s) to stop. Additionally,
executors failing to start up will affect every job in the process. So
when we hit these initialization errors too many times, I think we want to
shut down the Spark context and fail all of the jobs. The alternative is to
stop requesting new executors entirely, but that state isn't desirable: for
example, your Spark application could be left running with 0 executors and
be stuck for seemingly no reason.
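
A minimal sketch of that threshold logic, assuming a hypothetical
`InitFailureTracker` helper (not part of Spark). It tracks executor IDs in a
set, as the diff does with `failedInitExecutors`, so repeated watch events for
the same executor count once; the caller would stop the Spark context when
`recordFailure` returns true.

```scala
import scala.collection.mutable

// Hypothetical helper; Spark has no class with this name.
final class InitFailureTracker(maxInitErrors: Int) {
  private val failedExecutorIds = mutable.Set.empty[String]

  /** Records a failed executor and returns true once the threshold is reached. */
  def recordFailure(executorId: String): Boolean = synchronized {
    failedExecutorIds += executorId // repeated events for one executor count once
    failedExecutorIds.size >= maxInitErrors
  }
}
```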


---




[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186790517
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -320,50 +322,83 @@ private[spark] class 
KubernetesClusterSchedulerBackend(
 override def eventReceived(action: Action, pod: Pod): Unit = {
   val podName = pod.getMetadata.getName
   val podIP = pod.getStatus.getPodIP
-
+  val podPhase = pod.getStatus.getPhase
   action match {
-case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+case Action.MODIFIED if (podPhase == "Running"
 && pod.getMetadata.getDeletionTimestamp == null) =>
   val clusterNodeName = pod.getSpec.getNodeName
   logInfo(s"Executor pod $podName ready, launched at 
$clusterNodeName as IP $podIP.")
   executorPodsByIPs.put(podIP, pod)
 
-case Action.DELETED | Action.ERROR =>
+case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == 
"Init:CrashLoopBackoff")
+  && pod.getMetadata.getDeletionTimestamp == null =>
   val executorId = getExecutorId(pod)
-  logDebug(s"Executor pod $podName at IP $podIP was at $action.")
-  if (podIP != null) {
-executorPodsByIPs.remove(podIP)
+  failedInitExecutors.add(executorId)
+  if (failedInitExecutors.size >= executorMaxInitErrors) {
+val errorMessage = s"Aborting Spark application because 
$executorMaxInitErrors" +
+  s" executors failed to start. The maximum number of allowed 
startup failures is" +
+  s" $executorMaxInitErrors. Please contact your cluster 
administrator or increase" +
+  s" your setting of 
${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
+logError(errorMessage)
+
KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
+throw new SparkException(errorMessage)
   }
+  handleFailedPod(action, pod, podName, podIP)
 
-  val executorExitReason = if (action == Action.ERROR) {
-logWarning(s"Received error event of executor pod $podName. 
Reason: " +
-  pod.getStatus.getReason)
-executorExitReasonOnError(pod)
-  } else if (action == Action.DELETED) {
-logWarning(s"Received delete event of executor pod $podName. 
Reason: " +
-  pod.getStatus.getReason)
-executorExitReasonOnDelete(pod)
-  } else {
-throw new IllegalStateException(
-  s"Unknown action that should only be DELETED or ERROR: 
$action")
-  }
-  podsWithKnownExitReasons.put(pod.getMetadata.getName, 
executorExitReason)
-
-  if 
(!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
-log.warn(s"Executor with id $executorId was not marked as 
disconnected, but the " +
-  s"watch received an event of type $action for this executor. 
The executor may " +
-  "have failed to start in the first place and never 
registered with the driver.")
-  }
-  disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
+case Action.DELETED | Action.ERROR =>
+  handleFailedPod(action, pod, podName, podIP)
 
 case _ => logDebug(s"Received event of executor pod $podName: " + 
action)
   }
 }
 
+private def handleFailedPod(action: Action, pod: Pod, podName: String, 
podIP: String) = {
+  val executorId = getExecutorId(pod)
+  logDebug(s"Executor pod $podName at IP $podIP was at $action.")
+  if (podIP != null) {
+executorPodsByIPs.remove(podIP)
+  }
+
+  val executorExitReason = if (action == Action.ERROR) {
+logWarning(s"Received error event of executor pod $podName. 
Reason: " +
+  pod.getStatus.getReason)
+executorExitReasonOnError(pod)
+  } else if (action == Action.DELETED) {
+logWarning(s"Received delete event of executor pod $podName. 
Reason: " +
+  pod.getStatus.getReason)
+executorExitReasonOnDelete(pod)
+  } else if (action == Action.MODIFIED) {
+executorExitReasonOnInitError(pod)
--- End diff --

I think this is the relevant doc:
https://kubernetes.io/docs/tasks/debug-application-cluster/debug-init-containers/#understanding-pod-status.


---




[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186789848
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -320,50 +322,83 @@ private[spark] class 
KubernetesClusterSchedulerBackend(
 override def eventReceived(action: Action, pod: Pod): Unit = {
   val podName = pod.getMetadata.getName
   val podIP = pod.getStatus.getPodIP
-
+  val podPhase = pod.getStatus.getPhase
   action match {
-case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+case Action.MODIFIED if (podPhase == "Running"
 && pod.getMetadata.getDeletionTimestamp == null) =>
   val clusterNodeName = pod.getSpec.getNodeName
   logInfo(s"Executor pod $podName ready, launched at 
$clusterNodeName as IP $podIP.")
   executorPodsByIPs.put(podIP, pod)
 
-case Action.DELETED | Action.ERROR =>
+case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == 
"Init:CrashLoopBackoff")
+  && pod.getMetadata.getDeletionTimestamp == null =>
   val executorId = getExecutorId(pod)
-  logDebug(s"Executor pod $podName at IP $podIP was at $action.")
-  if (podIP != null) {
-executorPodsByIPs.remove(podIP)
+  failedInitExecutors.add(executorId)
+  if (failedInitExecutors.size >= executorMaxInitErrors) {
+val errorMessage = s"Aborting Spark application because 
$executorMaxInitErrors" +
+  s" executors failed to start. The maximum number of allowed 
startup failures is" +
+  s" $executorMaxInitErrors. Please contact your cluster 
administrator or increase" +
+  s" your setting of 
${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
+logError(errorMessage)
+
KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
+throw new SparkException(errorMessage)
   }
+  handleFailedPod(action, pod, podName, podIP)
 
-  val executorExitReason = if (action == Action.ERROR) {
-logWarning(s"Received error event of executor pod $podName. 
Reason: " +
-  pod.getStatus.getReason)
-executorExitReasonOnError(pod)
-  } else if (action == Action.DELETED) {
-logWarning(s"Received delete event of executor pod $podName. 
Reason: " +
-  pod.getStatus.getReason)
-executorExitReasonOnDelete(pod)
-  } else {
-throw new IllegalStateException(
-  s"Unknown action that should only be DELETED or ERROR: 
$action")
-  }
-  podsWithKnownExitReasons.put(pod.getMetadata.getName, 
executorExitReason)
-
-  if 
(!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
-log.warn(s"Executor with id $executorId was not marked as 
disconnected, but the " +
-  s"watch received an event of type $action for this executor. 
The executor may " +
-  "have failed to start in the first place and never 
registered with the driver.")
-  }
-  disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
+case Action.DELETED | Action.ERROR =>
+  handleFailedPod(action, pod, podName, podIP)
 
 case _ => logDebug(s"Received event of executor pod $podName: " + 
action)
   }
 }
 
+private def handleFailedPod(action: Action, pod: Pod, podName: String, 
podIP: String) = {
+  val executorId = getExecutorId(pod)
+  logDebug(s"Executor pod $podName at IP $podIP was at $action.")
+  if (podIP != null) {
+executorPodsByIPs.remove(podIP)
+  }
+
+  val executorExitReason = if (action == Action.ERROR) {
+logWarning(s"Received error event of executor pod $podName. 
Reason: " +
+  pod.getStatus.getReason)
+executorExitReasonOnError(pod)
+  } else if (action == Action.DELETED) {
+logWarning(s"Received delete event of executor pod $podName. 
Reason: " +
+  pod.getStatus.getReason)
+executorExitReasonOnDelete(pod)
+  } else if (action == Action.MODIFIED) {
+executorExitReasonOnInitError(pod)
--- End diff --

What's the full set of error types that we should be tracking here? Are 
there docs that have a full enumerated list?


---




[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186780738
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -320,50 +322,83 @@ private[spark] class 
KubernetesClusterSchedulerBackend(
 override def eventReceived(action: Action, pod: Pod): Unit = {
   val podName = pod.getMetadata.getName
   val podIP = pod.getStatus.getPodIP
-
+  val podPhase = pod.getStatus.getPhase
   action match {
-case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+case Action.MODIFIED if (podPhase == "Running"
 && pod.getMetadata.getDeletionTimestamp == null) =>
   val clusterNodeName = pod.getSpec.getNodeName
   logInfo(s"Executor pod $podName ready, launched at 
$clusterNodeName as IP $podIP.")
   executorPodsByIPs.put(podIP, pod)
 
-case Action.DELETED | Action.ERROR =>
+case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == 
"Init:CrashLoopBackoff")
+  && pod.getMetadata.getDeletionTimestamp == null =>
   val executorId = getExecutorId(pod)
-  logDebug(s"Executor pod $podName at IP $podIP was at $action.")
-  if (podIP != null) {
-executorPodsByIPs.remove(podIP)
+  failedInitExecutors.add(executorId)
+  if (failedInitExecutors.size >= executorMaxInitErrors) {
+val errorMessage = s"Aborting Spark application because 
$executorMaxInitErrors" +
+  s" executors failed to start. The maximum number of allowed 
startup failures is" +
+  s" $executorMaxInitErrors. Please contact your cluster 
administrator or increase" +
+  s" your setting of 
${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
+logError(errorMessage)
+
KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
--- End diff --

Why do you need to call this?


---




[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186782288
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -320,50 +322,83 @@ private[spark] class 
KubernetesClusterSchedulerBackend(
 override def eventReceived(action: Action, pod: Pod): Unit = {
   val podName = pod.getMetadata.getName
   val podIP = pod.getStatus.getPodIP
-
+  val podPhase = pod.getStatus.getPhase
   action match {
-case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+case Action.MODIFIED if (podPhase == "Running"
 && pod.getMetadata.getDeletionTimestamp == null) =>
   val clusterNodeName = pod.getSpec.getNodeName
   logInfo(s"Executor pod $podName ready, launched at 
$clusterNodeName as IP $podIP.")
   executorPodsByIPs.put(podIP, pod)
 
-case Action.DELETED | Action.ERROR =>
+case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == 
"Init:CrashLoopBackoff")
+  && pod.getMetadata.getDeletionTimestamp == null =>
   val executorId = getExecutorId(pod)
-  logDebug(s"Executor pod $podName at IP $podIP was at $action.")
-  if (podIP != null) {
-executorPodsByIPs.remove(podIP)
+  failedInitExecutors.add(executorId)
+  if (failedInitExecutors.size >= executorMaxInitErrors) {
+val errorMessage = s"Aborting Spark application because 
$executorMaxInitErrors" +
+  s" executors failed to start. The maximum number of allowed 
startup failures is" +
+  s" $executorMaxInitErrors. Please contact your cluster 
administrator or increase" +
+  s" your setting of 
${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
+logError(errorMessage)
+
KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
+throw new SparkException(errorMessage)
   }
+  handleFailedPod(action, pod, podName, podIP)
 
-  val executorExitReason = if (action == Action.ERROR) {
-logWarning(s"Received error event of executor pod $podName. 
Reason: " +
-  pod.getStatus.getReason)
-executorExitReasonOnError(pod)
-  } else if (action == Action.DELETED) {
-logWarning(s"Received delete event of executor pod $podName. 
Reason: " +
-  pod.getStatus.getReason)
-executorExitReasonOnDelete(pod)
-  } else {
-throw new IllegalStateException(
-  s"Unknown action that should only be DELETED or ERROR: 
$action")
-  }
-  podsWithKnownExitReasons.put(pod.getMetadata.getName, 
executorExitReason)
-
-  if 
(!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
-log.warn(s"Executor with id $executorId was not marked as 
disconnected, but the " +
-  s"watch received an event of type $action for this executor. 
The executor may " +
-  "have failed to start in the first place and never 
registered with the driver.")
-  }
-  disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
+case Action.DELETED | Action.ERROR =>
+  handleFailedPod(action, pod, podName, podIP)
 
 case _ => logDebug(s"Received event of executor pod $podName: " + 
action)
   }
 }
 
+private def handleFailedPod(action: Action, pod: Pod, podName: String, 
podIP: String) = {
+  val executorId = getExecutorId(pod)
+  logDebug(s"Executor pod $podName at IP $podIP was at $action.")
+  if (podIP != null) {
+executorPodsByIPs.remove(podIP)
+  }
+
+  val executorExitReason = if (action == Action.ERROR) {
+logWarning(s"Received error event of executor pod $podName. 
Reason: " +
+  pod.getStatus.getReason)
+executorExitReasonOnError(pod)
+  } else if (action == Action.DELETED) {
+logWarning(s"Received delete event of executor pod $podName. 
Reason: " +
+  pod.getStatus.getReason)
+executorExitReasonOnDelete(pod)
+  } else if (action == Action.MODIFIED) {
+executorExitReasonOnInitError(pod)
--- End diff --

I would suggest separating out the handling of pods that have failed
init-container(s) into its own method.


---




[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186778945
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -320,50 +322,83 @@ private[spark] class 
KubernetesClusterSchedulerBackend(
 override def eventReceived(action: Action, pod: Pod): Unit = {
   val podName = pod.getMetadata.getName
   val podIP = pod.getStatus.getPodIP
-
+  val podPhase = pod.getStatus.getPhase
   action match {
-case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+case Action.MODIFIED if (podPhase == "Running"
 && pod.getMetadata.getDeletionTimestamp == null) =>
   val clusterNodeName = pod.getSpec.getNodeName
   logInfo(s"Executor pod $podName ready, launched at 
$clusterNodeName as IP $podIP.")
   executorPodsByIPs.put(podIP, pod)
 
-case Action.DELETED | Action.ERROR =>
+case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == 
"Init:CrashLoopBackoff")
--- End diff --

I think it's better to have a method that determines if it's an 
initialization error.
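
Something along these lines, as a sketch only. `InitContainerStatus` is a
simplified stand-in for the fabric8 status objects, and the set of waiting
reasons is an assumption, not an exhaustive list.

```scala
// Simplified stand-in for the init-container fields that matter here;
// not the fabric8 ContainerStatus class.
case class InitContainerStatus(waitingReason: Option[String], terminatedExitCode: Option[Int])

object InitErrors {
  // Waiting reasons treated as failures in this sketch (assumed, not exhaustive).
  private val failureWaitingReasons = Set("CrashLoopBackOff", "ErrImagePull", "ImagePullBackOff")

  /** True if any init-container exited non-zero or is waiting for a failure reason. */
  def isInitializationError(initContainers: Seq[InitContainerStatus]): Boolean =
    initContainers.exists { c =>
      c.terminatedExitCode.exists(_ != 0) ||
        c.waitingReason.exists(failureWaitingReasons.contains)
    }
}
```

Note that Kubernetes spells the reason `CrashLoopBackOff`, with a capital "O",
so a string comparison against `CrashLoopBackoff` would not match it.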


---




[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread foxish
Github user foxish commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186643210
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -320,50 +322,83 @@ private[spark] class 
KubernetesClusterSchedulerBackend(
 override def eventReceived(action: Action, pod: Pod): Unit = {
   val podName = pod.getMetadata.getName
   val podIP = pod.getStatus.getPodIP
-
+  val podPhase = pod.getStatus.getPhase
   action match {
-case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+case Action.MODIFIED if (podPhase == "Running"
 && pod.getMetadata.getDeletionTimestamp == null) =>
   val clusterNodeName = pod.getSpec.getNodeName
   logInfo(s"Executor pod $podName ready, launched at 
$clusterNodeName as IP $podIP.")
   executorPodsByIPs.put(podIP, pod)
 
-case Action.DELETED | Action.ERROR =>
+case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == 
"Init:CrashLoopBackoff")
+  && pod.getMetadata.getDeletionTimestamp == null =>
   val executorId = getExecutorId(pod)
-  logDebug(s"Executor pod $podName at IP $podIP was at $action.")
-  if (podIP != null) {
-executorPodsByIPs.remove(podIP)
+  failedInitExecutors.add(executorId)
+  if (failedInitExecutors.size >= executorMaxInitErrors) {
+val errorMessage = s"Aborting Spark application because 
$executorMaxInitErrors" +
+  s" executors failed to start. The maximum number of allowed 
startup failures is" +
+  s" $executorMaxInitErrors. Please contact your cluster 
administrator or increase" +
+  s" your setting of 
${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
+logError(errorMessage)
+
KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
+throw new SparkException(errorMessage)
   }
+  handleFailedPod(action, pod, podName, podIP)
 
-  val executorExitReason = if (action == Action.ERROR) {
-logWarning(s"Received error event of executor pod $podName. 
Reason: " +
-  pod.getStatus.getReason)
-executorExitReasonOnError(pod)
-  } else if (action == Action.DELETED) {
-logWarning(s"Received delete event of executor pod $podName. 
Reason: " +
-  pod.getStatus.getReason)
-executorExitReasonOnDelete(pod)
-  } else {
-throw new IllegalStateException(
-  s"Unknown action that should only be DELETED or ERROR: 
$action")
-  }
-  podsWithKnownExitReasons.put(pod.getMetadata.getName, 
executorExitReason)
-
-  if 
(!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
-log.warn(s"Executor with id $executorId was not marked as 
disconnected, but the " +
-  s"watch received an event of type $action for this executor. 
The executor may " +
-  "have failed to start in the first place and never 
registered with the driver.")
-  }
-  disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
+case Action.DELETED | Action.ERROR =>
+  handleFailedPod(action, pod, podName, podIP)
 
 case _ => logDebug(s"Received event of executor pod $podName: " + 
action)
   }
 }
 
+private def handleFailedPod(action: Action, pod: Pod, podName: String, 
podIP: String) = {
+  val executorId = getExecutorId(pod)
+  logDebug(s"Executor pod $podName at IP $podIP was at $action.")
+  if (podIP != null) {
+executorPodsByIPs.remove(podIP)
+  }
+
+  val executorExitReason = if (action == Action.ERROR) {
+logWarning(s"Received error event of executor pod $podName. 
Reason: " +
+  pod.getStatus.getReason)
+executorExitReasonOnError(pod)
+  } else if (action == Action.DELETED) {
+logWarning(s"Received delete event of executor pod $podName. 
Reason: " +
+  pod.getStatus.getReason)
+executorExitReasonOnDelete(pod)
+  } else if (action == Action.MODIFIED) {
+executorExitReasonOnInitError(pod)
--- End diff --

I think `Action.MODIFIED` can be fired for a lot of other reasons. I was 
thinking that we should just use container exit status, and have that reflect 
either the main/init container status depending on which container failed.
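
A rough sketch of that idea, using a hypothetical `ContainerExit` record in
place of the fabric8 container status classes. It returns the first container
with a non-zero exit code, checking init-containers first since they run
before the main containers.

```scala
// Hypothetical, simplified container record; not the fabric8 ContainerStatus class.
case class ContainerExit(name: String, isInit: Boolean, exitCode: Option[Int])

object ExitStatus {
  /** First container with a non-zero exit code, init-containers before main ones. */
  def failedContainer(containers: Seq[ContainerExit]): Option[ContainerExit] = {
    val (init, main) = containers.partition(_.isInit)
    (init ++ main).find(_.exitCode.exists(_ != 0))
  }
}
```

A container that has not terminated has no exit code here, so an ordinary
`MODIFIED` event for a healthy pod yields `None` and can be ignored.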


---


[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread foxish
Github user foxish commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186642656
  
--- Diff: docs/running-on-kubernetes.md ---
@@ -561,6 +561,13 @@ specific to Spark on Kubernetes.
 This is distinct from spark.executor.cores: it is only 
used and takes precedence over spark.executor.cores for specifying 
the executor pod cpu request if set. Task 
 parallelism, e.g., number of tasks an executor can run concurrently is 
not affected by this.
 
+
+  spark.kubernetes.executor.maxInitFailures
+  10
+  
+Maximum number of times executors are allowed to fail with an 
Init:Error state before failing the application. Note that Init:Error failures 
should not be caused by Spark itself because Spark does not attach 
init-containers to pods. Init-containers can be attached by the cluster itself. 
Users should check with their cluster administrator if these kinds of failures 
to start the executor pod occur frequently.
--- End diff --

Also, I would change the description here, because injecting
init-containers through mutating admission webhooks is not all that common.
It should also link to
https://kubernetes.io/docs/admin/extensible-admission-controllers/


---




[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread foxish
Github user foxish commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186641837
  
--- Diff: docs/running-on-kubernetes.md ---
@@ -561,6 +561,13 @@ specific to Spark on Kubernetes.
 This is distinct from spark.executor.cores: it is only 
used and takes precedence over spark.executor.cores for specifying 
the executor pod cpu request if set. Task 
 parallelism, e.g., number of tasks an executor can run concurrently is 
not affected by this.
 
+
+  spark.kubernetes.executor.maxInitFailures
+  10
+  
+Maximum number of times executors are allowed to fail with an 
Init:Error state before failing the application. Note that Init:Error failures 
should not be caused by Spark itself because Spark does not attach 
init-containers to pods. Init-containers can be attached by the cluster itself. 
Users should check with their cluster administrator if these kinds of failures 
to start the executor pod occur frequently.
--- End diff --

This is very specific and covers exactly one kind of error. I'd like this 
property to cover all initialization errors.





[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-08 Thread foxish
Github user foxish commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186642338
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -320,50 +322,83 @@ private[spark] class 
KubernetesClusterSchedulerBackend(
 override def eventReceived(action: Action, pod: Pod): Unit = {
   val podName = pod.getMetadata.getName
   val podIP = pod.getStatus.getPodIP
-
+  val podPhase = pod.getStatus.getPhase
   action match {
-case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+case Action.MODIFIED if (podPhase == "Running"
 && pod.getMetadata.getDeletionTimestamp == null) =>
   val clusterNodeName = pod.getSpec.getNodeName
   logInfo(s"Executor pod $podName ready, launched at 
$clusterNodeName as IP $podIP.")
   executorPodsByIPs.put(podIP, pod)
 
-case Action.DELETED | Action.ERROR =>
+case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == 
"Init:CrashLoopBackoff")
--- End diff --

Maybe have a separate set for all the error states we want to check. Having 
one place would make this easier to change in future.
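
As a minimal sketch of this suggestion: the status strings could live in a 
single set, with the guard testing membership. The names InitErrorStates and 
isInitError are hypothetical and not part of the PR; the two strings are the 
ones the diff already compares against inline.

```scala
// Hypothetical helper, not from the PR: one place that lists the
// init-failure states the scheduler backend treats as errors.
object InitErrorStates {
  // The two states the diff currently checks inline.
  val all: Set[String] = Set("Init:Error", "Init:CrashLoopBackoff")

  // Membership check that also guards against a null phase string.
  def isInitError(podPhase: String): Boolean =
    podPhase != null && all.contains(podPhase)
}
```

With such a helper, the guard in eventReceived could read 
`case Action.MODIFIED if InitErrorStates.isInitError(podPhase) && 
pod.getMetadata.getDeletionTimestamp == null =>`, and adding a new state 
would only touch the set.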





[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-04 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186252592
  
--- Diff: docs/running-on-kubernetes.md ---
@@ -561,6 +561,13 @@ specific to Spark on Kubernetes.
 This is distinct from spark.executor.cores: it is only 
used and takes precedence over spark.executor.cores for specifying 
the executor pod cpu request if set. Task 
 parallelism, e.g., number of tasks an executor can run concurrently is 
not affected by this.
 
+
+  spark.kubernetes.executor.maxInitFailures
+  10
--- End diff --

Instead of a fixed number, should this be expressed as a factor on the 
total number of executors?
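
A rough sketch of what a factor-based limit might look like, under stated 
assumptions: the object InitFailureLimit, the method maxInitFailures, and the 
parameter factor are all hypothetical names, and the rounding and lower bound 
are illustrative choices, not something the PR specifies.

```scala
// Hypothetical sketch, not from the PR: derive the failure limit from
// the requested executor count instead of a fixed constant.
object InitFailureLimit {
  // factor: allowed init failures per requested executor (assumed name).
  // The result is rounded up and never drops below 1, so the limit is
  // never zero even for very small applications.
  def maxInitFailures(totalExecutors: Int, factor: Double): Int = {
    require(totalExecutors >= 0, "totalExecutors must be non-negative")
    require(factor >= 0.0, "factor must be non-negative")
    math.max(1, math.ceil(totalExecutors * factor).toInt)
  }
}
```

One trade-off with this design is that the limit changes whenever the 
executor count does, which matters under dynamic allocation.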





[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-04 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/21241#discussion_r186252598
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -320,50 +322,83 @@ private[spark] class 
KubernetesClusterSchedulerBackend(
 override def eventReceived(action: Action, pod: Pod): Unit = {
   val podName = pod.getMetadata.getName
   val podIP = pod.getStatus.getPodIP
-
+  val podPhase = pod.getStatus.getPhase
   action match {
-case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+case Action.MODIFIED if (podPhase == "Running"
 && pod.getMetadata.getDeletionTimestamp == null) =>
   val clusterNodeName = pod.getSpec.getNodeName
   logInfo(s"Executor pod $podName ready, launched at 
$clusterNodeName as IP $podIP.")
   executorPodsByIPs.put(podIP, pod)
 
-case Action.DELETED | Action.ERROR =>
+case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == 
"Init:CrashLoopBackoff")
+  && pod.getMetadata.getDeletionTimestamp == null =>
   val executorId = getExecutorId(pod)
-  logDebug(s"Executor pod $podName at IP $podIP was at $action.")
-  if (podIP != null) {
-executorPodsByIPs.remove(podIP)
+  failedInitExecutors.add(executorId)
+  if (failedInitExecutors.size >= executorMaxInitErrors) {
+val errorMessage = s"Aborting Spark application because 
$executorMaxInitErrors" +
+  s" executors failed to start. The maximum number of allowed 
startup failures is" +
--- End diff --

nit: no need for the s" prefix on this line, since the string literal 
contains no interpolated values.
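
For illustration, a sketch of the message with the s prefix kept only on the 
segments that interpolate a value. The object ErrorMessageExample and the 
value configKey are hypothetical stand-ins; the diff uses 
KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key instead.

```scala
object ErrorMessageExample {
  // Hypothetical stand-ins for the values used in the diff.
  val executorMaxInitErrors = 10
  val configKey = "spark.kubernetes.executor.maxInitFailures"

  // Only the segments containing $-references need the s interpolator;
  // the second segment is a plain string literal.
  val errorMessage: String =
    s"Aborting Spark application because $executorMaxInitErrors" +
      " executors failed to start. The maximum number of allowed startup failures is" +
      s" $executorMaxInitErrors. Please contact your cluster administrator or increase" +
      s" your setting of $configKey."
}
```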





[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

2018-05-04 Thread mccheah
GitHub user mccheah opened a pull request:

https://github.com/apache/spark/pull/21241

[SPARK-24135][K8s] Resilience to init-container errors on executors.

## What changes were proposed in this pull request?

Spark doesn't attach init-containers. But if a custom web hook or pod 
preset adds init-containers, we need to be resilient to transient failures of 
these containers and to at least retry them.
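
The counting logic visible in the review diffs can be modelled roughly as 
follows. This is a simplified sketch, not the PR's implementation: the class 
InitFailureTracker and the method recordFailure are hypothetical, and the real 
backend reacts to Kubernetes watch events and stops the SparkContext rather 
than returning a flag.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the failure counting in the diff.
class InitFailureTracker(maxInitErrors: Int) {
  // Distinct executor IDs whose pods failed during initialization.
  private val failedInitExecutors = mutable.Set.empty[String]

  // Records a failed executor and returns true once the number of
  // distinct failed executors reaches the limit, i.e. when the
  // application should be aborted.
  def recordFailure(executorId: String): Boolean = {
    failedInitExecutors += executorId
    failedInitExecutors.size >= maxInitErrors
  }
}
```

Because the IDs are kept in a set, repeated events for the same executor 
count only once toward the limit.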

## How was this patch tested?

Unit tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/palantir/spark handle-init-errors

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21241.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21241


commit 27cfc943e684ac6c20949b300767bb1b29b496e6
Author: mcheah 
Date:   2018-05-01T16:22:19Z

[SPARK-24135][K8s] Resilience to init-container errors on executors.

Spark doesn't attach init-containers. But if a custom web hook or pod
preset adds init-containers, we need to be resilient to transient
failures of these containers and to at least retry them.

commit c9f7e102dee6ab453f97401d274a05cd23a2c3e2
Author: mcheah 
Date:   2018-05-04T22:16:41Z

Make the failure count configurable.

commit 52df0f24d66e97be73d57e7121195170e3b0960b
Author: mcheah 
Date:   2018-05-04T22:19:53Z

Fix compilation



