Github user liyinan926 commented on a diff in the pull request:
https://github.com/apache/spark/pull/20451#discussion_r201512105
--- Diff:
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
---
@@ -20,41 +20,65 @@ import java.io.File
import java.util.concurrent.TimeUnit
import com.google.common.cache.CacheBuilder
-import io.fabric8.kubernetes.client.Config
-
-import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.deploy.k8s.{KubernetesUtils,
SparkKubernetesClientFactory}
+import io.fabric8.kubernetes.client.{Config, KubernetesClient}
+import org.apache.spark.{SparkConf, SparkContext, SparkException}
+import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{ExternalClusterManager,
SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.util.{SystemClock, ThreadUtils}
-private[spark] class KubernetesClusterManager extends
ExternalClusterManager with Logging {
+trait ManagerSpecificHandlers {
+ def createKubernetesClient(sparkConf: SparkConf): KubernetesClient
+}
- override def canCreate(masterURL: String): Boolean =
masterURL.startsWith("k8s")
+private[spark] class KubernetesClusterManager extends
ExternalClusterManager
+ with ManagerSpecificHandlers with Logging {
- override def createTaskScheduler(sc: SparkContext, masterURL: String):
TaskScheduler = {
- if (masterURL.startsWith("k8s") &&
- sc.deployMode == "client" &&
- !sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK).getOrElse(false)) {
- throw new SparkException("Client mode is currently not supported for
Kubernetes.")
+ class InClusterHandlers extends ManagerSpecificHandlers {
+ override def createKubernetesClient(sparkConf: SparkConf):
KubernetesClient =
+ SparkKubernetesClientFactory.createKubernetesClient(
+ KUBERNETES_MASTER_INTERNAL_URL,
+ Some(sparkConf.get(KUBERNETES_NAMESPACE)),
+ KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
+ sparkConf,
+ Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
+ Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
+ }
+
+ class OutClusterHandlers extends ManagerSpecificHandlers {
+ override def createKubernetesClient(sparkConf: SparkConf):
KubernetesClient =
+ SparkKubernetesClientFactory.createKubernetesClient(
+ sparkConf.get("spark.master").replace("k8s://", ""),
+ Some(sparkConf.get(KUBERNETES_NAMESPACE)),
+ KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
--- End diff --
The name of this prefix, `spark.kubernetes.authenticate.driver.mounted`,
sounds weird in this case, given that the client is running outside the cluster.
BTW: can we alternatively use the config at `$HOME/.kube/config` to build a
Kubernetes client instead? I think this is a common approach for building
clients outside a cluster.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]