vanzin closed pull request #23382: [SPARK-26441][KUBERNETES] Add kind configuration of driver pod
URL: https://github.com/apache/spark/pull/23382
As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):
diff --git a/pom.xml b/pom.xml
index 321de209a56a1..74ffab69cdbf5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,6 +105,7 @@
     <module>external/kafka-0-10-assembly</module>
     <module>external/kafka-0-10-sql</module>
     <module>external/avro</module>
+    <module>resource-managers/kubernetes/core</module>
     <!-- See additional modules enabled by profiles below -->
   </modules>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index e8bf16df190e8..aefe3ef6ea131 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -289,6 +289,19 @@ private[spark] object Config extends Logging {
       .booleanConf
       .createWithDefault(true)
 
+  val KUBERNETES_DRIVER_POD_KIND =
+    ConfigBuilder("spark.kubernetes.driver.pod.kind")
+      .doc("Kind of the Kubernetes resource used to run the driver: Pod, Job or Deployment.")
+      .stringConf
+      .checkValues(Set("Pod", "Job", "Deployment"))
+      .createWithDefault("Pod")
+
+  val KUBERNETES_JOB_BACKOFFLIMIT =
+    ConfigBuilder("spark.kubernetes.job.backofflimit")
+      .doc("backoffLimit of the Job used to run the driver when the driver kind is Job.")
+      .intConf
+      .createWithDefault(3)
+
   val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
   val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
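
For reference, a hypothetical usage sketch of the two options added above (not part of the diff): only the option names, allowed values and defaults come from Config.scala; the object name and everything else is illustrative.

    import org.apache.spark.SparkConf

    object DriverKindConfSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          // One of "Pod", "Job", "Deployment"; defaults to "Pod".
          .set("spark.kubernetes.driver.pod.kind", "Job")
          // Only consulted when the driver is run as a Job; defaults to 3.
          .set("spark.kubernetes.job.backofflimit", "5")
        conf.getAll.foreach { case (k, v) => println(s"$k=$v") }
      }
    }

With spark.kubernetes.driver.pod.kind left at its default, submission behaves as before and the driver is created as a plain Pod.
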
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 3888778bf84ca..fdc83f96ba755 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -21,10 +21,12 @@ import java.util.{Collections, UUID}
 import java.util.Properties
 
 import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.apps.{Deployment, DeploymentBuilder}
+import io.fabric8.kubernetes.api.model.batch.{Job, JobBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
+
 import scala.collection.mutable
 import scala.util.control.NonFatal
-
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkApplication
 import org.apache.spark.deploy.k8s._
@@ -33,6 +35,8 @@ import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
 
+
+
 /**
  * Encapsulates arguments to the submission client.
  *
@@ -130,12 +134,51 @@ private[spark] class Client(
           .endVolume()
         .endSpec()
       .build()
+    val driverPodName = resolvedDriverPod.getMetadata.getName
+
+    val driverPodKind = conf.sparkConf.get(KUBERNETES_DRIVER_POD_KIND)
+
     Utils.tryWithResource(
       kubernetesClient
         .pods()
         .withName(resolvedDriverPod.getMetadata.getName)
         .watch(watcher)) { _ =>
-      val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
+
+      var createdDriverPod: HasMetadata = null
+
+      if (driverPodKind.equals("Deployment")) {
+        val resolvedDeploymentPod = new DeploymentBuilder()
+          .editOrNewMetadata()
+            .withName(driverPodName)
+            .endMetadata()
+          .withNewSpec()
+            .withReplicas(1)
+            .editOrNewTemplate()
+              .withMetadata(resolvedDriverPod.getMetadata)
+              .editOrNewSpecLike(resolvedDriverPod.getSpec)
+                .withRestartPolicy("Always")
+                .endSpec()
+              .endTemplate()
+            .endSpec()
+          .build()
+        createdDriverPod = kubernetesClient.apps().deployments().create(resolvedDeploymentPod)
+
+      } else if (driverPodKind.equals("Pod")) {
+        createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
+      } else {
+        val resolvedJobPod = new JobBuilder()
+          .withMetadata(resolvedDriverPod.getMetadata)
+          .withNewSpec()
+            .withBackoffLimit(conf.sparkConf.get(KUBERNETES_JOB_BACKOFFLIMIT))
+            .editOrNewTemplate()
+              .withMetadata(resolvedDriverPod.getMetadata)
+              .withSpec(resolvedDriverPod.getSpec)
+              .endTemplate()
+            .endSpec()
+          .build()
+        createdDriverPod = kubernetesClient.batch().jobs().create(resolvedJobPod)
+      }
+
       try {
         val otherKubernetesResources =
           resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
@@ -143,7 +186,13 @@ private[spark] class Client(
         kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
       } catch {
         case NonFatal(e) =>
-          kubernetesClient.pods().delete(createdDriverPod)
+          if (driverPodKind.equals("Deployment")) {
+            kubernetesClient.apps().deployments().delete(createdDriverPod.asInstanceOf[Deployment])
+          } else if (driverPodKind.equals("Pod")) {
+            kubernetesClient.pods().delete(createdDriverPod.asInstanceOf[Pod])
+          } else {
+            kubernetesClient.batch().jobs().delete(createdDriverPod.asInstanceOf[Job])
+          }
           throw e
       }
@@ -159,7 +208,7 @@ private[spark] class Client(
   // Add an OwnerReference to the given resources making the driver pod an owner of them so when
   // the driver pod is deleted, the resources are garbage collected.
-  private def addDriverOwnerReference(driverPod: Pod, resources: Seq[HasMetadata]): Unit = {
+  private def addDriverOwnerReference(driverPod: HasMetadata, resources: Seq[HasMetadata]): Unit = {
     val driverPodOwnerReference = new OwnerReferenceBuilder()
       .withName(driverPod.getMetadata.getName)
       .withApiVersion(driverPod.getApiVersion)
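
As a footnote to the last hunk, a minimal standalone sketch (not part of the diff; object and method names are hypothetical) of why widening the parameter to HasMetadata is sufficient: getMetadata, getApiVersion and getKind are all declared on fabric8's HasMetadata interface, so the same owner reference can be built whether the driver resource is a Pod, a Job or a Deployment.

    import java.util.Collections

    import io.fabric8.kubernetes.api.model.{HasMetadata, OwnerReferenceBuilder}

    object OwnerReferenceSketch {
      // Makes `owner` the controlling owner of each resource, so Kubernetes
      // garbage-collects the resources when the owner is deleted.
      def addOwnerReference(owner: HasMetadata, resources: Seq[HasMetadata]): Unit = {
        val ref = new OwnerReferenceBuilder()
          .withName(owner.getMetadata.getName)
          .withApiVersion(owner.getApiVersion)
          .withUid(owner.getMetadata.getUid)
          .withKind(owner.getKind)
          .withController(true)
          .build()
        resources.foreach(_.getMetadata.setOwnerReferences(Collections.singletonList(ref)))
      }
    }
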
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]