dongjoon-hyun commented on a change in pull request #30472:
URL: https://github.com/apache/spark/pull/30472#discussion_r550365393



##########
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala
##########
@@ -90,29 +92,67 @@ private[spark] object KubernetesClientUtils extends Logging 
{
       .build()
   }
 
-  private def loadSparkConfDirFiles(conf: SparkConf): Map[String, String] = {
+  private def orderFilesBySize(confFiles: Seq[File]): Seq[File] = {
+    val fileToFileSizePairs = confFiles.map(f => (f, f.getName.length + 
f.length()))
+    // sort first by name and then by length, so that during tests we have 
consistent results.
+    fileToFileSizePairs.sortBy(f => f._1).sortBy(f => f._2).map(_._1)
+  }
+
+  // exposed for testing
+  private[submit] def loadSparkConfDirFiles(conf: SparkConf): Map[String, 
String] = {
     val confDir = Option(conf.getenv(ENV_SPARK_CONF_DIR)).orElse(
       conf.getOption("spark.home").map(dir => s"$dir/conf"))
+    val maxSize = conf.get(Config.CONFIG_MAP_MAXSIZE)
     if (confDir.isDefined) {
-      val confFiles = listConfFiles(confDir.get)
-      logInfo(s"Spark configuration files loaded from $confDir : 
${confFiles.mkString(",")}")
-      confFiles.map { file =>
-        val source = Source.fromFile(file)(Codec.UTF8)
-        val mapping = (file.getName -> source.mkString)
-        source.close()
-        mapping
-      }.toMap
+      val confFiles: Seq[File] = listConfFiles(confDir.get, maxSize)
+      val orderedConfFiles = orderFilesBySize(confFiles)
+      var truncatedMapSize: Long = 0
+      val truncatedMap = mutable.HashMap[String, String]()
+      var source: Source = Source.fromString("") // init with empty source.
+      for (file <- orderedConfFiles) {
+        try {
+          source = Source.fromFile(file)(Codec.UTF8)
+          val (fileName, fileContent) = file.getName -> source.mkString
+          truncatedMapSize = truncatedMapSize + (fileName.length + 
fileContent.length)
+          if (truncatedMapSize < maxSize) {
+            truncatedMap.put(fileName, fileContent)
+          } else {
+            logWarning(s"Skipped a conf file $fileName, due to size 
constraint." +
+              s" Please see, config: `${Config.CONFIG_MAP_MAXSIZE.key}` for 
more details.")
+          }
+        } catch {
+          case e: MalformedInputException =>
+            truncatedMapSize = truncatedMapSize - (file.getName.length + 
file.length)
+            logWarning(
+              s"Unable to read a non UTF-8 encoded file 
${file.getAbsolutePath}. Skipping...", e)
+            None
+        } finally {
+          source.close()
+        }
+      }
+      if (truncatedMap.nonEmpty) {
+        logInfo(s"Spark configuration files loaded from $confDir :" +
+          s" ${truncatedMap.keys.mkString(",")}")
+      }
+      truncatedMap.toMap
     } else {
       Map.empty[String, String]
     }
   }
 
-  private def listConfFiles(confDir: String): Seq[File] = {
-    // We exclude all the template files and user provided spark conf or 
properties.
-    // As spark properties are resolved in a different step.
+  private def listConfFiles(confDir: String, maxSize: Long): Seq[File] = {
+    // At the moment configmaps do not support storing binary content.
+    // configMaps do not allow for size greater than 1.5 MiB(configurable).
+    // https://etcd.io/docs/v3.4.0/dev-guide/limit/
+    // We exclude all the template files and user provided spark conf or 
properties,
+    // and binary files (e.g. jars and zip). Spark properties are resolved in 
a different step.
+    def testIfTooLargeOrBinary(f: File): Boolean = (f.length() + 
f.getName.length > maxSize) ||

Review comment:
       `testIfTooLargeOrBinary` looks misleading because it doesn't include 
`f.getName.matches(".*\\.template") || 
f.getName.matches("spark.*(conf|properties)")`.

##########
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala
##########
@@ -90,29 +92,67 @@ private[spark] object KubernetesClientUtils extends Logging 
{
       .build()
   }
 
-  private def loadSparkConfDirFiles(conf: SparkConf): Map[String, String] = {
+  private def orderFilesBySize(confFiles: Seq[File]): Seq[File] = {
+    val fileToFileSizePairs = confFiles.map(f => (f, f.getName.length + 
f.length()))
+    // sort first by name and then by length, so that during tests we have 
consistent results.
+    fileToFileSizePairs.sortBy(f => f._1).sortBy(f => f._2).map(_._1)

Review comment:
       Hmm, this has a bug, because the file name length is also counted later in 
`truncatedMapSize = truncatedMapSize + (fileName.length + fileContent.length)`.
   ```scala
   scala> Seq("b" -> 1, "abcdef" -> 1).sortBy(f => f._1).sortBy(f => f._2)
   res14: Seq[(String, Int)] = List((abcdef,1), (b,1))
   
   scala> Seq("b" -> 1, "abcdef" -> 1).sortBy(f => f._1).sortBy(f => 
f._2).map(f => f._1.length + f._2)
   res15: Seq[Int] = List(7, 2)
   ```

##########
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala
##########
@@ -90,29 +92,67 @@ private[spark] object KubernetesClientUtils extends Logging 
{
       .build()
   }
 
-  private def loadSparkConfDirFiles(conf: SparkConf): Map[String, String] = {
+  private def orderFilesBySize(confFiles: Seq[File]): Seq[File] = {
+    val fileToFileSizePairs = confFiles.map(f => (f, f.getName.length + 
f.length()))
+    // sort first by name and then by length, so that during tests we have 
consistent results.
+    fileToFileSizePairs.sortBy(f => f._1).sortBy(f => f._2).map(_._1)
+  }
+
+  // exposed for testing
+  private[submit] def loadSparkConfDirFiles(conf: SparkConf): Map[String, 
String] = {
     val confDir = Option(conf.getenv(ENV_SPARK_CONF_DIR)).orElse(
       conf.getOption("spark.home").map(dir => s"$dir/conf"))
+    val maxSize = conf.get(Config.CONFIG_MAP_MAXSIZE)
     if (confDir.isDefined) {
-      val confFiles = listConfFiles(confDir.get)
-      logInfo(s"Spark configuration files loaded from $confDir : 
${confFiles.mkString(",")}")
-      confFiles.map { file =>
-        val source = Source.fromFile(file)(Codec.UTF8)
-        val mapping = (file.getName -> source.mkString)
-        source.close()
-        mapping
-      }.toMap
+      val confFiles: Seq[File] = listConfFiles(confDir.get, maxSize)
+      val orderedConfFiles = orderFilesBySize(confFiles)
+      var truncatedMapSize: Long = 0
+      val truncatedMap = mutable.HashMap[String, String]()
+      var source: Source = Source.fromString("") // init with empty source.
+      for (file <- orderedConfFiles) {
+        try {
+          source = Source.fromFile(file)(Codec.UTF8)
+          val (fileName, fileContent) = file.getName -> source.mkString
+          truncatedMapSize = truncatedMapSize + (fileName.length + 
fileContent.length)
+          if (truncatedMapSize < maxSize) {
+            truncatedMap.put(fileName, fileContent)
+          } else {

Review comment:
       In the `else` branch, we should subtract the skipped file's size from 
`truncatedMapSize` again, because the next file can have a shorter name.
   
   In the `MalformedInputException` case, we already have the same logic.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to