Github user foxish commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19717#discussion_r154955367
  
    --- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
    @@ -0,0 +1,234 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.deploy.k8s.submit
    +
    +import java.util.{Collections, UUID}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +import io.fabric8.kubernetes.api.model._
    +import io.fabric8.kubernetes.client.KubernetesClient
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkApplication
    +import org.apache.spark.deploy.k8s.Config._
    +import org.apache.spark.deploy.k8s.Constants._
    +import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
    +import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.util.Utils
    +
    +/**
    + * Encapsulates arguments to the submission client.
    + *
    + * @param mainAppResource the main application resource if any
    + * @param mainClass the main class of the application to run
    + * @param driverArgs arguments to the driver
    + */
    +private[spark] case class ClientArguments(
    +     mainAppResource: Option[MainAppResource],
    +     mainClass: String,
    +     driverArgs: Array[String])
    +
    +private[spark] object ClientArguments {
    +
    +  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
    +    var mainAppResource: Option[MainAppResource] = None
    +    var mainClass: Option[String] = None
    +    val driverArgs = mutable.ArrayBuffer.empty[String]
    +
    +    args.sliding(2, 2).toList.foreach {
    +      case Array("--primary-java-resource", primaryJavaResource: String) =>
    +        mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
    +      case Array("--main-class", clazz: String) =>
    +        mainClass = Some(clazz)
    +      case Array("--arg", arg: String) =>
    +        driverArgs += arg
    +      case other =>
    +        val invalid = other.mkString(" ")
    +        throw new RuntimeException(s"Unknown arguments: $invalid")
    +    }
    +
    +    require(mainClass.isDefined, "Main class must be specified via 
--main-class")
    +
    +    ClientArguments(
    +      mainAppResource,
    +      mainClass.get,
    +      driverArgs.toArray)
    +  }
    +}
    +
    +/**
    + * Submits a Spark application to run on Kubernetes by creating the driver 
pod and starting a
    + * watcher that monitors and logs the application status. Waits for the 
application to terminate if
    + * spark.kubernetes.submission.waitAppCompletion is true.
    + *
    + * @param submissionSteps steps that collectively configure the driver
    + * @param submissionSparkConf the submission client Spark configuration
    + * @param kubernetesClient the client to talk to the Kubernetes API server
    + * @param waitForAppCompletion a flag indicating whether the client should 
wait for the application
    + *                             to complete
    + * @param appName the application name
    + * @param loggingPodStatusWatcher a watcher that monitors and logs the 
application status
    + */
    +private[spark] class Client(
    +    submissionSteps: Seq[DriverConfigurationStep],
    +    submissionSparkConf: SparkConf,
    +    kubernetesClient: KubernetesClient,
    +    waitForAppCompletion: Boolean,
    +    appName: String,
    +    loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
    +
    +  private val driverJavaOptions = submissionSparkConf.get(
    +    org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
    +
    +   /**
    +    * Run command that initializes a DriverSpec that will be updated after 
each
    +    * DriverConfigurationStep in the sequence that is passed in. The final 
KubernetesDriverSpec
    +    * will be used to build the Driver Container, Driver Pod, and 
Kubernetes Resources
    +    */
    +  def run(): Unit = {
    +    var currentDriverSpec = 
KubernetesDriverSpec.initialSpec(submissionSparkConf)
    +    // submissionSteps contain steps necessary to take, to resolve varying
    +    // client arguments that are passed in, created by orchestrator
    +    for (nextStep <- submissionSteps) {
    +      currentDriverSpec = nextStep.configureDriver(currentDriverSpec)
    +    }
    +
    +    val resolvedDriverJavaOpts = currentDriverSpec
    +      .driverSparkConf
    +      // Remove this as the options are instead extracted and set 
individually below using
    +      // environment variables with prefix SPARK_JAVA_OPT_.
    +      .remove(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
    +      .getAll
    +      .map {
    +        case (confKey, confValue) => s"-D$confKey=$confValue"
    +      } ++ 
driverJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
    +    val driverJavaOptsEnvs: Seq[EnvVar] = 
resolvedDriverJavaOpts.zipWithIndex.map {
    +      case (option, index) =>
    +        new EnvVarBuilder()
    +          .withName(s"$ENV_JAVA_OPT_PREFIX$index")
    +          .withValue(option)
    +          .build()
    +    }
    +
    +    val resolvedDriverContainer = new 
ContainerBuilder(currentDriverSpec.driverContainer)
    +      .addAllToEnv(driverJavaOptsEnvs.asJava)
    +      .build()
    +    val resolvedDriverPod = new PodBuilder(currentDriverSpec.driverPod)
    +      .editSpec()
    +        .addToContainers(resolvedDriverContainer)
    +        .endSpec()
    +      .build()
    +
    +    Utils.tryWithResource(
    +        kubernetesClient
    +          .pods()
    +          .withName(resolvedDriverPod.getMetadata.getName)
    +          .watch(loggingPodStatusWatcher)) { _ =>
    +      val createdDriverPod = 
kubernetesClient.pods().create(resolvedDriverPod)
    +      try {
    +        if (currentDriverSpec.otherKubernetesResources.nonEmpty) {
    +          val otherKubernetesResources = 
currentDriverSpec.otherKubernetesResources
    +          addDriverOwnerReference(createdDriverPod, 
otherKubernetesResources)
    +          kubernetesClient.resourceList(otherKubernetesResources: 
_*).createOrReplace()
    +        }
    +      } catch {
    +        case e: Throwable =>
    +          kubernetesClient.pods().delete(createdDriverPod)
    +          throw e
    +      }
    +
    +      if (waitForAppCompletion) {
    +        logInfo(s"Waiting for application $appName to finish...")
    +        loggingPodStatusWatcher.awaitCompletion()
    +        logInfo(s"Application $appName finished.")
    +      } else {
    +        logInfo(s"Deployed Spark application $appName into Kubernetes.")
    +      }
    +    }
    +  }
    +
    +  // Add a OwnerReference to the given resources making the driver pod an 
owner of them so when
    +  // the driver pod is deleted, the resources are garbage collected.
    +  private def addDriverOwnerReference(driverPod: Pod, resources: 
Seq[HasMetadata]): Unit = {
    +    val driverPodOwnerReference = new OwnerReferenceBuilder()
    +      .withName(driverPod.getMetadata.getName)
    +      .withApiVersion(driverPod.getApiVersion)
    +      .withUid(driverPod.getMetadata.getUid)
    +      .withKind(driverPod.getKind)
    +      .withController(true)
    +      .build()
    +    resources.foreach { resource =>
    +      val originalMetadata = resource.getMetadata
    +      
originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference))
    +    }
    +  }
    +}
    +
    +/**
    + * Main class and entry point of application submission in KUBERNETES mode.
    + */
    +private[spark] object Client extends SparkApplication {
    +
    +  override def start(args: Array[String], conf: SparkConf): Unit = {
    +    val parsedArguments = ClientArguments.fromCommandLineArgs(args)
    +    run(parsedArguments, conf)
    +  }
    +
    +  private def run(clientArguments: ClientArguments, sparkConf: SparkConf): 
Unit = {
    +    val namespace = sparkConf.get(KUBERNETES_NAMESPACE)
    +    val kubernetesAppId = 
s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
    --- End diff --
    
    > Does K8s have a convention for app id?
    
    K8s does not. It does have the notion of UUIDs for individual objects, but 
those are associated with each pod, and not shared across an entire application. 
For representing an application as a whole, we will be introducing a 
[CRD](https://kubernetes.io/docs/concepts/api-extension/custom-resources/) in 
the future. That will be how we track individual Spark apps going forward (post 
2.3). That CRD should be able to accommodate all the details, such as 
timestamp and status — so, keeping the appID (and driver pod names) 
collision-free using UUIDs made sense.



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to