sunchao commented on a change in pull request #29959:
URL: https://github.com/apache/spark/pull/29959#discussion_r502007561



##########
File path: core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
##########
@@ -57,11 +50,22 @@ private[spark] object HadoopFSUtils extends Logging {
    * @param parallelismMax The maximum parallelism for listing. If the number of input paths is
    *                       larger than this value, parallelism will be throttled to this value
    *                       to avoid generating too many tasks.
-   * @param filterFun Optional predicate on the leaf files. Files who failed the check will be
-   *                  excluded from the results
    * @return for each input path, the set of discovered files for the path
    */
   def parallelListLeafFiles(
+    sc: SparkContext,
+    paths: Seq[Path],
+    hadoopConf: Configuration,
+    filter: PathFilter,
+    ignoreMissingFiles: Boolean,
+    ignoreLocality: Boolean,
+    parallelismThreshold: Int,
+    parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = {
+    parallelListLeafFilesInternal(sc, paths, hadoopConf, filter, true, ignoreMissingFiles,

Review comment:
       yup will do

##########
File path: core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
##########
@@ -207,18 +166,14 @@ private[spark] object HadoopFSUtils extends Logging {
     // Note that statuses only include FileStatus for the files and dirs directly under path,
     // and does not include anything else recursively.
     val statuses: Array[FileStatus] = try {
-      fs match {
-        // DistributedFileSystem overrides listLocatedStatus to make 1 single call to namenode
-        // to retrieve the file status with the file block location. The reason to still fallback
-        // to listStatus is because the default implementation would potentially throw a
-        // FileNotFoundException which is better handled by doing the lookups manually below.
-        case (_: DistributedFileSystem | _: ViewFileSystem) if !ignoreLocality =>
-          val remoteIter = fs.listLocatedStatus(path)
-          new Iterator[LocatedFileStatus]() {
-            def next(): LocatedFileStatus = remoteIter.next
-            def hasNext(): Boolean = remoteIter.hasNext
-          }.toArray
-        case _ => fs.listStatus(path)
+      if (ignoreLocality) {
+        fs.listStatus(path)
+      } else {
+        val remoteIter = fs.listLocatedStatus(path)

Review comment:
       Yeah, a FS can choose not to implement it (although all the main ones
override it). If it is not implemented, the call falls back to the default impl
in `FileSystem`, which basically calls `listStatus` and then
`getFileBlockLocations` on each `FileStatus` received. That behavior is very
similar to what this class does later on.
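
To make the fallback concrete, here is a minimal sketch in Scala. It does not use Hadoop at all: `Status`, `LocatedStatus`, `ToyFileSystem`, and `InMemoryFs` are hypothetical stand-ins invented for illustration, and the sketch only mirrors the shape described above (one `listStatus` call, then one `getFileBlockLocations` lookup per entry). Skipping directories is an assumption of this sketch.

```scala
// Simplified stand-ins for FileStatus / LocatedFileStatus (NOT the Hadoop classes).
final case class Status(path: String, isFile: Boolean)
final case class LocatedStatus(status: Status, hosts: Seq[String])

// A toy file system: implementations only have to provide the two primitives.
trait ToyFileSystem {
  def listStatus(dir: String): Seq[Status]
  def getFileBlockLocations(status: Status): Seq[String]

  // Default fallback: list once, then look up block locations entry by entry.
  // A file system with a cheaper combined call would override this method.
  def listLocatedStatus(dir: String): Seq[LocatedStatus] =
    listStatus(dir).map { s =>
      // Assumption of this sketch: directories carry no block locations.
      val hosts = if (s.isFile) getFileBlockLocations(s) else Seq.empty[String]
      LocatedStatus(s, hosts)
    }
}

// An in-memory implementation that does NOT override listLocatedStatus,
// so it exercises the default fallback above.
object InMemoryFs extends ToyFileSystem {
  // Hypothetical contents: file path -> hosts holding its blocks.
  private val blocks: Map[String, Seq[String]] = Map(
    "/data/a.parquet" -> Seq("host1", "host2"),
    "/data/b.parquet" -> Seq("host3"))

  override def listStatus(dir: String): Seq[Status] =
    blocks.keys.toSeq.sorted
      .filter(_.startsWith(dir + "/"))
      .map(p => Status(p, isFile = true))

  override def getFileBlockLocations(status: Status): Seq[String] =
    blocks(status.path)
}
```

Calling `InMemoryFs.listLocatedStatus("/data")` goes through the default path, so the caller gets located statuses without the implementation having to override anything.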

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
##########
@@ -214,9 +214,9 @@ class FileIndexSuite extends SharedSparkSession {
               assert(leafFiles.isEmpty)
             } else {
               assert(raceCondition == classOf[FileDeletionRaceFileSystem])
-              // One of the two leaf files was missing, but we should still list the other:
-              assert(leafFiles.size == 1)
-              assert(leafFiles.head.getPath == nonDeletedLeafFilePath)
+              // listLocatedStatus will fail as a whole because the default impl calls
+              // getFileBlockLocations
+              assert(leafFiles.isEmpty)

Review comment:
       Yes, this test checks the case where a file is deleted after a
`listStatus` call but before a subsequent `getFileBlockLocations` call, when
locality info is needed. With the new impl, we'd call `listLocatedStatus`
instead, which calls `getFileBlockLocations` internally, and thus the
`listLocatedStatus` call (as a whole) fails with `FileNotFoundException`.

   As explained in the PR description, the behavior will be different when
`spark.sql.files.ignoreMissingFiles` is set, although I think we currently
don't give any guarantee when there are missing files during listing, so either
is acceptable? Anyway, I'm happy to remove this change if there is any concern.
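
The difference between the two behaviors can be sketched with a toy model in Scala. Everything here is hypothetical and independent of Hadoop and Spark: `RacyFs` simulates a file that appears in the listing but is gone by the time its block locations are requested, and `Listing.perFile` / `Listing.wholeCall` are illustrative names for the old and new listing shapes, both assuming missing files are to be ignored.

```scala
import java.io.FileNotFoundException

// Toy model of the race: "deleted.txt" is returned by the listing,
// but no longer exists when its block locations are looked up.
object RacyFs {
  def listStatus(dir: String): Seq[String] =
    Seq(s"$dir/deleted.txt", s"$dir/kept.txt")

  def getFileBlockLocations(path: String): Seq[String] = {
    if (path.endsWith("deleted.txt")) {
      throw new FileNotFoundException(path)
    }
    Seq("host1")
  }

  // Default-style listLocatedStatus: a failure on any entry fails the whole call.
  def listLocatedStatus(dir: String): Seq[(String, Seq[String])] =
    listStatus(dir).map(p => p -> getFileBlockLocations(p))
}

object Listing {
  // Old shape: list first, then look up locations per file and skip missing ones.
  def perFile(dir: String): Seq[String] =
    RacyFs.listStatus(dir).flatMap { p =>
      try {
        RacyFs.getFileBlockLocations(p)
        Some(p)
      } catch {
        case _: FileNotFoundException => None
      }
    }

  // New shape: one listLocatedStatus call; a single missing file
  // means nothing is returned for the directory.
  def wholeCall(dir: String): Seq[String] =
    try {
      RacyFs.listLocatedStatus(dir).map(_._1)
    } catch {
      case _: FileNotFoundException => Seq.empty
    }
}
```

In this model `Listing.perFile("/dir")` still yields the surviving file, while `Listing.wholeCall("/dir")` yields nothing, which is the change reflected in the updated assertion.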



