srowen commented on a change in pull request #30472:
URL: https://github.com/apache/spark/pull/30472#discussion_r536112344
##########
File path:
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala
##########
@@ -51,6 +54,29 @@ private[spark] object KubernetesClientUtils extends Logging {
propertiesWriter.toString
}
+ object StringLengthOrdering extends Ordering[(String, String)] {
+ override def compare(x: (String, String), y: (String, String)): Int = {
+ // compare based on file length and break the tie with string comparison of keys.
+ (x._1.length + x._2.length).compare(y._1.length + y._2.length) * 10 +
+ x._1.compareTo(y._1)
+ }
+ }
+
+ def truncateToSize(seq: Seq[(String, String)], maxSize: Long): Map[String, String] = {
+ // First order the entries in order of their size.
+ // Skip entries if the resulting Map size exceeds maxSize.
+ val ordering: Ordering[(String, String)] = StringLengthOrdering
+ val sortedSet = SortedSet[(String, String)](seq : _*)(ordering)
Review comment:
`seq: _*`
##########
File path:
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
##########
@@ -90,6 +90,14 @@ private[spark] object Config extends Logging {
.toSequence
.createWithDefault(Nil)
+ val CONFIG_MAP_MAXSIZE =
+ ConfigBuilder("spark.kubernetes.configMap.maxsize")
+ .doc("Max size limit for a config map. This is configurable as per" +
+ " https://etcd.io/docs/v3.4.0/dev-guide/limit/ on k8s server end.")
+ .version("3.1.0")
Review comment:
This will probably be for 3.2.0, note
##########
File path:
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala
##########
@@ -94,25 +120,41 @@ private[spark] object KubernetesClientUtils extends Logging {
val confDir = Option(conf.getenv(ENV_SPARK_CONF_DIR)).orElse(
conf.getOption("spark.home").map(dir => s"$dir/conf"))
if (confDir.isDefined) {
- val confFiles = listConfFiles(confDir.get)
- logInfo(s"Spark configuration files loaded from $confDir : ${confFiles.mkString(",")}")
- confFiles.map { file =>
- val source = Source.fromFile(file)(Codec.UTF8)
- val mapping = (file.getName -> source.mkString)
- source.close()
- mapping
- }.toMap
+ val confFiles = listConfFiles(confDir.get, conf.get(Config.CONFIG_MAP_MAXSIZE))
+ val filesSeq: Seq[Option[(String, String)]] = confFiles.map { file =>
+ try {
+ val source = Source.fromFile(file)(Codec.UTF8)
+ val mapping = Some(file.getName -> source.mkString)
+ source.close()
+ mapping
+ } catch {
+ case e: MalformedInputException =>
+ logWarning(s"skipped a non UTF-8 encoded file ${file.getAbsolutePath}.", e)
+ None
+ }
+ }
+ val truncatedMap = truncateToSize(filesSeq.flatten, conf.get(Config.CONFIG_MAP_MAXSIZE))
+ logInfo(s"Spark configuration files loaded from $confDir :" +
Review comment:
Or do you want to truncate earlier before reading them all into memory?
##########
File path:
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala
##########
@@ -51,6 +54,29 @@ private[spark] object KubernetesClientUtils extends Logging {
propertiesWriter.toString
}
+ object StringLengthOrdering extends Ordering[(String, String)] {
+ override def compare(x: (String, String), y: (String, String)): Int = {
+ // compare based on file length and break the tie with string comparison of keys.
+ (x._1.length + x._2.length).compare(y._1.length + y._2.length) * 10 +
+ x._1.compareTo(y._1)
+ }
+ }
+
+ def truncateToSize(seq: Seq[(String, String)], maxSize: Long): Map[String, String] = {
+ // First order the entries in order of their size.
+ // Skip entries if the resulting Map size exceeds maxSize.
+ val ordering: Ordering[(String, String)] = StringLengthOrdering
+ val sortedSet = SortedSet[(String, String)](seq : _*)(ordering)
+ var i: Int = 0
+ val map = mutable.HashMap[String, String]()
+ for (item <- sortedSet) {
+ i += item._1.length + item._2.length
+ if ( i < maxSize) {
Review comment:
And no big deal but if they're sorted by sum, you can stop after you hit
something of maxSize
##########
File path:
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala
##########
@@ -51,6 +54,29 @@ private[spark] object KubernetesClientUtils extends Logging {
propertiesWriter.toString
}
+ object StringLengthOrdering extends Ordering[(String, String)] {
+ override def compare(x: (String, String), y: (String, String)): Int = {
+ // compare based on file length and break the tie with string comparison of keys.
+ (x._1.length + x._2.length).compare(y._1.length + y._2.length) * 10 +
+ x._1.compareTo(y._1)
+ }
Review comment:
Yeah, don't you want to compare the sum and only compare the first one if they're equal? Do you really even need compare()?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]