yaooqinn commented on a change in pull request #25609: [SPARK-28896][K8S] 
Download hadoop configurations from k8s configmap if the client process has 
files to upload
URL: https://github.com/apache/spark/pull/25609#discussion_r320547242
 
 

 ##########
 File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
 ##########
 @@ -267,30 +271,56 @@ private[spark] object KubernetesUtils extends Logging {
    }
   }
 
-  def uploadFileUri(uri: String, conf: Option[SparkConf] = None): String = {
-    conf match {
-      case Some(sConf) =>
-        if (sConf.get(KUBERNETES_FILE_UPLOAD_PATH).isDefined) {
-          val fileUri = Utils.resolveURI(uri)
-          try {
-            val hadoopConf = SparkHadoopUtil.get.newConfiguration(sConf)
-            val uploadPath = sConf.get(KUBERNETES_FILE_UPLOAD_PATH).get
-            val fs = getHadoopFileSystem(Utils.resolveURI(uploadPath), 
hadoopConf)
-            val randomDirName = s"spark-upload-${UUID.randomUUID()}"
-            fs.mkdirs(new Path(s"${uploadPath}/${randomDirName}"))
-            val targetUri = 
s"${uploadPath}/${randomDirName}/${fileUri.getPath.split("/").last}"
-            log.info(s"Uploading file: ${fileUri.getPath} to dest: 
$targetUri...")
-            uploadFileToHadoopCompatibleFS(new Path(fileUri.getPath), new 
Path(targetUri), fs)
-            targetUri
-          } catch {
-            case e: Exception =>
-              throw new SparkException(s"Uploading file ${fileUri.getPath} 
failed...", e)
+  def getUploadPath(conf: SparkConf, client: KubernetesClient): (FileSystem, 
String) = {
+    conf.get(KUBERNETES_FILE_UPLOAD_PATH) match {
+      case Some(path) =>
+        val hadoopConf = new Configuration()
+        // When spark.kubernetes.file.upload.path is set, we need a 
cluster-specific hadoop config,
+        // and if it is supplied via spark.kubernetes.hadoop.configMapName 
rather than HADOOP_CONF_DIR, we
+        // should download the configmap to our client side.
+        // 1. add configurations from k8s configmap to hadoopConf
+        conf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP).foreach { cm =>
+          val hadoopConfFiles = 
client.configMaps().withName(cm).get().getData.asScala
+          for ((name, content) <- hadoopConfFiles if name.endsWith(".xml")) {
+            hadoopConf.addResource(new ByteArrayInputStream(content.getBytes), 
name)
           }
-        } else {
-          throw new SparkException("Please specify " +
-            "spark.kubernetes.file.upload.path property.")
         }
-      case _ => throw new SparkException("Spark configuration is missing...")
+        // 2. add configurations from arguments or spark properties file to 
hadoopConf
+        SparkHadoopUtil.appendS3AndSparkHadoopConfigurations(conf, hadoopConf)
+        // 3. set or reset user group information
+        UserGroupInformation.setConfiguration(hadoopConf)
 
 Review comment:
   Yes, I agree. But for now, spark-submit is quite YARN-specific. It does 
not work for security settings contained in a k8s ConfigMap.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to