mridulm commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1292600902
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,52 @@ private[spark] class Client(
new Path(resolvedDestDir, qualifiedDestPath.getName())
}
+ /**
+ * For each non-local and non-glob resource, we will count its parent
directory. If its
+ * frequency is larger than the threshold specified by
+ * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file
status from the
+ * directory will be preloaded.
+ *
+ * @param jars : the list of jars to upload
+ * @return a list of directories to be preloaded
+ * */
+ private[yarn] def directoriesToBePreloaded(jars: Seq[String]): List[URI] = {
+ val directoryCounter = new HashMap[URI, Int]()
+ jars.foreach { jar =>
+ if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+ val parentUri = new Path(Utils.resolveURI(jar)).getParent.toUri
+ directoryCounter.update(parentUri,
directoryCounter.getOrElse(parentUri, 0) + 1)
+ }
+ }
+ directoryCounter.filter(_._2 >=
statCachePreloadDirectoryCountThreshold).keys.toList
+ }
+
+ /**
+ * Preload the statCache with file status. For each directory in the list,
we will list all
+ * files from that directory and add them to the statCache.
+ *
+ * @param fsLookup: Function for looking up an FS based on a URI; override
for testing
+ * @return A preloaded statCache with fileStatus
+ */
+ private[yarn] def getPreloadedStatCache(
+ fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)):
HashMap[URI, FileStatus] = {
+ val statCache = HashMap[URI, FileStatus]()
+ val jars = sparkConf.get(SPARK_JARS)
+ val directories =
jars.map(directoriesToBePreloaded).getOrElse(ArrayBuffer.empty[URI])
+
+ directories.foreach { dir =>
+ fsLookup(dir).listStatus(new Path(dir)).filter(_.isFile())
+ .foreach { fileStatus =>
+ val uri = fileStatus.getPath.toUri
+ if (jars.exists(_.contains(uri.toString))) {
+ logDebug(s"Add ${uri} file status to statCache.")
+ statCache.put(uri, fileStatus)
+ }
+ }
+ }
Review Comment:
`jars` contains unresolved paths, while `dir` has been resolved - which can
result in `jars.exists(_.contains(uri.toString))` returning `false`.
How about return Map[dir: String , Set[ fileName: String ] ] in
`directoriesToBePreloaded` and check for `fileStatus.getPath.getName` in the
value set ?
Reformulate it as:
```suggestion
directoryToFiles.foreach { case (dir: String, filesInDir: Set[String]) =>
fsLookup(dir).listStatus(new Path(dir)).filter(_.isFile()).
filter(f => filesInDir.contains(f.getPath.getName)).foreach {
fileStatus =>
val uri = fileStatus.getPath.toUri
statCache.put(uri, fileStatus)
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]