mridulm commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1294151353


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,52 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  private[yarn] def directoriesToBePreloaded(jars: Seq[String]): List[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val parentUri = new Path(Utils.resolveURI(jar)).getParent.toUri
+        directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.filter(_._2 >= statCachePreloadDirectoryCountThreshold).keys.toList
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, we will list all
+   * files from that directory and add them to the statCache.
+   *
+   * @param fsLookup: Function for looking up an FS based on a URI; override for testing
+   * @return A preloaded statCache with fileStatus
+   */
+  private[yarn] def getPreloadedStatCache(
+      fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
+    val statCache = HashMap[URI, FileStatus]()
+    val jars = sparkConf.get(SPARK_JARS)
+    val directories = jars.map(directoriesToBePreloaded).getOrElse(ArrayBuffer.empty[URI])
+
+    directories.foreach { dir =>
+      fsLookup(dir).listStatus(new Path(dir)).filter(_.isFile())
+        .foreach { fileStatus =>
+          val uri = fileStatus.getPath.toUri
+          if (jars.exists(_.contains(uri.toString))) {
+            logDebug(s"Add ${uri} file status to statCache.")
+            statCache.put(uri, fileStatus)
+          }
+        }
+    }

Review Comment:
   Perhaps my comment was not very clear.
   You can test what I am referring to by checking for something with 
unresolved path in spark.jar - for example, something like 
`hdfs://host:123/a/b/c/../file`
   
   The `jars.exists(_.contains(uri.toString))` check will return false, and so this file will 
not be cached, because `Path` always resolves the URI — it will return `/a/b/file` for the 
Path here.
   This pattern is quite common when users are specifying jars (use of `..`, 
`./`, etc in the path)
   
   Btw, we have to update the tests in this PR to catch these type of paths as 
well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to