Github user ssuchter commented on a diff in the pull request:
https://github.com/apache/spark/pull/20697#discussion_r194150705
--- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala ---
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.io.File
+import java.nio.file.{Path, Paths}
+import java.util.UUID
+import java.util.regex.Pattern
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.PatternFilenameFilter
+import io.fabric8.kubernetes.api.model.{Container, Pod}
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
+import org.scalatest.time.{Minutes, Seconds, Span}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory}
+import org.apache.spark.deploy.k8s.integrationtest.config._
+
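+/**
+ * Integration tests that submit example Spark applications to a Kubernetes backend and
+ * verify the resulting driver and executor pods via the fabric8 Kubernetes client.
+ */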
+private[spark] class KubernetesSuite extends SparkFunSuite
+ with BeforeAndAfterAll with BeforeAndAfter {
+
+ import KubernetesSuite._
+
+ private var testBackend: IntegrationTestBackend = _
+ private var sparkHomeDir: Path = _
+ private var kubernetesTestComponents: KubernetesTestComponents = _
+ private var sparkAppConf: SparkAppConf = _
+ private var image: String = _
+ private var containerLocalSparkDistroExamplesJar: String = _
+ private var appLocator: String = _
+ private var driverPodName: String = _
+
+ override def beforeAll(): Unit = {
+    // The scalatest-maven-plugin gives referenced-but-unset system properties the string
+    // value "null". We need to remove these null-valued properties before initializing
+    // the test backend.
+ val nullValueProperties = System.getProperties.asScala
+ .filter(entry => entry._2.equals("null"))
+ .map(entry => entry._1.toString)
+ nullValueProperties.foreach { key =>
+ System.clearProperty(key)
+ }
+
+    val sparkDirProp = System.getProperty("spark.kubernetes.test.unpackSparkDir")
+    require(sparkDirProp != null, "Spark home directory must be provided in system properties.")
+ sparkHomeDir = Paths.get(sparkDirProp)
+ require(sparkHomeDir.toFile.isDirectory,
+ s"No directory found for spark home specified at $sparkHomeDir.")
+ val imageTag = getTestImageTag
+ val imageRepo = getTestImageRepo
+ image = s"$imageRepo/spark:$imageTag"
+
+    val sparkDistroExamplesJarFile: File = sparkHomeDir.resolve(Paths.get("examples", "jars"))
+      .toFile
+      .listFiles(new PatternFilenameFilter(Pattern.compile("^spark-examples_.*\\.jar$")))(0)
+    containerLocalSparkDistroExamplesJar = s"local:///opt/spark/examples/jars/" +
+      s"${sparkDistroExamplesJarFile.getName}"
+ testBackend = IntegrationTestBackendFactory.getTestBackend
+ testBackend.initialize()
+    kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
+ }
+
+ override def afterAll(): Unit = {
+ testBackend.cleanUp()
+ }
+
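+  // Give each test a unique app locator and driver pod name so the label selectors used
+  // below never match pods left over from other test runs.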
+ before {
+ appLocator = UUID.randomUUID().toString.replaceAll("-", "")
+    driverPodName = "spark-test-app-" + UUID.randomUUID().toString.replaceAll("-", "")
+ sparkAppConf = kubernetesTestComponents.newSparkAppConf()
+ .set("spark.kubernetes.container.image", image)
+ .set("spark.kubernetes.driver.pod.name", driverPodName)
+ .set("spark.kubernetes.driver.label.spark-app-locator", appLocator)
+ .set("spark.kubernetes.executor.label.spark-app-locator", appLocator)
+ if (!kubernetesTestComponents.hasUserSpecifiedNamespace) {
+ kubernetesTestComponents.createNamespace()
+ }
+ }
+
+ after {
+ if (!kubernetesTestComponents.hasUserSpecifiedNamespace) {
+ kubernetesTestComponents.deleteNamespace()
+ }
+ deleteDriverPod()
+ }
+
+ test("Run SparkPi with no resources") {
+ runSparkPiAndVerifyCompletion()
+ }
+
+ test("Run SparkPi with a very long application name.") {
+ sparkAppConf.set("spark.app.name", "long" * 40)
+ runSparkPiAndVerifyCompletion()
+ }
+
+ test("Run SparkPi with a master URL without a scheme.") {
+ val url = kubernetesTestComponents.kubernetesClient.getMasterUrl
+ val k8sMasterUrl = if (url.getPort < 0) {
+ s"k8s://${url.getHost}"
+ } else {
+ s"k8s://${url.getHost}:${url.getPort}"
+ }
+ sparkAppConf.set("spark.master", k8sMasterUrl)
+ runSparkPiAndVerifyCompletion()
+ }
+
+ test("Run SparkPi with an argument.") {
+ runSparkPiAndVerifyCompletion(appArgs = Array("5"))
+ }
+
+  test("Run SparkPi with custom labels, annotations, and environment variables.") {
+ sparkAppConf
+ .set("spark.kubernetes.driver.label.label1", "label1-value")
+ .set("spark.kubernetes.driver.label.label2", "label2-value")
+      .set("spark.kubernetes.driver.annotation.annotation1", "annotation1-value")
+      .set("spark.kubernetes.driver.annotation.annotation2", "annotation2-value")
+ .set("spark.kubernetes.driverEnv.ENV1", "VALUE1")
+ .set("spark.kubernetes.driverEnv.ENV2", "VALUE2")
+ .set("spark.kubernetes.executor.label.label1", "label1-value")
+ .set("spark.kubernetes.executor.label.label2", "label2-value")
+      .set("spark.kubernetes.executor.annotation.annotation1", "annotation1-value")
+      .set("spark.kubernetes.executor.annotation.annotation2", "annotation2-value")
+ .set("spark.executorEnv.ENV1", "VALUE1")
+ .set("spark.executorEnv.ENV2", "VALUE2")
+
+ runSparkPiAndVerifyCompletion(
+ driverPodChecker = (driverPod: Pod) => {
+ doBasicDriverPodCheck(driverPod)
+ checkCustomSettings(driverPod)
+ },
+ executorPodChecker = (executorPod: Pod) => {
+ doBasicExecutorPodCheck(executorPod)
+ checkCustomSettings(executorPod)
+ })
+ }
+
+ // TODO(ssuchter): Enable the below after debugging
+ // test("Run PageRank using remote data file") {
+ // sparkAppConf
+ // .set("spark.kubernetes.mountDependencies.filesDownloadDir",
+ // CONTAINER_LOCAL_FILE_DOWNLOAD_PATH)
+ // .set("spark.files", REMOTE_PAGE_RANK_DATA_FILE)
+ // runSparkPageRankAndVerifyCompletion(
+ // appArgs = Array(CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE))
+ // }
+
+ private def runSparkPiAndVerifyCompletion(
+ appResource: String = containerLocalSparkDistroExamplesJar,
+ driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
+ executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
+ appArgs: Array[String] = Array.empty[String],
+ appLocator: String = appLocator): Unit = {
+ runSparkApplicationAndVerifyCompletion(
+ appResource,
+ SPARK_PI_MAIN_CLASS,
+ Seq("Pi is roughly 3"),
+ appArgs,
+ driverPodChecker,
+ executorPodChecker,
+ appLocator)
+ }
+
+ private def runSparkPageRankAndVerifyCompletion(
+ appResource: String = containerLocalSparkDistroExamplesJar,
+ driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
+ executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
+ appArgs: Array[String],
+ appLocator: String = appLocator): Unit = {
+ runSparkApplicationAndVerifyCompletion(
+ appResource,
+ SPARK_PAGE_RANK_MAIN_CLASS,
+ Seq("1 has rank", "2 has rank", "3 has rank", "4 has rank"),
+ appArgs,
+ driverPodChecker,
+ executorPodChecker,
+ appLocator)
+ }
+
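+  // Launches the app with spark-submit, locates the driver and executor pods via the
+  // spark-app-locator label, applies the given pod checkers, and then waits until the
+  // expected completion messages appear in the driver log.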
+ private def runSparkApplicationAndVerifyCompletion(
+ appResource: String,
+ mainClass: String,
+ expectedLogOnCompletion: Seq[String],
+ appArgs: Array[String],
+ driverPodChecker: Pod => Unit,
+ executorPodChecker: Pod => Unit,
+ appLocator: String): Unit = {
+ val appArguments = SparkAppArguments(
+ mainAppResource = appResource,
+ mainClass = mainClass,
+ appArgs = appArgs)
+    SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir)
+
+ val driverPod = kubernetesTestComponents.kubernetesClient
+ .pods()
+ .withLabel("spark-app-locator", appLocator)
+ .withLabel("spark-role", "driver")
+ .list()
+ .getItems
+ .get(0)
+ driverPodChecker(driverPod)
+
+ val executorPods = kubernetesTestComponents.kubernetesClient
+ .pods()
+ .withLabel("spark-app-locator", appLocator)
+ .withLabel("spark-role", "executor")
+ .list()
+ .getItems
+ executorPods.asScala.foreach { pod =>
+ executorPodChecker(pod)
+ }
+
+ Eventually.eventually(TIMEOUT, INTERVAL) {
+ expectedLogOnCompletion.foreach { e =>
+ assert(kubernetesTestComponents.kubernetesClient
+ .pods()
+ .withName(driverPod.getMetadata.getName)
+ .getLog
+ .contains(e), "The application did not complete.")
+ }
+ }
+ }
+
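+  // Sanity-checks the driver pod's name, image, and main container name.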
+ private def doBasicDriverPodCheck(driverPod: Pod): Unit = {
+ assert(driverPod.getMetadata.getName === driverPodName)
+ assert(driverPod.getSpec.getContainers.get(0).getImage === image)
+    assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
+ }
+
+ private def doBasicExecutorPodCheck(executorPod: Pod): Unit = {
+ assert(executorPod.getSpec.getContainers.get(0).getImage === image)
+ assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
+ }
+
+ private def checkCustomSettings(pod: Pod): Unit = {
+ assert(pod.getMetadata.getLabels.get("label1") === "label1-value")
+ assert(pod.getMetadata.getLabels.get("label2") === "label2-value")
+    assert(pod.getMetadata.getAnnotations.get("annotation1") === "annotation1-value")
+    assert(pod.getMetadata.getAnnotations.get("annotation2") === "annotation2-value")
+
+ val container = pod.getSpec.getContainers.get(0)
+ val envVars = container
+ .getEnv
+ .asScala
+ .map { env =>
+ (env.getName, env.getValue)
+ }
+ .toMap
+ assert(envVars("ENV1") === "VALUE1")
+ assert(envVars("ENV2") === "VALUE2")
+ }
+
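+  // Deletes the driver pod and waits until the API server no longer returns it.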
+ private def deleteDriverPod(): Unit = {
+    kubernetesTestComponents.kubernetesClient.pods().withName(driverPodName).delete()
+ Eventually.eventually(TIMEOUT, INTERVAL) {
+ assert(kubernetesTestComponents.kubernetesClient
+ .pods()
+ .withName(driverPodName)
+ .get() == null)
+ }
+ }
+}
+
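+// Constants shared by the tests above: patience settings, example main classes, and
+// (currently disabled) PageRank test data locations.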
+private[spark] object KubernetesSuite {
+
+ val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
+ val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
+ val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi"
+  val SPARK_PAGE_RANK_MAIN_CLASS: String = "org.apache.spark.examples.SparkPageRank"
+
+ // val CONTAINER_LOCAL_FILE_DOWNLOAD_PATH = "/var/spark-data/spark-files"
+
+  // val REMOTE_PAGE_RANK_DATA_FILE =
+  //   "https://storage.googleapis.com/spark-k8s-integration-tests/files/pagerank_data.txt"
+ // val CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE =
+ // s"$CONTAINER_LOCAL_FILE_DOWNLOAD_PATH/pagerank_data.txt"
+
+ case object ShuffleNotReadyException extends Exception
--- End diff ---
I commented this out because it's currently unused, but it will be used soon.