shuwang21 commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1292885986


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,52 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent 
directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file 
status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  private[yarn] def directoriesToBePreloaded(jars: Seq[String]): List[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val parentUri = new Path(Utils.resolveURI(jar)).getParent.toUri
+        directoryCounter.update(parentUri, 
directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.filter(_._2 >= 
statCachePreloadDirectoryCountThreshold).keys.toList
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, 
we will list all
+   * files from that directory and add them to the statCache.
+   *
+   * @param fsLookup: Function for looking up an FS based on a URI; override 
for testing
+   * @return A preloaded statCache with fileStatus
+   */
+  private[yarn] def getPreloadedStatCache(
+      fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): 
HashMap[URI, FileStatus] = {
+    val statCache = HashMap[URI, FileStatus]()
+    val jars = sparkConf.get(SPARK_JARS)
+    val directories = 
jars.map(directoriesToBePreloaded).getOrElse(ArrayBuffer.empty[URI])
+
+    directories.foreach { dir =>
+      fsLookup(dir).listStatus(new Path(dir)).filter(_.isFile())
+        .foreach { fileStatus =>
+          val uri = fileStatus.getPath.toUri
+          if (jars.exists(_.contains(uri.toString))) {
+            logDebug(s"Add ${uri} file status to statCache.")
+            statCache.put(uri, fileStatus)
+          }
+        }
+    }

Review Comment:
   @mridulm I think this is fine and intended. `directoriesToBePreloaded` 
only checks non-local and non-glob resources. For unresolved paths or glob 
paths, we will not do a preload. The existing code `pathFs.globStatus(path)` 
(L669, Client.scala) will handle this later on. This will just be one RPC call, 
so there is no need to optimize in this case.
   
   > `jars` contains unresolved paths, while `dir` has been resolved, and this 
can lead to `jars.exists(_.contains(uri.toString))` returning `false`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to