dongjoon-hyun commented on a change in pull request #32752:
URL: https://github.com/apache/spark/pull/32752#discussion_r644431423
##########
File path:
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
##########
@@ -99,6 +102,16 @@ private[spark] class ExecutorPodsAllocator(
 @volatile private var deletedExecutorIds = Set.empty[Long]
 def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
+ // wait until the driver pod is ready to ensure executors can connect to driver svc
Review comment:
Can we be more specific? The problem is the absence of the K8s headless service resource for this driver pod. For example, since K8s works asynchronously, the problem can happen even when the driver pod is ready with all its sidecars but the K8s service is not yet ready to work with this driver pod.
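For instance, the code comment could spell that out. A possible wording (just a suggestion, not a required phrasing):
```scala
// Wait until the driver pod is ready. K8s reconciles resources asynchronously, so the headless
// service for the driver may not be backed by this pod (and its DNS name may not resolve yet)
// even after the pod itself reports Ready; executors started too early would then fail to
// connect to the driver service.
```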
##########
File path:
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
##########
@@ -104,6 +105,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
when(kubernetesClient.pods()).thenReturn(podOperations)
when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations)
when(driverPodOperations.get).thenReturn(driverPod)
+ when(driverPodOperations.waitUntilReady(anyInt(), any(classOf[TimeUnit])))
+ .thenReturn(driverPod)
Review comment:
Can we simplify it like the following? Then we don't need to change the `import` section of this file.
```
when(driverPodOperations.waitUntilReady(any(), any())).thenReturn(driverPod)
```
##########
File path:
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
##########
@@ -61,6 +62,8 @@ private[spark] class ExecutorPodsAllocator(
podAllocationDelay * 5,
conf.get(KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT))
+ private val driverPodReadinessTimeout = 5
Review comment:
Like `KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT`, we need an official
configuration instead of the magic number here. Could you add it to
`resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala`?
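For example (sketching from the entry this PR already adds to `Config.scala` further down in this thread, with the remaining builder calls filled in as placeholders that mirror the neighbouring allocation configs; it assumes `java.util.concurrent.TimeUnit` is already imported in that file):
```scala
  val KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT =
    ConfigBuilder("spark.kubernetes.allocation.driver.readinessTimeout")
      .doc("Time to wait for driver pod to get ready before creating executor pods. This wait " +
        "only happens on application start. If timeout happens, executor pods will still be " +
        "created.")
      .version("3.2.0")
      // placeholder unit/check/default, mirroring the existing batch-delay config above
      .timeConf(TimeUnit.SECONDS)
      .checkValue(value => value > 0, "Driver readiness timeout must be a positive time value.")
      .createWithDefaultString("1s")
```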
##########
File path:
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
##########
@@ -99,6 +102,17 @@ private[spark] class ExecutorPodsAllocator(
 @volatile private var deletedExecutorIds = Set.empty[Long]
 def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
+ // Wait until the driver pod is ready before starting executors, as the headless service won't
+ // be resolvable by DNS until the driver pod is ready.
+ try {
+   kubernetesClient.pods()
+     .withName(kubernetesDriverPodName.get)
+     .waitUntilReady(driverPodReadinessTimeout, TimeUnit.MINUTES)
+ } catch {
+   case e: InterruptedException =>
+     logWarning(s"Timeout waiting for driver pod ${kubernetesDriverPodName.get} get ready in " +
+       s"namespace $namespace")
+ }
Review comment:
If we are just going to `logWarning`, the following code pattern would be better, like the other places in the code base.
```scala
Utils.tryLogNonFatalError {
kubernetesClient.pods()
.withName(kubernetesDriverPodName.get)
.waitUntilReady(driverPodReadinessTimeout, TimeUnit.MINUTES)
}
```
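For context, `Utils.tryLogNonFatalError` (in `org.apache.spark.util.Utils`) just runs the block and logs any non-fatal throwable instead of propagating it, so the wait can never abort `start()`. Roughly (a paraphrase of its shape, not the verbatim source):
```scala
import scala.util.control.NonFatal

// Paraphrased sketch of the helper's behavior; Utils mixes in Logging, which provides logError.
def tryLogNonFatalError(block: => Unit): Unit = {
  try {
    block
  } catch {
    case NonFatal(t) =>
      // a fatal error (OutOfMemoryError, etc.) would still propagate; everything else is logged
      logError("Uncaught exception while executing block", t)
  }
}
```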
##########
File path:
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
##########
@@ -99,6 +102,17 @@ private[spark] class ExecutorPodsAllocator(
 @volatile private var deletedExecutorIds = Set.empty[Long]
 def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
+ // Wait until the driver pod is ready before starting executors, as the headless service won't
+ // be resolvable by DNS until the driver pod is ready.
+ try {
+   kubernetesClient.pods()
+     .withName(kubernetesDriverPodName.get)
+     .waitUntilReady(driverPodReadinessTimeout, TimeUnit.MINUTES)
+ } catch {
+   case e: InterruptedException =>
+     logWarning(s"Timeout waiting for driver pod ${kubernetesDriverPodName.get} get ready in " +
+       s"namespace $namespace")
+ }
Review comment:
-1 for the `SparkException` idea. That would be a huge regression. `UnknownHostException` is not a job blocker because Spark is going to create the required executors eventually anyway. If you throw `SparkException`, your change will indeed break existing Spark pipelines.
##########
File path:
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
##########
@@ -99,6 +102,17 @@ private[spark] class ExecutorPodsAllocator(
 @volatile private var deletedExecutorIds = Set.empty[Long]
 def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
+ // Wait until the driver pod is ready before starting executors, as the headless service won't
+ // be resolvable by DNS until the driver pod is ready.
+ try {
+   kubernetesClient.pods()
+     .withName(kubernetesDriverPodName.get)
+     .waitUntilReady(driverPodReadinessTimeout, TimeUnit.MINUTES)
+ } catch {
+   case e: InterruptedException =>
+     logWarning(s"Timeout waiting for driver pod ${kubernetesDriverPodName.get} get ready in " +
+       s"namespace $namespace")
+ }
Review comment:
I don't think the error report is correct, because Apache Spark doesn't `restart` executors at all. The driver will keep creating new executors because executors that hit `UnknownHostException` die on that failure. I'm describing Apache Spark's behavior here, not 3rd-party operator behavior.
##########
File path:
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
##########
@@ -300,6 +300,17 @@ private[spark] object Config extends Logging {
 .checkValue(value => value > 0, "Allocation batch delay must be a positive time value.")
 .createWithDefaultString("1s")
+ val KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT =
+   ConfigBuilder("spark.kubernetes.allocation.driver.readinessTimeout")
+     .doc("Time to wait for driver pod to get ready before creating executor pods. This wait " +
+       "only happens on application start. If timeout happens, executor pods will still be " +
+       "created.")
+     .version("3.2.0")
Review comment:
@cchriswu. Sorry, but could you update this to `3.1.3`? Since K8s support is GA as of Apache Spark 3.1 and this bug was reported against 2.4.x, I believe we need to backport this to branch-3.1 and release it in `3.1.3` and `3.2.0` together.
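Concretely, only the version call would change once this also targets branch-3.1:
```scala
      // first release carrying the fix once backported to branch-3.1
      .version("3.1.3")
```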
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]