Github user echarles commented on a diff in the pull request:
https://github.com/apache/spark/pull/20451#discussion_r201781927
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala ---
@@ -20,41 +20,65 @@ import java.io.File
 import java.util.concurrent.TimeUnit
 import com.google.common.cache.CacheBuilder
-import io.fabric8.kubernetes.client.Config
-
-import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.deploy.k8s.{KubernetesUtils, SparkKubernetesClientFactory}
+import io.fabric8.kubernetes.client.{Config, KubernetesClient}
+import org.apache.spark.{SparkConf, SparkContext, SparkException}
+import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
 import org.apache.spark.util.{SystemClock, ThreadUtils}
-private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {
+trait ManagerSpecificHandlers {
+  def createKubernetesClient(sparkConf: SparkConf): KubernetesClient
+}
-  override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s")
+private[spark] class KubernetesClusterManager extends ExternalClusterManager
+  with ManagerSpecificHandlers with Logging {
-  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
-    if (masterURL.startsWith("k8s") &&
-      sc.deployMode == "client" &&
-      !sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK).getOrElse(false)) {
-      throw new SparkException("Client mode is currently not supported for Kubernetes.")
+  class InClusterHandlers extends ManagerSpecificHandlers {
+    override def createKubernetesClient(sparkConf: SparkConf): KubernetesClient =
+      SparkKubernetesClientFactory.createKubernetesClient(
+        KUBERNETES_MASTER_INTERNAL_URL,
+        Some(sparkConf.get(KUBERNETES_NAMESPACE)),
+        KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
+        sparkConf,
+        Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
+        Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
+  }
+
+  class OutClusterHandlers extends ManagerSpecificHandlers {
+    override def createKubernetesClient(sparkConf: SparkConf): KubernetesClient =
+      SparkKubernetesClientFactory.createKubernetesClient(
+        sparkConf.get("spark.master").replace("k8s://", ""),
+        Some(sparkConf.get(KUBERNETES_NAMESPACE)),
+        KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
--- End diff --
The call to `createKubernetesClient` is now used in two different ways:
+ `KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX` is used in `KubernetesClusterManager`
+ `KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX` is used in `KubernetesClientApplication`

I would favor the second and remove the first, roughly as sketched below.
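A rough, hypothetical sketch of what that could look like here (it assumes the `createKubernetesClient` signature and imports shown in the hunk above, and assumes no default token/CA files are passed in the out-of-cluster case):

```scala
// Hypothetical variant of OutClusterHandlers following the suggestion above:
// reuse KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX (already used by
// KubernetesClientApplication) instead of the driver-mounted prefix.
class OutClusterHandlers extends ManagerSpecificHandlers {
  override def createKubernetesClient(sparkConf: SparkConf): KubernetesClient =
    SparkKubernetesClientFactory.createKubernetesClient(
      sparkConf.get("spark.master").replace("k8s://", ""),
      Some(sparkConf.get(KUBERNETES_NAMESPACE)),
      KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,  // instead of KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX
      sparkConf,
      None,  // assumption: no default service-account token file outside the cluster
      None)  // assumption: no default CA cert file outside the cluster
}
```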
Regarding the config location, I remember that the fabric8 k8s client also does some inspection to detect whether it is running in-cluster or out-of-cluster, and loads the config from the default place for each case, with the possibility to specify other locations for the cert, token, etc. (this is what we expose as properties to the end user).
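For reference, a minimal sketch of that auto-detection when using the fabric8 client directly (not Spark's factory; it assumes a reasonably recent fabric8 kubernetes-client where `Config.autoConfigure(context)` and `DefaultKubernetesClient` are available):

```scala
import io.fabric8.kubernetes.client.{Config, DefaultKubernetesClient, KubernetesClient}

object Fabric8AutoConfigExample {
  def main(args: Array[String]): Unit = {
    // autoConfigure looks at the usual places: kubeconfig ($KUBECONFIG or
    // ~/.kube/config), system properties / environment variables, and, when
    // running inside a pod, the mounted service-account token and CA cert
    // under /var/run/secrets/kubernetes.io/serviceaccount/.
    val config: Config = Config.autoConfigure(null)

    // Individual settings (master URL, token, CA cert path, ...) can still be
    // overridden on the Config before the client is built.
    val client: KubernetesClient = new DefaultKubernetesClient(config)
    try {
      println(s"Connected to ${client.getMasterUrl}")
    } finally {
      client.close()
    }
  }
}
```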
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]