pan3793 commented on code in PR #5868:
URL: https://github.com/apache/kyuubi/pull/5868#discussion_r1437114728


##########
kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/EngineYarnModeSubmitter.scala:
##########
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.deploy.yarn
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.util
+import java.util.{Locale, Properties}
+import java.util.zip.{ZipEntry, ZipOutputStream}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
+import org.apache.hadoop.yarn.util.Records
+
+import org.apache.kyuubi.{KyuubiException, Logging, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.engine.deploy.yarn.EngineYarnModeSubmitter._
+import org.apache.kyuubi.util.KyuubiHadoopUtils
+
+abstract class EngineYarnModeSubmitter extends Logging {
+
+  /** Name of the directory (under the staging base dir) that holds the uploaded engine archives. */
+  val KYUUBI_ENGINE_STAGING: String = ".kyuubiEngineStaging"
+
+  /*
+   * Layout of the ApplicationMaster's working directory after YARN has localized the archives
+   * uploaded by this submitter:
+   *
+   * ApplicationMasterWorkDir/
+   * |-- __kyuubi_engine_conf__
+   * |   |-- __hadoop_conf__
+   * |   |   |-- hadoop conf file1
+   * |   |   |-- hadoop conf file2
+   * |   |   `-- ...
+   * |   `-- __kyuubi_conf__.properties
+   * `-- __kyuubi_engine_libs__
+   *     |-- kyuubi_engine.jar
+   *     `-- ...
+   *
+   * The four constants below name the entries of that layout.
+   */
+  val LOCALIZED_LIB_DIR: String = "__kyuubi_engine_libs__"
+  val LOCALIZED_CONF_DIR: String = "__kyuubi_engine_conf__"
+  val HADOOP_CONF_DIR: String = "__hadoop_conf__"
+  val KYUUBI_CONF_FILE: String = "__kyuubi_conf__.properties"
+
+  /** Owner-only permission (octal 700) for the staging directory and the files uploaded to it. */
+  val STAGING_DIR_PERMISSION: FsPermission =
+    FsPermission.createImmutable(Integer.parseInt("700", 8).toShort)
+
+  /** JVM main class of the AM container: the `ApplicationMaster` object's class name without `$`. */
+  private val applicationMaster: String = ApplicationMaster.getClass.getName.dropRight(1)
+
+  // YARN client and id of the application being submitted; both are set by submitApplication().
+  @volatile private var yarnClient: YarnClient = _
+  private var appId: ApplicationId = _
+
+  /** Remote staging directory of the current application; null until submitApplication() sets it. */
+  private[engine] var stagingDirPath: Path = _
+
+  /** Kyuubi configuration; it is serialized into the conf archive shipped to the engine AM. */
+  val kyuubiConf: KyuubiConf = new KyuubiConf()
+
+  /** YARN and Hadoop configurations; both must be assigned before submitApplication() runs. */
+  var yarnConf: Configuration = _
+  var hadoopConf: Configuration = _
+
+  /** Engine main class, passed to the ApplicationMaster via `--class`. */
+  def engineMainClass(): String
+
+  /** Engine type, used to build the YARN application name. */
+  var engineType: String
+
+  /**
+   * Submits the engine ApplicationMaster to YARN and then monitors it via monitorApplication.
+   *
+   * A fresh YarnClient is created from `yarnConf` and is always stopped before returning. On a
+   * non-fatal failure, the staging directory (if one was already chosen) is cleaned up and the
+   * error is rethrown wrapped in a KyuubiException. If the cleanup itself fails, that failure is
+   * attached as a suppressed exception so it does not hide the original cause. Fatal errors
+   * (e.g. VirtualMachineError, InterruptedException) propagate unwrapped.
+   *
+   * @throws KyuubiException if the application cannot be submitted or monitoring fails
+   */
+  protected def submitApplication(): Unit = {
+    assert(
+      hadoopConf != null && yarnConf != null,
+      "Hadoop Configuration is not initialized. " +
+        "Please initialize it before submitting application.")
+    try {
+      yarnClient = YarnClient.createYarnClient()
+      yarnClient.init(yarnConf)
+      yarnClient.start()
+
+      debug("Requesting a new application from cluster with %d NodeManagers"
+        .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
+
+      val newApp = yarnClient.createApplication()
+      appId = newApp.getNewApplicationResponse.getApplicationId
+
+      // The app staging dir is based on ENGINE_DEPLOY_YARN_MODE_STAGING_DIR (plus the current
+      // user's short name) if configured, otherwise on the user's home directory.
+      val appStagingBaseDir = kyuubiConf.get(ENGINE_DEPLOY_YARN_MODE_STAGING_DIR)
+        .map { new Path(_, UserGroupInformation.getCurrentUser.getShortUserName) }
+        .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
+      stagingDirPath = new Path(appStagingBaseDir, buildPath(KYUUBI_ENGINE_STAGING, appId.toString))
+
+      // Set up the appropriate contexts to launch AM
+      val containerContext = createContainerLaunchContext()
+      val appContext = createApplicationSubmissionContext(newApp, containerContext)
+
+      // Finally, submit and monitor the application
+      info(s"Submitting application $appId to ResourceManager")
+      yarnClient.submitApplication(appContext)
+      monitorApplication(appId)
+    } catch {
+      // NonFatal lets VirtualMachineError, InterruptedException, etc. propagate unwrapped.
+      case NonFatal(e) =>
+        if (stagingDirPath != null) {
+          try {
+            cleanupStagingDir()
+          } catch {
+            // Keep the submission error as the primary cause; record the cleanup failure too.
+            case NonFatal(cleanupError) => e.addSuppressed(cleanupError)
+          }
+        }
+        throw new KyuubiException("Failed to submit application to YARN", e)
+    } finally {
+      if (yarnClient != null) {
+        yarnClient.stop()
+      }
+    }
+  }
+
+  /**
+   * Builds the launch context of the engine AM container.
+   *
+   * The context holds the localized resources, the launch environment, and a single `java`
+   * command. That command runs the ApplicationMaster with `--class <engineMainClass()>` and
+   * `--properties-file` pointing at the shipped Kyuubi properties. The container's stdout and
+   * stderr are redirected into the YARN log directory.
+   */
+  private def createContainerLaunchContext(): ContainerLaunchContext = {
+    info("Setting up container launch context for engine AM")
+    val env = setupLaunchEnv(kyuubiConf)
+    // prepareLocalResources also appends the engine jars to `env`, so it must run before the
+    // environment is handed to the container context.
+    val localResources = prepareLocalResources(stagingDirPath, env)
+
+    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
+    amContainer.setLocalResources(localResources.asJava)
+    amContainer.setEnvironment(env.asJava)
+
+    // Optional user-supplied JVM options; the configured string is passed as a single token.
+    val javaOpts: Seq[String] = kyuubiConf.get(ENGINE_DEPLOY_YARN_MODE_JAVA_OPTIONS).toList
+
+    val commands: Seq[String] =
+      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
+        javaOpts ++
+        Seq(applicationMaster) ++
+        Seq("--class", engineMainClass()) ++
+        Seq(
+          "--properties-file",
+          buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, KYUUBI_CONF_FILE)) ++
+        Seq(
+          "1>",
+          ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
+          "2>",
+          ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+
+    // Render null tokens (e.g. a null engineMainClass()) as the literal "null".
+    val printableCommands = commands.map(token => Option(token).getOrElse("null")).toList
+    amContainer.setCommands(printableCommands.asJava)
+    info(s"Commands: ${printableCommands.mkString(" ")}")
+
+    amContainer
+  }
+
+  /**
+   * Creates the remote directory `destDir` with STAGING_DIR_PERMISSION, then packs and uploads the
+   * engine jars and configuration as localized resources for the AM container.
+   *
+   * @param destDir remote staging directory to create (the caller passes stagingDirPath)
+   * @param env     AM launch environment; the packed jars are appended to it as classpath entries
+   * @return the local resources, keyed by LOCALIZED_LIB_DIR and LOCALIZED_CONF_DIR
+   */
+  private def prepareLocalResources(
+      destDir: Path,
+      env: mutable.HashMap[String, String]): mutable.HashMap[String, LocalResource] = {
+    info("Preparing resources for engine AM container")
+    val resources = new mutable.HashMap[String, LocalResource]
+
+    // The staging directory must exist before the archives can be copied into it.
+    val stagingFs = destDir.getFileSystem(hadoopConf)
+    FileSystem.mkdirs(stagingFs, destDir, new FsPermission(STAGING_DIR_PERMISSION))
+
+    distributeJars(resources, env)
+    distributeConf(resources)
+    resources
+  }
+
+  /**
+   * Packs the engine jars into one zip, uploads it as an ARCHIVE localized as LOCALIZED_LIB_DIR,
+   * and adds every packed jar to the AM classpath.
+   *
+   * The jar paths come from KYUUBI_ENGINE_DEPLOY_YARN_MODE_JARS_KEY, separated by
+   * KYUUBI_ENGINE_DEPLOY_YARN_MODE_ARCHIVE_SEPARATOR. Only readable regular files ending in
+   * `.jar` (case-insensitive) are packed. When several jars share a file name, only the first one
+   * found is kept, because zip entry names must be unique.
+   *
+   * @throws IllegalArgumentException if KYUUBI_ENGINE_DEPLOY_YARN_MODE_JARS_KEY is not set
+   */
+  private def distributeJars(
+      localResources: mutable.HashMap[String, LocalResource],
+      env: mutable.HashMap[String, String]): Unit = {
+    // Validate before creating the temp archive so a missing config leaves no stray file behind.
+    val jarPaths = kyuubiConf.getOption(KYUUBI_ENGINE_DEPLOY_YARN_MODE_JARS_KEY).getOrElse {
+      throw new IllegalArgumentException("No jars specified for engine AM")
+    }
+
+    val jarsArchive = File.createTempFile(LOCALIZED_LIB_DIR, ".zip", Utils.createTempDir().toFile)
+    val jarsStream = new ZipOutputStream(new FileOutputStream(jarsArchive))
+    try {
+      // Jars are zip files themselves, so recompressing them would mostly waste CPU.
+      jarsStream.setLevel(0)
+      // A HashSet keeps the duplicate-name check O(1) per jar.
+      val addedEntries = mutable.HashSet[String]()
+      jarPaths.split(KYUUBI_ENGINE_DEPLOY_YARN_MODE_ARCHIVE_SEPARATOR).foreach { path =>
+        Utils.listFilesRecursive(new File(path)).foreach { f =>
+          val name = f.getName
+          if (!addedEntries.contains(name) && f.isFile &&
+            name.toLowerCase(Locale.ROOT).endsWith(".jar") && f.canRead) {
+            jarsStream.putNextEntry(new ZipEntry(name))
+            Files.copy(f.toPath, jarsStream)
+            jarsStream.closeEntry()
+            addedEntries += name
+            addClasspathEntry(buildPath(Environment.PWD.$$(), LOCALIZED_LIB_DIR, name), env)
+          }
+        }
+      }
+    } finally {
+      jarsStream.close()
+    }
+
+    distribute(
+      new Path(jarsArchive.getAbsolutePath),
+      resType = LocalResourceType.ARCHIVE,
+      destName = LOCALIZED_LIB_DIR,
+      localResources)
+  }
+
+  /**
+   * Packs the configuration for the engine AM into one zip and uploads it as an ARCHIVE localized
+   * as LOCALIZED_CONF_DIR.
+   *
+   * The zip holds:
+   *  - the files from the paths in KYUUBI_ENGINE_DEPLOY_YARN_MODE_HADOOP_CONF_KEY (de-duplicated
+   *    by name via listDistinctFiles), stored under HADOOP_CONF_DIR/;
+   *  - the current Kyuubi configuration (converted by confToProperties), written as
+   *    KYUUBI_CONF_FILE.
+   *
+   * @throws IllegalArgumentException if KYUUBI_ENGINE_DEPLOY_YARN_MODE_HADOOP_CONF_KEY is not set
+   */
+  private def distributeConf(localResources: mutable.HashMap[String, LocalResource]): Unit = {
+    // Validate before creating the temp archive so a missing config leaves no stray file behind.
+    val confPaths = kyuubiConf.getOption(KYUUBI_ENGINE_DEPLOY_YARN_MODE_HADOOP_CONF_KEY).getOrElse {
+      throw new IllegalArgumentException("No conf specified for engine AM")
+    }
+
+    val confArchive = File.createTempFile(LOCALIZED_CONF_DIR, ".zip", Utils.createTempDir().toFile)
+    val confStream = new ZipOutputStream(new FileOutputStream(confArchive))
+    try {
+      confStream.setLevel(0)
+      listDistinctFiles(confPaths).foreach { f =>
+        // listDistinctFiles is a public, overridable def, so the file checks are repeated here.
+        if (f.isFile && f.canRead) {
+          confStream.putNextEntry(new ZipEntry(s"$HADOOP_CONF_DIR/${f.getName}"))
+          Files.copy(f.toPath, confStream)
+          confStream.closeEntry()
+        }
+      }
+
+      writePropertiesToArchive(confToProperties(kyuubiConf), KYUUBI_CONF_FILE, confStream)
+    } finally {
+      confStream.close()
+    }
+
+    distribute(
+      new Path(confArchive.getAbsolutePath),
+      resType = LocalResourceType.ARCHIVE,
+      destName = LOCALIZED_CONF_DIR,
+      localResources)
+  }
+
+  /**
+   * Lists the readable regular files found (via Utils.listFilesRecursive) under the paths in
+   * `archive`, which are separated by KYUUBI_ENGINE_DEPLOY_YARN_MODE_ARCHIVE_SEPARATOR.
+   *
+   * Files are de-duplicated by file name, and the first file found for a given name wins. The
+   * result is returned in discovery order, so it is deterministic for a given input.
+   *
+   * @param archive separator-joined list of files and/or directories
+   * @return one file per distinct file name
+   */
+  def listDistinctFiles(archive: String): Seq[File] = {
+    // LinkedHashMap keeps discovery order; grouping into a plain Map would make the order arbitrary.
+    val firstByName = mutable.LinkedHashMap[String, File]()
+    archive.split(KYUUBI_ENGINE_DEPLOY_YARN_MODE_ARCHIVE_SEPARATOR).foreach { path =>
+      Utils.listFilesRecursive(new File(path)).foreach { f =>
+        if (f.isFile && f.canRead && !firstByName.contains(f.getName)) {
+          firstByName(f.getName) = f
+        }
+      }
+    }
+    firstByName.values.toList
+  }
+
+  /**
+   * Copies the local file `srcPath` into stagingDirPath, applies STAGING_DIR_PERMISSION, and
+   * registers it in `localResources` under `destName`. The resource is registered with type
+   * `resType` and APPLICATION visibility.
+   *
+   * The resource URL, timestamp and size are taken from the fully-qualified destination path. The
+   * NodeManager therefore resolves the same file even when the staging directory was configured
+   * without a scheme/authority.
+   */
+  private def distribute(
+      srcPath: Path,
+      resType: LocalResourceType,
+      destName: String,
+      localResources: mutable.HashMap[String, LocalResource]): Unit = {
+    val fs = stagingDirPath.getFileSystem(hadoopConf)
+    val destPath = new Path(stagingDirPath, srcPath.getName)
+    info(s"Copying $srcPath to $destPath")
+    fs.copyFromLocalFile(srcPath, destPath)
+    fs.setPermission(destPath, new FsPermission(STAGING_DIR_PERMISSION))
+
+    // destPath lives under stagingDirPath, so `fs` is also its file system. Qualify the path so
+    // the resource URL carries an explicit scheme/authority instead of relying on the
+    // NodeManager's default file system.
+    val qualifiedDestPath = fs.makeQualified(destPath)
+    val destStatus = fs.getFileStatus(qualifiedDestPath)
+
+    val destResource = Records.newRecord(classOf[LocalResource])
+    destResource.setType(resType)
+    destResource.setVisibility(LocalResourceVisibility.APPLICATION)
+    destResource.setResource(URL.fromPath(qualifiedDestPath))
+    destResource.setTimestamp(destStatus.getModificationTime)
+    destResource.setSize(destStatus.getLen)
+    localResources(destName) = destResource
+  }
+
+  /**
+   * Builds the environment of the engine AM container.
+   *
+   * Each Kyuubi config whose key is KYUUBI_ENGINE_YARN_MODE_ENV_PREFIX, followed by one separator
+   * character and a non-empty name, is applied to the environment as that name and its value (via
+   * KyuubiHadoopUtils.addPathToEnvironment). Keys with nothing after the separator are ignored.
+   *
+   * In addition:
+   *  - the localized conf dir and the localized Hadoop conf dir are added as classpath entries;
+   *  - HADOOP_CONF_DIR points at the localized Hadoop conf dir;
+   *  - KYUUBI_ENGINE_YARN_MODE_STAGING_DIR carries stagingDirPath, which must already be set.
+   *
+   * @param kyuubiConf configuration to read the env-prefixed entries from
+   * @return the mutable environment map (callers may append further entries)
+   */
+  private[kyuubi] def setupLaunchEnv(kyuubiConf: KyuubiConf): mutable.HashMap[String, String] = {
+    info("Setting up the launch environment for engine AM container")
+    val env = new mutable.HashMap[String, String]()
+
+    val envPrefix = KyuubiConf.KYUUBI_ENGINE_YARN_MODE_ENV_PREFIX
+    // Require at least one char after "<prefix><separator>". Otherwise substring(prefix.length + 1)
+    // throws (key == prefix) or yields an empty variable name.
+    kyuubiConf.getAll
+      .collect {
+        case (k, v) if k.startsWith(envPrefix) && k.length > envPrefix.length + 1 =>
+          (k.substring(envPrefix.length + 1), v)
+      }
+      .foreach { case (name, value) => KyuubiHadoopUtils.addPathToEnvironment(env, name, value) }
+
+    val localizedConfDir = buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR)
+    val localizedHadoopConfDir =
+      buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, HADOOP_CONF_DIR)
+    addClasspathEntry(localizedConfDir, env)
+    env.put(Environment.HADOOP_CONF_DIR.name(), localizedHadoopConfDir)
+    addClasspathEntry(localizedHadoopConfDir, env)
+    env.put("KYUUBI_ENGINE_YARN_MODE_STAGING_DIR", stagingDirPath.toString)
+    env
+  }
+
+  private def createApplicationSubmissionContext(
+      newApp: YarnClientApplication,
+      containerContext: ContainerLaunchContext): ApplicationSubmissionContext 
= {
+
+    val appContext = newApp.getApplicationSubmissionContext
+    appContext.setApplicationName(s"Apache Kyuubi $engineType Engine")
+    appContext.setQueue(kyuubiConf.get(ENGINE_DEPLOY_YARN_MODE_QUEUE))
+    appContext.setAMContainerSpec(containerContext)
+
+    val allTags = new util.HashSet[String]

Review Comment:
   We should add the Kyuubi system tags to `allTags` here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to