mridulm commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1295351248
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,51 @@ private[spark] class Client(
new Path(resolvedDestDir, qualifiedDestPath.getName())
}
+ /**
+ * For each non-local and non-glob resource, we will count its parent
directory. If its
+ * frequency is larger than the threshold specified by
+ * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, the
corresponding file status
+ * from the directory will be preloaded.
+ *
+ * @param jars : the list of jars to upload
+ * @return a hashmap contains directories to be preloaded and all file names
in that directory
+ * */
Review Comment:
nit:
```suggestion
*/
```
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,51 @@ private[spark] class Client(
new Path(resolvedDestDir, qualifiedDestPath.getName())
}
+ /**
+ * For each non-local and non-glob resource, we will count its parent
directory. If its
+ * frequency is larger than the threshold specified by
+ * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, the
corresponding file status
+ * from the directory will be preloaded.
+ *
+ * @param jars : the list of jars to upload
+ * @return a hashmap contains directories to be preloaded and all file names
in that directory
+ * */
+ private[yarn] def directoriesToBePreloaded(jars: Seq[String]): HashMap[URI,
Set[String]] = {
+ val directoryToFiles = new HashMap[URI, Set[String]]()
+ jars.foreach { jar =>
+ if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+ val currentUri = Utils.resolveURI(jar)
+ val parentUri = new Path(currentUri).getParent.toUri
+ directoryToFiles.update(parentUri,
directoryToFiles.getOrElse(parentUri, Set.empty)
+ .union(Set(currentUri.normalize().toString)))
Review Comment:
It would be simpler to do something like:
```suggestion
val currentPath = new Path(Utils.resolveURI(jar))
val parentUri = currentPath.getParent.toUri
directoryToFiles.getOrElseUpdate(parentUri, new HashSet[String]())
+= currentPath.getName
```
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,30 @@ package object config extends Logging {
.stringConf
.createWithDefault("yarn.io/fpga")
+ private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+ ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+ .doc("This configuration enables statCache to be preloaded at YARN client
side. This feature" +
+ " analyzes the pattern of resources paths. If multiple resources shared
the same parent" +
Review Comment:
nit:
```suggestion
     " analyzes the pattern of resource paths, and if multiple resources
share the same parent" +
```
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,51 @@ private[spark] class Client(
new Path(resolvedDestDir, qualifiedDestPath.getName())
}
+ /**
+ * For each non-local and non-glob resource, we will count its parent
directory. If its
+ * frequency is larger than the threshold specified by
+ * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, the
corresponding file status
+ * from the directory will be preloaded.
+ *
+ * @param jars : the list of jars to upload
+ * @return a hashmap contains directories to be preloaded and all file names
in that directory
+ * */
+ private[yarn] def directoriesToBePreloaded(jars: Seq[String]): HashMap[URI,
Set[String]] = {
+ val directoryToFiles = new HashMap[URI, Set[String]]()
+ jars.foreach { jar =>
+ if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+ val currentUri = Utils.resolveURI(jar)
+ val parentUri = new Path(currentUri).getParent.toUri
+ directoryToFiles.update(parentUri,
directoryToFiles.getOrElse(parentUri, Set.empty)
+ .union(Set(currentUri.normalize().toString)))
+ }
+ }
+ directoryToFiles.filter(_._2.size >=
statCachePreloadDirectoryCountThreshold)
+ }
+
+ /**
+ * Preload the statCache with file status. List all files from that
directory and add them to the
+ * statCache.
+ *
+ * @param fsLookup: Function for looking up an FS based on a URI; override
for testing
+ * @return A preloaded statCache with fileStatus
+ */
+ private[yarn] def getPreloadedStatCache(
+ fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)):
HashMap[URI, FileStatus] = {
+ val statCache = HashMap[URI, FileStatus]()
+ val jars = sparkConf.get(SPARK_JARS)
+ val directoryToFiles =
jars.map(directoriesToBePreloaded).getOrElse(HashMap.empty)
+
+ directoryToFiles.foreach { case (dir: URI, filesInDir: Set[String]) =>
+ fsLookup(dir).listStatus(new Path(dir)).filter(_.isFile()).
+ filter(f => filesInDir.contains(f.getPath.toString)).foreach {
fileStatus =>
Review Comment:
This would become:
```suggestion
filter(f => filesInDir.contains(f.getPath.getName)).foreach {
fileStatus =>
```
once we make the change above.
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,30 @@ package object config extends Logging {
.stringConf
.createWithDefault("yarn.io/fpga")
+ private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+ ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+ .doc("This configuration enables statCache to be preloaded at YARN client
side. This feature" +
+ " analyzes the pattern of resources paths. If multiple resources shared
the same parent" +
+ " directory, a single <code>listStatus</code> will be invoked on the
parent directory" +
+ " instead of multiple <code>getFileStatus</code> performed on each
individual resources." +
+ " If most resources from a small set of directories, this can
substantially improve job" +
+ " submission time. Enabling this feature may potentially increase client
memory overhead.")
+ .version("4.0.0")
+ .booleanConf
+ .createWithDefault(false)
+
+ private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_PER_DIRECTORY_THRESHOLD =
+ ConfigBuilder("spark.yarn.client.statCache.preload.perDirectoryThreshold")
+ .doc("This configuration defines the threshold for the number of
resources in a directory" +
+ " that triggers the activation of the statCache preloading. When the
count of individual" +
Review Comment:
 nit: For both doc notes, let us be consistent with the convention used in the
 other doc notes in this file w.r.t. the placement of spaces at line breaks.
For example:
```suggestion
.doc("This configuration defines the threshold for the number of
resources in a directory " +
"that triggers the activation of the statCache preloading. When the
count of individual " +
<snip>
```
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,51 @@ private[spark] class Client(
new Path(resolvedDestDir, qualifiedDestPath.getName())
}
+ /**
+ * For each non-local and non-glob resource, we will count its parent
directory. If its
+ * frequency is larger than the threshold specified by
+ * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, the
corresponding file status
+ * from the directory will be preloaded.
+ *
+ * @param jars : the list of jars to upload
+ * @return a hashmap contains directories to be preloaded and all file names
in that directory
+ * */
+ private[yarn] def directoriesToBePreloaded(jars: Seq[String]): HashMap[URI,
Set[String]] = {
+ val directoryToFiles = new HashMap[URI, Set[String]]()
+ jars.foreach { jar =>
+ if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+ val currentUri = Utils.resolveURI(jar)
+ val parentUri = new Path(currentUri).getParent.toUri
+ directoryToFiles.update(parentUri,
directoryToFiles.getOrElse(parentUri, Set.empty)
+ .union(Set(currentUri.normalize().toString)))
+ }
+ }
+ directoryToFiles.filter(_._2.size >=
statCachePreloadDirectoryCountThreshold)
+ }
+
+ /**
+ * Preload the statCache with file status. List all files from that
directory and add them to the
+ * statCache.
+ *
+ * @param fsLookup: Function for looking up an FS based on a URI; override
for testing
+ * @return A preloaded statCache with fileStatus
+ */
+ private[yarn] def getPreloadedStatCache(
+ fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)):
HashMap[URI, FileStatus] = {
+ val statCache = HashMap[URI, FileStatus]()
+ val jars = sparkConf.get(SPARK_JARS)
+ val directoryToFiles =
jars.map(directoriesToBePreloaded).getOrElse(HashMap.empty)
+
+ directoryToFiles.foreach { case (dir: URI, filesInDir: Set[String]) =>
+ fsLookup(dir).listStatus(new Path(dir)).filter(_.isFile()).
+ filter(f => filesInDir.contains(f.getPath.toString)).foreach {
fileStatus =>
+ val uri = fileStatus.getPath.toUri
+ statCache.put(uri, fileStatus)
Review Comment:
 QQ: Since we are loading the `FileStatus` for the jar, don't we also need the
 parent's `FileStatus`? Cache that as well?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]