yaooqinn commented on a change in pull request #25609: [SPARK-28896][K8S] Download hadoop configurations from k8s configmap if the client process has files to upload
URL: https://github.com/apache/spark/pull/25609#discussion_r320545643
##########
File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
##########
@@ -267,30 +271,56 @@ private[spark] object KubernetesUtils extends Logging {
     }
   }
-  def uploadFileUri(uri: String, conf: Option[SparkConf] = None): String = {
-    conf match {
-      case Some(sConf) =>
-        if (sConf.get(KUBERNETES_FILE_UPLOAD_PATH).isDefined) {
-          val fileUri = Utils.resolveURI(uri)
-          try {
-            val hadoopConf = SparkHadoopUtil.get.newConfiguration(sConf)
-            val uploadPath = sConf.get(KUBERNETES_FILE_UPLOAD_PATH).get
-            val fs = getHadoopFileSystem(Utils.resolveURI(uploadPath), hadoopConf)
-            val randomDirName = s"spark-upload-${UUID.randomUUID()}"
-            fs.mkdirs(new Path(s"${uploadPath}/${randomDirName}"))
-            val targetUri = s"${uploadPath}/${randomDirName}/${fileUri.getPath.split("/").last}"
-            log.info(s"Uploading file: ${fileUri.getPath} to dest: $targetUri...")
-            uploadFileToHadoopCompatibleFS(new Path(fileUri.getPath), new Path(targetUri), fs)
-            targetUri
-          } catch {
-            case e: Exception =>
-              throw new SparkException(s"Uploading file ${fileUri.getPath} failed...", e)
+  def getUploadPath(conf: SparkConf, client: KubernetesClient): (FileSystem, String) = {
+    conf.get(KUBERNETES_FILE_UPLOAD_PATH) match {
+      case Some(path) =>
+        val hadoopConf = new Configuration()
+        // When spark.kubernetes.file.upload.path is set, we need a cluster specific hadoop config,
+        // and if we use spark.kubernetes.hadoop.configMapName instead of HADOOP_CONF_DIR, we
+        // should download the configmap to our client side.
+        // 1. add configurations from k8s configmap to hadoopConf
+        conf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP).foreach { cm =>
+          val hadoopConfFiles = client.configMaps().withName(cm).get().getData.asScala
Review comment:
1. `spark.kubernetes.hadoop.configMapName` has no `driver` in its name, so I guess it is fine to use it throughout the Spark application, including the client process.
2. Using `spark.kubernetes.hadoop.configMapName` and `HADOOP_CONF_DIR` is an **either-or** choice, [check here](https://github.com/apache/spark/blob/5cf2602ccbcada92f11ac715872061f8307d9d70/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala#L41). Both carry the same Hadoop configuration and are used by the client to define the driver pod, so they should be equivalent. For now, the only difference between them is that `HADOOP_CONF_DIR` is on the classpath of the client process while the configmap is not. With `spark.kubernetes.hadoop.configMapName`, the application works as long as it has no extra files (`--jars`/`--files`) to upload, but as soon as it needs to upload one or more, it fails. I think this behavior of `spark.kubernetes.hadoop.configMapName` is kind of unacceptable. A rough sketch of the client-side behavior I have in mind is below.
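
To make point 2 concrete, here is a minimal sketch of what "download the configmap to our client side" could look like. It assumes fabric8's `KubernetesClient` (already used in the diff above) and that the configmap values are Hadoop `*-site.xml` payloads; the `ClientSideHadoopConf.fromConfigMap` helper is hypothetical, not the method this PR adds.

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.hadoop.conf.Configuration

object ClientSideHadoopConf {
  // Hypothetical helper (not the PR's API): merge the *-site.xml entries stored in the
  // configmap named by spark.kubernetes.hadoop.configMapName into a client-side Hadoop
  // Configuration, so the upload filesystem can be resolved even when HADOOP_CONF_DIR
  // is not set for the client process.
  def fromConfigMap(client: KubernetesClient, cmName: String): Configuration = {
    val hadoopConf = new Configuration()
    val data = client.configMaps().withName(cmName).get().getData.asScala
    data.filter { case (fileName, _) => fileName.endsWith(".xml") }.foreach {
      case (_, xml) =>
        // Configuration.addResource(InputStream) parses the XML payload directly,
        // so nothing has to be written to the local filesystem.
        hadoopConf.addResource(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
    }
    hadoopConf
  }
}
```

With something along these lines, the `FileSystem` used for `--jars`/`--files` uploads could be built from the same configmap that defines the driver pod, which is the behavior point 2 argues for.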