[SPARK-18278][SCHEDULER] Spark on Kubernetes - Basic Scheduler Backend ## What changes were proposed in this pull request?
This is a stripped down version of the `KubernetesClusterSchedulerBackend` for Spark with the following components: - Static Allocation of Executors - Executor Pod Factory - Executor Recovery Semantics It's step 1 from the step-wise plan documented [here](https://github.com/apache-spark-on-k8s/spark/issues/441#issuecomment-330802935). This addition is covered by the [SPIP vote](http://apache-spark-developers-list.1001551.n3.nabble.com/SPIP-Spark-on-Kubernetes-td22147.html) which passed on Aug 31 . ## How was this patch tested? - The patch contains unit tests which are passing. - Manual testing: `./build/mvn -Pkubernetes clean package` succeeded. - It is a **subset** of the entire changelist hosted in http://github.com/apache-spark-on-k8s/spark which is in active use in several organizations. - There is integration testing enabled in the fork currently [hosted by PepperData](spark-k8s-jenkins.pepperdata.org:8080) which is being moved over to RiseLAB CI. - Detailed documentation on trying out the patch in its entirety is in: https://apache-spark-on-k8s.github.io/userdocs/running-on-kubernetes.html cc rxin felixcheung mateiz (shepherd) k8s-big-data SIG members & contributors: mccheah ash211 ssuchter varunkatta kimoonkim erikerlandson liyinan926 tnachen ifilonenko Author: Yinan Li <liyinan...@gmail.com> Author: foxish <ramanath...@google.com> Author: mcheah <mch...@palantir.com> Closes #19468 from foxish/spark-kubernetes-3. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e9b2070a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e9b2070a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e9b2070a Branch: refs/heads/master Commit: e9b2070ab2d04993b1c0c1d6c6aba249e6664c8d Parents: 475a29f Author: Yinan Li <liyinan...@gmail.com> Authored: Tue Nov 28 23:02:09 2017 -0800 Committer: Reynold Xin <r...@databricks.com> Committed: Tue Nov 28 23:02:09 2017 -0800 ---------------------------------------------------------------------- .travis.yml | 2 +- NOTICE | 6 + .../cluster/SchedulerBackendUtils.scala | 47 ++ dev/sparktestsupport/modules.py | 8 + docs/configuration.md | 4 +- pom.xml | 7 + project/SparkBuild.scala | 8 +- resource-managers/kubernetes/core/pom.xml | 100 +++++ .../org/apache/spark/deploy/k8s/Config.scala | 123 ++++++ .../spark/deploy/k8s/ConfigurationUtils.scala | 41 ++ .../org/apache/spark/deploy/k8s/Constants.scala | 50 +++ .../k8s/SparkKubernetesClientFactory.scala | 102 +++++ .../cluster/k8s/ExecutorPodFactory.scala | 219 +++++++++ .../cluster/k8s/KubernetesClusterManager.scala | 70 +++ .../k8s/KubernetesClusterSchedulerBackend.scala | 442 +++++++++++++++++++ .../core/src/test/resources/log4j.properties | 31 ++ .../cluster/k8s/ExecutorPodFactorySuite.scala | 135 ++++++ ...KubernetesClusterSchedulerBackendSuite.scala | 440 ++++++++++++++++++ .../spark/deploy/yarn/YarnAllocator.scala | 3 +- .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 24 - .../cluster/YarnClientSchedulerBackend.scala | 2 +- .../cluster/YarnClusterSchedulerBackend.scala | 2 +- 22 files changed, 1832 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index d7e9f8c..05b94ade 100644 --- a/.travis.yml +++ b/.travis.yml @@ -43,7 +43,7 @@ notifications: # 5. Run maven install before running lint-java. install: - export MAVEN_SKIP_RC=1 - - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install + - build/mvn -T 4 -q -DskipTests -Pkubernetes -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install # 6. Run lint-java. script: http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/NOTICE ---------------------------------------------------------------------- diff --git a/NOTICE b/NOTICE index f4b64b5..6ec240e 100644 --- a/NOTICE +++ b/NOTICE @@ -448,6 +448,12 @@ Copyright (C) 2011 Google Inc. Apache Commons Pool Copyright 1999-2009 The Apache Software Foundation +This product includes/uses Kubernetes & OpenShift 3 Java Client (https://github.com/fabric8io/kubernetes-client) +Copyright (C) 2015 Red Hat, Inc. + +This product includes/uses OkHttp (https://github.com/square/okhttp) +Copyright (C) 2012 The Android Open Source Project + ========================================================================= == NOTICE file corresponding to section 4(d) of the Apache License, == == Version 2.0, in this case for the DataNucleus distribution. == http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala new file mode 100644 index 0000000..c166d03 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster + +import org.apache.spark.SparkConf +import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES} +import org.apache.spark.util.Utils + +private[spark] object SchedulerBackendUtils { + val DEFAULT_NUMBER_EXECUTORS = 2 + + /** + * Getting the initial target number of executors depends on whether dynamic allocation is + * enabled. + * If not using dynamic allocation it gets the number of executors requested by the user. + */ + def getInitialTargetExecutorNumber( + conf: SparkConf, + numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = { + if (Utils.isDynamicAllocationEnabled(conf)) { + val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS) + val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf) + val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS) + require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors, + s"initial executor number $initialNumExecutors must between min executor number " + + s"$minNumExecutors and max executor number $maxNumExecutors") + + initialNumExecutors + } else { + conf.get(EXECUTOR_INSTANCES).getOrElse(numExecutors) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/dev/sparktestsupport/modules.py ---------------------------------------------------------------------- diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index dacc89f..44f990e 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -532,6 +532,14 @@ mesos = Module( sbt_test_goals=["mesos/test"] ) +kubernetes = Module( + name="kubernetes", + dependencies=[], + source_file_regexes=["resource-managers/kubernetes/core"], + build_profile_flags=["-Pkubernetes"], + sbt_test_goals=["kubernetes/test"] +) + # The root module is a dummy module which is used to run all of the tests. # No other modules should directly depend on this module. root = Module( http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/docs/configuration.md ---------------------------------------------------------------------- diff --git a/docs/configuration.md b/docs/configuration.md index 9b9583d..e42f866 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1438,10 +1438,10 @@ Apart from these, the following properties are also available, and may be useful </tr> <tr> <td><code>spark.scheduler.minRegisteredResourcesRatio</code></td> - <td>0.8 for YARN mode; 0.0 for standalone mode and Mesos coarse-grained mode</td> + <td>0.8 for KUBERNETES mode; 0.8 for YARN mode; 0.0 for standalone mode and Mesos coarse-grained mode</td> <td> The minimum ratio of registered resources (registered resources / total expected resources) - (resources are executors in yarn mode, CPU cores in standalone mode and Mesos coarsed-grained + (resources are executors in yarn mode and Kubernetes mode, CPU cores in standalone mode and Mesos coarsed-grained mode ['spark.cores.max' value is total expected resources for Mesos coarse-grained mode] ) to wait for before scheduling begins. Specified as a double between 0.0 and 1.0. Regardless of whether the minimum ratio of resources has been reached, http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 3b2c629..7bc66e7 100644 --- a/pom.xml +++ b/pom.xml @@ -2665,6 +2665,13 @@ </profile> <profile> + <id>kubernetes</id> + <modules> + <module>resource-managers/kubernetes/core</module> + </modules> + </profile> + + <profile> <id>hive-thriftserver</id> <modules> <module>sql/hive-thriftserver</module> http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/project/SparkBuild.scala ---------------------------------------------------------------------- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index c726ec2..7570338 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -53,11 +53,11 @@ object BuildCommons { "tags", "sketch", "kvstore" ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects - val optionallyEnabledProjects@Seq(mesos, yarn, + val optionallyEnabledProjects@Seq(kubernetes, mesos, yarn, streamingFlumeSink, streamingFlume, streamingKafka, sparkGangliaLgpl, streamingKinesisAsl, dockerIntegrationTests, hadoopCloud) = - Seq("mesos", "yarn", + Seq("kubernetes", "mesos", "yarn", "streaming-flume-sink", "streaming-flume", "streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl", "docker-integration-tests", "hadoop-cloud").map(ProjectRef(buildLocation, _)) @@ -671,9 +671,9 @@ object Unidoc { publish := {}, unidocProjectFilter in(ScalaUnidoc, unidoc) := - inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010, sqlKafka010), + inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes, yarn, tags, streamingKafka010, sqlKafka010), unidocProjectFilter in(JavaUnidoc, unidoc) := - inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010, sqlKafka010), + inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes, yarn, tags, streamingKafka010, sqlKafka010), unidocAllClasspaths in (ScalaUnidoc, unidoc) := { ignoreClasspaths((unidocAllClasspaths in (ScalaUnidoc, unidoc)).value) http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/resource-managers/kubernetes/core/pom.xml ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml new file mode 100644 index 0000000..7d35aea --- /dev/null +++ b/resource-managers/kubernetes/core/pom.xml @@ -0,0 +1,100 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.spark</groupId> + <artifactId>spark-parent_2.11</artifactId> + <version>2.3.0-SNAPSHOT</version> + <relativePath>../../../pom.xml</relativePath> + </parent> + + <artifactId>spark-kubernetes_2.11</artifactId> + <packaging>jar</packaging> + <name>Spark Project Kubernetes</name> + <properties> + <sbt.project.name>kubernetes</sbt.project.name> + <kubernetes.client.version>3.0.0</kubernetes.client.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>io.fabric8</groupId> + <artifactId>kubernetes-client</artifactId> + <version>${kubernetes.client.version}</version> + <exclusions> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.dataformat</groupId> + <artifactId>jackson-dataformat-yaml</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- Required by kubernetes-client but we exclude it --> + <dependency> + <groupId>com.fasterxml.jackson.dataformat</groupId> + <artifactId>jackson-dataformat-yaml</artifactId> + <version>${fasterxml.jackson.version}</version> + </dependency> + + <!-- Explicitly depend on shaded dependencies from the parent, since shaded deps aren't transitive --> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <!-- End of shaded deps. --> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.squareup.okhttp3</groupId> + <artifactId>okhttp</artifactId> + <version>3.8.1</version> + </dependency> + + </dependencies> + + + <build> + <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala new file mode 100644 index 0000000..f0742b9 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.network.util.ByteUnit + +private[spark] object Config extends Logging { + + val KUBERNETES_NAMESPACE = + ConfigBuilder("spark.kubernetes.namespace") + .doc("The namespace that will be used for running the driver and executor pods. When using " + + "spark-submit in cluster mode, this can also be passed to spark-submit via the " + + "--kubernetes-namespace command line argument.") + .stringConf + .createWithDefault("default") + + val EXECUTOR_DOCKER_IMAGE = + ConfigBuilder("spark.kubernetes.executor.docker.image") + .doc("Docker image to use for the executors. Specify this using the standard Docker tag " + + "format.") + .stringConf + .createOptional + + val DOCKER_IMAGE_PULL_POLICY = + ConfigBuilder("spark.kubernetes.docker.image.pullPolicy") + .doc("Kubernetes image pull policy. Valid values are Always, Never, and IfNotPresent.") + .stringConf + .checkValues(Set("Always", "Never", "IfNotPresent")) + .createWithDefault("IfNotPresent") + + val APISERVER_AUTH_DRIVER_CONF_PREFIX = + "spark.kubernetes.authenticate.driver" + val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX = + "spark.kubernetes.authenticate.driver.mounted" + val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken" + val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile" + val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile" + val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile" + val CA_CERT_FILE_CONF_SUFFIX = "caCertFile" + + val KUBERNETES_SERVICE_ACCOUNT_NAME = + ConfigBuilder(s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.serviceAccountName") + .doc("Service account that is used when running the driver pod. The driver pod uses " + + "this service account when requesting executor pods from the API server. If specific " + + "credentials are given for the driver pod to use, the driver will favor " + + "using those credentials instead.") + .stringConf + .createOptional + + // Note that while we set a default for this when we start up the + // scheduler, the specific default value is dynamically determined + // based on the executor memory. + val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD = + ConfigBuilder("spark.kubernetes.executor.memoryOverhead") + .doc("The amount of off-heap memory (in megabytes) to be allocated per executor. This " + + "is memory that accounts for things like VM overheads, interned strings, other native " + + "overheads, etc. This tends to grow with the executor size. (typically 6-10%).") + .bytesConf(ByteUnit.MiB) + .createOptional + + val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label." + val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation." + + val KUBERNETES_DRIVER_POD_NAME = + ConfigBuilder("spark.kubernetes.driver.pod.name") + .doc("Name of the driver pod.") + .stringConf + .createOptional + + val KUBERNETES_EXECUTOR_POD_NAME_PREFIX = + ConfigBuilder("spark.kubernetes.executor.podNamePrefix") + .doc("Prefix to use in front of the executor pod names.") + .internal() + .stringConf + .createWithDefault("spark") + + val KUBERNETES_ALLOCATION_BATCH_SIZE = + ConfigBuilder("spark.kubernetes.allocation.batch.size") + .doc("Number of pods to launch at once in each round of executor allocation.") + .intConf + .checkValue(value => value > 0, "Allocation batch size should be a positive integer") + .createWithDefault(5) + + val KUBERNETES_ALLOCATION_BATCH_DELAY = + ConfigBuilder("spark.kubernetes.allocation.batch.delay") + .doc("Number of seconds to wait between each round of executor allocation.") + .longConf + .checkValue(value => value > 0, "Allocation batch delay should be a positive integer") + .createWithDefault(1) + + val KUBERNETES_EXECUTOR_LIMIT_CORES = + ConfigBuilder("spark.kubernetes.executor.limit.cores") + .doc("Specify the hard cpu limit for a single executor pod") + .stringConf + .createOptional + + val KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS = + ConfigBuilder("spark.kubernetes.executor.lostCheck.maxAttempts") + .doc("Maximum number of attempts allowed for checking the reason of an executor loss " + + "before it is assumed that the executor failed.") + .intConf + .checkValue(value => value > 0, "Maximum attempts of checks of executor lost reason " + + "must be a positive integer") + .createWithDefault(10) + + val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector." +} http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala new file mode 100644 index 0000000..0171747 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s + +import org.apache.spark.SparkConf + +private[spark] object ConfigurationUtils { + + /** + * Extract and parse Spark configuration properties with a given name prefix and + * return the result as a Map. Keys must not have more than one value. + * + * @param sparkConf Spark configuration + * @param prefix the given property name prefix + * @return a Map storing the configuration property keys and values + */ + def parsePrefixedKeyValuePairs( + sparkConf: SparkConf, + prefix: String): Map[String, String] = { + sparkConf.getAllWithPrefix(prefix).toMap + } + + def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = { + opt1.foreach { _ => require(opt2.isEmpty, errMessage) } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala new file mode 100644 index 0000000..4ddeefb --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +private[spark] object Constants { + + // Labels + val SPARK_APP_ID_LABEL = "spark-app-selector" + val SPARK_EXECUTOR_ID_LABEL = "spark-exec-id" + val SPARK_ROLE_LABEL = "spark-role" + val SPARK_POD_DRIVER_ROLE = "driver" + val SPARK_POD_EXECUTOR_ROLE = "executor" + + // Default and fixed ports + val DEFAULT_DRIVER_PORT = 7078 + val DEFAULT_BLOCKMANAGER_PORT = 7079 + val BLOCK_MANAGER_PORT_NAME = "blockmanager" + val EXECUTOR_PORT_NAME = "executor" + + // Environment Variables + val ENV_EXECUTOR_PORT = "SPARK_EXECUTOR_PORT" + val ENV_DRIVER_URL = "SPARK_DRIVER_URL" + val ENV_EXECUTOR_CORES = "SPARK_EXECUTOR_CORES" + val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY" + val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID" + val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID" + val ENV_EXECUTOR_POD_IP = "SPARK_EXECUTOR_POD_IP" + val ENV_EXECUTOR_EXTRA_CLASSPATH = "SPARK_EXECUTOR_EXTRA_CLASSPATH" + val ENV_MOUNTED_CLASSPATH = "SPARK_MOUNTED_CLASSPATH" + val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_" + + // Miscellaneous + val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc" + val MEMORY_OVERHEAD_FACTOR = 0.10 + val MEMORY_OVERHEAD_MIN_MIB = 384L +} http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala new file mode 100644 index 0000000..1e3f055 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +import java.io.File + +import com.google.common.base.Charsets +import com.google.common.io.Files +import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient} +import io.fabric8.kubernetes.client.utils.HttpClientUtils +import okhttp3.Dispatcher + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.util.ThreadUtils + +/** + * Spark-opinionated builder for Kubernetes clients. It uses a prefix plus common suffixes to + * parse configuration keys, similar to the manner in which Spark's SecurityManager parses SSL + * options for different components. + */ +private[spark] object SparkKubernetesClientFactory { + + def createKubernetesClient( + master: String, + namespace: Option[String], + kubernetesAuthConfPrefix: String, + sparkConf: SparkConf, + defaultServiceAccountToken: Option[File], + defaultServiceAccountCaCert: Option[File]): KubernetesClient = { + val oauthTokenFileConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_FILE_CONF_SUFFIX" + val oauthTokenConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_CONF_SUFFIX" + val oauthTokenFile = sparkConf.getOption(oauthTokenFileConf) + .map(new File(_)) + .orElse(defaultServiceAccountToken) + val oauthTokenValue = sparkConf.getOption(oauthTokenConf) + ConfigurationUtils.requireNandDefined( + oauthTokenFile, + oauthTokenValue, + s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a " + + s"value $oauthTokenConf.") + + val caCertFile = sparkConf + .getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX") + .orElse(defaultServiceAccountCaCert.map(_.getAbsolutePath)) + val clientKeyFile = sparkConf + .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_KEY_FILE_CONF_SUFFIX") + val clientCertFile = sparkConf + .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_CERT_FILE_CONF_SUFFIX") + val dispatcher = new Dispatcher( + ThreadUtils.newDaemonCachedThreadPool("kubernetes-dispatcher")) + val config = new ConfigBuilder() + .withApiVersion("v1") + .withMasterUrl(master) + .withWebsocketPingInterval(0) + .withOption(oauthTokenValue) { + (token, configBuilder) => configBuilder.withOauthToken(token) + }.withOption(oauthTokenFile) { + (file, configBuilder) => + configBuilder.withOauthToken(Files.toString(file, Charsets.UTF_8)) + }.withOption(caCertFile) { + (file, configBuilder) => configBuilder.withCaCertFile(file) + }.withOption(clientKeyFile) { + (file, configBuilder) => configBuilder.withClientKeyFile(file) + }.withOption(clientCertFile) { + (file, configBuilder) => configBuilder.withClientCertFile(file) + }.withOption(namespace) { + (ns, configBuilder) => configBuilder.withNamespace(ns) + }.build() + val baseHttpClient = HttpClientUtils.createHttpClient(config) + val httpClientWithCustomDispatcher = baseHttpClient.newBuilder() + .dispatcher(dispatcher) + .build() + new DefaultKubernetesClient(httpClientWithCustomDispatcher, config) + } + + private implicit class OptionConfigurableConfigBuilder(val configBuilder: ConfigBuilder) + extends AnyVal { + + def withOption[T] + (option: Option[T]) + (configurator: ((T, ConfigBuilder) => ConfigBuilder)): ConfigBuilder = { + option.map { opt => + configurator(opt, configBuilder) + }.getOrElse(configBuilder) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala new file mode 100644 index 0000000..f79155b --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.ConfigurationUtils +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.util.Utils + +/** + * A factory class for configuring and creating executor pods. + */ +private[spark] trait ExecutorPodFactory { + + /** + * Configure and construct an executor pod with the given parameters. + */ + def createExecutorPod( + executorId: String, + applicationId: String, + driverUrl: String, + executorEnvs: Seq[(String, String)], + driverPod: Pod, + nodeToLocalTaskCount: Map[String, Int]): Pod +} + +private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf) + extends ExecutorPodFactory { + + private val executorExtraClasspath = + sparkConf.get(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH) + + private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_EXECUTOR_LABEL_PREFIX) + require( + !executorLabels.contains(SPARK_APP_ID_LABEL), + s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.") + require( + !executorLabels.contains(SPARK_EXECUTOR_ID_LABEL), + s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" + + " Spark.") + require( + !executorLabels.contains(SPARK_ROLE_LABEL), + s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.") + + private val executorAnnotations = + ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_EXECUTOR_ANNOTATION_PREFIX) + private val nodeSelector = + ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_NODE_SELECTOR_PREFIX) + + private val executorDockerImage = sparkConf + .get(EXECUTOR_DOCKER_IMAGE) + .getOrElse(throw new SparkException("Must specify the executor Docker image")) + private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY) + private val blockManagerPort = sparkConf + .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) + + private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + + private val executorMemoryMiB = sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY) + private val executorMemoryString = sparkConf.get( + org.apache.spark.internal.config.EXECUTOR_MEMORY.key, + org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString) + + private val memoryOverheadMiB = sparkConf + .get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD) + .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt, + MEMORY_OVERHEAD_MIN_MIB)) + private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB + + private val executorCores = sparkConf.getDouble("spark.executor.cores", 1) + private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES) + + override def createExecutorPod( + executorId: String, + applicationId: String, + driverUrl: String, + executorEnvs: Seq[(String, String)], + driverPod: Pod, + nodeToLocalTaskCount: Map[String, Int]): Pod = { + val name = s"$executorPodNamePrefix-exec-$executorId" + + // hostname must be no longer than 63 characters, so take the last 63 characters of the pod + // name as the hostname. This preserves uniqueness since the end of name contains + // executorId + val hostname = name.substring(Math.max(0, name.length - 63)) + val resolvedExecutorLabels = Map( + SPARK_EXECUTOR_ID_LABEL -> executorId, + SPARK_APP_ID_LABEL -> applicationId, + SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++ + executorLabels + val executorMemoryQuantity = new QuantityBuilder(false) + .withAmount(s"${executorMemoryMiB}Mi") + .build() + val executorMemoryLimitQuantity = new QuantityBuilder(false) + .withAmount(s"${executorMemoryWithOverhead}Mi") + .build() + val executorCpuQuantity = new QuantityBuilder(false) + .withAmount(executorCores.toString) + .build() + val executorExtraClasspathEnv = executorExtraClasspath.map { cp => + new EnvVarBuilder() + .withName(ENV_EXECUTOR_EXTRA_CLASSPATH) + .withValue(cp) + .build() + } + val executorExtraJavaOptionsEnv = sparkConf + .get(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS) + .map { opts => + val delimitedOpts = Utils.splitCommandString(opts) + delimitedOpts.zipWithIndex.map { + case (opt, index) => + new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build() + } + }.getOrElse(Seq.empty[EnvVar]) + val executorEnv = (Seq( + (ENV_DRIVER_URL, driverUrl), + // Executor backend expects integral value for executor cores, so round it up to an int. + (ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString), + (ENV_EXECUTOR_MEMORY, executorMemoryString), + (ENV_APPLICATION_ID, applicationId), + (ENV_EXECUTOR_ID, executorId)) ++ executorEnvs) + .map(env => new EnvVarBuilder() + .withName(env._1) + .withValue(env._2) + .build() + ) ++ Seq( + new EnvVarBuilder() + .withName(ENV_EXECUTOR_POD_IP) + .withValueFrom(new EnvVarSourceBuilder() + .withNewFieldRef("v1", "status.podIP") + .build()) + .build() + ) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq + val requiredPorts = Seq( + (BLOCK_MANAGER_PORT_NAME, blockManagerPort)) + .map { case (name, port) => + new ContainerPortBuilder() + .withName(name) + .withContainerPort(port) + .build() + } + + val executorContainer = new ContainerBuilder() + .withName("executor") + .withImage(executorDockerImage) + .withImagePullPolicy(dockerImagePullPolicy) + .withNewResources() + .addToRequests("memory", executorMemoryQuantity) + .addToLimits("memory", executorMemoryLimitQuantity) + .addToRequests("cpu", executorCpuQuantity) + .endResources() + .addAllToEnv(executorEnv.asJava) + .withPorts(requiredPorts.asJava) + .build() + + val executorPod = new PodBuilder() + .withNewMetadata() + .withName(name) + .withLabels(resolvedExecutorLabels.asJava) + .withAnnotations(executorAnnotations.asJava) + .withOwnerReferences() + .addNewOwnerReference() + .withController(true) + .withApiVersion(driverPod.getApiVersion) + .withKind(driverPod.getKind) + .withName(driverPod.getMetadata.getName) + .withUid(driverPod.getMetadata.getUid) + .endOwnerReference() + .endMetadata() + .withNewSpec() + .withHostname(hostname) + .withRestartPolicy("Never") + .withNodeSelector(nodeSelector.asJava) + .endSpec() + .build() + + val containerWithExecutorLimitCores = executorLimitCores.map { limitCores => + val executorCpuLimitQuantity = new QuantityBuilder(false) + .withAmount(limitCores) + .build() + new ContainerBuilder(executorContainer) + .editResources() + .addToLimits("cpu", executorCpuLimitQuantity) + .endResources() + .build() + }.getOrElse(executorContainer) + + new PodBuilder(executorPod) + .editSpec() + .addToContainers(containerWithExecutorLimitCores) + .endSpec() + .build() + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala new file mode 100644 index 0000000..68ca6a7 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.io.File + +import io.fabric8.kubernetes.client.Config + +import org.apache.spark.SparkContext +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} +import org.apache.spark.util.ThreadUtils + +private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging { + + override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s") + + override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = { + new TaskSchedulerImpl(sc) + } + + override def createSchedulerBackend( + sc: SparkContext, + masterURL: String, + scheduler: TaskScheduler): SchedulerBackend = { + val sparkConf = sc.getConf + + val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient( + KUBERNETES_MASTER_INTERNAL_URL, + Some(sparkConf.get(KUBERNETES_NAMESPACE)), + APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX, + sparkConf, + Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), + Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) + + val executorPodFactory = new ExecutorPodFactoryImpl(sparkConf) + val allocatorExecutor = ThreadUtils + .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator") + val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool( + "kubernetes-executor-requests") + new KubernetesClusterSchedulerBackend( + scheduler.asInstanceOf[TaskSchedulerImpl], + sc.env.rpcEnv, + executorPodFactory, + kubernetesClient, + allocatorExecutor, + requestExecutorsService) + } + + override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { + scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala new file mode 100644 index 0000000..e79c987 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.io.Closeable +import java.net.InetAddress +import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} +import javax.annotation.concurrent.GuardedBy + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} + +import org.apache.spark.SparkException +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv} +import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl} +import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils} +import org.apache.spark.util.Utils + +private[spark] class KubernetesClusterSchedulerBackend( + scheduler: TaskSchedulerImpl, + rpcEnv: RpcEnv, + executorPodFactory: ExecutorPodFactory, + kubernetesClient: KubernetesClient, + allocatorExecutor: ScheduledExecutorService, + requestExecutorsService: ExecutorService) + extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) { + + import KubernetesClusterSchedulerBackend._ + + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + private val RUNNING_EXECUTOR_PODS_LOCK = new Object + @GuardedBy("RUNNING_EXECUTOR_PODS_LOCK") + private val runningExecutorsToPods = new mutable.HashMap[String, Pod] + private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]() + private val podsWithKnownExitReasons = new ConcurrentHashMap[String, ExecutorExited]() + private val disconnectedPodsByExecutorIdPendingRemoval = new ConcurrentHashMap[String, Pod]() + + private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE) + + private val kubernetesDriverPodName = conf + .get(KUBERNETES_DRIVER_POD_NAME) + .getOrElse(throw new SparkException("Must specify the driver pod name")) + private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( + requestExecutorsService) + + private val driverPod = kubernetesClient.pods() + .inNamespace(kubernetesNamespace) + .withName(kubernetesDriverPodName) + .get() + + protected override val minRegisteredRatio = + if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { + 0.8 + } else { + super.minRegisteredRatio + } + + private val executorWatchResource = new AtomicReference[Closeable] + private val totalExpectedExecutors = new AtomicInteger(0) + + private val driverUrl = RpcEndpointAddress( + conf.get("spark.driver.host"), + conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT), + CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString + + private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf) + + private val podAllocationInterval = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY) + + private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) + + private val executorLostReasonCheckMaxAttempts = conf.get( + KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS) + + private val allocatorRunnable = new Runnable { + + // Maintains a map of executor id to count of checks performed to learn the loss reason + // for an executor. + private val executorReasonCheckAttemptCounts = new mutable.HashMap[String, Int] + + override def run(): Unit = { + handleDisconnectedExecutors() + + val executorsToAllocate = mutable.Map[String, Pod]() + val currentTotalRegisteredExecutors = totalRegisteredExecutors.get + val currentTotalExpectedExecutors = totalExpectedExecutors.get + val currentNodeToLocalTaskCount = getNodesWithLocalTaskCounts() + RUNNING_EXECUTOR_PODS_LOCK.synchronized { + if (currentTotalRegisteredExecutors < runningExecutorsToPods.size) { + logDebug("Waiting for pending executors before scaling") + } else if (currentTotalExpectedExecutors <= runningExecutorsToPods.size) { + logDebug("Maximum allowed executor limit reached. Not scaling up further.") + } else { + for (_ <- 0 until math.min( + currentTotalExpectedExecutors - runningExecutorsToPods.size, podAllocationSize)) { + val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString + val executorPod = executorPodFactory.createExecutorPod( + executorId, + applicationId(), + driverUrl, + conf.getExecutorEnv, + driverPod, + currentNodeToLocalTaskCount) + executorsToAllocate(executorId) = executorPod + logInfo( + s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}") + } + } + } + + val allocatedExecutors = executorsToAllocate.mapValues { pod => + Utils.tryLog { + kubernetesClient.pods().create(pod) + } + } + + RUNNING_EXECUTOR_PODS_LOCK.synchronized { + allocatedExecutors.map { + case (executorId, attemptedAllocatedExecutor) => + attemptedAllocatedExecutor.map { successfullyAllocatedExecutor => + runningExecutorsToPods.put(executorId, successfullyAllocatedExecutor) + } + } + } + } + + def handleDisconnectedExecutors(): Unit = { + // For each disconnected executor, synchronize with the loss reasons that may have been found + // by the executor pod watcher. If the loss reason was discovered by the watcher, + // inform the parent class with removeExecutor. + disconnectedPodsByExecutorIdPendingRemoval.asScala.foreach { + case (executorId, executorPod) => + val knownExitReason = Option(podsWithKnownExitReasons.remove( + executorPod.getMetadata.getName)) + knownExitReason.fold { + removeExecutorOrIncrementLossReasonCheckCount(executorId) + } { executorExited => + logWarning(s"Removing executor $executorId with loss reason " + executorExited.message) + removeExecutor(executorId, executorExited) + // We don't delete the pod running the executor that has an exit condition caused by + // the application from the Kubernetes API server. This allows users to debug later on + // through commands such as "kubectl logs <pod name>" and + // "kubectl describe pod <pod name>". Note that exited containers have terminated and + // therefore won't take CPU and memory resources. + // Otherwise, the executor pod is marked to be deleted from the API server. + if (executorExited.exitCausedByApp) { + logInfo(s"Executor $executorId exited because of the application.") + deleteExecutorFromDataStructures(executorId) + } else { + logInfo(s"Executor $executorId failed because of a framework error.") + deleteExecutorFromClusterAndDataStructures(executorId) + } + } + } + } + + def removeExecutorOrIncrementLossReasonCheckCount(executorId: String): Unit = { + val reasonCheckCount = executorReasonCheckAttemptCounts.getOrElse(executorId, 0) + if (reasonCheckCount >= executorLostReasonCheckMaxAttempts) { + removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons.")) + deleteExecutorFromClusterAndDataStructures(executorId) + } else { + executorReasonCheckAttemptCounts.put(executorId, reasonCheckCount + 1) + } + } + + def deleteExecutorFromClusterAndDataStructures(executorId: String): Unit = { + deleteExecutorFromDataStructures(executorId).foreach { pod => + kubernetesClient.pods().delete(pod) + } + } + + def deleteExecutorFromDataStructures(executorId: String): Option[Pod] = { + disconnectedPodsByExecutorIdPendingRemoval.remove(executorId) + executorReasonCheckAttemptCounts -= executorId + podsWithKnownExitReasons.remove(executorId) + RUNNING_EXECUTOR_PODS_LOCK.synchronized { + runningExecutorsToPods.remove(executorId).orElse { + logWarning(s"Unable to remove pod for unknown executor $executorId") + None + } + } + } + } + + override def sufficientResourcesRegistered(): Boolean = { + totalRegisteredExecutors.get() >= initialExecutors * minRegisteredRatio + } + + override def start(): Unit = { + super.start() + executorWatchResource.set( + kubernetesClient + .pods() + .withLabel(SPARK_APP_ID_LABEL, applicationId()) + .watch(new ExecutorPodsWatcher())) + + allocatorExecutor.scheduleWithFixedDelay( + allocatorRunnable, 0L, podAllocationInterval, TimeUnit.SECONDS) + + if (!Utils.isDynamicAllocationEnabled(conf)) { + doRequestTotalExecutors(initialExecutors) + } + } + + override def stop(): Unit = { + // stop allocation of new resources and caches. + allocatorExecutor.shutdown() + allocatorExecutor.awaitTermination(30, TimeUnit.SECONDS) + + // send stop message to executors so they shut down cleanly + super.stop() + + try { + val resource = executorWatchResource.getAndSet(null) + if (resource != null) { + resource.close() + } + } catch { + case e: Throwable => logWarning("Failed to close the executor pod watcher", e) + } + + // then delete the executor pods + Utils.tryLogNonFatalError { + deleteExecutorPodsOnStop() + executorPodsByIPs.clear() + } + Utils.tryLogNonFatalError { + logInfo("Closing kubernetes client") + kubernetesClient.close() + } + } + + /** + * @return A map of K8s cluster nodes to the number of tasks that could benefit from data + * locality if an executor launches on the cluster node. + */ + private def getNodesWithLocalTaskCounts() : Map[String, Int] = { + val nodeToLocalTaskCount = synchronized { + mutable.Map[String, Int]() ++ hostToLocalTaskCount + } + + for (pod <- executorPodsByIPs.values().asScala) { + // Remove cluster nodes that are running our executors already. + // TODO: This prefers spreading out executors across nodes. In case users want + // consolidating executors on fewer nodes, introduce a flag. See the spark.deploy.spreadOut + // flag that Spark standalone has: https://spark.apache.org/docs/latest/spark-standalone.html + nodeToLocalTaskCount.remove(pod.getSpec.getNodeName).nonEmpty || + nodeToLocalTaskCount.remove(pod.getStatus.getHostIP).nonEmpty || + nodeToLocalTaskCount.remove( + InetAddress.getByName(pod.getStatus.getHostIP).getCanonicalHostName).nonEmpty + } + nodeToLocalTaskCount.toMap[String, Int] + } + + override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] { + totalExpectedExecutors.set(requestedTotal) + true + } + + override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] { + val podsToDelete = RUNNING_EXECUTOR_PODS_LOCK.synchronized { + executorIds.flatMap { executorId => + runningExecutorsToPods.remove(executorId) match { + case Some(pod) => + disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod) + Some(pod) + + case None => + logWarning(s"Unable to remove pod for unknown executor $executorId") + None + } + } + } + + kubernetesClient.pods().delete(podsToDelete: _*) + true + } + + private def deleteExecutorPodsOnStop(): Unit = { + val executorPodsToDelete = RUNNING_EXECUTOR_PODS_LOCK.synchronized { + val runningExecutorPodsCopy = Seq(runningExecutorsToPods.values.toSeq: _*) + runningExecutorsToPods.clear() + runningExecutorPodsCopy + } + kubernetesClient.pods().delete(executorPodsToDelete: _*) + } + + private class ExecutorPodsWatcher extends Watcher[Pod] { + + private val DEFAULT_CONTAINER_FAILURE_EXIT_STATUS = -1 + + override def eventReceived(action: Action, pod: Pod): Unit = { + val podName = pod.getMetadata.getName + val podIP = pod.getStatus.getPodIP + + action match { + case Action.MODIFIED if (pod.getStatus.getPhase == "Running" + && pod.getMetadata.getDeletionTimestamp == null) => + val clusterNodeName = pod.getSpec.getNodeName + logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.") + executorPodsByIPs.put(podIP, pod) + + case Action.DELETED | Action.ERROR => + val executorId = getExecutorId(pod) + logDebug(s"Executor pod $podName at IP $podIP was at $action.") + if (podIP != null) { + executorPodsByIPs.remove(podIP) + } + + val executorExitReason = if (action == Action.ERROR) { + logWarning(s"Received error event of executor pod $podName. Reason: " + + pod.getStatus.getReason) + executorExitReasonOnError(pod) + } else if (action == Action.DELETED) { + logWarning(s"Received delete event of executor pod $podName. Reason: " + + pod.getStatus.getReason) + executorExitReasonOnDelete(pod) + } else { + throw new IllegalStateException( + s"Unknown action that should only be DELETED or ERROR: $action") + } + podsWithKnownExitReasons.put(pod.getMetadata.getName, executorExitReason) + + if (!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) { + log.warn(s"Executor with id $executorId was not marked as disconnected, but the " + + s"watch received an event of type $action for this executor. The executor may " + + "have failed to start in the first place and never registered with the driver.") + } + disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod) + + case _ => logDebug(s"Received event of executor pod $podName: " + action) + } + } + + override def onClose(cause: KubernetesClientException): Unit = { + logDebug("Executor pod watch closed.", cause) + } + + private def getExecutorExitStatus(pod: Pod): Int = { + val containerStatuses = pod.getStatus.getContainerStatuses + if (!containerStatuses.isEmpty) { + // we assume the first container represents the pod status. This assumption may not hold + // true in the future. Revisit this if side-car containers start running inside executor + // pods. + getExecutorExitStatus(containerStatuses.get(0)) + } else DEFAULT_CONTAINER_FAILURE_EXIT_STATUS + } + + private def getExecutorExitStatus(containerStatus: ContainerStatus): Int = { + Option(containerStatus.getState).map { containerState => + Option(containerState.getTerminated).map { containerStateTerminated => + containerStateTerminated.getExitCode.intValue() + }.getOrElse(UNKNOWN_EXIT_CODE) + }.getOrElse(UNKNOWN_EXIT_CODE) + } + + private def isPodAlreadyReleased(pod: Pod): Boolean = { + val executorId = pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL) + RUNNING_EXECUTOR_PODS_LOCK.synchronized { + !runningExecutorsToPods.contains(executorId) + } + } + + private def executorExitReasonOnError(pod: Pod): ExecutorExited = { + val containerExitStatus = getExecutorExitStatus(pod) + // container was probably actively killed by the driver. + if (isPodAlreadyReleased(pod)) { + ExecutorExited(containerExitStatus, exitCausedByApp = false, + s"Container in pod ${pod.getMetadata.getName} exited from explicit termination " + + "request.") + } else { + val containerExitReason = s"Pod ${pod.getMetadata.getName}'s executor container " + + s"exited with exit status code $containerExitStatus." + ExecutorExited(containerExitStatus, exitCausedByApp = true, containerExitReason) + } + } + + private def executorExitReasonOnDelete(pod: Pod): ExecutorExited = { + val exitMessage = if (isPodAlreadyReleased(pod)) { + s"Container in pod ${pod.getMetadata.getName} exited from explicit termination request." + } else { + s"Pod ${pod.getMetadata.getName} deleted or lost." + } + ExecutorExited(getExecutorExitStatus(pod), exitCausedByApp = false, exitMessage) + } + + private def getExecutorId(pod: Pod): String = { + val executorId = pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL) + require(executorId != null, "Unexpected pod metadata; expected all executor pods " + + s"to have label $SPARK_EXECUTOR_ID_LABEL.") + executorId + } + } + + override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = { + new KubernetesDriverEndpoint(rpcEnv, properties) + } + + private class KubernetesDriverEndpoint( + rpcEnv: RpcEnv, + sparkProperties: Seq[(String, String)]) + extends DriverEndpoint(rpcEnv, sparkProperties) { + + override def onDisconnected(rpcAddress: RpcAddress): Unit = { + addressToExecutorId.get(rpcAddress).foreach { executorId => + if (disableExecutor(executorId)) { + RUNNING_EXECUTOR_PODS_LOCK.synchronized { + runningExecutorsToPods.get(executorId).foreach { pod => + disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod) + } + } + } + } + } + } +} + +private object KubernetesClusterSchedulerBackend { + private val UNKNOWN_EXIT_CODE = -1 +} http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/resource-managers/kubernetes/core/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/resources/log4j.properties b/resource-managers/kubernetes/core/src/test/resources/log4j.properties new file mode 100644 index 0000000..ad95fad --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/resources/log4j.properties @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file target/unit-tests.log +log4j.rootCategory=INFO, file +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=true +log4j.appender.file.file=target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n + +# Ignore messages below warning level from a few verbose libraries. +log4j.logger.com.sun.jersey=WARN +log4j.logger.org.apache.hadoop=WARN +log4j.logger.org.eclipse.jetty=WARN +log4j.logger.org.mortbay=WARN +log4j.logger.org.spark_project.jetty=WARN http://git-wip-us.apache.org/repos/asf/spark/blob/e9b2070a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala new file mode 100644 index 0000000..1c7717c --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{Pod, _} +import org.mockito.MockitoAnnotations +import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach} + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ + +class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterEach { + private val driverPodName: String = "driver-pod" + private val driverPodUid: String = "driver-uid" + private val executorPrefix: String = "base" + private val executorImage: String = "executor-image" + private val driverPod = new PodBuilder() + .withNewMetadata() + .withName(driverPodName) + .withUid(driverPodUid) + .endMetadata() + .withNewSpec() + .withNodeName("some-node") + .endSpec() + .withNewStatus() + .withHostIP("192.168.99.100") + .endStatus() + .build() + private var baseConf: SparkConf = _ + + before { + MockitoAnnotations.initMocks(this) + baseConf = new SparkConf() + .set(KUBERNETES_DRIVER_POD_NAME, driverPodName) + .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, executorPrefix) + .set(EXECUTOR_DOCKER_IMAGE, executorImage) + } + + test("basic executor pod has reasonable defaults") { + val factory = new ExecutorPodFactoryImpl(baseConf) + val executor = factory.createExecutorPod( + "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) + + // The executor pod name and default labels. + assert(executor.getMetadata.getName === s"$executorPrefix-exec-1") + assert(executor.getMetadata.getLabels.size() === 3) + assert(executor.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL) === "1") + + // There is exactly 1 container with no volume mounts and default memory limits. + // Default memory limit is 1024M + 384M (minimum overhead constant). + assert(executor.getSpec.getContainers.size() === 1) + assert(executor.getSpec.getContainers.get(0).getImage === executorImage) + assert(executor.getSpec.getContainers.get(0).getVolumeMounts.isEmpty) + assert(executor.getSpec.getContainers.get(0).getResources.getLimits.size() === 1) + assert(executor.getSpec.getContainers.get(0).getResources + .getLimits.get("memory").getAmount === "1408Mi") + + // The pod has no node selector, volumes. + assert(executor.getSpec.getNodeSelector.isEmpty) + assert(executor.getSpec.getVolumes.isEmpty) + + checkEnv(executor, Map()) + checkOwnerReferences(executor, driverPodUid) + } + + test("executor pod hostnames get truncated to 63 characters") { + val conf = baseConf.clone() + conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, + "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple") + + val factory = new ExecutorPodFactoryImpl(conf) + val executor = factory.createExecutorPod( + "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) + + assert(executor.getSpec.getHostname.length === 63) + } + + test("classpath and extra java options get translated into environment variables") { + val conf = baseConf.clone() + conf.set(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS, "foo=bar") + conf.set(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH, "bar=baz") + + val factory = new ExecutorPodFactoryImpl(conf) + val executor = factory.createExecutorPod( + "1", "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"), driverPod, Map[String, Int]()) + + checkEnv(executor, + Map("SPARK_JAVA_OPT_0" -> "foo=bar", + "SPARK_EXECUTOR_EXTRA_CLASSPATH" -> "bar=baz", + "qux" -> "quux")) + checkOwnerReferences(executor, driverPodUid) + } + + // There is always exactly one controller reference, and it points to the driver pod. + private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = { + assert(executor.getMetadata.getOwnerReferences.size() === 1) + assert(executor.getMetadata.getOwnerReferences.get(0).getUid === driverPodUid) + assert(executor.getMetadata.getOwnerReferences.get(0).getController === true) + } + + // Check that the expected environment variables are present. + private def checkEnv(executor: Pod, additionalEnvVars: Map[String, String]): Unit = { + val defaultEnvs = Map( + ENV_EXECUTOR_ID -> "1", + ENV_DRIVER_URL -> "dummy", + ENV_EXECUTOR_CORES -> "1", + ENV_EXECUTOR_MEMORY -> "1g", + ENV_APPLICATION_ID -> "dummy", + ENV_EXECUTOR_POD_IP -> null) ++ additionalEnvVars + + assert(executor.getSpec.getContainers.size() === 1) + assert(executor.getSpec.getContainers.get(0).getEnv.size() === defaultEnvs.size) + val mapEnvs = executor.getSpec.getContainers.get(0).getEnv.asScala.map { + x => (x.getName, x.getValue) + }.toMap + assert(defaultEnvs === mapEnvs) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org