Github user ifilonenko commented on a diff in the pull request:
https://github.com/apache/spark/pull/22415#discussion_r217560220
--- Diff:
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
---
@@ -218,17 +223,25 @@ private[spark] class KubernetesSuite extends
SparkFunSuite
.getItems
.get(0)
driverPodChecker(driverPod)
-
- val executorPods = kubernetesTestComponents.kubernetesClient
+ val execPods = scala.collection.mutable.Stack[Pod]()
+ val execWatcher = kubernetesTestComponents.kubernetesClient
.pods()
.withLabel("spark-app-locator", appLocator)
.withLabel("spark-role", "executor")
- .list()
- .getItems
- executorPods.asScala.foreach { pod =>
- executorPodChecker(pod)
- }
-
+ .watch(new Watcher[Pod] {
+ logInfo("Beginning watch of executors")
+ override def onClose(cause: KubernetesClientException): Unit =
+ logInfo("Ending watch of executors")
+ override def eventReceived(action: Watcher.Action, resource: Pod):
Unit = {
+ action match {
+ case Action.ADDED | Action.MODIFIED =>
+ execPods.push(resource)
+ }
+ }
+ })
+ Eventually.eventually(TIMEOUT, INTERVAL) { execPods.nonEmpty should be
(true) }
+ execWatcher.close()
+ executorPodChecker(execPods.pop())
--- End diff --
Yeah, no need to check other executors.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]